In [None]:
#Necessary packages
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import collections
import os
import re
import keras.backend as K
import random

Transformation Model for Creating A Translation Machine from English to German. This model was trained using the EUROPARL Dataset

In [2]:
#Here I mounted the Google Drive to my colab environment since I saved the dataset on drive
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [9]:
#I define the directory path and file path for english and german text files
dir_path = '/content/drive/MyDrive/Colab Notebooks/de-en'
en_file_path = os.path.join(dir_path, "europarl-v7.de-en.en")
de_file_path = os.path.join(dir_path, "europarl-v7.de-en.de")

#read both files line by line(english and german file)
with open(en_file_path, 'r', encoding='utf-8') as f:
    english_lines = f.read().splitlines()

with open(de_file_path, 'r', encoding='utf-8') as f:
    german_lines = f.read().splitlines()

# Convert to pandas dataframes and add start and end tokens
df_en = pd.DataFrame(english_lines, columns=["English words/sentences"])
df_en["English words/sentences"]=("<SOS> "+df_en["English words/sentences"]+" <EOS>")

df_de = pd.DataFrame(german_lines, columns=["German words/sentences"])
df_de["German words/sentences"]=("<SOS> "+df_de["German words/sentences"]+" <EOS>")

df = pd.concat([df_en, df_de], axis=1)

In [10]:
df["German word numbers"]=(df['English words/sentences'].str.split().apply(len))
df["English word numbers"]=(df['German words/sentences'].str.split().apply(len))

eng = df['English words/sentences']
de = df['German words/sentences']

In [11]:
english_words_counter = collections.Counter([word for sentence in eng for word in sentence.split()])
german_words_counter = collections.Counter([word for sentence in de for word in sentence.split()])

print('{} English words.'.format(len([word for sentence in eng for word in sentence.split()])))
print('{} unique English words.'.format(len(english_words_counter)))
print('10 Most common words in the English dataset:')
print('"' + '" "'.join(list(zip(*english_words_counter.most_common(10)))[0]) + '"')
print()
print('{} German words.'.format(len([word for sentence in de for word in sentence.split()])))
print('{} unique German words.'.format(len(german_words_counter)))
print('10 Most common words in the German dataset:')
print('"' + '" "'.join(list(zip(*german_words_counter.most_common(10)))[0]) + '"')

51722761 English words.
295399 unique English words.
10 Most common words in the English dataset:
"the" "<SOS>" "<EOS>" "of" "to" "and" "in" "that" "a" "is"

48454703 German words.
639032 unique German words.
10 Most common words in the German dataset:
"<SOS>" "<EOS>" "die" "der" "und" "in" "zu" "den" "für" "von"


In [12]:
#in this function I initialized the tokenizer, fitted the tokenzier on the texts(Creating a word index) and then convert thet texts to sequences of integers

def tokenize(x):
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(x)
    return tokenizer.texts_to_sequences(x), tokenizer

def pad(x, length=14):
    # If length is None, the function determines the maximum length of the sequences
    if length is None:
        length = max([len(sentence) for sentence in x])
    # Pad the sequences to the specified length with 'post' padding (adding padding at the end)
    return pad_sequences(x, maxlen=length, padding='post')

def clean_text(text):
    cleaned_texts = []
    for sent in text:
        # Remove all characters that are not letters, digits, or whitespace
        cleaned_text = re.sub(r'[^a-zA-Z0-9\s]', '', sent)
        cleaned_texts.append(cleaned_text)
    return cleaned_texts

def preprocess(x, y):

    # Tokenize the English text and german
    preprocess_x, x_tk = tokenize(x)
    preprocess_y, y_tk = tokenize(y)

    # Pad the tokenized English text and German text to a uniform length
    preprocess_x = pad(preprocess_x)
    preprocess_y = pad(preprocess_y)

    # Return the preprocessed and padded sequences along with the tokenizers
    return preprocess_x, preprocess_y, x_tk, y_tk

In [None]:
#preprocessing
preproc_english_sentences, preproc_german_sentences, english_tokenizer, german_tokenizer = preprocess(eng, de)

In [None]:
#the class positioinal encoding is a custom TensorFlow Keras layer that adds positional information to the input embeddings using the sinusoidal functions
#by adding positional encodings, the model gains information about the order of the tokens in the sequence
class positional_encoding(tf.keras.layers.Layer):
    def __init__(self,max_sentence_len,embedding_size,**kwargs):
        super().__init__(**kwargs)

        self.pos=np.arange(max_sentence_len).reshape(1,-1).T
        self.i=np.arange(embedding_size/2).reshape(1,-1)
        self.pos_emb=np.empty((1,max_sentence_len,embedding_size))
        self.pos_emb[:,:,0 : :2]=np.sin(self.pos / np.power(10000, (2 * self.i / embedding_size)))
        self.pos_emb[:,:,1 : :2]=np.cos(self.pos / np.power(10000, (2 * self.i / embedding_size)))
        self.positional_embedding = tf.cast(self.pos_emb,dtype=tf.float32)

    def call(self, inputs):
        return inputs + self.positional_embedding

In [None]:
#The paddding_mask class is designed to create a mask for padding tokens in a sequence.
# This mask is used to prevent the model from paying attention to padding tokens during training and inference.
class paddding_mask(tf.keras.layers.Layer):
    def __init__(self,**kwargs):
        super().__init__(**kwargs)
    def call(self,inputs):
        # Create a mask where padding tokens (zeros) are marked with 0, and non-padding tokens are marked with 1
        mask=1-tf.cast(tf.math.equal(inputs,0),tf.float32)
        #expands the mask to having an additional dimension to make it compatible with the attention mechanisms
        return mask[:, tf.newaxis, :]

In [None]:
#This class ensures that at each position in the sequence, the model can only consider the current and previous positions, not future positions.
class create_look_ahead_mask(tf.keras.layers.Layer):
    def __init__(self,**kwargs):
        #we initiliaze the layer with any extra arguments
        super().__init__(**kwargs)
    # the function call creates a look ahead mask to prevent the model from attending to future tokens during training
    def call(self,sequence_length):
        mask = tf.linalg.band_part(tf.ones((1, sequence_length, sequence_length)), -1, 0)
        return mask

In [None]:
# Custom layer to create padding mask to ignore padding tokens during processing.
# it converts input sequences into dense embeddings
# applies positional encodings to the embeddings to capture positional relationships in the sequences
class input_layer_encoder(tf.keras.layers.Layer):
    def __init__(self,max_sentence_len,embedding_size,vocab_size,**kwargs):
        super().__init__(**kwargs)
        self.paddding_mask=paddding_mask()

        self.embedding=tf.keras.layers.Embedding(vocab_size,
                                                 embedding_size,
                                                 input_length=max_sentence_len,
                                                 input_shape=(max_sentence_len,))

        self.positional_encoding=positional_encoding(max_sentence_len,embedding_size)
    def call(self,inputs):
        mask=self.paddding_mask(inputs)

        emb=self.embedding(inputs)

        emb=self.positional_encoding(emb)
        return emb,mask

In [None]:
# Custom layer to create padding mask
class input_layer_decoder(tf.keras.layers.Layer):
    def __init__(self, max_sentence_len, embedding_size, vocab_size, **kwargs):
        super().__init__(**kwargs)

        # Initialize the padding mask layer
        self.paddding_mask = paddding_mask()

        # Embedding layer to convert token indices to dense vectors
        self.embedding = tf.keras.layers.Embedding(vocab_size,
                                                   embedding_size,
                                                   input_length=max_sentence_len,
                                                   input_shape=(max_sentence_len,))

        # Positional encoding layer to add positional information to embeddings
        self.positional_encoding = positional_encoding(max_sentence_len, embedding_size)

        # Look-ahead mask for the decoder self-attention
        self.look_ahead_mask = create_look_ahead_mask()

        # Maximum sentence length for the look-ahead mask
        self.max_sentence_len = max_sentence_len

    def call(self, inputs):
        # Calculate padding mask for the input sequences
        mask = self.paddding_mask(inputs)

        # Convert input sequences to embeddings
        emb = self.embedding(inputs)

        # Apply positional encoding to the embeddings
        emb = self.positional_encoding(emb)

        # Create look-ahead mask for self-attention in decoder
        look_ahead_mask = self.look_ahead_mask(self.max_sentence_len)

        # Combine look-ahead mask and padding mask using bitwise AND operation
        look_ahead_mask = tf.bitwise.bitwise_and(tf.cast(look_ahead_mask, dtype=tf.int8),
                                                 tf.cast(mask, dtype=tf.int8))

        # Return processed embeddings and combined mask
        return emb, look_ahead_mask


In [None]:
class Encoder_layer(tf.keras.layers.Layer):
    def __init__(self,
                 embedding_size,
                 heads_num,
                 dense_num,
                 dropout_rate=0.0,
                 **kwargs):

        super().__init__(**kwargs)

        # Multi-head self-attention mechanism
        self.multi_attention = tf.keras.layers.MultiHeadAttention(
            num_heads=heads_num,
            key_dim=embedding_size,
            dropout=dropout_rate,
        )

        # Dropout layer for regularization
        self.Dropout = tf.keras.layers.Dropout(dropout_rate)

        # Feed-forward neural network (position-wise feed-forward network)
        self.ff = tf.keras.Sequential([
            tf.keras.layers.Dense(dense_num, activation="relu"),
            tf.keras.layers.Dense(dense_num, activation="relu"),
            tf.keras.layers.Dense(dense_num, activation="relu"),
            tf.keras.layers.Dense(embedding_size, activation="relu"),
            tf.keras.layers.Dropout(dropout_rate)
        ])

        # Residual connection followed by layer normalization
        self.add = tf.keras.layers.Add()
        self.norm1 = tf.keras.layers.LayerNormalization()
        self.norm2 = tf.keras.layers.LayerNormalization()

    def call(self, inputs, mask, training):
        # Multi-head self-attention
        mha = self.multi_attention(inputs, inputs, inputs, mask=mask)

        # Add and normalize the residual connection (skip connection)
        norm1 = self.norm1(self.add([inputs, mha]))

        # Feed-forward neural network
        ff = self.ff(norm1)

        # Apply dropout for regularization
        ff_drop = self.Dropout(ff, training=training)

        # Add and normalize the residual connection (skip connection)
        output = self.norm2(self.add([ff_drop, norm1]))

        return output


In [None]:
#this class is responsible for processing input sequences by applying multi-head self-attention and feed-forward operations with residual connections.
class Encoder(tf.keras.layers.Layer):
    def __init__(self,
                 max_sentence_len,
                 embedding_size,
                 vocab_size,
                 heads_num,
                 dense_num,
                 num_of_encoders,
                 **kwargs):
        super().__init__(**kwargs)
        self.add=tf.keras.layers.Add()
        self.input_layer=input_layer_encoder(max_sentence_len,embedding_size,vocab_size)
        self.encoder_layer=[Encoder_layer(embedding_size,heads_num, dense_num) for i in range (num_of_encoders)]
        self.num_layers=num_of_encoders
    def call(self,inputs,training):
        emb,mask=self.input_layer(inputs)
        skip=emb
        for layer in self.encoder_layer:
            emb = layer(emb, mask,training)
            emb = self.add([skip,emb])
            skip = emb
        return emb,mask

In [None]:
import tensorflow as tf

class decoder_layer(tf.keras.layers.Layer):
    def __init__(self,
                 embedding_size,
                 heads_num,
                 dense_num,
                 dropout_rate=0.0,
                 **kwargs):

        super().__init__(**kwargs)

        # Multi-head self-attention for masked decoder inputs
        self.masked_mha = tf.keras.layers.MultiHeadAttention(
            num_heads=heads_num,
            key_dim=embedding_size,
            dropout=dropout_rate,
        )

        # Multi-head attention for attending to encoder outputs
        self.multi_attention = tf.keras.layers.MultiHeadAttention(
            num_heads=heads_num,
            key_dim=embedding_size,
            dropout=dropout_rate,
        )

        # Feed-forward neural network
        self.ff = tf.keras.Sequential([
            tf.keras.layers.Dense(dense_num, activation="relu"),
            tf.keras.layers.Dense(dense_num, activation="relu"),
            tf.keras.layers.Dense(dense_num, activation="relu"),
            tf.keras.layers.Dense(embedding_size, activation="relu"),
            tf.keras.layers.Dropout(dropout_rate)
        ])

        # Dropout layer
        self.Dropout = tf.keras.layers.Dropout(dropout_rate)

        # Addition layer for residual connections
        self.add = tf.keras.layers.Add()

        # Layer normalization layers
        self.norm1 = tf.keras.layers.LayerNormalization()
        self.norm2 = tf.keras.layers.LayerNormalization()
        self.norm3 = tf.keras.layers.LayerNormalization()

    def call(self, inputs, encoder_output, enc_mask, look_head_mask, training):
        # Masked multi-head self-attention for decoder inputs
        mha_out, atten_score = self.masked_mha(inputs, inputs, inputs, look_head_mask, return_attention_scores=True)

        # Add and normalize the residual connection
        Q1 = self.norm1(self.add([inputs, mha_out]))

        # Multi-head attention over encoder outputs
        mha_out2, atten_score2 = self.multi_attention(Q1, encoder_output, encoder_output, enc_mask, return_attention_scores=True)

        # Add and normalize the residual connection
        Z = self.norm2(self.add([Q1, mha_out2]))

        # Feed-forward network
        fc = self.ff(Z)

        # Apply dropout for regularization
        A = self.Dropout(fc, training=training)

        # Add and normalize the residual connection
        output = self.norm3(self.add([A, Z]))

        return output


In [None]:
# the class decoder encapsulates multiple layers of the class decooder layer instances within a transformer decoder stack.
# it orchestrates the processing of input sequences throught, embedding, masking, positional encoding and multiple decoder layers
class Decoder(tf.keras.layers.Layer):
    def __init__(self,
                 max_sentence_len,
                 embedding_size,
                 vocab_size,
                 heads_num,
                 dense_num,
                 num_of_decoders,
                 **kwargs):
        super().__init__(**kwargs)
        self.add=tf.keras.layers.Add()
        self.input_layer=input_layer_decoder(max_sentence_len,embedding_size,vocab_size)
        self.decoder_layer=[decoder_layer(embedding_size,heads_num, dense_num) for i in range (num_of_decoders)]
        self.num_layers=num_of_decoders
    def call(self,inputs,encoder_output,enc_mask,training):
        emb,look_head_mask=self.input_layer(inputs)
        skip=emb
        for layer in self.decoder_layer:
            emb = layer(emb,encoder_output,enc_mask,look_head_mask,training)
            emb = self.add([skip,emb])
            skip = emb
        return emb

In [None]:
#this class integrates the encoder and decoder to implement a transformer

class transformer(tf.keras.Model):
    def __init__(self,
                 max_sentence_len_1=None,
                 max_sentence_len_2=None,
                 embedding_size=None,
                 vocab_size1=None,
                 vocab_size2=None,
                 heads_num=None,
                 dense_num=None,
                 num_of_encoders_decoders=None):

        super(transformer,self).__init__()

        # Initialize the encoder with specified parameters
        self.Encoder = Encoder(max_sentence_len_1,
                               embedding_size,
                               vocab_size1,
                               heads_num,
                               dense_num,
                               num_of_encoders_decoders)

        # Initialize the decoder with specified parameters
        self.Decoder = Decoder(max_sentence_len_2,
                               embedding_size,
                               vocab_size2,
                               heads_num,
                               dense_num,
                               num_of_encoders_decoders)

        # Final dense layer for transforming decoder outputs to vocabulary size
        self.Final_layer = tf.keras.layers.Dense(vocab_size2, activation='relu')

        # Softmax activation to generate probabilities over the vocabulary
        self.softmax = tf.keras.layers.Softmax(axis=-1)

    def call(self, inputs):
        # Unpack input sequences
        input_sentence, output_sentence = inputs

        # Encode the input sentence to get encoder output and mask
        enc_output, enc_mask = self.Encoder(input_sentence)

        # Decode using the output sentence, encoder output, and encoder mask
        dec_output = self.Decoder(output_sentence, enc_output, enc_mask)

        # Apply final dense layer
        final_out = self.Final_layer(dec_output)

        # Apply softmax to get the final probabilities over the vocabulary
        softmax_out = self.softmax(final_out)

        return softmax_out


In [None]:
# here I initialized an instance of the transformer class with specific parameters
#after initialization we can use this for training, evaluation etc
tran=transformer(max_sentence_len_1=14, #max length of input sequences
                     max_sentence_len_2=13, #max length of output sequences
                     embedding_size=300, #dimensionality of the embedding vectors used in encoder and decoder layer
                     vocab_size1=german_vocab_size+1, #size of vocabulary for the input language(German)
                     vocab_size2=english_vocab_size+1, #size of vocabulary for the input language(English)
                     heads_num=5, #number of attention heads in each encoder and decoder layer
                     dense_num=512, #number of units in the feed forward neural network layers, within each encoder and decoder
                     num_of_encoders_decoders=2) #number of encoder and decoder layers stackedin the transformer model

In [None]:
tran((preproc_german_sentences[:1],preproc_english_sentences[:1,:-1]))
tran.summary()

Model: "transformer"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 encoder (Encoder)           multiple                  109471948 
                                                                 
 decoder (Decoder)           multiple                  43907248  
                                                                 
 dense_16 (Dense)            multiple                  35134526  
                                                                 
 softmax (Softmax)           multiple                  0         
                                                                 
Total params: 188513722 (719.12 MB)
Trainable params: 188513722 (719.12 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


As we can see here we have the total number of parameters which is 188513722 it is very large and also has a memory size of 719.12 MB.


In [None]:
#here the "compile" prepares the model for trainingn by specifying the loss function, optimizer and metrics
tran.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(),
             optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
             metrics=["accuracy"])

In [None]:
#here I start training the model, input data is the preproc_german_sentences, preproc_english_sentences and target data is "preproc_english_sentences"
tran.fit((preproc_german_sentences,preproc_english_sentences[:,:-1]),
         preproc_english_sentences[:,1:,tf.newaxis],
         epochs=1, verbose = True,
         batch_size=64)



<keras.src.callbacks.History at 0x7836ed0ed960>

In [None]:
#the prepare_pred function is intialized to prepare the input sequences for prediction using a transformer model.
def prepare_pred(sent):
    output=english_tokenizer.texts_to_sequences(sent)
    output=pad(output,13)
    return output

In [None]:
#this function is designed to generate predictions using the "tran" transfomer model that I have created earlier
def pred(i):
    sent = ["<SOS>"]  # Initialize the output sentence with a start token
    german_token = prepare_pred(sent)  # Prepare the initial input token for the decoder

    # Predict the next word iteratively until the end-of-sequence token is predicted or 12 words are generated
    for j in range(12):
        # Prepare the current input token for the decoder
        german_token = prepare_pred(sent)

        # Predict the next word using the transformer model
        word = np.argmax(tran.predict((preproc_german_sentences[[i]], german_token), verbose=0), -1)[0, j]

        # Convert the predicted word index back to text and append it to the output sentence
        sent[0] = sent[0] + " " + english_tokenizer.sequences_to_texts(np.array([[word]]))[0]

        # Break the loop if the end-of-sequence token is predicted
        if english_tokenizer.sequences_to_texts(np.array([[word]]))[0] == "eos":
            break

    return sent

In [None]:
from nltk.translate.bleu_score import sentence_bleu

def show():
    i = random.randint(0,170111)

    print("german sent : ", german_tokenizer.sequences_to_texts(preproc_german_sentences[[i]]))

    # Assuming that pred(i) returns a list with a single string element
    predict_sent = pred(i)[0]
    print("predict sent : ", predict_sent)

    # Assuming that english_tokenizer.sequences_to_texts(preproc_english_sentences[[i]]) returns a list with a single string element
    true_sent = english_tokenizer.sequences_to_texts(preproc_english_sentences[[i]])[0]
    print("true sent : ", true_sent)

    # Since predict_sent and true_sent are now strings, splitting should work
    predict_sent_words = predict_sent.split(' ')
    true_sent_words = true_sent.split(' ')

    # If your predicted sentence starts with '<SOS> ', you might want to remove it
    if predict_sent_words[0] == '<SOS>':
        predict_sent_words = predict_sent_words[1:]

    # Calculate the BLEU score
    bleu_score = sentence_bleu([true_sent_words], predict_sent_words)
    print('BLEU score: {}'.format(bleu_score))


# Call the function
for i in range(5):
    show()
    print("----------------")

german sent :  ['herkunft europäische union sicherheit genug sein doch im moment ist sie es nicht eos']
predict sent :  <SOS> european union is not however the european union is not the moment
true sent :  eu origin may be assurance enough but it is not at this moment eos
BLEU score: 4.90260194222537e-155
----------------
german sent :  ['und zwar mit abschreckenden strafen belegt werden sondern auch der besitz von kinderpornographie eos']
predict sent :  <SOS> and with the other people are also being the part of all
true sent :  deterrent but that the possession of child pornography itself should also be punishable eos
BLEU score: 1.090462944153118e-231
----------------
german sent :  ['sehr gründliche diskussion über die rechtsgrundlage des berichts von frau schleicher geführt haben eos']


The hypothesis contains 0 counts of 2-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()


predict sent :  <SOS> i have had a debate on the legal basis of report eos
true sent :  debate took place on mr schleicher's report in the committee on constitutional affairs eos
BLEU score: 1.2390051155620427e-231
----------------
german sent :  ['union vor übergriffen und eingriffen europäischer institutionen in die grundrechte der bürger schützen eos']
predict sent :  <SOS> the european union and european institutions and european institutions eos
true sent :  interference and intervention of european institutions in the fundamental rights of the citizens eos
BLEU score: 4.854408244229234e-155
----------------
german sent :  ['3 aufgrund seines diskriminierenden inhalts im hinblick auf die religionszugehörigkeit überhaupt zulässig ist eos']
predict sent :  <SOS> the discriminatory discriminatory access to the discriminatory discriminatory access to health eos
true sent :  been checked for admissibility as it is wholly discriminatory on grounds of religion eos
BLEU score: 9.853445011

REMARKS:

This code trains the model using transformers. <br>
As we can see the Bleu Score is extremly low which shows that the translation is very bad. I have also showed the german word, the translated word and the excact word that shows how should it be translated. <br>
The result of the bleu score is very low because we have a very large dataset as I have showed in the upper part of the code with 51722761 English words and 48454703 German words, and I used only 30.000 sentences to train on. The reason for this was that it was taking a very large amount of time and the computational resources that I use are not that efficient for large dataset.<br>