In [1]:
import os
import wandb
import torch
import torch.nn as nn
import random
from torch.autograd import Variable
from torch.utils.data import DataLoader
import pandas as pd
import torch.optim as optim
import torch.nn.functional as Function
import argparse


In [2]:
SYMBOL_BEGIN, SYMBOL_END, SYMBOL_UNKNOWN, SYMBOL_PADDING = 0, 1, 2, 3

INPUT_LABEL = "input"
TARGET_LABEL = "target"
DELIMETER = ","

RNN_KEY = "RNN"
GRU_KEY = "GRU"
LSTM_KEY = "LSTM"

INPUT_LANG_KEY = "input_lang"
OUTPUT_LANG_KEY = "output_lang"
PAIRS_KEY = "pairs"
MAX_LEN_KEY = "max_len"

input_lang = "eng"
TARGET_LANG = "hin"

TRAIN_LABEL = "train"
TEST_LABEL = "test"
VALID_LABEL = "valid"

DEFAULT_PATH = "/kaggle/input/aksharantar-sampled/aksharantar_sampled"
TRAIN_DATASET_PATH = f"{DEFAULT_PATH}/{TARGET_LANG}/{TARGET_LANG}_{TRAIN_LABEL}.csv"
VALIDATION_DATASET_PATH = f"{DEFAULT_PATH}/{TARGET_LANG}/{TARGET_LANG}_{VALID_LABEL}.csv"
TEST_DATASET_PATH = f"{DEFAULT_PATH}/{TARGET_LANG}/{TARGET_LANG}_{TEST_LABEL}.csv"

NADAM_KEY = "Nadam"


is_gpu = torch.cuda.is_available()


# Set the device type to CUDA if available, otherwise use CPU
if is_gpu:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")


In [3]:
class Vocabulary:
    def __init__(self):
        self.str_count,self.int_encodding = dict(),dict()
        self.n_chars = 4
        self.str_encodding = {0: "<", 1: ">", 2: "?", 3: "."}

    def addWord(self, word):
        for char in word:
            try:
                self.str_count[char] += 1
            except:
                self.int_encodding[char] = self.n_chars
                self.str_encodding[self.n_chars] = char
                self.str_count[char] = 1
                self.n_chars += 1

# prepareDataWithoutAttn
def prepareData(dir):
    data = pd.read_csv(dir, sep=DELIMETER, names=[INPUT_LABEL, TARGET_LABEL])

    max_input_length = data[INPUT_LABEL].apply(len).max()
    max_target_length = data[TARGET_LABEL].apply(len).max()
    
    max_len=max(max_input_length,max_target_length)

    input_lang, output_lang = Vocabulary(), Vocabulary()

    pairs = pd.concat([data[INPUT_LABEL], data[TARGET_LABEL]], axis=1).values.tolist()

    for pair in pairs:
        input_lang.addWord(pair[0])
        output_lang.addWord(pair[1])

    return input_lang,output_lang,pairs,max_len

# helpTensorWithoutAttn
def helpTensor(lang, word, max_length):
    index_list = []
    for char in word:
        try:
            index_list.append(lang.char2index[char])
        except:
            index_list.append(SYMBOL_UNKNOWN)

    indexes = index_list
    indexes.append(SYMBOL_END)
    n = len(indexes)
    indexes.extend([SYMBOL_PADDING] * (max_length - n))
    result = torch.LongTensor(indexes)
    if is_gpu:
        return result.cuda()
    return result


# MakeTensorWithoutAttn
def makeTensor(input_lang, output_lang, pairs, reach):
    res = []
    for i in range(len(pairs)):
        # Convert input and target sequences to tensors using the helpTensorWithoutAttn function
        input_variable = helpTensor(input_lang, pairs[i][0], reach)
        target_variable = helpTensor(output_lang, pairs[i][1], reach)
        res.append((input_variable, target_variable))
    return res



In [6]:
# accuracyWithoutAttn
def accuracy(encoder, decoder, loader, batch_size, criterion, cell_type, num_layers_enc, max_length, output_lang):
    with torch.no_grad():
        total = 0
        correct = 0

        for batch_x, batch_y in loader:
            # Initialize encoder hidden state
            encoder_hidden = encoder.initHidden(batch_size, num_layers_enc)

            input_variable = Variable(batch_x.transpose(0, 1))
            target_variable = Variable(batch_y.transpose(0, 1))

            # Check if LSTM and initialize cell state
            if cell_type == "LSTM":
                encoder_cell_state = encoder.initHidden(batch_size, num_layers_enc)
                encoder_hidden = (encoder_hidden, encoder_cell_state)

            # input_length = input_variable.size()[0]
            # target_length = target_variable.size()[0]

            output = torch.LongTensor(target_variable.size()[0], batch_size)

            # Initialize encoder outputs
            # encoder_outputs = Variable(torch.zeros(max_length, batch_size, encoder.hidden_size))
            # encoder_outputs = encoder_outputs.cuda() if use_cuda else encoder_outputs

            # Encoder forward pass
            for ei in range(input_variable.size()[0]):
                encoder_hidden = encoder(input_variable[ei], batch_size, encoder_hidden)[1]

            decoder_input = Variable(torch.LongTensor([SYMBOL_BEGIN] * batch_size))
            decoder_input = decoder_input.cuda() if is_gpu else decoder_input

            decoder_hidden = encoder_hidden

            # Decoder forward pass
            for di in range(target_variable.size()[0]):
                decoder_output, decoder_hidden = decoder(decoder_input, batch_size, decoder_hidden)
                topi = decoder_output.data.topk(1)[1]
                output[di] = torch.cat(tuple(topi))
                decoder_input = torch.cat(tuple(topi))

            output = output.transpose(0, 1)

            # Calculate accuracyWithoutAttn
            for di in range(output.size()[0]):
                ignore = [SYMBOL_BEGIN, SYMBOL_END, SYMBOL_PADDING]
                sent = [output_lang.str_encodding[letter.item()] for letter in output[di] if letter not in ignore]
                y = [output_lang.str_encodding[letter.item()] for letter in batch_y[di] if letter not in ignore]
                if sent == y:
                    correct += 1
                total += 1

    return (correct / total) * 100


# calc_lossWithoutAttn
def calc_loss(encoder, decoder, input_tensor, target_tensor, batch_size, encoder_optimizer, decoder_optimizer, criterion, cell_type, num_layers_enc, max_length, is_training, teacher_forcing_ratio=0.5):
    # Initialize the encoder hidden state
    output_hidden = encoder.initHidden(batch_size, num_layers_enc)

    # Check if LSTM and initialize cell state
    if cell_type == LSTM_KEY:
        encoder_cell_state = encoder.initHidden(batch_size, num_layers_enc)
        output_hidden = (output_hidden, encoder_cell_state)

    # Zero the gradients
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    # Get input and target sequence lengths
    # input_length = input_tensor.size(0)
    # target_length = target_tensor.size(0)

    # Initialize loss
    loss = 0

    # Encoder forward pass
    for ei in range(input_tensor.size(0)):
        output_hidden = encoder(input_tensor[ei], batch_size, output_hidden)[1]

    # Initialize decoder input
    decoder_input = torch.LongTensor([SYMBOL_BEGIN] * batch_size)
    decoder_input = decoder_input.cuda() if is_gpu else decoder_input

    # Determine if using teacher forcing
    use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False

    # Loop over target sequence
    if is_training:
        # Training phase
        for di in range(target_tensor.size(0)):
            decoder_output, output_hidden = decoder(decoder_input, batch_size, output_hidden)
            decoder_input = target_tensor[di] if use_teacher_forcing else decoder_output.argmax(dim=1)
            loss = criterion(decoder_output, target_tensor[di]) + loss
    else:
        # Validation phase
        with torch.no_grad():
            for di in range(target_tensor.size(0)):
                decoder_output, output_hidden = decoder(decoder_input, batch_size, output_hidden)
                loss += criterion(decoder_output, target_tensor[di])
                decoder_input = decoder_output.argmax(dim=1)

    # Backpropagation and optimization in training phase
    if is_training:
        loss.backward()
        encoder_optimizer.step()
        decoder_optimizer.step()

    # Return the average loss per target length
    return loss.item() / target_tensor.size(0)


# Train and evaluate the Seq2SeqWithoutAttn model
def seq2seq(encoder, decoder, train_loader, val_loader, test_loader, lr, optimizer, epochs, max_length_word, num_layers_enc, output_lang,batch_size,cell_type):
    max_length = max_length_word - 1
    # Define the optimizer and criterion
    encoder_optimizer = optim.NAdam(encoder.parameters(), lr=lr) if optimizer == "nadam" else optim.Adam(encoder.parameters(), lr=lr)
    decoder_optimizer = optim.NAdam(decoder.parameters(), lr=lr) if optimizer == "nadam" else optim.Adam(decoder.parameters(), lr=lr)
    criterion = nn.NLLLoss()

    for epoch in range(epochs):
        train_loss_total = 0
        val_loss_total = 0

        # Training phase
        for batch_x, batch_y in train_loader:
            batch_x = Variable(batch_x.transpose(0, 1))
            batch_y = Variable(batch_y.transpose(0, 1))
            # Calculate the training loss
            loss = calc_loss(encoder, decoder, batch_x, batch_y, batch_size, encoder_optimizer, decoder_optimizer, criterion, cell_type, num_layers_enc, max_length, is_training=True)
            train_loss_total += loss

        train_loss_avg = train_loss_total / len(train_loader)
        print(f"Epoch: {epoch} | Train Loss: {train_loss_avg:.4f} |", end="")

        # Validation phase
        for batch_x, batch_y in val_loader:
            batch_x = Variable(batch_x.transpose(0, 1))
            batch_y = Variable(batch_y.transpose(0, 1))
            # Calculate the validation loss
            loss = calc_loss(encoder, decoder, batch_x, batch_y, batch_size, encoder_optimizer, decoder_optimizer, criterion, cell_type, num_layers_enc, max_length, is_training=False)
            val_loss_total += loss

        val_loss_avg = val_loss_total / len(val_loader)
        print(f"Val Loss: {val_loss_avg:.4f} |", end="")

        # Calculate validation accuracyWithoutAttn
        val_acc = accuracy(encoder, decoder, val_loader, batch_size, criterion, cell_type, num_layers_enc, max_length, output_lang)
        val_acc /= 100
        print(f"Val Accuracy: {val_acc:.4%}")
        
        if epochs-1==epoch :
            test_acc = accuracy(encoder, decoder, test_loader, batch_size, criterion, cell_type, num_layers_enc, max_length, output_lang)
            test_acc /= 100
            print(f"Test Accuracy: {test_acc:.4%}")
            



# EncoderRNNWithoutAttn
class EncoderRNN(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, num_layers_encoder, cell_type, drop_out, bi_directional):
        super(EncoderRNN, self).__init__()

        self.emb_n = embedding_size
        self.hid_n = hidden_size
        self.encoder_n = num_layers_encoder
        self.model_key = cell_type
        self.is_dropout = drop_out
        self.is_bi_dir = bi_directional

        self.embedding = nn.Embedding(input_size, self.emb_n)
        self.dropout = nn.Dropout(self.is_dropout)

        cell_map = dict({RNN_KEY: nn.RNN, GRU_KEY: nn.GRU, LSTM_KEY: nn.LSTM})
        self.cell_layer = cell_map[self.model_key](
            input_size = self.emb_n,
            hidden_size = self.hid_n,
            num_layers=self.encoder_n,
            dropout=self.is_dropout,
            bidirectional=self.is_bi_dir,
        )

    def forward(self, input, batch_size, hidden):
        embedded = self.dropout(self.embedding(input).view(1, batch_size, -1))

        output, hidden = self.cell_layer(embedded, hidden)
        return output, hidden

    def initHidden(self, batch_size, num_layers_enc):
        if self.is_bi_dir:
            weights = torch.zeros(num_layers_enc * 2 , batch_size, self.hid_n)
        else:
            weights = torch.zeros(num_layers_enc, batch_size, self.hid_n)

        if is_gpu:
            return weights.cuda()
        return weights
    
# DecoderRNNWithoutAttn
class DecoderRNN(nn.Module):
    def __init__(self, embedding_size, hidden_size, num_layers_decoder, cell_type, drop_out, bi_directional, output_size):
        super(DecoderRNN, self).__init__()

        self.emb_n = embedding_size
        self.hid_n = hidden_size
        self.decoder_n = num_layers_decoder
        self.model_key = cell_type
        self.is_dropout = drop_out
        self.is_bi_dir = bi_directional

        # Create an embedding layer
        self.embedding = nn.Embedding(output_size, self.emb_n)
        self.dropout = nn.Dropout(self.is_dropout)

        cell_map = {RNN_KEY: nn.RNN, GRU_KEY: nn.GRU, LSTM_KEY: nn.LSTM}
        self.cell_layer = cell_map[self.model_key](
            input_size = self.emb_n,
            hidden_size = self.hid_n,
            num_layers=self.decoder_n,
            dropout=self.is_dropout,
            bidirectional=self.is_bi_dir,
        )

        # Linear layer for output
        if self.is_bi_dir :
            self.out = nn.Linear(self.hid_n * 2, output_size)
        else:
            self.out = nn.Linear(self.hid_n,output_size)

        # Softmax activation
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, batch_size, hidden):
        output = Function.relu(self.dropout(self.embedding(input).view(1, batch_size, -1)))
        output, hidden = self.cell_layer(output, hidden)

        output = self.softmax(self.out(output[0]))
        return output, hidden


def train(flag):
    optimizer = NADAM_KEY
    alpha = 0.001
    hl_size = 256
    model_key = LSTM_KEY
    encoder_n = 2
    num_layers_decoder = 2
    dropout_val = 0.2
    epochs = 5
    embedding_size = 256
    is_bi_dir = False
    batch_size = 32

    if flag:
        pass
    else:
        # Prepare training data
        input_langs,output_langs,pairs,max_len = prepareData(TRAIN_DATASET_PATH)
        print("train:sample:", random.choice(pairs))
        train_n = len(pairs)
        print(f"Number of training examples: {train_n}")

        # Prepare validation data
        input_langs,output_langs,val_pairs,max_len_val = prepareData(VALIDATION_DATASET_PATH)
        val_n = len(val_pairs)
        print("validation:sample:", random.choice(val_pairs))
        print(f"Number of validation examples: {val_n}")

        # Prepare test data
        input_langs,output_langs,test_pairs,max_len_test = prepareData(TEST_DATASET_PATH)
        test_n = len(test_pairs)
        print("Test:sample:", random.choice(test_pairs))
        print(f"Number of Test examples: {test_n}")

        max_len = max(max_len, max(max_len_val, max_len_test)) + 4
        print(max_len)

        # Convert data to tensors and create data loaders
        pairs = makeTensor(input_langs, output_langs, pairs, max_len)
        val_pairs = makeTensor(input_langs, output_langs, val_pairs, max_len)
        test_pairs = makeTensor(input_langs, output_langs, test_pairs, max_len)

        train_loader = DataLoader(dataset = pairs, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(dataset = val_pairs, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(dataset = test_pairs, batch_size=batch_size, shuffle=True)

        # Create the encoder and decoder models
        encoder1 = EncoderRNN(input_langs.n_chars, embedding_size, hl_size, encoder_n, model_key, dropout_val, is_bi_dir)
        decoder1 = DecoderRNN(embedding_size, hl_size, encoder_n, model_key, dropout_val, is_bi_dir, output_langs.n_chars)

        if is_gpu:
            encoder1, decoder1 = encoder1.cuda(), decoder1.cuda()

        print("vanilla seq2seqWithoutAttn")
        # Train and evaluate the Seq2SeqWithoutAttn model
        seq2seq(encoder1, decoder1, train_loader, val_loader, test_loader, alpha, optimizer, epochs, max_len, encoder_n, output_langs, batch_size = 32, cell_type = model_key)


In [None]:
train(False)

train:sample: ['devoki', 'देवोकी']
Number of training examples: 51200
validation:sample: ['kothiyalmadhu', 'कोठियालमधु']
Number of validation examples: 4096
Test:sample: ['qiwi', 'क्यूआईडब्ल्यूआई']
Number of Test examples: 4096
30
vanilla seq2seqWithoutAttn
Epoch: 0 | Train Loss: 0.1396 |Val Loss: 0.1738 |Val Accuracy: 21.9238%
Epoch: 1 | Train Loss: 0.1167 |Val Loss: 0.1601 |Val Accuracy: 18.0908%
