# SetUp

In [None]:
%%capture
!git clone https://github.com/Ryuksito/chatbot.git
!pip install --upgrade keras-nlp

In [2]:
from google.colab import drive
drive.mount('/content/drive')


import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import pathlib
import random
import string
import re
import numpy as np
import json

import tensorflow.data as tf_data
import tensorflow.strings as tf_strings
import tensorflow as tf

import keras
from keras import layers
from keras import ops
from keras.layers import TextVectorization
import keras_nlp

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Consts

SEED = 42
AUTOTUNE = tf.data.AUTOTUNE

DATASET_PATH = '/content/chatbot/data/instructions.json'
CORPUS_PATH = '/content/chatbot/data/corpus.txt'
VOCAB_PATH = '/content/chatbot/weights/vocab.txt'
METADATA_PATH = '/content/chatbot/weights/metadata.json'
EMBEDDINGS_PATH = '/content/chatbot/weights/embedding.weights.npy'

MODEL_DIR = '/content/drive/MyDrive/Exposiciones/Chatbot/weights/chatbot_model/'

VOCAB_SIZE = 15000
SEQ_LENGTH = 2**8
EMBED_DIM = 2**8
BATCH_SIZE = 2**6
TAKE = 100
LATENT_DIM = 2**11
NUM_LAYERS = 4
NUM_HEADS = 8
EPOCHS = 60


with open(VOCAB_PATH, 'r', encoding='utf-8') as file:
  VOCAB = file.read().split('\n')

def replace_first_zero(tensor, scalar):
    mask = tf.equal(tensor, 0)

    indices = tf.where(mask)
    if tf.size(indices) == 0:
        print(tensor)
        raise ValueError("No se encontró un 0 en el tensor")

    first_zero_index = indices[0][0]

    updated_tensor = tf.tensor_scatter_nd_update(tensor, [[first_zero_index]], [scalar])

    return updated_tensor

# Preprocesar los datos

## Cargar la capa de embeddings

In [4]:
weights = np.load(EMBEDDINGS_PATH)
embedding_layer = layers.Embedding(VOCAB_SIZE,
                      SEQ_LENGTH,
                      name="w2v_embedding")

# paso de forward para inicializar la capa
dummy_target = tf.zeros((1,), dtype=tf.int64)
embedding_layer(dummy_target)

embedding_layer.set_weights([weights])

embedding_layer.trainable = False

## Cargar el dataset

In [5]:
# Load Dataset

with open(DATASET_PATH, 'r', encoding='utf-8') as f:
  dataset = json.load(f)

instructions = []
answers = []

for i in range(len(dataset['data'])):
  instructions.append(
      dataset['data'][i]['instruction'].lower()
      )
  answers.append(
      '<bos> ' +
      dataset['data'][i]['answer'].lower()
      + ' <eos>'
      )

# Imprimir datos
for i, a in zip(instructions[0:1], answers[0:1]):
  print(i + ' --> ' + a)

sugiera un eslogan para una campaña de reciclaje.
 --> <bos> 1. "reduce, reutiliza, recicla: juntos por un futuro más verde."
2. "recicla hoy, para un mañana mejor."
3. "¡convierte tu basura en tesoro - recicla!"
4. "recicla por el ciclo de vida."
5. "ahorra recursos, recicla más." <eos>


## Cargar el tokenizer

In [6]:
tokenizer = keras_nlp.tokenizers.WordPieceTokenizer(
     vocabulary=VOCAB,
     lowercase=True,
     suffix_indicator='<pow>',
     oov_token='<unk>',
     sequence_length=SEQ_LENGTH + 1,
     special_tokens=['<bos>', '<eos>', '<sep>', '<mask>', '<unk>', '<pow>'],
     special_tokens_in_strings=True
)

In [7]:
print(tokenizer(instructions[0:5])[:-1].shape)
print(tokenizer(answers[0])[:-1].shape)
print(tokenizer(answers[0])[1:].shape)

(4, 257)
(256,)
(256,)


# Dataset pipeline

In [8]:
def format_dataset(instruction, answer):
    instruction = tokenizer(instruction)
    answer = tokenizer(answer)
    return (
        {
            "encoder_inputs": instruction[:, :-1],
            "decoder_inputs": answer[:, :-1],
        },
        answer[:, 1:],
    )


def make_dataset(instructions, answers):
    dataset = tf_data.Dataset.from_tensor_slices((instructions, answers))
    dataset = dataset.batch(BATCH_SIZE)
    dataset = dataset.map(format_dataset)
    return dataset.cache().shuffle(2048).prefetch(16)


train_ds = make_dataset(instructions, answers)

In [9]:
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 256)
inputs["decoder_inputs"].shape: (64, 256)
targets.shape: (64, 256)


# Model Clases

## Encoder

In [10]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            padding_mask = ops.cast(mask[:, None, :], dtype="int32")
        else:
            padding_mask = None

        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

## Positional Embedding

In [11]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, features_embeddings, **kwargs):
        super().__init__(**kwargs)

        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.features_embeddings = features_embeddings

    def build(self):
        self.token_embeddings = layers.Embedding(
            input_dim=self.vocab_size, output_dim=self.embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=self.sequence_length, output_dim=self.embed_dim
        )

    def call(self, inputs):
        length = ops.shape(inputs)[-1]
        positions = ops.arange(0, length, 1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)

        embedded_features = self.features_embeddings(inputs)
        return embedded_tokens + embedded_positions + embedded_features

    def compute_mask(self, inputs, mask=None):
        if mask is None:
            return None
        else:
            return ops.not_equal(inputs, 0)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config

## Decoder

In [12]:

class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(latent_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = ops.cast(mask[:, None, :], dtype="int32")
            padding_mask = ops.minimum(padding_mask, causal_mask)
        else:
            padding_mask = None

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        input_shape = ops.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = ops.arange(sequence_length)[:, None]
        j = ops.arange(sequence_length)
        mask = ops.cast(i >= j, dtype="int32")
        mask = ops.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = ops.concatenate(
            [ops.expand_dims(batch_size, -1), ops.convert_to_tensor([1, 1])],
            axis=0,
        )
        return ops.tile(mask, mult)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config


# Crear, Compilar y Entrenar

In [13]:

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(SEQ_LENGTH, VOCAB_SIZE, EMBED_DIM, embedding_layer)(encoder_inputs)
encoder_outputs = TransformerEncoder(EMBED_DIM, LATENT_DIM, NUM_HEADS)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, EMBED_DIM), name="decoder_state_inputs")
x = PositionalEmbedding(SEQ_LENGTH, VOCAB_SIZE, EMBED_DIM, embedding_layer)(decoder_inputs)
x = TransformerDecoder(EMBED_DIM, LATENT_DIM, NUM_HEADS)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(VOCAB_SIZE, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

decoder_outputs = decoder([decoder_inputs, encoder_outputs])
chatbot = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

In [14]:

chatbot.summary()

chatbot.compile(
    "rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

In [None]:
early_stopping = keras.callbacks.EarlyStopping(monitor='loss',patience=5, restore_best_weights=True)
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="logs")
check_point = tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(MODEL_DIR, "weights" + "_epoch_{epoch}" + '.weights.h5'),
        monitor="loss",
        save_best_only=False,
        save_weights_only=True,
    )


chatbot.fit(train_ds.take(1), epochs=EPOCHS//EPOCHS, callbacks=[early_stopping, tensorboard_callback, check_point])



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m58s[0m 58s/step - accuracy: 0.0000e+00 - loss: 9.6953


<keras.src.callbacks.history.History at 0x786e02d713c0>

# Decodificar

In [22]:
vocab = tokenizer.get_vocabulary()
index_lookup = dict(zip(range(VOCAB_SIZE), vocab))


def decode_sequence(input_sentence):
    tokenized_input_sentence = tokenizer(input_sentence)[:-1]
    decoded_sentence = "<start>"
    tokenized_target_sentence = tokenizer(decoded_sentence)[:-1]

    print('Chatbot:', end='')

    for i in range(tf.where(tf.not_equal(tokenized_target_sentence, 0)).shape[0], SEQ_LENGTH):

        predictions = chatbot([tokenized_input_sentence[None, :], tokenized_target_sentence[None, :]])

        sampled_token_index = ops.convert_to_numpy(
            ops.argmax(predictions[0, i, :])
        ).item(0)

        tokenized_target_sentence = replace_first_zero(tokenized_target_sentence, sampled_token_index)

        sampled_token = index_lookup[sampled_token_index]
        if '<pow>' in sampled_token:
          sampled_token = sampled_token.replace('<pow>', '')
        else:
          if (i+1) % 8 == 0:
            sampled_token = sampled_token + '\n'
          else:
              sampled_token = ' ' + sampled_token

        print(sampled_token, end='')

        if sampled_token == "<end>":
            break
    return tokenized_target_sentence


In [None]:
input_sentence = input('User: ').lower()

try:
  out_tokenized_text = decode_sequence(input_sentence)
except KeyboardInterrupt:
  print('\n End Chat')