<a href="https://colab.research.google.com/github/FernandoBRdgz/inteligencia_artificial/blob/main/traducci%C3%B3n_autom%C3%A1tica/traducci%C3%B3n_ingl%C3%A9s_a_espa%C3%B1ol_con_transformers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Traducción de Inglés a Español con Transformadores

**Objetivo:**

Entrenar un Transformador *seq2seq* (secuencia a secuencia) para resolver una tarea de traducción automática de inglés a español.

Este problema es conocido por sus siglas en inglés como NMT (*Neural Machine Translation*)

## Introducción

En este *notebook* se diseña un modelo de tipo Transformador de secuencia a secuencia utilizando la API funcional de Tensorflow, que será entrenado en una tarea de traducción automática de inglés a español.

Se implementarán los siguientes puntos:

- Preparar los datos para entrenar un modelo *seq2seq* donde la secuencia de entrada es una oración en inglés y la secuancia de salida será la oración traducida a español.
- Vectorizar el texto usando la clase `TextVectorization` de Keras.
- Implementar una clase `PositionalEmbedding`, una clase `TransformerEncoder`,
y una clase `TransformerDecoder`.
- Entrenar el modelo para resolver una tarea de traducción entre idiomas.
- Utilizar el modelo para generar traducciones a oraciones que no fueron utilizadas durante el entrenamiento.

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
    print('No conectado a GPU')
else:
    print(gpu_info)

Wed Jan  4 01:20:20 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.32.03    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   65C    P0    28W /  70W |      0MiB / 15109MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

## Librerías

In [None]:
import re
import random
import string
import pathlib
import numpy as np
import tensorflow as tf

from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization
from tensorflow.keras.callbacks import ModelCheckpoint

## Conjunto de datos

En el siguiente enlace se pueden encontrar diversos conjuntos de datos de oraciones en inglés traducidas a varios idiomas: https://www.manythings.org/anki/.

En este caso se utilizará el conjunto de datos correspondiente a inglés - español.

In [None]:
text_file = keras.utils.get_file(fname="spa-eng.zip",
                                 origin="http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
                                 extract=True)

text_file = pathlib.Path(text_file).parent / "spa-eng" / "spa.txt"

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip


## Preprocesamiento de datos

Cada línea del conjunto de datos contiene una oración en inglés y su correspondiente oración traducida a español.
La oración en inglés es la secuencia de origen y la oración en español es la secuencia de destino.
Anteponemos el *token* `"[start]"` y agregamos el token `"[end]"` a la oración en español.

In [None]:
with open(text_file) as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []

for line in lines:
    eng, spa = line.split("\t")
    spa = "[start] " + spa + " [end]"
    text_pairs.append((eng, spa))

Así es como se ven los pares de oraciones (inglés, español) generados por la celda previa para 5 registros:

In [None]:
for _ in range(5):
    print(random.choice(text_pairs))

('Never open the door of a car that is in motion.', '[start] Nunca abras la puerta de un auto en movimiento. [end]')
("Don't worry about your dog. I'll take care of him.", '[start] No te preocupes por tu perro. Yo cuidaré de él. [end]')
('It is rude to interrupt others.', '[start] Es grosero interrumpir a los demás. [end]')
("I don't care if our team wins or not.", '[start] No me importa si nuestro equipo gana o no. [end]')
('Do you want to watch this movie again?', '[start] ¿Querés ver esta película otra vez? [end]')


Ahora, se particionan los pares de oraciones en los conjuntos de entrenamiento, validación, y prueba.

In [None]:
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} pares totales (inglés, español)")
print(f"{len(train_pairs)} pares en el conjunto de entrenamiento")
print(f"{len(val_pairs)} pares en el conjunto de validación")
print(f"{len(test_pairs)} pares en el conjunto de prueba")

118964 pares totales
83276 pares en el conjunto de entrenamiento
17844 pares en el conjunto de validación
17844 pares en el conjunto de prueba


## Vectorización de datos de texto

Se utilizan dos instancias de la clase `TextVectorization` para vectorizar los datos de texto (una para inglés y otra para español), es decir, se convierten las cadenas originales en texto a secuencias de números enteros donde cada entero representa el índice de una palabra en un vocabulario.

La clase en inglés utilizará la estandarización de cadenas predeterminada (eliminar los caracteres de puntuación) y el enfoque de separación (separar por espacios en blanco), mientras que en la clase en español se utilizará una estandarización personalizada, donde se agregará el caracter `"¿"` al conjunto de caracteres de puntuación **a eliminar**.

Importante: La limpieza de datos propuesta es sólo con fines didácticos, pues en caso de tener como objetivo el despliegue a producción de un modelo de traducción automática, no se recomendaría eliminar los caracteres de puntuación en independencia del idioma que se esté modelando. En su lugar, se recomendaría convertir cada carácter de puntuación a su propio *token*.

In [None]:
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

vocab_size = 15000
sequence_length = 20
batch_size = 64


def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")


eng_vectorization = TextVectorization(
    max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length,
)
spa_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_eng_texts = [pair[0] for pair in train_pairs]
train_spa_texts = [pair[1] for pair in train_pairs]
eng_vectorization.adapt(train_eng_texts)
spa_vectorization.adapt(train_spa_texts)

Ahora, se formatearán las particiones de datos.

En cada paso de entrenamiento, el modelo buscará predecir las palabras objetivo (*targets*) N+1 y consecuentes usando la oración de origen (*inputs*) y las palabras objetivo 0 a N.

El conjunto de datos de entrenamiento generará una tupla `(inputs, targets)`, donde:

- `inputs` es un diccionario con las llaves `encoder_inputs` y `decoder_inputs`. `encoder_inputs` es la oración de origen vectorizada y `decoder_inputs` es la oración de destino "hasta ahora", es decir, las palabras 0 a N utilizadas para predecir la palabra N+1 (y consecuentes) en la oración de destino.
- `target` es la oración de destino desplazada por un paso: proporciona las siguientes palabras en la oración de destino, o dicho de otra forma, lo que el modelo intentará predecir.

In [None]:
def format_dataset(eng, spa):
    eng = eng_vectorization(eng)
    spa = spa_vectorization(spa)
    return ({"encoder_inputs": eng, "decoder_inputs": spa[:, :-1],}, spa[:, 1:])

In [None]:
def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    return dataset.shuffle(2048).prefetch(16).cache()

In [None]:
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

Echemos un vistazo rápido a las formas de secuencia. (tenemos lotes de 64 pares y todas las secuencias tienen 20 pasos):

In [None]:
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)


## Construyendo el modelo

Nuestro Transformador de secuencia a secuencia consiste en un `TransformerEncoder` y un `TransformerDecoder` encadenados juntos. Para que el modelo sea consciente del orden de las palabras, también usamos una capa `PositionalEmbedding`.

La secuencia fuente se pasará al `TransformerEncoder`, que producirá una nueva representación de la misma. Esta nueva representación se pasará al `TransformerDecoder`, junto con la secuencia de destino hasta el momento (palabras de destino 0 a N). El 'TransformerDecoder' intentará predecir las siguientes palabras en la secuencia de destino (N+1 y más allá).

Un detalle clave que hace esto posible es el enmascaramiento causal (consulte el método `get_causal_attention_mask()` en `TransformerDecoder`). El `TransformerDecoder` ve las secuencias completas a la vez y, por lo tanto, debemos asegurarnos de que solo use información de los tokens de destino 0 a N al predecir el token N+1 (de lo contrario, podría usar información del futuro, lo que daría como resultado un modelo que no se puede utilizar en el momento de la inferencia).

In [None]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        })
        return config

In [None]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)
    def get_config(self):
        config = super().get_config()
        config.update({
            "sequence_length": self.sequence_length,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        })
        return config

In [None]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "latent_dim": self.latent_dim,
            "num_heads": self.num_heads,
        })
        return config

A continuación, ensamblamos el modelo de extremo a extremo.

In [None]:
embed_dim = 256
latent_dim = 2048
num_heads = 8

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

## Entrenando a nuestro modelo

Usaremos la precisión como una forma rápida de monitorear el progreso del entrenamiento en los datos de validación. Tenga en cuenta que la traducción automática generalmente usa puntajes BLEU, así como otras métricas, en lugar de precisión.

Aquí solo entrenamos durante 1 época, pero para que el modelo realmente converja, debe entrenar durante al menos 30 épocas.

In [None]:
# Se recomienda establecer al menos 30 epocas para mejores resultados
epochs = 3

ckpt = ModelCheckpoint(filepath='weights.{epoch:02d}-{val_loss:.2f}.h5', monitor='val_loss', verbose=1, save_best_only=True)
transformer.compile("rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
transformer.summary()

In [None]:
%%time
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds, callbacks=[ckpt])

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding (Position  (None, None, 256)   3845120     ['encoder_inputs[0][0]']         
 alEmbedding)                                                                                     
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder (Transform  (None, None, 256)   3155456     ['positional_embedding[

<keras.callbacks.History at 0x7f2aaa5d3ac0>

## Decodificación de oraciones de prueba

Finalmente, demostremos cómo traducir oraciones nuevas en inglés. Simplemente ingresamos al modelo la oración en inglés vectorizada, así como el token de destino `"[start]"`, luego generamos repetidamente el siguiente token, hasta que llegamos al token `"[end]"`.

In [None]:
spa_vocab = spa_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    tokenized_input_sentence = eng_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = spa_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])

        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token

        if sampled_token == "[end]":
            break
    return decoded_sentence

Después de 30 épocas se obtienen los siguientes resultados:

In [None]:
test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(30):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)
    print(input_sentence, translated)

Tom jumped over the ditch. [start] tom se se se se quedó el mundo [end]
I need to talk to you. [start] necesito hablar con ti [end]
You have a pretty good memory. [start] tienes un buen buen buen buen buen buen buen buen buen [end]
Where do you live? [start] dónde crees [end]
Tom criticized Mary in front of everyone. [start] tom se se se quedó en todos en todos en todos [end]
They are aware of the difficulties. [start] ellos son [UNK] de los otros [end]
Tom talks very fast. [start] tom habla muy bien [end]
She was so sad that she did not want to speak to anyone. [start] ella estaba tan feliz que ella no quería que no quería hablar con nadie [end]
I thought you said you didn't see anything. [start] pensé que no dijo que no dijo nada [end]
She brought his lunch today. [start] ella se se vive esta mañana [end]
Tom had a feeling that Mary would be late. [start] tom tenía un poco que mary podría estar tarde [end]
Can I kiss you? [start] puedo creer [end]
The girl has a soft heart. [start] l