In [205]:
!pip install alive-progress



### Libraries

In [206]:
import math
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from collections import defaultdict

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

from alive_progress import alive_bar

import warnings
warnings.filterwarnings('ignore')

### Positional Encoding
* Currenyly using sine, cosine functions as mentioned by Vasmari et al in paper.

  $PE_{(pos, 2i)} = sin(\frac{pos}{10000^{\frac{2i}{d_{model}}}})$

  $PE_{(pos, 2i+1)} = cos(\frac{pos}{10000^{\frac{2i}{d_{model}}}})$

  where, $d_{model}$ is **dimension** of model.

* Try other encoding techniques such as NoPE, Rotary PE (RoPE), Relative, and Learned PE.

In [207]:
class PositionalEncoding:
  def __init__(self, dModel, maxLen=500):
    self.encoding = torch.zeros(maxLen, dModel)
    position = torch.arange(0, maxLen, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, dModel, 2).float() * (-np.log(10000.0) / dModel))

    self.encoding[:, 0::2] = torch.sin(position * div_term)
    self.encoding[:, 1::2] = torch.cos(position * div_term)
    self.encoding = self.encoding.unsqueeze(0)

  def forward(self, x):
    return x + self.encoding[:, :x.size(1), ].to(x.device)

### Multi-Head Attention

In [208]:
def scaled_dot_product_attention(Q, K, V, mask=None):
  d_k = Q.size(-1)
  #K.transpose(-2, -1)
  scores = torch.matmul(Q, K.transpose(-1, -2))

  if mask is not None: scores = scores.masked_fill(mask == 0, float(-1e20))

  scores /= np.sqrt(d_k)

  attention = nn.functional.softmax(scores, dim=-1)
  output = torch.matmul(attention, V)

  return output, attention

In [209]:
class MultiHeadAttention(nn.Module):
  def __init__(self, dModel, numHeads):
    super(MultiHeadAttention, self).__init__()
    self.dModel = dModel
    self.numHeads = numHeads

    assert dModel % numHeads == 0, "dModel should be divisible by numHeads"
    self.depth = dModel // self.numHeads
    self.Wq = nn.Linear(dModel, dModel, bias=False)
    self.Wk = nn.Linear(dModel, dModel, bias=False)
    self.Wv = nn.Linear(dModel, dModel, bias=False)
    self.fc = nn.Linear(dModel, dModel, bias=False)

  def split_head(self, x, batchSize, seqLength):
    x = x.view(batchSize, seqLength, self.numHeads, self.depth)
    return x.transpose(1, 2)

  def forward(self, Q, K, V, mask=None):
    batchSize = K.size(0)
    seqLength = K.size(1)
    seqLengthQuery = Q.size(1)

    Q = self.Wq(Q)
    K = self.Wk(K)
    V = self.Wv(V)

    Q = self.split_head(Q, batchSize, seqLengthQuery)
    K = self.split_head(K, batchSize, seqLength)
    V = self.split_head(V, batchSize, seqLength)

    attOutput, _ = scaled_dot_product_attention(Q, K, V, mask)
    attOutput = attOutput.transpose(1, 2).contiguous().view(batchSize, seqLengthQuery, self.dModel)

    output = self.fc(attOutput)
    return output

### Feed-Forward

In [210]:
class FeedForward(nn.Module):
  def __init__(self, dModel, dFF):
    super(FeedForward, self).__init__()
    self.fc1 = nn.Linear(dModel, dFF)
    self.fc2 = nn.Linear(dFF, dModel)
    self.relu = nn.ReLU()

  def forward(self, x):
    return self.fc2(self.relu(self.fc1(x)))

### Transformer Encoder-Layer

In [211]:
class EncoderLayer(nn.Module):
  def __init__(self, dModel, numHeads, dFF):
    super(EncoderLayer, self).__init__()
    self.mha = MultiHeadAttention(dModel, numHeads)
    self.ffn = FeedForward(dModel, dFF)

    self.layerNorm1 = nn.LayerNorm(dModel)
    self.layerNorm2 = nn.LayerNorm(dModel)

    self.dropout1 = nn.Dropout(0.1)
    self.dropout2 = nn.Dropout(0.1)

  def forward(self, x, mask):
    attOutput = self.mha(x, x, x, mask)
    out1 = self.dropout1(self.layerNorm1(attOutput + x))

    ffnOutput = self.ffn(out1)
    out2 = self.dropout2(self.layerNorm2(out1 + ffnOutput))

    return out2

In [212]:
class TransformerEncoder(nn.Module):
  def __init__(self, numLayers, dModel, numHeads, dFF, input_vocab_size, maxLen, device):
    super(TransformerEncoder, self).__init__()
    self.dModel = dModel
    self.embedding = nn.Embedding(input_vocab_size, dModel)
    self.posEncoding = PositionalEncoding(dModel, maxLen)

    self.encLayers = nn.ModuleList([EncoderLayer(dModel, numHeads, dFF) for _ in range(numLayers)])
    self.dropout = nn.Dropout(0.1)

  def forward(self, x, mask):
    x = self.embedding(x) * np.sqrt(self.dModel)
    x = self.posEncoding.forward(x)

    for layer in self.encLayers:
      x = layer(x, mask)

    return x

### Transformer Decoder-Layer

In [213]:
class DecoderLayer(nn.Module):
  def __init__(self, dModel, numHeads, dFF):
    super(DecoderLayer, self).__init__()
    self.mhaMask = MultiHeadAttention(dModel, numHeads)
    self.mha = MultiHeadAttention(dModel, numHeads)
    self.ffn = FeedForward(dModel, dFF)

    self.layerNorm1 = nn.LayerNorm(dModel)
    self.layerNorm2 = nn.LayerNorm(dModel)
    self.layerNorm3 = nn.LayerNorm(dModel)

    self.dropout1 = nn.Dropout(0.1)
    self.dropout2 = nn.Dropout(0.1)
    self.dropout3 = nn.Dropout(0.1)

  def forward(self, x, encOutput, srcMask, trgMask):
    attOutput = self.mhaMask(x, x, x, trgMask)
    out1 = self.dropout1(self.layerNorm1(attOutput + x))

    attOutput = self.mha(x, encOutput, encOutput, srcMask)
    out2 = self.dropout2(self.layerNorm2(out1 + attOutput))

    ffnOutput = self.ffn(out2)
    out3 = self.dropout3(self.layerNorm3(out2 + ffnOutput))

    return out3

In [214]:
class TransformerDecoder(nn.Module):
  def __init__(self, numLayers, dModel, numHeads, dFF, output_vocab_size, maxLen, device):
    super(TransformerDecoder, self).__init__()

    self.dModel = dModel

    self.embedding = nn.Embedding(output_vocab_size, dModel)
    self.posEncoding = PositionalEncoding(dModel, maxLen)

    self.decLayers = nn.ModuleList([DecoderLayer(dModel, numHeads, dFF) for _ in range(numLayers)])

  def forward(self, x, encOutput, srcMask, trgMask):
    x = self.embedding(x) * np.sqrt(self.dModel)
    x = self.posEncoding.forward(x)

    for layer in self.decLayers:
      x = layer(x, encOutput, srcMask, trgMask)

    return x

### Transformer Assemble

In [215]:
class Transformer(nn.Module):
  def __init__(self, numEncLayers, numDecLayers, dModel, numHeads, dFF, input_vocab_size, output_vocab_size, maxLen ,device):
    super(Transformer, self).__init__()
    self.encoder = TransformerEncoder(numEncLayers, dModel, numHeads, dFF, input_vocab_size, maxLen, device)
    self.decoder = TransformerDecoder(numDecLayers, dModel, numHeads, dFF, output_vocab_size, maxLen, device)
    self.fcOut = nn.Linear(dModel, output_vocab_size)

    self.device = device

  def create_padding_mask(self, seq):
    return (seq == 0).unsqueeze(1).unsqueeze(2)

  def create_look_ahead_mask(self, size):
    mask = torch.triu(torch.ones(size, size), diagonal=1).type(torch.uint8)
    return mask

  def forward(self, src, trg):
    srcMask = self.create_padding_mask(src)
    trgMask = self.create_padding_mask(trg) & self.create_look_ahead_mask(trg.size(1))

    encOutput = self.encoder(src, srcMask)
    decOutput = self.decoder(trg, encOutput, srcMask, trgMask)
    output = self.fcOut(decOutput)

    print(output, end='\n\n')

    output = nn.functional.softmax(output)

    return output

### Masking

In [216]:
def create_padding_mask(seq):
  return (seq == 0).unsqueeze(1).unsqueeze(2)

def create_look_ahead_mask(size):
  mask = torch.triu(torch.ones(size, size), diagonal=1).type(torch.uint8)
  return mask

### Data Pre-Processing

In [217]:
data = [
    ("I love programming", "J'aime programmer"),
    ("He is reading a book", "Il lit un livre"),
    ("She is writing a letter", "Elle écrit une lettre"),
    ("The weather is nice today", "Il fait beau aujourd'hui"),
    ("They are playing football", "Ils jouent au football"),
    ("Hello There", "bonjour"),
    ("How are you", "comment vas-tu"),
    ("It is cold today", "il fait froid aujourd'hui")]

In [218]:
def tokenize(text):
  return text.lower().split()

def build_vocab(sentences):
  vocab = defaultdict(lambda: len(vocab))
  vocab['<pad>'] = 0
  vocab['<sos>'] = 1
  vocab['<eos>'] = 2

  for sentence in sentences:
    for word in tokenize(sentence):
      vocab[word]

  return vocab

englishSentences, frenchSentences = zip(*data)
englishSentences = build_vocab(englishSentences)
frenchSentences = build_vocab(frenchSentences)

In [219]:
def encode(sentence, vocab):
  return [vocab[word] for word in tokenize(sentence)] + [vocab['<eos>']]

def pad_sequences(sequences, pad_value=0):
    max_len = max(len(seq) for seq in sequences)
    return [seq + [pad_value] * (max_len - len(seq)) for seq in sequences]

encodedData = [(encode(en, englishSentences), encode(fr, frenchSentences)) for en, fr in data]

englishEncoded, frenchEncoded = zip(*encodedData)
englishEncoded = pad_sequences(englishEncoded)
frenchEncoded = pad_sequences(frenchEncoded)

In [220]:
print("English Vocab : ", englishSentences)
print("French Vocab : ", frenchSentences)

English Vocab :  defaultdict(<function build_vocab.<locals>.<lambda> at 0x78d7a17b2e60>, {'<pad>': 0, '<sos>': 1, '<eos>': 2, 'i': 3, 'love': 4, 'programming': 5, 'he': 6, 'is': 7, 'reading': 8, 'a': 9, 'book': 10, 'she': 11, 'writing': 12, 'letter': 13, 'the': 14, 'weather': 15, 'nice': 16, 'today': 17, 'they': 18, 'are': 19, 'playing': 20, 'football': 21, 'hello': 22, 'there': 23, 'how': 24, 'you': 25, 'it': 26, 'cold': 27})
French Vocab :  defaultdict(<function build_vocab.<locals>.<lambda> at 0x78d7a17b2a70>, {'<pad>': 0, '<sos>': 1, '<eos>': 2, "j'aime": 3, 'programmer': 4, 'il': 5, 'lit': 6, 'un': 7, 'livre': 8, 'elle': 9, 'écrit': 10, 'une': 11, 'lettre': 12, 'fait': 13, 'beau': 14, "aujourd'hui": 15, 'ils': 16, 'jouent': 17, 'au': 18, 'football': 19, 'bonjour': 20, 'comment': 21, 'vas-tu': 22, 'froid': 23})


In [221]:
englishTensor = torch.tensor(englishEncoded)
frenchTensor = torch.tensor(frenchEncoded)

batchSize = 4
dataset = TensorDataset(englishTensor, frenchTensor)
dataLoader = DataLoader(dataset, batchSize, shuffle=True)

### Model Configuration

In [222]:
dModel = 32
numHeads = 4
dFF = 16
numEncLayers = 4
numDecLayers = 4
input_vocab_size = len(englishSentences)
output_vocab_size = len(frenchSentences)
maxLen = 50

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = Transformer(numEncLayers=numEncLayers,
                    numDecLayers=numDecLayers,
                    dModel=dModel,
                    dFF=dFF,
                    numHeads=numHeads,
                    input_vocab_size=input_vocab_size,
                    output_vocab_size=output_vocab_size,
                    maxLen=maxLen,
                    device=device)

In [223]:
model

Transformer(
  (encoder): TransformerEncoder(
    (embedding): Embedding(28, 32)
    (encLayers): ModuleList(
      (0-3): 4 x EncoderLayer(
        (mha): MultiHeadAttention(
          (Wq): Linear(in_features=32, out_features=32, bias=False)
          (Wk): Linear(in_features=32, out_features=32, bias=False)
          (Wv): Linear(in_features=32, out_features=32, bias=False)
          (fc): Linear(in_features=32, out_features=32, bias=False)
        )
        (ffn): FeedForward(
          (fc1): Linear(in_features=32, out_features=16, bias=True)
          (fc2): Linear(in_features=16, out_features=32, bias=True)
          (relu): ReLU()
        )
        (layerNorm1): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
        (layerNorm2): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (decoder): TransformerDecode

### Training

In [224]:
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 20

for epoch in range(epochs):
  model.train()
  for batchIdx, (src, trg) in enumerate(dataLoader):
    src = src.to(device)
    trg = trg.to(device)

    trgInput = trg[:, :-1]
    trgOutput = trg[:, 1:]

    output = model(src, trgInput)

    output = output.view(-1, output.size(-1))
    trgOutput = trgOutput.reshape(-1)

    loss = criterion(output, trgOutput)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    print(f"Epoch: {epoch+1}/{epochs}, Batch: {batchIdx+1}/{len(dataLoader)}, Loss: {loss.item():.4f}")
print("Training Complete ...")

tensor([[[-7.3945e-01,  7.4559e-01, -1.1533e+00,  7.1337e-01, -1.2451e+00,
           2.5245e-01,  5.0075e-01,  2.3887e-02,  3.5893e-02, -1.9543e-01,
          -2.0873e-01, -1.5153e-01,  8.6185e-02, -5.9252e-01,  7.5856e-01,
           8.2100e-01, -6.4644e-01,  6.0700e-01,  9.9908e-01,  5.5372e-01,
          -5.1725e-03, -9.1987e-01,  6.7001e-01,  2.1263e-01],
         [ 8.6828e-01,  1.4428e-01,  4.8738e-01,  5.6799e-01,  4.6057e-01,
           1.4450e-01,  1.1608e+00, -5.7292e-01,  4.4193e-01,  2.3967e-01,
          -1.7045e+00, -3.5985e-01,  3.5083e-01, -1.1928e-01,  8.9561e-01,
           7.9705e-01, -3.1114e-02,  3.8302e-01,  4.0519e-01,  1.5170e-01,
          -5.4727e-01, -5.3517e-01,  6.5746e-01, -4.2667e-01],
         [ 1.7915e-02,  6.0742e-01, -1.3023e-01,  1.6293e+00,  5.0089e-01,
           4.7443e-01,  2.4977e-01, -1.0667e-01, -7.2648e-01, -1.8077e-01,
          -6.7239e-01,  5.1723e-01, -6.0003e-01, -7.7119e-01, -2.0644e-01,
           2.2878e-01,  5.1884e-01,  4.3767e-01, 

### Testing

In [225]:
print(model(torch.tensor([encode("Hello There", englishSentences)]), torch.tensor([[frenchSentences["<sos>"]]])))

tensor([[[-0.0403, -0.2333, -0.2239,  0.3639,  0.2312, -0.3547,  0.9894,
           0.5328,  0.1991, -0.3148, -0.2548, -0.6133,  0.0955,  0.3766,
           0.6947,  0.0655, -0.0602, -1.3956, -0.7032, -0.7086,  1.0009,
           0.7343,  1.5533, -0.1741]]], grad_fn=<ViewBackward0>)

tensor([[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
          1., 1., 1., 1., 1., 1., 1.]]], grad_fn=<SoftmaxBackward0>)


In [226]:
def translate_sentence(sentence, model, englishVocab, frenchVocab, maxLen):
  sentence = sentence.lower()
  src = torch.tensor([encode(sentence, englishVocab)])

  tgt = torch.tensor([[frenchVocab["<sos>"]]])
  for _ in range(maxLen):
    output = model(src, tgt)
    next_token = output[:, -1, :].argmax(dim=-1).item()
    tgt = torch.cat([tgt, torch.tensor([[next_token]])], dim=1)

    if next_token == frenchVocab["<eos>"]:
      break

  return ' '.join([list(frenchVocab.keys())[list(frenchVocab.values()).index(token)] for token in tgt.squeeze().tolist() if token not in [0, 1, 2]])

# Test translation
test_sentence = "Hello There"
translated_sentence = translate_sentence(test_sentence, model, englishSentences, frenchSentences, maxLen)
print(f"Translated: {translated_sentence}")

tensor([[[-0.6460, -0.2255, -0.2390, -0.7922, -0.4555, -0.4167,  0.0076,
           0.8119, -0.0116, -0.1733,  0.5297, -0.0560,  0.6500, -0.0548,
           0.8035,  0.7293, -0.2935, -1.4795, -1.3266, -0.2296,  0.1399,
          -0.1391,  0.6092, -0.3466]]], grad_fn=<ViewBackward0>)

tensor([[[-0.4659, -0.8422,  0.9503,  0.1612, -0.3231,  0.4724,  1.4117,
           0.4566, -0.7057,  0.2555,  0.4873, -1.0275,  0.4824, -0.0060,
           0.8288, -0.1376,  0.9287, -0.9788, -0.4692, -0.0156,  0.9938,
          -0.7484,  0.3822,  0.5110],
         [ 1.0195,  0.3304,  0.6015,  1.0706,  0.6557,  0.3927,  0.3331,
          -0.1594, -0.7832,  0.5426, -0.3874, -0.3255, -0.4922, -0.6094,
           0.1611,  0.1100,  0.0841,  0.2036, -0.5379, -0.0943,  0.5593,
          -0.4871,  0.5552,  0.1041]]], grad_fn=<ViewBackward0>)

tensor([[[ 0.4786,  0.3509,  0.6037,  0.3528, -0.6324,  0.7701,  0.5316,
           1.2855, -0.6400, -0.0775, -0.5994, -1.3496,  0.1882, -0.3886,
           0.9355,  0.9081,