In [1]:
import os
import re
import string
import random
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras import layers
from keras.layers import Input, Embedding, Dense, Dropout, TextVectorization

# Work from the project directory so that the local `transformer.py` module is
# importable and relative data paths resolve against it.
# os.path.expanduser("~") is used instead of os.getenv("HOME"): when HOME is unset,
# getenv returns None and the f-string would silently build "None/Downloads/...".
PROJECT_DIR = os.path.join(os.path.expanduser("~"), "Downloads", "deeplearning")
os.chdir(PROJECT_DIR)
from transformer import Transformer

Transformer imported from local file "transformer.py"


In [2]:
# Load tab-separated English/Spanish sentence pairs and split them into
# train / validation / test subsets (70% / 15% / 15%).
text_file = "spa.txt"
# Explicit UTF-8: the Spanish text contains non-ASCII characters and the platform
# default encoding is not guaranteed to be UTF-8.
with open(text_file, encoding="utf-8") as f:
    # Skip blank lines instead of assuming the file ends with exactly one newline
    # (slicing off the last element would drop a real pair when it does not).
    lines = [line for line in f.read().split("\n") if line.strip()]

text_pairs = []
for line in lines:
    # Only the first two tab-separated columns are used; any extra trailing
    # columns are ignored rather than breaking the unpacking.
    english, spanish = line.split("\t")[:2]
    # Wrap the target sentence in explicit start/end markers for the decoder.
    spanish = "[start] " + spanish + " [end]"
    text_pairs.append((english, spanish))

# Shuffle with a fixed seed (on a private RNG, leaving the global `random` state
# alone) so the split is identical on every run. The vocabularies are adapted from
# train_pairs, so a different split would produce different token ids and make
# weights saved in an earlier session meaningless after a kernel restart.
SPLIT_SEED = 42
random.Random(SPLIT_SEED).shuffle(text_pairs)
num_val = int(0.15 * len(text_pairs))
num_train = len(text_pairs) - 2 * num_val
train_pairs = text_pairs[:num_train]
val_pairs = text_pairs[num_train:num_train + num_val]
test_pairs = text_pairs[num_train + num_val:]

# Vocabulary cap and padded sequence length used when building the vectorizers.
vocab_size = 15000
seq_length = 20

# Characters removed from Spanish text during standardization: ASCII punctuation
# plus the inverted question mark, but keeping the square brackets so the
# "[start]" / "[end]" markers survive.
strip_chars = "".join(ch for ch in string.punctuation + "¿" if ch not in "[]")

def custom_standardization(input_string):
    """Lowercase the input and delete every character listed in ``strip_chars``.

    Args:
        input_string: String tensor to standardize.

    Returns:
        The lowercased string tensor with all ``strip_chars`` characters removed.
    """
    lowered = tf.strings.lower(input_string)
    # Character class built from the escaped strip list.
    strip_pattern = f"[{re.escape(strip_chars)}]"
    return tf.strings.regex_replace(lowered, strip_pattern, "")

# English side: integer token ids, padded/truncated to seq_length, with the layer's
# default standardization. The vocabulary is learned from the training pairs only.
source_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=seq_length,
)
source_vectorization.adapt([english_text for english_text, _ in train_pairs])

# Spanish side: uses custom_standardization and one extra position, because
# format_dataset slices the sequence into a [:-1] input and a [1:] target.
target_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    standardize=custom_standardization,
    output_sequence_length=seq_length + 1,
)
target_vectorization.adapt([spanish_text for _, spanish_text in train_pairs])

def format_dataset(eng, spa):
    """Vectorize a batch of sentence pairs into model inputs and targets.

    Args:
        eng: Batch of English strings.
        spa: Batch of Spanish strings.

    Returns:
        ``((source_ids, decoder_inputs), decoder_targets)`` where the decoder
        inputs are the Spanish ids without their last position and the targets
        are the same ids without their first position (shifted by one).
    """
    source_ids = source_vectorization(eng)
    target_ids = target_vectorization(spa)
    decoder_inputs = target_ids[:, :-1]
    decoder_targets = target_ids[:, 1:]
    return ((source_ids, decoder_inputs), decoder_targets)

batch_size = 64
def make_dataset(pairs):
    """Build a batched, vectorized ``tf.data`` pipeline from sentence pairs.

    Args:
        pairs: Non-empty sequence of ``(english, spanish)`` string tuples.

    Returns:
        A ``tf.data.Dataset`` yielding ``((source_ids, decoder_inputs), targets)``
        batches as produced by ``format_dataset``.
    """
    eng_texts, spa_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(eng_texts), list(spa_texts)))
    dataset = dataset.batch(batch_size).map(format_dataset, num_parallel_calls=4)
    # Order matters: cache() goes right after the expensive vectorization so it is
    # computed once; shuffle() comes after the cache so batches are re-shuffled on
    # every epoch (caching after shuffle would freeze the first epoch's order);
    # prefetch() is last so it overlaps the input pipeline with training.
    return dataset.cache().shuffle(2048).prefetch(16)

# Input pipelines for fitting and for per-epoch validation.
train_ds, val_ds = make_dataset(train_pairs), make_dataset(val_pairs)


2025-05-01 19:21:17.840790: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M2
2025-05-01 19:21:17.841020: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 8.00 GB
2025-05-01 19:21:17.841027: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 2.67 GB
I0000 00:00:1746141677.841573 31180663 pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
I0000 00:00:1746141677.841986 31180663 pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [3]:
def masked_loss(label, pred):
    """Sparse categorical cross-entropy averaged over non-zero label positions.

    Args:
        label: Integer target ids; positions equal to 0 are excluded.
        pred: Unnormalized logits for each position.

    Returns:
        Scalar tensor: summed loss of the kept positions divided by their count.
    """
    per_position_loss_fn = keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')
    per_position = per_position_loss_fn(label, pred)
    # 1.0 where the label is a real token, 0.0 where it is 0.
    keep = tf.cast(label != 0, dtype=per_position.dtype)
    masked_total = tf.reduce_sum(per_position * keep)
    return masked_total / tf.reduce_sum(keep)

def masked_accuracy(label, pred):
    """Fraction of non-zero label positions whose argmax prediction is correct.

    Args:
        label: Integer target ids; positions equal to 0 are excluded.
        pred: Per-position scores; the predicted id is the argmax over axis 2.

    Returns:
        Scalar float32 tensor: correct kept positions divided by kept positions.
    """
    predicted_ids = tf.argmax(pred, axis=2)
    label = tf.cast(label, predicted_ids.dtype)
    is_real = label != 0
    is_hit = (label == predicted_ids) & is_real
    hit_count = tf.reduce_sum(tf.cast(is_hit, dtype=tf.float32))
    real_count = tf.reduce_sum(tf.cast(is_real, dtype=tf.float32))
    return hit_count / real_count

In [4]:
class CustomSchedule(keras.optimizers.schedules.LearningRateSchedule):
    """Learning-rate schedule with linear warmup followed by inverse-sqrt decay.

    The rate at a given step is::

        d_model ** -0.5 * min(step ** -0.5, step * warmup_steps ** -1.5)

    so it grows linearly until ``step == warmup_steps`` and decays as
    ``step ** -0.5`` afterwards.

    Args:
        d_model: Model width; the rate is scaled by ``d_model ** -0.5``.
        warmup_steps: Step at which the schedule switches from warmup to decay.
    """

    def __init__(self, d_model, warmup_steps=4000):
        super().__init__()
        # Keep the raw constructor argument so get_config() can return a plain
        # Python value instead of the tensor stored in self.d_model.
        self._d_model_config = d_model
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        """Return the learning rate for the given optimizer step."""
        step = tf.cast(step, dtype=tf.float32)
        # At step 0 rsqrt gives inf, but the warmup term is 0, so the minimum is 0.
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

    def get_config(self):
        """Return the constructor arguments so the schedule can be serialized.

        Without this, serializing the schedule (for example as part of an
        optimizer config) is not possible.
        """
        return {"d_model": self._d_model_config, "warmup_steps": self.warmup_steps}

# Adam driven by the warmup schedule; the schedule width (128) matches d_emb below.
learning_rate = CustomSchedule(128)
optimizer = keras.optimizers.Adam(
    learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
)

# Transformer from the local transformer.py; both vocabularies share vocab_size.
model = Transformer(
    n_layers=4,
    d_emb=128,
    n_heads=8,
    d_ff=512,
    dropout_rate=0.1,
    src_vocab_size=vocab_size,
    tgt_vocab_size=vocab_size,
)
# Padding-aware loss and accuracy defined in this notebook.
model.compile(optimizer=optimizer, loss=masked_loss, metrics=[masked_accuracy])

In [None]:
# Train for 10 epochs, validating on val_ds after each one, then persist the weights.
model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=10,
)
# NOTE(review): Keras 3 documents that save_weights() file names must end in
# ".weights.h5" -- confirm the installed Keras accepts this ".keras" name before
# starting a multi-hour run. The same file name is used by the later load_weights()
# call, so the two must be changed together.
model.save_weights("eng2spa_transformer_weights.keras")

Epoch 1/10


2025-05-01 19:22:32.623399: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:117] Plugin optimizer for device_type GPU is enabled.


[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1165s[0m 878ms/step - loss: 7.5709 - masked_accuracy: 0.1552 - val_loss: 3.7973 - val_masked_accuracy: 0.4043
Epoch 2/10
[1m 191/1302[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m16:32[0m 894ms/step - loss: 3.8668 - masked_accuracy: 0.4072

In [None]:
# Map each target-vocabulary index back to its token string.
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(enumerate(spa_vocab))

# Restore the weights written by the training cell before running inference.
model.load_weights("eng2spa_transformer_weights.keras")

def decode_sequence(input_sentence):
    """Greedily translate one English sentence into Spanish.

    Starting from "[start]", the decoder input is re-vectorized on every step, the
    model is run, and the highest-scoring token at the current position is
    appended. Decoding stops when "[end]" is predicted or after ``seq_length``
    tokens, which is the decoder input length produced by ``target_vectorization``
    once its last position is sliced off.

    Args:
        input_sentence: English sentence as a Python string.

    Returns:
        The generated tokens joined by spaces, without the "[start]" / "[end]"
        markers; an empty string if "[end]" is the first prediction.
    """
    tokenized_input = source_vectorization([input_sentence])
    # Tokens are collected in a list so the markers never need to be stripped from
    # the result by string replacement (which left a literal "[end]" behind when it
    # was the very first prediction).
    generated = []
    for i in range(seq_length):
        decoder_text = " ".join(["[start]"] + generated)
        tokenized_target = target_vectorization([decoder_text])[:, :-1]
        preds = model.predict([tokenized_input, tokenized_target], verbose=0)
        # Position i holds the prediction for the token following the i tokens
        # generated so far (plus the start marker).
        next_index = int(np.argmax(preds[0, i, :]))
        next_token = spa_index_lookup[next_index]
        if next_token == "[end]":
            break
        generated.append(next_token)
    return " ".join(generated)

In [None]:
if __name__ == "__main__":
    # Simple interactive prompt: translate each entered sentence until "quit".
    print("\nEnglish-to-Spanish Translator (Transformer-based)")
    print("Type 'quit' to exit.\n")
    keep_going = True
    while keep_going:
        eng = input("Enter an English sentence: ")
        # The exit command is matched case-insensitively, ignoring outer whitespace.
        keep_going = eng.strip().lower() != "quit"
        if keep_going:
            print("Spanish Translation:", decode_sequence(eng))
            print("-")