# 1- Setup Project

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
%cd /content/drive/MyDrive/ATD-WSD

# Create dir to for storing trained model
#!mkdir Baseline-w2v

/content/drive/MyDrive/ATD-WSD


In [None]:
!pip install numpy==1.23.1
!pip install tensorflow==2.14.0

In [None]:
import re
import pickle
import json
import numpy as np
import time
import random
import joblib

import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from gensim.models import Word2Vec
from sklearn.feature_extraction.text import TfidfVectorizer
from keras.models import Model
from tensorflow.keras.layers import Input
from keras.layers import Embedding, Dense, Dropout, LSTM, Bidirectional, TimeDistributed, InputLayer
from tensorflow.keras.models import Sequential
from keras.optimizers import Adam
from keras.utils import Sequence
from keras.initializers import glorot_normal
from keras.callbacks import ModelCheckpoint
from rich import print_json

In [None]:
if tf.test.gpu_device_name():
    print('Default GPU Device: {}'.format(tf.test.gpu_device_name()))
else:
    print("GPU device not found: working on CPU")

GPU device not found: working on CPU


# 2- Importing Dataset
The dataset that was prepared using Gemini which is 20% of the train, 100% val and testing taken from [original dataset](https://github.com/AliOsm/arabic-text-diacritization/tree/master/dataset)
```
[{
    "sentence": "some text in arabic",
    "words": [
      {
        "word": "word_1",
        "word_sense": "definition_1"
        "pos" : "part_of_speech_1"
      }
    ]
}]
```

***⚠️This baseline model will use the sentences without the sense***

In [None]:
# Helpers
def read_json(file_path):
  with open(file_path, mode="r", encoding="utf-8") as json_data:
    return get_sentences(json.load(json_data))

def get_sentences(data):
  sentences = []
  for s in data:
    sentences.append(s['sentence'])
  return sentences

def pprint(json_data):
  print_json(data=json_data, highlight=False)

In [None]:
train_data = read_json("/content/10485_train_wsd.json")
val_data = read_json("/content/2517_val_wsd.json")

print('Training data length:', len(train_data))
print("Train Sample")
print(train_data[100])

print('Validation data length:', len(val_data))
print("Val Sample")
print(val_data[1])

Training data length: 10485
Train Sample
فَاسِدٌ
Validation data length: 2517
Val Sample
وَلَوْ لَمْ تَزِدْ( 26 / 106 )


# 3- Constants

In [None]:
# Helpers
def CHAR_IDX(LIST):
    char2idx = {}
    idx2char = {}

    for i, char in enumerate(LIST):
        char2idx[char] = i
        idx2char[i] = char

    return char2idx, idx2char

In [None]:
ARABIC_CHAR = "ىعظحرسيشضقثلصطكآماإهزءأفؤغجئدةخوبذتن"
NUMBERS = "0123456789٠١٢٣٤٥٦٧٨٩"

# 15 possible Diacritics
FATHATAN = u'\u064b'
DAMMATAN = u'\u064c'
KASRATAN = u'\u064d'
FATHA = u'\u064e'
DAMMA = u'\u064f'
KASRA = u'\u0650'
SHADDA = u'\u0651'
SUKUN = u'\u0652'

DIACRITICS = [
    "",              # No Diacritic
    FATHA,           # Fatha
    FATHATAN,        # Fathatah
    DAMMA,           # Damma
    DAMMATAN,        # Dammatan
    KASRA,           # Kasra
    KASRATAN,        # Kasratan
    SUKUN,           # Sukun
    SHADDA,          # Shadda
    SHADDA+FATHA,    # Shadda + Fatha
    SHADDA+FATHATAN, # Shadda + Fathatah
    SHADDA+DAMMA,    # Shadda + Damma
    SHADDA+DAMMATAN, # Shadda + Dammatan
    SHADDA+KASRA,    # Shadda + Kasra
    SHADDA+KASRATAN  # Shadda + Kasratan
]

PUNCTUATIONS = [
    ".",    "،",    ":",    "؛",
    "-",    "–",    "«",    "»",
    "~",    "؟",    "!",    "*",
    "(",    ")",    "[",    "]",
    "{",    "}",    ";",    "\n",
    "'",    "\"",   "`",    "/",
    ",",    "?",    '’',    '“',
    '…',    '﴾',    '﴿',    "+",
    "*",    "=",    "&",    "_",
    "\n",   "\u200d",       "\u200f"
]


# Special Tokens
UNK_TOKEN = "<unk>"
PAD_TOKEN = "<pad>"
SOS_TOKEN = "<s>"
EOS_TOKEN = "</s>"
SPECIAL_TOKENS = [PAD_TOKEN, SOS_TOKEN, EOS_TOKEN]

# Combine
ARABIC_CHAR_SPACE = list(ARABIC_CHAR) + [' ']
ARABIC_CHAR_VALID = ARABIC_CHAR_SPACE + DIACRITICS
ALLCHARS = ARABIC_CHAR_SPACE + list(NUMBERS) + PUNCTUATIONS + SPECIAL_TOKENS

In [None]:
char_mapping, reverse_char_mapping = CHAR_IDX(ALLCHARS)
class_mapping, reverse_class_mapping = CHAR_IDX(DIACRITICS)

print("Char Mapping Size:", len(char_mapping))
print("Class Mapping Size:", len(class_mapping))

Char Mapping Size: 97
Class Mapping Size: 15


# 4- Helper Functions

In [None]:
def remove_diacritics_line(data):
    return data.translate(str.maketrans('', '', ''.join(DIACRITICS)))

def get_max_size(data):
  return max(len(remove_diacritics_line(item).strip()) for item in data)

def get_min_size(data):
  return min(len(remove_diacritics_line(item).strip()) for item in data)

def one_hot_matrix(data, size):
    one_hot_matrix = [[1 if j == i else 0 for j in range(size)] for i in data]
    return one_hot_matrix

def one_hot_vector(index , size):
    one_hot_vector = [1 if j == index else 0 for j in range(size)]
    return one_hot_vector

def punc_split(data):
  split_data = list()
  for line in data:
    line = line.replace('.', '.\n')
    line = line.replace(',', ',\n')
    line = line.replace('،', '،\n')
    line = line.replace(':', ':\n')
    line = line.replace(';', ';\n')
    line = line.replace('؛', '؛\n')
    line = line.replace('(', '\n(')
    line = line.replace(')', ')\n')
    line = line.replace('[', '\n[')
    line = line.replace(']', ']\n')
    line = line.replace('{', '\n{')
    line = line.replace('}', '}\n')
    line = line.replace('«', '\n«')
    line = line.replace('»', '»\n')
    line = line.replace('؟', '؟\n')
    line = line.replace('?', '?\n')
    line = line.replace('!', '!\n')
    line = line.replace('-', '-\n')

    split_data += line.split('\n')

  return split_data

def split_on_length(data, max_len=500):
    splitted_data = list()

    for sentence in punc_split(data):

       new_sentence = remove_diacritics_line(sentence).strip()

       if len(new_sentence) != 0:
          if len(new_sentence) > 0 and len(new_sentence) <= max_len:
                  splitted_data.append(sentence.strip())
          else:
            sentence_words = sentence.split()
            temp_sentence = ''

            for word in sentence_words:
              if len(remove_diacritics_line(temp_sentence).strip()) + len(remove_diacritics_line(word).strip()) + 1 > max_len:
                  if len(remove_diacritics_line(temp_sentence).strip()) > 0:
                      splitted_data.append(temp_sentence.strip())
                  temp_sentence = word

              else:
                  temp_sentence = word if temp_sentence == '' else temp_sentence + ' ' + word

            if len(remove_diacritics_line(temp_sentence).strip()) > 0:
                  splitted_data.append(temp_sentence.strip())

    return splitted_data

def train_word_embeddings(docs):
    tokenizer = Tokenizer(oov_token='<OOV>')
    tokenizer.fit_on_texts(docs)

    sentences = [doc.split() for doc in docs ]
    sentences.append([UNK_TOKEN])
    word2vec_model = Word2Vec(sentences, vector_size = 300, window=5, min_count=1, workers=4)

    word_embeddings = word2vec_model.wv

    return word_embeddings, tokenizer

def get_word_embeddings(word):
    encoded_docs = tokenizer.texts_to_sequences(word)
    word_embeddings_for_sample = []
    for word_index in encoded_docs:
      if len(word_index) > 0:
        if word_index[0] in data_embeddings:
          word_embeddings_for_sample.append(data_embeddings[word_index[0]])

    return word_embeddings_for_sample

# 5- Prepare Data

In [None]:
split_length_train_data      = split_on_length(train_data)
split_length_val_data      = split_on_length(val_data)

print("Train Data Size:", len(split_length_train_data))
print('Training data max:', get_max_size(split_length_train_data))
print('Training data min:', get_min_size(split_length_train_data))
print("Train Sample:", split_length_train_data[0:2])
print()

print("Val Data Size:", len(split_length_val_data))
print('Validation data max:', get_max_size(split_length_val_data))
print('Validation data min:', get_min_size(split_length_val_data))
print("Val Sample:", split_length_val_data[0:2])

Train Data Size: 35722
Training data max: 500
Training data min: 1
Train Sample: ['وَلَوْ جَمَعَ ثُمَّ عَلِمَ تَرْكَ رُكْنٍ مِنْ الْأُولَى بَطَلَتَا وَيُعِيدُهُمَا جَامِعًا ،', 'أَوْ مِنْ الثَّانِيَةِ ،']

Val Data Size: 15120
Validation data max: 500
Validation data min: 1
Val Sample: ['وَقَوْلُهُ', '( وَلَوْ حَلَفَ لَا يَجْلِسُ عَلَى سَرِيرٍ )']


#### Data without diacritics and without punc and numbers

In [None]:
clean_diac_train_data = [(''.join(char for char in text if char in ARABIC_CHAR_VALID)).strip() for text in split_length_train_data]
clean_diac_val_data = [(''.join(char for char in text if char in ARABIC_CHAR_VALID)).strip() for text in split_length_val_data]

clean_diac_train_data = [item for item in clean_diac_train_data if item != ""]
clean_diac_val_data   = [item for item in clean_diac_val_data if item != ""]

print('Training data length:', len(clean_diac_train_data))
print('Validation data length:', len(clean_diac_val_data))

print(clean_diac_train_data[0:5])

Training data length: 31578
Validation data length: 13458
['وَلَوْ جَمَعَ ثُمَّ عَلِمَ تَرْكَ رُكْنٍ مِنْ الْأُولَى بَطَلَتَا وَيُعِيدُهُمَا جَامِعًا', 'أَوْ مِنْ الثَّانِيَةِ', 'فَإِنْ لَمْ يَطُلْ تَدَارَكَ', 'وَإِلَّا فَبَاطِلَةٌ وَلَا جَمَعَ', 'وَلَوْ جَهِلَ أَعَادَهُمَا لِوَقْتَيْهِمَا']


#### Data with diacritics and without punc and numbers

In [None]:
clean_train_data = [remove_diacritics_line(text) for text in clean_diac_train_data]
clean_val_data = [remove_diacritics_line(text) for text in clean_diac_val_data]

print('Training data length:', len(clean_train_data))
print('Validation data length:', len(clean_val_data))

print(clean_train_data[0:2])

Training data length: 31578
Validation data length: 13458
['ولو جمع ثم علم ترك ركن من الأولى بطلتا ويعيدهما جامعا', 'أو من الثانية']


#### Train word embedding

In [None]:
data_to_embeddings = clean_train_data + clean_val_data
data_embeddings, tokenizer = train_word_embeddings(data_to_embeddings)

# 6- Custom Data Generator

In [None]:
def get_sentence_classes(sentence):
  x = []
  y = []

  unk_emb = get_word_embeddings([UNK_TOKEN])[0]

  vec = []
  vec = one_hot_vector(char_mapping[SOS_TOKEN],len(char_mapping))
  vec.extend(unk_emb)
  x.append(vec)

  y.append(one_hot_vector(class_mapping[''],len(class_mapping)))
  split_sentence = [i for j in sentence.split() for i in (j, ' ')][:-1]

  for word in split_sentence:
    emb2 = get_word_embeddings(remove_diacritics_line(word))

    if (len(emb2) == 0):
        emb = unk_emb
    else:
        emb = emb2[0]

    if word in PUNCTUATIONS:
      emb = unk_emb
    else:
      if (len(emb2) == 0):
        emb = unk_emb
      else:
        emb = emb2[0]

    for index, char in enumerate(word):
      if char not in DIACRITICS:
        vec = []
        vec = one_hot_vector(char_mapping[char],len(char_mapping))
        vec.extend(emb)
        x.append(vec)
        char_diacritic = ''
        sentence_len = len(sentence)

        if index + 1 < sentence_len:
          if sentence[index + 1] in DIACRITICS:
            char_diacritic = sentence[index + 1]

            if index + 2 < sentence_len:
               char_diacritic = char_diacritic + sentence[index + 2] if sentence[index + 2] in DIACRITICS and (char_diacritic + sentence[index + 2] in class_mapping) else sentence[index + 2] + char_diacritic if sentence[index + 2] in DIACRITICS and (sentence[index + 2] + char_diacritic in class_mapping) else char_diacritic
        y.append(one_hot_vector(class_mapping[char_diacritic],len(class_mapping)))

  vec = []
  vec = one_hot_vector(char_mapping[EOS_TOKEN],len(char_mapping))
  vec.extend(unk_emb)
  x.append(vec)

  y.append(one_hot_vector(class_mapping[''],len(class_mapping)))

  assert(len(x) == len(y))

  return x, y

In [None]:
def get_classes(data):
  X = []
  Y = []

  for sentence in data:
    x, y = get_sentence_classes(sentence)
    X.append(x)
    Y.append(y)

  X = np.asarray(X)
  Y = np.asarray(Y)

  return X, Y

class custom_data_generator(Sequence):

    def __init__(self, data, batch_size):
        self.data = data
        self.batch_size = batch_size

    def __len__(self):
        return int(np.ceil(len(self.data) / float(self.batch_size)))

    def __getitem__(self, index):

        start_index = index * self.batch_size
        end_index = (index + 1) * self.batch_size

        batch = self.data[start_index : end_index]
        X_batch, Y_batch = get_classes(batch)

        max_length_X = np.max([len(x) for x in X_batch])
        max_length_Y = np.max([len(y) for y in Y_batch])

        assert(max_length_X == max_length_Y)

        vec = []
        vec = one_hot_vector(char_mapping[PAD_TOKEN],len(char_mapping))
        vec.extend(get_word_embeddings([PAD_TOKEN])[0])

        X = []
        for x in X_batch:
          padding_length = max_length_X - len(x)
          x = list(x)
          x.extend([vec] * (padding_length))
          X.append(np.asarray(x))

        Y = []
        for y in Y_batch:
          padding_length = max_length_Y - len(y)
          y = list(y)

          y.extend(one_hot_matrix([class_mapping['']] * (padding_length), len(class_mapping)))
          Y.append(np.asarray(y))

        X, Y = np.asarray(X), np.asarray(Y)
        return X, Y

In [None]:
def build_model():
   model = Sequential()
   model.add(InputLayer(input_shape=(None, 397)))

   model.add(Bidirectional(LSTM(units=256,return_sequences=True,kernel_initializer=glorot_normal(seed=500))))
   model.add(Dropout(0.5))
   model.add(Bidirectional(LSTM(units=256,return_sequences=True,kernel_initializer=glorot_normal(seed=500))))
   model.add(Dropout(0.5))
   model.add(Bidirectional(LSTM(units=256,return_sequences=True,kernel_initializer=glorot_normal(seed=500))))
   model.add(TimeDistributed(Dense(units=512,activation='relu',kernel_initializer=glorot_normal(seed=500))))
   model.add(TimeDistributed(Dense(units=512,activation='relu',kernel_initializer=glorot_normal(seed=500))))
   model.add(TimeDistributed(Dense(units=len(class_mapping),activation='softmax',kernel_initializer=glorot_normal(seed=500))))
   model.compile(loss='categorical_crossentropy', optimizer=Adam(), metrics=['accuracy'])
   return model

In [None]:
model = build_model()
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 bidirectional (Bidirection  (None, None, 512)         1339392   
 al)                                                             
                                                                 
 dropout (Dropout)           (None, None, 512)         0         
                                                                 
 bidirectional_1 (Bidirecti  (None, None, 512)         1574912   
 onal)                                                           
                                                                 
 dropout_1 (Dropout)         (None, None, 512)         0         
                                                                 
 bidirectional_2 (Bidirecti  (None, None, 512)         1574912   
 onal)                                                           
                                                        

In [None]:
def fit_model(model, epochs, batch_size, train_data, val_data):
    random.shuffle(train_data)
    random.shuffle(val_data)

    train_data = list(sorted(train_data, key=lambda item: len(remove_diacritics_line(item))))
    val_data   = list(sorted(val_data,   key=lambda item: len(remove_diacritics_line(item))))

    checkpoint_path = '/content/drive/MyDrive/ATD-WSD/Baseline-w2v/epoch{epoch:02d}.ckpt'

    checkpoint_cb = ModelCheckpoint(checkpoint_path, verbose=0)

    training_generator = custom_data_generator(train_data, batch_size)
    val_generator = custom_data_generator(val_data, batch_size)

    history =  model.fit(training_generator,validation_data=val_generator,epochs=epochs,callbacks=[checkpoint_cb])
    return history

In [None]:
history = fit_model(model, 5, 256, clean_diac_train_data, clean_diac_val_data)

Epoch 1/5


  X = np.asarray(X)
  Y = np.asarray(Y)


Epoch 2/5


  X = np.asarray(X)
  Y = np.asarray(Y)


Epoch 3/5
  1/124 [..............................] - ETA: 5:11 - loss: 0.7551 - accuracy: 0.8431

  X = np.asarray(X)
  Y = np.asarray(Y)


Epoch 4/5


  X = np.asarray(X)
  Y = np.asarray(Y)


Epoch 5/5
  2/124 [..............................] - ETA: 3:00 - loss: 0.6629 - accuracy: 0.7327

  X = np.asarray(X)
  Y = np.asarray(Y)




# 7- Checkpoint

In [None]:
joblib.dump(model, 'baseline.joblib')
filename = 'baseline.sav'
pickle.dump(model, open(filename, 'wb'))

# 8- Predict

In [None]:
def predict(line, model):
    line = remove_diacritics_line(line)

    X, _ = get_classes([line])
    predictions = model.predict(X).squeeze()

    output = ''
    for char, prediction in zip(line, predictions):
        output += char
        if char not in ARABIC_CHAR:
            continue
        output += reverse_class_mapping[np.argmax(prediction)]
    return output

def predict_text(data, model, file_name):
  for idx, line in enumerate(data):
    output = predict(line, model)
    with open(f"{file_name}_out.txt", 'a') as file:
      file.write(output + "\n")

    with open(f"{file_name}_inp.txt", 'a') as file:
      file.write(line + "\n")

In [None]:
model_file_path = 'baseline.joblib'
model = joblib.load(model_file_path)

In [None]:
test_data = read_json("/content/2528_test_wsd.json")

print('Testing data length:', len(test_data))
print("Test Sample")
print(test_data[100])

Testing data length: 2528
Test Sample
قَوْلُهُ لَمْ يَجُزْ لَهُ التَّيَمُّمُ ) يُتَأَمَّلُ وَجْهُ ذَلِكَ .


In [None]:
predict_text(test_data, model, "Baseline_w2v")



# 9- Error Calculation

In [1]:
!pip install diacritization_evaluation

Collecting diacritization_evaluation
  Downloading diacritization_evaluation-0.5-py3-none-any.whl.metadata (945 bytes)
Downloading diacritization_evaluation-0.5-py3-none-any.whl (7.2 kB)
Installing collected packages: diacritization_evaluation
Successfully installed diacritization_evaluation-0.5


In [13]:
def calculate_der(original_path, predicted_path, case_ending=False ):
  with open(original_path, encoding="utf8") as file:
        original_content = file.readlines()

  with open(predicted_path, encoding="utf8") as file:
        predicted_content = file.readlines()

  avg_der = 0
  number_of_sentences = 0
  for sentence in zip(original_content, predicted_content):
    try:
      avg_der += der.calculate_der(sentence[0], sentence[1], case_ending=case_ending)
      number_of_sentences += 1
    except:
      continue

  return avg_der / number_of_sentences

def calculate_wer(original_path, predicted_path, case_ending=False ):
  with open(original_path, encoding="utf8") as file:
        original_content = file.readlines()

  with open(predicted_path, encoding="utf8") as file:
        predicted_content = file.readlines()

  avg_wer = 0
  number_of_sentences = 0
  for sentence in zip(original_content, predicted_content):
    try:
      avg_wer += wer.calculate_wer(sentence[0], sentence[1], case_ending=case_ending)
      number_of_sentences += 1
    except:
      continue

  return avg_wer / number_of_sentences

In [14]:
from diacritization_evaluation import wer, der
original_path = "/content/drive/MyDrive/ATD-WSD/Baseline_w2v_inp.txt"
predicted_path  = "/content/drive/MyDrive/ATD-WSD/Baseline_w2v_out.txt"

print(calculate_der(original_path, predicted_path))
print(calculate_wer(original_path, predicted_path))

47.48712079207926
87.47833995234295
