<a href="https://colab.research.google.com/github/Lmalviya/machineTranslationTask/blob/main/TransformerFromScratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Scope of the notebook:
  1. Build a vanilla Transformer from scratch
  2. Train it for language translation from English to Italian


# Build Transformer from scratch

In [1]:
import torch
import torch.nn as nn

import math

### Word embedding

In [None]:
class WordEmbedding(nn.Module):
  """Look up token ids in an embedding table and scale by sqrt(d_model)."""

  def __init__(self, d_model: int, vocab_size: int):
    """
    d_model: size of each embedding vector
    vocab_size: number of entries in the embedding table
    """
    super().__init__()
    self.d_model = d_model
    self.vocab_size = vocab_size
    self.embedding = nn.Embedding(vocab_size, d_model)

  def forward(self, x):
    # The paper multiplies the embeddings by sqrt(d_model).
    scale = math.sqrt(self.d_model)
    embedded = self.embedding(x)
    return embedded * scale


### Positional Embedding

In [None]:
class PostionalEmbedding(nn.Module):
  """Add fixed sinusoidal position encodings to the input, then apply dropout."""

  def __init__(self, d_model: int, seq_len: int, dropout_p: float):
    """
    d_model: size of each embedding vector
    seq_len: maximum number of positions for which an encoding is precomputed
    dropout_p: dropout probability, used for regularization
    """
    super().__init__()

    self.d_model = d_model
    self.seq_len = seq_len
    self.dropout = nn.Dropout(dropout_p)

    # Build the table in a local variable: assigning `self.pe` before
    # `register_buffer('pe', ...)` makes the registration fail because the
    # attribute already exists.
    pe = torch.zeros(seq_len, d_model)  # (seq_len, d_model)

    # position: (seq_len, 1); div_term: (ceil(d_model / 2),)
    position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
    angles = position * div_term  # (seq_len, ceil(d_model / 2))

    # sin on even columns, cos on odd columns. For an odd d_model there is one
    # fewer odd column than even columns, so the cos input is trimmed to fit.
    pe[:, 0::2] = torch.sin(angles)
    pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

    # Add the batch dim -> (1, seq_len, d_model) and register as a buffer:
    # the tensor is not a learnable parameter, but it is saved with the model
    # and moved together with it by `.to(device)`.
    self.register_buffer('pe', pe.unsqueeze(0))

  def forward(self, x):
    # x: (batch, seq_len, d_model). Slice the table along the sequence axis
    # (dim 1). The buffer is not a parameter, so no gradient flows into it.
    x = x + self.pe[:, :x.shape[1], :]
    return self.dropout(x)


### Layer normalization

In [None]:
class LayerNorm(nn.Module):
  """Normalize the last dimension to zero mean / unit std, then scale and shift."""

  def __init__(self, eps:float = 10**-6):
    """
    eps: small constant added to the std to avoid division by zero
    """
    super().__init__()
    # Two learnable scalars, usually called gamma (multiplicative) and beta
    # (additive), let the network undo or adjust the normalization when a
    # strictly normalized activation would be too restrictive.
    self.eps = eps
    self.gamma = nn.Parameter(torch.ones(1))  # multiplicative
    self.beta = nn.Parameter(torch.zeros(1))  # additive

  def forward(self, x):
    # keepdim=True keeps the reduced dim so the result broadcasts against x
    mean = x.mean(dim=-1, keepdim=True)
    std = x.std(dim=-1, keepdim=True)
    return self.gamma * (x - mean) / (std + self.eps) + self.beta



### Feed forward block

In [None]:
class FeedForwardBlock(nn.Module):
  """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear."""

  def __init__(self, d_model: int, d_ff: int, dropout_p: float =0.1):
    """
    d_model: input and output feature size
    d_ff: hidden feature size
    dropout_p: dropout probability applied after the activation
    """
    # nn.Module must be initialized before any sub-module is assigned,
    # otherwise the assignments below raise AttributeError.
    super().__init__()
    self.linear_1 = nn.Linear(d_model, d_ff) #W1 and B1
    self.linear_2 = nn.Linear(d_ff, d_model) #W2 and B2

    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(dropout_p)

  def forward(self, x):
    # input: (batch, seq_len, d_model) --> (batch, seq_len, d_ff) --> (batch, seq_len, d_model) output
    return self.linear_2(self.dropout(self.relu(self.linear_1(x))))


### Multi-head Attention

In [None]:
class MultiheadAttentionBlock(nn.Module):
  """Multi-head scaled dot-product attention with an output projection."""

  def __init__(self, d_model: int, head: int, dropout_p: float):
    """
    d_model: feature size of the inputs and of the output
    head: number of attention heads; must divide d_model
    dropout_p: dropout probability applied to the attention weights

    Raises ValueError when d_model is not divisible by head.
    """
    super().__init__()
    # The input is projected into query, key and value by three separate
    # matrices. Each projection is then split along the d_model dim into
    # `head` slices, so every head sees the whole sequence but only a
    # different part of each token's features.
    if d_model % head != 0:
      raise ValueError('d_model is not divisible by head')

    self.head = head
    self.d_model = d_model
    self.attention_score = None  # weights of the most recent forward pass

    self.d_k = self.d_model // self.head
    self.w_q = nn.Linear(d_model, d_model)
    self.w_k = nn.Linear(d_model, d_model)
    self.w_v = nn.Linear(d_model, d_model)

    self.w_o = nn.Linear(d_model, d_model)
    self.dropout = nn.Dropout(dropout_p)

  @staticmethod
  def attention(query, key, value, mask, dropout: nn.Dropout):
    """Return (weighted values, attention weights) for the given q/k/v.

    mask: optional tensor broadcastable to the score matrix; positions where
      mask == 0 are excluded from attention.
    dropout: optional dropout module applied to the attention weights.
    """
    d_k = query.shape[-1]

    attention_score = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
      # Where mask == 0, put a large negative value so softmax gives ~0 there.
      # masked_fill is out-of-place, so the result has to be kept.
      attention_score = attention_score.masked_fill(mask == 0, -1e9)

    attention_score = attention_score.softmax(dim=-1) # (batch, h, seq_len, seq_len)
    if dropout is not None:
      # Apply the module that was passed in; do not construct a new one.
      attention_score = dropout(attention_score)

    return (attention_score @ value), attention_score

  def forward(self, q, k, v, mask): # mask restricts which positions may interact
    query = self.w_q(q) # (batch, seq_len, d_model) --> (batch, seq_len, d_model)
    key = self.w_k(k)
    value = self.w_v(v)

    # split for multi head
    # (batch, seq_len, d_model) --> (batch, seq_len, head, d_k) --> (batch, head, seq_len, d_k)
    query = query.view(query.shape[0], query.shape[1], self.head, self.d_k).transpose(1, 2)
    key = key.view(key.shape[0], key.shape[1], self.head, self.d_k).transpose(1, 2)
    value = value.view(value.shape[0], value.shape[1], self.head, self.d_k).transpose(1, 2)

    x, self.attention_score = MultiheadAttentionBlock.attention(query, key, value, mask, self.dropout)

    # (batch, head, seq_len, d_k) --> (batch, seq_len, head, d_k) --> (batch, seq_len, d_model)
    # contiguous() is needed because view() requires contiguous memory after transpose()
    x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.head * self.d_k)

    # (batch, seq_len, d_model) --> (batch, seq_len, d_model)
    return self.w_o(x)


## Residual connection

In [None]:
class ResidualConnection(nn.Module):
  """Skip connection around a sublayer: x + dropout(sublayer(norm(x)))."""

  def __init__(self, dropout_p: float):
    """
    dropout_p: dropout probability applied to the sublayer output
    """
    super().__init__()
    self.dropout = nn.Dropout(dropout_p)
    self.norm = LayerNorm()

  def forward(self, x, sublayer):
    # The input is normalized before it enters the sublayer; the
    # un-normalized input is what gets added back.
    normalized = self.norm(x)
    branch = self.dropout(sublayer(normalized))
    return x + branch

## Encoder Block

In [None]:
class EncoderBlock(nn.Module):
  """One encoder layer: self-attention then feed-forward, each inside a residual connection."""

  def __init__(self, self_attention_block: MultiheadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout_p: float):
    """
    self_attention_block: attention module called as (q, k, v, mask)
    feed_forward_block: feed-forward module called on a single tensor
    dropout_p: dropout probability of the two residual connections
    """
    super().__init__()
    # No trailing comma here: a comma would store a 1-tuple, which is neither
    # callable nor registered as a sub-module.
    self.self_attention_block = self_attention_block
    self.feed_forward_block = feed_forward_block
    self.residualConnection = nn.ModuleList([ResidualConnection(dropout_p) for _ in range(2)])

  def forward(self, x, src_mask):
    # Self-attention: query, key and value are all the same tensor.
    x = self.residualConnection[0](x, lambda x: self.self_attention_block(x, x, x, src_mask))
    x = self.residualConnection[1](x, self.feed_forward_block)
    return x


## Encoder

In [None]:
class Encoder(nn.Module):
  """Run the input through a list of encoder layers, then a final LayerNorm."""

  def __init__(self, layers: nn.ModuleList):
    """
    layers: modules applied in order, each called as layer(x, mask)
    """
    # nn.Module must be initialized before sub-modules are assigned.
    super().__init__()
    self.layers = layers
    self.norm = LayerNorm()

  def forward(self, x, mask):
    for layer in self.layers:
      x = layer(x, mask)
    return self.norm(x)


## Decoder Block

In [None]:
class DecoderBlock(nn.Module):
  """One decoder layer: self-attention, cross-attention and feed-forward, each inside a residual connection."""

  def __init__(self, self_attention_block: MultiheadAttentionBlock, cross_attention_block: MultiheadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout_p: float):
    """
    self_attention_block: attention module applied to the decoder input
    cross_attention_block: attention module whose key/value come from the encoder output
    feed_forward_block: feed-forward module called on a single tensor
    dropout_p: dropout probability of the residual connections
    """
    # nn.Module must be initialized before sub-modules are assigned.
    super().__init__()
    self.self_attention_block = self_attention_block
    self.cross_attention_block = cross_attention_block
    self.feed_forward_block = feed_forward_block
    self.residualConnection = nn.ModuleList([ResidualConnection(dropout_p) for _ in range(3)])
    # Kept for backward compatibility; forward() does not use it.
    self.dropout = nn.Dropout(dropout_p)

  def forward(self, x, encoder_output, src_mask, tgt_mask):
    # src_mask: mask for the source language
    # tgt_mask: mask for the target language

    x = self.residualConnection[0](x, lambda x: self.self_attention_block(x, x, x, tgt_mask))
    # query comes from the decoder; key and value come from the encoder
    x = self.residualConnection[1](x, lambda x: self.cross_attention_block(x, encoder_output, encoder_output, src_mask))
    # Pass the module itself: the residual connection calls it on the
    # normalized input. Passing its output (a tensor) would not be callable.
    x = self.residualConnection[2](x, self.feed_forward_block)
    return x


### Decoder

In [None]:
class Decoder(nn.Module):
  """Run the target through a list of decoder layers, then a final LayerNorm."""

  def __init__(self, layers: nn.ModuleList):
    """
    layers: modules applied in order, each called as
      layer(x, encoder_output, src_mask, tgt_mask)
    """
    # nn.Module must be initialized before sub-modules are assigned.
    super().__init__()
    self.layers = layers
    self.norm = LayerNorm()

  def forward(self, x, encoder_output, src_mask, tgt_mask):
    for layer in self.layers:
      x = layer(x, encoder_output, src_mask, tgt_mask)
    return self.norm(x)

### last decoder output layer (projection layer)

In [None]:
class ProjectionLayer(nn.Module):
  """Project features onto the vocabulary and return log-probabilities."""

  def __init__(self, d_model: int, vocab_size: int):
    """
    d_model: input feature size
    vocab_size: number of output classes
    """
    # nn.Module must be initialized before sub-modules are assigned.
    super().__init__()
    self.proj = nn.Linear(d_model, vocab_size)

  def forward(self, x):
    # (batch, seq_len, d_model) --> (batch, seq_len, vocab_size)
    return torch.log_softmax(self.proj(x), dim = -1)


### Transformer


In [None]:
class Transformer(nn.Module):
  """Encoder-decoder model exposing separate encode / decode / projection steps."""

  def __init__(self, encoder: Encoder, decoder: Decoder, src_embedding: WordEmbedding, tgt_embedding: WordEmbedding, src_pos: PostionalEmbedding, tgt_pos: PostionalEmbedding, proj: ProjectionLayer):
    """
    encoder / decoder: the encoder and decoder stacks
    src_embedding / tgt_embedding: token embeddings for source and target
    src_pos / tgt_pos: positional embeddings for source and target
    proj: final projection applied to the decoder output
    """
    # nn.Module must be initialized before sub-modules are assigned.
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.src_embed = src_embedding
    self.tgt_embed = tgt_embedding
    self.src_pos = src_pos
    self.tgt_pos = tgt_pos
    self.projectionLayer = proj

  def encode(self, src, src_mask):
    """Embed the source tokens, add positions and run the encoder."""
    src = self.src_embed(src)
    src = self.src_pos(src)
    return self.encoder(src, src_mask)

  def decode(self, encoder_output, src_mask, tgt, tgt_mask):
    """Embed the target tokens, add positions and run the decoder."""
    tgt = self.tgt_embed(tgt)
    tgt = self.tgt_pos(tgt)
    return self.decoder(tgt, encoder_output, src_mask, tgt_mask)

  def projection(self, x):
    """Apply the final projection layer to x."""
    return self.projectionLayer(x)


## Build Transformer

In [None]:
def buildTransformer(src_vocab_size: int, tgt_vocab_size: int, src_seq_len: int, tgt_seq_len: int, d_model: int = 512, head: int= 8, d_ff: int= 2048, N: int=6, dropout_p: float = 0.1):
  """Assemble a Transformer and initialize its weight matrices.

  src_vocab_size / tgt_vocab_size: vocabulary sizes of source and target
  src_seq_len / tgt_seq_len: sequence lengths passed to the positional embeddings
  d_model: embedding / feature size
  head: number of attention heads
  d_ff: hidden size of the feed-forward blocks
  N: number of encoder blocks and of decoder blocks
  dropout_p: dropout probability used throughout

  Returns the constructed Transformer.
  """
  # create source and target embedding
  src_embedding = WordEmbedding(d_model, src_vocab_size)
  tgt_embedding = WordEmbedding(d_model, tgt_vocab_size)

  # create positional embedding for source and target
  src_pos_embed = PostionalEmbedding(d_model, src_seq_len, dropout_p)
  tgt_pos_embed = PostionalEmbedding(d_model, tgt_seq_len, dropout_p)

  encoder_blocks = []
  for _ in range(N):
    encoder_self_attention = MultiheadAttentionBlock(d_model, head, dropout_p)
    feed_forward = FeedForwardBlock(d_model, d_ff, dropout_p)
    encoder_blocks.append(EncoderBlock(encoder_self_attention, feed_forward, dropout_p))

  decoder_blocks = []
  for _ in range(N):
    decoder_self_attention = MultiheadAttentionBlock(d_model, head, dropout_p)
    decoder_cross_attention = MultiheadAttentionBlock(d_model, head, dropout_p)
    feed_forward = FeedForwardBlock(d_model, d_ff, dropout_p)
    decoder_blocks.append(DecoderBlock(decoder_self_attention, decoder_cross_attention, feed_forward, dropout_p))

  # create encoder and decoder
  encoder = Encoder(nn.ModuleList(encoder_blocks))
  decoder = Decoder(nn.ModuleList(decoder_blocks))

  # projection layer
  projectionLayer = ProjectionLayer(d_model, tgt_vocab_size)

  # Transformer
  transformer = Transformer(encoder, decoder, src_embedding, tgt_embedding, src_pos_embed, tgt_pos_embed, projectionLayer)

  # Xavier-initialize every parameter with more than one dimension so
  # training starts from a better-conditioned point. Parameters with a single
  # dimension (biases, LayerNorm scalars) keep their default initialization.
  for p in transformer.parameters():
    if p.dim() > 1:
      nn.init.xavier_uniform_(p)

  return transformer

# Build Language Translation from English to Italian using Transformer

In [7]:
!pip3 install datasets --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m507.1/507.1 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.3/115.3 kB[0m [31m13.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m14.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m14.3 MB/s[0m eta [36m0:00:00[0m
[?25h

In [8]:
from typing import Any

import torch
import torch.nn as nn
# from torch.utils.data import Dataset, DataLoader, random_split
from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace

from pathlib import Path #create absolut path using relative path

In [4]:
from torch.utils.data import Dataset, DataLoader, random_split

### Tokenizer

In [None]:
def get_all_sentence(dataset, lang):
  """Lazily yield the `lang` text of every item's 'translation' entry."""
  yield from (entry['translation'][lang] for entry in dataset)

def buildTokenizer(config, dataset, lang):
  """Load the word-level tokenizer for `lang`, training and saving it first if needed.

  config: mapping with key 'tokenizer_file', a path template containing one
    '{}' placeholder for the language, e.g. '../tokenizers/tokenizer_{}.json'
  dataset: iterable of items with a 'translation' mapping keyed by language
  lang: language key used for the path and for selecting sentences

  Returns the tokenizer.
  """
  tokenizer_path = Path(config['tokenizer_file'].format(lang))
  if not tokenizer_path.exists():
    tokenizer = Tokenizer(WordLevel(unk_token='[UNK]'))
    # The attribute is `pre_tokenizer` (singular); assigning to any other
    # name would leave the tokenizer without whitespace splitting.
    tokenizer.pre_tokenizer = Whitespace()
    trainer = WordLevelTrainer(special_tokens = ['[UNK]', '[PAD]', '[SOS]', '[EOS]'], min_frequency=2)
    tokenizer.train_from_iterator(get_all_sentence(dataset, lang), trainer=trainer)
    # Make sure the target directory exists before saving.
    tokenizer_path.parent.mkdir(parents=True, exist_ok=True)
    tokenizer.save(str(tokenizer_path))
  else:
    tokenizer = Tokenizer.from_file(str(tokenizer_path))
  return tokenizer

### get dataset from HuggingFace

In [None]:
def get_data(config):
  """Download the translation corpus, build both tokenizers and split the data.

  config: mapping with keys 'lang-src', 'lang-tgt' (language codes) and the
    keys required by buildTokenizer

  Returns (train_raw, val_raw, tokenizer_src, tokenizer_tgt), where the first
  two are the 90% / 10% random split of the raw dataset.
  """
  # The dataset configuration name is "<src>-<tgt>"; each key needs its own
  # pair of braces in the f-string.
  ds_raw = load_dataset('opus_books', f"{config['lang-src']}-{config['lang-tgt']}", split='train')

  # build tokenizers
  tokenizer_src = buildTokenizer(config, ds_raw, config['lang-src'])
  tokenizer_tgt = buildTokenizer(config, ds_raw, config['lang-tgt'])

  # split dataset into train and validation set
  train_size = int(0.9*len(ds_raw))
  val_size = len(ds_raw) - train_size
  train_raw, val_raw = random_split(ds_raw, [train_size, val_size])

  return train_raw, val_raw, tokenizer_src, tokenizer_tgt

In [None]:
class BilingualDataset(Dataset):
  """Dataset over translation pairs that tokenizes source and target text."""

  def __init__(self, dataset, tokenizer_src, tokenizer_tgt, src_lang, tgt_lang, seq_len):
    """
    dataset: indexable collection of items with a 'translation' mapping keyed by language
    tokenizer_src / tokenizer_tgt: tokenizers providing encode(text).ids and token_to_id(str)
    src_lang / tgt_lang: language keys inside 'translation'
    seq_len: maximum sequence length, special tokens included
    """
    super().__init__()
    self.dataset = dataset
    self.tokenizer_src = tokenizer_src
    self.tokenizer_tgt = tokenizer_tgt
    self.src_lang = src_lang
    self.tgt_lang = tgt_lang
    self.seq_len = seq_len

    # token_to_id takes the token string; the id is wrapped in a list so each
    # special token is a 1-element int64 tensor.
    # NOTE(review): 1-D shape presumably intended for later concatenation - confirm.
    self.sos_token = torch.tensor([self.tokenizer_src.token_to_id('[SOS]')], dtype=torch.int64)
    self.pad_token = torch.tensor([self.tokenizer_src.token_to_id('[PAD]')], dtype=torch.int64)
    self.eos_token = torch.tensor([self.tokenizer_src.token_to_id('[EOS]')], dtype=torch.int64)

  def __len__(self):
    return len(self.dataset)

  def __getitem__(self, index: Any):
    """Tokenize the pair at `index`; raise ValueError if it does not fit in seq_len."""
    src_target_text = self.dataset[index]
    src_text = src_target_text['translation'][self.src_lang]
    tgt_text = src_target_text['translation'][self.tgt_lang]

    enc_input_tokens = self.tokenizer_src.encode(src_text).ids # list of token ids for the source text
    dec_input_tokens = self.tokenizer_tgt.encode(tgt_text).ids

    enc_num_padding_tokens = self.seq_len - len(enc_input_tokens) - 2 # 2 because we add [SOS] and [EOS]
    # Subtraction, not assignment: `self.seq_len = ...` here would silently
    # overwrite the configured sequence length on every access.
    dec_num_padding_tokens = self.seq_len - len(dec_input_tokens) - 1 # 1 because we only add [SOS] to the decoder side

    if enc_num_padding_tokens < 0 or dec_num_padding_tokens < 0:
      raise ValueError('Sentence is too long')




