In [89]:
pip install tiktoken



In [90]:
import torch
import torch.nn as nn
import numpy as np
import math
import torch.nn.functional as F
import pandas as pd
from torch.utils.data import Dataset
import tiktoken

## Setting the Hyperparameters

In [91]:
from dataclasses import dataclass
@dataclass
class Config:
    def __init__(self):
        self.batch_size = 128
        self.max_seq_len = 56
        self.input_dim = 512
        self.d_model = 512
        self.num_heads = 8
        self.ffn_hidden = 2048
        self.num_layers = 6
        self.dropout_rate = 0.1
        self.learning_rate = 3e-4
        self.num_epochs = 1
        self.vocab_size = 50304
        self.decoder_vocab_size = 0

In [92]:
def get_device():
    return torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

## Multihead Self Attention

In [93]:
def scaled_dot_product_attention(q,k,v,config=Config(),mask=None,):
    d_k = q.size(-1)
    qk = torch.matmul(q, k.transpose(-2, -1)) /  math.sqrt(d_k)
    if mask is not None:
        qk = qk.permute(1, 0, 2, 3) + mask
        qk = qk.permute(1, 0, 2, 3)
    qk = F.softmax(qk, dim=-1)
    new_qkv = torch.matmul(qk, v)
    return new_qkv

class Multihead_Self_Attention(nn.Module):
    def __init__(self,input_dim, d_model, num_heads, config=Config()):
        super(Multihead_Self_Attention, self).__init__()
        self.config = config
        self.input_dim = input_dim
        self.model_dim = d_model
        self.num_heads = num_heads
        self.head_dim = self.model_dim // self.num_heads
        self.qkv_layer = nn.Linear(input_dim, 3 * self.model_dim)
        self.concat_layer = nn.Linear(self.model_dim, self.model_dim)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self,x,mask=None):
        qkv = self.qkv_layer(x)
        a,b,c = qkv.size()
        qkv = qkv.view(self.config.batch_size,self.config.max_seq_len,self.num_heads,3*self.head_dim)
        qkv = qkv.permute(0,2,1,3)
        q,k,v = qkv.chunk(3,dim=-1)
        new_qkv = scaled_dot_product_attention(q,k,v,mask)
        new_qkv = new_qkv.permute(0,2,1,3)
        new_qkv = new_qkv.reshape(self.config.batch_size,self.config.max_seq_len,self.model_dim)
        out = self.concat_layer(new_qkv)
        return out

## Multihead Cross Attention

In [94]:
class Multihead_Cross_Attention(nn.Module):
    def __init__(self,input_dim, model_dim, num_heads, config=Config()):
        super(Multihead_Cross_Attention, self).__init__()
        self.config = config
        self.input_dim = input_dim
        self.model_dim = model_dim
        self.num_heads = num_heads
        self.head_dim = model_dim // num_heads
        self.qk_layer = nn.Linear(input_dim, 2 * model_dim)
        self.v_layer = nn.Linear(input_dim, model_dim)
        self.concat_layer = nn.Linear(model_dim, model_dim)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self,x,y,mask=None):
        x_B,x_T,x_C = x.size()
        y_B,y_T,y_C = y.size()
        qk = self.qk_layer(x)
        v = self.v_layer(y)
        qk = qk.view(self.config.batch_size,self.config.max_seq_len,self.config.num_heads,2*self.head_dim)
        v = v.view(self.config.batch_size,self.config.max_seq_len,self.config.num_heads,self.head_dim)
        qk = qk.permute(0,2,1,3)
        v = v.permute(0,2,1,3)
        q,k = qk.chunk(2,dim=-1)
        new_qkv = scaled_dot_product_attention(q,k,v,mask)
        new_qkv = new_qkv.permute(0,2,1,3)
        new_qkv = new_qkv.reshape(self.config.batch_size,self.config.max_seq_len,self.model_dim)
        out = self.concat_layer(new_qkv)
        return out

## Positional Encoding

In [95]:
class PostionalEncoding(nn.Module):
    def __init__(self,max_seq_len,d_model):
        super(PostionalEncoding,self).__init__()
        self.max_seq_len = max_seq_len
        self.d_model = d_model
        self.encoding = torch.zeros(self.max_seq_len,self.d_model)

    def forward(self,x):
        even_index = torch.arange(0,self.d_model,2).float()
        domenator = torch.pow(10000,even_index/self.d_model)
        position = torch.arange(0,self.max_seq_len).unsqueeze(1)
        PE_even = torch.sin(position/domenator)
        PE_odd = torch.cos(position/domenator)
        stacked = torch.stack([PE_even,PE_odd],dim=2)
        PE_flatten = torch.flatten(stacked,start_dim=1,end_dim=2)
        return PE_flatten

## Normalization Layer

In [96]:
class NormalizationLayer(nn.Module):
    def __init__(self, parameter_dim):
        super(NormalizationLayer, self).__init__()
        self.parameters_shape = parameter_dim
        self.gamma = nn.Parameter(torch.ones(parameter_dim))
        self.beta = nn.Parameter(torch.zeros(parameter_dim))
        self.eps = 1e-6

    def forward(self, x):
        dims = [-(i + 1) for i in range(len(self.parameters_shape))]
        mean = x.mean(dims, keepdim=True)
        std = x.std(dims, keepdim=True)
        out = self.gamma * (x - mean) / (std + self.eps) + self.beta
        return out

## Position-wise Feed-Forward

In [97]:
class FeedForward(nn.Module):
    def __init__(self,d_model,ffn_hidden,config=Config()):
        super(FeedForward,self).__init__()
        self.d_model = d_model
        self.ffn_hidden = ffn_hidden
        self.layer1 = nn.Linear(self.d_model,self.ffn_hidden)
        self.layer2 = nn.Linear(self.ffn_hidden,self.d_model)
        self.dropout = nn.Dropout(p=config.dropout_rate)

    def forward(self,x):
        x = self.layer1(x)
        x = F.relu(x)
        x = self.dropout(x)
        x = self.layer2(x)
        return x

## Encoder

In [98]:
class Encoder_Layer(nn.Module):
    def __init__(self,num_heads, d_model, ffn_hidden, max_seq_len, dropout_rate,config=Config()):
        super(Encoder_Layer,self).__init__()
        self.multihead_attention = Multihead_Self_Attention(config.input_dim,d_model,num_heads)
        self.pos_encoding = PostionalEncoding(max_seq_len,d_model)
        self.feedforward = FeedForward(d_model,ffn_hidden)
        self.norm1 = NormalizationLayer([d_model])
        self.norm2 = NormalizationLayer([d_model])
        self.dropout1 = nn.Dropout(dropout_rate)
        self.dropout2 = nn.Dropout(dropout_rate)

    def forward(self,x,mask=None):
        reseduial_x = x
        x = self.multihead_attention(x,mask)
        x = self.dropout1(x)
        x = x + reseduial_x
        x = self.norm1(x)
        reseduial_x = x
        x = self.feedforward(x)
        x = self.dropout2(x)
        x = x + reseduial_x
        x = self.norm2(x)
        return x

In [99]:
class SequentialEncoder(nn.Sequential):
    def forward(self, *inputs):
        x, self_attention_mask  = inputs
        for module in self._modules.values():
            x = module(x, self_attention_mask)
        return x

In [100]:

class Encoder(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, dropout_rate, num_layers,max_seq_len):
        super().__init__()
        self.layers = SequentialEncoder(*[Encoder_Layer(num_heads, d_model, ffn_hidden, max_seq_len, dropout_rate)
                                     for _ in range(num_layers)])
    def forward(self, x, mask=None):
        x = self.layers(x, mask)
        return x

## Decoder

In [101]:
class Decoder_Layer(nn.Module):
    def __init__(self,num_heads, d_model, ffn_hidden, max_seq_len, dropout_rate,config=Config()):
        super(Decoder_Layer,self).__init__()
        self.multihead_self_attention =  Multihead_Self_Attention(config.input_dim,d_model,num_heads)
        self.multihead_cross_attention = Multihead_Cross_Attention(config.input_dim,d_model,num_heads)
        self.feedforward = FeedForward(d_model,ffn_hidden)
        self.norm1 = NormalizationLayer([d_model])
        self.norm2 = NormalizationLayer([d_model])
        self.norm3 = NormalizationLayer([d_model])
        self.dropout1 = nn.Dropout(dropout_rate)
        self.dropout2 = nn.Dropout(dropout_rate)
        self.dropout3 = nn.Dropout(dropout_rate)

    def forward(self, x, y, self_attention_mask=None, cross_attention_mask=None):
        reseduial_y = y
        y = self.multihead_self_attention(y,mask = self_attention_mask)
        y = self.dropout1(y)
        y = y + reseduial_y
        y = self.norm1(y)

        reseduial_y = y
        y = self.multihead_cross_attention(x,y,mask = cross_attention_mask)
        y = self.dropout2(y)
        y = y + reseduial_y
        y = self.norm2(y)


        reseduial_y = y
        y = self.feedforward(y)
        y = self.dropout3(y)
        y = y + reseduial_y
        y = self.norm3(y)

        return y

In [102]:
class Sequential_Decoder(nn.Sequential):
    def forward(self,*input):
        x, y, self_attention_mask, cross_attention_mask = input
        for module in self._modules.values():
            y = module(x, y, self_attention_mask, cross_attention_mask)
        return y

In [103]:
class Decoder(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, dropout_rate, num_layers,max_seq_len):
        super().__init__()
        self.layers = Sequential_Decoder(*[Decoder_Layer(num_heads, d_model, ffn_hidden, max_seq_len, dropout_rate)
                                     for _ in range(num_layers)])
    def forward(self, x, y,self_attention_mask=None, cross_attention_mask=None):
        y = self.layers(x, y,self_attention_mask, cross_attention_mask)
        return y

In [104]:
NEG_INFTY = -1e9

def create_masks(ar_batch, meter_batch,config=Config()):
    num_sentences = len(ar_batch)
    look_ahead_mask = torch.full([config.max_seq_len, config.max_seq_len] , True)
    look_ahead_mask = torch.triu(look_ahead_mask, diagonal=1)
    encoder_padding_mask = torch.full([num_sentences, config.max_seq_len, config.max_seq_len] , False)
    decoder_padding_mask_self_attention = torch.full([num_sentences, config.max_seq_len, config.max_seq_len] , False)
    decoder_padding_mask_cross_attention = torch.full([num_sentences, config.max_seq_len, config.max_seq_len] , False)

    for idx in range(num_sentences):
      ar_sentence_length, meter_sentence_length = len(ar_batch[idx]), len(meter_batch[idx])
      ar_chars_to_padding_mask = np.arange(ar_sentence_length + 1, config.max_seq_len)
      meter_chars_to_padding_mask = np.arange(meter_sentence_length + 1, config.max_seq_len)
      encoder_padding_mask[idx, :, ar_chars_to_padding_mask] = True
      encoder_padding_mask[idx, ar_chars_to_padding_mask, :] = True
      decoder_padding_mask_self_attention[idx, :, meter_chars_to_padding_mask] = True
      decoder_padding_mask_self_attention[idx, meter_chars_to_padding_mask, :] = True
      decoder_padding_mask_cross_attention[idx, :, ar_chars_to_padding_mask] = True
      decoder_padding_mask_cross_attention[idx, meter_chars_to_padding_mask, :] = True

    encoder_self_attention_mask = torch.where(encoder_padding_mask, NEG_INFTY, 0)
    decoder_self_attention_mask =  torch.where(look_ahead_mask + decoder_padding_mask_self_attention, NEG_INFTY, 0)
    decoder_cross_attention_mask = torch.where(decoder_padding_mask_cross_attention, NEG_INFTY, 0)

    return encoder_self_attention_mask, decoder_self_attention_mask, decoder_cross_attention_mask

## Transformer

![image.png](attachment:image.png)

In [105]:
tiktoken.get_encoding('gpt2').encode(' ')

[220]

In [106]:
class Transformer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.enc_sentence_embedding = nn.Embedding(config.vocab_size, config.d_model)
        self.enc_positional_encoding = nn.Embedding(config.max_seq_len, config.d_model)
        self.encoder = Encoder(config.d_model, config.ffn_hidden, config.num_heads, config.dropout_rate, config.num_layers, config.max_seq_len)

        self.dec_sentence_embedding = nn.Embedding(config.vocab_size, config.d_model)
        self.dec_positional_encoding = nn.Embedding(config.max_seq_len, config.d_model)
        self.decoder = Decoder(config.d_model, config.ffn_hidden, config.num_heads, config.dropout_rate, config.num_layers, config.max_seq_len)

        self.linear = nn.Linear(config.d_model, config.vocab_size)

    def forward(self, x, y):
        target = y
        encoder_self_attention_mask, decoder_self_attention_mask, decoder_cross_attention_mask = create_masks(x, y, self.config)
        encoder_self_attention_mask = encoder_self_attention_mask.to(get_device())
        decoder_self_attention_mask = decoder_self_attention_mask.to(get_device())
        decoder_cross_attention_mask = decoder_cross_attention_mask.to(get_device())

        B, T = x.size()
        pos = torch.arange(0, self.config.max_seq_len, dtype=torch.long, device=x.device) # shape (T)
        pos_emb = self.enc_positional_encoding(pos) # position embeddings of shape (T, n_embd)
        embedding = self.enc_sentence_embedding(x)
        # add padding to embedding by adding space at the end of it with token (220) to reach the max_seq_len
        #embedding = torch.cat((embedding, torch.zeros(B, self.config.max_seq_len - T, self.config.d_model).to(get_device())), dim=1)
        x = embedding + pos_emb
        x = self.encoder(x, encoder_self_attention_mask)

        B, T = y.size()
        pos = torch.arange(0, self.config.max_seq_len, dtype=torch.long, device=y.device)
        pos_emb = self.dec_positional_encoding(pos)
        embedding = self.dec_sentence_embedding(y)
        y = embedding + pos_emb
        out = self.decoder(x, y, decoder_self_attention_mask, decoder_cross_attention_mask)

        out = self.linear(out)
        # return logits and loss for training
        loss = F.cross_entropy(out.view(-1, out.size(-1)), target.view(-1),ignore_index=220)
        return out, loss

## Data Loader

In [107]:
class TextDataset(Dataset):
    def __init__(self, phrase, meter):
        self.phrase = phrase
        self.meter = meter

    def __len__(self):
        return len(self.phrase)

    def __getitem__(self, idx):
        return self.phrase[idx], self.meter[idx]

In [108]:
def tokenizer(text):
    return tiktoken.get_encoding('gpt2').encode(text)

In [109]:
def tokenizer(text):
    return tiktoken.get_encoding('gpt2').encode(text)
def decoder(text):
    return tiktoken.get_encoding('gpt2').decode(text)

In [110]:
decoder([220,52,56,56,56,56,56])

' UYYYYY'

## Training

In [111]:
meter = pd.read_csv('meter.csv',nrows=1000000)

In [112]:
meter.head()

Unnamed: 0,الشطر,البحر
0,خَليلَيَّ لا تَستَعجِلا أَن تَزَوَّدا,الطويل
1,فَما لَبَثٌ يَوماً بِسابِقٍ مَغنَمٍ,الطويل
2,وَإِن تُنظِراني اليَومَ أَقضِ لُبانَةً,الطويل
3,لَعَمرُكَ ما نَفسٌ بِجِدٍ رَشيدَةٍ,الطويل
4,وَإِن ظَهَرَت مِنهُ قَوارِصُ جَمَّةٌ,الطويل


In [113]:
shadr = [tokenizer(tok) for tok in meter['الشطر']]

In [114]:
meters = [tokenizer(tok) for tok in meter['البحر']]

In [115]:
class create_batch:
    def __init__(self, shadr,meters,batch_size,max_seq_len):
        self.shadr = shadr
        self.meters = meters
        self.batch_size = batch_size
        self.max_seq_len = max_seq_len
        self.pointer = 0
    def next_batch(self):
        if self.pointer + self.batch_size > len(self.shadr):
            self.pointer = 0
        start = self.pointer
        end = self.pointer + self.batch_size
        batch_shadr = self.shadr[start:end]
        batch_meters = self.meters[start:end]
        self.pointer += self.batch_size
        # loop over the batch and pad the sentences to the max_seq_len with the token 220
        for i in range(len(batch_shadr)):
            if len(batch_shadr[i]) < self.max_seq_len:
                batch_shadr[i] += [220] * (self.max_seq_len - len(batch_shadr[i]))
            else:
                batch_shadr[i] = batch_shadr[i][:self.max_seq_len]
        for i in range(len(batch_meters)):
            if len(batch_meters[i]) < self.max_seq_len:
                batch_meters[i] += [220] * (self.max_seq_len - len(batch_meters[i]))
            else:
                batch_meters[i] = batch_meters[i][:self.max_seq_len]
        return batch_shadr, batch_meters

In [116]:
config = Config()
model = Transformer(config)
model.to(get_device())
#model = torch.compile(model)

batchify = create_batch(shadr,meters,config.batch_size,config.max_seq_len)
optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)
num_batchs = len(shadr) // config.batch_size
print(num_batchs)
for i in range(num_batchs):
    x,y = batchify.next_batch()
    x = torch.tensor(x).to(get_device())
    y = torch.tensor(y).to(get_device())
    optimizer.zero_grad()
    out, loss = model(x, y)
    loss.backward()
    optimizer.step()
    print(f"iteration : {i+1} loss: {loss.item()}")


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
iteration : 2813 loss: 0.5891317129135132
iteration : 2814 loss: 1.6805458068847656
iteration : 2815 loss: 1.8120754957199097
iteration : 2816 loss: 1.591102957725525
iteration : 2817 loss: 1.5710046291351318
iteration : 2818 loss: 1.096391201019287
iteration : 2819 loss: 1.4825377464294434
iteration : 2820 loss: 1.582400918006897
iteration : 2821 loss: 0.845054030418396
iteration : 2822 loss: 0.6848943829536438
iteration : 2823 loss: 1.1076363325119019
iteration : 2824 loss: 1.313017725944519
iteration : 2825 loss: 0.9258073568344116
iteration : 2826 loss: 0.9749688506126404
iteration : 2827 loss: 0.8350116014480591
iteration : 2828 loss: 0.701933741569519
iteration : 2829 loss: 0.6784744262695312
iteration : 2830 loss: 0.5967182517051697
iteration : 2831 loss: 0.5004682540893555
iteration : 2832 loss: 0.3217141628265381
iteration : 2833 loss: 0.20874951779842377
iteration : 2834 loss: 0.8996937274932861
iteration : 2835

In [117]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [118]:
torch.save(model.state_dict(), 'drive/MyDrive/model_weights.pth')

In [119]:
torch.save(model.state_dict(), 'model_weights.pth')

In [123]:
def decoder(text):
    return tiktoken.get_encoding('gpt2').decode(text)

In [133]:
batchify = create_batch(shadr,meters,config.batch_size,config.max_seq_len)
# Sampling loop
sample_rng = torch.Generator(device=get_device())
sample_rng.manual_seed(42)
for i in range(100):
    with torch.no_grad():
      x,y = batchify.next_batch()
      x = torch.tensor(x).to(get_device())
      y = torch.tensor(y).to(get_device())
      out, loss = model(x, y)
      out_ = torch.argmax(out[0], axis=1)
      print(f"target: {decoder(list(y[0]))}, predicted: {decoder(list(out_))}")

target: الطويل                                                  , predicted: الالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالال
target: المتقارب                                                , predicted: الالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالال
target: الطويل                                                  , predicted: الالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالال
target: الطويل                                                  , predicted: الالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالال
target: المتقارب                                                , predicted: الالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالالال
target: الوافر                                    