In [14]:
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
import random
from datetime import datetime
import pandas

In [15]:
train_input=None
train_output=None
valid_input=None
valid_output=None
test_input=None
test_output=None

In [16]:
def loadData():
    global train_input, train_output, valid_input, valid_output, test_input, test_output
    data_train = pandas.read_csv('/kaggle/input/aksharantar-sampled/aksharantar_sampled/hin/hin_train.csv', header=None)
    train_input = data_train.iloc[:,0]
    train_output = data_train.iloc[:,1]
    data_valid = pandas.read_csv('/kaggle/input/aksharantar-sampled/aksharantar_sampled/hin/hin_valid.csv', header=None)
    valid_input = data_valid.iloc[:,0]
    valid_output = data_valid.iloc[:,1]
    data_test = pandas.read_csv('/kaggle/input/aksharantar-sampled/aksharantar_sampled/hin/hin_test.csv', header=None)
    test_input = data_test.iloc[:,0]
    test_output = data_test.iloc[:,1]

In [17]:
SOW_token = 0
EOW_token = 1
loadData()

In [18]:
class Dictionary:
    def __init__(self):
        self.char2index = {}
        self.index2char = {0: "#", 1: "$"}
        self.n_chars = 2  # Count SOS and EOS

    def allWords(self, words):
        for word in words:
            for c in word:
                if c not in self.char2index:
                    self.char2index[c]=self.n_chars
                    self.index2char[self.n_chars]=c
                    self.n_chars+=1
                
    def wordToTensor(self,word):
        a=[]
        for i in word:
            a.append(self.char2index[i])
        a.append(EOW_token)
        return torch.tensor(a, dtype=torch.long, device=device).view(-1, 1)
    
    def createBatches(self, words, batch_size):
        x=[]
        for word in words:
            x.append(self.wordToTensor(word))
        batches=[]
        for i in range(0,len(x),batch_size):
            if batch_size+i >= len(x):
                break
            temp=(nn.utils.rnn.pad_sequence(x[i:i+batch_size]).squeeze(2)).to(device)
            batches.append(temp)
        return batches
    
    def driver(self, words,batch_size):
        self.allWords(words)
        return self.createBatches(words,batch_size)

In [24]:
class EncoderRNN(nn.Module):
    def __init__(self, input_size, params):
        super(EncoderRNN, self).__init__()
        self.hidden_size = params["hidden_size"]
        self.dropout = nn.Dropout(params["dropout_encoder"])
        self.num_layers = params["num_layers"]
        self.batch_size = params["batch_size"]
        self.embedding_size = params["embedding_size"]
        self.cell_type = params["cell_type"]
        self.embedding = nn.Embedding(input_size, params["embedding_size"])
        self.bidirection = params["bidirection"]
        if(params["cell_type"] == "GRU"):
            self.gru = nn.GRU(params["embedding_size"], params["hidden_size"], params["num_layers"], dropout = params["dropout_encoder"], bidirectional = params["bidirection"])
        elif(params["cell_type"] == "LSTM"):
            self.lstm = nn.LSTM(params["embedding_size"], params["hidden_size"], params["num_layers"], dropout = params["dropout_encoder"], bidirectional = params["bidirection"])
        elif(params["cell_type"] == "RNN"):
            self.rnn = nn.RNN(params["embedding_size"], params["hidden_size"], params["num_layers"], dropout = params["dropout_encoder"], bidirectional = params["bidirection"])

    def forward(self, input, hidden):
        embedded = self.embedding(input).view(-1,self.batch_size, self.embedding_size)
        output = self.dropout(embedded)
        if(self.cell_type == "GRU"):
            _, hidden = self.gru(output, hidden)
        elif(self.cell_type == "LSTM"):
            _, (hidden, cell) = self.lstm(output)
#             print(hidden.shape, cell.shape)
        elif(self.cell_type == "RNN"):
            _, hidden = self.rnn(output, hidden)
        if self.bidirection:
            hidden = hidden.reshape(2, hidden.size(0)//2, hidden.size(1), hidden.size(2))
            hidden = torch.add(hidden[0]*0.5, hidden[1]*0.5)
            if(self.cell_type == "LSTM"):
                cell = cell.reshape(2, cell.size(0)//2, cell.size(1), cell.size(2))
                cell = torch.add(cell[0]*0.5, cell[1]*0.5)
        if self.cell_type == "LSTM":
            return hidden, cell
        else:
            return hidden

    def initHidden(self):
        if self.bidirection:
            return torch.zeros(2*self.num_layers, self.batch_size, self.hidden_size, device=device)
        else:
            return torch.zeros(self.num_layers, self.batch_size, self.hidden_size, device=device)

In [28]:
class DecoderRNN(nn.Module):
    def __init__(self, params, output_size):
        super(DecoderRNN, self).__init__()
        self.hidden_size = params["hidden_size"]
        self.dropout = nn.Dropout(params["dropout_decoder"])
        self.num_layers = params["num_layers"]
        self.batch_size = params["batch_size"]
        self.embedding_size = params["embedding_size"]
        self.embedding = nn.Embedding(output_size, params["embedding_size"])
        self.cell_type = params["cell_type"]
        if(params["cell_type"] == "GRU"):
            self.gru = nn.GRU(params["embedding_size"], params["hidden_size"], params["num_layers"], dropout = params["dropout_decoder"])
        elif(params["cell_type"] == "LSTM"):
            self.lstm = nn.LSTM(params["embedding_size"], params["hidden_size"], params["num_layers"], dropout = params["dropout_decoder"])
        elif(params["cell_type"] == "RNN"):
            self.rnn = nn.RNN(params["embedding_size"], params["hidden_size"], params["num_layers"], dropout = params["dropout_decoder"])
        self.out = nn.Linear(params["hidden_size"], output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        output = self.embedding(input).view(-1, self.batch_size, self.embedding_size)
        output = self.dropout(output)
        output = torch.relu(output)
        if(self.cell_type == "GRU"):
            output, hidden = self.gru(output, hidden)
        elif(self.cell_type == "LSTM"):
            output, (hidden, cell) = self.lstm(output, (hidden[0], hidden[1]))
        elif(self.cell_type == "RNN"):
            output, hidden = self.rnn(output, hidden)
        if self.cell_type == "LSTM":
            return self.softmax(self.out(output[0])), hidden, cell
        return self.softmax(self.out(output[0])), hidden

    def initHidden(self):
        return torch.zeros(self.num_layers, self.batch_size, self.hidden_size, device=device)

In [32]:
class Combine:
    def __init__(self,encoder, decoder, epochs, learning_rate, batch_size, embedding_size, cell_type, train_batch_input, train_batch_target, valid_batch_input):
        self.teacher_forcing_ratio = 0.5
        self.trainIters(encoder, decoder, epochs, learning_rate, batch_size, embedding_size, cell_type, train_batch_input, train_batch_target, valid_batch_input)
        
    def findAccuracy(self, input, actual_output, cell_type, n, batch_size):
        correct = 0
        for i in range(len(input)):
            output_word = self.evaluate(encoder1, decoder1, input[i], cell_type, batch_size)
            for j in range(i*batch_size, i*batch_size+batch_size):
                if(actual_output[j] == output_word[j-i*batch_size]):
                    correct += 1
        return correct/n*100
    

    def train(self, input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, batch_size, cell_type):
        encoder_hidden = encoder.initHidden()

        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        input_length = input_tensor.size(0)
        target_length = target_tensor.size(0)

        loss = 0
        if cell_type == "LSTM":
            encoder_hidden, encoder_cell = encoder(input_tensor, encoder_hidden)
        else:
            encoder_hidden = encoder(input_tensor, encoder_hidden)

        decoder_input = torch.tensor([SOW_token]*batch_size, device=device)

        decoder_hidden = encoder_hidden
        if cell_type == "LSTM":
            decoder_cell = encoder_cell

        use_teacher_forcing = True if random.random() < self.teacher_forcing_ratio else False

        if use_teacher_forcing:
            # Teacher forcing: Feed the target as the next input
            for di in range(target_length):
                if cell_type == "LSTM":
                    decoder_output, decoder_hidden, decoder_cell = decoder(decoder_input, (decoder_hidden, decoder_cell))
                else:
                    decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
                loss += criterion(decoder_output, target_tensor[di])
                decoder_input = target_tensor[di]  # Teacher forcing

        else:
            # Without teacher forcing: use its own predictions as the next input
            for di in range(target_length):
                if cell_type == "LSTM":
                    decoder_output, decoder_hidden, decoder_cell = decoder(decoder_input, (decoder_hidden, decoder_cell))
                else:
                    decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
                topv, topi = decoder_output.topk(1)
                decoder_input = topi.squeeze().detach()  # detach from history as input

                loss += criterion(decoder_output, target_tensor[di])

        loss.backward()
        encoder_optimizer.step()
        decoder_optimizer.step()

        return loss.item() / target_length
    
    def trainIters(self, encoder, decoder, epochs, learning_rate, batch_size, embedding_size, cell_type, train_batch_input, train_batch_target, valid_batch_input):
        plot_losses = []
        print_loss_total = 0  # Reset every print_every
        plot_loss_total = 0  # Reset every plot_every

        encoder_optimizer = optim.NAdam(encoder.parameters(), lr=learning_rate, weight_decay = 0.0005)
        decoder_optimizer = optim.NAdam(decoder.parameters(), lr=learning_rate, weight_decay = 0.0005)
        criterion = nn.CrossEntropyLoss()

        for epochNum in range(epochs):
            print("Epoch ", epochNum ," started ", datetime.now())
            for i in range(len(train_batch_input)):
                loss = self.train(train_batch_input[i], train_batch_target[i], encoder,
                             decoder, encoder_optimizer, decoder_optimizer, criterion, batch_size, cell_type)
                print_loss_total += loss*batch_size

            print_loss_avg = print_loss_total / len(train_input)
            print_loss_total = 0
            print("Average loss after ", epochNum+1, "epochs is ", print_loss_avg)
    #         train_accuracy = findAccuracy(train_batch_input, train_output, cell_type, len(train_input), batch_size)
    #         print("Train accuracy is ", train_accuracy)

            valid_accuracy = self.findAccuracy(valid_batch_input, valid_output, cell_type, len(valid_input), batch_size)
            print("Valid accuracy is ", valid_accuracy)
            
    def evaluate(self, encoder, decoder, input_tensors, cell_type, batch_size):
        with torch.no_grad():

            input_length = input_tensors.size(0)
            encoder_hidden = encoder.initHidden()

            if cell_type == "LSTM":
                encoder_hidden, encoder_cell = encoder(input_tensors, encoder_hidden)
            else:
                encoder_hidden = encoder(input_tensors, encoder_hidden)

            decoder_input = torch.tensor([SOW_token]*batch_size, device=device)  # SOW

            decoder_hidden = encoder_hidden
            if cell_type == "LSTM":
                decoder_cell = encoder_cell

            decoded_words = [""]*batch_size

            for di in range(input_length):

                if cell_type == "LSTM":
                    decoder_output, decoder_hidden, decoder_cell = decoder(decoder_input, (decoder_hidden, decoder_cell))
                else:
                    decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
                topv, topi = decoder_output.data.topk(1)
                for i in range(batch_size):
                    if topi[i].item() == EOW_token or topi[i] == 0:
                        continue
                    else:
                        decoded_words[i] += hin.index2char[topi[i].item()]

                decoder_input = topi.squeeze().detach()

            return decoded_words

In [33]:
params={"hidden_size":256,
        "num_layers":2,
        "dropout_encoder":0.2,
        "dropout_decoder":0.2,
        "batch_size":32,
        "embedding_size": 256,
        "epochs":2,
        "cell_type":"GRU",
        "learning_rate":0.001,
        "bidirection":True
        }

eng=Dictionary()
hin=Dictionary()
train_batch_input=eng.driver(train_input,params["batch_size"])
train_batch_target=hin.driver(train_output,params["batch_size"])
valid_batch_input=eng.driver(valid_input,params["batch_size"])

encoder1 = EncoderRNN(eng.n_chars, params).to(device)
decoder1 = DecoderRNN(params, hin.n_chars).to(device)

Combine(encoder1, decoder1, params["epochs"], params["learning_rate"], params["batch_size"], params["embedding_size"], params["cell_type"], train_batch_input, train_batch_target, valid_batch_input)

#print("Train Accuracy started ", datetime.now())
#train_batch_input = getBatchedTensorFromWords(train_input, batch_size, lang_input)
#train_accuracy = findAccuracy(train_batch_input, train_output, cell_type, len(train_input), batch_size)
#print("Train accuracy is ", train_accuracy)

#print("Test Accuracy started ", datetime.now())

#test_batch_input = getBatchedTensorFromWords(test_input, batch_size, lang_input)
#test_accuracy = findAccuracy(test_batch_input, test_output, cell_type, len(test_input), batch_size)
#print("Test accuracy is ", test_accuracy)
#print("Test Accuracy ended ", datetime.now())

Epoch  0  started  2023-05-19 03:57:14.627565
Average loss after  1 epochs is  0.9990693296526014
Valid accuracy is  19.23828125
Epoch  1  started  2023-05-19 03:58:08.797132
Average loss after  2 epochs is  0.6578222709421359
Valid accuracy is  24.21875


<__main__.Combine at 0x7a39bf28bd60>