In [1]:
import keras
import tensorflow as tf

## Simple RNN

**Simple RNN Memory Cell**

We can do computations in one shot for the whole layer:
$$
\hat{Y}_t = \phi(X_t W_x + \hat{Y}_{t-1} W_{\hat{y}} + b)
$$

Where:

- $\hat{Y}_t$ is an $m \times n_{\text{neurons}}$ matrix containing the layer’s outputs at time step $t$ for each instance in the mini-batch ($m$ is the number of instances in the mini-batch and $n_{\text{neurons}}$ is the number of neurons).  
- $X_t$ is an $m \times n_{\text{inputs}}$ matrix containing the inputs for all instances ($n_{\text{inputs}}$ is the number of input features).  
- $W_x$ is an $n_{\text{inputs}} \times n_{\text{neurons}}$ matrix containing the connection weights for the inputs of the current time step.  
- $W_{\hat{y}}$ is an $n_{\text{neurons}} \times n_{\text{neurons}}$ matrix containing the connection weights for the outputs of the previous time step.  
- $b$ is a vector of size $n_{\text{neurons}}$ containing each neuron’s bias term.  
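
To make the shapes in the formula above concrete, here is a minimal NumPy sketch of one time step. The sizes (`m=4`, `n_inputs=3`, `n_neurons=5`) and the variable names are illustrative assumptions, and $\phi$ is taken to be tanh.

```python
import numpy as np

rng = np.random.default_rng(0)
m, n_inputs, n_neurons = 4, 3, 5  # illustrative sizes, not the notebook's values

X_t = rng.normal(size=(m, n_inputs))           # inputs at time step t
Y_prev = np.zeros((m, n_neurons))              # outputs at time step t-1
W_x = rng.normal(size=(n_inputs, n_neurons))   # input weights
W_y = rng.normal(size=(n_neurons, n_neurons))  # recurrent weights
b = np.zeros(n_neurons)                        # one bias per neuron, broadcast over the batch

Y_t = np.tanh(X_t @ W_x + Y_prev @ W_y + b)    # phi = tanh (assumption)
print(Y_t.shape)  # (4, 5)
```

Both matrix products produce an $m \times n_{\text{neurons}}$ matrix, so they can be summed directly and the bias is broadcast across the rows.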

In [2]:
@tf.function
def simple_rnn_cell(x_t, h_t_1, w_x, w_h, b):
    return tf.tanh(x_t @ w_x + h_t_1 @ w_h + b) # tensor in, tensor out

In [3]:
batch_size = 32
units = 8
n_features = 12
timesteps = 16

kernel_init = keras.initializers.glorot_uniform(seed=42) # Keras default for the input weights, a common choice with tanh
recurrent_init = keras.initializers.orthogonal(seed=42) # Keras default for the recurrent weights

w_x = kernel_init(shape=(n_features, units))
w_h = recurrent_init(shape=(units, units))
b = tf.zeros(shape=(units,)) # one bias per neuron
h_init = tf.zeros(shape=(batch_size, units)) # a stateful RNN would instead start from the last hidden state of the previous batch

In [4]:
tf.random.set_seed(42)
input_sequence = tf.random.normal(shape=(batch_size,timesteps,n_features)) # typical 3D rnn input

In [5]:
x_t = input_sequence[:,0,:] # each timestep x(t) shape: (batch_size, n_features)
h_t = simple_rnn_cell(x_t, h_init, w_x, w_h, b) # returns one hidden state per sample-neuron: (batch_size, units)
y_t = h_t # in simple rnn, timestep t's output is equal to its hidden state
y_t.shape

TensorShape([32, 8])

In [6]:
h_t_1 = h_init
hidden_states = []

for t in range(timesteps):
    x_t = input_sequence[:, t, :]  # inputs at time step t
    h_t = simple_rnn_cell(x_t, h_t_1, w_x, w_h, b)
    hidden_states.append(h_t)
    h_t_1 = h_t  # the current state becomes the previous state for the next step

outputs = tf.stack(hidden_states, axis=1) # stacked on axis=1, the timesteps axis
outputs.shape # (batch_size, timesteps, units)

TensorShape([32, 16, 8])

In [7]:
# let's wrap everything in a function

def simple_rnn(input_sequence, units, return_sequences=True, seed=42, h_init=None):
    assert input_sequence.ndim == 3, "RNN expects 3D input"
    batch_size, timesteps, n_features = input_sequence.shape

    kernel_init = keras.initializers.glorot_uniform(seed=seed)
    recurrent_init = keras.initializers.orthogonal(seed=seed)
    w_x = kernel_init(shape=(n_features, units))
    w_h = recurrent_init(shape=(units, units))
    b = tf.zeros(shape=(units,))
    h_init = tf.zeros(shape=(batch_size, units)) if h_init is None else h_init

    h_t_1 = h_init
    hidden_states = []

    for t in range(timesteps):
        x_t = input_sequence[:, t, :]
        h_t = simple_rnn_cell(x_t, h_t_1, w_x, w_h, b)
        hidden_states.append(h_t)
        h_t_1 = h_t

    outputs = tf.stack(hidden_states, axis=1)  # (batch_size, timesteps, units)

    return outputs if return_sequences else outputs[:, -1, :] # otherwise return only the last timestep's output

In [8]:
my_simple_rnn_outputs = simple_rnn(input_sequence, units, return_sequences=True, seed=42)

keras_simple_rnn = keras.layers.SimpleRNN(units,
                                          return_sequences=True, 
                                          kernel_initializer=kernel_init, 
                                          recurrent_initializer=recurrent_init)

keras_simple_rnn_outputs = keras_simple_rnn(input_sequence)

print(f"Are my results same as Keras layer? {(my_simple_rnn_outputs == keras_simple_rnn_outputs).numpy().all()}")

Are my results same as Keras layer? True
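
Exact equality holds in this run, but two implementations of the same math can differ by tiny floating-point amounts (for example when operations are fused or reordered), so a tolerance-based comparison is usually the safer check. The NumPy sketch below simulates such a difference with a hypothetical offset of `1e-9`; the tolerances are illustrative assumptions.

```python
import numpy as np

a = np.array([1.0, 2.0, 3.0])
b = a + 1e-9  # simulate a tiny numerical difference between two implementations

exact_match = bool((a == b).all())                            # strict element-wise equality
close_match = bool(np.allclose(a, b, rtol=1e-5, atol=1e-8))   # equality up to a tolerance
print(exact_match, close_match)  # False True
```

With the tensors from the cell above, the same idea would be `np.allclose(my_simple_rnn_outputs.numpy(), keras_simple_rnn_outputs.numpy())`.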


Yay!

Some Notes:
- This is only the forward pass, written to build intuition. For training, the weights and biases would have to be set up as trainable variables so the optimizer can tweak them.
- At each time step, a Simple RNN cell outputs a tensor of shape (batch_size, units). For one sample, the hidden state of a single unit is just a scalar, so everything the layer has encoded about the sequence up to that point is compressed into a vector of only `units` numbers! Given the kinds of tasks RNNs can handle (speech recognition, image captioning, language modeling), this is quite fascinating!
- Because all time steps share the same weight matrix, a weight update that slightly increases the recurrent weights is applied again at every step, so its effect compounds and can eventually cause the hidden states to explode. That's one reason we use tanh, a saturating (bounded) activation function, instead of ReLU, which is unbounded.
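
The last note can be illustrated with a scalar toy recurrence $h_t = \phi(w\,h_{t-1} + x)$. This is a minimal sketch using only the standard library; the recurrent weight `w=1.5`, the constant input `x=1.0`, and the helper `run` are all illustrative assumptions.

```python
import math

def run(activation, w=1.5, x=1.0, steps=20):
    """Iterate h_t = activation(w * h_{t-1} + x), starting from h_0 = 0."""
    h = 0.0
    for _ in range(steps):
        h = activation(w * h + x)
    return h

def relu(z):
    return max(0.0, z)

h_relu = run(relu)       # unbounded: grows roughly like 1.5 ** steps
h_tanh = run(math.tanh)  # bounded: can never leave [-1, 1]
print(h_relu > 1000.0, abs(h_tanh) <= 1.0)  # True True
```

With the same shared weight reused at every step, the ReLU state grows geometrically while the tanh state saturates.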

## LSTM

### Implementing LSTM Memory Cell: Step by Step in TensorFlow Eager Mode

According to Wikipedia:

- $f_t = \sigma(W_f x_t + U_f h_{t-1} + b_f)$  
- $i_t = \sigma(W_i x_t + U_i h_{t-1} + b_i)$  
- $\tilde{c}_t = \tanh(W_c x_t + U_c h_{t-1} + b_c)$  
- $o_t = \sigma(W_o x_t + U_o h_{t-1} + b_o)$ 
- $c_t = f_t \odot c_{t-1} + i_t \odot \tilde{c}_t$
- $h_t = o_t \odot \tanh(c_t)$ 


Initial values are $c_0 = 0$ and $h_0 = 0$, and the operator $\odot$ denotes the Hadamard product (element-wise multiplication); the subscript $t$ indexes the time step.

Letting the superscripts $d$ and $h$ refer to the number of input features and number of hidden units, respectively:

- $x_t \in \mathbb{R}^d$: input vector to the LSTM unit  
- $f_t \in (0,1)^h$: forget gate's activation vector  
- $i_t \in (0,1)^h$: input/update gate's activation vector  
- $o_t \in (0,1)^h$: output gate's activation vector  
- $h_t \in (-1,1)^h$: hidden state vector (also output of the LSTM unit)  
- $\tilde{c}_t \in (-1,1)^h$: cell input activation vector  
- $c_t \in \mathbb{R}^h$: cell state vector  
- $W \in \mathbb{R}^{h \times d}$, $U \in \mathbb{R}^{h \times h}$, $b \in \mathbb{R}^h$: weight matrices and bias vector  
- $\sigma$: sigmoid activation function  
- $\tanh$: hyperbolic tangent activation function

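These equations are written for a single sample. I will implement batched operations, so $x_t$ becomes a matrix of shape (batch_size, n_features) with one sample per row, and a product such as $W_f x_t$ becomes `x_t @ transpose(w_f)`.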
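
As a quick check of the move from a single sample to a batch, the NumPy sketch below compares applying $W x$ to each sample separately with one matrix product over the whole batch. The sizes and variable names are illustrative assumptions.

```python
import numpy as np

rng = np.random.default_rng(0)
batch_size, n_features, units = 4, 3, 2   # illustrative sizes

W = rng.normal(size=(units, n_features))  # same (h, d) layout as W in the equations
X_t = rng.normal(size=(batch_size, n_features))

per_sample = np.stack([W @ x for x in X_t])  # apply W x to each sample separately
batched = X_t @ W.T                          # one matmul for the whole batch

print(batched.shape)                     # (4, 2)
print(np.allclose(per_sample, batched))  # True
```

This is why the weight matrices appear transposed in the batched code that follows.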

In [9]:
batch_size = 32
timesteps = 16
n_features = 12
units = 8

tf.random.set_seed(42)
X = tf.random.normal(shape=(batch_size, timesteps, n_features)) # X batch: (batch_size, timesteps, n_features)
x_t = X[:,0,:]
x_t.shape # x_t shape

TensorShape([32, 12])

In [10]:
# for now I have to initialize them separately, but later on I will concatenate them and initialize each of W and U once
w_f = keras.initializers.GlorotUniform(seed=43)(shape=(units, n_features))
w_i = keras.initializers.GlorotUniform(seed=44)(shape=(units, n_features))
w_o = keras.initializers.GlorotUniform(seed=45)(shape=(units, n_features))
w_c = keras.initializers.GlorotUniform(seed=46)(shape=(units, n_features))

u_f = keras.initializers.Orthogonal(seed=53)(shape=(units, units))
u_i = keras.initializers.Orthogonal(seed=54)(shape=(units, units))
u_o = keras.initializers.Orthogonal(seed=55)(shape=(units, units))
u_c = keras.initializers.Orthogonal(seed=56)(shape=(units, units))

h_init = tf.zeros(shape=(batch_size, units))
c_init = tf.zeros(shape=(batch_size, units))

b_f = tf.ones(shape=(units,)) # Keras initializes the forget bias with ones by default (unit_forget_bias=True); this prevents forgetting everything at the beginning of training
b_i = tf.zeros(shape=(units,))
b_o = tf.zeros(shape=(units,))
b_c = tf.zeros(shape=(units,))

In [11]:
# x_t is (batch_size, n_features) and w_f is (units, n_features) -> x_t @ w_f^T -> (batch_size, units)
# h_t_1 is (batch_size, units) and u_f is (units, units) -> h_t_1 @ u_f^T -> (batch_size, units)
# b_f is (units,) -> broadcast over the batch and added

h_t_1 = h_init

# U_f h_{t-1} in the equations becomes h_t_1 @ u_f^T in batched form, just like W_f x_t
f_t = tf.sigmoid(x_t @ tf.transpose(w_f) + h_t_1 @ tf.transpose(u_f) + b_f)
f_t.shape

TensorShape([32, 8])

Alright, $f_t$ has shape (batch_size, units). So for each sample and each unit, at each timestep, the forget activation is a number between 0 and 1 that determines how much of the memory cell state is kept: values near 0 forget it, values near 1 retain it.

In [12]:
# same thing here

i_t = tf.sigmoid(x_t @ tf.transpose(w_i) + h_t_1 @ tf.transpose(u_i) + b_i)
o_t = tf.sigmoid(x_t @ tf.transpose(w_o) + h_t_1 @ tf.transpose(u_o) + b_o)

print(f"Shapes of Input activation: {i_t.shape} and Output activation: {o_t.shape}")

Shapes of Input activation: (32, 8) and Output activation: (32, 8)


In [13]:
# now the candidate cell state: linear transformation + tanh; part of it will be added to the long-term memory
c_candidate_t = tf.tanh(x_t @ tf.transpose(w_c) + h_t_1 @ tf.transpose(u_c) + b_c)
c_candidate_t.shape

TensorShape([32, 8])

In [14]:
# now, forget and input gates: determine how much of c(t-1) should be forgotten and how much of the candidate cell state (c_candidate_t) should be added to it

# forget gate: f_t (batch_size, units) * c_t_1 (batch_size, units)
# input gate: i_t (batch_size, units) * c_candidate_t (batch_size, units)

c_t_1 = c_init # initialize cell state with zeros

c_t = f_t * c_t_1 + i_t * c_candidate_t

c_t.shape

TensorShape([32, 8])
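
To see what the two gates do in the update $c_t = f_t \odot c_{t-1} + i_t \odot \tilde{c}_t$, here is a small NumPy sketch with hand-picked gate values. The numbers and the helper `update` are illustrative assumptions; real sigmoid gates only approach 0 and 1, so the first two cases are limiting behavior.

```python
import numpy as np

c_prev = np.array([2.0, -1.0, 0.5])  # previous cell state c_{t-1}
c_cand = np.array([0.9, 0.9, 0.9])   # candidate values, inside (-1, 1)

def update(f, i):
    """Cell state update: keep a fraction f of the old state, add a fraction i of the candidate."""
    return f * c_prev + i * c_cand

keep = update(f=np.ones(3), i=np.zeros(3))       # f=1, i=0: memory carried over unchanged
overwrite = update(f=np.zeros(3), i=np.ones(3))  # f=0, i=1: memory replaced by the candidate
blend = update(f=np.full(3, 0.5), i=np.full(3, 0.5))

print(keep)       # [ 2.  -1.   0.5]
print(overwrite)  # [0.9 0.9 0.9]
```

Because the update is additive rather than passed through an activation, $c_t$ is not confined to $(-1, 1)$, which matches $c_t \in \mathbb{R}^h$ in the definitions above.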

In [15]:
# hidden state h(t) (same as the cell's output y(t)): the output gate decides how much of the tanh-activated cell state is exposed at time step t

h_t = y_t = o_t * tf.tanh( c_t )

h_t.shape

TensorShape([32, 8])

That's it!  
This was a single time step of the memory cell, so now let's write a function to handle all timesteps. We'll also concatenate the weight matrices and biases, then perform the matrix multiplications in one shot to take advantage of vectorization for a speed boost.

### Vectorized Implementation of LSTM Layer in TF Graph Mode

In [16]:
# concatenating the four input weight blocks along the last axis -> W: (n_features, 4 * units)
# concatenating the four recurrent weight blocks along the last axis -> U: (units, 4 * units)
# x_t: (batch_size, n_features)
# h_t_1: (batch_size, units)
# c_t_1: (batch_size, units)
# the four bias vectors are concatenated -> B: (4 * units,)
# block order is i, f, c, o, matching the tf.split below

@tf.function
def lstm_cell(x_t, h_t_1, c_t_1, W, U, B):
    z = x_t @ W + h_t_1 @ U + B # compute all 4 equations in one shot!
    i_t, f_t, c_candidate_t, o_t = tf.split(z, 4, axis=1)
    
    f_t = tf.sigmoid(f_t)
    i_t = tf.sigmoid(i_t)
    o_t = tf.sigmoid(o_t)
    c_candidate_t = tf.tanh(c_candidate_t)
    
    c_t = f_t * c_t_1 + i_t * c_candidate_t
    h_t = o_t * tf.tanh(c_t)
    return h_t, c_t
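
The fused matmul gives the same pre-activations as four separate per-gate products. The NumPy sketch below checks this, assuming per-gate kernels in the (units, n_features) layout of the step-by-step section and the i, f, c, o block order used by `tf.split` above; the sizes and random values are illustrative.

```python
import numpy as np

rng = np.random.default_rng(0)
batch_size, n_features, units = 4, 3, 2  # illustrative sizes

# Per-gate kernels in the (units, n_features) layout of the step-by-step section.
w_i, w_f, w_c, w_o = (rng.normal(size=(units, n_features)) for _ in range(4))
x_t = rng.normal(size=(batch_size, n_features))

# Transpose each block and concatenate along the last axis -> (n_features, 4 * units).
W = np.concatenate([w_i.T, w_f.T, w_c.T, w_o.T], axis=1)

z = x_t @ W                                  # all four pre-activations at once
z_i, z_f, z_c, z_o = np.split(z, 4, axis=1)  # same i, f, c, o order as lstm_cell

print(W.shape)                        # (3, 8)
print(np.allclose(z_f, x_t @ w_f.T))  # True
```

The recurrent matrix `U` and the bias `B` follow the same block layout, so one split recovers all four gates.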

In [17]:
def lstm(input_sequence, units, kernel_initializer, recurrent_initializer, return_sequences=True, h_init=None, c_init=None, return_state=False):
    assert input_sequence.ndim == 3, "LSTM expects 3D input"
    batch_size, timesteps, n_features = input_sequence.shape

    # use the initializers passed as arguments, not the notebook-level globals
    W = kernel_initializer(shape=(n_features, 4 * units))
    U = recurrent_initializer(shape=(units, 4 * units))

    # bias blocks follow the i, f, c, o split order used in lstm_cell
    B = tf.concat([
        tf.zeros(units),  # input gate
        tf.ones(units),   # forget gate
        tf.zeros(units),  # cell candidate
        tf.zeros(units)   # output gate
    ], axis=0)

    h_t_1 = tf.zeros(shape=(batch_size, units)) if h_init is None else h_init
    c_t_1 = tf.zeros(shape=(batch_size, units)) if c_init is None else c_init

    hidden_states = []

    for t in range(timesteps):
        x_t = input_sequence[:, t, :]
        h_t, c_t = lstm_cell(x_t, h_t_1, c_t_1, W, U, B)
        hidden_states.append(h_t)
        h_t_1, c_t_1 = h_t, c_t

    # full sequence of hidden states, or only the last one
    outputs = tf.stack(hidden_states, axis=1) if return_sequences else h_t

    # like the Keras layer, optionally return the final hidden and cell states as well
    return (outputs, h_t, c_t) if return_state else outputs

In [18]:
tf.random.set_seed(42)
input_sequence = tf.random.normal(shape=(batch_size,timesteps,n_features)) 

kernel_init = keras.initializers.glorot_uniform(seed=42)
recurrent_init = keras.initializers.orthogonal(seed=42)

In [19]:

my_lstm_hidden_states = lstm(input_sequence, units, kernel_init, recurrent_init, return_sequences=True, h_init=None, c_init=None, return_state=False)

keras_lstm_layer = keras.layers.LSTM(units,
                                     kernel_initializer=kernel_init, recurrent_initializer=recurrent_init,
                                     return_sequences=True, return_state=False, use_cudnn=False)

keras_lstm_hidden_states = keras_lstm_layer(input_sequence)

print(f"Are they same?: {(my_lstm_hidden_states == keras_lstm_hidden_states).numpy().all()}")

Are they same?: True


Yayy!

## GRU

## Attention (Concatenative)

## Attention (Luong)