In [25]:
import io
import csv
import keras
import tensorflow 
import numpy as np
from math import log, log1p 
from numpy import array, argmax
from keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from keras.utils.vis_utils import plot_model
from keras.layers import Input, LSTM, Dense, Embedding, GRU, Dropout, SimpleRNN

In [26]:
class seq2seq(object):
    """Character-level encoder-decoder model built from stacked Keras recurrent layers.

    The recurrent layer type (``'RNN'``, ``'GRU'`` or ``'LSTM'``), the layer
    sizes and the training hyper-parameters are fixed at construction time.
    ``fit`` builds and trains the model, decodes the test inputs and writes the
    predictions to ``predictions_vanilla.tsv``.

    The methods read several module-level names that must be defined before
    they are called: ``enc_tok_num``, ``dec_tok_num``, ``tok_ind_tar``,
    ``rtci``, ``max_decoder_seq_length`` and ``test_input_texts``.
    """

    def __init__(self,Type = 'RNN',input_embedding = 32, hid_layer_size=32, lr= 1e-3, 
               drop_prob=0.4,prediction ='greedy', number_of_epochs = 10, batch_size = 32,beam_width = 5,
               number_of_encoders = 1, number_of_decoders = 1):  
        """Store the model configuration.

        Args:
            Type: recurrent layer type, one of ``'RNN'``, ``'GRU'``, ``'LSTM'``.
            input_embedding: output size of the encoder embedding layer.
            hid_layer_size: units of every recurrent layer (also used as the
                decoder embedding size).
            lr: learning rate passed to the Adam optimizer.
            drop_prob: ``dropout`` value of every recurrent layer.
            prediction: requested decoding strategy name; stored only
                (see ``dec_seq``).
            number_of_epochs: training epochs.
            batch_size: training batch size.
            beam_width: requested beam width; stored only (see ``dec_seq``).
            number_of_encoders: number of stacked encoder layers.
            number_of_decoders: number of stacked decoder layers.
        """
        self.Type = Type
        self.input_embedding = input_embedding
        self.hid_layer_size = hid_layer_size
        self.lr = lr
        self.drop_prob = drop_prob
        self.prediction = prediction
        self.number_of_epochs = number_of_epochs
        self.batch_size = batch_size
        self.beam_width = beam_width
        self.number_of_encoders = number_of_encoders
        self.number_of_decoders = number_of_decoders

    def _recurrent_layer(self):
        """Return the Keras layer class selected by ``self.Type``.

        Raises:
            ValueError: if ``self.Type`` is not ``'RNN'``, ``'GRU'`` or ``'LSTM'``.
        """
        layer_classes = {'LSTM': LSTM, 'GRU': GRU, 'RNN': SimpleRNN}
        if self.Type not in layer_classes:
            raise ValueError(
                "Unsupported Type %r; expected one of 'RNN', 'GRU', 'LSTM'" % (self.Type,))
        return layer_classes[self.Type]

    def fit(self, enc_inp, dec_inp, dec_tar,x_test, y_test):
        """Build and train the model, then decode ``x_test`` and report accuracy.

        Args:
            enc_inp: encoder input passed to ``model.fit``.
            dec_inp: decoder input passed to ``model.fit``.
            dec_tar: training target passed to ``model.fit``.
            x_test: test encoder inputs; row ``i`` is decoded as ``x_test[i:i+1]``.
            y_test: test target strings; the first and last character of each
                are dropped before comparing with the prediction.

        Side effects:
            Prints the model summary and the exact-match accuracy, and writes
            ``predictions_vanilla.tsv`` (input, target, prediction per row).
        """
        rnn_layer = self._recurrent_layer()

        # ---- Encoder: embedding followed by a stack of recurrent layers.
        enc_inps = Input(shape=(None, ),name = 'Enc_inputs')
        enc_outs = Embedding(enc_tok_num, self.input_embedding , mask_zero = True,name = 'Enc_emb')(enc_inps)

        states = None
        # Layer 1 is always created; further layers are seeded with the final
        # state(s) of the layer below.
        for i in range(1, max(self.number_of_encoders, 1) + 1):
            enc_layer = rnn_layer(self.hid_layer_size, return_state=True, dropout=self.drop_prob,
                                  return_sequences=True, name='Enc_hidden_%d' % i)
            if states is None:
                layer_outputs = enc_layer(enc_outs)
            else:
                layer_outputs = enc_layer(enc_outs, initial_state=states)
            # First element is the output sequence, the rest are the final
            # state(s): [h] for RNN/GRU, [h, c] for LSTM.
            enc_outs, *states = layer_outputs

        # ---- Decoder: embedding followed by a stack of recurrent layers.
        dec_inps = Input(shape=(None,), name = 'Dec_inputs')
        # NOTE(review): the decoder embedding size is hid_layer_size, not
        # input_embedding -- confirm this is intended.
        dec_outs = Embedding(dec_tok_num, self.hid_layer_size, mask_zero = True, name = 'Dec_emb')(dec_inps)

        # Every decoder layer starts from the final state(s) of the last encoder layer.
        for i in range(1, max(self.number_of_decoders, 1) + 1):
            dec_layer = rnn_layer(self.hid_layer_size, return_sequences=True, return_state=True,
                                  dropout=self.drop_prob, name='Dec_hidden_%d' % i)
            dec_outs = dec_layer(dec_outs, initial_state=states)[0]

        dec_outs = Dense(dec_tok_num, activation='softmax', name = 'dense')(dec_outs)

        model = Model([enc_inps, dec_inps], dec_outs)
        model.summary()

        # `learning_rate` replaces the deprecated `lr` keyword.
        optimizer = Adam(learning_rate=self.lr, beta_1=0.9, beta_2=0.999)

        model.compile(loss = "categorical_crossentropy", optimizer = optimizer, metrics=['accuracy'])

        model.fit(
            [enc_inp, dec_inp],
            dec_tar,
            batch_size=self.batch_size,
            epochs=self.number_of_epochs,
            )

        # ---- Evaluation: decode every test word and count exact matches.
        enc_mod, dec_mod = self.inference(model)
        data_list = [["SNO", "Input Data", "Target Data", "Predicted Data"]]

        g_t = 0
        g_c = 0

        for i in range(len(y_test)):
            input_seq = x_test[i : i + 1]
            result = self.dec_seq(enc_mod,dec_mod,input_seq)
            target = y_test[i][1:-1]   # drop the first and last character of the target
            result = result[:-1]       # drop the last decoded character
            data_list.append([i+1, test_input_texts[i], target, result])
            if result.strip() == target.strip():
                g_c = g_c + 1
            g_t = g_t + 1

        with open('predictions_vanilla.tsv', 'w', newline='', encoding="utf-8") as file:
            writer = csv.writer(file, delimiter='\t')
            writer.writerows(data_list)

        # Guard against an empty test set instead of dividing by zero.
        val_accuracy = g_c/g_t if g_t else 0.0
        print(val_accuracy)

    def inference(self, model):
        """Derive separate encoder and decoder inference models from ``model``.

        Layers are looked up by the names assigned in ``fit``.

        Args:
            model: the trained model built by ``fit``.

        Returns:
            ``(enc_mod, dec_mod)``. ``enc_mod`` maps the encoder input to the
            final state(s) of the last encoder layer. ``dec_mod`` takes the
            decoder input plus one set of state inputs per decoder layer and
            returns the dense output followed by the new states of every
            decoder layer.

        Raises:
            ValueError: if ``self.Type`` is not a supported layer type.
        """
        self._recurrent_layer()  # validate Type up front
        # LSTM layers carry two state tensors (h, c); RNN/GRU carry one (h).
        states_per_layer = 2 if self.Type == 'LSTM' else 1

        enc_inps = model.input[0]
        last_encoder = model.get_layer('Enc_hidden_'+ str(self.number_of_encoders))
        states = list(last_encoder.output[1:])  # skip the output sequence
        enc_mod = Model(enc_inps, states)

        dec_inps = model.input[1]
        dec_outs = model.get_layer('Dec_emb')(dec_inps)
        inps_dec_states = []
        dec_states = []

        for i in range(1,self.number_of_decoders +1):
            curr_states_inputs = [keras.Input(shape=(self.hid_layer_size,))
                                  for _ in range(states_per_layer)]
            decoder = model.get_layer('Dec_hidden_'+ str(i))
            dec_outs, *new_states = decoder(dec_outs, initial_state=curr_states_inputs)
            dec_states += new_states
            inps_dec_states += curr_states_inputs

        dec_outs = model.get_layer('dense')(dec_outs)
        dec_mod = Model([dec_inps] + inps_dec_states, [dec_outs] + dec_states)

        return enc_mod,dec_mod

    def search_beam(self,data, k):
        """Beam search over a sequence of per-step probability rows.

        Args:
            data: iterable of rows; ``row[j]`` is the probability of token ``j``
                at that step.
            k: number of hypotheses kept after every step.

        Returns:
            Up to ``k`` ``[token_index_list, score]`` pairs sorted by ascending
            score, where the score is the summed negative log-probability.
            For empty ``data`` the single empty hypothesis ``[[[], 0.0]]`` is
            returned.
        """
        # Zero probabilities are clamped to this value so log() cannot fail.
        min_prob = 1e-300

        seqs = [[list(), 0.0]]
        for r in data:
            candidates = []
            for seq, score in seqs:
                for j in range(len(r)):
                    prob = r[j] if r[j] > 0 else min_prob
                    candidates.append([seq + [j], score - log(prob)])
            # Prune after every step so hypotheses extend across steps.
            seqs = sorted(candidates, key=lambda cand: cand[1])[:k]
        return seqs

    def dec_seq(self,enc_mod,dec_mod,input_sequence):
        """Decode one input sequence character by character.

        Decoding is greedy: at every step the most probable token is chosen
        and fed back as the next decoder input. The previous beam-search
        branch computed a value that was immediately overwritten by the
        arg-max, so ``prediction`` and ``beam_width`` never influenced the
        output; that dead computation is removed here.

        Args:
            enc_mod: encoder inference model from ``inference``.
            dec_mod: decoder inference model from ``inference``.
            input_sequence: encoder input for a single example.

        Returns:
            The decoded string. Decoding stops once ``'E'`` is produced or the
            string is longer than ``max_decoder_seq_length``; the character
            that triggered the stop is included.

        Raises:
            ValueError: if ``self.Type`` is not a supported layer type.
        """
        self._recurrent_layer()  # validate Type up front

        # Every decoder layer starts from the same encoder state(s).
        val_states = [enc_mod.predict(input_sequence)] * self.number_of_decoders
        tar_sequence = np.zeros((1, 1))
        tar_sequence[0, 0] = tok_ind_tar['B']

        decoded_sentence = ""

        while True:
            # State plumbing is kept exactly as before for each layer type.
            if self.Type == 'LSTM':
                d = dec_mod.predict([tar_sequence] + val_states)
            else:
                d = dec_mod.predict([tar_sequence] + [val_states])
            output_tokens, val_states = d[0],d[1:]

            sampled_token_index = np.argmax(output_tokens[0, -1, :])
            sampled_char = rtci[sampled_token_index]
            decoded_sentence += sampled_char

            if sampled_char == 'E' or len(decoded_sentence) > max_decoder_seq_length:
                break

            # Feed the chosen token back as the next decoder input.
            tar_sequence = np.zeros((1, 1))
            tar_sequence[0, 0] = sampled_token_index

        return decoded_sentence


In [4]:
# Download the Dakshina dataset v1.0 archive (about 1.9 GB) into the working directory.
!yes | wget "https://storage.googleapis.com/gresearch/dakshina/dakshina_dataset_v1.0.tar"

--2022-05-08 03:25:55--  https://storage.googleapis.com/gresearch/dakshina/dakshina_dataset_v1.0.tar
Resolving storage.googleapis.com (storage.googleapis.com)... 142.250.148.128, 209.85.234.128, 172.217.212.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.250.148.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2008340480 (1.9G) [application/x-tar]
Saving to: ‘dakshina_dataset_v1.0.tar’


2022-05-08 03:26:06 (173 MB/s) - ‘dakshina_dataset_v1.0.tar’ saved [2008340480/2008340480]



In [5]:
# Extract the downloaded tar archive into the working directory.
!yes | tar xopf dakshina_dataset_v1.0.tar

In [6]:
# List the Hindi lexicon files: the train, dev and test splits.
!ls dakshina_dataset_v1.0/hi/lexicons

hi.translit.sampled.dev.tsv   hi.translit.sampled.train.tsv
hi.translit.sampled.test.tsv


In [7]:
# Paths to the Hindi train and test lexicon files inside the extracted archive.
train_dir = "./dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.train.tsv"
test_dir = "./dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.test.tsv"

In [27]:
def read(f):
    """Read a tab-separated lexicon file into two parallel lists.

    Only lines containing a tab are used. After stripping surrounding
    whitespace the line is split on tabs; column 0 goes to the second returned
    list and column 1 to the first. Lines that end up with fewer than two
    columns (for example ``"word\\t\\n"``, where stripping removes the trailing
    tab) are skipped instead of raising ``IndexError``.

    Args:
        f: path of the UTF-8 encoded file to read.

    Returns:
        ``(latin, hindi)``: the column-1 values and the column-0 values, in
        file order.
    """
    hindi = []
    latin = []
    # Use a separate name for the handle so the path argument is not shadowed.
    with io.open(f, encoding ='utf-8') as handle:
        for line in handle:
            if '\t' not in line:
                continue
            tokens = line.rstrip().split("\t")
            if len(tokens) < 2:
                continue
            latin.append(tokens[1])
            hindi.append(tokens[0])
    return latin, hindi

In [33]:
# read() returns (column 1, column 0) of each file; these become the
# source and target word lists for the train and test splits.
train_source, train_target = read(train_dir)
test_source, test_target = read(test_dir)

In [34]:
# Shuffled index arrays used to reorder the train and test examples.
# A dedicated, seeded generator makes the order identical on every run
# without touching NumPy's global random state.
SHUFFLE_SEED = 0
_shuffle_rng = np.random.default_rng(SHUFFLE_SEED)
arr = _shuffle_rng.permutation(len(train_source))
arr1 = _shuffle_rng.permutation(len(test_source))

In [35]:
# Character vocabularies, filled while scanning the word lists.
source_characters, target_characters = set(), set()

In [36]:
# Unshuffled ("ns") input/target word lists for the train and test splits.
ns_txt_inp, ns_txt_tar = [], []
test_input_texts_ns, test_target_texts_ns = [], []

In [37]:
# Wrap every training target in "B" ... "E" markers, keep the pairs in file
# order, and add each character seen to the source/target vocabularies.
for input_text, txt_tar in zip(train_source, train_target):
    txt_tar = "".join(("B", txt_tar, "E"))
    ns_txt_inp.append(input_text)
    ns_txt_tar.append(txt_tar)
    source_characters.update(input_text)
    target_characters.update(txt_tar)

In [38]:
# Same processing for the test split: wrap targets in "B" ... "E" markers and
# add the characters of the test pairs to both vocabularies as well.
for input_text, txt_tar in zip(test_source, test_target):
    txt_tar = "".join(("B", txt_tar, "E"))
    test_input_texts_ns.append(input_text)
    test_target_texts_ns.append(txt_tar)
    source_characters.update(input_text)
    target_characters.update(txt_tar)

In [39]:
# Shuffled training inputs and targets (filled in a later cell).
input_texts, target_texts = [], []

In [40]:
# Shuffled test inputs and targets (filled in a later cell).
test_input_texts, txt_tar_tst = [], []

In [41]:
# Reorder the training pairs according to the shuffled index array `arr`.
# Slice assignment (instead of append) keeps this cell idempotent: running it
# again replaces the contents rather than duplicating every example.
input_texts[:] = [ns_txt_inp[arr[i]] for i in range(len(train_source))]
target_texts[:] = [ns_txt_tar[arr[i]] for i in range(len(train_source))]

In [42]:
# Reorder the test pairs according to the shuffled index array `arr1`.
# Slice assignment (instead of append) keeps this cell idempotent: running it
# again replaces the contents rather than duplicating every example.
test_input_texts[:] = [test_input_texts_ns[arr1[i]] for i in range(len(test_source))]
txt_tar_tst[:] = [test_target_texts_ns[arr1[i]] for i in range(len(test_source))]

In [43]:
# The space character is used as the padding token in both vocabularies.
source_characters |= {" "}
target_characters |= {" "}

In [44]:
# Freeze each vocabulary into a sorted list so characters get stable indices.
source_characters = sorted(source_characters)
target_characters = sorted(target_characters)

In [45]:
# Vocabulary sizes.
enc_tok_num, dec_tok_num = len(source_characters), len(target_characters)

# Longest input / target word in the training split.
max_encoder_seq_length = max(map(len, input_texts))
max_decoder_seq_length = max(map(len, target_texts))

# Longest input / target word in the test split.
test_max_encoder_seq_length = max(map(len, test_input_texts))
test_max_decoder_seq_length = max(map(len, txt_tar_tst))

In [46]:
# Character -> index lookups, following the sorted vocabulary order.
tok_ind_src = {char: i for i, char in enumerate(source_characters)}
tok_ind_tar = {char: i for i, char in enumerate(target_characters)}

# Index -> character lookups (inverse of the maps above).
reverse_source_char_index = {i: char for char, i in tok_ind_src.items()}
rtci = {i: char for char, i in tok_ind_tar.items()}

In [47]:
# Training arrays, one row per training word: token indices for the encoder
# and decoder inputs, and a one-hot tensor for the decoder targets.
enc_inp = np.zeros(shape=(len(input_texts), max_encoder_seq_length), dtype=np.float32)
dec_inp = np.zeros(shape=(len(input_texts), max_decoder_seq_length), dtype=np.float32)
dec_tar = np.zeros(
    shape=(len(input_texts), max_decoder_seq_length, dec_tok_num),
    dtype=np.float32,
)

In [49]:
# Fill the training arrays.  Padding positions are computed from the word
# lengths rather than from the leaked loop variable `t`, which is stale (or
# undefined on the first row) when a word is empty.
for i, (input_text, txt_tar) in enumerate(zip(input_texts, target_texts)):
    for t, char in enumerate(input_text):
        enc_inp[i, t] = tok_ind_src[char]
    enc_inp[i, len(input_text):] = tok_ind_src[" "]

    for t, char in enumerate(txt_tar):
        dec_inp[i, t] = tok_ind_tar[char]
        if t > 0:
            # The target is the decoder input shifted one step to the left.
            dec_tar[i, t - 1, tok_ind_tar[char]] = 1.0
    dec_inp[i, len(txt_tar):] = tok_ind_tar[" "]
    # The shifted target is one step shorter, so its padding starts one earlier.
    dec_tar[i, max(len(txt_tar) - 1, 0):, tok_ind_tar[" "]] = 1.0

In [50]:
# Test arrays, one row per *test* word.  They were previously sized with the
# training-set length, which wasted memory and would raise IndexError in the
# fill loop if the test split were larger than the training split.
inp_tst_enc_data = np.zeros((len(test_input_texts), test_max_encoder_seq_length), dtype="float32")
dec_inp_tst_data = np.zeros((len(test_input_texts), test_max_decoder_seq_length), dtype="float32")
dec_tar_tst_data = np.zeros((len(test_input_texts), test_max_decoder_seq_length, dec_tok_num), dtype="float32")

In [51]:
# Fill the test arrays.  Padding positions are computed from the word lengths
# rather than from the leaked loop variable `t`, which is stale (or undefined
# on the first row) when a word is empty.
for i, (input_text, txt_tar) in enumerate(zip(test_input_texts, txt_tar_tst)):
    for t, char in enumerate(input_text):
        inp_tst_enc_data[i, t] = tok_ind_src[char]
    inp_tst_enc_data[i, len(input_text):] = tok_ind_src[" "]

    for t, char in enumerate(txt_tar):
        dec_inp_tst_data[i, t] = tok_ind_tar[char]
        if t > 0:
            # The target is the decoder input shifted one step to the left.
            dec_tar_tst_data[i, t - 1, tok_ind_tar[char]] = 1.0
    dec_inp_tst_data[i, len(txt_tar):] = tok_ind_tar[" "]
    # The shifted target is one step shorter, so its padding starts one earlier.
    dec_tar_tst_data[i, max(len(txt_tar) - 1, 0):, tok_ind_tar[" "]] = 1.0

In [52]:
# Evaluation inputs: encoded test words and their target strings.
x_test, y_test = inp_tst_enc_data, txt_tar_tst

In [53]:
# Hyper-parameter values passed to the model in the next cell.

# Architecture
best_type = 'GRU'
best_num_enc = 2
best_num_dec = 1
best_hidden_size = 128
best_in_emb = 256
best_dropout = 0.3

# Optimisation
best_lr = 0.001
best_batch_size = 64
best_epochs = 15

# Decoding
best_dec_search = 'greedy'
best_beam_width = 3

In [None]:
# Build the model from the chosen hyper-parameters, then train it and
# evaluate it on the test split.
model_rnn = seq2seq(
    Type=best_type,
    input_embedding=best_in_emb,
    hid_layer_size=best_hidden_size,
    lr=best_lr,
    drop_prob=best_dropout,
    prediction=best_dec_search,
    number_of_epochs=best_epochs,
    batch_size=best_batch_size,
    beam_width=best_beam_width,
    number_of_encoders=best_num_enc,
    number_of_decoders=best_num_dec,
)

model_rnn.fit(enc_inp, dec_inp, dec_tar, x_test, y_test)

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 Enc_inputs (InputLayer)        [(None, None)]       0           []                               
                                                                                                  
 Enc_emb (Embedding)            (None, None, 256)    6912        ['Enc_inputs[0][0]']             
                                                                                                  
 Dec_inputs (InputLayer)        [(None, None)]       0           []                               
                                                                                                  
 Enc_hidden_1 (GRU)             [(None, None, 128),  148224      ['Enc_emb[0][0]']                
                                 (None, 128)]                                                 

  super(Adam, self).__init__(name, **kwargs)


Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15




INFO:tensorflow:Assets written to: s2s\assets


INFO:tensorflow:Assets written to: s2s\assets


0.3658374055975122
