In [None]:
# Download the Dakshina dataset v1.0 tarball from Google Cloud Storage into the
# current working directory as "dakshina_dataset_v1.0.tar" (about 1.9 GB according
# to the transfer log below). -L follows redirects; -o sets the output file name.
!curl --header "Host: storage.googleapis.com" --header "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.128 Safari/537.36 Edg/89.0.774.77" --header "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9" --header "Accept-Language: en-US,en;q=0.9" --header "Referer: https://github.com/google-research-datasets/dakshina" "https://storage.googleapis.com/gresearch/dakshina/dakshina_dataset_v1.0.tar" -L -o "dakshina_dataset_v1.0.tar"

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1915M  100 1915M    0     0  72.3M      0  0:00:26  0:00:26 --:--:-- 75.9M


In [None]:
import shutil
# Extract the downloaded tarball into /content/. The download cell writes the
# archive relative to the working directory, so this assumes the notebook runs
# with /content as its working directory (presumably Colab -- confirm).
shutil.unpack_archive("/content/dakshina_dataset_v1.0.tar",'/content/')

In [None]:
import re
from tqdm import tqdm
import pandas as pd
import numpy as np
import tensorflow as tf
from keras.models import Sequential,Model,load_model
from keras.layers import Dense,LSTM,GRU,SimpleRNN,Input,Dropout,TimeDistributed,RepeatVector,dot,BatchNormalization,concatenate,multiply,Activation
from keras.layers.embeddings import Embedding
from keras.preprocessing import sequence

In [None]:
class seq2seq:
    """Character-level encoder-decoder transliteration model on Dakshina lexicons.

    Intended call order: ``create_data()`` -> ``create_model()`` -> ``run(model)``
    -> ``percentage_of_correct_test_predictions(model)``.

    Each lexicon file is a headerless TSV. Column 0 is encoded with the target
    vocabulary and column 1 with the input vocabulary (the original code calls
    them "telugu" and "english" respectively). Every word is wrapped as
    ``'\\t' + word + '\\n'`` so that tab acts as the start token and newline as
    the end token.

    Args:
        cell: Recurrent cell type: ``"lstm"``, ``"rnn"`` or ``"gru"``.
        embedding_size: Width of both character embeddings.
        latent_dim: Number of units in every recurrent layer.
        encoder_layers_size: Number of stacked encoder layers (must be >= 1).
        decoder_layers_size: Number of stacked decoder layers; values below 1
            still produce a single decoder layer.
        dropouts: Input dropout rate passed to every recurrent layer.
        epochs: Number of training epochs.
        batch_size: Mini-batch size used for training and evaluation.
    """

    def __init__(self,cell,embedding_size,latent_dim,encoder_layers_size,decoder_layers_size,dropouts,epochs,batch_size):
        self.cell = cell
        self.embedding_size = embedding_size
        self.latent_dim = latent_dim
        self.encoder_layers_size = encoder_layers_size
        self.decoder_layers_size = decoder_layers_size
        self.dropouts = dropouts
        self.epochs = epochs
        self.batch_size = batch_size

    @staticmethod
    def _read_pairs(path):
        """Load a headerless TSV lexicon and drop every row containing a NaN."""
        try:
            # ``on_bad_lines`` replaced ``error_bad_lines``/``warn_bad_lines`` in
            # pandas 1.3; "warn" matches the old ``error_bad_lines=False`` default.
            frame = pd.read_csv(path, sep="\t", header=None, on_bad_lines="warn")
        except TypeError:
            # Older pandas does not know ``on_bad_lines``.
            frame = pd.read_csv(path, sep="\t", header=None, error_bad_lines=False)
        return frame.dropna()

    def _ids_to_text(self, ids):
        """Map target token ids back to a string with start/end markers removed."""
        return re.sub('\t|\n', '', ''.join(self.inv_target_token_index[idx] for idx in ids))

    def get_data(self,path):
        """Encode one lexicon file with the vocabularies built by ``create_data``.

        Rows that cannot be encoded (a character missing from the training
        vocabulary, or a wrapped target longer than ``self.max_length_y``) are
        skipped with a warning instead of raising ``KeyError``/``IndexError``.
        Rows that can be encoded are processed exactly as before.

        Args:
            path: Path of the TSV lexicon file.

        Returns:
            A 3-tuple ``(encoder_ids, decoder_ids, decoder_target_data)``:
            two lists of unpadded token-id lists (start/end tokens included)
            and a float32 one-hot array of shape
            ``(rows, max_length_y, decoder_tokens)`` holding the decoder input
            shifted one step to the left, with every position after the word
            filled with the end token.
        """
        import warnings

        frame = self._read_pairs(path)
        end_index = self.target_token_index["\n"]

        encoder_ids, decoder_ids = [], []
        skipped = 0
        for target_word, source_word in zip(frame[0], frame[1]):
            try:
                source_seq = [self.input_token_index[ch] for ch in '\t' + source_word + '\n']
                target_seq = [self.target_token_index[ch] for ch in '\t' + target_word + '\n']
            except KeyError:
                # Character never seen while building the vocabulary.
                skipped += 1
                continue
            if len(target_seq) > self.max_length_y:
                # Would not fit in the one-hot target array.
                skipped += 1
                continue
            encoder_ids.append(source_seq)
            decoder_ids.append(target_seq)

        if skipped:
            warnings.warn(
                "%s: skipped %d row(s) with out-of-vocabulary characters or "
                "over-long targets" % (path, skipped)
            )

        decoder_target_data = np.zeros(
            (len(decoder_ids), self.max_length_y, self.decoder_tokens), dtype="float32"
        )
        for row, target_seq in enumerate(decoder_ids):
            steps = len(target_seq) - 1
            # Target at step t is the decoder input at step t + 1 (teacher forcing).
            decoder_target_data[row, np.arange(steps), target_seq[1:]] = 1.0
            # Everything after the word is labelled with the end token.
            decoder_target_data[row, steps:, end_index] = 1.0

        return encoder_ids, decoder_ids, decoder_target_data

    def create_vocab(self,path):
        """Collect the character vocabularies and maximum wrapped word lengths.

        Args:
            path: Path of the TSV lexicon file (the training split in practice).

        Returns:
            ``(target_chars, input_chars, max_length_x, max_length_y)`` where the
            character lists are sorted, ``max_length_x`` is the longest wrapped
            column-1 word and ``max_length_y`` the longest wrapped column-0 word.
        """
        frame = self._read_pairs(path)

        source_words = ['\t' + word + '\n' for word in frame[1]]
        target_words = ['\t' + word + '\n' for word in frame[0]]

        input_chars = sorted(set().union(*source_words))
        target_chars = sorted(set().union(*target_words))

        max_length_x = max(len(word) for word in source_words)
        max_length_y = max(len(word) for word in target_words)

        return target_chars, input_chars, max_length_x, max_length_y

    def create_data(self, base_dir="/content/dakshina_dataset_v1.0", lang="te"):
        """Build vocabularies from the training split and encode all three splits.

        Args:
            base_dir: Root of the extracted dataset. The default reproduces the
                original hard-coded location.
            lang: Language sub-directory / file prefix. Defaults to ``"te"``.

        Populates the token dictionaries, ``max_length_x``/``max_length_y``,
        ``encoder_tokens``/``decoder_tokens`` and the padded
        ``encoder_*``/``decoder_*``/``decoder_target_*`` arrays for the train,
        cv (dev) and test splits.
        """
        template = "{0}/{1}/lexicons/{1}.translit.sampled.{2}.tsv"
        train_path = template.format(base_dir, lang, "train")
        cv_path = template.format(base_dir, lang, "dev")
        test_path = template.format(base_dir, lang, "test")

        target_chars, input_chars, self.max_length_x, self.max_length_y = self.create_vocab(train_path)
        self.encoder_tokens = len(input_chars)
        self.decoder_tokens = len(target_chars)

        # Dict for char to index
        self.input_token_index = {char: i for i, char in enumerate(input_chars)}
        self.target_token_index = {char: i for i, char in enumerate(target_chars)}

        # Dict for index to char
        self.inv_input_token_index = {i: char for char, i in self.input_token_index.items()}
        self.inv_target_token_index = {i: char for char, i in self.target_token_index.items()}

        encoder_train, decoder_train, self.decoder_target_train = self.get_data(train_path)
        encoder_cv, decoder_cv, self.decoder_target_cv = self.get_data(cv_path)
        encoder_test, decoder_test, self.decoder_target_test = self.get_data(test_path)

        # Post-pad every sequence to the training maximum length.
        # NOTE(review): the pad value is pad_sequences' default; it presumably
        # shares index 0 with the '\t' start token -- confirm this is intended.
        def pad(sequences, length):
            return sequence.pad_sequences(sequences, maxlen=length, padding="post")

        self.encoder_train = pad(encoder_train, self.max_length_x)
        self.decoder_train = pad(decoder_train, self.max_length_y)
        self.encoder_cv = pad(encoder_cv, self.max_length_x)
        self.decoder_cv = pad(decoder_cv, self.max_length_y)
        self.encoder_test = pad(encoder_test, self.max_length_x)
        self.decoder_test = pad(decoder_test, self.max_length_y)

    def create_model(self):
        """Build the (uncompiled) teacher-forced encoder-decoder Keras model.

        The encoder is an embedding followed by ``encoder_layers_size`` stacked
        recurrent layers. The final state(s) of the last encoder layer
        initialise the first decoder layer; any further decoder layers start
        from their default state. A time-distributed softmax over the target
        vocabulary produces the output.

        Returns:
            A Keras ``Model`` mapping ``[encoder_inputs, decoder_inputs]`` to
            per-step target-character probabilities.

        Raises:
            ValueError: If ``cell`` is not ``"lstm"``, ``"rnn"`` or ``"gru"``,
                or if ``encoder_layers_size`` is smaller than 1 (both cases
                previously surfaced as a ``NameError``).
        """
        if self.cell not in ("lstm", "rnn", "gru"):
            raise ValueError(
                "cell must be one of 'lstm', 'rnn' or 'gru', got %r" % (self.cell,)
            )
        if self.encoder_layers_size < 1:
            raise ValueError("encoder_layers_size must be at least 1")

        layer_class = {"lstm": LSTM, "rnn": SimpleRNN, "gru": GRU}[self.cell]

        def recurrent_layer():
            # Every layer returns the full sequence plus its final state(s).
            return layer_class(self.latent_dim, return_sequences=True,
                               return_state=True, dropout=self.dropouts)

        encoder_inputs = Input(shape=(None,))
        x = Embedding(self.encoder_tokens, self.embedding_size, input_length=self.max_length_x)(encoder_inputs)
        for _ in range(self.encoder_layers_size):
            # LSTM yields two states (h, c); SimpleRNN and GRU yield one.
            x, *encoder_states = recurrent_layer()(x)

        decoder_inputs = Input(shape=(None,))
        decoder_seq = Embedding(self.decoder_tokens, self.embedding_size, input_length=self.max_length_y)(decoder_inputs)
        decoder_seq, *_ = recurrent_layer()(decoder_seq, initial_state=encoder_states)
        for _ in range(self.decoder_layers_size - 1):
            decoder_seq, *_ = recurrent_layer()(decoder_seq)

        decoder_outputs = TimeDistributed(Dense(self.decoder_tokens, activation='softmax'))(decoder_seq)

        return Model([encoder_inputs, decoder_inputs], decoder_outputs)

    def percentage_of_correct_test_predictions(self,model):
        """Return the fraction (0.0-1.0) of test words predicted exactly right.

        NOTE: the decoder is fed the ground-truth target sequence (teacher
        forcing), so this measures teacher-forced exact-match accuracy, not
        free-running decoding accuracy.

        Args:
            model: Trained model as returned by ``create_model``.

        Returns:
            Exact-match count divided by the number of test rows, or ``0.0``
            when the test set is empty.
        """
        total = self.encoder_test.shape[0]
        if total == 0:
            return 0.0

        step = int(self.batch_size) if self.batch_size else 32
        correct = 0
        # Score in mini-batches rather than one forward pass per word.
        for start in tqdm(range(0, total, step)):
            stop = start + step
            decoder_batch = self.decoder_test[start:stop]
            probabilities = model([self.encoder_test[start:stop], decoder_batch])
            predicted_ids = np.argmax(np.asarray(probabilities), axis=-1)
            for truth_row, predicted_row in zip(decoder_batch, predicted_ids):
                if self._ids_to_text(truth_row) == self._ids_to_text(predicted_row):
                    correct += 1
        return correct / total

    def run(self,model):
        """Compile ``model`` and train it, validating on the cv split.

        Args:
            model: Model as returned by ``create_model``.

        Returns:
            The Keras ``History`` object produced by ``model.fit``.
        """
        # Compile & run training
        model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=["accuracy"])

        return model.fit(
            [self.encoder_train, self.decoder_train],
            self.decoder_target_train,
            batch_size=self.batch_size,
            epochs=self.epochs,
            validation_data=([self.encoder_cv, self.decoder_cv], self.decoder_target_cv),
        )
        


In [None]:
# Configure a vanilla-RNN model with two encoder and two decoder layers,
# then load the data, build the network and train it.
s2s = seq2seq(
    cell="rnn",
    embedding_size=12,
    latent_dim=512,
    encoder_layers_size=2,
    decoder_layers_size=2,
    dropouts=0.2,
    epochs=3,
    batch_size=32,
)
s2s.create_data()
model = s2s.create_model()
s2s.run(model)

Epoch 1/3
Epoch 2/3
Epoch 3/3


In [None]:
# Print the layer-by-layer architecture and parameter counts of the trained model.
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_3 (InputLayer)            [(None, None)]       0                                            
__________________________________________________________________________________________________
embedding_2 (Embedding)         (None, None, 12)     336         input_3[0][0]                    
__________________________________________________________________________________________________
input_4 (InputLayer)            [(None, None)]       0                                            
__________________________________________________________________________________________________
simple_rnn_4 (SimpleRNN)        [(None, None, 512),  268800      embedding_2[0][0]                
____________________________________________________________________________________________