# RNN with tensorflow2.0

In [1]:
import tensorflow as tf
import numpy as np

In [2]:
# Be sure to used Tensorflow 2.0

In [3]:
assert hasattr(tf, "function") # Be sure to use tensorflow 2.0

In [4]:
# Open and process dataset

In [5]:
# You can used your own dataset with english text

with open("corona.txt", "r") as f:
    text = f.read()

print(len(text))

print(text[:100])

27962
CÃ©cile Ã©tait en blanc, comme aux tableaux illustres
OÃ¹ la Sainte se voit, un nimbe autour du chef


In [6]:
from unidecode import unidecode

text = unidecode(text)
text = text.lower()

text = text.replace("2", "")
text = text.replace("1", "")
text = text.replace("8", "")
text = text.replace("5", "")
text = text.replace(">", "")
text = text.replace("<", "")
text = text.replace("!", "")
text = text.replace("?", "")
text = text.replace("-", "")
text = text.replace("$", "")
text = text.replace(";", "")

text = text.strip()

vocab = set(text)
print(len(vocab), vocab)

print(text[:100])

41 {'/', ',', '3', 'e', '9', 'x', ':', 'h', 'l', 'b', '.', 'p', '\n', 'n', 'z', '"', 'g', 'v', 'c', 't', ')', '|', 'j', 'q', 'i', 'm', '(', 'k', '0', 's', 'o', 'a', ' ', 'f', '%', "'", 'w', 'd', 'r', 'y', 'u'}
ca(c)cile a(c)tait en blanc, comme aux tableaux illustres
oa la sainte se voit, un nimbe autour du c


In [7]:
# Map each letter to int

In [8]:
vocab_size = len(vocab)

vocab_to_int = {l:i for i,l in enumerate(vocab)}
int_to_vocab = {i:l for i,l in enumerate(vocab)}

print("vocab_to_int", vocab_to_int)
print()
print("int_to_vocab", int_to_vocab)

print("\nint for e:", vocab_to_int["e"])
int_for_e = vocab_to_int["e"]
print("letter for %s: %s" % (vocab_to_int["e"], int_to_vocab[int_for_e]))

vocab_to_int {'/': 0, ',': 1, '3': 2, 'e': 3, '9': 4, 'x': 5, ':': 6, 'h': 7, 'l': 8, 'b': 9, '.': 10, 'p': 11, '\n': 12, 'n': 13, 'z': 14, '"': 15, 'g': 16, 'v': 17, 'c': 18, 't': 19, ')': 20, '|': 21, 'j': 22, 'q': 23, 'i': 24, 'm': 25, '(': 26, 'k': 27, '0': 28, 's': 29, 'o': 30, 'a': 31, ' ': 32, 'f': 33, '%': 34, "'": 35, 'w': 36, 'd': 37, 'r': 38, 'y': 39, 'u': 40}

int_to_vocab {0: '/', 1: ',', 2: '3', 3: 'e', 4: '9', 5: 'x', 6: ':', 7: 'h', 8: 'l', 9: 'b', 10: '.', 11: 'p', 12: '\n', 13: 'n', 14: 'z', 15: '"', 16: 'g', 17: 'v', 18: 'c', 19: 't', 20: ')', 21: '|', 22: 'j', 23: 'q', 24: 'i', 25: 'm', 26: '(', 27: 'k', 28: '0', 29: 's', 30: 'o', 31: 'a', 32: ' ', 33: 'f', 34: '%', 35: "'", 36: 'w', 37: 'd', 38: 'r', 39: 'y', 40: 'u'}

int for e: 3
letter for 3: e


In [9]:
encoded = [vocab_to_int[l] for l in text]
encoded_sentence = encoded[:100]

print(encoded_sentence)

[18, 31, 26, 18, 20, 18, 24, 8, 3, 32, 31, 26, 18, 20, 19, 31, 24, 19, 32, 3, 13, 32, 9, 8, 31, 13, 18, 1, 32, 18, 30, 25, 25, 3, 32, 31, 40, 5, 32, 19, 31, 9, 8, 3, 31, 40, 5, 32, 24, 8, 8, 40, 29, 19, 38, 3, 29, 12, 30, 31, 32, 8, 31, 32, 29, 31, 24, 13, 19, 3, 32, 29, 3, 32, 17, 30, 24, 19, 1, 32, 40, 13, 32, 13, 24, 25, 9, 3, 32, 31, 40, 19, 30, 40, 38, 32, 37, 40, 32, 18]


In [10]:
decoded_sentence = [int_to_vocab[i] for i in encoded_sentence]
print(decoded_sentence)

['c', 'a', '(', 'c', ')', 'c', 'i', 'l', 'e', ' ', 'a', '(', 'c', ')', 't', 'a', 'i', 't', ' ', 'e', 'n', ' ', 'b', 'l', 'a', 'n', 'c', ',', ' ', 'c', 'o', 'm', 'm', 'e', ' ', 'a', 'u', 'x', ' ', 't', 'a', 'b', 'l', 'e', 'a', 'u', 'x', ' ', 'i', 'l', 'l', 'u', 's', 't', 'r', 'e', 's', '\n', 'o', 'a', ' ', 'l', 'a', ' ', 's', 'a', 'i', 'n', 't', 'e', ' ', 's', 'e', ' ', 'v', 'o', 'i', 't', ',', ' ', 'u', 'n', ' ', 'n', 'i', 'm', 'b', 'e', ' ', 'a', 'u', 't', 'o', 'u', 'r', ' ', 'd', 'u', ' ', 'c']


In [11]:
decoded_sentence = "".join(decoded_sentence)
print(decoded_sentence)

ca(c)cile a(c)tait en blanc, comme aux tableaux illustres
oa la sainte se voit, un nimbe autour du c


### Sample of one batch

In [12]:
inputs, targets = encoded, encoded[1:]

print("Inputs", inputs[:10])
print("Targets", targets[:10])

Inputs [18, 31, 26, 18, 20, 18, 24, 8, 3, 32]
Targets [31, 26, 18, 20, 18, 24, 8, 3, 32, 31]


In [13]:
# Method used to generate batch in sequence order

In [14]:
def gen_batch(inputs, targets, seq_len, batch_size, noise=0):
    # Size of each chunk
    chuck_size = (len(inputs) -1)  // batch_size
    # Numbef of sequence per chunk
    sequences_per_chunk = chuck_size // seq_len

    for s in range(0, sequences_per_chunk):
        batch_inputs = np.zeros((batch_size, seq_len))
        batch_targets = np.zeros((batch_size, seq_len))
        for b in range(0, batch_size):
            fr = (b*chuck_size)+(s*seq_len)
            to = fr+seq_len
            batch_inputs[b] = inputs[fr:to]
            batch_targets[b] = inputs[fr+1:to+1]
            
            if noise > 0:
                noise_indices = np.random.choice(seq_len, noise)
                batch_inputs[b][noise_indices] = np.random.randint(0, vocab_size)
            
        yield batch_inputs, batch_targets

for batch_inputs, batch_targets in gen_batch(inputs, targets, 5, 32, noise=0):
    print(batch_inputs[0], batch_targets[0])
    break

for batch_inputs, batch_targets in gen_batch(inputs, targets, 5, 32, noise=3):
    print(batch_inputs[0], batch_targets[0])
    break

[18. 31. 26. 18. 20.] [31. 26. 18. 20. 18.]
[18. 31.  9.  9.  9.] [31. 26. 18. 20. 18.]


In [15]:
# Create your own layer

In [16]:
class OneHot(tf.keras.layers.Layer):
    def __init__(self, depth, **kwargs):
        super(OneHot, self).__init__(**kwargs)
        self.depth = depth

    def call(self, x, mask=None):
        return tf.one_hot(tf.cast(x, tf.int32), self.depth)

In [17]:
class RnnModel(tf.keras.Model):

    def __init__(self, vocab_size):
        super(RnnModel, self).__init__()
        # Convolutions
        self.one_hot = OneHot(len(vocab))

    def call(self, inputs):
        output = self.one_hot(inputs)
        return output

batch_inputs, batch_targets = next(gen_batch(inputs, targets, 50, 32))

print(batch_inputs.shape)

model = RnnModel(len(vocab))
output = model.predict(batch_inputs)

print(output.shape)

#print(output)

print("Input letter is:", batch_inputs[0][0])
print("One hot representation of the letter", output[0][0])

#assert(output[int(batch_inputs[0][0])]==1)

(32, 50)
(32, 50, 41)
Input letter is: 18.0
One hot representation of the letter [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


# Set up the model

In [18]:
vocab_size = len(vocab)

### Creat the layers

# Set the input of the model
tf_inputs = tf.keras.Input(shape=(None,), batch_size=64)
# Convert each value of the  input into a one encoding vector
one_hot = OneHot(len(vocab))(tf_inputs)
# Stack LSTM cells
rnn_layer1 = tf.keras.layers.LSTM(128, return_sequences=True, stateful=True)(one_hot)
rnn_layer2 = tf.keras.layers.LSTM(128, return_sequences=True, stateful=True)(rnn_layer1)
# Create the outputs of the model
hidden_layer = tf.keras.layers.Dense(128, activation="relu")(rnn_layer2)
outputs = tf.keras.layers.Dense(vocab_size, activation="softmax")(hidden_layer)

### Setup the model
model = tf.keras.Model(inputs=tf_inputs, outputs=outputs)



# Check if we can reset the RNN cells

In [19]:
model.reset_states()

# Get one batch
batch_inputs, batch_targets = next(gen_batch(inputs, targets, 50, 64))

# Make a first prediction
outputs = model.predict(batch_inputs)
first_prediction = outputs[0][0]

# Reset the states of the RNN states
model.reset_states()

# Make an other prediction to check the difference
outputs = model.predict(batch_inputs)
second_prediction = outputs[0][0]

# Check if both prediction are equal
assert(set(first_prediction)==set(second_prediction))

# Set the loss and objectives

In [20]:
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam(lr=0.001)

# Set some metrics to track the progress of the training

In [21]:
# Loss
train_loss = tf.keras.metrics.Mean(name='train_loss')
# Accuracy
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

# Set the train method and the predict method in graph mode

In [22]:
@tf.function
def train_step(inputs, targets):
    with tf.GradientTape() as tape:
        # Make a prediction on all the batch
        predictions = model(inputs)
        # Get the error/loss on these predictions
        loss = loss_object(targets, predictions)
    # Compute the gradient which respect to the loss
    gradients = tape.gradient(loss, model.trainable_variables)
    # Change the weights of the model
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    # The metrics are accumulate over time. You don't need to average it yourself.
    train_loss(loss)
    train_accuracy(targets, predictions)

@tf.function
def predict(inputs):
    # Make a prediction on all the batch
    predictions = model(inputs)
    return predictions

In [23]:
# Train the model

In [None]:
model.reset_states()

for epoch in range(4000):
    for batch_inputs, batch_targets in gen_batch(inputs, targets, 100, 64, noise=13):
        train_step(batch_inputs, batch_targets)
    template = '\r Epoch {}, Train Loss: {}, Train Accuracy: {}'
    print(template.format(epoch, train_loss.result(), train_accuracy.result()*100), end="")
    model.reset_states()



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
 Epoch 239, Train Loss: 2.201303720474243, Train Accuracy: 34.5818023681640646

In [None]:
# Save the model

In [None]:
# import json
# model.save("model_rnn.h5")

# with open("model_rnn_vocab_to_int", "w") as f:
    # f.write(json.dumps(vocab_to_int))
# with open("model_rnn_int_to_vocab", "w") as f:
    #f.write(json.dumps(int_to_vocab))

In [None]:
# Generate some text

In [None]:
import random

model.reset_states()

size_poetries = 300

poetries = np.zeros((64, size_poetries, 1))
sequences = np.zeros((64, 100))
for b in range(64):
    rd = np.random.randint(0, len(inputs) - 100)
    sequences[b] = inputs[rd:rd+100]

for i in range(size_poetries+1):
    if i > 0:
        poetries[:,i-1,:] = sequences
    softmax = predict(sequences)
    # Set the next sequences
    sequences = np.zeros((64, 1))
    for b in range(64):
        argsort = np.argsort(softmax[b][0])
        argsort = argsort[::-1]
        # Select one of the strongest 4 proposals
        sequences[b] = argsort[0]

for b in range(64):
    sentence = "".join([int_to_vocab[i[0]] for i in poetries[b]])
    print(sentence)
    print("\n=====================\n")