In [None]:
import pandas as pd
import random
import re
import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential
from tensorflow.keras.layers import TextVectorization
from keras.layers import Dense, Dropout, Embedding, LSTM, Bidirectional,RepeatVector,TimeDistributed

# Data Pre-processing

In [None]:
# Read the tab-separated English/Arabic corpus, pad punctuation with spaces,
# and collect: the sentence pairs, one vocabulary list per language (in
# first-seen order) and the longest token count per language.
pattern = r'([!#$%&\()*+,-./:;<=>?@\[\\\]^_`{|}~])'
text_pairs = []
eng_vocab = []
ara_vocab = []
max_seq_ara = 0
max_seq_eng = 0

# Sets shadow the vocabulary lists so the "already seen?" test is O(1) instead
# of a linear scan of an ever-growing list for every word in the corpus.
_eng_seen = set()
# Pre-seeding with the markers keeps them out of ara_vocab, as before.
_ara_seen = {'<start>', '<end>'}

with open("ara_eng.txt", "r", encoding='utf-8') as file:
    for line in file:
        # A blank line (e.g. a trailing newline at end of file) has no tab and
        # would make the two-way unpack below fail.
        if not line.strip():
            continue

        eng, ara = line.split("\t")

        eng = re.sub(pattern, r' \1 ', eng)
        eng = eng.strip()
        eng = eng.lower()

        # split() without an argument collapses runs of spaces; padding the
        # punctuation creates double spaces, and split(" ") would turn those
        # into empty-string "words" that inflate the vocabulary and lengths.
        eng_words = eng.split()
        max_seq_eng = max(max_seq_eng, len(eng_words))

        for word in eng_words:
            if word not in _eng_seen:
                _eng_seen.add(word)
                eng_vocab.append(word)

        ara = re.sub(pattern, r' \1 ', ara)
        ara = ara.strip()
        ara = "<start> " + ara + " <end>"
        ara_words = ara.split()

        # The Arabic length includes the <start> and <end> markers.
        max_seq_ara = max(max_seq_ara, len(ara_words))

        for word in ara_words:
            if word not in _ara_seen:
                _ara_seen.add(word)
                ara_vocab.append(word)

        text_pairs.append((eng, ara))
#text_pairs

In [None]:
# Vocabulary sizes come from the lists built during preprocessing.
eng_vocab_len = len(eng_vocab)
ara_vocab_len = len(ara_vocab)

# Fixed sequence length used for vectorization. The alternative of taking
# max(max_seq_eng, max_seq_ara) was left disabled by the author.
max_seq_len = 20

# Print each statistic on its own line, in the same order as before.
for stat in (eng_vocab_len, ara_vocab_len, max_seq_ara, max_seq_eng, max_seq_len):
    print(stat)

26104
57892
227
226
20


In [None]:
# Splitting the data: the first 20% of the shuffled pairs become the test set,
# the remaining 80% the training set.
# A dedicated, seeded generator makes the split identical on every
# Restart & Run All, without touching the global `random` state that later
# cells use for sampling.
SPLIT_SEED = 42
random.Random(SPLIT_SEED).shuffle(text_pairs)

test_num = int(0.2 * len(text_pairs))
test_data = text_pairs[:test_num]
train_data = text_pairs[test_num:]

print(len(text_pairs))
print(len(test_data))
print(len(train_data))
# Sanity check: the two parts must add up to the full corpus.
print(len(train_data) + len(test_data))

24638
4927
19711
24638


In [None]:
# One integer-encoding vectorizer per language.
# NOTE(review): no `standardize` argument is passed, so the layer's default
# standardization applies; confirm it keeps the "<start>"/"<end>" markers in
# the form the decoding code expects.
eng_vector = TextVectorization(
    max_tokens=eng_vocab_len,
    output_mode="int",
    output_sequence_length=max_seq_len,
)
# The Arabic side is one token longer so it can be sliced into a decoder
# input ([:-1]) and a target ([1:]) of length max_seq_len each.
ara_vector = TextVectorization(
    max_tokens=ara_vocab_len,
    output_mode="int",
    output_sequence_length=max_seq_len + 1,
)

In [None]:
# Train data vectorization: fit each vectorizer's vocabulary on the training
# sentences only.
train_eng = [eng_text for eng_text, _ in train_data]
train_ara = [ara_text for _, ara_text in train_data]

eng_vector.adapt(train_eng)
ara_vector.adapt(train_ara)

In [None]:
def format_dataset(english, arabic):
    """Vectorize a batch of sentence pairs into model inputs and targets.

    Args:
        english: batch of English strings, passed to `eng_vector`.
        arabic: batch of Arabic strings, passed to `ara_vector`.

    Returns:
        A `(inputs, targets)` tuple. `inputs` is a dict with the keys
        "encoder_inputs" (vectorized English) and "decoder_inputs"
        (vectorized Arabic without its last position); `targets` is the
        vectorized Arabic without its first position, i.e. the decoder
        input shifted left by one.
    """
    source_ids = eng_vector(english)
    target_ids = ara_vector(arabic)

    model_inputs = {
        "encoder_inputs": source_ids,
        "decoder_inputs": target_ids[:, :-1],
    }
    labels = target_ids[:, 1:]
    return (model_inputs, labels)

In [None]:
def dataset(pairs):
    """Build a batched, vectorized `tf.data.Dataset` from sentence pairs.

    Args:
        pairs: non-empty sequence of `(english, arabic)` string tuples.

    Returns:
        A dataset of `(inputs, targets)` batches as produced by
        `format_dataset`, with batches of 64 pairs.

    Raises:
        ValueError: if `pairs` is empty.
    """
    if not pairs:
        raise ValueError("dataset() needs at least one (english, arabic) pair")

    # Use the `pairs` argument, not the global `text_pairs`: reading the global
    # made the train and test datasets identical (both held the full corpus).
    english, arabic = zip(*pairs)

    ds = tf.data.Dataset.from_tensor_slices((list(english), list(arabic)))
    ds = ds.batch(64)
    ds = ds.map(format_dataset)

    # cache() replays exactly the elements it saw, so it must come before
    # shuffle(); otherwise the first epoch's order is frozen for all epochs.
    return ds.cache().shuffle(2048).prefetch(16)

In [None]:
# Build one dataset per split with the shared `dataset` helper.
train_dataset, test_dataset = dataset(train_data), dataset(test_data)

In [None]:
# Preview only the first few test pairs; displaying the whole list floods the
# notebook output.
test_data[:5]

In [None]:
# Pull a single batch and report the shape of every tensor in it.
for inputs, targets in train_dataset.take(1):
    for key in ("encoder_inputs", "decoder_inputs"):
        print(f'inputs["{key}"].shape: {inputs[key].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 50)
inputs["decoder_inputs"].shape: (64, 50)
targets.shape: (64, 50)


# Transformer

In [None]:
import torch
import torch.nn as nn
from torch.nn import Transformer
import math
import tensorflow.keras.layers as layers

In [None]:
class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block.

    Applies multi-head self-attention followed by a two-layer feed-forward
    projection, each wrapped in a residual connection and layer
    normalization.

    Args:
        embed_dim: size of the last axis of the inputs; also used as the
            attention `key_dim` and as the output size of the projection.
        dense_dim: hidden size of the feed-forward projection.
        num_heads: number of attention heads.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Run the block on `inputs`, optionally restricted by `mask`.

        Args:
            inputs: tensor to encode.
            mask: optional mask; when given it is expanded with a new axis 1
                and passed to self-attention as the attention mask.

        Returns:
            The layer-normalized output of the feed-forward sub-block.
        """
        # Default to "no mask": previously `padding_mask` was only bound
        # inside the `if`, so a call without a mask raised a NameError-style
        # failure on the attention call below.
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")

        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config



In [None]:
class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        sequence_length: number of rows in the position embedding table.
        vocab_size: number of rows in the token embedding table.
        embed_dim: size of each embedding vector.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed token ids and add the embedding of each position index."""
        # Positions are 0..length-1 along the last axis of `inputs`.
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is False wherever the input id is 0."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config


In [None]:
class TransformerDecoder(layers.Layer):
    """Single Transformer decoder block.

    Applies causal self-attention over the decoder inputs, attention over the
    encoder outputs, and a two-layer feed-forward projection; each sub-block
    is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: size of the last axis of the inputs; also used as the
            attention `key_dim` and as the output size of the projection.
        latent_dim: hidden size of the feed-forward projection.
        num_heads: number of attention heads.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the block.

        Args:
            inputs: decoder-side tensor; queried against itself and then
                against `encoder_outputs`.
            encoder_outputs: tensor used as key and value in the second
                attention.
            mask: optional mask for `inputs`.

        Returns:
            The layer-normalized output of the feed-forward sub-block.
        """
        causal_mask = self.get_causal_attention_mask(inputs)

        # Default to "no mask": previously `padding_mask` was only bound
        # inside the `if`, so a call without a mask failed at the second
        # attention call.
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # NOTE(review): this mask is built from the decoder-side mask and the
        # causal mask but is applied to attention over the encoder outputs;
        # confirm this is intended and that both sequence lengths match.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Return an int32 lower-triangular mask tiled over the batch.

        The result has shape (batch, seq, seq), where batch and seq are the
        first two dimensions of `inputs`; entry [b, i, j] is 1 when i >= j.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Tile multiples: [batch_size, 1, 1].
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

In [None]:
# Model hyper-parameters shared by the encoder and decoder cells.
embed_dim = 256
dense_dim = 2048
heads = 8

#ENCODER:
# The encoder consumes English token ids (the "encoder_inputs" produced by
# eng_vector), so its embedding table is sized with the English vocabulary;
# it was previously sized with ara_vocab_len.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(max_seq_len, eng_vocab_len, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)



In [None]:
# DECODER: a stand-alone model taking the Arabic token ids plus an encoded
# source sequence, and producing a softmax over the Arabic vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(
    shape=(None, embed_dim), name="decoder_state_inputs"
)

target_embedding = PositionalEmbedding(max_seq_len, ara_vocab_len, embed_dim)
decoder_block = TransformerDecoder(embed_dim, dense_dim, heads)
vocab_projection = layers.Dense(ara_vocab_len, activation="softmax")

x = target_embedding(decoder_inputs)
x = decoder_block(x, encoded_seq_inputs)
decoder_outputs = vocab_projection(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

# End-to-end model: feed the encoder's outputs into the decoder model.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs],
    decoder_outputs,
    name="transformer",
)

In [None]:
# Number of training epochs.
# NOTE(review): the original comment says this should be at least 30 for
# convergence, yet the value is 25; confirm which is intended.
epochs = 25

# Optimizer is RMSprop; the author left a note about trying Adam instead.
transformer.summary()
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
transformer.fit(train_dataset, epochs=epochs, validation_data=test_dataset)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding_2 (Positi  (None, None, 256)   14825472    ['encoder_inputs[0][0]']         
 onalEmbedding)                                                                                   
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder_1 (Transfo  (None, None, 256)   3155456     ['positional_embedding_

<keras.callbacks.History at 0x7f3b8671e4a0>

In [None]:
# Write the trained model to the file "model_trans.h5" in the working directory.
transformer.save("model_trans.h5")

In [None]:
# Preview only the first few vocabulary entries; displaying the whole list
# floods the notebook output.
eng_vocab[:20]

In [None]:
import numpy as np

In [None]:
# Map each integer token id back to its Arabic vocabulary string.
ara_vo = ara_vector.get_vocabulary()
ara_index_lookup = dict(enumerate(ara_vo))

# Upper bound on the number of tokens generated per translation.
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    """Greedily translate one English sentence with the trained model.

    Starting from the "<start>" marker, the function repeatedly feeds the
    sentence decoded so far to `transformer`, takes the highest-scoring token
    at the current position and appends it, until the end marker is produced
    or `max_decoded_sentence_length` tokens have been generated.

    Args:
        input_sentence: the English source sentence as a string.

    Returns:
        The decoded string, beginning with "<start> " and including the end
        marker if one was generated.
    """
    tokenized_input_sentence = eng_vector([input_sentence])
    decoded_sentence = "<start> "
    for i in range(max_decoded_sentence_length):
        # Drop the last position so the decoder input has the same length it
        # had during training.
        tokenized_target_sentence = ara_vector([decoded_sentence])[:, :-1]
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])

        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = ara_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token

        # The old check compared against " <end>" (leading space), which a
        # vocabulary token can never equal, so decoding never stopped early.
        # Both spellings are accepted because the stored form depends on how
        # ara_vector standardizes text (it may strip the angle brackets).
        if sampled_token in ("<end>", "end"):
            break
    return decoded_sentence


# English side of every held-out pair, used as translation prompts.
test_eng_texts = [eng_text for eng_text, _ in test_data]


In [None]:
# Translate 30 randomly chosen test sentences and show every result.
# Previously the loop overwrote `translated` each time, so 29 of the 30
# translations were computed and then thrown away unseen.
for _ in range(30):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)
    print(input_sentence)
    print(translated)
    print()

In [None]:
# Show the most recent source sentence and its translation, one per line.
print(input_sentence, translated, sep="\n")

in ecuador ivan petroff shares some of his work done for his studies in artistic expression pointing us to some characteristic features of the colonial plastic arts .
<start>  في الاكوادور يشارك ايفان [UNK] بعض من اعماله التي عملها لدراسة الفنون التعبيرية [UNK] انتباهنا لبعض السمات المميزة للفنون [UNK]
