# **Importing required libraries**

In [23]:
import numpy as np
import csv
import pandas as pd
import random
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import LSTM, SimpleRNN, GRU, Embedding, Dense, TimeDistributed, Concatenate, AdditiveAttention

In [25]:
import json
import cv2
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt

#**Train,Test and Dev Data uploaded in kaggle**

In [29]:
tsv_file = open("/kaggle/input/sequence-sequence/hi.translit.sampled.train.tsv")
read_tsv = csv.reader(tsv_file, delimiter="\t")

In [30]:
val_tsv_file = open("/kaggle/input/sequence-sequence/hi.translit.sampled.dev.tsv")
val_read_tsv = csv.reader(val_tsv_file, delimiter="\t")

In [31]:
test_tsv_file = open("/kaggle/input/sequence-sequence/hi.translit.sampled.test.tsv")
test_read_tsv = csv.reader(test_tsv_file, delimiter="\t")

# **Processing training, validation and test data**

In [34]:
# Training data
devnagri = []
english = []

for i in read_tsv:
    devnagri.append(i[0])
    english.append(i[1])

devnagri = np.array(devnagri)
english = np.array(english)

# Validation data
val_devnagri = []
val_english = []

for i in val_read_tsv:
    val_devnagri.append(i[0])
    val_english.append(i[1])

val_devnagri = np.array(val_devnagri)
val_english = np.array(val_english)

# Test data
test_devnagri = []
test_english = []

for i in test_read_tsv:
    test_devnagri.append(i[0])
    test_english.append(i[1])

test_devnagri = np.array(test_devnagri)
test_english = np.array(test_english)

In [35]:
for i in range(devnagri.shape[0]):
    devnagri[i] = "\t" + devnagri[i] + "\n"

for i in range(val_devnagri.shape[0]):
    val_devnagri[i] = "\t" + val_devnagri[i] + "\n"

for i in range(test_devnagri.shape[0]):
    test_devnagri[i] = "\t" + test_devnagri[i] + "\n"

In [36]:
# Getting input and target language characters

# Training set
english_characters = set()
devnagri_characters = set()

for word in english:
    for char in word:
        if char not in english_characters:
            english_characters.add(char)

for word in devnagri:
    for char in word:
        if char not in devnagri_characters:
            devnagri_characters.add(char)

# Validation set
v_english_characters = set()
v_devnagri_characters = set()

for word in val_english:
    for char in word:
        if char not in v_english_characters:
            v_english_characters.add(char)

for word in val_devnagri:
    for char in word:
        if char not in v_devnagri_characters:
            v_devnagri_characters.add(char)

# Test set
t_english_characters = set()
t_devnagri_characters = set()

for word in test_english:
    for char in word:
        if char not in t_english_characters:
            t_english_characters.add(char)

for word in test_devnagri:
    for char in word:
        if char not in t_devnagri_characters:
            t_devnagri_characters.add(char)

In [37]:
english_characters = sorted(list(english_characters))
devnagri_characters = sorted(list(devnagri_characters))

num_encoder_tokens = len(english_characters)
num_decoder_tokens = len(devnagri_characters)

max_encoder_seq_length = max([len(txt) for txt in english])
max_decoder_seq_length = max([len(txt) for txt in devnagri])

# print("Number of samples:", len(english))
# print("Number of unique input tokens:", num_encoder_tokens)
# print("Number of unique output tokens:", num_decoder_tokens)
# print("Max sequence length for inputs:", max_encoder_seq_length)
# print("Max sequence length for outputs:", max_decoder_seq_length)

# **Preparing Encoder and Decoder Inputs**

In [38]:
# Preparing train encoder and decoder inputs

input_token_index = dict([(char, i) for i, char in enumerate(english_characters)])
target_token_index = dict([(char, i) for i, char in enumerate(devnagri_characters)])

reverse_input_char_index = dict((i, char) for char, i in input_token_index.items())
reverse_target_char_index = dict((i, char) for char, i in target_token_index.items())

encoder_input_data = np.zeros((len(english), max_encoder_seq_length), dtype="float32")
decoder_input_data = np.zeros((len(english), max_decoder_seq_length), dtype="float32")
decoder_target_data = np.zeros((len(english), max_decoder_seq_length, num_decoder_tokens), dtype="float32")

for i, (english, devnagri) in enumerate(zip(english, devnagri)):
    for t, char in enumerate(english):
        encoder_input_data[i, t] = input_token_index[char]

    for t, char in enumerate(devnagri):
        decoder_input_data[i, t] = target_token_index[char]
        if t > 0:
            # decoder_target_data will be ahead by one timestep and will not include the start character.
            decoder_target_data[i, t - 1, target_token_index[char]] = 1.0

In [39]:
# Preparing validation encoder and decoder inputs

encoder_val_input_data = np.zeros((len(val_english), max_encoder_seq_length), dtype="float32")
decoder_val_input_data = np.zeros((len(val_english), max_decoder_seq_length), dtype="float32")
decoder_val_target_data = np.zeros((len(val_english), max_decoder_seq_length, num_decoder_tokens), dtype="float32")

for i, (e, d) in enumerate(zip(val_english, val_devnagri)):
    for t, char in enumerate(e):
        encoder_val_input_data[i, t] = input_token_index[char]

    for t, char in enumerate(d):
        decoder_val_input_data[i, t] =  target_token_index[char]
        if t > 0:
            # decoder_target_data will be ahead by one timestep and will not include the start character.
            decoder_val_target_data[i, t - 1, target_token_index[char]] = 1.0

In [40]:
# Preparing test encoder and decoder inputs

encoder_test_input_data = np.zeros((len(test_english), max_encoder_seq_length), dtype="float32")
decoder_test_input_data = np.zeros((len(test_english), max_decoder_seq_length), dtype="float32")
decoder_test_target_data = np.zeros((len(test_english), max_decoder_seq_length, num_decoder_tokens), dtype="float32")

for i, (e, d) in enumerate(zip(test_english, test_devnagri)):
    for t, char in enumerate(e):
        encoder_test_input_data[i, t] = input_token_index[char]

    for t, char in enumerate(d):
        decoder_test_input_data[i, t] =  target_token_index[char]
        if t > 0:
            # decoder_target_data will be ahead by one timestep and will not include the start character.
            decoder_test_target_data[i, t - 1, target_token_index[char]] = 1.0

# **Defining Seq2Seq Model**

In [41]:
def training(input_embedding_size, dp, cell_type, hidden_layer_size, num_encoder_layers, num_decoder_layers):

    # ENCODER

    encoder_inputs = Input(shape=(max_encoder_seq_length,))
    encoder_embedding = Embedding(num_encoder_tokens, input_embedding_size, trainable=True)(encoder_inputs)

    encoder_layers = []
    encoder_states = []
    if cell_type == 'RNN':
        encoder = SimpleRNN(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
        encoder_layers.append(encoder)
        encoder_outputs, state_h = encoder(encoder_embedding)
        encoder_states.append([state_h])
        if num_encoder_layers > 1:
            encoder = SimpleRNN(hidden_layer_size,return_sequences=True,return_state=True, dropout = dp)
            encoder_layers.append(encoder)
            encoder_outputs, state_h2 = encoder(encoder_outputs)
            encoder_states.append([state_h2])
        if num_encoder_layers > 2:
            encoder = SimpleRNN(hidden_layer_size,return_sequences=True,return_state=True, dropout = dp)
            encoder_layers.append(encoder)
            encoder_outputs, state_h3 = encoder(encoder_outputs)
            encoder_states.append([state_h3])

    elif cell_type == 'GRU':
        encoder = GRU(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
        encoder_layers.append(encoder)
        encoder_outputs, state_h = encoder(encoder_embedding)
        encoder_states.append([state_h])
        if num_encoder_layers > 1:
            encoder = GRU(hidden_layer_size,return_sequences=True,return_state=True, dropout = dp)
            encoder_layers.append(encoder)
            encoder_outputs, state_h2 = encoder(encoder_outputs)
            encoder_states.append([state_h2])
        if num_encoder_layers > 2:
            encoder = GRU(hidden_layer_size,return_sequences=True,return_state=True, dropout = dp)
            encoder_layers.append(encoder)
            encoder_outputs, state_h3 = encoder(encoder_outputs)
            encoder_states.append([state_h3])

    else:
        encoder = LSTM(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
        encoder_layers.append(encoder)
        encoder_outputs, state_h, state_c = encoder(encoder_embedding)
        encoder_states.append([state_h, state_c])
        if num_encoder_layers > 1:
            encoder = LSTM(hidden_layer_size,return_sequences=True,return_state=True, dropout = dp)
            encoder_layers.append(encoder)
            encoder_outputs, state_h2, state_c2 = encoder(encoder_outputs)
            encoder_states.append([state_h2, state_c2])
        if num_encoder_layers > 2:
            encoder = LSTM(hidden_layer_size,return_sequences=True,return_state=True, dropout = dp)
            encoder_layers.append(encoder)
            encoder_outputs, state_h3, state_c3 = encoder(encoder_outputs)
            encoder_states.append([state_h3, state_c3])


    # DECODER

    decoder_inputs = Input(shape=(max_decoder_seq_length,))
    decoder_embedding = Embedding(num_decoder_tokens, input_embedding_size, trainable=True)(decoder_inputs)

    # We set up our decoder to return full output sequences, and to return internal states as well.
    # We don't use the return states in the training model, but we will use them in inference.

    decoder_layers = []
    if cell_type == 'RNN':
        decoder_RNN = SimpleRNN(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
        decoder_layers.append(decoder_RNN)
        decoder_outputs, _ = decoder_RNN(decoder_embedding, initial_state=encoder_states[0])
        if num_decoder_layers > 1:
            decoder_RNN = SimpleRNN(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
            decoder_layers.append(decoder_RNN)
            decoder_outputs, _  = decoder_RNN(decoder_outputs, initial_state=encoder_states[1])
        if num_decoder_layers > 2:
            decoder_RNN = SimpleRNN(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
            decoder_layers.append(decoder_RNN)
            decoder_outputs, _  = decoder_RNN(decoder_outputs, initial_state=encoder_states[2])

    elif cell_type == 'GRU':
        decoder_GRU = GRU(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
        decoder_layers.append(decoder_GRU)
        decoder_outputs, _ = decoder_GRU(decoder_embedding, initial_state=encoder_states[0])
        if num_decoder_layers > 1:
            decoder_GRU = GRU(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
            decoder_layers.append(decoder_GRU)
            decoder_outputs, _  = decoder_GRU(decoder_outputs, initial_state=encoder_states[1])
        if num_decoder_layers > 2:
            decoder_GRU = GRU(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
            decoder_layers.append(decoder_GRU)
            decoder_outputs, _  = decoder_GRU(decoder_outputs, initial_state=encoder_states[2])

    else:
        decoder_lstm = LSTM(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
        decoder_layers.append(decoder_lstm)
        decoder_outputs, _ , _ = decoder_lstm(decoder_embedding, initial_state=encoder_states[0])
        if num_decoder_layers > 1:
            decoder_lstm = LSTM(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
            decoder_layers.append(decoder_lstm)
            decoder_outputs, _ , _  = decoder_lstm(decoder_outputs, initial_state=encoder_states[1])
        if num_decoder_layers > 2:
            decoder_lstm = LSTM(hidden_layer_size, return_sequences=True, return_state=True, dropout = dp)
            decoder_layers.append(decoder_lstm)
            decoder_outputs, _ , _  = decoder_lstm(decoder_outputs, initial_state=encoder_states[2])

    decoder_dense = TimeDistributed(Dense(num_decoder_tokens, activation="softmax"))
    decoder_outputs = decoder_dense(decoder_outputs)

    # MODEL
    model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

    return model, encoder_layers, decoder_layers

# **Inference model**

In [42]:
def inferencing(model,num_encoder_layers,num_decoder_layers,encoder_layers,decoder_layers,cell_type, hidden_layer_size):

    # ENCODER MODEL RECONSTRUCTION
    encoder_inputs = model.input[0]  # input_1
    encoder_states = []
    enc_emb = model.layers[2]     # embedding 1
    encoder_outputs = enc_emb(encoder_inputs)

    if cell_type == 'RNN' or cell_type =="GRU":
        for i in range(num_encoder_layers):
            encoder_outputs, state_h_enc = encoder_layers[i](encoder_outputs)
            encoder_states += [state_h_enc]
    else:
        for i in range(num_encoder_layers):
            encoder_outputs, state_h_enc, state_c_enc = encoder_layers[i](encoder_outputs)
            encoder_states += [state_h_enc, state_c_enc]

    encoder_model = Model(encoder_inputs, encoder_states + [encoder_outputs])


    # DECODER MODEL RECONSTRUCTION
    input_names = [["input_100","input_101"],["input_102","input_103"],["input_104","input_105"],"input_106"]

    decoder_inputs = model.input[1]       # input_2
    decoder_embedding = model.layers[3]   # embedding 2
    decoder_outputs = decoder_embedding(decoder_inputs)
    decoder_states = []
    decoder_states_inputs = []

    if cell_type == 'RNN' or cell_type =="GRU":
        for i in range(num_decoder_layers):
            decoder_states_inputs += [Input(shape=(hidden_layer_size,), name=input_names[i][0])]
        for i in range(num_decoder_layers):
            decoder_outputs, state_h_dec = decoder_layers[i](decoder_outputs, initial_state=decoder_states_inputs[i])
            decoder_states += [state_h_dec]
    else:
        for i in range(num_decoder_layers):
            decoder_states_inputs += [Input(shape=(hidden_layer_size,), name=input_names[i][0]), Input(shape=(hidden_layer_size,), name=input_names[i][1])]
        j = 0
        for i in range(num_decoder_layers):
            decoder_outputs, state_h_dec, state_c_dec = decoder_layers[i](decoder_outputs, initial_state=decoder_states_inputs[i+j:i+j+2])
            decoder_states += [state_h_dec , state_c_dec]
            j += 1

    decoder_dense = model.layers[4+2*num_encoder_layers]
    decoder_outputs = decoder_dense(decoder_outputs)
    decoder_model = Model([decoder_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states)

    return encoder_model, decoder_model

In [43]:
# def decode_sequence(input_seq,encoder_model,decoder_model):
#     # Encode the input as state vectors.
#     states_value = encoder_model.predict(input_seq,verbose=0)
#     states_value = states_value[:-1]
#     target_seq = np.zeros((1, 1))
#     target_seq[0, 0] = target_token_index["\t"]
#     stop_condition = False
#     decoded_sentence = ""
#     while not stop_condition:
#         dec_ip = [target_seq]+states_value
#         output_tokens = decoder_model.predict(dec_ip,verbose=0)
#         sampled_token_index = np.argmax(output_tokens[0][0, -1, :])
#         sampled_char = reverse_target_char_index[sampled_token_index]
#         decoded_sentence += sampled_char
#         if sampled_char == "\n" or len(decoded_sentence) > max_decoder_seq_length:
#             stop_condition = True

#         target_seq = np.zeros((1, 1))
#         target_seq[0, 0] = sampled_token_index
#         states_value = output_tokens[1:]

#     return decoded_sentence



def decode_sequence_from_states(states_value, decoder_model):
    target_seq = np.zeros((1, 1))
    target_seq[0, 0] = target_token_index["\t"]

    stop_condition = False
    decoded_sentence = ""

    while not stop_condition:
        dec_ip = [target_seq] + states_value
        output_tokens = decoder_model.predict(dec_ip, verbose=0)

        sampled_token_index = np.argmax(output_tokens[0][0, -1, :])
        sampled_char = reverse_target_char_index[sampled_token_index]
        decoded_sentence += sampled_char

        if sampled_char == "\n" or len(decoded_sentence) > max_decoder_seq_length:
            stop_condition = True

        target_seq[0, 0] = sampled_token_index
        states_value = output_tokens[1:]

    return decoded_sentence


# **Fitting the model**

In [44]:
batch_size = 128
epochs = 10
input_embedding_size = 512
hidden_layer_size = 256
num_layers = 3
num_encoder_layers = num_layers
num_decoder_layers = num_layers
dropout = 0.1
cell_type = 'LSTM'

# TRAIN
model, encoder_layers, decoder_layers = training(input_embedding_size, dropout, cell_type, hidden_layer_size, num_encoder_layers, num_decoder_layers)

# COMPILE
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

# FIT
model.fit(
    [encoder_input_data, decoder_input_data],
    decoder_target_data,
    batch_size=batch_size,
    epochs=epochs,
    shuffle = True,
    validation_data= ([encoder_val_input_data, decoder_val_input_data], decoder_val_target_data)
)

# encoder_model, decoder_model = inferencing(model, num_encoder_layers, num_decoder_layers, encoder_layers, decoder_layers, cell_type, hidden_layer_size)
# correct = 0
# n = val_devnagri.shape[0]
# for i in range(n):
#     input = encoder_val_input_data[i:i+1]
#     output = decode_sequence(input,encoder_model, decoder_model)
#     if output.strip() == val_devnagri[i].strip():
#         correct += 1
# print("Validation accuracy : ", correct*100/n)

I0000 00:00:1747708176.708759      35 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 15513 MB memory:  -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0


Epoch 1/10


I0000 00:00:1747708190.027051     102 cuda_dnn.cc:529] Loaded cuDNN version 90300


[1m346/346[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 38ms/step - accuracy: 0.0760 - loss: 1.2051 - val_accuracy: 0.1081 - val_loss: 0.9305
Epoch 2/10
[1m346/346[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 34ms/step - accuracy: 0.1378 - loss: 0.8748 - val_accuracy: 0.1945 - val_loss: 0.5840
Epoch 3/10
[1m346/346[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 34ms/step - accuracy: 0.2268 - loss: 0.5180 - val_accuracy: 0.2560 - val_loss: 0.3502
Epoch 4/10
[1m346/346[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 34ms/step - accuracy: 0.2852 - loss: 0.3113 - val_accuracy: 0.2825 - val_loss: 0.2614
Epoch 5/10
[1m346/346[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 34ms/step - accuracy: 0.3108 - loss: 0.2205 - val_accuracy: 0.2936 - val_loss: 0.2219
Epoch 6/10
[1m346/346[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 33ms/step - accuracy: 0.3259 - loss: 0.1755 - val_accuracy: 0.2986 - val_loss: 0.2064
Epoch 7/10
[1m346/346[0m 

<keras.src.callbacks.history.History at 0x79e82f342910>

# **Predictions on test set**

In [45]:
# encoder_model, decoder_model = inferencing(model, num_encoder_layers, num_decoder_layers, encoder_layers, decoder_layers, cell_type, hidden_layer_size)
# correct = 0
# predictions = []
# n = test_devnagri.shape[0]
# for i in range(n):
#     input = encoder_test_input_data[i:i+1]
#     output = decode_sequence(input,encoder_model, decoder_model)
#     if output.strip() == test_devnagri[i].strip():
#         correct += 1
#     predictions.append(output.strip())
# print("Test accuracy : ", correct*100/n)

encoder_model, decoder_model = inferencing(model, num_encoder_layers, num_decoder_layers, encoder_layers, decoder_layers, cell_type, hidden_layer_size)
all_states = encoder_model.predict(encoder_test_input_data, verbose=0)
all_states = all_states[:-1]  # drop encoder_outputs

# 2. Loop through test data
correct = 0
predictions = []
n = test_devnagri.shape[0]

for i in range(n):
    # Extract i-th encoder state from each state tensor
    states_value = [state[i:i+1] for state in all_states]

    # Decode
    output = decode_sequence_from_states(states_value, decoder_model)

    # Compare
    if output.strip() == test_devnagri[i].strip():
        correct += 1
    predictions.append(output.strip())

print("Test accuracy : ", correct * 100 / n)

Test accuracy :  31.363838294091515


In [53]:
with open('/kaggle/working/predictions_vanilla.csv', 'w', newline='', encoding='utf-8', errors='ignore') as file:
    header = ['Input', 'Prediction', 'Ground Truth']
    writer = csv.DictWriter(file, fieldnames=header)
    writer.writeheader()
    for i in range(n):
        writer.writerow({
            'Input': test_english[i],
            'Prediction': predictions[i],
            'Ground Truth': test_devnagri[i]
        })

# **Displaying grid**

In [56]:
def display_images( images, columns=5, width=22, height=10):
    height = max(height, int(len(images)/columns) * 5)
    plt.figure(figsize=(width, height))
    for i, image in enumerate(images):
        plt.subplot(int(len(images) / columns + 1), columns, i + 1)
        plt.tick_params(axis='both', which='both', bottom=False, top=False, labelbottom=False, right=False, left=False, labelleft=False)
        plt.imshow(image)

In [61]:
arial_dir=r"/kaggle/input/sequence-sequence/arial.ttf"
mangal_dir=r"/kaggle/input/sequence-sequence/MANGAL.TTF"



img = []
for i in range(10):
    img.append(Image.new('RGBA', (150, 150), color = (10, 200, 210, 140)))
    d = ImageDraw.Draw(img[i])
    d.line(((65, 80), (75, 90), (85, 80)), fill=(0, 0, 0), width=2)
    d.line(((75, 52), (75, 90)), fill=(0, 0, 0), width=2)
    text1 = test_english[i*355]
    text2 = "\n" + predictions[i*355]
    arial = ImageFont.truetype(arial_dir, 20)
    mangal = ImageFont.truetype(mangal_dir, 20)
    # arial = ImageFont.truetype('arial.ttf', 20)
    # mangal = ImageFont.truetype('MANGAL.TTF', 20)
    w1,h1 = arial.getsize(text1)
    w2,h2 = mangal.getsize(text2)
    d.text((75-w1/2, 35-h1/2), text1 , fill=(0,0,0), font = arial)
    d.text((77-w2/2, 77-h2/2), text2, font = mangal,  fill=(0,0,0))

display_images(img)