# Generative AI

In [1]:
import numpy as np
import pandas as pd

%matplotlib inline
import matplotlib.pyplot as plt

import torch
from torch import nn

from datasets import load_dataset
# check for gpu
print(torch.cuda.is_available())

import warnings
warnings.filterwarnings("ignore")

True


## Making the data ready

In [2]:
from datasets import load_dataset
from transformers import AutoTokenizer

# Load the dataset (ensure the right dataset is loaded)
dataset = load_dataset("wmt14", "cs-en", cache_dir = "./data_cache")

# Define the tokenizer (using a pretrained model like T5)
tokenizer = AutoTokenizer.from_pretrained("t5-small")  # Example with T5



# Only use first 10k samples
small_dataset = dataset['train'].select(range(10_000))

# Check the structure of the first example in the train dataset
print(small_dataset[0])


{'translation': {'cs': 'Následný postup na základě usnesení Parlamentu: viz zápis', 'en': "Action taken on Parliament's resolutions: see Minutes"}}


In [3]:
small_dataset

Dataset({
    features: ['translation'],
    num_rows: 10000
})

In [4]:
max_length = 64
def tokenize_function(examples):
    # Tokenize source (EN) and target (DE) from the dictionary keys
    sources = examples["translation"]  # This is a list of dictionaries
    source_texts = [source["en"] for source in sources]  # Extract English sentences
    target_texts = [source["cs"] for source in sources]  # Extract German sentences
    
    # Tokenize the source texts (for the encoder)
    model_inputs = tokenizer(
        source_texts,
        max_length=max_length,
        padding="max_length",
        truncation=True,
        return_tensors="pt"  # Ensure that the output is in tensor format
    )
    
    # Tokenize the target texts (for the decoder)
    labels = tokenizer(
        target_texts,
        max_length=max_length,
        padding="max_length",
        truncation=True,
        return_tensors="pt"
    )
    
    # Add labels to the model inputs dictionary
    model_inputs["labels"] = labels["input_ids"]
    
    return model_inputs


# Apply tokenizer across the entire dataset
tokenized_dataset = small_dataset.map(tokenize_function, batched=True, remove_columns=["translation"])

# Verify the result
print(tokenized_dataset[0])

{'input_ids': [6776, 1026, 30, 12876, 31, 7, 3161, 7, 10, 217, 13687, 7, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'labels': [445, 2975, 7, 1361, 29, 2, 442, 413, 3, 29, 9, 3, 172, 2975, 8142, 26, 2, 178, 1496, 35, 2, 13636, 76, 10, 3, 7302, 3, 172, 2975, 102, 159, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}


In [5]:
from torch.utils.data import DataLoader

# Set format to PyTorch tensors
tokenized_dataset.set_format(type="torch", columns=["input_ids", "labels"])

# Create PyTorch DataLoader
train_dataloader = DataLoader(tokenized_dataset, batch_size=32, shuffle=True)


## Creating architecture

In [6]:
import math
from torch.nn import functional as F
class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, mask=None):
        # Q, K, V: (B, h, L, d_k)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(Q.size(-1))  # (B, h, L, L)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        attn = F.softmax(scores, dim=-1)  # (B, h, L, L)
        output = torch.matmul(attn, V)  # (B, h, L, d_k)

        return output, attn  # return both output and attention weights (as in paper)

In [7]:
attention = ScaledDotProductAttention()

batch_size = 2
seq_len = 5
dk = 8
dv = 8

Q = torch.randn(batch_size, seq_len, dk)
K = torch.randn(batch_size, seq_len, dk)
V = torch.randn(batch_size, seq_len, dv)

output, attn = attention(Q, K, V)
print("Output shape:", output.shape)

Output shape: torch.Size([2, 5, 8])


In [8]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Linear projections for Q, K, V
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)

        # Final output projection
        self.W_o = nn.Linear(d_model, d_model)

        self.attention = ScaledDotProductAttention()

    def forward(self, Q, K, V, mask=None):
        B, L, d_model = Q.size()

        # Linear projections
        Q = self.W_q(Q).view(B, L, self.num_heads, self.head_dim).transpose(1, 2)  # (B, h, L, d_k)
        K = self.W_k(K).view(B, L, self.num_heads, self.head_dim).transpose(1, 2)
        V = self.W_v(V).view(B, L, self.num_heads, self.head_dim).transpose(1, 2)

        # Apply attention
        attn_output, attn_weights = self.attention(Q, K, V, mask)  # (B, h, L, d_k)

        # Concatenate heads
        attn_output = attn_output.transpose(1, 2).contiguous().view(B, L, d_model)  # (B, L, d_model)

        # Final linear projection
        output = self.W_o(attn_output)  # (B, L, d_model)

        return output, attn_weights


In [9]:
mha = MultiHeadAttention(d_model=512, num_heads=8)
Q = torch.randn(2, 10, 512)  # batch_size=2, seq_len=10, d_model=512
K = torch.randn(2, 10, 512)
V = torch.randn(2, 10, 512)

output, attn_weights = mha(Q, K, V)
print("Output shape:", output.shape)       # (2, 10, 512)
print("Attention weights shape:", attn_weights.shape)  # (2, 8, 10, 10)


Output shape: torch.Size([2, 10, 512])
Attention weights shape: torch.Size([2, 8, 10, 10])


In [10]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.relu = nn.ReLU()
        self.d_model = d_model
        self.d_ff = d_ff
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x:torch.Tensor) -> torch.Tensor:
        x = self.relu(self.linear1(x))
        return self.linear2(x)

In [11]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len=64):
        super().__init__()

        # Create a (max_len, d_model) matrix
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position* div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe) # save as buffer (not a paramter)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        seq_len = x.size(1)
        # add positional_encoding
        x = x + self.pe[:, :seq_len, :] # type: ignore
        return x


## Building the model

### Encoder Block

In [12]:
class EncoderCell(nn.Module):
    def __init__(self, d_model: int, num_heads: int, d_ff: int):
        super().__init__()

        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ff = PositionWiseFeedForward(d_model, d_ff)

    def forward(self, x: torch.Tensor, mask=None) -> torch.Tensor:
        # x : (batch_size, seq_len, d_model)
        # mask : (batch_size, 1, 1, seq_len), optional for padding masking

        # Multi-Head Self Attention
        attn_output, _ = self.mha(x, x, x, mask=mask)
        
        # Add residual connection and layer normalization
        x = self.layer_norm1(x + attn_output)

        # Feed-forward network
        ff_output = self.ff(x)

        # Add residual connection and layer normalization
        x = self.layer_norm2(x + ff_output)

        return x


In [13]:
x = torch.randn((32, 64, 512))
encoder_cell = EncoderCell(512, 8, 2048)

encoder_cell(x).shape


torch.Size([32, 64, 512])

In [14]:
class DecoderCell(nn.Module):
    def __init__(self, d_model: int, num_heads: int, d_ff:int):
        super().__init__()

        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.layer_norm3 = nn.LayerNorm(d_model)
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.enc_dec_attn = MultiHeadAttention(d_model, num_heads)
        self.ff = PositionWiseFeedForward(d_model, d_ff)

    def forward(self, x: torch.Tensor, enc_output: torch.Tensor, tgt_mask=None, src_mask=None) -> torch.Tensor:
        # x : (batch_size, tgt_seq_len, d_model)
        # enc_output: (batch_size, src_seq_len, d_model)

        # Self-attention with masking (prevent future token attention in decoder)
        attn_output1, _ = self.self_attn(x, x, x, mask=tgt_mask)
        x = self.layer_norm1(x + attn_output1)  # Residual + LayerNorm

        # Encoder-Decoder attention (decoder attends to encoder output)
        attn_output2, _ = self.enc_dec_attn(x, enc_output, enc_output, mask=src_mask)
        x = self.layer_norm2(x + attn_output2)  # Residual + LayerNorm

        # Feed forward
        ff_output = self.ff(x)
        x = self.layer_norm3(x + ff_output)  # Residual + LayerNorm

        return x


In [15]:
class Encoder(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, d_ff, vocab_size, max_len=512):
        super().__init__()

        self.token_embed = nn.Embedding(vocab_size, d_model)
        self.pos_embed = PositionalEncoding(d_model, max_len)

        self.layers = nn.ModuleList([
            EncoderCell(d_model, num_heads, d_ff)
            for _ in range(num_layers)
        ])

        self.norm = nn.LayerNorm(d_model)

    def forward(self, src, src_mask=None):
        x = self.token_embed(src)  # (batch_size, src_seq_len, d_model)
        x = self.pos_embed(x)

        for layer in self.layers:
            x = layer(x, src_mask)

        x = self.norm(x)  # Final norm after last layer
        return x  # (batch_size, src_seq_len, d_model)


In [16]:
class Decoder(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, d_ff, vocab_size, max_len=512):
        super().__init__()

        self.token_embed = nn.Embedding(vocab_size, d_model)
        self.pos_embed = PositionalEncoding(d_model, max_len)

        self.layers = nn.ModuleList([
            DecoderCell(d_model, num_heads, d_ff)
            for _ in range(num_layers)
        ])

        self.norm = nn.LayerNorm(d_model)

    def forward(self, tgt, enc_output, tgt_mask=None, src_mask=None):
        x = self.token_embed(tgt)  # (batch_size, tgt_seq_len, d_model)
        x = self.pos_embed(x)

        for layer in self.layers:
            x = layer(x, enc_output, tgt_mask, src_mask)

        x = self.norm(x)  # Final norm
        return x  # (batch_size, tgt_seq_len, d_model)


In [17]:
import torch
import torch.nn as nn

class Transformer(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, d_ff, src_vocab_size, tgt_vocab_size, max_len=512):
        super().__init__()

        
        # Encoder and Decoder
        self.encoder = Encoder(num_layers, d_model, num_heads, d_ff, src_vocab_size, max_len)
        self.decoder = Decoder(num_layers, d_model, num_heads, d_ff, tgt_vocab_size, max_len)

        # Final linear layer projects decoder output to vocab logits
        self.output_linear = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        
        # Forward pass through the encoder and decoder
        enc_output = self.encoder(src, src_mask)
        dec_output = self.decoder(tgt, enc_output, tgt_mask, src_mask)

        # Project decoder output to logits
        logits = self.output_linear(dec_output)  # (batch_size, tgt_seq_len, tgt_vocab_size)
        return logits


In [18]:
def create_padding_mask(seq, pad_token_id=0):
    # seq: (batch_size, seq_len)
    return (seq == pad_token_id).unsqueeze(1).unsqueeze(2)
    # output shape: (batch_size, 1, 1, seq_len) ➔ broadcastable for attention

def create_look_ahead_mask(size):
    mask = torch.triu(torch.ones((size, size)), diagonal=1)
    return mask == 1  # boolean mask

def create_decoder_mask(tgt_seq, pad_token_id=0):
    tgt_seq_len = tgt_seq.size(1)

    look_ahead_mask = create_look_ahead_mask(tgt_seq_len).to(tgt_seq.device)  # (tgt_seq_len, tgt_seq_len)
    padding_mask = create_padding_mask(tgt_seq, pad_token_id)  # (batch_size, 1, 1, tgt_seq_len)

    combined_mask = look_ahead_mask.unsqueeze(0).unsqueeze(0) | padding_mask
    return combined_mask  # (batch_size, 1, tgt_seq_len, tgt_seq_len)


In [19]:
def shift_right(tgt_batch, pad_token_id=0, bos_token_id=2):
    shifted = torch.full_like(tgt_batch, pad_token_id)
    shifted[:, 1:] = tgt_batch[:, :-1]
    shifted[:, 0] = bos_token_id  # Start with BOS token
    return shifted


In [26]:
import torch
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm

def shift_right(tgt_batch, pad_token_id=0, bos_token_id=2):
    shifted = torch.full_like(tgt_batch, pad_token_id)
    shifted[:, 1:] = tgt_batch[:, :-1]
    shifted[:, 0] = bos_token_id  # Start with BOS token
    return shifted

def train_model(model, train_dataloader, num_epochs=10, lr=3e-4, device='cuda', pad_token_id=0, bos_token_id=2):
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in tqdm(range(num_epochs)):
        model.train()
        epoch_loss = 0

        for batch in tqdm(train_dataloader, desc="Training", leave=False):
            src_batch = batch['input_ids'].to(device)  # shape: (B, src_len)
            tgt_batch = batch['labels'].to(device)     # shape: (B, tgt_len)

            # Shift target right for decoder input
            tgt_input = shift_right(tgt_batch, pad_token_id, bos_token_id).to(device)

            # Create masks
            src_mask = create_padding_mask(src_batch, pad_token_id).to(device)  # (B, 1, 1, src_len)
            tgt_mask = create_decoder_mask(tgt_input, pad_token_id).to(device)  # (B, 1, tgt_len, tgt_len)

            # Forward pass
            logits = model(src_batch, tgt_input, src_mask, tgt_mask)  # (B, tgt_len, vocab_size)

            # Compute loss
            loss = F.cross_entropy(
                logits.view(-1, logits.size(-1)),  # (B * tgt_len, vocab_size)
                tgt_batch.view(-1),                # (B * tgt_len)
                ignore_index=pad_token_id
            )

            # Backprop
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss / len(train_dataloader):.4f}")

    return model


In [27]:
# Hyperparams
num_layers = 6
d_model = 512
num_heads = 8
d_ff = 2048
src_vocab_size = 32000  # depends on your tokenizer
tgt_vocab_size = 32000
max_len = 512

# 1. Instantiate model
model = Transformer(num_layers, d_model, num_heads, d_ff, src_vocab_size, tgt_vocab_size, max_len)
model = model.to('cuda')

In [None]:
# Assuming your model is already defined (e.g., transformer_model)
trained_model = train_model(model, train_dataloader, num_epochs=10, lr=3e-4, device='cuda')


 10%|█         | 1/10 [00:57<08:33, 57.08s/it]

Epoch 1/10, Loss: 1.4747


 20%|██        | 2/10 [01:53<07:35, 56.98s/it]

Epoch 2/10, Loss: 0.0807


