In [None]:
import numpy as np
import csv
import pandas as pd
import os
import torch.optim as optim
import torch
from tqdm import tqdm
import math
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties
import torch.nn as nn
import random
import heapq
import torch.nn.functional as F
import wandb
from torch.nn.utils import clip_grad_norm_
import warnings
warnings.filterwarnings("ignore")
import seaborn




In [None]:
# Select the compute device once for the whole notebook: CUDA when PyTorch can
# see a GPU, otherwise the CPU.
val = torch.cuda.is_available()  # name kept in case later cells read it
# 'gpu' is not a valid torch device type, so the fallback has to be 'cpu';
# otherwise this cell raises on every machine without CUDA.
device = torch.device('cuda' if val else 'cpu')

print(device)

In [None]:
def _read_word_pairs(csv_path, start, end):
    """Read one split file and return ``(words, translations)`` as lists.

    Column 0 of each row is the source word and column 1 its translation.
    ``end`` is appended to both; ``start`` is prepended to the translation only.
    Completely blank rows (which ``csv.reader`` yields as ``[]``) are skipped.
    """
    words, translations = [], []
    # newline='' is what the csv module expects; the context manager makes
    # sure the handle is closed (the files used to be left open).
    with open(csv_path, encoding='utf8', newline='') as handle:
        for row in csv.reader(handle):
            if not row:
                continue
            words.append(row[0] + end)
            translations.append(start + row[1] + end)
    return words, translations


def _collect_characters(*word_lists):
    """Return the set of every character occurring in the given word lists."""
    characters = set()
    for words in word_lists:
        for word in words:
            characters.update(word)
    return characters


def loadData(params):
    """Load the train/valid/test CSV splits and build character vocabularies.

    Args:
        params: mapping with key ``'dataset_path'``; the files
            ``/hin/hin_train.csv``, ``/hin/hin_valid.csv`` and
            ``/hin/hin_test.csv`` are read relative to it.

    Returns:
        dict with the special tokens (``'SOS'``, ``'EOS'``, ``'PAD'``), the six
        word arrays (numpy string arrays, already wrapped with the start/end
        markers), ``'max_enc_len'``, ``'max_dec_len'``, ``'max_len'`` and the
        character<->index tables ``'input_index'``, ``'output_index'``,
        ``'input_index_rev'``, ``'output_index_rev'``.

    Raises:
        IndexError: if a non-blank row has fewer than two columns.
        ValueError: if all three splits are empty (no length to take a max of).
    """
    dataset_path = params['dataset_path']
    pad, start, end = '', '$', '&'

    train_words, train_translations = _read_word_pairs(
        dataset_path + '/hin/hin_train.csv', start, end)
    val_words, val_translations = _read_word_pairs(
        dataset_path + '/hin/hin_valid.csv', start, end)
    test_words, test_translations = _read_word_pairs(
        dataset_path + '/hin/hin_test.csv', start, end)

    # Vocabulary layout: PAD, SOS, EOS occupy indices 0, 1, 2, followed by the
    # remaining characters in sorted order. Subtracting the specials keeps each
    # of them unique even if '$' or '&' happens to appear in the raw data.
    specials = [pad, start, end]
    input_chars = _collect_characters(train_words, val_words, test_words)
    output_chars = _collect_characters(
        train_translations, val_translations, test_translations)
    input_vocab = specials + sorted(input_chars - set(specials))
    output_vocab = specials + sorted(output_chars - set(specials))

    input_index = {char: idx for idx, char in enumerate(input_vocab)}
    output_index = {char: idx for idx, char in enumerate(output_vocab)}
    input_index_rev = {idx: char for char, idx in input_index.items()}
    output_index_rev = {idx: char for char, idx in output_index.items()}

    # Lengths include the end marker (and the start marker for translations).
    max_enc_len = max(len(word) for word in train_words + test_words + val_words)
    max_dec_len = max(
        len(word)
        for word in train_translations + val_translations + test_translations)
    max_len = max(max_enc_len, max_dec_len)

    preprocessed_data = {
        'SOS': start,
        'EOS': end,
        'PAD': pad,
        'train_words': np.array(train_words),
        'train_translations': np.array(train_translations),
        'val_words': np.array(val_words),
        'val_translations': np.array(val_translations),
        'test_words': np.array(test_words),
        'test_translations': np.array(test_translations),
        'max_enc_len': max_enc_len,
        'max_dec_len': max_dec_len,
        'max_len': max_len,
        'input_index': input_index,
        'output_index': output_index,
        'input_index_rev': input_index_rev,
        'output_index_rev': output_index_rev,
    }
    return preprocessed_data

In [None]:
def _encode_split(words, translations, input_index, output_index, max_len):
    """Encode one data split as two zero-filled ``(max_len, len(words))`` int64 tensors.

    Column ``j`` holds the character indices of word ``j`` (rows are character
    positions); positions past the end of a word keep the fill value 0.
    """
    count = len(words)
    encoded_words = np.zeros((max_len, count), dtype='int64')
    encoded_translations = np.zeros((max_len, count), dtype='int64')
    for column in range(count):
        for row, char in enumerate(words[column]):
            encoded_words[row, column] = input_index[char]
        for row, char in enumerate(translations[column]):
            encoded_translations[row, column] = output_index[char]
    return (torch.tensor(encoded_words, dtype=torch.int64),
            torch.tensor(encoded_translations, dtype=torch.int64))


def create_tensor(preprocessed_data):
    """Turn the word arrays of every split into index tensors.

    Args:
        preprocessed_data: mapping providing ``'max_len'``, ``'input_index'``,
            ``'output_index'`` and, for each of the ``train``/``val``/``test``
            prefixes, ``'<prefix>_words'`` and ``'<prefix>_translations'``.

    Returns:
        dict with keys ``'input_data'``, ``'output_data'``, ``'val_input_data'``,
        ``'val_output_data'``, ``'test_input_data'``, ``'test_output_data'``;
        each value is an int64 tensor of shape ``(max_len, number_of_words)``.

    Raises:
        KeyError: if a character is missing from the relevant index table.
        IndexError: if a word is longer than ``max_len``.
    """
    max_len = preprocessed_data['max_len']
    input_index = preprocessed_data['input_index']
    output_index = preprocessed_data['output_index']

    # (key prefix in preprocessed_data, key prefix in the returned dict)
    split_names = (('train', ''), ('val', 'val_'), ('test', 'test_'))

    tensors = {}
    for source_prefix, result_prefix in split_names:
        encoded_words, encoded_translations = _encode_split(
            preprocessed_data[source_prefix + '_words'],
            preprocessed_data[source_prefix + '_translations'],
            input_index,
            output_index,
            max_len,
        )
        tensors[result_prefix + 'input_data'] = encoded_words
        tensors[result_prefix + 'output_data'] = encoded_translations
    return tensors

In [None]:
class Attention(nn.Module):
    """Parameter-free dot-product attention.

    ``hidden_size`` is only stored on the instance; no layer is created from it.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size

    def dot_score(self, hidden_state, encoder_states):
        """Return the element-wise product of the two inputs summed over dim 2."""
        return (hidden_state * encoder_states).sum(dim=2)

    def forward(self, hidden, encoder_outputs):
        """Return softmax-normalised attention weights.

        The 2-D score matrix is transposed, normalised along dim 1 and given a
        singleton middle dimension, so scores of shape ``(A, B)`` become weights
        of shape ``(B, 1, A)``.
        """
        scores = self.dot_score(hidden, encoder_outputs)
        weights = F.softmax(scores.t(), dim=1)
        return weights.unsqueeze(1)

In [None]:
class Encoder_Attention(nn.Module):
    """GRU/RNN encoder that returns per-step states for the attention decoder.

    Reads ``'bi_dir'``, ``'cell_type'``, ``'dropout'``, ``'embedding_size'``,
    ``'hidden_size'`` and ``'num_layers_enc'`` from ``params`` and sizes the
    embedding table from ``len(preprocessed_data['input_index'])``.

    Raises:
        ValueError: if ``params['cell_type']`` is neither ``'GRU'`` nor ``'RNN'``.
    """

    def __init__(self, params, preprocessed_data):
        super(Encoder_Attention, self).__init__()
        # Read every hyper-parameter up front. The embedding size used to be
        # looked up through a local name that was only assigned further down,
        # which made the constructor fail unconditionally.
        embedding_size = params['embedding_size']
        hidden_size = params['hidden_size']
        num_layers = params['num_layers_enc']
        dropout = params['dropout']
        vocab_size = len(preprocessed_data['input_index'])

        self.bi_directional = params['bi_dir']
        self.cell_type = params['cell_type']
        self.dropout = nn.Dropout(dropout)
        self.embedding = nn.Embedding(vocab_size, embedding_size)
        self.hidden_size = hidden_size

        recurrent_layers = {'GRU': nn.GRU, 'RNN': nn.RNN}
        if self.cell_type not in recurrent_layers:
            # Fail here rather than with a missing `self.cell` inside forward().
            raise ValueError(
                "Encoder_Attention supports cell_type 'GRU' or 'RNN', got %r"
                % (self.cell_type,))
        self.cell = recurrent_layers[self.cell_type](
            embedding_size, hidden_size, num_layers,
            dropout=dropout, bidirectional=self.bi_directional)

    def forward(self, x):
        """Embed ``x``, run the recurrent cell and return ``(encoder_states, hidden)``.

        For a bidirectional cell the two ``hidden_size``-wide halves of the last
        dimension are added together, so ``encoder_states`` always has
        ``hidden_size`` features. ``hidden`` is returned exactly as the cell
        produced it.
        """
        embedded = self.dropout(self.embedding(x))
        encoder_states, hidden = self.cell(embedded)
        if self.bi_directional:
            encoder_states = (encoder_states[:, :, :self.hidden_size]
                              + encoder_states[:, :, self.hidden_size:])
        return encoder_states, hidden

In [None]:
class Decoder_Attention(nn.Module):
    """Single-step GRU/RNN decoder with dot-product attention.

    Reads ``'dropout'``, ``'num_layers_dec'``, ``'cell_type'``,
    ``'embedding_size'`` and ``'hidden_size'`` from ``params`` and sizes the
    embedding table and the output layer from
    ``len(preprocessed_data['output_index'])``.

    Raises:
        ValueError: if ``params['cell_type']`` is neither ``'GRU'`` nor ``'RNN'``.
    """

    def __init__(self, params, preprocessed_data):
        super(Decoder_Attention, self).__init__()
        dropout = params['dropout']
        embedding_size = params['embedding_size']
        hidden_size = params['hidden_size']
        vocab_size = len(preprocessed_data['output_index'])

        self.dropout = nn.Dropout(dropout)
        self.num_layers = params['num_layers_dec']
        self.cell_type = params['cell_type']
        self.embedding = nn.Embedding(vocab_size, embedding_size)

        recurrent_layers = {'GRU': nn.GRU, 'RNN': nn.RNN}
        if self.cell_type not in recurrent_layers:
            # Fail here rather than with a missing `self.cell` inside forward().
            raise ValueError(
                "Decoder_Attention supports cell_type 'GRU' or 'RNN', got %r"
                % (self.cell_type,))
        self.cell = recurrent_layers[self.cell_type](
            embedding_size, hidden_size, self.num_layers, dropout=dropout)

        # `concat` maps [cell output ; attention context] (2 * hidden_size)
        # back to hidden_size; `fc` maps that to one score per output symbol.
        self.fc = nn.Linear(hidden_size, vocab_size)
        self.concat = nn.Linear(hidden_size * 2, hidden_size)
        self.log_softmax = nn.LogSoftmax(dim=1)
        self.attn = Attention(hidden_size)

    def forward(self, x, encoder_states, hidden, cell):
        """Decode one step.

        Args:
            x: 1-D tensor of token indices; a leading length-1 dimension is
                added before embedding.
            encoder_states: encoder outputs that are attended over.
            hidden: recurrent state passed to the cell.
            cell: unused; accepted so the call signature matches the LSTM decoder.

        Returns:
            ``(log_probs, hidden, attention_weights)`` - log-softmax scores over
            the output vocabulary, the updated recurrent state, and the
            attention weights with their singleton middle dimension removed.
        """
        embedded = self.dropout(self.embedding(x.unsqueeze(0)))
        outputs, hidden = self.cell(embedded, hidden)
        # Compute the weights once and keep them: they are needed for the
        # context vector and are also returned (the name was previously
        # never bound, so the return statement raised NameError).
        attention_weights = self.attn(outputs, encoder_states)
        context = attention_weights.bmm(encoder_states.transpose(0, 1))
        combined = torch.cat((outputs.squeeze(0), context.squeeze(1)), 1)
        log_probs = self.log_softmax(self.fc(torch.tanh(self.concat(combined))))
        return log_probs, hidden, attention_weights.squeeze(1)

In [None]:
# Define a sequence-to-sequence model with attention mechanism
class Seq2Seq_Attention(nn.Module):
    """Encoder/decoder wrapper for the GRU/RNN attention model.

    Stores the encoder and decoder, the output vocabulary size
    (``len(preprocessed_data['output_index'])``), the teacher-forcing ratio
    ``params['teacher_fr']`` and ``params['num_layers_dec']``.
    """

    def __init__(self, encoder, decoder, params, preprocessed_data):
        super(Seq2Seq_Attention, self).__init__()
        self.output_index_len = len(preprocessed_data['output_index'])
        self.tfr = params['teacher_fr']
        self.num_layers_dec = params['num_layers_dec']
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target):
        """Run the encoder once, then decode ``target.shape[0] - 1`` steps.

        Args:
            source: input index tensor; its dim 1 sets the size of dim 1 of the
                result.
            target: target index tensor; row 0 is fed as the first decoder input
                and its dim 0 sets the number of rows of the result.

        Returns:
            Tensor of shape ``(target.shape[0], source.shape[1],
            output_index_len)`` holding the decoder output of every step.
            Row 0 is never written and stays zero.
        """
        x = target[0, :]
        encoder_op, hidden = self.encoder(source)

        # Allocate next to the encoder output instead of relying on a
        # notebook-level `device` variable defined in another cell.
        outputs = torch.zeros(
            target.shape[0], source.shape[1], self.output_index_len,
            device=encoder_op.device)

        # Keep only as many state layers as the decoder has.
        hidden = hidden[:self.decoder.num_layers]

        for t in range(1, target.shape[0]):
            output, hidden, _ = self.decoder(x, encoder_op, hidden, None)
            outputs[t] = output
            best_guess = output.argmax(1)
            # With probability `tfr` feed the ground-truth token, otherwise
            # feed back the decoder's own prediction.
            use_teacher = random.random() < self.tfr
            x = target[t] if use_teacher else best_guess
        return outputs

In [None]:
# Define an Encoder module with attention using LSTM
class Encoder_Attention_LSTM(nn.Module):
    """LSTM encoder that returns per-step states plus the final hidden and cell state."""

    # Parameter-key names exposed as class attributes; the constructor below
    # reads the same keys through string literals.
    ct = 'cell_type'
    bi = 'bi_dir'

    def __init__(self, params, preprocessed_data):
        """Build the embedding, dropout and LSTM layers.

        Reads ``'cell_type'``, ``'bi_dir'``, ``'embedding_size'``,
        ``'hidden_size'``, ``'dropout'`` and ``'num_layers_enc'`` from ``params``;
        the embedding table has ``len(preprocessed_data['input_index'])`` rows.
        """
        super().__init__()
        self.cell_type = params['cell_type']
        self.bi_directional = params['bi_dir']

        vocab_size = len(preprocessed_data['input_index'])
        embedding_size = params['embedding_size']
        self.embedding = nn.Embedding(vocab_size, embedding_size)

        self.hidden_size = params['hidden_size']
        dropout_rate = params['dropout']
        self.dropout = nn.Dropout(dropout_rate)

        self.cell = nn.LSTM(
            embedding_size,
            self.hidden_size,
            params['num_layers_enc'],
            dropout=dropout_rate,
            bidirectional=self.bi_directional,
        )

    def forward(self, x):
        """Return ``(encoder_states, hidden, cell)`` for the index tensor ``x``.

        When the LSTM is bidirectional, the first and second ``hidden_size``-wide
        slices of the last dimension are summed (not concatenated), so
        ``encoder_states`` keeps ``hidden_size`` features. ``hidden`` and ``cell``
        are passed through exactly as the LSTM returned them.
        """
        embedded = self.dropout(self.embedding(x))
        encoder_states, state = self.cell(embedded)
        hidden, cell = state

        if self.bi_directional:
            width = self.hidden_size
            first_half = encoder_states[:, :, :width]
            second_half = encoder_states[:, :, width:]
            encoder_states = first_half + second_half

        return encoder_states, hidden, cell

In [None]:
# Define a Decoder module with attention using LSTM
class Decoder_Attention_LSTM(nn.Module):
    """Single-step LSTM decoder with dot-product attention.

    Reads ``'num_layers_dec'``, ``'dropout'``, ``'embedding_size'`` and
    ``'hidden_size'`` from ``params`` and sizes the embedding table and the
    output layer from ``len(preprocessed_data['output_index'])``.
    """

    def __init__(self, params, preprocessed_data):
        super(Decoder_Attention_LSTM, self).__init__()
        dropout = params['dropout']
        embedding_size = params['embedding_size']
        hidden_size = params['hidden_size']
        vocab_size = len(preprocessed_data['output_index'])

        self.dropout = nn.Dropout(dropout)
        self.num_layers = params['num_layers_dec']
        self.embedding = nn.Embedding(vocab_size, embedding_size)
        self.cell = nn.LSTM(embedding_size, hidden_size, self.num_layers,
                            dropout=dropout)

        # `concat` maps [LSTM output ; attention context] (2 * hidden_size)
        # back to hidden_size; `fc` maps that to one score per output symbol.
        self.concat = nn.Linear(hidden_size * 2, hidden_size)
        self.fc = nn.Linear(hidden_size, vocab_size)
        self.attn = Attention(hidden_size)
        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, x, encoder_states, hidden, cell):
        """Decode one step.

        Args:
            x: 1-D tensor of token indices; a leading length-1 dimension is
                added before embedding.
            encoder_states: encoder outputs that are attended over.
            hidden: LSTM hidden state.
            cell: LSTM cell state.

        Returns:
            ``(log_probs, hidden, cell, attention_weights)`` - log-softmax scores
            over the output vocabulary, the updated LSTM states, and the
            attention weights with their singleton middle dimension removed.
        """
        embedded = self.dropout(self.embedding(x.unsqueeze(0)))
        outputs, (hidden, cell) = self.cell(embedded, (hidden, cell))
        # Compute the attention weights once and reuse them for both the
        # context vector and the returned value (they used to be computed twice).
        attention_weights = self.attn(outputs, encoder_states)
        context = attention_weights.bmm(encoder_states.transpose(0, 1))
        combined = torch.cat((outputs.squeeze(0), context.squeeze(1)), 1)
        log_probs = self.log_softmax(self.fc(torch.tanh(self.concat(combined))))
        return log_probs, hidden, cell, attention_weights.squeeze(1)

In [None]:
# Define a Sequence-to-Sequence model with attention using LSTM
class Seq2Seq_Attention_LSTM(nn.Module):
    """Encoder/decoder wrapper for the LSTM attention model.

    Stores the encoder and decoder, the output vocabulary size
    (``len(preprocessed_data['output_index'])``), ``params['num_layers_dec']``
    and the teacher-forcing ratio ``params['teacher_fr']``.
    """

    def __init__(self, encoder, decoder, params, preprocessed_data):
        super(Seq2Seq_Attention_LSTM, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.output_index_len = len(preprocessed_data['output_index'])
        self.num_layers_dec = params['num_layers_dec']
        self.tfr = params['teacher_fr']

    def forward(self, source, target):
        """Run the encoder once, then decode ``target.shape[0] - 1`` steps.

        Args:
            source: input index tensor; its dim 1 sets the size of dim 1 of the
                result.
            target: target index tensor; row 0 is fed as the first decoder input
                and its dim 0 sets the number of rows of the result.

        Returns:
            Tensor of shape ``(target.shape[0], source.shape[1],
            output_index_len)`` holding the decoder output of every step.
            Row 0 is never written and stays zero.
        """
        x = target[0, :]
        encoder_op, hidden, cell = self.encoder(source)

        # Allocate next to the encoder output instead of relying on a
        # notebook-level `device` variable defined in another cell.
        outputs = torch.zeros(
            target.shape[0], source.shape[1], self.output_index_len,
            device=encoder_op.device)

        # Keep only as many state layers as the decoder has.
        hidden = hidden[:self.decoder.num_layers]
        cell = cell[:self.decoder.num_layers]

        for t in range(1, target.shape[0]):
            output, hidden, cell, _ = self.decoder(x, encoder_op, hidden, cell)
            outputs[t] = output
            best_guess = output.argmax(1)
            # With probability `tfr` feed the ground-truth token, otherwise
            # feed back the decoder's own prediction.
            use_teacher = random.random() < self.tfr
            x = target[t] if use_teacher else best_guess
        return outputs