<a href="https://colab.research.google.com/github/CS22M029/cs6910_assignment3/blob/main/VanilaSeq2seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Import necessary packages
import os
import torch
import random
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as Function
from torch.autograd import Variable
from torch.utils.data import DataLoader
from google.colab import files 

# Check if CUDA is available
use_cuda = torch.cuda.is_available()

# Set the device type to CUDA if available, otherwise use CPU
if use_cuda:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Define constants for special cases
Start_Symbol, End_Symbol, Unknown, Padding = 0, 1, 2, 3

#Define a class for a Vocabulary that will hold mappings between characters and their indices
class Vocabulary:
    def __init__(self):
        self.char2count = {}
        self.char2index = {}
        self.n_chars = 4
        self.index2char = {0: "<", 1: ">", 2: "?", 3: "."}

    def addWord(self, word):
        for char in word:
            if char not in self.char2index:
                self.char2index[char] = self.n_chars
                self.index2char[self.n_chars] = char
                self.char2count[char] = 1
                self.n_chars += 1
            else:
                self.char2count[char] += 1


# Define a function to prepare the data
def prepareData(dir):
    # Upload the CSV file
    print("Upload", dir)
    uploaded = files.upload()
    
    # Read the CSV file into a DataFrame with columns "input" and "target"
    data = pd.read_csv(next(iter(uploaded)), sep=",", names=["input", "target"])


    # Find the maximum length of input and target sequences
    max_input_length = max([len(txt) for txt in data["input"].to_list()])
    max_target_length = max([len(txt) for txt in data["target"].to_list()])
    max_len=max(max_input_length,max_target_length)

    # Create Vocabulary objects for input and output languages
    input_lang = Vocabulary()
    output_lang = Vocabulary()

    # Create pairs of input and target sequences
    pairs = []
    input_list, target_list = data["input"].to_list(), data["target"].to_list()
    for i in range(len(input_list)):
        pairs.append([input_list[i], target_list[i]])

    # Add words to the respective vocabularies
    for pair in pairs:
        input_lang.addWord(pair[0])
        output_lang.addWord(pair[1])

    # Create a dictionary containing prepared data
    prepared_data = {
        "input_lang": input_lang,
        "output_lang": output_lang,
        "pairs": pairs,
        "max_len": max_len
    }

    return prepared_data

# Define a helper function to convert a word to a tensor
def helpTensor(lang, word, max_length):
    index_list = []
    for char in word:
        if char in lang.char2index.keys():
            index_list.append(lang.char2index[char])
        else:
            index_list.append(Unknown)
    indexes = index_list
    indexes.append(End_Symbol)
    indexes.extend([Padding] * (max_length - len(indexes)))
    result = torch.LongTensor(indexes)
    if use_cuda:
        return result.cuda()
    else:
        return result

# Define a function to convert pairs of input and target sequences to tensors
def MakeTensor(input_lang, output_lang, pairs, reach):
    res = []
    for pair in pairs:
        # Convert input and target sequences to tensors using the helpTensor function
        input_variable = helpTensor(input_lang, pair[0], reach)
        target_variable = helpTensor(output_lang, pair[1], reach)
        res.append((input_variable, target_variable))
    return res

#Encoder Class
class EncoderRNN(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, num_layers_encoder, cell_type, drop_out, bi_directional):
        super(EncoderRNN, self).__init__()

        # Initialize the EncoderRNN with the provided parameters
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.num_layers_encoder = num_layers_encoder
        self.cell_type = cell_type
        self.drop_out = drop_out
        self.bi_directional = bi_directional

        # Create an embedding layer
        self.embedding = nn.Embedding(input_size, self.embedding_size)
        self.dropout = nn.Dropout(self.drop_out)

        # Create the specified cell layer (RNN, GRU, or LSTM)
        cell_map = {"RNN": nn.RNN, "GRU": nn.GRU, "LSTM": nn.LSTM}
        self.cell_layer = cell_map[self.cell_type](
            self.embedding_size,
            self.hidden_size,
            num_layers=self.num_layers_encoder,
            dropout=self.drop_out,
            bidirectional=self.bi_directional,
        )

    def forward(self, input, batch_size, hidden):
        # Apply dropout to the embedded input sequence
        embedded = self.dropout(self.embedding(input).view(1, batch_size, -1))

        # Pass the embedded input through the cell layer
        output, hidden = self.cell_layer(embedded, hidden)
        return output, hidden

    def initHidden(self, batch_size, num_layers_enc):
        # Initialize the hidden state with zeros
        res = torch.zeros(num_layers_enc * 2 if self.bi_directional else num_layers_enc, batch_size, self.hidden_size)

        # Move the hidden state to the GPU if use_cuda is True, else return as is
        return res.cuda() if use_cuda else res

#Decoder class
class DecoderRNN(nn.Module):
    def __init__(self, embedding_size, hidden_size, num_layers_decoder, cell_type, drop_out, bi_directional, output_size):
        super(DecoderRNN, self).__init__()

        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.num_layers_decoder = num_layers_decoder
        self.cell_type = cell_type
        self.drop_out = drop_out
        self.bi_directional = bi_directional

        # Create an embedding layer
        self.embedding = nn.Embedding(output_size, self.embedding_size)
        self.dropout = nn.Dropout(self.drop_out)

        # Create the specified cell layer (RNN, GRU, or LSTM)
        cell_map = {"RNN": nn.RNN, "GRU": nn.GRU, "LSTM": nn.LSTM}
        self.cell_layer = cell_map[self.cell_type](
            self.embedding_size,
            self.hidden_size,
            num_layers=self.num_layers_decoder,
            dropout=self.drop_out,
            bidirectional=self.bi_directional,
        )

        # Linear layer for output
        self.out = nn.Linear(
            self.hidden_size * 2 if self.bi_directional else self.hidden_size,
            output_size,
        )

        # Softmax activation
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, batch_size, hidden):
        # Apply dropout to the embedded input sequence and pass it through the cell layer
        output = Function.relu(self.dropout(self.embedding(input).view(1, batch_size, -1)))
        output, hidden = self.cell_layer(output, hidden)

        # Apply softmax activation to the output
        output = self.softmax(self.out(output[0]))
        return output, hidden

# Function to calculate loss (if is_training then training loss else validation loss)
def calc_loss(encoder, decoder, input_tensor, target_tensor, batch_size, encoder_optimizer, decoder_optimizer, criterion, cell_type, num_layers_enc, max_length, is_training, teacher_forcing_ratio=0.5):
    # Initialize the encoder hidden state
    encoder_hidden = encoder.initHidden(batch_size, num_layers_enc)

    # Check if LSTM and initialize cell state
    if cell_type == "LSTM":
        encoder_cell_state = encoder.initHidden(batch_size, num_layers_enc)
        encoder_hidden = (encoder_hidden, encoder_cell_state)

    # Zero the gradients
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    # Get input and target sequence lengths
    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)

    # Initialize loss
    loss = 0

    # Encoder forward pass
    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(input_tensor[ei], batch_size, encoder_hidden)

    # Initialize decoder input
    decoder_input = torch.LongTensor([Start_Symbol] * batch_size)
    decoder_input = decoder_input.cuda() if use_cuda else decoder_input

    # Set decoder hidden state
    decoder_hidden = encoder_hidden

    # Determine if using teacher forcing
    use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False

    # Loop over target sequence
    if is_training:
        # Training phase
        for di in range(target_length):
            decoder_output, decoder_hidden = decoder(decoder_input, batch_size, decoder_hidden)
            loss += criterion(decoder_output, target_tensor[di])
            decoder_input = target_tensor[di] if use_teacher_forcing else decoder_output.argmax(dim=1)
    else:
        # Validation phase
        with torch.no_grad():
            for di in range(target_length):
                decoder_output, decoder_hidden = decoder(decoder_input, batch_size, decoder_hidden)
                loss += criterion(decoder_output, target_tensor[di])
                decoder_input = decoder_output.argmax(dim=1)

    # Backpropagation and optimization in training phase
    if is_training:
        loss.backward()
        encoder_optimizer.step()
        decoder_optimizer.step()

    # Return the average loss per target length
    return loss.item() / target_length


# Calculate the accuracy of the Seq2Seq model
def accuracy(encoder, decoder, loader, batch_size, criterion, cell_type, num_layers_enc, max_length, output_lang):
    with torch.no_grad():
        total = 0
        correct = 0

        for batch_x, batch_y in loader:
            # Initialize encoder hidden state
            encoder_hidden = encoder.initHidden(batch_size, num_layers_enc)

            input_variable = Variable(batch_x.transpose(0, 1))
            target_variable = Variable(batch_y.transpose(0, 1))

            # Check if LSTM and initialize cell state
            if cell_type == "LSTM":
                encoder_cell_state = encoder.initHidden(batch_size, num_layers_enc)
                encoder_hidden = (encoder_hidden, encoder_cell_state)

            input_length = input_variable.size()[0]
            target_length = target_variable.size()[0]

            output = torch.LongTensor(target_length, batch_size)

            # Initialize encoder outputs
            encoder_outputs = Variable(torch.zeros(max_length, batch_size, encoder.hidden_size))
            encoder_outputs = encoder_outputs.cuda() if use_cuda else encoder_outputs

            # Encoder forward pass
            for ei in range(input_length):
                encoder_output, encoder_hidden = encoder(input_variable[ei], batch_size, encoder_hidden)

            decoder_input = Variable(torch.LongTensor([Start_Symbol] * batch_size))
            decoder_input = decoder_input.cuda() if use_cuda else decoder_input

            decoder_hidden = encoder_hidden

            # Decoder forward pass
            for di in range(target_length):
                decoder_output, decoder_hidden = decoder(decoder_input, batch_size, decoder_hidden)
                topv, topi = decoder_output.data.topk(1)
                decoder_input = torch.cat(tuple(topi))
                output[di] = torch.cat(tuple(topi))

            output = output.transpose(0, 1)

            # Calculate accuracy
            for di in range(output.size()[0]):
                ignore = [Start_Symbol, End_Symbol, Padding]
                sent = [output_lang.index2char[letter.item()] for letter in output[di] if letter not in ignore]
                y = [output_lang.index2char[letter.item()] for letter in batch_y[di] if letter not in ignore]
                if sent == y:
                    correct += 1
                total += 1

    return (correct / total) * 100

# Train and evaluate the Seq2Seq model
def seq2seq(encoder, decoder, train_loader, val_loader, test_loader, lr, optimizer, epochs, max_length_word, num_layers_enc, output_lang):
    max_length = max_length_word - 1
    # Define the optimizer and criterion
    encoder_optimizer = optim.NAdam(encoder.parameters(), lr=lr) if optimizer == "nadam" else optim.Adam(encoder.parameters(), lr=lr)
    decoder_optimizer = optim.NAdam(decoder.parameters(), lr=lr) if optimizer == "nadam" else optim.Adam(decoder.parameters(), lr=lr)
    criterion = nn.NLLLoss()

    for epoch in range(epochs):
        train_loss_total = 0
        val_loss_total = 0

        # Training phase
        for batch_x, batch_y in train_loader:
            batch_x = Variable(batch_x.transpose(0, 1))
            batch_y = Variable(batch_y.transpose(0, 1))
            # Calculate the training loss
            loss = calc_loss(encoder, decoder, batch_x, batch_y, batch_size, encoder_optimizer, decoder_optimizer, criterion, cell_type, num_layers_enc, max_length, is_training=True)
            train_loss_total += loss

        train_loss_avg = train_loss_total / len(train_loader)
        print(f"Epoch: {epoch} | Train Loss: {train_loss_avg:.4f} |", end="")

        # Validation phase
        for batch_x, batch_y in val_loader:
            batch_x = Variable(batch_x.transpose(0, 1))
            batch_y = Variable(batch_y.transpose(0, 1))
            # Calculate the validation loss
            loss = calc_loss(encoder, decoder, batch_x, batch_y, batch_size, encoder_optimizer, decoder_optimizer, criterion, cell_type, num_layers_enc, max_length, is_training=False)
            val_loss_total += loss

        val_loss_avg = val_loss_total / len(val_loader)
        print(f"Val Loss: {val_loss_avg:.4f} |", end="")

        # Calculate validation accuracy
        val_acc = accuracy(encoder, decoder, val_loader, batch_size, criterion, cell_type, num_layers_enc, max_length, output_lang)
        val_acc /= 100
        print(f"Val Accuracy: {val_acc:.4%}")


# Define model hyperparameters
hidden_size = 512
input_lang = "eng"
target_lang = "hin"
cell_type = "LSTM"
num_layers_encoder = 3
num_layers_decoder = 3
drop_out = 0
epochs = 5
embedding_size = 64
bi_directional = True
batch_size = 32
teacher_forcing_ratio = 0.5
optimizer = "Nadam"
learning_rate = 0.001 

train_path = "hin_train.csv"
validation_path = "hin_valid.csv"
test_path = "hin_test.csv"

# Prepare training data
train_prepared_data = prepareData(train_path)
input_langs, output_langs, pairs = train_prepared_data["input_lang"], train_prepared_data["output_lang"], train_prepared_data["pairs"]
print("train:sample:", random.choice(pairs))
print(f"Number of training examples: {len(pairs)}")
max_len = train_prepared_data["max_len"]

# Prepare validation data
val_prepared_data = prepareData(validation_path)
val_pairs = val_prepared_data["pairs"]
print("validation:sample:", random.choice(val_pairs))
print(f"Number of validation examples: {len(val_pairs)}")
max_len_val = val_prepared_data["max_len"]

# Prepare test data
test_prepared_data = prepareData(test_path)
test_pairs = test_prepared_data["pairs"]
print("Test:sample:", random.choice(test_pairs))
print(f"Number of Test examples: {len(test_pairs)}")

max_len_test = test_prepared_data["max_len"]
max_len = max(max_len, max_len_val, max_len_test) + 4
print(max_len)

# Convert data to tensors and create data loaders
pairs = MakeTensor(input_langs, output_langs, pairs, max_len)
val_pairs = MakeTensor(input_langs, output_langs, val_pairs, max_len)
test_pairs = MakeTensor(input_langs, output_langs, test_pairs, max_len)

train_loader = DataLoader(pairs, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_pairs, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_pairs, batch_size=1, shuffle=True)


Upload hin_train.csv


Saving hin_train.csv to hin_train.csv
train:sample: ['gaderi', 'गदेरी']
Number of training examples: 51200
Upload hin_valid.csv


Saving hin_valid.csv to hin_valid.csv
validation:sample: ['farishte', 'फरिश्ते']
Number of validation examples: 4096
Upload hin_test.csv


Saving hin_test.csv to hin_test.csv
Test:sample: ['thaki', 'थकी']
Number of Test examples: 4096
30


In [2]:

# Create the encoder and decoder models
encoder1 = EncoderRNN(input_langs.n_chars, embedding_size, hidden_size, num_layers_encoder, cell_type, drop_out, bi_directional)
decoder1 = DecoderRNN(embedding_size, hidden_size, num_layers_encoder, cell_type, drop_out, bi_directional, output_langs.n_chars)
print(use_cuda)
if use_cuda:
   encoder1, decoder1 = encoder1.cuda(), decoder1.cuda()

print("vanilla seq2seq")
# Train and evaluate the Seq2Seq model
seq2seq(encoder1, decoder1, train_loader, val_loader, test_loader, learning_rate, optimizer, epochs, max_len, num_layers_encoder, output_langs)


True
vanilla seq2seq
Epoch: 0 | Train Loss: 0.7430 |Val Loss: 0.3891 |Val Accuracy: 15.4785%
Epoch: 1 | Train Loss: 0.3333 |Val Loss: 0.2972 |Val Accuracy: 29.3457%
Epoch: 2 | Train Loss: 0.2521 |Val Loss: 0.2775 |Val Accuracy: 33.7158%
Epoch: 3 | Train Loss: 0.2183 |Val Loss: 0.2725 |Val Accuracy: 35.5225%
Epoch: 4 | Train Loss: 0.1868 |Val Loss: 0.2687 |Val Accuracy: 36.0352%
