In [None]:
import numpy as np
# Re-create the ``np.object`` / ``np.int`` / ``np.float`` / ``np.bool`` attributes
# as plain aliases of the Python builtins (presumably a compatibility shim for
# libraries that still reference these names -- confirm against the installed
# NumPy / TensorFlow versions).
for _builtin in (object, int, float, bool):
    setattr(np, _builtin.__name__, _builtin)
del _builtin

In [None]:
import tensorflow as tf

# Ask TensorFlow to grow GPU memory on demand instead of reserving it all up front.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Memory growth has to be configured identically on every visible GPU,
        # so apply it to all devices rather than only the first one.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Raised when the GPUs were already initialized before this cell ran.
        print(e)


In [None]:
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn import model_selection
import re
import tqdm

In [None]:
# Load the sentence-pair CSV into a DataFrame; later cells read its `english`
# and `spanish` columns. NOTE(review): the path is a hard-coded local Windows
# location -- adjust it when running elsewhere.
df =pd.read_csv('D:\\data.csv\\data.csv')

In [None]:
# Display the freshly loaded DataFrame as the cell output.
df

In [None]:
def clean_text(text):
    """Normalize a raw sentence before tokenization.

    Steps, in order: lowercase; drop ``[...]`` spans; drop ``http(s)://`` and
    ``www.`` URLs; drop ``<...>`` tags; replace newlines, then every remaining
    non-word character, with a space; finally drop any word containing a digit.
    Every removed span is replaced by a single space, so the result may contain
    runs of spaces.

    Args:
        text: the sentence to clean (``str``).

    Returns:
        The cleaned, lowercased string.
    """
    text = text.lower()
    # Raw strings keep the regex escapes (\[, \S, \w, \d) away from Python's own
    # string-escape processing.
    text = re.sub(r'\[.*?\]', ' ', text)
    # Second alternative is `www\.\S+` ("www." followed by non-space characters).
    text = re.sub(r'https?://\S+|www\.\S+', ' ', text)
    text = re.sub(r'<.*?>+', ' ', text)
    text = re.sub(r'\n', ' ', text)
    text = re.sub(r'[^\w]', ' ', text)
    text = re.sub(r'\w*\d\w*', ' ', text)
    return text

In [None]:
# Run clean_text over both language columns, overwriting them in place.
df['english'] = df['english'].map(clean_text)
df['spanish'] = df['spanish'].map(clean_text)

In [None]:
def add_start_end(text):
    """Return ``text`` wrapped in the ``<start>`` / ``<end>`` boundary markers."""
    return f'<start> {text} <end>'

In [None]:
# Surround every sentence in both columns with the <start>/<end> markers.
df['english'] = df['english'].map(add_start_end)
df['spanish'] = df['spanish'].map(add_start_end)

In [None]:
# Display the DataFrame again to inspect the cleaned, marker-wrapped sentences.
df

In [None]:
def tokenize(lang):
    """Fit a Keras ``Tokenizer`` on ``lang`` and encode it as padded id sequences.

    Args:
        lang: iterable of sentences; it is used both to fit the tokenizer and
            as the text that gets encoded.

    Returns:
        ``(tensor, lang_tokenizer)``: the post-padded id matrix and the fitted
        tokenizer.
    """
    # '<' and '>' are not in the filter list, so the <start>/<end> markers
    # are not stripped of their brackets.
    fitted_tokenizer = tf.keras.preprocessing.text.Tokenizer(
        oov_token='<OOV>',
        filters='!"#$%&()*+,-./:;=?@[\\]^_`{|}~\t\n',
    )
    fitted_tokenizer.fit_on_texts(lang)
    padded_ids = tf.keras.preprocessing.sequence.pad_sequences(
        fitted_tokenizer.texts_to_sequences(lang),
        padding='post',
    )
    return padded_ids, fitted_tokenizer
    

In [None]:
# Encode each language column separately: one padded id matrix and one fitted
# tokenizer per language.
eng_sequence , eng_tokenizer = tokenize(df.english)
spn_sequence , spn_tokenizer = tokenize(df.spanish)

In [None]:
# Hold out 10% of the sentence pairs for evaluation; random_state=42 makes the
# split repeatable. x_* are English id sequences, y_* the Spanish ones.
x_train, x_test , y_train , y_test = model_selection.train_test_split(eng_sequence,spn_sequence,test_size=0.1,random_state=42)

In [None]:
def convert(lang, tensor):
    """Print ``id---> word`` for each non-zero id in ``tensor``.

    Args:
        lang: object exposing an ``index_word`` mapping from id to word.
        tensor: iterable of integer token ids; zeros are skipped.
    """
    for token_id in tensor:
        if token_id == 0:
            continue
        print('%d---> %s' % (token_id, lang.index_word[token_id]))

# Sanity check: print the id -> word mapping of the first English training sentence.
convert(eng_tokenizer,x_train[0])

In [None]:
# Vocabulary sizes for the embedding / output layers. The +1 leaves room for
# id 0, which the padding and masking code in this notebook treats as "no token".
vocab_inp_size = len(eng_tokenizer.word_index)+1
vocab_tar_size =  len(spn_tokenizer.word_index)+1
# Embedding width, GRU state size, and batch size passed to Encoder / Decoder.
embedding_dim = 256
units = 1024
batch_size=32
     

In [None]:
def create_dataset(x,y,batch_size=32):
    """Build a shuffled, batched, prefetching ``tf.data`` pipeline of ``(x, y)`` pairs.

    Args:
        x: source sequences.
        y: target sequences, aligned with ``x`` along the first axis.
        batch_size: examples per batch; a trailing partial batch is dropped.

    Returns:
        The resulting ``tf.data.Dataset``.
    """
    return (
        tf.data.Dataset.from_tensor_slices((x, y))
        .shuffle(1028)  # shuffle buffer of 1028 elements
        .batch(batch_size, drop_remainder=True)
        .prefetch(tf.data.experimental.AUTOTUNE)
    )

In [None]:
# Pass the configured batch_size explicitly: the encoder's initial hidden state
# is built with the same value, so the dataset batches must stay in sync with it
# instead of silently relying on create_dataset's default.
train_dataset = create_dataset(x_train, y_train, batch_size=batch_size)
test_dataset = create_dataset(x_test, y_test, batch_size=batch_size)

In [None]:
# Peek at a single training batch and print its shapes and contents.
# NOTE: `eng` stays bound after the loop and is reused by the encoder
# smoke-test cell further down.
for eng , spn in train_dataset.take(1):
    print(f'English : {eng.shape}\n{eng}')
    print(f'Spanish : {spn.shape}\n{spn}')

In [None]:
class Encoder(tf.keras.Model):
    """Embedding layer followed by a GRU that encodes a source id sequence."""

    def __init__(self, vocab_size, embedding_dim, encoder_units, batch_size):
        """Create the embedding and GRU layers.

        Args:
            vocab_size: number of rows in the embedding table.
            embedding_dim: width of each embedding vector.
            encoder_units: GRU state size.
            batch_size: batch dimension used by ``initialize_hidden_state``.
        """
        super().__init__()

        self.batch_size = batch_size
        self.encoder_units = encoder_units
        # mask_zero=True: id 0 is treated as padding by the embedding layer.
        self.embedding = tf.keras.layers.Embedding(
            vocab_size, embedding_dim, mask_zero=True)
        self.gru = tf.keras.layers.GRU(
            self.encoder_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )

    def call(self, x, hidden):
        """Embed ``x`` and run the GRU starting from ``hidden``.

        Returns:
            ``(output, state)``: the per-timestep GRU outputs and the final state.
        """
        embedded = self.embedding(x)
        output, state = self.gru(embedded, initial_state=hidden)
        return output, state

    def initialize_hidden_state(self):
        """Return an all-zero initial state of shape ``(batch_size, encoder_units)``."""
        return tf.zeros((self.batch_size, self.encoder_units))

In [None]:
class Decoder(tf.keras.Model):
    """Embedding + GRU + dense softmax head that predicts target-token probabilities."""

    def __init__(self, vocab_size, embedding_dim, decoder_units, batch_size):
        """Create the embedding, GRU and output projection layers.

        Args:
            vocab_size: size of the embedding table and of the output layer.
            embedding_dim: width of each embedding vector.
            decoder_units: GRU state size.
            batch_size: stored on the instance; not used by ``call``.
        """
        super().__init__()

        self.batch_size = batch_size
        self.decoder_units = decoder_units
        # mask_zero=True: id 0 is treated as padding by the embedding layer.
        self.embedding = tf.keras.layers.Embedding(
            vocab_size, embedding_dim, mask_zero=True)
        self.gru = tf.keras.layers.GRU(
            self.decoder_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )
        self.fc = tf.keras.layers.Dense(vocab_size)

    def call(self, x, hidden):
        """Run the decoder on ``x`` starting from ``hidden``.

        Returns:
            ``(probabilities, hidden)``: softmax output of the dense layer, with
            the batch and time axes of the GRU output flattened into one, and
            the final GRU state.
        """
        embedded = self.embedding(x)
        output, hidden = self.gru(embedded, initial_state=hidden)
        # Collapse (batch, time, units) into (batch * time, units) for the dense layer.
        flattened = tf.reshape(output, (-1, output.shape[2]))
        probabilities = tf.nn.softmax(self.fc(flattened))
        return probabilities, hidden

In [None]:
# Hyperparameters used below; they are defined in the configuration cell above
# and repeated here only for reference:
# vocab_inp_size = len(eng_tokenizer.word_index)+1
# vocab_tar_size =  len(spn_tokenizer.word_index)+1
# embedding_dim = 256
# units = 1024
# batch_size=32

# Build the encoder and run it once on the sample English batch `eng` (left over
# from the batch-preview cell) to check the output shapes.
encoder = Encoder(vocab_inp_size, embedding_dim, units, batch_size)
sample_hidden = encoder.initialize_hidden_state()
sample_output, sample_hidden = encoder(eng, sample_hidden)
print ('Encoder output shape: (batch size, sequence length, units) {}'.format(sample_output.shape))
print ('Encoder Hidden state shape: (batch size, units) {}'.format(sample_hidden.shape))

In [None]:
decoder = Decoder(vocab_tar_size, embedding_dim, units, batch_size)

# Shape check with a single decoding step. The decoder embeds integer token ids,
# so sample ids from [1, vocab_tar_size) instead of floats in [0, 1): those are
# not valid ids and, cast to integers, all become the padding id 0.
sample_dec_input = tf.random.uniform(
    (batch_size, 1), minval=1, maxval=vocab_tar_size, dtype=tf.int32)
sample_decoder_output, _ = decoder(sample_dec_input, sample_hidden)

print ('Decoder output shape: (batch size, vocab_size) {}'.format(sample_decoder_output.shape))

In [None]:
# Adam optimizer with its default settings.
optimizer = tf.keras.optimizers.Adam()
# Sparse categorical cross-entropy over integer targets.
# from_logits=False because Decoder.call already applies softmax;
# reduction='none' keeps one loss value per example so loss_function can zero
# out padded positions before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=False, reduction='none')

def loss_function(real, pred):
    """Cross-entropy between ``real`` and ``pred`` with padded positions zeroed out.

    Args:
        real: integer target ids; the value 0 marks padding.
        pred: predicted class probabilities for each entry of ``real``.

    Returns:
        Scalar mean of the per-entry losses. Padded entries contribute 0 to the
        sum but are still counted in the mean's denominator.
    """
    per_entry_loss = loss_object(real, pred)
    # True where the target is a real token, False where it is padding; cast to
    # the loss dtype so it can be used as a 0/1 multiplier, e.g.
    #   [1,1,1,0,0] * [2,6,2,1,6] = [2,6,2,0,0]
    is_token = tf.math.logical_not(tf.math.equal(real, 0))
    weights = tf.cast(is_token, dtype=per_entry_loss.dtype)
    return tf.reduce_mean(per_entry_loss * weights)

In [None]:
# Running mean of the per-batch training loss; updated inside train_step.
train_loss = tf.metrics.Mean(name='train loss')
# Running mean of the per-batch test loss; updated inside test_step.
test_loss =tf.metrics.Mean(name='test loss')
     

In [None]:
# One optimization step on a single batch. tf.function traces the Python body
# into a TensorFlow graph so repeated calls run faster.
@tf.function
def train_step(inputs, target, enc_hidden):
    """Run a teacher-forced forward/backward pass and update encoder and decoder.

    Args:
        inputs: batch of source-language id sequences.
        target: batch of target-language id sequences; columns 1..T-1 are the
            prediction targets.
        enc_hidden: initial hidden state for the encoder.

    Returns:
        The loss summed over decoding steps, divided by ``target.shape[1]``.
        The same value is also fed to the ``train_loss`` metric.
    """
    total_loss = 0
    start_id = spn_tokenizer.word_index['<start>']

    with tf.GradientTape() as tape:
        # Only the encoder's final state is used: it seeds the decoder. The
        # per-timestep encoder outputs are not passed to this decoder.
        _, dec_hidden = encoder(inputs, enc_hidden)

        # First decoder input: one <start> id per example, shaped (batch, 1).
        dec_input = tf.expand_dims([start_id] * inputs.shape[0], 1)

        # Teacher forcing: after each step the ground-truth token, not the
        # model's own prediction, becomes the next decoder input.
        for step in range(1, target.shape[1]):
            expected = target[:, step]
            predictions, dec_hidden = decoder(dec_input, dec_hidden)
            total_loss += loss_function(expected, predictions)
            dec_input = tf.expand_dims(expected, 1)

    # Length-normalized loss, used for reporting only.
    batch_loss = total_loss / int(target.shape[1])

    # Gradients are taken of the un-normalized summed loss.
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(total_loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))

    train_loss(batch_loss)
    return batch_loss

In [None]:
# Evaluation step on a single batch (no gradient computation, no weight update).
# tf.function traces the Python body into a TensorFlow graph.
@tf.function
def test_step(inputs, target, enc_hidden):
    """Compute the teacher-forced loss of one batch without training.

    Args:
        inputs: batch of source-language id sequences.
        target: batch of target-language id sequences; columns 1..T-1 are the
            prediction targets.
        enc_hidden: initial hidden state for the encoder.

    Returns:
        The loss summed over decoding steps, divided by ``target.shape[1]``.
        The same value is also fed to the ``test_loss`` metric.
    """
    loss = 0
    # Encode the source batch; the encoder's final state seeds the decoder.
    # The per-timestep encoder outputs are not consumed by this decoder.
    enc_output, enc_hidden = encoder(inputs, enc_hidden)
    dec_hidden = enc_hidden
    # First decoder input: one <start> id per example, shaped (batch, 1).
    dec_input = tf.expand_dims([spn_tokenizer.word_index['<start>']] * inputs.shape[0], 1)
    for t in range(1, target.shape[1]):
        predictions, dec_hidden = decoder(dec_input, dec_hidden)
        loss += loss_function(target[:, t], predictions)

        # Teacher forcing: feed the ground-truth token as the next input.
        dec_input = tf.expand_dims(target[:, t], 1)
    # Length-normalized loss for this batch.
    batch_loss = (loss / int(target.shape[1]))
    test_loss(batch_loss)
    # Return the batch loss, mirroring train_step, so callers that assign the
    # result receive a value instead of None.
    return batch_loss

In [None]:
# Number of passes over the training data.
EPOCHS = 3
# Best (lowest) test loss seen so far; starts high so the first epoch always saves.
old_test_loss=1000000

# The progress bar counts training AND test batches, so its target must be the
# real number of batches in both datasets (each is batched with drop_remainder,
# hence the floor divisions) rather than a figure derived from the unsplit data.
steps_per_epoch = x_train.shape[0] // batch_size + x_test.shape[0] // batch_size

for epoch in range(EPOCHS):
    # Start each epoch with fresh running means.
    train_loss.reset_states()
    test_loss.reset_states()

    # Zero initial encoder state, reused for every batch of this epoch.
    enc_hidden = encoder.initialize_hidden_state()
    bar = tf.keras.utils.Progbar(target=steps_per_epoch)

    count=0
    # Training pass.
    for inputs, target in train_dataset:
        batch_loss = train_step(inputs, target, enc_hidden)
        count += 1
        bar.update(count)  # manually advance the progress bar

    # Evaluation pass; the bar keeps counting from where training stopped.
    for inputs, target in test_dataset:
        batch_loss = test_step(inputs, target, enc_hidden)
        count += 1
        bar.update(count)

    # Save both models whenever the test loss improves on the best seen so far.
    # NOTE(review): the save locations are hard-coded local Windows paths.
    if old_test_loss> test_loss.result():
        old_test_loss= test_loss.result()
        encoder.save(filepath='D:\\encoder')
        decoder.save(filepath='D:\\decoder')
        print('Model is saved')

    # Epoch summary.
    print('#' * 50)
    print(f'Epoch #{epoch + 1}')
    print(f'Training Loss {train_loss.result()}')
    print(f'Testing Loss {test_loss.result()}')
    print('#' * 50)