In [2]:
!pip install numpy Pillow Matplotlib pandas seaborn scikit-learn requests scikit-image mlxtend prefixspan scikit-fuzzy umap-learn openpyxl stanza torch torchvision tesseract pytesseract nltk wordcloud spacy tensorflow==2.19.0 tensorflow-datasets opencv-python ucimlrepo nbconvert



In [3]:
import zipfile
import os
from pathlib import Path
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import requests
import random
import string
import re
import numpy as np
import tensorflow.data as tf_data
import tensorflow.strings as tf_strings
import keras
from keras import layers
from keras import ops
from keras.layers import TextVectorization

In [4]:
# vamos a descargar nuestro dataset
url = 'https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip'
zip_path = "./spa-eng.zip"
print("Descargando dataset")

try:
    headers = {
        'User-Agent': 'Mozilla/5.0 (Linux; Android 7.0; WAS-L03T) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.101 Mobile Safari/537.36'
    }
    response = requests.get(url, headers=headers, stream=None)
    response.raise_for_status()

    # guardar archivo
    with open(zip_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
    print("Descarga completa")

except requests.exceptions.RequestException as e:
    print(f"Error al descargar el archivo {e}")
    print("Intenta de nuevo")
    !wget -O spa-eng.zip https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip

extract_path = "./spa-eng"
os.makedirs(extract_path, exist_ok=True)
print("Extrayendo el archivo")

try:
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(extract_path)
except zipfile.BadZipFile:
    print("Error al extraer el archivo zip")

actual_data_path = os.path.join(extract_path, "spa-eng")
print(f"Buscando datos en {actual_data_path}")

text_file = os.path.join(actual_data_path, "spa.txt")
print(f"Buscando datos en {text_file}")

if os.path.exists(text_file):
    with open(text_file, encoding="utf-8") as f:
        lines = f.read().split("\n")[:-1]
    text_pairs = []
    for line in lines:
        if "\t" in line:
            parts = line.split("\t")
            if len(parts) >= 2:
                eng, spa = parts[0], parts[1]
                spa = "[start]" + spa + "[end]"
                text_pairs.append((eng, spa))

    print(f"Correctamente cargados {len(text_pairs)} pares traducidos")

    print("Ejemplos")
    for i in range(min(5, len(text_pairs))):
        print(f"English {text_pairs[i][0]}")
        print(f"Spanish {text_pairs[i][1]}\n")

Descargando dataset
Descarga completa
Extrayendo el archivo
Buscando datos en ./spa-eng/spa-eng
Buscando datos en ./spa-eng/spa-eng/spa.txt
Correctamente cargados 118964 pares traducidos
Ejemplos
English Go.
Spanish [start]Ve.[end]

English Go.
Spanish [start]Vete.[end]

English Go.
Spanish [start]Vaya.[end]

English Go.
Spanish [start]Váyase.[end]

English Hi.
Spanish [start]Hola.[end]



In [5]:
# Print five randomly chosen (english, spanish) pairs as a sanity check.
for _ in range(5):
    print(random.choice(text_pairs))

('Tom got very angry at Mary.', '[start]Tom se enfadó mucho con Mary.[end]')
('Send him in.', '[start]Mandalo adentro.[end]')
('She is holding a red flower.', '[start]Ella está sosteniendo una flor roja.[end]')
('I have a collection of documentaries.', '[start]Tengo una colección de documentales.[end]')
("I'm going to change my shirt.", '[start]Me voy a cambiar de camisa.[end]')


In [6]:
random.shuffle(text_pairs)
num_val_samples = int(0.15*len(text_pairs))
num_train_samples = len(text_pairs) -2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
text_pairs = text_pairs[num_train_samples + num_val_samples:]

In [7]:
print(f'{len(text_pairs)} total pairs')
print(f'{len(train_pairs)} training pairs')
print(f'{len(val_pairs)} validation pairs')
print(f'{len(text_pairs)} test pairs')

17844 total pairs
83276 training pairs
17844 validation pairs
17844 test pairs


Vectorizar los datos: TextVectorization

Al vectorizar los datos, estamos convirtiendo esos "textos" a secuencias de números enteros que sirvan para nuestro modelo.

In [8]:
# Characters removed from the Spanish text by custom_standarization.
# Both brackets are kept out of the strip set so the "[start]" / "[end]"
# markers survive standardization. The replaces are chained: running the
# second replace on `strip_chars` again would discard the first one.
strip_chars = string.punctuation
strin_chars = strip_chars.replace('[', '').replace(']', '')

# NOTE(review): vocab_size also sizes the Embedding layers and the output
# softmax of the model built later in this notebook; confirm that 150000
# (rather than a smaller vocabulary) is intended.
vocab_size = 150000
sequence_length = 20
batch_size = 64

def custom_standarization(input_string):
    """Lowercase the text and delete every character listed in `strin_chars`."""
    strip_pattern = '[%s]' % re.escape(strin_chars)
    lowered = tf_strings.lower(input_string)
    return tf_strings.regex_replace(lowered, strip_pattern, '')

# English side: integer token ids, padded/truncated to `sequence_length`,
# using the layer's default standardization.
eng_vectorization = TextVectorization(
    output_mode='int',
    max_tokens=vocab_size,
    output_sequence_length=sequence_length,
)

# Spanish side: one extra position so the sequence can later be sliced into
# decoder inputs and targets; uses the custom standardization defined above.
spa_vectorization = TextVectorization(
    standardize=custom_standarization,
    output_mode='int',
    max_tokens=vocab_size,
    output_sequence_length=sequence_length + 1,
)

# Fit both vocabularies on the training split only.
train_eng_texts = [eng for eng, _ in train_pairs]
train_spa_texts = [spa for _, spa in train_pairs]
eng_vectorization.adapt(train_eng_texts)
spa_vectorization.adapt(train_spa_texts)

Formateo de nuestro dataset: tuplas - input (encoder y decoder) / target (lo que nuestro modelo trata de predecir)

In [9]:
def format_dataset(eng, spa):
    """Vectorize a batch of sentence pairs into (inputs, targets).

    Args:
        eng: batch of English strings, passed to `eng_vectorization`.
        spa: batch of Spanish strings, passed to `spa_vectorization`.

    Returns:
        A tuple `(inputs, targets)`. `inputs` is a dict with
        `'encoder_inputs'` (the vectorized English batch) and
        `'decoder_inputs'` (the vectorized Spanish batch without its last
        position). `targets` is the vectorized Spanish batch without its
        first position.
    """
    eng = eng_vectorization(eng)
    spa = spa_vectorization(spa)
    return (
        {
            'encoder_inputs': eng,
            'decoder_inputs': spa[:, :-1],
        },
        # Targets are shifted one step left of the decoder inputs, so
        # position t is trained to predict token t + 1. Reusing spa[:, :-1]
        # here would only teach the model to copy its own input.
        spa[:, 1:],
    )

def make_dataset(pairs):
    """Turn a list of (english, spanish) pairs into a batched, formatted dataset.

    The texts are batched with `batch_size`, mapped through `format_dataset`,
    then cached, shuffled with a buffer of 2048 and prefetched with a buffer
    of 16.
    """
    english, spanish = (list(column) for column in zip(*pairs))
    batched = tf_data.Dataset.from_tensor_slices((english, spanish)).batch(batch_size)
    formatted = batched.map(format_dataset)
    return formatted.cache().shuffle(2048).prefetch(16)

# Build the training and validation pipelines from their respective pair lists.
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

Cada batch debe contener 64 ejemplos; cada ejemplo es un par de secuencias de 20 tokens que se usan como entradas del encoder y del decoder.

In [10]:
# Peek at a single batch and report the shape of every tensor in it.
for inputs, targets in train_ds.take(1):
    encoder_shape = inputs["encoder_inputs"].shape
    decoder_shape = inputs["decoder_inputs"].shape
    print(f'inputs["encoder_inputs"].shape: {encoder_shape}')
    print(f'inputs["decoder_inputs"].shape: {decoder_shape}')
    print(f'target.shape: {targets.shape}')

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
target.shape: (64, 20)


Estamos listos para construir nuestro modelo!

Tenemos un transformer seq2seq que necesita reconocer la posición de las palabras dentro de las frases/oraciones, entonces necesitamos una capa
PositionalEmbedding para poder tomar en cuenta el orden de las palabras

In [11]:
import keras.ops as ops
class TransformerEncoder(layers.Layer):
    """Self-attention followed by a two-layer dense projection.

    Each of the two stages adds its input back to its output and passes the
    sum through its own LayerNormalization.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        # Multi-head self-attention.
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        # Feed-forward projection: widen to dense_dim, then back to embed_dim.
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Apply attention and the dense projection to `inputs`.

        When `mask` is given, it is expanded with a new axis 1 and cast to
        int32 before being passed as the attention mask.
        """
        padding_mask = None
        if mask is not None:
            padding_mask = ops.cast(mask[:, None, :], dtype="int32")

        attended = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        normed = self.layernorm_1(inputs + attended)
        # Dense projection with its own residual connection.
        projected = self.dense_proj(normed)
        return self.layernorm_2(normed + projected)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            embed_dim=self.embed_dim,
            dense_dim=self.dense_dim,
            num_heads=self.num_heads,
        )
        return config

class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding.

    Position indices run from 0 to the size of the last axis of the input;
    the position table has `sequence_length` entries.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        # Embedding tables: one indexed by token id, one by position.
        self.token_embeddings = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.position_embeddings = layers.Embedding(input_dim=sequence_length, output_dim=embed_dim)
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed `inputs` and add the embedding of each position index."""
        positions = ops.arange(0, ops.shape(inputs)[-1], 1)
        token_part = self.token_embeddings(inputs)
        position_part = self.position_embeddings(positions)
        return token_part + position_part

    def compute_mask(self, inputs, mask=None):
        """Mask is True wherever the input id differs from 0."""
        return ops.not_equal(inputs, 0)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            sequence_length=self.sequence_length,
            vocab_size=self.vocab_size,
            embed_dim=self.embed_dim,
        )
        return config

class TransformerDecoder(layers.Layer):
    """Decoder block with two attention stages and a dense projection.

    Stage 1 attends over the first input using a causal mask, stage 2 attends
    over the second input (the encoder outputs), and stage 3 is a two-layer
    dense projection. Every stage adds its input back to its output and
    applies its own LayerNormalization.
    """

    def __init__(self,embed_dim,latent_dim,num_heads,**kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        # Multi-head attention: attention_1 is used as self-attention,
        # attention_2 attends over the encoder outputs (see call()).
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads,key_dim = embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads,key_dim = embed_dim
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(latent_dim,activation="relu"),
                layers.Dense(embed_dim)
            ]
        )
        # Normalization, one layer per stage.
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Run the three stages.

        Args:
            inputs: a pair `(decoder_sequence, encoder_outputs)`.
            mask: None, or a pair of padding masks matching `inputs`.

        Returns:
            The output of the final LayerNormalization.
        """
        inputs, encoder_outputs = inputs
        causal_mask = self.get_causal_attention_mask(inputs)

        if mask is None:
            inputs_padding_mask,encoder_outputs_padding_mask = None, None
        else:
            inputs_padding_mask, encoder_outputs_padding_mask = mask

        # Stage 1: self-attention restricted by the causal mask.
        attention_output_1 = self.attention_1(
            query = inputs,
            value = inputs,
            key = inputs,
            attention_mask = causal_mask,
            query_mask = inputs_padding_mask,
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # Stage 2: queries come from stage 1, keys/values from the encoder.
        attention_output_2 = self.attention_2(
            query = out_1,
            value = encoder_outputs,
            key = encoder_outputs,
            query_mask = inputs_padding_mask,
            key_mask = encoder_outputs_padding_mask,
        )

        out_2 = self.layernorm_2(out_1 + attention_output_2)

        # Stage 3: dense projection.
        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)


    def get_causal_attention_mask(self,inputs):
        """Build an int32 mask of shape (batch, seq, seq) for `inputs`.

        Entry [i, j] is 1 when i >= j and 0 otherwise; the same (seq, seq)
        matrix is tiled once per batch element.
        """
        input_shape = ops.shape(inputs)
        batch_size,sequence_length = input_shape[0], input_shape[1]
        i = ops.arange(sequence_length)[:,None]
        j = ops.arange(sequence_length)
        mask = ops.cast(i >=j , dtype="int32")
        mask = ops.reshape(mask, (1, input_shape[1],input_shape[1]))
        # Tile multiples: [batch_size, 1, 1].
        mult = ops.concatenate(
            [ops.expand_dims(batch_size,-1),ops.convert_to_tensor([1,1])],
            axis=0,
        )
        return ops.tile(mask,mult)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

In [12]:
# Model hyperparameters.
embed_dim = 256
latent_dim = 2048
num_heads = 8

# Encoder: token ids -> positional embedding -> one TransformerEncoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size,embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# Decoder, built as its own model with two inputs: the target token ids and
# a placeholder for the encoded source sequence.
decoder_inputs = keras.Input(shape=(None,),dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None,embed_dim), name ="decoder_state_inputs")
x = PositionalEmbedding(sequence_length,vocab_size,embed_dim)(decoder_inputs)
x=TransformerDecoder(embed_dim,latent_dim,num_heads)([x,encoded_seq_inputs])
x=layers.Dropout(0.5)(x)
# Softmax over `vocab_size` entries at every position.
decoder_outputs = layers.Dense(vocab_size,activation="softmax")(x)
decoder = keras.Model([decoder_inputs,encoded_seq_inputs],decoder_outputs)

# Connect the decoder model to the encoder's output; `decoder_outputs` is
# rebound to the output of this end-to-end graph.
decoder_outputs = decoder([decoder_inputs,encoder_outputs])

# End-to-end model: (encoder_inputs, decoder_inputs) -> decoder_outputs.
Transformer = keras.Model(
    [encoder_inputs,decoder_inputs],
    decoder_outputs,
    name="transformer",
)



In [None]:
# Number of passes over the training data.
epochs = 1

Transformer.summary()

# Class 0 is ignored when computing the loss.
loss_fn = keras.losses.SparseCategoricalCrossentropy(ignore_class=0)
Transformer.compile(optimizer="rmsprop", loss=loss_fn, metrics=["accuracy"])
Transformer.fit(train_ds, validation_data=val_ds, epochs=epochs)

In [None]:
# Lookup table from token id back to the Spanish vocabulary entry.
spa_vocab = spa_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)),spa_vocab))
max_decoded_sentence_length = 20

def decoded_sequence(input_sequence):
    """Greedily translate one English sentence.

    Starting from "[start]", the model is asked for the next token up to
    `max_decoded_sentence_length` times; at step i the highest-scoring token
    at position i is appended. Decoding stops early once "[end]" is produced.

    Args:
        input_sequence: the English sentence as a plain string.

    Returns:
        The decoded string, beginning with "[start]" and containing every
        sampled token separated by single spaces.
    """
    tokenized_input_sentence = eng_vectorization([input_sequence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = spa_vectorization([decoded_sentence])[:, :-1]
        # verbose=0: this runs once per generated token, so the default
        # progress bar would flood the cell output.
        predictions = Transformer.predict(
            {
                "encoder_inputs": tokenized_input_sentence,
                "decoder_inputs": tokenized_target_sentence,
            },
            verbose=0,
        )
        sampled_token_index = ops.convert_to_numpy(
            ops.argmax(predictions[0,i,:])
        ).item(0)
        # The softmax is `vocab_size` wide, which can exceed the adapted
        # vocabulary; fall back to the unknown token instead of a KeyError.
        sampled_token = spa_index_lookup.get(sampled_token_index, "[UNK]")
        decoded_sentence += " " + sampled_token

        # Compare against the marker string itself; a comparison with the
        # list ["end"] is never true, so decoding would never stop early.
        if sampled_token == "[end]":
            break
    return decoded_sentence

# Smoke run over 30 random test sentences; the translations are not printed here.
test_eng_texts = [pair[0] for pair in test_pairs]
for i in range(30):
    input_sentence = random.choice(test_eng_texts)
    translated = decoded_sequence(input_sentence)

In [None]:
# Split the held-out pairs into parallel English / Spanish lists.
test_eng_texts = [eng for eng, _ in test_pairs]
test_spa_texts = [spa for _, spa in test_pairs]

print("Traducciones de selecciones random")
print("-"*50)

# Show 30 randomly picked test sentences with their reference and predicted
# translations, with the start/end markers removed from both.
for i in range(30):
    idx = random.randrange(0, len(test_eng_texts))
    input_sentence = test_eng_texts[idx]

    reference_translation = test_spa_texts[idx]
    reference_translation = reference_translation.replace("[start] ","")
    reference_translation = reference_translation.replace(" [end]","")

    predicted_translation = decoded_sequence(input_sentence)
    predicted_translation = predicted_translation.replace("[start] ","")
    predicted_translation = predicted_translation.replace(" [end]","")

    print(f"English:{input_sentence}")
    print(f"Ref: {reference_translation}")
    print(f"Español: {predicted_translation}")