In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf

from tensorflow.python.ops.numpy_ops import np_config
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Embedding, Dense, Input, Dropout, LayerNormalization

## Loading The Data <a name="2-1"></a>

In [None]:
# Both corpora are read identically: no header row, first CSV column only.
# NOTE(review): with usecols=[0], any text after the first comma on a line is
# not loaded — confirm the files contain no commas inside sentences, or that
# keeping only the first clause is intended.
csv_options = dict(header=None, usecols=[0])
en_df = pd.read_csv('./small_vocab_en.csv', **csv_options)
fr_df = pd.read_csv('./small_vocab_fr.csv', **csv_options)

In [None]:
# Row counts of the two frames; the sentence loop further down indexes both
# arrays with the same position, so the two counts are expected to match.
print(len(en_df), len(fr_df))

In [None]:
# Preview the first rows of the English frame.
en_df.head()

In [None]:
# Preview the first rows of the French frame.
fr_df.head()

In [None]:
# Pull column 0 of each frame out as an array of sentences.
# NOTE(review): `.values` may share memory with the DataFrame, so the in-place
# edits performed on these arrays later can also change en_df / fr_df — confirm.
english_sentences =  en_df[0].values
french_sentences = fr_df[0].values

In [None]:
# Wrap every sentence pair in start/end markers, writing back in place.
# The end marker is the literal token "eos." (with the trailing period).
# NOTE: re-running this cell wraps the sentences a second time.
for idx, en_text in enumerate(english_sentences):
    fr_text = french_sentences[idx]
    english_sentences[idx] = "sos " + str(en_text) + " eos."
    french_sentences[idx] = "sos " + str(fr_text) + " eos."


## Tokenization <a name="2-3"></a>

In [None]:
# Fit a word-level tokenizer on the English sentences and replace the text
# with integer id sequences. The filter string does not contain '.', so the
# end marker "eos." survives as a single token.
num_words = 10000
tokenizer_en = Tokenizer(
    num_words=num_words,
    filters='!#$%&()*+,-/:;<=>@«»""[\\]^_`{|}~\t\n',
)
tokenizer_en.fit_on_texts(english_sentences)
word_index = tokenizer_en.word_index
english_sentences = tokenizer_en.texts_to_sequences(english_sentences)

print(f"The number of words in the English vocabulary: {len(word_index)}")

In [None]:
# Same tokenization for the French side: fit, then convert text to id
# sequences. '.' is not in the filter string, so "eos." stays one token.
tokenizer_fr = Tokenizer(
    num_words=num_words,
    filters='!#$%&()*+,-/:;<=>@«»""[\\]^_`{|}~\t\n',
)
tokenizer_fr.fit_on_texts(french_sentences)
word_index_fr = tokenizer_fr.word_index
french_sentences = tokenizer_fr.texts_to_sequences(french_sentences)

print(f"The number of words in the French vocabulary: {len(word_index_fr)}")

## Padding <a name="2-4"></a>

In [None]:
# Bring every id sequence to exactly 30 entries: zeros are appended to short
# sequences and long ones are cut at the end.
pad_options = dict(maxlen=30, padding='post', truncating='post')
english_sentences = pad_sequences(english_sentences, **pad_options)
french_sentences = pad_sequences(french_sentences, **pad_options)

In [None]:
def get_angles(pos, i, embedding_dim):
    """Return the raw angles used by the sinusoidal positional encoding.

    Args:
        pos: position index or array of positions.
        i: embedding-dimension index or array of indices.
        embedding_dim: size of the embedding; scales the exponent.

    Returns:
        ``pos / 10000 ** (2 * (i // 2) / embedding_dim)``, broadcast over
        ``pos`` and ``i``.
    """
    # Each (even, odd) index pair shares one exponent, hence i // 2.
    exponent = (2 * (i // 2)) / np.float32(embedding_dim)
    return pos * (1 / np.power(10000, exponent))

In [None]:
def positional_encoding(position, embedding_dim):
    """Build the sinusoidal positional-encoding table.

    Args:
        position: number of positions (rows) to encode.
        embedding_dim: number of encoding dimensions (columns).

    Returns:
        A float32 tensor of shape ``(1, position, embedding_dim)`` with sine
        in the even columns and cosine in the odd columns.
    """
    positions = np.arange(position)[:, np.newaxis]
    dims = np.arange(embedding_dim)[np.newaxis, :]
    table = get_angles(positions, dims, embedding_dim)

    # Even columns (2i) carry the sine, odd columns (2i+1) the cosine.
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    # Leading axis of size 1 so the table broadcasts over a batch.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

## Masking   <a name="3-2"></a>

In [None]:
def create_padding_mask(seq):
    """Mark the zero entries of ``seq`` with 1.0 and everything else with 0.0.

    Two singleton axes are inserted after the first axis so the result can be
    broadcast against attention logits (assumes ``seq`` is a 2-D
    (batch, seq_len) id tensor — TODO confirm).
    """
    is_pad = tf.math.equal(seq, 0)
    mask = tf.cast(is_pad, tf.float32)
    return mask[:, tf.newaxis, tf.newaxis, :]


In [None]:
def create_look_ahead_mask(size):
    """Return a ``(size, size)`` mask with 1.0 strictly above the diagonal.

    ``band_part(..., -1, 0)`` keeps the lower triangle including the
    diagonal; subtracting it from 1 leaves ones only where column > row.
    """
    lower_triangle = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return 1 - lower_triangle


In [None]:
def create_masks(inputs, targets):
    """Build the three masks passed to the Transformer.

    Args:
        inputs: encoder token ids (0 is treated as padding).
        targets: decoder token ids (0 is treated as padding).

    Returns:
        ``(enc_padding_mask, combined_mask, dec_padding_mask)``. The first and
        third are both padding masks of ``inputs``; ``combined_mask`` is the
        element-wise maximum of the target padding mask and the look-ahead
        mask for the target length.
    """
    # Decoder self-attention: hide both padded and future target positions.
    target_length = tf.shape(targets)[1]
    combined_mask = tf.maximum(
        create_padding_mask(targets),
        create_look_ahead_mask(target_length),
    )

    # Both of these mask the padded positions of the *input* sequence.
    enc_padding_mask = create_padding_mask(inputs)
    dec_padding_mask = create_padding_mask(inputs)

    return enc_padding_mask, combined_mask, dec_padding_mask


In [None]:
def scaled_dot_product_attention(q, k, v, mask):
    """Compute softmax(q·kᵀ / sqrt(d_k)) · v.

    Args:
        q, k, v: query, key and value tensors; ``k`` is transposed on its last
            two axes for the product with ``q``.
        mask: tensor broadcastable to the logits, with 1 at positions to hide,
            or ``None`` for no masking.

    Returns:
        ``(output, attention_weights)``.
    """
    # d_k is the size of the last axis of the keys.
    key_depth = tf.cast(tf.shape(k)[-1], dtype=tf.float32)
    logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(key_depth)

    if mask is not None:
        # A large negative logit drives the softmax weight to ~0.
        logits = logits + (mask * -1e9)

    attention_weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(attention_weights, v), attention_weights


In [None]:
class MyMultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    Queries, keys and values are each projected to ``key_dim`` features,
    split into ``num_heads`` heads of ``key_dim // num_heads`` features,
    attended per head, merged again and passed through a final dense layer
    followed by dropout.
    """

    def __init__(self, key_dim, num_heads, dropout_rate=0.0):
        """Create the projections.

        Args:
            key_dim: total projection width; must be divisible by ``num_heads``.
            num_heads: number of attention heads.
            dropout_rate: rate of the dropout applied to the layer output.
        """
        super().__init__()
        self.num_heads = num_heads
        self.key_dim = key_dim

        # The projection width is shared evenly between the heads.
        assert key_dim % num_heads == 0
        self.depth = key_dim // num_heads

        self.wq = Dense(key_dim)
        self.wk = Dense(key_dim)
        self.wv = Dense(key_dim)

        self.dropout = Dropout(dropout_rate)

        self.dense = Dense(key_dim)

    def split_heads(self, x, batch_size):
        """Reshape to (batch, -1, heads, depth) and move the head axis to position 1."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask=None):
        """Run attention. Note the argument order: value, key, query.

        Returns:
            ``(output, attention_weights)``.
        """
        batch_size = tf.shape(q)[0]

        q_heads = self.split_heads(self.wq(q), batch_size)
        k_heads = self.split_heads(self.wk(k), batch_size)
        v_heads = self.split_heads(self.wv(v), batch_size)

        per_head_output, attention_weights = scaled_dot_product_attention(
            q_heads, k_heads, v_heads, mask)

        # Undo split_heads: head axis back next to depth, then merge the two.
        merged = tf.transpose(per_head_output, perm=[0, 2, 1, 3])
        merged = tf.reshape(merged, (batch_size, -1, self.key_dim))

        # No explicit `training` flag is passed to this dropout call.
        output = self.dropout(self.dense(merged))

        return output, attention_weights
    

## Fully Connected Neural Network <a name="4-3"></a>

In [None]:
def FeedForward(embedding_dim, fully_connected_dim):
    """Return a two-layer dense network as a ``tf.keras.Sequential``.

    Args:
        embedding_dim: width of the output layer.
        fully_connected_dim: width of the hidden ReLU layer.
    """
    hidden = tf.keras.layers.Dense(fully_connected_dim, activation='relu')
    projection = tf.keras.layers.Dense(embedding_dim)
    return tf.keras.Sequential([hidden, projection])


In [None]:
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: self-attention then feed-forward.

    Each sub-block is followed by a residual addition and layer normalization.
    """

    def __init__(self, embedding_dim, num_heads, fully_connected_dim, dropout_rate=0.1):
        """Create the attention, normalization, dropout and feed-forward sub-layers."""
        super().__init__()

        self.mha = MyMultiHeadAttention(embedding_dim, num_heads, dropout_rate)

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        self.dropout = Dropout(dropout_rate)

        self.ffn = FeedForward(embedding_dim, fully_connected_dim)

    def call(self, x, training, mask):
        """Apply the block to ``x``.

        Args:
            x: input tensor; used as value, key and query.
            training: forwarded to the dropout after the feed-forward network.
            mask: attention mask forwarded to the attention layer.
        """
        # Self-attention with residual connection.
        attended, _ = self.mha(x, x, x, mask)
        attended = self.layernorm1(attended + x)

        # Feed-forward with dropout and residual connection.
        transformed = self.dropout(self.ffn(attended), training=training)
        return self.layernorm2(transformed + attended)

In [None]:
class Encoder(tf.keras.layers.Layer):
    """Token embedding + positional encoding followed by a stack of ``EncoderLayer``s."""

    def __init__(self, num_layers, embedding_dim, num_heads, fully_connected_dim, input_vocab_size, maximum_position_encoding, dropout_rate=0.1):
        """Create the embedding, the positional table and ``num_layers`` encoder layers.

        Args:
            num_layers: number of stacked encoder layers.
            embedding_dim: embedding width used throughout the stack.
            num_heads: attention heads per layer.
            fully_connected_dim: hidden width of each feed-forward network.
            input_vocab_size: number of rows in the embedding table.
            maximum_position_encoding: number of positions in the positional table.
            dropout_rate: dropout rate for the embeddings and the layers.
        """
        super().__init__()

        self.num_layers = num_layers
        self.embedding_dim = embedding_dim

        self.embedding = Embedding(input_vocab_size, embedding_dim)

        self.pos_encoding = positional_encoding(maximum_position_encoding, embedding_dim)

        self.enc_layers = [
            EncoderLayer(embedding_dim, num_heads, fully_connected_dim, dropout_rate)
            for _ in range(num_layers)
        ]

        self.dropout = Dropout(dropout_rate)

    def call(self, inputs, training, mask):
        """Encode a batch of token ids.

        Args:
            inputs: token-id tensor; axis 1 is taken as the sequence length.
            training: forwarded to dropout and to every encoder layer.
            mask: attention mask forwarded to every encoder layer.
        """
        seq_len = tf.shape(inputs)[1]

        # Embeddings are scaled by sqrt(embedding_dim) before the positional
        # encoding for the first seq_len positions is added.
        scale = tf.math.sqrt(tf.cast(self.embedding_dim, tf.float32))
        hidden = self.embedding(inputs) * scale
        hidden = hidden + self.pos_encoding[:, :seq_len, :]
        hidden = self.dropout(hidden, training=training)

        for layer_index in range(self.num_layers):
            hidden = self.enc_layers[layer_index](hidden, training, mask)

        return hidden

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
    """One decoder block: self-attention, encoder-decoder attention, feed-forward.

    Each of the three sub-blocks is followed by a residual addition and layer
    normalization.
    """

    def __init__(self, embedding_dim, num_heads, fully_connected_dim, dropout_rate=0.1):
        """Create the two attention layers, the feed-forward network and the norms."""
        super().__init__()

        self.mha1 = MyMultiHeadAttention(embedding_dim, num_heads, dropout_rate)
        self.mha2 = MyMultiHeadAttention(embedding_dim, num_heads, dropout_rate)

        self.ffn = FeedForward(embedding_dim, fully_connected_dim)

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)

        self.dropout3 = Dropout(dropout_rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        """Apply the block.

        Args:
            x: decoder-side input tensor.
            enc_output: encoder output, used as value and key of the second attention.
            training: forwarded to the dropout after the feed-forward network.
            look_ahead_mask: mask for the self-attention over ``x``.
            padding_mask: mask for the attention over ``enc_output``.

        Returns:
            ``(output, self_attention_weights, encoder_decoder_attention_weights)``.
        """
        # Block 1: masked self-attention over the decoder input.
        self_attended, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        self_attended = self.layernorm1(self_attended + x)

        # Block 2: queries from the decoder, keys/values from the encoder.
        cross_attended, attn_weights_block2 = self.mha2(
            enc_output, enc_output, self_attended, padding_mask)
        cross_attended = self.layernorm2(cross_attended + self_attended)

        # Block 3: feed-forward network.
        transformed = self.dropout3(self.ffn(cross_attended), training=training)
        output = self.layernorm3(transformed + cross_attended)

        return output, attn_weights_block1, attn_weights_block2


In [None]:
class Decoder(tf.keras.layers.Layer):
    """Token embedding + positional encoding followed by a stack of ``DecoderLayer``s."""

    def __init__(self, num_layers, embedding_dim, num_heads, fully_connected_dim, target_vocab_size, maximum_position_encoding, dropout_rate=0.1):
        """Create the embedding, the positional table and ``num_layers`` decoder layers.

        Args:
            num_layers: number of stacked decoder layers.
            embedding_dim: embedding width used throughout the stack.
            num_heads: attention heads per layer.
            fully_connected_dim: hidden width of each feed-forward network.
            target_vocab_size: number of rows in the embedding table.
            maximum_position_encoding: number of positions in the positional table.
            dropout_rate: dropout rate for the embeddings and for every layer.
        """
        super(Decoder, self).__init__()

        self.num_layers = num_layers
        self.embedding_dim = embedding_dim

        self.embedding = Embedding(target_vocab_size, embedding_dim)
        self.pos_encoding = positional_encoding(maximum_position_encoding, embedding_dim)
        # Forward the caller's dropout_rate; it was previously fixed at 0.1
        # here regardless of the constructor argument.
        self.dec_layers = [
            DecoderLayer(embedding_dim, num_heads, fully_connected_dim, dropout_rate=dropout_rate)
            for _ in range(num_layers)
        ]
        self.dropout = Dropout(dropout_rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        """Decode a batch of target token ids.

        Args:
            x: target token-id tensor; axis 1 is taken as the sequence length.
            enc_output: encoder output attended to by every layer.
            training: forwarded to dropout and to every decoder layer.
            look_ahead_mask: mask for the decoder self-attention.
            padding_mask: mask for the encoder-decoder attention.

        Returns:
            ``(output, attention_weights)`` where ``attention_weights`` maps
            ``"decoder_layer{n}_block1"`` / ``"decoder_layer{n}_block2"``
            (n starting at 1) to the attention weights of each layer.
        """
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        # Scale embeddings by sqrt(embedding_dim), then add the positional
        # encoding for the first seq_len positions.
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.embedding_dim, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)

            attention_weights[f"decoder_layer{i + 1}_block1"] = block1
            attention_weights[f"decoder_layer{i + 1}_block2"] = block2

        return x, attention_weights

In [None]:
class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer ending in a softmax over the target vocabulary."""

    def __init__(self, num_layers, embedding_dim, num_heads, fully_connected_dim, input_vocab_size, target_vocab_size, max_positional_encoding_input, max_positional_encoding_target, dropout_rate=0.1):
        """Create the encoder, the decoder and the output projection.

        Args:
            num_layers: number of layers in both encoder and decoder.
            embedding_dim: embedding width used throughout the model.
            num_heads: attention heads per layer.
            fully_connected_dim: hidden width of each feed-forward network.
            input_vocab_size: size of the encoder embedding table.
            target_vocab_size: size of the decoder embedding table and of the
                output distribution.
            max_positional_encoding_input: positions in the encoder positional table.
            max_positional_encoding_target: positions in the decoder positional table.
            dropout_rate: dropout rate passed to encoder and decoder.
        """
        super(Transformer, self).__init__()

        self.encoder = Encoder(num_layers, embedding_dim, num_heads, fully_connected_dim, input_vocab_size, max_positional_encoding_input, dropout_rate)
        # The decoder embeds *target* tokens, so its table must be sized by
        # target_vocab_size (it was previously given input_vocab_size).
        self.decoder = Decoder(num_layers, embedding_dim, num_heads, fully_connected_dim, target_vocab_size, max_positional_encoding_target, dropout_rate)

        # Outputs probabilities (softmax), not logits.
        self.final_layer = tf.keras.layers.Dense(target_vocab_size, activation='softmax')

    def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        """Run the full model.

        Args:
            inp: encoder token ids.
            tar: decoder token ids.
            training: forwarded to encoder and decoder.
            enc_padding_mask: mask for encoder self-attention.
            look_ahead_mask: mask for decoder self-attention.
            dec_padding_mask: mask for encoder-decoder attention.

        Returns:
            ``(final_output, attention_weights)``: the softmax output of the
            final dense layer and the decoder's attention-weights dict.
        """
        enc_output = self.encoder(inp, training, enc_padding_mask)

        dec_output, attention_weights = self.decoder(tar, enc_output, training, look_ahead_mask, dec_padding_mask)

        final_output = self.final_layer(dec_output)

        return final_output, attention_weights


## Initializing Hyperparameters <a name="5-1"></a>

In [None]:

# Model size.
embedding_dim = 256
fully_connected_dim = 512
num_layers = 4
num_heads = 8
dropout_rate = 0.1

# The model translates French (input) to English (target).
input_vocab_size = len(tokenizer_fr.word_index) + 2
target_vocab_size = len(tokenizer_en.word_index) + 2

# The positional tables are indexed by position in the sequence, not by token
# id, so they are sized by the sequence length rather than by the vocabulary.
# Sequences are padded/truncated to 30 tokens and decoding runs for at most
# 30 steps, so 30 positions cover every sequence fed to the model.
max_sequence_length = 30
max_positional_encoding_input = max_sequence_length
max_positional_encoding_target = max_sequence_length

# Training loop settings.
EPOCHS = 10
batch_size = 64


In [None]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Warm-up learning-rate schedule.

    ``lr(step) = embedding_dim**-0.5 * min(step**-0.5, step * warmup_steps**-1.5)``
    which rises linearly for ``warmup_steps`` steps and then decays with the
    inverse square root of the step.
    """

    def __init__(self, embedding_dim, warmup_steps=4000):
        """Store both settings as float32 tensors."""
        super().__init__()
        self.embedding_dim = tf.cast(embedding_dim, dtype=tf.float32)
        self.warmup_steps = tf.cast(warmup_steps, dtype=tf.float32)

    def __call__(self, step):
        """Return the learning rate for optimizer step ``step``."""
        step = tf.cast(step, dtype=tf.float32)
        decay_term = tf.math.rsqrt(step)
        warmup_term = step * (self.warmup_steps ** -1.5)
        return tf.math.rsqrt(self.embedding_dim) * tf.math.minimum(decay_term, warmup_term)


learning_rate = CustomSchedule(embedding_dim)


In [None]:

# Build the model from the hyperparameters defined above.
transformer = Transformer(num_layers, embedding_dim, num_heads,
                           fully_connected_dim, input_vocab_size, target_vocab_size, 
                           max_positional_encoding_input, max_positional_encoding_target, dropout_rate)

# Adam driven by the warm-up schedule.
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2 = 0.98, epsilon = 1e-9)

# reduction='none' keeps one loss value per target token. With the default
# reduction the object returns a single scalar, and the padding mask applied
# in loss_function then cancels out, so padded positions are not excluded.
# from_logits=False because the model's final layer already applies softmax.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False, reduction='none')


def loss_function(true_values, predictions):
    """Cross-entropy averaged over the non-zero (non-padding) targets.

    The value returned by ``loss_object`` is multiplied by a 0/1 mask of the
    non-zero targets, summed, and divided by the number of non-zero targets.

    NOTE(review): the mask only excludes padded positions if ``loss_object``
    returns one value per token — confirm its ``reduction`` setting.
    """
    is_token = tf.math.not_equal(true_values, 0)

    raw_loss = loss_object(true_values, predictions)

    weights = tf.cast(is_token, dtype=raw_loss.dtype)
    masked_loss = raw_loss * weights

    return tf.reduce_sum(masked_loss) / tf.reduce_sum(weights)

def accuracy_function(true_values, predictions):
    """Fraction of non-zero targets whose arg-max prediction (over axis 2) is correct."""
    predicted_ids = tf.argmax(predictions, axis=2)
    is_token = tf.math.not_equal(true_values, 0)

    # A hit must be a real token (non-zero target) and match the prediction.
    hits = tf.math.logical_and(is_token, tf.equal(true_values, predicted_ids))
    hits = tf.cast(hits, dtype=tf.float32)
    token_count = tf.reduce_sum(tf.cast(is_token, dtype=tf.float32))

    return tf.reduce_sum(hits) / token_count

# Streaming metrics: updated once per batch inside train_step and reset at the
# start of every epoch by the training loop.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

In [None]:


# Fixing the signature keeps tf.function from retracing. Both dimensions are
# left open so the step accepts any batch size and sequence length instead of
# only (batch_size, 30).
train_step_signature = [
    tf.TensorSpec(shape=(None, None), dtype=tf.int64),
    tf.TensorSpec(shape=(None, None), dtype=tf.int64),
]


@tf.function(input_signature=train_step_signature)
def train_step(encoder_input, target):
    """Run one optimization step and update the running metrics.

    Args:
        encoder_input: int64 source token ids, shape (batch, source_len).
        target: int64 target token ids, shape (batch, target_len).
    """
    # Teacher forcing: the decoder sees the target without its last token and
    # is trained to predict the target shifted left by one.
    decoder_input = target[:, :-1]
    expected_output = target[:, 1:]

    enc_padding_mask, combined_mask, dec_padding_mask = create_masks(encoder_input, decoder_input)

    with tf.GradientTape() as tape:
        predictions, _ = transformer(encoder_input, decoder_input, True, enc_padding_mask, combined_mask, dec_padding_mask)

        loss = loss_function(expected_output, predictions)

    gradients = tape.gradient(loss, transformer.trainable_variables)
    optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))

    train_loss(loss)
    # Weight padded positions (id 0) with 0 so the reported accuracy is
    # measured over real tokens only.
    non_padding = tf.cast(tf.math.not_equal(expected_output, 0), tf.float32)
    train_accuracy(expected_output, predictions, sample_weight=non_padding)



In [None]:

# Train for EPOCHS passes over the data in consecutive, unshuffled batches.
# Only full batches are used; a trailing partial batch is skipped.
for epoch in range(1, EPOCHS+1):
    train_loss.reset_states()
    train_accuracy.reset_states()

    full_batches = int(len(english_sentences)/batch_size)

    for current_batch_index in range(0, full_batches * batch_size, batch_size):
        batch_end = current_batch_index + batch_size

        # English is the target, French is the encoder input.
        target_batch = tf.convert_to_tensor(np.array(english_sentences[current_batch_index:batch_end]),dtype=tf.int64)
        input_batch = tf.convert_to_tensor(np.array(french_sentences[current_batch_index:batch_end]),dtype=tf.int64)

        train_step(input_batch, target_batch)

    print(f'Epoch {epoch} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')


In [None]:
maxlen = 30
def translate_helper(sentence):
    """Greedily decode the English token ids for one French sentence.

    Args:
        sentence: a sequence whose first element is the French sentence (the
            form ``translate`` passes), or the sentence itself as a string.

    Returns:
        A 1-D int64 tensor of English token ids, starting with the 'sos' id.
        The end marker is not included.
    """
    # Accept the one-element list from `translate` as well as a bare string.
    text = sentence if isinstance(sentence, str) else sentence[0]
    text = 'sos ' + text + ' eos.'

    # Encode and pad the source exactly as the training data was prepared.
    source_ids = tokenizer_fr.texts_to_sequences([text])
    source_ids = pad_sequences(source_ids, maxlen=maxlen, padding='post', truncating='post')
    encoder_input = tf.convert_to_tensor(np.array(source_ids), dtype=tf.int64)

    # Decoding starts from the start-of-sentence token alone.
    decoder_input = tokenizer_en.texts_to_sequences(['sos'])
    decoder_input = tf.convert_to_tensor(np.array(decoder_input), dtype=tf.int64)

    # The end marker was added to the training text as "eos." and '.' is not
    # filtered by the tokenizer, so the vocabulary entry is 'eos.' — looking
    # up 'eos' finds nothing and decoding would never stop early.
    eos_id = tokenizer_en.texts_to_sequences(['eos.'])[0][0]

    for _ in range(maxlen):
        enc_padding_mask, combined_mask, dec_padding_mask = create_masks(encoder_input, decoder_input)

        predictions, _ = transformer(encoder_input, decoder_input, False, enc_padding_mask, combined_mask, dec_padding_mask)

        # Only the distribution for the last position is needed.
        predictions = predictions[:, -1:, :]
        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int64)

        if int(predicted_id[0, 0]) == eos_id:
            return tf.squeeze(decoder_input, axis=0)

        # Feed the prediction back in as the next decoder input token.
        decoder_input = tf.concat([decoder_input, predicted_id], axis=1)

    return tf.squeeze(decoder_input, axis=0)


## Translate Function <a name="6-2"></a>

In [None]:
def translate(sentence):
    """Translate a French sentence and print the input and the translation.

    Args:
        sentence: the French sentence as a plain string.
    """
    sentence = [sentence]

    print(f'Input sentence: {sentence[0]}')
    print()

    # Go through .numpy() before .tolist(): tf.Tensor only has a tolist()
    # method when TensorFlow's NumPy behaviour has been enabled.
    result = translate_helper(sentence).numpy().tolist()

    # Look the marker ids up once, then drop them from the output.
    sos_id = tokenizer_en.texts_to_sequences(['sos'])[0][0]
    eos_id = tokenizer_en.texts_to_sequences(['eos.'])[0][0]
    predicted_ids = [token_id for token_id in result
                     if token_id != sos_id and token_id != eos_id]

    predicted_sentence = tokenizer_en.sequences_to_texts([predicted_ids])

    print(f'Translation: {predicted_sentence[0]}')
 


In [None]:
# Try the trained model on a sample French sentence.
sentence = "new jersey est parfois calme pendant l' automne"
translate(sentence)

In [None]:
# A second sample sentence.
sentence = "california est généralement calme en mars"
translate(sentence)