In [113]:
import torch
from gensim.models import FastText
from gensim.test.utils import common_texts
import numpy as np
import re

# import the necessary packages
# from torch.nn import Sequential
# from torch.nn import Conv1d
# from torch.nn import MaxPool1d
# from torch.nn import LogSoftmax
from torch import flatten

# Combines arrays in a vertically stacked sequence (used for data manipulation)
from numpy import vstack

# Reads a CSV file into a DataFrame (used for loading datasets)
from pandas import read_csv

# Encodes categorical labels into numerical format (used for label preprocessing)
from sklearn.preprocessing import LabelEncoder

# Calculates the accuracy of a classification model (used for model evaluation)
from sklearn.metrics import accuracy_score

# Defines a custom dataset class for PyTorch (used for handling data)
from torch.utils.data import Dataset

# Creates a DataLoader for efficient batch processing in PyTorch (used for data loading)
from torch.utils.data import DataLoader

# Splits a dataset into training and validation sets (used for data splitting)
from torch.utils.data import random_split

# Represents a multi-dimensional matrix in PyTorch (used for tensor manipulation)
from torch import Tensor
import torch.nn as nn
import utils
# # Implements a linear layer in a neural network (used for defining neural network architecture)
# from torch.nn import Linear

# # Applies rectified linear unit (ReLU) activation function (used for introducing non-linearity)
# from torch.nn import ReLU

# # Applies sigmoid activation function (used for binary classification output)
# from torch.nn import Sigmoid

# # Base class for all neural network modules in PyTorch (used for creating custom models)
# from torch.nn import Module

# # Stochastic Gradient Descent optimizer (used for model optimization during training)
from torch.optim import Adam


# # embedding layer
# from torch.nn import Embedding
# from torch.nn import Dropout
# from torch.nn import ModuleList
import config as conf


In [42]:
config = conf.ConfigLoader().load_config()

Configurations : 
number_test_of_words : 10000
number_validation_of_words : 1000
classifier : lstm
embedding : fasttext
is_training : True
word_embeddings : False
character_embeddings : False
embedding_vector_size : 100
character_embedding_vector_size : 200
batch_size : 64
num_epochs : 7


In [2]:
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

  return torch._C._cuda_getDeviceCount() > 0


In [3]:
device

device(type='cpu')

In [154]:
# Define the vocabulary size and embedding dimension
vocab_size = 50
embedding_dim = 300

# Create an embedding layer
embedding = nn.Embedding(vocab_size, embedding_dim)

# Define a sequence of strings
strings = "Hello from the other world".split()

# Define a dictionary to map strings to indices
word_to_index = {w:i+1 for i,w in enumerate(strings)}
print(word_to_index)
# Convert the sequence of strings to a sequence of indices
indices = torch.LongTensor([word_to_index[word] for word in strings])
# Define a convolutional layer
conv = nn.Conv1d(in_channels=5, out_channels=1, kernel_size=2,stride=1)

# Pass the tensor through the convolutional layer
embed_out = embedding(indices)
print(embed_out.size())

conv_out = conv(embed_out)
# conv_out = [32, 16, 26, 26]
print(conv_out.size())
# pooling layer
pool = nn.MaxPool1d(kernel_size=2)
# Pass the convolved output through the pooling layer
pool_out = pool(conv_out)
print(pool_out.size())

# Flatten the output of the convolutional layer
flatten = nn.Flatten()
flat_out = flatten(pool_out)
print(flat_out.size())
# 17 * 100

{'Hello': 1, 'from': 2, 'the': 3, 'other': 4, 'world': 5}
torch.Size([5, 300])
torch.Size([1, 299])
torch.Size([1, 149])
torch.Size([1, 149])


In [1]:
# Define Diacritics
KASRA = "\u0650"
DAMMA = "\u064F"
FATHA = "\u064E"
KASRATAN = "\u064D"
DAMMATAN = "\u064C"
FATHATAN = "\u064B"
SUKUN = "\u0652"
SHADDA = "\u0651"
DAMMA_SHADDA =  DAMMA + SHADDA
SHADDA_DAMMA =  SHADDA + DAMMA
FATHA_SHADDA =  FATHA + SHADDA
SHADDA_FATHA =  SHADDA + FATHA
KASRA_SHADDA =  KASRA + SHADDA
SHADDA_KASRA =  SHADDA + KASRA
DAMMATAN_SHADDA =  DAMMATAN + SHADDA
SHADDA_DAMMATAN =  SHADDA + DAMMATAN
FATHATAN_SHADDA =  FATHATAN + SHADDA
SHADDA_FATHATAN =  SHADDA + FATHATAN
KASRATAN_SHADDA =  KASRATAN + SHADDA
SHADDA_KASRATAN =  SHADDA + KASRATAN
EMPTY = ""
DIACRITICS = [KASRA, DAMMA, FATHA, KASRATAN, DAMMATAN, FATHATAN, SUKUN, SHADDA, DAMMA_SHADDA, SHADDA_DAMMA, FATHA_SHADDA, SHADDA_FATHA, KASRA_SHADDA, SHADDA_KASRA, DAMMATAN_SHADDA, SHADDA_DAMMATAN, FATHATAN_SHADDA, SHADDA_FATHATAN, KASRATAN_SHADDA, SHADDA_KASRATAN, EMPTY]
ARABIC_ALPHABIT = "اأآإئءبتةثجحخدذرزسشصضطظعغفقكلمنهوؤيى"

### Functions

In [34]:
# filter data takes a list of strings and removes unwanted patterns
def filter_data(data: str) -> str:
    # data = re.sub(r"\( \d+ (/ \d+)? \)", "", data)
    # remove all numbers
    data = re.sub(r"\d+", "", data)
    # regex to remove all special characters
    data = re.sub(r"[][//,;\?؟()$:\-{}_*؛،:«»`–\"~!]", "", data)
    # remove all english letters
    data = re.sub(r"[a-zA-Z]", "", data)
    # Substituting multiple spaces with single space
    data = re.sub(r"([^\S\n])+", " ", data, flags=re.I)
    return data

In [35]:
def read_data(path: str) -> str:
    with open(path, "r", encoding="utf-8") as f:
        # train_set = f.read().splitlines()
        return f.read()

In [36]:
def split_data_to_words(data: str) -> list:
    words = re.split(r"\s+", data)
    return words

In [2]:
# This function is responsible for mapping diacritics to their corresponding strings
def diacritic_to_str(diacritic):
    if diacritic == SHADDA:
        diacritic = "SHADDA"
    elif diacritic == KASRA:
        diacritic = "KASRA"
    elif diacritic == DAMMA:
        diacritic = "DAMMA"
    elif diacritic == FATHA:
        diacritic = "FATHA"
    elif diacritic == KASRATAN:
        diacritic = "KASRATAN"
    elif diacritic == DAMMATAN:
        diacritic = "DAMMATAN"
    elif diacritic == FATHATAN:
        diacritic = "FATHATAN"
    elif diacritic == SUKUN:
        diacritic = "SUKUN"
    elif diacritic == DAMMA_SHADDA or diacritic == SHADDA_DAMMA :
        diacritic = "SHADDA_DAMMA"
    elif diacritic == FATHA_SHADDA or diacritic == SHADDA_FATHA:
        diacritic = "SHADDA_FATHA"
    elif diacritic == KASRA_SHADDA or diacritic == SHADDA_KASRA:
        diacritic = "SHADDA_KASRA"
    elif diacritic == DAMMATAN_SHADDA or diacritic == SHADDA_DAMMATAN:
        diacritic = "SHADDA_DAMMATAN"
    elif diacritic == FATHATAN_SHADDA or diacritic == SHADDA_FATHATAN:
        diacritic = "SHADDA_FATHATAN"
    elif diacritic == KASRATAN_SHADDA or diacritic == SHADDA_KASRATAN:
        diacritic = "SHADDA_KASRATAN"
    else:
        diacritic = " "
    return diacritic

### Data Class

In [72]:
class TxtDataset(Dataset):
    def __init__(self, path):
        super().__init__()
        self.x = read_data(path)
        self.x = filter_data(self.x).split('\n')[1] #only one line
        self.x = split_data_to_words(self.x)
        self.x = [word for word in self.x if word != ""]
        # get the diacritic of the last character of each word
        original_labels = []
        for i in range(len(self.x)):
            
            original_labels.append(diacritic_to_str(self.x[i][-1]))
            self.x[i] = self.x[i][:-1]
            if self.x[i] == "":
                self.x.pop(i)
                original_labels.pop(i)
        
        original_labels.reverse()
        self.y = LabelEncoder().fit_transform(original_labels)

        self.encoding_mapping = dict(zip(self.y, original_labels))

        # ensure the target is float
        self.y = self.y.astype('float32')
        self.y = self.y.reshape((len(self.y), 1))

        print(self.x)
        print(original_labels)
        print(self.encoding_mapping)
        print(len(self.x))
        print(len(self.y))

    # number of rows in the dataset
    # The __len__ function returns the number of samples in our dataset.
    def __len__(self):
        return len(self.x)

    # get a row at an index
    # The __getitem__ function loads and returns a sample from the dataset at the given index idx
    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

### Module Class

In [168]:
class CNN(nn.Module):
    def __init__(
        self,number_of_words ,vocab_size, embedding_dim, num_filters, filter_sizes, output_dim, dropout
    ):
        super().__init__()
        # embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # convolutional layers
        self.convs = nn.ModuleList(
            [
                nn.Conv1d(
                    in_channels=number_of_words, out_channels=num_filters, kernel_size=fs
                )
                for fs in filter_sizes
            ]
        )
        # max pooling layers
        self.maxpools = nn.ModuleList(
            [nn.MaxPool1d(kernel_size=fs) for fs in filter_sizes]
        )

        # fully-connected layer
        self.flatten_layers = nn.ModuleList(
            [nn.Flatten() for _ in range(len(filter_sizes))]
        )
        # linear layer which calculates the output y = x * A.T + b
        self.hidden1 = nn.Linear(232, number_of_words,device=device)
        nn.init.kaiming_uniform_(self.hidden1.weight, nonlinearity='relu')
        self.act1 = nn.ReLU()
        # dropout layer
        self.dropout = nn.Dropout(dropout)
        self.hidden2 = nn.Linear(output_dim, output_dim,device=device)
        nn.init.xavier_uniform_(self.hidden2.weight)

    def forward(self, list_of_strings: list):
        # Define a dictionary to map strings to indices
        word_to_index = {w:i+1 for i,w in enumerate(list_of_strings)}
        # print(word_to_index)
        # Convert the sequence of strings to a sequence of indices
        indices = torch.LongTensor([word_to_index[word] for word in list_of_strings])
        # pass text through embedding layer
        embedded = self.embedding(indices)
        print("embedded.size()",embedded.size())
        # embedded is len(list_of_strings) * embedding_dim
        # initialize list for capturing output of each convolutional layer
        conved = []
        # pass embedded through convolutional layers and apply ReLU activation function
        for i in range(len(self.convs)):
            conved.append(
                nn.ReLU()(self.convs[i](embedded))
            )
            print(f"conved[{i}].size()",conved[i].size())
        # initialize list for capturing output of each max pooling layer
        pooled = []
        # pass each output of convolutional layer through max pooling layer
        for i in range(len(self.maxpools)):
            pooled.append(
                self.maxpools[i](conved[i])
            )
            print(f"pooled[{i}].size()",pooled[i].size())
        # pooled[i] = [batch size, num_filters, 1]
        # initialize list for capturing output of each flatten layer
        flattened = []
        # pass each output of max pooling layer through flatten layer
        for i in range(len(self.flatten_layers)):
            flattened.append(
                self.flatten_layers[i](pooled[i])
            )
            print(f"flattened[{i}].size()",flattened[i].size())
        # flattened[i] = [batch size, num_filters]
        # concatenate output of each flatten layer
        cat = self.dropout(torch.cat(flattened, dim=1))
        print("cat.size()",cat.size())
        # cat = torch.cat(flattened, dim=1)
        # cat = [batch size, num_filters * len(filter_sizes)]
        # pass cat through fully-connected layer
        hedden1 = self.hidden1(cat)
        print("hedden1.size()",hedden1.size())
        act1 = self.act1(hedden1)
        print("act1.size()",act1.size())
        # hidden = [batch size, output dim]
        # pass hidden through dropout layer
        dropped1 = self.dropout(act1)
        print("dropped1.size()",dropped1.size())
        # dropped1 = [batch size, output dim]
        # # pass dropped through ReLU activation function
        # act1 = self.act1(dropped1)
        # act1 = [batch size, output dim]
        # pass act1 through fully-connected layer
        hidden2 = self.hidden2(dropped1)
        print("hidden2.size()",hidden2.size())
        # hidden2 = [batch size, output dim]
        # pass hidden2 through dropout layer
        dropped2 = self.dropout(hidden2)
        print("dropped2.size()",dropped2.size())
        # dropped2 = [batch size, output dim]
        return dropped2
    

### Model Training

In [177]:
# train the model
def train_model(train_dl, model):
    # define the optimization
    criterion = nn.CrossEntropyLoss() #  Cross-Entropy
    # Stochastic Gradient Descent Optimizer
    # model.parameters(): model weights
    optimizer = Adam(model.parameters(), lr=0.01)
    # enumerate epochs
    for epoch in range(100):
        # enumerate mini batches
        for i, (inputs, targets) in enumerate(train_dl):
            print("inputs",inputs)
            print("targets",targets)
            # clear the gradients stored in the memory as the defualt behavior of pytorch is to sum the gradients
            # but we want to use gradients of each iteration separatly so we initialize the gradients of zeros
            optimizer.zero_grad()
            # compute the model output
            yhat = model(inputs)
            print("yhat",yhat)

            print("yhat.size()",yhat.size())
            print("targets.size()",targets.size())
            # calculate loss
            loss = criterion(yhat, targets)
            # credit assignment
            loss.backward()
            # update model weights
            optimizer.step()

### Model Evaluation

In [173]:

# evaluate the model
def evaluate_model(test_dl, model):
    predictions, actuals = list(), list()
    for i, (inputs, targets) in enumerate(test_dl):
        # evaluate the model on the test set
        yhat = model(inputs)
        # retrieve numpy array
        # detach(): bykon fe graph fe pytorch byrbot ben al tensors fa detach btfsl al tensor da 3n ali ablo
        yhat = yhat.detach().numpy()
        actual = targets.numpy()
        actual = actual.reshape((len(actual), 1))
        # round to class values
        yhat = yhat.round()
        # store
        predictions.append(yhat)
        actuals.append(actual)
    predictions, actuals = vstack(predictions), vstack(actuals)
    # calculate accuracy
    acc = accuracy_score(actuals, predictions)
    return acc


### Model Prediction

In [118]:
# make a class prediction for one row of data
def predict(row, model: nn.Module):
    # convert row to data
    model.eval() # This is equivalent with self.train(False)
    # model.eval : bt2ol lel model en de predict phase fa ma3tml4 operations mo3yna 34an twfr computations like drop out layers in NN m4 bst5dmha fel testing
    # bt2fl kol al layers ali m4 m7tagha fel tetsing also storing the gradients in cache is not needed so we avoid wasting memory
    row = Tensor([row])
    # make prediction
    yhat = model(row)
    # retrieve numpy array
    yhat = yhat.detach().numpy()

    return yhat

### Main

In [89]:
dataset = TxtDataset('./Dataset/train.txt')

['ابْن', 'عَرَفَة', 'قَوْلُه', 'بِلَفْظ', 'يَقْتَضِي', 'كَإِنْكَار', 'غَيْر', 'حَدِيث', 'بِالْإِسْلَام', 'وُجُوب', 'مَ', 'عُلِم', 'وُجُوبُه', 'مِن', 'الدِّين', 'ضَرُورَة', 'كَإِلْقَاء', 'مُصْحَف', 'بِقَذَر', 'وَشَدّ', 'زُنَّار', 'ابْن', 'عَرَفَة', 'قَوْل', 'ابْن', 'شَاس', 'أَو', 'بِفِعْل', 'يَتَضَمَّنُه', 'هُو', 'كَلُبْس', 'الزُّنَّار', 'وَإِلْقَاء', 'الْمُصْحَف', 'فِ', 'صَرِيح', 'النَّجَاسَة', 'وَالسُّجُود', 'لِلصَّنَم', 'وَنَحْو', 'ذَلِك', 'وَسِحْر', 'مُحَمَّد', 'قَوْل', 'مَالِك', 'وَأَصْحَابِه', 'أَنّ', 'السَّاحِر', 'كَافِر', 'بِاَللَّه', 'تَعَالَ', 'قَال', 'مَالِك', 'هُو', 'كَالزِّنْدِيق', 'إذَ', 'عَمِل', 'السِّحْر', 'بِنَفْسِه', 'قُتِل', 'وَلَم', 'يُسْتَتَب']
['SUKUN', 'SUKUN', 'FATHA', 'KASRA', 'FATHA', 'FATHA', ' ', 'KASRA', 'FATHA', 'DAMMATAN', 'FATHA', ' ', 'KASRA', 'DAMMATAN', 'FATHA', 'FATHA', 'KASRA', 'KASRATAN', 'DAMMA', 'DAMMATAN', 'KASRATAN', 'FATHA', 'KASRA', 'KASRA', 'KASRA', 'KASRA', 'KASRA', ' ', 'KASRA', 'KASRA', 'KASRA', 'KASRA', 'FATHA', 'DAMMA', 'KASRATAN', 'SUKU

In [119]:
train, test = random_split(dataset, [dataset.__len__(),0])
train_dl = DataLoader(train, batch_size=32, shuffle=True)
test_dl = DataLoader(test, batch_size=None, shuffle=False)
encoding_mapping = dataset.encoding_mapping

In [120]:
print(train_dl)
print(len(train_dl.dataset), len(test_dl.dataset))

<torch.utils.data.dataloader.DataLoader object at 0x000001E04ABBC210>
62 0


In [179]:
vocab_size = len(dataset.x) * 2
embedding_dim = 300
model = CNN(
    32,
    vocab_size,
    embedding_dim,
    num_filters=1,
    filter_sizes=[3, 4, 5],
    output_dim=len(encoding_mapping),
    dropout=0.5,
)

In [180]:
model.train(True) # set it to False for inference 
train_model(train_dl, model)
# evaluate the model
acc = evaluate_model(test_dl, model)
print('Accuracy: %.3f' % acc)

inputs ('يَتَضَمَّنُه', 'يَقْتَضِي', 'النَّجَاسَة', 'ابْن', 'مَالِك', 'أَو', 'عَمِل', 'قَوْلُه', 'قَال', 'مِن', 'مَ', 'إذَ', 'صَرِيح', 'بِلَفْظ', 'قَوْل', 'ذَلِك', 'بِفِعْل', 'شَاس', 'كَإِنْكَار', 'كَإِلْقَاء', 'تَعَالَ', 'بِالْإِسْلَام', 'وُجُوبُه', 'بِاَللَّه', 'السَّاحِر', 'الدِّين', 'وَإِلْقَاء', 'كَافِر', 'يُسْتَتَب', 'هُو', 'وُجُوب', 'الْمُصْحَف')
targets tensor([[5.],
        [3.],
        [6.],
        [7.],
        [6.],
        [5.],
        [5.],
        [3.],
        [0.],
        [2.],
        [3.],
        [5.],
        [7.],
        [5.],
        [6.],
        [1.],
        [0.],
        [5.],
        [3.],
        [5.],
        [3.],
        [3.],
        [5.],
        [1.],
        [5.],
        [3.],
        [3.],
        [7.],
        [1.],
        [5.],
        [2.],
        [1.]])
embedded.size() torch.Size([32, 300])
conved[0].size() torch.Size([1, 298])
conved[1].size() torch.Size([1, 297])
conved[2].size() torch.Size([1, 296])
pooled[0].size() torch.Size([1, 99]

ValueError: Expected input batch_size (1) to match target batch_size (32).