# Exercise 10: RNNs & More on Training Neural Networks with Keras

In [None]:
# Load packages we need
import sys
import os

import datetime

import numpy as np
import sklearn

import scipy as sp
import pandas as pd

import tensorflow as tf

# we'll use keras for neural networks
import tensorflow.keras as keras
from tensorflow.keras.datasets import fashion_mnist

# import layers we will use
from tensorflow.keras.layers import Input, Flatten, Dense, Dropout, SimpleRNN, GRU

# import callbacks we will use
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard

# Load the TensorBoard notebook extension
%load_ext tensorboard

%matplotlib inline
from matplotlib import pyplot as plt
# use a larger default font size for every matplotlib figure in this notebook
plt.rcParams.update({'font.size': 18})

# Let's check our software versions (handy when comparing results across machines)
print('### Python version: ' + sys.version)
print('### Numpy version: ' + np.__version__)
print('### Scikit-learn version: ' + sklearn.__version__)
print('### Tensorflow version: ' + tf.__version__)
print('------------')


# load our packages / code
# '../common/' is inserted at index 1 of sys.path, i.e. right after the first entry,
# so the local `utils` and `plots` modules can be imported from there
sys.path.insert(1, '../common/')
import utils
import plots

In [None]:
# global parameters to control behavior of the pre-processing, ML, analysis, etc.

seed = 42 # deterministic seed
# seed both NumPy's and TensorFlow's global random number generators
np.random.seed(seed) 
tf.random.set_seed(seed)

# relative split proportions; train_test_split_seq below reads entry 0 as the train share and
# entry 1 as the validation share (the remainder becomes test)
# NOTE(review): utils.train_test_val_split also receives this list; its entry order is not visible here
prop_vec = [24, 2, 2]

## Let's use Fashion MNIST

In [None]:
def load_preprocess_fashion_mnist(minmax_normalize=True):
    """Load Fashion-MNIST and split its held-out set into validation and test halves.

    Parameters
    ----------
    minmax_normalize : bool
        When true, the train and held-out images are divided by 255.0.

    Returns
    -------
    tuple
        ``(train_x, train_y, test_x, test_y, val_x, val_y, labels)``; note that the
        test arrays come before the validation arrays. ``labels`` is a NumPy array
        with the ten class names, indexable by class id.
    """
    class_names = np.array(
        ['top', 'trouser', 'pullover', 'dress', 'coat', 'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
    )

    (train_x, train_y), (holdout_x, holdout_y) = fashion_mnist.load_data()

    if minmax_normalize:
        train_x, holdout_x = train_x / 255.0, holdout_x / 255.0

    # first half of the held-out set -> validation, second half -> test
    half = holdout_x.shape[0] // 2
    val_x, test_x = holdout_x[:half], holdout_x[half:]
    val_y, test_y = holdout_y[:half], holdout_y[half:]

    return train_x, train_y, test_x, test_y, val_x, val_y, class_names

In [None]:
# unpack in the loader's return order: train, test, val, then the class-name array
train_x, train_y, test_x, test_y, val_x, val_y, labels = load_preprocess_fashion_mnist()

### What does the data look like?

In [None]:
# preview the first 25 training images, titled with their class names
num_images = 25
label_idx = train_y[:num_images].astype(int)  # integer class ids, used to index `labels`
titles = labels[label_idx]
# NOTE(review): plots.plot_images is a local helper; presumably it draws a grid of images - confirm in ../common/plots
plots.plot_images(train_x[:num_images].reshape(-1, 28, 28), dim_x=28, dim_y=28, fig_size=(9,9), titles=titles)

In [None]:
# custom Keras regularizer implementing an L4 weight penalty
class L4Regularizer(keras.regularizers.Regularizer):
    """Regularizer that returns ``lmbda * sum(weights ** 4)``.

    ``lmbda`` is the penalty strength; it is the only state and is what
    ``get_config`` exposes.
    """

    def __init__(self, lmbda):
        self.lmbda = lmbda

    def __call__(self, weights):
        # Keras invokes the regularizer object on the weight tensor
        fourth_powers = tf.pow(weights, 4.0)
        penalty = tf.reduce_sum(fourth_powers)
        return self.lmbda * penalty

    def get_config(self):
        # configuration dictionary so the regularizer can be saved / re-created
        return dict(lmbda=self.lmbda)
    
    
# weight constraint: snap every weight to +1 or -1
def binary_weights_constraint(weights):
    """Return a tensor shaped like ``weights`` holding +1 where ``weights >= 0`` and -1 elsewhere."""
    plus_one = tf.ones_like(weights)
    is_non_negative = weights >= 0.0
    return tf.where(is_non_negative, plus_one, -plus_one)

In [None]:
def create_model_functional(input_shape=(28, 28)):
    """Build (but do not compile) the fully connected classifier 'FC-model' with the functional API.

    Layers, in order: Input -> Flatten -> Dense(300, relu) 'FC1' -> Dense(100, relu) 'FC2'
    -> Dense(10, softmax) 'Output'. Both hidden layers use ``L4Regularizer(0.01)`` on the
    kernel and ``binary_weights_constraint`` as the kernel constraint.
    """
    inputs = Input(shape=input_shape, name='Input')
    hidden = Flatten(name='Flatten')(inputs)

    # the two hidden layers only differ in name and width
    for layer_name, units in (('FC1', 300), ('FC2', 100)):
        hidden = Dense(
            units,
            name=layer_name,
            activation='relu',
            kernel_regularizer=L4Regularizer(0.01),
            kernel_constraint=binary_weights_constraint,
        )(hidden)

    outputs = Dense(10, name='Output', activation='softmax')(hidden)

    return keras.Model(name='FC-model', inputs=[inputs], outputs=[outputs])

In [None]:
# build the fully connected model with the default (28, 28) input shape
model = create_model_functional()

In [None]:
# what does the model look like? (prints the layers, output shapes and parameter counts)
model.summary()

## Let's compile and train the model

In [None]:
# compile the model
# sparse categorical cross-entropy takes integer class ids as targets (train_y is used as-is, no one-hot step)
model.compile(loss='sparse_categorical_crossentropy', optimizer='Adam', metrics=['accuracy'])

In [None]:
# train for a fixed number of epochs; the validation split is passed so its metrics are reported each epoch
max_epochs = 30
batch_size = 64
history = model.fit(train_x, train_y, validation_data=(val_x, val_y), epochs=max_epochs, batch_size=batch_size)

In [None]:
# how good is our model?
# evaluate on the held-out test half; returns the loss and the compiled metric (accuracy)
loss, acc = model.evaluate(test_x, test_y)

In [None]:
# check the weights
# FC1 was created with binary_weights_constraint, so we inspect its kernel and bias arrays here
weights, biases = model.get_layer('FC1').get_weights()

In [None]:
# display the FC1 kernel and its shape (the notebook echoes the last expression of a cell)
weights, weights.shape

In [None]:
# display the FC1 bias vector and its shape
biases, biases.shape

### Can we use an RNN to predict stock prices?
#### Note: this data is synthetic.

In [None]:
# read the comma-separated stock data into a 2-D float array (one row per example)
fp = '../data/stock-data.csv'
stock_data = np.loadtxt(fp, delimiter=',')

In [None]:
# split into features and target
# every column except the last is an input; the last column is the value to predict
all_x = stock_data[:,:-1]
all_y = stock_data[:,-1]

In [None]:
# shuffled train / test / val split using the global prop_vec
# NOTE(review): the unpacking order (train, test, val) assumes utils.train_test_val_split returns
# its splits in that order - confirm against ../common/utils
train_x, train_y, test_x, test_y, val_x, val_y = utils.train_test_val_split(all_x, all_y, prop_vec, shuffle=True)

### What does the data look like?

In [None]:
# plot the first training example: one value per column index of train_x
fig = plt.figure(figsize=(12,7))
plt.plot(np.arange(0, train_x.shape[1]), train_x[0])
plt.xlabel('Time (day)')
plt.ylabel('Price (USD)')
plt.show()

### Let's reshape the data if necessary

In [None]:
# the RNN below is built with input_shape=(None, 1), so add a trailing axis of size 1
# when the arrays are still 2-D (the check keeps this cell safe to re-run)
if len(train_x.shape) < 3:
    train_x = train_x[:,:,np.newaxis]
    val_x = val_x[:,:,np.newaxis]
    test_x = test_x[:,:,np.newaxis]

In [None]:
def create_compile_rnn(input_shape=(None,1), verbose=True):
    """Build and compile the 'Simple-RNN' regression model.

    Architecture: Input -> SimpleRNN(32, returning the full sequence) -> SimpleRNN(32)
    -> Dense(1, linear). Compiled with MSE loss, MAE as a metric and Adam (learning rate 0.001).

    Parameters
    ----------
    input_shape : tuple
        Shape of one example, without the batch dimension.
    verbose : bool
        When true, print the model summary before compiling.
    """
    net = keras.models.Sequential(name='Simple-RNN')

    net.add(keras.Input(shape=input_shape, name='input'))

    # recurrent stack: the first layer hands its whole output sequence to the second
    net.add(SimpleRNN(32, return_sequences=True, name='rnn1'))
    net.add(SimpleRNN(32, name='rnn2'))

    # single linear unit for the regression output
    net.add(Dense(1, activation='linear', name='output'))

    if verbose:
        net.summary()

    net.compile(
        loss='mse',
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        metrics=['mae'],
    )

    return net

In [None]:
# build + compile the Simple-RNN model (this rebinds `model`, replacing the Fashion-MNIST classifier)
model = create_compile_rnn()

In [None]:
# train the RNN; no callbacks are registered (empty list)
num_epochs = 30
batch_size = 100

hist = model.fit(train_x, train_y, validation_data=(val_x, val_y), epochs=num_epochs, batch_size=batch_size, callbacks=[])

In [None]:
# predict on the validation inputs and flatten the result to 1-D for plotting against val_y
val_preds = model.predict(val_x, verbose=0).ravel()

In [None]:
# true vs. predicted values on the validation set; a perfect model would put every point on the diagonal
fig = plt.figure(figsize=(11,11))
plt.scatter(val_y, val_preds)
plt.xlabel('True Price (USD)')
plt.ylabel('Predicted Price (USD)')
plt.show()

## Training a Character-level RNN

### For this we'll use the text of the Wizard of Oz books

In [None]:
def split_data_seq_target(seq_array, window_size, slide=1):
    """Cut a 1-D encoded sequence into fixed-length windows and their next-element targets.

    Window ``k`` starts at index ``k * slide`` and covers ``window_size`` consecutive
    elements; its target is the element immediately after the window. Only start
    positions below ``len(seq_array) - window_size`` are used, so every window has a target.

    Parameters
    ----------
    seq_array : array-like
        1-D sequence of integer ids (anything ``np.asarray`` accepts).
    window_size : int
        Number of elements per input window.
    slide : int
        Step between consecutive window starts.

    Returns
    -------
    (x, y) : tuple of np.ndarray
        ``x`` has shape ``(num_windows, window_size)`` and ``y`` has shape
        ``(num_windows, 1)``; both are ``uint8``, so ids must fit in 0..255.

    Raises
    ------
    ValueError
        If ``window_size`` is larger than the sequence, or ``slide`` is 0.
    """
    seq_array = np.asarray(seq_array)
    num_examples_slide1 = seq_array.shape[0] - window_size
    if num_examples_slide1 < 0:
        raise ValueError(
            f'window_size ({window_size}) is larger than the sequence length ({seq_array.shape[0]})'
        )

    # range() defines the valid start positions (and raises ValueError for slide == 0);
    # only this small index vector is materialised, not one row per possible start
    starts = np.fromiter(range(0, num_examples_slide1, slide), dtype=np.intp)

    # zero-copy view of every window; fancy indexing then copies just the selected rows
    windows = np.lib.stride_tricks.sliding_window_view(seq_array, window_size)
    x = windows[starts].astype(np.uint8, copy=False)
    y = seq_array[starts + window_size].astype(np.uint8, copy=False).reshape(-1, 1)

    return x, y

def to_array(tokenizer, input_string_array, verbose=0):
    """Encode the FIRST string of ``input_string_array`` as a zero-based ``uint8`` id array.

    ``tokenizer.texts_to_sequences`` is called on the whole list, but only its first
    result is kept. Ids are shifted down by one so they start at 0.
    With ``verbose`` set, the array, its shape, minimum and maximum are printed.
    """
    first_sequence = tokenizer.texts_to_sequences(input_string_array)[0]

    # tokenizer ids start at 1 -> remap to 0 .. max_id - 1
    encoded_array = np.array(first_sequence, dtype=np.uint8) - 1

    if verbose:
        print(encoded_array, encoded_array.shape, np.amin(encoded_array), np.amax(encoded_array))
    return encoded_array

def to_str(tokenizer, array):
    """Decode zero-based id sequences back to text via ``tokenizer.sequences_to_texts``."""
    one_based_ids = array + 1  # undo the shift applied when encoding (tokenizer ids start at 1)
    return tokenizer.sequences_to_texts(one_based_ids)

def load_preprocess_data(fp='../data/oz-data.txt', window_size=150, verbose=0):
    """Read a text file and turn it into character-level (window, next character) examples.

    Steps: read the file, fit a character-level, case-preserving Keras ``Tokenizer`` on it,
    encode the text as zero-based integer ids with ``to_array``, and slice it into
    overlapping windows (slide 1) with ``split_data_seq_target``.

    Parameters
    ----------
    fp : str
        Path of the text file; it is read as UTF-8.
    window_size : int
        Number of characters per input window.
    verbose : int
        Forwarded to ``to_array`` (prints the encoded array when truthy).

    Returns
    -------
    (x, y, num_classes, tokenizer)
        ``x``/``y`` are the windows and their next-character ids, ``num_classes`` is the
        number of distinct characters, ``tokenizer`` is the fitted tokenizer.

    Raises
    ------
    ValueError
        If the text has more distinct characters than the ``uint8`` ids used by the
        encoding helpers can represent.
    """
    # explicit encoding: the platform default is not UTF-8 everywhere
    with open(fp, encoding='utf-8') as f:
        input_text = f.read()

    tokenizer = keras.preprocessing.text.Tokenizer(char_level=True, lower=False)
    # fit on a one-element list: the whole text is ONE document. Passing the bare string
    # makes the tokenizer treat every character as a separate document; the resulting
    # character index is the same, but the list form is the intended usage.
    tokenizer.fit_on_texts([input_text])

    num_classes = len(tokenizer.word_index)

    # to_array / split_data_seq_target store ids as uint8; fail loudly instead of
    # letting larger vocabularies wrap around silently
    max_classes = np.iinfo(np.uint8).max + 1
    if num_classes > max_classes:
        raise ValueError(
            f'{num_classes} distinct characters found, but uint8 ids support at most {max_classes}'
        )

    # encode as a sequence (array) of integers
    seq_array = to_array(tokenizer, [input_text], verbose)

    # split into windows
    x, y = split_data_seq_target(seq_array, window_size, slide=1)

    return x, y, int(num_classes), tokenizer

### We want to split this data into train, validation, and test sets

### What could go wrong if we split randomly (e.g., shuffle x & y, then split)?

In [None]:
def train_test_split_seq(x, y, prop_vec=prop_vec, verbose=0):
    """Split ``x`` and ``y`` into train / val / test blocks without shuffling.

    The data is taken in order: the first ``prop_vec[0] / sum(prop_vec)`` share of the
    rows is train, the next ``prop_vec[1] / sum(prop_vec)`` share is validation, and
    everything left over is test. Row counts are computed from ``x`` and truncated to ints.

    Returns ``(train_x, train_y, val_x, val_y, test_x, test_y)``; with ``verbose`` set
    the six shapes are printed first.
    """
    total = np.sum(prop_vec)
    num_rows = x.shape[0]

    train_end = int(prop_vec[0] / total * num_rows)
    val_end = train_end + int(prop_vec[1] / total * num_rows)

    # contiguous, order-preserving slices
    train_x, val_x, test_x = x[:train_end], x[train_end:val_end], x[val_end:]
    train_y, val_y, test_y = y[:train_end], y[train_end:val_end], y[val_end:]

    if verbose:
        print(train_x.shape, train_y.shape, val_x.shape, val_y.shape, test_x.shape, test_y.shape)

    return train_x, train_y, val_x, val_y, test_x, test_y

In [None]:
# load + window the text with the default file and window size, then split it in order
# note: train_test_split_seq returns train, val, test (val before test)
x, y, num_classes, tokenizer = load_preprocess_data()
train_x, train_y, val_x, val_y, test_x, test_y = train_test_split_seq(x, y)

In [None]:
# sanity check: number of training windows, window length, and target shape
print(train_x.shape, train_y.shape)

### We need to one-hot encode the data

In [None]:
def make_ds_and_onehot(x, y, num_classes, batch_size=100, prefetch_size=10):
    """Wrap ``x``/``y`` in a ``tf.data`` pipeline that one-hot encodes the inputs on the fly.

    ``y`` is appended to ``x`` as a last column so each dataset element is one row; the
    row is then split back into (features, target), and only the features are one-hot
    encoded with depth ``num_classes`` (the target stays an integer id). The dataset is
    shuffled with a 4096-element buffer, batched and prefetched.
    """
    def split_features_target(row):
        # last entry of the row is the target, the rest are the input ids
        return row[:-1], row[-1]

    def one_hot_features(features, target):
        return tf.one_hot(features, depth=num_classes), target

    rows = np.c_[x, y]
    ds = tf.data.Dataset.from_tensor_slices(rows)
    ds = ds.map(split_features_target).map(one_hot_features)

    # shuffle, batch, and prefetch
    return ds.shuffle(4096).batch(batch_size).prefetch(prefetch_size)

In [None]:
# one pipeline per split (default batch size 100); make_ds_and_onehot shuffles every
# dataset it builds, so the val and test batches are shuffled as well
ds_train = make_ds_and_onehot(train_x, train_y, num_classes)
ds_test = make_ds_and_onehot(test_x, test_y, num_classes)
ds_val = make_ds_and_onehot(val_x, val_y, num_classes)

In [None]:
# echo the dataset object to inspect its element specification
ds_train

In [None]:
# peek at the first two batches
# (dedicated loop names so the notebook-level `x` / `y` arrays are not overwritten)
for batch_x, batch_y in ds_train.take(2):
    print(batch_x, batch_y)

In [None]:
# print the dataset's description
print(ds_train)

### Let's create a model

In [None]:
def create_compile_rnn(input_shape=(None, num_classes), dropout_rate=0.175, verbose=True):
    """Build and compile the 'CharLevel-RNN' next-character classifier.

    Architecture: Input -> GRU(192, returning the full sequence, input dropout
    ``dropout_rate``) -> GRU(128) -> Dense(num_classes, softmax). Compiled with sparse
    categorical cross-entropy, accuracy as a metric and Adam (learning rate 0.001).

    NOTE: this definition reuses the name of the Simple-RNN builder defined earlier in
    the notebook and replaces it once this cell has run. The output width always comes
    from the global ``num_classes``.

    Parameters
    ----------
    input_shape : tuple
        Shape of one example, without the batch dimension.
    dropout_rate : float
        Input dropout rate of the first GRU layer.
    verbose : bool
        When true, print the model summary before compiling.
    """
    net = keras.models.Sequential(name='CharLevel-RNN')

    net.add(keras.Input(shape=input_shape, sparse=False, name='input'))

    # recurrent stack; only the first layer uses input dropout
    net.add(GRU(192, return_sequences=True, dropout=dropout_rate, recurrent_dropout=0.0, name='gru1'))
    net.add(GRU(128, recurrent_dropout=0.0, name='gru2'))

    # one softmax unit per character class
    net.add(Dense(num_classes, activation='softmax', name='output'))

    if verbose:
        net.summary()

    net.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        metrics=['accuracy'],
    )

    return net

In [None]:
# where the trained character-level model is stored
model_fp = './charlevel-rnn.h5'

# toggle: set to True to (re)train and overwrite the saved model, False to load it from disk
train = False
#train = True

if train:
    model = create_compile_rnn()

    num_epochs = 20
    history = model.fit(ds_train, validation_data=ds_val, epochs=num_epochs, callbacks=[])

    model.save(model_fp) # save the model
else:
    # loading only works after a previous run with train = True has written the file
    assert os.path.exists(model_fp), 'Train the model first!'

    model = keras.models.load_model(model_fp)

In [None]:
def create_prompt(prompt):
    """One-hot encode a prompt so it can be fed to the character-level model.

    ``prompt`` is a list of strings. It is encoded with ``to_array`` using the global
    ``tokenizer``, reshaped to ``len(prompt)`` rows, and one-hot encoded with depth
    ``num_classes`` (also a global).

    NOTE(review): ``to_array`` as defined in this notebook keeps only the first
    string's encoding, so this is presumably meant for single-element lists.
    """
    encoded = to_array(tokenizer, prompt)
    batch = encoded.reshape(len(prompt), -1)
    return tf.one_hot(batch, depth=num_classes)

In [None]:
# ask the model for the character that follows 'Doroth'
prompt = create_prompt(['Doroth'])
# the model outputs one probability per character class; keep the id of the most likely one
prompt_pred = np.argmax(model.predict(prompt), axis=-1)

In [None]:
# decode the predicted id back to text; reshape(-1, 1) makes each prediction its own one-id sequence
to_str(tokenizer, prompt_pred.reshape(-1, 1))

### Let's generate some text

In [None]:
def sample_from_model(model, prompt_str, out_len=50, temp=1.0):
    """Generate ``out_len`` characters by repeatedly sampling the model's next-character distribution.

    At every step the prompt plus everything generated so far is encoded with
    ``create_prompt`` and fed to ``model``. The predicted probabilities are rescaled by
    the temperature (``exp(log(p) / temp)``, then renormalised) and one character id
    is drawn from them with ``np.random.choice``.

    Parameters
    ----------
    model : object exposing ``predict``
        The trained character-level model.
    prompt_str : str
        Text the generation starts from (not included in the return value).
    out_len : int
        Number of characters to generate.
    temp : float
        Sampling temperature; values below 1 sharpen the distribution, values above 1 flatten it.

    Returns
    -------
    str
        The generated characters only.
    """
    generated = ''
    for _ in range(out_len):
        # condition on the prompt plus the text generated so far
        model_input = create_prompt([prompt_str + generated])

        next_char_probs = model.predict(model_input, verbose=0).reshape(-1,)

        # temperature scaling in log space, then renormalise to a probability vector
        scaled = np.exp(np.log(next_char_probs) / temp)
        sample_probas = scaled / np.sum(scaled)

        # draw one character id according to sample_probas
        candidate_ids = np.arange(0, sample_probas.shape[0])
        choice_idx = np.random.choice(candidate_ids, size=1, p=sample_probas)

        generated += to_str(tokenizer, np.array([choice_idx]))[0]
    return generated

In [None]:
# generate 250 characters after the prompt; temp=0.7 sharpens the sampling distribution
prompt_str = 'Dorothy said'
out_str = sample_from_model(model, prompt_str, out_len=250, temp=0.7)

In [None]:
# sample_from_model returns only the generated part, so prepend the prompt for display
print(prompt_str + out_str)