
Tutoriales
https://www.tensorflow.org/text

https://www.tensorflow.org/text/tutorials/nmt_with_attention

Datasets
https://www.manythings.org/anki/

# Librerías y componentes

In [1]:
import tensorflow as tf
import string
import re
import random
import zipfile

from tensorflow import keras
from tensorflow.keras import layers

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import TextVectorization,GlobalMaxPooling1D, Dropout, MultiHeadAttention, Dense, LayerNormalization, Embedding, GRU, Bidirectional


# Dataset (archivo)

In [2]:
!wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip

--2024-11-12 15:13:02--  http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 74.125.135.207, 142.250.99.207, 142.250.107.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|74.125.135.207|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2638744 (2.5M) [application/zip]
Saving to: ‘spa-eng.zip’


2024-11-12 15:13:02 (235 MB/s) - ‘spa-eng.zip’ saved [2638744/2638744]



In [3]:
# Extract the archive relative to the working directory. The download cell
# saves spa-eng.zip there and the next cell reads "spa-eng/spa.txt" with a
# relative path, so hard-coding "/content/" only worked when the working
# directory happened to be /content.
with zipfile.ZipFile('spa-eng.zip', 'r') as zip_ref:
    zip_ref.extractall('.')

# Dataset (preparación)

In [4]:
archivo = "spa-eng/spa.txt"
# Decode explicitly as UTF-8: the sentences contain accented characters and the
# platform default encoding is not guaranteed to be UTF-8.
with open(archivo, encoding="utf-8") as f:
  # Keep every non-blank line. This no longer assumes the file ends with
  # exactly one trailing newline (the old "[:-1]" would silently drop the last
  # real example when it does not).
  ejemplos = [linea.rstrip("\n") for linea in f if linea.strip()]
dataset = []

In [5]:
# Start from an empty list so that re-running this cell does not append every
# pair a second time (duplicates would later be spread across the
# train/validation/test splits).
dataset = []
for line in ejemplos:
  # The first two tab-separated columns are unpacked as (salida, entrada) and
  # stored swapped, so the model translates Spanish -> English. Any further
  # columns on the line are ignored instead of breaking the unpacking.
  salida, entrada = line.split("\t")[:2]

  # Wrap the target sentence in explicit start/end markers for the decoder.
  dataset.append((entrada, f"[start] {salida} [end]"))

print(random.choice(dataset))

('Ella me miró.', '[start] She looked at me. [end]')


In [6]:
# Shuffle a copy of the pairs with a fixed seed. This makes the
# train/validation/test split reproducible, and re-running the cell can no
# longer reshuffle `dataset` in place and move test sentences into training.
SEMILLA = 42
pares_mezclados = list(dataset)
random.Random(SEMILLA).shuffle(pares_mezclados)

# 15% validation, 15% test, the remainder for training.
no_muestras_val = int(0.15 * len(pares_mezclados))
no_muestras_train = len(pares_mezclados) - 2 * no_muestras_val
train_pairs = pares_mezclados[:no_muestras_train]
val_pairs = pares_mezclados[no_muestras_train:no_muestras_train + no_muestras_val]
test_pairs = pares_mezclados[no_muestras_train + no_muestras_val:]

In [7]:
# Characters deleted during standardization: ASCII punctuation plus "¿".
# The square brackets are left out of the set so they are not stripped.
caracteres_a_eliminar = "".join(
    caracter
    for caracter in string.punctuation + "¿"
    if caracter not in "[]"
)

def estandarizacion(input_string):
  """Lowercase the text and delete every character listed in `caracteres_a_eliminar`.

  Args:
    input_string: string tensor to standardize.

  Returns:
    The result of `tf.strings.regex_replace` applied to the lowercased input.
  """
  # Character class matching any single character that must be removed.
  patron = f"[{re.escape(caracteres_a_eliminar)}]"
  en_minusculas = tf.strings.lower(input_string)
  return tf.strings.regex_replace(en_minusculas, patron, "")

vocab_size = 15000
sequence_length = 20

# Everything up to here is OK

# Characters removed from the Spanish source text: all ASCII punctuation plus
# the inverted question/exclamation marks.
caracteres_entrada = string.punctuation + "¡¿"

def estandarizacion_entrada(input_string):
  """Standardize the Spanish source sentences.

  The default TextVectorization standardization leaves "¿" and "¡" glued to the
  following word (so "¿cuál" and "cuál" become different tokens) and only
  lowercases ASCII letters. This version lowercases as UTF-8 and removes every
  character in `caracteres_entrada`.

  Args:
    input_string: string tensor holding the raw source text.

  Returns:
    String tensor with the text lowercased and punctuation removed.
  """
  minusculas = tf.strings.lower(input_string, encoding="utf-8")
  return tf.strings.regex_replace(
      minusculas, f"[{re.escape(caracteres_entrada)}]", "")

# Source (Spanish) vectorizer: sequences of exactly `sequence_length` token ids.
vectorizacion_entrada = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
    standardize=estandarizacion_entrada,
)

# Target (English) vectorizer: one extra position because the sequence is later
# split into decoder input ([:-1]) and labels ([1:]). It uses `estandarizacion`,
# which keeps the square brackets of the [start]/[end] markers.
vectorizacion_salida = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=estandarizacion,
)

# Build both vocabularies from the training split only.
train_entrada_texts = [pair[0] for pair in train_pairs]
train_salida_texts = [pair[1] for pair in train_pairs]
vectorizacion_entrada.adapt(train_entrada_texts)
vectorizacion_salida.adapt(train_salida_texts)

In [8]:
batch_size = 64

def format_dataset(ent, sal):
  """Vectorize a batch of sentence pairs into model inputs and labels.

  Args:
    ent: batch of source sentences (raw strings).
    sal: batch of target sentences (raw strings).

  Returns:
    A tuple `(inputs, labels)`: `inputs` is a dict with the vectorized source
    under "entrada" and the vectorized target without its last position under
    "salida"; `labels` is the vectorized target without its first position.
  """
  fuente = vectorizacion_entrada(ent)
  objetivo = vectorizacion_salida(sal)
  entradas_modelo = {
      "entrada": fuente,
      "salida": objetivo[:, :-1],  # decoder input: everything but the last token
  }
  etiquetas = objetivo[:, 1:]  # labels: the same sequence shifted one step left
  return (entradas_modelo, etiquetas)

def make_dataset(pairs):
  """Build a batched, vectorized `tf.data.Dataset` from (source, target) pairs.

  Args:
    pairs: sequence of `(entrada, salida)` string tuples.

  Returns:
    A dataset yielding `format_dataset` outputs for batches of `batch_size`
    pairs, cached after vectorization and reshuffled (at batch level) on every
    iteration.
  """
  ent_texts, sal_texts = zip(*pairs)
  ds = tf.data.Dataset.from_tensor_slices((list(ent_texts), list(sal_texts)))
  ds = ds.batch(batch_size)
  ds = ds.map(format_dataset, num_parallel_calls=4)
  # Cache BEFORE shuffling: a cache placed after shuffle() stores the first
  # epoch's order and replays that same order in every later epoch. Caching
  # first keeps the expensive vectorization cached while still letting
  # shuffle() produce a new order each epoch.
  return ds.cache().shuffle(2048).prefetch(16)

# Build the training and validation pipelines with the same helper.
train_ds, val_ds = map(make_dataset, (train_pairs, val_pairs))

In [9]:
# Sanity check: print the tensor shapes of a single training batch.
for inputs, targets in train_ds.take(1):
  for nombre in ("entrada", "salida"):
    print(f"inputs['{nombre}'].shape: {inputs[nombre].shape}")
  print(f"targets.shape: {targets.shape}")

inputs['entrada'].shape: (64, 20)
inputs['salida'].shape: (64, 20)
targets.shape: (64, 20)


# Transformer encoder

In [10]:
class TransformerEncoder(layers.Layer):
    """Encoder block: self-attention followed by a two-layer dense projection.

    Each of the two sub-blocks is wrapped in a residual connection and a
    layer normalization.

    Args:
        embed_dim: size of the output dense layer; also used as the attention
            `key_dim`.
        dense_dim: width of the hidden (ReLU) dense layer.
        num_heads: number of attention heads.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Hyper-parameters are kept so get_config() can serialize them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([
            Dense(dense_dim, activation="relu"),
            Dense(embed_dim),
        ])
        self.layernorm_1 = LayerNormalization()
        self.layernorm_2 = LayerNormalization()

    def call(self, inputs, mask=None):
        """Run the block on `inputs`, optionally restricting attention with `mask`."""
        # Insert an axis after the batch axis so the mask can be used as an
        # attention mask.
        attention_mask = None if mask is None else mask[:, tf.newaxis, :]
        attended = self.attention(inputs, inputs, attention_mask=attention_mask)
        normed = self.layernorm_1(inputs + attended)
        return self.layernorm_2(normed + self.dense_proj(normed))

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        }

# Transformer-based model

In [11]:
class TransformerDecoder(layers.Layer):
    """Decoder block: causal self-attention, cross-attention, dense projection.

    Each of the three sub-blocks is wrapped in a residual connection and a
    layer normalization.

    Args:
        embed_dim: size of the output dense layer; also used as the attention
            `key_dim`.
        dense_dim: width of the hidden (ReLU) dense layer.
        num_heads: number of attention heads.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Hyper-parameters are kept so get_config() can serialize them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        """Build a lower-triangular int32 mask of shape (batch, seq_len, seq_len).

        Entry [b, i, j] is 1 when j <= i, i.e. position i may attend to itself
        and to earlier positions only. `batch` and `seq_len` are the first two
        dimensions of `inputs`.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the (1, seq_len, seq_len) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None, encoder_mask=None):
        """Run the decoder block.

        Args:
            inputs: embedded target sequence.
            encoder_outputs: encoder output sequence to attend over.
            mask: optional padding mask for `inputs`.
            encoder_mask: optional padding mask for `encoder_outputs`, given as
                (batch, source_len). When omitted, cross-attention is unmasked.

        Returns:
            The transformed target sequence.
        """
        causal_mask = self.get_causal_attention_mask(inputs)

        # Self-attention over the target: always causal, and additionally
        # restricted to non-padding positions when a target mask is available.
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            self_attention_mask = tf.minimum(padding_mask, causal_mask)
        else:
            self_attention_mask = causal_mask

        # Cross-attention over the source must NOT reuse the target-side mask:
        # the causal part would only let target step i see source positions
        # <= i. Only a source-side padding mask is meaningful here.
        if encoder_mask is not None:
            cross_attention_mask = encoder_mask[:, tf.newaxis, :]
        else:
            cross_attention_mask = None

        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=self_attention_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=cross_attention_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

# Positional embedding

In [17]:
class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        sequence_length: number of distinct positions in the position table.
        input_dim: number of rows in the token embedding table.
        output_dim: embedding size of both tables.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        # Constructor arguments are kept so get_config() can serialize them.
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        """Embed `inputs` and add the embedding of each position index."""
        seq_len = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        return self.token_embeddings(inputs) + self.position_embeddings(position_ids)

    def compute_mask(self, inputs, mask=None):
        """Return a mask that is True wherever the input id differs from 0."""
        # The comparison is wrapped in a Lambda layer rather than applied to
        # `inputs` directly.
        is_not_zero = layers.Lambda(lambda token_ids: tf.math.not_equal(token_ids, 0))
        return is_not_zero(inputs)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        }

# Modelo

In [18]:
# Model hyper-parameters.
embed_dim = 256
dense_dim = 2048
num_heads = 8

# Encoder branch: source token ids -> positional embedding -> encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="entrada")
encoder_embedded = PositionalEmbedding(
    sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(
    embed_dim, dense_dim, num_heads)(encoder_embedded)

# Decoder branch: target token ids -> positional embedding -> decoder block
# (attending over the encoder output) -> dropout -> softmax over the vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="salida")
decoder_embedded = PositionalEmbedding(
    sequence_length, vocab_size, embed_dim)(decoder_inputs)
decoder_hidden = TransformerDecoder(
    embed_dim, dense_dim, num_heads)(decoder_embedded, encoder_outputs)
decoder_regularized = layers.Dropout(0.5)(decoder_hidden)
decoder_outputs = layers.Dense(
    vocab_size, activation="softmax")(decoder_regularized)

# The input names ("entrada", "salida") match the dict keys produced by
# format_dataset.
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)



# Entrenamiento

In [19]:
# The labels are integer token ids, hence the sparse categorical cross-entropy.
transformer.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="rmsprop",
    metrics=["accuracy"],
)

# Train for 10 epochs, evaluating on the validation split after each one.
transformer.fit(train_ds, validation_data=val_ds, epochs=10)

Epoch 1/10




[1m1302/1302[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m94s[0m 61ms/step - accuracy: 0.1474 - loss: 4.5459 - val_accuracy: 0.2458 - val_loss: 2.5821
Epoch 2/10
[1m 802/1302[0m [32m━━━━━━━━━━━━[0m[37m━━━━━━━━[0m [1m22s[0m 45ms/step - accuracy: 0.2491 - loss: 2.5713

KeyboardInterrupt: 

# Evaluación

In [None]:
# Translate

import numpy as np
# Map each token id to its word in the target vocabulary.
sal_vocab = vectorizacion_salida.get_vocabulary()
sal_index_lookup = dict(enumerate(sal_vocab))
# Upper bound on the number of tokens generated per translation.
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    """Translate `input_sentence` by greedy decoding with `transformer`.

    Starting from "[start]", the model is called once per step with the text
    decoded so far. The highest-scoring token at the current step is appended,
    until "[end]" is produced or `max_decoded_sentence_length` steps have run.

    Args:
        input_sentence: source sentence as a plain string.

    Returns:
        The decoded string, including the "[start]" marker (and "[end]" when
        it was generated).
    """
    source_tokens = vectorizacion_entrada([input_sentence])
    decoded_sentence = "[start]"
    for step in range(max_decoded_sentence_length):
        # Re-vectorize the partial translation; the last position is dropped,
        # as in format_dataset.
        target_tokens = vectorizacion_salida([decoded_sentence])[:, :-1]
        step_scores = transformer([source_tokens, target_tokens])[0, step, :]
        next_word = sal_index_lookup[np.argmax(step_scores)]
        decoded_sentence = f"{decoded_sentence} {next_word}"
        if next_word == "[end]":
            break
    return decoded_sentence

In [None]:
# Translate three randomly chosen source sentences from the test split.
test_ent_texts = [entrada for entrada, _salida in test_pairs]
for frase_entrada in (random.choice(test_ent_texts) for _ in range(3)):
    print("-")
    print(frase_entrada)
    print(decode_sequence(frase_entrada))

-
Mi madre me dio lo que quería de almuerzo.
[start] my mother gave me what you wanted was lunch [end]
-
Para los monos es sencillo trepar árboles.
[start] to children is easy to swim on us [end]
-
Tom la está pasando bien.
[start] tom is good on the way well [end]


In [None]:
# Read a sentence from the user, echo it back, then print its translation.
frase_entrada = input('Ingrese frase en español: ')
print(frase_entrada)

traduccion = decode_sequence(frase_entrada)
print(traduccion)

Ingrese frase en español: Cuál es el laboratorio?
Cuál es el laboratorio?
[start] what is the window [end]
