In [1]:
import tensorflow as tf

In [2]:
from pathlib import Path
import numpy as np

# Fetch and unpack the English/Spanish sentence-pair archive
url = "https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip"
path = tf.keras.utils.get_file(
    "spa-eng.zip",
    origin=url,
    cache_dir="datasets",
    extract=True,
)

# spa.txt is read from the "spa-eng" directory located next to `path`
text = Path(path).with_name("spa-eng").joinpath("spa.txt").read_text(encoding="utf-8")

# Drop the inverted punctuation marks, then split every line on tabs into a pair
for inverted_mark in ("¡", "¿"):
    text = text.replace(inverted_mark, "")
pairs = [line.split("\t") for line in text.splitlines()]

# Fixed NumPy seed so the shuffle order is the same on every run
np.random.seed(42)
np.random.shuffle(pairs)
sentences_en, sentences_es = zip(*pairs)  # (English sentences, Spanish sentences)

# Vocabulary size and fixed output length of the vectorized sequences
vocab_size = 10000
max_length = 50

# One TextVectorization layer per language (English first, then Spanish)
text_vec_layer_en, text_vec_layer_es = (
    tf.keras.layers.TextVectorization(vocab_size, output_sequence_length=max_length)
    for _ in range(2)
)

# Learn the vocabularies; the Spanish one also sees the start/end marker words
text_vec_layer_en.adapt(sentences_en)
text_vec_layer_es.adapt([f"startofseq {s} endofseq" for s in sentences_es])

# First 100,000 pairs are used for training, the remainder for validation
train_en, valid_en = sentences_en[:100_000], sentences_en[100_000:]
train_es, valid_es = sentences_es[:100_000], sentences_es[100_000:]

# Encoder inputs: vectorized English sentences
X_train = text_vec_layer_en(train_en)
X_valid = text_vec_layer_en(valid_en)
# Decoder inputs: Spanish sentences prefixed with the start marker
X_train_dec = text_vec_layer_es([f"startofseq {s}" for s in train_es])
X_valid_dec = text_vec_layer_es([f"startofseq {s}" for s in valid_es])
# Targets: Spanish sentences followed by the end marker
Y_train = text_vec_layer_es([f"{s} endofseq" for s in train_es])
Y_valid = text_vec_layer_es([f"{s} endofseq" for s in valid_es])

# Seed TensorFlow's random generator for reproducibility
tf.random.set_seed(42)

# Symbolic model inputs: variable-length int64 sequences for encoder and decoder
encoder_inputs = tf.keras.layers.Input(shape=(None,), dtype=tf.int64)
decoder_inputs = tf.keras.layers.Input(shape=(None,), dtype=tf.int64)



In [3]:
embed_size = 128

# The model inputs are declared as int64 sequences and the training arrays
# (X_train, X_train_dec, ...) were already converted to token IDs by the
# TextVectorization layers above. Running those integer IDs through the
# string-based vectorizers a second time is wrong, so the inputs are used
# directly as the token IDs here.
encoder_input_ids = encoder_inputs
decoder_input_ids = decoder_inputs

# Separate embedding tables per language; mask_zero=True makes ID 0 (padding)
# produce a Keras mask for downstream layers.
encoder_embedding_layer = tf.keras.layers.Embedding(vocab_size, embed_size,
                                                    mask_zero=True)
decoder_embedding_layer = tf.keras.layers.Embedding(vocab_size, embed_size,
                                                    mask_zero=True)
encoder_embeddings = encoder_embedding_layer(encoder_input_ids)
decoder_embeddings = decoder_embedding_layer(decoder_input_ids)


# Transformer Architecture

# Trainable Positional Encoding (not used in the original Transformer architecture)

In [4]:
from tensorflow.keras.layers import Layer, Embedding

# Custom layer to add positional encoding
class AddPositionalEncoding(Layer):
    """Add a trainable position embedding to a batch of embeddings.

    Args:
        max_size: Number of distinct positions in the position-embedding table.
            Sequences are assumed to be no longer than this.
        embed_size: Dimensionality of each position embedding; it must match
            the last dimension of the embeddings passed to `call`.
        **kwargs: Forwarded to the base `Layer` constructor.
    """

    def __init__(self, max_size, embed_size, **kwargs):
        super().__init__(**kwargs)
        self.pos_embed_layer = Embedding(max_size, embed_size)
        # Declare mask support so a mask attached to the incoming embeddings
        # (e.g. from Embedding(mask_zero=True)) is passed on unchanged.
        self.supports_masking = True

    def call(self, embeddings):
        """Return `embeddings` plus the position embedding of each time step."""
        # Axis 1 is used as the sequence axis; its length is read dynamically.
        batch_max_len = tf.shape(embeddings)[1]
        position_indices = tf.range(batch_max_len)
        position_embeddings = self.pos_embed_layer(position_indices)
        # Broadcasts the per-position embeddings over the batch axis.
        return embeddings + position_embeddings

# Example usage in a model
max_size = 50
embed_size = 128

# Apply the trainable positional encoding to the token embeddings built
# earlier. encoder_embeddings / decoder_embeddings must NOT be rebound to new
# tf.keras.Input placeholders here: later cells build the Transformer on those
# names, and fresh placeholders would disconnect the graph from
# encoder_inputs / decoder_inputs.
pos_encoding_layer = AddPositionalEncoding(max_size, embed_size)
enc_in = pos_encoding_layer(encoder_embeddings)
dec_in = pos_encoding_layer(decoder_embeddings)





In [5]:
# Number of positions in the fixed positional-encoding table
# (same value as max_length, the vectorizers' output sequence length)
max_len = 50


# Non-Trainable (Sinusoidal) Positional Encodings

In [6]:
class PositionalLayer(tf.keras.layers.Layer):
    """Add fixed (non-trainable) sine/cosine positional encodings to its input.

    The encoding table is computed once in the constructor and stored as a
    constant of shape (1, max_len, embed_size). Even feature indices hold the
    sine terms and odd feature indices hold the cosine terms.

    Args:
        max_len: Number of positions in the table.
        embed_size: Size of the feature dimension; must be even.
        dtype: Layer dtype, forwarded to the base constructor.
        **kwargs: Forwarded to the base `Layer` constructor.

    Raises:
        AssertionError: If `embed_size` is odd.
    """

    def __init__(self, max_len, embed_size, dtype=tf.float32, **kwargs):
        super().__init__(dtype=dtype, **kwargs)
        assert embed_size % 2 == 0, "Embedding Size must be even"
        # p: position index, i: even feature index; both (embed_size // 2, max_len)
        p, i = np.meshgrid(np.arange(max_len), 2 * np.arange(embed_size // 2))
        angles = p / 10000 ** (i / embed_size)
        pos_emb = np.empty((1, max_len, embed_size))
        pos_emb[0, :, ::2] = np.sin(angles).T
        pos_emb[0, :, 1::2] = np.cos(angles).T
        self.pos_encodings = tf.constant(pos_emb.astype(self.dtype))
        # The attribute Keras reads is `supports_masking`; the former spelling
        # `support_masking` only created an unused attribute, so mask support
        # was never actually declared.
        self.supports_masking = True

    def call(self, inputs):
        """Return `inputs` plus the encodings for its first `seq_len` positions."""
        batch_max_len = tf.shape(inputs)[1]
        # Slice the table to the current sequence length; the leading axis of
        # size 1 broadcasts over the batch.
        return inputs + self.pos_encodings[:, :batch_max_len]
        

In [7]:
# A single PositionalLayer instance is applied to both embedding tensors
pos_embed_layer = PositionalLayer(max_len, embed_size)
encoder_in = pos_embed_layer(encoder_embeddings)
decoder_in = pos_embed_layer(decoder_embeddings)


In [8]:
def create_pad_mask(inputs):
    """Return a boolean mask that is True wherever `inputs` is non-zero.

    A new axis of size 1 is inserted at position 1 of the result, e.g. an
    input of shape (batch, seq) yields a mask of shape (batch, 1, seq).
    """
    non_padding = tf.math.not_equal(inputs, 0)
    return non_padding[:, tf.newaxis]
    

# Encoder of Transformer

In [9]:
# Encoder hyperparameters
N = 2               # number of stacked blocks
num_heads = 8       # attention heads per MultiHeadAttention layer
dropout_rate = 0.1
n_units = 128       # width of the hidden feed-forward layer

# True for real tokens, False for padding (ID 0); extra axis added at position 1
encoder_pad_mask = tf.keras.layers.Lambda(create_pad_mask)(encoder_input_ids)

Z = encoder_in
for _ in range(N):
    # Self-attention sub-layer with residual connection
    skip = Z
    Z = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate)(
        Z, value=Z, attention_mask=encoder_pad_mask)
    # LayerNormalization, not Normalization: the latter is a feature-wise
    # preprocessing layer meant to be adapt()-ed on data, not the per-example
    # normalization a Transformer block requires.
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))

    # Position-wise feed-forward sub-layer with residual connection
    skip = Z
    Z = tf.keras.layers.Dense(n_units, activation='relu')(Z)
    Z = tf.keras.layers.Dense(embed_size)(Z)
    Z = tf.keras.layers.Dropout(dropout_rate)(Z)
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))

encoder_outputs = Z

# Decoder of Transformer

In [10]:
# True for real tokens, False for padding (ID 0); extra axis added at position 1
decoder_pad_mask = tf.keras.layers.Lambda(create_pad_mask)(decoder_input_ids)

Z = decoder_in
for _ in range(N):
    # Masked self-attention: padding mask combined with the causal mask
    skip = Z
    Z = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate)(
        Z, value=Z, attention_mask=decoder_pad_mask, use_causal_mask=True)
    # LayerNormalization, not Normalization (a preprocessing layer that is
    # meant to be adapt()-ed on data).
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))

    # Cross-attention: queries come from the decoder, keys/values from the
    # encoder outputs. The mask must therefore describe the ENCODER padding;
    # masking encoder positions with the decoder's padding pattern was wrong.
    skip = Z
    Z = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate)(
        Z, value=encoder_outputs, attention_mask=encoder_pad_mask)
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))

    # Position-wise feed-forward sub-layer with residual connection
    skip = Z
    Z = tf.keras.layers.Dense(n_units, activation='relu')(Z)
    Z = tf.keras.layers.Dense(embed_size)(Z)
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))
    

In [12]:
# Output layer: softmax over the vocabulary at every decoder position
Y_proba = tf.keras.layers.Dense(vocab_size, activation="softmax")(Z)

# Assemble the model from the two symbolic inputs and the softmax output
model = tf.keras.Model(
    inputs=[encoder_inputs, decoder_inputs],
    outputs=[Y_proba],
)

# Configure training; the fit call below is intentionally left disabled
model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="nadam",
    metrics=["accuracy"],
)
#model.fit((X_train, X_train_dec), Y_train, epochs=10, validation_data=((X_valid, X_valid_dec), Y_valid))

In [13]:
# Print a layer-by-layer summary of the model built above.
model.summary()

In [14]:
# Report the shape of every vectorized split, in the same order as before
for split_name, split_tensor in (
    ("X_train", X_train),
    ("X_train_dec", X_train_dec),
    ("Y_train", Y_train),
    ("X_valid", X_valid),
    ("X_valid_dec", X_valid_dec),
    ("Y_valid", Y_valid),
):
    print(f"{split_name} shape:", split_tensor.shape)


X_train shape: (100000, 50)
X_train_dec shape: (100000, 50)
Y_train shape: (100000, 50)
X_valid shape: (18964, 50)
X_valid_dec shape: (18964, 50)
Y_valid shape: (18964, 50)
