# RNN

In [None]:
import os
import numpy as np
from os import listdir
from os.path import isfile, join
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # Disable tensorflow debugging logs
import tensorflow as tf
from tensorflow.keras import layers
import re #regex
import pickle
from tqdm import tqdm
from sklearn.model_selection import train_test_split #particiones

# tf.data.AUTOTUNE is the stable name of the deprecated tf.data.experimental.AUTOTUNE alias.
AUTOTUNE = tf.data.AUTOTUNE

In [None]:
def get_txt(path, encoding="utf-8"):
    """
    Return the lines of every regular file found directly inside a directory.

    Files are visited in sorted name order so the resulting list is identical
    on every run and platform (``os.listdir`` gives no ordering guarantee).

    Args:
        path (str): directory to read; sub-directories are skipped.
        encoding (str): text encoding used to decode every file. It is set
            explicitly so the result does not depend on the machine's locale.

    Returns:
        list[str]: the lines of all files, each one keeping its line ending.
    """
    text = []
    for name in sorted(listdir(path)):
        file_path = join(path, name)
        if not isfile(file_path):
            continue
        with open(file_path, 'r', encoding=encoding) as f:
            text += f.readlines()
    return text

# Read every script into one flat list of text lines
# (the lines of all three collections are concatenated, not kept apart).
corpus = []
corpus += get_txt("../corpus/Pride & Prejudice")
corpus += get_txt("../corpus/Marvel")
corpus += get_txt("../corpus/Christopher Nolan")
corpus[:10]  # preview only: displaying the whole corpus floods the notebook output

In [None]:
# Total number of raw lines collected in `corpus`
len(corpus)

In [None]:
def get_clean(corpus):
    """
    Normalize the corpus while keeping the punctuation that carries meaning.

    Each line is lower-cased, every character other than ``a-z``, ``0-9``,
    space and ``. , ! ; :`` is removed, and surrounding spaces are stripped.
    Lines that end up empty are dropped.

    Args:
        corpus (list[str]): lines of movie dialogue.

    Returns:
        list[str]: cleaned, non-empty lines in their original order.
    """
    disallowed = re.compile(r'[^a-z0-9 .,!;:]')
    clean = []
    for line in corpus:
        line = disallowed.sub('', line.lower()).strip()
        # Emptiness is checked after stripping, so lines made only of spaces
        # are discarded instead of being stored as ''.
        if line:
            clean.append(line)
    return clean

clean = get_clean(corpus)
clean[:10]  # preview only: displaying the whole list floods the notebook output

In [None]:
def get_longest_sentence(corpus):
    """
    Return the longest sentence of the corpus, measured in characters.

    Args:
        corpus (list[str]): lines of movie dialogue.

    Returns:
        str: the first sentence of maximal length, or ``''`` when the corpus
        is empty, so the result is always a string.
    """
    # max() keeps the first of several equally long sentences.
    return max(corpus, key=len, default='')

largest = get_longest_sentence(clean)
# Show the sentence and, on the next line, its number of space-separated words
print(largest, len(largest.split(' ')), sep='\n')

Cargamos el conjunto de entrenamiento

In [None]:
# NOTE: pickle can execute arbitrary code while loading; only open trusted files.
# The context manager closes the file handle once the data is loaded.
with open('./pickles/datasets/train.pkl', 'rb') as train_file:
    train = pickle.load(train_file)
print('Número de cadenas train:', len(train))
print(train[:3])

Adecuamos los datos al tipo que requiere TensorFlow y preparamos los datos en bloques

In [None]:
batch_size = 32  # Number of examples per batch
raw_train_ds = tf.data.Dataset.from_tensor_slices(np.array(train).flatten())
BUFFER_SIZE = len(raw_train_ds)  # shuffle buffer spans the whole dataset

# Build the input pipeline one stage at a time: shuffle, batch, prefetch
raw_train_ds = raw_train_ds.shuffle(BUFFER_SIZE)
raw_train_ds = raw_train_ds.batch(batch_size, drop_remainder=True)
raw_train_ds = raw_train_ds.prefetch(tf.data.experimental.AUTOTUNE)



In [None]:
# Sanity check: print the first batch produced by the pipeline
for batch in raw_train_ds.take(1):
    print(batch)

Creamos una capa que convierte el texto de entrada en un vector de índices de tokens y la adaptamos al conjunto de entrenamiento

In [None]:
voc_size = 20406  # Upper bound for the vocabulary size (passed as max_tokens)

vectorize_layer = layers.TextVectorization(
    standardize=None,           # no lower-casing / punctuation stripping inside the layer
    max_tokens=voc_size,
    output_mode='int',
    output_sequence_length=22,  # every sentence is padded or truncated to 22 token ids
)

# raw_train_ds is already batched, so no batch size is passed to adapt():
# Keras asks not to specify it when the data is a tf.data.Dataset.
vectorize_layer.adapt(raw_train_ds)
vocab = vectorize_layer.get_vocabulary()
voc_size = len(vocab)  # actual size of the vocabulary learned from the data
voc_size

In [None]:
# Quick check of the layer on two sample strings.
# NOTE(review): the layer is built with standardize=None, so tokens are looked up
# exactly as written; 'Love' presumably differs from 'love' — confirm against the training data.
vectorize_layer(['Love you', '3 millions'])

In [2]:
def get_input_target(text):
    """
    Build the (input, target) pair used to train next-token prediction.

    The text is converted to token ids with ``vectorize_layer``; the input
    drops the last column and the target drops the first one, so the target
    is the input shifted by one position.

    Args:
        text: strings to vectorize, in a form accepted by ``vectorize_layer``.

    Returns:
        tuple: ``(input_ids, target_ids)`` sliced from the same id matrix.
    """
    token_ids = vectorize_layer(text)
    return token_ids[:, :-1], token_ids[:, 1:]

Obtenemos los pares (entrada, objetivo): el objetivo es la misma secuencia de tokens desplazada una posición

In [None]:
# Turn every batch of the pipeline into an (input, target) pair of token ids
train_ds = raw_train_ds.map(get_input_target)

# Inspect one batch: shapes of both tensors and the first example of each
for input_batch, target_batch in train_ds.take(1):
    print(input_batch.shape, target_batch.shape)
    print(input_batch[0], target_batch[0])

Definir modelo

In [None]:
emb_dim = 256     # output dimension of the Embedding layer
model_dim = 1024  # number of units of the GRU layer

In [None]:
class CustomRNN(tf.keras.Model):
    """
    Language model made of an Embedding, a GRU and a Dense layer that
    outputs one logit per vocabulary entry at every sequence position.
    """

    def __init__(self, voc_size, emb_dim, model_dim):
        """
        Create the layers of the network.

        Args:
            voc_size (int): vocabulary size; input dimension of the embedding
                and number of output logits.
            emb_dim (int): dimension of the embedding vectors.
            model_dim (int): number of GRU units.
        """
        # Zero-argument super() already binds the instance; passing `self`
        # again would hand it to the parent as an extra positional argument.
        super().__init__()
        self.embedding = layers.Embedding(voc_size, emb_dim)
        self.gru = layers.GRU(model_dim,
                              return_sequences=True,
                              return_state=True)
        self.logits = layers.Dense(voc_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        """
        Run the forward pass.

        Args:
            inputs: token ids to embed.
            states: initial GRU state; when None the GRU's own initial state
                is used.
            return_state (bool): also return the final GRU state.
            training (bool): forwarded to every layer.

        Returns:
            The logits, or ``(logits, states)`` when ``return_state`` is True.
        """
        x = inputs
        x = self.embedding(x, training=training)
        if states is None:
            states = self.gru.get_initial_state(x)
        x, states = self.gru(x, initial_state=states, training=training)
        x = self.logits(x, training=training)

        if return_state:
            return x, states
        else:
            return x

# Instantiate the model with the hyperparameters defined above
rnn = CustomRNN(voc_size, emb_dim, model_dim)

Probamos el modelo

In [None]:
# Forward pass on one batch: the model receives the inputs (not the targets)
# and its output is compared in shape against the targets.
for input_batch, target_batch in train_ds.take(1):
    predictions = rnn(input_batch)
    print(predictions.shape, target_batch.shape)

In [None]:
# Shape of the model output for the first example of the batch
predictions[0].shape

In [None]:
# Print the Keras summary of the model
rnn.summary()

In [None]:
# Sample one token id per row of logits of the first example
pred_indices = tf.random.categorical(predictions[0], num_samples=1)
# Keep the single sampled column
pred_indices[:, 0]

In [None]:
# Map the token ids of the first input sequence back to vocabulary words
' '.join(vocab[token_id] for token_id in input_batch[0])

### Entrenamiento

In [None]:
# from_logits=True: the loss receives the raw outputs of the model's final Dense layer
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
opt = tf.keras.optimizers.Adam(learning_rate=0.0001)
# Mean of the loss values passed to it (updated in train_step)
loss_metric = tf.keras.metrics.Mean(name='loss')

In [None]:
@tf.function
def train_step(input_batch, target_batch):
    """
    Run one optimization step of `rnn` on a batch and record its loss.

    Args:
        input_batch: token ids fed to the model.
        target_batch: token ids the logits are compared against.
    """
    with tf.GradientTape() as tape:
        batch_loss = loss(target_batch, rnn(input_batch, training=True))

    # Weights are read after the forward pass, as in the original ordering
    weights = rnn.trainable_weights
    grads = tape.gradient(batch_loss, weights)
    opt.apply_gradients(zip(grads, weights))
    loss_metric(batch_loss)

Creamos el gestor de checkpoints para guardar el modelo durante el entrenamiento

In [None]:
# Checkpoint object tracking the model being trained
ckpt = tf.train.Checkpoint(rnn)
# The manager writes under ./rnnModelCheckpoint/ and is configured with max_to_keep=3
ckpt_manager = tf.train.CheckpointManager(
    ckpt, 
    directory="./rnnModelCheckpoint/", 
    max_to_keep=3
)

Entrenamos

In [None]:
epochs = 100

for epoch in tqdm(range(epochs)):
    for input_batch, target_batch in train_ds:
        train_step(input_batch, target_batch)

    # Mean of the batch losses accumulated during this epoch
    print(f'Epoch: {epoch} Loss: {loss_metric.result().numpy()}')
    # reset_state() is the current name of the deprecated reset_states()
    loss_metric.reset_state()
    ckpt_manager.save(int(epoch))  # Save a checkpoint numbered after the epoch

Cargamos el modelo

In [None]:
# Fresh model with the same hyperparameters, to receive the saved weights
model = CustomRNN(voc_size=voc_size,
            emb_dim=emb_dim,
            model_dim=model_dim)
ckpt = tf.train.Checkpoint(model)
ckpt_manager = tf.train.CheckpointManager(
    ckpt,
    directory="./rnnModelCheckpoint/",
    max_to_keep=3
)
latest_checkpoint = ckpt_manager.latest_checkpoint
# Without this guard a missing checkpoint goes unnoticed and the model
# keeps its random initial weights.
if latest_checkpoint is None:
    raise FileNotFoundError("No checkpoint found in ./rnnModelCheckpoint/")
ckpt.restore(latest_checkpoint)

### Generación

In [None]:
def generate(start, length=21):
    """
    Generate a continuation of a start text, sampling one token at a time.

    The whole start text is fed to the model first, so the recurrent state
    reflects every word of the prompt (not only its first word); afterwards
    each sampled token id is fed back as the next input.

    Args:
        start (str): text that opens the generated sequence.
        length (int): number of tokens to sample after the prompt.

    Returns:
        list[str]: ``start`` followed by the ``length`` sampled tokens.
    """
    prompt_ids = vectorize_layer(tf.constant([start]))
    # The layer pads with id 0 up to its fixed output length: keep only the
    # real tokens, but at least one column so the model always gets an input.
    prompt_len = sum(1 for token_id in prompt_ids[0].numpy() if token_id != 0)
    next_input = prompt_ids[:, :max(prompt_len, 1)]

    states = None
    output = [start]
    for _ in range(length):
        pred_logits, states = model(next_input,
                                    states=states, return_state=True)
        # Sample from the distribution predicted at the last position
        pred_index = tf.random.categorical(pred_logits[:, -1, :],
                                           num_samples=1)
        output.append(vocab[int(pred_index[0, 0])])
        # The sampled id, shaped (1, 1), is the input of the next step
        next_input = pred_index
    return output
    
# Try the generator with a sample prompt and show the result as one string
' '.join(generate('Dr Strange'))

Generando oraciones para ser evaluadas

In [None]:
# NOTE: pickle can execute arbitrary code while loading; only open trusted files.
# The context manager closes the file handle once the data is loaded.
with open('./pickles/datasets/test.pkl', 'rb') as test_file:
    test = pickle.load(test_file)

In [None]:
predictRNN = []

# Use the first half of the words of each test sentence as the prompt
for sentence in tqdm(test):
    words = sentence.split(' ')
    prompt = ' '.join(words[:len(words) // 2])
    predictRNN.append(' '.join(generate(prompt)))

predictRNN[:20]

In [None]:
# Strip surrounding whitespace from the generated sentences and drop the empty ones
predictRNN = [text for text in (raw.strip() for raw in predictRNN) if len(text) > 0]

Guardamos para evaluar

In [None]:
# The context manager flushes and closes the file once the dump is written
with open('./pickles/predict/rnn.pkl', 'wb') as predict_file:
    pickle.dump(predictRNN, predict_file)