In [1]:
# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import re
import sys
import os
sys.path.insert(0, os.path.abspath('../..'))
from preprocessing.preprocess import preprocess_data

In [56]:
# lines limit
limit = 30000
dataset_path = '../../dataset'

# preprocess and save the data
preprocess_data(data_type='train', limit=limit, dataset_path=dataset_path)
preprocess_data(data_type='val', limit=limit, dataset_path=dataset_path)

# load data
with open(f'{dataset_path}/cleaned_train_data_without_diacritics.txt', 'r', encoding='utf-8') as file:
    # read all lines into a single string
    training_data = re.compile(r'[\n\r\t\s]').sub('', file.read())
with open(f'{dataset_path}/cleaned_val_data_without_diacritics.txt', 'r', encoding='utf-8') as file:
    # read all lines into a single string
    validation_data = re.compile(r'[\n\r\t\s]').sub('', file.read())
    
with open(f'{dataset_path}/cleaned_train_data_with_diacritics.txt', 'r', encoding='utf-8') as file:
    # read all lines into a single string
    training_data_with_diacritics = re.compile(r'[\n\r\t\s]').sub('', file.read())

print(len(training_data))
print(len(validation_data))
print(len(training_data_with_diacritics))
# Tokenize the text into sequences at the character level
vocab = set(''.join(training_data + validation_data))

char_to_index = {char: idx + 1 for idx, char in enumerate(vocab)}
index_to_char = {idx + 1: char for idx, char in enumerate(vocab)}

print(char_to_index)
print(index_to_char)

5030669
421099
9419289
{'ق': 1, 'ب': 2, 'ي': 3, 'ف': 4, 'أ': 5, 'و': 6, 'ر': 7, 'ط': 8, 'ة': 9, 'د': 10, 'س': 11, 'ك': 12, 'غ': 13, 'إ': 14, 'ت': 15, 'ج': 16, 'ه': 17, 'ز': 18, 'ص': 19, 'ذ': 20, 'ا': 21, 'ش': 22, 'خ': 23, 'ئ': 24, 'ء': 25, 'ؤ': 26, 'آ': 27, 'ن': 28, 'ى': 29, 'ظ': 30, 'ث': 31, 'م': 32, 'ض': 33, 'ع': 34, 'ح': 35, '~': 36, 'ل': 37}
{1: 'ق', 2: 'ب', 3: 'ي', 4: 'ف', 5: 'أ', 6: 'و', 7: 'ر', 8: 'ط', 9: 'ة', 10: 'د', 11: 'س', 12: 'ك', 13: 'غ', 14: 'إ', 15: 'ت', 16: 'ج', 17: 'ه', 18: 'ز', 19: 'ص', 20: 'ذ', 21: 'ا', 22: 'ش', 23: 'خ', 24: 'ئ', 25: 'ء', 26: 'ؤ', 27: 'آ', 28: 'ن', 29: 'ى', 30: 'ظ', 31: 'ث', 32: 'م', 33: 'ض', 34: 'ع', 35: 'ح', 36: '~', 37: 'ل'}


In [20]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu"); print(device)

cuda


In [57]:
# define the diacritics unicode and their corresponding labels classes indices
# note that index 0 is reserved for no diacritic
labels = {
    # no diacritic
    0: 0,
    # fath
    1614: 1,
    # damm
    1615: 2,
    # kasr
    1616: 3,
    # shadd
    1617: 4,
    # sukun
    1618: 5,
    # tanween bel fath
    1611: 6,
    # tanween bel damm
    1612: 7,
    # tanween bel kasr
    1613: 8,
    # shadd and fath
    (1617, 1614): 9,
    # shadd and damm
    (1617, 1615): 10,
    # shadd and kasr
    (1617, 1616): 11,
    # shadd and tanween bel fath
    (1617, 1611): 12,
    # shadd and tanween bel damm
    (1617, 1612): 13,
    # shadd and tanween bel kasr
    (1617, 1613): 14
}

indicies_to_labels = {
    # no diacritic
    0: 0,
    # fath
    1: 1614,
    # damm
    2: 1615,
    # kasr
    3: 1616,
    # shadd
    4: 1617,
    # sukun
    5: 1618,
    # tanween bel fath
    6: 1611,
    # tanween bel damm
    7: 1612,
    # tanween bel kasr
    8: 1613,
    # shadd and fath
    9: (1617, 1614),
    # shadd and damm
    10: (1617, 1615),
    # shadd and kasr
    11: (1617, 1616),
    # shadd and tanween bel fath
    12: (1617, 1611),
    # shadd and tanween bel damm
    13: (1617, 1612),
    # shadd and tanween bel kasr
    14: (1617, 1613)
}

print(labels)
print(indicies_to_labels)

{0: 0, 1614: 1, 1615: 2, 1616: 3, 1617: 4, 1618: 5, 1611: 6, 1612: 7, 1613: 8, (1617, 1614): 9, (1617, 1615): 10, (1617, 1616): 11, (1617, 1611): 12, (1617, 1612): 13, (1617, 1613): 14}
{0: 0, 1: 1614, 2: 1615, 3: 1616, 4: 1617, 5: 1618, 6: 1611, 7: 1612, 8: 1613, 9: (1617, 1614), 10: (1617, 1615), 11: (1617, 1616), 12: (1617, 1611), 13: (1617, 1612), 14: (1617, 1613)}


In [None]:
with open(f'{dataset_path}/cleaned_train_data_with_diacritics.txt', 'r', encoding='utf-8') as f:
    for i in range(10):
        line = f.readline()
        for char in line:
            print(f'{char} : {ord(char)}')

In [1]:
with open(f'{dataset_path}/cleaned_train_data_without_diacritics.txt', 'r', encoding='utf-8') as file:
    training_data = file.readlines()
    for i in range(len(training_data)):
        training_data[i] = re.compile(r'[\n+\r+\t+\s+]').sub('', training_data[i])
    
with open(f'{dataset_path}/cleaned_train_data_with_diacritics.txt', 'r', encoding='utf-8') as file:
    training_data_with_diacritics = file.readlines()
    for i in range(len(training_data_with_diacritics)):
        training_data_with_diacritics[i] = re.compile(r'[\n+\r+\t+\s+]').sub('', training_data_with_diacritics[i])

print(len(training_data))
print(len(training_data_with_diacritics))

NameError: name 'dataset_path' is not defined

In [58]:
# build one array that holds all sequences of training data
training_data_sequences = [char_to_index[char] for char in training_data]
print(training_data_sequences[:10])
print(len(training_data_sequences))

fixed_sequence_length = 50

# Create fixed-length sequences
fixed_sequences = [training_data_sequences[i:i+fixed_sequence_length] for i in range(0, len(training_data_sequences), fixed_sequence_length)]

# Pad 0 to last sequence if it is less than fixed_sequence_length
if len(fixed_sequences[-1]) < fixed_sequence_length:
    fixed_sequences[-1] += [0] * (fixed_sequence_length - len(fixed_sequences[-1]))

training_data_sequences = torch.tensor(fixed_sequences)

[1, 6, 37, 17, 5, 6, 1, 8, 34, 21]
5030669


In [None]:
training_data_with_diacritics = re.compile(r'[\n+\r+\t+\s+]').sub('', 'قَوْلُهُ أَوْ قَطَعَ الْأَوَّلُ يَدَهُ إلَخْ قَالَ الزَّرْكَشِيُّ')
print(training_data_with_diacritics)
print(training_data_with_diacritics[-1])
for char in training_data_with_diacritics:
    print(f'{char} : {ord(char)}')

In [76]:
print(char_to_index)

{'و': 0, 'ء': 1, 'ى': 2, 'ض': 3, 'ث': 4, 'ج': 5, 'ة': 6, 'ؤ': 7, 'ق': 8, 'ن': 9, 'ف': 10, 'ذ': 11, 'ع': 12, 'د': 13, 'ز': 14, 'ب': 15, 'خ': 16, 'ي': 17, 'أ': 18, 'ت': 19, 'غ': 20, 'م': 21, 'ه': 22, 'ط': 23, 'ل': 24, 'ر': 25, 'س': 26, 'إ': 27, 'ك': 28, 'ش': 29, 'آ': 30, 'ص': 31, 'ئ': 32, 'ح': 33, 'ظ': 34, 'ا': 35}


In [59]:
training_data_labels = []
training_size = len(training_data_with_diacritics)
index = 0
while index < training_size:
    if ord(training_data_with_diacritics[index]) not in labels:
        # char is not a diacritic
        if (index + 1) < training_size and ord(training_data_with_diacritics[index + 1]) in labels:
            # char has a diacritic
            if ord(training_data_with_diacritics[index + 1]) == 1617:
                # char has a shadd diacritic
                if (index + 2) < training_size and ord(training_data_with_diacritics[index + 2]) in labels:
                    # char has a shadd and another diacritic
                    training_data_labels.append(labels[(1617, ord(training_data_with_diacritics[index + 2]))])
                    # skip next 2 diacritics chars
                    index += 3  # increment by 3 to skip two diacritic chars
                    continue
                else:
                    # char has a shadd and no other diacritic
                    training_data_labels.append(labels[1617])
                    # skip next diacritic char
                    index += 2
                    continue
            # char has a diacritic other than shadd
            training_data_labels.append(labels[ord(training_data_with_diacritics[index + 1])])
            # skip next diacritic char
            index += 2  # increment by 2 to skip one diacritic char
            continue
        else:
            # char has no diacritic
            training_data_labels.append(0)
    index += 1  # increment by 1 for normal iteration

print(len(training_data_labels))
print(training_data_labels[:10])

# Create fixed-length sequences
fixed_labels = [training_data_labels[i:i+fixed_sequence_length] for i in range(0, len(training_data_labels), fixed_sequence_length)]

# Pad 0 to last sequence if it is less than fixed_sequence_length
if len(fixed_labels[-1]) < fixed_sequence_length:
    fixed_labels[-1] += [0] * (fixed_sequence_length - len(fixed_labels[-1]))

training_data_labels = torch.tensor(fixed_labels)

5030669
[1, 5, 2, 2, 1, 5, 1, 1, 1, 0]


In [60]:
dataset = TensorDataset(training_data_sequences, training_data_labels)

batch_size = 32
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# LSTM

## LSTM Class

In [61]:
class CharLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_size, hidden_size, output_size, drop_prob=0.5, num_layers=1):
        super(CharLSTM, self).__init__()
        # chars embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_size)
        
        # LSTM layers
        # batch_first: it means that the input tensor has its first dimension representing the batch size
        # TODO: BLSTM
        self.lstm = nn.LSTM(input_size=embedding_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        
        # Drop out layer, how likely would it drop some neurons (assign zeros to them)
        self.dropout = nn.Dropout(drop_prob)
        
        # output layer
        self.output = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        embedded = self.embedding(x) # batch_size * seq_length * embedding_size
        lstm_out, _ = self.lstm(embedded) # batch_size * seq_length * hidden_size
        after_dropout = self.dropout(lstm_out) # batch_size * seq_length *  hidden_size
        output = self.output(after_dropout)  # batch_size * seq_length * output_size
        output_softmax = F.softmax(output, dim=1)  # Apply softmax to the output
        return output_softmax
    
num_layers = 2
vocab_size = len(char_to_index) + 1 # +1 for the 0 padding
embedding_size = 150
output_size = len(labels)
hidden_size = 256
drop_prob = 0.5
lr=0.001

model = CharLSTM(vocab_size, embedding_size,  hidden_size, output_size, drop_prob, num_layers)

print(model)

CharLSTM(
  (embedding): Embedding(38, 150)
  (lstm): LSTM(150, 256, num_layers=2, batch_first=True)
  (dropout): Dropout(p=0.5, inplace=False)
  (output): Linear(in_features=256, out_features=15, bias=True)
)


## Training part

In [62]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
num_epochs = 5
for epoch in range(num_epochs):
    total_loss = 0
    correct_predictions = 0
    total_predictions = 0
    for batch_sequences, batch_labels in dataloader:
        optimizer.zero_grad()
        outputs = model(batch_sequences).float() # batch_size * seq_length * output_size
        # convert batch_labels to one hot encoding
        batch_labels_one_hot = F.one_hot(batch_labels, num_classes=output_size).float() # batch_size * seq_length * output_size
        
        loss = criterion(outputs, batch_labels_one_hot)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        
        # Calculate accuracy TODO: make it on validation not on training
        predicted_labels = outputs.argmax(dim=2)  # Get the index with the maximum probability
        correct_predictions += (predicted_labels == batch_labels).sum().item()
        total_predictions += batch_labels.numel()
        
    accuracy = correct_predictions / total_predictions
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss}, Accuracy: {accuracy * 100:.2f}%')


Epoch 1/5, Loss: 40125.60154533386, Accuracy: 40.61%
Epoch 2/5, Loss: 40013.02126789093, Accuracy: 44.71%
Epoch 3/5, Loss: 39985.91010475159, Accuracy: 44.80%
Epoch 4/5, Loss: 39966.6490354538, Accuracy: 45.41%
Epoch 5/5, Loss: 39955.23589515686, Accuracy: 45.91%


## Test LSTM

In [63]:
def lstm_predict(model, sentence):
    model.eval() # evaluation mode
    sentence = [char_to_index[char] for char in sentence]
    
    # Create fixed-length sequences
    fixed_sequences = [sentence[i:i+fixed_sequence_length] for i in range(0, len(sentence), fixed_sequence_length)]

    # Pad 0 to last sequence if it is less than fixed_sequence_length
    if len(fixed_sequences[-1]) < fixed_sequence_length:
        fixed_sequences[-1] += [0] * (fixed_sequence_length - len(fixed_sequences[-1]))

    sentence_sequences = torch.tensor(fixed_sequences).view(1, -1)  # Assuming batch size 1

    print(sentence_sequences.shape)
    outputs = model(sentence_sequences)
    print(outputs.shape)
    outputs = outputs.argmax(dim=2)
    print(outputs.shape)
    outputs = outputs.tolist()
    print(outputs)
    diacritics = []
    for output in outputs:
        for index in output:
            predicted_class = indicies_to_labels[index]
            if type(predicted_class) is tuple:
                diacritics.append(chr(predicted_class[0]) + chr(predicted_class[1]))
            elif predicted_class == 0:
                diacritics.append('')
            else:
                diacritics.append(chr(predicted_class))
    return diacritics[:len(sentence)]

In [64]:
test_sentence = 'قوله أو قطع الأول يده إلخ قال الزركشي'
test_sentence = re.compile(r'[\n+\r+\t+\s+]').sub('', test_sentence)
predicted_diacritics = lstm_predict(model, test_sentence)

diacritized_sentence = ''
for i in range(len(test_sentence)):
    diacritized_sentence += test_sentence[i] + predicted_diacritics[i]

print(diacritized_sentence)

torch.Size([1, 50])
torch.Size([1, 50, 15])
torch.Size([1, 50])
[[3, 5, 3, 2, 1, 5, 1, 3, 3, 0, 5, 1, 5, 3, 12, 3, 3, 3, 12, 3, 1, 0, 3, 0, 5, 9, 5, 3, 3, 10, 3, 1, 5, 12, 7, 13, 11, 14, 8, 14, 14, 12, 12, 12, 14, 14, 12, 14, 12, 12]]
قِوْلِهُأَوْقَطِعِالْأَوْلِيًّدِهِإِلًّخِقَالِالْزَّرْكِشِيُّ
