In [27]:
import torch
import torch.nn as nn
import torch.optim as optim

import torchtext, datasets, math
from tqdm import tqdm

In [36]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


## Task 1. Dataset Acquisition - Harry Potter

In [28]:
from datasets import load_dataset

dataset = load_dataset("KaungHtetCho/Harry_Potter_LSTM")

print(dataset)

DatasetDict({
    train: Dataset({
        features: ['text'],
        num_rows: 57435
    })
    validation: Dataset({
        features: ['text'],
        num_rows: 5897
    })
    test: Dataset({
        features: ['text'],
        num_rows: 6589
    })
})


In [29]:
print(dataset['train'].shape)

(57435, 1)


 ## Task 2. Model Training

### 1) Preprocessing the Text Data
#### a) Tokenization

In [30]:
import torchtext

tokenizer = torchtext.data.utils.get_tokenizer('basic_english')

tokenize_data = lambda example: {'tokens': tokenizer(example['text'])}

tokenized_dataset = dataset.map(tokenize_data, remove_columns=['text'])

print(tokenized_dataset['train'][0])


Map: 100%|██████████| 57435/57435 [00:03<00:00, 16922.83 examples/s]
Map: 100%|██████████| 5897/5897 [00:00<00:00, 13717.27 examples/s]
Map: 100%|██████████| 6589/6589 [00:00<00:00, 13099.43 examples/s]

{'tokens': ['harry', 'potter', 'and', 'the', 'sorcerer', "'", 's', 'stone']}





#### b) Building the Vocabulary

In [31]:
vocab = torchtext.vocab.build_vocab_from_iterator(tokenized_dataset['train']['tokens'], min_freq=3)
vocab.insert_token('<unk>', 0)
vocab.insert_token('<eos>', 1)
vocab.set_default_index(vocab['<unk>'])  # Default token for unknown words

#### c) Preparing the Data for Training

In [32]:
def get_data(dataset, vocab, batch_size):
    data = []
    for example in dataset:
        if example['tokens']:
            tokens = example['tokens'] + ['<eos>']  # Add end-of-sequence token
            tokens = [vocab[token] for token in tokens]
            data.extend(tokens)
    data = torch.LongTensor(data)
    num_batches = data.shape[0] // batch_size
    data = data[:num_batches * batch_size]
    data = data.view(batch_size, num_batches)  # [batch size, seq len]
    return data

batch_size = 128
train_data = get_data(tokenized_dataset['train'], vocab, batch_size)
valid_data = get_data(tokenized_dataset['validation'], vocab, batch_size)
test_data  = get_data(tokenized_dataset['test'], vocab, batch_size)


### 2) Model Architecture and Training Process
#### a) Model Architecture:

In [33]:
import torch
import torch.nn as nn
import math

class LSTMLanguageModel(nn.Module):
    def __init__(self, vocab_size, emb_dim, hid_dim, num_layers, dropout_rate):
        super().__init__()
        self.num_layers = num_layers
        self.hid_dim = hid_dim
        self.emb_dim = emb_dim

        self.embedding = nn.Embedding(vocab_size, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hid_dim, num_layers=num_layers, dropout=dropout_rate, batch_first=True)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hid_dim, vocab_size)

        self.init_weights()

    def init_weights(self):
        init_range_emb = 0.1
        init_range_other = 1 / math.sqrt(self.hid_dim)
        self.embedding.weight.data.uniform_(-init_range_emb, init_range_other)
        self.fc.weight.data.uniform_(-init_range_other, init_range_other)
        self.fc.bias.data.zero_()
        for i in range(self.num_layers):
            self.lstm.all_weights[i][0] = torch.FloatTensor(self.emb_dim, self.hid_dim).uniform_(-init_range_other, init_range_other)
            self.lstm.all_weights[i][1] = torch.FloatTensor(self.hid_dim, self.hid_dim).uniform_(-init_range_other, init_range_other)

    def init_hidden(self, batch_size, device):
        hidden = torch.zeros(self.num_layers, batch_size, self.hid_dim).to(device)
        cell = torch.zeros(self.num_layers, batch_size, self.hid_dim).to(device)
        return hidden, cell

    def detach_hidden(self, hidden):
        hidden, cell = hidden
        hidden = hidden.detach()
        cell = cell.detach()
        return hidden, cell

    def forward(self, src, hidden):
        embedding = self.dropout(self.embedding(src))  # [batch-size, seq len, emb dim]
        output, hidden = self.lstm(embedding, hidden)  # [batch size, seq len, hid dim]
        output = self.dropout(output)
        prediction = self.fc(output)  # [batch_size, seq_len, vocab_size]
        return prediction, hidden


#### b) Training Process:

In [38]:
def get_batch(data, seq_len, idx):
    #data #[batch size, bunch of tokens]
    src    = data[:, idx:idx+seq_len]                   
    target = data[:, idx+1:idx+seq_len+1]  #target simply is ahead of src by 1            
    return src, target

In [39]:
def train(model, data, optimizer, criterion, batch_size, seq_len, clip, device):
    
    epoch_loss = 0
    model.train()
    # drop all batches that are not a multiple of seq_len
    # data #[batch size, seq len]
    num_batches = data.shape[-1]
    data = data[:, :num_batches - (num_batches -1) % seq_len]  #we need to -1 because we start at 0
    num_batches = data.shape[-1]
    
    #reset the hidden every epoch
    hidden = model.init_hidden(batch_size, device)
    
    for idx in tqdm(range(0, num_batches - 1, seq_len), desc='Training: ',leave=False):
        optimizer.zero_grad()
        
        #hidden does not need to be in the computational graph for efficiency
        hidden = model.detach_hidden(hidden)

        src, target = get_batch(data, seq_len, idx) #src, target: [batch size, seq len]
        src, target = src.to(device), target.to(device)
        batch_size = src.shape[0]
        prediction, hidden = model(src, hidden)               

        #need to reshape because criterion expects pred to be 2d and target to be 1d
        prediction = prediction.reshape(batch_size * seq_len, -1)  #prediction: [batch size * seq len, vocab size]  
        target = target.reshape(-1)
        loss = criterion(prediction, target)
        
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item() * seq_len
    return epoch_loss / num_batches

In [40]:
def evaluate(model, data, criterion, batch_size, seq_len, device):

    epoch_loss = 0
    model.eval()
    num_batches = data.shape[-1]
    data = data[:, :num_batches - (num_batches -1) % seq_len]
    num_batches = data.shape[-1]

    hidden = model.init_hidden(batch_size, device)

    with torch.no_grad():
        for idx in range(0, num_batches - 1, seq_len):
            hidden = model.detach_hidden(hidden)
            src, target = get_batch(data, seq_len, idx)
            src, target = src.to(device), target.to(device)
            batch_size= src.shape[0]

            prediction, hidden = model(src, hidden)
            prediction = prediction.reshape(batch_size * seq_len, -1)
            target = target.reshape(-1)

            loss = criterion(prediction, target)
            epoch_loss += loss.item() * seq_len
    return epoch_loss / num_batches

In [None]:
import torch.optim as optim

# Initialize model
model = LSTMLanguageModel(len(vocab), emb_dim=1024, hid_dim=1024, num_layers=2, dropout_rate=0.65).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

# Training and evaluation loop
n_epochs = 50
seq_len = 50  # Decoding length
clip = 0.25

for epoch in range(n_epochs):
    train_loss = train(model, train_data, optimizer, criterion, batch_size, seq_len, clip, device)
    valid_loss = evaluate(model, valid_data, criterion, batch_size, seq_len, device)

    print(f'Epoch {epoch+1}:')
    print(f'\tTrain Perplexity: {math.exp(train_loss):.3f}')
    print(f'\tValid Perplexity: {math.exp(valid_loss):.3f}')


                                                           

Epoch 1:
	Train Perplexity: 516.267
	Valid Perplexity: 312.083


Training:  10%|▉         | 14/144 [02:24<23:14, 10.72s/it]