<a href="https://colab.research.google.com/github/JuanMr7/ChatbotIA/blob/main/Chatbot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import math
import re
import time
import pandas as pd
from bs4 import BeautifulSoup

from google.colab import drive
from pathlib import Path

from keras.layers import Input, Dense, Dropout, LayerNormalization

import tensorflow as tf
from tensorflow.keras import layers
import tensorflow_datasets as tfds

import urllib.request

# Download the Europarl EN/ES parallel corpus into the Colab working directory
url_en = "https://github.com/JuanMr7/ProyectoIAChatBot/raw/main/europarl-v7.es-en.en"
url_es = "https://github.com/JuanMr7/ProyectoIAChatBot/raw/main/europarl-v7.es-en.es"

for remote_url, local_name in ((url_en, "europarl-v7.es-en.en"),
                               (url_es, "europarl-v7.es-en.es")):
    urllib.request.urlretrieve(remote_url, local_name)

# Load each downloaded file fully into memory as UTF-8 text
europarl_en = Path("europarl-v7.es-en.en").read_text(encoding='utf-8')
europarl_es = Path("europarl-v7.es-en.es").read_text(encoding='utf-8')


In [None]:
from google.colab import drive
# Mount the user's Google Drive under /content/drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Data cleaning, applied identically to the English and the Spanish corpus

# Ordered (pattern, replacement) substitutions:
#   1. append a "$$$" marker to every period directly followed by a digit or
#      an ASCII letter,
#   2. remove each "$$$" marker together with the single character before it,
#   3. collapse runs of spaces into one space.
_CLEANING_STEPS = (
    (r"\.(?=[0-9]|[a-z]|[A-Z])", ".$$$"),
    (r".\$\$\$", ''),
    (r" +", " "),
)


def _clean_corpus(text):
    """Run the cleaning substitutions on `text` and return its lines as a list."""
    # Rebind the same name at every step so intermediate copies can be freed.
    for pattern, replacement in _CLEANING_STEPS:
        text = re.sub(pattern, replacement, text)
    return text.split('\n')


# English corpus
corpus_en = _clean_corpus(europarl_en)

# Spanish corpus
corpus_es = _clean_corpus(europarl_es)

**Cambio**

In [None]:
# Tokenization: build one subword encoder per language (target vocabulary 2**13)
tokenizer_en = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    corpus_en, target_vocab_size=2**13)
tokenizer_es = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    corpus_es, target_vocab_size=2**13)

# Two extra ids are reserved on top of each tokenizer's own vocabulary.
VOCAB_SIZE_EN = tokenizer_en.vocab_size + 2
VOCAB_SIZE_ES = tokenizer_es.vocab_size + 2

# The first extra id is prepended and the second appended to every sentence.
start_en, end_en = VOCAB_SIZE_EN-2, VOCAB_SIZE_EN-1
start_es, end_es = VOCAB_SIZE_ES-2, VOCAB_SIZE_ES-1

inputs = [[start_en, *tokenizer_en.encode(sentence), end_en]
          for sentence in corpus_en]
outputs = [[start_es, *tokenizer_es.encode(sentence), end_es]
           for sentence in corpus_es]


In [None]:
# Drop every sentence pair in which either side exceeds MAX_LENGTH tokens.
# A single filtering pass replaces repeated `del list[idx]`, whose cost is
# linear per deletion and therefore quadratic over a large corpus.
MAX_LENGTH = 20
kept_pairs = [(src, tgt) for src, tgt in zip(inputs, outputs)
              if len(src) <= MAX_LENGTH and len(tgt) <= MAX_LENGTH]
inputs = [src for src, _ in kept_pairs]
outputs = [tgt for _, tgt in kept_pairs]

In [None]:
# Build the padded INPUTS / OUTPUTS arrays and the training pipeline.
# Sequences are padded with 0 at the end, up to MAX_LENGTH.
pad_options = dict(value=0, padding='post', maxlen=MAX_LENGTH)
inputs = tf.keras.preprocessing.sequence.pad_sequences(inputs, **pad_options)
outputs = tf.keras.preprocessing.sequence.pad_sequences(outputs, **pad_options)

BATCH_SIZE = 64
BUFFER_SIZE = 20000

# cache -> shuffle -> batch -> prefetch
dataset = (
    tf.data.Dataset.from_tensor_slices((inputs, outputs))
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

In [None]:
#Armando modelo
class PositionalEncoding(layers.Layer):
  """Layer that adds a fixed sinusoidal positional encoding to its input.

  The sequence length and model width are read from the last two static
  dimensions of the input, so both must be known when the layer is called.
  """

  def __init__(self):
    super(PositionalEncoding, self).__init__()

  def get_angles(self, pos, i, d_model):
    """Return pos / 10000^(2*(i//2) / d_model), broadcast over pos and i.

    Args:
      pos: array of positions (a column vector when called from `call`).
      i: array of feature indices (a row vector when called from `call`).
      d_model: width of the model (size of the last input dimension).
    """
    angles = 1 / np.power(10000., (2*(i//2)) / np.float32(d_model))
    return pos * angles

  def call(self, inputs):
    """Return `inputs` plus the positional encoding, as float32."""
    seq_length = inputs.shape.as_list()[-2]
    d_model = inputs.shape.as_list()[-1]
    angles = self.get_angles(np.arange(seq_length)[:, np.newaxis],
                             np.arange(d_model)[np.newaxis, :],
                             d_model)
    # Even feature indices take the sine, odd ones the cosine.
    angles[:, 0::2] = np.sin(angles[:, 0::2])
    angles[:, 1::2] = np.cos(angles[:, 1::2])
    # Leading axis of size 1 so the encoding broadcasts over the batch.
    pos_encoding = angles[np.newaxis, ...]
    return inputs + tf.cast(pos_encoding, tf.float32)

In [None]:
#Atención
def scaled_dot_product_attention(queries, keys, values, mask):
  """Compute softmax(Q K^T / sqrt(d_k)) V.

  Args:
    queries, keys, values: tensors whose last two axes are multiplied as
      matrices; `keys` is transposed on its last two axes.
    mask: tensor broadcastable to the score matrix, or None. Entries equal
      to 1 add -1e9 to the corresponding score, which drives their softmax
      weight towards zero.

  Returns:
    The attention-weighted combination of `values`.
  """
  product = tf.matmul(queries, keys, transpose_b=True)

  # Scale by the square root of the key depth (last dimension of `keys`).
  keys_dim = tf.cast(tf.shape(keys)[-1], tf.float32)
  scaled_product = product / tf.math.sqrt(keys_dim)

  if mask is not None:
    scaled_product += (mask * -1e9)

  # Computed whether or not a mask was supplied.
  attention = tf.matmul(tf.nn.softmax(scaled_product, axis=-1), values)
  return attention

In [None]:
#SubCapa de atencion multiple

class MultiHeadAttention(layers.Layer):
  """Multi-head attention sub-layer.

  Queries, keys and values are each passed through their own dense
  projection, split into `nb_proj` heads, attended with
  `scaled_dot_product_attention`, merged back and passed through a final
  dense projection.
  """

  def __init__(self, nb_proj):
    """Store the number of attention heads (`nb_proj`)."""
    super(MultiHeadAttention, self).__init__()
    self.nb_proj = nb_proj

  def build(self, input_shape):
    """Create the projection layers once the model width is known."""
    self.d_model = input_shape[-1]
    assert self.d_model % self.nb_proj == 0, \
        "d_model must be divisible by the number of heads"

    # Width handled by each individual head.
    self.d_proj = self.d_model // self.nb_proj

    self.query_lin = layers.Dense(units=self.d_model)
    self.key_lin = layers.Dense(units=self.d_model)
    self.value_lin = layers.Dense(units=self.d_model)

    self.final_lin = layers.Dense(units=self.d_model)

  def split_proj(self, inputs, batch_size):
    """Reshape to (batch, seq, nb_proj, d_proj), then move heads to axis 1."""
    shape = (batch_size,
             -1,
             self.nb_proj,
             self.d_proj)
    splitted_inputs = tf.reshape(inputs, shape=shape)
    return tf.transpose(splitted_inputs, perm=[0, 2, 1, 3])

  def call(self, queries, keys, values, mask):
    """Return the multi-head attention output, last dimension `d_model`."""
    batch_size = tf.shape(queries)[0]

    # Apply the learned projections created in `build` before splitting.
    queries = self.query_lin(queries)
    keys = self.key_lin(keys)
    values = self.value_lin(values)

    queries = self.split_proj(queries, batch_size)
    keys = self.split_proj(keys, batch_size)
    values = self.split_proj(values, batch_size)

    attention = scaled_dot_product_attention(queries, keys, values, mask)
    # Undo the head transpose so the heads can be concatenated.
    attention = tf.transpose(attention, perm=[0, 2, 1, 3])

    concat_attention = tf.reshape(attention,
                                  shape=(batch_size, -1, self.d_model))

    outputs = self.final_lin(concat_attention)
    return outputs

In [None]:
class EncoderLayer(layers.Layer):
    """One encoder block: self-attention followed by a feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual addition
    and layer normalization.
    """

    def __init__(self, FFN_units, nb_proj, dropout_rate, name="encoder_layer"):
        super().__init__(name=name)
        self.FFN_units = FFN_units
        self.nb_proj = nb_proj
        self.dropout_rate = dropout_rate

    def build(self, input_shape):
        """Create the sub-layers; the model width is the last input dimension."""
        self.d_model = input_shape[-1]

        # Multi-head self-attention sub-layer
        self.multi_head_attention = MultiHeadAttention(self.nb_proj)
        self.dropout = layers.Dropout(rate=self.dropout_rate)
        self.norm = layers.LayerNormalization(epsilon=1e-6)

        # Feed-forward network sub-layer
        self.dense_1 = layers.Dense(units=self.FFN_units, activation="relu")
        self.dense_2 = layers.Dense(units=self.d_model)
        self.dropout_ffn = layers.Dropout(rate=self.dropout_rate)
        self.norm_ffn = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs, mask, training):
        """Run both sub-layers on `inputs` and return the normalized result."""
        # Self-attention, dropout, then residual + normalization
        attended = self.multi_head_attention(inputs, inputs, inputs, mask)
        attended = self.dropout(attended, training=training)
        attended = self.norm(attended + inputs)

        # Feed-forward, dropout, then residual + normalization
        transformed = self.dense_2(self.dense_1(attended))
        transformed = self.dropout_ffn(transformed, training=training)
        return self.norm_ffn(transformed + attended)


In [None]:
# Codificador (modificado por siaca)
class Encoder(layers.Layer):
    """Embedding, positional encoding and a stack of `EncoderLayer` blocks."""

    def __init__(self,
                 nb_layers,
                 FFN_units,
                 nb_proj,
                 dropout_rate,
                 vocab_size,
                 d_model,
                 name="encoder"):
        super().__init__(name=name)
        self.nb_layers = nb_layers
        self.d_model = d_model

        self.embedding = layers.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding()
        self.dropout = layers.Dropout(rate=dropout_rate)

        self.enc_layers = [
            EncoderLayer(FFN_units, nb_proj, dropout_rate)
            for _ in range(nb_layers)
        ]

    def call(self, inputs, mask, training):
        """Embed `inputs` and pass them through every encoder layer in order."""
        # Embeddings are scaled by sqrt(d_model) before positions are added.
        scale = tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        outputs = self.embedding(inputs) * scale
        outputs = self.pos_encoding(outputs)
        outputs = self.dropout(outputs, training=training)

        for enc_layer in self.enc_layers:
            outputs = enc_layer(outputs, mask, training)

        return outputs

In [None]:
#DecodificadorLayer

class DecoderLayer(layers.Layer):
  """One decoder block.

  Three sub-layers run in sequence: self-attention over the decoder inputs,
  attention over the encoder outputs, and a feed-forward network. Each is
  followed by dropout, a residual addition and layer normalization.
  """

  def __init__(self, FFN_units, nb_proj, dropout_rate):
    super().__init__()
    self.FFN_units = FFN_units
    self.nb_proj = nb_proj
    self.dropout_rate = dropout_rate

  def build(self, input_shape):
    """Create the sub-layers; the model width is the last input dimension."""
    self.d_model = input_shape[-1]

    # Multi-head self-attention
    self.multi_head_attention_1 = MultiHeadAttention(self.nb_proj)
    self.dropout_1 = layers.Dropout(rate=self.dropout_rate)
    self.norm_1 = layers.LayerNormalization(epsilon=1e-6)

    # Multi-head attention over the encoder output
    self.multi_head_attention_2 = MultiHeadAttention(self.nb_proj)
    self.dropout_2 = layers.Dropout(rate=self.dropout_rate)
    self.norm_2 = layers.LayerNormalization(epsilon=1e-6)

    # Feed-forward network
    self.dense_1 = layers.Dense(units=self.FFN_units, activation="relu")
    self.dense_2 = layers.Dense(units=self.d_model)
    self.dropout_3 = layers.Dropout(rate=self.dropout_rate)
    self.norm_3 = layers.LayerNormalization(epsilon=1e-6)

  def call(self, inputs, enc_outputs, mask_1, mask_2, training):
    """Run the three sub-layers; `mask_1` masks the first attention, `mask_2` the second."""
    # Self-attention over the decoder inputs
    self_attn = self.multi_head_attention_1(inputs, inputs, inputs, mask_1)
    self_attn = self.dropout_1(self_attn, training=training)
    self_attn = self.norm_1(self_attn + inputs)

    # Queries come from the decoder, keys and values from the encoder output
    cross_attn = self.multi_head_attention_2(
        self_attn, enc_outputs, enc_outputs, mask_2)
    cross_attn = self.dropout_2(cross_attn, training=training)
    cross_attn = self.norm_2(cross_attn + self_attn)

    # Feed-forward network
    ffn_out = self.dense_2(self.dense_1(cross_attn))
    ffn_out = self.dropout_3(ffn_out, training=training)
    return self.norm_3(ffn_out + cross_attn)

In [None]:
#Decodificador

class Decoder(layers.Layer):
    """Embedding, positional encoding and a stack of `DecoderLayer` blocks."""

    def __init__(self,
                 nb_layers,
                 FFN_units,
                 nb_proj,
                 dropout_rate,
                 vocab_size,
                 d_model,
                 name="decoder"):
        super().__init__(name=name)
        self.d_model = d_model
        self.nb_layers = nb_layers

        self.embedding = layers.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding()
        self.dropout = layers.Dropout(rate=dropout_rate)

        self.dec_layers = [
            DecoderLayer(FFN_units, nb_proj, dropout_rate)
            for _ in range(nb_layers)
        ]

    def call(self, inputs, enc_outputs, mask_1, mask_2, training):
        """Embed `inputs` and pass them through every decoder layer in order."""
        # Embeddings are scaled by sqrt(d_model) before positions are added.
        scale = tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        outputs = self.embedding(inputs) * scale
        outputs = self.pos_encoding(outputs)
        outputs = self.dropout(outputs, training=training)

        for dec_layer in self.dec_layers:
            outputs = dec_layer(outputs, enc_outputs, mask_1, mask_2, training)
        return outputs

In [None]:
#Transformador

class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer that maps token ids to output-vocabulary logits."""

    def __init__(self,
                 vocab_size_enc,
                 vocab_size_dec,
                 d_model,
                 nb_layers,
                 FFN_units,
                 nb_proj,
                 dropout_rate,
                 name="transformer"):
        """Build the encoder, the decoder and the final projection.

        Args:
          vocab_size_enc: vocabulary size of the encoder embedding.
          vocab_size_dec: vocabulary size of the decoder embedding and of
            the output projection.
          d_model: embedding / model width.
          nb_layers: number of encoder layers and of decoder layers.
          FFN_units: hidden width of each feed-forward network.
          nb_proj: number of attention heads.
          dropout_rate: dropout rate used throughout the model.
          name: Keras model name.
        """
        super(Transformer, self).__init__(name=name)

        self.encoder = Encoder(nb_layers,
                               FFN_units,
                               nb_proj,
                               dropout_rate,
                               vocab_size_enc,
                               d_model)

        self.decoder = Decoder(nb_layers,
                               FFN_units,
                               nb_proj,
                               dropout_rate,
                               vocab_size_dec,
                               d_model)

        self.last_linear = layers.Dense(units=vocab_size_dec, name="Lin_output")

    def create_padding_mask(self, seq):
        """Return a float mask that is 1 where `seq` equals 0.

        Two singleton axes are inserted after the batch axis so the mask
        broadcasts against attention scores.
        """
        mask = tf.cast(tf.math.equal(seq, 0), tf.float32)
        return mask[:, tf.newaxis, tf.newaxis, :]

    def create_look_ahead_mask(self, seq):
        """Return a (seq_len, seq_len) mask that is 1 strictly above the diagonal."""
        seq_len = tf.shape(seq)[1]
        # band_part(..., -1, 0) keeps the lower triangle including the diagonal.
        look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
        return look_ahead_mask

    def call(self, enc_inputs, dec_inputs, training):
        """Return logits over the decoder vocabulary for each decoder position."""
        enc_mask = self.create_padding_mask(enc_inputs)
        # First decoder attention: mask padding and later positions.
        dec_mask_1 = tf.maximum(
            self.create_padding_mask(dec_inputs),
            self.create_look_ahead_mask(dec_inputs)
        )
        # Second decoder attention: mask padding of the encoder input.
        dec_mask_2 = self.create_padding_mask(enc_inputs)

        enc_outputs = self.encoder(enc_inputs, enc_mask, training)
        # Decoder.call takes (inputs, enc_outputs, mask_1, mask_2, training).
        dec_outputs = self.decoder(dec_inputs,
                                   enc_outputs,
                                   dec_mask_1,
                                   dec_mask_2,
                                   training)

        outputs = self.last_linear(dec_outputs)
        return outputs







In [None]:
# Training setup

tf.keras.backend.clear_session()

# Hyper-parameters

D_MODEL = 128
NB_LAYERS = 8
FFN_UNITS = 512
NB_PROJ = 8
DROPOUT_RATE = 0.1

transformer = Transformer(vocab_size_enc=VOCAB_SIZE_EN,
                          vocab_size_dec=VOCAB_SIZE_ES,
                          d_model=D_MODEL,
                          nb_layers=NB_LAYERS,
                          FFN_units=FFN_UNITS,
                          nb_proj=NB_PROJ,
                          dropout_rate=DROPOUT_RATE)

# Per-position loss (no reduction) so that padding can be masked out below.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True,
                                                            reduction="none")
# Backward-compatible alias for the original, misspelled name.
loss_objetct = loss_object

def loss_function(target, pred):
  """Cross-entropy of `pred` against `target`, with target id 0 zeroed out.

  The mean is taken over every position, so masked positions contribute 0
  to the sum but still count in the denominator.
  """
  mask = tf.math.logical_not(tf.math.equal(target, 0))
  loss_ = loss_object(target, pred)

  mask = tf.cast(mask, dtype=loss_.dtype)
  loss_ *= mask

  return tf.reduce_mean(loss_)

train_loss = tf.keras.metrics.Mean(name="train_loss")
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Schedule returning rsqrt(d_model) * min(rsqrt(step), step * warmup_steps**-1.5)."""

  def __init__(self, d_model, warmup_steps=4000):
    super(CustomSchedule, self).__init__()

    self.d_model = tf.cast(d_model, tf.float32)
    self.warmup_steps = warmup_steps

  def __call__(self, step):
    """Return the learning rate for optimizer step `step`."""
    # rsqrt needs a floating-point input, so cast the step before using it.
    step = tf.cast(step, tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps**-1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

learning_rate = CustomSchedule(D_MODEL)

optimizer = tf.keras.optimizers.Adam(learning_rate,
                                     beta_1=0.9,
                                     beta_2=0.98,
                                     epsilon=1e-9)



NotImplementedError: Learning rate schedule 'CustomSchedule' must override `__call__(self, step)`.

In [None]:
# Checkpoints
# Google Drive location under which the checkpoints are stored
drive_path = Path('/content/drive/MyDrive/Elementos_compartidos_Montesdeoca_Juan/Carlos_Mendoza-Juan_Montesdeoca/ckpt')

# Create the directory if it does not exist yet
drive_path.mkdir(parents=True, exist_ok=True)

# Checkpoint object (model + optimizer) and the manager that keeps the last 5
checkpoint_path = drive_path / "ckpt/"
ckpt = tf.train.Checkpoint(transformer=transformer, optimizer=optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, str(checkpoint_path), max_to_keep=5)

# Resume from the most recent checkpoint when one exists
if ckpt_manager.latest_checkpoint:
  ckpt.restore(ckpt_manager.latest_checkpoint)
  print("Latest Checkpoint Restored!!")

EPOCHS = 10

for epoch in range(EPOCHS):
  print("Start of epoch{}".format(epoch+1))
  start = time.time()

  # Metrics are accumulated per epoch
  train_loss.reset_state()
  train_accuracy.reset_state()

  for batch, (enc_inputs, targets) in enumerate(dataset):
    # The decoder input drops the last token and the expected output drops
    # the first, so each position is trained to predict the following token.
    dec_inputs = targets[:, :-1]
    dec_outputs_real = targets[:, 1:]
    with tf.GradientTape() as tape:
      predictions = transformer(enc_inputs, dec_inputs, True)
      loss = loss_function(dec_outputs_real, predictions)

    gradients = tape.gradient(loss, transformer.trainable_variables)
    optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))

    train_loss(loss)
    train_accuracy(dec_outputs_real, predictions)

    if batch % 50 == 0:
      print("Epoch {} Batch {} Loss{:.4f} Accuracy {:.4f}".format(
          epoch+1, batch, train_loss.result(), train_accuracy.result()
      ))
  # Save through the manager so that only the newest checkpoints are kept
  ckpt_save_path = ckpt_manager.save()
  print("Saving checkpoints for epoch {} at {}".format(epoch+1,
                                                       ckpt_save_path))
  print("Time taken for 1 epoch: {} secs \n".format(time.time() - start))

In [None]:
#EVALUANDO MODELO

def evaluate(inp_sentence):
  """Greedily decode the output token ids for one English sentence.

  The sentence is encoded with `tokenizer_en` and wrapped in the two
  reserved English ids. Decoding starts from the first reserved Spanish id
  and appends the highest-scoring token at each step, stopping at the
  second reserved Spanish id or after MAX_LENGTH steps.

  Args:
    inp_sentence: the English sentence as a string.

  Returns:
    A 1-D int32 tensor of generated ids, beginning with the start id.
  """
  inp_sentence = \
    [VOCAB_SIZE_EN-2] + tokenizer_en.encode(inp_sentence) + [VOCAB_SIZE_EN-1]
  # Add a batch axis of size 1
  enc_input = tf.expand_dims(inp_sentence, axis=0)

  output = tf.expand_dims([VOCAB_SIZE_ES-2], axis=0)

  for _ in range(MAX_LENGTH):
    predictions = transformer(enc_input, output, False)
    # Only the logits of the last generated position matter
    prediction = predictions[:, -1:, :]

    predicted_id = tf.cast(tf.argmax(prediction, axis=-1), tf.int32)

    if predicted_id == VOCAB_SIZE_ES-1:
      return tf.squeeze(output, axis=0)

    output = tf.concat([output, predicted_id], axis=-1)

  return tf.squeeze(output, axis=0)

def translate(sentence):
  """Translate `sentence` with `evaluate` and print the input and the result."""
  token_ids = evaluate(sentence).numpy()

  # Keep only ids below the two reserved ids before decoding to text
  text_ids = [token for token in token_ids if token < VOCAB_SIZE_ES-2]
  predicted_sentence = tokenizer_es.decode(text_ids)

  print("Input: {}".format(sentence))
  print("Predicted translations: {}".format(predicted_sentence))

# Run the translator once on a sample English sentence
translate("This is a really powerful tool!")