In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import math
from datasets import load_dataset
# from torchinfo import summary
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def generate_causal_mask(seq_len, device=None):
    """Build a lower-triangular causal (look-ahead) mask.

    Args:
        seq_len: Number of positions in the sequence.
        device: Optional device on which to allocate the mask. ``None``
            (the default) uses PyTorch's current default device, exactly as
            before this argument existed.

    Returns:
        Float tensor of shape (seq_len, seq_len). Entry [i, j] is 1 when
        position i may attend to position j (j <= i) and 0 otherwise.
    """
    # Allocating directly on the requested device lets callers avoid building
    # the mask on the CPU and copying it across on every forward pass.
    return torch.tril(torch.ones(seq_len, seq_len, device=device))

In [7]:
class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention: softmax(Q K^T / sqrt(d_k)) V.

    Args:
        d_k: Per-head feature size; raw scores are divided by sqrt(d_k).
    """

    def __init__(self, d_k):
        super().__init__()
        self.scale = d_k ** 0.5

    def forward(self, Q, K, V, causal_mask=False, padding_mask=None):
        """Attend from the queries to the keys/values.

        Args:
            Q: Queries, (batch, num_heads, L_q, d_k).
            K: Keys, (batch, num_heads, L_k, d_k).
            V: Values, (batch, num_heads, L_k, d_k).
            causal_mask: If truthy, query position i may only attend to key
                positions j <= i. L_q and L_k may differ; the triangle is
                anchored at the top-left corner.
            padding_mask: Optional (batch, L_k) tensor; entries equal to 0
                mark key positions that must not be attended to.

        Returns:
            Tensor of shape (batch, num_heads, L_q, d_k). A query row whose
            keys are all masked yields zeros rather than NaN.
        """
        scores = torch.matmul(Q, K.transpose(-2, -1)) / self.scale  # (B, H, L_q, L_k)

        # Collect every disallowed (query, key) pair in one boolean mask that
        # broadcasts against `scores`; no explicit expand over batch/heads needed.
        blocked = None
        if causal_mask:
            q_len, k_len = scores.shape[-2:]
            # Built directly on the scores' device, and rectangular so that
            # differing query/key lengths do not fail.
            visible = torch.ones(q_len, k_len, dtype=torch.bool, device=scores.device).tril()
            blocked = ~visible  # (L_q, L_k)
        if padding_mask is not None:
            padded_keys = (padding_mask == 0).unsqueeze(1).unsqueeze(2)  # (B, 1, 1, L_k)
            blocked = padded_keys if blocked is None else (blocked | padded_keys)

        if blocked is None:
            attention_weights = F.softmax(scores, dim=-1)
        else:
            # A row with every key masked would be softmax over all -inf, i.e.
            # NaN in both the output and the gradients. Leave such rows
            # unmasked for the softmax and zero their weights afterwards.
            dead_rows = blocked.all(dim=-1, keepdim=True)
            scores = scores.masked_fill(blocked & ~dead_rows, float('-inf'))
            attention_weights = F.softmax(scores, dim=-1)  # (B, H, L_q, L_k)
            attention_weights = attention_weights.masked_fill(dead_rows, 0.0)

        return torch.matmul(attention_weights, V)  # (B, H, L_q, d_k)

In [8]:
class MultiHeadAttention(nn.Module):
    """Pre-norm multi-head self-attention sub-layer with a residual connection.

    Args:
        d_embedding: Model width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
    """

    def __init__(self, d_embedding, num_heads):
        super().__init__()
        head_dim, leftover = divmod(d_embedding, num_heads)
        assert leftover == 0
        self.d_k = head_dim
        self.num_heads = num_heads

        # Query/key/value/output projections all keep the full model width;
        # a narrower inner width would also work and be cheaper.
        self.W_q = nn.Linear(d_embedding, d_embedding)
        self.W_k = nn.Linear(d_embedding, d_embedding)
        self.W_v = nn.Linear(d_embedding, d_embedding)
        self.W_o = nn.Linear(d_embedding, d_embedding)

        self.attention = ScaledDotProductAttention(self.d_k)

        self.norm = nn.LayerNorm(d_embedding)

    def forward(self, x, causal_mask=False, padding_mask=None):
        """Apply self-attention to ``x`` and add the result back onto ``x``.

        Args:
            x: Input of shape (batch, seq_len, d_embedding).
            causal_mask: Forwarded to the attention module.
            padding_mask: Forwarded to the attention module.

        Returns:
            Tensor of shape (batch, seq_len, d_embedding).
        """
        # Pre-norm: normalise the sub-layer input, keep the raw input for the
        # residual (the original Transformer paper used post-norm instead).
        normed = self.norm(x)
        batch, length, width = normed.size()

        def split_heads(projected):
            # (B, L, d_embedding) -> (B, H, L, d_k)
            return projected.view(batch, length, self.num_heads, self.d_k).transpose(1, 2)

        per_head = self.attention(
            split_heads(self.W_q(normed)),
            split_heads(self.W_k(normed)),
            split_heads(self.W_v(normed)),
            causal_mask,
            padding_mask,
        )  # (B, H, L, d_k)

        # Stitch the heads back together, project, then add the residual.
        merged = per_head.transpose(1, 2).contiguous().view(batch, length, width)
        return x + self.W_o(merged)


In [9]:
class MultiHeadCrossAttention(nn.Module):
    """Pre-norm multi-head cross-attention sub-layer with a residual connection.

    Queries come from the decoder stream, keys and values from the encoder
    output. The two streams may have different sequence lengths.

    Args:
        d_embedding: Model width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
    """

    def __init__(self, d_embedding, num_heads):
        super().__init__()
        assert d_embedding % num_heads == 0
        self.d_k = d_embedding // num_heads
        self.num_heads = num_heads

        self.W_q = nn.Linear(d_embedding, d_embedding)
        self.W_k = nn.Linear(d_embedding, d_embedding)
        self.W_v = nn.Linear(d_embedding, d_embedding)
        self.W_o = nn.Linear(d_embedding, d_embedding)

        self.attention = ScaledDotProductAttention(self.d_k)

        # Only the decoder stream is normalised here; the encoder output is
        # used as given.
        self.norm = nn.LayerNorm(d_embedding)

    def forward(self, x_decoder, x_encoder, causal_mask=False, padding_mask=None):
        """Let every decoder position attend over the encoder output.

        Args:
            x_decoder: Decoder stream, (batch, L_dec, d_embedding).
            x_encoder: Encoder output, (batch, L_enc, d_embedding).
            causal_mask: Forwarded unchanged to the attention module.
            padding_mask: Forwarded unchanged to the attention module; it
                should describe the encoder (key) positions, shape
                (batch, L_enc).

        Returns:
            Tensor of shape (batch, L_dec, d_embedding).

        Raises:
            AssertionError: If the inputs are not 3-D or disagree on batch
                size or embedding width.
        """
        # Only batch size and width must agree; sequence lengths are free.
        assert x_decoder.dim() == 3 and x_encoder.dim() == 3, \
            "x_decoder and x_encoder must be (batch, seq_len, d_embedding)"
        assert x_decoder.size(0) == x_encoder.size(0), \
            "x_decoder and x_encoder must have the same batch size"
        assert x_decoder.size(-1) == x_encoder.size(-1), \
            "x_decoder and x_encoder must have the same embedding width"

        x_input = x_decoder
        x_decoder = self.norm(x_decoder)

        B, L_dec, d_embedding = x_decoder.size()
        L_enc = x_encoder.size(1)
        H = self.num_heads

        # Linear projections: queries use the decoder length, keys/values the
        # encoder length.
        Q = self.W_q(x_decoder).view(B, L_dec, H, self.d_k).transpose(1, 2)  # (B, H, L_dec, d_k)
        K = self.W_k(x_encoder).view(B, L_enc, H, self.d_k).transpose(1, 2)  # (B, H, L_enc, d_k)
        V = self.W_v(x_encoder).view(B, L_enc, H, self.d_k).transpose(1, 2)  # (B, H, L_enc, d_k)

        # Apply attention
        context = self.attention(Q, K, V, causal_mask, padding_mask)  # (B, H, L_dec, d_k)

        # Concatenate heads
        context = context.transpose(1, 2).contiguous().view(B, L_dec, d_embedding)

        # Final linear projection plus residual connection
        output = self.W_o(context)  # (B, L_dec, d_embedding)
        return output + x_input


In [10]:
class FeedForwardNetwork(nn.Module):
    """Pre-norm position-wise feed-forward sub-layer with a residual connection.

    Args:
        d_embedding: Model width (input and output size).
        d_ff: Width of the hidden layer.
    """

    def __init__(self, d_embedding, d_ff):
        super().__init__()
        self.linear1 = nn.Linear(d_embedding, d_ff)
        self.linear2 = nn.Linear(d_ff, d_embedding)
        self.activation = nn.ReLU()
        self.norm = nn.LayerNorm(d_embedding)

    def forward(self, x):
        """Return ``x + linear2(relu(linear1(layer_norm(x))))``."""
        hidden = self.activation(self.linear1(self.norm(x)))
        return x + self.linear2(hidden)

In [11]:
class Encoder(nn.Module):
    """Stack of ``num_layers`` encoder layers (self-attention, then feed-forward).

    The sub-layers are stored flat in ``self.layers`` as
    ``[attn_0, ffn_0, attn_1, ffn_1, ...]``.

    Args:
        d_embedding: Model width.
        num_heads: Number of attention heads per attention sub-layer.
        d_ff: Hidden width of each feed-forward sub-layer.
        num_layers: Number of (attention, feed-forward) pairs.
    """

    def __init__(self, d_embedding, num_heads, d_ff, num_layers):
        super().__init__()
        # Build fresh modules for every layer. Repeating a list with
        # `[attn, ffn] * num_layers` would reuse the SAME two module objects,
        # so every layer would share one set of weights.
        sublayers = []
        for _ in range(num_layers):
            sublayers.append(MultiHeadAttention(d_embedding, num_heads))
            sublayers.append(FeedForwardNetwork(d_embedding, d_ff))
        self.layers = nn.ModuleList(sublayers)

    def forward(self, x, causal_mask=False, padding_mask=None):
        """Run ``x`` through every sub-layer in order.

        Args:
            x: Input of shape (batch, seq_len, d_embedding).
            causal_mask: Passed to each self-attention sub-layer.
            padding_mask: Passed to each self-attention sub-layer.

        Returns:
            Tensor with the same shape as ``x``.
        """
        for i, layer in enumerate(self.layers):
            if i % 2 == 0:
                # Even slots hold self-attention, which needs the masks.
                x = layer(x, causal_mask=causal_mask, padding_mask=padding_mask)
            else:
                # Odd slots hold the feed-forward network.
                x = layer(x)
        return x

In [12]:
class Decoder(nn.Module):
    """Stack of decoder layers followed by a projection onto the vocabulary.

    Each layer is (masked self-attention, cross-attention, feed-forward); the
    sub-layers are stored flat in ``self.layers`` in that repeating order.

    Args:
        encoder: Encoder module. It is stored as ``self.encoder`` but is not
            called by ``forward``; the encoder output is passed in explicitly.
        d_embedding: Model width.
        num_heads: Number of attention heads per attention sub-layer.
        d_ff: Hidden width of each feed-forward sub-layer.
        num_layers: Number of decoder layers.
        vocab_size: Size of the output vocabulary.
    """

    def __init__(self, encoder, d_embedding, num_heads, d_ff, num_layers, vocab_size):
        super().__init__()
        self.encoder = encoder
        # Build fresh modules for every layer. Repeating a list with
        # `[...] * num_layers` would reuse the SAME three module objects, so
        # every layer would share one set of weights.
        sublayers = []
        for _ in range(num_layers):
            sublayers.append(MultiHeadAttention(d_embedding, num_heads))
            sublayers.append(MultiHeadCrossAttention(d_embedding, num_heads))
            sublayers.append(FeedForwardNetwork(d_embedding, d_ff))
        self.layers = nn.ModuleList(sublayers)
        self.linear = nn.Linear(d_embedding, vocab_size)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x_decoder, x_encoder, causal_mask=False, encoder_padding_mask=None, decoder_padding_mask=None, inference=False):
        """Decode ``x_decoder`` while attending to the encoder output.

        Args:
            x_decoder: Embedded decoder input, (batch, L_dec, d_embedding).
            x_encoder: Encoder output, (batch, L_enc, d_embedding).
            causal_mask: Passed to the decoder self-attention sub-layers.
            encoder_padding_mask: Padding mask for the encoder positions, used
                in cross-attention.
            decoder_padding_mask: Padding mask for the decoder positions, used
                in self-attention.
            inference: If True, return softmax probabilities instead of raw
                logits.

        Returns:
            Tensor of shape (batch, L_dec, vocab_size): logits, or
            probabilities when ``inference`` is True.
        """
        for i, layer in enumerate(self.layers):
            if i % 3 == 0:
                # Self-attention over the decoder stream.
                x_decoder = layer(x_decoder, causal_mask=causal_mask, padding_mask=decoder_padding_mask)
            elif i % 3 == 1:
                # Cross-attention: never causal, masks the encoder's padding.
                x_decoder = layer(x_decoder, x_encoder, causal_mask=False, padding_mask=encoder_padding_mask)
            else:
                # Position-wise feed-forward network.
                x_decoder = layer(x_decoder)
        x_decoder = self.linear(x_decoder)
        if inference:
            x_decoder = self.softmax(x_decoder)
        return x_decoder

In [None]:
class Embedding(nn.Module):
    """Learned token embedding plus fixed sinusoidal positional encoding.

    Args:
        vocab_size: Number of rows in the token-embedding table.
        d_embedding: Embedding width (even or odd).
        max_len: Maximum length of the input sequence.
    """

    def __init__(self, vocab_size, d_embedding, max_len):
        super().__init__()
        self.embedding = nn.Parameter(torch.randn(vocab_size, d_embedding))

        # Exponents 2i / d_embedding for i = 0, 1, ... An integer-step arange
        # has an exact element count; a float step such as 2 / d_embedding is
        # subject to rounding and yields the wrong count for odd widths.
        exponents = torch.arange(0, d_embedding, 2, dtype=torch.float32) / d_embedding
        divisors = torch.pow(10000.0, exponents)                              # (ceil(d/2),)
        positions = torch.arange(max_len, dtype=torch.float32).unsqueeze(1)   # (max_len, 1)
        angles = positions / divisors                                         # (max_len, ceil(d/2))

        pe = torch.zeros(max_len, d_embedding)
        pe[:, 0::2] = torch.sin(angles)
        # For an odd width there is one fewer cosine column than sine columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_embedding // 2])

        # Register as buffer so it's not a parameter but moves with `.to(device)`
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Args:
            x: Integer tensor of token ids, shape (batch_size, seq_len)
        Returns:
            Tensor of shape (batch_size, seq_len, d_embedding)
        Raises:
            ValueError: If seq_len exceeds the max_len given at construction.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(0):
            # Slicing `pe` would silently truncate and then fail (or broadcast)
            # in the addition below; fail early with a clear message instead.
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)}"
            )
        # Add positional encoding: broadcast over batch dimension
        x = self.embedding[x] + self.pe[:seq_len]
        return x

In [23]:
def create_padding_mask(padded_input, pad_token_id=0):
    """Mark the non-padding positions of a batch of token ids.

    Args:
        padded_input: Tensor of token ids; the notebook passes (batch, seq_len).
        pad_token_id: Id that denotes padding.

    Returns:
        int32 tensor shaped like ``padded_input``: 1 where the token differs
        from ``pad_token_id``, 0 where it equals it.
    """
    is_real_token = padded_input.ne(pad_token_id)
    return is_real_token.to(torch.int32)

In [24]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer with one embedding table shared by both sides.

    Args:
        vocab_size: Vocabulary size (embedding rows and output logits).
        d_embedding: Model width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layers.
        num_layers: Number of encoder layers and of decoder layers.
        max_len: Maximum sequence length supported by the positional encoding.
        pad_token_id: Token id treated as padding when building the attention
            padding masks. Defaults to 0 (the previous hard-wired behaviour);
            pass the tokenizer's pad id when it differs.
    """

    def __init__(self, vocab_size, d_embedding, num_heads, d_ff, num_layers, max_len, pad_token_id=0):
        super().__init__()
        self.pad_token_id = pad_token_id
        self.embedding = Embedding(vocab_size, d_embedding, max_len)
        self.encoder = Encoder(d_embedding, num_heads, d_ff, num_layers)
        self.decoder = Decoder(self.encoder, d_embedding, num_heads, d_ff, num_layers, vocab_size)

    def forward(self, x_encoder, x_decoder, causal_mask=False):
        """Encode the source ids and decode against the target ids.

        Args:
            x_encoder: Source token ids, (batch, L_src).
            x_decoder: Decoder-input token ids, (batch, L_tgt).
            causal_mask: Whether decoder self-attention is causally masked.
                The encoder is never causally masked.

        Returns:
            Logits of shape (batch, L_tgt, vocab_size).
        """
        # Masks are derived from the raw ids, so they must be computed before
        # the ids are replaced by their embeddings.
        encoder_padding_mask = create_padding_mask(x_encoder, pad_token_id=self.pad_token_id)
        decoder_padding_mask = create_padding_mask(x_decoder, pad_token_id=self.pad_token_id)

        x_encoder = self.embedding(x_encoder)
        x_decoder = self.embedding(x_decoder)
        x_encoder = self.encoder(x_encoder, causal_mask=False, padding_mask=encoder_padding_mask)
        output = self.decoder(x_decoder, x_encoder, causal_mask=causal_mask, encoder_padding_mask=encoder_padding_mask, decoder_padding_mask=decoder_padding_mask)
        return output

In [None]:
# Hyperparameters shared by the model-construction and tokenization cells.
vocab_size = 50257  # presumably the vocabulary size of the GPT-2 tokenizer loaded in a later cell — TODO confirm
max_len = 32  # positional-encoding table length; also the padded/truncated token length used when tokenizing
d_embedding = 128  # model width
d_ff = 512  # hidden width of the feed-forward sub-layers
num_heads = 8  # d_embedding must be divisible by this
num_layers = 6  # number of encoder layers and of decoder layers
# Dummy input extras (used only for the random test tensor below)
seq_len = 5
batch_size = 2

# Random float tensor of shape (batch_size, seq_len, d_embedding); not referenced by the later cells in this notebook.
x_test = torch.randn(batch_size, seq_len, d_embedding)


In [25]:
# Instantiate the model from the hyperparameters defined in the params cell.
transformer = Transformer(
    vocab_size=vocab_size,
    d_embedding=d_embedding,
    num_heads=num_heads,
    d_ff=d_ff,
    num_layers=num_layers,
    max_len=max_len)

In [31]:
# data = load_dataset('opus_books', 'en-sk')

In [None]:
# Semicolon-separated sentence pairs; later cells read the `sentence_en` and `sentence_sk` columns.
data = pd.read_csv('en_sk_sentence_pairs.csv', sep=';')

In [None]:
tokenizer = AutoTokenizer.from_pretrained("gpt2")  # small, open tokenizer
# Reuse the end-of-sequence token as the padding token (presumably because this
# tokenizer defines no pad token of its own — TODO confirm).
# NOTE(review): padding and genuine end-of-sequence tokens then share one id,
# so anything that ignores padding by id also ignores real EOS tokens.
tokenizer.pad_token = tokenizer.eos_token
# Or for BERT-style:
# tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

In [None]:
# Tokenize every sentence pair to exactly `max_len` ids (padded or truncated).
# Each entry maps 'en' / 'sk' to the tokenizer's 'input_ids' tensor for that sentence.
# Iterating the two columns in lockstep avoids a positional row lookup per sentence.
tokenized = [
    {
        lang: tokenizer(text, return_tensors='pt', padding='max_length', max_length=max_len, truncation=True)['input_ids']
        for lang, text in (('en', en_text), ('sk', sk_text))
    }
    for en_text, sk_text in zip(data['sentence_en'], data['sentence_sk'])
]

In [None]:
class TranslationDataset(Dataset):
    """Dataset view over a sequence of pre-tokenized {'en': ..., 'sk': ...} records."""

    def __init__(self, tokenized):
        self.tokenized = tokenized

    def __len__(self):
        """Number of sentence pairs."""
        return len(self.tokenized)

    def __getitem__(self, idx):
        """Return the (English, Slovak) entries of record ``idx`` as a tuple."""
        record = self.tokenized[idx]
        return record['en'], record['sk']

# tokenizer = AutoTokenizer.from_pretrained("gpt2")
# tokenizer.pad_token = tokenizer.eos_token
# dataset = TranslationDataset(examples, tokenizer)
# Wrap the pre-tokenized pairs and serve them in shuffled mini-batches of 512 pairs.
dataset = TranslationDataset(tokenized)
dataloader = DataLoader(dataset, batch_size=512, shuffle=True)

In [None]:
# Padding positions must not contribute to the loss. Take the id from the
# tokenizer rather than hard-coding it, so the two cannot drift apart.
# NOTE(review): the tokenizer cell sets pad_token = eos_token, so real
# end-of-sequence targets are ignored by the loss as well.
cel = torch.nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

optimizer = torch.optim.Adam(transformer.parameters(), lr=1e-4)

  0%|          | 0/52 [00:00<?, ?it/s]

In [None]:
# One pass over the data with teacher forcing: the decoder is fed the target
# sentence shifted right by one position and must predict the unshifted
# target. Feeding it the source sentence would never show it the target
# prefix it has to continue at generation time.
transformer.train()  # make the training-mode intent explicit

# Start-of-sequence id placed in front of every shifted target.
# presumably the tokenizer's BOS equals its EOS token here — TODO confirm
start_token_id = tokenizer.bos_token_id if tokenizer.bos_token_id is not None else tokenizer.eos_token_id

for batch_data, batch_labels in tqdm(dataloader):
    # Each stored example is (1, max_len), so batches arrive as (B, 1, max_len).
    batch_data = batch_data.squeeze(1)  # Remove the extra dimension
    batch_labels = batch_labels.squeeze(1)  # Remove the extra dimension

    # Shift right: [start, y_0, ..., y_{L-2}]. The length is unchanged, so the
    # decoder input stays the same length as the encoder input.
    start_column = torch.full_like(batch_labels[:, :1], start_token_id)
    decoder_input = torch.cat([start_column, batch_labels[:, :-1]], dim=1)

    optimizer.zero_grad()
    output = transformer(batch_data, decoder_input, causal_mask=True)
    # CrossEntropyLoss expects the class dimension second: (B, vocab, L) vs (B, L).
    loss = cel(output.permute(0, 2, 1), batch_labels)

    loss.backward()
    torch.nn.utils.clip_grad_norm_(transformer.parameters(), 1.0)
    optimizer.step()
    print(f"Batch Loss: {loss.item()}")


**Project idea** \
The model could predict the next term of a number sequence written out in words, for example: \
two four six eight ten - twelve \
three six nine twelve - fifteen \
The sequences could be arithmetic or geometric. I will generate them, code up a number-to-string mapper, and pass the sequences to the model mapped to strings.