# Import libraries

In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pickle
import json
import time
from os import getcwd, listdir, mkdir, chdir
from os.path import join
import requests
if 'colab' in str(get_ipython()): from google.colab import files

# Parameters

In [2]:
# How many characters to sample, and the text the generation starts from.
length_generated_text = 1000
seed = '§'

# The model and tokenizer artifacts share a single training-run identifier,
# both in their local file names and in their download URLs.
run_name = 'run__2021-03-25__17-42-59___Stateful_GRU_LN_encoding_dense'
repository_models_url = 'https://github.com/fabio-a-oliveira/14-CFR-FAA/blob/main/models/'

model_filename = 'model___' + run_name + '.h5'
tokenizer_filename = 'tokenizer___' + run_name + '.json'
model_directory = 'models'
model_url = repository_models_url + model_filename + '?raw=true'
tokenizer_url = repository_models_url + tokenizer_filename + '?raw=true'

# Output file name (without extension): requested length plus a timestamp
# taken when this cell runs.
generated_text_filename = time.strftime("generated_text___{}_chars__%Y-%m-%d__%H-%M-%S".format(length_generated_text))
generated_text_directory = 'generated_text'

# Function and custom class definitions

In [3]:
class LN_GRU_Cell(keras.layers.Layer):
    """GRU cell whose output goes through layer normalization and then the activation.

    Wraps a `keras.layers.GRUCell` created with `activation=None`; the wrapped
    cell's output is passed through `LayerNormalization` and then through
    `activation`. The state handed back is the one returned by the wrapped
    cell (wrapped in a list); it is not normalized.

    Args:
        units: size of the cell output and of its state.
        activation: anything accepted by `keras.activations.get`; applied
            after layer normalization.
        dropout: forwarded to the wrapped `GRUCell`.
        recurrent_dropout: forwarded to the wrapped `GRUCell`.
        **kwargs: forwarded to `keras.layers.Layer`.
    """
    def __init__(self, units, activation="tanh", dropout=0, recurrent_dropout=0, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.dropout = dropout
        self.recurrent_dropout = recurrent_dropout
        self.state_size = units
        self.output_size = units
        # activation=None: the configured activation is applied in call(), after normalization.
        self.GRU_cell = keras.layers.GRUCell(units, dropout=dropout, recurrent_dropout=recurrent_dropout, activation=None)
        self.layer_norm = keras.layers.LayerNormalization()
        self.activation = keras.activations.get(activation)
    def call(self, inputs, states):
        """Run one step; return (normalized, activated output, [state from the wrapped cell])."""
        outputs, new_states = self.GRU_cell(inputs, states)
        norm_outputs = self.activation(self.layer_norm(outputs))
        return norm_outputs, [new_states]
    def get_config(self):
        """Return the constructor arguments as a config dict merged with the base layer config."""
        base_config = super().get_config()
        # Store the activation as a serialized identifier rather than the function
        # object, so the config holds plain data that __init__ can resolve again
        # through keras.activations.get.
        custom_config = {'units':self.units,
                         'dropout':self.dropout,
                         'recurrent_dropout':self.recurrent_dropout,
                         'activation':keras.activations.serialize(self.activation)}
        return {**base_config, **custom_config}
    
class LN_LSTM_Cell(keras.layers.Layer):
    """LSTM cell whose output goes through layer normalization and then the activation.

    Wraps a `keras.layers.LSTMCell` created with `activation=None`; the wrapped
    cell's output is passed through `LayerNormalization` and then through
    `activation`. The states handed back are the ones returned by the wrapped
    cell (wrapped in a list); they are not normalized.

    Args:
        units: size of the cell output and of each of its two states.
        activation: anything accepted by `keras.activations.get`; applied
            after layer normalization.
        dropout: forwarded to the wrapped `LSTMCell`.
        recurrent_dropout: forwarded to the wrapped `LSTMCell`.
        **kwargs: forwarded to `keras.layers.Layer`.
    """
    def __init__(self, units, activation="tanh", dropout=0, recurrent_dropout=0, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.dropout = dropout
        self.recurrent_dropout = recurrent_dropout
        # Two states of size `units` each.
        self.state_size = [units, units]
        self.output_size = units
        # activation=None: the configured activation is applied in call(), after normalization.
        self.LSTM_cell = keras.layers.LSTMCell(units, dropout=dropout, recurrent_dropout=recurrent_dropout, activation=None)
        self.layer_norm = keras.layers.LayerNormalization()
        self.activation = keras.activations.get(activation)
    def call(self, inputs, states):
        """Run one step; return (normalized, activated output, [states from the wrapped cell])."""
        memory_states, carry_states = states
        outputs, new_states = self.LSTM_cell(inputs, [memory_states, carry_states])
        norm_outputs = self.activation(self.layer_norm(outputs))
        return norm_outputs, [new_states]
    def get_config(self):
        """Return the constructor arguments as a config dict merged with the base layer config."""
        base_config = super().get_config()
        # Store the activation as a serialized identifier rather than the function
        # object, so the config holds plain data that __init__ can resolve again
        # through keras.activations.get.
        custom_config = {'units':self.units,
                         'dropout':self.dropout,
                         'recurrent_dropout':self.recurrent_dropout,
                         'activation':keras.activations.serialize(self.activation)}
        return {**base_config, **custom_config}

In [4]:
def convert_to_inference_model(original_model, custom_objects=None):
    """Rebuild `original_model` from its JSON description with inference settings.

    In the rebuilt architecture, every layer whose config has a 'stateful'
    entry gets it set to True, and every layer whose config has a
    'batch_input_shape' gets its first dimension set to 1 and its second
    dimension set to None. The weights of `original_model` are then copied
    into the rebuilt model.

    Args:
        original_model: model providing `to_json()` and `get_weights()`.
        custom_objects: passed through to `keras.models.model_from_json`.

    Returns:
        The rebuilt model carrying the original weights.
    """
    architecture = json.loads(original_model.to_json())

    for layer_spec in architecture['config']['layers']:
        layer_config = layer_spec['config']

        if 'stateful' in layer_config:
            layer_config['stateful'] = True

        if 'batch_input_shape' in layer_config:
            input_shape = layer_config['batch_input_shape']
            input_shape[0] = 1
            input_shape[1] = None

    rebuilt_model = keras.models.model_from_json(json.dumps(architecture), custom_objects = custom_objects)
    rebuilt_model.set_weights(original_model.get_weights())

    return rebuilt_model

# Download required files

In [5]:
def _download_if_missing(url, directory, filename):
    """Download `url` into `directory/filename` unless that file is already listed there.

    Raises:
        requests.HTTPError: if the server answers with an error status, so an
            error page is never saved (and later mistaken for a finished download).
    """
    if filename in listdir(directory):
        return
    response = requests.get(url)
    response.raise_for_status()
    # Write through an explicit path instead of changing the working directory,
    # so a failed download cannot leave the kernel in a different directory.
    with open(join(directory, filename), 'wb') as file:
        file.write(response.content)

if model_directory not in listdir():
    mkdir(model_directory)

_download_if_missing(model_url, model_directory, model_filename)
_download_if_missing(tokenizer_url, model_directory, tokenizer_filename)

# Load model and tokenizer

In [6]:
# The custom recurrent cell classes are handed to the loader by name through
# custom_objects.
custom_layer_classes = {'LN_GRU_Cell':LN_GRU_Cell, 'LN_LSTM_Cell':LN_LSTM_Cell}
model_path = join(getcwd(), model_directory, model_filename)

model = tf.keras.models.load_model(model_path, custom_objects = custom_layer_classes)
model.summary()

Model: "Stateful_GRU_LN_encoding_dense"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
TD_Encoding (TimeDistributed (None, None, 116)         0         
_________________________________________________________________
Recurrent_0 (RNN)            (None, None, 580)         1215680   
_________________________________________________________________
Recurrent_1 (RNN)            (None, None, 580)         2023040   
_________________________________________________________________
Recurrent_2 (RNN)            (None, None, 580)         2023040   
_________________________________________________________________
Dense (Dense)                (None, None, 116)         67396     
Total params: 5,329,156
Trainable params: 5,329,156
Non-trainable params: 0
_________________________________________________________________


In [7]:
# NOTE(review): despite its '.json' name, the tokenizer file is read with
# pickle. Unpickling can execute arbitrary code, so only load this file from a
# source you trust.
tokenizer_path = join(getcwd(), model_directory, tokenizer_filename)
with open(tokenizer_path, 'rb') as tokenizer_file:
    tokenizer = pickle.load(tokenizer_file)

# Generate new text

In [None]:
# Alternative: rebuild the network with inference settings before generating.
#inference_model = convert_to_inference_model(model, custom_objects = {'LN_GRU_Cell':LN_GRU_Cell, 'LN_LSTM_Cell':LN_LSTM_Cell})
inference_model = model


text = seed
sequence = tokenizer.texts_to_sequences([text])
dict_size = len(tokenizer.word_index) + 1

# Without at least one seed token there is nothing to feed the model on the first step.
if len(sequence[0]) == 0:
    raise ValueError('seed must contain at least one character known to the tokenizer')

for i in range(length_generated_text):

    if i % 1000 == 0:
        print('Progress -- characters generated: {}/{}'.format(i, length_generated_text))
    elif i == length_generated_text-1:
        print('Progress -- characters generated: {}/{}'.format(length_generated_text, length_generated_text))

    # The first step feeds the whole seed so that a multi-character seed is not
    # reduced to its last token; later steps feed only the newest token.
    # NOTE(review): feeding one token per step presumably relies on the model
    # keeping its recurrent state between predict calls -- confirm.
    step_tokens = sequence[0] if i == 0 else sequence[0][-1]
    probs = inference_model.predict(np.array(step_tokens).reshape(1,-1,1))

    # np.random.choice checks that p sums to 1 within a tight tolerance; cast the
    # model output to float64 and renormalize so rounding in a lower-precision
    # output cannot trigger "probabilities do not sum to 1".
    next_token_probs = probs[0,-1,:].astype('float64')
    next_token_probs /= next_token_probs.sum()
    token = np.random.choice(np.arange(dict_size), p = next_token_probs)
    sequence[0].append(token)

text = tokenizer.sequences_to_texts(sequence)[0]
# Every other character is kept; presumably sequences_to_texts separates the
# tokens with single spaces and the [::2] slices drop them -- confirm.
print(text[0:np.min([2000, length_generated_text]):2])

if generated_text_directory not in listdir():
    mkdir(generated_text_directory)

# Single path used for both writing and downloading, so the two always agree
# (including the '.txt' extension).
output_path = join(getcwd(), generated_text_directory, generated_text_filename + '.txt')

with open(output_path, 'w', encoding='utf-8') as file:
    file.write(text[0::2])

if 'colab' in str(get_ipython()): files.download(output_path)

Progress -- characters generated: 0/1000
