In [None]:
# Essentials
import numpy as np
import tensorflow 
from tensorflow import keras
from tensorflow.keras import layers
import io
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences

In [None]:
# Fetching the dataset: clone the lexicon repository into ./lexicon-dataset,
# the directory that the *_dir paths used for reading the data point into.
!git clone https://github.com/borate267/lexicon-dataset.git

Cloning into 'lexicon-dataset'...
remote: Enumerating objects: 5, done.[K
remote: Counting objects: 100% (5/5), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 5 (delta 0), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (5/5), done.


In [None]:
# Reading the dataset

# Locations of the three .tsv splits inside the cloned lexicon-dataset directory
train_dir, dev_dir, test_dir = (
    "lexicon-dataset/ta.translit.sampled.train.tsv",
    "lexicon-dataset/ta.translit.sampled.dev.tsv",
    "lexicon-dataset/ta.translit.sampled.test.tsv",
)

def read_corpus(corpus_file):
  """Read a tab-separated transliteration lexicon.

  The first column of a row is stored as the native (Tamil) word and the second
  column as the romanized (Latin) word; any further columns are ignored.
  Rows that do not provide both columns are skipped: blank lines, lines without
  a tab, and lines whose second column is missing once trailing whitespace has
  been stripped (previously the last case raised IndexError).

  Args:
    corpus_file: path of the UTF-8 encoded .tsv file.

  Returns:
    (latin_words, tamil_words): two lists of equal length, aligned row by row.
  """
  tamil_words = []
  latin_words = []
  with io.open(corpus_file, encoding='utf-8') as f:
    for line in f:
      tokens = line.rstrip().split("\t")
      # rstrip() also removes trailing tabs, so "word\t\n" collapses to a single
      # field; counting fields after the split covers every malformed row.
      if len(tokens) < 2:
        continue
      tamil_words.append(tokens[0])
      latin_words.append(tokens[1])
  return latin_words, tamil_words

# Load every split; read_corpus returns the romanized words first and the native words second
(train_input, train_target), (valid_input, valid_target), (test_input, test_target) = [
    read_corpus(path) for path in (train_dir, dev_dir, test_dir)
]

# Report how many word pairs each split contains
print("Number of training samples: ", len(train_input))
print("Number of validation samples: ", len(valid_input))
print("Number of testing samples: ", len(test_input))


Number of training samples:  68218
Number of validation samples:  6827
Number of testing samples:  6864


In [None]:
# PRE-PROCESSING

#### Begin-of-word / end-of-word marker ids for the decoder

# NOTE(review): these ids collide with the character ids assigned below
# (enumerate() starts at 0, so the first two Tamil characters also map to 0 and 1)
# and with the 0 that pad_sequences uses as its default padding value.
# Reserve dedicated ids before the markers are actually inserted into sequences.
bow = 0
eow = 1

# Words fed to the decoder side. This name was previously only assigned in a
# commented-out line, so the tokenising step below failed with NameError.
# The markers above are not added to the words yet.
decoder_inputs = list(train_target)

#### Creating vocabularies for the dataset

# Character inventories of the training split (Latin side lower-cased),
# sorted so that token ids are stable from run to run
vocab_latin = sorted({char for word in train_input for char in word.lower()})
vocab_tamil = sorted({char for word in train_target for char in word})
sizeofTamilVocab = len(vocab_tamil)
sizeofLatinVocab = len(vocab_latin)

# Longest training word on each side; used as the padded sequence length below
max_encSeqLen = max(len(sample) for sample in train_input)
max_decSeqLen = max(len(sample) for sample in train_target)

#### Tokenising the encoder and decoder inputs

latin_token_index = {char: i for i, char in enumerate(vocab_latin)}
tamil_token_index = {char: i for i, char in enumerate(vocab_tamil)}
print(tamil_token_index)

#### Convert sequences of characters to sequences of tokens

# Characters without an id are dropped. Membership is tested on the dicts
# (constant time) instead of scanning the sorted vocabulary lists.
encoder_seq = [
    [latin_token_index[char] for char in word.lower() if char in latin_token_index]
    for word in train_input
]
decoder_seq = [
    [tamil_token_index[char] for char in word if char in tamil_token_index]
    for word in decoder_inputs
]

#### Padding sequences that are to be fed as input to the encoder and decoder RNN

# Shorter words are padded at the end, longer ones would be cut at the end
encoder_inputdata = pad_sequences(encoder_seq, maxlen=max_encSeqLen, dtype='int32', padding='post', truncating='post')
decoder_inputdata = pad_sequences(decoder_seq, maxlen=max_decSeqLen, dtype='int32', padding='post', truncating='post')

#### CHARACTER EMBEDDING





{'ஃ': 0, 'அ': 1, 'ஆ': 2, 'இ': 3, 'ஈ': 4, 'உ': 5, 'ஊ': 6, 'எ': 7, 'ஏ': 8, 'ஐ': 9, 'ஒ': 10, 'ஓ': 11, 'க': 12, 'ங': 13, 'ச': 14, 'ஜ': 15, 'ஞ': 16, 'ட': 17, 'ண': 18, 'த': 19, 'ந': 20, 'ன': 21, 'ப': 22, 'ம': 23, 'ய': 24, 'ர': 25, 'ற': 26, 'ல': 27, 'ள': 28, 'ழ': 29, 'வ': 30, 'ஷ': 31, 'ஸ': 32, 'ஹ': 33, 'ா': 34, 'ி': 35, 'ீ': 36, 'ு': 37, 'ூ': 38, 'ெ': 39, 'ே': 40, 'ை': 41, 'ொ': 42, 'ோ': 43, 'ௌ': 44, '்': 45}


In [None]:
# Configuration: hyperparameters for the seq2seq experiment.
# Only batch_size is read by the cells visible below (the get_batches call);
# the other values are presumably meant for the encoder/decoder constructors - TODO confirm.

input_emb_size = 256 # Input embedding size (the architecture notes describe a sizeofLatinVocab x input_emb_size embedding layer)
num_enc = 128 # Number of encoder layers
num_dec = 128 # Number of decoder layers
hidden_size = 16 # Hidden layer size
# NOTE(review): 128 stacked layers next to a hidden size of 16 looks as if the values were swapped - confirm.
cell_type = 'RNN' # Cell type; the encoder/decoder classes branch on "RNN", "LSTM" and "GRU"
dropout = 0.3 # Dropout (presumably the drop probability - confirm)
batch_size = 16 # Batch size: number of sequences per batch

# TODO: Beam search in decoder, add more hyperparameters


In [None]:
# seq2seq model architecture

# Input embedding layer : A feedforward layer of size sizeofLatinVocab x input_emb_size




In [None]:
def one_hot_encode(arr, n_labels):
    """One-hot encode an integer array along a new trailing axis.

    arr: numpy array of integer labels, any shape.
    n_labels: length of each one-hot vector.

    Returns a float32 array of shape arr.shape + (n_labels,) holding 1.0 at the
    position given by each label and 0.0 everywhere else.
    """
    labels = arr.reshape(-1)
    rows = np.arange(labels.shape[0])

    # One row per label: start from all zeros, then switch on the label's column
    encoded = np.zeros((labels.shape[0], n_labels), dtype=np.float32)
    encoded[rows, labels] = 1.0

    # Restore the input layout, with the one-hot axis appended
    return encoded.reshape(arr.shape + (n_labels,))

In [None]:
# Sanity check: one-hot encode the first padded encoder sequence and show the
# token ids followed by their encoding
# (a hand-made alternative input would be np.array([[3, 5, 1]]))
test_seq = encoder_inputdata[0]
one_hot = one_hot_encode(test_seq, sizeofLatinVocab)
print(test_seq, one_hot, sep="\n")

In [None]:
def get_batches(arr, batch_size, seq_length):
    '''Generate (x, y) batches of shape batch_size x seq_length from arr.

       Arguments
       ---------
       arr: numpy array to cut into batches; the number of usable items is
            derived from len(arr)
       batch_size: number of rows (sequences) in every batch
       seq_length: number of columns (encoded chars) in every batch

       Yields
       ------
       x: a batch_size x seq_length window of the reshaped array
       y: x shifted one position to the left; the last column is the value
          that follows the window, or column 0 of the reshaped array when the
          window is the last one
    '''

    items_per_batch = batch_size * seq_length
    full_batches = len(arr) // items_per_batch

    # Drop the tail that cannot fill a complete batch, then lay the data out
    # as batch_size long rows
    rows = arr[:full_batches * items_per_batch].reshape((batch_size, -1))
    width = rows.shape[1]

    for start in range(0, width, seq_length):
        stop = start + seq_length
        x = rows[:, start:stop]

        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]
        # Past the right edge there is no following column: wrap to the first one
        y[:, -1] = rows[:, stop] if stop < width else rows[:, 0]

        yield x, y

In [None]:
# Build the batch generator over the padded encoder inputs and pull the first (x, y) pair.
# NOTE(review): the third argument is get_batches' seq_length, yet the Latin vocabulary
# size is passed here; max_encSeqLen may have been intended - confirm.
# NOTE(review): encoder_inputdata comes from pad_sequences (one row per word), while
# get_batches sizes its batches from len(arr) - confirm that 2-D input is intended.
batches = get_batches(encoder_inputdata, batch_size, sizeofLatinVocab)
x, y = next(batches)

In [None]:
# Show the top-left corner of the first batch: at most the first 10 rows and
# first 10 columns of the features x and of the shifted targets y
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms
import matplotlib.pyplot as plt

import torch
#from torchtext import data
from torchtext.legacy import data
from torchtext import datasets

In [None]:
class Encoder_RNN(nn.Module):
  """Encoder: embeds one token id and runs it through an RNN, LSTM or GRU stack.

  Args:
    input_size: number of rows of the embedding table (source vocabulary size).
    hidden_size: hidden state size of the recurrent layer.
    embbed_size: embedding dimension, also the recurrent layer's input size.
    layers_n: number of stacked recurrent layers.
    model: cell type, one of "RNN", "LSTM" or "GRU".
    dropout_prob: probability handed to nn.Dropout.

  Raises:
    ValueError: if model is not one of the supported cell types.
  """

  def __init__(self, input_size, hidden_size, embbed_size, layers_n,model,dropout_prob):
    super(Encoder_RNN, self).__init__()

    self.input_size = input_size
    self.embbed_size = embbed_size
    self.hidden_size = hidden_size
    self.layers_n = layers_n
    self.model = model
    self.dropout_prob = dropout_prob

    # Embedding table: one embbed_size vector per input token id
    self.embedding = nn.Embedding(input_size, self.embbed_size)

    # The recurrent layer is stored under the lower-cased cell type name
    # (rnn / lstm / gru); forward() relies on that naming.
    if model == "RNN":
      self.rnn = nn.RNN(self.embbed_size, self.hidden_size, num_layers=self.layers_n, batch_first=True)
    elif model == "LSTM":
      self.lstm = nn.LSTM(self.embbed_size, self.hidden_size, num_layers=self.layers_n, batch_first=True)
    elif model == "GRU":
      # NOTE(review): unlike RNN/LSTM this layer is built without batch_first=True.
      # forward() feeds a (1, 1, -1) tensor, for which both layouts coincide.
      self.gru = nn.GRU(self.embbed_size, self.hidden_size, num_layers=self.layers_n)
    else:
      raise ValueError("model must be 'RNN', 'LSTM' or 'GRU', got %r" % (model,))

    # Created for every cell type (it used to exist only on the GRU path).
    # NOTE(review): forward() does not apply it yet.
    self.dropout = nn.Dropout(dropout_prob)

  def forward(self, input, hidden=None):
    """Encode one token.

    Args:
      input: tensor holding a single token id. The embedding is reshaped with
        view(1, 1, -1) into one time step of one sequence, which only matches
        the recurrent layer's input size when exactly one id is passed.
      hidden: optional state returned by a previous call. Passing it carries the
        encoder state from token to token; None starts from a zero state.

    Returns:
      (outputs, hidden) exactly as returned by the recurrent layer; for LSTM
      hidden is the (h, c) tuple.
    """
    embedded = self.embedding(input).view(1, 1, -1)
    # Select the layer through the instance's own cell type, not a global `model`
    recurrent = getattr(self, self.model.lower())
    outputs, hidden = recurrent(embedded, hidden)
    return outputs, hidden

In [None]:
class Decoder_RNN(nn.Module):
  """Decoder: embeds the previous token, advances the recurrent state by one
  call and scores every output symbol.

  Args:
    output_dim: size of the output vocabulary (embedding rows and score width).
    hidden_size: hidden state size of the recurrent layer.
    embbed_size: embedding dimension, also the recurrent layer's input size.
    layers_n: number of stacked recurrent layers.
    model: cell type, one of "RNN", "LSTM" or "GRU".
    dropout_prob: probability handed to nn.Dropout.

  Raises:
    ValueError: if model is not one of the supported cell types.

  NOTE(review): forward() returns softmax probabilities, while the training
  cell builds nn.CrossEntropyLoss, which expects unnormalised scores.
  Return raw scores or switch to LogSoftmax + NLLLoss - confirm the intent.
  """

  def __init__(self, output_dim, hidden_size, embbed_size, layers_n,model,dropout_prob):
    super(Decoder_RNN, self).__init__()

    self.embbed_size = embbed_size
    self.hidden_size = hidden_size
    self.output_dim = output_dim
    self.layers_n = layers_n
    self.model = model
    self.dropout_prob = dropout_prob

    self.embedding = nn.Embedding(output_dim, self.embbed_size)

    # The recurrent layer is stored under the lower-cased cell type name
    # (rnn / lstm / gru); forward() relies on that naming.
    if model == "RNN":
      self.rnn = nn.RNN(self.embbed_size, self.hidden_size, num_layers=self.layers_n, batch_first=True)
    elif model == "LSTM":
      self.lstm = nn.LSTM(self.embbed_size, self.hidden_size, num_layers=self.layers_n, batch_first=True)
    elif model == "GRU":
      # NOTE(review): built without batch_first=True, unlike RNN/LSTM. The layouts
      # coincide only while forward() receives a single token - confirm.
      self.gru = nn.GRU(self.embbed_size, self.hidden_size, num_layers=self.layers_n)
    else:
      raise ValueError("model must be 'RNN', 'LSTM' or 'GRU', got %r" % (model,))

    # Projection from hidden state to one score per output symbol
    self.out = nn.Linear(self.hidden_size, output_dim)
    self.softmax = nn.Softmax(dim=1)

    # NOTE(review): created but not applied in forward()
    self.dropout = nn.Dropout(dropout_prob)

  def forward(self, input, hidden):
    """Run one decoding call.

    Args:
      input: tensor of token ids; it is reshaped to (1, -1) before embedding.
      hidden: recurrent state to continue from (an (h, c) tuple for LSTM).

    Returns:
      (prediction, hidden): prediction is the softmax over output_dim computed
      from the first slice of the recurrent output; hidden is the updated state.
    """
    input = input.view(1, -1)
    embedded = F.relu(self.embedding(input))
    # Select the layer through the instance's own cell type, not a global `model`
    recurrent = getattr(self, self.model.lower())
    output, hidden = recurrent(embedded, hidden)

    prediction = self.softmax(self.out(output[0]))

    return prediction, hidden


In [None]:
class Seq2Seq(nn.Module):
  """Ties an encoder and a decoder together for sequence-to-sequence prediction.

  Args:
    encoder: module called as encoder(token) and returning (output, hidden).
    decoder: module called as decoder(token, hidden) and returning
      (prediction, hidden); it must expose output_dim.
    device: device on which the outputs, the start token and the decoder state live.
    start_token: id fed to the decoder at the first step. None (the default)
      uses the notebook-level `bow` id.
  """

  def __init__(self, encoder, decoder, device, start_token=None):
    super().__init__()

    self.encoder = encoder
    self.decoder = decoder
    self.device = device
    # `bow` is only looked up when no explicit start token is supplied
    self.start_token = bow if start_token is None else start_token

  def forward(self, source, target, teacher_forcing_ratio=0.5):
    """Encode source, then decode one step per target position.

    Args:
      source: tensor indexed by position along dim 0; source[i] is handed to the encoder.
      target: tensor whose dim 0 is the target length and dim 1 the batch size.
      teacher_forcing_ratio: probability, per step, of feeding the ground-truth
        target[t] as the next decoder input; otherwise the decoder's own
        highest-scoring symbol is fed back. Use 1.0 for always, 0.0 for never.

    Returns:
      Tensor of shape (target_length, batch_size, decoder.output_dim) holding
      the decoder prediction of every step.

    Raises:
      ValueError: if source is empty, as there would be no encoder state.
    """
    input_length = source.size(0)
    if input_length == 0:
      raise ValueError("source must contain at least one token")
    batch_size = target.shape[1]
    target_length = target.shape[0]
    vocab_size = self.decoder.output_dim

    # Holds the prediction of every decoding step
    outputs = torch.zeros(target_length, batch_size, vocab_size).to(self.device)

    # Feed the source one position at a time; the state of the last call is kept.
    # NOTE(review): only the token is passed to the encoder, no state is handed
    # from one call to the next - confirm the encoder is meant to be used this way.
    for i in range(input_length):
      encoder_output, encoder_hidden = self.encoder(source[i])

    # The encoder's final state seeds the decoder; an LSTM state is an (h, c) tuple
    if isinstance(encoder_hidden, tuple):
      decoder_hidden = tuple(state.to(self.device) for state in encoder_hidden)
    else:
      decoder_hidden = encoder_hidden.to(self.device)

    # The first decoder input is the begin-of-word marker
    decoder_input = torch.tensor([self.start_token], device=self.device)

    for t in range(target_length):
      decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
      outputs[t] = decoder_output

      # Advance the decoder input; it used to stay at the start token for every step
      if torch.rand(1).item() < teacher_forcing_ratio:
        decoder_input = target[t].to(self.device)
      else:
        decoder_input = decoder_output.argmax(dim=1).detach()
    return outputs

In [None]:
  # Training setup sketch - unfinished placeholder, it cannot run as written:
  #  - `model` is read on the first line but only assigned on the last one
  #  - `Seq2Seq(bla bla bla)` is placeholder text, not an argument list
  #    (the Seq2Seq class defined above takes encoder, decoder, device)
  #  - `lr` is not assigned in any cell shown in this notebook
  #  - `epoch = epoch` rebinds a name to itself, so `epoch` must already exist
  # TODO: build the encoder and decoder, construct the model first, then the optimizer.
  # NOTE(review): nn.CrossEntropyLoss expects unnormalised scores, while
  # Decoder_RNN.forward returns softmax output - confirm which side should change.
  optimizer = torch.optim.Adam(model.parameters(), lr = lr)
  criterion = nn.CrossEntropyLoss()
  epoch = epoch
  model = Seq2Seq(bla bla bla)

In [None]:
def train():
  """Train the global `model` for `epoch` epochs on randomly sampled word pairs.

  Reads these notebook-level names, which other cells are expected to define:
  model, optimizer, criterion, epoch (number of epochs), train_gpu, clip
  (gradient-norm limit), num_iteration (pairs sampled per epoch) and
  tensorsFromPair / source / target / pairs (used to build the training tensors).

  Returns:
    List with one entry per epoch: the average over the sampled pairs of the
    per-position loss.
  """
  # Local import: `random` is not imported by the notebook's import cells
  import random

  epoch_losses = []

  # A separate loop variable keeps the global epoch count readable inside the function
  for current_epoch in range(1, epoch + 1):

    model.train()
    if train_gpu:
      model.cuda()

    training_pairs = [tensorsFromPair(source, target, random.choice(pairs))
                      for _ in range(num_iteration)]

    total_loss_iterations = 0.0

    # Visit every sampled pair (the loop used to range over the epoch count)
    for training_pair in training_pairs:
      input_tensor = training_pair[0]
      target_tensor = training_pair[1]

      optimizer.zero_grad()

      output = model(input_tensor, target_tensor)

      num_iter = output.size(0)
      if num_iter == 0:
        # Nothing was predicted, so there is no loss to back-propagate
        continue

      # Sum the loss over every predicted position of the word
      loss = 0
      for ot in range(num_iter):
        loss += criterion(output[ot], target_tensor[ot])

      loss.backward()

      # Clip, then take exactly one optimizer step per pair
      nn.utils.clip_grad_norm_(model.parameters(), clip)
      optimizer.step()

      total_loss_iterations += loss.item() / num_iter

    epoch_losses.append(total_loss_iterations / max(len(training_pairs), 1))

  # Return only after every epoch has run (the old `return` sat inside the loop)
  return epoch_losses

In [None]:
def train():
  for epoch in range(1,epoch+1):
    train_loss = 0
    val_loss = 0 

    model.train()
    for epoch in range(1,epoch+1):

      training_pairs = [tensorsFromPair(source, target, random.choice(pairs))
                     for i in range(num_iteration)]

                     
