# **Capítulo 9: Procesamiento de lenguaje natural**

## Traducción automática de texto: de español a inglés

### Traducción mediante un modelo de Transformer

Instalación de los recursos necesarios para el caso práctico

In [1]:
!wget -q http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
!unzip -q spa-eng.zip
!pip install -q git+https://github.com/keras-team/keras-nlp.git --upgrade

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m851.9/851.9 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.5/6.5 MB[0m [31m39.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m524.1/524.1 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m85.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m102.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m440.8/440.8 kB[0m [31m46.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for keras-nlp (pyproject.toml) ... [?25l[?25hdone


Importación de keras_nlp

In [None]:
import keras_nlp

Using TensorFlow backend


Cargado de datos

In [2]:
def cargar_datos():
    with open('spa-eng/spa.txt', 'r') as f:
        lineas = f.read().splitlines()
    pares = [linea.split('\t') for linea in lineas]
    esp = [par[1] for par in pares]
    ing = [par[0] for par in pares]
    return esp, ing

X, Y = cargar_datos()
print(f'Número de pares de oraciones: {len(X)}')
print(f'Posible entrada: {X[50]}')
print(f'Posible salida: {Y[50]}')


Número de pares de oraciones: 118964
Posible entrada: Estoy levantado.
Posible salida: I'm up.


Creación del vocabulario

In [None]:
import re

def crear_vocab(frases):
   # Obtenemos el vocabulario
   vocab = set()
   for f in frases:
       # Expresión regular para separar palabras
       # manteniendo signos de puntuación
       vocab.update(re.findall(r'\w+|[^\w\s]', f))

   # Creamos los diccionarios
   w2i = {w: i+4 for i, w in enumerate(vocab)}
   w2i['PAD'] = 0
   w2i['SOS'] = 1
   w2i['EOS'] = 2
   w2i['UNK'] = 3
   i2w = {i: w for w, i in w2i.items()}

   return w2i, i2w

X_w2i, X_i2w = crear_vocab(X)
Y_w2i, Y_i2w = crear_vocab(Y)
print(f'Tamaño del vocabulario de español: {len(X_w2i)}')
print(f'Tamaño del vocabulario de inglés: {len(Y_w2i)}')

Tamaño del vocabulario de español: 28993
Tamaño del vocabulario de inglés: 14779


Codificación de las secuencias con el vocabulario creado

In [None]:
def codificar(secs, w2i):
    secs_cod = []
    for s in secs:
        s_cod = [w2i[w] for w in re.findall(r'\w+|[^\w\s]', s)]
        s_cod = [w2i['SOS']] + s_cod + [w2i['EOS']]
        secs_cod.append(s_cod)
    return secs_cod

X_cod = codificar(X, X_w2i)
Y_cod = codificar(Y, Y_w2i)

División del conjunto de datos en entrenamiento y test (80-20)

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_test, Y_train, Y_test = train_test_split(X_cod, Y_cod,\
                                                    test_size=0.2,\
                                                    random_state=42)
print('¡Particiones realizadas!')
print(f'Tamaño del conjunto de entrenamiento: {len(X_train)}')
print(f'Tamaño del conjunto de test: {len(X_test)}')

¡Particiones realizadas!
Tamaño del conjunto de entrenamiento: 95171
Tamaño del conjunto de test: 23793


Preprocesado de los datos de entrenamiento

In [None]:
import numpy as np

def preproceso_batch(X, Y):
   max_len_X = max([len(x) for x in X])
   max_len_Y = max([len(y) for y in Y])

   encoder_input = np.zeros((len(X), max_len_X))
   decoder_input = np.zeros((len(Y), max_len_Y))
   salida = np.zeros((len(Y), max_len_Y))

   for i, s in enumerate(X):
       # Sec. completa con relleno para el encoder (frase a traducir)
       encoder_input[i, :len(s)] = np.array(s)

   for i, s in enumerate(Y):
       # Sec. sin el "EOS" con relleno para el decoder (traducción)
       decoder_input[i, :len(s)-1] = np.array(s[:-1])
       # Sec. sin el "SOS" con relleno para la salida (traducción)
       salida[i, :len(s)-1] = np.array(s[1:])

   src_pad_mask = (encoder_input == 0)
   tgt_pad_mask = (decoder_input == 0)

   encoder_input = encoder_input.astype(np.int64)
   decoder_input = decoder_input.astype(np.int64)
   salida = salida.astype(np.int64)

   return [encoder_input, decoder_input, src_pad_mask, tgt_pad_mask], salida

Creación de un generador de batches

In [None]:
from sklearn.utils import shuffle

def generador_batch(X, Y, batch_size):
    idx = 0
    while True:
        bx = X[idx:idx+batch_size]
        by = Y[idx:idx+batch_size]

        yield preproceso_batch(bx, by)

        idx = (idx + batch_size) % len(X)


batch_size = 128
train_loader = generador_batch(X_train, Y_train, batch_size=batch_size)
[be, bd, sp, tp], bs = next(train_loader)
print(f'Entrada al encoder: {[X_i2w[w.item()]for w in be[0]]}')
print(f'Entrada al decoder: {[Y_i2w[w.item()]for w in bd[0]]}')
print(f'Salida del decoder: {[Y_i2w[w.item()]for w in bs[0]]}')
print(f'Mascara del encoder: {sp[0]}')
print(f'Mascara del decoder: {tp[0]}')

Entrada al encoder: ['SOS', 'No', 'tengo', 'otra', 'opción', 'en', 'absoluto', '.', 'EOS', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD']
Entrada al decoder: ['SOS', 'I', 'have', 'no', 'choice', 'at', 'all', '.', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD']
Salida del decoder: ['I', 'have', 'no', 'choice', 'at', 'all', '.', 'EOS', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD']
Mascara del encoder: [False False False False False False False False False  True  True  True
  True  True  True  True  True  True]
Mascara del decoder: [False False False False False False False False  True  True  True  True
  True  True  True  True  True  True  True  True  True]


Creación del Positional Encoding

In [None]:
import tensorflow as tf
import numpy as np

class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, max_len, emb_dim, dropout=0.1):
        super(PositionalEncoding, self).__init__()
        self.dropout = tf.keras.layers.Dropout(dropout)

        pos = np.arange(max_len).reshape(-1, 1)
        den = np.power(10000, np.arange(0, emb_dim, 2) / emb_dim)
        pe = np.zeros((1, max_len, emb_dim))
        pe[0, :, 0::2] = np.sin(pos / den)
        pe[0, :, 1::2] = np.cos(pos / den)
        self.pe = tf.constant(pe, dtype=tf.float32)

    def call(self, x):
        # x.shape = [batch_size, sec_len, emb_dim]
        x = x + self.pe[:, :tf.shape(x)[1], :]
        return self.dropout(x)

Creación del modelo de Transformer

In [None]:
def crear_transformer(max_long,
                      emb_dim,
                      num_enc_capas,
                      num_dec_capas,
                      ncabezas,
                      src_vocab_tam,
                      tgt_vocab_tam,
                      dim_mlp,
                      dropout=0.1):

  # Definicion del encoder
  enc_entradas = tf.keras.Input(shape=(None,),
                                dtype="int64",
                                name="enc_entradas")
  mask_entradas_encoder = tf.keras.Input(shape=(None,),
                                         dtype="int64",
                                         name="mask_entradas_encoder")

  enc_salidas = tf.keras.layers.Embedding(src_vocab_tam, emb_dim)(enc_entradas)
  enc_salidas = PositionalEncoding(max_long, emb_dim, 0.1)(enc_salidas)

  for i in range(num_enc_capas):
       enc_salidas = keras_nlp.layers.TransformerEncoder(
           intermediate_dim=dim_mlp,
           num_heads=ncabezas,
           dropout=dropout,
           activation="relu",
           name=None)(enc_salidas, padding_mask=mask_entradas_encoder)

  # Definicion del decoder
  dec_entradas = tf.keras.Input(shape=(None,), dtype="int64",
                      name="dec_entradas")
  enc_seq_entradas = tf.keras.Input(shape=(None, emb_dim),
                          name="dec_state_entradas")

  mask_entradas_decoder = tf.keras.Input(shape=(None,),
                                         dtype="int64",
                                         name="mask_entradas_decoder")

  dec_salidas = tf.keras.layers.Embedding(tgt_vocab_tam, emb_dim)(dec_entradas)
  dec_salidas = PositionalEncoding(max_long, emb_dim, 0.1)(dec_salidas)

  capas_decoder = []
  for _ in range(num_dec_capas):
    capas_decoder.append(keras_nlp.layers.TransformerDecoder(
           intermediate_dim=dim_mlp,
           num_heads=ncabezas,
           dropout=dropout,
           activation="relu",
           name=None))

  trf_salida = dec_salidas

  for capa in capas_decoder:
       trf_salida = capa(decoder_sequence=trf_salida,
                         encoder_sequence=enc_salidas,
                         decoder_padding_mask=mask_entradas_decoder,
                         use_causal_mask=True)

  for capa in capas_decoder:
       dec_salidas = capa(decoder_sequence=dec_salidas,
                          encoder_sequence=enc_seq_entradas,
                          decoder_padding_mask=mask_entradas_decoder,
                          use_causal_mask=True)

  capa_salida = tf.keras.layers.Dense(tgt_vocab_tam,
                    activation="linear")

  salida_transformer = capa_salida(trf_salida)
  salida_decoder = capa_salida(dec_salidas)

  # Definicion del Transformer
  encoder = tf.keras.Model([enc_entradas, mask_entradas_encoder],
                          enc_salidas,
                          name="encoder",
  )

  decoder = tf.keras.Model([dec_entradas, enc_seq_entradas, mask_entradas_decoder],
                          salida_decoder,
                          name="decoder",
  )

  transformer = tf.keras.Model([enc_entradas, dec_entradas, mask_entradas_encoder, mask_entradas_decoder],
                          salida_transformer,
                          name="transformer",
  )

  transformer.summary()
  perdida = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, ignore_class=0)
  optimizador = tf.keras.optimizers.Adam(learning_rate=0.0001, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
  transformer.compile(loss=perdida, optimizer=optimizador)

  return transformer, encoder, decoder


Instanciación del Transformer

In [None]:
max_long = max([len(x) for x in X + Y])
# Instancia del modelo Transformer
transformer, encoder, decoder = crear_transformer(
   max_long=max_long,
   emb_dim=512,
   num_enc_capas=6,
   num_dec_capas=6,
   ncabezas=8,
   src_vocab_tam=len(X_w2i),
   tgt_vocab_tam=len(Y_w2i),
   dim_mlp=2048,
   dropout=0.1
)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 enc_entradas (InputLayer)   [(None, None)]               0         []                            
                                                                                                  
 embedding (Embedding)       (None, None, 512)            1484441   ['enc_entradas[0][0]']        
                                                          6                                       
                                                                                                  
 positional_encoding (Posit  (None, None, 512)            0         ['embedding[0][0]']           
 ionalEncoding)                                                                                   
                                                                                        

Entrenamiento del modelo

In [None]:
epocas = 20
train_loader = generador_batch(X_train, Y_train, batch_size=128)
transformer.fit(train_loader, epochs=epocas, steps_per_epoch=len(X_train)//128, verbose=1)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.src.callbacks.History at 0x7e84d80af6a0>

Definición de la función de decodificación

In [None]:
import numpy as np
import tensorflow as tf

def decodificacion_voraz(codificador, decodificador, src, src_mask, max_len, tgt_w2i, tgt_i2w):
   # Codificación
   print(src.shape)
   print(src_mask.shape)

   src_cod = codificador.predict([src, src_mask], verbose=0)

   # Decodificación
   tgt_token = tf.constant([[tgt_w2i['SOS']]], dtype=tf.int64)

   tgt_pred_decod = []
   for i in range(max_len):
       # Predicción del modelo
       tgt_pred = decodificador.predict([tgt_token, src_cod, (tgt_token == 0)], verbose=0)
       tgt_pred = tgt_pred[:, -1, :]  # Último token

       # Nos quedamos con el token más probable
       tgt_pred = tf.argmax(tgt_pred, axis=-1).numpy()[0]
       tgt_pred_decod.append(tgt_i2w[tgt_pred])

       print(f'token predicho: {tgt_pred}')
       print(f'secuencia: {tgt_pred_decod}')

       # Preparamos la nueva entrada del decoder
       tgt_token = np.hstack((tgt_token, np.array([[tgt_pred]])))

       # Comprobamos si se ha predicho el token de fin de secuencia
       if tgt_pred_decod[-1] == 'EOS':
           break

   return tgt_pred_decod

Función de traduccion

In [None]:
def traducir(codificador, decodificador, src_frase, src_w2i, tgt_w2i, tgt_i2w):
   # Codificamos la secuencia de entrada
   src_cod = codificar([src_frase], src_w2i)
   src_cod = tf.convert_to_tensor(src_cod, dtype=tf.int64)
   # src_cod = tf.expand_dims(src_cod, axis=0)  # Agregamos dimensión de batch [1, sec_len]

   # Máscara de ceros para el source (dejamos ver todo)
   src_mask = tf.zeros((1, src_cod.shape[1]))

   # Permitimos hasta 5 tokens más en la traducción
   max_len = src_cod.shape[1] + 5

   # Iniciamos la traducción
   tgt_pred_decod = decodificacion_voraz(codificador, decodificador, src_cod, src_mask, max_len, tgt_w2i, tgt_i2w)

   # Quitamos los tokens de inicio y fin de secuencia
   tgt_pred_decod = [t for t in tgt_pred_decod if t not in ['SOS', 'EOS']]
   return ' '.join(tgt_pred_decod)


Prueba de traducción de una frase

In [None]:
src_frase = 'Espero que te haya gustado el caso de estudio'
tgt_frase = traducir(
   encoder, decoder,
   src_frase,
   X_w2i,
   Y_w2i, Y_i2w,
)


(1, 11)
(1, 11)
token predicho: 4574
secuencia: ['He']
token predicho: 11534
secuencia: ['He', 'is']
token predicho: 2
secuencia: ['He', 'is', 'EOS']


### Traducción con Hugging Face Transformers

Instalación de la librería de Hugging Face Transformers

In [3]:
!pip install transformers
!pip install sentencepiece

Collecting transformers
  Downloading transformers-4.31.0-py3-none-any.whl (7.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m68.3 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.14.1 (from transformers)
  Downloading huggingface_hub-0.16.4-py3-none-any.whl (268 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m268.8/268.8 kB[0m [31m32.5 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1 (from transformers)
  Downloading tokenizers-0.13.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m77.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting safetensors>=0.3.1 (from transformers)
  Downloading safetensors-0.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m60.4 MB/s[0m eta [36m0:00:0

Creación del tokenizador y del modelo

In [21]:
from transformers import AutoTokenizer, TFMarianMTModel
import tensorflow as tf

tokenizador = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-fr-en")
modelo = TFMarianMTModel.from_pretrained("Helsinki-NLP/opus-mt-fr-en")

All model checkpoint layers were used when initializing TFMarianMTModel.

All the layers of TFMarianMTModel were initialized from the model checkpoint at Helsinki-NLP/opus-mt-fr-en.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFMarianMTModel for predictions without further training.


Prueba del tokenizador implementado

In [22]:
tokenizador("Buenas tardes", return_tensors="tf")

{'input_ids': <tf.Tensor: shape=(1, 6), dtype=int32, numpy=array([[7545, 8277,    9, 2065,  114,    0]], dtype=int32)>, 'attention_mask': <tf.Tensor: shape=(1, 6), dtype=int32, numpy=array([[1, 1, 1, 1, 1, 1]], dtype=int32)>}

Prueba del modelo sin ajustarse

In [23]:
entrada = tokenizador(["Espero que te haya gustado el caso practico"], return_tensors="tf").input_ids
outputs = modelo.generate(entrada)
print(tokenizador.decode(outputs[0], skip_special_tokens=True))



Espero que te haya gustado el caso practico


División del conjunto de datos en entrenamiento y test con los datos sin tokenizar (80-20)

In [24]:
from sklearn.model_selection import train_test_split

X_train, X_test, Y_train, Y_test = train_test_split(X, Y,\
                                                    test_size=0.2,\
                                                    random_state=42)
print('¡Particiones realizadas!')
print(f'Tamaño del conjunto de entrenamiento: {len(X_train)}')
print(f'Tamaño del conjunto de test: {len(X_test)}')

¡Particiones realizadas!
Tamaño del conjunto de entrenamiento: 95171
Tamaño del conjunto de test: 23793


Reimplementación de la función de preprocesado de datos con el tokenizador implementado.

In [25]:
def preproceso_batch(X, Y, tokenizador):

   transformer_data = tokenizador(X, text_target=Y, return_tensors="tf", padding=True)

   return {"input_ids": transformer_data["input_ids"], "attention_mask": transformer_data["attention_mask"], "labels": transformer_data["labels"]}

Reimplementación del generador de batches con el tokenizador implementado

In [26]:
def generador_batch(X, Y, tokenizador, batch_size):
    idx = 0
    while True:
        bx = X[idx:idx+batch_size]
        by = Y[idx:idx+batch_size]

        yield preproceso_batch(bx, by, tokenizador)

        idx = (idx + batch_size) % len(X)

train_loader = generador_batch(X_train, Y_train, tokenizador=tokenizador, batch_size=16)

Compilación del modelo para entrenarlo

In [27]:
modelo.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, ignore_class=0),
               optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001, beta_1=0.9, beta_2=0.98, epsilon=1e-9))

Ajuste del modelo para nuestro conjunto de datos

In [28]:
modelo.fit(train_loader, epochs=1, steps_per_epoch=len(X_train)//16, verbose=1)



<keras.src.callbacks.History at 0x7acf4694d9c0>

Prueba de traducción con el modelo ajustado

In [29]:
entrada = tokenizador(["Espero que te haya gustado el caso practico"], return_tensors="tf").input_ids
outputs = modelo.generate(entrada)
print(tokenizador.decode(outputs[0], skip_special_tokens=True))

I hope you'd enjoyed the practice case.
