In [5]:
import torch
import torch.nn as nn
import re
import os
import numpy as np
import string
import torch.optim as optim
import random
import ast
import time
from bs4 import BeautifulSoup

In [7]:
# implementation of a paper : 

class model(nn.Module): 

    def __init__(self, data, batch_size ,embedding_size, hidden_size,num_layers ,dropout, teacher_forcing_ratio, learning_rate):
        super().__init__()
        
        '''
        our input data are arabic words with their roots, with the hypothesis that each word has it's own root.
        
        (our dataset for this is named root_data)
        '''
        self.sow = '$'
        self.eow = '£'
        self.lr = learning_rate
        self.ratio = 0.9
        self.batch_size = batch_size
        self.data = data
        self.batches, self.vocab, self.char_index_dic = self.prepare_data(self.data)
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.input_dense = nn.Linear(self.hidden_size * 2,self.embedding_size)
        
        self.num_layers = num_layers
        
        self.dropout = dropout
        self.embedding = nn.Embedding(num_embeddings = len(self.vocab), embedding_dim = self.embedding_size, padding_idx = self.char_index_dic['%']) 
        
        self.Dropout = nn.Dropout(self.dropout / 2)
        
        #self.bigru = nn.GRU(input_size=self.embedding_size, hidden_size=self.hidden_size, num_layers=self.num_layers, bidirectional=True, batch_first=True)
        
        self.BILSTM = nn.LSTM(input_size=self.embedding_size, hidden_size=self.hidden_size, num_layers=self.num_layers, bidirectional=True, batch_first=True, dropout = self.dropout)

        
        #self.gru = nn.GRU(input_size= self.embedding_size ,hidden_size = self.hidden_size * 2, num_layers = self.num_layers, batch_first = True)
        self.LSTM = nn.LSTM(input_size= self.embedding_size ,hidden_size = self.hidden_size*2, num_layers = self.num_layers, batch_first = True , dropout = self.dropout)
                
        self.criterion = nn.CrossEntropyLoss(ignore_index =self.char_index_dic['%'])
        self.teacher_forcing_ratio = teacher_forcing_ratio
        self.Linear = nn.Linear(self.hidden_size * 2,len(self.vocab))
        #self.Linear = nn.Linear(self.hidden_size * 2,1)
        
        #self.optimizer = optim.Adam([*self.BILSTM.parameters(), *self.LSTM.parameters()], lr = 0.1)
        #self.optimizer = optim.AdamW(self.parameters(), lr = self.lr)
        #self.optimizer = optim.RMSprop([*self.LSTM.parameters(), *self.BILSTM.parameters()], lr = self.lr)
        #self.optimizer = optim.Adamax([*self.LSTM.parameters(), *self.BILSTM.parameters()], lr=self.lr)
        #self.opt1 = optim.RMSprop(self.BILSTM.parameters(), lr = self.lr)

        self.opt1 = optim.Adam(self.BILSTM.parameters(), lr = self.lr )
        self.opt2 = optim.Adam([*self.LSTM.parameters(), *self.input_dense.parameters()], lr = self.lr)
    
    
    def prepare_data(self, data):
    
        #Le'ts create a padding for ouriinstances : 

        pad_char = '%'
        padded_data = []
        ls_words = []
        ls_roots = []
        for instance in data : 
            ls_words.append(instance[0])
            ls_roots.append(instance[1])
        
        # Let's calculate the biggest length
        max_len_words = max([len(item) for item in ls_words])
        max_len_roots = max([len(item) for item in ls_roots])

        # Now we pad the word until we reach the max length
        for instance in data: 
            tmp = []
            word,root = instance[0], instance[1]
            while(len(word) != max_len_words):
                word += pad_char
            tmp.append(word)
            while(len(root) != max_len_roots):
                root += pad_char
            tmp.append(root)
            padded_data.append(tmp)

        # let's create our vocab : 

        vocab = []
        for word in padded_data :
            for item in word : 
                tmp = set(item)
                for k in tmp : 
                    if k not in vocab : 
                        vocab.append(k)

        # Let's create our dictionnary with unique indexes

        char_to_idx_map = {char: idx for idx, char in enumerate(vocab)}

        # Let's now split our data to batches

        final_data = []
        for instance in padded_data : 
            tmp = []
            word = [char_to_idx_map[char] for char in instance[0]]
            root = [char_to_idx_map[char] for char in instance[1]]
            tmp.append(word)
            tmp.append(root)
            final_data.append(tmp)

        size= self.batch_size 
        batches = [final_data[i:i + size] for i in range(0, len(final_data), size)]
        
        return batches , vocab , char_to_idx_map
    
    
    def word_to_seq(self, word):
        '''
        this function returns a sequence of the unique indexes for the given word 
        (sequence is tensor that can be changed using a .tolist() )
        '''
        word_char_idx_seq =[self.char_index_dic[char] for char in word]    
        return word_char_idx_seq # word sequence
    
    
    # Let's now construct our model : 
    
    # we should think about character embeddings in order to create an embeded matrix for each word
        
    
    
    def encode(self, batch):    
        '''
        input : a batch of sequences of instances : [word_seq , root_seq] * batch_size
                input_size : (input_size,2)
        '''
        
        word_batch = [] # list of words in the batch
        root_batch = [] # list of roots in the batch
        
        for instance in batch : 
            word_batch.append(instance[0])
            root_batch.append(instance[1])
            
        word_batch = torch.tensor(word_batch)
        root_batch = torch.tensor(root_batch)
        
        # we create embedding of the word batch : 
        
        embedded_word_batch = self.embedding(word_batch)
        
        
        
        init_hid = nn.init.xavier_normal_(torch.zeros(2*self.num_layers, len(batch), self.hidden_size), gain=0.5)
        init_ce = nn.init.xavier_normal_(torch.zeros(2*self.num_layers, len(batch), self.hidden_size), gain=0.5)
            
        outputs, (hidden, cell) = self.BILSTM(embedded_word_batch, (init_hid, init_ce)) # we pass the emebedded vector through the bi-GRU 
    
        # hidden size : [2 * num_layers, batch_size , hidden_size]
        
        # we want hidden size : [num_layers , batch_size  , 2 * hidden_size]
        
        # we return an adequate layer for the decoder : 
        
        final_hid, final_ce = [], []
        for k in range(0,hidden.size(0), 2):
            
            tmp_hid = hidden[k:k+2 , :, :]
            tmp_ce = cell[k:k+2, :, :]
            
            
            cct_hid = torch.cat((tmp_hid[0], tmp_hid[1]), dim  = 1).tolist()
            cct_ce = torch.cat((tmp_ce[0], tmp_ce[1]), dim  = 1).tolist()
            
            final_hid.append(cct_hid)
            final_ce.append(cct_ce)
        
        final_hid, final_ce = torch.tensor(final_hid), torch.tensor(final_ce)
    
        return root_batch , outputs ,(final_hid, final_ce)
        
    
    def decode(self, encoder_outputs ,encoder_hidden_cell , batch, teacher_forcing_bool, epoch):
        
        '''
        input : encoding_hidden_layer => corresponds to the concatenation of the final hidden layers 
                                        of the bidirectionnal gru in our encoder
                
                batch : subset of data that contains the roots of the words we encoded.
                
        output : we'll see :) 
        
        '''

        (hidden_layer , cell) , root_batch = encoder_hidden_cell , batch 
                        
        embedded_char = self.embedding(torch.unsqueeze(root_batch[:, 0], 1))
            
        outputs = []
        
        #topk_indexes = []
        
        for i in range(root_batch.size(1)): 
            
            self.Dropout(embedded_char)
            
            decoder_output , (hidden_layer, cell) = self.LSTM(embedded_char, (hidden_layer, cell))
                        
            # Let's calculate the scores  :

            input_decoder_output = self.input_dense(decoder_output)
            
            embedded_char = input_decoder_output
    
            mask = np.where([random.random() <= (self.teacher_forcing_ratio) for i in range(root_batch.size(0))])[0]
            
            teacher_forcing_input = self.embedding(torch.unsqueeze(torch.clone(root_batch[:, i]), 1))
            
            if teacher_forcing_bool : 

                embedded_char[mask] = teacher_forcing_input[mask] 
                
            Dense_decoded_output = self.Linear(decoder_output)
            
            soft = nn.Softmax(dim = 2)
            
            soft_out = soft(Dense_decoded_output)

            #tst = torch.squeeze(soft_out, 1)
            
            #[128, 39]
            
            #tmp = torch.topk(tst, 3, dim = 1).tolist()
            
            #topk_indexes.append(tmp)
            
            outputs.append(soft_out)
            
            
        return outputs 
                            
        
    
    def train_model(self, batches, teacher_forcing_bool, epoch):
                
        train_batches = batches        
         
        epoch_loss = 0
        
        n = 0            
                
        test_word = '$' + 'تحليل' + '£'
        
        for batch in train_batches :
            
            #print(self.predict(test_word))
        
            self.opt1.zero_grad()
            self.opt2.zero_grad()

            root_batch, encoder_output, encoder_states = self.encode(batch)

            outputs = self.decode(encoder_output,encoder_states, root_batch, teacher_forcing_bool, epoch)

            a = [torch.squeeze(item, 1) for item in outputs]
            a = [torch.unsqueeze(item, 0) for item in a]

            output = torch.cat(a, dim = 0)
                        
            output_dim = output.shape[-1]

            output = output.view(-1, output_dim)
            
            trg = root_batch.transpose(0, 1)
    
            trg = trg.reshape(-1)
        
            loss = self.criterion(output, trg)
        
            loss.backward()

            torch.nn.utils.clip_grad_norm_([*self.LSTM.parameters(), *self.BILSTM.parameters()], 1)

            self.opt1.step()
            self.opt2.step()
            
            #self.optimizer.step()

            epoch_loss+=loss.item()

            n+=1

            print('the loss of the train batch ', n ,' is : ', loss.item())
    
        return epoch_loss/n

    def evaluate_model(self, batches, teacher_forcing_bool, epoch):
        '''
        this method evaluates our model :=)
        will be similar to train but without the teacher forcing/ using an optimizer 
        '''          
        self.eval()

        val_batches = batches

        n = 0

        epoch_loss = 0
        
        with torch.no_grad() :

            for batch in val_batches :

                root_batch, encoder_output ,encoder_states = self.encode(batch)

                outputs = self.decode(encoder_output ,encoder_states, root_batch, teacher_forcing_bool, epoch)

                a = [torch.squeeze(item, 1) for item in outputs]
                a = [torch.unsqueeze(item, 0) for item in a]

                output = torch.cat(a, dim = 0)

                output_dim = output.shape[-1]

                output = output.view(-1, output_dim)

                trg = root_batch.transpose(0, 1)

                trg = trg.reshape(-1)
                
                #print(output.size(), trg.size())
                
                loss = self.criterion(output, trg)

                epoch_loss+=loss.item()

                n+=1

                print('the loss of the val batch ', n ,' is : ', loss.item())

        return epoch_loss / n
    
    def predict(self, word):
        '''
        this is the adaptation of encoder-decoder network on a single word w/o optimization
        '''
        
        # Let's turn the word into a sequence of word indexes 
        word_seq = self.word_to_seq(word)

        # Let's create an embedding of the word seq
        embedded_word = self.embedding(torch.tensor(word_seq))

        
        init_hid = nn.init.xavier_normal_(torch.zeros(2*self.num_layers, self.hidden_size), gain=0.5)
        init_ce = nn.init.xavier_normal_(torch.zeros(2*self.num_layers, self.hidden_size), gain=0.5)
        # Let's feed our word embedding to the encoder network
        outputs, (hidden, cell) = self.BILSTM(embedded_word, (init_hid, init_ce))
        
        #print(hidden.size())
        
        final_hid, final_ce = [], []
        for k in range(0,hidden.size(0), 2):
            
            tmp_hid = hidden[k:k+2 ,:]
            tmp_ce = cell[k:k+2, :]

            cct_hid = torch.cat((tmp_hid[0], tmp_hid[1]), dim  = -1).tolist()
            cct_ce = torch.cat((tmp_ce[0], tmp_ce[1]), dim  = -1).tolist()

            final_hid.append(cct_hid)
            final_ce.append(cct_ce)
        
        final_hidden, final_cell = torch.tensor(final_hid), torch.tensor(final_ce)

        #initialize the input of the decoder

        embedded_char = torch.unsqueeze(self.embedding(torch.tensor(self.char_index_dic[self.sow])), 0)

        prediction_output = [] # a list of the outputs of the decoder 
     
        # we create a softmax layer : 

        soft = nn.Softmax(dim = 1)
        
        key_list = list(self.char_index_dic.keys())
        val_list = list(self.char_index_dic.values())
        
        for i in range(5):
                        
            decoder_output , (final_hidden, final_cell) = self.LSTM(embedded_char, (final_hidden, final_cell))

            input_dense = nn.Linear(self.hidden_size * 2,self.embedding_size)
            input_decoder_output = input_dense(decoder_output)

            embedded_char = input_decoder_output

            Dense_decoded_output = self.Linear(decoder_output)
            prediction_output.append(soft(Dense_decoded_output).tolist())

        prediction_output = torch.squeeze(torch.tensor(prediction_output), 1)
        
        #print(prediction_output.size())
        
        test_word_seq = word_seq[1:]
        test_word_seq = test_word_seq[:-1]
        
        precision = 5
        
        top_idx = torch.topk(prediction_output, precision, dim = 1).indices

        
        init_char = 0
        final_char = 0        
        
        
        init_char = self.char_index_dic[self.sow]
    
        final_char = self.char_index_dic[self.eow]
       
        
        grid = []
        
        for i in range(precision): 
            for j in range(precision):
                for k in range(precision):
                    tmp = []
                    tmp.append((top_idx[1][i]).item())
                    tmp.append((top_idx[2][j]).item())
                    tmp.append((top_idx[3][k]).item())
                    grid.append(tmp)
        
        # we check the possibilities : 
        
        best_cases = []
        
        print(grid)
        
        for case in grid : 
            
            s = [item for item in case if item in set(test_word_seq)] # we select elts from a that are in l 
            b = [item for item in test_word_seq if item in set(s)] # 
            
            #print(s, b)
            
            if s == b and s != [] : 
                best_cases.append(case)
            
        
        
        # potential roots : 
        
        pot_seq = []
        
        for item in best_cases : 
            
            tmp =  item  
            if ("$" not in tmp)  and ("£" not in tmp) :
                pot_seq.append(tmp)           
            

        #best_char_indexes = [torch.argmax(item).item() for item in prediction_output]
        
        #t = torch.squeeze(torch.tensor(prediction_output), 1)
        
        #topk_out =  torch.topk(t,3,  dim = 1).indices.tolist()
        
        final_roots =[]
        
        for seq in pot_seq : 
            
            position = [val_list.index(item) for item in seq]

            result_char = [key_list[pos] for pos in position]
            predicted_root = ''.join(result_char)
            final_roots.append(predicted_root)

            
    
        return final_roots

    

    def fit(self, num_epochs):
        
        """
        let's first prepare our data
        
        """
        
        print(f'The model has {self.count_parameters():,} trainable parameters')
        
        data = self.data
        
        data = random.sample(data, len(data))
        data_size = len(data)
        middle_index = int(data_size * self.ratio)        
        train_data , val_data = data[:middle_index], data[middle_index:]
        
        train_batches, voc, dic = self.prepare_data(train_data)
        val_batches ,voc , dic = self.prepare_data(val_data)
        
        epochs = list(range(num_epochs))
        
        best_val_loss = 1000
        best_model_par = 0
        
        losses =[]
        predicted_roots = []
        test_word = '$' + 'تحليل' + '£'
 
        for epoch in epochs : 
                
            print('epoch num : ', epoch) 
            print(self.char_index_dic)

            
            
            t1 = time.time()
            
            train_batches = random.sample(train_batches , len(train_batches))
            #val_batches = random.sample(val_batches, len(val_batches))
                        
            train_loss= self.train_model(train_batches, 1, epoch)
            val_loss = self.evaluate_model(val_batches, 0, epoch) # we set the teacher forcing to false            
            t2 = time.time()
            
            predicted_root = self.predict(test_word)
            print(predicted_root)
            predicted_roots.append(predicted_root)
            
            
            
            tmp = [train_loss, val_loss]
            losses.append(tmp)
            
            print('the training loss : ', train_loss , 'the val loss :', val_loss)
            print('epoch num : ' ,epoch , ' lasted : ', t2 - t1 , 'seconds')
            
            if val_loss < best_val_loss :
                
                best_val_loss = val_loss 
                best_model_par = self.state_dict()

            
        torch.save(best_model_par, 'best_model.pt')
            
        return losses
    
    def count_parameters(self):
        '''
        function to calculate the total number of parameters in the model
        '''
        return sum(torch.numel(p) for p in self.parameters() if p.requires_grad)


In [8]:
directory = 'corpus_morphological_analysis'
# os.listdir() returns the entries in arbitrary order; sorting them makes any
# later "first N files" selection identical on every run and every machine.
file_paths = [os.path.join(directory, filename) for filename in sorted(os.listdir(directory))]

In [9]:
temp_file_paths = file_paths[:4000] # keep only the first 4000 paths (the corpus reportedly holds about 29000 files)

In [10]:
# Identify the prefix, the root and the suffix of a parsed line and store them in a dictionary
def identify(word_l):
    """
    Extract the word, prefix, root and suffix from the fields of one parsed line.

    word_l : list of fields; word_l[0] is the word and word_l[2] the prefix
             field, the root / suffix are read from positions 3 to 9 depending
             on the case.

    returns : None when the line is too short to be analysed (fewer than 4
              fields, or a field needed by the selected case is missing),
              {'word': ...} alone when no case applies,
              otherwise a dict with the keys 'word', 'prefixe', 'root', 'suffixe'.
    """
    if len(word_l) < 4 :
        return None

    word, prefix = word_l[0], word_l[2]
    dictt = {'word': word}

    try:
        if prefix == '' :
            # no prefix
            fields = (prefix, word_l[6], word_l[7])
        elif prefix == ' ' :
            # blank prefix: the root is field 3 and there is no suffix
            fields = ('', word_l[3], '')
        # from here on a prefix exists
        elif word_l[4] in word :
            fields = (prefix, word_l[7], word_l[8])
        elif word_l[5] != '' and word_l[5] in word :
            fields = (prefix, word_l[8], word_l[9])
        elif word_l[3] != '' and word_l[3] in word :
            fields = (prefix, word_l[3], '')
        else :
            return dictt
    except IndexError:
        # the length guard above only covers fields 0..3; a line that lacks a
        # field needed by its case is treated like any other unusable line
        return None

    dictt['prefixe'], dictt['root'], dictt['suffixe'] = fields
    return dictt

In [11]:
# 1. visible text of every html page (scripts and styles removed)
content = []
for filepath in temp_file_paths :
    with open(filepath, encoding='utf-8') as handle :
        soup = BeautifulSoup(handle.read(), features="html.parser")
    for tag in soup(["script", "style"]):
        tag.extract()
    content.append(soup.get_text())

# 2. one list of lines per page
split_list = [page.splitlines() for page in content]

# 3. drop the "no analysis found for this word" lines, then remove the '#' marks
work_list = [
    [line.replace("#", '') for line in lines if 'لا توجد نتائج لتحليل هذه الكلمة' not in line]
    for lines in split_list
]

# 4. cut every remaining line into its ':' separated fields
final_list = [[line.split(':') for line in lines] for lines in work_list]

In [12]:
# Turn the list of parsed lines into a list of dictionaries
def word_to_dict_list(wordlist):
    """Apply identify() to every element of wordlist and return the results as a list."""
    return [identify(fields) for fields in wordlist]

In [13]:
# Flat list of the dictionaries built from every line that identify() could read
final = []
for parsed_lines in final_list:
    for fields in parsed_lines :
        entry = identify(fields)
        if entry is not None :
            final.append(entry)

In [14]:
def dic_to_list(listt):
    """
    Keep the dictionaries of listt that have exactly 4 entries and turn each of
    them into the row [word, prefixe, root, suffixe].
    """
    columns = ('word', 'prefixe', 'root', 'suffixe')
    return [[entry[column] for column in columns] for entry in listt if len(entry) == 4]
data = dic_to_list(final)  # rows [word, prefixe, root, suffixe] of the 4-entry dictionaries of final

In [15]:
final_l = dic_to_list(final)  # NOTE(review): same call as the one that builds data; final_l is not read in the visible cells

In [16]:
# keep only the word (column 0) and its root (column 2) of every row of data
root_data = [[entry[0], entry[2]] for entry in data]

In [17]:
# keep the pairs whose root has 1 to 3 characters and wrap the word and the
# root in the start ('$') and end ('£') markers
data_root = [
    ['$' + full_word + '£', '$' + root + '£']
    for full_word, root in root_data
    if 0 < len(root) <= 3
]

In [18]:
print(len(data_root))
# Drop the pairs whose marked word has 15 or 16 characters. The list is rebuilt
# in one pass: popping from a list while iterating over it skips the element
# that follows each removal (and index() made every removal a full scan).
data_root[:] = [item for item in data_root if len(item[0]) not in (15, 16)]
print(len(data_root))
# keep the pairs whose marked word has more than 4 characters
d = [item for item in data_root if len(item[0]) > 4]
print(len(d))

1563071
1563071
1363580


In [19]:
# Rebuild the network on the first 10000 pairs and load the saved weights
test_model = model(
    data=d[:10000],
    batch_size=512,
    embedding_size=64,
    hidden_size=100,
    num_layers=3,
    dropout=0.2,
    teacher_forcing_ratio=0.35,
    learning_rate=0.0005,
)
test_model.load_state_dict(torch.load('best_model.pt'), strict=False)

<All keys matched successfully>

In [20]:
# Candidate roots of one word, without the candidates that contain a marker
word = '$' + 'مستخدم' + '£'
test = test_model.predict(word)
test_final = [candidate for candidate in test if "$" not in candidate and "£" not in candidate]
test_final

[[3, 18, 18], [3, 18, 22], [3, 18, 0], [3, 18, 7], [3, 18, 19], [3, 22, 18], [3, 22, 22], [3, 22, 0], [3, 22, 7], [3, 22, 19], [3, 0, 18], [3, 0, 22], [3, 0, 0], [3, 0, 7], [3, 0, 19], [3, 3, 18], [3, 3, 22], [3, 3, 0], [3, 3, 7], [3, 3, 19], [3, 7, 18], [3, 7, 22], [3, 7, 0], [3, 7, 7], [3, 7, 19], [1, 18, 18], [1, 18, 22], [1, 18, 0], [1, 18, 7], [1, 18, 19], [1, 22, 18], [1, 22, 22], [1, 22, 0], [1, 22, 7], [1, 22, 19], [1, 0, 18], [1, 0, 22], [1, 0, 0], [1, 0, 7], [1, 0, 19], [1, 3, 18], [1, 3, 22], [1, 3, 0], [1, 3, 7], [1, 3, 19], [1, 7, 18], [1, 7, 22], [1, 7, 0], [1, 7, 7], [1, 7, 19], [22, 18, 18], [22, 18, 22], [22, 18, 0], [22, 18, 7], [22, 18, 19], [22, 22, 18], [22, 22, 22], [22, 22, 0], [22, 22, 7], [22, 22, 19], [22, 0, 18], [22, 0, 22], [22, 0, 0], [22, 0, 7], [22, 0, 19], [22, 3, 18], [22, 3, 22], [22, 3, 0], [22, 3, 7], [22, 3, 19], [22, 7, 18], [22, 7, 22], [22, 7, 0], [22, 7, 7], [22, 7, 19], [0, 18, 18], [0, 18, 22], [0, 18, 0], [0, 18, 7], [0, 18, 19], [0, 22, 18]

['تةة',
 'تةف',
 'تةك',
 'تةش',
 'تفة',
 'تفف',
 'تفك',
 'تفش',
 'تكة',
 'تكف',
 'تكك',
 'تكش',
 'فمم',
 'كمم',
 'همم']