Let's Create The Transfomrer

# **Includes**

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
import math
import numpy as np
import pandas as pd
import re

# **Transformer**

## **Preprocess**

### **Positinoal Encoding**

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_sequence_length):
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model

    def forward(self):
        even_indices = torch.arange(0, self.d_model, 2).float() # d_model/2 # Even indices
        denominator = torch.pow(10000, even_indices/self.d_model) # d_model/2 # Denominator (same for even and odd indices)
        position = torch.arange(self.max_sequence_length).reshape(self.max_sequence_length, 1) # max_sequence_length x 1 # 0 -> max_sequence_length
        even_PE = torch.sin(position / denominator) # max_sequence_length x d_model/2
        odd_PE = torch.cos(position / denominator) # max_sequence_length x d_model/2
        stacked = torch.stack([even_PE, odd_PE], dim=2) # max_sequence_length x d_model/2 x 2
        PE = torch.flatten(stacked, start_dim=1, end_dim=2) # max_sequence_length x d_model
        return PE.unsqueeze(0) # 1 x max_sequence_length x d_model

### **Tokenize and Embedding**


 In PyTorch, tensors live on a specific device (CPU or GPU).\
 Operations cannot mix devices: you can’t add a CPU tensor to a GPU tensor.\
 You have to manually move tensors and models to the same device using .to(device) or .cuda(). (like x = tensor([1, 2, 3]).cuda())\
_______________________________
 TensorFlow has a default device context.\
 When you create a tensor, TF automatically puts it on a “default” device (usually GPU if available).\
 You don’t usually have to manually call .to(device); TensorFlow moves everything under the hood

In [None]:
def get_device(): # Return cuda (GPU) if possible, else return cpu
    return torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
class SentenceEmbedding(nn.Module):
    "For a given sentence, create an embedding"
    def __init__(self, max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
        super().__init__()
        self.vocab_size = len(language_to_index)
        self.max_sequence_length = max_sequence_length
        self.embedding = nn.Embedding(self.vocab_size, d_model) # maps token IDs → dense vectors of size d_model
        self.language_to_index = language_to_index
        self.position_encoder = PositionalEncoding(d_model, max_sequence_length)
        self.dropout = nn.Dropout(p=0.1)
        self.START_TOKEN = START_TOKEN
        self.END_TOKEN = END_TOKEN
        self.PADDING_TOKEN = PADDING_TOKEN
        # compute positional encoding
        self.positional_encoding = self.position_encoder().to(get_device())


    def batch_tokenize(self, batch, start_token, end_token):

        def tokenize(sentence, start_token, end_token): # Tokenize the sentence
            # Convert tokens into indices
            sentence_indices = [self.language_to_index[token] for token in list(sentence)] # Iterate over letters, if u want to iterate over words, use sentence.split(), if you switch to words, make sure the vocabulary is words not letters, also that max_seq_len is in number of words not letters, finally You may want a <UNK> token for words not in your vocabulary, which wasn’t needed for characters.
            # Insert start token at the begining
            if start_token:
                sentence_indices.insert(0, self.language_to_index[self.START_TOKEN])
            # Append the end token at the end
            if end_token:
                sentence_indices.append(self.language_to_index[self.END_TOKEN])
            # Pad to max_sequence_length
            for _ in range(len(sentence_indices), self.max_sequence_length):
                sentence_indices.append(self.language_to_index[self.PADDING_TOKEN])
            # Truncate if too long
            sentence_indices = sentence_indices[:self.max_sequence_length]
            return torch.tensor(sentence_indices)

        tokenized = []
        for sentence in range(len(batch)): # Iterate through sentences in the batch
           tokenized.append(tokenize(batch[sentence], start_token, end_token)) # Tokenize the sentence
        # tokenized now is a python list of tensor each with length max_seq_len, the list has batch_size tensors
        # We want to convert this list of tensors into a 2d tensor with shape batch_size x max_seq_len
        tokenized = torch.stack(tokenized)
        return tokenized.to(get_device()) # get_device() returns the device (GPU, CPU ..) then .to(device) moves the tensor to that device
        # This ensures that your tokenized tensor is on the same device as the model before you feed it into the embedding layer. Without it, you could get a device mismatch error .
    def forward(self, x, start_token, end_token): # sentence
        x = self.batch_tokenize(x, start_token, end_token)
        x = self.embedding(x) # Convert tokens into embeddings
        x = self.dropout(x + self.positional_encoding) # When you do x + positional_encoding, PyTorch requires both tensors to be on the same device. If x is on GPU but pos is still on CPU, you would get an error, that's why we also moved the positional encoding to gpu.
        return x
        # Now x is on GPU (if possible), any further computation on it, the output will be on the same device. If a tensor is computed from other tensors already on the correct device, it automatically stays on that device.


### **Masks**

In [None]:
class Masks(nn.Module):
  def __init__(self, max_seq_len):
    super().__init__()
    self.max_seq_len = max_seq_len
    # Compute the look ahead mask in __init__ only once as it's the same for all batches avoiding recreating it every forward pass — more efficient.
    self.look_ahead_mask = torch.triu(torch.ones((self.max_seq_len, self.max_seq_len), dtype = torch.bool), diagonal = 1) # All elements below diagonal + diagonal are False

  def forward(self, enc_lang_batch, dec_lang_batch,
                  enc_start_token, enc_end_token,
                  dec_start_token, dec_end_token, NEG_INF = -1e9):
          num_sentences = len(enc_lang_batch)
          # Create the masks with boolean values where Trues will be replaced by -inf and Falses with 0
          encoder_padding_mask_selfAttention = torch.zeros((num_sentences, self.max_seq_len, self.max_seq_len), dtype = torch.bool) # the matrix is filled with zeros, then making the type boolean converts them to False
          decoder_padding_mask_selfAttention = torch.zeros((num_sentences, self.max_seq_len, self.max_seq_len), dtype = torch.bool)
          decoder_padding_mask_crossAttention = torch.zeros((num_sentences, self.max_seq_len, self.max_seq_len), dtype = torch.bool)
          # You can shape the masks as [num_sentences, 1, max_seq_len, max_seq_len] so you can remove the permutation in the
          # scaled product function and let the masks broadcast through the heads
          # To do this, in the end of this function, add: encoder_padding_mask_selfAttention = encoder_padding_mask_selfAttention.unsqueeze(1) (same for the other two)

          for idx in range(num_sentences):
            enc_lang_sent_len, dec_lang_sent_len = len(enc_lang_batch[idx]), len(dec_lang_batch[idx])

            enc_padding_start = enc_lang_sent_len + (1 if enc_start_token else 0) + (1 if enc_end_token else 0)
            enc_lang_padding = np.arange(enc_padding_start , self.max_seq_len)

            dec_padding_start = dec_lang_sent_len + (1 if dec_start_token else 0) + (1 if dec_end_token else 0)
            dec_lang_padding = np.arange(dec_padding_start , self.max_seq_len)

            encoder_padding_mask_selfAttention[idx, :, enc_lang_padding] = True
            encoder_padding_mask_selfAttention[idx, enc_lang_padding, :] = True

            decoder_padding_mask_selfAttention[idx, :, dec_lang_padding] = True
            decoder_padding_mask_selfAttention[idx, dec_lang_padding, :] = True

            decoder_padding_mask_crossAttention[idx, :, enc_lang_padding] = True
            # decoder_padding_mask_crossAttention[idx, dec_lang_padding, :] = True # Remove the line masking decoder padding in cross-attention.
            # It is redundant and incorrect; only encoder padding should be masked there.

          encoder_mask_selfAttention = torch.where(encoder_padding_mask_selfAttention, NEG_INF, 0)
          decoder_mask_selfAttention = torch.where(decoder_padding_mask_selfAttention | self.look_ahead_mask, NEG_INF, 0)
          decoder_mask_crossAttention = torch.where(decoder_padding_mask_crossAttention, NEG_INF, 0)

          encoder_mask_selfAttention = encoder_mask_selfAttention.unsqueeze(1)
          decoder_mask_selfAttention = decoder_mask_selfAttention.unsqueeze(1)
          decoder_mask_crossAttention = decoder_mask_crossAttention.unsqueeze(1)

          return encoder_mask_selfAttention.to(get_device()), decoder_mask_selfAttention.to(get_device()), decoder_mask_crossAttention.to(get_device())

## **Methods**

### Multi-Head Attention

In [None]:
# Multi-head attention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model # d_model
        self.num_heads = num_heads # num_heads
        self.head_dim = d_model // num_heads # head_dim
        self.qkv_layer = nn.Linear(d_model , 3 * d_model) # d_model x 3*d_model
        self.linear_layer = nn.Linear(d_model, d_model) # d_model

    def forward(self, x, mask=None):
        batch_size, max_sequence_length, d_model = x.size()  # Batch x max_seq_len, d_model
        qkv = self.qkv_layer(x) # Batch x max_seq_len x 3*d_model
        qkv = qkv.reshape(batch_size, max_sequence_length, self.num_heads, 3 * self.head_dim) # Batch x max_seq_len x num_heads x 3*head_dim
        qkv = qkv.permute(0, 2, 1, 3) # Batch x num_heads x max_seq_len x 3*head_dim
        q, k, v = qkv.chunk(3, dim=-1) # Batch x num_heads x max_seq_len x head_dim each
        values, attention = self.scaled_dot_product(q, k, v, mask) # Batch x num_heads x max_seq_len x head_dim
        values = values.permute(0, 2, 1, 3).reshape(batch_size, max_sequence_length, self.num_heads * self.head_dim) # value = Batch x max_seq_len x d_model, attention = Batch x num_heads x max_seq_len x max_seq_len
        out = self.linear_layer(values) # Batch x max_seq_len x d_model
        return out

    def scaled_dot_product(self, q, k, v, mask=None):
        # q, k, v are Batch x num_heads x max_seq_len x head_dim each
        d_k = q.size()[-1] # head_dim
        scaled = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(d_k) # Batch x num_heads x max_seq_len x max_seq_len
        if mask is not None:
            scaled += mask
        attention = F.softmax(scaled, dim=-1) # Batch x num_heads x max_seq_len x max_seq_len
        values = torch.matmul(attention, v) # Batch x num_heads x max_seq_len x head_dim
        return values, attention

### Multi-Head Cross Attention

In [None]:
# MultiHead Cross Attention
class MultiHeadCrossAttention(nn.Module):
  def __init__(self, d_model, num_heads):
      super().__init__()
      self.d_model = d_model
      self.num_heads = num_heads
      self.head_dim = d_model // num_heads
      self.kv_layer = nn.Linear(d_model , 2 * d_model) # 1024
      self.q_layer = nn.Linear(d_model , d_model)
      self.linear_layer = nn.Linear(d_model, d_model)

  def forward(self, x, y, mask=None):
      batch_size, sequence_length, d_model = x.size() # Batch x max_seq_len x d_model
      kv = self.kv_layer(x) # Batch x max_seq_len x 1024 # Output of Encoder to generate the keys and values
      q = self.q_layer(y) # Batch x max_seq_len x d_model
      kv = kv.reshape(batch_size, sequence_length, self.num_heads, 2 * self.head_dim)  # Batch x max_seq_len x num_heads x 2*head_dim
      q = q.reshape(batch_size, sequence_length, self.num_heads, self.head_dim)  # Batch x max_seq_len x num_heads x head_dim
      kv = kv.permute(0, 2, 1, 3) # Batch x num_heads x max_seq_len x 2*head_dim
      q = q.permute(0, 2, 1, 3) # Batch x num_heads x max_seq_len x head_dim
      k, v = kv.chunk(2, dim=-1) # K: Batch x num_heads x max_seq_len x head_dim, v: Batch x num_heads x max_seq_len x head_dim
      values, attention = scaled_dot_product(q, k, v, mask) #  Batch x num_heads x max_seq_len x head_dim
      values = values.permute(0, 2, 1, 3).reshape(batch_size, sequence_length, d_model) #  Batch x max_seq_len x d_model
      out = self.linear_layer(values)  #  Batch x max_seq_len x d_model
      return out  #  Batch x max_seq_len x d_model

### Layer Normalization

In [None]:
# Layer Normalization
class LayerNormalization(nn.Module):
    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape=parameters_shape # d_model
        self.eps=eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape)) # d_model
        self.beta =  nn.Parameter(torch.zeros(parameters_shape)) # d_model

    def forward(self, inputs): # Batch x max_seq_len x d_model
        dims = [-(i + 1) for i in range(len(self.parameters_shape))] # [-1]
        mean = inputs.mean(dim=dims, keepdim=True) # Batch x max_seq_len x 1 (1 bcs of keepdim=True)
        var = ((inputs - mean) ** 2).mean(dim=dims, keepdim=True) # Batch x max_seq_len x 1
        std = (var + self.eps).sqrt() # Batch x max_seq_len x d_model
        y = (inputs - mean) / std # Batch x max_seq_len x d_model
        out = self.gamma * y  + self.beta # Batch x max_seq_len x d_model
        return out

### Position-wise FeedForward

In [None]:
# Postion-wise Feedforward
class PositionwiseFeedForward(nn.Module):

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super(PositionwiseFeedForward, self).__init__() # After python 3, you can remove the class name and self arguments from super, just write super().__init__() like we did in the classes above
        self.linear1 = nn.Linear(d_model, hidden) # d_model x 2045
        self.linear2 = nn.Linear(hidden, d_model) # 2048 x d_model
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x): # Batch x max_seq_len x d_model
        x = self.linear1(x) # Batch x max_seq_len x 2048
        x = self.relu(x) # Batch x max_seq_len x 2048
        x = self.dropout(x) # Batch x max_seq_len x 2048
        x = self.linear2(x) # Batch x max_seq_len x d_model
        return x

## **Encoder**

### Encoder Layer

In [None]:
# Single Encoder Layer
class EncoderLayer(nn.Module):
  def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
    super(EncoderLayer, self).__init__() # After python 3, you can remove the class name and self arguments from super, just write super().__init__() like we did in the classes above
    self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
    self.norm1 = LayerNormalization(parameters_shape=[d_model])
    self.dropout1 = nn.Dropout(p=drop_prob)
    self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
    self.norm2 = LayerNormalization(parameters_shape=[d_model])
    self.dropout2 = nn.Dropout(p=drop_prob)

  def forward(self, x, mask):
      residual_x = x # Batch x max_seq_len x d_model
      x = self.attention(x, mask=mask) # Batch x max_seq_len x d_model
      x = self.dropout1(x) # Batch x max_seq_len x d_model
      x = self.norm1(x + residual_x) # Batch x max_seq_len x d_model
      residual_x = x # Batch x max_seq_len x d_model
      x = self.ffn(x) # Batch x max_seq_len x d_model
      x = self.dropout2(x) # Batch x max_seq_len x d_model
      x = self.norm2(x + residual_x) # Batch x max_seq_len x d_model
      return x

### **Sequential Encoder**

In [None]:
class SequentialEncoder(nn.Sequential):
    def forward(self, *inputs):
        x, self_attention_mask  = inputs
        for module in self._modules.values():
            x = module(x, self_attention_mask)
        return x

### Full Encoder

In [None]:
class Encoder(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers,max_sequence_length,language_to_index,START_TOKEN,END_TOKEN, PADDING_TOKEN):
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.layers = SequentialEncoder(*[EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
                                      for _ in range(num_layers)])

    def forward(self, x, self_attention_mask, start_token, end_token):
        x = self.sentence_embedding(x, start_token, end_token)
        x = self.layers(x, self_attention_mask)
        return x

## **Decoder**

  ### Decoder Layer


In [None]:
# Decoder Layer
class DecoderLayer(nn.Module):

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob,):
        super(DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)

        self.multi_head_cross_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm3 = LayerNormalization(parameters_shape=[d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, x, y, self_attention_mask, cross_attention_mask):
        residual_y = y # Batch x max_seq_len x d_model
        y = self.self_attention(y, mask=self_attention_mask) # Batch x max_seq_len x d_model
        y = self.dropout1(y) # Batch x max_seq_len x d_model
        y = self.norm1(y + residual_y) # Batch x max_seq_len x d_model

        residual_y = y # Batch x max_seq_len x d_model
        y = self.multi_head_cross_attention(x, y, mask=cross_attention_mask) #Batch x max_seq_len x d_model
        y = self.dropout2(y)
        y = self.norm2(y + residual_y)  #Batch x max_seq_len x d_model

        residual_y = y  #Batch x max_seq_len x d_model
        y = self.ffn(y) #Batch x max_seq_len x d_model
        y = self.dropout3(y) #Batch x max_seq_len x d_model
        y = self.norm3(y + residual_y) #Batch x max_seq_len x d_model
        return y #Batch x max_seq_len x d_model

### Sequential Decoder

In [None]:
# Sequential Decoder
# We implement our own sequential because we want to pass more than one parameter x, y, and mask
# and the normal sequential accepts only one, so this extends sequential
class SequentialDecoder(nn.Sequential):
    def forward(self, *inputs):
        x, y, self_attention_mask, cross_attention_mask = inputs
        for module in self._modules.values():
            y = module(x, y, self_attention_mask, cross_attention_mask) #Batch x max_seq_len x d_model # the output y is passed to the next decoder layer and so on
        return y

### Full Decoder

In [None]:
# Full Decoder
class Decoder(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers,max_sequence_length, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.layers = SequentialDecoder(*[DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(num_layers)])

    def forward(self, x, y, self_attention_mask, cross_attention_mask, start_token, end_token):
        #x : Input (output of decoder):  Batch x max_seq_len x d_model
        #y : Previous Decoder output(ground-truth for first decoder) :  Batch x max_seq_len x d_model
        #mask : max_seq_len x max_seq_len
        y = self.sentence_embedding(y, start_token, end_token)
        y = self.layers(x, y, self_attention_mask, cross_attention_mask)
        return y # Batch x max_seq_len x d_model

## **Full Transformer**

In [None]:
class Transformer(nn.Module):
    def __init__(self,
                d_model,
                ffn_hidden,
                num_heads,
                drop_prob,
                num_layers,
                max_sequence_length,
                out_lang_vocab_size,
                input_language_to_index,
                output_language_to_index,
                START_TOKEN,
                END_TOKEN,
                PADDING_TOKEN
                ):
        super().__init__()
        self.create_masks = Masks(max_sequence_length)
        self.encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length, input_language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length, output_language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.linear = nn.Linear(d_model, out_lang_vocab_size)

        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    def forward(self,
                x,
                y,
                enc_start_token=False,
                enc_end_token=False,
                dec_start_token=True, # We should make this true
                dec_end_token=False): # x, y are batch of sentences
        encoder_self_attention_mask, decoder_self_attention_mask, decoder_cross_attention_mask = self.create_masks(x, y, enc_start_token, enc_end_token, dec_start_token, dec_end_token)
        x = self.encoder(x, encoder_self_attention_mask, start_token=enc_start_token, end_token=enc_end_token)
        out = self.decoder(x, y, decoder_self_attention_mask, decoder_cross_attention_mask, start_token=dec_start_token, end_token=dec_end_token)
        out = self.linear(out)
        return out

# **Dataset**

In [None]:
df = pd.read_csv("hf://datasets/salehalmansour/english-to-arabic-translate/en_ar_final.tsv", sep="\t")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
# Get all rows that contain at least one null value
rows_with_null = df[df.isnull().any(axis=1)] # axis=1 to search through columns

print(f'Number of null rows = {len(rows_with_null)}\n')
print("Null rows:")
print(rows_with_null)
print("_" * 100)

print("Null count per column:")
print(df.isnull().sum())

# Nulls in specific columns
en_null = df[df['en'].isnull()]
ar_null = df[df['ar'].isnull()]

print("_" * 100)
print(f'Number of English nulls = {len(en_null)}')
print(f'Number of Arabic nulls = {len(ar_null)}')
print("_" * 100)


# Drop rows with any null values
df = df.dropna()

Number of null rows = 12

Null count per column:
en    12
ar     0
dtype: int64
____________________________________________________________________________________________________
Number of English nulls = 12
Number of Arabic nulls = 0
____________________________________________________________________________________________________
Null rows:
          en                           ar
801677   NaN  >التدابير الرئيسية الحالية<
801678   NaN  >التدابير الرئيسية الحالية<
801679   NaN  >التدابير الرئيسية الحالية<
1007977  NaN         >الأسلحة البيولوجية<
1007978  NaN         >الأسلحة الكيميائية<
1007979  NaN            >الأسلحة النووية<
1007982  NaN         >الأسلحة البيولوجية<
1007986  NaN         >الأسلحة الكيميائية<
1007987  NaN            >الأسلحة النووية<
1008001  NaN            >الأسلحة النووية<
1008002  NaN              >وسائل إيصالها<
1008003  NaN         >الأسلحة الكيميائية<


In [None]:
# values without Nulls
Arabic = df['ar']
English = df['en']
print(len(Arabic))
print(len(English))

1325887
1325887


In [None]:
# Vocabulary
START_TOKEN = '<START>'
END_TOKEN = '<END>'
PADDING_TOKEN = '<PADDING>'

arabic_voc = [
     START_TOKEN, END_TOKEN, PADDING_TOKEN, ' ', '؟', '!', '.', ',',  #  punctuation
    '0','1','2','3','4','5','6','7','8','9',  # digits
    'ء', 'آ', 'أ', 'ؤ', 'إ', 'ئ', 'ا', 'ب', 'ة', 'ت', 'ث', 'ج', 'ح', 'خ', 'د', # letter
    'ذ', 'ر', 'ز', 'س', 'ش', 'ص', 'ض', 'ط', 'ظ', 'ع', 'غ', 'ف', 'ق', 'ك', 'ل',
    'م', 'ن', 'ه', 'و', 'ى', 'ي']

english_voc = [START_TOKEN, END_TOKEN, PADDING_TOKEN, ' ', '?', '!', '.', ',',  #  punctuation
                        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', # digits
                        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', # letter
                        'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
                        'y', 'z']


In [None]:
# character to index maps
arabic_to_index = {c:i for i, c in enumerate(arabic_voc)}
english_to_index = {c:i for i, c in enumerate(english_voc)}
index_to_arabic = {i:c for i, c in enumerate(arabic_voc)}
index_to_english = {i:c for i, c in enumerate(english_voc)}
print(arabic_to_index)
print(english_to_index)
print(index_to_arabic)
print(index_to_english)

{'<START>': 0, '<END>': 1, '<PADDING>': 2, ' ': 3, '؟': 4, '!': 5, '.': 6, ',': 7, '0': 8, '1': 9, '2': 10, '3': 11, '4': 12, '5': 13, '6': 14, '7': 15, '8': 16, '9': 17, 'ء': 18, 'آ': 19, 'أ': 20, 'ؤ': 21, 'إ': 22, 'ئ': 23, 'ا': 24, 'ب': 25, 'ة': 26, 'ت': 27, 'ث': 28, 'ج': 29, 'ح': 30, 'خ': 31, 'د': 32, 'ذ': 33, 'ر': 34, 'ز': 35, 'س': 36, 'ش': 37, 'ص': 38, 'ض': 39, 'ط': 40, 'ظ': 41, 'ع': 42, 'غ': 43, 'ف': 44, 'ق': 45, 'ك': 46, 'ل': 47, 'م': 48, 'ن': 49, 'ه': 50, 'و': 51, 'ى': 52, 'ي': 53}
{'<START>': 0, '<END>': 1, '<PADDING>': 2, ' ': 3, '?': 4, '!': 5, '.': 6, ',': 7, '0': 8, '1': 9, '2': 10, '3': 11, '4': 12, '5': 13, '6': 14, '7': 15, '8': 16, '9': 17, 'a': 18, 'b': 19, 'c': 20, 'd': 21, 'e': 22, 'f': 23, 'g': 24, 'h': 25, 'i': 26, 'j': 27, 'k': 28, 'l': 29, 'm': 30, 'n': 31, 'o': 32, 'p': 33, 'q': 34, 'r': 35, 's': 36, 't': 37, 'u': 38, 'v': 39, 'w': 40, 'x': 41, 'y': 42, 'z': 43}


In [None]:
# Convert vocab to string for regex (escape special characters)
# re.escape(c) → ensures special characters like ?, ., ! are treated literally in regex
arabic_chars = ''.join([re.escape(c) for c in arabic_voc])
english_chars = ''.join([re.escape(c) for c in english_voc])
print(arabic_chars)
print(english_chars)

# Optional Arabic normalization function
def normalize_arabic(text):
    text = str(text)
    # remove diacritics (tashkeel) if present
    text = re.sub(r'[\u0617-\u061A\u064B-\u0652]', '', text)
    # normalize Alef variants
    text = text.replace('آ', 'ا').replace('أ', 'ا').replace('إ', 'ا')
    # normalize final Yaa
    text = text.replace('ى', 'ي')
    return text

# Remvoe characters from Arabic column not in Arabic vocabulary
# f"[^{arabic_chars}]" → matches any character NOT in vocabulary
# re.sub(..., "", x) → removes all characters not in the list
# Preprocessing Arabic
Arabic = Arabic.apply(lambda x: re.sub(f"[^{arabic_chars}]", "", normalize_arabic(x)))

# Preprocessing English
# Remvoe characters from English column not in English vocabulary
English = English.apply(lambda x: re.sub(f"[^{english_chars}]", "", str(x).lower()))

<START><END><PADDING>\ ؟!\.,0123456789ءآأؤإئابةتثجحخدذرزسشصضطظعغفقكلمنهوىي
<START><END><PADDING>\ \?!\.,0123456789abcdefghijklmnopqrstuvwxyz


In [None]:
# English vodab
print(f'English vocabulary: {english_voc}')
print(f'English vocab size = {len(english_voc)}')
# English to index map
print(f'English to index map: {english_to_index}')
# Index to English map
print(f'Index to English map: {index_to_english}')
print("_"*10)
# Arabic vodab
print(f'Arabic vocabulary {arabic_voc}')
print(f'Arabic vocab size = {len(arabic_voc)}')
# Arabic to index map
print(f'Arabic to index map: {arabic_to_index}')
# Index to Arabic map
print(f'Index to Arabic map: {index_to_arabic}')
print("_"*10)

# random_idx = np.random.randint(0, len(English))
# print(f'Random English sentence:\n{English.iloc[random_idx]}')
# print(f'It\'s Arabic Translation:\n{Arabic.iloc[random_idx]}')

English vocabulary: ['<START>', '<END>', '<PADDING>', ' ', '?', '!', '.', ',', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
English vocab size = 44
English to index map: {'<START>': 0, '<END>': 1, '<PADDING>': 2, ' ': 3, '?': 4, '!': 5, '.': 6, ',': 7, '0': 8, '1': 9, '2': 10, '3': 11, '4': 12, '5': 13, '6': 14, '7': 15, '8': 16, '9': 17, 'a': 18, 'b': 19, 'c': 20, 'd': 21, 'e': 22, 'f': 23, 'g': 24, 'h': 25, 'i': 26, 'j': 27, 'k': 28, 'l': 29, 'm': 30, 'n': 31, 'o': 32, 'p': 33, 'q': 34, 'r': 35, 's': 36, 't': 37, 'u': 38, 'v': 39, 'w': 40, 'x': 41, 'y': 42, 'z': 43}
__________
Arabic vocabulary ['<START>', '<END>', '<PADDING>', ' ', '؟', '!', '.', ',', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'ء', 'آ', 'أ', 'ؤ', 'إ', 'ئ', 'ا', 'ب', 'ة', 'ت', 'ث', 'ج', 'ح', 'خ', 'د', 'ذ', 'ر', 'ز', 'س', 'ش', 'ص', 'ض', 'ط', 'ظ', 'ع', 'غ', 'ف', 'ق', 'ك', 'ل', 'م

In [None]:
# Convert inot lists
english = English.tolist()
arabic = Arabic.tolist()

In [None]:
print(f'Number of examples: {len(english)}')

random_idx = np.random.randint(0, len(English))
print(f'Random English sentence:\n{english[random_idx]}')
print(f'It\'s Arabic Translation:\n{arabic[random_idx]}')

1325887
zuma rising
صعود نجم زوما


In [None]:
# Set the max sentece length to be more than the length of 98% of th data
english_98th_len  = np.percentile([ len(sentence) for sentence in english], 98)
arabic_98th_len = np.percentile([ len(sentence) for sentence in arabic], 98)
max_sequence_length  = min(english_98th_len, arabic_98th_len)


# check of the sentence characters all in the vocabulary
def is_valid_tokens(sentence, vocab):
    for token in list(set(sentence)):
        if token not in vocab:
            return False
    return True

# check if sentnece is shorter than the max sentence length
def is_valid_length(sentence, max_sequence_length):
    return len(list(sentence)) < (max_sequence_length)


valid_sentence_indicies = []
for index in range(len(arabic)):
    arabic_sentence, english_sentence = arabic[index], english[index]
    if is_valid_length(arabic_sentence, max_sequence_length) \
      and is_valid_length(english_sentence, max_sequence_length) \
      and is_valid_tokens(arabic_sentence, arabic_voc) and is_valid_tokens(english_sentence, english_vocab):
        valid_sentence_indicies.append(index)

print(f"Number of sentences: {len(arabic)}")
print(f"Number of valid sentences: {len(valid_sentence_indicies)}")

Number of sentences: 1325887
Number of valid sentences: 1290630


In [None]:
# Only continue with valid sentences
arabic_sentences = [arabic[i] for i in valid_sentence_indicies]
english_sentences = [english[i] for i in valid_sentence_indicies]

# Set a mximum number of examples
TOTAL_SENTENCES = 1000000
arabic_sentences = arabic_sentences[:TOTAL_SENTENCES]
english_sentences = english_sentences[:TOTAL_SENTENCES]

In [None]:

class TextDataset(Dataset):

    def __init__(self, english_sentences, arabic_sentences):
        self.english_sentences = english_sentences
        self.arabic_sentences = arabic_sentences

    def __len__(self):
        return len(self.english_sentences)

    def __getitem__(self, idx):
        return self.english_sentences[idx], self.arabic_sentences[idx]


dataset = TextDataset(english_sentences, arabic_sentences)
# dataset[5:10]
print(len(dataset))

1000000


In [None]:
batch_size = 128
train_loader = DataLoader(dataset, batch_size)
iterator = iter(train_loader)

print("First 3 batches:")
for batch_num, batch in enumerate(iterator):
  if batch_num>2:
    break
  batch_eng, batch_ar = batch
  print(batch_eng)
  print(batch_ar)

('and this', 'it was um', 'what is she doing here', 'i dont like it', 'did you get the part', ' its none of your business', ' uhhuh', 'i was wrong', ' others', 'im much your majesty', 'do i make myself clear', 'thank you', 'i bet you do', 'thats enough', 'yeah of course', 'im him', 'you okay', 'no no', 'shes all yours', 'you want the truth', 'how about you', ' goodbye', ' thats okay', 'im sorry', 'i must go', 'maybe we have something in common', 'right alright alright relax', ' have a seat', 'hows it goin', ' are you all right', 'whats up', ' what did you do', 'what happened', 'is this all youve got', 'ma is the greatest he bought it and delivered it', ' that right', 'as in mother id like to fuck', ' ooh ooh ooh honey', 'what is your name', 'what was that', 'smells good', 'from what', 'it wasnt easy', 'and you', 'i dont think so', 'just take your time', ' over here', 'hell no', 'excuse me', 'what are you waiting for', 'miss prescott goodbye', 'like you', 'this is why youve got me out o

In [None]:
# split the dataset into train and test sets
train_size = int(0.9 * len(dataset))   # 90% train
test_size  = len(dataset) - train_size # 10% test

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

print(f'Size of training set: {len(train_dataset)}')
print(f'Size of test set: {len(test_dataset)}')

('fifth committee', 'اللجنة الخامسة')


# **Train**

In [None]:
d_model = 512
batch_size = 30
ffn_hidden = 2048
num_heads = 8
drop_prob = 0.1
num_layers = 1
max_sequence_length = 80
ar_vocab_size = len(arabic_voc)
batch_size = 128
num_epochs = 10

In [None]:
transformer = Transformer(d_model,
                          ffn_hidden,
                          num_heads,
                          drop_prob,
                          num_layers,
                          max_sequence_length,
                          ar_vocab_size,
                          english_to_index,
                          arabic_to_index,
                          START_TOKEN,
                          END_TOKEN,
                          PADDING_TOKEN)

In [None]:
transformer

In [None]:
def train_transformer(transformer, dataset, batch_size=128, num_epochs, lr=1e-4, device=None):
    # check which device is available
    if device is None:
        device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    transformer.to(device)

    # Load the training dataset
    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    PADDING_TOKEN = transformer.encoder.sentence_embedding.PADDING_TOKEN
    token_to_index = transformer.encoder.sentence_embedding.language_to_index

    # Loss function ignoring padding
    criterion = nn.CrossEntropyLoss(ignore_index=token_to_index[PADDING_TOKEN], reduction = 'none') # none to get the loss of each sentence, set it to mean to get the mean loss of the batch (a single scaler)

    # Optimizer
    optimizer = torch.optim.Adam(transformer.parameters(), lr=lr)

    # Xavier initialization for weights > 1D
    for param in transformer.parameters():
        if param.dim() > 1:
            nn.init.xavier_uniform_(param)

    for epoch in range(num_epochs):
        print(f"______________________________ Epoch {epoch + 1}/{num_epochs} ______________________________")
        transformer.train()

        for batch_num, batch in enumerate(train_loader):
            # Unpack batch and ensure each sentence is a list
            src_sentences, tgt_sentences = batch
            src_sentences = list(src_sentences)
            tgt_sentences = list(tgt_sentences)

            optimizer.zero_grad()

            # Forward pass
            predictions = transformer(
                src_sentences,
                tgt_sentences,
                enc_start_token=False,
                enc_end_token=False,
                dec_start_token=True,
                dec_end_token=True
            )

            # Tokenize target sentences for loss (the ground truth)
            labels = transformer.decoder.sentence_embedding.batch_tokenize(
                tgt_sentences, start_token=False, end_token=True
            ).to(device)

            # Compute loss
            loss = criterion(predictions.view(-1, predictions.size(-1)), labels.view(-1))
            # Compute mean loss without the padding tokens
            valid_indicies = torch.where(labels.view(-1) == token_to_index[PADDING_TOKEN], False, True)
            # Compute average loss through the batch
            loss = loss.sum() / valid_indicies.sum()
            # Or also compute the mean using:
            # loss_value = loss.sum() / (labels.view(-1) != padding_idx).sum()

            # Backpropagation
            loss.backward()
            # Update parameters
            optimizer.step()

            if (batch_num + 1) % 100 == 0 or (batch_num + 1) == len(train_loader):
                print(f"Epoch {epoch + 1}, Batch {batch_num + 1}/{len(train_loader)}, Loss: {loss.item():.4f}")

In [None]:
train_transformer(transformer, train_dataset, batch_size, num_epochs)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Batch 6400/7032, Loss: 1.1972
Batch 6410/7032, Loss: 1.1978
Batch 6420/7032, Loss: 1.1237
Batch 6430/7032, Loss: 1.2404
Batch 6440/7032, Loss: 1.2087
Batch 6450/7032, Loss: 1.1822
Batch 6460/7032, Loss: 1.2713
Batch 6470/7032, Loss: 1.1681
Batch 6480/7032, Loss: 1.2210
Batch 6490/7032, Loss: 1.1638
Batch 6500/7032, Loss: 1.1429
Batch 6510/7032, Loss: 1.1796
Batch 6520/7032, Loss: 1.2902
Batch 6530/7032, Loss: 1.2279
Batch 6540/7032, Loss: 1.2637
Batch 6550/7032, Loss: 1.1877
Batch 6560/7032, Loss: 1.2156
Batch 6570/7032, Loss: 1.2784
Batch 6580/7032, Loss: 1.3165
Batch 6590/7032, Loss: 1.2908
Batch 6600/7032, Loss: 1.3150
Batch 6610/7032, Loss: 1.1442
Batch 6620/7032, Loss: 1.1963
Batch 6630/7032, Loss: 1.0852
Batch 6640/7032, Loss: 1.0901
Batch 6650/7032, Loss: 1.1132
Batch 6660/7032, Loss: 1.3150
Batch 6670/7032, Loss: 1.2079
Batch 6680/7032, Loss: 1.1327
Batch 6690/7032, Loss: 1.1520
Batch 6700/7032, Loss: 1.2450
Batch

In [None]:
# Save parameters
save_dir = "/content/transformer.pt"
torch.save(transformer.state_dict(), save_dir)

# Load parameters to same architecture transformer
checkpoint = torch.load(save_dir, map_location="cuda")
transformer.load_state_dict(checkpoint)


# **Test**

In [None]:
def test_transformer(transformer, test_dataset, batch_size=128, device=None):
    if dNevice is None:
        device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    transformer.to(device)

    # Load test set
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Get padding token index
    PADDING_TOKEN = transformer.encoder.sentence_embedding.PADDING_TOKEN
    token_to_index = transformer.encoder.sentence_embedding.language_to_index
    padding_idx = token_to_index[PADDING_TOKEN]

    # Same loss setup as training (ignoring padding)
    criterion = nn.CrossEntropyLoss(ignore_index=padding_idx, reduction='none')

    total_loss = 0
    total_batches = 0

    transformer.eval()
    with torch.no_grad():
        for batch_num, batch in enumerate(test_loader):
            src_sentences, tgt_sentences = batch
            src_sentences = list(src_sentences)
            tgt_sentences = list(tgt_sentences)

            # Forward pass
            predictions = transformer(
                src_sentences,
                tgt_sentences,
                enc_start_token=False,
                enc_end_token=False,
                dec_start_token=True,
                dec_end_token=True
            )

            # Tokenize the true sentences
            labels = transformer.decoder.sentence_embedding.batch_tokenize(
                tgt_sentences, start_token=False, end_token=True
            ).to(device)

            # Compute per-token loss
            loss = criterion(predictions.view(-1, predictions.size(-1)),
                             labels.view(-1))

            # Mask out padding and compute mean
            valid = labels.view(-1) != padding_idx
            loss = loss.sum() / valid.sum()

            total_loss += loss.item()
            total_batches += 1

    avg_loss = total_loss / total_batches
    print(f"\nTest Loss: {avg_loss:.4f}")

    return avg_loss

In [None]:
print(test_transformer(transformer, test_dataset))

# **Translate**

In [1]:
def translate(ip_sentence, device=None):
  if device is None:
      device = torch.device('cuda') if torch.cuda.is_available() else 'cpu'

  transformer.eval()
  transformer.to(device) # Ensure the model is on the correct device

  ip_sentence = (ip_sentence.lower(),)
  op_sentence = ("",)

  max_sequence_length = transformer.decoder.sentence_embedding.max_sequence_length
  # lookup table
  idx_to_token = transformer.decoder.sentence_embedding.language_to_index

  END_TOKEN = transformer.decoder.sentence_embedding.END_TOKEN
  END_IDX = idx_to_token[END_TOKEN]


  for step in range(max_sequence_length):
    predictions = transformer(
            ip_sentence,
            op_sentence,
            enc_start_token=False,
            enc_end_token=False,
            dec_start_token=True,   # important
            dec_end_token=False     # we don't want end token in interference
        )

    next_token_prob_distribution = predictions[0][step]
    next_token_index = torch.argmax(next_token_prob_distribution).item() # .item() to convert a 0-dim PyTorch tensor into a plain Python number.
    next_token = idx_to_token[next_token_index]

    if next_token == END_TOKEN:
      break

    op_sentence = (op_sentence[0] + next_token, )

  return  op_sentence[0]

In [None]:
sentence = translate(transformer, "how are you")
print(sentence)

كيف حالك؟


In [None]:
sentence = translate(transformer, "I went there today")
print(sentence)

ذهبت اليوم


In [None]:
sentence = translate(transformer, "show me your strength")
print(sentence)

ارني الامر الامر الامر
