In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.layers import Embedding
from matplotlib import pyplot as plt
import random

from time import time
# Integer Unix timestamp taken once when this cell runs; later cells print it
# as a run identifier and plot_metric uses it to name the saved PNG.
id_exec = int(time())

In [None]:
def clean_book(book, all_books):
    """Append the cleaned text of a book file to ``all_books``.

    Every line of the file is lower-cased, stripped of surrounding whitespace
    and followed by a single space. Lines that contain both ``'page'`` and
    ``'rowling'`` are skipped (presumably page footers). Finally every
    punctuation character listed below is deleted from the accumulated text.

    Args:
        book: Path of the text file to read.
        all_books: Text accumulated so far; the book is appended to it.

    Returns:
        The accumulated text (previous content plus this book) with the
        punctuation characters removed.
    """
    punctuation = '”"!#$%&()*+,-—./:;<=>?@[\\]^_`{|}~\t\n'

    # Collect the pieces and join once instead of growing a string with `+=`.
    pieces = [all_books]
    with open(book) as f:
        for line in f:
            line = line.lower().strip() + ' '
            if 'page' in line and 'rowling' in line:
                continue
            pieces.append(line)

    # Strings are immutable: the previous code called `all_books.replace(...)`
    # without keeping the result, so no punctuation was ever removed.
    return ''.join(pieces).translate(str.maketrans('', '', punctuation))

In [None]:
from google.colab import files

def plot_metric(losses_train, losses_val, id_exec, title='LSTM Model Loss'):
    """Plot train/validation loss per epoch, save the figure and download it.

    Args:
        losses_train: Sequence with one mean training loss per epoch.
        losses_val: Sequence with one mean validation loss per epoch.
        id_exec: Run identifier; the figure is saved as ``'<id_exec>.png'``.
        title: Figure title. Defaults to the previous hard-coded title so
            existing calls are unchanged.
    """
    fig = plt.figure()
    # The x axis comes from the data itself. The previous version read the
    # global `epochs`, which later cells reassign, so plotting an older run
    # failed with a length mismatch.
    plt.plot(range(len(losses_train)), losses_train)
    plt.plot(range(len(losses_val)), losses_val)
    plt.title(title)
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'])

    filename = str(id_exec) + '.png'
    fig.savefig(filename)
    files.download(filename)  # google.colab helper: sends the file to the browser
    plt.show()

In [None]:
# Location of the raw book text, then the cleaned corpus built from it
# (starting from an empty accumulator).
pross_book = "/content/drive/MyDrive/Book3.txt"
book = clean_book(pross_book, '')

In [None]:
# Word-level tokenizer: unknown words map to the "<OOV>" token and the listed
# punctuation characters are filtered out before splitting.
tokenizer = Tokenizer(
    oov_token="<OOV>",
    filters='!"#$%&()*+,-—./:;<=>?@[\\]^_`{|}~\t\n',
)
tokenizer.fit_on_texts([book])

# The whole book as a single sequence of word ids, and the word -> id mapping.
encoded = tokenizer.texts_to_sequences([book])
vocab = tokenizer.word_index

In [None]:
context = 5        # window length: (context - 1) input words + 1 target word
vocab_size = 1000  # width of the one-hot target / model output layer

# Id the tokenizer assigns to out-of-vocabulary words.
oov_id = tokenizer.word_index[tokenizer.oov_token]

# The number of samples is capped at `vocab_size` (as before); only that many
# windows are built instead of building every window and discarding the rest.
num_windows = min(len(encoded[0]) - context + 1, vocab_size)

sequences = []
for i in range(num_windows):
    window = list(encoded[0][i:i+context])
    # The last id of each window is the training target and is one-hot encoded
    # with depth `vocab_size`. tf.one_hot returns an all-zero row for an id
    # outside [0, vocab_size), which makes the cross-entropy for that sample 0.
    # Map such targets to the OOV id so that every sample has a valid label.
    if window[-1] >= vocab_size:
        window[-1] = oov_id
    sequences.append(window)
np.random.shuffle(sequences)

## Onehot para RNN simples

In [None]:
# # Aplica o onehot em toda a base
# sequences = list(map(
#     lambda x: [np.array((tf.transpose(x2), vocab_size)) for x2 in x],
#     sequences))

## Separação da base de dados

In [None]:
# Train / validation / test fractions of the sample list.
porcent = [0.7, 0.2, 0.10]
tam = len(sequences)

# Use round() rather than int(): 0.7 + 0.2 evaluates to 0.8999999999999999 in
# floating point, so int() truncation would put the validation boundary one
# sample too early (899 instead of 900 for 1000 samples).
train_end = round(tam * porcent[0])
val_end = round(tam * (porcent[0] + porcent[1]))

sequences_train = sequences[:train_end]
sequences_val = sequences[train_end:val_end]
sequences_test = sequences[val_end:]

# The three splits must cover every sample exactly once.
assert len(sequences_train) + len(sequences_val) + len(sequences_test) == tam

# Baixando embedding para a rede

In [None]:
# Download the GloVe 6B archive to /tmp with a shell escape (certificate checks
# are disabled by the flag below), then unpack it into /tmp/glove.
# NOTE(review): the archive is fetched again every time this cell runs.
!wget --no-check-certificate \
     http://nlp.stanford.edu/data/glove.6B.zip \
     -O /tmp/glove.6B.zip

# NOTE(review): `os` is imported here but not used in the cells shown.
import os
import zipfile
# Extract every member of the archive into /tmp/glove.
with zipfile.ZipFile('/tmp/glove.6B.zip', 'r') as zip_ref:
    zip_ref.extractall('/tmp/glove')

In [None]:
# Map each word of the 100-dimensional GloVe file to its float32 vector.
# Each line of the file is: <word> <coef_1> ... <coef_100>.
embeddings_index = {}
# `with` closes the file even if parsing fails; the encoding is given
# explicitly so reading does not depend on the platform default.
with open('/tmp/glove/glove.6B.100d.txt', encoding='utf-8') as f:
    for line in f:
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs

print('Found %s word vectors.' % len(embeddings_index))

Found 400000 word vectors.


In [None]:
# Row i holds the 100-d GloVe vector of the word whose tokenizer id is i.
# One extra row is allocated because tokenizer ids start at 1.
embedding_matrix = np.zeros((len(vocab) + 1, 100))
for word, i in vocab.items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is None:
        # Words missing from the GloVe index keep their all-zero row.
        continue
    embedding_matrix[i] = embedding_vector

# Aplicando embedding na base de dados

In [None]:
# Frozen embedding layer initialised with the GloVe matrix built above.
embedding_layer = Embedding(
    input_dim=len(vocab) + 1, output_dim=100,
    weights=[embedding_matrix], input_length=1, trainable=False)


def _embed_context_words(samples):
    """Embed the context words of every sample in place, then shuffle.

    Each sample is a list of word ids whose last element is the target word.
    After the call each sample is
    ``[embedding(w_0), ..., embedding(w_{n-2}), target_id]``: the context ids
    are replaced by their embedding vectors and the target stays a plain id.

    The list is modified in place, so the function is not idempotent: call it
    exactly once per split.
    """
    for idx, sample in enumerate(samples):
        # Only the context is embedded; the previous code also embedded the
        # target and then discarded the result.
        context_vectors = [embedding_layer(token) for token in sample[:-1]]
        samples[idx] = context_vectors + [sample[-1]]
    random.shuffle(samples)


# Same order as before (train, test, validation) so the sequence of
# random.shuffle calls is unchanged.
_embed_context_words(sequences_train)
_embed_context_words(sequences_test)
_embed_context_words(sequences_val)

## RNN simples com embedding 
### PS: após os testes com embedding a rede rnn simples foi modificada para funcionar apenas com essa configuração de dados

In [None]:
class EmbeddingModel(tf.keras.models.Model):
    """Single SimpleRNN layer followed by two dense layers.

    The recurrent layer returns only its last output together with the final
    state; the dense stack is ``Dense(vocab_size, relu)`` followed by
    ``Dense(vocab_size, softmax)``.
    """

    def __init__(self, num_hiddens, vocab_size):
        super().__init__()
        # Simple RNN cell wrapped in an RNN layer that also returns its state.
        self.rnn_cell = tf.keras.layers.SimpleRNNCell(
            num_hiddens,
            kernel_initializer='glorot_uniform',
        )
        self.rnn = tf.keras.layers.RNN(
            self.rnn_cell,
            time_major=False,
            return_sequences=False,
            return_state=True,
        )

        self.dense_1 = tf.keras.layers.Dense(vocab_size, activation='relu')
        self.dense_2 = tf.keras.layers.Dense(vocab_size, activation='softmax')

    def call(self, inputs, state, training=True):
        """Return ``(output, state)`` for ``inputs`` starting from ``state``.

        ``training`` is accepted for interface compatibility and is not used.
        """
        hidden, state = self.rnn(inputs, state)
        for dense in (self.dense_1, self.dense_2):
            # Flatten to (-1, features) before each dense layer.
            hidden = dense(tf.reshape(hidden, (-1, hidden.shape[-1])))
        return hidden, state

    def get_begin_state(self, batch_size):
        """Initial state used for the first step of the recurrence."""
        return self.rnn_cell.get_initial_state(
            batch_size=batch_size, dtype=tf.float32)

# RNN profunda com embedding

In [None]:
class DeepEmbeddingModel(tf.keras.models.Model):
    """SimpleRNN layer followed by four dense layers.

    The recurrent layer returns only its last output together with the final
    state. The dense stack is three ``tanh`` layers of width ``vocab_size``
    followed by a ``softmax`` layer of the same width.
    """

    def __init__(self, num_hiddens, vocab_size):
        super().__init__()
        # Simple RNN cell wrapped in an RNN layer that also returns its state.
        self.rnn_cell = tf.keras.layers.SimpleRNNCell(
            num_hiddens, kernel_initializer='glorot_uniform')
        self.rnn = tf.keras.layers.RNN(
            self.rnn_cell, time_major=False,
            return_sequences=False, return_state=True)

        self.dense_1 = tf.keras.layers.Dense(vocab_size, activation='tanh')
        self.dense_2 = tf.keras.layers.Dense(vocab_size, activation='tanh')
        self.dense_3 = tf.keras.layers.Dense(vocab_size, activation='tanh')
        # The output layer must produce a probability distribution: the model
        # is trained with CategoricalCrossentropy() on one-hot targets, and the
        # other models in this notebook end in softmax as well. It used to be
        # 'tanh', whose outputs lie in [-1, 1] and are not probabilities.
        self.dense_4 = tf.keras.layers.Dense(vocab_size, activation='softmax')

    def call(self, inputs, state, training=True):
        """Return ``(output, state)`` for ``inputs`` starting from ``state``.

        ``training`` is accepted for interface compatibility and is not used.
        """
        output, state = self.rnn(inputs, state)
        for dense in (self.dense_1, self.dense_2, self.dense_3, self.dense_4):
            output = dense(tf.reshape(output, (-1, output.shape[-1])))
        return output, state

    def get_begin_state(self, batch_size):
        """Initial state used for the first step of the recurrence."""
        return self.rnn_cell.get_initial_state(batch_size=batch_size, dtype=tf.float32)

# Função para predizer a próxima palavra

In [None]:
def predict(prefix, num_preds, model, vocab, tokenizer):
    """Greedily extend ``prefix`` with ``num_preds`` predicted words.

    Uses the module-level ``embedding_layer`` to turn word ids into vectors.

    Args:
        prefix: Text that seeds the generation.
        num_preds: Number of words to generate.
        model: Model exposing ``get_begin_state(batch_size)`` and
            ``model(inputs, state) -> (probabilities, state)``.
        vocab: Unused; kept so existing calls keep working.
        tokenizer: Fitted Keras tokenizer used to encode and decode the text.

    Returns:
        A list with a single string: the prefix words followed by the
        predicted words.

    Raises:
        ValueError: If ``prefix`` contains no token after tokenization.
    """
    # Tokenize the prefix as one text. Tokenizing word by word produced an
    # empty or multi-id entry for words such as "-" or "well-known", which
    # broke the stacking of the embeddings.
    token_ids = tokenizer.texts_to_sequences([prefix])[0]
    if not token_ids:
        raise ValueError("prefix must contain at least one word")

    # One embedding vector per prefix token: shape (n_tokens, embedding_dim).
    embedded = np.array(embedding_layer(np.array(token_ids)), dtype=np.float32)

    # Warm the state up on every prefix word except the last one. The last
    # word is the first input of the generation loop below; feeding it here
    # too would present it to the model twice. The leftover debug print, which
    # ran an extra forward pass per token, is gone as well.
    state = model.get_begin_state(1)
    for vector in embedded[:-1]:
        _, state = model(vector.reshape(1, 1, -1), state)

    last_word = embedded[-1].reshape(1, 1, -1)  # (batch=1, time=1, dim)
    prediction = list(token_ids)
    for _ in range(num_preds):
        pred, state = model(last_word, state)
        pred = np.argmax(pred, axis=1)                 # shape (1,)
        last_word = np.array([embedding_layer(pred)])  # back to (1, 1, dim)
        prediction.append(int(pred[0]))

    return tokenizer.sequences_to_texts([prediction])

# função para treinar os modelos

In [None]:
def _sample_loss(model, sample, training):
    """Run one sample through ``model`` from a fresh state and return its loss.

    ``sample`` is ``[context_vector, ..., target_id]``. The context vectors are
    stacked into a batch of one sequence and the target id is one-hot encoded
    with depth ``vocab_size``. Uses the module-level ``vocab_size`` and
    ``loss_func``.
    """
    x = np.array([sample[0:-1]], dtype=np.float32)        # (1, n_context, dim)
    y_onehot = tf.one_hot([int(sample[-1])], vocab_size)  # (1, vocab_size)
    state = model.get_begin_state(1)                      # default initial state
    y_pred, _ = model(x, state, training=training)
    return loss_func(y_onehot, y_pred)


def _mean_loss(total, count):
    """Mean loss, or NaN when the split is empty (avoids ZeroDivisionError)."""
    return total / count if count else float('nan')


def fit(model, sequences_train, sequences_val, epochs, losses_train, losses_val):
    """Train ``model`` one sample at a time and track the mean loss per epoch.

    Relies on the module-level ``optimizer``, ``loss_func``, ``vocab_size``,
    ``vocab`` and ``tokenizer``. After every epoch a sample continuation of a
    fixed prompt is printed through ``predict``.

    Args:
        model: Model exposing ``get_begin_state`` and
            ``model(inputs, state, training=...) -> (probabilities, state)``.
        sequences_train: Training samples
            (``[context_vector, ..., target_id]`` each).
        sequences_val: Validation samples in the same format.
        epochs: Number of passes over the training samples.
        losses_train: List that receives one mean training loss per epoch.
        losses_val: List that receives one mean validation loss per epoch.

    Returns:
        The tuple ``(losses_train, losses_val)`` (the same lists, extended).
    """
    for epoch in range(epochs):
        # ---- training pass: one optimizer step per sample ----
        sum_loss_train = 0.0
        for sample in sequences_train:
            # Record the forward pass so the gradient can be computed.
            with tf.GradientTape() as tape:
                loss_value = _sample_loss(model, sample, training=True)

            grads = tape.gradient(loss_value, model.trainable_weights)
            optimizer.apply_gradients(zip(grads, model.trainable_weights))
            sum_loss_train += loss_value.numpy()

        mean_train = _mean_loss(sum_loss_train, len(sequences_train))
        print(f"Final Mean Train Loss: {mean_train:.8f}")
        losses_train.append(mean_train)
        print("")

        # ---- validation pass: forward only ----
        sum_loss_val = 0.0
        for sample in sequences_val:
            loss_value = _sample_loss(model, sample, training=False)
            sum_loss_val += loss_value.numpy()

        mean_val = _mean_loss(sum_loss_val, len(sequences_val))
        print(f"Final Mean Val Loss  : {mean_val:.8f}")
        losses_val.append(mean_val)
        print("")

        # Qualitative check: continue a fixed prompt with the current weights.
        inp = 'harry potter was a'
        frase = predict(inp, 5, model, vocab, tokenizer)
        print(f"{frase[0]}")

    return losses_train, losses_val

In [None]:
print(f"Iniciando execução: [{id_exec}]")

# Hyper-parameters of the simple RNN run. `optimizer`, `loss_func` and
# `epochs` are read as globals by fit / plot_metric.
# NOTE(review): lr = 0.5 is far above Adam's Keras default of 1e-3 — confirm
# it is intentional. `batch_size` is not read by fit, which always uses 1.
num_hiddens = 500
batch_size = 1
lr = 0.5
epochs = 10

optimizer = tf.keras.optimizers.Adam(lr)
loss_func = tf.keras.losses.CategoricalCrossentropy()

losses_train = []
losses_val = []
model = EmbeddingModel(num_hiddens, vocab_size)
losses_train, losses_val = fit(
    model, sequences_train, sequences_val, epochs, losses_train, losses_val)

plot_metric(losses_train, losses_val, id_exec)

In [None]:
# model.save_weights('Lista2_Q1_RNN_Nathalia_Santos')

# Campo para digitar frase de predição

In [None]:
# Read a prompt from the user and print it followed by the 5 words predicted
# by the simple RNN (`model`).
inp = input("Digite sua frase: ")
frase = predict(inp, 5, model, vocab, tokenizer)
print(f"{frase[0]}")

In [None]:
print(f"Iniciando execução: [{id_exec}]")

# Hyper-parameters of the deep RNN run. `optimizer`, `loss_func` and `epochs`
# are read as globals by fit; `batch_size` is not read by fit.
num_hiddens = 500
batch_size = 1
lr = 0.005
epochs = 4

optimizer = tf.keras.optimizers.Adam(lr)
loss_func = tf.keras.losses.CategoricalCrossentropy()

losses_train2 = []
losses_val2 = []
model_deep = DeepEmbeddingModel(num_hiddens, vocab_size)
losses_train2, losses_val2 = fit(
    model_deep, sequences_train, sequences_val, epochs, losses_train2, losses_val2)

# plot_metric(losses_train2, losses_val2, id_exec)

In [None]:
# model_deep.save_weights('Lista2_Q1_RNN_profunda_Nathalia_Santos')

# Campo para digitar frase de predição

In [None]:
# Read a prompt from the user and print it followed by the 5 words predicted
# by the deep RNN (`model_deep`).
inp = input("Digite sua frase: ")
frase = predict(inp, 5, model_deep, vocab, tokenizer)
print(f"{frase[0]}")

# LSTM

In [None]:
class LSTMModel(tf.keras.models.Model):
    """LSTM layer followed by four dense layers.

    The recurrent layer returns only its last output together with the final
    hidden and cell states. The dense stack is three ``tanh`` layers of width
    ``vocab_size`` followed by a ``softmax`` layer of the same width.
    """

    def __init__(self, num_hiddens, vocab_size):
        super().__init__()
        # LSTM cell wrapped in an RNN layer that also returns its states.
        self.lstm_cell = tf.keras.layers.LSTMCell(
            num_hiddens, kernel_initializer='glorot_uniform',
            recurrent_initializer='orthogonal')
        self.lstm = tf.keras.layers.RNN(
            self.lstm_cell, time_major=False,
            return_sequences=False, return_state=True)

        self.dense_1 = tf.keras.layers.Dense(vocab_size, activation='tanh')
        self.dense_2 = tf.keras.layers.Dense(vocab_size, activation='tanh')
        self.dense_3 = tf.keras.layers.Dense(vocab_size, activation='tanh')
        self.dense_4 = tf.keras.layers.Dense(vocab_size, activation='softmax')

    def call(self, inputs, state, training=True):
        """Return ``(output, state)`` for ``inputs`` starting from ``state``.

        ``state`` and the returned state are both ``[hidden_state, cell_state]``
        so the result can be fed straight back into the next call.
        ``training`` is accepted for interface compatibility and is not used.
        """
        output, hidden_state, cell_state = self.lstm(inputs, state)
        for dense in (self.dense_1, self.dense_2, self.dense_3, self.dense_4):
            output = dense(tf.reshape(output, (-1, output.shape[-1])))
        # An LSTM cell carries two state tensors. Returning only the hidden
        # state (as before) dropped the cell state, so passing the returned
        # state back in gave the layer one initial state instead of two.
        return output, [hidden_state, cell_state]

    def get_begin_state(self, batch_size):
        """Initial state used for the first step of the recurrence."""
        return self.lstm_cell.get_initial_state(batch_size=batch_size, dtype=tf.float32)

In [None]:
print(f"Iniciando execução: [{id_exec}]")

# Hyper-parameters of the LSTM run. `optimizer`, `loss_func` and `epochs` are
# read as globals by fit; `batch_size` is not read by fit.
num_hiddens = 500
batch_size = 1
lr = 0.0005
epochs = 5

optimizer = tf.keras.optimizers.Adam(lr)
loss_func = tf.keras.losses.CategoricalCrossentropy()

losses_train3 = []
losses_val3 = []
lstm = LSTMModel(num_hiddens, vocab_size)
losses_train3, losses_val3 = fit(
    lstm, sequences_train, sequences_val, epochs, losses_train3, losses_val3)

# plot_metric(losses_train3, losses_val3, id_exec)

In [None]:
# Save the trained LSTM weights under the given name.
# NOTE(review): newer Keras releases may require a `.weights.h5` suffix for
# save_weights — confirm against the installed version.
lstm.save_weights('Lista2_Q1_LSTM_profundo_Nathalia_Santos')

# Campo para digitar frase de predição

In [None]:
# Read a prompt from the user and print it followed by the 5 words predicted
# by the LSTM model (`lstm`).
inp = input("Digite sua frase: ")
frase = predict(inp, 5, lstm, vocab, tokenizer)
print(f"{frase[0]}")