# **Importing required libraries**

In [None]:
import numpy as np
import csv
import pandas as pd
import random
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import LSTM, SimpleRNN, GRU, Embedding, Dense, TimeDistributed, Concatenate, AdditiveAttention 

In [None]:
!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

In [None]:
!pip install wandb
import wandb
from wandb.keras import WandbCallback

In [None]:
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties
from IPython.display import HTML as html_print
import time
from IPython.display import clear_output

# **Downloading Data from Google Drive**

In [None]:
# Authenticate through google.colab, then build a PyDrive client whose
# credentials are the application-default Google credentials.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [None]:
# Google Drive file ids of the dataset splits. The download cell saves them
# under the file names noted next to each id.
id1 = '1D3TlW1FklEsr-rJUEF2JP_upwEUvbhKU'  # -> hi.translit.sampled.train.tsv
id2 = '1YKRR5D6ZRaFlG8rcrGJHZ0i9Zj3xPkh8'  # -> hi.translit.sampled.test.tsv
id3 = '1neNMvrSDHHi65LU27bXWixfFpYzlsvQd'  # -> hi.translit.sampled.dev.tsv

In [None]:
# Resolve every Drive id to a PyDrive file handle first, then save each
# handle's content into the working directory under its dataset file name.
downloaded1, downloaded2, downloaded3 = [
    drive.CreateFile({'id': file_id}) for file_id in (id1, id2, id3)
]
for handle, local_name in (
    (downloaded1, 'hi.translit.sampled.train.tsv'),
    (downloaded2, 'hi.translit.sampled.test.tsv'),
    (downloaded3, 'hi.translit.sampled.dev.tsv'),
):
    handle.GetContentFile(local_name)

In [None]:
# Training split. The file holds Devanagari text, so it is opened explicitly
# as UTF-8 instead of relying on the platform default encoding, and with
# newline="" as the csv module requires. The handle is left open on purpose:
# the reader is lazy and is only consumed by the data-processing cell.
tsv_file = open("hi.translit.sampled.train.tsv", encoding="utf-8", newline="")
read_tsv = csv.reader(tsv_file, delimiter="\t")

In [None]:
# Validation (dev) split. Opened explicitly as UTF-8 (Devanagari content) and
# with newline="" as the csv module requires. The handle stays open because
# the lazy reader is consumed later by the data-processing cell.
val_tsv_file = open("hi.translit.sampled.dev.tsv", encoding="utf-8", newline="")
val_read_tsv = csv.reader(val_tsv_file, delimiter="\t")

In [None]:
# Test split. Opened explicitly as UTF-8 (Devanagari content) and with
# newline="" as the csv module requires. The handle stays open because the
# lazy reader is consumed later by the data-processing cell.
test_tsv_file = open("hi.translit.sampled.test.tsv", encoding="utf-8", newline="")
test_read_tsv = csv.reader(test_tsv_file, delimiter="\t")

In [None]:
!wget "https://www.fontmirror.com/app_public/files/t/1/2020/04/MANGAL.TTF"

# **Processing training, validation and test data**

In [None]:
def _read_word_pairs(rows):
    """Split an iterable of TSV rows into two parallel word arrays.

    Args:
        rows: Iterable of rows, each a list of string fields (for example a
            ``csv.reader``). Field 0 is collected as the Devanagari word and
            field 1 as its English (Latin-script) spelling; any further
            fields are ignored.

    Returns:
        Tuple ``(devnagri_words, english_words)`` of NumPy arrays holding one
        entry per usable row, in input order.
    """
    devnagri_words = []
    english_words = []
    for row in rows:
        # csv.reader yields [] for blank lines; indexing such a row (or any
        # row lacking the second field) would raise IndexError, so skip it.
        if len(row) < 2:
            continue
        devnagri_words.append(row[0])
        english_words.append(row[1])
    return np.array(devnagri_words), np.array(english_words)


# Training data
devnagri, english = _read_word_pairs(read_tsv)

# Validation data
val_devnagri, val_english = _read_word_pairs(val_read_tsv)

# Test data
test_devnagri, test_english = _read_word_pairs(test_read_tsv)

In [None]:
def _add_sequence_markers(words):
    """Return a new array with each word wrapped in the decoder markers.

    Every word gets a leading tab (start-of-sequence) and a trailing newline
    (end-of-sequence).

    A fresh array is built on purpose. NumPy string arrays have a fixed
    element width taken from the longest word at creation time, so assigning
    the two-character-longer string back into the existing array silently
    truncates it: the longest words would lose their end marker (and possibly
    their last character).

    Args:
        words: Iterable of target-language words.

    Returns:
        NumPy array of the wrapped words, sized to fit the longest result.
    """
    return np.array(["\t" + str(word) + "\n" for word in words])


devnagri = _add_sequence_markers(devnagri)
val_devnagri = _add_sequence_markers(val_devnagri)
test_devnagri = _add_sequence_markers(test_devnagri)

In [None]:
# Collect the distinct characters used on the input (English) and target
# (Devanagari) side of every split.

# Training set
english_characters = {char for word in english for char in word}
devnagri_characters = {char for word in devnagri for char in word}

# Validation set
v_english_characters = {char for word in val_english for char in word}
v_devnagri_characters = {char for word in val_devnagri for char in word}

# Test set
t_english_characters = {char for word in test_english for char in word}
t_devnagri_characters = {char for word in test_devnagri for char in word}

In [None]:
# Freeze both training alphabets into sorted lists so that a character's
# position can serve as its integer id.
english_characters = sorted(english_characters)
devnagri_characters = sorted(devnagri_characters)

# Vocabulary sizes of the encoder (input) and decoder (target) side.
num_encoder_tokens = len(english_characters)
num_decoder_tokens = len(devnagri_characters)

# Longest training word on each side; these fix the padded sequence lengths.
max_encoder_seq_length = max(map(len, english))
max_decoder_seq_length = max(map(len, devnagri))

# **Preparing Encoder and Decoder Inputs**

In [None]:
# Preparing train encoder and decoder inputs

# Character -> integer id tables for both alphabets, plus the reverse tables
# used to turn predicted ids back into characters.
input_token_index = {char: i for i, char in enumerate(english_characters)}
target_token_index = {char: i for i, char in enumerate(devnagri_characters)}

reverse_input_char_index = {i: char for char, i in input_token_index.items()}
reverse_target_char_index = {i: char for char, i in target_token_index.items()}

# Zero-initialised tensors; positions past the end of a word keep the value 0.
encoder_input_data = np.zeros((len(english), max_encoder_seq_length), dtype="float32")
decoder_input_data = np.zeros((len(english), max_decoder_seq_length), dtype="float32")
decoder_target_data = np.zeros((len(english), max_decoder_seq_length, num_decoder_tokens), dtype="float32")

# The loop targets are deliberately NOT named `english` / `devnagri`: reusing
# those names would overwrite the word arrays with the last word pair and
# make this cell fail when it is run a second time.
for i, (e, d) in enumerate(zip(english, devnagri)):
    for t, char in enumerate(e):
        encoder_input_data[i, t] = input_token_index[char]

    for t, char in enumerate(d):
        decoder_input_data[i, t] = target_token_index[char]
        if t > 0:
            # decoder_target_data will be ahead by one timestep and will not include the start character.
            decoder_target_data[i, t - 1, target_token_index[char]] = 1.0

In [None]:
# Preparing validation encoder and decoder inputs

num_val_samples = len(val_english)
encoder_val_input_data = np.zeros((num_val_samples, max_encoder_seq_length), dtype="float32")
decoder_val_input_data = np.zeros((num_val_samples, max_decoder_seq_length), dtype="float32")
decoder_val_target_data = np.zeros((num_val_samples, max_decoder_seq_length, num_decoder_tokens), dtype="float32")

for row, (source_word, target_word) in enumerate(zip(val_english, val_devnagri)):
    # Encoder side: one character id per timestep.
    for step, char in enumerate(source_word):
        encoder_val_input_data[row, step] = input_token_index[char]

    # Decoder side: ids as input, and the same characters one-hot encoded one
    # timestep earlier as the target (so the start marker is never a target).
    for step, char in enumerate(target_word):
        token_id = target_token_index[char]
        decoder_val_input_data[row, step] = token_id
        if step >= 1:
            decoder_val_target_data[row, step - 1, token_id] = 1.0

In [None]:
# Preparing test encoder and decoder inputs

num_test_samples = len(test_english)
encoder_test_input_data = np.zeros((num_test_samples, max_encoder_seq_length), dtype="float32")
decoder_test_input_data = np.zeros((num_test_samples, max_decoder_seq_length), dtype="float32")
decoder_test_target_data = np.zeros((num_test_samples, max_decoder_seq_length, num_decoder_tokens), dtype="float32")

for row, (source_word, target_word) in enumerate(zip(test_english, test_devnagri)):
    # Encoder side: one character id per timestep.
    for step, char in enumerate(source_word):
        encoder_test_input_data[row, step] = input_token_index[char]

    # Decoder side: ids as input, and the same characters one-hot encoded one
    # timestep earlier as the target (so the start marker is never a target).
    for step, char in enumerate(target_word):
        token_id = target_token_index[char]
        decoder_test_input_data[row, step] = token_id
        if step >= 1:
            decoder_test_target_data[row, step - 1, token_id] = 1.0

# **Defining Seq2Seq Model**

In [None]:
def training(input_embedding_size, dp, cell_type, hidden_layer_size, num_encoder_layers, num_decoder_layers):
    """Build the (uncompiled) attention-based encoder-decoder training model.

    The encoder and decoder are stacks of identical recurrent layers. Decoder
    layer ``k`` is initialised with the final state(s) of encoder layer ``k``.
    The last decoder output attends over the last encoder output sequence via
    additive attention, and the attention context is concatenated to the
    decoder output before the per-timestep softmax.

    Reads the module-level ``max_encoder_seq_length``,
    ``max_decoder_seq_length``, ``num_encoder_tokens`` and
    ``num_decoder_tokens``.

    Args:
        input_embedding_size: Output dimension of both embedding layers.
        dp: Value passed as the ``dropout`` argument of every recurrent layer.
        cell_type: ``'RNN'`` (SimpleRNN) or ``'GRU'``; any other value selects
            LSTM.
        hidden_layer_size: Number of units of every recurrent layer.
        num_encoder_layers: Depth of the encoder stack. Values below 1 still
            build one layer. Depths above 3 are honoured (they used to be
            silently capped at 3).
        num_decoder_layers: Depth of the decoder stack, same conventions.

    Returns:
        Tuple ``(model, encoder_layers, decoder_layers)``: the Keras model
        taking ``[encoder_inputs, decoder_inputs]``, and the lists of encoder
        and decoder recurrent layer objects in stacking order.

    Raises:
        IndexError: If more decoder layers than encoder layers are requested,
            because there is no encoder state to initialise the extra layers.
    """
    # One lookup replaces the three copy-pasted per-cell-type branches.
    recurrent_cls = {'RNN': SimpleRNN, 'GRU': GRU}.get(cell_type, LSTM)

    def new_recurrent_layer():
        # Every layer returns its full sequence (needed for stacking and for
        # attention) and its final state(s) (needed to seed the decoder).
        return recurrent_cls(hidden_layer_size, return_sequences=True, return_state=True, dropout=dp)

    # ENCODER

    encoder_inputs = Input(shape=(max_encoder_seq_length,))
    encoder_outputs = Embedding(num_encoder_tokens, input_embedding_size, trainable=True)(encoder_inputs)

    encoder_layers = []
    encoder_states = []  # one list of state tensors per encoder layer
    for _ in range(max(1, num_encoder_layers)):
        encoder = new_recurrent_layer()
        encoder_layers.append(encoder)
        # SimpleRNN/GRU return (sequence, h); LSTM returns (sequence, h, c).
        encoder_outputs, *layer_states = encoder(encoder_outputs)
        encoder_states.append(layer_states)

    # DECODER

    decoder_inputs = Input(shape=(max_decoder_seq_length,))
    decoder_outputs = Embedding(num_decoder_tokens, input_embedding_size, trainable=True)(decoder_inputs)

    # The decoder layers return their states as well. The training model
    # ignores them, but the inference model reuses these same layer objects
    # and needs the states.
    decoder_layers = []
    for depth in range(max(1, num_decoder_layers)):
        decoder = new_recurrent_layer()
        decoder_layers.append(decoder)
        decoder_outputs, *_ = decoder(decoder_outputs, initial_state=encoder_states[depth])

    # ATTENTION: decoder outputs are the query, encoder outputs the value.
    # The two layer names are relied upon when the inference models are built.
    decoder_attention = AdditiveAttention(name="decoder_attention")
    decoder_concat = Concatenate(name="decoder_concat")
    context_vec, attn_weights = decoder_attention([decoder_outputs, encoder_outputs], return_attention_scores=True)
    decoder_outputs = decoder_concat([decoder_outputs, context_vec])

    decoder_dense = TimeDistributed(Dense(num_decoder_tokens, activation="softmax"))
    decoder_outputs = decoder_dense(decoder_outputs)

    # MODEL
    model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

    return model, encoder_layers, decoder_layers

# **Inference model**

In [None]:
def inferencing(model,num_encoder_layers,num_decoder_layers,encoder_layers,decoder_layers,cell_type, hidden_layer_size):
    """Rebuild stand-alone encoder and decoder models from a trained model.

    Both models reuse the layer objects of ``model`` (and therefore its
    weights); no new trainable layers are created.

    Reads the module-level ``max_encoder_seq_length``.

    Args:
        model: Model produced by ``training``. Its layers at index 2 and 3
            are used as the encoder and decoder embeddings, its layers named
            ``"decoder_attention"`` and ``"decoder_concat"`` as attention and
            concatenation, and its last layer as the output layer.
        num_encoder_layers: Number of entries of ``encoder_layers`` to chain.
        num_decoder_layers: Number of entries of ``decoder_layers`` to chain.
        encoder_layers: Encoder recurrent layers in stacking order.
        decoder_layers: Decoder recurrent layers in stacking order.
        cell_type: ``'RNN'`` or ``'GRU'`` (one state per layer); any other
            value is treated as LSTM (two states per layer).
        hidden_layer_size: Units per recurrent layer; fixes the shape of the
            decoder state inputs and of the attention input.

    Returns:
        Tuple ``(encoder_model, decoder_model)``.
        ``encoder_model`` maps the encoder input to
        ``[*states_of_every_layer, encoder_output_sequence]``.
        ``decoder_model`` maps
        ``[decoder_input, *state_inputs, encoder_output_sequence]`` to
        ``[token_probabilities, *new_states, attention_weights]``.
    """
    states_per_layer = 1 if cell_type in ('RNN', 'GRU') else 2

    def state_input_name(slot):
        # Slots 0..5 keep the historical names input_100..input_105 and
        # "input_106" stays reserved for the attention input, so deeper
        # stacks continue at input_107 instead of colliding with it.
        return "input_%d" % (100 + slot + (1 if slot >= 6 else 0))

    # ENCODER MODEL RECONSTRUCTION
    encoder_inputs = model.input[0]  # input_1
    enc_emb = model.layers[2]        # embedding 1
    encoder_outputs = enc_emb(encoder_inputs)

    encoder_states = []
    for i in range(num_encoder_layers):
        # (sequence, h) for SimpleRNN/GRU, (sequence, h, c) for LSTM.
        encoder_outputs, *layer_states = encoder_layers[i](encoder_outputs)
        encoder_states += layer_states

    encoder_model = Model(encoder_inputs, encoder_states + [encoder_outputs])

    # DECODER MODEL RECONSTRUCTION
    decoder_inputs = model.input[1]       # input_2
    decoder_embedding = model.layers[3]   # embedding 2
    decoder_outputs = decoder_embedding(decoder_inputs)

    # One placeholder per state tensor of every decoder layer. A layer always
    # owns two name slots, so single-state cells use input_100, input_102, ...
    decoder_states_inputs = []
    for i in range(num_decoder_layers):
        for k in range(states_per_layer):
            decoder_states_inputs.append(
                Input(shape=(hidden_layer_size,), name=state_input_name(2 * i + k))
            )

    decoder_states = []
    for i in range(num_decoder_layers):
        first = i * states_per_layer
        layer_inputs = decoder_states_inputs[first:first + states_per_layer]
        # Single-state cells receive the bare tensor, LSTM the [h, c] pair.
        initial_state = layer_inputs[0] if states_per_layer == 1 else layer_inputs
        decoder_outputs, *layer_states = decoder_layers[i](decoder_outputs, initial_state=initial_state)
        decoder_states += layer_states

    # Attention, concatenation and output layers are looked up by name or
    # position from the end rather than by 4 + 2 * num_encoder_layers, which
    # is only right when encoder and decoder have the same depth.
    att_layer = model.get_layer("decoder_attention")
    attn_input = Input(shape=(max_encoder_seq_length,hidden_layer_size), name="input_106")

    context_vec, attn_weights = att_layer([decoder_outputs, attn_input], return_attention_scores=True)

    concat_layer = model.get_layer("decoder_concat")
    decoder_outputs = concat_layer([decoder_outputs, context_vec])

    decoder_dense = model.layers[-1]  # the single output layer
    decoder_outputs = decoder_dense(decoder_outputs)
    decoder_model = Model([decoder_inputs] + decoder_states_inputs + [attn_input], [decoder_outputs] + decoder_states + [attn_weights])

    return encoder_model, decoder_model

In [None]:
def decode_sequence(input_seq,encoder_model,decoder_model):
    """Greedily decode one encoded input word, one character at a time.

    Reads the module-level ``target_token_index``,
    ``reverse_target_char_index`` and ``max_decoder_seq_length``.

    Args:
        input_seq: Encoder input passed to ``encoder_model.predict``.
        encoder_model: Model whose prediction is a list ending with the
            sequence to attend over, preceded by the initial decoder states.
        decoder_model: Model whose prediction is a list of token
            probabilities, the updated states and, last, attention weights.

    Returns:
        Tuple ``(decoded_sentence, attn_weights)``: the generated characters
        (including the terminating newline when one was produced) and one
        attention-weight vector per generated character.
    """
    encoder_results = encoder_model.predict(input_seq)
    attn_input = encoder_results[-1]
    states_value = encoder_results[:-1]

    # Decoding starts from the tab start-of-sequence marker.
    target_seq = np.zeros((1, 1))
    target_seq[0, 0] = target_token_index["\t"]

    decoded_sentence = ""
    attn_weights = []
    while True:
        output_tokens = decoder_model.predict([target_seq] + states_value + [attn_input])

        # Greedy choice: most probable character of the single decoded step.
        sampled_token_index = np.argmax(output_tokens[0][0, -1, :])
        sampled_char = reverse_target_char_index[sampled_token_index]
        decoded_sentence += sampled_char

        # Carry the recurrent states forward and record this step's attention.
        states_value = output_tokens[1:-1]
        attn_weights.append(output_tokens[-1][0][0])

        # Stop on the end marker or once the output grows past the longest
        # training target.
        if sampled_char == "\n" or len(decoded_sentence) > max_decoder_seq_length:
            break

        # Feed the character just produced back in as the next decoder input.
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = sampled_token_index

    return decoded_sentence, attn_weights

# **Fitting the model**

In [None]:
# Hyperparameters of the single training run in this cell. The module-level
# names defined here (model, encoder_layers, decoder_layers, cell_type,
# hidden_layer_size, num_*_layers) are read again by the test-set prediction
# cell.
batch_size = 128        
epochs = 7             
input_embedding_size = 512
hidden_layer_size = 512
# Encoder and decoder share one depth.
num_layers = 1
num_encoder_layers = num_layers
num_decoder_layers = num_layers
dropout = 0.2
cell_type = 'GRU'

# BUILD (training() only constructs the model; fitting happens below)
model, encoder_layers, decoder_layers = training(input_embedding_size, dropout, cell_type, hidden_layer_size, num_encoder_layers, num_decoder_layers)

# COMPILE
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

# FIT (validation metrics are computed on the dev split after every epoch)
model.fit(
    [encoder_input_data, decoder_input_data],
    decoder_target_data,
    batch_size=batch_size,
    epochs=epochs,
    shuffle = True,
    validation_data= ([encoder_val_input_data, decoder_val_input_data], decoder_val_target_data)
)

# Disabled: exact-match (whole word) accuracy on the validation split, using
# greedy decoding with the inference models.
# encoder_model, decoder_model = inferencing(model, num_encoder_layers, num_decoder_layers, encoder_layers, decoder_layers, cell_type, hidden_layer_size)
# correct = 0
# n = val_devnagri.shape[0]
# for i in range(n):
#     input = encoder_val_input_data[i:i+1]
#     output, attn_weights = decode_sequence(input,encoder_model, decoder_model)
#     if output.strip() == val_devnagri[i].strip():
#         correct += 1
# print("Validation accuracy : ", correct*100/n)

# **Hyperparameter Tuning**

In [None]:
# Weights & Biases sweep definition: Bayesian search that maximises the
# 'val_accuracy' value logged by train(). Each key under 'parameters' lists
# the discrete values the sweep may pick; 'num_layers' is used by train() for
# both the encoder and the decoder depth.
sweep_config = {
    'method': 'bayes',
    'metric': {'goal': 'maximize', 'name': 'val_accuracy'},
    'parameters': {'input_embedding_size': {'values': [128, 256, 512]},
                   'hidden_layer_size': {'values': [128, 256, 512]},
                   'cell_type': {'values': ['LSTM', 'RNN', 'GRU']},
                   'num_layers': {'values': [1,2,3]},
                   'batch_size': {'values': [128,256,512]},
                   'dropout': {'values': [0.1, 0.2, 0.3, 0.4]}
                }}

In [None]:
def train():
    """Run one sweep trial: train a model, then log its validation accuracy.

    Hyperparameters are read from the config of the W&B run started here. The
    model is fitted for 7 epochs on the training tensors, after which every
    validation word is decoded greedily and the percentage of exact matches
    (compared after stripping whitespace) is logged as ``val_accuracy``.
    """
    run = wandb.init()
    cfg = run.config
    epochs = 7

    # The same depth is used for the encoder and the decoder stack.
    model, encoder_layers, decoder_layers = training(
        cfg.input_embedding_size,
        cfg.dropout,
        cfg.cell_type,
        cfg.hidden_layer_size,
        cfg.num_layers,
        cfg.num_layers,
    )
    model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
    model.fit(
        [encoder_input_data, decoder_input_data],
        decoder_target_data,
        batch_size=cfg.batch_size,
        epochs=epochs,
        callbacks=[WandbCallback()]
    )

    encoder_model, decoder_model = inferencing(
        model,
        cfg.num_layers,
        cfg.num_layers,
        encoder_layers,
        decoder_layers,
        cfg.cell_type,
        cfg.hidden_layer_size,
    )

    # Word-level accuracy: a prediction only counts when the whole word matches.
    n = val_devnagri.shape[0]
    correct = 0
    for i in range(n):
        sample = encoder_val_input_data[i:i+1]
        decoded, _ = decode_sequence(sample, encoder_model, decoder_model)
        if decoded.strip() == val_devnagri[i].strip():
            correct += 1
    wandb.log({'val_accuracy' : correct*100/n})

In [None]:
# Register the sweep under the given W&B project, then let the agent call
# train() for up to 50 runs of that sweep.
sweep_id = wandb.sweep(sweep_config, project="CS6910 Assignment 3")
wandb.agent(sweep_id, train, count=50)

# **Predictions on test set**

In [None]:
# Build the inference models from the model fitted in the "Fitting the model"
# cell and decode every test word greedily.
encoder_model, decoder_model = inferencing(model, num_encoder_layers, num_decoder_layers, encoder_layers, decoder_layers, cell_type, hidden_layer_size)
correct = 0
predictions = []   # stripped predicted word per test sample
attentions = []    # per test sample: one attention vector per decoded character
n = test_devnagri.shape[0]
print("len test ", n)
for i in range(n):
    # Deliberately not called `input`: a module-level variable of that name
    # would shadow the built-in input() for the rest of the notebook.
    test_sample = encoder_test_input_data[i:i+1]
    output, attn_weights = decode_sequence(test_sample, encoder_model, decoder_model)
    attentions.append(attn_weights)
    # Exact-match accuracy; strip() removes the tab/newline sequence markers.
    if output.strip() == test_devnagri[i].strip():
        correct += 1
    predictions.append(output.strip())
# Guard against an empty test split instead of dividing by zero.
print("Test accuracy : ", correct*100/n if n else 0.0)

In [None]:
# Storing predictions
# The rows are taken directly from the three parallel sequences instead of
# from range(n): `n` is a shared notebook variable that the heatmap cell
# overwrites, so after running that cell only the first few rows would be
# exported.
with open('predictions_attention.csv', 'w', newline ='', encoding = 'utf-8', errors='ignore') as file:
    header = ['Input', 'Prediction', 'Ground Truth']
    writer = csv.DictWriter(file, fieldnames = header)
    writer.writeheader()
    for source_word, predicted_word, target_word in zip(test_english, predictions, test_devnagri):
        writer.writerow({
            'Input': source_word,
            'Prediction': predicted_word,
            # The stored targets still carry the internal tab/newline sequence
            # markers; strip them so the column is comparable to 'Prediction'.
            'Ground Truth': target_word.strip(),
        })

# **Attention heatmaps**

In [None]:
# Attention heatmaps for the first (up to) nine test predictions on a 3x3 grid.
# Rows are predicted characters, columns are input characters.
num_panels = min(9, len(predictions))   # separate name: do not clobber `n`
fig, axs = plt.subplots(3, 3)
fig.set_size_inches(23, 15)
# Font able to render the Devanagari tick labels (downloaded earlier).
devanagari_font = FontProperties(fname="MANGAL.TTF", size=15)

for i in range(num_panels):
    source_chars = list(test_english[i])
    predicted_chars = list(predictions[i])

    # A NEW list of rows is built; attentions[i] is left untouched so the cell
    # can be re-run. The last decoding step is dropped because it produced the
    # end marker, which is not part of the stripped prediction. Each row keeps
    # the columns 0 .. len(word) - 1: the encoder input has no start marker,
    # so column 0 belongs to the first input character.
    rows = [np.asarray(step_weights)[:len(source_chars)] for step_weights in attentions[i][:-1]]
    if not rows or not source_chars:
        continue  # nothing to draw for an empty prediction or input
    heatmap = np.array(rows)

    ax = axs[i // 3][i % 3]
    ax.matshow(heatmap)

    # Pin one tick per cell before labelling; labels applied to the automatic
    # ticks drift out of alignment once the locator skips positions.
    num_cols = heatmap.shape[1]
    ax.set_xticks(range(num_cols))
    ax.set_xticklabels(source_chars[:num_cols])

    num_rows = min(len(predicted_chars), heatmap.shape[0])
    ax.set_yticks(range(num_rows))
    ax.set_yticklabels(predicted_chars[:num_rows], fontproperties=devanagari_font)

plt.show()