In [None]:
import os
import pandas as pd
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random

# Select the compute device once for the whole notebook: CUDA when PyTorch
# reports it as available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
# Bare expression so the notebook cell displays the chosen device.
device

In [None]:
class Vocabulary():
    """
    Character-level vocabulary mapping characters to integer ids and back.

    A fresh vocabulary contains four special tokens with fixed ids:
    <PAD>=0, <SOW>=1 (start of word), <EOW>=2 (end of word) and <UNK>=3.

    Attributes:
        itos: dict mapping integer id -> token.
        stoi: dict mapping token -> integer id.

    Methods:
        tokenizer(text): split a string into its individual characters.
        build_vocabulary(word_list): register every unseen character of the
            given words under the next free id.
        numericalize(text): convert a string into a list of ids, using the
            <UNK> id for characters that are not in the vocabulary.
    """
    def __init__(self):
        self.itos = {0:"<PAD>",1:"<SOW>",2:"<EOW>",3:"<UNK>"}
        self.stoi = {"<PAD>":0,"<SOW>":1,"<EOW>":2,"<UNK>":3}

    def __len__(self):
        """Return the number of tokens, special tokens included."""
        return len(self.itos)

    @staticmethod
    def tokenizer(text):
        """Return the characters of ``text`` as a list."""
        return list(text)

    def build_vocabulary(self, word_list):
        """
        Add every character of ``word_list`` that is not yet known.

        Ids are handed out in order of first appearance, continuing from the
        current vocabulary size, so the method can be called several times
        without reassigning ids that are already in use.

        Args:
            word_list: iterable of strings.
        """
        # Continue after the ids that already exist (4 on a fresh vocabulary).
        next_id = len(self.itos)

        for word in word_list:
            for char in self.tokenizer(word):
                # stoi doubles as the "seen" set: O(1) lookup, and a single
                # character can never collide with a multi-character special token.
                if char not in self.stoi:
                    self.stoi[char] = next_id
                    self.itos[next_id] = char
                    next_id += 1

    def numericalize(self, text):
        """
        Convert ``text`` into a list of character ids.

        Args:
            text: string to encode.

        Returns:
            list of int, one id per character; unknown characters map to <UNK>.
        """
        unk_id = self.stoi["<UNK>"]
        return [self.stoi.get(token, unk_id) for token in self.tokenizer(text)]
                 

In [None]:
class aksharantar(Dataset):
    """
    Dataset of word pairs read from ``<root_dir>/<out_lang>/<out_lang>_<dataset_type>.csv``.

    The CSV is read without a header row; its first column is exposed as
    ``latin`` and its second as ``hindi``.

    Args:
        root_dir: root directory where the data is stored.
        out_lang: target language; used both as sub-directory and file prefix.
        dataset_type: split name used in the file name (e.g. "train", "test", "val").

    After loading the data, ``__init__`` builds one character-level
    ``Vocabulary`` per column (``vocab_eng`` for ``latin``, ``vocab_hin`` for
    ``hindi``) from the words of the loaded file.

    ``__getitem__`` returns the numericalized input and output word, each
    wrapped in <SOW> ... <EOW>, as a pair of 1-D tensors.
    """

    def __init__(self, root_dir, out_lang, dataset_type):

        # Read file
        self.file_name = out_lang + "_" + dataset_type + ".csv"
        self.file_dir = os.path.join(root_dir, out_lang, self.file_name)
        # dtype=str + keep_default_na=False: keep every cell as text. With the
        # pandas defaults, words such as "nan", "null" or "NA" (and empty
        # fields) are parsed as float NaN, which cannot be split into characters.
        self.df = pd.read_csv(
            self.file_dir,
            names=["latin", "hindi"],
            dtype=str,
            keep_default_na=False,
        )

        # Get columns of input and output language
        self.latin = self.df["latin"]
        self.hindi = self.df["hindi"]

        # Source-side vocabulary
        self.vocab_eng = Vocabulary()
        self.vocab_eng.build_vocabulary(self.latin.tolist())

        # Target-side vocabulary
        self.vocab_hin = Vocabulary()
        self.vocab_hin.build_vocabulary(self.hindi.tolist())

    def __len__(self):
        """Return the number of word pairs."""
        return len(self.df)

    def __getitem__(self, index):
        """
        Return the ``index``-th pair as ``(latin_ids, hindi_ids)`` tensors.

        Both sequences start with the <SOW> id and end with the <EOW> id.
        """
        # Positional access: a Dataset index is a row position, not a label.
        latin = self.latin.iloc[index]
        hindi = self.hindi.iloc[index]

        numericalized_hindi = [self.vocab_hin.stoi["<SOW>"]]
        numericalized_hindi += self.vocab_hin.numericalize(hindi)
        numericalized_hindi.append(self.vocab_hin.stoi["<EOW>"])

        numericalized_latin = [self.vocab_eng.stoi["<SOW>"]]
        numericalized_latin += self.vocab_eng.numericalize(latin)
        numericalized_latin.append(self.vocab_eng.stoi["<EOW>"])

        return torch.tensor(numericalized_latin), torch.tensor(numericalized_hindi)
               
        

In [None]:
class MyCollate:
    """
    Collate callable for DataLoader: turns a list of (input, target) tensor
    pairs into two padded batch tensors.

    Args:
        pad_idx_eng: padding value used for the input sequences.
        pad_idx_hin: padding value used for the target sequences.

    Calling the object returns ``(inputs, targets)``; both are padded to the
    longest sequence of the batch and laid out time-major
    (``batch_first=False``), i.e. (max_length, batch_size).
    """
    def __init__(self, pad_idx_eng, pad_idx_hin):
        self.pad_idx_eng = pad_idx_eng
        self.pad_idx_hin = pad_idx_hin

    def __call__(self, batch):
        source_seqs = [pair[0] for pair in batch]
        padded_sources = pad_sequence(
            source_seqs,
            batch_first=False,
            padding_value=self.pad_idx_eng,
        )

        target_seqs = [pair[1] for pair in batch]
        padded_targets = pad_sequence(
            target_seqs,
            batch_first=False,
            padding_value=self.pad_idx_hin,
        )

        return padded_sources, padded_targets
        
        

In [None]:
def get_loader(root_dir, out_lang, dataset_type, batch_size, pin_memory=True, shuffle=True):
    """
    Build the dataset for one split and a DataLoader that batches it.

    Args:
        root_dir: root directory where the data is stored.
        out_lang: target language (sub-directory and file prefix).
        dataset_type: split name, e.g. "train", "test" or "val".
        batch_size: number of word pairs per batch.
        pin_memory: forwarded to DataLoader.
        shuffle: forwarded to DataLoader. Defaults to True (the previous,
            hard-coded behaviour); pass False to iterate an evaluation split
            in file order.

    Returns:
        (loader, dataset): the DataLoader and the ``aksharantar`` dataset it wraps.

    NOTE(review): the dataset builds its vocabularies from the split it loads,
    so ids obtained from different splits are not guaranteed to agree -
    confirm how callers encode the validation/test data.
    """
    dataset = aksharantar(root_dir, out_lang, dataset_type)

    # Each side is padded with the <PAD> id of its own vocabulary.
    collate_fn = MyCollate(
        pad_idx_eng=dataset.vocab_eng.stoi["<PAD>"],
        pad_idx_hin=dataset.vocab_hin.stoi["<PAD>"],
    )

    loader = DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        pin_memory=pin_memory,
        collate_fn=collate_fn,
        shuffle=shuffle,
    )
    return loader, dataset


In [None]:
class Encoder(nn.Module):
    """
    Recurrent encoder of the sequence-to-sequence model.

    The input ids are embedded, passed through dropout and fed to the
    recurrent layer selected by ``cell_type``. The final state of the top
    recurrent layer (forward and backward halves concatenated when
    bidirectional) is projected to ``hidden_size`` so the result always has
    shape (1, N, hidden_size).

    Args:
        input_size: number of distinct input tokens (rows of the embedding table).
        embedding_size: width of the token embeddings.
        hidden_size: width of the recurrent state.
        num_layers: number of stacked recurrent layers.
        p: dropout probability for the embeddings and between stacked layers.
        cell_type: one of 'gru', 'lstm' or 'rnn'.
        bi_directional: whether the recurrent layers run in both directions.

    Note:
        GRU, LSTM and RNN modules are all instantiated, whatever ``cell_type``
        is; only the selected one is used in ``forward``.
    """
    _CELL_TYPES = ('gru', 'lstm', 'rnn')

    def __init__(self, input_size, embedding_size, hidden_size, num_layers, p, cell_type, bi_directional):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.cell_type = cell_type
        self.bi_directional = bi_directional
        self.dropout = nn.Dropout(p)

        # Width of the state summary handed to the projections below.
        summary_size = 2 * hidden_size if bi_directional else hidden_size
        self.fc_hidden = nn.Linear(summary_size, hidden_size)
        self.fc_cell = nn.Linear(summary_size, hidden_size)

        self.embedding = nn.Embedding(input_size, embedding_size)

        # Recurrent dropout acts only between stacked layers; with a single
        # layer it has no effect and PyTorch warns about it, so pass 0 there.
        rnn_dropout = p if num_layers > 1 else 0.0
        self.gru = nn.GRU(embedding_size, hidden_size, num_layers, dropout=rnn_dropout, bidirectional=bi_directional)
        self.lstm = nn.LSTM(embedding_size, hidden_size, num_layers, dropout=rnn_dropout, bidirectional=bi_directional)
        self.rnn = nn.RNN(embedding_size, hidden_size, num_layers, dropout=rnn_dropout, bidirectional=bi_directional)

    def _summarize(self, state, projection):
        """Project the top layer's final state to shape (1, N, hidden_size)."""
        if self.bi_directional:
            # The last two entries are the top layer's forward/backward states.
            top = torch.cat((state[-2:-1], state[-1:]), dim=2)
        else:
            top = state[-1:]
        return projection(top)

    def forward(self, x):
        """
        Encode a batch of index sequences.

        Args:
            x: tensor of token ids, shape (seq_length, N).

        Returns:
            ``(encoder_states, hidden)`` for 'gru' and 'rnn', or
            ``(encoder_states, hidden, cell)`` for 'lstm'. ``encoder_states``
            are the per-step outputs of the recurrent layer; ``hidden`` and
            ``cell`` have shape (1, N, hidden_size).

        Raises:
            ValueError: if ``cell_type`` is not 'gru', 'lstm' or 'rnn'.
        """
        if self.cell_type not in self._CELL_TYPES:
            raise ValueError(
                "cell_type must be one of %s, got %r" % (self._CELL_TYPES, self.cell_type)
            )

        # x, shape=(seq_length, N)
        embedding = self.dropout(self.embedding(x))
        # embedding shape = (seq_length, N, embedding_size)

        if self.cell_type == 'lstm':
            encoder_states, (hidden, cell) = self.lstm(embedding)
            hidden = self._summarize(hidden, self.fc_hidden)
            cell = self._summarize(cell, self.fc_cell)
            return encoder_states, hidden, cell

        recurrent = self.gru if self.cell_type == 'gru' else self.rnn
        encoder_states, hidden = recurrent(embedding)
        hidden = self._summarize(hidden, self.fc_hidden)
        return encoder_states, hidden

In [2]:
class Decoder(nn.Module):
    """
    Attention decoder that predicts one target token per call.

    For each step the decoder
      1. embeds the previous token and applies dropout,
      2. scores every encoder state against the current hidden state
         (linear layer + ReLU, softmax over the source positions),
      3. builds a context vector as the attention-weighted sum of the
         encoder states,
      4. feeds [context ; embedding] to the recurrent layer selected by
         ``cell_type``,
      5. maps the recurrent output to vocabulary scores.

    Args:
        input_size: number of distinct target tokens (rows of the embedding table).
        embedding_size: width of the token embeddings.
        hidden_size: width of the recurrent state.
        output_size: number of scores produced per step.
        num_layers: number of stacked recurrent layers.
        p: dropout probability for the embeddings and between stacked layers.
        cell_type: one of 'gru', 'lstm' or 'rnn'.
        bi_directional: whether the decoder's recurrent layers are bidirectional.
        encoder_bi_directional: whether ``encoder_states`` come from a
            bidirectional encoder (feature width 2*hidden_size) or a
            unidirectional one (hidden_size). Defaults to True, which gives
            the layer sizes this class always used.

    Note:
        GRU, LSTM and RNN modules are all instantiated, whatever ``cell_type``
        is; only the selected one is used in ``forward``.
    """
    _CELL_TYPES = ('gru', 'lstm', 'rnn')

    def __init__(self, input_size, embedding_size, hidden_size, output_size, num_layers,
                 p, cell_type, bi_directional, encoder_bi_directional=True):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.cell_type = cell_type
        self.bi_directional = bi_directional
        self.dropout = nn.Dropout(p)

        # Feature width of one encoder state.
        encoder_width = hidden_size * (2 if encoder_bi_directional else 1)

        # Attention scorer: [decoder hidden ; encoder state] -> scalar energy.
        self.energy = nn.Linear(hidden_size + encoder_width, 1)
        self.softmax = nn.Softmax(dim=0)  # normalise over source positions
        self.relu = nn.ReLU()
        self.embedding = nn.Embedding(input_size, embedding_size)

        rnn_input_size = encoder_width + embedding_size
        # Recurrent dropout acts only between stacked layers; with a single
        # layer it has no effect and PyTorch warns about it, so pass 0 there.
        rnn_dropout = p if num_layers > 1 else 0.0
        self.gru = nn.GRU(rnn_input_size, hidden_size, num_layers, dropout=rnn_dropout, bidirectional=bi_directional)
        self.lstm = nn.LSTM(rnn_input_size, hidden_size, num_layers, dropout=rnn_dropout, bidirectional=bi_directional)
        self.rnn = nn.RNN(rnn_input_size, hidden_size, num_layers, dropout=rnn_dropout, bidirectional=bi_directional)

        self.fc = nn.Linear(hidden_size * (2 if bi_directional else 1), output_size)

    def _fit_state(self, state):
        """
        Expand a (1, N, hidden_size) state to the layout of this decoder's
        recurrent layer, (num_layers * num_directions, N, hidden_size), by
        copying it into every slot. States that are already expanded are
        returned unchanged.
        """
        slots = self.num_layers * (2 if self.bi_directional else 1)
        if slots > 1 and state.shape[0] == 1:
            return state.repeat(slots, 1, 1)
        return state

    def forward(self, x, encoder_states, hidden, cell):
        """
        Run one decoding step.

        Args:
            x: previous target tokens, shape (N,).
            encoder_states: encoder outputs, shape (seq_length, N, encoder_width).
            hidden: decoder hidden state, either the (1, N, hidden_size)
                summary produced by the encoder or the state returned by the
                previous call.
            cell: LSTM cell state (same layout as ``hidden``); ignored unless
                ``cell_type`` is 'lstm'.

        Returns:
            ``(predictions, hidden)`` for 'gru' and 'rnn', or
            ``(predictions, hidden, cell)`` for 'lstm'. ``predictions`` has
            shape (N, output_size).

        Raises:
            ValueError: if ``cell_type`` is not 'gru', 'lstm' or 'rnn'.
        """
        if self.cell_type not in self._CELL_TYPES:
            raise ValueError(
                "cell_type must be one of %s, got %r" % (self._CELL_TYPES, self.cell_type)
            )

        # x, shape=(N) but we want (1, N)
        x = x.unsqueeze(0)

        embedding = self.dropout(self.embedding(x))
        # embedding shape = (1, N, embedding_size)

        hidden = self._fit_state(hidden)

        # The last state slot is the attention query; for a single
        # unidirectional layer this is the whole hidden state.
        sequence_length = encoder_states.shape[0]
        h_reshaped = hidden[-1:].repeat(sequence_length, 1, 1)

        energy = self.relu(self.energy(torch.cat((h_reshaped, encoder_states), dim=2)))

        attention = self.softmax(energy)
        # attention: (seq_length, N, 1)

        # Weighted sum over the source positions -> (1, N, encoder_width)
        context_vector = torch.einsum("snk,snl->knl", attention, encoder_states)

        rnn_input = torch.cat((context_vector, embedding), dim=2)
        # rnn_input: (1, N, encoder_width + embedding_size)

        if self.cell_type == 'lstm':
            cell = self._fit_state(cell)
            outputs, (hidden, cell) = self.lstm(rnn_input, (hidden, cell))
            # predictions: (N, output_size) once the step dimension is squeezed
            predictions = self.fc(outputs).squeeze(0)
            return predictions, hidden, cell

        recurrent = self.gru if self.cell_type == 'gru' else self.rnn
        outputs, hidden = recurrent(rnn_input, hidden)
        # predictions: (N, output_size) once the step dimension is squeezed
        predictions = self.fc(outputs).squeeze(0)
        return predictions, hidden
    

In [None]:
class Seq2Seq(nn.Module):
    """
    Ties an encoder and a decoder together into a sequence-to-sequence model.

    Args:
        encoder: module whose forward returns ``(encoder_states, hidden)`` or,
            for 'lstm', ``(encoder_states, hidden, cell)``.
        decoder: module called as ``decoder(x, encoder_states, hidden, cell)``
            that returns ``(scores, hidden)`` or, for 'lstm',
            ``(scores, hidden, cell)``. It must expose its output layer as
            ``decoder.fc`` (an ``nn.Linear``); the width of that layer is used
            to size the result tensor.
        cell_type: 'lstm' selects the three-value protocol above; any other
            value selects the two-value one.
    """
    def __init__(self, encoder, decoder, cell_type):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.cell_type = cell_type

    def forward(self, word_input, word_target, teacher_force_ratio=0.5):
        """
        Decode ``word_target`` step by step, conditioned on ``word_input``.

        Args:
            word_input: source token ids, shape (source_length, N).
            word_target: target token ids, shape (target_length, N); row 0 is
                used as the first decoder input.
            teacher_force_ratio: probability, drawn once per step, of feeding
                the ground-truth token instead of the model's own prediction
                as the next decoder input.

        Returns:
            Tensor of shape (target_length, N, target_vocab_size) holding the
            decoder scores for every step; row 0 is left at zero because no
            prediction is made for the start token.
        """
        batch_size = word_input.shape[1]
        target_length = word_target.shape[0]

        # Size and place the result from the model and its input rather than
        # from notebook-level globals, so the module works on its own.
        target_vocab_size = self.decoder.fc.out_features
        outputs = torch.zeros(
            target_length, batch_size, target_vocab_size, device=word_input.device
        )

        if self.cell_type == 'lstm':
            encoder_states, hidden, cell = self.encoder(word_input)
        else:
            encoder_states, hidden = self.encoder(word_input)

        # grab start token
        x = word_target[0]

        for t in range(1, target_length):
            if self.cell_type == "lstm":
                output, hidden, cell = self.decoder(x, encoder_states, hidden, cell)
            else:
                # Non-LSTM cells carry no cell state; 0 is a placeholder.
                output, hidden = self.decoder(x, encoder_states, hidden, 0)

            outputs[t] = output

            best_pred = output.argmax(1)

            # Teacher forcing: next input is the true token or the prediction.
            x = word_target[t] if random.random() < teacher_force_ratio else best_pred

        return outputs
    