In [1]:
# !pip install --upgrade pip
# !pip install gensim
# !pip install nltk
# !pip install tokenizers
# !pip install sentencepiece
# !pip install python-bidi
# !pip install arabic-reshaper
# !pip install PyArabic

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor
# from WordFeatureExtraction import WordFeatureExtraction
import pandas as pd
from pyarabic.araby import tokenize, strip_tashkeel
from tqdm import tqdm

from sklearn.metrics import accuracy_score
import numpy as np
from utilities import *
import textProcessing as tp
import nltk
# nltk.download('punkt')
# train_text = load_text("dataset/train.txt")
# tp.preprocessing_text(train_text,"train_preprocessed.txt")


In [3]:
from gensim.models import Word2Vec
# CBOW model
class WordFeatureExtraction():
    """Train a gensim CBOW ``Word2Vec`` model on tokenized sentences and look up word vectors.

    Args:
        tokenized_texts: Iterable of sentences, each a list of string tokens.
        embedding_dim: Size of every word vector (``Word2Vec`` ``vector_size``).
        window_size: Context window size (``Word2Vec`` ``window``).
        min_count: Minimum token frequency to keep a word (``Word2Vec`` ``min_count``).
        epoches: Number of training passes (``Word2Vec`` ``epochs``).
        alpha: Initial learning rate (``Word2Vec`` ``alpha``).
    """

    def __init__(self, tokenized_texts,embedding_dim = 300,window_size = 5,min_count = 1,epoches=5,alpha=0.001):
        self.tokenized_texts = tokenized_texts
        self.embedding_dim = embedding_dim
        self.window_size = window_size
        self.min_count = min_count
        self.epoches = epoches
        self.alpha = alpha

    def map_words_to_vectors(self,input_list, word_vector_dict):
        """Map every sentence to a ``{word: vector}`` dictionary.

        Args:
            input_list: Iterable of sentences (each an iterable of words).
            word_vector_dict: Mapping supporting ``in`` and ``[]`` lookups by word.

        Returns:
            A list with one dict per sentence. Words missing from
            ``word_vector_dict`` are skipped, repeated words appear once, and a
            sentence with no known word is dropped entirely, so the result can
            be shorter than ``input_list``.
        """
        output = []
        for inner_list in input_list:
            inner_output = {
                word: word_vector_dict[word]
                for word in inner_list
                if word in word_vector_dict
            }
            if inner_output:
                output.append(inner_output)
        return output

    def CBOW_train(self):
        """Fit the CBOW model (``sg=0``) and store it on ``self.cbow_model``."""
        # `epoches` and `alpha` were accepted by __init__ but never forwarded,
        # so the configured values had no effect on training.
        self.cbow_model = Word2Vec(
            sentences=self.tokenized_texts,
            vector_size=self.embedding_dim,
            window=self.window_size,
            sg=0,
            min_count=self.min_count,
            epochs=self.epoches,
            alpha=self.alpha,
        )

    def CBOW(self):
        """Return the per-sentence ``{word: vector}`` dicts for the training sentences.

        Trains the model first when ``CBOW_train`` has not been called yet,
        instead of failing on the missing ``cbow_model`` attribute.
        """
        if getattr(self, "cbow_model", None) is None:
            self.CBOW_train()
        return self.map_words_to_vectors(self.tokenized_texts, self.cbow_model.wv)


In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CharLevelEncoder(nn.Module):
    """Combine a word's embedding with per-character LSTM features."""

    def __init__(self, word_embedding_dim, char_embedding_dim, hidden_dim,num_embeddings):
        """
        Initialize the CharLevelEncoder module.

        Args:
        - word_embedding_dim (int): Dimensionality of word embeddings.
        - char_embedding_dim (int): Dimensionality of character embeddings.
        - hidden_dim (int): Dimensionality of the hidden state in the LSTM.
        - num_embeddings (int): Size of the character embedding table. Characters
          are indexed by ``ord(char)``, so every code point fed to ``forward``
          must be smaller than this value.
        """
        super(CharLevelEncoder, self).__init__()
        self.word_embedding_dim = word_embedding_dim

        # Character-level embeddings, indexed by Unicode code point
        self.char_embedding = nn.Embedding(num_embeddings=num_embeddings, embedding_dim=char_embedding_dim)

        # LSTM for character-level information
        self.char_lstm = nn.LSTM(input_size=char_embedding_dim, hidden_size=hidden_dim, batch_first=True)

        # Linear layer projecting [word embedding ; char hidden state] back to word_embedding_dim
        self.linear = nn.Linear(word_embedding_dim + hidden_dim, word_embedding_dim)

    def forward(self, word_embeddings):
        """
        Forward pass of the CharLevelEncoder to generate character-level embeddings for words.

        Args:
        - word_embeddings (list): List of dictionaries (one per sentence) mapping
          each word to its embedding vector of length ``word_embedding_dim``.

        Returns:
        - list: One entry per sentence; each entry is a list with one dict per
          word mapping every distinct character of that word to its combined
          embedding (a list of ``word_embedding_dim`` floats). An empty input
          yields an empty list.
        """
        # Build index/embedding tensors on the module's device so the encoder
        # also works after `.to("cuda")`.
        device = self.char_embedding.weight.device

        all_sentence_list = []
        for word_dict in word_embeddings:
            sentence_char_list = []
            for word, word_embedding in word_dict.items():
                arabic_word_chars = list(word)
                if not arabic_word_chars:
                    # Nothing to embed; keep the slot so words stay aligned.
                    sentence_char_list.append({})
                    continue

                num_chars = len(arabic_word_chars)
                char_indices = [ord(char) for char in arabic_word_chars]

                # Repeat the word embedding once per character: (num_chars, word_embedding_dim)
                word_embedding_tensor = torch.tensor(word_embedding, dtype=torch.float32, device=device)
                word_embedding_tensor = word_embedding_tensor.view(1, -1).expand(num_chars, -1)

                # Character embeddings: (num_chars, char_embedding_dim)
                char_embedded = self.char_embedding(
                    torch.tensor(char_indices, dtype=torch.long, device=device)
                )

                # With batch_first=True this feeds every character as its own
                # length-1 sequence: (batch=num_chars, seq=1, char_embedding_dim)
                char_embedded = char_embedded.view(num_chars, 1, -1)

                # Final hidden state: (1, num_chars, hidden_dim)
                _, (hidden, _) = self.char_lstm(char_embedded)

                # Concatenate word and character features, then project
                combined = torch.cat((word_embedding_tensor, hidden.squeeze(0)), dim=1)
                combined = F.relu(self.linear(combined))

                # A repeated character keeps a single entry (later rows overwrite earlier ones)
                sentence_char_list.append(dict(zip(arabic_word_chars, combined.tolist())))

            all_sentence_list.append(sentence_char_list)

        # Return every sentence, not just the last one processed.
        return all_sentence_list

In [5]:
class Embedding_Dataset(Dataset):
    """Dataset of preprocessed training sentences paired with their diacritic labels."""

    def __init__(self):
        """Load the preprocessed training text and create the character-level encoder."""
        self.data = load_text("dataset/train_preprocessed.txt")
        self.CharEmbedding = CharLevelEncoder(
            word_embedding_dim=300,
            char_embedding_dim=5,
            hidden_dim=10,
            num_embeddings=1611,
        )

    def __len__(self):
        """Number of entries in the loaded data."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(tokens, labels)`` for the entry at ``idx``.

        Labels are extracted from the raw entry before its diacritics are
        cleared; the cleaned text is then tokenized and each token stripped of
        any remaining tashkeel.
        """
        raw_sentence = self.data[idx]
        labels = tp.extract_diacritics_with_previous_letter(raw_sentence)
        plain_sentence = tp.clear_diacritics(raw_sentence)
        tokens = list(map(strip_tashkeel, tokenize(plain_sentence)))
        return tokens, labels

    def collate_fn(self, batch):
        """Split a batch of ``(tokens, labels)`` pairs into two parallel lists."""
        processed_sentences = [tokens for tokens, _ in batch]
        processed_labels = [tags for _, tags in batch]
        return processed_sentences, processed_labels

    def extract_sentences_word_embedding(self,train_dataloader):
        """Encode every batch of the dataloader with CBOW + character-level features.

        For each batch a fresh ``WordFeatureExtraction`` model is trained on the
        batch sentences, and its word vectors are passed through
        ``self.CharEmbedding``.

        Returns:
            ``(encoded_batches, label_batches)``: two lists with one entry per batch.
        """
        encoded_batches = []
        label_batches = []
        for sentences, labels in train_dataloader:
            extractor = WordFeatureExtraction(sentences)
            extractor.CBOW_train()
            word_vectors = extractor.CBOW()
            encoded_batches.append(self.CharEmbedding(word_vectors))
            label_batches.append(labels)
        return encoded_batches, label_batches

In [6]:
class RNNClassifier(nn.Module):
    """Single-layer RNN followed by a two-layer MLP head producing per-step logits.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the RNN hidden state.
        output_size: Number of output classes (logits per time step).
    """

    def __init__(self, input_size, hidden_size, output_size):
        super(RNNClassifier, self).__init__()

        self.hidden_size = hidden_size
        self.input_size = input_size
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.linear1 = nn.Linear(hidden_size, 64)
        self.tanh = nn.Tanh()
        self.linear2 = nn.Linear(64, output_size)
        self.init_weight()

    def forward(self, input, hidden):
        """Run the RNN and the classification head.

        Args:
            input: Tensor of shape ``(batch, seq_len, input_size)`` (the RNN is
                built with ``batch_first=True``).
            hidden: Initial hidden state of shape ``(1, batch, hidden_size)``,
                e.g. from ``init_hidden``.

        Returns:
            Raw logits of shape ``(batch, seq_len, output_size)``; no softmax is applied.
        """
        output, hidden = self.rnn(input, hidden)
        output = self.linear1(output)
        output = self.tanh(output)
        output = self.linear2(output)
        return output

    def init_hidden(self, batch_size):
        """Return an all-zero initial hidden state of shape ``(1, batch_size, hidden_size)``."""
        return torch.zeros(1, batch_size, self.hidden_size)

    def init_weight(self):
        """Zero every bias and Xavier-normal initialise every weight matrix."""
        for name, param in self.named_parameters():
            if 'bias' in name:
                # In-place initialisers; the underscore-less names are deprecated.
                nn.init.constant_(param, 0.0)
            elif 'weight' in name:
                nn.init.xavier_normal_(param)

In [7]:
# Hyperparameters
# Training schedule
num_epochs = 50
batch_size = 8 * 1024
# Model dimensions: features per time step, RNN hidden units, output classes
input_size, hidden_size, output_size = 38, 64, 15

In [8]:
# Connect to the GPU if available, otherwise fall back to the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
print(torch.cuda.is_available())
print(torch.cuda.device_count())

# Create an instance of the RNN classifier and move its parameters to `device`
model = RNNClassifier(input_size, hidden_size, output_size)
model.to(device)
#######################################################################################
# Bind the dataset instance to its own name. Re-using `Embedding_Dataset` would
# overwrite the class with the instance, so re-running this cell would fail.
embedding_dataset = Embedding_Dataset()
# Use the `batch_size` hyperparameter instead of repeating the literal here.
train_dataloader = DataLoader(
    embedding_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=embedding_dataset.collate_fn,
)
wordEmbeddingVectors, labels_batches = embedding_dataset.extract_sentences_word_embedding(train_dataloader)
#########################################################################################
# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.005)

cuda:0
True
1


  nn.init.xavier_normal(param)
  nn.init.constant(param, 0.0)


In [9]:
# Training loop
# NOTE(review): this loop does not run as written. `train_dataloader` is built
# with `collate_fn`, which returns two plain Python lists (token lists and label
# lists), so `inputs` / `labels` have no `.view` or `.shape` -- presumably the
# source of the AttributeError recorded in this cell's output. The batches must
# be converted to tensors (e.g. from the precomputed embeddings) before this
# loop can work -- TODO confirm the intended input/label encoding.
model.train()
for epoch in range(num_epochs):
    for inputs, labels in train_dataloader:
      # Reshape inputs to (batch, seq_length, input_size) and labels to
      # (batch, seq_length, output_size)
      input = inputs.view(inputs.shape[0], -1, input_size)
      labels = labels.view(inputs.shape[0], -1, output_size)
      input, labels = input.to(device), labels.to(device)
      # Fresh zero hidden state for every batch, moved to the model's device
      hidden = model.init_hidden(batch_size=inputs.shape[0])
      hidden = hidden.to(device)

      # Forward pass
      output = model(input, hidden)
      # Compute loss
      # NOTE(review): `output` is (batch, seq_length, output_size); confirm this
      # layout and the label format match what CrossEntropyLoss expects.
      loss = criterion(output, labels)
      # Zero the gradients
      optimizer.zero_grad()
      # Backward pass and optimization
      loss.backward()
      optimizer.step()
      # Print loss for monitoring
      print(f"Epoch: {epoch+1}, Batch Loss: {loss.item()}")



AttributeError: ignored