In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
from indexer import ArabicCorpusIndexer
from lang import Lang
from torch.nn.utils.rnn import pad_sequence

# LSTM model

In [2]:
class LSTMqa(nn.Module):
    def __init__(self,vocab_size,embedding_dim,hidden_size,num_layers,dropout):
        super(LSTMqa,self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = dropout
        
        # Encoder
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.encoder_lstm = nn.LSTM(input_size= embedding_dim, hidden_size = self.hidden_size, num_layers = self.num_layers,dropout = self.dropout,batch_first=True)
        
        # Decoder
        self.decoder_lstm = nn.LSTM(input_size= embedding_dim, hidden_size = self.hidden_size, num_layers = self.num_layers,dropout = self.dropout,batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size )
        
    def forward(self, questions_seq, asnwers_seq):
        # Encoder
        embed = self.embedding(questions_seq)
        q_seq, (hidden, cell) = self.encoder_lstm(embed)
        
        # Decoder
        decoder_input = self.embedding(asnwers_seq)
        a_seq,_ = self.decoder_lstm(decoder_input, (hidden, cell))
        output = self.linear(a_seq)
        # print(f"Linear output shape {output.shape}")
        return output  

In [3]:
lang = Lang()

# Dataset class for the dataloader

In [4]:
class relDataset(Dataset):
    def __init__(self, csv_file,lang): 
        # Transform to pandas object 
        self.file_out = pd.read_excel(csv_file)
        self.file_out = self.file_out[['Question_Text','Full_Answer']]
        self.lang = lang
    
    
    def __len__(self):
        return len(self.file_out)
        
        
    def __getitem__(self,idx):
        question = self.file_out.iloc[idx]['Question_Text']
        answer = self.file_out.iloc[idx]['Full_Answer']
        
        self.lang.addSentence(question)
        self.lang.addSentence(answer)
        
        # INDEXER CLASS
        
        # q_word = question.split()
        # a_word = answer.split()
        
        # q_index = [self.indexer.get_index(word) for word in q_word]
        # a_index = [self.indexer.get_index(word) for word in a_word]
        
        # END OF INDEXER CLASS
        
        question_indices = [self.lang.word2index[word] for word in question.split() if word in self.lang.word2index]
        answer_indices = [self.lang.word2index[word] for word in answer.split() if word in self.lang.word2index]

        # To tensor 
        question_tensor = torch.tensor(question_indices)
        answer_tensor = torch.tensor(answer_indices)
        
        return {"questions": question_tensor,"answer":answer_tensor}
    
        

In [5]:
from torch.nn.utils.rnn import pad_sequence
import torch

def custom_collate_fn(batch):
    # Separate questions and answers
    questions = [item['questions'] for item in batch]
    answers = [item['answer'] for item in batch]

    # Pad sequences within the batch
    questions_padded = pad_sequence(questions, batch_first=True)
    answers_padded = pad_sequence(answers, batch_first=True)

    return {'questions': questions_padded, 'answer': answers_padded}



In [6]:
df = pd.read_excel('HAQA.xlsx')
corpus=[]
for columns in ['Question_Text','Full_Answer']:
    corpus.extend(df[columns].tolist())
# This will give an index for every word    
c_indexer = ArabicCorpusIndexer(corpus)

In [7]:
lang.n_words

2

In [8]:
HAQA_ds = relDataset('HAQA.xlsx',lang)

In [9]:
for x in HAQA_ds:
    print(x)


{'questions': tensor([2, 3, 4]), 'answer': tensor([ 5,  6,  7,  8,  9, 10, 12, 13, 14, 15, 16, 17, 19, 22, 23, 24, 25, 26,
        27, 29, 30, 31])}
{'questions': tensor([32,  3,  7, 33, 34]), 'answer': tensor([35, 36, 37, 38, 39, 40, 37, 42, 43, 38, 44, 45, 46, 47, 15, 49, 50, 51,
        52, 54, 55, 56])}
{'questions': tensor([57, 58, 59, 60, 61]), 'answer': tensor([62,  7, 63, 64, 65,  7, 67, 68, 62,  7, 63, 64, 65, 70, 71, 72, 75, 76,
        15, 77, 78, 79, 80, 81, 82, 83, 30, 31])}
{'questions': tensor([57, 85, 86, 87, 88,  4]), 'answer': tensor([ 89,  90,  91,  13,   7,  93,  15,  58,  94,  95,  96,  97,  98,  99,
        101,  42, 103, 104,  87,  13,   7, 105, 106, 107,  52, 108,   7, 109,
        110,  30,  31])}
{'questions': tensor([ 57,  58, 112,  60, 113,   4]), 'answer': tensor([114,  96, 115, 116,  51, 117, 118, 119,  24, 121, 122, 123, 124, 126,
        127, 128, 129, 130,  44, 131, 132, 133, 134, 136, 137, 138, 139])}
{'questions': tensor([ 57, 140, 141, 112, 142]), 'a

In [10]:
print(HAQA_ds.lang.index2word.get(4))

الله؟


In [11]:
for idx, sample in enumerate(HAQA_ds):
    print(idx,sample)

0 {'questions': tensor([2, 3, 4]), 'answer': tensor([  5,   6,   7,   8,   9,  10, 956, 356,  12,  13,  14,  15,  16,  17,
         19, 956, 258,  22,  23,  24,  25,  26,  27,  29,  30,  31])}
1 {'questions': tensor([32,  3,  7, 33, 34]), 'answer': tensor([  35,   36,   37,   38,   39,   40, 4980,   37,   42,   43,   38,   44,
          45,   46,   47,  956,   15,   49,   50,   51,   52,  324,  595,   54,
          55,   56])}
2 {'questions': tensor([57, 58, 59, 60, 61]), 'answer': tensor([ 62,   7,  63,  64,  65, 956, 177,   7,  67,  68,  62,   7,  63,  64,
         65,  70,  71,  72, 956,  75,  76,  15,  77,  78,  79,  80,  81,  82,
         83,  30,  31])}
3 {'questions': tensor([57, 85, 86, 87, 88,  4]), 'answer': tensor([  89,   90,   91,   13,    7,  956,  941,   93,   15,   58,   94,   95,
          96,   97,   98,   99, 4089,  101,  956,   42,  103,  104,   87,   13,
           7,  105,  106,  107,   52,  108,    7,  109,  110,   30,   31])}
4 {'questions': tensor([ 57,  58, 11

In [12]:
dataloader = DataLoader(HAQA_ds, batch_size=32, shuffle=True,collate_fn=custom_collate_fn)

In [13]:
for batch in dataloader:
    print(batch['answer'].shape)

torch.Size([32, 236])
torch.Size([32, 242])
torch.Size([32, 253])
torch.Size([32, 437])
torch.Size([32, 646])
torch.Size([32, 253])
torch.Size([32, 469])
torch.Size([32, 257])
torch.Size([32, 646])
torch.Size([32, 254])
torch.Size([32, 1637])
torch.Size([32, 236])
torch.Size([32, 286])
torch.Size([32, 286])
torch.Size([32, 410])
torch.Size([32, 253])
torch.Size([32, 157])
torch.Size([32, 348])
torch.Size([32, 253])
torch.Size([32, 646])
torch.Size([32, 646])
torch.Size([32, 646])
torch.Size([32, 356])
torch.Size([32, 348])
torch.Size([32, 242])
torch.Size([32, 516])
torch.Size([32, 646])
torch.Size([32, 469])
torch.Size([32, 242])
torch.Size([32, 380])
torch.Size([32, 469])
torch.Size([32, 426])
torch.Size([32, 268])
torch.Size([32, 646])
torch.Size([32, 356])
torch.Size([32, 225])
torch.Size([32, 410])
torch.Size([32, 437])
torch.Size([32, 410])
torch.Size([32, 348])
torch.Size([32, 356])
torch.Size([32, 437])
torch.Size([32, 380])
torch.Size([32, 646])
torch.Size([32, 253])
torch.Siz

In [14]:
from torch import optim


model = LSTMqa(vocab_size=lang.n_words, embedding_dim=8, hidden_size=4, num_layers=2, dropout=0.5)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)



def train(model, dataloader, criterion, optimizer, num_epochs=10):
    model.train()
    
    for epoch in range(num_epochs):
        total_loss = 0
        
        for batch in dataloader:
            questions, answers = batch['questions'], batch['answer']
            # print(f"questions shape {questions.shape}")
            # print(f"answers shape {answers.shape}")
            
            optimizer.zero_grad()
            outputs = model(questions,answers)
            
            outputs = outputs.view(-1, outputs.shape[-1])

            # print(f"outputs shape {outputs.shape}")
            answers = answers.view(-1)  
            # print(f"answers shape {answers.shape}")
            loss = criterion(outputs, answers)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(dataloader):.4f}')

train(model, dataloader, criterion, optimizer, num_epochs=25)


Epoch [1/25], Loss: 10.1221
Epoch [2/25], Loss: 9.8159
Epoch [3/25], Loss: 9.4899
Epoch [4/25], Loss: 9.1329
Epoch [5/25], Loss: 8.5595
Epoch [6/25], Loss: 7.9403
Epoch [7/25], Loss: 7.3896
Epoch [8/25], Loss: 6.8825
Epoch [9/25], Loss: 6.4059
Epoch [10/25], Loss: 5.9982
Epoch [11/25], Loss: 5.5947
Epoch [12/25], Loss: 5.2102
Epoch [13/25], Loss: 4.8911
Epoch [14/25], Loss: 4.4035
Epoch [15/25], Loss: 4.1453
Epoch [16/25], Loss: 3.8289
Epoch [17/25], Loss: 3.5734
Epoch [18/25], Loss: 3.2532
Epoch [19/25], Loss: 3.1213
Epoch [20/25], Loss: 2.8417
Epoch [21/25], Loss: 2.7003
Epoch [22/25], Loss: 2.5731
Epoch [23/25], Loss: 2.5822
Epoch [24/25], Loss: 2.3744
Epoch [25/25], Loss: 2.4152


In [15]:
input_sentence = "كيف نعبد الله؟"

# Assume 'lang' is an instance of your Lang class that's been populated with a vocabulary
input_tokens = [lang.word2index[word] for word in input_sentence.split(' ') if word in lang.word2index]
# Add SOS and EOS tokens
input_tokens = [0] + input_tokens + [1]


In [16]:
print(input_tokens)

[0, 2, 3, 4, 1]


In [17]:
def infer(model, input_tokens, lang, max_length=20):
    with torch.no_grad():  
        # Convert input tokens to a tensor and unsqueeze to add batch dimension (1, seq_len)
        input_tensor = torch.tensor(input_tokens).unsqueeze(0)
        
        # Encoder pass
        _, (hidden, cell) = model.encoder_lstm(model.embedding(input_tensor))
        
        # Decoder's first input is the SOS token
        decoder_input = torch.tensor([[0]])
        
        
        output_tokens = []
        
        for _ in range(max_length):
            # Decoder step
            output, (hidden, cell) = model.decoder_lstm(model.embedding(decoder_input), (hidden, cell))
            # Predict the next token
            prediction = model.linear(output.squeeze(0))
            predicted_id = prediction.argmax(1).item()
            
            # Add the predicted token ID to the sequence
            output_tokens.append(predicted_id)
            
            # Break if EOS token is generated
            if predicted_id == 1:
                break
            
            # Prepare the predicted token ID as the next input to the decoder
            decoder_input = torch.tensor([[predicted_id]])
        
        # Convert the output token IDs back to words
        output_sentence = ' '.join([lang.index2word[token_id] for token_id in output_tokens if token_id in lang.index2word])
        
        return output_sentence


In [18]:
print(infer(model,input_tokens,lang,max_length=20))

SOS SOS SOS SOS SOS SOS SOS SOS SOS SOS SOS SOS SOS SOS SOS SOS SOS SOS SOS SOS
