<a href="https://colab.research.google.com/github/Black3rror/AI/blob/master/Playground/Keras_char_RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Goal

Implement RNN models in Keras and use them to learn character-level sequences.

Tasks to do:
- Vanilla RNN - done
- LSTM RNN - done
- Custom vanilla RNN - done
- Custom LSTM RNN

# Importing stuff

In [None]:
import numpy as np    # tf uses np so probabily we use np in our code
import tensorflow as tf
from tensorflow import keras

from tensorflow.keras.models import Sequential
from tensorflow.keras import layers

from google.colab import drive

# Custom functions

In [None]:
def char_to_hot(x, chars):
  """
  Encode a sequence of characters as one-hot rows.

  @param x: sequence of characters (a list or a string); every character
            must be present in chars
  @param chars: vocabulary; a character's position in it is its hot index
  @return y: float array of shape (len(x), len(chars)) with a single 1 per row
  @raise KeyError: if x contains a character that is not in chars
  """
  char_to_indx = { ch:i for i,ch in enumerate(chars) }
  y = np.zeros((len(x), len(chars)))
  # Look every character up once, then set all the ones in a single
  # vectorized assignment instead of a Python-level loop over rows.
  hot_indxs = np.fromiter((char_to_indx[ch] for ch in x),
                          dtype=np.intp, count=len(x))
  y[np.arange(len(x)), hot_indxs] = 1
  return y

In [None]:
def hot_to_char(x, chars):
  """
  Decode one-hot (or score) rows back into characters.

  @param x: 2-D array-like with one row per character and one column per
            entry of chars; the largest value in a row selects the character
  @param chars: vocabulary; column i of x corresponds to the i-th entry
  @return y: list of characters, one per row of x
  """
  indx_to_char = dict(enumerate(chars))
  # argmax along axis 1 yields the winning column of every row at once.
  return [indx_to_char[i] for i in np.argmax(x, axis = 1)]

In [None]:
def gen_text(rnn_gen, rnn_state, first_char_hot, len):
  """
  Greedily generate text one character at a time.

  Relies on two names defined at notebook level: `dense` (maps the RNN output
  to per-character scores) and `chars` (vocabulary used for decoding).

  @param rnn_gen: recurrent layer called as rnn_gen(inputs, initial_state=...)
                  and returning (output, new_state)
  @param rnn_state: state passed to the first rnn_gen call
  @param first_char_hot: one-hot vector of the seed character
  @param len: number of generation steps. NOTE: the name shadows the builtin
              len(), which is therefore unavailable inside this function; it
              is kept so existing callers keep working.
  @return: the seed character followed by the generated characters, as a string
  """
  h = rnn_state
  # Collect rows in a list and stack once at the end; concatenating arrays
  # inside the loop would re-copy the whole output on every step.
  rows = [np.asarray(first_char_hot)]

  for _ in range(len):
    # reshape(1, 1, -1): a batch of one sequence holding one character. The
    # vocabulary size is inferred from the vector, not from a global.
    x, h = rnn_gen(rows[-1].reshape(1, 1, -1), initial_state = h)
    pred_p = dense(x)

    # Greedy decoding: turn each row of scores into a one-hot row at its argmax.
    pred = np.zeros_like(pred_p)
    for m, n in enumerate(np.argmax(pred_p, axis=1)):
      pred[m, n] = 1

    rows.extend(pred)

  return "".join(hot_to_char(np.stack(rows), chars))

# Custom classes

## Layers

### SimpleRNN_custom

In [None]:
class SimpleRNN_custom(layers.Layer):
  """
  Hand-written vanilla RNN layer.

  For every time step t it computes
      h_t = tanh(x_t @ Wxh + h_{t-1} @ Whh + b)
  over an input of shape (batch_size, seq_len, prev_units).

  @param units: size of the hidden state
  @param return_state: if True, call() also returns the last hidden state
  @param return_sequences: if True, call() returns the hidden state of every
                           time step instead of only the last one
  @param keep_state: if True, a call without initial_state continues from the
                     hidden state left by the previous call (when there is one)
  @param kernel_initializer: initializer for the input-to-hidden weights Wxh
  @param recurrent_initializer: initializer for the hidden-to-hidden weights Whh
  """
  def __init__(self, units, return_state = False, return_sequences = False, 
               keep_state = False, kernel_initializer = 'glorot_uniform', 
               recurrent_initializer = 'orthogonal', **kwargs):
    super().__init__(**kwargs)
    self.units = units
    self.return_state = return_state
    self.return_sequences = return_sequences
    self.keep_state = keep_state
    self.kernel_initializer = kernel_initializer
    self.recurrent_initializer = recurrent_initializer
    self.h = None   # last hidden state; set by call()
  
  def build(self, input_shape):
    """Create Wxh (prev_units, units), Whh (units, units) and b (units, 1)."""
    assert len(input_shape) == 3    # ([batch_size, ]seq_len, prev_units)
    self.Wxh = self.add_weight(name='Wxh',
                               shape=(input_shape[2], self.units),
                               initializer=self.kernel_initializer,
                               trainable=True)
    self.Whh = self.add_weight(name='Whh',
                               shape=(self.units, self.units),
                               initializer=self.recurrent_initializer,
                               trainable=True)
    # The bias is stored as a column and transposed in call(); the shape is
    # left unchanged so get_weights()/set_weights() stay compatible.
    self.b = self.add_weight(name='b',
                             shape=(self.units, 1),
                             initializer='zeros',
                             trainable=True)
  
  def call(self, inputs, initial_state=None):
    """
    Run the recurrence over the time axis of inputs.

    @param inputs: tensor of shape (batch_size, seq_len, prev_units)
    @param initial_state: optional (batch_size, units) hidden state to start
                          from; takes precedence over keep_state
    @return: all hidden states (batch_size, seq_len, units) if
             return_sequences, otherwise the last hidden state
             (batch_size, units); when return_state is set, the last hidden
             state is additionally returned as a second value
    """
    # len(shape) rather than .ndim, which not every tensor type provides.
    assert len(inputs.shape) == 3       # (batch_size, seq_len, prev_units)
    batch_size = inputs.shape[0]

    if initial_state is not None:
      assert initial_state.shape == [batch_size, self.units]
      self.h = initial_state
    elif self.keep_state and self.h is not None:
      assert self.h.shape[0] == batch_size
    else:
      self.h = tf.zeros((batch_size, self.units))
    
    bias_row = tf.transpose(self.b)   # (1, units); same for every step
    hs = []
    for i in range(inputs.shape[1]):
      x = inputs[:, i, :]
      h = tf.tanh(tf.matmul(x, self.Wxh) + tf.matmul(self.h, self.Whh) + bias_row)

      hs.append(h)
      self.h = h
    # hs holds seq_len tensors of shape (batch_size, units). Stacking them on
    # axis 1 gives (batch_size, seq_len, units). Reshaping the time-major
    # stack instead would mix samples and time steps whenever batch_size > 1.
    hs = tf.stack(hs, axis=1)

    output = hs if self.return_sequences else self.h
    if self.return_state:
      return output, self.h
    return output

# Initialization

In [None]:
# Training hyper-parameters.
epochs = 3
seq_len = 25          # characters per training window
h_units = 100         # hidden units of the RNN
learning_rate = 1e-1

drive.mount('/content/drive')
text_adrs = '/content/drive/My Drive/Colab Stuff/Mini_char_RNN/William Shakespear.txt'

# Read through a context manager so the file handle is closed right away.
with open(text_adrs, 'r') as text_file:
  text = text_file.read()
text = text[:100000]    # keep only the first 100000 characters
chars = sorted(set(text))   # vocabulary: unique characters in sorted order
c = len(chars)
print("text has %d characters, %d unique." % (len(text), c))

# From here on `text` is the one-hot matrix, not the raw string.
text = char_to_hot(text, chars)
print("text shape: ", text.shape)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
text has 100000 characters, 78 unique.
text shape:  (100000, 78)


# No model

We feed the SimpleRNN layer a batch of size 1 (input shape `(1, seq_len, prev_units)`). In effect this batch contains `seq_len` training examples: each of the `seq_len` time steps produces its own output, and the Dense layer treats every one of them as a separate example.

A second layer, `rnn_gen`, takes input of shape `(1, 1, prev_units)` and is used to generate new text.

In [None]:
# Loss and optimizer shared by every training step.
loss_func = keras.losses.CategoricalCrossentropy()
opt = keras.optimizers.Adagrad(learning_rate)

# layers
# `rnn` is trained on whole windows, `rnn_gen` is stepped one character at a
# time when generating text, `dense` maps hidden states to character scores.
rnn = SimpleRNN_custom(
  h_units,
  kernel_initializer='random_normal',
  recurrent_initializer='random_normal',
  return_state=True,
  return_sequences=True,
  input_shape=(seq_len, c),
)

rnn_gen = SimpleRNN_custom(
  h_units,
  kernel_initializer='random_normal',
  recurrent_initializer='random_normal',
  return_state=True,
  input_shape=(1, c),
)

dense = layers.Dense(c, activation='softmax', kernel_initializer='random_normal')

# A single dummy call builds rnn_gen so that set_weights() can be used below.
rnn_gen(text[0].reshape(1, 1, c))

smooth_loss = 0
for epoch_num in range(epochs):
  print("epoch %d started -------------" % (epoch_num))
  state = None    # no carried-over hidden state at the start of an epoch

  for step, pointer in enumerate(range(0, len(text) - seq_len - 1, seq_len)):
    # The targets are the inputs shifted one character ahead, so both come
    # from the same window of seq_len + 1 rows.
    window = text[pointer:pointer + seq_len + 1]
    X_batch = window[:-1].reshape(1, seq_len, c)
    y_batch = window[1:].reshape(1, seq_len, c)

    with tf.GradientTape() as tape:
      # With initial_state=None the layer behaves as if no state was given.
      x, state = rnn(X_batch, initial_state=state)
      x = dense(x)
      loss = loss_func(y_batch, x)

    trainable_vars = rnn.trainable_weights + dense.trainable_weights
    grads = tape.gradient(loss, trainable_vars)
    opt.apply_gradients(zip(grads, trainable_vars))

    # Exponential moving average of the loss, seeded with the very first loss.
    if step == 0 and epoch_num == 0:
      smooth_loss = loss
    else:
      smooth_loss = 0.999 * smooth_loss + 0.001 * loss

    if step % 200 != 0:
      continue

    # Every 200 steps: report the smoothed loss and generate 100 characters
    # from a random seed character taken from the first 200 of the text.
    print("Step ", step, ":\t loss = ", smooth_loss.numpy())

    sp = int(200 * np.random.rand())
    rnn_gen.set_weights(rnn.get_weights())
    new_text_str = gen_text(rnn_gen, state, text[sp], 100)
    print("new text: ", new_text_str)
    print("\n\n")



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

epoch 0 started -------------


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

Step  0 :	 loss =  4.353026
new text:   e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e e 



Step  200 :	 loss =  4.116248
new text:  i                                                                                                    



Step  400 :	 loss =  3.8870087
new text:                                      

KeyboardInterrupt: ignored