<a href="https://colab.research.google.com/github/Dhananjay42/cs6910-assn3/blob/main/assn3_nb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##CS6910: Assignment-3
-- Submitted by Dhananjay Balakrishnan, ME19B012

##Setup and Loading the Dataset

In [1]:
import csv
from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import string
import re
import random

import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [2]:
from google.colab import drive
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


In [3]:
data_dir = '/content/gdrive/MyDrive/CS6910_A3/aksharantar_sampled/tam/'

In [4]:
def obtain_data(dir):
  x = []
  y = []

  with open(dir, 'r') as file:
    reader = csv.DictReader(file, fieldnames=['x', 'y'])
  
    for row in reader:
      x.append(row['x'])
      y.append(row['y'])
    
  return x, y

In [5]:
x_train, y_train = obtain_data(data_dir + 'tam_train.csv')
x_test, y_test = obtain_data(data_dir + 'tam_test.csv')
x_val, y_val = obtain_data(data_dir + 'tam_valid.csv')

In [6]:
for i in range(0, 2):
  print(x_train[i], y_train[i])

thottacharya தொட்டாச்சார்ய
menmaithaan மென்மைதான்


In [7]:
start_token = 0
end_token = 1

In [8]:
eng_characters = {}

In [9]:
class Language:
  def __init__(self, name):
    self.name = name
    self.char2index = {}
    self.index2char = {0: "SOS", 1: "EOS", 2: "unknown"}
    self.n_chars = 3  # Count SOS and EOS
    self.max_size = 0

  def update_vocab(self, x):
    for word in x:
      if len(word) > self.max_size:
        self.max_size = len(word)
        
      for letter in word:
        if letter not in self.char2index.keys():
          self.char2index[letter] = self.n_chars
          self.index2char[self.n_chars] = letter
          self.n_chars = self.n_chars + 1
  
  def get_index(self, character):
    if character in self.char2index.keys():
      return self.char2index[character]
    else:
      return 2

In [10]:
english = Language('eng')
tamil = Language('tam')

In [11]:
english.update_vocab(x_train)
tamil.update_vocab(y_train)

In [12]:
def encoded_word(language, word):
  coded = [language.get_index(letter) for letter in word]
  coded.append(end_token)
  return coded

def get_pairs(lang1, lang2, inputs, targets):
  return [(torch.tensor(encoded_word(lang1, x), dtype=torch.long, device=device).view(-1, 1), torch.tensor(encoded_word(lang2, y), dtype=torch.long, device=device).view(-1, 1)) 
  for (x,y) in zip(inputs,targets)]

In [13]:
train_data = get_pairs(english, tamil, x_train, y_train)
test_data = get_pairs(english, tamil, x_test, y_test)
val_data = get_pairs(english, tamil, x_val, y_val)

#RNN without Attention

In [26]:
class Encoder(nn.Module):
  def __init__(self, inp_vocab_size, embedding_size, n_layers, hl_size, cell_type = 'RNN'):
    super(Encoder, self).__init__()
    self.vocab_size = inp_vocab_size
    self.embedding_size = embedding_size
    self.n_layers = n_layers
    self.hl_size = hl_size

    if cell_type == 'RNN':
      self.cell = nn.RNN(self.embedding_size, self.hl_size, num_layers = self.n_layers).to(device)
    elif cell_type == 'GRU':
      self.cell = nn.GRU(self.embedding_size, self.hl_size, num_layers = self.n_layers).to(device)
    elif cell_type == 'LSTM':
      self.cell = nn.LSTM(self.embedding_size, self.hl_size, num_layers = self.n_layers).to(device)
      #you'll have to fix the LSTM part as the return/passed parameters are different. 
    else:
      print('Wrong Cell Type.')
      exit()
    self.embedding_layer = nn.Embedding(self.vocab_size, self.embedding_size).to(device)
  
  def forward(self, input, hidden):
    embedded = self.embedding_layer(input)
    output, hidden = self.cell(embedded)
    return output, hidden
  
  def init_hidden(self):
    return torch.zeros(self.n_layers, 1, self.hl_size, device = device)

In [27]:
class DecoderVanilla(nn.Module):
  def __init__(self, out_vocab_size, embedding_size, n_layers, hl_size, cell_type = 'RNN'):
    super(DecoderVanilla, self).__init__()
    self.vocab_size = out_vocab_size
    self.embedding_size = embedding_size
    self.n_layers = n_layers
    self.hl_size = hl_size
    self.linear = nn.Linear(self.hl_size, self.vocab_size).to(device)
    self.softmax = nn.LogSoftmax(dim=1)

    if cell_type == 'RNN':
      self.cell = nn.RNN(self.embedding_size, self.hl_size, num_layers = self.n_layers).to(device)
    elif cell_type == 'GRU':
      self.cell = nn.GRU(self.embedding_size, self.hl_size, num_layers = self.n_layers).to(device)
    elif cell_type == 'LSTM':
      self.cell = nn.LSTM(self.embedding_size, self.hl_size, num_layers = self.n_layers).to(device)
    else:
      print('Wrong Cell Type.')
      exit()
    
    self.embedding_layer = nn.Embedding(self.vocab_size, self.embedding_size).to(device)
  
  def forward(self, input, hidden):
    embedded = self.embedding_layer(input)
    output = F.relu(embedded)

    output, hidden = self.cell(output, hidden)
    output = self.linear(output[0])
    output = self.softmax(output)
    return output, hidden

In [51]:
class seq2seq_vanilla():
  def __init__(self, inp_language, out_language, embedding_size, n_layers, hl_size, cell_type = 'RNN', lr = 0.001, teacher_forcing_ratio = 0.5):
    self.encoder = Encoder(inp_language.n_chars, embedding_size, n_layers, hl_size, cell_type)
    self.decoder = DecoderVanilla(out_language.n_chars, embedding_size, n_layers, hl_size, cell_type)
    self.lr = lr
    self.teacher_forcing = teacher_forcing_ratio
    self.max_length = out_language.max_size

    self.encoder_optimizer = optim.SGD(self.encoder.parameters(), lr=self.lr)
    self.decoder_optimizer = optim.SGD(self.decoder.parameters(), lr = self.lr)

    self.loss_fn = nn.NLLLoss()

  def train_step(self, input, target):
    encoder_hidden = self.encoder.init_hidden()
    self.encoder_optimizer.zero_grad()
    self.decoder_optimizer.zero_grad()

    input_length = input.size(0)
    target_length = target.size(0)

    loss = 0

    for i in range(0, input_length):
      encoder_output, encoder_hidden = self.encoder.forward(input[i], encoder_hidden)
    
    decoder_input = torch.tensor([[start_token]], device=device)

    decoder_hidden = encoder_hidden.unsqueeze(1)

    if random.random() < self.teacher_forcing:
      #here, we use teacher forcing. 
      for j in range(0, target_length):
        decoder_output, decoder_hidden = self.decoder.forward(decoder_input, decoder_hidden)
        loss = loss + self.loss_fn(decoder_output, target[j])
        decoder_input = target[j].unsqueeze(0)

    else:
      #here, there is no teacher forcing. the predictions themselves are used. 
      for j in range(0, target_length):
        decoder_output, decoder_hidden = self.decoder.forward(decoder_input, decoder_hidden)
        loss = loss + self.loss_fn(decoder_output, target[j])
        value, index = decoder_output.topk(1)
        decoder_input = index
        if decoder_input.item() == end_token:
          break
            
    loss.backward()
    self.encoder_optimizer.step()
    self.decoder_optimizer.step()

    return loss.item()/target_length
  
  def predict(self, input, target):
    loss = 0
    with torch.no_grad():
      encoder_hidden = self.encoder.init_hidden()

      input_length = input.size(0)
      for i in range(0, input_length):
        encoder_output, encoder_hidden = self.encoder.forward(input[i], encoder_hidden)

      decoder_input = torch.tensor([[start_token]], device=device)
      decoder_hidden = encoder_hidden.unsqueeze(1)

      outputs = []
      for i in range(0, min(target.size(0), self.max_length)):
        decoder_output, decoder_hidden = self.decoder.forward(decoder_input, decoder_hidden)
        loss = loss + self.loss_fn(decoder_output, target[i])

        value, index = decoder_output.topk(1)
        decoder_input = index
        outputs.append(decoder_input.item())
        if decoder_input.item() == end_token:
          break

      return loss.item()/len(outputs), outputs


  def evaluate(self, data, loss_flag = True):
    total_loss = 0
    correct = 0

    for pair in data:
      input = pair[0]
      target = pair[1]
      loss, pred = self.predict(input, target)
      target = target.tolist()
      
      if loss_flag:
        total_loss = total_loss + loss
      
      if len(pred) != len(target):
        continue
      else:
        if pred == target:
          correct = correct + 1
    
    if loss_flag:
      return total_loss, correct/len(x_test)

    else:
      return correct/len(x_test)    

In [52]:
model = seq2seq_vanilla(inp_language = english, out_language = tamil, embedding_size = 16, n_layers = 3, hl_size = 64)

In [53]:
n_iters = 45000

In [54]:
training_pairs = [random.choice(train_data) for i in range(0, n_iters)]

In [None]:
train_loss = 0
for i in range(0, n_iters):
  training_pair = training_pairs[i]
  x = training_pair[0]
  y = training_pair[1]
  loss = model.train_step(x, y)
  train_loss = train_loss + loss

  if (i+1)%5000 == 0:
     print(train_loss/5000)
     test_loss, test_acc = model.evaluate(val_data, loss_flag = True)
     print(test_loss, test_acc)
     train_loss = 0

2.7873415006148496
12384.246377424968 0.0
2.593003779720923
12680.817673816937 0.0
2.5375316890756086
12222.915682475545 0.0
2.510175545051388
12210.768765169512 0.0
2.4818409175910765


In [31]:
for i in range(0, n_epochs):
  train_loss = 0

  for training_pair in train_data:
    x = training_pair[0]
    y = training_pair[1]
    loss = model.train_step(x, y)
    train_loss = train_loss + loss
  
  train_acc = model.evaluate(train_data, loss_flag = False)
  test_loss, test_acc = model.evaluate(val_data, loss_flag = True)
  print(f'At the end of epoch {i+1}, train loss:{train_loss}, train accuracy:{train_acc}, test loss:{test_loss}, and test accuracy: {test_acc}.')

KeyboardInterrupt: ignored