In [1]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import pathlib
import random
import string
import re
import numpy as np

import tensorflow.data as tf_data
import tensorflow.strings as tf_strings

import keras
from keras import layers
from keras import ops
from keras.layers import TextVectorization



# English to Spanish Translator

Building on the previous encoder model that predicted imdb sentiment, I will build an encoder and decoder for the full transformer and apply it to the english/spanish dataset. Hopefully I will have a good enough grasp after this to detour to CNNs and then move up to Axial Trasformers.

This time I am just following [this tutorial](https://keras.io/examples/nlp/neural_machine_translation_with_transformer/) by studying the code, transcribing the architecture onto diagrams and notes, and doing my best to recreate it.

In [11]:
text_file = keras.utils.get_file(
    fname="spa-eng.zip",
    origin="http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
    extract=True,
)
text_file = pathlib.Path(text_file).parent / "spa-eng_extracted" / "spa-eng"/"spa.txt"

In [12]:
with open(text_file) as file:
    text_pairs = []
    for line in file:
        eng, spa = line.split("\t")
        spa = "[start]" + spa + " [end]"
        text_pairs.append((eng,spa))
        
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples : ]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

VOCAB_SIZE = 15000
SEQUENCE_LEN = 20
BATCH_SIZE = 64

vocab_size = 15000
sequence_length = 20
batch_size = 64

def custom_standardization(input_string):
    lowercase = tf_strings.lower(input_string)
    return tf_strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")

eng_vectorization = TextVectorization(
    max_tokens = VOCAB_SIZE,
    output_mode= 'int',
    output_sequence_length= SEQUENCE_LEN
)

spa_vectorization = TextVectorization(
    max_tokens = VOCAB_SIZE,
    output_mode= 'int',
    output_sequence_length= SEQUENCE_LEN + 1,
    standardize = custom_standardization
)

train_eng_texts = [pair[0] for pair in train_pairs]
train_spa_texts = [pair[1] for pair in train_pairs]
eng_vectorization.adapt(train_eng_texts)
spa_vectorization.adapt(train_spa_texts)

def format_dataset(eng,spa):
    eng = eng_vectorization(eng)
    spa = spa_vectorization(spa)
    return (
        {
            "encoder_inputs": eng,
            "decoder_inputs": spa[:,:-1]
        },
        spa[:,1:],
    )
    
def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf_data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(BATCH_SIZE)
    dataset = dataset.map(format_dataset)
    return dataset.cache().shuffle(2024).prefetch(16)

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

for inputs, targets in train_ds.take(1):
    print(f'shape of inputs["encoder_inputs"]: {inputs["encoder_inputs"].shape}')
    print(f'shape of inputs["decoder_inputs"]: {inputs["decoder_inputs"].shape}')

118964 total pairs
83276 training pairs
17844 validation pairs
17844 test pairs
shape of inputs["encoder_inputs"]: (64, 20)
shape of inputs["decoder_inputs"]: (64, 20)


In [None]:
class PositionEmbedding(layers.Layer):
    def __init__(self, sequence_len, embed_dim, vocab_size, **kwargs):
        super().__init__(**kwargs)
        self.sequence_len = sequence_len
        self.embed_dim = embed_dim
        self.vocab_size = vocab_size
        self.tokenEmbedding = layers.Embedding(vocab_size, embed_dim)
        self.positionEmbedding = layers.Embedding(sequence_len, embed_dim)
        
    def compute_mask(self, inputs, mask=None):
        return ops.not_equal(inputs, 0)
    
    def call(self, inputs):
        token_embedding = self.tokenEmbedding(inputs)
        positions = ops.arange(0,self.sequence_len,1)
        position_embedding = self.positionEmbedding(positions)
        return (token_embedding + position_embedding)
        
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "sequence_len": self.sequence_len,
            "vocab_size": self.vocab_size,
        })
        return config
    
class Encoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.key_dim = embed_dim // num_heads
        self.mha_layer = layers.MultiHeadAttention(
            key_dim = self.key_dim,
            num_heads = self.num_heads
        )
        self.normalization_layer = layers.LayerNormalization()
        self.normalization_layer2 = layers.LayerNormalization()
        self.dense_proj = keras.Sequential([
            layers.Dense(self.dense_dim, activation='relu'),
            layers.Dense(embed_dim)
        ])
        
    def call(self,inputs, mask=None):
        if mask is not None:
            padding_mask = ops.cast(mask[:,None,:], dtype='int32')
        else:
            padding_mask = None
        mha_output = self.mha_layer(
            query = inputs,
            key = inputs,
            value = inputs,
            query_mask = padding_mask
        )
        normalized_attention = self.normalization_layer(mha_output + inputs)
        dense_output = self.dense_proj(normalized_attention)
        return self.normalization_layer2(normalized_attention + dense_output)
    
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
            "key_dim": self.key_dim
        })
        return config
    
class Decoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.key_dim = embed_dim // num_heads
        self.self_attention_layer = layers.MultiHeadAttention(
            key_dim=self.key_dim, num_heads=self.num_heads
        )
        self.cross_attention_layer = layers.MultiHeadAttention(
            key_dim=self.key_dim, num_heads=self.num_heads
        )
        self.normalization1 = layers.LayerNormalization()
        self.normalization2 = layers.LayerNormalization()
        self.normalization3 = layers.LayerNormalization()
        self.dense_proj = keras.Sequential([
            layers.Dense(self.dense_dim, activation='relu'),
            layers.Dense(self.embed_dim)
        ])
        
    def call(self, inputs, mask=None):
        self_attention_input, cross_attention_input = inputs
        # causal_mask = self.get_causal_mask(ops.shape(inputs))
        
        if mask is not None:
            self_attention_padding_mask, cross_attention_padding_mask = mask
        else:
            self_attention_padding_mask, cross_attention_padding_mask = None
            
        self_attention = self.self_attention_layer(
            query = self_attention_input,
            key = self_attention_input,
            value = self_attention_input,
            query_mask = self_attention_padding_mask,
            use_causal_mask = True
        )
        normal_added_self_attention = self.normalization1(self_attention + self_attention_input)
        
        cross_attention = self.cross_attention_layer(
            query = normal_added_self_attention,
            key = cross_attention_input,
            value = cross_attention_input,
            query_mask = self_attention_padding_mask,
            key_mask = cross_attention_padding_mask,
        )
        normal_added_cross_attention = self.normalization2(normal_added_self_attention + cross_attention)
        decoder_output = self.dense_proj(normal_added_cross_attention)
        return self.normalization3(normal_added_cross_attention + decoder_output)
        
    # def get_causal_mask(input_shape):
    #     batch_size, sequence_len = input_shape[0], input_shape[1]
    #     i = ops.arange(sequence_len)[:None]
    #     j = ops.arange(sequence_len)
    #     mask = ops.cast(i >= j, dtype='int32')
    #     mask = ops.reshape(mask, (1, input_shape[1], input_shape[1]))
    #     mult = ops.concatenate(
    #         [ops.expand_dims(batch_size, -1), ops.convert_to_tensor([1, 1])],
    #         axis=0,
    #     )
    #     return ops.tile(mask, mult)
        
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
            "key_dim": self.key_dim,
        })
        return config

In [14]:
vocab_size = 15000
sequence_length = 20
batch_size = 64
embed_dim = 100
dense_dim = 4 * embed_dim
num_heads = 6

encoder_inputs = keras.Input(shape=(sequence_length,), dtype='int32', name="encoder_inputs")
x = PositionEmbedding(vocab_size=vocab_size, embed_dim=100, sequence_len=sequence_length)(encoder_inputs)
encoder_output = Encoder(embed_dim=embed_dim, dense_dim = dense_dim, num_heads=num_heads)(x)

decoder_self_attention_inputs = keras.Input(shape=(sequence_length,), dtype='int32',  name="decoder_inputs")
y = PositionEmbedding(vocab_size=vocab_size, embed_dim=100, sequence_len=sequence_length)(decoder_self_attention_inputs)
decoder_output = Decoder(embed_dim=embed_dim, dense_dim = dense_dim, num_heads=num_heads)((y, encoder_output))
outputs = layers.Dense(vocab_size, activation="softmax")(decoder_output)

transformer = keras.Model(inputs=[encoder_inputs,decoder_self_attention_inputs], outputs=outputs)

transformer.summary()

1. The `call()` method of your layer may be crashing. Try to `__call__()` the layer eagerly on some test input first to see if it works. E.g. `x = np.random.random((3, 4)); y = layer(x)`
2. If the `call()` method is correct, then you may need to implement the `def build(self, input_shape)` method on your layer. It should create all variables used by the layer (e.g. by calling `layer.build()` on all its children layers).
Exception encountered: ''cannot unpack non-iterable NoneType object''


In [15]:
epochs = 1  # This should be at least 30 for convergence
transformer.compile(
    "rmsprop",
    loss=keras.losses.SparseCategoricalCrossentropy(ignore_class=0),
    metrics=["accuracy"],
)
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)

[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m311s[0m 237ms/step - accuracy: 0.0750 - loss: 5.5264 - val_accuracy: 0.1473 - val_loss: 3.2575


<keras.src.callbacks.history.History at 0x3378654c0>

In [18]:
spa_vocab = spa_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    tokenized_input_sentence = eng_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = spa_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer(
            {
                "encoder_inputs": tokenized_input_sentence,
                "decoder_inputs": tokenized_target_sentence,
            }
        )
        sampled_token_index = ops.convert_to_numpy(
            ops.argmax(predictions[0, i, :])
        ).item(0)
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token

        if sampled_token == "[end]":
            break
    return decoded_sentence


test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(10):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)
    print(f"input sentence: {input_sentence}\noutput sentence: {translated}")

input sentence: Tom and Mary lost their jobs due to budget cutbacks.
output sentence: [start] y mary se [UNK] su [UNK] [UNK] [UNK] [end]
input sentence: Tom needs money.
output sentence: [start] dinero [end]
input sentence: Stop wasting everyone's time.
output sentence: [start] de [UNK] de tiempo [end]
input sentence: It is too difficult a problem for me to solve.
output sentence: [start] demasiado bien por un problema para mí [end]
input sentence: This dictionary, of which the third volume is missing, cost me a hundred dollars.
output sentence: [start] de que el mundo es el mundo puede [UNK] un día de la [UNK] [end]
input sentence: I want you to grow up.
output sentence: [start] que te [UNK] [end]
input sentence: We're on strike because the company hasn't improved our wages.
output sentence: [start] en el mundo se ha sido la [UNK] no se ha estado en nuestra [UNK] [end]
input sentence: That box is bigger than this one.
output sentence: [start] es más más que esto [end]
input sentence: 

## Takeaways

Woah, this might be the coolest project I did so far. For instance:

>input sentence: It is too difficult a problem for me to solve.\
output sentence: [start] demasiado bien por un problema para mí [end]

Ok, it's bad, but I'm honestly impressed by how the language is able to get this far with just random weights. Here, the output sentence word by word (which is not always the best translation mind you) translates to "too much well for a problem for me". The output is obviously nonsensicle with just one epoch of training, but you can tell that the machine read the input, had a very rough idea of the words, and maintains some structure in its own output for example by putting "for" and "for a" before a noun.

Also, I had no clue how masks worked before, and now I find it especially sick how an simple attention mask of 1s with 0s above the diagonal can enforce causality. I'm starting to realize that the overall theme of neural networks is how stacking pieces of complexity via mathematical features contributes is analogous to enforcing rules on logic, which feels oddly metaphysical for such a rigid field. Apparently its also called 'inductive bias'.

As far as improvement goes, I honestly am not totally sure if this works perfectly as expected since I can't really train it for more than a few epochs (one epoch takes 5 minutes). I believe the way I wrote it is close to optimal (pretty closely follows the example docs). If I had to debug it, I think I would appreciate explicitly passing around masks and remove mask ambiguity for the decoder block.