In [None]:
import random
import tensorflow as tf
import numpy as np
import string
import re
from sklearn.model_selection import train_test_split

In [None]:
# Location of the tab-separated English/Spanish sentence-pair file.
file_loc="/content/spa.txt"
# Decode explicitly as UTF-8: the Spanish column contains non-ASCII characters
# and the platform's default encoding is not guaranteed to be UTF-8.
with open(file_loc,encoding="utf-8") as f:
    # Skip empty lines instead of blindly dropping the last element, so the
    # final pair is kept even when the file has no trailing newline.
    lines=[line for line in f.read().split("\n") if line]
# List of (english, spanish) tuples; the Spanish side is wrapped in the
# "[start]" / "[end]" markers used by the decoder.
data_pairs=[]
for line in lines:
    # Only the first two columns are used, so files that carry additional
    # tab-separated columns still parse.
    english, spanish=line.split("\t")[:2]
    spanish="[start] "+spanish+" [end]"
    data_pairs.append((english,spanish))

In [None]:
len(data_pairs)

118964

In [None]:
# Shuffle in place with a dedicated, seeded generator so that a fresh
# "Restart & Run All" produces the same ordering every time.
random.Random(42).shuffle(data_pairs)

In [None]:
data_pairs

In [None]:
# Hold out the test set (train_test_split's default proportions are used).
# A fixed random_state makes the split deterministic for a given input order.
train_data,test_data=train_test_split(data_pairs,random_state=42)

In [None]:
# Carve the validation set out of the remaining training pairs.
# A fixed random_state makes the split deterministic for a given input order.
# Note: train_data is rebound here, so re-running this cell on its own
# shrinks the training set again.
train_data, val_data=train_test_split(train_data,random_state=42)

In [None]:
print(len(train_data),len(test_data),len(val_data))

66917 29741 22306


In [None]:
# Characters removed from Spanish text during standardisation: ASCII
# punctuation plus BOTH inverted Spanish marks ("¿" and "¡").  Without "¡",
# exclamations such as "¡Hola!" leave a distinct "¡hola" token behind.
strip_chars=string.punctuation+"¿¡"
# Keep the square brackets so the "[start]" / "[end]" markers survive.
strip_chars=strip_chars.replace("[","").replace("]","")

In [None]:
strip_chars

'!"#$%&\'()*+,-./:;<=>?@\\^_`{|}~¿'

In [None]:
f"[{re.escape(strip_chars)}]"

'[!"\\#\\$%\\&\'\\(\\)\\*\\+,\\-\\./:;<=>\\?@\\\\\\^_`\\{\\|\\}\\~¿]'

In [None]:
from tensorflow.keras import layers
def customStandardisation(input_string):
    """Lower-case the text and delete every character listed in ``strip_chars``.

    Used as the ``standardize`` callable of the Spanish vectorisation layer.
    Square brackets are not part of ``strip_chars``, so the ``[start]`` and
    ``[end]`` markers are preserved.

    Args:
        input_string: string tensor holding the raw sentences.

    Returns:
        String tensor with case folded and the listed characters removed.
    """
    # encoding="utf-8" makes the case folding Unicode-aware; the default
    # treats the input as ASCII and leaves accented capitals such as "É" or
    # "Ñ" untouched, which would split one word into two vocabulary entries.
    lowercase=tf.strings.lower(input_string,encoding="utf-8")
    return tf.strings.regex_replace(lowercase,f"[{re.escape(strip_chars)}]","")

In [None]:
# Upper bound on each vocabulary (passed as max_tokens to both vectorisation
# layers) and size of the model's output layer.
vocab_size=15000
# Number of token positions per English sequence.  Spanish sequences are
# vectorised to sequence_length+1 positions so they can be split into a
# decoder input and a one-step-shifted target.
sequence_length=20

In [None]:
# Tokeniser for the English (source) side.  No custom `standardize` callable
# is given, so the layer's built-in standardisation applies.
english_vectorization=layers.experimental.preprocessing.TextVectorization(
    output_mode="int",
    max_tokens=vocab_size,
    output_sequence_length=sequence_length,
)

In [None]:
# Tokeniser for the Spanish (target) side.  One extra position is produced so
# the sequence can later be split into decoder input and shifted target, and
# the custom standardiser keeps the "[start]" / "[end]" markers intact.
spanish_vectorization=layers.experimental.preprocessing.TextVectorization(
    standardize=customStandardisation,
    output_mode="int",
    max_tokens=vocab_size,
    output_sequence_length=sequence_length+1,
)

In [None]:
# Parallel lists of source and target sentences taken from the training pairs.
train_english_texts=[english_text for english_text,_ in train_data]
train_spanish_texts=[spanish_text for _,spanish_text in train_data]

In [None]:
# Build both vocabularies from the training sentences only.
english_vectorization.adapt(train_english_texts)
spanish_vectorization.adapt(train_spanish_texts)

In [None]:
english_vectorization

<keras.layers.preprocessing.text_vectorization.TextVectorization at 0x7efeb666ddf0>

In [None]:
# Number of sentence pairs per batch; read by make_dataset.
batch_size=64
def format_dataset(eng,spa):
    """Turn a batch of raw sentence pairs into model inputs and targets.

    The Spanish token ids are used twice: without their last position as the
    decoder input, and without their first position as the prediction target.

    Args:
        eng: batch of English sentences.
        spa: batch of Spanish sentences.

    Returns:
        ``(inputs, targets)`` where ``inputs`` is a dict with the keys
        ``"english"`` and ``"spanish"``.
    """
    source_ids=english_vectorization(eng)
    target_ids=spanish_vectorization(spa)
    model_inputs={
        "english":source_ids,
        "spanish":target_ids[:,:-1],
    }
    return (model_inputs,target_ids[:,1:])
def make_dataset(pairs):
    """Build a batched, vectorised ``tf.data.Dataset`` from sentence pairs.

    Args:
        pairs: non-empty sequence of ``(english, spanish)`` string tuples.

    Returns:
        Dataset yielding ``(inputs, targets)`` batches as produced by
        ``format_dataset``, in a new random batch order on every iteration.
    """
    eng_texts,spa_texts=zip(*pairs)
    dataset=tf.data.Dataset.from_tensor_slices((list(eng_texts),list(spa_texts)))
    dataset=dataset.batch(batch_size)
    dataset=dataset.map(format_dataset,num_parallel_calls=4)
    # Cache the (expensive) vectorised batches first and shuffle afterwards:
    # a cache placed after shuffle() replays the first epoch's order forever,
    # and prefetching before the cache does no useful work.
    return dataset.cache().shuffle(2048).prefetch(16)

In [None]:
# Batched, vectorised datasets for fitting and for validation.
train_ds=make_dataset(train_data)
val_ds=make_dataset(val_data)

In [None]:
# Sanity check: materialises every batch of train_ds into a Python list and
# prints the one at index 50 (an (inputs, targets) tuple of NumPy arrays).
print(list(train_ds.as_numpy_iterator())[50])

({'english': array([[   5,  101, 1630, ...,    0,    0,    0],
       [2452,   23, 1172, ...,    0,    0,    0],
       [  40,    3,  145, ...,    0,    0,    0],
       ...,
       [  21,  251,   37, ...,    0,    0,    0],
       [  21,   14, 5315, ...,    0,    0,    0],
       [   3,  496,   10, ...,    0,    0,    0]]), 'spanish': array([[   2,   50,   12, ...,    0,    0,    0],
       [   2,   20, 3088, ...,    0,    0,    0],
       [   2,   54,  172, ...,    0,    0,    0],
       ...,
       [   2,   26,  136, ...,    0,    0,    0],
       [   2,   26,   15, ...,    0,    0,    0],
       [   2,   35, 1094, ...,    0,    0,    0]])}, array([[   50,    12,   529, ...,     0,     0,     0],
       [   20,  3088,    15, ...,     0,     0,     0],
       [   54,   172,    56, ...,     0,     0,     0],
       ...,
       [   26,   136,    18, ...,     0,     0,     0],
       [   26,    15, 11189, ...,     0,     0,     0],
       [   35,  1094,    11, ...,     0,     0,     0]]

In [None]:
# Embedding width; assigned again (same value) in the model-building cell.
embed_dim = 256
# NOTE(review): latent_dim is not referenced by any other code visible in this
# notebook — confirm it is still needed.
latent_dim = 1024

In [None]:
# Vocabulary learned by the Spanish vectorisation layer.
# presumably the position of a token in this list is its integer id — confirm
spa_vocab = spanish_vectorization.get_vocabulary()

In [None]:
spa_vocab

In [None]:
# Dictionary from list position (0, 1, 2, ...) to the Spanish token stored there.
spa_index_lookup=dict(enumerate(spa_vocab))

In [None]:
# Upper bound on the number of decoding steps performed by decode_sequence.
max_decoded_sentence_length=20

In [None]:
class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block.

    Applies multi-head self-attention followed by a two-layer feed-forward
    projection; each stage is wrapped in a residual connection and layer
    normalisation.

    Args:
        embed_dim: size of the last axis of the inputs/outputs; also used as
            ``key_dim`` of the attention layer.
        dense_dim: width of the hidden feed-forward layer.
        num_heads: number of attention heads.
        **kwargs: forwarded to ``layers.Layer``.
    """
    def __init__(self,embed_dim,dense_dim,num_heads,**kwargs):
        super().__init__(**kwargs)
        self.embed_dim=embed_dim
        self.dense_dim=dense_dim
        self.num_heads=num_heads
        self.attention=layers.MultiHeadAttention(
            num_heads=num_heads,key_dim=embed_dim)
        self.dense_proj=tf.keras.Sequential(
            [layers.Dense(dense_dim, activation='relu'),layers.Dense(embed_dim),])
        self.layernorm_1=layers.LayerNormalization()
        self.layernorm_2=layers.LayerNormalization()

    def call(self,inputs,mask=None):
        """Run the block on ``inputs``.

        Args:
            inputs: tensor to encode.
            mask: optional per-position mask; when given it is expanded with a
                middle axis and used as the attention mask.

        Returns:
            Tensor produced by the second layer normalisation.
        """
        if mask is not None:
            # Insert an axis so the mask broadcasts over the query positions.
            mask=mask[:,tf.newaxis,:]
        attention_output=self.attention(inputs,inputs,attention_mask=mask)
        proj_input=self.layernorm_1(inputs+attention_output)
        proj_output=self.dense_proj(proj_input)
        return self.layernorm_2(proj_input+proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialised.

        Keras requires layers whose ``__init__`` takes arguments to override
        this method before a model containing them can be saved and reloaded.
        """
        config=super().get_config()
        config.update({
            "embed_dim":self.embed_dim,
            "dense_dim":self.dense_dim,
            "num_heads":self.num_heads,
        })
        return config


In [None]:
class TransformerDecoder(layers.Layer):
    """Single Transformer decoder block.

    Applies causally masked self-attention over the target sequence, then
    attention over the encoder output, then a two-layer feed-forward
    projection.  Each stage is followed by a residual connection and layer
    normalisation.

    Args:
        embed_dim: size of the last axis of the inputs/outputs; also used as
            ``key_dim`` of both attention layers.
        dense_dim: width of the hidden feed-forward layer.
        num_heads: number of heads in both attention layers.
        **kwargs: forwarded to ``layers.Layer``.
    """
    def __init__(self,embed_dim,dense_dim,num_heads,**kwargs):
        super().__init__(**kwargs)
        self.embed_dim=embed_dim
        self.dense_dim=dense_dim
        self.num_heads=num_heads
        self.attention_1=layers.MultiHeadAttention(
            num_heads=num_heads,key_dim=embed_dim)
        self.attention_2=layers.MultiHeadAttention(
            num_heads=num_heads,key_dim=embed_dim)
        self.dense_proj=tf.keras.Sequential(
            [layers.Dense(dense_dim,activation='relu'),
             layers.Dense(embed_dim),])
        self.layernorm_1=layers.LayerNormalization()
        self.layernorm_2=layers.LayerNormalization()
        self.layernorm_3=layers.LayerNormalization()
        self.supports_masking=True

    def get_config(self):
        """Return the constructor arguments so the layer can be serialised.

        Keras requires layers whose ``__init__`` takes arguments to override
        this method before a model containing them can be saved and reloaded.
        """
        config=super().get_config()
        config.update({
            "embed_dim":self.embed_dim,
            "dense_dim":self.dense_dim,
            "num_heads":self.num_heads,
        })
        return config

    def get_casual_attention_mask(self,inputs):
        """Build the causal (lower-triangular) attention mask for ``inputs``.

        The method keeps its original spelling ("casual") so existing callers
        continue to work.

        Returns:
            int32 tensor of shape ``(batch, seq, seq)`` whose entry
            ``[b, i, j]`` is 1 when ``j <= i`` and 0 otherwise.
        """
        input_shape=tf.shape(inputs)
        batch_size, sequence_length=input_shape[0],input_shape[1]
        i=tf.range(sequence_length)[:,tf.newaxis]
        j=tf.range(sequence_length)
        mask=tf.cast(i>=j,dtype='int32')
        mask=tf.reshape(mask,(1,input_shape[1],input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult=tf.concat(
            [tf.expand_dims(batch_size,-1),tf.constant([1,1],dtype=tf.int32)],axis=0)
        return tf.tile(mask,mult)

    def call(self,inputs,encoder_outputs,mask=None):
        """Run the block.

        Args:
            inputs: target-side tensor to decode.
            encoder_outputs: tensor attended to by the second attention layer.
            mask: optional per-position mask for ``inputs``.

        Returns:
            Tensor produced by the third layer normalisation.
        """
        causal_mask=self.get_casual_attention_mask(inputs)
        # Defined up front so the second attention call also works when no
        # mask is supplied (previously the name was unbound in that case).
        padding_mask=None
        if mask is not None:
            padding_mask=tf.cast(mask[:,tf.newaxis,:],dtype="int32")
            padding_mask=tf.minimum(padding_mask,causal_mask)
            # NOTE(review): this (batch, seq, seq) mask is applied to the
            # attention over encoder_outputs below, which assumes the encoder
            # and decoder sequences have the same length — confirm.

        attention_output_1=self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)

        attention_output_1=self.layernorm_1(inputs+attention_output_1)

        attention_output_2=self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,)

        attention_output_2=self.layernorm_2(attention_output_1+attention_output_2)
        proj_output=self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2+proj_output)

In [None]:
class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned embedding of each position.

    Args:
        sequence_length: number of distinct positions the position table holds.
        input_dim: number of distinct token ids the token table holds.
        output_dim: width of both embeddings.
        **kwargs: forwarded to ``layers.Layer``.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )

    def call(self, inputs):
        """Embed the token ids and add the embedding of their positions."""
        token_vectors = self.token_embeddings(inputs)
        # Positions 0 .. (length of the last axis - 1), shared by every row.
        position_ids = tf.range(tf.shape(inputs)[-1])
        return token_vectors + self.position_embeddings(position_ids)

    def compute_mask(self, inputs, mask=None):
        """Mark every position whose token id differs from 0."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the base config extended with this layer's constructor arguments."""
        return {
            **super().get_config(),
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        }

In [None]:
# Hyper-parameters of the encoder and decoder blocks.
embed_dim=256
dense_dim=2048
num_heads=8

# Encoder branch.  The input name matches the "english" key produced by
# format_dataset.
encoder_inputs=tf.keras.Input(shape=(None,),dtype="int64",name='english')
x=PositionalEmbedding(sequence_length,vocab_size,embed_dim)(encoder_inputs)
encoder_outputs=TransformerEncoder(embed_dim,dense_dim,num_heads)(x)

# Decoder branch.  The input name matches the "spanish" key produced by
# format_dataset; the decoder block also receives the encoder output.
decoder_inputs = tf.keras.Input(shape=(None,),dtype="int64",name="spanish")
x=PositionalEmbedding(sequence_length,vocab_size,embed_dim)(decoder_inputs)
x=TransformerDecoder(embed_dim,dense_dim,num_heads)(x,encoder_outputs)

# Dropout in front of the output layer.
x=layers.Dropout(0.5)(x)

# One softmax over vocab_size entries per decoder position.
decoder_outputs=layers.Dense(vocab_size,activation="softmax")(x)
transformer=tf.keras.Model([encoder_inputs,decoder_inputs],decoder_outputs)

In [None]:
transformer.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 english (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 spanish (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 positional_embedding (Position  (None, None, 256)   3845120     ['english[0][0]']                
 alEmbedding)                                                                                     
                                                                                                  
 positional_embedding_1 (Positi  (None, None, 256)   3845120     ['spanish[0][0]']            

In [None]:
# Sparse categorical cross-entropy fits this model: the targets are integer
# token ids and the final layer outputs a softmax over the vocabulary.
transformer.compile(optimizer='adam',loss='sparse_categorical_crossentropy',metrics=['accuracy'])

In [None]:
# train_ds / val_ds already yield batches (see make_dataset), so batch_size is
# not passed: Keras documents that it must not be specified for dataset
# inputs, and the previous value (32) did not match the real batch size.
stats=transformer.fit(train_ds,epochs=100,validation_data=val_ds)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [None]:
# Persist the trained model as a single HDF5 file.
# NOTE(review): the model contains custom layers; saving in this format
# presumably needs each of them to implement get_config — confirm the call
# succeeds and that the file can be loaded back.
transformer.save("/content/model.h5")

In [None]:
spa_vocab=spanish_vectorization.get_vocabulary()
spa_index_lookup=dict(zip(range(len(spa_vocab)),spa_vocab))
max_decoded_sentence_length=20

def decode_sequence(input_sentence):
    """Greedily translate one English sentence into Spanish.

    Starting from ``"[start]"``, the model is queried once per step; the most
    probable token at the current position is appended until ``"[end]"`` is
    produced or ``max_decoded_sentence_length`` steps have been taken.

    Args:
        input_sentence: the English sentence to translate.

    Returns:
        The decoded string, beginning with ``"[start]"`` and ending with
        ``"[end]"`` when the model emitted it within the step limit.
    """
    # The source sentence is English, so it must go through the English
    # vectoriser (it was previously sent through the Spanish one).
    tokenized_input_sentence=english_vectorization([input_sentence])
    decoded_sentence="[start]"
    for i in range(max_decoded_sentence_length):
        # Drop the last position so the decoder input has the same length as
        # the "spanish" input built by format_dataset.
        tokenized_target_sentence=spanish_vectorization([decoded_sentence])[:,:-1]
        predictions=transformer([tokenized_input_sentence,tokenized_target_sentence])
        # Greedy choice: most probable token at the position being generated.
        sampled_token_index=int(np.argmax(predictions[0,i,:]))
        # spa_index_lookup is a dict, so it is indexed rather than called.
        sampled_token=spa_index_lookup[sampled_token_index]
        decoded_sentence+=" "+sampled_token
        if sampled_token=="[end]":
            break
    # Return only after the loop has finished (not after the first step).
    return decoded_sentence

# English side of every held-out test pair.
test_eng_texts=[english_text for english_text,_ in test_data]

for _ in range(20):
    input_sentence=random.choice(test_eng_texts)