In [3]:
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_text as tf_text
import numpy as np

In [4]:
# Hyperparameters
BUFFER_SIZE = 20000  # Shuffle buffer size used when building the training batches
BATCH_SIZE = 64  # Number of sentence pairs per padded batch
MAX_TOKENS = 128  # Maximum tokens per sentence; also the length of the positional-encoding table
EMBEDDING_DIM = 512  # Passed to the Transformer as d_model
NUM_HEADS = 8  # Attention heads per multi-head attention layer
FF_DIM = 2048  # Width of the hidden layer in each feed-forward sub-network
NUM_LAYERS = 6  # Number of encoder layers and of decoder layers
DROPOUT_RATE = 0.1  # Dropout rate passed to the Transformer
VOCAB_SIZE = 10000  # Adjust to the actual vocabulary size

In [6]:
# 1. Carga y Preprocesamiento del Dataset
def preprocess_sentence(sentence):
    """Normalize a sentence and wrap it in ``<start>`` / ``<end>`` markers.

    The text is lower-cased, every character that is not a lowercase
    Latin/Spanish letter or whitespace is removed, and surrounding whitespace
    is stripped. The markers are joined with single spaces so that a
    whitespace tokenizer yields them as separate tokens.

    Args:
        sentence: A ``tf.string`` tensor holding UTF-8 encoded text.

    Returns:
        A ``tf.string`` tensor of the same shape as ``sentence``.
    """
    # encoding='utf-8' is required: the default treats the input as ASCII, so
    # accented capitals would stay upper-case and then be deleted by the regex.
    sentence = tf.strings.lower(sentence, encoding='utf-8')
    sentence = tf.strings.regex_replace(sentence, r"[^a-zñáéíóúüç\s]", '')
    sentence = tf.strings.strip(sentence)
    # tf.concat cannot concatenate scalar strings; tf.strings.join is the
    # string-level concatenation op.
    return tf.strings.join(['<start>', sentence, '<end>'], separator=' ')

def load_and_preprocess_data(max_tokens=MAX_TOKENS, vocab_size=VOCAB_SIZE,
                             dataset_name='ted_hrlr_translate/en_es_conversational'):
    """Load a translation dataset and turn it into padded batches of token ids.

    Each sentence is cleaned with ``preprocess_sentence``, split on whitespace,
    and mapped to integer ids with a per-language ``StringLookup`` whose
    vocabulary is learned from the training split only (most frequent tokens
    first). Id 0 is reserved for padding and id 1 for out-of-vocabulary tokens.

    Args:
        max_tokens: Pairs in which either sentence has more tokens than this
            are dropped.
        vocab_size: Upper bound on each vocabulary, including the padding and
            out-of-vocabulary entries.
        dataset_name: TFDS name (``builder/config``) of a supervised
            translation dataset with ``train`` and ``validation`` splits.

    Returns:
        A tuple ``(train_batches, val_batches, index_from_string_en,
        string_from_index_en, index_from_string_es, string_from_index_es)``.
        The batches yield ``(source_ids, target_ids)`` integer tensors; the
        four lookup layers convert tokens to ids and back for the first and
        second language of the dataset respectively.
    """
    # NOTE(review): the configuration list printed further down in this
    # notebook does not show an 'en_es_conversational' config for
    # ted_hrlr_translate, so the default name is expected to be rejected by
    # tfds.load. Pass a valid `dataset_name` once the language pair is decided.
    examples = tfds.load(dataset_name, as_supervised=True)
    train_examples, val_examples = examples['train'], examples['validation']

    # preprocess_sentence leaves only letters and whitespace, so splitting on
    # whitespace is sufficient and keeps '<start>' / '<end>' as single tokens.
    tokenizer = tf_text.WhitespaceTokenizer()

    def clean_and_tokenize(en, es):
        return (tokenizer.tokenize(preprocess_sentence(en)),
                tokenizer.tokenize(preprocess_sentence(es)))

    def filter_max_tokens(en, es):
        return tf.logical_and(tf.size(en) <= max_tokens, tf.size(es) <= max_tokens)

    # Cache the tokenized training split: it is traversed once per vocabulary
    # and then again on every epoch.
    train_tokens = train_examples.map(clean_and_tokenize).filter(filter_max_tokens).cache()
    val_tokens = val_examples.map(clean_and_tokenize).filter(filter_max_tokens)

    def build_lookup(token_dataset):
        # max_tokens caps the table (mask + OOV included) so every id fits an
        # embedding of `vocab_size` rows; adapt() keeps the most frequent tokens.
        lookup = tf.keras.layers.StringLookup(max_tokens=vocab_size, mask_token='')
        lookup.adapt(token_dataset)
        return lookup

    index_from_string_en = build_lookup(train_tokens.map(lambda en, es: en))
    index_from_string_es = build_lookup(train_tokens.map(lambda en, es: es))

    string_from_index_en = tf.keras.layers.StringLookup(
        vocabulary=index_from_string_en.get_vocabulary(), invert=True, mask_token='')
    string_from_index_es = tf.keras.layers.StringLookup(
        vocabulary=index_from_string_es.get_vocabulary(), invert=True, mask_token='')

    def tokens_to_ids(en, es):
        return index_from_string_en(en), index_from_string_es(es)

    # padded_batch pads with 0, which is the id the lookups reserve for the
    # mask token, so padding and masking agree.
    train_batches = (train_tokens
                     .map(tokens_to_ids)
                     .shuffle(BUFFER_SIZE)
                     .padded_batch(BATCH_SIZE))
    val_batches = val_tokens.map(tokens_to_ids).padded_batch(BATCH_SIZE)

    return train_batches, val_batches, index_from_string_en, string_from_index_en, index_from_string_es, string_from_index_es




In [None]:
# Build the batched datasets and the four token <-> id lookup layers.
(
    train_batches,
    val_batches,
    index_from_string_en,
    string_from_index_en,
    index_from_string_es,
    string_from_index_es,
) = load_and_preprocess_data(MAX_TOKENS, VOCAB_SIZE)

In [8]:


import tensorflow_datasets as tfds

# Get the dataset builder
builder = tfds.builder("ted_hrlr_translate")

# Print the available configurations together with their versions and descriptions
print("Configuraciones disponibles para ted_hrlr_translate:")
for config_name, config_obj in builder.builder_configs.items():
    print(f"- Nombre: {config_name}")
    print(f"  Versión: {config_obj.version}")
    print(f"  Descripción: {config_obj.description}\n")

# You can also simply list the names:
# print(builder.builder_configs.keys())

Configuraciones disponibles para ted_hrlr_translate:
- Nombre: az_to_en
  Versión: 1.0.0
  Descripción: Translation dataset from az to en in plain text.

- Nombre: aztr_to_en
  Versión: 1.0.0
  Descripción: Translation dataset from az_tr to en in plain text.

- Nombre: be_to_en
  Versión: 1.0.0
  Descripción: Translation dataset from be to en in plain text.

- Nombre: beru_to_en
  Versión: 1.0.0
  Descripción: Translation dataset from be_ru to en in plain text.

- Nombre: es_to_pt
  Versión: 1.0.0
  Descripción: Translation dataset from es to pt in plain text.

- Nombre: fr_to_pt
  Versión: 1.0.0
  Descripción: Translation dataset from fr to pt in plain text.

- Nombre: gl_to_en
  Versión: 1.0.0
  Descripción: Translation dataset from gl to en in plain text.

- Nombre: glpt_to_en
  Versión: 1.0.0
  Descripción: Translation dataset from gl_pt to en in plain text.

- Nombre: he_to_pt
  Versión: 1.0.0
  Descripción: Translation dataset from he to pt in plain text.

- Nombre: it_to_pt
  Ve

In [None]:

# 2. Implementación de los Componentes del Transformer

# 2.1. Embedding y Codificación Posicional
class PositionalEmbedding(tf.keras.layers.Layer):
    """Token embedding followed by a fixed sinusoidal positional encoding.

    Args:
        vocab_size: Number of rows in the embedding table.
        d_model: Width of the embedding vectors.
        max_len: Number of positions to precompute. Defaults to the
            module-level ``MAX_TOKENS``.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...).
            Accepting them is what lets ``from_config(get_config())`` work,
            because the base config contains those keys.
    """

    def __init__(self, vocab_size, d_model, max_len=None, **kwargs):
        super().__init__(**kwargs)
        self.max_len = MAX_TOKENS if max_len is None else max_len
        # mask_zero=True makes id 0 the padding id.
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
        self.pos_encoding = self.positional_encoding(self.max_len, d_model)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super().get_config()
        config.update({
            'vocab_size': self.embedding.input_dim,
            'd_model': self.embedding.output_dim,
            'max_len': self.max_len,
        })
        return config

    def positional_encoding(self, position, d_model):
        """Build the sinusoidal table with shape ``(1, position, d_model)``.

        Even feature indices hold sines and odd indices hold cosines; each
        sin/cos pair shares the frequency ``1 / 10000 ** (2i / d_model)``.
        """
        positions = np.arange(position)[:, np.newaxis]
        feature_indices = np.arange(d_model)[np.newaxis, :]
        angle_rads = positions / np.power(
            10000, (2 * (feature_indices // 2)) / np.float32(d_model))
        # apply sin to even indices in the array; 2i
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        # apply cos to odd indices in the array; 2i+1
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        # Leading axis of size 1 lets the table broadcast over the batch.
        return tf.cast(angle_rads[np.newaxis, ...], dtype=tf.float32)

    def compute_mask(self, *args, **kwargs):
        """Delegate mask computation to the wrapped embedding layer."""
        return self.embedding.compute_mask(*args, **kwargs)

    def call(self, x):
        """Embed token ids ``x`` and add the positional encoding.

        The sequence length is read from axis 1 of ``x`` and must not exceed
        ``max_len``.
        """
        length = tf.shape(x)[1]
        x = self.embedding(x)
        # This factor sets the relative scale of the embedding and positional encoding.
        x *= tf.math.sqrt(tf.cast(self.embedding.output_dim, tf.float32))
        x = x + self.pos_encoding[:, :length, :]
        return x

# 2.2. Mecanismo de Atención Multi-Cabeza
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: Width of the query/key/value projections and of the output.
            Must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...),
            accepted so that ``from_config(get_config())`` works.
    """

    def __init__(self, d_model, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        self.dense = tf.keras.layers.Dense(d_model)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super().get_config()
        config.update({
            'num_heads': self.num_heads,
            'd_model': self.d_model,
        })
        return config

    def scaled_dot_product_attention(self, q, k, v, mask):
        """Compute ``softmax(q @ k^T / sqrt(d_k)) @ v``.

        Args:
            q, k, v: Query, key and value tensors whose last two axes are
                ``(seq_len, depth)``.
            mask: ``None`` or a tensor broadcastable to the attention logits
                in which 1 marks positions that must not be attended to.

        Returns:
            ``(output, attention_weights)``.
        """
        matmul_qk = tf.matmul(q, k, transpose_b=True)  # (..., seq_len_q, seq_len_k)

        # scale matmul_qk
        dk = tf.cast(tf.shape(k)[-1], tf.float32)
        scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

        # Masked positions get a large negative logit so softmax sends them to ~0.
        if mask is not None:
            scaled_attention_logits += (mask * -1e9)

        # softmax is normalized on the last axis (seq_len_k) so that the scores
        # add up to 1.
        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)  # (..., seq_len_q, seq_len_k)

        output = tf.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)

        return output, attention_weights

    def split_heads(self, x, batch_size):
        """Split the last dimension into (num_heads, depth).
        Transpose the result such that the shape is (batch_size, num_heads, seq_len, depth)
        """
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        """Apply attention; note the argument order is value, key, query.

        Returns:
            ``(output, attention_weights)`` where ``output`` has shape
            ``(batch_size, seq_len_q, d_model)``.
        """
        # Only the leading (batch) dimension is needed for the reshapes below.
        batch_size = tf.shape(q)[0]

        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)  # (batch_size, seq_len, d_model)
        v = self.wv(v)  # (batch_size, seq_len, d_model)

        q = self.split_heads(q, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k = self.split_heads(k, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v = self.split_heads(v, batch_size)  # (batch_size, num_heads, seq_len_v, depth)

        scaled_attention, attention_weights = self.scaled_dot_product_attention(
            q, k, v, mask)

        # Undo split_heads: bring the head axis next to depth before merging them.
        scaled_attention = tf.transpose(scaled_attention,
                                        perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, num_heads, depth)

        concat_attention = tf.reshape(scaled_attention,
                                      (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)

        output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)

        return output, attention_weights

# 2.3. Red Neuronal Feed-Forward Punto a Punto
def point_wise_feed_forward_network(d_model, ff_dim):
    """Build the two-layer feed-forward network applied after attention.

    The first dense layer has ``ff_dim`` units with ReLU activation; the
    second has ``d_model`` units and no activation.
    """
    hidden_layer = tf.keras.layers.Dense(ff_dim, activation='relu')  # (batch_size, seq_len, ff_dim)
    output_layer = tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    return tf.keras.Sequential([hidden_layer, output_layer])

# 2.4. Bloque del Codificador
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: self-attention and a feed-forward network.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalization.

    Args:
        d_model: Model width.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...),
            accepted so that ``from_config(get_config())`` works.
    """

    def __init__(self, d_model, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)

        # Keep the constructor arguments so get_config does not have to dig
        # them out of sub-layers.
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, ff_dim)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super().get_config()
        config.update({
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'rate': self.rate,
        })
        return config

    def call(self, x, training, mask):
        """Run the block on ``x``.

        Args:
            x: Input sequence representation.
            training: Whether dropout is active.
            mask: Attention mask forwarded to the self-attention layer.

        Returns:
            A tensor with the same shape as ``x``.
        """
        attn_output, _ = self.mha(x, x, x, mask)  # Self attention (q, k, v are the same input)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)  # Residual connection and layer norm

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)  # Residual connection and layer norm

        return out2

# 2.5. Bloque del Decodificador
class DecoderLayer(tf.keras.layers.Layer):
    """One decoder block.

    It chains masked self-attention, attention over the encoder output, and a
    feed-forward network. Each sub-layer is followed by dropout, a residual
    connection and layer normalization.

    Args:
        d_model: Model width.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...),
            accepted so that ``from_config(get_config())`` works.
    """

    def __init__(self, d_model, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)

        # Keep the constructor arguments so get_config does not have to dig
        # them out of sub-layers.
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, ff_dim)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super().get_config()
        config.update({
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'rate': self.rate,
        })
        return config

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        """Run the block.

        Args:
            x: Decoder-side sequence representation.
            enc_output: Output of the encoder stack.
            training: Whether dropout is active.
            look_ahead_mask: Mask for the self-attention over ``x``.
            padding_mask: Mask for the attention over ``enc_output``.

        Returns:
            ``(output, self_attention_weights, cross_attention_weights)``.
        """
        # Masked self attention
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(x + attn1)

        # Encoder-decoder attention
        attn2, attn_weights_block2 = self.mha2(
            enc_output, enc_output, out1, padding_mask)  # value, key, query
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(out1 + attn2)

        # Feed forward network
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(out2 + ffn_output)

        return out3, attn_weights_block1, attn_weights_block2

# 2.6. El Codificador
class Encoder(tf.keras.layers.Layer):
    """Stack of encoder layers on top of a positional embedding.

    Args:
        num_layers: Number of ``EncoderLayer`` blocks.
        d_model: Model width.
        num_heads: Number of attention heads per block.
        ff_dim: Hidden width of each feed-forward network.
        input_vocab_size: Size of the source embedding table.
        rate: Dropout rate.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...),
            accepted so that ``from_config(get_config())`` works.
    """

    def __init__(self, num_layers, d_model, num_heads, ff_dim, input_vocab_size, rate=0.1, **kwargs):
        super().__init__(**kwargs)

        # Keep the constructor arguments so get_config does not have to index
        # into the list of sub-layers.
        self.d_model = d_model
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.input_vocab_size = input_vocab_size
        self.rate = rate

        self.embedding = PositionalEmbedding(input_vocab_size, d_model)
        self.enc_layers = [EncoderLayer(d_model, num_heads, ff_dim, rate)
                           for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(rate)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super().get_config()
        config.update({
            'num_layers': self.num_layers,
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'input_vocab_size': self.input_vocab_size,
            'rate': self.rate,
        })
        return config

    def call(self, x, training, mask):
        """Encode a batch of source token ids.

        Args:
            x: Source token ids.
            training: Whether dropout is active.
            mask: Attention mask forwarded to every encoder layer.

        Returns:
            Tensor of shape ``(batch_size, input_seq_len, d_model)``.
        """
        # adding embedding and position encoding.
        x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        x = self.dropout(x, training=training)

        for enc_layer in self.enc_layers:
            x = enc_layer(x, training, mask)

        return x  # (batch_size, input_seq_len, d_model)

# 2.7. El Decodificador
class Decoder(tf.keras.layers.Layer):
    """Stack of decoder layers on top of a positional embedding.

    Args:
        num_layers: Number of ``DecoderLayer`` blocks.
        d_model: Model width.
        num_heads: Number of attention heads per block.
        ff_dim: Hidden width of each feed-forward network.
        target_vocab_size: Size of the target embedding table.
        rate: Dropout rate.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...),
            accepted so that ``from_config(get_config())`` works.
    """

    def __init__(self, num_layers, d_model, num_heads, ff_dim, target_vocab_size, rate=0.1, **kwargs):
        super().__init__(**kwargs)

        # Keep the constructor arguments so get_config does not have to index
        # into the list of sub-layers.
        self.d_model = d_model
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.target_vocab_size = target_vocab_size
        self.rate = rate

        self.embedding = PositionalEmbedding(target_vocab_size, d_model)
        self.dec_layers = [DecoderLayer(d_model, num_heads, ff_dim, rate)
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super().get_config()
        config.update({
            'num_layers': self.num_layers,
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'target_vocab_size': self.target_vocab_size,
            'rate': self.rate,
        })
        return config

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        """Decode a batch of target token ids against the encoder output.

        Args:
            x: Target token ids.
            enc_output: Output of the encoder stack.
            training: Whether dropout is active.
            look_ahead_mask: Mask for each layer's self-attention.
            padding_mask: Mask for each layer's attention over ``enc_output``.

        Returns:
            ``(output, attention_weights)`` where ``attention_weights`` maps
            ``'decoder_layer{i}_block1'`` / ``'decoder_layer{i}_block2'``
            (1-based ``i``) to that layer's self- and cross-attention weights.
        """
        attention_weights = {}

        x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
        x = self.dropout(x, training=training)

        for i, dec_layer in enumerate(self.dec_layers):
            x, block1, block2 = dec_layer(x, enc_output, training,
                                          look_ahead_mask, padding_mask)

            attention_weights[f'decoder_layer{i+1}_block1'] = block1
            attention_weights[f'decoder_layer{i+1}_block2'] = block2

        return x, attention_weights

# 2.8. El Transformer
class Transformer(tf.keras.Model):
    """Encoder-decoder transformer with a final projection to target logits.

    Args:
        num_layers: Number of layers in both the encoder and the decoder.
        d_model: Model width.
        num_heads: Number of attention heads per layer.
        ff_dim: Hidden width of each feed-forward network.
        input_vocab_size: Size of the source embedding table.
        target_vocab_size: Size of the target embedding table and of the
            output logits.
        rate: Dropout rate.
        **kwargs: Standard Keras model arguments (for example ``name``).
    """

    def __init__(self, num_layers, d_model, num_heads, ff_dim, input_vocab_size,
                 target_vocab_size, rate=0.1, **kwargs):
        super().__init__(**kwargs)

        # Keep the constructor arguments so get_config does not have to index
        # into the encoder's list of sub-layers.
        self.num_layers = num_layers
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.input_vocab_size = input_vocab_size
        self.target_vocab_size = target_vocab_size
        self.rate = rate

        self.encoder = Encoder(num_layers, d_model, num_heads, ff_dim,
                               input_vocab_size, rate)

        self.decoder = Decoder(num_layers, d_model, num_heads, ff_dim,
                               target_vocab_size, rate)

        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this model."""
        config = super().get_config()
        config.update({
            'num_layers': self.num_layers,
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'input_vocab_size': self.input_vocab_size,
            'target_vocab_size': self.target_vocab_size,
            'rate': self.rate,
        })
        return config

    def create_padding_mask(self, seq):
        """Return a float mask that is 1 wherever ``seq`` equals 0 (padding)."""
        seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
        # add extra dimensions so that padding can be added to the attention
        # logits.
        return seq[:, tf.newaxis, tf.newaxis, :]  # (batch_size, 1, 1, seq_len)

    def create_look_ahead_mask(self, size):
        """Return a ``(size, size)`` mask that is 1 strictly above the diagonal."""
        mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
        return mask  # (seq_len, seq_len)

    def call(self, inp, tar, training):
        """Compute target logits for source ids ``inp`` and target ids ``tar``.

        Returns:
            ``(logits, attention_weights)`` where ``logits`` has shape
            ``(batch_size, tar_seq_len, target_vocab_size)``.
        """
        # The same source padding mask serves the encoder self-attention and
        # the decoder's attention over the encoder output.
        enc_padding_mask = self.create_padding_mask(inp)

        # look_ahead mask is used to mask the future tokens in the decoder;
        # it is combined with the target padding mask so a position is hidden
        # if either mask hides it.
        look_ahead_mask = self.create_look_ahead_mask(tf.shape(tar)[1])
        dec_target_padding_mask = self.create_padding_mask(tar)
        combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)

        enc_output = self.encoder(inp, training, enc_padding_mask)  # (batch_size, inp_seq_len, d_model)

        # dec_output.shape == (batch_size, tar_seq_len, d_model)
        dec_output, attention_weights = self.decoder(
            tar, enc_output, training, combined_mask, enc_padding_mask)

        final_output = self.final_layer(dec_output)  # (batch_size, tar_seq_len, target_vocab_size)

        return final_output, attention_weights


In [None]:
# 3. Creación del Modelo
# Size the embedding tables and the output layer from the lookup layers
# themselves: a StringLookup adds mask and out-of-vocabulary entries to its
# vocabulary, so a hard-coded size can be smaller than the largest id produced.
transformer = Transformer(
    num_layers=NUM_LAYERS,
    d_model=EMBEDDING_DIM,
    num_heads=NUM_HEADS,
    ff_dim=FF_DIM,
    input_vocab_size=index_from_string_en.vocabulary_size(),
    target_vocab_size=index_from_string_es.vocabulary_size(),
    rate=DROPOUT_RATE)

In [None]:
# 4. Entrenamiento del Modelo
# Adam optimizer with a fixed learning rate.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
# reduction='none' keeps one loss value per token so that masked_loss can
# zero out the padded positions before reducing.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def masked_loss(y_true, y_pred):
    """Cross-entropy averaged over the non-padding target tokens only.

    Args:
        y_true: Integer target ids; id 0 is treated as padding.
        y_pred: Logits with a trailing vocabulary axis.

    Returns:
        Scalar mean loss over the positions where ``y_true != 0``, or 0 when
        there are no such positions.
    """
    loss_ = loss_fn(y_true, y_pred)

    mask = tf.cast(tf.math.not_equal(y_true, 0), dtype=loss_.dtype)
    loss_ *= mask

    # Divide by the number of real tokens, not by the padded tensor size;
    # otherwise the loss shrinks as the amount of padding in a batch grows.
    return tf.math.divide_no_nan(tf.reduce_sum(loss_), tf.reduce_sum(mask))

# Running accuracy over the training batches; the training loop resets it at
# the start of every epoch.
train_accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

# A fixed signature with unknown batch and sequence sizes stops tf.function
# from retracing for every new combination of padded lengths.
# NOTE(review): assumes the batches hold int64 ids (the default StringLookup
# output dtype) - confirm against the data pipeline.
@tf.function(input_signature=[
    tf.TensorSpec(shape=(None, None), dtype=tf.int64),
    tf.TensorSpec(shape=(None, None), dtype=tf.int64),
])
def train_step(inp, tar):
    """Run one optimization step on a batch and return its masked loss.

    Args:
        inp: Source token ids, one row per sentence.
        tar: Target token ids, one row per sentence.

    Returns:
        Scalar loss tensor for the batch.
    """
    # Teacher forcing: the decoder sees the target shifted right and must
    # predict the target shifted left.
    tar_inp = tar[:, :-1]
    tar_real = tar[:, 1:]

    with tf.GradientTape() as tape:
        predictions, _ = transformer(inp, tar_inp, training=True)
        loss = masked_loss(tar_real, predictions)

    gradients = tape.gradient(loss, transformer.trainable_variables)
    optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))

    # Weight padded positions by 0 so they do not count towards accuracy.
    token_mask = tf.cast(tf.math.not_equal(tar_real, 0), tf.float32)
    train_accuracy_metric.update_state(tar_real, predictions, sample_weight=token_mask)
    return loss

EPOCHS = 20  # Number of training epochs; adjust as needed

for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}')
    total_loss = 0.0
    # Count batches explicitly: relying on the loop variable after the loop
    # raises NameError when the dataset yields no batches.
    num_batches = 0
    train_accuracy_metric.reset_state()
    for batch, (inp, tar) in enumerate(train_batches):
        loss = train_step(inp, tar)
        total_loss += loss
        num_batches += 1
        if batch % 100 == 0:
            print(f'Batch {batch} Loss {loss.numpy():.4f} Accuracy {train_accuracy_metric.result().numpy():.4f}')
    mean_loss = float(total_loss) / max(num_batches, 1)
    print(f'Epoch {epoch + 1} Loss {mean_loss:.4f} Accuracy {train_accuracy_metric.result().numpy():.4f}')

In [None]:


# 5. Realización de la Traducción (Inferencia)
def translate(sentence, index_from_string_en, string_from_index_es, index_from_string_es, transformer):
    """Translate one sentence with greedy decoding.

    Args:
        sentence: Source sentence as a Python string.
        index_from_string_en: Lookup layer mapping source tokens to ids.
        string_from_index_es: Lookup layer mapping target ids to tokens.
        index_from_string_es: Lookup layer mapping target tokens to ids.
        transformer: Model called as ``transformer(inp, tar, training=False)``
            and returning ``(logits, attention_weights)``.

    Returns:
        The translated sentence as a Python string, without the ``<start>``
        and ``<end>`` markers.
    """
    cleaned = preprocess_sentence(tf.constant(sentence))
    # Look words up one by one; passing the whole sentence as a single string
    # would map it to one out-of-vocabulary id. Truncate to the longest
    # sequence the positional encoding covers.
    source_tokens = tf.strings.split(cleaned)[:MAX_TOKENS]
    encoder_input = index_from_string_en(source_tokens)[tf.newaxis, :]  # (1, src_len)

    marker_ids = index_from_string_es(tf.constant(['<start>', '<end>'])).numpy()
    start_id, end_id = int(marker_ids[0]), int(marker_ids[1])

    decoded_ids = [start_id]
    # At iteration k the decoder input has k + 1 tokens, so it never exceeds
    # MAX_TOKENS.
    for _ in range(MAX_TOKENS):
        decoder_input = tf.constant([decoded_ids], dtype=tf.int64)  # (1, cur_len)
        predictions, _ = transformer(encoder_input, decoder_input, training=False)
        # Greedy choice from the logits of the last position.
        predicted_id = int(tf.argmax(predictions[0, -1, :], axis=-1).numpy())

        if predicted_id == end_id:
            break

        decoded_ids.append(predicted_id)

    # Drop the leading '<start>'; '<end>' is never appended.
    output_ids = decoded_ids[1:]
    if not output_ids:
        return ''
    output_tokens = string_from_index_es(tf.constant(output_ids, dtype=tf.int64)).numpy()
    return ' '.join(token.decode('utf-8') for token in output_tokens)

In [None]:

# Translation example: run the trained model on one sentence and print both
# the input and the result.
example_input = "Hello, how are you?"
translated_output = translate(example_input, index_from_string_en, string_from_index_es, index_from_string_es, transformer)
print(f"Input: {example_input}")
print(f"Translation: {translated_output}")