# Pre Requisite

In [None]:
import numpy as np
import os
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
%matplotlib inline
from IPython.display import clear_output

In [3]:
import spacy
!python -m spacy download de_core_news_sm -q # Specific tokenizer

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.6/14.6 MB[0m [31m51.6 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('de_core_news_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m23.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m755.5/755.5 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.7/4.7 MB[0m [31m71.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m410.6/410.6 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━

# Data Preprocessing and Visualization
Dataset is imported from torchtext (Multi30K).

In [1]:
!pip install torchtext==0.17.0 -q
!pip install 'portalocker>=2.0.0' -q # Required libraries. If throwing error even after install, restart the session (Libraries are kept even after restart).
from torchtext.datasets import Multi30k

# Importing the dataset

batch_size=32

train_iter, val_iter = Multi30k(split=('train', 'valid'), language_pair=('de', 'en'))# Dataset
from torchtext.data.functional import to_map_style_dataset # Map style gives no warning messages.
train_iter, val_iter= to_map_style_dataset(train_iter), to_map_style_dataset(val_iter)



Text Visualization

In [30]:
# Number of datapoints

def get_size(iter):
  return  sum([len(z) for i,z in iter])

print(f'Number of datapoints in train set: {get_size(train_iter)},Number of datapoints in train set:{get_size(val_iter)}')

# Number of characters

def chars(iter):
  char_en=[]
  char_de=[]
  return set(k for i, z in iter for j in i for k in j),set(k for i, z in iter for j in z for k in j)

char_de,char_en= chars(train_iter+ val_iter)
print(f'number of english characters:{len(char_en)},number of german characters:{len(char_de)}')


# Frequency of characters

def chars(iter):
  char_en={}
  char_de={}
  for i,z in iter:
    idx1=[k for j in i for k in j]
    idx2=[k for j in z  for k in j ]
    for i in idx1:
      char_de[i]=char_de.get(i,0)+1
    for i in idx2:
      char_en[i]=char_en.get(i,0)+1
  return char_en, char_de

# chars(train_iter+val_iter) # Run this to take a look a the chars and their frequency

Number of datapoints in train set: 1772238,Number of datapoints in train set:62283
number of english characters:80,number of german characters:99


Let's check wordwise

In [31]:
def vocab(iter):
    vocab_en = {}
    vocab_de = {}

    for en_sentence, de_sentence in iter:
        # Process the German sentence
        for word in de_sentence.split():
            vocab_de[word] = vocab_de.get(word, 0) + 1

        # Process the English sentence
        for word in en_sentence.split():
            vocab_en[word] = vocab_en.get(word, 0) + 1

    # Sort the vocabularies by frequency
    sorted_vocab_en = sorted(vocab_en.items(), key=lambda x: -x[1])
    sorted_vocab_de = sorted(vocab_de.items(), key=lambda x: -x[1])

    return sorted_vocab_en, sorted_vocab_de

vocab_en,vocab_de= vocab(train_iter)
# clear_output
print(f'number of english words:{len(vocab_en)},number of german words:{len(vocab_de)}')
vocab_en[:20],vocab_de[:20]

number of english words:24889,number of german words:15456


([('Ein', 13901),
  ('einem', 13697),
  ('in', 11829),
  ('und', 8925),
  ('mit', 8816),
  ('auf', 8409),
  ('Mann', 7433),
  ('einer', 6747),
  ('Eine', 5932),
  ('ein', 4852),
  ('der', 4497),
  ('eine', 3972),
  ('Frau', 3895),
  ('die', 3606),
  ('einen', 3479),
  ('Zwei', 3175),
  ('im', 3079),
  ('an', 2569),
  ('von', 2360),
  ('dem', 2132)],
 [('a', 31704),
  ('A', 17457),
  ('in', 14830),
  ('the', 9922),
  ('on', 7810),
  ('is', 7521),
  ('and', 7375),
  ('man', 7165),
  ('of', 6859),
  ('with', 6171),
  ('are', 3714),
  ('woman', 3652),
  ('to', 3123),
  ('Two', 3116),
  ('at', 2905),
  ('wearing', 2616),
  ('people', 2348),
  ('white', 2104),
  ('young', 2055),
  ('his', 1969)])

Tokenizing is done with spacy. We add token ["\<bos>"] at the beginning of the sentence and ["\<eos>"] at the end.

# Tokenizer

In [None]:
max_length=80

In [None]:
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import DataLoader
import spacy


# Load Spacy tokenizers
spacy_de = spacy.load('de_core_news_sm')
spacy_en = spacy.load('en_core_web_sm')

def tokenize_de(text):
    return [tok.text for tok in spacy_de.tokenizer(text)]

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

# Separate yield_tokens functions for German (source) and English (target)
def yield_tokens(data_iter, tokenizer, is_source=True):
    for src, tgt in data_iter:
        if is_source:
            yield tokenizer(src)  # German (source)
        else:
            yield tokenizer(tgt)  # English (target)

from torch.nn.utils.rnn import pad_sequence

def preprocess_sentence(sentence, vocab, tokenizer):
    tokens = tokenizer(sentence)
    tokens = [vocab['<bos>']] + [vocab[token] for token in tokens] + [vocab['<eos>']]
    if len(tokens) > max_length:
      tokens = tokens[:max_length-1] + [vocab['<eos>']]
    else:
      tokens += [vocab['<pad>']] * (max_length - len(tokens))
    return torch.tensor(tokens, dtype=torch.long)

def collate_fn(batch):
    src_batch, tgt_batch = [], []
    for src_sample, tgt_sample in batch:
      src_batch.append(preprocess_sentence(src_sample, vocab_de, tokenize_de))
      tgt_batch.append(preprocess_sentence(tgt_sample, vocab_en, tokenize_en))
    src_batch = torch.stack(src_batch)
    tgt_batch = torch.stack(tgt_batch)
    return src_batch, tgt_batch



In [None]:
# Build vocabularies for German (source) and English (target)
vocab_de = build_vocab_from_iterator(yield_tokens(train_iter, tokenize_de, is_source=True), specials=["<unk>", "<pad>", "<bos>", "<eos>"])
vocab_en = build_vocab_from_iterator(yield_tokens(train_iter, tokenize_en, is_source=False), specials=["<unk>", "<pad>", "<bos>", "<eos>"])

# Set default index to handle unknown tokens
vocab_de.set_default_index(vocab_de["<unk>"])
vocab_en.set_default_index(vocab_en["<unk>"])

bos_idx = vocab_de["<bos>"]
eos_idx = vocab_de["<eos>"]
pad_idx = vocab_de["<pad>"]


# DataLoader
train_loader = DataLoader(list(train_iter), batch_size=32, collate_fn=collate_fn)
valid_loader = DataLoader(list(val_iter), batch_size=32, collate_fn=collate_fn)



# GPT


## Model

In [None]:
n_embd=512
n_head= 8
n_layers= 6
dropout= 0.2

In [None]:
class maskless_head(nn.Module):
  def __init__(self,head_size):
    super().__init__()
    self.head_size= head_size
    self.key=nn.Linear(n_embd,head_size,bias=False)
    self.query=nn.Linear(n_embd,head_size,bias=False)
    self.value=nn.Linear(n_embd,head_size,bias=False)
    self.softmax=nn.Softmax(dim=-1)
    self.dropout= nn.Dropout(dropout)

  def forward(self,x):
    k=self.key(x) # b,T,N
    q=self.query(x) # b,T,N
    v=self.value(x) # b,T,N
    k = k.transpose(-2, -1)  # b,N,T
    wei= q @ k * self.head_size**-0.5 # b,T,T
    wei=self.softmax(wei)
    wei = self.dropout(wei)
    return wei @ v # b,T,N

class masked_head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(max_length, max_length)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B,T,C = x.shape
        k = self.key(x)   # (B,T,C)
        q = self.query(x) # (B,T,C)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * C**-0.5 # (B, T, C) @ (B, C, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,C)
        out = wei @ v # (B, T, T) @ (B, T, C) -> (B, T, C)
        return out

class maskless_cross_head(nn.Module):
  def __init__(self,head_size):
    super().__init__()
    self.head_size= head_size
    self.key=nn.Linear(n_embd,head_size,bias=False)
    self.query=nn.Linear(n_embd,head_size,bias=False)
    self.value=nn.Linear(n_embd,head_size,bias=False)
    self.softmax=nn.Softmax(dim=-1)
    self.dropout= nn.Dropout(dropout)

  def forward(self,x,y):
    k=self.key(y) # b,T,N
    q=self.query(x) # b,T,N
    v=self.value(y) # b,T,N
    k = k.transpose(-2, -1) * self.head_size**-0.5 # b,N,T
    wei= q @ k # b,T,T
    wei=self.softmax(wei)
    wei = self.dropout(wei)
    return wei @ v # b,T,N

class FeedForward(nn.Module):
  def __init__(self, n_embd):
    super().__init__()
    self.net = nn.Sequential(
    nn.Linear(n_embd, 4 * n_embd),
    nn.ReLU(),
    nn.Linear(4 * n_embd, n_embd),
    nn.Dropout(dropout),
    )

  def forward(self, x):
        return self.net(x)

In [None]:
class MultiHeadAttention(nn.Module):
  def __init__(self, num_heads, head_size, masked='True'):
    super().__init__()
    if masked=='True':
      self.heads = nn.ModuleList([masked_head(head_size) for _ in range(num_heads)])
    elif masked=='cross':
      self.heads = nn.ModuleList([maskless_cross_head(head_size) for _ in range(num_heads)])
    else:
      self.heads = nn.ModuleList([maskless_head(head_size) for _ in range(num_heads)])
    self.proj = nn.Linear(n_embd, n_embd)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x,y=None):
    if y is not None:
      out = torch.cat([h(x,y) for h in self.heads], dim=-1)
    else:
      out = torch.cat([h(x) for h in self.heads], dim=-1)
    out = self.dropout(self.proj(out))
    return out

In [None]:
class encoder_block(nn.Module):
  def __init__(self, n_embd, n_head):
    # n_embd: embedding dimension, n_head: the number of heads we'd like
    super().__init__()
    head_size = n_embd // n_head
    self.sa = MultiHeadAttention(n_head, head_size,masked='False')
    self.ffwd = FeedForward(n_embd)
    self.ln1 = nn.LayerNorm(n_embd)
    self.ln2 = nn.LayerNorm(n_embd)

  def forward(self, x):
    x = x + self.sa(self.ln1(x))
    x = x + self.ffwd(self.ln2(x))
    return x

class decoder_block(nn.Module):
  def __init__(self, n_embd, n_head):
    # n_embd: embedding dimension, n_head: the number of heads we'd like
    super().__init__()
    head_size = n_embd // n_head
    self.sa = MultiHeadAttention(n_head, head_size,masked='False')
    self.ca = MultiHeadAttention(n_head, head_size,masked='cross')
    self.ffwd = FeedForward(n_embd)
    self.ln1 = nn.LayerNorm(n_embd)
    self.ln2 = nn.LayerNorm(n_embd)
    self.ln3 = nn.LayerNorm(n_embd)
  def forward(self, x,encoder_output):
    x = x + self.sa(self.ln1(x))
    x=  x + self.ca(self.ln2(x),encoder_output)
    x = x + self.ffwd(self.ln3(x))
    return x

In [None]:
class PositionalEncoding(nn.Module):
  def __init__(self, model_dimension, dropout_probability, expected_max_sequence_length=max_length):
      super().__init__()
      self.dropout = nn.Dropout(p=dropout_probability)
      position_id = torch.arange(0, expected_max_sequence_length).unsqueeze(1)
      frequencies = torch.pow(10000., -torch.arange(0, model_dimension, 2, dtype=torch.float) / model_dimension)

      positional_encodings_table = torch.zeros(expected_max_sequence_length, model_dimension)
      positional_encodings_table[:, 0::2] = torch.sin(position_id * frequencies)  # sine on even positions
      positional_encodings_table[:, 1::2] = torch.cos(position_id * frequencies)  # cosine on odd positions
      self.register_buffer('positional_encodings_table', positional_encodings_table)

  def forward(self, embeddings_batch):
      assert embeddings_batch.ndim == 3 and embeddings_batch.shape[-1] == self.positional_encodings_table.shape[1], \
          f'Expected (batch size, max token sequence length, model dimension) got {embeddings_batch.shape}'

      positional_encodings = self.positional_encodings_table[:embeddings_batch.shape[1]]
      return self.dropout(embeddings_batch + positional_encodings)



In [None]:
class Transformer(nn.Module):
  def __init__(self, n_embd, n_head, n_layers, max_length, dropout=0.1):
    super().__init__()
    self.encoding= nn.Embedding(len(vocab_de),n_embd)
    self.decoding= nn.Embedding(len(vocab_en),n_embd)
    self.encoder = nn.ModuleList([encoder_block(n_embd, n_head) for _ in range(n_layers)])
    self.decoder = nn.ModuleList([decoder_block(n_embd, n_head) for _ in range(n_layers)])
    self.src_pos_embedding = PositionalEncoding(n_embd, dropout)
    self.trg_pos_embedding = PositionalEncoding(n_embd, dropout)
    self.dropout = nn.Dropout(dropout)
    self.fc_out = nn.Linear(n_embd, len(vocab_en))
    self.softmax = nn.Softmax(dim=-1)
    self.init_weights()
  def init_weights(self):
    for p in self.parameters():
      if p.dim() > 1:
        nn.init.xavier_uniform_(p)

  def forward(self, x, y):
    x = self.encoding(x)
    y = self.decoding(y)
    x = self.src_pos_embedding(x)
    y = self.trg_pos_embedding(y)

    for enc_block in self.encoder:
        x = enc_block(x)
    # Decoder
    for dec_block in self.decoder:
        y = dec_block(y, x)

    # Final linear layer
    out = self.fc_out(y)
    return out


## Training Loop

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model= Transformer(n_embd,n_head,n_layers,max_length)
model.to(device)
optimizer= torch.optim.Adam(model.parameters(),lr=1e-4)

In [None]:
loss_fn  = torch.nn.CrossEntropyLoss()
epochs=10

In [None]:
save_path = '/content/drive/MyDrive/model_checkpoints'
checkpoint_file = f'{save_path}/model_checkpoint.pt'
if os.path.exists(checkpoint_file):
  os.remove(checkpoint_file)
torch.save(model.state_dict(), checkpoint_file)
for epoch in range(epochs):
  model.train()  # Set model to training mode
  total_loss = 0
  for batch, (src, tgt) in enumerate(train_loader):
    optimizer.zero_grad()
    src, tgt = src.to(device), tgt.to(device)  # Move data to GPU if available

    # Forward pass through the model
    logits = model(src, tgt)
    logits = logits.view(-1, logits.size(-1))

    # Flatten target to [batch_size * sequence_length]
    tgt = tgt.view(-1)

    loss = loss_fn(logits, tgt)

    # Backpropagation and optimization step
    optimizer.zero_grad()  # Clear previous gradients
    loss.backward()        # Compute gradients
    optimizer.step()       # Update weights

    # Accumulate total loss for reporting
    total_loss += loss.item()

    # Epoch-level reporting
    print(f'Epoch {epoch+1}: Batch= {batch} Loss: {(loss.item()):.4f}')
  avg_loss = total_loss / len(train_loader)
  print(f'Epoch {epoch+1}: Loss: {avg_loss:.4f}')
  if os.path.exists(checkpoint_file):
    os.remove(checkpoint_file)
  torch.save(model.state_dict(), checkpoint_file)


# Inference

In [None]:
save_path = '/content/drive/MyDrive/model_checkpoints'
checkpoint_file = f'{save_path}/model_checkpoint.pt'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model= Transformer(n_embd,n_head,n_layers,max_length)
model.to(device)
model.load_state_dict(torch.load(checkpoint_file,map_location=device))

<All keys matched successfully>

In [None]:
k='Hey Alter, wie geht es dir? '
k=preprocess_sentence(k, vocab_de, tokenize_de).unsqueeze(0).to(device)

In [None]:
start_token=vocab_de["<bos>"]
end_token = vocab_de["<eos>"]
def generate(model, src, start_token, max_len, device):
    model.eval()
    src = src.to(device)
    target = torch.tensor([[start_token]], device=device)

    with torch.no_grad():
      for _ in range(max_len):
        # Pass the source and current target through the model
        logits = model(src, target)
        # Get the predicted next token (highest probability)
        next_token = logits[:, -1, :].argmax(dim=-1, keepdim=True)

        # Append the predicted token to the target sequence
        target = torch.cat([target, next_token], dim=1)

        # Stop if end token is generated
        if next_token.item() == end_token:
          break
    return target

target_sequence = generate(model, k, start_token, max_length, device)


In [None]:
with model.eval():  # Set model to training mode
  total_loss = 0
  for batch, (src, tgt) in enumerate(valid_loader):
    optimizer.zero_grad()
    src, tgt = src.to(device), tgt.to(device)  # Move data to GPU if available

    # Forward pass through the model
    logits = model(src, tgt)
    logits = logits.view(-1, logits.size(-1))

    # Flatten target to [batch_size * sequence_length]
    tgt = tgt.view(-1)

    loss = loss_fn(logits, tgt)

    # Accumulate total loss for reporting
    total_loss += loss.item()
    # Epoch-level reporting
    print(f' Batch= {batch} Loss: {(loss.item()):.4f}')
  avg_loss = total_loss / len(train_loader)
  print(f'Total Avg_loss =Loss: {avg_loss:.4f}')