## Introduction

In this notebook, we will create a LSTM seq2seq model for Arabic text diacritization. The goal is to add diacritics to the Arabic text, which can help in pronunciation and understanding.


## Libraries

Importing the necessary libraries, including Pytorch for building and training our model.

In [3]:
import re
import matplotlib.pyplot as plt
import math
import statistics
import nltk
from nltk import word_tokenize
import gensim
from gensim.models import Word2Vec
import multiprocessing
from tashaphyne import normalize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction.text import CountVectorizer
import numpy as np
import pickle
import os
import pandas as pd
from tqdm import tqdm
import torch
from torch import nn
import random as rnd
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence



In [4]:
nltk.download('punkt')

[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [6]:
class featureType():
    __slots__ = ('BagOfWords','TF_IDF')

# Data preprocessing

In [7]:
def preprocessing(data, maxSentenceLength = 20):
    # Lists to store processed data
    tokenized_sentences = []
    tokenized_sentences_with_diacritics = []
    vocab = set()  # Set to store unique tokens

    # Preprocessing steps
    pre_data = re.sub(r'\n',' \n',data)
    pre_data = re.sub(r'[,،;!~\"*\d\(\){}\[\]/\\\'«»`\-\–(\u200f)]','',pre_data)
    pre_data = re.sub(r'\s+',' ',pre_data)
    sentenses = re.split(r'[؛.:؟\n]',pre_data)
    last_sentences = []
    
    # Loop through each sentence
    for i in range(len(sentenses)):
        words = sentenses[i].split()
        # Break long sentences into chunks with a maximum length of `maxSentenceLength`
        new_sentences = [" ".join(words[i:i+maxSentenceLength]) for i in range(0, len(words), maxSentenceLength)]
        
        # Process each chunk
        for sentence in new_sentences:
            # Text normalization
            sentence = normalize.strip_tatweel(sentence)
            sentence = normalize.normalize_lamalef(sentence)
            
            # Tokenize the sentence with diacritics
            tokens_with_diacritics = word_tokenize(sentence)
            tokenized_sentences_with_diacritics.append(tokens_with_diacritics)
            
            # Remove diacritics
            sentence = normalize.strip_tashkeel(sentence)
            last_sentences.append(sentence.strip())
            
            # Tokenize the sentence without diacritics
            tokens = word_tokenize(sentence)
            tokenized_sentences.append(tokens)
            
            # Update vocabulary set with unique tokens
            vocab.update(tokens)
            
    # Return processed data
    return last_sentences, tokenized_sentences, tokenized_sentences_with_diacritics, vocab

# Feature extraction

In [8]:
def featureExtraction(tokenized_sentences, sentences, vocab, type = featureType.TF_IDF):
    # Lists to store extracted features and words
    features = []
    words = []
    
    # Check the feature type requested
    if type is featureType.BagOfWords:
        print("Bag Of Words")
        # Create a Bag of Words model
        model = CountVectorizer()
        # Fit the model on the tokenized sentences and transform into a feature matrix
        features = model.fit_transform(sentences).toarray()
        # Get the feature names (words)
        words = model.get_feature_names_out()
    elif type is featureType.TF_IDF:
        print("TF-IDF")
        # Create a TF-IDF model
        model = TfidfVectorizer()
        # Fit the model on the tokenized sentences and transform into a feature matrix
        features = model.fit_transform(sentences).toarray()
        # Get the feature names (words)
        words = model.get_feature_names_out()

    # Return the model, features, and words
    return model, features, words

In [9]:
def wordIndexer(tokenized_sentences, vocab, sentenceLength=20):
    # Convert vocabulary set to a list
    vocab_list = list(vocab)
    # Create a dictionary to map each word to its index in the vocabulary list
    vocab_dict = {item: index for index, item in enumerate(vocab_list)}
    # List to store indexed sentences
    sentences_indexer = []

    # Iterate through each tokenized sentence
    for sentence in tokenized_sentences:
        sentence_indexer = []

        # Map each word to its index in the vocabulary or use a special index for out-of-vocabulary words
        for i in range(len(sentence)):
            sentence_indexer.append(vocab_dict.get(sentence[i], len(vocab) + 3))

        # Pad the sentence with a special index if its length is less than `sentenceLength`
        for i in range(len(sentence), sentenceLength, 1):
            sentence_indexer.append(len(vocab) + 2)

        # Add the indexed sentence to the list
        sentences_indexer.append(sentence_indexer)

    # Return the list of indexed sentences
    return sentences_indexer

In [10]:
def charIndexer(tokenized_sentences_with_diacritics, max_length, diacritics, arabic_letters):
    # Lists to store indexed characters and diacritics for each sentence
    sentences_chars = []
    sentences_diacritics = []

    # Iterate through each tokenized sentence with diacritics
    for sentence in tokenized_sentences_with_diacritics:
        chars = []  # List to store indexed characters for the current sentence
        diacs = []  # List to store indexed diacritics for the current sentence

        # Iterate through each word in the sentence
        for word in sentence:
            for i in range(len(word)):
                if word[i] not in diacritics:
                    # Map the character to its index in `arabic_letters`
                    chars.append(arabic_letters[word[i]])

                    # Check for diacritics and map them to their indices in `diacritics`
                    if i + 2 < len(word) and word[i+1:i+3] in diacritics:
                        diacs.append(diacritics[word[i+1:i+3]])
                        i += 2
                    elif i + 1 < len(word) and word[i+1] in diacritics:
                        diacs.append(diacritics[word[i+1]])
                        i += 1
                    else:
                        diacs.append(diacritics[''])

        # Pad the sentence with special indices if its length is less than `max_length`
        for i in range(len(chars), max_length):
            chars.append(arabic_letters['$'])
            diacs.append(diacritics[''])

        # Add the indexed characters and diacritics to the respective lists
        sentences_chars.append(chars)
        sentences_diacritics.append(diacs)

    # Return the lists of indexed characters and diacritics
    return sentences_chars, sentences_diacritics

# ArabicDataset
The class that impelements the dataset for the model

In [11]:
class ArabicDatasetTFIDF(torch.utils.data.Dataset):

  def __init__(self, tokenized_sentences, features, sentences_chars, sentences_diacritics, words, maxLength = 130):
    self.tokenized_sentences = tokenized_sentences
    self.features = features
    self.sentences_chars = torch.tensor(sentences_chars, dtype=torch.float32)
    self.sentences_diacritics = torch.tensor(sentences_diacritics)
    self.words = words
    self.maxLength = maxLength

  def __len__(self):
    return len(self.sentences_diacritics)

  def __getitem__(self, idx):
    sentence_tokens_features = []
    for i in range(len(self.tokenized_sentences[idx])):
      if self.tokenized_sentences[idx][i] in self.words:
        ind = np.where(self.words == self.tokenized_sentences[idx][i])[0][0]
        sentence_tokens_features.append(list(self.features[:,ind]))
      else:
        sentence_tokens_features.append(list([1]*(self.features.shape[0])))
    for i in range(len(self.tokenized_sentences[idx]),self.maxLength):
      sentence_tokens_features.append(list([1]*(self.features.shape[0])))

    return torch.tensor(sentence_tokens_features), self.sentences_chars[idx,:], self.sentences_diacritics[idx,:]



In [12]:
class ArabicDataset(torch.utils.data.Dataset):

  def __init__(self, vocab_size,sentences_indexer, sentences_chars, sentences_diacritics):
    self.sentences_indexer = torch.tensor(sentences_indexer)
    self.sentences_chars = torch.tensor(sentences_chars)
    self.sentences_diacritics = torch.tensor(sentences_diacritics)
    self.vocab_size = vocab_size


  def __len__(self):
    return len(self.sentences_indexer)

  def __getitem__(self, idx):
    
    return self.sentences_indexer[idx], self.sentences_chars[idx], self.sentences_diacritics[idx]

# Arabic Diacritization Model
The class that implements the pytorch model

In [13]:
class ArabicDiacritization(nn.Module):
  def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, dropout,n_classes = 15):
    super(ArabicDiacritization, self).__init__()

    # Word embedding layer
    self.embedding_word = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)

    # Bidirectional LSTM for words
    self.lstm_words = nn.LSTM(
                    input_size=embedding_dim,
                    hidden_size=hidden_size,
                    batch_first=True,
                    bidirectional=True,
                    num_layers=num_layers,
                    dropout = dropout
                    )
    
    # Character embedding layer
    self.embedding_char = nn.Embedding(num_embeddings=37, embedding_dim=70)
    
    # Bidirectional LSTM for characters
    self.lstm_chars = nn.LSTM(
                    input_size = 70,
                    hidden_size=hidden_size,
                    batch_first=True,
                    bidirectional=True,
                    num_layers=num_layers,
                    dropout = dropout
                    )

    # Linear layer for final classification
    self.linear = nn.Linear(in_features = hidden_size * 2, out_features=n_classes)

  def forward(self, sentences, chars):

    # Word embedding
    embedding_word = self.embedding_word(sentences)
    
    # Character embedding
    embedding_cahr = self.embedding_char(chars)
    
    # LSTM for words
    word_out, (h,c) = self.lstm_words(embedding_word)

    # LSTM for characters using the hidden states from the word LSTM
    char_out, _ = self.lstm_chars(embedding_cahr,(h,c))
    
    # Final linear layer for classification
    final_output = self.linear(char_out)

    return final_output

# Training

In [14]:
def training(model, train_dataset, pad, batch_size=512, epochs=10, learning_rate=0.01):

  # Create a DataLoader for the training dataset
  train_dataloader = DataLoader(train_dataset,batch_size=batch_size, shuffle=True)

  # Define the CrossEntropyLoss criterion
  criterion = torch.nn.CrossEntropyLoss()

  # Define the Adam optimizer
  optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate, )

  # Check if CUDA (GPU) is available
  use_cuda = torch.cuda.is_available()
  device = torch.device("cuda" if use_cuda else "cpu")
  
  if use_cuda:
    print("Cuda")
    # Move the model and criterion to GPU
    model = model.cuda()
    criterion = criterion.cuda()

  # Convert the padding value to a tensor and move it to the appropriate device
  pad = torch.tensor(pad)
  pad = pad.to(device)

  # Loop through epochs
  for epoch_num in range(epochs):
    total_acc_train = 0
    total_loss_train = 0

    # Lists to store output and predictions
    f_output = []
    total_predection = 0

    # Loop through batches in the training dataloader
    for sentences_indexer, sentences_char, sentences_diacritics in tqdm(train_dataloader):
      
      # Move data to the appropriate device (CPU or GPU)
      sentences_indexer = sentences_indexer.to(device)
      sentences_char = sentences_char.to(device)
      sentences_diacritics = sentences_diacritics.to(device)

      # Forward pass
      output = model(sentences_indexer,sentences_char)
      output = output.to(device)
        
      # Reshape tensors for calculation of loss
      sentences_char = sentences_char.view(-1)
      output = output.view(-1, output.size(-1))
      sentences_diacritics = sentences_diacritics.view(-1)

      # Apply mask to exclude padding values
      mask = sentences_char != pad
      sentences_char = sentences_char[mask]
      output = output[mask]
      sentences_diacritics = sentences_diacritics[mask]

      # Calculate batch loss
      batch_loss = criterion(output,sentences_diacritics)
      total_loss_train += batch_loss

      # Calculate batch accuracy
      acc = (sentences_diacritics == torch.argmax(output,dim=1)).sum().item()
      total_acc_train += acc
      total_predection += sentences_diacritics.size(0)

      # Backward pass and optimization step
      optimizer.zero_grad()
      batch_loss.backward()
      optimizer.step()

    # Calculate average loss and accuracy for the epoch
    epoch_loss = total_loss_train / len(train_dataset)
    epoch_acc = total_acc_train / total_predection

    # Print epoch summary
    print(
        f'Epochs: {epoch_num + 1} | Train Loss: {epoch_loss} \
        | Train Accuracy: {epoch_acc}\n')


In [15]:
torch.cuda.is_available()

True

In [16]:
# Load training Dataset
with open('train.txt','r',encoding= 'utf-8') as file:
    train = file.read()

In [17]:
sentences, tokenized_sentences, tokenized_sentences_with_diacritics,vocab = preprocessing(train)
print(len(sentences),len(tokenized_sentences), len(vocab))

163296 163296 105744


In [18]:
tfIdf_model, features, words = featureExtraction(tokenized_sentences,sentences,vocab,featureType.BagOfWords)

Bag Of Words


In [19]:
print(features.shape)

(163296, 105713)


In [16]:
# get max sentence length
max_length = 0
lengthOfSentences = []
for sentence in sentences:
    lengthOfSentences.append(len(sentence))
    max_length = max(len(sentence),max_length)

print(max_length)

145


In [17]:
# make letter to index
letter_to_code = {}
fixed_length = 6
with open('arabic_letters.pickle','rb') as file:
    letters = pickle.load(file,encoding='utf-8')
    letters = list(letters)
    # padding
    letters.append('$')
    letter_to_code = {item: index for index, item in enumerate(letters)}
#     for char, indx in letters_dict.items():
#         binary_representation = format(indx, f'0{fixed_length}b')
#         binary_representation = [int(i) for i in binary_representation]
#         # binary_representation.reverse()
#         letter_to_code[char] = binary_representation

In [18]:
with open('diacritic2id.pickle','rb') as file:
    diacritics = pickle.load(file,encoding='utf-8')

In [19]:
sentences_indexer = wordIndexer(tokenized_sentences,vocab)

In [20]:
sentences_char, sentences_diacritics = charIndexer(tokenized_sentences_with_diacritics,max_length,diacritics,letter_to_code)

In [21]:
train_dataset = ArabicDataset(len(vocab)+3,sentences_indexer, sentences_char, sentences_diacritics)

In [65]:
train_dataset = ArabicDatasetTFIDF(tokenized_sentences, features, sentences_char, sentences_diacritics, words, max_length)

In [20]:
model = ArabicDiacritization(vocab_size=len(vocab)+3, embedding_dim=300, hidden_size=256, num_layers=3, dropout = 0.1, n_classes=15)
print(model)

ArabicDiacritization(
  (embedding_word): Embedding(105747, 300)
  (lstm_words): LSTM(300, 256, num_layers=3, batch_first=True, dropout=0.1, bidirectional=True)
  (embedding_char): Embedding(37, 70)
  (lstm_chars): LSTM(70, 256, num_layers=3, batch_first=True, dropout=0.1, bidirectional=True)
  (linear): Linear(in_features=512, out_features=15, bias=True)
)


In [35]:
training(model, train_dataset, letter_to_code['$'])

Cuda


100%|██████████| 319/319 [02:11<00:00,  2.43it/s]


Epochs: 1 | Train Loss: 0.0012200954370200634         | Train Accuracy: 0.7881250480453879



100%|██████████| 319/319 [02:12<00:00,  2.41it/s]


Epochs: 2 | Train Loss: 0.00042389988084323704         | Train Accuracy: 0.9277684740353743



100%|██████████| 319/319 [02:12<00:00,  2.41it/s]


Epochs: 3 | Train Loss: 0.00033702413202263415         | Train Accuracy: 0.9421607768110027



100%|██████████| 319/319 [02:11<00:00,  2.42it/s]


Epochs: 4 | Train Loss: 0.00029877739143557847         | Train Accuracy: 0.9484056594533327



100%|██████████| 319/319 [02:11<00:00,  2.42it/s]


Epochs: 5 | Train Loss: 0.00027823366690427065         | Train Accuracy: 0.951869716953095



100%|██████████| 319/319 [02:12<00:00,  2.40it/s]


Epochs: 6 | Train Loss: 0.0002648368536029011         | Train Accuracy: 0.9541973289039377



100%|██████████| 319/319 [02:12<00:00,  2.41it/s]


Epochs: 7 | Train Loss: 0.0002565908362157643         | Train Accuracy: 0.9554805748156194



100%|██████████| 319/319 [02:12<00:00,  2.40it/s]


Epochs: 8 | Train Loss: 0.0002526135358493775         | Train Accuracy: 0.956133034176705



100%|██████████| 319/319 [02:11<00:00,  2.42it/s]


Epochs: 9 | Train Loss: 0.000250633544055745         | Train Accuracy: 0.956419211066592



100%|██████████| 319/319 [02:12<00:00,  2.40it/s]

Epochs: 10 | Train Loss: 0.00024814880453050137         | Train Accuracy: 0.9568590134584561






# Save the Trained Model

In [60]:
torch.save(model.state_dict(), 'model_state_dict.pt')

In [61]:
torch.save(model, 'model.pt')

# Evaluation

In [55]:
def evaluate(model, test_dataset, pad,batch_size=512):

  # Create a DataLoader for the test dataset
  test_dataloader = DataLoader(test_dataset,batch_size=batch_size)

  # Check if CUDA (GPU) is available
  use_cuda = torch.cuda.is_available()
  device = torch.device("cuda" if use_cuda else "cpu")
  if use_cuda:
    # Move the model to GPU if available
    model = model.cuda()

  # Convert the padding value to a tensor and move it to the appropriate device
  pad = torch.tensor(pad)
  pad = pad.to(device)

  # Variables to store evaluation metrics
  total_acc_test = 0
  total_predection = 0

  # Tensor to store test predictions
  test_predections = torch.tensor([])
  test_predections = test_predections.to(device)

  # Disable gradient computation during evaluation
  with torch.no_grad():

     # Loop through batches in the test dataloader
    for sentences_indexer, sentences_char, sentences_diacritics in tqdm(test_dataloader):

      # Move data to the appropriate device (CPU or GPU)
      sentences_indexer = sentences_indexer.to(device)
      sentences_char = sentences_char.to(device)
      sentences_diacritics = sentences_diacritics.to(device)

      # Forward pass
      output = model(sentences_indexer,sentences_char)
      output = output.to(device)
    
      # Reshape tensors for calculation of accuracy
      sentences_char = sentences_char.view(-1)
      output = output.view(-1, output.size(-1))
      sentences_diacritics = sentences_diacritics.view(-1)

      # Apply mask to exclude padding values
      mask = sentences_char != pad
      sentences_char = sentences_char[mask]
      output = output[mask]
      sentences_diacritics = sentences_diacritics[mask]

      # Get the index of the maximum value as the predicted class
      output = torch.argmax(output,dim=1)

      # Calculate batch accuracy
      acc = (sentences_diacritics == output).sum().item()
      total_acc_test += acc
      total_predection += sentences_diacritics.size(0)

      # Concatenate the predictions to the tensor
      test_predections = torch.cat((test_predections, output))

    # Calculate overall accuracy
    total_acc_test /= total_predection
    print(total_acc_test)

  # Convert predictions to int64 and move to CPU
  test_predections = test_predections.to(torch.int64)
  test_predections = test_predections.to(torch.device('cpu'))
  
  # Create a DataFrame to store predictions
  df = pd.DataFrame({'ID': range(len(test_predections)), 'label': test_predections})

  # Save predictions to a CSV file
  csv_file_path = 'predections.csv'
  df.to_csv(csv_file_path, index=False)

  # Print accuracy and return the test predections
  print(f'\nTest Accuracy: {total_acc_test}')
  return test_predections

In [56]:
with open('val.txt','r',encoding= 'utf-8') as file:
    val = file.read()
val_sentences, val_tokenized_sentences, val_tokenized_sentences_with_diacritics, val_vocab = preprocessing(val)
print(len(val_sentences),len(val_tokenized_sentences), len(val_vocab))
val_max_length = 0
for val_sentence in val_sentences:
    val_max_length = max(len(val_sentence),val_max_length)
print(val_max_length)
val_sentences_indexer = wordIndexer(val_tokenized_sentences,val_vocab)
val_sentences_char, val_sentences_diacritics = charIndexer(val_tokenized_sentences_with_diacritics,val_max_length,diacritics,letter_to_code)
val_dataset = ArabicDataset(len(vocab)+3,val_sentences_indexer, val_sentences_char, val_sentences_diacritics)

402 402 2694
121


In [None]:
test_predections = evaluate(model,val_dataset,letter_to_code['$'])