In [1]:
# Download the Spanish-English sentence-pair archive into the current working
# directory, then extract it quietly (-q) in place.
!wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
!unzip -q spa-eng.zip

--2023-04-21 17:42:47--  http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 142.250.107.128, 74.125.20.128, 108.177.98.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.250.107.128|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2638744 (2.5M) [application/zip]
Saving to: ‘spa-eng.zip’


2023-04-21 17:42:48 (159 MB/s) - ‘spa-eng.zip’ saved [2638744/2638744]



In [2]:
import random 
import os 
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
from  tensorflow import keras
import string 
import re

In [5]:
# Tab-separated "<english>\t<spanish>" sentence pairs, one pair per line.
file='/kaggle/working/spa-eng/spa.txt'
data_pairs=[]
# The corpus contains accented Spanish characters, so decode it explicitly as
# UTF-8 instead of relying on the platform's default encoding.
with open(file, encoding='utf-8') as f:
  for line in f:
    line=line.rstrip("\n")
    # Skip blank lines; iterating the file also keeps the final pair when the
    # file does not end with a newline.
    if not line:
      continue
    eng,spn=line.split("\t")
    # Wrap every target sentence in explicit start/end markers for the decoder.
    data_pairs.append((eng,'[start] '+spn+' [end]'))


In [6]:
# Display one randomly chosen (english, spanish) pair as a sanity check.
random.choice(data_pairs)

('Tom was waiting for Mary.', '[start] Tom estaba esperando a Mary. [end]')

In [7]:
# Hold out 15% of the pairs for validation and another 15% for testing;
# whatever remains is used for training.
num_val_samples = int(len(data_pairs) * 0.15)
num_train_examples = len(data_pairs) - 2 * num_val_samples

# Shuffle in place before slicing so each split is a random sample.
random.shuffle(data_pairs)

val_end = num_train_examples + num_val_samples
train_data = data_pairs[:num_train_examples]
val_data = data_pairs[num_train_examples:val_end]
test_data = data_pairs[val_end:]

In [8]:
# Report the size of each split, each followed by a separator line.
for split in (train_data, val_data, test_data):
  print(len(split))
  print("-------------------")

83276
-------------------
17844
-------------------
17844
-------------------


In [9]:
# Characters removed from the Spanish text: all ASCII punctuation plus "¿".
#
# "[" and "]" are part of string.punctuation and therefore ARE removed. The
# earlier `uneeded_chars.replace("[","")` / `.replace("]","")` statements
# discarded their return values (str is immutable), so they never changed this
# set. As a result the "[start]" / "[end]" markers are vectorized as the bare
# tokens "start" / "end", which is what the decoding helpers in this notebook
# seed with and stop on. Keep both sides in sync if this set is ever changed.
uneeded_chars=string.punctuation+"¿"

def clear_text(text):
  """Standardize target-language text for TextVectorization.

  Lower-cases `text` and deletes every character listed in `uneeded_chars`.

  Args:
    text: value accepted by `tf.strings.lower` (used as the `standardize`
      callable of the target TextVectorization layer).

  Returns:
    The lower-cased text with the unwanted characters removed.
  """
  lower_cased=tf.strings.lower(text)
  # re.escape keeps characters such as "]" and "\" literal inside the class.
  return tf.strings.regex_replace(lower_cased,f"[{re.escape(uneeded_chars)}]","")


In [10]:
vocab_size = 15000
max_length = 20

# English side: default standardization, sequences padded/truncated to
# `max_length` token ids.
source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=max_length,
)

# Spanish side: custom standardization and one extra position, because the
# sequence is later split into decoder input ([:-1]) and target ([1:]).
target_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=max_length + 1,
    standardize=clear_text,
)

In [11]:
# Build both vocabularies from the training split only.
eng_train = [eng for eng, _ in train_data]
spn_train = [spn for _, spn in train_data]

source_vectorization.adapt(eng_train)
target_vectorization.adapt(spn_train)

In [12]:
def format_dataset(eng,spa):
  """Vectorize a batch of sentence pairs into model inputs and targets.

  Args:
    eng: batch of English strings.
    spa: batch of Spanish strings.

  Returns:
    A `(inputs, targets)` tuple where `inputs` is a dict with key 'eng'
    (vectorized English) and key 'spa' (vectorized Spanish without its last
    position), and `targets` is the vectorized Spanish without its first
    position, i.e. the decoder input shifted one step ahead.
  """
  source_tokens = source_vectorization(eng)
  target_tokens = target_vectorization(spa)
  model_inputs = {
      'eng': source_tokens,
      'spa': target_tokens[:, :-1],
  }
  next_tokens = target_tokens[:, 1:]
  return model_inputs, next_tokens

In [53]:
# Number of sentence pairs per batch.
batch_size=64
# Sentinel that lets tf.data choose the parallelism level at runtime.
Autotune=tf.data.experimental.AUTOTUNE

def load_data(pairs):
  """Build a batched, vectorized tf.data pipeline from sentence pairs.

  Args:
    pairs: non-empty sequence of `(english, spanish)` string tuples.

  Returns:
    A `tf.data.Dataset` yielding the `(inputs, targets)` structure produced by
    `format_dataset`, in batches of `batch_size`.
  """
  eng,spa=zip(*pairs)
  dataset=tf.data.Dataset.from_tensor_slices((list(eng),list(spa)))
  dataset=dataset.batch(batch_size)
  dataset=dataset.map(format_dataset,num_parallel_calls=Autotune)
  # Cache the vectorized batches first, then shuffle: a cache placed after
  # shuffle() records the first epoch's order and replays it on every later
  # epoch, which defeats the shuffle. Prefetch goes last so it overlaps with
  # training.
  return dataset.cache().shuffle(2048).prefetch(16)

# Materialize the pipelines used by both models' fit() calls.
train_dataset=load_data(train_data)
val_dataset=load_data(val_data)



In [14]:
# Sanity-check the shapes of a single batch.
# NOTE(review): the printed labels say 'english'/'spanish' while the actual
# dictionary keys are 'eng'/'spa'.
for inputs, targets in train_dataset.take(1):
    print(f"inputs['english'].shape: {inputs['eng'].shape}")
    print(f"inputs['spanish'].shape: {inputs['spa'].shape}")
    print(f"targets.shape: {targets.shape}")

inputs['english'].shape: (64, 20)
inputs['spanish'].shape: (64, 20)
targets.shape: (64, 20)


# 1 - Seq2Seq Model

In [15]:
embedding_dim = 256
latent_dim = 1024

# Encoder: embed the English token ids (id 0 is treated as padding) and
# summarize the sentence with a bidirectional GRU whose forward and backward
# outputs are summed into a single `latent_dim` vector.
source_input = keras.Input(shape=(None,), dtype="int64", name="eng")
source_embedding = layers.Embedding(vocab_size, embedding_dim, mask_zero=True)
x = source_embedding(source_input)
encoder_gru = layers.Bidirectional(layers.GRU(latent_dim), merge_mode="sum")
encoded_source = encoder_gru(x)



In [16]:
# Decoder: embed the Spanish token ids and run a GRU that starts from the
# encoder's summary vector, emitting one output per target position.
target_input = keras.Input(shape=(None,), dtype="int64", name="spa")
x = layers.Embedding(vocab_size, embedding_dim, mask_zero=True)(target_input)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
x = decoder_gru(x, initial_state=encoded_source)
x = layers.Dropout(0.5)(x)

# Per-position probability distribution over the Spanish vocabulary.
target_next_step = layers.Dense(vocab_size, activation="softmax")(x)

seq2seq_rnn = keras.Model(
    inputs=[source_input, target_input],
    outputs=target_next_step,
)


In [17]:
# Targets are integer token ids, hence the sparse cross-entropy loss.
seq2seq_rnn.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="rmsprop",
    metrics=["accuracy"],
)
seq2seq_rnn.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=15,
)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<keras.callbacks.History at 0x7207384c4850>

In [18]:
# Map each token id (its position in the vocabulary list) back to its string.
spa_vocab = target_vectorization.get_vocabulary()
spanish_lookup = dict(enumerate(spa_vocab))

In [19]:
def decode_sequence(sentence):
  """Greedily translate one English sentence with the GRU seq2seq model.

  Starting from the start marker, the model is asked for the most probable
  next token at each step; decoding stops at the end marker or after 20
  steps.

  Args:
    sentence: English sentence as a Python string.

  Returns:
    The predicted Spanish tokens joined by single spaces, without the
    start/end markers and without leading whitespace.
  """
  input_encoded=source_vectorization([sentence])
  max_length=20
  # `clear_text` strips the square brackets, so the markers are represented in
  # the vocabulary as the bare tokens 'start' and 'end'.
  decoded_tokens=[]
  for i in range(max_length):
    decoded_sentence=" ".join(['start']+decoded_tokens)
    target_encoded=target_vectorization([decoded_sentence])
    # verbose=0: avoid printing a progress bar for every decoding step.
    predicted_probabilities=seq2seq_rnn.predict([input_encoded,target_encoded],verbose=0)
    # Position i holds the distribution for the token following the i-th input.
    predicted_token=spanish_lookup[int(np.argmax(predicted_probabilities[0,i,:]))]
    if predicted_token=='end':
      break
    decoded_tokens.append(predicted_token)

  # Joining the collected tokens avoids the stray leading space that slicing
  # the 'start' prefix off the accumulated string used to leave behind.
  return " ".join(decoded_tokens)


In [20]:
txt='we want to buy a car'
decode_sequence(txt)



' queremos comprar un coche'

In [64]:
class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block.

    Applies multi-head self-attention followed by a two-layer dense
    projection; each sub-block is wrapped in a residual connection and layer
    normalization.

    Args:
        embed_dim: size of the last axis of the input and output; also used as
            the attention `key_dim`.
        dense_dim: width of the hidden layer in the dense projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=embed_dim,
        )
        feed_forward_layers = [
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ]
        self.dense_proj = keras.Sequential(feed_forward_layers)
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Run the block on `inputs`, optionally restricted by `mask`."""
        # Insert an axis after the batch axis so the mask broadcasts over the
        # query positions.
        attention_mask = None if mask is None else mask[:, tf.newaxis, :]
        attended = self.attention(inputs, inputs, attention_mask=attention_mask)
        normalized = self.layernorm_1(inputs + attended)
        projected = self.dense_proj(normalized)
        return self.layernorm_2(normalized + projected)

    def get_config(self):
        """Return the base config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        }


In [65]:
class TransformerDecoder(layers.Layer):
    """Single Transformer decoder block.

    Applies causal self-attention over the target sequence, cross-attention
    over the encoder outputs, and a two-layer dense projection. Each of the
    three sub-blocks is wrapped in a residual connection and layer
    normalization.

    Args:
        embed_dim: size of the last axis of the input and output; also used as
            the attention `key_dim`.
        dense_dim: width of the hidden layer in the dense projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        # Pass the incoming mask through to the layers that follow.
        self.supports_masking = True

    def get_config(self):
        """Return the base config extended with the constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        """Build a lower-triangular attention mask for `inputs`.

        Returns:
            An int32 tensor of shape (batch, seq_len, seq_len) whose entry
            [b, i, j] is 1 when j <= i and 0 otherwise, so position i can only
            attend to itself and earlier positions.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, T, T) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the decoder block.

        Args:
            inputs: embedded target sequence, (batch, target_len, embed_dim).
            encoder_outputs: encoder output, (batch, source_len, embed_dim).
            mask: optional padding mask for `inputs`, (batch, target_len).

        Returns:
            A tensor with the same shape as `inputs`.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            # `mask` marks the valid positions of the *target* sequence, so it
            # is combined with the causal mask and applied to self-attention:
            # a position may attend to keys that are both earlier (or equal)
            # and not padding.
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            self_attention_mask = tf.minimum(padding_mask, causal_mask)
        else:
            self_attention_mask = causal_mask
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=self_attention_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        # Cross-attention scores have shape (batch, target_len, source_len).
        # The target-side (target_len x target_len) mask must not be applied
        # here: it would hide source positions based on target indices and it
        # only has a compatible shape when both sequences happen to have the
        # same length. No source padding mask is available in this signature,
        # so cross-attention is left unmasked.
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)


In [66]:
class PositionalEmbedding(layers.Layer):
    """Token embedding plus a learned embedding of each token's position.

    Args:
        sequence_length: number of distinct positions that can be embedded.
        input_dim: vocabulary size for the token embedding.
        output_dim: dimensionality of both embeddings.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim,
            output_dim=output_dim,
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length,
            output_dim=output_dim,
        )

    def call(self, inputs):
        """Return token embeddings summed with their position embeddings."""
        num_positions = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=num_positions, delta=1)
        return (
            self.token_embeddings(inputs)
            + self.position_embeddings(position_ids)
        )

    def compute_mask(self, inputs, mask=None):
        """Mark every position whose token id is not 0 as valid."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the base config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        }

In [67]:
embed_dim = 256
dense_dim = 2048
num_heads = 8
sequence_length = 20

# Encoder: positional embedding of the English ids followed by one encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="eng")
encoder_embedding = PositionalEmbedding(sequence_length, vocab_size, embed_dim)
encoder_block = TransformerEncoder(embed_dim, dense_dim, num_heads)
encoder_outputs = encoder_block(encoder_embedding(encoder_inputs))

# Decoder: positional embedding of the Spanish ids followed by one decoder
# block that also receives the encoder outputs.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="spa")
decoder_embedding = PositionalEmbedding(sequence_length, vocab_size, embed_dim)
decoder_block = TransformerDecoder(embed_dim, dense_dim, num_heads)
x = decoder_block(decoder_embedding(decoder_inputs), encoder_outputs)
x = layers.Dropout(0.5)(x)

# Per-position probability distribution over the Spanish vocabulary.
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)

transformer = keras.Model(
    inputs=[encoder_inputs, decoder_inputs],
    outputs=decoder_outputs,
)

In [68]:
# Print the layer-by-layer structure and parameter counts of the Transformer.
transformer.summary()

Model: "model_5"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 eng (InputLayer)               [(None, None)]       0           []                               
                                                                                                  
 spa (InputLayer)               [(None, None)]       0           []                               
                                                                                                  
 positional_embedding_20 (Posit  (None, None, 256)   3845120     ['eng[0][0]']                    
 ionalEmbedding)                                                                                  
                                                                                                  
 positional_embedding_21 (Posit  (None, None, 256)   3845120     ['spa[0][0]']              

In [69]:
# Same training setup as the GRU model: integer targets with sparse
# cross-entropy, RMSprop, 15 epochs.
transformer.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="rmsprop",
    metrics=["accuracy"],
)
transformer.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=15,
)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<keras.callbacks.History at 0x7202c0fb4610>

In [71]:
def decode_sequence_2(sentence):
  """Greedily translate one English sentence with the Transformer model.

  Starting from the start marker, the model is asked for the most probable
  next token at each step; decoding stops at the end marker or after 20
  steps.

  Args:
    sentence: English sentence as a Python string.

  Returns:
    The predicted Spanish tokens joined by single spaces, without the
    start/end markers and without leading whitespace.
  """
  input_encoded=source_vectorization([sentence])
  max_length=20
  # `clear_text` strips the square brackets, so the markers are represented in
  # the vocabulary as the bare tokens 'start' and 'end'.
  decoded_tokens=[]
  for i in range(max_length):
    decoded_sentence=" ".join(['start']+decoded_tokens)
    # target_vectorization pads to max_length+1 positions, but the model was
    # trained on decoder inputs with the last position dropped and its
    # positional embedding only covers 20 positions, so drop it here as well.
    target_encoded=target_vectorization([decoded_sentence])[:, :-1]
    # verbose=0: avoid printing a progress bar for every decoding step.
    predicted_probabilities=transformer.predict([input_encoded,target_encoded],verbose=0)
    # Position i holds the distribution for the token following the i-th input.
    predicted_token=spanish_lookup[int(np.argmax(predicted_probabilities[0,i,:]))]
    if predicted_token=='end':
      break
    decoded_tokens.append(predicted_token)

  # Joining the collected tokens avoids the stray leading space that slicing
  # the 'start' prefix off the accumulated string used to leave behind.
  return " ".join(decoded_tokens)
