In [1]:
import numpy as np
import re
import string
import unicodedata
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras import layers
from TextProcessing import Encode, Decode, NaturalLanguageProcessing, Attention

BATCH_SIZE = 16
step_per_epoch = len(x_train) // BATCH_SIZE
HM_UNITS_ENCODE = 64
HM_UNITS_DECODE = 64
embending_dim = 256
vocab_inp_size = len(input_key.word_index) + 1
vocab_out_size = len(output_key.word_index) + 1

persian_path = 'mizan/mizan_fa.txt'
english_path = 'mizan/mizan_en.txt'

def prepare_data(sentense):
    sentense = re.sub(r'[\n]', r'', sentense)
    sentense = re.sub(r'[?$*@#]', r'', sentense)
    sentense = '<start> ' + sentense + ' <end>'
    return sentense

def ReadText(path, maxlen=1000):
    lst = []
    with open(path, 'r', encoding='UTF-8') as file:
        lines = file.readlines()
        for i, line in enumerate(lines):
            line = prepare_data(line)
            lst.append(line)
            if i==maxlen:
                break
    
    return lst

persian = ReadText(persian_path)
english = ReadText(english_path)    
persian[1], english[1]

('<start> و از روی گزارشاتی که برای رؤسای من در قاهره ارسال گردیده بودند نوشته شد. <end>',
 '<start> from notes jotted daily on the march, strengthened by some reports sent to my chiefs in Cairo. <end>')

In [2]:
nlp = NaturalLanguageProcessing(persian, english)
input_value, output_value, input_key, output_key = nlp.TokenizerText()
x_train, x_valid, y_train, y_valid = train_test_split(input_value, output_value, test_size=0.12)
print(f"Shape of X_train {x_train.shape}")
print(f"Shape of the Y_train {y_train.shape}")
print(f"Shape of the X_valid {x_valid.shape}")
print(f"Shape of the Y_valid {y_valid.shape}")

dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(len(x_train))
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)

Shape of X_train (880, 88)
Shape of the Y_train (880, 64)
Shape of the X_valid (121, 88)
Shape of the Y_valid (121, 64)


In [57]:
print(x_train[0], '\n') #this is persian data
print(y_train[0]) #this is english data

[   2  115 2652   17    7 2653    6 2654    4  611    1 1379    5   95
 2655  292 2656 1213    3    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0] 

[   2   23   41 1948  965   53    1  585 1949    4  307    5 1950    3
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0]


In [3]:
encoder = Encode(vocab_inp_size, embending_dim, HM_UNITS_ENCODE, BATCH_SIZE)
encoder

<__main__.Encode at 0x13d88de72b0>

In [5]:
decode = Decode(vocab_out_size, embending_dim, HM_UNITS_DECODE, BATCH_SIZE)
decode 

<__main__.Decode at 0x13d88de66b0>

In [6]:
opt = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
checkpoint = tf.train.Checkpoint(optimizer=opt, encoder=encoder, decoder=decode)

def LossFunc(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_mean(loss_)

def train_step(inp, out, enc_hidden):
    loss = 0
    with tf.GradientTape() as grad:
        enc_output, enc_hidden = encoder(inp, enc_hidden)
        dec_hidden = enc_hidden
        dec_input = tf.expand_dims([output_key.word_index['<start>']] * BATCH_SIZE, 1)
        for t in range(1, out.shape[1]):
            predictions, dec_hidden, _ = decode(dec_input, dec_hidden, enc_output)
            loss += LossFunc(out[:, t], predictions)
            dec_input = tf.expand_dims(out[:, t], 1)
        batch_loss = (loss / int(out.shape[1]))
        variables = encoder.trainable_variables + decode.trainable_variables
        gradients = grad.gradient(loss, variables)
        opt.apply_gradients(zip(gradients, variables))
        return batch_loss
    
EPOCHS = 5
for epoch in range(EPOCHS):
    enc_hidden = encoder.InitialHiddenState()
    total_loss = 0
    for inp, out in dataset.take(step_per_epoch):
        batch_loss = train_step(inp, out, enc_hidden)
        total_loss += batch_loss
    print(f'Epochs {epoch} Total Loss is {total_loss.numpy()}')
    checkpoint.save(file_prefix='test_one')

Epochs 0 Total Loss is 114.2126235961914
Epochs 1 Total Loss is 97.24425506591797
Epochs 2 Total Loss is 94.83502197265625
Epochs 3 Total Loss is 93.40696716308594
Epochs 4 Total Loss is 92.34122467041016


In [53]:
def prepare_data(sentense):
    sentense = re.sub(r'[\n]', r'', sentense)
    sentense = re.sub(r'[?$*@#]', r'', sentense)
    sentense = '<start> ' + sentense + ' <end>'
    return sentense

def evaluate(sentence):
    def MaxLength(tensore):
        return max(len(t) for t in tensore)
    
    max_len_inp = MaxLength(input_value)
    max_len_out = MaxLength(output_value)
    prepare_data(sentence)
    inputs = [input_key.word_index[i] for i in sentence.split(' ')]
    inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs], maxlen=max_len_inp, padding='post')
    inputs = tf.convert_to_tensor(inputs)
    results = ''
    hidden = [tf.zeros((1, HM_UNITS_ENCODE))]
    enc_out, enc_hidden = encoder(inputs, hidden)
    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([output_key.word_index['<start>']], 0)
    for t in range(max_len_out):
        predictions, dec_hidden, waigth = decode(dec_input, dec_hidden, enc_out)
        waigth = tf.reshape(waigth, (-1, ))
        prediced_id = tf.argmax(predictions[0]).numpy()
        results += output_key.index_word[prediced_id] + ' '
        if output_key.index_word[prediced_id]=='<end>':
            return results, sentence
        dec_input = tf.expand_dims([prediced_id], 0)

        return results, sentence
    
    

In [54]:
checkpoint.restore(tf.train.latest_checkpoint(''))

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x13d977e1270>

In [55]:
text_test = "زن زندگی آزادی ."
evaluate(text_test) # translate text

('Woman of freedom life', 'زن زندگی آزادی .')