<a href="https://colab.research.google.com/github/LlakmalGamage/Translator-Using-Transformers/blob/main/translator_german_version_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Loading and Preprocessing

In [8]:
import pandas as pd
# Location of the English/German sentence-pair CSV, relative to the working directory.
german_file_path="data_german.csv"


In [9]:
# Read the full CSV into memory; later cells use its "English" and "German" columns.
df1=pd.read_csv(german_file_path)

In [10]:
import random
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
import string
import re
import csv



In [11]:
# Preview the first 20 rows to confirm the expected columns were loaded.
df1.head(20)

Unnamed: 0,English,German
0,Go.,Geh.
1,Hi.,Hallo!
2,Hi.,Grüß Gott!
3,Run!,Lauf!
4,Run.,Lauf!
5,Wow!,Potzdonner!
6,Wow!,Donnerwetter!
7,Fire!,Feuer!
8,Help!,Hilfe!
9,Help!,Zu Hülf!


In [12]:
# Take the final ten entries of the English column ...
first_column_values_last_10_rows = df1['English'].iloc[-10:]

# ... and show them as one block of text, one sentence per line.
paragraph = '\n'.join(first_column_values_last_10_rows.tolist())
print(paragraph)

As a prank, some students let three goats loose inside their school after painting the numbers 1, 2 and 4 on the sides of the goats. The teachers spent most of the day looking for goat number 3.
The small crowd at Hiroshima Peace Memorial Park stood for a moment of silence at 8:15 a.m., the exact moment an atomic bomb nicknamed “Little Boy” was dropped from the U.S. warplane Enola Gay.
In today's world, we have to equip all our kids with an education that prepares them for success, regardless of what they look like, or how much their parents make, or the zip code that they live in.
Death is something that we're often discouraged to talk about or even think about, but I've realized that preparing for death is one of the most empowering things you can do. Thinking about death clarifies your life.
At a moment when our economy is growing, our businesses are creating jobs at the fastest pace since the 1990s, and wages are starting to rise again, we have to make some choices about the kind o

In [13]:
# Number of sentence pairs (rows) in the loaded frame.
df1.shape[0]

221533

# Split the English and German translation pairs

In [14]:
# Shared helper for every language pair: the column names are parameters, with
# defaults that match the English/German CSV used in this notebook.

class split_pairs:
    """Builds (source, target) sentence pairs from a translation DataFrame."""

    def split_pairs_method(self, df1, source_column="English", target_column="German"):
        """Return one ``(source, "[start] target [end]")`` tuple per row of ``df1``.

        Args:
            df1: DataFrame holding the two text columns.
            source_column: Name of the column with the source sentences.
            target_column: Name of the column with the translations.

        Returns:
            A list of 2-tuples in row order; each target sentence is wrapped in
            the "[start]" / "[end]" markers.

        Raises:
            KeyError: If either column is missing from ``df1``.
            TypeError: If a target cell is not a string (for example a missing value).
        """
        # Iterate over the column values directly instead of looking rows up by
        # label, so the result does not depend on the index being 0..n-1.
        return [
            (source, "[start] " + target + " [end]")
            for source, target in zip(df1[source_column], df1[target_column])
        ]


In [15]:
# Spot-check helper: print a few random pairs to confirm the pairing step worked.
class random_pair_test:
    """Prints randomly selected sentence pairs for a quick visual check."""

    def random_test_method(self, text_pairs):
        """Print three pairs drawn independently from ``text_pairs``.

        Each draw uses ``random.choice``, so the same pair may appear twice.
        """
        draws_left = 3
        while draws_left > 0:
            print(random.choice(text_pairs))
            draws_left -= 1


In [16]:
# Build the (English, "[start] German [end]") pairs from the loaded frame.

german_text_pairs=split_pairs().split_pairs_method(df1)

# Print three random pairs as a sanity check.
random_pair_test().random_test_method(german_text_pairs)

('What do we do after this?', '[start] Was machen wir danach? [end]')
('He never returned from that expedition.', '[start] Er kehrte nie von dieser Expedition zurück. [end]')
('Tom was fired last week.', '[start] Tom wurde letzte Woche entlassen. [end]')


# Randomizing the data

In [17]:
# Shuffle in place with a dedicated fixed-seed generator so the train/validation/test
# split taken from this list is identical after every kernel restart. A separate
# Random instance is used so the global `random` state is left untouched.
random.Random(42).shuffle(german_text_pairs)

# Split the data into training, validation, and testing sets

In [18]:
# Helper that slices a list of pairs into train / validation / test parts.
class splitting:
    """Splits sentence pairs into three consecutive, non-overlapping slices."""

    def splitting_method(self, text_pairs):
        """Return ``(train_pairs, val_pairs, test_pairs)`` and print their sizes.

        Validation takes ``int(0.15 * len(text_pairs))`` items, training takes
        the total minus twice that amount, and testing takes whatever is left.
        The input order is preserved; nothing is shuffled here.
        """
        total = len(text_pairs)
        holdout = int(0.15 * total)
        train_end = total - 2 * holdout
        val_end = train_end + holdout

        train_pairs = text_pairs[:train_end]
        val_pairs = text_pairs[train_end:val_end]
        test_pairs = text_pairs[val_end:]

        report = (
            ("Total Sentences: ", total),
            ("Training set size: ", len(train_pairs)),
            ("Validation set size: ", len(val_pairs)),
            ("Testng set size: ", len(test_pairs)),
        )
        for label, size in report:
            print(label, size)

        return train_pairs, val_pairs, test_pairs



In [19]:
# Split the (already shuffled) pairs into train / validation / test lists.
german_train_pairs,german_val_pairs,german_test_pairs=splitting().splitting_method(german_text_pairs)


Total Sentences:  221533
Training set size:  155075
Validation set size:  33229
Testng set size:  33229


In [20]:
# The three split sizes should add up to the total number of pairs.
print(len(german_test_pairs)+len(german_train_pairs)+len(german_val_pairs))

221533


In [21]:
# Look at a single validation pair.
print(german_val_pairs[200])

('I drink either coffee or tea every morning.', '[start] Ich trinke jeden Morgen entweder Kaffee oder Tee. [end]')


# Removing Punctuation

In [22]:
# Characters stripped from the target text during standardization: all ASCII
# punctuation plus "¿", minus the square brackets, which are kept so the
# "[start]" / "[end]" markers are not damaged.
strip_chars = "".join(
    char for char in string.punctuation + "¿" if char not in "[]"
)

# Scratch expressions from exploration: the character-class pattern built from
# strip_chars, and an f-string demo. Only the last value is displayed by the cell.
f"[{re.escape(strip_chars)}]"

f"{5+3}"

'8'

# Vectorizing the English and German text pairs

In [23]:
def custom_standardization(input_string):
    """Lower-case ``input_string`` and delete every character found in ``strip_chars``."""
    strip_pattern = f"[{re.escape(strip_chars)}]"
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, strip_pattern, "")

# Vocabulary cap and padded sentence length shared by both vectorizers.
vocab_size=15000
sequence_length=20

# English side: no `standardize` argument, so the layer's default is used.
source_vectorization=layers.TextVectorization(
    max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length
)

# German side: custom standardization, and one extra position because the
# sequence is later sliced into an input and a one-step-shifted target.
target_vectorization=layers.TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length+1,
)

# Vocabularies are learned from the training split only.
train_english_texts=[english for english,_ in german_train_pairs]
train_german_texts=[german for _,german in german_train_pairs]

source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_german_texts)

In [24]:
# Spot-check one training pair and confirm the source vectorizer object exists.
print(train_english_texts[1])
print(train_german_texts[1])

print(source_vectorization)

Do you trust her?
[start] Vertraust du ihr? [end]
<keras.src.layers.preprocessing.text_vectorization.TextVectorization object at 0x7957b8238df0>


In [25]:
# Show training pairs 1 through 4 in their raw (text) form.
print(german_train_pairs[1:5])

[('Do you trust her?', '[start] Vertraust du ihr? [end]'), ("I don't care for alcoholic drinks.", '[start] Ich mache mir nichts aus Alkohol. [end]'), ('We should see each other more often.', '[start] Wir sollten uns öfter treffen. [end]'), ('Tom got discouraged.', '[start] Tom verlor den Mut. [end]')]


# Preparing datasets for the translation task

In [26]:
# Number of sentence pairs per batch; read by make_dataset.
batch_size=64

def format_dataset(eng,ger):
    """Vectorize a batch of raw sentence pairs into ``(inputs, targets)``.

    ``inputs`` is a dict with the vectorized English batch under "english" and
    the German batch without its last position under "german". ``targets`` is
    the same German batch without its first position, i.e. shifted one step.
    """
    english_ids=source_vectorization(eng)
    german_ids=target_vectorization(ger)
    decoder_input=german_ids[:,:-1]
    next_tokens=german_ids[:,1:]
    return ({"english":english_ids,"german":decoder_input},next_tokens)

def make_dataset(pairs):
    """Build a batched, vectorized ``tf.data`` pipeline from sentence pairs.

    Args:
        pairs: Non-empty sequence of ``(english, german)`` string tuples.

    Returns:
        A dataset yielding ``(inputs, targets)`` batches as produced by
        ``format_dataset``, with ``batch_size`` pairs per batch.
    """
    eng_texts,ger_texts=zip(*pairs)
    dataset=tf.data.Dataset.from_tensor_slices((list(eng_texts),list(ger_texts)))
    dataset=dataset.batch(batch_size)
    dataset=dataset.map(format_dataset,num_parallel_calls=4)

    # Order matters: cache the vectorized batches first, then shuffle, then
    # prefetch. Caching after shuffle would store one fixed order and replay it
    # on every epoch, defeating the shuffle.
    return dataset.cache().shuffle(2048).prefetch(16)

# Build the training and validation pipelines from their respective pair lists.
train_data=make_dataset(german_train_pairs)
val_data=make_dataset(german_val_pairs)

# print(train_data)
# print(val_data)

# Sanity check: pull one batch and print the shapes of both inputs and the targets.
for inputs, targets in train_data.take(1):
    print(f"inputs['english'].shape: {inputs['english'].shape}")
    print(f"inputs['german'].shape: {inputs['german'].shape}")
    print(f"targets.shape: {targets.shape}")



inputs['english'].shape: (64, 20)
inputs['german'].shape: (64, 20)
targets.shape: (64, 20)


In [27]:
# Print one batch (position 50 of the current iteration order) without first
# loading every batch of the dataset into a Python list.
print(next(train_data.skip(50).as_numpy_iterator()))

({'english': array([[   2,  165,  944, ...,    0,    0,    0],
       [   6,   93,  897, ...,    0,    0,    0],
       [ 143,   37,    9, ...,    0,    0,    0],
       ...,
       [  73,   57,    4, ...,    0,    0,    0],
       [   6,  242,  318, ...,    0,    0,    0],
       [   2, 2312,  927, ...,    0,    0,    0]]), 'german': array([[   2,    5,   56, ...,    0,    0,    0],
       [   2,    4,  191, ...,    0,    0,    0],
       [   2, 1228,   89, ...,    0,    0,    0],
       ...,
       [   2,  168,  225, ...,    0,    0,    0],
       [   2,    4,  402, ...,    0,    0,    0],
       [   2,    5,   16, ...,    0,    0,    0]])}, array([[   5,   56,  195, ...,    0,    0,    0],
       [   4,  191,  128, ...,    0,    0,    0],
       [1228,   89,  222, ...,    0,    0,    0],
       ...,
       [ 168,  225,    9, ...,    0,    0,    0],
       [   4,  402,  508, ...,    0,    0,    0],
       [   5,   16, 2930, ...,    0,    0,    0]]))


# Transformer encoder implemented as a subclassed Layer

In [28]:
class TransformerEncoder(layers.Layer):
    """Encoder block: self-attention and a two-layer dense projection, each
    followed by a residual connection and layer normalization.

    Args:
        embed_dim: Output width of the dense projection; also passed to the
            attention layer as ``key_dim``.
        dense_dim: Width of the hidden (ReLU) dense layer.
        num_heads: Number of attention heads.
    """

    def __init__(self,embed_dim,dense_dim,num_heads,**kwargs):
        super().__init__(**kwargs)
        # Hyperparameters are stored so get_config can report them.
        self.embed_dim=embed_dim
        self.dense_dim=dense_dim
        self.num_heads=num_heads
        self.attention=layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=embed_dim,
        )
        self.dense_proj=keras.Sequential([
            layers.Dense(dense_dim,activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1=layers.LayerNormalization()
        self.layernorm_2=layers.LayerNormalization()

    def call(self,inputs,mask=None):
        """Apply the block to ``inputs``; ``mask``, if given, gets an extra
        axis inserted at position 1 and is used as the attention mask."""
        attention_mask=None if mask is None else mask[:,tf.newaxis,:]
        attended=self.attention(inputs,inputs,attention_mask=attention_mask)
        normed=self.layernorm_1(inputs+attended)
        return self.layernorm_2(normed+self.dense_proj(normed))

    def get_config(self):
        """Return the base layer config extended with this block's hyperparameters."""
        config=super().get_config()
        config.update(
            embed_dim=self.embed_dim,
            num_heads=self.num_heads,
            dense_dim=self.dense_dim,
        )
        return config

# Transformer decoder

In [29]:
class TransformerDecoder(layers.Layer):
    """Decoder block: masked self-attention, attention over the encoder
    outputs, and a two-layer dense projection. Each of the three stages is
    followed by a residual connection and layer normalization.

    Args:
        embed_dim: Output width of the dense projection; also passed to both
            attention layers as ``key_dim``.
        dense_dim: Width of the hidden (ReLU) dense layer.
        num_heads: Number of attention heads in each attention layer.
    """

    def __init__(self,embed_dim,dense_dim,num_heads,**kwargs):
        super().__init__(**kwargs)
        # Hyperparameters are stored so get_config can report them.
        self.embed_dim=embed_dim
        self.dense_dim=dense_dim
        self.num_heads=num_heads
        # attention_1: self-attention over the decoder inputs.
        self.attention_1=layers.MultiHeadAttention(
            num_heads=num_heads,key_dim=embed_dim)
        # attention_2: queries from the decoder, keys/values from the encoder outputs.
        self.attention_2=layers.MultiHeadAttention(
            num_heads=num_heads,key_dim=embed_dim)
        self.dense_proj=keras.Sequential(
            [layers.Dense(dense_dim,activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1=layers.LayerNormalization()
        self.layernorm_2=layers.LayerNormalization()
        self.layernorm_3=layers.LayerNormalization()
        # Declare mask support to Keras so an incoming mask can be passed on.
        self.supports_masking=True

    def get_config(self):
        """Return the base layer config extended with this block's hyperparameters."""
        config=super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_casual_attention_mask(self,inputs):
        """Build a lower-triangular int32 mask of shape (batch, seq, seq).

        Entry [b, i, j] is 1 when ``i >= j`` and 0 otherwise, where ``batch``
        and ``seq`` are the first two dimensions of ``inputs``.
        """
        input_shape=tf.shape(inputs)
        batch_size,sequence_length=input_shape[0],input_shape[1]
        # Column vector of row indices vs. row vector of column indices;
        # comparing them broadcasts to a (seq, seq) grid.
        i=tf.range(sequence_length)[:,tf.newaxis]
        j=tf.range(sequence_length)
        mask=tf.cast(i>=j,dtype="int32")
        mask=tf.reshape(mask,(1,input_shape[1],input_shape[1]))
        # Tile multiples [batch_size, 1, 1]: repeat the mask once per batch item.
        mult=tf.concat(
            [tf.expand_dims(batch_size,-1),
             tf.constant([1,1],dtype=tf.int32)],axis=0)

        return tf.tile(mask,mult)


    def call(self,inputs,encorder_outputs,mask=None):
        """Run the decoder block.

        Args:
            inputs: Decoder-side sequence; its first two dimensions are used
                as batch and sequence length.
            encorder_outputs: Encoder output used as key and value in the
                second attention layer.
            mask: Optional mask for ``inputs``.

        Returns:
            The output of the final layer normalization.
        """
        casual_mask=self.get_casual_attention_mask(inputs)
        if mask is not None:
            # Combine the incoming mask with the causal mask: a position stays
            # visible only where both masks are 1.
            padding_mask=tf.cast(
                mask[:,tf.newaxis,:],dtype="int32"
            )
            padding_mask=tf.minimum(padding_mask,casual_mask)
        else:
            padding_mask=mask
        # Self-attention uses only the causal mask.
        attention_output_1=self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=casual_mask
        )
        attention_output_1=self.layernorm_1(inputs+attention_output_1)
        # NOTE(review): padding_mask is derived from the decoder-side mask and the
        # causal mask, yet it is applied here where keys/values are the encoder
        # outputs. This presumably only lines up when both sequences have the same
        # length; confirm this is intended.
        attention_output_2=self.attention_2(
            query=attention_output_1,
            value=encorder_outputs,
            key=encorder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2=self.layernorm_2(
            attention_output_1+attention_output_2
        )
        proj_output=self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2+proj_output)




# Positional Encoding

In [30]:
class PositionalEmbedding(layers.Layer):
    """Sums a learned token embedding with a learned position embedding.

    Args:
        sequence_length: Number of distinct positions the position table holds.
        input_dim: Size of the token vocabulary.
        output_dim: Width of both embedding tables.
    """

    def __init__(self,sequence_length,input_dim,output_dim,**kwargs):
        super().__init__(**kwargs)
        self.token_embeddings=layers.Embedding(
            input_dim=input_dim,
            output_dim=output_dim,
        )
        self.position_embeddings=layers.Embedding(
            input_dim=sequence_length,
            output_dim=output_dim,
        )
        # Kept for get_config.
        self.sequence_length=sequence_length
        self.input_dim=input_dim
        self.output_dim=output_dim

    def call(self,inputs):
        """Embed the token ids and add the embedding of each position index
        along the last axis of ``inputs``."""
        position_ids=tf.range(tf.shape(inputs)[-1])
        return self.token_embeddings(inputs)+self.position_embeddings(position_ids)

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is True wherever ``inputs`` is non-zero."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config=super().get_config()
        config.update(
            output_dim=self.output_dim,
            sequence_length=self.sequence_length,
            input_dim=self.input_dim,
        )
        return config

In [31]:
# from tensorflow.python.framework.ops import disable_eager_execution
# disable_eager_execution()

# End-to-End Transformer

In [32]:
# Model hyperparameters.
embed_dim=256
dense_dim=2048
num_heads=8

# Encoder branch: English token ids -> positional embedding -> encoder block.
encoder_inputs=keras.Input(shape=(None,), dtype="int64", name="english")
x=PositionalEmbedding(
    sequence_length=sequence_length,input_dim=vocab_size,output_dim=embed_dim
)(encoder_inputs)
encoder_outputs=TransformerEncoder(
    embed_dim=embed_dim,dense_dim=dense_dim,num_heads=num_heads
)(x)

# Decoder branch: German token ids -> positional embedding -> decoder block
# (which also reads the encoder outputs) -> dropout -> softmax over the vocabulary.
decorder_inputs=keras.Input(shape=(None,),dtype="int64",name="german")
x=PositionalEmbedding(
    sequence_length=sequence_length,input_dim=vocab_size,output_dim=embed_dim
)(decorder_inputs)
x=TransformerDecoder(
    embed_dim=embed_dim,dense_dim=dense_dim,num_heads=num_heads
)(x,encoder_outputs)
x=layers.Dropout(rate=0.5)(x)
decorder_outputs=layers.Dense(units=vocab_size,activation="softmax")(x)

transformer=keras.Model(
    inputs=[encoder_inputs,decorder_inputs],
    outputs=decorder_outputs,
)

In [33]:
# Print the layer-by-layer structure and parameter counts of the assembled model.
transformer.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 english (InputLayer)        [(None, None)]               0         []                            
                                                                                                  
 german (InputLayer)         [(None, None)]               0         []                            
                                                                                                  
 positional_embedding (Posi  (None, None, 256)            3845120   ['english[0][0]']             
 tionalEmbedding)                                                                                 
                                                                                                  
 positional_embedding_1 (Po  (None, None, 256)            3845120   ['german[0][0]']          

# Training the sequence-to-sequence Transformer

In [34]:
# Sparse categorical cross-entropy fits here because the targets are integer
# token ids and the model's final layer already applies softmax.
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

# Train for 50 epochs, evaluating on the validation data alongside training.
transformer.fit(train_data,epochs=50,validation_data=val_data)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.src.callbacks.History at 0x7957a0534910>

In [47]:
import numpy as np

# Map token ids back to German vocabulary strings.
ger_vocab = target_vectorization.get_vocabulary()
ger_index_lookup = dict(enumerate(ger_vocab))
# Tied to the vectorizer length so the step index below can never run past the
# number of positions the model predicts.
max_decoded_sentence_length = sequence_length

def decode_sequence(input_sentence):
    """Greedily translate one English sentence, one token at a time.

    Starting from "[start]", the sentence decoded so far is re-vectorized at
    each step, the model is run, and the most probable token at the current
    position is appended. Decoding stops after "[end]" is produced or after
    ``max_decoded_sentence_length`` steps.

    Args:
        input_sentence: Raw English sentence.

    Returns:
        The decoded string, beginning with "[start]" and, if the model emitted
        it in time, ending with "[end]".
    """
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"

    for i in range(max_decoded_sentence_length):
        # Drop the last position so the decoder input has the same length the
        # model was trained on.
        tokenized_target_sentence = target_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])
        # Position i holds the prediction for the token that follows the i
        # tokens generated so far.
        sampled_token_index = int(np.argmax(predictions[0, i, :]))
        sampled_token = ger_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break

    return decoded_sentence

In [48]:
# Translate twenty randomly drawn English sentences from the test split and
# print each one above its decoded German output.
ger_eng_texts = [english for english, _ in german_test_pairs]
for _ in range(20):
    input_sentence = random.choice(ger_eng_texts)
    print("-", input_sentence, sep="\n")
    print(decode_sequence(input_sentence))

-
I cannot tell you everything that happened to me yesterday.
[start] ich kann ihnen nicht alles sagen was mir gestern passiert ist [end]
-
I don't want to cook.
[start] ich möchte nicht kochen [end]
-
If I can't trust you, who can I trust?
[start] wenn ich nicht vertrauen kann wer ich vertrauen kann [end]
-
That isn't to my liking.
[start] das sagt mir nicht zu [end]
-
I can't show you this.
[start] ich kann sie nicht zeigen [end]
-
The old man lives alone.
[start] der alte mann lebt allein [end]
-
I've said all I have to say.
[start] ich habe gesagt alles was ich zu sagen habe [end]
-
I already know.
[start] ich weiß es schon [end]
-
Tom is trying to help you.
[start] tom versucht sie zu helfen [end]
-
The door was locked from within.
[start] die tür war von zu erfahren [end]
-
I didn't ask you to do that.
[start] ich habe nicht um das zu tun [end]
-
The snow has finally melted.
[start] der schnee ist endlich ihn [end]
-
I don't have any evidence.
[start] ich habe keine [UNK] [end]
-

In [37]:
# Colab-only: mount Google Drive at /content/drive so files can be written to it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [38]:
# Save the trained model to the mounted Drive folder.
# NOTE(review): the model contains custom layer classes defined in this notebook;
# reloading it will presumably need them supplied via custom_objects. Confirm.
transformer.save('/content/drive/My Drive/Translators')