### -This is our unsuccessful attempt to use the Transformer architecture to build the model
#### We think it could be useful to keep it for future reference

### -Our thoughts:
##### 1. We think the model may be too complex for this task
##### 2. Or there are mistakes in the implementation, especially in the initialization of the model weights and in the loss function.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy


In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    The model dimension ``d_model`` is divided evenly between ``num_heads``
    heads, each working on ``d_k = d_model // num_heads`` features.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Query / key / value projections followed by the output projection.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return the attention-weighted sum of ``V``.

        Entries where ``mask == 0`` receive a score of ``-1e9`` before the
        softmax, which drives their weight to (numerically) zero.
        """
        scale = math.sqrt(self.d_k)
        scores = (Q @ K.transpose(-2, -1)) / scale
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        weights = scores.softmax(dim=-1)
        return weights @ V

    def split_heads(self, x):
        """Reshape (batch, seq, d_model) into (batch, heads, seq, d_k)."""
        batch_size, seq_length, _ = x.size()
        per_head = x.view(batch_size, seq_length, self.num_heads, self.d_k)
        return per_head.permute(0, 2, 1, 3)

    def combine_heads(self, x):
        """Inverse of ``split_heads``: back to (batch, seq, d_model)."""
        batch_size, _, seq_length, _ = x.size()
        merged = x.permute(0, 2, 1, 3).contiguous()
        return merged.view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head, and merge the heads again."""
        queries = self.split_heads(self.W_q(Q))
        keys = self.split_heads(self.W_k(K))
        values = self.split_heads(self.W_v(V))

        attended = self.scaled_dot_product_attention(queries, keys, values, mask)
        merged = self.combine_heads(attended)
        return self.W_o(merged)


In [None]:
class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied to the last dimension.

    Expands ``d_model`` features to ``d_ff``, applies ReLU, and projects
    back down to ``d_model``.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``fc2(relu(fc1(x)))``."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a sequence of embeddings.

    A table of shape ``(1, max_seq_length, d_model)`` is precomputed and
    stored as the non-trainable buffer ``pe``. Even feature indices hold
    sines and odd feature indices hold cosines.

    Args:
        d_model: Size of the feature dimension (odd values are supported).
        max_seq_length: Longest sequence the table can serve.
    """

    def __init__(self, d_model, max_seq_length):
        super().__init__()

        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        angles = position * div_term

        pe[:, 0::2] = torch.sin(angles)
        # For an odd d_model there is one cosine column fewer than sine
        # columns, so the angle table is trimmed to fit. For an even d_model
        # the slice is a no-op and the values are unchanged.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encoding of its first ``x.size(1)`` positions.

        Raises:
            RuntimeError: If the sequence is longer than ``max_seq_length``.
        """
        seq_length = x.size(1)
        if seq_length > self.pe.size(1):
            raise RuntimeError(
                f"sequence length {seq_length} exceeds max_seq_length {self.pe.size(1)}"
            )
        return x + self.pe[:, :seq_length]


In [None]:
class EncoderLayer(nn.Module):
    """Encoder block: self-attention, a 1-D convolution, then feed-forward.

    Args:
        d_model: Feature size of the input and output.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
        kernel_size: Width of the 1-D convolution. Use an odd value: the
            padding is ``kernel_size // 2``, which keeps the sequence length
            unchanged only for odd kernels.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout, kernel_size=3):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        # Padding follows the kernel size so the convolution never changes
        # the sequence length (it was fixed at 1, correct only for kernel 3).
        self.conv1d = nn.Conv1d(
            in_channels=d_model,
            out_channels=d_model,
            kernel_size=kernel_size,
            padding=kernel_size // 2,
        )
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run one encoder block on ``x``; ``mask`` is passed to the attention."""
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        # Conv1d convolves over the last axis, so swap the last two axes
        # before the convolution and swap them back afterwards.
        x = self.conv1d(x.permute(0, 2, 1)).permute(0, 2, 1)
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x


In [None]:
class DecoderLayer(nn.Module):
    """Decoder block: self-attention, causal 1-D conv, cross-attention, feed-forward.

    Args:
        d_model: Feature size of the input and output.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
        kernel_size: Width of the 1-D convolution.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout, kernel_size=3):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        # No built-in padding: forward() pads on the left only, so that the
        # convolution stays causal. Weight shapes match the padded version.
        self.conv1d = nn.Conv1d(in_channels=d_model, out_channels=d_model, kernel_size=kernel_size)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Run one decoder block.

        ``tgt_mask`` is used for self-attention over ``x`` and ``src_mask``
        for cross-attention over ``enc_output``.
        """
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))

        # A symmetric-padded convolution would let position t read position
        # t + 1, i.e. look at future decoder inputs despite the causal
        # attention mask. Padding kernel_size - 1 steps on the left only
        # keeps the output length equal to the input length and restricts
        # each position to itself and earlier positions.
        left_pad = self.conv1d.kernel_size[0] - 1
        conv_input = nn.functional.pad(x.permute(0, 2, 1), (left_pad, 0))
        x = self.conv1d(conv_input).permute(0, 2, 1)

        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder model mapping source token ids to target-class logits."""

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super().__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        # All encoder layers are created before any decoder layer.
        encoder_stack = []
        for _ in range(num_layers):
            encoder_stack.append(EncoderLayer(d_model, num_heads, d_ff, dropout))
        self.encoder_layers = nn.ModuleList(encoder_stack)

        decoder_stack = []
        for _ in range(num_layers):
            decoder_stack.append(DecoderLayer(d_model, num_heads, d_ff, dropout))
        self.decoder_layers = nn.ModuleList(decoder_stack)

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        """Build boolean attention masks from the token ids.

        Positions whose id is 0 are masked out (presumably 0 is the padding
        id - confirm against the vocabulary mappings). The target mask
        additionally hides every position to the right of the current one.

        Returns:
            ``(src_mask, tgt_mask)`` with shapes ``(B, 1, 1, S)`` and
            ``(B, 1, T, T)``.
        """
        src_mask = src.ne(0)[:, None, None, :]
        tgt_not_zero = tgt.ne(0)[:, None, :, None]
        tgt_len = tgt.size(1)
        # Lower-triangular matrix (diagonal included): True where attending
        # is allowed.
        causal = torch.tril(torch.ones(1, tgt_len, tgt_len)).bool().to(tgt.device)
        return src_mask, tgt_not_zero & causal

    def forward(self, src, tgt):
        """Return logits over the target vocabulary for each position of ``tgt``."""
        src_mask, tgt_mask = self.generate_mask(src, tgt)

        src_embedded = self.encoder_embedding(src)
        src_embedded = self.dropout(self.positional_encoding(src_embedded))
        tgt_embedded = self.decoder_embedding(tgt)
        tgt_embedded = self.dropout(self.positional_encoding(tgt_embedded))

        memory = src_embedded
        for layer in self.encoder_layers:
            memory = layer(memory, src_mask)

        hidden = tgt_embedded
        for layer in self.decoder_layers:
            hidden = layer(hidden, memory, src_mask, tgt_mask)

        return self.fc(hidden)


In [None]:
import pickle as pkl
# Root folder that contains the tashkeela_train / _val / _test directories.
DATASET_PATH = 'Data'

train_data_raw = []
valid_data_raw = []
test_data_raw = []

# Training shards: files 001 .. 029.
train_files = [f"/tashkeela_train/tashkeela_train_{i:03}.txt" for i in range(1, 30)]
for filename in train_files:
    with open(DATASET_PATH + filename, 'r', encoding='utf-8') as file:
        train_data_raw += file.readlines()

# Validation shards: files 001 .. 004.
valid_files = [f"/tashkeela_val/tashkeela_val_{i:03}.txt" for i in range(1, 5)]
for filename in valid_files:
    with open(DATASET_PATH + filename, 'r', encoding='utf-8') as file:
        valid_data_raw += file.readlines()

# Test shard: file 001 only.
test_files = [f"/tashkeela_test/tashkeela_test_{i:03}.txt" for i in range(1, 2)]
for filename in test_files:
    with open(DATASET_PATH + filename, 'r', encoding='utf-8') as file:
        test_data_raw += file.readlines()

In [None]:
WITH_EXTRA_TRAIN = False
DATASET_PATH = 'Data'
CONSTANTS_PATH = 'helpers/constants'


def _load_constant(relative_name):
    """Unpickle one file from CONSTANTS_PATH and return its content.

    NOTE: pickle can execute arbitrary code while loading; only use it on
    constant files that ship with this project.
    """
    with open(CONSTANTS_PATH + relative_name, 'rb') as file:
        return pkl.load(file)


ARABIC_LETTERS_LIST = _load_constant('/ARABIC_LETTERS_LIST.pickle')
DIACRITICS_LIST = _load_constant('/DIACRITICS_LIST.pickle')

# NOTE(review): both settings of WITH_EXTRA_TRAIN read the same mapping file;
# confirm whether a different mapping was intended for one of them.
if WITH_EXTRA_TRAIN:
    CHARACTERS_MAPPING = _load_constant('/RNN_BIG_CHARACTERS_MAPPING.pickle')
else:
    CHARACTERS_MAPPING = _load_constant('/RNN_BIG_CHARACTERS_MAPPING.pickle')

CLASSES_MAPPING = _load_constant('/RNN_CLASSES_MAPPING.pickle')
REV_CLASSES_MAPPING = _load_constant('/RNN_REV_CLASSES_MAPPING.pickle')

In [None]:
def remove_diacritics(data_raw):
    """Return ``data_raw`` with every character found in DIACRITICS_LIST removed."""
    diacritic_chars = ''.join(DIACRITICS_LIST)
    # Mapping a code point to None makes str.translate delete that character.
    deletion_table = dict.fromkeys(map(ord, diacritic_chars))
    return data_raw.translate(deletion_table)

In [None]:
def split_data(data_raw, max_len=500):
    """Split raw lines into stripped pieces of at most ``max_len`` characters.

    Lengths are measured after removing diacritics and surrounding
    whitespace. A line that is already short enough is kept as one piece;
    a longer line is cut at word boundaries and its words are greedily
    packed into consecutive pieces. Lines that are empty once diacritics
    are removed are dropped.

    Args:
        data_raw: Iterable of text lines (may contain embedded newlines).
        max_len: Maximum undiacritized length of a piece. A single word
            longer than this is still emitted as a piece of its own.

    Returns:
        List of stripped text pieces, in their original order.
    """
    data_new = []

    for line in data_raw:
        for sub_line in line.split('\n'):
            # Measure once per line instead of re-stripping it per check.
            plain_len = len(remove_diacritics(sub_line).strip())
            if plain_len == 0:
                continue

            if plain_len <= max_len:
                data_new.append(sub_line.strip())
                continue

            # Too long: pack whole words into pieces that fit the limit.
            tmp_line = ''
            for word in sub_line.split():
                tmp_len = len(remove_diacritics(tmp_line).strip())
                word_len = len(remove_diacritics(word).strip())
                # The "+ 1" accounts for the space joining the word on.
                if tmp_len + word_len + 1 > max_len:
                    if tmp_len > 0:
                        data_new.append(tmp_line.strip())
                    tmp_line = word
                elif tmp_line == '':
                    tmp_line = word
                else:
                    tmp_line += ' ' + word

            if len(remove_diacritics(tmp_line).strip()) > 0:
                data_new.append(tmp_line.strip())

    return data_new

In [None]:
# Break the raw training and validation lines into length-limited pieces.
train_split, val_split = (
    split_data(raw_lines) for raw_lines in (train_data_raw, valid_data_raw)
)

In [None]:
# Report how many pieces each split contains.
for label, split in (
    ('Training examples (split):', train_split),
    ('Validation examples (split):', val_split),
):
    print(label, len(split))

In [None]:
def map_data(data_raw):
    """Encode diacritized lines as character-id and diacritic-class-id sequences.

    For each line, ``x`` holds the ids of its non-diacritic characters and
    ``y`` holds, position for position, the class id of the diacritic(s)
    that follow each character (the '' class when there is none or the
    character is not an Arabic letter). Both sequences start with '<SOS>'
    and end with '<EOS>'.

    A character missing from CHARACTERS_MAPPING is reported and skipped in
    both ``x`` and ``y`` so the two sequences stay aligned. Note that such
    a line then yields fewer entries than it has non-diacritic characters.

    Args:
        data_raw: Iterable of text lines.

    Returns:
        ``(X, Y)``: two lists of equal length holding, per line, the list
        of character ids and the list of class ids.
    """
    # Sets give O(1) membership tests inside the per-character loop.
    diacritics = set(DIACRITICS_LIST)
    arabic_letters = set(ARABIC_LETTERS_LIST)

    X = []
    Y = []

    for line in data_raw:
        x = [CHARACTERS_MAPPING['<SOS>']]
        y = [CLASSES_MAPPING['<SOS>']]
        line_len = len(line)

        for idx, char in enumerate(line):
            # Diacritics are consumed as labels of the preceding letter.
            if char in diacritics:
                continue

            try:
                x.append(CHARACTERS_MAPPING[char])
            except KeyError:
                print(f"Error: Character '{char}' not found in CHARACTERS_MAPPING at index {idx} in line: {line}")
                # Skip the label as well, otherwise x and y would diverge.
                continue

            # Anything that is not an Arabic letter carries no diacritic.
            if char not in arabic_letters:
                y.append(CLASSES_MAPPING[''])
                continue

            char_diac = ''
            if idx + 1 < line_len and line[idx + 1] in diacritics:
                char_diac = line[idx + 1]
                if idx + 2 < line_len and line[idx + 2] in diacritics:
                    second = line[idx + 2]
                    # A two-mark combination may be written in either order
                    # (shadda + fatha == fatha + shadda); use whichever
                    # order is a known class.
                    if char_diac + second in CLASSES_MAPPING:
                        char_diac += second
                    elif second + char_diac in CLASSES_MAPPING:
                        char_diac = second + char_diac
            y.append(CLASSES_MAPPING[char_diac])

        # Internal sanity check: every kept character has exactly one label.
        assert len(x) == len(y)

        x.append(CHARACTERS_MAPPING['<EOS>'])
        y.append(CLASSES_MAPPING['<EOS>'])

        X.append(x)
        Y.append(y)

    return X, Y

In [None]:
from torch.utils.data import Dataset, DataLoader
import time
import random
import numpy as np
import pickle as pkl


class MyDataset(Dataset):
    """Serve pre-batched, fixed-length tensors built from text lines.

    Item ``idx`` is the whole batch made of lines
    ``[idx * batch_size, (idx + 1) * batch_size)``, not a single example.

    Args:
        lines: Sequence of text lines.
        batch_size: Number of lines per batch.
        max_seq_len: Every sequence is truncated or padded to this length.
    """

    def __init__(self, lines, batch_size, max_seq_len=100):
        self.lines = lines
        self.batch_size = batch_size
        self.max_seq_len = max_seq_len

    def __len__(self):
        # Number of complete batches; a trailing partial batch is not counted.
        return len(self.lines) // self.batch_size

    def _pad_or_truncate(self, sequence, pad_id):
        """Return ``sequence`` as a list of exactly ``max_seq_len`` ids."""
        fitted = list(sequence)[:self.max_seq_len]
        fitted.extend([pad_id] * (self.max_seq_len - len(fitted)))
        return fitted

    def __getitem__(self, idx):
        """Return ``(X, Y)`` long tensors of shape ``(batch, max_seq_len)``.

        Raises:
            IndexError: If ``idx`` is negative or selects no lines. This
                also lets plain ``for batch in dataset`` loops terminate.
        """
        start_idx = idx * self.batch_size
        if idx < 0 or start_idx >= len(self.lines):
            raise IndexError(f"batch index {idx} is out of range")

        end_idx = min(start_idx + self.batch_size, len(self.lines))
        X_batch, Y_batch = map_data(self.lines[start_idx:end_idx])

        pad_id = CHARACTERS_MAPPING['<PAD>']
        # NOTE(review): the labels are padded with the *character* padding id,
        # as before. Confirm it equals the padding class id expected by the
        # loss and by the target mask.
        X = [self._pad_or_truncate(x, pad_id) for x in X_batch]
        Y = [self._pad_or_truncate(y, pad_id) for y in Y_batch]

        # Explicit dtype: ids must be 64-bit integers on every platform.
        return (
            torch.tensor(np.asarray(X), dtype=torch.long),
            torch.tensor(np.asarray(Y), dtype=torch.long),
        )



In [None]:
def _shuffle_then_sort_by_length(lines):
    """Return a new list of ``lines`` ordered by undiacritized length.

    The copy is shuffled first; because the sort is stable, lines of equal
    length end up in random relative order.
    """
    ordered = list(lines)
    random.shuffle(ordered)
    ordered.sort(key=lambda line: len(remove_diacritics(line)))
    return ordered


def fit_model(batch_size, train_split, val_split):
    """Build the training and validation batch generators.

    Each split is ordered by undiacritized length so that a batch groups
    lines of similar length. The input lists are left untouched.

    Args:
        batch_size: Number of lines per batch.
        train_split: Training lines.
        val_split: Validation lines.

    Returns:
        ``(training_generator, val_generator)`` as ``MyDataset`` objects.
    """
    training_generator = MyDataset(_shuffle_then_sort_by_length(train_split), batch_size)
    val_generator = MyDataset(_shuffle_then_sort_by_length(val_split), batch_size)

    return training_generator, val_generator

In [None]:
# Build the batch generators with 64 lines per batch.
Train_Gen, Val_Gen = fit_model(
    batch_size=64,
    train_split=train_split,
    val_split=val_split,
)

In [None]:
# Vocabulary sizes come from the loaded mappings.
src_vocab_size, tgt_vocab_size = len(CHARACTERS_MAPPING), len(CLASSES_MAPPING)

# Model hyper-parameters.
d_model, num_heads, num_layers, d_ff = 512, 8, 6, 2048
max_seq_length, dropout = 100, 0.1

transformer = Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_model=d_model,
    num_heads=num_heads,
    num_layers=num_layers,
    d_ff=d_ff,
    max_seq_length=max_seq_length,
    dropout=dropout,
)

In [None]:
# Train on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
transformer = transformer.to(device)

In [None]:
# Adam optimizer over all model parameters.
optimizer = optim.Adam(
    params=transformer.parameters(),
    lr=1e-4,
    betas=(0.9, 0.98),
    eps=1e-9,
)
# Targets equal to 0 do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=0)

In [None]:
# Train for 3 epochs with teacher forcing; after each epoch report the
# validation loss of every batch and the mean over all validation batches.

for epoch in range(3):
    transformer.train()
    for i in range(len(Train_Gen)):
        # Build the batch once (each indexing call re-encodes the lines).
        source, target = Train_Gen[i]
        source = source.to(device)
        # Cast on the device; no round trip through a CPU LongTensor.
        target = target.to(device).long()

        optimizer.zero_grad()
        # The decoder reads target[:-1] and is scored against target[1:].
        output = transformer(source, target[:, :-1])
        loss = criterion(output.reshape(-1, tgt_vocab_size), target[:, 1:].reshape(-1))
        loss.backward()
        optimizer.step()

        print(f"Epoch: {epoch+1}, Batch: {i+1}, Loss: {loss.item()}")

    transformer.eval()
    val_batches = len(Val_Gen)
    val_loss_total = 0.0
    with torch.no_grad():
        for i in range(val_batches):
            source, target = Val_Gen[i]
            source = source.to(device)
            target = target.to(device).long()
            output = transformer(source, target[:, :-1])
            loss = criterion(output.reshape(-1, tgt_vocab_size), target[:, 1:].reshape(-1))
            val_loss_total += loss.item()
            print(f"Epoch: {epoch+1}, Validation Batch: {i+1}, Loss: {loss.item()}")

    # Report the mean validation loss, not just the last batch's loss.
    if val_batches:
        print(f"Epoch: {epoch+1}, Loss: {val_loss_total / val_batches}")



In [None]:
def inference(transformer, source_vector, sos_id=None):
    """Greedily decode one class prediction per source position.

    Decoding starts from the start-of-sequence class and feeds the model
    its own previous predictions, one step at a time. (Feeding an all-zero
    target instead gives the decoder an input that the model's target mask
    hides completely.)

    Args:
        transformer: Model called as ``transformer(source, target)`` and
            returning logits of shape ``(batch, target_len, num_classes)``.
        source_vector: Long tensor of source ids, shape ``(batch, src_len)``.
        sos_id: Class id that starts decoding. Defaults to
            ``CLASSES_MAPPING['<SOS>']``.

    Returns:
        Logits of shape ``(batch, src_len, num_classes)``. Entry ``t`` is
        the prediction made after ``t + 1`` decoder inputs, i.e. it lines
        up with target position ``t + 1`` of the training setup.

    Raises:
        ValueError: If ``source_vector`` has no positions to decode.
    """
    if sos_id is None:
        sos_id = CLASSES_MAPPING['<SOS>']

    batch_size, src_len = source_vector.size(0), source_vector.size(1)
    if src_len == 0:
        raise ValueError("source_vector must contain at least one position")

    transformer.eval()  # Disable dropout for deterministic predictions.
    decoded = torch.full(
        (batch_size, 1), sos_id, dtype=torch.long, device=source_vector.device
    )
    step_logits = []
    with torch.no_grad():  # No gradients are needed during inference.
        for _ in range(src_len):
            # Only the newest position holds a prediction not yet consumed.
            logits = transformer(source_vector, decoded)[:, -1, :]
            step_logits.append(logits)
            next_ids = logits.argmax(dim=-1, keepdim=True)
            decoded = torch.cat([decoded, next_ids], dim=1)

    return torch.stack(step_logits, dim=1)


In [None]:
# Persist the whole model object to disk.
torch.save(obj=transformer, f='transformer6.pth')

In [None]:
line = "السلام عليكم يا قوم"

# Encode the sentence, run the model, and bring the logits back as a NumPy
# array with the singleton batch dimension removed.
X, _ = map_data([line])
source_tensor = torch.tensor(X).to(device)
predictions = inference(transformer, source_tensor).squeeze().cpu().numpy()

In [None]:
# Rebuild the sentence with the predicted diacritics.
#
# Training scores decoder output t against target position t + 1, so output 0
# already belongs to the first character. The predictions are therefore
# paired with the characters without dropping the first row. Leaving
# `predictions` untouched also makes this cell safe to re-run.
pieces = []
for char, prediction in zip(remove_diacritics(line), predictions):
    pieces.append(char)

    # Only Arabic letters can carry a diacritic.
    if char not in ARABIC_LETTERS_LIST:
        continue

    predicted_class = REV_CLASSES_MAPPING[np.argmax(prediction)]
    # Special classes such as '<SOS>' / '<EOS>' are not diacritics.
    if '<' in predicted_class:
        continue

    pieces.append(predicted_class)

output = ''.join(pieces)
print(output)