### Add all imports


In [1]:
# from utils import *
import re
from pyarabic.araby import strip_diacritics
import numpy as np

In [2]:
import unicodedata
import nltk
import torch
from torch import lstm_cell, nn
import time
import random
import pickle as pkl
import tensorflow as tf
import pickle

from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    Embedding,
    Dense,
    Dropout,
    LSTM,
    Bidirectional,
    TimeDistributed,
    Input,
)
from tensorflow.keras.utils import Sequence
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import glorot_normal
from tensorflow.keras.callbacks import ModelCheckpoint
import matplotlib.pyplot as plt
import re
from sklearn.model_selection import train_test_split




In [3]:
########################################################################################
# Read the letters from the pickle files which we will use
def get_letters():
    file_path = "constants/arabic_letters.pickle"
    with open(file_path, "rb") as file:
        letters = pickle.load(file)
    letters.add("<s>")
    letters.add("</s>")
    letters.add("<PAD>")
    return letters


########################################################################################
# Read the diacritics from the pickle files which we will use
def get_diacritics():
    file_path = "constants/diacritics.pickle"
    with open(file_path, "rb") as file:
        diacritics = pickle.load(file)
    # diacritics.add("<s>")
    # diacritics.add("</s>")
    # diacritics.add("<PAD>")
    return diacritics


########################################################################################
# Read the diacritics from the pickle files which we will use
def get_diacritics2id():
    file_path = "constants/diacritic2id.pickle"
    with open(file_path, "rb") as file:
        diacritics2id = pickle.load(file)
    # add no tashkeel
    return diacritics2id


########################################################################################
# Read TRAINING dataset given
def read_training_dataset(file_path="dataset/train.txt"):
    training_sentences = []
    with open(file_path, "r", encoding="utf-8") as file:
        # Read each line in the file
        for line in file:
            # Strip any leading or trailing whitespace from the line
            line = line.strip()
            # Add the line to the list
            training_sentences.append(line)
    # if(len(training_sentences)==50000):
    #     print("Read training set successfully")
    return training_sentences


########################################################################################
# Read DEV dataset given
def read_dev_dataset(file_path="dataset/val.txt"):
    dev = []
    with open(file_path, "r", encoding="utf-8") as file:
        # Read each line in the file
        for line in file:
            line = line.strip()
            dev.append(line)
    # print(len(dev))
    # if(len(dev)==2500):
    #     print("Read validation set successfully")
    return dev


########################################################################################


def separate_word_to_letters_diacritics(arabic_text, arabic_letters=get_letters()):
    # Normalize the text to handle different Unicode representations
    normalized_text = unicodedata.normalize("NFKD", arabic_text)
    letters = []
    diacritics = []
    # arabic_text = arabic_text[::-1]
    # for ind in range(len(arabic_text)):
    #     print(arabic_text[ind])
    ind = 0

    while ind < len(arabic_text):
        temp = []
        if not unicodedata.combining(arabic_text[ind]):
            # print(arabic_text[ind])
            # if arabic_text[ind] in arabic_letters:
            letters.append(arabic_text[ind])
            # print("added to letters",arabic_text[ind])

            if ind + 1 < len(arabic_text) and not unicodedata.combining(
                arabic_text[ind + 1]
            ):
                diacritics.append(temp)
                # print("added to diacritics from 1st",temp)
            if ind == (len(arabic_text) - 1):
                diacritics.append(temp)
            ind += 1

        else:
            while ind < len(arabic_text) and unicodedata.combining(arabic_text[ind]):
                # diacritics.pop(0)
                # print(arabic_text[ind])
                temp.append(arabic_text[ind])
                ind += 1
            temp = unicodedata.normalize("NFC", "".join(temp))
            # temp=[temp[::-1]]
            diacritics.append([temp])
            # print("added to diacritics",temp)
    # letters.reverse()
    # diacritics.reverse()
    return letters, diacritics


########################################################################################
def tokenize_to_vocab(data, vocab):
    (
        tokenized_sentences_word,
        tokenized_sentences_letters,
        tokenized_sentences_diacritics,
    ) = ([], [], [])

    for d in data:
        tokens = nltk.word_tokenize(d, language="arabic", preserve_line=True)
        # Add the start sentence <s> and end sentence </s> tokens at the beginning and end of each tokenized sentence
        tokens.reverse()
        tokens.insert(0, "<s>")
        tokens.append("</s>")

        vocab.update(tokens)

        word_letters = []
        word_diacritics = []
        for token in tokens:
            if token != "<s>" and token != "</s>":
                # letters = separate_arabic_to_letters(token)
                letter, diacritic = separate_word_to_letters_diacritics(token)
                word_diacritics.append(diacritic)
                word_letters.append(letter)
            else:
                word_letters.append(token)
                word_diacritics.append(token)

        tokenized_sentences_letters.append(word_letters)
        tokenized_sentences_diacritics.append(word_diacritics)
        tokenized_sentences_word.append(tokens)

    return (
        vocab,
        tokenized_sentences_word,
        tokenized_sentences_letters,
        tokenized_sentences_diacritics,
    )


def extract_sentences(training_dataset):
    # This pattern keeps Arabic letters, diacritics, and whitespaces and endlines
    pattern = re.compile(
        r'[^\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\s,.؟،«»؛(){};:!?\-\'"]'
    )

    # Replace unmatched characters with an empty string
    cleaned_corpus = [re.sub(pattern, "", t) for t in training_dataset]
    cleaned_corpus = [re.sub("\s\s+", " ", c) for c in cleaned_corpus]

    print(len(cleaned_corpus))

    data, labels = [], []

    first = True
    for c in cleaned_corpus:
        sentences = re.split(r'[,.؟،«»؛(){};:!?\-\'"]+', c)  # split on all punctuation
        labels += sentences

        without_dialects = [
            strip_diacritics(s) for s in sentences
        ]  # get the letters without dialects
        data += without_dialects

    # remove any spaces from line
    data = [d.strip() for d in data]
    labels = [l.strip() for l in labels]

    # remove empty lines
    data = [i for i in data if i]
    labels = [i for i in labels if i]
    return data, labels, cleaned_corpus


########################################################################################

In [4]:
# 1- Read data embeddings we have for letters and diacritics
letters, diacritics, diacritics2id = (
    get_letters(),
    get_diacritics(),
    get_diacritics2id(),
)
# 2- Have mapping ready
#### Letters ---- IDs
letters2id = {item: index for index, item in enumerate(letters)}
id2letters = {index: item for index, item in enumerate(letters)}

# add <PAD> to the mapping
diacritics2id["<PAD>"] = len(diacritics2id)
diacritics2id["<s>"] = len(diacritics2id)
diacritics2id["</s>"] = len(diacritics2id)
id2diacritics = {value: key for key, value in diacritics2id.items()}
for diacritic, id in diacritics2id.items():
    if diacritic not in diacritics:
        diacritics.add(diacritic)

for letter, id in letters2id.items():
    if letter not in letters:
        letters.add(letter)
# print(diacritics2id)

In [14]:
def map_data(data_raw):
    """Splists data lines into an array of charachers as integers and an array of discritics as one-hot-encodings"""

    # initialize data and diacritics lists
    X = list()
    Y = list()

    # loop on data lines
    for line in data_raw:
        lit, dicc = separate_word_to_letters_diacritics(line)
        x = [letters2id["<s>"]]
        y = [diacritics2id["<s>"]]
        for i in range(min(len(lit), len(dicc))):
            if len(dicc[i]) >= 1:
                dicc[i] = dicc[i][0]
            elif len(dicc[i]) == 0:
                dicc[i] = ""
            if dicc[i] not in diacritics2id:
                dicc[i] = ""

            if lit[i] not in letters2id:
                lit[i] = "<PAD>"
                dicc[i] = "<PAD>"

            x.append(letters2id[lit[i]])
            y.append(diacritics2id[dicc[i]])
        # append end of sentence character
        x.append(letters2id["</s>"])
        y.append(diacritics2id["</s>"])

        # convert diacritics integers to one_hot_encodings
        # y = to_categorical(y, len(diacritics2id))

        # append line's data and diacritics lists to total data and diacritics lists
        X.append(x)
        Y.append(y)

    return X, Y

### tHE MAIN MODEL AND FEEDING


### Data generator


In [6]:
from torch.utils.data import Dataset, DataLoader
import torch
from torch.nn.utils.rnn import pad_sequence


class DiacriticsDataset(Dataset):
    def __init__(
        self, lines, letters2id, diacritics2id, device, max_length=256, padding_value=0
    ):
        self.lines = lines
        X, Y = map_data(lines)
        X = [torch.LongTensor(x) for x in X]
        Y = [torch.LongTensor(x) for x in Y]
        X_padded = pad_sequence(X, batch_first=True, padding_value=padding_value)[
            :, :max_length
        ]
        Y_padded = pad_sequence(Y, batch_first=True, padding_value=padding_value)[
            :, :max_length
        ]
        X_padded = X_padded.to(device)
        Y_padded = Y_padded.to(device)
        self.x = X_padded
        self.y = Y_padded
        self.letters2id = letters2id
        self.diacritics2id = diacritics2id
        self.max_length = max_length
        self.padding_value = padding_value

    def __len__(self):
        return len(self.lines)

    def __getitem__(self, idx):
        # line = self.lines[idx]
        # X, Y = map_data([line])  # Your existing function
        # X = torch.LongTensor(X[0])[:, : self.max_length]
        # Y = torch.FloatTensor(Y[0])[;, :self]
        return self.x[idx], self.y[idx]

    # @staticmethod
    # def collate_fn(batch, padding_value=0):
    #     # print(self.max_length)
    #     # while True:
    #     #     pass
    #     X_batch, Y_batch = zip(*batch)

    #     # Pad X_batch
    #     X_padded = pad_sequence(X_batch, batch_first=True, padding_value=padding_value)

    #     # Pad Y_batch
    #     # If Y_batch is 2-dimensional (e.g., just indices), it should be padded differently.
    #     Y_padded = pad_sequence(Y_batch, batch_first=True, padding_value=padding_value)

    #     # Ensure all sequences are the same length
    #     # This is important if X and Y can be of different lengths; adjust as needed.
    #     # max_length = max(X_padded.shape[1], Y_padded.shape[1])
    #     max_length = 256
    #     if X_padded.shape[1] < max_length:
    #         # Pad X_padded to max_length
    #         padding = X_padded.new_full(
    #             (X_padded.size(0), max_length - X_padded.size(1), *X_padded.shape[2:]),
    #             padding_value,
    #         )
    #         X_padded = torch.cat([X_padded, padding], dim=1)
    #     if Y_padded.shape[1] < max_length:
    #         # Pad Y_padded to max_length
    #         padding = Y_padded.new_full(
    #             (Y_padded.size(0), max_length - Y_padded.size(1)), padding_value
    #         )
    #         Y_padded = torch.cat([Y_padded, padding], dim=1)

    #     return X_padded, Y_padded

### Model Definition


In [7]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence


# class DiacritizationModel(nn.Module):
#     def __init__(self, letters2id, diacritics2id):
#         super(DiacritizationModel, self).__init__()
#         self.embedding = nn.Embedding(num_embeddings=len(letters2id), embedding_dim=25)
#         self.blstm1 = nn.LSTM(
#             input_size=25, hidden_size=256, bidirectional=True, batch_first=True
#         )
#         self.dropout1 = nn.Dropout(0.1)
#         self.blstm2 = nn.LSTM(
#             input_size=512, hidden_size=256, bidirectional=True, batch_first=True
#         )
#         self.dropout2 = nn.Dropout(0.1)
#         self.dense1 = nn.Linear(512, 512)
#         self.relu = nn.ReLU()
#         self.dense2 = nn.Linear(512, len(diacritics2id))
#         self.softmax = nn.Softmax(dim=2)

#     def forward(self, x):
#         x = self.embedding(x)
#         x, _ = self.blstm1(x)
#         x = self.dropout1(x)
#         x, _ = self.blstm2(x)
#         x = self.dropout2(x)
#         x = self.dense1(x)
#         x = self.relu(x)
#         x = self.dense2(x)
#         x = self.softmax(x)
#         return x


class DiacritizationModel(nn.Module):
    def __init__(
        self,
        letters2id,
        diacritics2id,
        hidden_size=256,
        embedding_dim=256,
        in_vocab=25,
        out_vocab=25,
    ):
        super(DiacritizationModel, self).__init__()

        self.embedding = nn.Embedding(
            num_embeddings=len(letters2id), embedding_dim=embedding_dim
        )

        self.blstm1 = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            bidirectional=True,
            batch_first=True,
        )
        self.dropout1 = nn.Dropout(0.5)

        self.blstm2 = nn.LSTM(
            input_size=hidden_size * 2,
            hidden_size=hidden_size,
            bidirectional=True,
            batch_first=True,
        )
        self.dropout2 = nn.Dropout(0.5)

        self.dense1 = nn.Linear(hidden_size * 2, 512)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(512, len(diacritics2id))

    def forward(self, x):
        x = self.embedding(x)
        x, _ = self.blstm1(x)
        x = self.dropout1(x)
        x, _ = self.blstm2(x)
        x = self.dropout2(x)
        x = self.dense1(x)
        x = self.relu(x)
        x = self.dense2(x)
        return x

### Training Loop


In [8]:
from torch.optim import Adam


def train_model(model, epochs, train_loader, val_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters())
    padding_value = 0
    for epoch in range(epochs):
        model.train()
        for i, (inputs, targets) in enumerate(train_loader):
            # print(inputs.shape, targets.shape)

            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)  #

            loss = criterion(
                outputs.transpose(1, 2), targets
            )  # Assumes targets are one-hot encoded
            loss.backward()
            optimizer.step()

            # Print statistics
            if i % 100 == 0:
                print(
                    f"Epoch [{epoch+1}/{epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item()}"
                )

        # Validation step
        model.eval()
        with torch.no_grad():
            total_loss = 0
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs)
                loss = criterion(outputs.transpose(1, 2), targets)
                total_loss += loss.item()
            print(f"Validation Loss: {total_loss / len(val_loader)}")

### Prediction


In [9]:
def predict_pytorch(line, model, letters2id, diacritics2id, id2diacritics):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()  # Set the model to evaluation mode
    model.to(device)  # Ensure the model is on the correct device

    # Preprocess the input line
    # print(line)
    X, _ = map_data([line])  # Use your existing function
    X = (
        torch.LongTensor(X[0]).unsqueeze(0).to(device)
    )  # Add batch dimension and move to device

    # Predict using the model
    with torch.no_grad():
        predictions = model(X).squeeze(0)  # Remove batch dimension

    # Move predictions to CPU for further processing if necessary
    predictions = predictions.cpu()

    # Process the model's output
    output = ""
    char_list = []
    diac_list = []
    undiacritized_line = line

    # Map the predicted diacritics back to the original text
    for char, pred in zip(undiacritized_line, predictions):
        # if char != " ":
        char_list.append(char)
        diacritic_index = (
            pred.argmax().item()
        )  # Get the index of the highest probability
        diac_list.append(diacritic_index)

        output += char
        # Append the diacritic if it's a valid Arabic letter and has a corresponding diacritic
        if char in letters and "<" not in id2diacritics[diacritic_index]:
            output += id2diacritics[diacritic_index]

    return output, char_list, diac_list

### Read Data


In [10]:
training_dataset = read_training_dataset()
dev_dataset = read_dev_dataset()

### Pre-process and clean data


In [11]:
# 1- Clean the data
data, labels, cleaned_corpus = extract_sentences(training_dataset)
val_data, val_labels, val_cleaned_corpus = extract_sentences(dev_dataset)

50000
2500


In [12]:
import tqdm

In [15]:
import numpy as np

# Calculate the lengths of all strings
lengths = np.array([len(s) for s in data])

# Determine the length that keeps 99% of the data
length_99_percentile = int(np.percentile(lengths, 99))

# length_99_percentile = 500
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_loader = DataLoader(
    DiacriticsDataset(
        labels, letters2id, diacritics2id, device, max_length=length_99_percentile
    ),
    batch_size=128,
    shuffle=True,
    # collate_fn=DiacriticsDataset.collate_fn,
)
val_loader = DataLoader(
    DiacriticsDataset(
        val_labels, letters2id, diacritics2id, device, max_length=length_99_percentile
    ),
    batch_size=128,
    # collate_fn=DiacriticsDataset.collate_fn,
)

In [16]:
model = DiacritizationModel(letters2id, diacritics2id)

# print(train_loader[0])
lolo = train_model(model, 3, train_loader, val_loader)

Epoch [1/3], Step [1/2175], Loss: 2.9391896724700928
Epoch [1/3], Step [101/2175], Loss: 0.10735046863555908
Epoch [1/3], Step [201/2175], Loss: 0.07242868095636368
Epoch [1/3], Step [301/2175], Loss: 0.047480203211307526
Epoch [1/3], Step [401/2175], Loss: 0.0545131154358387
Epoch [1/3], Step [501/2175], Loss: 0.034626808017492294
Epoch [1/3], Step [601/2175], Loss: 0.03932067006826401
Epoch [1/3], Step [701/2175], Loss: 0.03423409163951874
Epoch [1/3], Step [801/2175], Loss: 0.023445913568139076
Epoch [1/3], Step [901/2175], Loss: 0.029815522953867912
Epoch [1/3], Step [1001/2175], Loss: 0.02192300744354725
Epoch [1/3], Step [1101/2175], Loss: 0.030136939138174057
Epoch [1/3], Step [1201/2175], Loss: 0.03407389298081398
Epoch [1/3], Step [1301/2175], Loss: 0.027252042666077614
Epoch [1/3], Step [1401/2175], Loss: 0.022839564830064774
Epoch [1/3], Step [1501/2175], Loss: 0.021497759968042374
Epoch [1/3], Step [1601/2175], Loss: 0.02180376648902893
Epoch [1/3], Step [1701/2175], Loss: 

In [17]:
PATH = './saved_models_pytorch/three_epochs_model'

In [18]:
torch.save(model.state_dict(), PATH)


In [43]:
# print(predict("العصفور فوق الشجرة", model))
test_data = read_dev_dataset("dataset/val.txt")
test_data, test_labels, test_corpus = extract_sentences(test_data)
# data_gen = DataGenerator(test_labels, 1,letters2id, diacritics2id)
data_gen = DiacriticsDataset(
    test_labels, letters2id, diacritics2id, device, max_length=length_99_percentile
)
# acc = model.evaluate(data_gen)[1]
acc = predict_pytorch(
    "العصفور فوق الشجرة", model, letters2id, diacritics2id, id2diacritics
)
print("accracy in the test set is : ", acc[0])

2500
accracy in the test set is :  اُلُعُصُفُوُرُ فُوُقُ اُلُشَجُرُةُ


In [None]:
# print(predict("ذهب زياد الي المدرسة", model)[0])
# print(predict("العصفور فوق الشجرة الكبيرة", model)[0])
# print(predict("احمد يحب حنان", model)[0])
# print(predict("الولد يلعب تحت الشجره", model)[0])

### SAVE


In [None]:
model.save("model_epoch1")

AttributeError: 'DiacritizationModel' object has no attribute 'save'

### Load model


In [None]:
from tensorflow.keras.models import load_model

new_model = load_model("model_epoch1")

### load test data and clean it


In [None]:
import csv

test_data = read_dev_dataset("./test.txt")
data, labels, cleaned_corpus = extract_sentences(test_data)
last_id = 0
# open csv fiel as write
file = open("result.csv", "w", newline="", encoding="utf-8")
writer = csv.writer(file)
writer.writerow(["ID", "label"])


for i in range(len(data)):
    out, char_list, diac_list = predict(data[i], model)
    for j in range(len(char_list)):
        if char_list[j] == " ":
            continue
        writer.writerow([last_id, diac_list[j]])
        last_id += 1
file.close()

### CALC THE ACCURACY IF WE HAVE THE GOLD OUTPUT


In [None]:
import pandas as pd

# Load the datasets
gold_data = pd.read_csv("sample_test_set_gold.csv")
predicted_data = pd.read_csv("result.csv")  # Assuming this contains your predictions

# Ensure both dataframes have the same length
assert len(gold_data) == len(
    predicted_data
), "Datasets must have the same number of rows"

# Compare and calculate accuracy
correct_predictions = 0
for gold_label, predicted_label in zip(gold_data["label"], predicted_data["label"]):
    if gold_label == predicted_label:
        correct_predictions += 1

accuracy = correct_predictions / (len(gold_data) - 1)
print(f"Accuracy: {accuracy:.2f}")