<a href="https://colab.research.google.com/github/NLP-END3/Session10/blob/main/Session10_END3_Transformer_Model_EN_DE_Translation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1. Objective

## References


# 2. Loading required libraries and language models

In [None]:
%%bash
python -m spacy download en
python -m spacy download de

Collecting en_core_web_sm==2.2.5
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-2.2.5/en_core_web_sm-2.2.5.tar.gz (12.0 MB)
[38;5;2m✔ Download and installation successful[0m
You can now load the model via spacy.load('en_core_web_sm')
[38;5;2m✔ Linking successful[0m
/usr/local/lib/python3.7/dist-packages/en_core_web_sm -->
/usr/local/lib/python3.7/dist-packages/spacy/data/en
You can now load the model via spacy.load('en')
Collecting de_core_news_sm==2.2.5
  Downloading https://github.com/explosion/spacy-models/releases/download/de_core_news_sm-2.2.5/de_core_news_sm-2.2.5.tar.gz (14.9 MB)
[38;5;2m✔ Download and installation successful[0m
You can now load the model via spacy.load('de_core_news_sm')
[38;5;2m✔ Linking successful[0m
/usr/local/lib/python3.7/dist-packages/de_core_news_sm -->
/usr/local/lib/python3.7/dist-packages/spacy/data/de
You can now load the model via spacy.load('de')


In [None]:
!pip install torchtext

In [None]:
# Loading required libraries
import torch
import torch.nn as nn
import torch.optim as optim

from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.datasets import Multi30k
from typing import Iterable, List

import spacy
import numpy as np

import random
import math
import time

In [None]:
# Setting Seed to make the experiment reproducible
SEED = 1234

# Seed every random number generator the notebook uses with the same value:
# Python's `random`, NumPy, PyTorch (CPU) and PyTorch (CUDA), in that order.
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    _seed_rng(SEED)

# Ask cuDNN to use deterministic algorithms.
torch.backends.cudnn.deterministic = True

# 3. Defining dataloader

In [None]:
SRC_LANGUAGE, TGT_LANGUAGE = 'en', 'de'

# Per-language spaCy tokenizers, keyed by language code.
token_transform = {
    SRC_LANGUAGE: get_tokenizer('spacy', language='en'),
    TGT_LANGUAGE: get_tokenizer('spacy', language='de'),
}

# Per-language vocabularies, keyed by language code; starts out empty.
vocab_transform = {}

def yield_tokens(data_iter: Iterable, language: str) -> Iterable[List[str]]:
    """Lazily tokenize one side of every sample in ``data_iter``.

    Args:
        data_iter: Iterable of samples where ``sample[0]`` is the source-language
            text and ``sample[1]`` is the target-language text.
        language: ``SRC_LANGUAGE`` or ``TGT_LANGUAGE``; selects both which
            element of each sample is read and which tokenizer is applied.

    Yields:
        The result of ``token_transform[language]`` applied to the selected
        text of each sample.

    Raises:
        KeyError: If ``language`` is not one of the two configured languages.
            Because this is a generator, the error surfaces on first iteration.
    """
    # Resolve the sample column and the tokenizer once instead of per sample.
    column = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}[language]
    tokenize = token_transform[language]

    for data_sample in data_iter:
        yield tokenize(data_sample[column])

# Special symbols; a symbol's position in this list is its vocabulary index.
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = range(len(special_symbols))

for ln in (SRC_LANGUAGE, TGT_LANGUAGE):
    # A new training iterator is created for each language pass.
    train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))

    # Build the vocabulary from the tokenized training text, with the special
    # symbols placed first.
    vocab_transform[ln] = build_vocab_from_iterator(
        yield_tokens(train_iter, ln),
        min_freq=1,
        specials=special_symbols,
        special_first=True,
    )

    # Use the <unk> index as the vocabulary's default index.
    vocab_transform[ln].set_default_index(UNK_IDX)

100%|██████████| 1.21M/1.21M [00:01<00:00, 645kB/s]


In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# 4. Building the networks

In [None]:
# Encoder block
class Encoder(nn.Module):
    """Transformer encoder: token plus learned positional embeddings, then a stack of layers.

    Args:
        input_dim: Number of rows in the token embedding table.
        hid_dim: Embedding / model width.
        n_layers: Number of ``EncoderLayer`` blocks to stack.
        n_heads: Forwarded to each ``EncoderLayer``.
        pf_dim: Forwarded to each ``EncoderLayer``.
        dropout: Dropout probability applied to the summed embeddings and
            forwarded to each ``EncoderLayer``.
        device: Device on which the embedding scale factor is stored.
        max_length: Number of rows in the positional embedding table; sequences
            longer than this have no positional embedding to look up.
    """

    def __init__(self, input_dim, hid_dim, n_layers, n_heads, pf_dim, dropout, device, max_length=100):
        super().__init__()
        self.device = device

        self.tok_embedding = nn.Embedding(input_dim, hid_dim)
        self.pos_embedding = nn.Embedding(max_length, hid_dim)

        self.layers = nn.ModuleList(
            [EncoderLayer(hid_dim, n_heads, pf_dim, dropout, device) for _ in range(n_layers)]
        )

        self.dropout = nn.Dropout(dropout)
        # sqrt(hid_dim): token embeddings are multiplied by this before the
        # positional embeddings are added.
        self.scale = torch.sqrt(torch.FloatTensor([hid_dim]).to(device))

    def forward(self, src, src_mask):
        """Encode a batch of token ids.

        Args:
            src: Token ids, [batch_size, src_len].
            src_mask: Mask passed unchanged to every layer,
                [batch_size, 1, 1, src_len].

        Returns:
            Tensor of shape [batch_size, src_len, hid_dim].
        """
        src_len = src.shape[1]

        # Position ids 0..src_len-1, created on the same device as the input so
        # the embedding lookup also works on GPU. Shape [1, src_len]; the
        # resulting embedding broadcasts over the batch dimension in the sum.
        pos = torch.arange(src_len, device=src.device).unsqueeze(0)

        src = self.dropout(self.tok_embedding(src) * self.scale + self.pos_embedding(pos))

        for layer in self.layers:
            src = layer(src, src_mask)

        # src dimension [batch_size, src_len, hid_dim]
        return src

In [None]:
# Encoder layer
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and a position-wise feed-forward network.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalisation.

    Args:
        hid_dim: Model width; input and output feature size.
        n_heads: Number of attention heads, forwarded to the attention layer.
        pf_dim: Inner width of the feed-forward network.
        dropout: Dropout probability used by this block and its sub-layers.
        device: Forwarded to the attention layer.
    """

    def __init__(self, hid_dim, n_heads, pf_dim, dropout, device):
        super().__init__()

        self.self_attn_layer_norm = nn.LayerNorm(hid_dim)
        self.ff_layer_norm = nn.LayerNorm(hid_dim)
        self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
        self.pointwise_feedforward = PositionFeedForwardLayer(hid_dim, pf_dim, dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask):
        """Apply the block to a batch of sequences.

        Args:
            src: Input, [batch_size, src_len, hid_dim].
            src_mask: Attention mask, [batch_size, 1, 1, src_len].

        Returns:
            Tensor of shape [batch_size, src_len, hid_dim].
        """
        # Self-attention: the sequence attends to itself.
        _src, _ = self.self_attention(src, src, src, src_mask)

        # dropout, residual connection and layer norm
        src = self.self_attn_layer_norm(src + self.dropout(_src))

        # Position-wise feed-forward. The result is kept in a separate name so
        # the residual below still adds the sub-layer's *input*.
        _src = self.pointwise_feedforward(src)

        # dropout, residual connection and layer norm
        src = self.ff_layer_norm(src + self.dropout(_src))

        # src dimension [batch_size, src_len, hid_dim]
        return src

In [None]:
class MultiHeadAttentionLayer(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        hid_dim: Model width; must be divisible by ``n_heads``.
        n_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.
        device: Device on which the scaling factor is stored.
    """

    def __init__(self, hid_dim, n_heads, dropout, device):
        super().__init__()

        assert hid_dim % n_heads == 0

        self.hid_dim = hid_dim
        self.n_heads = n_heads
        self.head_dim = hid_dim // n_heads

        self.fc_q = nn.Linear(hid_dim, hid_dim)
        self.fc_k = nn.Linear(hid_dim, hid_dim)
        self.fc_v = nn.Linear(hid_dim, hid_dim)

        self.fc_o = nn.Linear(hid_dim, hid_dim)

        self.dropout = nn.Dropout(dropout)
        # sqrt(head_dim): divisor applied to the raw attention scores.
        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` positions over ``key``/``value`` positions.

        Args:
            query: [batch_size, query_len, hid_dim].
            key: [batch_size, key_len, hid_dim].
            value: [batch_size, value_len, hid_dim].
            mask: Optional tensor broadcastable to
                [batch_size, n_heads, query_len, key_len]; positions where it
                equals 0 are excluded from attention.

        Returns:
            A tuple ``(x, attention)`` where ``x`` is
            [batch_size, query_len, hid_dim] and ``attention`` is
            [batch_size, n_heads, query_len, key_len].
        """
        batch_size = query.shape[0]

        # Each projection reads its own input, so cross-attention (key/value
        # from a different sequence than query) works as well as self-attention.
        Q = self.fc_q(query)
        K = self.fc_k(key)
        V = self.fc_v(value)

        #Q = [batch size, query len, hid dim]
        #K = [batch size, key len, hid dim]
        #V = [batch size, value len, hid dim]

        # Split the feature dimension into heads and move heads ahead of length.
        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)

        #Q = [batch size, n heads, query len, head_dim]
        #K = [batch size, n heads, key len, head_dim]
        #V = [batch size, n heads, value len, head_dim]

        #K.permute(0,1,3,2) = [batch size, n heads, head_dim, key len]
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

        #energy = [batch size, n heads, query len, key len]

        if mask is not None:
            # A very negative score becomes (effectively) zero after softmax.
            energy = energy.masked_fill(mask == 0, -1e10)

        attention = torch.softmax(energy, dim=-1)
        #attention = [batch size, n heads, query len, key len]

        x = torch.matmul(self.dropout(attention), V)
        #x = [batch size, n heads, query len, head_dim]

        # Move heads back next to head_dim; contiguous() is required before view().
        x = x.permute(0, 2, 1, 3).contiguous()
        #x = [batch size, query len, n heads, head_dim]

        # Merge the heads back into a single feature dimension.
        x = x.view(batch_size, -1, self.hid_dim)
        #x = [batch size, query len, hid_dim]

        x = self.fc_o(x)
        #x = [batch size, query len, hid_dim]

        return x, attention

In [None]:
class PositionFeedForwardLayer(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear.

    Args:
        hid_dim: Input and output feature size.
        pf_dim: Inner (expanded) feature size.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, hid_dim, pf_dim, dropout):
        super().__init__()

        self.fc_1 = nn.Linear(hid_dim, pf_dim)
        self.fc_2 = nn.Linear(pf_dim, hid_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Transform the last dimension of ``x``.

        Args:
            x: [batch_size, seq_len, hid_dim].

        Returns:
            Tensor of shape [batch_size, seq_len, hid_dim].
        """
        # Expand to the inner width.
        x = self.dropout(torch.relu(self.fc_1(x)))

        # x dimension [batch_size, seq_len, pf_dim]

        # Project back to the model width with the second linear layer.
        x = self.fc_2(x)

        # x dimension [batch_size, seq_len, hid_dim]

        return x