# Transformers

[Read the blog post here!](https://dyluc.github.io/2024/09/01/the-transformer-architecture.html)

In [19]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 
import tensorflow as tf
import numpy as np
from pathlib import Path

# Download the English/Spanish sentence-pair archive; get_file extracts it and
# caches it using cache_dir="datasets".
url = "https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip"
path = tf.keras.utils.get_file("spa-eng.zip", origin=url, cache_dir="datasets",
                               extract=True)
# Decode explicitly as UTF-8: the corpus contains accented characters, and the
# platform default encoding (e.g. cp1252 on Windows) would fail or mangle them.
text = (Path(path).with_name("spa-eng") / "spa.txt").read_text(encoding="utf-8")
# Strip the inverted punctuation marks that open Spanish questions/exclamations.
text = text.replace("¡", "").replace("¿", "")
# Every line is split on tabs; the columns are unpacked below as (english, spanish).
pairs = [line.split("\t") for line in text.splitlines()]
np.random.seed(42)  # fixed seed so the shuffle (and the later train/valid split) is reproducible
np.random.shuffle(pairs)
sentences_en, sentences_es = zip(*pairs)
# Show a few pairs as a sanity check.
for i in range(3):
    print(sentences_en[i], "=>", sentences_es[i])

How boring! => Qué aburrimiento!
I love sports. => Adoro el deporte.
Would you like to swap jobs? => Te gustaría que intercambiemos los trabajos?


In [20]:
# Vocabulary cap and fixed output length shared by both vectorizers.
vocab_size = 1000
max_length = 50

# One TextVectorization layer per language, configured identically.
text_vec_layer_en = tf.keras.layers.TextVectorization(
    max_tokens=vocab_size,
    output_sequence_length=max_length,
)
text_vec_layer_es = tf.keras.layers.TextVectorization(
    max_tokens=vocab_size,
    output_sequence_length=max_length,
)

# Fit the vocabularies. The Spanish sentences are wrapped in the start/end
# markers so that both markers end up in the Spanish vocabulary.
text_vec_layer_en.adapt(sentences_en)
text_vec_layer_es.adapt([f"startofseq {s} endofseq" for s in sentences_es])

# Print the first ten vocabulary entries of each language, one list per line.
print(
    *(vectorizer.get_vocabulary()[:10]
      for vectorizer in (text_vec_layer_en, text_vec_layer_es)),
    sep="\n",
)

['', '[UNK]', 'the', 'i', 'to', 'you', 'tom', 'a', 'is', 'he']
['', '[UNK]', 'startofseq', 'endofseq', 'de', 'que', 'a', 'no', 'tom', 'la']


In [21]:
# The first 100,000 shuffled pairs are used for training, the rest for validation.
train_part = slice(None, 100_000)
valid_part = slice(100_000, None)

# Encoder inputs: raw English strings.
X_train = tf.constant(sentences_en[train_part])
X_valid = tf.constant(sentences_en[valid_part])

# Decoder inputs: Spanish strings prefixed with the start marker.
X_train_dec = tf.constant([f"startofseq {s}" for s in sentences_es[train_part]])
X_valid_dec = tf.constant([f"startofseq {s}" for s in sentences_es[valid_part]])

# Targets: Spanish token ids with the end marker appended instead of the
# start marker prepended.
Y_train = text_vec_layer_es([f"{s} endofseq" for s in sentences_es[train_part]])
Y_valid = text_vec_layer_es([f"{s} endofseq" for s in sentences_es[valid_part]])

tf.random.set_seed(42)
embed_size = 128

# The model takes raw strings (scalar string per example) for both stacks.
encoder_inputs = tf.keras.layers.Input(shape=[], dtype=tf.string)
decoder_inputs = tf.keras.layers.Input(shape=[], dtype=tf.string)

# Strings -> token ids.
encoder_input_ids = text_vec_layer_en(encoder_inputs)
decoder_input_ids = text_vec_layer_es(decoder_inputs)

# Token ids -> embeddings; mask_zero=True makes id 0 produce a Keras mask.
encoder_embedding_layer = tf.keras.layers.Embedding(
    vocab_size, embed_size, mask_zero=True)
decoder_embedding_layer = tf.keras.layers.Embedding(
    vocab_size, embed_size, mask_zero=True)
encoder_embeddings = encoder_embedding_layer(encoder_input_ids)
decoder_embeddings = decoder_embedding_layer(decoder_input_ids)

## Positional Encodings

In [22]:
class PositionalEncoding(tf.keras.layers.Layer):
    """Adds fixed sinusoidal positional encodings to its input.

    A table of shape ``(1, max_length, embed_size)`` is precomputed once.
    Even feature indices hold ``sin(pos / 10_000 ** (2k / embed_size))`` and
    odd feature indices hold the matching cosine.

    Args:
        max_length: number of positions to precompute.
        embed_size: size of the feature axis; must be even so that every sine
            column has a matching cosine column.
        dtype: dtype of the layer and of the stored encoding table.
        **kwargs: forwarded to ``tf.keras.layers.Layer``.

    Raises:
        ValueError: if ``embed_size`` is odd.
    """

    def __init__(self, max_length, embed_size, dtype=tf.float32, **kwargs):
        # With an odd embed_size the last column of the np.empty buffer below
        # would never be written, leaking uninitialised memory into the model.
        if embed_size % 2 != 0:
            raise ValueError(
                f"embed_size must be even, got {embed_size}"
            )
        super().__init__(dtype=dtype, **kwargs)

        # p: positions, i: even feature indices; both have shape (embed_size // 2, max_length).
        p, i = np.meshgrid(np.arange(max_length), 2 * np.arange(embed_size // 2))
        # Computed once and shared by the sine and cosine halves; transposed to
        # (max_length, embed_size // 2) to match the table layout.
        angles = (p / 10_000 ** (i / embed_size)).T
        pos_emb = np.empty((1, max_length, embed_size))
        pos_emb[0, :, ::2] = np.sin(angles)
        pos_emb[0, :, 1::2] = np.cos(angles)
        self.pos_encodings = tf.constant(pos_emb.astype(self.dtype))
        self.supports_masking = True

    def call(self, inputs):
        """Return ``inputs`` plus the encodings for the first ``tf.shape(inputs)[1]`` positions."""
        batch_max_length = tf.shape(inputs)[1]
        # The leading axis of size 1 broadcasts over the batch.
        return inputs + self.pos_encodings[:, :batch_max_length]

In [23]:
# One PositionalEncoding instance is shared by the encoder and decoder inputs.
pos_embed_layer = PositionalEncoding(max_length, embed_size)
encoder_in, decoder_in = (
    pos_embed_layer(embeddings)
    for embeddings in (encoder_embeddings, decoder_embeddings)
)

## Masking

In [24]:
# padding and causal masks
class PaddingMask(tf.keras.layers.Layer):
    """Boolean mask that is True wherever the input is non-zero.

    A new axis is inserted at position 1 of the result.
    """

    def call(self, inputs):
        is_nonzero = tf.math.not_equal(inputs, 0)
        return tf.expand_dims(is_nonzero, axis=1)

class CausalMask(tf.keras.layers.Layer):
    """Builds a square lower-triangular boolean mask.

    The side length is taken from axis 1 of the input; the input's values are
    not used.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.supports_masking = True

    def call(self, inputs):
        n_steps = tf.shape(inputs)[1]
        all_true = tf.ones((n_steps, n_steps), tf.bool)
        # num_lower=-1 keeps the whole lower triangle, num_upper=0 keeps
        # nothing above the diagonal.
        return tf.linalg.band_part(all_true, num_lower=-1, num_upper=0)

# Padding masks are derived from the token ids, using a fresh PaddingMask
# layer for each of the encoder and decoder id tensors (in that order).
encoder_pad_mask, decoder_pad_mask = (
    PaddingMask()(token_ids)
    for token_ids in (encoder_input_ids, decoder_input_ids)
)

# The causal mask only needs the decoder sequence length, read from the embeddings.
causal_mask = CausalMask()(decoder_embeddings)

## Encoder

In [25]:
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: self-attention and a feed-forward network.

    Each sublayer is followed by a residual addition and layer normalization.

    Args:
        embed_size: width of the block's output; also used as the attention ``key_dim``.
        att_heads: number of attention heads.
        ff_units: width of the hidden Dense layer in the feed-forward network.
        dropout_rate: dropout rate for the attention layer and for the
            Dropout at the end of the feed-forward network.
        **kwargs: forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, embed_size, att_heads, ff_units, dropout_rate, **kwargs):
        super().__init__(**kwargs)
        self.supports_masking = True
        self.attn_layer = tf.keras.layers.MultiHeadAttention(
            num_heads=att_heads, key_dim=embed_size, dropout=dropout_rate
        )
        # Residual adders are created once here rather than on every call().
        self.add1 = tf.keras.layers.Add()
        self.add2 = tf.keras.layers.Add()
        self.norm1 = tf.keras.layers.LayerNormalization()
        self.norm2 = tf.keras.layers.LayerNormalization()
        self.ffn = tf.keras.Sequential([
            tf.keras.layers.Dense(ff_units, activation="relu"),
            tf.keras.layers.Dense(embed_size),
            tf.keras.layers.Dropout(dropout_rate)
        ])

    def call(self, inputs, mask=None):
        """Run the block on ``inputs``; ``mask`` is passed as the attention mask."""
        # multi-head attention sublayer
        attn_output = self.attn_layer(inputs, value=inputs, attention_mask=mask)
        out1 = self.norm1(self.add1([attn_output, inputs]))

        # fully connected sublayer
        ffn_output = self.ffn(out1)
        out2 = self.norm2(self.add2([ffn_output, out1]))

        return out2

# Hyperparameters shared by the encoder and decoder stacks.
N = 2               # number of stacked layers
att_heads = 8       # attention heads per layer
dropout_rate = 0.1
ff_units = 128      # hidden width of each feed-forward sublayer

encoder_layers = [
    EncoderLayer(embed_size, att_heads, ff_units, dropout_rate)
    for _ in range(N)
]

# Feed the position-encoded embeddings through the stack; every layer receives
# the same padding mask.
Z = encoder_in
for encoder_layer in encoder_layers:
    Z = encoder_layer(Z, mask=encoder_pad_mask)

## Decoder

In [26]:
class DecoderLayer(tf.keras.layers.Layer):
    """One decoder block: self-attention, cross-attention and a feed-forward network.

    Each sublayer is followed by a residual addition and layer normalization.

    Args:
        embed_size: width of the block's output; also used as the attention ``key_dim``.
        att_heads: number of attention heads in both attention layers.
        ff_units: width of the hidden Dense layer in the feed-forward network.
        dropout_rate: dropout rate for both attention layers and for the
            Dropout at the end of the feed-forward network.
        **kwargs: forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, embed_size, att_heads, ff_units, dropout_rate, **kwargs):
        super().__init__(**kwargs)
        self.supports_masking = True
        self.self_attn_layer = tf.keras.layers.MultiHeadAttention(
            num_heads=att_heads, key_dim=embed_size, dropout=dropout_rate
        )
        self.cross_attn_layer = tf.keras.layers.MultiHeadAttention(
            num_heads=att_heads, key_dim=embed_size, dropout=dropout_rate
        )
        # Residual adders are created once here rather than on every call().
        self.add1 = tf.keras.layers.Add()
        self.add2 = tf.keras.layers.Add()
        self.add3 = tf.keras.layers.Add()
        self.norm1 = tf.keras.layers.LayerNormalization()
        self.norm2 = tf.keras.layers.LayerNormalization()
        self.norm3 = tf.keras.layers.LayerNormalization()
        self.ffn = tf.keras.Sequential([
            tf.keras.layers.Dense(ff_units, activation="relu"),
            tf.keras.layers.Dense(embed_size),
            tf.keras.layers.Dropout(dropout_rate)
        ])

    def call(self, inputs, encoder_outputs, decoder_mask=None, encoder_mask=None):
        """Run the block.

        Args:
            inputs: decoder-side sequence; used as query and value of the self-attention.
            encoder_outputs: used as the value of the cross-attention.
            decoder_mask: attention mask for the self-attention sublayer.
            encoder_mask: attention mask for the cross-attention sublayer.
        """
        # self attention sublayer
        self_attn_output = self.self_attn_layer(inputs, value=inputs, attention_mask=decoder_mask)
        out1 = self.norm1(self.add1([self_attn_output, inputs]))

        # cross attention sublayer: queries come from the decoder, values from
        # the final outputs of the encoder stack
        cross_attn_output = self.cross_attn_layer(out1, value=encoder_outputs, attention_mask=encoder_mask)
        out2 = self.norm2(self.add2([cross_attn_output, out1]))

        # fully connected sublayer
        ffn_output = self.ffn(out2)
        out3 = self.norm3(self.add3([ffn_output, out2]))

        return out3

decoder_layers = [DecoderLayer(embed_size, att_heads, ff_units, dropout_rate) for _ in range(N)]

# At this point Z holds the output of the encoder stack; keep it under its own
# name before Z is reused for the decoder.
encoder_outputs = Z

# The self-attention mask is the same for every decoder layer, so build it once
# instead of re-creating the `&` op on each loop iteration.
decoder_self_attn_mask = causal_mask & decoder_pad_mask

Z = decoder_in
for decoder_layer in decoder_layers:
    Z = decoder_layer(Z, encoder_outputs, decoder_mask=decoder_self_attn_mask, encoder_mask=encoder_pad_mask)

## Output Projection Layer

In [27]:
# Project the decoder output onto the vocabulary with a softmax.
output_layer = tf.keras.layers.Dense(vocab_size, activation="softmax")
Y_proba = output_layer(Z)

# The model maps (English strings, start-prefixed Spanish strings) to the
# softmax output computed above.
model = tf.keras.Model(
    inputs=[encoder_inputs, decoder_inputs],
    outputs=[Y_proba],
)
model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="nadam",
    metrics=["accuracy"],
)
model.fit(
    (X_train, X_train_dec),
    Y_train,
    epochs=10,
    validation_data=((X_valid, X_valid_dec), Y_valid),
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x17a7d7280>