# Week 6: Sequence-to-sequence model

In [83]:
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import ops
import numpy as np
import random
import tensorflow as tf
import string
import re

## Load the data
The text data read from the textfile and split into pairs of English and Finnish sentences. These text pairs are appended to a list.

In [84]:
text_file = "fin.txt"

with open(text_file, encoding='utf-8') as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
    english, finnish, rest = line.split("\t")
    finnish = "[start] " + finnish + " [end]"
    text_pairs.append((english, finnish))

In [85]:
print(random.choice(text_pairs))

("As the crow flies, it's about 20 miles from here.", '[start] Se on täältä noin 20 mailia linnuntietä. [end]')


## Split the data into training, validation, and test sets
The text pairs are shuffled randomly and split into training, validation, and test sets. The training set contains 70% of the data, the validation set contains 15%, and the test set contains 15%.

In [86]:
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]

Punctuation removed from the text data. The text data is then tokenized using the TextVectorization layer. The TextVectorization layer is used to convert the text data into a sequence of integers, where each integer represents a unique word in the vocabulary. The vocabulary size is set to 15,000, and the maximum sequence length is set to 20.

In [87]:
strip_chars = string.punctuation
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")


def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", "")


vocab_size = 15000
sequence_length = 30

source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
target_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_english_texts = [pair[0] for pair in train_pairs]
train_finnish_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_finnish_texts)

In [88]:
batch_size = 64

def format_dataset(eng, fin):
    eng = source_vectorization(eng)
    fin = target_vectorization(fin)
    return ({
                "english": eng,
                "finnish": fin[:, :-1],
            }, fin[:, 1:])

def make_dataset(pairs):
    eng_texts, fi_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    fi_texts = list(fi_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, fi_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    return dataset.shuffle(2048).prefetch(16).cache()


train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [89]:
for inputs, targets in train_ds.take(1):
    print(f"inputs['english'].shape: {inputs['english'].shape}")
    print(f"inputs['finnish'].shape: {inputs['finnish'].shape}")
    print(f"targets.shape: {targets.shape}")

inputs['english'].shape: (64, 30)
inputs['finnish'].shape: (64, 30)
targets.shape: (64, 30)


2025-05-07 22:54:13.518475: W tensorflow/core/kernels/data/cache_dataset_ops.cc:916] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.


Define a custom layer for converting the integer sequences into dense vectors. The layer uses an embedding layer to convert the integer sequences into dense vectors. The layer also adds positional embeddings to the dense vectors. The positional embeddings are used to give the model information about the position of each word in the sequence.

In [90]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = ops.shape(inputs)[-1]
        positions = ops.arange(0, length, 1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return ops.not_equal(inputs, 0)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config

## Encoder and Decoder
The Encoder takes the input sequence and produces a sequence of hidden states. The Decoder takes the hidden states from the Encoder and produces the output sequence.

In [91]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            padding_mask = ops.cast(mask[:, None, :], dtype="int32")
        else:
            padding_mask = None

        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

In [92]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(latent_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = ops.cast(mask[:, None, :], dtype="int32")
            padding_mask = ops.minimum(padding_mask, causal_mask)
        else:
            padding_mask = None

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        input_shape = ops.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = ops.arange(sequence_length)[:, None]
        j = ops.arange(sequence_length)
        mask = ops.cast(i >= j, dtype="int32")
        mask = ops.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = ops.concatenate(
            [ops.expand_dims(batch_size, -1), ops.convert_to_tensor([1, 1])],
            axis=0,
        )
        return ops.tile(mask, mult)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

## Build the model
The model takes two inputs: the Finnish sentence and the English sentence. The model outputs the predicted English sentence.

In [93]:
embed_dim = 256 
dense_dim = 2048 
num_heads = 8 
  
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)     
 
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="finnish")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)  
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)        
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

Next we compile and train the model.

In [94]:
transformer.compile(
 optimizer="rmsprop",
 loss="sparse_categorical_crossentropy",
 metrics=["accuracy"])

early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=3,
    restore_best_weights=True,
)
    
transformer.fit(train_ds, epochs=30, validation_data=val_ds, verbose=1, callbacks=[early_stopping])

Epoch 1/30
[1m791/791[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m49s[0m 45ms/step - accuracy: 0.0996 - loss: 5.1001 - val_accuracy: 0.1156 - val_loss: 3.3611
Epoch 2/30
[1m791/791[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 26ms/step - accuracy: 0.1143 - loss: 3.5134 - val_accuracy: 0.1293 - val_loss: 2.8059
Epoch 3/30
[1m791/791[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 26ms/step - accuracy: 0.1257 - loss: 3.0012 - val_accuracy: 0.1348 - val_loss: 2.5965
Epoch 4/30
[1m791/791[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 26ms/step - accuracy: 0.1322 - loss: 2.7073 - val_accuracy: 0.1378 - val_loss: 2.4657
Epoch 5/30
[1m791/791[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 26ms/step - accuracy: 0.1372 - loss: 2.5145 - val_accuracy: 0.1371 - val_loss: 2.4929
Epoch 6/30
[1m791/791[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 26ms/step - accuracy: 0.1405 - loss: 2.3964 - val_accuracy: 0.1406 - val_loss: 2.4040
Epoch 7/30
[1m7

<keras.src.callbacks.history.History at 0x7f4ed10eb310>

In [96]:
fin_vocab = target_vectorization.get_vocabulary()
fin_index_lookup = dict(zip(range(len(fin_vocab)), fin_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence]
        )[:, :-1]
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence]
        )
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = fin_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs]
for i in range(5):
    print("Translation " + str(i + 1) + ":")
    input_sentence = random.choice(test_eng_texts)
    print(input_sentence)
    print(decode_sequence(input_sentence))
    print("")

Translation 1:
I got a job.
[start] sain töitä [end]

Translation 2:
Stay absolutely still.
[start] pysy ihan vielä ihan vielä [end]

Translation 3:
Cats usually hate dogs.
[start] kissat yleensä [UNK] [end]

Translation 4:
I felt exhausted when the game was over.
[start] minulla oli ihan ollut ihan joka päivä [end]

Translation 5:
That's a stupid thing to say.
[start] se on tyhmä [UNK] [end]
