In [None]:
import numpy as np
import pandas as pd
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# import langdetect
# from langdetect import detect

# Select the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Colab only: mount Google Drive at /content/gdrive so files stored there can be read.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Unpack the dataset archive from the mounted Drive into the working directory; unzip's stdout is discarded.
!unzip /content/gdrive/MyDrive/aksharantar_sampled.zip > /dev/null

## READING DATA

In [None]:
# Each CSV has no header row; column 0 and column 1 are read as the X
# (model input) and Y (model target) word lists of the split.
train_data = pd.read_csv('aksharantar_sampled/hin/hin_train.csv', header=None)
validation_data = pd.read_csv('aksharantar_sampled/hin/hin_valid.csv', header=None)
test_data = pd.read_csv('aksharantar_sampled/hin/hin_test.csv', header=None)

X_train, Y_train = (list(train_data[column]) for column in (0, 1))
X_valid, Y_valid = (list(validation_data[column]) for column in (0, 1))
X_test, Y_test = (list(test_data[column]) for column in (0, 1))

## DATA PROCESSING

In [None]:
def get_one_hot(char_dict,ch,len_alphabets):
  """Return the one-hot encoding of a single character as a list of floats.

  Args:
    char_dict: mapping from character to its index in the alphabet.
    ch: the character to encode; must be a key of ``char_dict``.
    len_alphabets: length of the returned vector.

  Returns:
    A list of ``len_alphabets`` floats, all 0.0 except a 1.0 at ``char_dict[ch]``.

  Raises:
    KeyError: if ``ch`` is not present in ``char_dict``.
  """
  hot_position = char_dict[ch]
  vector = np.zeros(len_alphabets)
  vector[hot_position] = 1
  return vector.tolist()

def data_processing(data,char_dict,len_chrs):
  """One-hot encode every word in ``data``.

  Each word is wrapped as ``'$' + word + '*'`` before encoding, so the
  resulting tensor has ``len(word) + 2`` rows of ``len_chrs`` columns.

  Args:
    data: iterable of strings.
    char_dict: mapping from character to alphabet index.
    len_chrs: width of each one-hot row.

  Returns:
    A list with one tensor per word, in the same order as ``data``.
  """
  def encode(word):
    framed = '$' + word + '*'
    rows = [get_one_hot(char_dict, symbol, len_chrs) for symbol in framed]
    return torch.tensor(rows)

  return [encode(word) for word in data]

- `$` - start character
- `*` - end character

In [None]:
# Target alphabet: '$', then code points 2304..2431 (U+0900-U+097F, the
# Devanagari Unicode block), then '*'.
tgt_chrs = '$' + ''.join(map(chr, range(2304, 2432))) + '*'
tgt_char_dict = dict(zip(tgt_chrs, range(len(tgt_chrs))))

# Input alphabet: lowercase Latin letters framed by the same two markers.
inp_chrs = '$abcdefghijklmnopqrstuvwxyz*'
inp_char_dict = dict(zip(inp_chrs, range(len(inp_chrs))))

In [None]:
# Every name below is a list holding one one-hot tensor per word.
inp_train, tgt_train = (
    data_processing(X_train, inp_char_dict, len(inp_chrs)),
    data_processing(Y_train, tgt_char_dict, len(tgt_chrs)),
)
inp_valid, tgt_valid = (
    data_processing(X_valid, inp_char_dict, len(inp_chrs)),
    data_processing(Y_valid, tgt_char_dict, len(tgt_chrs)),
)
inp_test, tgt_test = (
    data_processing(X_test, inp_char_dict, len(inp_chrs)),
    data_processing(Y_test, tgt_char_dict, len(tgt_chrs)),
)

## ENCODER

In [None]:
class Encoder(nn.Module):
    """Character-level encoder: embedding -> dropout -> RNN / LSTM / GRU.

    Args:
        input_size (int): number of symbols in the input alphabet.
        embedding_size (int): width of each character embedding.
        hidden_layer_size (int): hidden width of the recurrent cell.
        num_encoder_layers (int): number of stacked recurrent layers.
        cell_type (str): one of ``'RNN'``, ``'LSTM'`` or ``'GRU'``.
        dropout_prob (float): dropout applied to the embeddings and between
            recurrent layers.
        bidirectional (bool): whether the recurrent cell reads both directions.

    Raises:
        ValueError: if ``cell_type`` is not one of the supported names.
    """

    def __init__(self, input_size, embedding_size, hidden_layer_size, num_encoder_layers, cell_type, dropout_prob, bidirectional):
        super(Encoder, self).__init__()
        self.input_size = input_size
        self.hidden_layer_size = hidden_layer_size
        self.num_encoder_layers = num_encoder_layers
        self.cell_type = cell_type
        # Kept so init_hidden can size the state for both directions.
        self.bidirectional = bidirectional
        self.embedding = nn.Embedding(input_size, embedding_size)

        recurrent_kwargs = dict(
            input_size=embedding_size,
            hidden_size=hidden_layer_size,
            num_layers=num_encoder_layers,
            dropout=dropout_prob,
            bidirectional=bidirectional,
        )
        if cell_type == 'RNN':
            self.rnn = nn.RNN(nonlinearity='relu', **recurrent_kwargs)
        elif cell_type == 'LSTM':
            self.rnn = nn.LSTM(**recurrent_kwargs)
        elif cell_type == 'GRU':
            self.rnn = nn.GRU(**recurrent_kwargs)
        else:
            # Fail here instead of with an AttributeError on the first forward pass.
            raise ValueError("cell_type must be 'RNN', 'LSTM' or 'GRU', got %r" % (cell_type,))

        self.dropout = nn.Dropout(dropout_prob)

    @staticmethod
    def _as_index_batch(tokens):
        """Turn ``tokens`` into an integer tensor of shape (seq_len, batch).

        Floating-point input is treated as one-hot rows and collapsed with
        argmax over the last axis. Scalars and 1-D index tensors are given a
        batch axis of size one, matching the state built by ``init_hidden``.
        """
        if tokens.is_floating_point():
            tokens = tokens.argmax(dim=-1)
        tokens = tokens.long()
        if tokens.dim() < 2:
            tokens = tokens.reshape(-1, 1)
        return tokens

    def forward(self, input, prev_hidden):
        """Encode a sequence.

        Args:
            input: integer indices shaped (seq_len, batch) or (seq_len,), or
                floating-point one-hot rows with the alphabet on the last axis.
            prev_hidden: initial recurrent state, e.g. from ``init_hidden``.

        Returns:
            ``(output, hidden)`` exactly as returned by the recurrent cell.
        """
        indices = self._as_index_batch(input)
        # nn.Embedding returns a single tensor, so there is nothing to unpack.
        embedded = self.dropout(self.embedding(indices))
        output, hidden = self.rnn(embedded, prev_hidden)
        return output, hidden

    def init_hidden(self):
        """Return an all-zero initial state for a batch of one sequence.

        The first axis is ``num_encoder_layers`` times the number of
        directions. For an LSTM a ``(hidden, cell)`` tuple is returned.
        """
        num_directions = 2 if self.bidirectional else 1
        zeros = torch.zeros(
            self.num_encoder_layers * num_directions,
            1,
            self.hidden_layer_size,
            device=self.embedding.weight.device,
        )
        if self.cell_type == 'LSTM':
            return zeros, torch.zeros_like(zeros)
        return zeros

## DECODER

In [None]:
# class Decoder

class Decoder(nn.Module):
  """Character-level decoder: embedding -> dropout -> RNN / LSTM / GRU -> linear.

  Args:
    output_size (int): number of symbols in the target alphabet.
    embedding_size (int): width of each character embedding.
    hidden_layer_size (int): hidden width of the recurrent cell.
    num_layers (int): number of stacked recurrent layers.
    cell_type (str): one of ``'RNN'``, ``'LSTM'`` or ``'GRU'``.
    dropout_prob (float): dropout applied to the embeddings and between
      recurrent layers.
    bidirectional (bool): whether the recurrent cell reads both directions.

  Raises:
    ValueError: if ``cell_type`` is not one of the supported names.
  """

  def __init__(self, output_size, embedding_size, hidden_layer_size, num_layers, cell_type, dropout_prob, bidirectional):
    super(Decoder, self).__init__()
    self.output_size = output_size
    self.hidden_layer_size = hidden_layer_size
    self.num_layers = num_layers
    self.cell_type = cell_type
    # Kept so init_hidden can size the state for both directions.
    self.bidirectional = bidirectional

    self.embedding = nn.Embedding(output_size, embedding_size)

    recurrent_kwargs = dict(
      input_size=embedding_size,
      hidden_size=hidden_layer_size,
      num_layers=num_layers,
      dropout=dropout_prob,
      bidirectional=bidirectional,
    )
    if cell_type == 'RNN':
      self.rnn = nn.RNN(nonlinearity='relu', **recurrent_kwargs)
    elif cell_type == 'LSTM':
      self.rnn = nn.LSTM(**recurrent_kwargs)
    elif cell_type == 'GRU':
      self.rnn = nn.GRU(**recurrent_kwargs)
    else:
      # Fail here instead of with an AttributeError on the first forward pass.
      raise ValueError("cell_type must be 'RNN', 'LSTM' or 'GRU', got %r" % (cell_type,))

    self.dropout = nn.Dropout(dropout_prob)
    # A bidirectional cell concatenates both directions, doubling the output width.
    num_directions = 2 if bidirectional else 1
    self.fc = nn.Linear(hidden_layer_size * num_directions, output_size)

  @staticmethod
  def _as_index_batch(tokens):
    """Turn ``tokens`` into an integer tensor of shape (seq_len, batch).

    Floating-point input is treated as one-hot rows and collapsed with
    argmax over the last axis. Scalars and 1-D index tensors are given a
    batch axis of size one, matching the state built by ``init_hidden``.
    """
    if tokens.is_floating_point():
      tokens = tokens.argmax(dim=-1)
    tokens = tokens.long()
    if tokens.dim() < 2:
      tokens = tokens.reshape(-1, 1)
    return tokens

  def forward(self, input, hidden):
    """Run the decoder on ``input`` starting from ``hidden``.

    Args:
      input: integer indices shaped (seq_len, batch), (seq_len,) or a scalar,
        or floating-point one-hot rows with the alphabet on the last axis.
      hidden: recurrent state to start from.

    Returns:
      ``(y_pred, hidden)`` where ``y_pred`` holds unnormalised scores over the
      ``output_size`` symbols for every position, and ``hidden`` is the
      updated recurrent state.
    """
    indices = self._as_index_batch(input)
    embedded = self.dropout(self.embedding(indices))
    output, hidden = self.rnn(embedded, hidden)
    y_pred = self.fc(output)
    return y_pred, hidden

  def init_hidden(self):
    """Return an all-zero initial state for a batch of one sequence.

    The first axis is ``num_layers`` times the number of directions. For an
    LSTM a ``(hidden, cell)`` tuple is returned.
    """
    num_directions = 2 if self.bidirectional else 1
    zeros = torch.zeros(
      self.num_layers * num_directions,
      1,
      self.hidden_layer_size,
      device=self.embedding.weight.device,
    )
    if self.cell_type == 'LSTM':
      return zeros, torch.zeros_like(zeros)
    return zeros


## SEQ2SEQ

In [None]:
# # class seq2seq

# class Seq2Seq(nn.Module):
#     def __init__(self, encoder, decoder):
#       super(Seq2Seq, self).__init__()
#       self.encoder = encoder
#       self.decoder = decoder
        
#     def forward(self, input, target, teacher_forcing_ratio):
#       # print(input,target)
#       # batch_size = len(input)
#       target_len = len(target)
#       target_vocab_size = self.decoder.output_size
      
#       output,hidden,cell = self.encoder(input)
#       # decoder_hidden = encoder_hidden
      
#       # decoder_input = torch.ones(batch_size, 1, dtype=torch.long) * SOS_token
#       outputs = torch.zeros(batch_size, target_len, target_vocab_size)
#       x = target[0]
#       # flag = False
#       # if np.random.random() < teacher_forcing_ratio:
#       #   flag = True

#       for t in range(1,target_len):
#         output,hidden,cell = self.decoder(x,hidden,cell)
#         outputs[t] = output
#         best_guess = output.argmax(1)
#         x = target[t] if random.random() < teacher_forcing_ratio else best_guess
#       # if np.random.random() < teacher_forcing_ratio:
#       #     for t in range(1, target_len):
#       #         decoder_output, decoder_hidden, decoder_cell = self.decoder(decoder_input, decoder_hidden)
#       #         outputs[:, t, :] = decoder_output.squeeze(1)
#       #         decoder_input = target[:, t].unsqueeze(1)
#       # else:
#       #     for t in range(1, target_len):
#       #         decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
#       #         outputs[:, t, :] = decoder_output.squeeze(1)
#       #         _, topi = decoder_output.topk(1)
#       #         decoder_input = topi.squeeze().detach().unsqueeze(1)


#       return outputs


class Seq2Seq(nn.Module):
  """Encoder-decoder model that transliterates one word at a time.

  Args:
    encoder: module exposing ``init_hidden()`` and
      ``forward(indices, hidden) -> (outputs, hidden)``.
    decoder: module exposing ``output_size`` and
      ``forward(indices, hidden) -> (scores, hidden)``.
    teacher_forcing_ratio (float): probability, per decoding step and only
      while the model is in training mode, of feeding the ground-truth
      character instead of the decoder's own prediction. Defaults to 0.5.
  """

  def __init__(self, encoder, decoder, teacher_forcing_ratio=0.5):
    super(Seq2Seq, self).__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.teacher_forcing_ratio = teacher_forcing_ratio

  @staticmethod
  def _to_indices(sequence):
    """Return ``sequence`` as a flat tensor of integer character indices.

    Floating-point input is treated as one-hot rows and collapsed with argmax
    over the last axis.
    """
    if sequence.is_floating_point():
      sequence = sequence.argmax(dim=-1)
    return sequence.long().reshape(-1)

  def forward(self, input, target):
    """Predict scores for every target position after the start symbol.

    Args:
      input: one source word, as one-hot rows or as integer indices.
      target: the matching target word in the same format; its first entry
        is used as the decoder's first input.

    Returns:
      Tensor of shape ``(len(target) - 1, decoder.output_size)`` holding the
      decoder's unnormalised scores for target positions 1, 2, ...
    """
    input_indices = self._to_indices(input)
    target_indices = self._to_indices(target)
    target_length = target_indices.size(0)

    # Use the module's own encoder, not a notebook-level global.
    encoder_hidden = self.encoder.init_hidden()
    encoder_outputs, encoder_hidden = self.encoder(input_indices.unsqueeze(1), encoder_hidden)

    # The final encoder state seeds the decoder; this assumes both use the
    # same cell type, layer count, direction count and hidden width.
    decoder_hidden = encoder_hidden
    decoder_input = target_indices[0].reshape(1, 1)

    step_scores = []
    for step in range(1, target_length):
      scores, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
      scores = scores.reshape(1, -1)
      step_scores.append(scores)

      use_teacher = self.training and random.random() < self.teacher_forcing_ratio
      if use_teacher:
        decoder_input = target_indices[step].reshape(1, 1)
      else:
        # Feed back the most likely character; detach so the choice itself
        # is not part of the autograd graph.
        decoder_input = scores.argmax(dim=-1).detach().reshape(1, 1)

    if not step_scores:
      # Target holds only the start symbol (or nothing): no step to predict.
      return torch.zeros(0, self.decoder.output_size)
    return torch.cat(step_scores, dim=0)



### Testing

In [None]:
# Smoke test: push the first training word through a small encoder.
# Vocabulary sizes come from the alphabets so that every character index is
# a valid row of the embedding tables.
encodertest = Encoder(len(inp_chrs), 20, 10, 1, 'RNN', 0.1, False)
decodertest = Decoder(len(tgt_chrs), 20, 10, 1, 'RNN', 0.1, False)

input = inp_train[0]
target = tgt_train[0]

input_length = input.size()[0]
target_length = target.size()[0]
print(input.size())

# nn.Embedding looks rows up by integer index, so the one-hot rows are
# collapsed to indices and given a batch axis of size one: (length, 1).
input_indices = input.argmax(dim=1).unsqueeze(1)

encoder_hidden = encodertest.init_hidden()
encoder_outputs, encoder_hidden = encodertest(input_indices, encoder_hidden)

# One output vector per input character, one final state for the single layer.
assert encoder_outputs.size() == (input_length, 1, 10)
assert encoder_hidden.size() == (1, 1, 10)


torch.Size([13, 28])




RuntimeError: ignored

In [None]:
def train(model,optimizer,criterion,inputs=None,targets=None):
  """Train ``model`` for one pass over a dataset, one word at a time.

  Args:
    model: callable as ``model(input_tensor, target_tensor)``; it must return
      one row of class scores for every target position after the first.
    optimizer: optimizer over the model's parameters.
    criterion: loss called as ``criterion(scores, class_indices)``.
    inputs: list of source tensors; defaults to the notebook's ``inp_train``.
    targets: list of target tensors; defaults to the notebook's ``tgt_train``.

  Returns:
    The mean per-word loss as a float (0.0 for an empty dataset).
  """
  if inputs is None:
    inputs = inp_train
  if targets is None:
    targets = tgt_train

  model.train()
  epoch_loss = 0.0

  for input_tensor, target_tensor in zip(inputs, targets):
    # Gradients accumulate across backward() calls unless cleared.
    optimizer.zero_grad()

    predictions = model(input_tensor, target_tensor)
    scores = predictions.reshape(-1, predictions.size(-1))

    # Floating-point targets are one-hot rows: reduce them to class indices.
    # The first position is the decoder's input, not something it predicts.
    if target_tensor.is_floating_point():
      target_indices = target_tensor.argmax(dim=-1)
    else:
      target_indices = target_tensor
    target_indices = target_indices.long().reshape(-1)[1:]

    loss = criterion(scores, target_indices)
    loss.backward()
    optimizer.step()
    # Accumulate a plain float so no autograd graph is kept alive.
    epoch_loss += loss.item()

  return epoch_loss / max(len(inputs), 1)

def train_model(model,learning_rate,epochs):
  """Train ``model`` for ``epochs`` passes and print the loss after each one.

  Args:
    model: the module to optimise; all of its parameters are updated.
    learning_rate: Adam learning rate.
    epochs: number of passes; each pass is one call to ``train``.
  """
  # One optimizer over every parameter of the model; this is the object that
  # is actually handed to train().
  optimizer = optim.Adam(model.parameters(), lr = learning_rate)
  # The decoder emits raw scores from a linear layer, so use the loss that
  # applies log-softmax itself; NLLLoss would need log-probabilities.
  criterion = nn.CrossEntropyLoss()
  for epoch in range(epochs):
    loss = train(model,optimizer,criterion)
    print("Epoch : %d, Loss : %f" % (epoch,loss))
# def train(model, iterator, optimizer, criterion):
#     model.train()
#     epoch_loss = 0
#     for batch in iterator:
#         optimizer.zero_grad()
#         predictions = model(batch.text)
#         loss = criterion(predictions, batch.label)
#         loss.backward()
#         optimizer.step()
#         epoch_loss += loss.item()
#     return epoch_loss / len(iterator)


In [None]:
# define hyperparameters
# Vocabulary sizes are derived from the alphabets so that every character
# index (including the end marker '*') is a valid embedding / output class.
input_size = len(inp_chrs)
embedding_size = 128
output_size = len(tgt_chrs)
hidden_layer_size = 256
num_layers = 2
cell_type = 'RNN'
dropout_prob = 0.2
learning_rate = 0.01
bidirectional = True
epochs = 5

# # define model and optimizer
# encoder = Encoder(input_size, hidden_size, num_layers, cell_type)
# decoder = Decoder(output_size, hidden_size, num_layers, cell_type)
# model = Seq2Seq(encoder, decoder)
# optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# # define loss function
# criterion = nn.CrossEntropyLoss()

# # train the model
# num_epochs = 10

In [None]:

# Probability of feeding the ground-truth previous character to the decoder.
teacher_forcing_ratio = 0.5

encoder = Encoder(input_size, embedding_size, hidden_layer_size, num_layers, cell_type, dropout_prob, bidirectional)
decoder = Decoder(output_size, embedding_size, hidden_layer_size, num_layers, cell_type, dropout_prob, bidirectional)
model = Seq2Seq(encoder, decoder, teacher_forcing_ratio)
# train_model takes (model, learning_rate, epochs), in that order.
train_model(model, learning_rate, epochs)


tensor([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         1., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         1., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 1., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [1., 0.,

RuntimeError: ignored