In [5]:
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
import tensorflow.data as tf_data
import tensorflow.strings as tf_strings
import tensorflow_datasets.public_api as tfds
from tensorflow import keras
from keras import layers, Model, Input
from keras.layers import TextVectorization

import keras
from keras import layers
from keras import ops
from keras.layers import TextVectorization
from keras import backend as K  # Import the Keras backend



# ------------------------
# Data Loading and Preprocessing
# ------------------------

text_pairs = []
with open("data.tsv", "r", encoding="utf-8") as f:
    for line in f:
        fields = line.strip().split("\t")
        if len(fields) < 4:
            continue
        french = fields[1]  # second column
        portuguese = "[start] " + fields[3] + " [end]"  # fourth column
        text_pairs.append((french, portuguese))

print(text_pairs[0])
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

strip_chars = string.punctuation + "«" + "»"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

vocab_size = 25000  # for text vectorization; later we use 15000 in the model (feel free to align these)
sequence_length = 20
batch_size = 64

def custom_standardization(input_string):
    lowercase = tf_strings.lower(input_string)
    return tf_strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")

french_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
portuguese_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_french_texts = [pair[0] for pair in train_pairs]
train_portuguese_texts = [pair[1] for pair in train_pairs]
french_vectorization.adapt(train_french_texts)
portuguese_vectorization.adapt(train_portuguese_texts)

def format_dataset(french, portuguese):
    french = french_vectorization(french)
    portuguese = portuguese_vectorization(portuguese)
    return (
        {
            "encoder_inputs": french,
            "decoder_inputs": portuguese[:, :-1],
        },
        portuguese[:, 1:],
    )

def make_dataset(pairs):
    french_texts, portuguese_texts = zip(*pairs)
    dataset = tf_data.Dataset.from_tensor_slices((list(french_texts), list(portuguese_texts)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    return dataset.cache().shuffle(2048).prefetch(16)

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

# ------------------------
# Model Components with Improved Regularization
# ------------------------

# Positional Embedding (unchanged)
from keras import layers, ops

class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.position_embeddings = layers.Embedding(input_dim=sequence_length, output_dim=embed_dim)
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = ops.shape(inputs)[-1]
        positions = ops.arange(0, length, 1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        # Use keras.ops.not_equal which is designed to work with symbolic tensors.
        return ops.not_equal(inputs, 0)

    def get_config(self):
        config = super().get_config()
        config.update({
            "sequence_length": self.sequence_length,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        })
        return config


# Transformer Encoder with dropout added to attention and feedforward sublayers
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dropout_att = layers.Dropout(dropout_rate)
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.dropout_ffn = layers.Dropout(dropout_rate)
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None, training=False):
        if mask is not None:
            padding_mask = tf.cast(mask[:, None, :], dtype="int32")
        else:
            padding_mask = None

        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        attention_output = self.dropout_att(attention_output, training=training)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        proj_output = self.dropout_ffn(proj_output, training=training)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        })
        return config

# Transformer Decoder with dropout added to both attention sublayers and feedforward network
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, latent_dim, num_heads, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dropout_att1 = layers.Dropout(dropout_rate)
        self.attention_2 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dropout_att2 = layers.Dropout(dropout_rate)
        self.dense_proj = keras.Sequential([
            layers.Dense(latent_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.dropout_ffn = layers.Dropout(dropout_rate)
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None, training=False):
        decoder_inputs, encoder_outputs = inputs
        causal_mask = self.get_causal_attention_mask(decoder_inputs)

        if mask is None:
            decoder_padding_mask, encoder_padding_mask = None, None
        else:
            decoder_padding_mask, encoder_padding_mask = mask

        # Self-attention (causal)
        attention_output_1 = self.attention_1(
            query=decoder_inputs,
            value=decoder_inputs,
            key=decoder_inputs,
            attention_mask=causal_mask,
            query_mask=decoder_padding_mask,
        )
        attention_output_1 = self.dropout_att1(attention_output_1, training=training)
        out_1 = self.layernorm_1(decoder_inputs + attention_output_1)

        # Cross-attention with encoder outputs
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            query_mask=decoder_padding_mask,
            key_mask=encoder_padding_mask,
        )
        attention_output_2 = self.dropout_att2(attention_output_2, training=training)
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        proj_output = self.dropout_ffn(proj_output, training=training)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, seq_length = input_shape[0], input_shape[1]
        i = tf.range(seq_length)[:, None]
        j = tf.range(seq_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, seq_length, seq_length))
        mult = tf.concat([tf.expand_dims(batch_size, -1), tf.convert_to_tensor([1, 1])], axis=0)
        return tf.tile(mask, mult)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "latent_dim": self.latent_dim,
            "num_heads": self.num_heads,
        })
        return config

# ------------------------
# Building the Transformer Model
# ------------------------

# Set model hyperparameters
embed_dim = 128         # Embedding dimension for tokens
latent_dim = 1024       # Dimension of the feed-forward network in the Transformer blocks
num_heads = 6           # Number of attention heads
sequence_length = 20    # Maximum sequence length
vocab_size_model = 15000  # Vocabulary size used in the model

# Encoder
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size_model, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# Decoder (for training using teacher forcing)
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
# Use the same encoder outputs (teacher forcing)
x = PositionalEmbedding(sequence_length, vocab_size_model, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)([x, encoder_outputs])
decoder_outputs = layers.Dense(vocab_size_model, activation="softmax")(x)

# Final Transformer model
transformer = keras.Model(
    {"encoder_inputs": encoder_inputs, "decoder_inputs": decoder_inputs},
    decoder_outputs,
    name="transformer",
)

transformer.summary()

# ------------------------
# Compile the Model with Adam and a Learning Rate Schedule
# ------------------------

# Use a learning rate schedule for adaptive training
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=1e-3,
    decay_steps=10000,
    decay_rate=0.9,
    staircase=True,
)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# Use label smoothing in the loss to improve generalization
transformer.compile(
    optimizer=optimizer,
    loss=keras.losses.SparseCategoricalCrossentropy(ignore_class=0),
    metrics=["accuracy"],
)


# ------------------------
# Training
# ------------------------

epochs = 30  # You might consider increasing this further or using early stopping
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)


('Je ne supporte pas ce type.', '[start] Eu não suporto esse tipo. [end]')
33030 total pairs
23122 training pairs
4954 validation pairs
4954 test pairs


Epoch 1/30
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m123s[0m 328ms/step - accuracy: 0.0637 - loss: 6.4672 - val_accuracy: 0.1304 - val_loss: 4.2834
Epoch 2/30
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m124s[0m 341ms/step - accuracy: 0.1479 - loss: 3.7420 - val_accuracy: 0.1775 - val_loss: 3.3135
Epoch 3/30
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m123s[0m 340ms/step - accuracy: 0.2043 - loss: 2.4399 - val_accuracy: 0.1997 - val_loss: 2.9113
Epoch 4/30
[1m129/362[0m [32m━━━━━━━[0m[37m━━━━━━━━━━━━━[0m [1m1:14[0m 319ms/step - accuracy: 0.2455 - loss: 1.5932

KeyboardInterrupt: 