#  Transformers

- Angel Molina
- Daniel Marin




In [1]:
import os
import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import random

In [2]:
# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

Device: cpu


In [3]:
# small toy dataset with English-Spanish sentence pairs
english_sentences = [
    "hello",
    "how are you",
    "good morning",
    "i am from ecuador",
    "thank you"
]

spanish_sentences = [
    "hola",
    "como estas",
    "buenos dias",
    "soy de ecuador",
    "gracias"
]

In [4]:
# Tokenizer functions
def tokenize_en(text):
    return text.lower().split()

def tokenize_es(text):
    return text.lower().split()

# Special tokens for padding, start, end, and unknown tokens
SPECIAL_TOKENS = {"<pad>": 0, "<sos>": 1, "<eos>": 2, "<unk>": 3}

# Create vocabulary
def build_vocab(sentences, tokenizer):
    vocab = SPECIAL_TOKENS.copy()
    idx = len(vocab)

    for sentence in sentences:
        tokens = tokenizer(sentence)
        for token in tokens:
            if token not in vocab:
                vocab[token] = idx
                idx += 1

    return vocab

# Build vocab for English and Spanish
english_vocab = build_vocab(english_sentences, tokenize_en)
spanish_vocab = build_vocab(spanish_sentences, tokenize_es)

print("English Vocab:", english_vocab)
print("Spanish Vocab:", spanish_vocab)

English Vocab: {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3, 'hello': 4, 'how': 5, 'are': 6, 'you': 7, 'good': 8, 'morning': 9, 'i': 10, 'am': 11, 'from': 12, 'ecuador': 13, 'thank': 14}
Spanish Vocab: {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3, 'hola': 4, 'como': 5, 'estas': 6, 'buenos': 7, 'dias': 8, 'soy': 9, 'de': 10, 'ecuador': 11, 'gracias': 12}


In [21]:
# Translation dataset class
class TranslationDataset(Dataset):
    def __init__(self, src_sentences, trg_sentences, src_tokenizer, trg_tokenizer, src_vocab, trg_vocab):
        self.src_sentences = src_sentences
        self.trg_sentences = trg_sentences
        self.src_tokenizer = src_tokenizer
        self.trg_tokenizer = trg_tokenizer
        self.src_vocab = src_vocab
        self.trg_vocab = trg_vocab

    def __len__(self):
        return len(self.src_sentences)

    def __getitem__(self, idx):
        src = self.src_tokenizer(self.src_sentences[idx])
        trg = self.trg_tokenizer(self.trg_sentences[idx])

        # Convert tokens to indices
        src_indices = [self.src_vocab.get(token, self.src_vocab["<unk>"]) for token in src]
        trg_indices = [self.trg_vocab.get(token, self.trg_vocab["<unk>"]) for token in trg]

        # Add <sos> and <eos> tokens
        src_indices = [self.src_vocab["<sos>"]] + src_indices + [self.src_vocab["<eos>"]]
        trg_indices = [self.trg_vocab["<sos>"]] + trg_indices + [self.trg_vocab["<eos>"]]

        return torch.tensor(src_indices), torch.tensor(trg_indices)

# Create dataset and DataLoader
train_dataset = TranslationDataset(english_sentences, spanish_sentences, tokenize_en, tokenize_es, english_vocab, spanish_vocab)

from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    src_batch, trg_batch = zip(*batch)

    # Pad sequences
    src_padded = pad_sequence(src_batch, padding_value=english_vocab["<pad>"])
    trg_padded = pad_sequence(trg_batch, padding_value=spanish_vocab["<pad>"])

    return src_padded, trg_padded


# Create DataLoader with the custom collate function
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

# train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)

In [22]:
# Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(maxlen, emb_size)
        position = torch.arange(0, maxlen, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, emb_size, 2).float() * (-math.log(10000.0) / emb_size))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]

In [29]:
class TransformerModel(nn.Module):
    def __init__(self, input_dim, output_dim, emb_dim, n_heads, n_layers, forward_dim, dropout):
        super(TransformerModel, self).__init__()

        self.src_embedding = nn.Embedding(input_dim, emb_dim)
        self.trg_embedding = nn.Embedding(output_dim, emb_dim)

        self.positional_encoding = PositionalEncoding(emb_dim)

        self.transformer = nn.Transformer(
            d_model=emb_dim,
            nhead=n_heads,
            num_encoder_layers=n_layers,
            num_decoder_layers=n_layers,
            dim_feedforward=forward_dim,
            dropout=dropout
        )

        self.fc_out = nn.Linear(emb_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, trg, src_mask, trg_mask):
        src_emb = self.dropout(self.src_embedding(src)) * math.sqrt(src.shape[1])
        trg_emb = self.dropout(self.trg_embedding(trg)) * math.sqrt(trg.shape[1])

        src_emb = self.positional_encoding(src_emb)
        trg_emb = self.positional_encoding(trg_emb)

        # Transformer expects input as [seq_len, batch_size, emb_dim]
        src_emb = src_emb.permute(1, 0, 2)
        trg_emb = trg_emb.permute(1, 0, 2)

        # Ensure masks are of appropriate size
        src_mask = src_mask.to(src_emb.device) if src_mask is not None else None
        trg_mask = trg_mask.to(trg_emb.device) if trg_mask is not None else None

        output = self.transformer(src_emb, trg_emb, src_mask=src_mask, tgt_mask=trg_mask)
        output = self.fc_out(output)

        return output


In [30]:
# Helper functions for masking
def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask


In [31]:
# Training loop
def train(model, iterator, optimizer, criterion, clip):
    model.train()

    epoch_loss = 0
    for i, (src, trg) in enumerate(iterator):
        src = src.transpose(0, 1).to(device)
        trg = trg.transpose(0, 1).to(device)

        trg_input = trg[:-1, :]

        src_mask = generate_square_subsequent_mask(src.size(1)).to(device)
        trg_mask = generate_square_subsequent_mask(trg_input.size(0)).to(device)

        optimizer.zero_grad()
        output = model(src, trg_input, src_mask, trg_mask)

        output_dim = output.shape[-1]
        output = output.view(-1, output_dim)
        trg = trg[1:].view(-1)

        loss = criterion(output, trg)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(iterator)

In [32]:
# Translation function
def translate_sentence(sentence, model, src_vocab, trg_vocab, max_len=10):
    model.eval()

    # Tokenize the input sentence
    tokens = tokenize_en(sentence)
    src_indices = [src_vocab.get(token, src_vocab["<unk>"]) for token in tokens]
    src_indices = [src_vocab["<sos>"]] + src_indices + [src_vocab["<eos>"]]

    # Convert to tensor and pass through the encoder
    src_tensor = torch.LongTensor(src_indices).unsqueeze(1).to(device)
    src_mask = generate_square_subsequent_mask(len(src_tensor)).to(device)

    with torch.no_grad():
        src_emb = model.positional_encoding(model.src_embedding(src_tensor))
        memory = model.transformer.encoder(src_emb, src_mask)

    trg_indices = [trg_vocab["<sos>"]]

    for _ in range(max_len):
        trg_tensor = torch.LongTensor([trg_indices[-1]]).unsqueeze(1).to(device)
        trg_mask = generate_square_subsequent_mask(len(trg_indices)).to(device)

        with torch.no_grad():
            trg_emb = model.positional_encoding(model.trg_embedding(trg_tensor))
            output = model.transformer.decoder(trg_emb, memory, trg_mask)

        pred_token = output.argmax(2)[-1, :].item()
        trg_indices.append(pred_token)

        if pred_token == trg_vocab["<eos>"]:
            break

    trg_tokens = [list(trg_vocab.keys())[list(trg_vocab.values()).index(idx)] for idx in trg_indices[1:-1]]

    return ' '.join(trg_tokens)




In [33]:
# Model parameters
INPUT_DIM = len(english_vocab)
OUTPUT_DIM = len(spanish_vocab)
EMB_DIM = 256
N_HEADS = 8
N_LAYERS = 3
FORWARD_DIM = 512
DROPOUT = 0.1

# Instantiate the model, optimizer, and loss function
model = TransformerModel(INPUT_DIM, OUTPUT_DIM, EMB_DIM, N_HEADS, N_LAYERS, FORWARD_DIM, DROPOUT).to(device)
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=english_vocab["<pad>"])

# Train the model for a few epochs
N_EPOCHS = 10
CLIP = 1



In [34]:
for epoch in range(N_EPOCHS):
    train_loss = train(model, train_loader, optimizer, criterion, CLIP)
    print(f'Epoch {epoch+1} | Train Loss: {train_loss:.3f}')

# Test the model by translating a sentence
translated_sentence = translate_sentence("hi from ecuador", model, english_vocab, spanish_vocab)
print(f'Translation: {translated_sentence}')

RuntimeError: the batch number of src and tgt must be equal