<a href="https://colab.research.google.com/github/enthusiastic2003/SummerTimeFun/blob/main/MachineTranslator/venus.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import unicodedata

import re


# Convert the unicode sequence to ascii
def unicode_to_ascii(s):
  """Return *s* with all nonspacing combining marks removed.

  The string is first decomposed with NFD so that accented characters are
  split into a base character plus combining marks; every character whose
  Unicode category is 'Mn' is then dropped.

  Note: the filter is not limited to Latin accents. Nonspacing marks of
  other scripts (for example the Bengali vowel sign U+09C1) are category
  'Mn' as well and are removed too, and characters without a decomposition
  are kept, so the result is not necessarily pure ASCII.
  """
  decomposed = unicodedata.normalize('NFD', s)
  kept = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
  return ''.join(kept)

# Preprocess the sequence
def preprocess_sentence(w):
  """Normalise one sentence and wrap it in start/end markers.

  Steps, in order:
    1. lowercase, strip surrounding whitespace and remove nonspacing marks
       via unicode_to_ascii;
    2. surround each '?', '.', '!' and the danda '।' with a space on both
       sides so that punctuation becomes its own whitespace-separated token;
    3. prepend '<start> ' and append ' <end>'.

  Repeated spaces produced by step 2 are not collapsed.
  """
  cleaned = unicode_to_ascii(w.lower().strip())

  # Pad sentence-ending punctuation (including the danda) with spaces
  spaced = re.sub(r"([?.!।])", r" \1 ", cleaned)

  # Sequence boundary markers
  return '<start> ' + spaced + ' <end>'

In [None]:
import io

# Create the Dataset
def create_dataset(path, num_examples):
  """Read a tab-separated parallel corpus and return its sentence columns.

  Args:
    path: path to a UTF-8 text file with one example per line, columns
      separated by tabs.
    num_examples: number of leading lines to use (None uses every line).

  Returns:
    A zip iterator over the columns: the first item is the tuple of all
    preprocessed first-column sentences, the second the tuple of all
    preprocessed second-column sentences.
  """
  # The context manager closes the file instead of leaking the handle
  with io.open(path, encoding='UTF-8') as corpus:
    lines = corpus.read().strip().split('\n')

  # Keep only the first two columns of every line. Slicing with [:2] (rather
  # than dropping the last field) keeps both sentences even when a line has
  # no trailing third column.
  word_pairs = [
      [preprocess_sentence(field) for field in line.split('\t', 2)[:2]]
      for line in lines[:num_examples]
  ]
  return zip(*word_pairs)

In [None]:
path_to_file='ben.txt'

In [None]:
# Peek at the first raw line of the corpus and at its two preprocessed sentences.
# The context manager closes the file instead of leaking the handle.
with io.open(path_to_file, encoding='UTF-8') as corpus:
  lines = corpus.read().strip().split('\n')
print(lines[0])
# Split once and reuse the first two tab-separated columns
first_source, first_target = lines[0].split('\t', 2)[:2]
print(preprocess_sentence(first_source))
print(preprocess_sentence(first_target))

In [None]:
# Build the sentence columns from the first 6508 lines and show the first pair.
# NOTE(review): `fra` holds the second column of ben.txt, which is presumably
# Bengali rather than French — confirm against the data file.
en, fra = create_dataset(path_to_file,6508)
print(en[0])
print(fra[0])

In [None]:
import tensorflow as tf

# Convert sequences to tokenizers
def tokenize(lang):
  """Fit a tokenizer on *lang* and return the padded id matrix with it.

  Args:
    lang: iterable of preprocessed sentences.

  Returns:
    (tensor, lang_tokenizer): the sentences as integer id sequences, padded
    at the end to a common length, and the fitted Keras Tokenizer.
  """
  # filters='' keeps every character, so punctuation tokens survive
  tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')

  # Build the vocabulary, then map each sentence to its id sequence
  tokenizer.fit_on_texts(lang)
  id_sequences = tokenizer.texts_to_sequences(lang)

  # Pad after the last token so all rows share one length
  padded = tf.keras.preprocessing.sequence.pad_sequences(id_sequences,
                                                         padding='post')
  return padded, tokenizer

In [None]:
# Load the dataset
def load_dataset(path, num_examples=5000):
  """Load the corpus at *path* and tokenize both of its sentence columns.

  The first tab-separated column is treated as the input language and the
  second as the target language.

  Args:
    path: path of the tab-separated corpus file.
    num_examples: number of leading lines to load.

  Returns:
    (input_tensor, target_tensor, inp_lang_tokenizer, targ_lang_tokenizer)
  """
  # Column 1 -> input sentences, column 2 -> target sentences
  source_sentences, target_sentences = create_dataset(path, num_examples)

  # Each language gets its own tokenizer and padded id matrix
  input_tensor, inp_lang_tokenizer = tokenize(source_sentences)
  target_tensor, targ_lang_tokenizer = tokenize(target_sentences)

  return input_tensor, target_tensor, inp_lang_tokenizer, targ_lang_tokenizer

In [None]:
# Use the first 6508 lines of the corpus
num_examples = 6508
# NOTE: inp_lang / targ_lang are the fitted tokenizers, not the raw sentences
input_tensor, target_tensor, inp_lang, targ_lang = load_dataset(path_to_file, num_examples)

# Padded sequence lengths (second dimension) of the target and input tensors
max_length_targ, max_length_inp = target_tensor.shape[1], input_tensor.shape[1]

In [None]:
print(max_length_targ, max_length_inp)

In [None]:
print(input_tensor[0], target_tensor.shape)

In [None]:
from sklearn.model_selection import train_test_split

# Create training and validation sets using an 80/20 split.
# NOTE: no random_state is passed, so the split is not reproducible between runs.
input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = train_test_split(input_tensor, target_tensor, test_size=0.2)

# Sizes of the train inputs/targets followed by the validation inputs/targets
print(len(input_tensor_train), len(target_tensor_train), len(input_tensor_val), len(target_tensor_val))

In [None]:
print(input_tensor_train)
print(inp_lang)

In [None]:
# Show the mapping b/w word index and language tokenizer
def convert(lang, tensor):
  """Print 'id ----> word' for every non-zero id in *tensor*.

  Args:
    lang: object exposing an `index_word` mapping from id to word.
    tensor: iterable of integer ids; zeros are skipped.
  """
  nonzero_ids = (token_id for token_id in tensor if token_id != 0)
  for token_id in nonzero_ids:
    print("%d ----> %s" % (token_id, lang.index_word[token_id]))

print ("Input Language; index to word mapping")
convert(inp_lang, input_tensor_train[0])
print ()
print ("Target Language; index to word mapping")
convert(targ_lang, target_tensor_train[0])

In [None]:
# Shuffle buffer size: the whole training set
BUFFER_SIZE = len(input_tensor_train)
BATCH_SIZE = 64
# Number of full batches per epoch (floor division ignores the short remainder)
steps_per_epoch = len(input_tensor_train)//BATCH_SIZE
embedding_dim = 256
# Hidden size passed to both the encoder and the decoder
units = 1024
# +1 presumably reserves id 0, which the post-padding uses — verify against
# the tokenizer's word_index numbering
vocab_inp_size = len(inp_lang.word_index) + 1
vocab_tar_size = len(targ_lang.word_index) + 1

In [None]:
# Pair input and target rows, then shuffle with a buffer as large as the training set
dataset = tf.data.Dataset.from_tensor_slices((input_tensor_train, target_tensor_train)).shuffle(BUFFER_SIZE)
# drop_remainder=True discards the final short batch so every batch has BATCH_SIZE rows
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)

In [None]:
# Size of input and target batches
example_input_batch, example_target_batch = next(iter(dataset))
example_input_batch.shape, example_target_batch.shape

In [None]:
import tensorflow as tf

class Encoder(tf.keras.layers.Layer):
    """Embedding layer followed by a single (unidirectional) GRU.

    Args:
        vocab_size: number of rows of the embedding table.
        embedding_dim: size of each embedding vector.
        enc_units: number of GRU units, i.e. the size of the hidden state.
        batch_sz: batch size used by initialize_hidden_state().
    """

    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super().__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units

        # Token id -> dense vector
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)

        # The GRU returns both the per-timestep outputs and the final state
        self.gru = tf.keras.layers.GRU(
            enc_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )

    def call(self, x, hidden):
        """Encode token ids *x* starting from the GRU state *hidden*.

        Returns:
            (outputs for every timestep, final GRU state)
        """
        embedded = self.embedding(x)
        sequence_output, final_state = self.gru(embedded, initial_state=hidden)
        return sequence_output, final_state

    def initialize_hidden_state(self):
        """Return an all-zero state of shape (batch_sz, enc_units)."""
        return tf.zeros((self.batch_sz, self.enc_units))



In [None]:
# Smoke-test the encoder on one example batch, starting from an all-zero state
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)

sample_hidden = encoder.initialize_hidden_state()
# sample_output: outputs for every timestep; enc_states: final GRU state
sample_output, enc_states = encoder(example_input_batch, sample_hidden)

print ('Encoder output shape: (batch size, sequence length, units) {}'.format(sample_output.shape))
print('Encoder state shape: {}'.format(enc_states.shape))



In [None]:
import tensorflow as tf

class PayAttention(tf.keras.layers.Layer):
    """Additive attention over the encoder outputs.

    The score of each encoder position is V(tanh(W1(dec_state) + W2(enc_output))).

    Args:
        units_enc: encoder hidden size (stored, not used in the computation).
        units_dec: width of the W1 / W2 projections.
        enc_length: encoder sequence length (stored, not used in the computation).
    """

    def __init__(self, units_enc, units_dec, enc_length):
        super().__init__()
        self.units_enc = units_enc
        self.units_dec = units_dec
        self.enc_length = enc_length
        self.W1 = tf.keras.layers.Dense(units_dec)
        self.W2 = tf.keras.layers.Dense(units_dec)
        self.V = tf.keras.layers.Dense(1)

    def call(self, enc_output, dec_states):
        """Compute the context vector and the attention weights.

        Args:
            enc_output: encoder outputs, expected as (batch, enc_len, enc_units).
            dec_states: decoder state used as the query, expected as (batch, units).

        Returns:
            context_vector: (batch, enc_units) attention-weighted sum of enc_output.
            softmaxed_weights: (batch, enc_len) attention weights.
        """
        # (batch, units) -> (batch, 1, units) so the query broadcasts over positions
        query = tf.expand_dims(dec_states, 1)

        # One scalar score per encoder position: (batch, enc_len, 1)
        score = self.V(tf.nn.tanh(self.W1(query) + self.W2(enc_output)))

        # Normalise over the encoder positions (axis 1)
        softmaxed_weights = tf.nn.softmax(score, axis=1)
        context_vector = tf.reduce_sum(softmaxed_weights * enc_output, axis=1)

        # Squeeze only the trailing singleton axis; an unqualified squeeze would
        # also remove the batch axis when the batch size is 1 (e.g. at inference).
        softmaxed_weights = tf.squeeze(softmaxed_weights, axis=-1)
        return context_vector, softmaxed_weights






In [None]:
class DecoderLayer(tf.keras.layers.Layer):
    """Single-step attention decoder: attention -> embedding -> GRU -> dense.

    Args:
        vocab_size: target vocabulary size (width of the output logits).
        embedding_dim: size of the target token embeddings.
        dec_units: number of GRU units.
        batch_sz: batch size (stored only).
        enc_units: encoder hidden size, forwarded to the attention layer.
        inp_length: encoder sequence length, forwarded to the attention layer.
    """

    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz, enc_units,inp_length):
        super().__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.enc_units = enc_units
        self.embedding_dim = embedding_dim
        self.vocab_size = vocab_size
        self.inp_length = inp_length

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)

        self.attention = PayAttention(self.enc_units, self.dec_units, self.inp_length)

        self.gru = tf.keras.layers.GRU(self.dec_units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc = tf.keras.layers.Dense(vocab_size)

    def call(self, enc_output, dec_input, dec_internal_state=None):
        """Run one decoding step.

        Args:
            enc_output: encoder outputs for every input position.
            dec_input: ids of the previous target token, one column per row
                (the output is reshaped assuming a single time step).
            dec_internal_state: previous decoder state, expected as
                (batch, dec_units); None starts from an all-zero state.

        Returns:
            (logits over the target vocabulary, new GRU state, attention weights)
        """
        if dec_internal_state is None:
            # Build the zero state directly as a (batch, dec_units) float tensor
            batch_size = tf.shape(dec_input)[0]
            dec_internal_state = tf.zeros((batch_size, self.dec_units), dtype=tf.float32)

        # The previous state is the attention query for this step
        context_vector, softmaxed_weights = self.attention(enc_output, dec_internal_state)

        # Embed the decoder input
        embed_dec_input = self.embedding(dec_input)

        # Context + embedded input = input to the GRU
        x = tf.concat([tf.expand_dims(context_vector, 1), embed_dec_input], axis=-1)

        # Continue from the previous state; without initial_state the GRU would
        # restart from zeros on every call and carry nothing between steps.
        output, state = self.gru(x, initial_state=dec_internal_state)

        # Drop the time axis, then project to logits over the target vocabulary
        output = tf.reshape(output, (-1, output.shape[2]))
        x = self.fc(output)
        return x, state, softmaxed_weights

In [None]:
decoder = DecoderLayer(vocab_tar_size ,embedding_dim, units, BATCH_SIZE, units, max_length_inp)

In [None]:
# One decoder step on the sample batch: every row is fed the '<start>' token id
# and the encoder's final state is passed as the decoder state.
dec_input = tf.expand_dims([targ_lang.word_index['<start>']] * BATCH_SIZE, 1)
prob_dist, states_decode, weights = decoder(sample_output, dec_input, enc_states )
#decoder(sample_output, dec_input, None )

In [None]:
print(states_decode.shape)

In [None]:
print(weights[0])

In [None]:
# Now we will define the Model using the layers we have already defined.
#class BenLish(tf.keras.Model):
#  def __init__(self, vocab_size_inp, vocab_size_tar, embedding_dim, dec_units, batch_sz, enc_units,inp_length):
#    super().__init__()
#    self.encoder = Encoder(vocab_size_inp, embedding_dim, enc_units, batch_sz)
#    self.decoder = DecoderLayer(vocab_size_tar ,embedding_dim, dec_units, batch_sz, dec_units, inp_length)

#  def call(self, sentence):


In [None]:
#Now we define the loss function. The loss function is the cross entropy loss function. The cross entropy loss function is defined as follows:
#-sum(y_true * log(y_pred), axis=-1)
import numpy as np

with tf.device('/gpu:0'):
  optimizer = tf.optimizers.Adam()

  def loss_function(real, pred):
      """Masked sparse cross-entropy averaged over the batch.

      Args:
        real: integer target ids for one time step; id 0 marks padding.
        pred: unnormalised logits, one row per element of *real*.

      Returns:
        Scalar mean of the per-example losses, with padded positions
        contributing zero (they still count in the denominator).
      """
      loss_ = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=real, logits=pred)
      # Build the padding mask with TensorFlow ops and in the loss dtype, so the
      # function does not depend on eager NumPy conversion of the tensors.
      mask = tf.cast(tf.math.not_equal(real, 0), loss_.dtype)
      return tf.reduce_mean(loss_ * mask)

In [None]:

#Now we set up the training objects and define a single training step
with tf.device('/gpu:0'):
  optimizer = tf.keras.optimizers.Adam()
  # NOTE(review): loss_object is created here but train_step below calls loss_function instead
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

  # Fresh decoder and encoder instances; these rebind the module-level names
  decoder = DecoderLayer(vocab_tar_size ,embedding_dim, units, BATCH_SIZE, units, max_length_inp)
  encoder=Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)

  import os
  checkpoint_dir = './training_checkpoints'
  checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
  # Track optimizer, encoder and decoder in one checkpoint object
  checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                  encoder=encoder,
                                  decoder=decoder)

  def train_step(inp, targ):
      """Run one optimisation step on a batch and return its normalised loss.

      Args:
        inp: batch of input id sequences.
        targ: batch of target id sequences; column 0 is skipped as a label
          because the decoder is seeded with '<start>'.

      Returns:
        The summed per-step loss divided by targ.shape[1].
      """
      loss=0

      with tf.GradientTape() as tape:
          # Initialize the hidden state of the encoder and pass the input to the encoder
          hidden_initialize = encoder.initialize_hidden_state()
          enc_output, enc_hidden = encoder(inp, hidden_initialize)
          # The encoder's final state seeds the decoder state
          dec_hidd=enc_hidden
          # First decoder input: the '<start>' id for each of the BATCH_SIZE rows
          dec_input = tf.expand_dims([targ_lang.word_index['<start>']] * BATCH_SIZE, 1)

          # Teacher forcing - feeding the target as the next input
          for t in range(1, targ.shape[1]):
              # passing enc_output to the decoder
              prob_dist, dec_hidd, weights = decoder(enc_output, dec_input, dec_hidd )
              loss += loss_function(targ[:, t], prob_dist)
              # using teacher forcing
              dec_input = tf.expand_dims(targ[:, t], 1)



      # Reported loss is normalised by the target length; note the loop above
      # runs targ.shape[1] - 1 steps, and the gradients use the unnormalised sum.
      batch_loss = (loss / int(targ.shape[1]))
      variables = encoder.trainable_variables + decoder.trainable_variables
      gradients = tape.gradient(loss, variables)
      optimizer.apply_gradients(zip(gradients, variables))
      return batch_loss




In [None]:
print(steps_per_epoch)

In [None]:
# Resume from the most recent checkpoint found in checkpoint_dir.
# NOTE(review): tf.train.latest_checkpoint returns None when the directory holds
# no checkpoint — confirm that restoring None is the intended fresh-start path.
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

In [None]:
# Train for EPOCHS passes over the dataset, checkpointing after every epoch
with tf.device('/gpu:0'):
  EPOCHS=100
  import time
  from tqdm import tqdm
  for epoch in range(EPOCHS):
      start = time.time()
      total_loss = 0
      pbar = tqdm(dataset.take(steps_per_epoch), ascii=True, total=steps_per_epoch)
      # Report the epoch 1-based, consistent with the summary line below
      print("Epoch : {} / {}".format(epoch + 1, EPOCHS))
      for (batch, (inp, targ)) in enumerate(pbar):
          batch_loss = train_step(inp, targ)
          total_loss += batch_loss
          pbar.set_description(
              "Step - {} / {} - batch loss - {:.4f} "
                  .format(batch+1, steps_per_epoch, batch_loss.numpy()))

      # Save a checkpoint after every epoch; the file prefix carries the
      # 0-based epoch index (e.g. "ckpt0", "ckpt1", ...)
      checkpoint.save(file_prefix = checkpoint_prefix+str(epoch))

      # Mean batch loss over the epoch
      print('Epoch {} Loss {:.4f}'.format(epoch + 1,
                                        total_loss / steps_per_epoch))




In [None]:
checkpoint.save(file_prefix = "finish_line")

In [None]:
from translator import utils
from translator import models

import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing import sequence

from matplotlib import ticker
from matplotlib import pyplot as plt
from matplotlib import font_manager as fm

FONT_NAME = 'assets/banglafonts/Siyamrupali.ttf'

class Infer():
    """Greedy-decoding wrapper around an encoder/decoder pair.

    Args:
        input_language_tokenizer: tokenizer with a `word_index` mapping for
            the input language.
        target_language_tokenizer: tokenizer with `word_index` / `index_word`
            mappings for the target language.
        max_length_input: length the input id sequence is padded to.
        max_length_target: maximum number of decoding steps.
        encoder: callable as encoder(tensor, initial_hidden) returning
            (outputs, state).
        decoder: callable as decoder(encoder_out, decoder_input, decoder_hidden)
            returning (predictions, state, attention_weights).
        units: size of the encoder's initial hidden state.
    """

    def __init__(self, input_language_tokenizer, target_language_tokenizer,
                max_length_input, max_length_target, encoder, decoder, units):
        self.input_language_tokenizer = input_language_tokenizer
        self.target_language_tokenizer = target_language_tokenizer
        self.max_length_input = max_length_input
        self.max_length_target = max_length_target
        self.encoder = encoder
        self.decoder = decoder
        self.units = units

    def preprocess(self, sentence):
        """Clean *sentence*, map its words to ids and pad to max_length_input.

        Returns:
            A tensor holding one padded id sequence.

        Raises:
            KeyError: if a word is missing from the input vocabulary.
        """
        # clean the sequence and add the start/end tokens
        sentence = utils.clean_seq(sentence)
        sentence = utils.add_start_and_end_token_to_seq(sentence)

        # split() without an argument collapses runs of whitespace, so repeated
        # spaces cannot produce empty tokens (which would raise KeyError)
        inputs = [
            self.input_language_tokenizer.word_index[token]
            for token in sentence.split()]
        inputs = sequence.pad_sequences(
            [inputs], maxlen=self.max_length_input,padding='post')
        return tf.convert_to_tensor(inputs)

    def _greedy_decode(self, sentence):
        """Yield (word, attention_weights) for each greedy decoding step.

        Stops after yielding '<end>' or after max_length_target steps.
        """
        tensor = self.preprocess(sentence)

        # init encoder with an all-zero state for a batch of one
        encoder_initial_hidden = [tf.zeros((1, self.units))]
        encoder_out, decoder_hidden = self.encoder(tensor, encoder_initial_hidden)

        # init decoder: the encoder state seeds it, '<start>' is the first input
        decoder_input = tf.expand_dims(
            [self.target_language_tokenizer.word_index['<start>']], 0)

        for _ in range(self.max_length_target):
            # Argument order must match the decoder's call signature:
            # (encoder outputs, decoder input, decoder state)
            predictions, decoder_hidden, attention_weights = self.decoder(
                encoder_out, decoder_input, decoder_hidden)

            predicted_id = tf.argmax(predictions[0]).numpy()
            word = self.target_language_tokenizer.index_word[predicted_id]
            yield word, attention_weights
            if word == '<end>':
                return

            # the predicted ID is fed back into the model instead of using
            # the teacher forcing that is used at training time
            decoder_input = tf.expand_dims([predicted_id], 0)

    def predict(self, sentence):
        """Translate *sentence* greedily.

        Returns:
            The predicted words, each followed by a single space, including
            '<end>' when the model produced it.
        """
        return ''.join(word + ' ' for word, _ in self._greedy_decode(sentence))

    def predict_with_attention_weights(self, sentence):
        """Translate *sentence* and collect the attention weights of each step.

        Returns:
            (result, sentence, attention_plot): the predicted string as in
            predict(), the unchanged input sentence, and an array of shape
            (max_length_target, max_length_input) whose row t holds the
            attention weights of decoding step t (unused rows stay zero).
        """
        attention_plot = np.zeros((self.max_length_target, self.max_length_input))
        words = []
        for t, (word, attention_weights) in enumerate(self._greedy_decode(sentence)):
            # storing the attention weights to plot later on
            attention_plot[t] = tf.reshape(attention_weights, (-1, )).numpy()
            words.append(word)

        result = ''.join(word + ' ' for word in words)
        return result, sentence, attention_plot

# function for plotting the attention weights
def plot_attention(attention, sentence, predicted_sentence):
    """Show the attention matrix as a heat map labelled with both sentences.

    Args:
        attention: 2-D array of attention weights passed to matshow.
        sentence: list of input tokens used as x tick labels.
        predicted_sentence: list of predicted tokens used as y tick labels.

    Both token arguments must be lists, because an empty label is prepended
    to each with list concatenation. Tick labels use the font at FONT_NAME.
    """
    tick_font = fm.FontProperties(fname=FONT_NAME)
    figure = plt.figure(figsize=(10, 10))
    axes = figure.add_subplot(1, 1, 1)
    axes.matshow(attention, cmap='viridis')

    # Leading '' accounts for the tick before the first cell
    x_labels = [''] + sentence
    y_labels = [''] + predicted_sentence
    axes.set_xticklabels(x_labels, rotation=90, fontproperties=tick_font)
    axes.set_yticklabels(y_labels, fontproperties=tick_font)

    # One tick per token on both axes
    for axis in (axes.xaxis, axes.yaxis):
        axis.set_major_locator(ticker.MultipleLocator(1))

    plt.rcParams.update({'font.size': 14})

    plt.show()

In [None]:
# Fresh decoder/encoder instances for inference; their weights are expected to
# come from a checkpoint restore. The literal 64 is the batch_sz argument.
decoder_infer = DecoderLayer(vocab_tar_size ,embedding_dim, units, 64, units, max_length_inp)
encoder_infer=Encoder(vocab_inp_size, embedding_dim, units, 64)

In [None]:
# Rebind `checkpoint` so that a restore fills the inference encoder/decoder
# (the optimizer is tracked as well, under the same keys used for training).
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                  encoder=encoder_infer,
                                  decoder=decoder_infer)

In [None]:
# Restore a specific checkpoint written by the training loop: the prefix is
# "ckpt" + str(epoch), so "ckpt30" is epoch index 30; the "-31" suffix is
# presumably the save counter appended by checkpoint.save() — verify on disk.
checkpoint.restore('./training_checkpoints/ckpt30-31')

In [None]:
pred=Infer(inp_lang,targ_lang, max_length_inp, max_length_targ, encoder_infer, decoder_infer,1024)

In [None]:
pred.predict("I did what i had to do.")