# 3  Translation with an Encoder-Decoder Transformer
In this exercise, we will build an encoder-decoder transformer and apply it to the “translation” of dates from one format to another format. The architecture we will build closely follows the seminal “Attention Is All You Need” paper.

## 3.1  Copy Layers from the Previous Exercise
In the previous exercise on classification with an encoder you implemented
  - FeedForward
  - EmbeddingWithPosition
  - EncoderBlock

All these layers are identical in the encoder-decoder architecture we will build. Copy these layers to the current file.

In [1]:
%pip install keras
%pip install tensorflow



In [2]:
import tensorflow as tf
import keras

In [3]:
# Simple position-wise FeedForward layer
@keras.saving.register_keras_serializable()
class FeedForward(keras.layers.Layer):
    """Two-layer feed-forward network applied along the last (feature) axis.

    The features are projected from ``embed_size`` up to
    ``factor * embed_size``, passed through a ReLU, and projected back down
    to ``embed_size``.

    Args:
        factor: Width multiplier of the hidden projection.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, factor=4, **kwargs):
        super().__init__(**kwargs)
        self.factor = factor

    def build(self, batch_input_shape):
        """Create both projection matrices and their biases.

        Args:
            batch_input_shape: Shape of the incoming tensor; only its last
                entry (the feature size) is used.
        """
        # Only the feature axis determines the weight shapes, so the layer is
        # not tied to a specific input rank.
        embed_size = batch_input_shape[-1]
        hidden_size = self.factor * embed_size
        # Creation order (w1, w2, b1, b2) is kept so the weight order of the
        # layer does not change.
        self.w1 = self.add_weight(
            shape=(embed_size, hidden_size), initializer="glorot_uniform"
        )
        self.w2 = self.add_weight(
            shape=(hidden_size, embed_size), initializer="glorot_uniform"
        )
        # Biases start at zero instead of inheriting the random default
        # initializer used for floating-point weights.
        self.b1 = self.add_weight(shape=(hidden_size,), initializer="zeros")
        self.b2 = self.add_weight(shape=(embed_size,), initializer="zeros")

    # `call` is what runs when the layer instance is called: `FeedForward()(x)`.
    def call(self, inputs):
        """Apply ``relu(inputs @ w1 + b1) @ w2 + b2``.

        Args:
            inputs: Tensor whose last axis has size ``embed_size``.

        Returns:
            Tensor with the same shape as ``inputs``.
        """
        hidden = keras.ops.matmul(inputs, self.w1) + self.b1
        # Functional op: no new Activation layer is instantiated per call.
        hidden = keras.ops.relu(hidden)
        return keras.ops.matmul(hidden, self.w2) + self.b2

    def get_config(self):
        """Return the serializable configuration, including ``factor``."""
        base_config = super().get_config()
        return {**base_config, "factor": self.factor}

In [4]:
@keras.saving.register_keras_serializable()
class EmbeddingWithPosition(keras.layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        num_tokens: Number of rows in the token lookup table.
        max_seq_length: Number of rows in the position lookup table.
        embed_size: Width of every embedding vector.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, num_tokens, max_seq_length, embed_size, **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments for `build` and `get_config`.
        self.num_tokens = num_tokens
        self.max_seq_length = max_seq_length
        self.embed_size = embed_size

    def build(self, batch_input_shape):
        """Create the two lookup tables (the input shape itself is not needed)."""
        print(f"Building EmbeddingWithPosition with input shape {batch_input_shape}")
        # Row i of the token table is the embedding vector of token id i;
        # row p of the position table is the embedding vector of position p.
        token_table_shape = (self.num_tokens, self.embed_size)
        position_table_shape = (self.max_seq_length, self.embed_size)
        self.embedding_loop_table = self.add_weight(shape=token_table_shape)
        self.position_lookup_table = self.add_weight(shape=position_table_shape)

    def call(self, inputs):
        """Look up token and position embeddings and add them.

        Args:
            inputs: Rank-2 tensor of token ids (the shape is unpacked into
                exactly two dimensions).

        Returns:
            The token embeddings with the position embeddings of the first
            ``length`` positions added (broadcast over the first axis).
        """
        _, seq_len = keras.ops.shape(inputs)
        positions = self.position_lookup_table[:seq_len]
        tokens = keras.ops.take(self.embedding_loop_table, inputs, axis=0)
        return tokens + positions

    def get_config(self):
        """Return the serializable configuration including constructor arguments."""
        config = super().get_config()
        config.update(
            num_tokens=self.num_tokens,
            max_seq_length=self.max_seq_length,
            embed_size=self.embed_size,
        )
        return config

In [5]:
@keras.saving.register_keras_serializable()
class EncoderBlock(keras.layers.Layer):
    """Transformer encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer is followed by a skip connection and layer normalization.

    Args:
        num_heads: Number of attention heads.
        embed_size: Embedding width; each head uses
            ``embed_size // num_heads`` as its key dimension.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, num_heads, embed_size, **kwargs):
        super().__init__(**kwargs)
        self.num_heads = num_heads
        self.embed_size = embed_size

        # Self-attention sub-layer
        self.attention = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=(embed_size // num_heads), name="self_attention"
        )
        self.norm_1 = keras.layers.LayerNormalization(name="norm_1")

        # Feed-forward sub-layer
        self.feed_forward = FeedForward(name="feed_forward")
        self.norm_2 = keras.layers.LayerNormalization(name="norm_2")

    def call(self, inputs):
        """Run the block on a sequence of embeddings.

        Args:
            inputs: Embeddings used as query, key and value of the
                self-attention.

        Returns:
            The normalized output of the feed-forward sub-layer.
        """
        # 1. Self-attention + skip connection + normalization.
        #    The encoder is non-causal: every position may attend to all others.
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, use_causal_mask=False
        )
        # keras.ops.add is a plain tensor op; keras.layers.add would build a
        # new Add layer on every forward pass.
        x = self.norm_1(keras.ops.add(attention_output, inputs))

        # 2. Feed-forward + skip connection + normalization.
        ff_output = self.feed_forward(x)
        return self.norm_2(keras.ops.add(ff_output, x))

    def get_config(self):
        """Return the serializable configuration including constructor arguments."""
        config = super().get_config()
        return {**config, "num_heads": self.num_heads, "embed_size": self.embed_size}

In [6]:
@keras.saving.register_keras_serializable()
class DecoderBlock(keras.layers.Layer):
    """Transformer decoder block.

    Consists of causal self-attention, cross-attention over the encoder
    output, and a feed-forward network. Each sub-layer is followed by a skip
    connection and layer normalization.

    Args:
        num_heads: Number of attention heads in both attention sub-layers.
        embed_size: Embedding width; each head uses
            ``embed_size // num_heads`` as its key dimension.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, num_heads, embed_size, **kwargs):
        super().__init__(**kwargs)
        self.num_heads = num_heads
        self.embed_size = embed_size

        # 1. Causal self-attention
        self.causal_attention = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=(embed_size // num_heads), name="causal_attention"
        )
        self.norm_1 = keras.layers.LayerNormalization(name="norm_1")

        # 2. Cross-attention (encoder-decoder attention)
        self.cross_attention = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=(embed_size // num_heads), name="cross_attention"
        )
        self.norm_2 = keras.layers.LayerNormalization(name="norm_2")

        # 3. Feed-forward network
        self.feed_forward = FeedForward(name="feed_forward")
        self.norm_3 = keras.layers.LayerNormalization(name="norm_3")

    def call(self, inputs):
        """Run the block.

        Args:
            inputs: Sequence ``[decoder_embs, encoder_embs]``; only the first
                two entries are read.

        Returns:
            The normalized output of the feed-forward sub-layer.
        """
        decoder_embs, encoder_embs = inputs[0], inputs[1]

        # keras.ops.add is used for the skip connections; keras.layers.add
        # would build a new Add layer on every forward pass.

        # 1. Causal self-attention + skip connection + normalization.
        ca_output = self.causal_attention(
            query=decoder_embs, value=decoder_embs, key=decoder_embs, use_causal_mask=True
        )
        x = self.norm_1(keras.ops.add(ca_output, decoder_embs))

        # 2. Cross-attention + skip connection + normalization.
        #    Queries come from the decoder stream (x); keys and values come
        #    from the encoder output (encoder_embs).
        cross_output = self.cross_attention(
            query=x, key=encoder_embs, value=encoder_embs
        )
        x = self.norm_2(keras.ops.add(cross_output, x))

        # 3. Feed-forward + skip connection + normalization.
        ff_output = self.feed_forward(x)
        return self.norm_3(keras.ops.add(ff_output, x))

    def get_config(self):
        """Return the serializable configuration including constructor arguments."""
        config = super().get_config()
        return {**config, "num_heads": self.num_heads, "embed_size": self.embed_size}

In [7]:
def get_encoder_decoder_model(
    num_tokens_enc: int, max_seq_length_enc: int,
    num_tokens_dec: int, max_seq_length_dec: int,
    embed_size: int, num_heads: int, num_blocks: int,
    use_mask: bool = False # Masking logic was removed for simplicity
) -> keras.Model:
    """Build the encoder-decoder transformer as a functional Keras model.

    Args:
        num_tokens_enc: Vocabulary size of the encoder embedding.
        max_seq_length_enc: Number of positions in the encoder embedding.
        num_tokens_dec: Vocabulary size of the decoder embedding; also the
            number of output logits per position.
        max_seq_length_dec: Number of positions in the decoder embedding.
        embed_size: Embedding width shared by encoder and decoder.
        num_heads: Attention heads per block.
        num_blocks: Number of encoder blocks and of decoder blocks.
        use_mask: Accepted for interface compatibility; not used in the body.

    Returns:
        A model taking ``[encoder_input, decoder_input]`` (integer token ids)
        and producing logits (no softmax) over the decoder vocabulary.
    """
    # --- Encoder ---
    encoder_input = keras.layers.Input(
        shape=(None,), dtype=tf.int32, name="encoder_input"
    )
    enc_embedding = EmbeddingWithPosition(
        num_tokens=num_tokens_enc,
        max_seq_length=max_seq_length_enc,
        embed_size=embed_size,
        name="enc_positional_embedding",
    )
    memory = enc_embedding(encoder_input)

    # Each encoder block takes a single input: the running embeddings.
    for block_idx in range(num_blocks):
        encoder_block = EncoderBlock(
            num_heads=num_heads,
            embed_size=embed_size,
            name=f"encoder_block_{block_idx}",
        )
        memory = encoder_block(memory)

    # --- Decoder ---
    decoder_input = keras.layers.Input(
        shape=(None,), dtype=tf.int32, name="decoder_input"
    )
    dec_embedding = EmbeddingWithPosition(
        num_tokens=num_tokens_dec,
        max_seq_length=max_seq_length_dec,
        embed_size=embed_size,
        name="dec_positional_embedding",
    )
    hidden = dec_embedding(decoder_input)

    # Each decoder block takes [decoder embeddings, final encoder output].
    for block_idx in range(num_blocks):
        decoder_block = DecoderBlock(
            num_heads=num_heads,
            embed_size=embed_size,
            name=f"decoder_block_{block_idx}",
        )
        hidden = decoder_block([hidden, memory])

    # Classification head: linear activation, so the outputs are logits.
    logits_layer = keras.layers.Dense(
        units=num_tokens_dec, activation="linear", name="output_logits"
    )
    logits = logits_layer(hidden)

    return keras.Model(
        inputs=[encoder_input, decoder_input],
        outputs=logits,
        name="Transformer_Encoder_Decoder",
    )

# --- Smoke test: model initialization ---

# Note: num_tokens_dec has to be 13 for the later assignment; 10 is used for this check.
model = get_encoder_decoder_model(
    num_tokens_enc=20,
    num_tokens_dec=10,
    max_seq_length_enc=30,
    max_seq_length_dec=12,
    embed_size=32,
    num_heads=2,
    num_blocks=4,
)

Building EmbeddingWithPosition with input shape (None, None)
Building EmbeddingWithPosition with input shape (None, None)


In [8]:
# Layer overview; the exercise expects a total of 121,418 parameters here.
model.summary()

# --- Forward-pass check ---
# Random token ids are drawn with tf.random.uniform (tf.random.inform does not exist).
X_enc = tf.random.uniform((2, 30), minval=0, maxval=20, dtype=tf.int32)
X_dec = tf.random.uniform((2, 12), minval=0, maxval=10, dtype=tf.int32)
# Expected shape: (2, 12, 10)
print(f"Shape of result: {model([X_enc, X_dec]).shape}")

# Same check with sequences shorter than the maximum lengths.
X_enc_short = tf.random.uniform((2, 15), minval=0, maxval=20, dtype=tf.int32)
X_dec_short = tf.random.uniform((2, 9), minval=0, maxval=10, dtype=tf.int32)
# Expected shape: (2, 9, 10)
print(f"Shape of result (shorter sequences): {model([X_enc_short, X_dec_short]).shape}")

Shape of result: (2, 12, 10)
Shape of result (shorter sequences): (2, 9, 10)


In [9]:
%pip install faker
%pip install babel

Collecting faker
  Downloading faker-38.2.0-py3-none-any.whl.metadata (16 kB)
Downloading faker-38.2.0-py3-none-any.whl (2.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m22.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: faker
Successfully installed faker-38.2.0


In [10]:
import tensorflow as tf
from keras.layers import TextVectorization
import faker
import random
import babel.dates

# --- Data generation setup ---

# Seed both faker and `random` so the generated dataset is reproducible.
fake = faker.Faker()
faker.Faker.seed(12345)
random.seed(12345)

# Candidate Babel date formats. 'full' is repeated on purpose so that it is
# sampled more often by random.choice. The order and multiplicity of the
# entries determine what the seeded sampling produces.
#
# The custom patterns use lowercase 'y' (calendar year). Uppercase 'Y' is the
# week-numbering year, which differs from the calendar year for some days
# around New Year and would then disagree with dt.isoformat().
FORMATS = [
    'short', 'medium', 'long', 'full', 'full',
    'full', 'full', 'full', 'full', 'full', 'full',
    'full', 'full', 'd MMM yyy', 'd MMMM yyy',
    'dd MMM yyy', 'd MMM, yyy', 'd MMMM, yyy',
    'dd, MMM yyy', 'd MM yy', 'd MMMM yyy',
    'MMMM d yyy', 'MMMM d, yyy', 'dd.MM.yy'
]
# Several locales, to help the encoder generalize.
LOCALES = ['nl_NL', 'de_DE', 'en_US', 'fr_FR', 'it_IT']

def load_date():
    """Generate one random date in human-readable and ISO form.

    Returns:
        A tuple ``(human_readable, machine_readable, dt)``: the lower-cased,
        comma-free Babel rendering in a random format and locale, the ISO
        string from ``dt.isoformat()``, and the date object itself. If
        formatting raises ``AttributeError`` or ``ValueError``, the tuple is
        ``(None, None, None)``.
    """
    date_value = fake.date_object()
    try:
        # Format is drawn before locale, keeping the random sequence stable.
        chosen_format = random.choice(FORMATS)
        chosen_locale = random.choice(LOCALES)
        formatted = babel.dates.format_date(
            date_value, format=chosen_format, locale=chosen_locale
        )
        human_readable = formatted.lower().replace(',', '')
        machine_readable = date_value.isoformat()
    except (AttributeError, ValueError):
        # Formatting failed for this date; signal it to the caller.
        return None, None, None
    return human_readable, machine_readable, date_value

def load_dataset(m_count):
    """Collect ``m_count`` (human_readable, machine_readable) date pairs.

    Dates for which ``load_date`` returns ``None`` values are skipped and do
    not count towards ``m_count``.

    Args:
        m_count: Number of pairs to generate.

    Returns:
        A list of ``(human_readable, machine_readable)`` tuples.
    """
    pairs = []
    while len(pairs) < m_count:
        human, machine, _ = load_date()
        if human is None or machine is None:
            continue
        pairs.append((human, machine))
    return pairs

In [11]:
# --- 3.4.1 Dataset Creation ---

dataset = load_dataset(20_000)

# 10,000 training, 5,000 validation and 5,000 test examples.
train, valid, test = dataset[:10_000], dataset[10_000:15_000], dataset[15_000:]

# Transpose each list of pairs into a tuple of human-readable strings and a
# tuple of machine-readable strings.
(train_human, train_machine), (valid_human, valid_machine), (test_human, test_machine) = (
    zip(*split) for split in (train, valid, test)
)

In [12]:
# --- 3.4.2 Create TextVectorization Layers ---

# Encoder vectorizer: character-level, vocabulary learned from the
# human-readable training strings. standardize=None disables the automatic
# lower-casing / punctuation stripping.
enc_text_vec_layer = TextVectorization(standardize=None, split="character")
enc_text_vec_layer.adapt(list(train_human))

# Decoder vectorizer: fixed vocabulary.
# '*' marks end-of-sequence, '.' marks start-of-sequence.
decoder_vocabulary = list("*.0123456789-")
num_tokens_dec_final = len(decoder_vocabulary)  # 13

# Note: TextVectorization reserves two indices in front of the supplied
# vocabulary (index 0 for padding, index 1 for '[UNK]'), so '*' gets id 2.
dec_text_vec_layer = TextVectorization(
    standardize=None,
    split="character",
    vocabulary=decoder_vocabulary,
)

In [13]:
# --- Create the encoder inputs ---
# Each split is vectorized separately with the adapted encoder layer.
X_train_enc, X_valid_enc, X_test_enc = (
    enc_text_vec_layer(list(split))
    for split in (train_human, valid_human, test_human)
)

# Expected: (10000, 29)
print(f"Shape of X_train_enc: {X_train_enc.shape}")

Shape of X_train_enc: (10000, 29)


In [14]:
# --- Create inputs and targets for the decoder ---

# Decoder input: start-of-sequence '.' followed by the target date,
# e.g. '.1992-06-16'.
X_train_dec_str = [f".{m}" for m in train_machine]
X_valid_dec_str = [f".{m}" for m in valid_machine]
X_test_dec_str = [f".{m}" for m in test_machine]

# Decoder target: the target date followed by end-of-sequence '*',
# e.g. '1992-06-16*'.
Y_train_dec_str = [f"{m}*" for m in train_machine]
Y_valid_dec_str = [f"{m}*" for m in valid_machine]
Y_test_dec_str = [f"{m}*" for m in test_machine]

# Tokenize and subtract 2: the two ids the layer reserves in front of the
# vocabulary are skipped, so the tokens are renumbered from
# (*=2, .=3, 0=4, ...) to (*=0, .=1, 0=2, ...).
X_train_dec = dec_text_vec_layer(X_train_dec_str) - 2
Y_train_dec = dec_text_vec_layer(Y_train_dec_str) - 2

X_valid_dec = dec_text_vec_layer(X_valid_dec_str) - 2
Y_valid_dec = dec_text_vec_layer(Y_valid_dec_str) - 2

X_test_dec = dec_text_vec_layer(X_test_dec_str) - 2
Y_test_dec = dec_text_vec_layer(Y_test_dec_str) - 2

# Both are expected to be (10000, 11).
print(f"Shape of X_train_dec: {X_train_dec.shape}")
print(f"Shape of Y_train_dec: {Y_train_dec.shape}")

Shape of X_train_dec: (10000, 11)
Shape of Y_train_dec: (10000, 11)


In [15]:
# Model size.
EMBED_SIZE = 16
NUM_HEADS = NUM_BLOCKS = 2

# Sequence lengths, taken from the tokenized training data: the second axis
# of X_train_enc (human-readable dates, about 29) and of X_train_dec
# (machine-readable date plus the '.' marker, about 11).
SEQ_LENGTH_ENC = X_train_enc.shape[-1]
SEQ_LENGTH_DEC = X_train_dec.shape[-1]

# Vocabulary sizes. The decoder uses the 13 tokens of the fixed vocabulary;
# after the -2 shift the labels in Y_train_dec run from 0 through 12, so the
# output vocabulary size equals num_tokens_dec_final.
NUM_TOKENS_DEC = num_tokens_dec_final
NUM_TOKENS_ENC = enc_text_vec_layer.vocabulary_size()

In [16]:
import tensorflow as tf
from tensorflow import keras

# Build the model for the date-translation task.
model_no_mask = get_encoder_decoder_model(
    embed_size=EMBED_SIZE,
    num_heads=NUM_HEADS,
    num_blocks=NUM_BLOCKS,
    num_tokens_enc=NUM_TOKENS_ENC,
    num_tokens_dec=NUM_TOKENS_DEC,
    max_seq_length_enc=SEQ_LENGTH_ENC,
    max_seq_length_dec=SEQ_LENGTH_DEC,
    use_mask=False,  # training without explicit padding masking
)

# Show the summary to check the parameter count;
# the total is expected to be 17,053.
model_no_mask.summary()

Building EmbeddingWithPosition with input shape (None, None)
Building EmbeddingWithPosition with input shape (None, None)


In [17]:
# Token-level accuracy metric.
metrics = [keras.metrics.SparseCategoricalAccuracy(name="accuracy")]

# The model outputs logits, hence from_logits=True.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Adam optimizer with default settings.
optimizer = keras.optimizers.Adam()

# Compile the model.
model_no_mask.compile(loss=loss_fn, metrics=metrics, optimizer=optimizer)

In [50]:
# Early stopping on validation accuracy: stop after 5 epochs without an
# improvement of at least 0.001 and restore the best weights.
early_stopping_cb = keras.callbacks.EarlyStopping(
    monitor="val_accuracy",
    min_delta=0.001,
    patience=5,
    restore_best_weights=True,
    verbose=1
)

# Start training.
history = model_no_mask.fit(
    x=[X_train_enc, X_train_dec],
    y=Y_train_dec,
    validation_data=([X_valid_enc, X_valid_dec], Y_valid_dec),
    epochs=100,  # upper bound; early stopping normally ends training sooner
    callbacks=[early_stopping_cb],
    batch_size=64
)

# Report the accuracies. The training value is the one of the last epoch.
# The validation value is the maximum over all epochs, so it matches its
# "Beste" (best) label instead of showing whatever the last epoch reached.
train_accuracy = history.history['accuracy'][-1]
val_accuracy = max(history.history['val_accuracy'])

print("\n--- Resultaten ---")
print(f"Laatste Training Accuracy: {train_accuracy * 100:.2f}%")
print(f"Beste Validatie Accuracy: {val_accuracy * 100:.2f}%")

# Both are expected to be above 99%.

Epoch 1/100
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 51ms/step - accuracy: 0.9964 - loss: 0.0139 - val_accuracy: 0.9974 - val_loss: 0.0127
Epoch 2/100
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 47ms/step - accuracy: 0.9975 - loss: 0.0118 - val_accuracy: 0.9973 - val_loss: 0.0123
Epoch 3/100
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 54ms/step - accuracy: 0.9983 - loss: 0.0083 - val_accuracy: 0.9956 - val_loss: 0.0158
Epoch 4/100
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 45ms/step - accuracy: 0.9954 - loss: 0.0176 - val_accuracy: 0.9979 - val_loss: 0.0100
Epoch 5/100
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 53ms/step - accuracy: 0.9983 - loss: 0.0077 - val_accuracy: 0.9963 - val_loss: 0.0134
Epoch 6/100
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 54ms/step - accuracy: 0.9977 - loss: 0.0104 - val_accuracy: 0.9971 - val_loss: 0.0116
Epoch 6: early s

In [63]:
# Every row of a vectorized batch has the same length, so the maximum
# sequence length is simply the size of the second axis. This avoids a
# Python-level loop over all rows of the tensor.
max_len_human_readable = X_train_enc.shape[1]
max_len_machine_readable = Y_train_dec.shape[1]
print(f"Maximale lengte human-readable datum in training set: {max_len_human_readable}")
print(f"Maximale lengte machine-readable datum in training set: {max_len_machine_readable}")

Maximale lengte human-readable datum in training set: 29
Maximale lengte machine-readable datum in training set: 11


In [64]:
# Raw string inputs for a single inference example (batch of one):
# the decoder is primed with only the start-of-sequence token '.',
# the encoder receives the human-readable date to translate.
test_dec_inputs = keras.ops.convert_to_tensor([['.']])
test_enc_inputs = keras.ops.convert_to_tensor([["13 nov 2024"]])

In [65]:
# Vectorize both inputs; the decoder ids get the same -2 shift that was
# applied to the decoder training data.
test_enc = enc_text_vec_layer(test_enc_inputs)
test_dec = dec_text_vec_layer(test_dec_inputs) - 2
# Model output for the single decoder position (the start token).
result = model_no_mask.predict([test_enc, test_dec])
# Displayed as the cell output.
result.shape

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step


(1, 1, 13)

In [66]:
# Turn the logits into probabilities and show those of the first (only)
# batch element.
keras.ops.softmax(result)[0]

<tf.Tensor: shape=(1, 13), dtype=float32, numpy=
array([[8.2934555e-03, 3.3951804e-04, 4.5100256e-07, 6.0762298e-01,
        3.4895864e-01, 6.9088012e-04, 3.1868402e-02, 3.3596950e-04,
        1.1660886e-05, 3.0571240e-04, 1.9308309e-05, 3.1411383e-04,
        1.2389970e-03]], dtype=float32)>

In [74]:
# Shape of the first batch element of the prediction.
result[0].shape

(1, 13)

In [68]:
# Index of the most probable token at the first decoder position of the
# first batch element (in the shifted id space used by the model).
i = keras.ops.argmax(keras.ops.softmax(result[0][0]))

In [72]:
# Map the predicted id back to its character; +2 undoes the -2 shift that
# was applied to the decoder token ids.
dec_text_vec_layer.get_vocabulary()[i+2]

np.str_('1')

In [70]:
# Shape of the full prediction tensor.
result.shape

(1, 1, 13)