In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch.utils.data import DataLoader, Dataset
import random
import copy

In [2]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

In [3]:

# Hindi Unicode Hex Range is 2304:2432. Source: https://en.wikipedia.org/wiki/Devanagari_(Unicode_block)
SOS_token = 0
EOS_token = 1
hindi_alphabets = [chr(alpha) for alpha in range(2304, 2432)]
english_alphabets = [chr(alpha) for alpha in range(97, 123)]
hindi_alphabet_size = len(hindi_alphabets)
english_alphabet_size = len(english_alphabets)
hindi_alpha2index = {"SOS": 0,"EOS": 1}
english_alpha2index = {"SOS": 0,"EOS": 1}
for index, alpha in enumerate(hindi_alphabets):
    hindi_alpha2index[alpha] = index+2
for index, alpha in enumerate(english_alphabets):
    english_alpha2index[alpha] = index+2
hindi_index2alpha = {0 : "SOS", 1 : "EOS"}
english_index2alpha = { 0 : "SOS", 1 : "EOS"}
for index, alpha in enumerate(hindi_alphabets):
    hindi_index2alpha[index+2] = alpha
for index, alpha in enumerate(english_alphabets):
    english_index2alpha[index+2] = alpha 
print("Hindi A2I:\n", hindi_alpha2index)
print("-"*100)
print("English A2I:\n", english_alpha2index)
print("-"*100)
print("*"*100)
print("-"*100)
print("Hindi I2A:\n", hindi_index2alpha)
print("-"*100)
print("English I2A:\n", english_index2alpha)

Hindi A2I:
 {'SOS': 0, 'EOS': 1, 'ऀ': 2, 'ँ': 3, 'ं': 4, 'ः': 5, 'ऄ': 6, 'अ': 7, 'आ': 8, 'इ': 9, 'ई': 10, 'उ': 11, 'ऊ': 12, 'ऋ': 13, 'ऌ': 14, 'ऍ': 15, 'ऎ': 16, 'ए': 17, 'ऐ': 18, 'ऑ': 19, 'ऒ': 20, 'ओ': 21, 'औ': 22, 'क': 23, 'ख': 24, 'ग': 25, 'घ': 26, 'ङ': 27, 'च': 28, 'छ': 29, 'ज': 30, 'झ': 31, 'ञ': 32, 'ट': 33, 'ठ': 34, 'ड': 35, 'ढ': 36, 'ण': 37, 'त': 38, 'थ': 39, 'द': 40, 'ध': 41, 'न': 42, 'ऩ': 43, 'प': 44, 'फ': 45, 'ब': 46, 'भ': 47, 'म': 48, 'य': 49, 'र': 50, 'ऱ': 51, 'ल': 52, 'ळ': 53, 'ऴ': 54, 'व': 55, 'श': 56, 'ष': 57, 'स': 58, 'ह': 59, 'ऺ': 60, 'ऻ': 61, '़': 62, 'ऽ': 63, 'ा': 64, 'ि': 65, 'ी': 66, 'ु': 67, 'ू': 68, 'ृ': 69, 'ॄ': 70, 'ॅ': 71, 'ॆ': 72, 'े': 73, 'ै': 74, 'ॉ': 75, 'ॊ': 76, 'ो': 77, 'ौ': 78, '्': 79, 'ॎ': 80, 'ॏ': 81, 'ॐ': 82, '॑': 83, '॒': 84, '॓': 85, '॔': 86, 'ॕ': 87, 'ॖ': 88, 'ॗ': 89, 'क़': 90, 'ख़': 91, 'ग़': 92, 'ज़': 93, 'ड़': 94, 'ढ़': 95, 'फ़': 96, 'य़': 97, 'ॠ': 98, 'ॡ': 99, 'ॢ': 100, 'ॣ': 101, '।': 102, '॥': 103, '०': 104, '१': 105, '२': 106, '३': 107, '४': 108, '५'

In [4]:
data_train = pd.read_csv("hin_train.csv",header= None)
data_train = pd.DataFrame(np.array(data_train),columns=["English","Hindi"])
data_val = pd.read_csv("hin_valid.csv",header= None)
data_val = pd.DataFrame(np.array(data_val),columns=["English","Hindi"])
data_test = pd.read_csv("hin_test.csv",header= None)
data_test = pd.DataFrame(np.array(data_test),columns=["English","Hindi"])
print(data_train.shape,data_val.shape,data_test.shape)
data_train.head()

(51200, 2) (4096, 2) (4096, 2)


Unnamed: 0,English,Hindi
0,shastragaar,शस्त्रागार
1,bindhya,बिन्द्या
2,kirankant,किरणकांत
3,yagyopaveet,यज्ञोपवीत
4,ratania,रटानिया


In [5]:
data_train_X = np.array(data_train["English"])
data_train_y = np.array(data_train["Hindi"])
data_train_X,data_train_y

(array(['shastragaar', 'bindhya', 'kirankant', ..., 'asahmaton',
        'sulgaayin', 'anchuthengu'], dtype=object),
 array(['शस्त्रागार', 'बिन्द्या', 'किरणकांत', ..., 'असहमतों', 'सुलगायीं',
        'अंचुतेंगु'], dtype=object))

In [6]:
class Tokenize():
    def __init__(self,Lang_From,Lang_To):
        # Hindi Unicode Hex Range is 2304:2432. Source: https://en.wikipedia.org/wiki/Devanagari_(Unicode_block)
        self.L1 = Lang_From
        self.L2 = Lang_To
        self.SOS_token = 0
        self.EOS_token = 1
        hindi_alphabets = [chr(alpha) for alpha in range(2304, 2432)]
        english_alphabets = [chr(alpha) for alpha in range(97, 123)]
        hindi_alphabet_size = len(hindi_alphabets)
        english_alphabet_size = len(english_alphabets)
        hindi_alpha2index = {"SOS": 0,"EOS": 1}
        english_alpha2index = {"SOS": 0,"EOS": 1}
        for index, alpha in enumerate(hindi_alphabets):
            hindi_alpha2index[alpha] = index+2
        for index, alpha in enumerate(english_alphabets):
            english_alpha2index[alpha] = index+2
        hindi_index2alpha = {0 : "SOS", 1 : "EOS"}
        english_index2alpha = { 0 : "SOS", 1 : "EOS"}
        for index, alpha in enumerate(hindi_alphabets):
            hindi_index2alpha[index+2] = alpha
        for index, alpha in enumerate(english_alphabets):
            english_index2alpha[index+2] = alpha 

        self.Lang_From_Alpha_2_Index = english_alpha2index
        self.Lang_To_Alpha_2_Index = hindi_alpha2index
        self.Lang_From_Index_2_Alpha = english_index2alpha
        self.Lang_To_Index_2_Alpha = hindi_index2alpha

    def tensorFromWord(self,Lang, word):
        if Lang == "L1":
            indexes = [self.Lang_From_Alpha_2_Index[letter] for letter in word]
        elif Lang == "L2":
            indexes = [self.SOS_token]+[self.Lang_To_Alpha_2_Index[letter] for letter in word]
        #print([self.EOS_token]*(30-len(indexes)))
        indexes+=[self.EOS_token]*(30-len(indexes))
        return torch.tensor(indexes, dtype=torch.long, device=device)#.view(-1, 1)

    def tensorsFromPair(self,pair):
        input_tensor = self.tensorFromWord("L1",pair[self.L1])
        target_tensor = self.tensorFromWord("L2",pair[self.L2])
        return (input_tensor, target_tensor)
    def tensorsFromData(self,Data):
        Tensors_Val = []
        for i in tqdm(range(Data.shape[0])):
            Tensors_Val.append(self.tensorsFromPair(Data.iloc[i]))
        return Tensors_Val
    def WordFromtensors(self,Lang, word):
        if Lang == "L1":
            letters = [self.Lang_From_Index_2_Alpha[letter.item()] for letter in word if ((letter.item() != EOS_token) and (letter.item() != SOS_token))]
        elif Lang == "L2":
            letters = [self.Lang_To_Index_2_Alpha[letter.item()] for letter in word if ((letter.item() != EOS_token) and (letter.item() != SOS_token))]
        #print([self.EOS_token]*(30-len(indexes)))
        word = ''.join(letters)
        return word
    def PairFromtensors(self,pair):
        input_word = self.WordFromtensors("L1",pair[0])
        target_word = self.WordFromtensors("L2",pair[1])
        return (input_word, target_word)
    '''def DataFromtensors(self,Data):
        Tensors_Val = []
        for i in tqdm(range(Data.shape[0])):
            Tensors_Val.append(self.PairFromtensors(data_train.iloc[i]))
        return Tensors_Val'''
                                             
                                        

In [7]:
T = Tokenize("English","Hindi")
data_train_num = T.tensorsFromData(data_train)
data_val_num = T.tensorsFromData(data_val)
data_test_num = T.tensorsFromData(data_test)
#tensorFromWord(english_alpha2index,data_train_X[0])

100%|██████████| 51200/51200 [00:11<00:00, 4596.00it/s]
100%|██████████| 4096/4096 [00:00<00:00, 5729.96it/s]
100%|██████████| 4096/4096 [00:00<00:00, 5972.87it/s]


In [8]:
# Define a custom dataset class
class CustomDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

In [9]:
train_set=CustomDataset(data_train_num)
valid_set=CustomDataset(data_val_num)
test_set=CustomDataset(data_test_num)

In [10]:
train_data_set=DataLoader(train_set, batch_size=64, shuffle=True)
valid_data_set=DataLoader(valid_set, batch_size=64, shuffle=False)
test_data_set=DataLoader(test_set, batch_size=64, shuffle=False)

# Encoder

In [11]:
class Encoder(nn.Module):
    def __init__(self,input_size,embedding_size,hidden_size,num_layers, dropouts,cell_type,bidirectional):
        super(Encoder,self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = nn.Dropout(dropouts)
        self.embedding = nn.Embedding(input_size,embedding_size)
        self.cell_type = cell_type
        self.bidirectional = bidirectional
        if num_layers >1:
            if self.cell_type == "LSTM":
                self.rnn = nn.LSTM(embedding_size,hidden_size,num_layers,dropout=dropouts,bidirectional=self.bidirectional)
            elif self.cell_type == "RNN":
                self.rnn = nn.RNN(embedding_size,hidden_size,num_layers,dropout=dropouts,bidirectional=self.bidirectional)
            elif self.cell_type == "GRU":
                self.rnn = nn.GRU(embedding_size,hidden_size,num_layers,dropout=dropouts,bidirectional=self.bidirectional)
        else:
            if self.cell_type == "LSTM":
                self.rnn = nn.LSTM(embedding_size,hidden_size,num_layers,bidirectional=self.bidirectional)
            elif self.cell_type == "RNN":
                self.rnn = nn.RNN(embedding_size,hidden_size,num_layers,bidirectional=self.bidirectional)
            elif self.cell_type == "GRU":
                self.rnn = nn.GRU(embedding_size,hidden_size,num_layers,bidirectional=self.bidirectional)
                
    def forward(self,x):
        # X : (seq_length,N)
        embedding = self.dropout(self.embedding(x))
        # embedding : seq_length,N,embedding_size)
        if self.cell_type == "LSTM":
            outputs,(hidden,cell) = self.rnn(embedding)
        else:
            outputs,hidden = self.rnn(embedding)
            cell = None
        return hidden,cell
            

# Decoder

In [12]:
class Decoder(nn.Module):
    def __init__(self,input_size,embedding_size,hidden_size,output_size,num_layers,dropouts,cell_type,bidirectional):
        super(Decoder,self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = nn.Dropout(dropouts)
        self.embedding = nn.Embedding(input_size,embedding_size)
        self.cell_type = cell_type
        self.bidirectional = bidirectional
        if num_layers>1:            
            if self.cell_type == "LSTM":
                self.rnn = nn.LSTM(embedding_size,hidden_size,num_layers,dropout=dropouts,bidirectional=self.bidirectional)
            elif self.cell_type == "RNN":
                self.rnn = nn.RNN(embedding_size,hidden_size,num_layers,dropout=dropouts,bidirectional=self.bidirectional)
            elif self.cell_type == "GRU":
                self.rnn = nn.GRU(embedding_size,hidden_size,num_layers,dropout=dropouts,bidirectional=self.bidirectional)
        else:
            if self.cell_type == "LSTM":
                self.rnn = nn.LSTM(embedding_size,hidden_size,num_layers,bidirectional=self.bidirectional)
            elif self.cell_type == "RNN":
                self.rnn = nn.RNN(embedding_size,hidden_size,num_layers,bidirectional=self.bidirectional)
            elif self.cell_type == "GRU":
                self.rnn = nn.GRU(embedding_size,hidden_size,num_layers,bidirectional=self.bidirectional)
        self.fc = nn.Linear((1+self.bidirectional*1)*hidden_size,output_size)
        self.softmax = nn.LogSoftmax(dim=1)
    def forward(self,x,hidden,cell):
        # x :(N) but we want (1,N)
        x = x.unsqueeze(0)
        embedding = self.dropout(self.embedding(x))
        # embedding : (1,N,embedding_size)
        if self.cell_type == "LSTM":
            outputs,(hidden,cell) = self.rnn(embedding,(hidden,cell))
        else:
            outputs,hidden = self.rnn(embedding,hidden)
            cell = None
        # outputs : (1,N,hidden_size)
        predictions = self.fc(outputs)
        #predictions : (1,N,output_vocab_size)
        predictions = self.softmax(predictions[0])
        #predictions = predictions.squeeze(0)
        
        return predictions,hidden,cell


# Seq2Seq

In [13]:
class Seq2Seq(nn.Module):
    def __init__(self,encoder,decoder):
        super(Seq2Seq,self).__init__()
        self.encoder = encoder
        self.decoder = decoder
    
    def forward(self,source,target,teacher_forcing=0.5):
        batch_size = source.shape[1]
        self.target_len = target.shape[0]
        target_vocab_size = len(hindi_alpha2index)
        
        outputs = torch.zeros(self.target_len,batch_size,target_vocab_size).to(device)
        hidden,cell = self.encoder(source)
        
        # Start Token
        x = target[0]
        for t in range(1,self.target_len):
            output,hidden,cell = self.decoder(x,hidden,cell)
            outputs[t] = output
            best_guess = output.argmax(1)
            x = target[t] if random.random() < teacher_forcing else best_guess
        return outputs
    def predict(self,source):
        batch_size = source.shape[1]
        target_vocab_size = len(hindi_alpha2index)
        
        outputs = torch.zeros(self.target_len,batch_size,target_vocab_size).to(device)
        hidden,cell = self.encoder(source)
        
        # Start Token
        x = 0*source[0]
        for t in range(1,self.target_len):
            output,hidden,cell = self.decoder(x,hidden,cell)
            outputs[t] = output
            best_guess = output.argmax(1)
            x = best_guess
        return outputs
        

# Early Stopper

In [14]:
class EarlyStopper:
    def __init__(self, patience=1, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.max_validation_Acc = 0

    def early_stop(self, validation_Acc):
        if validation_Acc > self.max_validation_Acc:
            self.max_validation_loss = validation_Acc
            self.counter = 0
        elif validation_Acc < (self.max_validation_Acc + self.min_delta):
            self.counter += 1
            if self.counter >= self.patience:
                return True
        return False

# Train Model

In [15]:
def Train_Model(num_epochs = 10,learning_rate = 0.001,input_size_encoder = len(english_alpha2index),input_size_decoder = len(hindi_alpha2index),output_size = len(hindi_alpha2index),encoder_embeddings_size = 256,decoder_embeddings_size = 256,hidden_size = 512,num_enc_layers = 3,num_dec_layers = 3,enc_dropout = 0.2,dec_dropout = 0.2,cell_type = "LSTM",bidirectional = True):
    encoder_net = Encoder(input_size_encoder,encoder_embeddings_size,hidden_size,num_enc_layers,enc_dropout,cell_type,bidirectional).to(device)
    decoder_net = Decoder(input_size_decoder,decoder_embeddings_size,hidden_size,output_size,num_enc_layers,dec_dropout,cell_type,bidirectional).to(device)

    model = Seq2Seq(encoder_net,decoder_net).to(device)
    optimizer = optim.Adam(model.parameters(),lr = learning_rate)
    pad_idx = EOS_token
    criterion = nn.CrossEntropyLoss()#ignore_index=pad_idx)
    Loss_log = []
    Max_Acc=0
    for epoch in range(num_epochs):
        epoch_loss = 0
        for batch in tqdm(train_data_set):
            inp_data = batch[0].T.to(device)
            target = batch[1].T.to(device)
            #print(inp_data.shape)
            #print(inp_data)
            output = model(inp_data,target)
            #output : (trg_len,batch_size,output_dim)
            output = output[1:].reshape(-1,output.shape[2])
            target = target[1:].reshape(-1)

            optimizer.zero_grad()
            loss = criterion(output,target)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(),max_norm = 1)
            optimizer.step()
            epoch_loss += loss.item()
        Loss_log.append(epoch_loss)
        Train_epoch_loss = epoch_loss/len(train_data_set)

        Predictions_List = []
        Total = 0
        crct = 0
        Val_epoch_loss = 0
        for batch in valid_data_set:
            inp_data = batch[0].T.to(device)
            target = batch[1].T.to(device)
            output = model.predict(inp_data)
            #print(output_val[2])
            best_guess = output.argmax(2)
            predictions = best_guess.squeeze()
            #print(predictions.shape)
            output = output[1:].reshape(-1,output.shape[2])
            target = target[1:].reshape(-1)
            loss = criterion(output,target)
            Val_epoch_loss += loss.item()
            for i in range(batch[1].shape[0]):
                Pairs_P = T.PairFromtensors((batch[0][i],predictions.T[i]))
                Pairs_T = T.PairFromtensors((batch[0][i],batch[1][i]))
                Total+=1
                if Pairs_P[1] == Pairs_T[1]:
                    crct +=1
        Val_epoch_loss=Val_epoch_loss/len(valid_data_set)
        Val_Accuracy = crct/Total
        print("Epoch = [{}/{}] : Train_loss = {}, val_loss = {}, val_accuracy = {}".format(epoch,num_epochs,Train_epoch_loss,Val_epoch_loss,Val_Accuracy))
        if Val_Accuracy>Max_Acc:
            torch.save(model.state_dict(),'ME19B031_Attn_Model.model')
            Model_weights=copy.deepcopy(model.state_dict())
            Max_Acc=Val_Accuracy
    model.load_state_dict(Model_weights)    
    return model

In [16]:
model = Train_Model(num_epochs=2,learning_rate = 0.001,input_size_encoder = len(english_alpha2index),input_size_decoder = len(hindi_alpha2index),output_size = len(hindi_alpha2index),encoder_embeddings_size = 256,decoder_embeddings_size = 256,hidden_size = 512,num_enc_layers = 3,num_dec_layers = 3,enc_dropout = 0.2,dec_dropout = 0.2,cell_type = "LSTM",bidirectional = True):)

100%|██████████| 800/800 [12:33<00:00,  1.06it/s]


Epoch = [0/2] : Train_loss = 0.47205864734947683, val_loss = 0.29551906511187553, val_accuracy = 0.256103515625


100%|██████████| 800/800 [12:24<00:00,  1.07it/s]


Epoch = [1/2] : Train_loss = 0.2287938866764307, val_loss = 0.27811011392623186, val_accuracy = 0.3193359375


# Validation

In [17]:
Valid_Predictions_List = []
Total = 0
crct = 0
for batch in valid_data_set:
    inp_data = batch[0].T.to(device)
    output_val = model.predict(inp_data)
    #print(output_val[2])
    best_guess = output_val.argmax(2)
    predictions = best_guess.squeeze()
    #print(predictions.shape)
    for i in range(batch[1].shape[0]):
        Pairs_P = T.PairFromtensors((batch[0][i],predictions.T[i]))
        Pairs_T = T.PairFromtensors((batch[0][i],batch[1][i]))
        Total+=1
        if Pairs_P[1] == Pairs_T[1]:
            crct +=1
        Valid_Predictions_List.append([Pairs_T[0],Pairs_T[1],Pairs_P[1]])
print("Validation Accuracy =",crct/Total)

Validation Accuracy = 0.31640625
