In [60]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/harry-potter-books/03 Harry Potter and the Prisoner of Azkaban.txt
/kaggle/input/harry-potter-books/06 Harry Potter and the Half-Blood Prince.txt
/kaggle/input/harry-potter-books/05 Harry Potter and the Order of the Phoenix.txt
/kaggle/input/harry-potter-books/02 Harry Potter and the Chamber of Secrets.txt
/kaggle/input/harry-potter-books/07 Harry Potter and the Deathly Hallows.txt
/kaggle/input/harry-potter-books/01 Harry Potter and the Sorcerers Stone.txt
/kaggle/input/harry-potter-books/04 Harry Potter and the Goblet of Fire.txt


In [80]:
#Preprocessing 
import torch
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

FILE_PATH = "/kaggle/input/harry-potter-books/" 

books_text = []
folder_path = FILE_PATH

# Load the text data
for file in os.listdir(folder_path):
    with open(os.path.join(folder_path, file), 'r', encoding='utf-8') as f:
        books_text.append(f.read())
print(books_text[0][:50])
all_text = " ".join(books_text)
#print(all_text[:100])
all_text = all_text[:500]


tokenizer = get_tokenizer('basic_english')
tokens = tokenizer(all_text)
#print(tokens[4000:4500])


vocabulary = build_vocab_from_iterator([tokens], specials=['<unk>', '<pad>', '<bos>', '<eos>'])
vocabulary.set_default_index(vocabulary['<unk>'])
print(len(vocabulary))




numericalized_data = torch.tensor(vocabulary(tokens), dtype=torch.long)
print(numericalized_data.shape)




Harry Potter was a highly unusual boy in many ways
77
torch.Size([112])


In [81]:
#DataLoader
from torch.utils.data import DataLoader, TensorDataset


SEQUENCE_LENGTH = 10
sequence_length = SEQUENCE_LENGTH
sequences = [numericalized_data[i:i+sequence_length+1] for i in range(len(numericalized_data)-sequence_length)]

# Create a DataLoader
dataset = TensorDataset(torch.stack(sequences))
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

In [82]:
#Model Architecture

import torch.nn as nn
import math

class TransformerModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_heads, num_layers, hidden_dim, dropout):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_encoder = PositionalEncoding(embed_dim, dropout)
        
        decoder_layer = nn.TransformerDecoderLayer(embed_dim, num_heads, hidden_dim, dropout)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)
        
        self.fc = nn.Linear(embed_dim, vocab_size)
        self.embed_dim = embed_dim

    def forward(self, src, tgt, tgt_mask, memory_mask):
        src = self.embedding(src) * math.sqrt(self.embed_dim)
        src = self.pos_encoder(src)
        
        tgt = self.embedding(tgt) * math.sqrt(self.embed_dim)
        tgt = self.pos_encoder(tgt)
        
        memory = self.transformer_decoder(tgt, src, tgt_mask=tgt_mask, memory_mask=memory_mask)
        
        output = self.fc(memory)
        return output

class PositionalEncoding(nn.Module):
    def __init__(self, embed_dim, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, embed_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_dim, 2).float() * (-math.log(10000.0) / embed_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask



In [93]:
import torch.optim as optim


VOCAB_SIZE = len(vocabulary)
EMBED_DIM = 512
NUM_HEADS = 8
NUM_LAYERS = 6
HIDDEN_DIM = 2048
DROPOUT = 0.0


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerModel(VOCAB_SIZE, EMBED_DIM, NUM_HEADS, NUM_LAYERS, HIDDEN_DIM, DROPOUT).to(device)


optimizer = optim.Adam(model.parameters(), lr=0.001) # try AdamW?
criterion = nn.CrossEntropyLoss()







def train(model, dataloader, optimizer, criterion):
    model.train()
    total_loss = 0
    for batch in dataloader:
        optimizer.zero_grad()
        
        # Get source and target sequences
        src = batch[0][:, :-1].T.to(device)
        tgt_input = batch[0][:, :-1].T.to(device)  
        tgt_output = batch[0][:, 1:].T.to(device)  
        
        # Print shapes for debugging
        #print(f"src shape: {src.shape}")
        #print(f"tgt_input shape: {tgt_input.shape}")
        #print(f"tgt_output shape: {tgt_output.shape}")
        
        # Generate mask
        tgt_mask = generate_square_subsequent_mask(tgt_input.size(0)).to(device)
        
        # Forward pass
        output = model(src, tgt_input, tgt_mask, None)
        
        # Print output shape for debugging
        #print(f"output shape: {output.shape}")
        
        # Compute loss
        loss = criterion(output.view(-1, VOCAB_SIZE), tgt_output.reshape(-1))
        
        # Backpropagation
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    
    return total_loss / len(dataloader)

N_EPOCHS = 1000
for epoch in range(N_EPOCHS):
    train_loss = train(model, dataloader, optimizer, criterion)
    print(f'Epoch {epoch+1}, Train Loss: {train_loss:.4f}')


Epoch 1, Train Loss: 3.9686
Epoch 2, Train Loss: 3.1534
Epoch 3, Train Loss: 2.8339
Epoch 4, Train Loss: 2.6890
Epoch 5, Train Loss: 2.6826
Epoch 6, Train Loss: 2.6297
Epoch 7, Train Loss: 2.6238
Epoch 8, Train Loss: 2.5615
Epoch 9, Train Loss: 2.5638
Epoch 10, Train Loss: 2.5255
Epoch 11, Train Loss: 2.5328
Epoch 12, Train Loss: 2.4874
Epoch 13, Train Loss: 2.5182
Epoch 14, Train Loss: 2.4879
Epoch 15, Train Loss: 2.4722
Epoch 16, Train Loss: 2.4741
Epoch 17, Train Loss: 2.4669
Epoch 18, Train Loss: 2.4660
Epoch 19, Train Loss: 2.4458
Epoch 20, Train Loss: 2.4526
Epoch 21, Train Loss: 2.4389
Epoch 22, Train Loss: 2.4475
Epoch 23, Train Loss: 2.4320
Epoch 24, Train Loss: 2.4394
Epoch 25, Train Loss: 2.4187
Epoch 26, Train Loss: 2.3937
Epoch 27, Train Loss: 2.4030
Epoch 28, Train Loss: 2.4117
Epoch 29, Train Loss: 2.3839
Epoch 30, Train Loss: 2.4054
Epoch 31, Train Loss: 2.3970
Epoch 32, Train Loss: 2.3907
Epoch 33, Train Loss: 2.3985
Epoch 34, Train Loss: 2.3764
Epoch 35, Train Loss: 2

In [94]:

import torch.nn.functional as F

def generate_text(model, tokenizer, vocabulary, seed_text, max_length=50, temperature=1.0):
    model.eval()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    
    tokens = tokenizer(seed_text)
    numericalized_tokens = [vocabulary[token] for token in tokens]
    input_tensor = torch.tensor(numericalized_tokens, dtype=torch.long).unsqueeze(1).to(device)
    
    generated_tokens = numericalized_tokens
    
    with torch.no_grad():
        for _ in range(max_length):
            tgt_input = torch.tensor(generated_tokens[-30:], dtype=torch.long).unsqueeze(1).to(device)
            tgt_mask = generate_square_subsequent_mask(tgt_input.size(0)).to(device)
            
            output = model(input_tensor, tgt_input, tgt_mask, None)
            output = output[-1, 0, :] / temperature  
            probabilities = F.softmax(output, dim=-1)
            next_token = torch.multinomial(probabilities, 1).item()
            
            generated_tokens.append(next_token)
            
            if vocabulary.get_itos()[next_token] == '<eos>':
                break
    
    generated_text = " ".join([vocabulary.get_itos()[token] for token in generated_tokens])
    return generated_text

# Example usage:
seed_text = "Harry Potter was a highly unusual boy in many ways"
generated_text = generate_text(model, tokenizer, vocabulary, seed_text, max_length=50, temperature=1.0)
print(generated_text)

harry potter was a highly unusual boy in many ways many potter a a highly potter in in boy . . highly was many boy many many potter many unusual boy potter potter highly many unusual many highly many boy a unusual . unusual highly unusual in unusual potter . unusual in was many was . boy unusual was many
