In [None]:
# Useful Imports

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import os
import cv2
import pathlib
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.optimizers import Adam, SGD, RMSprop, Nadam
from tensorflow.keras.metrics import categorical_crossentropy
from tensorflow.keras.callbacks import EarlyStopping
from keras.utils.vis_utils import plot_model
import csv
from IPython.display import HTML as html_print
from IPython.display import display

In [None]:
# Delete 'best_model.h5' from the working directory if it exists (presumably a leftover of an earlier run).
# NOTE(review): the training cell further down saves to "best_model_attn.h5", not 'best_model.h5' --
# confirm which file name is the intended one.
if os.path.exists('best_model.h5'):
    os.remove('best_model.h5')

### Downloading the Dakshina dataset

In [None]:
#Downloading
!wget https://storage.googleapis.com/gresearch/dakshina/dakshina_dataset_v1.0.tar
    
#Uncompressing
!tar -xf dakshina_dataset_v1.0.tar

### Preprocessing the data

In [None]:
def read(data_path, characters = False):
    '''
    Function to read a tab-separated lexicon file and return its (x, y) samples.
    Every non-empty line is split on tabs: the second field is the input sample (x) and
    the first field is the target sample (y). Any further fields are ignored.
    Inputs :
        data_path -- path of the tab-separated file, read as UTF-8 (string)
        characters -- if True every sample is returned as a list of characters,
                      otherwise as a string (bool, default : False)
    Outputs :
        x -- input samples taken from the second column (list)
        y -- target samples taken from the first column (list)
    Raises :
        IndexError -- if a non-empty line has fewer than two tab-separated fields
    '''

    x, y = [], []
    with open(data_path, "r", encoding="utf-8") as f:
        for line in f.read().split("\n"):
            # Blank lines (e.g. the one after the trailing newline) carry no sample
            if line == '':
                continue
            fields = line.split("\t")
            x.append(fields[1])
            y.append(fields[0])

    if characters:
        x, y = [list(sample) for sample in x], [list(sample) for sample in y]
    return x, y

In [None]:
START_CHAR = '\t'
END_CHAR = '\n'
BLANK_CHAR = ' '

def _register_char(char, char_enc, char_dec):
    # Gives `char` the next free integer code, unless it is already part of the vocabulary
    if char not in char_enc:
        char_enc[char] = len(char_dec)
        char_dec.append(char)

def _register_strings(strings, char_enc, char_dec, extra_length=0):
    # Registers every character of `strings` (in order of first appearance) and
    # returns the largest len(string) + extra_length seen (never less than 1)
    longest = 1
    for string in strings:
        longest = max(longest, len(string) + extra_length)
        for char in string:
            _register_char(char, char_enc, char_dec)
    return longest

def encode_decode_characters(train_input, train_target, val_input, val_target):
    '''
    Function to build the character vocabularies of the input and target data.
    Characters get consecutive integer codes in order of first appearance over the training
    samples followed by the validation samples.
    Inputs :
        train_input, val_input -- input samples (lists of strings)
        train_target, val_target -- target samples (lists of strings)
    Outputs :
        input_char_enc -- input character to integer encoding (dict)
        input_char_dec -- integer to input character decoding (list)
        target_char_enc -- target character to integer encoding (dict); START_CHAR always has code 0
        target_char_dec -- integer to target character decoding (list)
        max_encoder_seq_length -- length of the longest input sample, at least 1 (int)
        max_decoder_seq_length -- length of the longest target sample + 2 (room for START_CHAR
                                  and END_CHAR), or 1 if there are no target samples (int)
    '''

    # Encoding and decoding of input vocabulary (BLANK_CHAR is needed for padding)
    input_char_enc, input_char_dec = {}, []
    max_encoder_seq_length = _register_strings(train_input + val_input, input_char_enc, input_char_dec)
    _register_char(BLANK_CHAR, input_char_enc, input_char_dec)

    # Encoding and decoding of target vocabulary: START_CHAR first, then the data characters,
    # then END_CHAR and BLANK_CHAR. Every special character is added only if it is missing, so
    # the dictionary and the list always describe the same vocabulary.
    target_char_enc, target_char_dec = {}, []
    _register_char(START_CHAR, target_char_enc, target_char_dec)
    max_decoder_seq_length = _register_strings(train_target + val_target, target_char_enc, target_char_dec,
                                               extra_length=2)
    _register_char(END_CHAR, target_char_enc, target_char_dec)
    _register_char(BLANK_CHAR, target_char_enc, target_char_dec)

    print("Number of training samples:", len(train_input))
    print("Number of validation samples:", len(val_input))
    print("Number of unique input tokens:", len(input_char_dec))
    print("Number of unique output tokens:", len(target_char_dec))
    print("Max sequence length for inputs:", max_encoder_seq_length)
    print("Max sequence length for outputs:", max_decoder_seq_length)

    return input_char_enc, input_char_dec, target_char_enc, target_char_dec, max_encoder_seq_length, max_decoder_seq_length

In [None]:
def process_data(input, enc_timesteps, input_char_enc, target = None, dec_timesteps = None, target_char_enc = None):
    '''
    Function to convert the input and target words into the arrays needed by the Keras embedding layers.
    Inputs :
        input -- input words (list of strings)
        enc_timesteps -- number of encoder timesteps; shorter words are padded with BLANK_CHAR (int)
        input_char_enc -- input character encoding (dict)
        target -- target words (list of strings, default : None)
        dec_timesteps -- number of decoder timesteps (int, default : None)
        target_char_enc -- target character encoding (dict, default : None)
    Outputs :
        encoder_input -- integer codes of the input words (numpy array, shape : (None, enc_timesteps))
        decoder_input -- START_CHAR + target word + END_CHAR, padded with BLANK_CHAR
                         (numpy array, shape : (None, dec_timesteps)); None unless target,
                         dec_timesteps and target_char_enc are all given
        decoder_target -- one-hot encoding of decoder_input shifted one timestep to the left, the
                          last timestep being BLANK_CHAR (float32 numpy array,
                          shape : (None, dec_timesteps, len(target_char_enc))); None as above
    Raises :
        ValueError -- if a word does not fit into the given number of timesteps
        KeyError -- if a word contains a character missing from the encoding
    '''

    input_blank = input_char_enc[BLANK_CHAR]
    encoder_rows = []
    for string in input:
        # A longer word would give a ragged / over-wide array that only fails later and obscurely
        if len(string) > enc_timesteps:
            raise ValueError(f"input sample {string!r} has {len(string)} characters, "
                             f"more than enc_timesteps = {enc_timesteps}")
        encoder_rows.append([input_char_enc[ch] for ch in string] + [input_blank] * (enc_timesteps - len(string)))
    # The explicit dtype and reshape keep the result a 2-D integer array even when there are no samples
    encoder_input = np.array(encoder_rows, dtype=int).reshape(len(encoder_rows), enc_timesteps)

    decoder_input, decoder_target = None, None
    if target is not None and dec_timesteps is not None and target_char_enc is not None:

        # START_CHAR -- start of sequence, END_CHAR -- end of sequence, BLANK_CHAR -- padding
        start, end, blank = target_char_enc[START_CHAR], target_char_enc[END_CHAR], target_char_enc[BLANK_CHAR]
        decoder_rows = []
        for string in target:
            if len(string) + 2 > dec_timesteps:
                raise ValueError(f"target sample {string!r} needs {len(string) + 2} timesteps "
                                 f"(with start and end characters), more than dec_timesteps = {dec_timesteps}")
            decoder_rows.append([start] + [target_char_enc[ch] for ch in string] + [end]
                                + [blank] * (dec_timesteps - len(string) - 2))

        no_samples = len(decoder_rows)
        decoder_input = np.array(decoder_rows, dtype=int).reshape(no_samples, dec_timesteps)
        decoder_target = np.zeros((no_samples, dec_timesteps, len(target_char_enc)), dtype='float32')

        if no_samples > 0:
            # Timestep t of the target is the one-hot encoding of decoder input timestep t+1 ...
            sample_ind = np.arange(no_samples)[:, np.newaxis]
            step_ind = np.arange(dec_timesteps - 1)[np.newaxis, :]
            decoder_target[sample_ind, step_ind, decoder_input[:, 1:]] = 1.0
            # ... and the last timestep, which has no successor, is BLANK_CHAR
            decoder_target[:, dec_timesteps - 1, blank] = 1.0

    return encoder_input, decoder_input, decoder_target

In [None]:
# Reading the train / dev / test lexicon files of the 'bn' folder of the dataset
train_x, train_y = read('./dakshina_dataset_v1.0/bn/lexicons/bn.translit.sampled.train.tsv')
val_x, val_y = read('./dakshina_dataset_v1.0/bn/lexicons/bn.translit.sampled.dev.tsv')
test_x, test_y = read('./dakshina_dataset_v1.0/bn/lexicons/bn.translit.sampled.test.tsv')

# Character vocabularies and maximum sequence lengths, built from the training and validation data only
(input_char_enc, input_char_dec,
 target_char_enc, target_char_dec,
 max_encoder_seq_length, max_decoder_seq_length) = encode_decode_characters(train_x, train_y, val_x, val_y)

# Encoder input, decoder input and decoder output arrays for the training, validation and test data
train_enc_x, train_dec_x, train_dec_y = process_data(
    train_x, max_encoder_seq_length, input_char_enc,
    target=train_y, dec_timesteps=max_decoder_seq_length, target_char_enc=target_char_enc)
val_enc_x, val_dec_x, val_dec_y = process_data(
    val_x, max_encoder_seq_length, input_char_enc,
    target=val_y, dec_timesteps=max_decoder_seq_length, target_char_enc=target_char_enc)
test_enc_x, test_dec_x, test_dec_y = process_data(
    test_x, max_encoder_seq_length, input_char_enc,
    target=test_y, dec_timesteps=max_decoder_seq_length, target_char_enc=target_char_enc)

## Seq2Seq Model (without Attention)

### Building the model

In [None]:
def build_model(encoder_vocab_size, decoder_vocab_size, inp_emb_size=64, n_enc_layers=1, n_dec_layers=1,
                 h_layer_size=64, cell_type='GRU', dropout=0, r_dropout=0, cell_activation='tanh'):

    '''
    Function to create a seq2seq (encoder-decoder) model without attention.
    Input :
        encoder_vocab_size -- number of characters in input vocabulary (int)
        decoder_vocab_size -- number of characters in output vocabulary (int)
        inp_emb_size -- size of the embedding layers of encoder and decoder (int, default : 64)
        n_enc_layers -- number of recurrent layers stacked in the encoder; at least one layer
                        is always built (int, default : 1)
        n_dec_layers -- number of recurrent layers stacked in the decoder; at least one layer
                        is always built (int, default : 1)
        h_layer_size -- size of the hidden state of every recurrent layer (int, default : 64)
        cell_type -- type of recurrent layer used in encoder and decoder
                     (string('LSTM'/ 'GRU'/ 'RNN'), default : 'GRU')
        dropout -- value of normal dropout (float(between 0 and 1), default : 0)
        r_dropout -- value of recurrent dropout (float(between 0 and 1), default : 0)
        cell_activation -- type of activation used in the recurrent layers (string, default : 'tanh')
    Output :
        model -- uncompiled model taking [encoder input, decoder input] and returning the softmax
                 output of the decoder for every timestep (Keras model object)
    '''

    # Recurrent layer class for every supported cell type
    cell_dict = {
        'RNN': keras.layers.SimpleRNN,
        'GRU': keras.layers.GRU,
        'LSTM': keras.layers.LSTM
    }

    def recurrent_layer(layer_name):
        # All recurrent layers share the same configuration and differ only in their name
        return cell_dict[cell_type](h_layer_size, activation=cell_activation, return_sequences=True,
                                    return_state=True, dropout=dropout, recurrent_dropout=r_dropout,
                                    name=layer_name)

    # Encoder : input -> embedding -> stack of recurrent layers
    encoder_input = keras.layers.Input(shape=(None,), name="input_1")
    encoder_inp_emb = keras.layers.Embedding(encoder_vocab_size, inp_emb_size, name="embedding_1")(encoder_input)

    encoder_seq, *encoder_states = recurrent_layer("encoder_1")(encoder_inp_emb)
    for depth in range(2, n_enc_layers + 1):
        # Only the states of the last encoder layer are kept
        encoder_seq, *encoder_states = recurrent_layer(f"encoder_{depth}")(encoder_seq)

    # Decoder : input -> embedding -> stack of recurrent layers
    decoder_input = keras.layers.Input(shape=(None,), name="input_2")
    decoder_inp_emb = keras.layers.Embedding(decoder_vocab_size, inp_emb_size, name="embedding_2")(decoder_input)

    # Every decoder layer is initialised with the final states of the last encoder layer
    decoder_seq, *_ = recurrent_layer("decoder_1")(decoder_inp_emb, initial_state=encoder_states)
    for depth in range(2, n_dec_layers + 1):
        decoder_seq, *_ = recurrent_layer(f"decoder_{depth}")(decoder_seq, initial_state=encoder_states)

    # Softmax fully connected layer over the output vocabulary
    decoder_dense_output = keras.layers.Dense(decoder_vocab_size, activation="softmax", name="dense_1")(decoder_seq)

    # Finally the full encoder-decoder model
    return keras.Model([encoder_input, decoder_input], decoder_dense_output)

### Inference Model (without attention)

In [None]:
def create_inference_model(model):
    '''
    Function to return the models needed for inference from the training model (without attention).
    The layers are looked up by the names "encoder_<i>", "decoder_<i>", "embedding_2" and "dense_1".
    Inputs :
        model -- non-attention model used for training
    Outputs :
        encoder_model -- maps the encoder input to the final state(s) of the last encoder layer
        decoder_model -- maps [decoder input] + initial states of all decoder layers to
                         [softmax output] + new states of all decoder layers
        n_enc_layers -- number of layers in the encoder(int)
        n_dec_layers -- number of layers in the decoder(int)
    '''
    # Counting the encoder and decoder layers through their names
    layer_names = [layer.name for layer in model.layers]
    n_enc_layers = sum(name.startswith('encoder') for name in layer_names)
    n_dec_layers = sum(name.startswith('decoder') for name in layer_names)

    # Encoder model : encoder input (Input_1) -> state(s) of the last encoder layer
    last_encoder = model.get_layer("encoder_" + str(n_enc_layers))
    is_lstm = last_encoder.__class__.__name__ == 'LSTM'
    _, *encoder_state = last_encoder.output
    encoder_model = keras.Model(model.input[0], encoder_state)

    # Decoder input (Input_2) and its embedding
    decoder_input = model.input[1]
    decoder_seq = model.get_layer("embedding_2")(decoder_input)

    # Every decoder layer gets its own input(s) for the initial state : two for an LSTM, one otherwise
    decoder_states, decoder_state_inputs = [], []
    for i in range(1, n_dec_layers + 1):
        if is_lstm:
            input_names = ["input_" + str(2*i + 1), "input_" + str(2*i + 2)]
        else:
            input_names = ["input_" + str(i + 2)]
        state_inputs = [keras.Input(shape=(state.shape[1],), name=input_name)
                        for state, input_name in zip(encoder_state, input_names)]

        decoder_seq, *layer_states = model.get_layer("decoder_" + str(i))(decoder_seq, initial_state=state_inputs)
        decoder_states.extend(layer_states)
        decoder_state_inputs.extend(state_inputs)

    # Softmax FC layer
    decoder_dense_output = model.get_layer("dense_1")(decoder_seq)

    # Decoder model
    decoder_model = keras.Model([decoder_input] + decoder_state_inputs, [decoder_dense_output] + decoder_states)

    return encoder_model, decoder_model, n_enc_layers, n_dec_layers


def convert_to_word(predictions, char_enc, char_dec = None):

    '''
    Function to turn every predicted sequence into a word, dropping END_CHAR and everything after it.
    Inputs :
        predictions -- predicted sequences (list or numpy array). Their elements are integer codes
                       when char_dec is given, and characters when char_dec is None
        char_enc -- character encoding, used to look up the code of END_CHAR (dict)
        char_dec -- character decoding (list, default : None)
    Outputs :
        pred_words -- one word per sequence (list of strings)
    '''

    no_samples = len(predictions) if type(predictions) is list else predictions.shape[0]
    pred_words = [''] * no_samples
    for i, sequence in enumerate(predictions):
        pieces = []
        for token in sequence:
            # Stop word : END_CHAR
            if token == char_enc[END_CHAR]:
                break
            pieces.append(token if char_dec is None else char_dec[token])
        pred_words[i] = ''.join(pieces)

    return pred_words

### Beam decoder

In [None]:
def beam_decoder_infer(model, input_seqs, max_decoder_timesteps, K=1, target_seqs=None, starting_char_enc=0, batch_size=64, attention=False):
    
    '''
    Function to do inference on the model using beam decoder.
    Every beam is extended for exactly max_decoder_timesteps timesteps; there is no early stop
    at the end-of-sequence character.
    Inputs :
        model -- training model
        input_seqs -- input to encoder(numpy array, shape : (None, timesteps))
        max_decoder_timesteps -- Number of timesteps to infer in decoder(int)
        K -- beam width of beam decoder(int, default : 1)
        target_seqs -- expected target as integer codes; it is indexed as target_seqs[sample, timestep]
                       (numpy array, shape : (None, timesteps), default : None)
                       If None, cross entropy errors won't be calculated.
        starting_char_enc -- Encoding integer for START_CHAR(int, default : 0)
        batch_size -- batch_size sent to Keras predict(int, default : 64)
        attention -- whether the model has attention or not(bool, default : False)
                     NOTE(review): attention=True calls create_attention_inference_model, which is not
                     defined in the visible part of this notebook -- confirm it exists before use.
        
    Outputs :
        final_outputs -- top K output sequences, most probable first(numpy array, shape : (None, K, timesteps))
        final_errors -- cross entropy errors for top K output, averaged over max_decoder_timesteps
                        (numpy array, shape : (None, K)); all zeros if target_seqs is None
        states_values -- decoder states after the last timestep as a numpy array built from one entry per
                         beam, each entry holding the state arrays of all samples
                         (presumably shape : (K, number of state arrays, None, hid_layer_size) -- TODO confirm)
        final_attn_scores -- attention to all encoder timesteps for a decoder timestep(numpy array, shape : (None, K, decoder_timesteps, encoder_timesteps));
                             all zeros if attention is False
    '''
    
    # Generating output from encoder
    encoder_model, decoder_model, no_enc_layers, no_dec_layers = create_attention_inference_model(model) if attention else create_inference_model(model)
    encoder_output = encoder_model.predict(input_seqs, batch_size=batch_size)
    # predict may return a single array or a list of arrays; a list is used from here on
    encoder_out = encoder_output if type(encoder_output) is list else [encoder_output]

    # Number of input samples in the data passed
    no_samples = input_seqs.shape[0]

    # Top K output sequences for each input 
    final_outputs = np.zeros((no_samples, K, max_decoder_timesteps), dtype=np.int32)
    
    # Errors for top K output sequences for each input
    final_errors = np.zeros((no_samples, K))
    
    # Attention scores for top K output sequences for each input
    final_attn_scores = np.zeros((no_samples, K, max_decoder_timesteps, input_seqs.shape[1]))

    # decoder input sequence for 1 timestep (for all samples). Initially one choice only there
    decoder_k_inputs = np.zeros((no_samples, 1, 1))
    
    # Populate the input sequence with the start character at the 1st timestep
    decoder_k_inputs[:, :, 0] = starting_char_enc

    # (log(probability) sequence, decoder output sequence) pairs for all choices and all samples. Probability starts with log(1) = 0
    decoder_k_out = [[(0, [])] for _ in range(no_samples)]
    
    # Categorical cross entropy error in the sequence for all choice and all samples
    errors = [[0] for _ in range(no_samples)]
    
    # Output states from decoder for all choices, and all samples
    # (one entry for the single initial beam: the encoder output repeated once per decoder layer)
    states_values  = [encoder_out * no_dec_layers]

    # Attention weights output
    attn_k_scores = [[None] for _ in range(no_samples)]

    # Sampling loop
    for it in range(max_decoder_timesteps):
        # Storing respective data for all possibilities
        All_k_beams = [[] for _ in range(no_samples)]
        All_decoder_states = [[] for _ in range(no_samples)]
        All_errors = [[] for _ in range(no_samples)]
        All_attn_scores = [[] for _ in range(no_samples)]

        # One decoder step per current beam (a single beam in the first timestep, K afterwards)
        for k in range(len(decoder_k_out[0])):
            if attention:
                attn_score, decoder_output, *decoder_states = decoder_model.predict([input_seqs, decoder_k_inputs[:,k]] + states_values[k], batch_size=batch_size)
            else:
                decoder_output, *decoder_states = decoder_model.predict([decoder_k_inputs[:,k]] + states_values[k], batch_size=batch_size)

            # Top K scores
            top_k = np.argsort(decoder_output[:, -1, :], axis=-1)[:, -K:]
            for b in range(no_samples):
                # Extending beam k of sample b by each of its K most probable next characters
                All_k_beams[b] += [(
                    decoder_k_out[b][k][0] + np.log(decoder_output[b, -1, top_k[b][i]]),
                    decoder_k_out[b][k][1] + [top_k[b][i]]
                ) for i in range(K)]

                if attention:
                    All_attn_scores[b] += [attn_score[b]] * K if attn_k_scores[b][k] is None \
                                          else [np.concatenate((attn_k_scores[b][k], attn_score[b]), axis=0)] * K
            
                if target_seqs is not None:
                    # The error uses the probability given to the expected character at this timestep,
                    # so it is the same for all K extensions of beam k
                    All_errors[b] += [errors[b][k] - np.log(decoder_output[b, -1, target_seqs[b, it]])] * K
                
                # state[b:b+1] keeps the sample axis so that the states can be concatenated again below
                All_decoder_states[b] += [[state[b:b+1] for state in decoder_states]] * K
        
        # Sort and choose top K with max probabilities
        sorted_ind = list(range(len(All_k_beams[0])))
        sorted_ind = [sorted(sorted_ind, key = lambda ix: All_k_beams[b][ix][0])[-K:][::-1] for b in range(no_samples)]
        
        # Choose the top K decoder output sequences till now
        decoder_k_out = [[All_k_beams[b][ind] for ind in sorted_ind[b]] for b in range(no_samples)]

        # Update the input sequence for next 1 timestep
        decoder_k_inputs = np.array([[All_k_beams[b][ind][1][-1] for ind in sorted_ind[b]] for b in range(no_samples)])

        # Update states
        # (for every kept beam, the per-sample states are concatenated back into arrays covering all samples)
        states_values = [All_decoder_states[0][ind] for ind in sorted_ind[0]]
        for b in range(1, no_samples):
            states_values = [[np.concatenate((states_values[i][j], All_decoder_states[b][ind][j])) 
                              for j in range(len(All_decoder_states[b][ind]))] for i,ind in enumerate(sorted_ind[b])]

        # Update attention scores
        if attention:
            attn_k_scores = [[All_attn_scores[b][ind] for ind in sorted_ind[b]] for b in range(no_samples)]

        # Update errors
        if target_seqs is not None:
            errors = [[All_errors[b][ind] for ind in sorted_ind[b]] for b in range(no_samples)]

    final_outputs = np.array([[decoder_k_out[b][i][1] for i in range(K)] for b in range(no_samples)])
    if target_seqs is not None:
        # Mean error per timestep
        final_errors = np.array(errors) / max_decoder_timesteps
    if attention:
        final_attn_scores = np.array(attn_k_scores)

    return final_outputs, final_errors, np.array(states_values), final_attn_scores


def calc_metrics(k_outputs, target_seqs, char_enc, char_dec, k_errors=None, exact_word=True):

    '''
    Function to calculate the accuracy (and the mean error if provided) for the best of the K
    possible output sequences of every sample.
    Inputs :
        k_outputs -- K choices of output sequences for each sample (numpy array, shape : (None, K, timesteps))
        target_seqs -- expected output as encoded sequences (numpy array, shape : (None, timesteps))
        char_enc -- target character encoding, only used if exact_word is True (dict)
        char_dec -- target character decoding, only used if exact_word is True (list)
        k_errors -- errors of the K choices for each sample (numpy array, shape : (None, K), default : None)
        exact_word -- whether the exact word match accuracy is calculated as well (bool, default : True)
    Outputs :
        accuracy -- mean fraction of matching timesteps of the best matching choice (float)
        (If exact_word is True) exact_accuracy -- fraction of samples for which at least one of
                                                  the K choices equals the expected word (float)
        loss -- mean error of the best matching choice, None if k_errors is None
    '''

    no_samples, beam_width = k_outputs.shape[0], k_outputs.shape[1]

    # Fraction of matching timesteps of every choice; the target is broadcast over the K choices
    expected = target_seqs.reshape((target_seqs.shape[0], 1, target_seqs.shape[1]))
    matches = np.mean(k_outputs == expected, axis=-1)

    # Picking the best matching choice of every sample
    best_k = np.argmax(matches, axis=-1)
    best_ind = (np.arange(best_k.shape[0]), best_k)
    accuracy = np.mean(matches[best_ind])

    loss = np.mean(k_errors[best_ind]) if k_errors is not None else None

    if not exact_word:
        return accuracy, loss

    # A sample counts as correct when any of its K choices decodes to the expected word
    true_out = convert_to_word(target_seqs, char_enc, char_dec)
    equal = [False] * no_samples
    for k in range(beam_width):
        pred_out = convert_to_word(k_outputs[:,k], char_enc, char_dec)
        equal = [found or (pred == true) for found, pred, true in zip(equal, pred_out, true_out)]
    exact_accuracy = np.mean(equal)

    return accuracy, exact_accuracy, loss


def beam_decoder(model, input_seqs, target_seqs_onehot, max_decoder_timesteps, char_enc, char_dec, K=1,
                 model_batch_size=64, attention=False, infer_batch_size=512, exact_word=True, return_outputs=False,
                 return_states=False, return_attn_scores=False):
    '''
    Function to calculate character-wise accuracy, exact-word-match accuracy, and loss for the seq2seq model
    using a beam decoder. The samples are sent to beam_decoder_infer() in chunks of infer_batch_size.
    Inputs:
        model -- model used for training
        input_seqs -- input to encoder(numpy array, shape : (None, timesteps))
        target_seqs_onehot -- expected target in onehot format(numpy array, shape : (None, timesteps, decoder_vocab_size))
        max_decoder_timesteps -- Number of timesteps to infer in decoder(int)
        char_enc -- target character encoding(dict)
        char_dec -- target character decoding(list)
        K -- beam width to be used in beam decoder(int, default : 1)
        model_batch_size -- batch size to be used while evaluating model using Keras(int, default : 64)
        attention -- whether the model has attention or not(bool, default : False)
        infer_batch_size -- number of samples to be sent to beam_decoder_infer() at a time(int, default : 512)
        exact_word -- whether or not the exact word match accuracy is calculated(bool, default : True)
        return_outputs -- whether or not the outputs predicted need to be returned(bool, default : False)
        return_states -- whether or not the decoder hidden states need to be returned(bool, default : False)
        return_attn_scores -- whether or not the attention scores need to be returned(bool, default : False)
    Outputs (one tuple, in this order):
        accuracy -- the character-wise match accuracy(float)
        (If exact_word is True) exact_accuracy -- (float) the exact word match accuracy
        loss -- (float) the cross-entropy loss for the top K predictions
        (If return_outputs is True) k_outputs -- top K output sequences of all samples, as returned by beam_decoder_infer()
        (If return_states is True) k_states -- decoder states of all samples, as returned by beam_decoder_infer()
        (If return_attn_scores is True) k_attn_scores -- attention scores of all samples, as returned by beam_decoder_infer()
    '''

    def merge(chunks, axis=0):
        # Joins the per-chunk arrays along the sample axis (a single chunk is passed through as it is)
        if not chunks:
            return None
        if len(chunks) == 1:
            return chunks[0]
        return np.concatenate(chunks, axis=axis)

    target_seqs = np.argmax(target_seqs_onehot, axis=-1)

    # Running the beam decoder chunk by chunk
    output_chunks, error_chunks, state_chunks, attn_chunks = [], [], [], []
    for first in range(0, input_seqs.shape[0], infer_batch_size):
        last = first + infer_batch_size
        chunk_outputs, chunk_errors, chunk_states, chunk_attn_scores = beam_decoder_infer(
            model, input_seqs[first:last], max_decoder_timesteps, K,
            target_seqs[first:last], char_enc['\t'], model_batch_size, attention)
        output_chunks.append(chunk_outputs)
        error_chunks.append(chunk_errors)
        state_chunks.append(chunk_states)
        attn_chunks.append(chunk_attn_scores)

    k_outputs = merge(output_chunks)
    k_errors = merge(error_chunks)
    # The sample axis of the states array is axis 2
    k_states = merge(state_chunks, axis=2)
    k_attn_scores = merge(attn_chunks)

    # Optional extras, appended to the metrics in a fixed order
    requested = ((return_outputs, k_outputs), (return_states, k_states), (return_attn_scores, k_attn_scores))
    return_elements = [value for wanted, value in requested if wanted]

    metrics = calc_metrics(k_outputs, target_seqs, char_enc, char_dec, k_errors, exact_word)
    if return_elements:
        return metrics + tuple(return_elements)
    return metrics

### Train function

In [None]:
def train(model, train_input_data, train_target_data, val_input_data, val_target_data, beam_width = 5, attention = False,
                batch_size = 64, optimizer = 'adam', learning_rate = 0.0005, epochs = 15, loss_fn = 'categorical_crossentropy'):
    '''
    Function to compile and train the model with the given optimizer, learning rate and epochs.
    Inputs :
        model -- model to be trained (Keras model object)
        train_input_data, train_target_data -- training inputs and targets, passed to model.fit
        val_input_data, val_target_data -- validation inputs and targets; if both are given they are passed
                                           to model.fit as validation data, so the validation loss and
                                           accuracy are reported after every epoch
        beam_width -- not used by this function (int, default : 5)
        attention -- not used by this function (bool, default : False)
        batch_size -- batch size used for training (int, default : 64)
        optimizer -- 'adam'/ 'momentum'/ 'rmsprop'/ 'nesterov'/ 'nadam'; any other value selects
                     plain SGD (string, default : 'adam')
        learning_rate -- learning rate of the optimizer (float, default : 0.0005)
        epochs -- number of epochs to train (int, default : 15)
        loss_fn -- loss passed to model.compile (default : 'categorical_crossentropy')
    Outputs :
        model -- the same model object, compiled and trained
    '''

    # The optimizers are built lazily, so only the selected one is instantiated
    optimizer_builders = {
        'adam': lambda: Adam(learning_rate=learning_rate),
        'momentum': lambda: SGD(learning_rate=learning_rate, momentum = 0.9),
        'rmsprop': lambda: RMSprop(learning_rate=learning_rate),
        'nesterov': lambda: SGD(learning_rate=learning_rate, momentum = 0.9, nesterov = True),
        'nadam': lambda: Nadam(learning_rate=learning_rate),
    }
    # Unknown optimizer names fall back to plain SGD
    build_optimizer = optimizer_builders.get(optimizer, lambda: SGD(learning_rate=learning_rate))
    model.compile(optimizer = build_optimizer(), loss = loss_fn, metrics = ['accuracy'])

    # The validation data was accepted but never handed to fit; it is passed on whenever it is given
    validation_data = None
    if val_input_data is not None and val_target_data is not None:
        validation_data = (val_input_data, val_target_data)

    model.fit(train_input_data,
              train_target_data,
              batch_size = batch_size,
              epochs = epochs,
              validation_data = validation_data
             )

    return model

### Function to calculate the Levenshtein distance between two sequences
Code reference: https://codereview.stackexchange.com/questions/217065/calculate-levenshtein-distance-between-two-strings-in-python

In [None]:
def levenshtein_dist(s1, s2):
    """
    Function to calculate the Levenshtein distance between two sequences, i.e. the minimum number
    of single-element insertions, deletions and substitutions needed to turn s1 into s2.
    The distance is returned as a numpy float (e.g. 3.0).
    """

    rows, cols = len(s1) + 1, len(s2) + 1

    # table[r, c] is the distance between the first r elements of s1 and the first c elements of s2
    table = np.zeros((rows, cols))
    # Base case : turning a prefix into the empty sequence (or back) costs its length
    table[0, :] = np.arange(cols)
    table[:, 0] = np.arange(rows)

    for r, elem1 in enumerate(s1, start=1):
        for c, elem2 in enumerate(s2, start=1):
            # Equal elements are carried over for free, different ones cost a substitution
            substitution = table[r-1, c-1] + (0 if elem1 == elem2 else 1)
            deletion = table[r-1, c] + 1
            insertion = table[r, c-1] + 1
            table[r, c] = min(substitution, deletion, insertion)

    return table[rows-1, cols-1]

### Function to Test model

In [None]:
def test(model, test_enc_input, test_dec_target, max_decoder_seq_length, target_char_enc, target_char_dec, test_input=None):
    '''
    Function to evaluate the model metrics on test data and optionally save the predictions.
    Inputs :
        test_enc_input -- input to encoder(numpy array shape : (None, timesteps)) (where characters are encoded as integers)
        test_dec_target -- expected target in onehot format(numpy array shape : (None, timesteps, decoder_vocab_size))
        max_decoder_seq_length -- number of timesteps in the decoder(int)
        target_enc_enc -- target character encoding(dict)
        target_char_dec -- target character decoding(list)
        test_input -- input as words (list of strings)
    Outputs :
        acc -- character-wise match accuracy(float)
        exact_K_acc -- exact word match accuracy using the beam width for the model(float)
        exact_acc -- exact word match accuracy using the first prediction (float)(which is equivalent to beam width = 1)
        loss -- loss value(float)
        true_out -- true output(list of string : (no_samples, word))
        pred_out -- predicted output(2D list of string : (no_samples, K, word))
        pred_scores -- levenshtein distance of prediction to true output(2D list : (no_samples, K))
        model -- the model obtained from the run
    '''
    

    no_samples, K, batch_size = test_enc_input.shape[0], 5, 64
    acc, exact_K_acc, loss, outputs = beam_decoder(model, test_enc_input, test_dec_target, max_decoder_seq_length, target_char_enc, 
                                                                target_char_dec, K, batch_size, False,
                                                                return_outputs=True, return_attn_scores=False)
    
    print(f'Test accuracy (word level using beam width = {K}) : {exact_K_acc*100:.2f}%')

    test_target = np.argmax(test_dec_target, axis=-1)
    true_out = convert_to_word(test_target, target_char_enc, target_char_dec)
    pred_out = [[] for _ in range(no_samples)]
    pred_scores = [[] for _ in range(no_samples)]
    for k in range(K):
        pred = convert_to_word(outputs[:,k], target_char_enc, target_char_dec)
        pred_out = [pred_out[b] + [pred[b]] for b in range(no_samples)]
        pred_scores = [pred_scores[b] + [levenshtein_dist(pred[b], true_out[b])] for b in range(no_samples)]
    
    equal = [pred_out[i][0] == true_out[i] for i in range(no_samples)]
    exact_acc = np.mean(equal)

    print(f'Test accuracy ((word level using first prediction) : {exact_acc*100:.2f}%')
    print('\n')
    
    # Writing top k predictions in CSV file
    pred_file_name = 'predictions_vanilla.csv'
    with open(pred_file_name, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["Input"] + ["Prediction_"+str(k+1) for k in range(K)])
        for b in range(no_samples):
            writer.writerow([test_input[b]] + [pred_out[b][k] for k in range(K)])

    return acc, exact_K_acc, exact_acc, loss, true_out, pred_out, pred_scores, model

### Print sample predictions

In [None]:
def get_clr(value, cmap=None):
    '''
    Function to get the color for a value between 0 and 1, either from the hard-coded
    blue to red colors or from a matplotlib colormap.
    Inputs :
        value -- number to be mapped to a color. With the hard-coded colors, values below 0
                 give the first (blue) color and values of 0.95 and above give the last (red) color
        cmap -- matplotlib colormap name (string) or colormap object (default : None, which
                selects the hard-coded colors)
    Outputs :
        color -- '#rrggbb' string for the hard-coded colors, 'rgb(r, g, b)' string for a colormap
    Raises :
        ValueError -- if cmap is a string which is not a known colormap name
    '''

    colors = ['#85c2e1', '#89c4e2', '#95cae5', '#99cce6', '#a1d0e8',
              '#b2d9ec', '#baddee', '#c2e1f0', '#eff7fb', '#f9e8e8',
              '#f9e8e8', '#f9d4d4', '#f9bdbd', '#f8a8a8', '#f68f8f',
              '#f47676', '#f45f5f', '#f34343', '#f33b3b', '#f42e2e']

    if cmap is not None:
        if isinstance(cmap, str):
            # matplotlib.cm.get_cmap no longer exists in recent matplotlib releases, so the
            # colormap registry is used when available and get_cmap only as a fallback
            registry = getattr(matplotlib, 'colormaps', None)
            if registry is None:
                cmap = matplotlib.cm.get_cmap(cmap)
            else:
                try:
                    cmap = registry[cmap]
                except KeyError:
                    raise ValueError(f'{cmap!r} is not a known colormap name') from None
        rgba = cmap(value)
        # Dropping the alpha channel and scaling the rest to 0-255
        return 'rgb'+str(tuple([int(c*255) for c in rgba[:-1]]))

    # 20 buckets of width 0.05; the index is clamped on both sides, so negative values can
    # neither wrap around to the red end of the list nor run out of range
    index = max(0, min(int((value * 100) / 5), 19))
    return colors[index]

def visualize_samples(input, true_out, pred_out, pred_scores, rand_seq=None):

    '''
    Function to display sample outputs as HTML tables, one table per sample. The first row of a table
    holds the K predictions, colored by their levenshtein distance, and the second row the distances.
    Arguments :
        input -- input words
        true_out -- true output as words
        pred_out -- K predicted output words for every sample
        pred_scores -- levenshtein distance of the predictions to the true output
        rand_seq -- indices of the samples to be displayed (If None, 10 random indices are drawn)
    Returns :
        rand_seq -- the indices for which sample outputs are displayed
    '''

    def html_block(indent, *lines):
        # A newline, then every line indented and newline-terminated, then the indent once more
        return '\n' + ''.join(indent + line + '\n' for line in lines) + indent

    row_indent, cell_indent = ' ' * 8, ' ' * 12
    table_open = html_block(
        row_indent,
        '<table style="border:2px solid black; border-collapse:collapse">',
        '<caption> <strong>INPUT :</strong> {} &emsp; | &emsp; <strong> TRUE OUTPUT : </strong> {} </caption>',
        '<tr>',
        '<th scope="row" style="border:1px solid black;padding:10px;text-align:left"> Top {} Predictions </th>')
    prediction_cell = html_block(
        cell_indent,
        '<td style="color:#000;background-color:{};border:1px solid black;padding:10px"> {} </td>')
    score_row_open = html_block(
        row_indent,
        '</tr>',
        '<tr>',
        '<th scope="row" style="border:1px solid black;padding:10px;text-align:left"> Levenshtein distance (to true output) &emsp; </th>')
    score_cell = html_block(cell_indent, '<td style="border:1px solid black;padding:10px"> {} </td>')
    table_close = html_block(row_indent, '</tr>', '</table>')

    if rand_seq is None:
        rand_seq = np.random.randint(len(true_out), size=(10,))

    print('-'*20 + f' Top {len(pred_scores[0])} predictions in decreasing order of probabilities for 10 random samples ' + '-'*20)
    print('')

    for i in rand_seq:
        scores, words = pred_scores[i], pred_out[i]
        K = len(scores)

        pieces = [table_open.format(input[i], true_out[i], K)]
        # The distance is divided by 5 to get the value which selects the color of the cell
        pieces += [prediction_cell.format(get_clr(scores[k]/5), words[k]) for k in range(K)]
        pieces.append(score_row_open)
        pieces += [score_cell.format(scores[k]) for k in range(K)]
        pieces.append(table_close)

        display(html_print(''.join(pieces)))
        print('\n\n')

    return rand_seq

### Testing with the best model

In [None]:
# Building the model with the best hyperparameters
model = build_model(
    len(input_char_dec), len(target_char_dec),
    inp_emb_size=256, n_enc_layers=2, n_dec_layers=5, h_layer_size=256,
    cell_type='GRU', dropout=0.3, r_dropout=0.3,
)

# Training the model with the best set of hyperparameters.
# build_model() creates the model without attention, so the attention flag is False here
# (train() does not read the flag; the value only documents what is being trained).
model = train(
    model=model,
    train_input_data=[train_enc_x, train_dec_x], train_target_data=train_dec_y,
    val_input_data=[val_enc_x, val_dec_x], val_target_data=val_dec_y,
    beam_width=5, attention=False, batch_size=256,
    optimizer='adam', learning_rate=0.001, epochs=15,
)

# NOTE(review): the file is called "best_model_attn.h5" although this is the model without attention,
# and the clean-up cell at the top removes 'best_model.h5' instead -- confirm the intended file name.
model.save("best_model_attn.h5")

In [None]:
# Testing the model trained with the best set of hyperparameters
# (the top K predictions of the test words are also written to a CSV file by test())
(test_acc, test_exact_K_acc, test_exact_acc, test_loss,
 test_true_out, test_pred_out, test_pred_scores, model) = test(
    model, test_enc_x, test_dec_y, max_decoder_seq_length,
    target_char_enc, target_char_dec, test_x,
)

### Visualizing random samples of predictions

In [None]:
# Displaying the prediction tables of 10 randomly drawn test samples; the drawn indices are kept
# in random_samples so that the same samples can be passed to visualize_samples() again
random_samples = visualize_samples(test_x, test_true_out, test_pred_out, test_pred_scores)

### Visualise the best model (without attention)

In [None]:
# Drawing the layer graph of the model, including the layer shapes, into model_vanilla.png
plot_model(model, to_file="model_vanilla.png", show_shapes=True)