## LSTM

1. Choose a dataset
2. Download the dataset and check its shape
3. Clean the data if needed (hint: regex)
4. Decide on the tokenization strategy (word, character, or subword)
5. Build the vocabulary
6. Build a wrapper around the dataset
7. DataLoader -> train[0] yields (input, label)
8. Build the network architecture
9. Write the training loop
10. Evaluate


In [None]:
!pip install datasets

In [None]:
!pip install --upgrade datasets fsspec

In [None]:
import re
import time
import torch
import torchvision
import numpy as np
import pandas as pd
import seaborn as sns
import torch.nn as nn
from tqdm import tqdm
import torch.nn.functional as F
from torchvision import transforms


import torch.optim as optim
import matplotlib.pyplot as plt
from collections import Counter
from matplotlib.pyplot import imshow
from sklearn.metrics import accuracy_score
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split



# For text preprocessing
import nltk
from nltk.tokenize import word_tokenize


# Download the 'punkt_tab' NLTK resource (presumably the tokenizer data used by
# word_tokenize — confirm against the NLTK docs) and report what download() returned.
download_result = nltk.download('punkt_tab')
print("Download success:", download_result)


# For loading datasets
from datasets import load_dataset

# Set random seed for reproducibility
# (only the PyTorch and NumPy generators are seeded here; Python's built-in
# `random` module is left unseeded)
torch.manual_seed(42)
np.random.seed(42)

print("Libraries imported successfully!")

### Data Loading & Pre-processing

In [None]:
# Load the "iwslt2017-en-de" configuration of the "iwslt2017" dataset.
# NOTE(review): trust_remote_code=True allows the dataset's own loading script
# to execute locally — confirm the source is trusted before running.
dataset = load_dataset("iwslt2017", "iwslt2017-en-de", trust_remote_code=True)

# Keep handles on the two splits used by the rest of the notebook
train_data, test_data = dataset["train"], dataset["test"]

In [None]:
# Display sample data: the size of the training split and its first record's
# "translation" entry (indexed with 'en' and 'de' keys further below)
print(f"Dataset loaded! Total training examples: {len(train_data)}")
print(f'Sample (english & deutch): {train_data[0]["translation"]}')

In [None]:
def preprocess_text(text):
    """Clean a raw sentence and split it into word tokens.

    The text is lowercased, HTML-like tags are stripped, and every character
    that is neither a letter nor whitespace is removed before tokenizing.

    Letters are matched Unicode-aware, so German umlauts and the sharp s are
    preserved. The previous ASCII-only character class deleted them, which
    corrupted German words (e.g. "für" became "fr").

    Args:
        text (str): Raw sentence.

    Returns:
        list[str]: Tokens produced by ``word_tokenize`` on the cleaned text.
    """

    # Convert to lowercase
    text = text.lower()

    # Remove HTML tags
    text = re.sub(r'<.*?>', '', text)

    # Remove everything that is not a letter or whitespace:
    #   [^\w\s] drops punctuation and symbols,
    #   [\d_]   drops digits and the underscore, which \w would otherwise keep.
    text = re.sub(r'[^\w\s]|[\d_]', '', text)

    # Tokenize
    return word_tokenize(text)



# Tokenize the training pairs: the German side becomes the labels, the
# English side becomes the model inputs
train_text, train_labels = [], []

for sample in tqdm(train_data):
    pair = sample['translation']
    train_labels.append(preprocess_text(pair['de']))
    train_text.append(preprocess_text(pair['en']))


# Same treatment for the test pairs
test_text, test_labels = [], []

for sample in tqdm(test_data):
    pair = sample['translation']
    test_labels.append(preprocess_text(pair['de']))
    test_text.append(preprocess_text(pair['en']))


# Show the first tokenized source (first 15 tokens) and its full target
print(f"Data preprocessing complete! Example tokenized review: {train_text[0][:15]}...")
print(f"Data preprocessing complete! Example tokenized label: {train_labels[0]}...")

In [None]:
def build_english_vocab(text, max_words=10000):
    """Map the most frequent tokens of a corpus to integer ids.

    Args:
        text: Iterable of tokenized sentences (each an iterable of tokens).
        max_words: Upper bound on the vocabulary size, including the two
            special tokens.

    Returns:
        dict: token -> id. Ids 0 and 1 are reserved for '<PAD>' and '<UNK>';
        corpus tokens receive ids from 2 upwards in descending frequency.
    """
    frequencies = Counter()
    for tokens in text:
        frequencies.update(tokens)

    # Two slots are reserved for <PAD> and <UNK>
    kept = frequencies.most_common(max_words - 2)

    vocab = {}
    for token_id, (token, _) in enumerate(kept, start=2):
        vocab[token] = token_id

    # Special tokens are inserted last, so they appear at the end of the dict
    vocab['<PAD>'] = 0
    vocab['<UNK>'] = 1

    return vocab


# English (source-side) vocabulary, built from the training sentences only
english_vocab = build_english_vocab(train_text)

# Reverse mapping (id -> token) for decoding
english_idx_to_word = {index: token for token, index in english_vocab.items()}

vocab_size = len(english_vocab)
print(f"Vocabulary created with {vocab_size} words!")
print(f"Sample words: {list(english_vocab.items())[:10]}")

In [None]:
def build_german_vocab(text, max_words=10000):
    """Map the most frequent tokens of a corpus to integer ids, reserving
    sequence-boundary markers.

    Args:
        text: Iterable of tokenized sentences (each an iterable of tokens).
        max_words: Upper bound on the vocabulary size, including the four
            special tokens.

    Returns:
        dict: token -> id. Ids 0-3 are reserved for '<PAD>', '<UNK>',
        '<SOS>' and '<EOS>'; corpus tokens receive ids from 4 upwards in
        descending frequency.
    """
    frequencies = Counter()
    for tokens in text:
        frequencies.update(tokens)

    # Four slots are reserved for <PAD>, <UNK>, <SOS> and <EOS>
    kept = frequencies.most_common(max_words - 4)

    vocab = {}
    for token_id, (token, _) in enumerate(kept, start=4):
        vocab[token] = token_id

    # Special tokens are inserted last, so they appear at the end of the dict
    vocab['<PAD>'] = 0
    vocab['<UNK>'] = 1
    vocab['<SOS>'] = 2
    vocab['<EOS>'] = 3

    return vocab


# Build the German (target-side) vocabulary from the training labels.
# This must use build_german_vocab: the English builder defines no '<SOS>' /
# '<EOS>' entries, so the boundary markers added by
# encode_sentence(..., add_sos_eos=True) would all be encoded as '<UNK>'.
german_vocab = build_german_vocab(train_labels)
vocab_size = len(german_vocab)

print(f"Vocabulary created with {vocab_size} words!")
print(f"Sample words: {list(german_vocab.items())[:10]}")

# Create a reverse mapping for decoding
german_idx_to_word = {idx: word for word, idx in german_vocab.items()}

In [None]:
# Analyze text lengths to determine optimal max_len
# (lengths are token counts of the English training sentences)
sentence_lengths = [len(text) for text in train_text]
max_sentence_length = max(sentence_lengths)

mean_length = np.mean(sentence_lengths)
median_length = np.median(sentence_lengths)
p95_length = np.percentile(sentence_lengths, 95)

print(f"Maximum review length: {max_sentence_length}")
print(f"Mean review length: {mean_length:.2f}")
print(f"Median review length: {median_length}")
print(f"95th percentile length: {p95_length}")

# Plotting the distribution of sentence lengths
plt.figure(figsize=(10, 6))
# Histogram of the lengths computed above (the name `review_lengths` used here
# before was never defined and raised a NameError)
plt.hist(sentence_lengths, bins=50)
plt.axvline(x=max_sentence_length, color='r', linestyle='--', label=f'Max: {max_sentence_length}')
plt.axvline(x=p95_length, color='g', linestyle='--', label=f'95th: {p95_length:.0f}')
plt.axvline(x=median_length, color='b', linestyle='--', label=f'Median: {median_length}')

plt.title("Distribution of sentence Lengths")
plt.xlabel("Length (number of tokens)")
plt.ylabel("Count")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# Choose max_len based on 95th percentile to avoid excessive padding
# while still covering most sentences without truncation
chosen_max_len = int(p95_length)
print(f"Chosen max_len: {chosen_max_len}")

def encode_sentence(sentence, vocab, max_len=None, add_sos_eos=False):
    """Convert a tokenized sentence to a fixed-length list of token ids.

    Args:
        sentence (list[str]): Tokens of one sentence.
        vocab (dict): token -> id mapping; must contain '<UNK>' and '<PAD>'
            (and '<SOS>' / '<EOS>' if those should not map to '<UNK>').
        max_len (int | None): Output length. When None, the module-level
            ``chosen_max_len`` is used.
        add_sos_eos (bool): Wrap the sentence in '<SOS>' ... '<EOS>'.

    Returns:
        list[int]: Exactly ``max_len`` ids; unknown tokens become '<UNK>',
        short sentences are right-padded with '<PAD>'.
    """

    # Use the chosen max_len from analysis if none provided
    if max_len is None:
        max_len = chosen_max_len

    if add_sos_eos:
        # Reserve two positions for the markers and truncate the words first;
        # truncating afterwards would cut '<EOS>' off every long sentence.
        room = max(max_len - 2, 0)
        sentence = ['<SOS>'] + sentence[:room] + ['<EOS>']

    # Encode words, use <UNK> for unknown words
    unk_idx = vocab['<UNK>']
    encoded = [vocab.get(word, unk_idx) for word in sentence[:max_len]]

    # Pad sequences to the same length
    if len(encoded) < max_len:
        encoded += [vocab['<PAD>']] * (max_len - len(encoded))

    return encoded

# Encode sources (English) and targets (German) as fixed-length id lists.
# max_len is not passed, so encode_sentence falls back to chosen_max_len.
train_encoded = [encode_sentence(tokens, english_vocab) for tokens in tqdm(train_text)]
test_encoded = [encode_sentence(tokens, english_vocab) for tokens in tqdm(test_text)]

train_labels_encoded = [encode_sentence(tokens, german_vocab) for tokens in tqdm(train_labels)]
test_labels_encoded = [encode_sentence(tokens, german_vocab) for tokens in tqdm(test_labels)]

# Wrap the id lists in integer (long) tensors: sources first, then targets
train_sequences = torch.tensor(train_encoded, dtype=torch.long)
test_sequences = torch.tensor(test_encoded, dtype=torch.long)

train_labels_tensor = torch.tensor(train_labels_encoded, dtype=torch.long)
test_labels_tensor = torch.tensor(test_labels_encoded, dtype=torch.long)


print(f"Data encoding complete! Example sequence: {train_sequences[0][:15]}...")
print(f"All sequences are now of length {chosen_max_len}")

# Count the training sentences longer than chosen_max_len (these get truncated)
truncated_count = sum(length > chosen_max_len for length in sentence_lengths)
truncated_percentage = (truncated_count / len(sentence_lengths)) * 100
print(f"Sentences truncated: {truncated_count} ({truncated_percentage:.2f}%)")

In [None]:
def encode_sentence(sentence, vocab, max_len=200, add_sos_eos=False):
    """Convert a tokenized sentence to a fixed-length list of token ids.

    Args:
        sentence (list[str]): Tokens of one sentence.
        vocab (dict): token -> id mapping; must contain '<UNK>' and '<PAD>'
            (and '<SOS>' / '<EOS>' if those should not map to '<UNK>').
        max_len (int): Output length.
        add_sos_eos (bool): Wrap the sentence in '<SOS>' ... '<EOS>'.

    Returns:
        list[int]: Exactly ``max_len`` ids; unknown tokens become '<UNK>',
        short sentences are right-padded with '<PAD>'.
    """

    if add_sos_eos:
        # Reserve two positions for the markers and truncate the words first;
        # truncating afterwards would cut '<EOS>' off every long sentence.
        room = max(max_len - 2, 0)
        sentence = ['<SOS>'] + sentence[:room] + ['<EOS>']

    # Encode words, use <UNK> for unknown words
    unk_idx = vocab['<UNK>']
    encoded = [vocab.get(word, unk_idx) for word in sentence[:max_len]]

    # Pad sequences to the same length
    if len(encoded) < max_len:
        encoded += [vocab['<PAD>']] * (max_len - len(encoded))

    return encoded



# Encode every sentence with encode_sentence's default max_len.
# Sources (English) are encoded as-is; targets (German) are wrapped in
# <SOS> ... <EOS> markers.
train_encoded = [encode_sentence(tokens, english_vocab) for tokens in tqdm(train_text)]
train_labels_encoded = [
    encode_sentence(tokens, german_vocab, add_sos_eos=True) for tokens in tqdm(train_labels)
]

test_encoded = [encode_sentence(tokens, english_vocab) for tokens in tqdm(test_text)]
test_labels_encoded = [
    encode_sentence(tokens, german_vocab, add_sos_eos=True) for tokens in tqdm(test_labels)
]


# Wrap the id lists in integer (long) tensors: sources first, then targets
train_sequences = torch.tensor(train_encoded, dtype=torch.long)
test_sequences = torch.tensor(test_encoded, dtype=torch.long)

train_labels_tensor = torch.tensor(train_labels_encoded, dtype=torch.long)
test_labels_tensor = torch.tensor(test_labels_encoded, dtype=torch.long)

print(f"Data encoding complete! Example sequence: {train_sequences[0][:15]}...")

In [None]:
class TranslationDataset(Dataset):
    """Index-aligned pairs of source sequences and target labels."""

    def __init__(self, sequences, labels):
        # Both containers are stored as given and indexed with the same idx
        self.labels = labels
        self.sequences = sequences

    def __len__(self):
        """Number of examples, taken from the source sequences."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return the (source, target) pair stored at position ``idx``."""
        source = self.sequences[idx]
        target = self.labels[idx]
        return source, target

# Hold out 10% of the training pairs for validation (fixed seed)
train_seq, val_seq, train_labels, val_labels = train_test_split(
    train_sequences,
    train_labels_tensor,
    test_size=0.1,
    random_state=42,
)

batch_size = 64

# One dataset + loader per split; only the training loader shuffles
train_dataset = TranslationDataset(train_seq, train_labels)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

val_dataset = TranslationDataset(val_seq, val_labels)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

test_dataset = TranslationDataset(test_sequences, test_labels_tensor)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

print(f"DataLoaders created! Training batches: {len(train_loader)}")

In [None]:
class Encoder(nn.Module):
    """Embeds a source sentence and runs it through a multi-layer LSTM."""

    def __init__(self, input_dim, embedding_dim, hidden_dim, n_layers=2, dropout=0.5):
        super().__init__()
        # Index 0 is treated as padding by the embedding
        self.embedding = nn.Embedding(
            num_embeddings=input_dim,
            embedding_dim=embedding_dim,
            padding_idx=0,
        )
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
            batch_first=True,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        '''
        Encode a batch of source sentences into the LSTM's final states.

        Args:
          src (tensor): Token ids of the source sentences, batch first
                        ([batch_size, src_len]).

        Returns:
          hidden (tensor): Final hidden state of the LSTM (one slice per layer).
          cell (tensor): Final cell state of the LSTM (one slice per layer).
        '''
        token_vectors = self.embedding(src)         # [batch_size, src_len, emb_dim]
        embedded = self.dropout(token_vectors)

        # The per-step outputs are not needed; only the final states are returned
        _, final_state = self.lstm(embedded)
        hidden, cell = final_state

        return hidden, cell

In [None]:
class Decoder(nn.Module):
    """Single-step LSTM decoder that scores the next target token."""

    def __init__(self, output_dim, embedding_dim, hidden_dim, n_layers=2, dropout=0.5):
        super().__init__()
        # Index 0 is treated as padding by the embedding
        self.embedding = nn.Embedding(
            num_embeddings=output_dim,
            embedding_dim=embedding_dim,
            padding_idx=0,
        )
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
            batch_first=True,
        )
        # Projects the LSTM output onto the target vocabulary
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, hidden, cell):
        '''
        Run one decoding step for a batch of target tokens.

        Args:
          input (tensor): One token id per batch element ([batch_size]).
          hidden (tensor): Hidden state from the previous step (or the encoder).
          cell (tensor): Cell state from the previous step (or the encoder).

        Returns:
          prediction (tensor): Unnormalized scores over the target vocabulary
                               ([batch_size, output_dim]).
          hidden (tensor): Updated hidden state.
          cell (tensor): Updated cell state.
        '''
        # The LSTM is batch-first, so add a length-1 sequence axis
        step_tokens = input.unsqueeze(1)                      # [batch_size, 1]
        embedded = self.dropout(self.embedding(step_tokens))  # [batch_size, 1, emb_dim]

        previous_state = (hidden, cell)
        output, (hidden, cell) = self.lstm(embedded, previous_state)  # [batch_size, 1, hidden_dim]

        # Drop the sequence axis again before projecting to the vocabulary
        step_output = output.squeeze(1)
        prediction = self.fc_out(step_output)                 # [batch_size, output_dim]

        return prediction, hidden, cell

In [None]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes the target one step at a time."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        '''
        Encode the source batch, then decode step by step along the target.

        Args:
          src (tensor): Source token ids, batch first.
          trg (tensor): Target token ids, batch first; column 0 supplies the
                        first decoder input.
          teacher_forcing_ratio (float): Per-step probability of feeding the
                        ground-truth token (instead of the model's own
                        prediction) as the next decoder input.

        Returns:
          outputs (tensor): Decoder scores of shape
                        [batch_size, trg_len, trg_vocab_size]; position 0 is
                        never written and stays zero.
        '''
        n_steps = trg.shape[1]
        n_sentences = src.shape[0]
        n_classes = self.decoder.fc_out.out_features

        outputs = torch.zeros(n_sentences, n_steps, n_classes).to(self.device)

        # The encoder's final states initialize the decoder
        hidden, cell = self.encoder(src)
        decoder_input = trg[:, 0]

        for step in range(1, n_steps):
            scores, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[:, step] = scores

            # One random draw per step decides between teacher forcing and
            # feeding back the highest-scoring prediction
            use_ground_truth = torch.rand(1).item() < teacher_forcing_ratio
            if use_ground_truth:
                decoder_input = trg[:, step]
            else:
                decoder_input = scores.argmax(1)

        return outputs

In [None]:
def train(model, iterator, optimizer, criterion, clip, teacher_forcing_ratio=0.5,
          device=None, trg_pad_idx=0):
    """
    Train the Seq2Seq model for one epoch.

    Args:
        model: The Seq2Seq model
        iterator: DataLoader yielding (src, trg) batches
        optimizer: Optimizer (e.g., Adam)
        criterion: Loss function (CrossEntropyLoss)
        clip: Gradient clipping value (max gradient norm)
        teacher_forcing_ratio: Probability of using teacher forcing during training
        device: Device the batches are moved to. Defaults to the device of the
            model's parameters, so the function no longer depends on a global
            `device` variable defined elsewhere in the notebook.
        trg_pad_idx: Target padding index excluded from the accuracy. The
            default 0 matches '<PAD>' in the vocabularies and the embeddings'
            padding_idx.

    Returns:
        (epoch_loss, epoch_acc): Loss and accuracy averaged over the batches.
        Accuracy is computed over non-padding target positions only, the same
        way evaluate() computes it, so train and validation accuracy are
        comparable.
    """

    if device is None:
        device = next(model.parameters()).device

    model.train()
    epoch_loss = 0
    epoch_acc = 0

    for batch in tqdm(iterator, desc="Training"):

        # Get batch data (source and target sequences)
        src, trg = batch
        src, trg = src.to(device), trg.to(device)

        # Zero gradients
        optimizer.zero_grad()

        # Forward pass through the model
        output = model(src, trg, teacher_forcing_ratio)

        # output shape: [batch_size, trg_len, trg_vocab_size]
        # trg shape: [batch_size, trg_len]

        # Position 0 is dropped on both sides (the model never writes output
        # position 0), then everything is flattened for the loss
        output = output[:, 1:].reshape(-1, output.shape[-1])  # [batch_size * (trg_len - 1), output_dim]
        trg = trg[:, 1:].reshape(-1)  # [batch_size * (trg_len - 1)]

        # Calculate loss
        loss = criterion(output, trg)

        # Backward pass
        loss.backward()

        # Gradient clipping to avoid exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        # Update model parameters
        optimizer.step()

        # Accuracy over real (non-padding) tokens only; counting padding
        # positions would distort the figure for heavily padded batches
        with torch.no_grad():
            predicted = output.argmax(dim=1)
            non_pad = trg != trg_pad_idx
            correct = (predicted == trg) & non_pad
            # clamp guards against a batch that contains only padding
            accuracy = correct.sum().float() / non_pad.sum().clamp(min=1)

        # Update the epoch loss and accuracy
        epoch_loss += loss.item()
        epoch_acc += accuracy.item()

    # Return average loss and accuracy for the epoch
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
def evaluate(model, dataloader, criterion, device, trg_pad_idx):
    """
    Compute average loss and accuracy of the model over a dataloader.

    Args:
        model: The Seq2Seq model
        dataloader: Iterable of (src, trg) batches that supports len()
        criterion: Loss function
        device: Device the batches are moved to
        trg_pad_idx: Target padding index, excluded from the accuracy

    Returns:
        (loss, accuracy): Per-batch values averaged over the dataloader.
    """
    # Evaluation mode, and no gradients are tracked below
    model.eval()

    total_loss = 0
    total_acc = 0

    with torch.no_grad():

        for src, trg in dataloader:

            src = src.to(device)
            trg = trg.to(device)

            # Teacher forcing is disabled: the model feeds on its own predictions
            scores = model(src, trg, teacher_forcing_ratio=0)

            # scores: [batch_size, trg_len, output_dim]
            # trg:    [batch_size, trg_len]
            # Drop position 0 on both sides and flatten
            flat_scores = scores[:, 1:].reshape(-1, scores.shape[-1])
            flat_trg = trg[:, 1:].reshape(-1)

            total_loss += criterion(flat_scores, flat_trg).item()

            # Accuracy over non-padding target positions only
            is_token = flat_trg != trg_pad_idx
            hits = (flat_scores.argmax(dim=1) == flat_trg) & is_token
            batch_acc = hits.sum().float() / is_token.sum()
            total_acc += batch_acc.item()

    n_batches = len(dataloader)
    return total_loss / n_batches, total_acc / n_batches


In [None]:
# Run on the GPU when one is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Vocabulary sizes set the embedding-table sizes and the output width
INPUT_DIM = len(english_vocab)       # English vocab size
OUTPUT_DIM = len(german_vocab)      # German vocab size

# Hyperparameters shared by encoder and decoder
EMBEDDING_DIM, HIDDEN_DIM = 100, 256
N_LAYERS, DROPOUT = 2, 0.5

# Encoder is built before the decoder, as in the original cell
enc = Encoder(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, N_LAYERS, DROPOUT)
dec = Decoder(OUTPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, N_LAYERS, DROPOUT)

model4 = Seq2Seq(enc, dec, device).to(device)

print(f"Model created and moved to {device}!")
print(model4)

In [None]:
# Cross Entropy Loss.
# Padding positions are excluded through ignore_index: targets are padded to a
# fixed length, so without it the loss is dominated by trivially predictable
# <PAD> tokens instead of real words.
criterion = nn.CrossEntropyLoss(ignore_index=german_vocab['<PAD>'])

# Adam optimizer
optimizer = optim.Adam(model4.parameters(), lr=0.001)

print("Loss function and optimizer defined!")

In [None]:
n_epochs = 1
clip = 1.0            # Gradient clipping value

# Target padding index, excluded from the accuracy in evaluate().
# Defined once before the loop: it does not change between epochs, and the
# test-set evaluation cell needs it even when n_epochs is 0.
trg_pad_idx = german_vocab['<PAD>']

# Lists to store metrics (one entry per epoch)
train_losses = []
train_accs = []

val_losses = []
val_accs = []


# Training loop
for epoch in range(n_epochs):

    print(f"\nEpoch {epoch+1}/{n_epochs}")
    print("-" * 20)

    # Train
    train_loss, train_acc = train(model4, train_loader, optimizer, criterion, clip)
    train_losses.append(train_loss)
    train_accs.append(train_acc)

    # Evaluate
    val_loss, val_acc = evaluate(model4, val_loader, criterion, device, trg_pad_idx)
    val_losses.append(val_loss)
    val_accs.append(val_acc)

    # Print metrics
    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc*100:.2f}%")
    print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc*100:.2f}%")

print("\nTraining complete!")

In [None]:
plt.figure(figsize=(12, 5))

# One panel per metric, side by side:
# (subplot index, train series, val series, train label, val label, y label, title)
panels = [
    (1, train_losses, val_losses, 'Train Loss', 'Val Loss', 'Loss', 'Training and Validation Loss'),
    (2, train_accs, val_accs, 'Train Acc', 'Val Acc', 'Accuracy', 'Training and Validation Accuracy'),
]

for position, train_series, val_series, train_label, val_label, y_label, title in panels:
    plt.subplot(1, 2, position)
    plt.plot(train_series, label=train_label)
    plt.plot(val_series, label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(title)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Final check on the held-out test split, using the same padding index as
# the validation runs
test_loss, test_acc = evaluate(
    model=model4,
    dataloader=test_loader,
    criterion=criterion,
    device=device,
    trg_pad_idx=trg_pad_idx,
)
print(f"Test Loss: {test_loss:.4f} | Test Acc: {test_acc*100:.2f}%")