<a href="https://colab.research.google.com/github/dixit2003/End-toEnd-Transformer-Model-for-Language-Translation/blob/main/Transformers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import nltk
nltk.download('punkt')
from nltk.tokenize import word_tokenize

import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from tensorflow.keras.preprocessing.text import one_hot

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [None]:
# Hyperparameters and file locations shared by the data pipeline and the training loop.
config = {
    'batch_size': 3,
    'num_epochs': 1,
    'lr': 10**-4,
    'seq_len': 350,
    'd_model': 512,
    'lang_src': 'en',
    'lang_tgt': 'it',
    'model_folder': 'weights',
    # Spelled 'model_basename' because get_weights_file_path looks the prefix up under that key.
    'model_basename': 'tmodel_',
    'preload': None,
    # '{}' is filled with the language code by get_or_build_tokenizer.
    'tokenizer_file': 'tokenizer_{}.json',
    'experiment_name': 'runs/tmodel'
}

## **Input Embeddings**

In [None]:
class InputEmbedding(nn.Module):
  """Learned lookup table turning token ids into ``d_model``-sized vectors."""

  def __init__(self, d_model: int, vocab_size: int):
    super().__init__()
    self.vocab_size = vocab_size
    self.d_model = d_model
    # One trainable row per vocabulary entry.
    self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

  def forward(self, x):
    """Return the embedding rows selected by the integer ids in ``x``."""
    vectors = self.embedding(x)
    return vectors

## **Positional Encoding**

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to embeddings, then apply dropout.

    The table of encodings is computed once at construction and stored as a
    non-persistent buffer: it follows the module across devices but does not
    appear in ``state_dict()``.

    Args:
        seq_length: Maximum number of positions the table covers.
        d_model: Size of the last dimension of the tensors passed to ``forward``.
        dropout: Dropout probability applied to the summed output.
    """

    def __init__(self,  seq_length: int, d_model: int, dropout: float):
        super().__init__()
        self.seq_length = seq_length
        self.d_model = d_model
        self.dropout = nn.Dropout(dropout)

        even_idx = torch.arange(0, d_model, 2).float()
        denominator = torch.pow(10000, even_idx / d_model)
        position = torch.arange(seq_length).reshape(seq_length, 1)
        angles = position / denominator
        # Interleave sin (even columns) and cos (odd columns).
        stacked = torch.stack([torch.sin(angles), torch.cos(angles)], dim=2)
        pe = torch.flatten(stacked, start_dim=1, end_dim=2)
        # For an odd d_model the interleaving yields one surplus column; drop it.
        pe = pe[:, :d_model]
        self.register_buffer('pe', pe, persistent=False)

    def forward(self, x):
        """Return ``dropout(x + pe)`` without modifying ``x``.

        Args:
            x: Tensor whose last two dimensions are (positions, d_model). Any
                number of positions up to ``seq_length`` is accepted.

        Raises:
            ValueError: If ``x`` has more positions than the table covers.
        """
        length = x.size(-2)
        if length > self.seq_length:
            raise ValueError(
                f'input has {length} positions but the encoding covers only {self.seq_length}'
            )
        # Out-of-place add so the caller's tensor is left untouched.
        return self.dropout(x + self.pe[:length].to(dtype=x.dtype))

## **Multi-Head Attention**

In [None]:
class MultiHeadAttention(nn.Module):
  """Scaled dot-product attention computed in parallel over several heads.

  Args:
    d_model: Feature size of the inputs and of the output.
    heads: Number of attention heads; must divide ``d_model`` evenly.
    dropout: Dropout probability applied to the attention weights.

  Raises:
    ValueError: If ``heads`` is not positive or does not divide ``d_model``.
  """

  def __init__(self, d_model: int, heads: int, dropout: float):
    super().__init__()
    # Fail early: otherwise the per-head reshape in forward() breaks with an obscure view error.
    if heads <= 0 or d_model % heads != 0:
      raise ValueError(f'd_model ({d_model}) must be divisible by heads ({heads})')

    self.d_model = d_model
    self.heads = heads
    self.d_k = d_model // heads

    self.w_q = nn.Linear(d_model, d_model)
    self.w_k = nn.Linear(d_model, d_model)
    self.w_v = nn.Linear(d_model, d_model)
    self.w_o = nn.Linear(d_model, d_model)
    self.dropout = nn.Dropout(dropout)

  def attention(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor, mask=None):
    """Return ``(softmax(q k^T / sqrt(d_k)) v, attention_weights)``.

    Positions where ``mask == 0`` receive a score of -1e9 before the softmax,
    which drives their weight to (numerically) zero.
    """
    d_k = q.size(-1)
    scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
      scores = scores.masked_fill(mask == 0, -1e9)

    attention_weights = F.softmax(scores, dim=-1)

    if self.dropout is not None:
      attention_weights = self.dropout(attention_weights)

    output = torch.matmul(attention_weights, v)
    return output, attention_weights

  def forward(self, q, k, v, mask=None):
    """Project q/k/v, attend per head, merge the heads and apply ``w_o``.

    Args:
      q, k, v: Tensors reshaped here as (batch, positions, d_model).
      mask: Optional tensor broadcastable to the (batch, heads, q_len, k_len)
        score tensor; zeros mark positions that must not be attended to.

    Returns:
      Tuple ``(output, attention_weights)``.
    """
    batch_size = q.size(0)

    query = self.w_q(q)
    key = self.w_k(k)
    value = self.w_v(v)

    # (batch, positions, d_model) -> (batch, heads, positions, d_k)
    query = query.view(batch_size, -1, self.heads, self.d_k).transpose(1, 2)
    key = key.view(batch_size, -1, self.heads, self.d_k).transpose(1, 2)
    value = value.view(batch_size, -1, self.heads, self.d_k).transpose(1, 2)

    attention_output, attention_weights = self.attention(query, key, value, mask)

    # (batch, heads, positions, d_k) -> (batch, positions, heads * d_k)
    attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, -1, self.heads * self.d_k)

    output = self.w_o(attention_output)
    return output, attention_weights

### **Add and Normalize**

In [None]:
class LayerNormalization(nn.Module):
  """Normalize the last dimension, then apply a learned scalar gain and bias."""

  def __init__(self, eps: float = 1e-6) -> None:
    super().__init__()
    # Added to the standard deviation so the division never hits zero.
    self.eps = eps
    self.alpha = nn.Parameter(torch.ones(1))   # multiplicative gain
    self.beta = nn.Parameter(torch.zeros(1))   # additive bias

  def forward(self, x):
    """Return ``alpha * (x - mean) / (std + eps) + beta`` over the last dimension."""
    centered = x - x.mean(dim=-1, keepdim=True)
    spread = x.std(dim=-1, keepdim=True) + self.eps
    return self.alpha * centered / spread + self.beta

## **Feed Forward Neural Network**

In [None]:
class FeedForward(nn.Module):
  """Two linear layers with a ReLU and dropout in between."""

  def __init__(self, d_model: int, d_ff: int, dropout: float):
    super().__init__()
    self.linear1 = nn.Linear(d_model, d_ff)   # expand to the hidden width
    self.linear2 = nn.Linear(d_ff, d_model)   # project back to the model width
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Return ``linear2(dropout(relu(linear1(x))))``."""
    hidden = F.relu(self.linear1(x))
    hidden = self.dropout(hidden)
    return self.linear2(hidden)

## **Residual/Skip Connection**

In [None]:
class ResidualConnection(nn.Module):
  """Skip connection around a sub-layer that receives the normalized input."""

  def __init__(self, dropout: float) -> None:
    super().__init__()
    self.norm = LayerNormalization()
    self.dropout = nn.Dropout(dropout)

  def forward(self, x, sublayer):
    """Return ``x + dropout(sublayer(norm(x)))``.

    ``sublayer`` is any callable that takes the normalized tensor.
    """
    normalized = self.norm(x)
    update = self.dropout(sublayer(normalized))
    return x + update

## **Encoder Block**

In [None]:
class EncoderBlock(nn.Module):
  """One encoder layer: self-attention then feed-forward, each inside a residual connection."""

  def __init__(self, d_model: int, heads: int, d_ff: int, dropout: float):
    super().__init__()
    self.attention = MultiHeadAttention(d_model, heads, dropout)
    self.feed_forward = FeedForward(d_model, d_ff, dropout)
    self.residual_connection1 = ResidualConnection(dropout)
    self.residual_connection2 = ResidualConnection(dropout)

  def forward(self, x, mask):
    """Run ``x`` through the attention and feed-forward sub-layers using ``mask``."""

    def self_attend(normed):
      # The attention module returns (output, weights); only the output is kept.
      context, _ = self.attention(normed, normed, normed, mask)
      return context

    attended = self.residual_connection1(x, self_attend)
    return self.residual_connection2(attended, self.feed_forward)

In [None]:
class Encoder(nn.Module):
  """Applies a list of encoder layers in order, then a final normalization."""

  def __init__(self, layers: nn.ModuleList) -> None:
    super().__init__()
    self.layers = layers
    self.norm = LayerNormalization()

  def forward(self, x, src_mask):
    """Pass ``x`` through every layer with ``src_mask`` and normalize the result."""
    hidden = x
    for block in self.layers:
      hidden = block(hidden, src_mask)
    return self.norm(hidden)

In [None]:
# Instantiate a six-block encoder stack (d_model=512, 8 heads, d_ff=2048, dropout=0.1).
encoder = Encoder(
    nn.ModuleList(
        [EncoderBlock(d_model=512, heads=8, d_ff=2048, dropout=0.1) for _ in range(6)]
    )
)

## **Decoder Block**

In [None]:
class DecoderBlock(nn.Module):
  """One decoder layer: masked self-attention, cross-attention and feed-forward.

  Each sub-layer sits inside its own ResidualConnection, so it is applied to
  the normalized input, in the same way as EncoderBlock.

  Args:
    d_model: Feature size used by every sub-layer.
    heads: Number of attention heads for both attention modules.
    d_ff: Hidden width of the feed-forward sub-layer.
    dropout: Dropout probability for the sub-layers and residual connections.
  """

  def __init__(self, d_model: int, heads: int, d_ff: int, dropout: float):
    super().__init__()
    # Sub-layers are sized from the constructor arguments rather than fixed constants.
    self.attention1 = MultiHeadAttention(d_model, heads, dropout)
    self.attention2 = MultiHeadAttention(d_model, heads, dropout)
    self.feed_forward = FeedForward(d_model, d_ff, dropout)
    self.residual_connection1 = ResidualConnection(dropout)
    self.residual_connection2 = ResidualConnection(dropout)
    self.residual_connection3 = ResidualConnection(dropout)

  def forward(self, x, encoder_output, src_mask, tgt_mask):
    """Decode one layer.

    Args:
      x: Decoder-side activations.
      encoder_output: Encoder result used as keys and values for cross-attention.
      src_mask: Mask applied to the cross-attention scores.
      tgt_mask: Mask applied to the self-attention scores.

    Returns:
      Activations after all three sub-layers.
    """
    # The callables receive the tensor already normalized by the residual
    # connection; [0] keeps the attention output and drops the weights.
    x = self.residual_connection1(x, lambda y: self.attention1(y, y, y, tgt_mask)[0])
    x = self.residual_connection2(
        x, lambda y: self.attention2(y, encoder_output, encoder_output, src_mask)[0]
    )
    x = self.residual_connection3(x, self.feed_forward)
    return x

## **Decoder**

In [None]:
class Decoder(nn.Module):
  """Applies a list of decoder layers in order, then a final normalization."""

  def __init__(self, layers: nn.ModuleList):
    super().__init__()
    self.layers = layers
    self.norm = LayerNormalization()

  def forward(self, x, encoder_output, src_mask, tgt_mask):
    """Pass ``x`` through every layer with the encoder output and both masks, then normalize."""
    hidden = x
    for block in self.layers:
      hidden = block(hidden, encoder_output, src_mask, tgt_mask)
    return self.norm(hidden)

## **Projection Layer**

In [None]:
class ProjectionLayer(nn.Module):
  """Map decoder features to log-probabilities over the vocabulary.

  Args:
    d_model: Size of the incoming feature dimension.
    vocab_size: Number of output classes.
  """

  def __init__(self, d_model: int, vocab_size: int) -> None:
    super().__init__()
    self.proj = nn.Linear(d_model, vocab_size)

  def forward(self, x):
    """Return ``log_softmax(proj(x))`` over the last dimension.

    Log-probabilities (not probabilities) are returned because the training
    loop feeds this output to nn.CrossEntropyLoss, which applies log-softmax
    itself. Applying log-softmax twice gives the same values as applying it
    once, so the loss equals the one computed from the raw logits; plain
    softmax output would be squashed a second time and distort the loss.
    """
    return F.log_softmax(self.proj(x), dim=-1)

## **Transformer Block**

In [None]:
class TransformerBlock(nn.Module):
  """Bundles embeddings, positional encodings, encoder, decoder and projection.

  The three steps are exposed separately (``encode``, ``decode``, ``project``)
  rather than through a single ``forward``.
  """

  def __init__(self, encoder: EncoderBlock, decoder: DecoderBlock, src_embedding: InputEmbedding, tgt_embedding: InputEmbedding, src_pos: PositionalEncoding, tgt_pos: PositionalEncoding, proj: ProjectionLayer):
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.src_embedding = src_embedding
    self.tgt_embedding = tgt_embedding
    self.src_pos = src_pos
    self.tgt_pos = tgt_pos
    self.proj = proj

  def encode(self, src, src_mask):
    """Embed ``src``, add positional information and run the encoder."""
    embedded = self.src_pos(self.src_embedding(src))
    return self.encoder(embedded, src_mask)

  def decode(self, tgt, encoder_output, src_mask, tgt_mask):
    """Embed ``tgt`` (cast to int64), add positional information and run the decoder."""
    embedded = self.tgt_pos(self.tgt_embedding(tgt.long()))
    return self.decoder(embedded, encoder_output, src_mask, tgt_mask)

  def project(self, x):
    """Apply the output projection layer to ``x``."""
    projected = self.proj(x)
    return projected

# **Transformer**

In [None]:
def build_transformer(src_vocab_size: int, tgt_vocab_size: int, src_seq_len: int,
                     target_seq_len: int, d_model:int = 512, N: int = 6, d_ff: int = 2048, n_heads: int = 8, dropout: float = 0.1):
  """Assemble a complete encoder-decoder model.

  Args:
    src_vocab_size: Number of entries in the source embedding table.
    tgt_vocab_size: Number of entries in the target embedding table and output classes.
    src_seq_len: Number of positions covered by the source positional encoding.
    target_seq_len: Number of positions covered by the target positional encoding.
    d_model: Feature size used throughout the model.
    N: Number of encoder layers and of decoder layers.
    d_ff: Hidden width of the feed-forward sub-layers.
    n_heads: Number of attention heads.
    dropout: Dropout probability passed to every component.

  Returns:
    A TransformerBlock wiring all the components together.
  """
  src_embedding = InputEmbedding(d_model, src_vocab_size)
  tgt_embedding = InputEmbedding(d_model, tgt_vocab_size)

  src_pos = PositionalEncoding(src_seq_len, d_model, dropout)
  tgt_pos = PositionalEncoding(target_seq_len, d_model, dropout)

  proj = ProjectionLayer(d_model, tgt_vocab_size)

  # Build a NEW block per layer. Repeating one instance N times would register
  # the same parameters under every layer, i.e. all layers would share weights.
  encoder_layers = nn.ModuleList(
      [EncoderBlock(d_model, n_heads, d_ff, dropout) for _ in range(N)]
  )
  decoder_layers = nn.ModuleList(
      [DecoderBlock(d_model, n_heads, d_ff, dropout) for _ in range(N)]
  )

  encoder = Encoder(encoder_layers)
  decoder = Decoder(decoder_layers)

  transformer = TransformerBlock(encoder, decoder, src_embedding, tgt_embedding, src_pos, tgt_pos, proj)
  return transformer

In [None]:
# Build a small example model; as the last expression of the cell its repr is displayed.
build_transformer(src_vocab_size=1000, tgt_vocab_size=1000, src_seq_len=4, target_seq_len=3)

TransformerBlock(
  (encoder): Encoder(
    (layers): ModuleList(
      (0-5): 6 x EncoderBlock(
        (attention): MultiHeadAttention(
          (w_q): Linear(in_features=512, out_features=512, bias=True)
          (w_k): Linear(in_features=512, out_features=512, bias=True)
          (w_v): Linear(in_features=512, out_features=512, bias=True)
          (w_o): Linear(in_features=512, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (feed_forward): FeedForward(
          (linear1): Linear(in_features=512, out_features=2048, bias=True)
          (linear2): Linear(in_features=2048, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (residual_connection1): ResidualConnection(
          (norm): LayerNormalization()
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (residual_connection2): ResidualConnection(
          (norm): LayerNormalization()
          (dropout): Dropout(p

# **Training Process**

In [None]:
!pip install datasets



In [None]:
from torch.utils.data import Dataset, DataLoader, random_split

from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace

from pathlib import Path

In [None]:
def get_all_sentences(ds, lang):
  """Lazily yield the ``lang`` text of every item's ``'translation'`` mapping in ``ds``."""
  yield from (record['translation'][lang] for record in ds)

In [None]:
def get_or_build_tokenizer(config, ds, lang):
  """Load the word-level tokenizer for ``lang`` from disk, training and saving it first if absent.

  The file location is ``config['tokenizer_file']`` formatted with ``lang``.
  """
  path = Path(config['tokenizer_file'].format(lang))

  # Fast path: a tokenizer was already saved for this language.
  if path.exists():
    return Tokenizer.from_file(str(path))

  tokenizer = Tokenizer(WordLevel(unk_token="[UNK]"))
  tokenizer.pre_tokenizer = Whitespace()
  special_tokens = ["[UNK]", "[PAD]", "[BOS]", "[EOS]"]
  trainer = WordLevelTrainer(special_tokens=special_tokens, min_frequency=2)
  tokenizer.train_from_iterator(get_all_sentences(ds, lang), trainer=trainer)
  tokenizer.save(str(path))
  return tokenizer

In [None]:
def get_ds(config):
  """Load the opus_books pair named in ``config`` and wrap it in train/validation loaders.

  Returns:
    ``(train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt)``.
  """
  src_lang = config['lang_src']
  tgt_lang = config['lang_tgt']
  ds_raw = load_dataset('opus_books', f'{src_lang}-{tgt_lang}', split='train')

  # One tokenizer per language, loaded from disk or trained on the full corpus.
  tokenizer_src = get_or_build_tokenizer(config, ds_raw, src_lang)
  tokenizer_tgt = get_or_build_tokenizer(config, ds_raw, tgt_lang)

  # 90% of the pairs go to training, the remainder to validation.
  total = len(ds_raw)
  train_ds_size = int(0.9 * total)
  train_ds_raw, val_ds_raw = random_split(ds_raw, [train_ds_size, total - train_ds_size])

  train_ds, val_ds = (
      BilingualDataset(split, tokenizer_src, tokenizer_tgt, src_lang, tgt_lang, config['seq_len'])
      for split in (train_ds_raw, val_ds_raw)
  )

  # Report the longest tokenized sentence on each side (informational only).
  longest_src = 0
  longest_tgt = 0
  for item in ds_raw:
    pair = item['translation']
    longest_src = max(longest_src, len(tokenizer_src.encode(pair[src_lang]).ids))
    longest_tgt = max(longest_tgt, len(tokenizer_tgt.encode(pair[tgt_lang]).ids))

  print(f'Max length src: {longest_src}')
  print(f'Max length tgt: {longest_tgt}')

  train_dataloader = DataLoader(train_ds, batch_size=config['batch_size'], shuffle=True)
  val_dataloader = DataLoader(val_ds, batch_size=1, shuffle=True)

  return train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt

## **Dataset**

In [None]:
class BilingualDataset(Dataset):
  """Turns raw translation pairs into fixed-length tensors for the model.

  Args:
    ds: Indexable collection whose items hold a ``'translation'`` mapping
      from language code to text.
    tokenizer_src: Tokenizer for the source language.
    tokenizer_tgt: Tokenizer for the target language.
    src_lang: Key of the source text inside ``'translation'``.
    tgt_lang: Key of the target text inside ``'translation'``.
    seq_len: Length every returned sequence is padded to.
  """

  @staticmethod
  def mask(size):
    """Return a (1, size, size) boolean mask that is True on and below the diagonal.

    Declared static so it works both as ``BilingualDataset.mask(n)`` and on an instance.
    """
    future = torch.triu(torch.ones(1, size, size), diagonal=1).type(torch.int)
    return future == 0

  def __init__(self, ds, tokenizer_src, tokenizer_tgt, src_lang, tgt_lang, seq_len):
    self.ds = ds
    self.tokenizer_src = tokenizer_src
    self.tokenizer_tgt = tokenizer_tgt
    self.src_lang = src_lang
    self.tgt_lang = tgt_lang
    self.seq_len = seq_len

    # NOTE(review): special-token ids come from the SOURCE tokenizer and are
    # also used on the target side; this presumes both tokenizers assign the
    # same ids to [BOS]/[EOS]/[PAD] — confirm if tokenizers are built differently.
    self.bos_token = torch.tensor([tokenizer_src.token_to_id('[BOS]')], dtype=torch.int64)
    self.eos_token = torch.tensor([tokenizer_src.token_to_id('[EOS]')], dtype=torch.int64)
    self.pad_token = torch.tensor([tokenizer_src.token_to_id('[PAD]')], dtype=torch.int64)

  def __len__(self):
    """Number of translation pairs."""
    return len(self.ds)

  def __getitem__(self, idx):
    """Return the tensors, masks and raw texts for pair ``idx``.

    Raises:
      ValueError: If either tokenized sentence does not fit into ``seq_len``
        together with its special tokens.
    """
    src_target_pair = self.ds[idx]
    src = src_target_pair['translation'][self.src_lang]
    tgt = src_target_pair['translation'][self.tgt_lang]

    enc_input_tokens = self.tokenizer_src.encode(src).ids
    dec_input_tokens = self.tokenizer_tgt.encode(tgt).ids

    # Encoder input carries [BOS] and [EOS]; decoder input only [BOS], label only [EOS].
    enc_num_padding_tokens = self.seq_len - len(enc_input_tokens) - 2
    dec_num_padding_tokens = self.seq_len - len(dec_input_tokens) - 1

    if enc_num_padding_tokens < 0 or dec_num_padding_tokens < 0:
      raise ValueError("sentence length exceeds maximum sequence length")

    pad_id = int(self.pad_token.item())
    # torch.full builds the padding in one call instead of a Python list of tensors.
    enc_padding = torch.full((enc_num_padding_tokens,), pad_id, dtype=torch.int64)
    dec_padding = torch.full((dec_num_padding_tokens,), pad_id, dtype=torch.int64)
    enc_ids = torch.tensor(enc_input_tokens, dtype=torch.int64)
    dec_ids = torch.tensor(dec_input_tokens, dtype=torch.int64)

    encoder_input = torch.cat([self.bos_token, enc_ids, self.eos_token, enc_padding])
    decoder_input = torch.cat([self.bos_token, dec_ids, dec_padding])
    label = torch.cat([dec_ids, self.eos_token, dec_padding])

    assert len(encoder_input) == self.seq_len
    assert len(decoder_input) == self.seq_len
    assert len(label) == self.seq_len

    return {"Encoder_Input": encoder_input,
            "Decoder_Input": decoder_input,
            # (1, 1, seq_len): 1 for real tokens, 0 for padding.
            "Encoder_mask": (encoder_input != self.pad_token).unsqueeze(0).unsqueeze(0).int(),
            # (1, seq_len, seq_len): padding mask combined with the lower-triangular mask.
            "Decoder_mask": (decoder_input != self.pad_token).unsqueeze(0).unsqueeze(0).int() & BilingualDataset.mask(decoder_input.size(0)),
            "label": label,
            "src_text": src,
            "tgt_text": tgt}

## **Build Model**

In [None]:
def get_model(config, vocab_src_len, vocab_tgt_len):
  """Build the model from the vocabulary sizes plus ``config['seq_len']`` and ``config['d_model']``."""
  # The same maximum length is used for the source and the target side.
  seq_len = config['seq_len']
  return build_transformer(vocab_src_len, vocab_tgt_len, seq_len, seq_len, d_model=config['d_model'])

In [None]:
def get_weights_file_path(config, epochs: str):
  """Return the checkpoint path ``./<model_folder>/<model_basename><epochs>.pt`` as a string.

  Args:
    config: Mapping with ``'model_folder'`` and ``'model_basename'``. The
      misspelled legacy key ``'model_baseename'`` is accepted as a fallback
      so configs written with that spelling keep working.
    epochs: Suffix identifying the checkpoint, e.g. a zero-padded epoch number.

  Raises:
    KeyError: If the folder key or both spellings of the basename key are missing.
  """
  model_folder = config['model_folder']
  try:
    model_basename = config['model_basename']
  except KeyError:
    if 'model_baseename' not in config:
      raise
    model_basename = config['model_baseename']
  model_filename = f'{model_basename}{epochs}.pt'
  return str(Path('.') / model_folder / model_filename)

In [None]:
from torch.utils.tensorboard import SummaryWriter
import tqdm

def train_model(config):
  """Train the translation model and save one checkpoint per epoch.

  Reads from ``config``: ``'model_folder'``, ``'experiment_name'``, ``'lr'``,
  ``'preload'`` and ``'num_epochs'`` (plus whatever get_ds, get_model and
  get_weights_file_path read). When ``config['preload']`` is set, the model
  weights, optimizer state, epoch counter and global step are restored from
  that checkpoint before training continues.
  """
  train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt = get_ds(config)
  tgt_vocab_size = tokenizer_tgt.get_vocab_size()
  model = get_model(config, tokenizer_src.get_vocab_size(), tgt_vocab_size)

  # torch.save does not create missing directories, so make sure the folder exists.
  Path(config['model_folder']).mkdir(parents=True, exist_ok=True)

  writer = SummaryWriter(config['experiment_name'])
  optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])

  initial_epoch = 0
  global_step = 0
  if config['preload']:
    model_filename = get_weights_file_path(config, config['preload'])
    print(f'Preloading model {model_filename}')
    state = torch.load(model_filename)

    # Restore the weights too; resuming with only the optimizer state would
    # continue from a freshly initialised model.
    model.load_state_dict(state['model_state_dict'])
    initial_epoch = state['epoch'] + 1
    optimizer.load_state_dict(state['optimizer'])
    global_step = state['global_step']

  # The pad id comes from the source tokenizer, matching how BilingualDataset pads the labels.
  loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer_src.token_to_id('[PAD]'), label_smoothing=0.1)

  try:
    for epoch in range(initial_epoch, config['num_epochs']):
      model.train()
      batch_iterator = tqdm.tqdm(train_dataloader, desc=f'Processing epoch {epoch: 02d}')

      for batch in batch_iterator:
        encoder_input = batch['Encoder_Input'].long()
        decoder_input = batch['Decoder_Input'].long()
        encoder_mask = batch['Encoder_mask'].long()
        decoder_mask = batch['Decoder_mask'].long()
        label = batch['label'].long()

        encoder_output = model.encode(encoder_input, encoder_mask)
        decoder_output = model.decode(decoder_input, encoder_output, encoder_mask, decoder_mask)
        proj_out = model.project(decoder_output)

        # Flatten to (tokens, vocab) vs (tokens,) as CrossEntropyLoss expects.
        loss = loss_fn(proj_out.view(-1, tgt_vocab_size), label.view(-1))
        batch_iterator.set_postfix({'loss': f'{loss.item():6.3f}'})

        writer.add_scalar('train loss', loss.item(), global_step)
        writer.flush()

        loss.backward()

        optimizer.step()
        optimizer.zero_grad()

        global_step += 1

      # Zero-padded epoch number without a sign placeholder, so the file name has no space.
      model_filename = get_weights_file_path(config, f'{epoch:02d}')
      torch.save(
          {
              'epoch': epoch,
              'model_state_dict': model.state_dict(),
              'optimizer': optimizer.state_dict(),
              'global_step': global_step
          },
          model_filename
      )
  finally:
    # Release the event file even if training is interrupted.
    writer.close()

# Run training with the notebook-level configuration.
train_model(config=config)
