In [None]:
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
import string
from string import digits
import re
from sklearn.utils import shuffle
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import LSTM, Input, Dense,Embedding, Concatenate, TimeDistributed
from tensorflow.keras.models import Model,load_model, model_from_json
from tensorflow.keras.utils import plot_model
from tensorflow.keras.preprocessing.text import one_hot, Tokenizer
from tensorflow.keras.callbacks import EarlyStopping
import pickle as pkl
import numpy as np
import time
import unicodedata

In [None]:
# Read the raw tab-separated English/Marathi sentence pairs.
# The corpus contains Devanagari text, so the encoding is pinned to UTF-8
# instead of relying on the platform default.
with open('/content/mar.txt', 'r', encoding='utf-8') as f:
  data = f.read()

In [None]:
# Number of leading lines of the corpus that are used.
MAX_SENTENCE_PAIRS = 38695

# One sentence pair per line. Blank lines (e.g. the empty string left by a
# trailing newline) carry no pair and would raise IndexError in the loop below,
# so they are dropped up front.
uncleaned_data_list = [
    line for line in data.split('\n')[:MAX_SENTENCE_PAIRS] if line.strip()
]

english_word = []
marathi_word = []
for line in uncleaned_data_list:
  # Split once on tabs and drop the last field; the first two remaining
  # fields are the English and the Marathi sentence respectively.
  fields = line.split('\t')[:-1]
  english_word.append(fields[0])
  marathi_word.append(fields[1])

# Assemble the parallel corpus and persist it for later reuse.
language_data = pd.DataFrame({'English': english_word, 'Marathi': marathi_word})
language_data.to_csv('language_data.csv', index=False)

In [None]:
# Quick sanity checks on the parsed frame. A notebook cell only renders the
# value of its last expression, so the first three lines produce no visible
# output here; only the Marathi array is displayed.
language_data.shape
language_data.head()
language_data['English'].values
language_data['Marathi'].values

array(['जा.', 'पळ!', 'धाव!', ..., 'मला ऑस्ट्रेलियात राहायचं नाहीये.',
       'मला स्वतःबद्दल बोलायचं नाहीये.',
       'मी चुकून माझी हार्ड डिस्क पुसून टाकली.'], dtype=object)

In [None]:
# Pull both sentence columns out of the frame as NumPy arrays, then show how
# many sentences each language holds.
english_text, marathi_text = (
    language_data[column].values for column in ('English', 'Marathi')
)
len(english_text), len(marathi_text)

(38695, 38695)

In [None]:
# Lower-case every sentence and delete apostrophes in one pass per language.
english_text_ = [sentence.lower().replace("'", '') for sentence in english_text]
marathi_text_ = [sentence.lower().replace("'", '') for sentence in marathi_text]

def remove_punctuation(text, punctuation=string.punctuation):
    """Return ``text`` with every punctuation character deleted.

    Args:
        text: The string to clean.
        punctuation: The characters to delete. Defaults to
            ``string.punctuation``, which is ASCII-only; pass additional
            characters (for example the Devanagari danda) to strip
            script-specific marks as well.

    Returns:
        A new string made of the characters of ``text`` that are not in
        ``punctuation``, in their original order.
    """
    # A translation table that maps each listed character to "delete" lets
    # str.translate clean the whole string in a single pass.
    return text.translate(str.maketrans('', '', punctuation))

def remove_punctuation_from_list(text_list):
    """Apply ``remove_punctuation`` to each string and return the results as a new list."""
    return list(map(remove_punctuation, text_list))

# Strip punctuation from both corpora
english_text_ = remove_punctuation_from_list(english_text_)
marathi_text_ = remove_punctuation_from_list(marathi_text_)

# Delete ASCII digits from the English sentences
remove_digits = str.maketrans('', '', digits)
removed_digits_text = [sent.translate(remove_digits) for sent in english_text_]
english_text_ = removed_digits_text

# Delete Devanagari digits, then zero-width joiners, from the Marathi sentences
marathi_text_ = [
    re.sub("[\u200d]", "", re.sub("[२३०८१५७९४६]", "", x)) for x in marathi_text_
]

# Trim surrounding whitespace; the Marathi (target) sentences are additionally
# wrapped in the start / end marker words
english_text_ = [x.strip() for x in english_text_]
marathi_text_ = ["start " + x.strip() + " end" for x in marathi_text_]

# Peek at the first cleaned pair
marathi_text_[0], english_text_[0]

('start जा end', 'go')

In [None]:
def tokenize_sent(text):
  """Fit a fresh ``Tokenizer`` on ``text`` and encode that same text.

  Args:
    text: list of sentences (strings).

  Returns:
    A ``(tokenizer, sequences)`` pair: the fitted tokenizer and the output of
    ``tokenizer.texts_to_sequences(text)``.
  """
  fitted = Tokenizer()
  fitted.fit_on_texts(text)
  encoded = fitted.texts_to_sequences(text)
  return fitted, encoded
# Tokenize English and Marathi sentences
eng_tokenizer, eng_encoded = tokenize_sent(text=english_text_)
mar_tokenizer, mar_encoded = tokenize_sent(text=marathi_text_)

# English index --> word dictionary
eng_index_word = eng_tokenizer.index_word

# English word --> index dictionary
eng_word_index = eng_tokenizer.word_index

# Size of the English vocabulary for the encoder input.
# Index 0 is used for padding, so one extra slot is added.
ENG_VOCAB_SIZE = len(eng_tokenizer.word_counts) + 1

# Marathi word --> index dictionary
mar_word_index = mar_tokenizer.word_index

# Marathi index --> word dictionary
mar_index_word = mar_tokenizer.index_word

# Size of the Marathi vocabulary for the decoder output (+1 for padding)
MAR_VOCAB_SIZE = len(mar_tokenizer.word_counts) + 1

# Longest encoded sentence per language. Each maximum must be taken over its
# own language: pad_sequences truncates anything longer than maxlen, so a
# Marathi length derived from the English sequences can cut target tokens off.
max_eng_len = max((len(seq) for seq in eng_encoded), default=0)
max_mar_len = max((len(seq) for seq in mar_encoded), default=0)

# Pad both sides with trailing zeros (pad_sequences already returns NumPy arrays)
eng_padded = pad_sequences(eng_encoded, maxlen=max_eng_len, padding='post')
mar_padded = pad_sequences(mar_encoded, maxlen=max_mar_len, padding='post')

# Split data into train and test set (train_test_split is imported at the top
# of the notebook)
X_train, X_test, y_train, y_test = train_test_split(
    eng_padded, mar_padded, test_size=0.1, random_state=0)


In [None]:
EPOCHS = 25
# BUFFER_SIZE is the number of training examples; it is used as the shuffle
# buffer so the shuffle covers the whole training set
BUFFER_SIZE = len(X_train)

# Mini-batch size: training and gradient descent happen in batches of this size
BATCH_SIZE = 64

# Number of full batches in one epoch (i.e. training steps per epoch)
steps_per_epoch = len(X_train)//BATCH_SIZE

# Length of each embedding vector
embedding_dim = 256

# NOTE(review): not referenced by the Encoder/Decoder in this notebook, which
# are sized with hidden_dim and use LSTM layers
units = 1024

# Hidden (state) dimension of the recurrent layers
hidden_dim = 1024

# Shuffle the training pairs and group them into fixed-size batches; the
# incomplete final batch is dropped
dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)

print(BUFFER_SIZE)
print(BUFFER_SIZE//BATCH_SIZE)
print(steps_per_epoch)

34825
544
544


In [None]:
class Encoder(tf.keras.Model):
    """Embedding + LSTM encoder.

    Turns a batch of token-id sequences into the per-step LSTM outputs plus
    the final hidden and cell states.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        # Token ids -> dense vectors
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        # Recurrent layer that also hands back its final states
        self.lstm = tf.keras.layers.LSTM(
            hidden_dim, return_sequences=True, return_state=True)

    def call(self, input_sequence, states):
        """Embed ``input_sequence`` and run the LSTM starting from ``states``.

        Returns a ``(output, state_h, state_c)`` tuple.
        """
        embedded = self.embedding(input_sequence)
        sequence_out, final_h, final_c = self.lstm(embedded, initial_state=states)
        return sequence_out, final_h, final_c

    def init_states(self, batch_size):
        """Return an all-zero ``(h, c)`` pair, each of shape [batch_size, hidden_dim]."""
        state_shape = [batch_size, self.hidden_dim]
        return (tf.zeros(state_shape), tf.zeros(state_shape))


In [None]:
class Decoder(tf.keras.Model):
    """Embedding + LSTM decoder with a dense projection onto the vocabulary."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        # Token ids -> dense vectors
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        # Recurrent layer that also hands back its final states
        self.lstm = tf.keras.layers.LSTM(
            hidden_dim, return_sequences=True, return_state=True)
        # One unnormalised score per vocabulary entry at every time step
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, input_sequence, state):
        """Decode ``input_sequence`` starting from ``state``.

        Returns a ``(logits, state_h, state_c)`` tuple.
        """
        embedded = self.embedding(input_sequence)
        sequence_out, final_h, final_c = self.lstm(embedded, initial_state=state)
        return self.dense(sequence_out), final_h, final_c

In [None]:
# Build the encoder and push one hand-written batch through it as a smoke test
encoder = Encoder(ENG_VOCAB_SIZE, embedding_dim, hidden_dim)
initial_state = encoder.init_states(1)
sample_encoder_ids = tf.constant([[1, 23, 4, 5, 0, 0]])
test_encoder_output = encoder(sample_encoder_ids, initial_state)
print(test_encoder_output[0].shape)

# Build the decoder and seed it with the encoder's final (h, c) states
decoder = Decoder(MAR_VOCAB_SIZE, embedding_dim, hidden_dim)
de_initial_state = test_encoder_output[1:]
sample_decoder_ids = tf.constant([[1, 3, 5, 7, 9, 0, 0, 0]])
test_decoder_output = decoder(sample_decoder_ids, de_initial_state)
print(test_decoder_output[0].shape)

(1, 6, 1024)
(1, 8, 10642)


In [None]:
from tensorflow.keras import backend as K

def loss_func(targets, logits):
    """Sparse categorical cross-entropy on logits with padding masked out.

    Positions where ``targets`` is 0 are given a sample weight of 0; every
    other position gets weight 1.
    """
    # 1 for real tokens, 0 for padding
    padding_mask = tf.cast(tf.math.not_equal(targets, 0), dtype=tf.int64)
    crossentropy = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True)
    return crossentropy(targets, logits, sample_weight=padding_mask)

def accuracy_fn(y_true, y_pred):
    """Token-level accuracy over the non-padding positions of a batch.

    Args:
        y_true: integer target token ids, shape [batch_size, seq_length];
            0 marks padding.
        y_pred: per-token scores, shape [batch_size, seq_length, vocab_size].

    Returns:
        A scalar float32 tensor: the fraction of non-padding positions whose
        arg-max prediction equals the target, or 0.0 when the batch contains
        no non-padding position.
    """
    # Compare in a common integer dtype so int64 targets are accepted too
    y_true = tf.cast(y_true, tf.int32)
    pred_values = tf.cast(tf.argmax(y_pred, axis=-1), tf.int32)
    correct = tf.cast(tf.equal(y_true, pred_values), tf.float32)

    # 0 is padding, don't include those
    mask = tf.cast(tf.greater(y_true, 0), tf.float32)
    n_correct = tf.reduce_sum(mask * correct)
    n_total = tf.reduce_sum(mask)

    # divide_no_nan yields 0 instead of NaN for an all-padding batch
    return tf.math.divide_no_nan(n_correct, n_total)

In [None]:
# Use the @tf.function decorator to take advantage of static graph computation
@tf.function
def train_step(input_seq, target_seq_in, target_seq_out, en_initial_states, optimizer):
    ''' Run one teacher-forced training step on a batch and update the weights.

        Uses the module-level ``encoder`` and ``decoder`` models.

        Input:
        - input_seq: integer token ids, shape [batch_size, input_seq_len].
            the encoder input sequence (embedding happens inside the encoder)
        - target_seq_in: integer token ids, shape [batch_size, target_seq_len].
            the input sequence to the decoder, we use Teacher Forcing
        - target_seq_out: integer token ids, shape [batch_size, target_seq_len].
            the tokens the decoder is expected to predict
        - en_initial_states: tuple of arrays of shape [batch_size, hidden_dim].
            the initial state of the encoder
        - optimizer: a tf.keras.optimizers.
        Output:
        - loss: loss value for this batch
        - acc: accuracy over the non-padding target tokens of this batch

    '''
    # Network's computations need to be put under tf.GradientTape() to keep track of gradients
    with tf.GradientTape() as tape:
        # Encode the source batch
        en_outputs = encoder(input_seq, en_initial_states)
        # The encoder's final (h, c) states initialise the decoder
        de_states = en_outputs[1:]
        # Decode the teacher-forced target input
        de_outputs = decoder(target_seq_in, de_states)
        # Take the actual output
        logits = de_outputs[0]
        # Calculate the loss function
        loss = loss_func(target_seq_out, logits)

    # Accuracy is only reported, never differentiated, so it is computed
    # outside the tape
    acc = accuracy_fn(target_seq_out, logits)

    variables = encoder.trainable_variables + decoder.trainable_variables
    # Calculate the gradients for the variables
    gradients = tape.gradient(loss, variables)
    # Apply the gradients and update the optimizer
    optimizer.apply_gradients(zip(gradients, variables))

    return loss, acc

In [None]:
import os
def main_train(encoder, decoder, dataset, n_epochs, batch_size, optimizer, checkpoint, checkpoint_prefix):
    """Train for ``n_epochs`` passes over ``dataset``, logging and checkpointing.

    Args:
        encoder: encoder model; used here to create the initial states.
        decoder: decoder model. Not used directly in this function:
            ``train_step`` reads the module-level ``encoder`` / ``decoder``.
        dataset: iterable of ``(input_seq, target_seq)`` batches.
        n_epochs: number of passes over ``dataset``.
        batch_size: batch size used to build the encoder's initial states.
        optimizer: optimizer forwarded to ``train_step``.
        checkpoint: object whose ``save(file_prefix=...)`` persists the model.
        checkpoint_prefix: path prefix handed to ``checkpoint.save``.

    Progress is printed every 100 batches. The per-epoch summary line reports
    the loss and accuracy of the last batch of that epoch (not an average).
    A checkpoint is written after every second epoch and after the final one,
    so the fully trained weights are always saved.
    """
    for epoch in range(n_epochs):
        start = time.time()
        en_initial_states = encoder.init_states(batch_size)
        # Stay None if the dataset yields no batch, so the summary can be skipped
        loss = accuracy = None
        for batch, (input_seq, target_seq) in enumerate(dataset):
            # Teacher forcing: the decoder sees tokens [0, T-1) and has to
            # predict tokens [1, T)
            target_seq_in = target_seq[:, :-1]
            target_seq_out = target_seq[:, 1:]
            loss, accuracy = train_step(input_seq, target_seq_in, target_seq_out, en_initial_states, optimizer)
            if batch % 100 == 0:
                print('Epoch {} Batch {} Loss {:.4f} Accuracy {:.4f}'.format(epoch + 1, batch, loss.numpy(), accuracy.numpy()))
        is_last_epoch = (epoch + 1) == n_epochs
        if (epoch + 1) % 2 == 0 or is_last_epoch:
            checkpoint.save(file_prefix=checkpoint_prefix)
        if loss is not None:
            print('Epoch {} Loss {:.4f} Accuracy {:.4f}'.format(epoch + 1, loss.numpy(), accuracy.numpy()))
        print('Time taken for 1 epoch {:.2f} sec\n'.format(time.time() - start))
# Adam optimizer with gradient clipping by norm
optimizer = tf.keras.optimizers.Adam(clipnorm=5.0)

# Checkpoint object that tracks the optimizer and both models
checkpoint_dir = './training_ckpt_seq2seq'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(
    optimizer=optimizer, encoder=encoder, decoder=decoder)

# Train the model
main_train(
    encoder=encoder,
    decoder=decoder,
    dataset=dataset,
    n_epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    optimizer=optimizer,
    checkpoint=checkpoint,
    checkpoint_prefix=checkpoint_prefix,
)


Epoch 1 Batch 0 Loss 5.9584 Accuracy 0.0000
Epoch 1 Batch 100 Loss 3.8391 Accuracy 0.2324
Epoch 1 Batch 200 Loss 3.3065 Accuracy 0.2378
Epoch 1 Batch 300 Loss 3.3664 Accuracy 0.2380
Epoch 1 Batch 400 Loss 3.1226 Accuracy 0.2606
Epoch 1 Batch 500 Loss 3.2151 Accuracy 0.2786
Epoch 1 Loss 2.9702 Accuracy 0.3114
Time taken for 1 epoch 41.01 sec

Epoch 2 Batch 0 Loss 2.7680 Accuracy 0.2937
Epoch 2 Batch 100 Loss 2.7630 Accuracy 0.3304
Epoch 2 Batch 200 Loss 2.8169 Accuracy 0.3254
Epoch 2 Batch 300 Loss 2.7491 Accuracy 0.3452
Epoch 2 Batch 400 Loss 2.4665 Accuracy 0.3982
Epoch 2 Batch 500 Loss 2.3268 Accuracy 0.3789
Epoch 2 Loss 2.4136 Accuracy 0.3494
Time taken for 1 epoch 19.90 sec

Epoch 3 Batch 0 Loss 1.9674 Accuracy 0.4107
Epoch 3 Batch 100 Loss 2.2590 Accuracy 0.4077
Epoch 3 Batch 200 Loss 2.1288 Accuracy 0.3994
Epoch 3 Batch 300 Loss 1.9834 Accuracy 0.4192
Epoch 3 Batch 400 Loss 1.9910 Accuracy 0.4252
Epoch 3 Batch 500 Loss 1.8367 Accuracy 0.4728
Epoch 3 Loss 1.9598 Accuracy 0.4331
Ti

In [None]:
def predict_output(input_text, encoder, input_max_len, tokenizer_inputs, word_index_outputs,
                   index_word_outputs, max_output_len=20, decoder_model=None):
    """Greedily translate ``input_text`` one token at a time.

    Args:
        input_text: source sentence (string).
        encoder: encoder model exposing ``init_states(batch_size)``.
        input_max_len: length the tokenized input is padded to.
        tokenizer_inputs: fitted tokenizer for the source language.
        word_index_outputs: target word -> index mapping; must contain 'start'.
        index_word_outputs: target index -> word dict.
        max_output_len: maximum number of words to generate (default 20).
        decoder_model: decoder to use; when omitted, the module-level
            ``decoder`` is used.

    Returns:
        The generated words joined by single spaces. Generation stops at the
        'end' token, at an index with no entry in ``index_word_outputs``
        (e.g. the padding index 0), or after ``max_output_len`` words.
    """
    if decoder_model is None:
        decoder_model = decoder

    # Tokenize the input sequence
    input_seq = tokenizer_inputs.texts_to_sequences([input_text])
    # Pad the sentence
    input_seq = pad_sequences(input_seq, maxlen=input_max_len, padding='post')
    # Set the encoder initial state
    en_initial_states = encoder.init_states(1)
    en_outputs = encoder(tf.constant(input_seq), en_initial_states)
    # Create the decoder input, the sos token
    de_input = tf.constant([[word_index_outputs['start']]])
    # Set the decoder states to the encoder's final hidden and cell states
    de_state_h, de_state_c = en_outputs[1:]

    out_words = []
    while len(out_words) < max_output_len:
        # Decode one step and get the output scores
        de_output, de_state_h, de_state_c = decoder_model(
            de_input, (de_state_h, de_state_c))
        # Select the word with the highest score; it is also the next input
        de_input = tf.argmax(de_output, -1)
        predicted_index = int(de_input.numpy()[0][0])
        # .get avoids a KeyError for indices without a word (padding index 0)
        predicted_word = index_word_outputs.get(predicted_index)
        if predicted_word is None or predicted_word == 'end':
            break
        out_words.append(predicted_word)

    return ' '.join(out_words)

# Example input sentence
input_sentence = "i want to go to my home"

# Translate it with the trained encoder/decoder
output_sequence = predict_output(
    input_text=input_sentence,
    encoder=encoder,
    input_max_len=max_eng_len,
    tokenizer_inputs=eng_tokenizer,
    word_index_outputs=mar_word_index,
    index_word_outputs=mar_index_word,
)

# Show the source sentence next to its predicted translation
print("Input Sentence:", input_sentence)
print("Predicted Output Sequence:", output_sequence)


Input Sentence: i want to go to my home
Predicted Output Sequence: मला माझ्या घरी जायचं आहे
