In [1]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

from transformers import AutoTokenizer
import math

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:

# Hyperparameters read by the cells below.
block_size = 128  # token length every sentence is padded to; also the positional-encoding table length
sample_num = 1000000 # 1M: number of lines kept from each Europarl file
batch_size = 256  # sentence pairs per DataLoader batch
max_vocab_size = 30000  # vocabulary cap given to the tokenizer trainers; also used as the embedding table size

d_model = 512  # embedding width

## Tokenization 

In [3]:
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace
from torch.utils.data import Dataset, DataLoader
from pathlib import Path


# Keep the first `sample_num` lines of each side of the parallel corpus.
# Path.read_text opens, reads and closes the file in one call, so no file
# handle is left open for the garbage collector to clean up.
europarl_en = Path('Data/europarl-v7.fr-en.en').read_text(encoding='utf-8').split('\n')[:sample_num]
europarl_fr = Path('Data/europarl-v7.fr-en.fr').read_text(encoding='utf-8').split('\n')[:sample_num]

# The two files are aligned line by line; a length mismatch would silently pair
# the wrong sentences further down.
assert len(europarl_en) == len(europarl_fr), "source and target corpora must have the same number of lines"


In [4]:
# One-shot iterators over the sentence lists; they are passed to
# get_or_build_tokenizer as the training data for each language.
en_iterator = iter(europarl_en)
fr_iterator = iter(europarl_fr)

def get_or_build_tokenizer(path, iterator, lang, max_vocab_size):
    """Return the word-level tokenizer for `lang`, training and saving it if no saved file exists.

    Args:
        path: Directory prefix as a string. The file name is appended by plain
            string concatenation, so it has to end with a path separator.
        iterator: Iterable of sentences used for training; only consumed when
            no saved tokenizer is found.
        lang: Language label used in the file name ``tokenizer_<lang>.json``.
        max_vocab_size: Vocabulary size limit handed to the trainer.

    Returns:
        The loaded or freshly trained ``Tokenizer``.
    """
    tokenizer_file = Path("".join((path, "tokenizer_", lang, ".json")))

    # Fast path: reuse the tokenizer saved by an earlier run.
    if tokenizer_file.exists():
        return Tokenizer.from_file(str(tokenizer_file))

    # Otherwise train a whitespace-split word-level model and save it to disk.
    word_level = Tokenizer(WordLevel(unk_token='[UNK]'))
    word_level.pre_tokenizer = Whitespace()
    word_level.train_from_iterator(
        iterator,
        trainer=WordLevelTrainer(
            special_tokens=["[UNK]","[PAD]","[BOS]","[EOS]"],
            min_frequency=2,
            vocab_size=max_vocab_size,
        ),
    )
    word_level.save(str(tokenizer_file))
    return word_level

In [5]:
# NOTE(review): absolute, machine-specific directory. It must end with "/"
# because get_or_build_tokenizer appends the file name by string concatenation.
path = "/home/ismailko/Documents/Projects/All Neural Networks Scratch/Transformer/"
# Load each language's tokenizer from `path`, or train it from the sentence
# iterator and save it there when no file exists yet.
tokenizer_en = get_or_build_tokenizer(path,en_iterator,"english", max_vocab_size=max_vocab_size)
tokenizer_fr = get_or_build_tokenizer(path,fr_iterator,"french", max_vocab_size=max_vocab_size)

class MachineTranslationDataset(Dataset):
    """Parallel-sentence dataset yielding fixed-length token id tensors.

    Every item is a ``(source, target)`` pair of 1-D ``int64`` tensors of
    length ``seq_len`` laid out as ``[BOS] tokens... [EOS] [PAD]...``.
    Sentences with more than ``seq_len - 2`` tokens are truncated so that the
    BOS/EOS markers always fit.

    Args:
        sentences_src: Indexable collection of source sentences.
        sentences_trg: Indexable collection of target sentences, aligned with
            ``sentences_src`` by index.
        tokenizer_src: Tokenizer for the source language; must provide
            ``token_to_id`` and ``encode(...).ids``.
        tokenizer_trg: Tokenizer for the target language (same requirements).
        seq_len: Length of every returned tensor.

    Raises:
        ValueError: If ``seq_len`` is too small to hold BOS and EOS.
    """

    def __init__(self,sentences_src,sentences_trg, tokenizer_src,tokenizer_trg, seq_len):
        if seq_len < 2:
            raise ValueError("seq_len must be at least 2 to hold [BOS] and [EOS]")

        self.sentences_src = sentences_src
        self.sentences_trg = sentences_trg
        self.tokenizer_src = tokenizer_src
        self.tokenizer_trg = tokenizer_trg
        self.seq_len = seq_len

        # Source-side special tokens (attribute names kept unchanged).
        self.bos_token, self.eos_token, self.pad_token = self._special_tokens(tokenizer_src)
        # Target sequences must use ids from the target vocabulary; these only
        # coincide with the source ids when both tokenizers were trained with
        # the same special-token order.
        self.bos_token_trg, self.eos_token_trg, self.pad_token_trg = self._special_tokens(tokenizer_trg)

    @staticmethod
    def _special_tokens(tokenizer):
        """Return the (BOS, EOS, PAD) ids of `tokenizer` as one-element int64 tensors."""
        return tuple(
            torch.tensor([tokenizer.token_to_id(name)], dtype=torch.int64)
            for name in ("[BOS]", "[EOS]", "[PAD]")
        )

    def __len__(self):
        """Number of sentence pairs (taken from the source side)."""
        return len(self.sentences_src)

    def _encode(self, sentence, tokenizer, bos, eos, pad):
        """Tokenize `sentence` and frame it as BOS + ids + EOS + padding of total length seq_len."""
        # Reserve two slots for BOS/EOS; without truncation a long sentence
        # would make the padding length negative and `repeat` would raise.
        ids = tokenizer.encode(sentence).ids[: self.seq_len - 2]
        body = torch.tensor(ids, dtype=torch.int64)
        padding = pad.repeat(self.seq_len - 2 - body.shape[0])
        return torch.cat((bos, body, eos, padding))

    def __getitem__(self, idx):
        """Return the encoded ``(source, target)`` pair at position `idx`."""
        english_sentence = self._encode(
            self.sentences_src[idx], self.tokenizer_src,
            self.bos_token, self.eos_token, self.pad_token,
        )
        french_sentence = self._encode(
            self.sentences_trg[idx], self.tokenizer_trg,
            self.bos_token_trg, self.eos_token_trg, self.pad_token_trg,
        )
        return english_sentence, french_sentence

In [6]:
# English -> French sentence pairs; `block_size` is the dataset's seq_len.
dataset = MachineTranslationDataset(europarl_en, europarl_fr, tokenizer_en, tokenizer_fr, block_size)
# NOTE(review): no `shuffle=True`, so batches follow corpus order — confirm this is intended for training.
data_loader = DataLoader(dataset, batch_size=batch_size)

In [32]:
class PositionalEncoding(nn.Module):
    """Sinusoidal positional-encoding table.

    For position ``pos`` and even column ``2i``::

        PE[pos, 2i]     = sin(pos / 10000 ** (2i / d_model))
        PE[pos, 2i + 1] = cos(pos / 10000 ** (2i / d_model))

    Args:
        d_model: Number of columns in the table.
        max_sequence_length: Number of positions (rows) in the table.
    """

    def __init__(self, d_model, max_sequence_length):
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model
        # The table is constant, so build it once instead of on every forward.
        # Storing it as a buffer makes it follow the module through
        # .to()/.cuda(); persistent=False keeps it out of the state_dict.
        self.register_buffer("PE", self._build_table(d_model, max_sequence_length), persistent=False)

    @staticmethod
    def _build_table(d_model, max_sequence_length):
        """Compute the (max_sequence_length, d_model) sin/cos table."""
        even_i = torch.arange(0, d_model, 2).float()
        denominator = torch.pow(10000, even_i / d_model)
        position = torch.arange(max_sequence_length).float().reshape(max_sequence_length, 1)
        angles = position / denominator
        # Stack on a new last axis and flatten so sin/cos interleave column-wise.
        stacked = torch.stack([torch.sin(angles), torch.cos(angles)], dim=2)
        table = torch.flatten(stacked, start_dim=1, end_dim=2)
        # For odd d_model the interleaving yields one column too many; drop it.
        return table[:, :d_model]

    def forward(self):
        """Return the precomputed table of shape (max_sequence_length, d_model)."""
        return self.PE

In [37]:
class EmbeddingLayer(nn.Module):
    """Token embedding plus positional encoding.

    Args:
        d_model: Embedding width.
        vocab_size: Number of rows in the token embedding table.
        block_size: Maximum sequence length covered by the positional table.
    """

    def __init__(self, d_model, vocab_size, block_size):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, block_size)

    def forward(self, x):
        """Embed token ids `x` (last axis = sequence) and add positional encodings.

        Returns a tensor shaped like `x` with a trailing `d_model` axis.
        """
        tokens = self.token_embedding(x)
        # Keep only the rows for the positions actually present, so inputs
        # shorter than block_size broadcast correctly, and move the table to
        # the embeddings' device so the sum also works on the GPU.
        positions = self.positional_encoding()[: x.shape[-1]].to(tokens.device)
        return tokens + positions

In [39]:
# Smoke test: embed the source half of the first batch and inspect the output shape.
x = next(iter(data_loader))
src = x[0]  # the dataset returns (english, french); index 0 is the English side

embed_layer = EmbeddingLayer(d_model,max_vocab_size, block_size)

embed_layer(src).shape

torch.Size([256, 128, 512])

In [42]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Note the argument order of ``forward``: keys first, then queries, then
    values. The concatenated heads are returned as-is; no output projection
    is applied.

    Args:
        d_model: Model width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads

        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_qkv = d_model // num_heads

        self.W_keys = nn.Linear(d_model, d_model)
        self.W_queries = nn.Linear(d_model, d_model)
        self.W_values = nn.Linear(d_model, d_model)

    def _split_heads(self, projected):
        """Reshape (batch, seq_len, d_model) into (batch, num_heads, seq_len, d_qkv)."""
        B, T, _ = projected.shape
        # The head axis must precede the sequence axis so that the matmuls
        # below contract over positions, not over heads.
        return projected.view(B, T, self.num_heads, self.d_qkv).transpose(1, 2)

    def forward(self, key_src, query_src, value_src, mask=None):
        """Attend from `query_src` over `key_src`/`value_src`.

        Args:
            key_src: (batch, key_len, d_model) tensor projected to keys.
            query_src: (batch, query_len, d_model) tensor projected to queries.
            value_src: (batch, key_len, d_model) tensor projected to values.
            mask: Optional tensor broadcastable to
                (batch, num_heads, query_len, key_len); positions where it
                equals 0 are excluded from attention.

        Returns:
            Tensor of shape (batch, query_len, d_model).
        """
        # The output length follows the queries, which may differ from the
        # key/value length in cross-attention.
        B, T_q, C = query_src.shape

        keys = self._split_heads(self.W_keys(key_src))          # (B, H, T_k, d_qkv)
        queries = self._split_heads(self.W_queries(query_src))  # (B, H, T_q, d_qkv)
        values = self._split_heads(self.W_values(value_src))    # (B, H, T_k, d_qkv)

        atn_scr = queries @ keys.transpose(-2, -1)  # (B, H, T_q, T_k)
        scaled_atn_scr = atn_scr / self.d_qkv**0.5
        if mask is not None:
            # The fill value has to be a number; the string '-inf' is rejected.
            scaled_atn_scr = scaled_atn_scr.masked_fill(
                mask.to(scaled_atn_scr.device) == 0, float('-inf')
            )

        attention_weights = torch.softmax(scaled_atn_scr, dim=-1)  # (B, H, T_q, T_k)
        out = attention_weights @ values  # (B, H, T_q, d_qkv)
        # Bring heads back next to the feature axis before merging them.
        out = out.transpose(1, 2).reshape(B, T_q, C)  # (B, T_q, d_model)

        return out

In [45]:
class FeedForwardNet(nn.Module):
    """Two-layer feed-forward block: Linear -> ReLU -> Linear.

    The hidden layer has ``d_model * forward_expansion`` units and the output
    is projected back to ``d_model``.
    """

    def __init__(self, d_model, forward_expansion):
        super().__init__()
        hidden_dim = d_model * forward_expansion
        self.fc1 = nn.Linear(d_model, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, d_model)

    def forward(self, x):
        """Apply expand -> ReLU -> contract along the last axis of `x`."""
        return self.fc2(self.relu(self.fc1(x)))

In [46]:
class EncoderStack(nn.Module):
    """One encoder layer: self-attention and feed-forward, each followed by residual add + LayerNorm.

    Args:
        d_model: Model width.
        num_heads: Number of attention heads.
        forward_expansion: Width multiplier of the feed-forward hidden layer.
    """

    def __init__(self,d_model, num_heads, forward_expansion):
        super().__init__()
        self.MHA = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.FFN = FeedForwardNet(d_model=d_model, forward_expansion=forward_expansion)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        """Run the layer on `x` of shape [batch_size, seq_len, d_model].

        Args:
            x: Input activations.
            mask: Optional attention mask handed to the self-attention block.
                Defaults to None so existing single-argument calls keep
                working, while callers that pass a mask no longer fail.

        Returns:
            Tensor of shape [batch_size, seq_len, d_model].
        """
        out = x + self.MHA(x, x, x, mask)  # [batch_size, seq_len, d_model]
        norm_out = self.layer_norm1(out)
        out = norm_out + self.FFN(norm_out)  # [batch_size, seq_len, d_model]
        norm_out = self.layer_norm2(out)
        return norm_out

In [47]:
class Encoder(nn.Module):
    """Embedding layer followed by a stack of `num_layers` encoder layers.

    Args:
        vocab_size: Size of the token vocabulary.
        block_size: Maximum sequence length.
        d_model: Model width.
        num_heads: Number of attention heads per layer.
        forward_expansion: Width multiplier of each feed-forward hidden layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, vocab_size, block_size, d_model, num_heads, forward_expansion, num_layers):
        super().__init__()
        self.block_size = block_size
        self.d_model = d_model
        self.embeding_layer = EmbeddingLayer(d_model, vocab_size, block_size)
        self.layers = nn.ModuleList([EncoderStack(d_model, num_heads, forward_expansion) for _ in range(num_layers)])

    def forward(self, x, mask=None):
        """Encode token ids `x`.

        Args:
            x: Token id tensor passed to the embedding layer.
            mask: Optional attention mask for the layers; None means unmasked.

        Returns:
            The output of the last encoder layer.
        """
        x = self.embeding_layer(x)
        for layer in self.layers:
            # Pass the mask only when one was given: an unmasked call then works
            # even with a layer whose forward accepts nothing but `x`.
            x = layer(x) if mask is None else layer(x, mask)
        return x

In [48]:
class DecoderStack(nn.Module):
    """One decoder layer: masked self-attention, cross-attention and feed-forward.

    Each of the three sub-blocks is followed by a residual addition and its
    own LayerNorm.
    """

    def __init__(self, d_model, num_heads, forward_expansion):
        super().__init__()
        self.Masked_MHA = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.Crossed_MHA = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.FFN = FeedForwardNet(d_model=d_model, forward_expansion=forward_expansion)
        self.LayerNorm1 = nn.LayerNorm(d_model)
        self.LayerNorm2 = nn.LayerNorm(d_model)
        self.LayerNorm3 = nn.LayerNorm(d_model)

    def forward(self,x,encoder_out,trg_mask):
        """Run the layer on decoder input `x`, attending over `encoder_out`.

        `trg_mask` is applied to the self-attention only; the cross-attention
        is called without a mask.
        """
        # Self-attention over the decoder input.
        self_attended = self.LayerNorm1(self.Masked_MHA(x, x, x, trg_mask) + x)

        # Cross-attention: keys and values from the encoder, queries from the decoder.
        cross_attended = self.LayerNorm2(
            self.Crossed_MHA(encoder_out, self_attended, encoder_out) + self_attended
        )

        # Feed-forward block.
        return self.LayerNorm3(self.FFN(cross_attended) + cross_attended)

In [50]:
class Decoder(nn.Module):
    """Embedding layer followed by a stack of `num_layers` decoder layers.

    Args:
        vocab_size: Size of the token vocabulary.
        block_size: Maximum sequence length.
        d_model: Model width.
        num_heads: Number of attention heads per layer.
        forward_expansion: Width multiplier of each feed-forward hidden layer.
        num_layers: Number of stacked decoder layers.
    """

    def __init__(self,vocab_size, block_size, d_model, num_heads, forward_expansion, num_layers):
        super().__init__()
        self.block_size = block_size
        self.d_model = d_model
        self.embeding_layer = EmbeddingLayer(d_model, vocab_size, block_size)
        # The stack is made of DecoderStack layers. Instantiating Decoder here
        # would call this constructor again with too few arguments and raise.
        self.layers = nn.ModuleList([DecoderStack(d_model, num_heads, forward_expansion) for _ in range(num_layers)])

    def forward(self, x, encoder_output, trg_mask):
        """Decode target token ids `x` given `encoder_output`.

        `trg_mask` is forwarded to every layer together with the encoder output.
        """
        x = self.embeding_layer(x)
        for layer in self.layers:
            x = layer(x, encoder_output, trg_mask)
        return x

In [51]:
class Transformer(nn.Module):
    """Encoder-decoder model producing one vocabulary-sized logit vector per sequence.

    Args:
        vocab_size: Vocabulary size shared by the encoder, decoder and output layer.
        block_size: Sequence length; target inputs must be exactly this long
            because the output layer consumes the flattened decoder output.
        d_model: Model width.
        nhead: Number of attention heads.
        num_encoder_layers: Number of encoder layers.
        num_decoder_layers: Number of decoder layers.
        forward_expansion: Width multiplier of the feed-forward hidden layers.
    """

    def __init__(self, vocab_size, block_size, d_model, nhead, num_encoder_layers, num_decoder_layers,
                  forward_expansion):
        super().__init__()
        self.encoder = Encoder(vocab_size, block_size, d_model, nhead, forward_expansion, num_encoder_layers)
        self.decoder = Decoder(vocab_size, block_size, d_model, nhead, forward_expansion, num_decoder_layers)
        self.out = nn.Linear(d_model*block_size, vocab_size)
        self.vocab_size = vocab_size

        # Lower-triangular causal mask. Registered as a buffer so it moves with
        # the model on .to(device); persistent=False keeps the state_dict as is.
        self.register_buffer("mask", torch.tril(torch.ones((block_size, block_size))), persistent=False)

    def forward(self, src, trg):
        """Return logits of shape (batch, vocab_size) for source ids `src` and target ids `trg`."""
        B, T = trg.shape
        enc_src = self.encoder(src, mask=None)
        out = self.decoder(trg, enc_src, self.mask)  # (B, T, d_model)
        # `self.out` expects d_model * block_size input features, so the
        # sequence and feature axes are flattened before the projection.
        # NOTE(review): this yields a single prediction per sequence; per-position
        # logits would need nn.Linear(d_model, vocab_size) — confirm the intended design.
        out = self.out(out.reshape(B, -1))  # (B, vocab_size)
        return out
