<a href="https://colab.research.google.com/github/amifunny/Deep-Learning-Notebook/blob/master/rnn_cells_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
"""
AIM : TO CREATE TF ASSISTED HAND WRITTEN RNN_CELL , LSTM , GRU

      AND COMPARE THEIR PERFORMANCE ON THE "IMDB REVIEWS" DATASET

"""

In [0]:
# ALL Imports Here
import tensorflow as tf
import numpy as np
import tensorflow_datasets as tfds
import re

In [0]:
"""
  We use the easy-to-feed IMDB dataset from TFDS (TensorFlow Datasets): https://www.tensorflow.org/datasets/catalog/imdb_reviews

  Split     No. of Entities
  
  test       25,000
  train      25,000

"""

# Fetch both IMDB splits in one call, together with the dataset metadata object.
raw_data, info = tfds.load(
    name='imdb_reviews',
    split=['train', 'test'],
    with_info=True,
)

# Print the metadata returned alongside the data.
print(info)

In [0]:
# Download pre-trained GloVe embeddings (a large file, but it saves us from training embeddings).
# -c resumes a partial download and skips the transfer when the archive is already complete,
# so re-running this cell does not leave a second copy (glove.6B.zip.1) behind.
!wget -c 'http://nlp.stanford.edu/data/glove.6B.zip'

In [0]:
# Unzip the archive. -n never overwrites files that were already extracted, so a
# re-run of this cell does not stop at unzip's interactive "replace?" prompt.
!unzip -n 'glove.6B.zip'

In [0]:
"""
  Create a Hefty Dictionary 
  Matching each word to its Vector

  ** We use the 100D file here (GloVe offers 50d, 100d, 200d and 300d vectors).
  To be clear, 100D means each word is represented by a 100-dimensional array.

"""

# word -> float32 embedding vector, filled from the GloVe text file
embed_dict = {}

# The path is relative because the download / unzip cells above write into the
# current working directory (which is /content on Colab).
# The file is streamed line by line instead of being loaded with readlines(),
# and the encoding is pinned so the result does not depend on the platform default.
with open('glove.6B.100d.txt', encoding='utf-8') as file:

  for each_line in file:

    split_line = each_line.split()
    if not split_line:
      # tolerate blank lines instead of failing on split_line[0]
      continue

    # each line is "<word> <v1> <v2> ..."
    word = split_line[0]
    vector = np.array( split_line[1:] , dtype = np.float32 )
    embed_dict[ word ] = vector

# no explicit close(): the `with` block already closes the file


# `each_line` still holds the last line read by the loop above
print("TXT FILE Line Example == > {}".format(each_line) )
print("Num of Words in Dictionary == > {}".format( len(embed_dict) ))
print("EXAMPLE :: ")
print("Embedding of word '' {} '' is == > {}".format( "the",embed_dict['the'] ) )

In [0]:
# Shared hyper-parameters used by the input pipeline, the cells and the models.
embed_dim = 100    # length of one word vector (the 100d GloVe file is loaded above)
hidden_dim = 512   # width of the recurrent state kept by every cell
batch_size = 64    # number of reviews per padded batch

In [0]:
# tfds.load was called with split=['train','test'], so raw_data holds the two
# splits in that order. They are dataset objects (shuffled / mapped / batched below).
train_data = raw_data[0]
test_data = raw_data[1]

# "map_fn" is called every time daatset is used.
def map_fn(text,label):
  """
    Turn one raw review into a sequence of GloVe vectors.

    text  : scalar string tensor (its .numpy() bytes are decoded as UTF-8)
    label : scalar tensor, cast to float32

    Returns (x, y): x is a float32 tensor of shape (num_tokens, embed_dim),
    y is the label as float32. Words missing from `embed_dict` map to an
    all-zero vector (the <UNK> token).
  """

  raw_text = text.numpy().decode('utf-8')

  # keep only letters, digits, periods and spaces, eg. " don't " becomes " dont "
  cleaned_text = re.sub( r'[^a-zA-Z0-9. ]' , '' , raw_text )
  # put a space in front of every period so it becomes a token of its own
  cleaned_text = re.sub( r'[.]' , ' .', cleaned_text )

  # FOR <UNK> i.e Unknown Token ; float32 to match the GloVe vectors
  unknown_vector = np.zeros( embed_dim , dtype=np.float32 )

  # dict.get replaces the former bare `except:`, which would also have hidden
  # unrelated errors
  vectors = [ embed_dict.get( w , unknown_vector ) for w in cleaned_text.lower().split() ]

  if vectors:
    x = np.stack( vectors ).astype( np.float32 )
  else:
    # a review with no tokens must still be rank 2, otherwise padded batching
    # with shape [None,embed_dim] fails on it
    x = np.zeros( (0,embed_dim) , dtype=np.float32 )

  y = tf.cast( label , tf.float32 )

  return tf.convert_to_tensor(x , dtype=tf.float32 ),y

# Graph-friendly wrapper: "map_fn" runs plain Python (regex, dict lookups),
# so it has to be called through tf.py_function inside Dataset.map.
def map_fn_wrapper(data):
  """
    Data is dict of "label" and "text"

    Returns (x, y) as float32 tensors:
      x.shape => (num_tokens , embed_dim)
      y.shape => ()
  """

  x,y = tf.py_function( map_fn , inp=[ data['text'],data['label'] ] , Tout=[ tf.float32,tf.float32 ] )

  # tensors coming out of py_function carry no static shape information;
  # declare it again so downstream batching / layers can rely on it
  x.set_shape( [None,embed_dim] )
  y.set_shape( [] )

  return x,y


In [0]:
# Training input pipeline: shuffle -> embed every review -> pad each batch to
# the length of its longest review. Incomplete final batches are dropped.
train_data = (
    train_data
    .shuffle( 25000 )
    .map( map_fn_wrapper )
    .padded_batch( batch_size , padded_shapes=( [None,embed_dim],[] ) , drop_remainder=True )
)

In [0]:
# Evaluation input pipeline: embed every review -> pad each batch.
#
# drop_remainder=True: the models below are built for a fixed batch of
# `batch_size` rows, and 25,000 test reviews do not divide evenly by 64, so a
# short final batch would not fit them. The incomplete batch is dropped, exactly
# as in the training pipeline.
#
# No shuffle: ordering does not change accuracy, and without it every epoch is
# evaluated on the same reviews.
test_data = (
    test_data
    .map( map_fn_wrapper )
    .padded_batch( batch_size , padded_shapes=( [None,embed_dim],[] ) , drop_remainder=True )
)

In [0]:
# Peek at one training batch: feature shape, label shape, then the labels themselves.
for one_batch in train_data.take(1):
  print( one_batch[0].shape , one_batch[1].shape , one_batch[1] , sep='\n' )


In [0]:

# Same inspection as the previous cell (this cell repeats it): print the feature
# shape, the label shape and the labels of one training batch.
for one_batch in train_data.take(1):
  print( *[ one_batch[0].shape , one_batch[1].shape , one_batch[1] ] , sep='\n' )


In [0]:
loss_fn = tf.keras.losses.BinaryCrossentropy()
# kept for cells that refer to it; `training` builds its own optimizer unless one is passed in
optimizer = tf.keras.optimizers.Adam(0.001)

def training( model , epochs , train_batches , test_batches , optimizer=None ):
  """
    Custom training loop: one pass over `train_batches`, then one evaluation
    pass over `test_batches`, repeated `epochs` times.

    model         : callable Keras model returning probabilities for a batch
    epochs        : number of epochs to run
    train_batches : iterable of (features, labels) batches used for the updates
    test_batches  : iterable of (features, labels) batches used for accuracy
    optimizer     : optional Keras optimizer; when omitted a fresh Adam(0.001)
                    is created, so models trained one after another do not
                    share optimizer state (step counter / moment estimates)

    Prints the test accuracy and the mean training loss after every epoch.
    Returns None.
  """

  if optimizer is None:
    optimizer = tf.keras.optimizers.Adam(0.001)

  avg_loss = tf.keras.metrics.Mean()
  accuracy = tf.keras.metrics.Accuracy()

  with tf.device('/device:GPU:0'):

    for e in range(epochs):

      print("EPOCH {}  ...  ".format(e))

      # both metrics are reported per epoch
      avg_loss.reset_states()
      accuracy.reset_states()

      for train_batch in train_batches:

        with tf.GradientTape() as tape:
          pred = model( train_batch[0] )
          loss = loss_fn( train_batch[1] , pred  )

        grads = tape.gradient( loss , model.trainable_variables )
        optimizer.apply_gradients( zip(grads,model.trainable_variables) )

        # previously the metric was never fed, so the printed train_loss was always 0
        avg_loss.update_state( loss )

      for test_batch in test_batches:

        pred = model( test_batch[0] )
        # threshold the probabilities at 0.5 to get hard 0/1 predictions
        pred_categ = tf.where(pred>0.5,1.0,0.0)
        accuracy.update_state( test_batch[1] , pred_categ )

      print( "test_accuracy ==> {} ,  train_loss ==> {} ".format( accuracy.result(),avg_loss.result() ) )

  return


In [0]:
class SEQ_MODEL(tf.keras.Model):
  """
    Unrolls a recurrent cell over the time axis of a batch of sequences.

    cell    : layer called as cell( step_input , state_list ) and returning the
              new state list ( [hidden] , or [hidden, memory] for an LSTM )
    is_lstm : True when the cell carries two states (hidden and memory)
  """

  def __init__(self,cell,is_lstm=False):
    super(SEQ_MODEL,self).__init__()
    self.main_cell = cell
    self.is_lstm = is_lstm

    # this is bcz lstm has separate memory and hidden activations
    # (attribute kept for existing readers; `call` builds its own zero state
    #  sized to the batch it actually receives)
    if self.is_lstm:
      self.hidden_state = [ tf.zeros( [batch_size,hidden_dim] ) , tf.zeros( [batch_size,hidden_dim] ) ]
    else:
      self.hidden_state = [ tf.zeros( [batch_size,hidden_dim] ) ]


  def call(self,inputs):
    """
      inputs.shape => (batch, time_steps , embed_dim)

      Returns
        final_output.shape => (batch , hidden_dim)               state after the last step
        seq_output.shape   => (batch , time_steps , hidden_dim)  state after every step
        memory.shape       => (batch , hidden_dim)               only when is_lstm is True
    """

    seq_list = []

    # start every sequence from an all-zero state sized to the incoming batch,
    # so a batch that is not exactly `batch_size` rows still works
    num_states = 2 if self.is_lstm else 1
    current_batch = tf.shape( inputs )[0]
    hidden_state = [ tf.zeros( [current_batch,hidden_dim] , dtype=inputs.dtype ) for _ in range(num_states) ]

    if inputs.shape[1] is None:
      # For compilation of Model with Variable Shape
      # NOTE: with an unknown time dimension only one step is traced; the full
      # unroll happens when the model is called on tensors of known length.
      time_steps = 1
    else:
      time_steps = inputs.shape[1]

    for each_step in range( time_steps ):

      # feed the state produced by the previous step (the initial zero state was
      # being passed at every step before, so nothing was carried through time)
      hidden_state = self.main_cell( inputs[:,each_step,:] , hidden_state )
      seq_list.append( hidden_state[0] )

    # final_output.shape ==> (batch , hidden_dim)
    final_output = hidden_state[0]

    # seq_output.shape ==> (batch, time_steps , hidden_dim)
    seq_output = tf.stack( seq_list , axis=1 )

    if self.is_lstm:
      memory = hidden_state[1]
      return final_output,seq_output,memory
    else:
      return final_output,seq_output


In [0]:
# CUSTOM RNN CELL
class RNN_CELL(tf.keras.layers.Layer):
  """
    Vanilla recurrent cell without bias:
      new_hidden = tanh( prev_hidden @ Waa + inputs @ Wax )
  """

  def __init__(self,initializer,hidden_dim):

    super().__init__()

    recurrent_kernel_shape = [hidden_dim,hidden_dim]
    input_kernel_shape = [embed_dim,hidden_dim]

    # Waa maps the previous state, Wax maps the current input
    self.Waa = tf.Variable( initial_value=initializer( recurrent_kernel_shape ) , trainable=True )
    self.Wax = tf.Variable( initial_value=initializer( input_kernel_shape ) , trainable=True )

  def call(self,inputs,hidden_states):
    """
      inputs        : input at the current time step
      hidden_states : list whose first entry is the previous hidden state

      Returns a one-element list holding the new hidden state.
    """

    previous = hidden_states[0]

    recurrent_term = tf.matmul( previous , self.Waa )
    input_term = tf.matmul( inputs , self.Wax )

    return [ tf.math.tanh( recurrent_term + input_term ) ]


# Smoke test: run one random batch of 32 rows through the cell from a zero state.
custom_cell = RNN_CELL(
    initializer=tf.keras.initializers.GlorotNormal(),
    hidden_dim=hidden_dim,
)
temp_hidden = custom_cell(
    tf.random.normal( (32,embed_dim) ),
    [ tf.zeros( (32,hidden_dim) ) ],
)
print( "Hidden State Shape ==> {}".format( temp_hidden[0].shape ) )

In [0]:
# CUSTOM LSTM CELL
class LSTM_CELL(tf.keras.layers.Layer):
  """
    LSTM cell without biases. Unlike the plain RNN cell it carries two states:
    the hidden state (activation) and a separate memory cell.
  """

  def __init__(self,initializer,hidden_dim):

    super().__init__()

    # Each kernel is the input weights Wax (embed_dim,hidden_dim) and the
    # recurrent weights Waa (hidden_dim,hidden_dim) stacked into one matrix,
    # so there are fewer Variables to handle.
    stacked_shape = [embed_dim+hidden_dim,hidden_dim]

    self.W_c = tf.Variable( initial_value=initializer( stacked_shape ) )   # candidate memory
    self.W_u = tf.Variable( initial_value=initializer( stacked_shape ) )   # update gate
    self.W_f = tf.Variable( initial_value=initializer( stacked_shape ) )   # forget gate
    self.W_o = tf.Variable( initial_value=initializer( stacked_shape ) )   # output gate

  def call(self,inputs,hidden_states):
    """
      inputs        : input at the current time step
      hidden_states : [ previous hidden state , previous memory ]

      Returns [ new hidden state , new memory ].
    """

    prev_hidden_state = hidden_states[0]
    prev_memory = hidden_states[1]

    # stacked_input.shape ==> (batch_size,embed_dim+hidden_dim)
    stacked_input = tf.concat( [inputs,prev_hidden_state] , axis=1 )

    candidate = tf.math.tanh( tf.matmul( stacked_input , self.W_c ) )
    update_gate = tf.math.sigmoid( tf.matmul( stacked_input , self.W_u ) )
    forget_gate = tf.math.sigmoid( tf.matmul( stacked_input , self.W_f ) )
    output_gate = tf.math.sigmoid( tf.matmul( stacked_input , self.W_o ) )

    # blend the new candidate with what is kept from the previous memory
    new_memory = update_gate*candidate + forget_gate*prev_memory
    new_hidden = output_gate*tf.nn.tanh( new_memory )

    # the memory cell and the hidden state are two different tensors for an LSTM
    return [new_hidden,new_memory]


# Smoke test: one random batch of 32 rows, zero hidden state and zero memory.
custom_cell = LSTM_CELL(
    initializer=tf.keras.initializers.GlorotNormal(),
    hidden_dim=hidden_dim,
)
temp_hidden = custom_cell(
    tf.random.normal( (32,embed_dim) ),
    [ tf.zeros( (32,hidden_dim) ) , tf.zeros( (32,hidden_dim) ) ],
)
print( "Hidden State or Activation Shape ==> {}".format(temp_hidden[0].shape) )
print( "Memory Shape ==> {}".format(temp_hidden[1].shape) )

In [0]:
# CUSTOM GRU CELL
class GRU_CELL(tf.keras.layers.Layer):
  """
    GRU cell without biases. It carries a single state: the memory cell and the
    hidden state are the same tensor.
  """

  def __init__(self,initializer,hidden_dim):

    super().__init__()

    # Each kernel is the input weights Wax (embed_dim,hidden_dim) and the
    # recurrent weights Waa (hidden_dim,hidden_dim) stacked into one matrix,
    # so there are fewer Variables to handle.
    stacked_shape = [embed_dim+hidden_dim,hidden_dim]

    self.W_c = tf.Variable( initial_value=initializer( stacked_shape ) )   # candidate memory
    self.W_u = tf.Variable( initial_value=initializer( stacked_shape ) )   # update gate
    self.W_r = tf.Variable( initial_value=initializer( stacked_shape ) )   # relevance gate


  def call(self,inputs,hidden_states):
    """
      inputs        : input at the current time step
      hidden_states : list whose first entry is the previous memory / hidden state

      Returns a one-element list holding the new memory / hidden state.
    """

    prev_memory = hidden_states[0]

    stacked_input = tf.concat( [inputs,prev_memory] , axis=1 )

    # both gates look at the un-gated previous state
    rel_gate = tf.math.sigmoid( tf.matmul( stacked_input , self.W_r ) )
    update_gate = tf.math.sigmoid( tf.matmul( stacked_input , self.W_u ) )

    # the candidate only sees the part of the previous state let through by rel_gate
    rel_input = tf.concat( [ inputs,rel_gate*prev_memory] , axis=1 )
    candidate = tf.math.tanh( tf.matmul( rel_input , self.W_c ) )

    new_memory = update_gate*candidate + (1-update_gate)*prev_memory

    return [new_memory]


# Smoke test: run one random batch of 32 rows through the cell from a zero state.
custom_cell = GRU_CELL(
    initializer=tf.keras.initializers.GlorotNormal(),
    hidden_dim=hidden_dim,
)
temp_hidden = custom_cell(
    tf.random.normal( (32,embed_dim) ),
    [ tf.zeros( (32,hidden_dim) ) ],
)
print( "Hidden State Shape ==> {}".format(temp_hidden[0].shape) )

In [0]:
# "Let the Training BEGINS"

# FIRST UP RNN MODEL
rnn_model = SEQ_MODEL(
    cell=RNN_CELL( initializer=tf.keras.initializers.GlorotNormal() , hidden_dim=hidden_dim ),
    is_lstm=False,
)

# variable-length sequences of word vectors, fixed batch size
inputs = tf.keras.layers.Input( shape=[None,embed_dim] , batch_size=batch_size )
final_out,seq_out = rnn_model( inputs )

# classification head on the last recurrent state: Dense(256, relu) -> Dense(1, sigmoid)
out = tf.keras.layers.Dense( units=256 , activation='relu' )( final_out )
outputs = tf.keras.layers.Dense( units=1 , activation='sigmoid' )( out )

rnn_final_model = tf.keras.Model( inputs=inputs , outputs=outputs )
rnn_final_model.summary()

In [0]:
# Train and evaluate the RNN model for 10 epochs.
training( model=rnn_final_model , epochs=10 , train_batches=train_data , test_batches=test_data )

In [0]:
# SECOND UP LSTM MODEL
# is_lstm=True: the LSTM cell reads two states (hidden and memory), so the
# sequence model has to carry both
lstm_model = SEQ_MODEL( LSTM_CELL( tf.keras.initializers.GlorotNormal() , hidden_dim ) , is_lstm=True )

inputs = tf.keras.layers.Input([None,embed_dim] , batch_size=batch_size )
# with is_lstm=True the sequence model also returns the final memory cell
final_out,seq_out,memory_out = lstm_model( inputs )
out = tf.keras.layers.Dense(256,activation='relu')( final_out )
outputs = tf.keras.layers.Dense(1,activation='sigmoid')( out )

# tf.keras.Model (singular) is the functional-model constructor
lstm_final_model = tf.keras.Model(inputs,outputs)
lstm_final_model.summary()

In [0]:
# Train and evaluate the LSTM model for 10 epochs.
training( model=lstm_final_model , epochs=10 , train_batches=train_data , test_batches=test_data )

In [0]:
# THIRD UP GRU MODEL
gru_model = SEQ_MODEL( GRU_CELL( tf.keras.initializers.GlorotNormal() , hidden_dim ) , is_lstm=False )

# variable sequence length, like the RNN and LSTM models: batches are padded to
# their own longest review, not to a fixed 180 steps
inputs = tf.keras.layers.Input([None,embed_dim] , batch_size=batch_size)
final_out,seq_out = gru_model( inputs )

# same head as the other two models, so the comparison is fair and the output
# is one probability per review, as the binary cross-entropy loss expects
out = tf.keras.layers.Dense(256,activation='relu')( final_out )
outputs = tf.keras.layers.Dense(1,activation='sigmoid')( out )

gru_final_model = tf.keras.Model(inputs,outputs)
gru_final_model.summary()

In [0]:
# Train and evaluate the GRU model for 10 epochs.
training( model=gru_final_model , epochs=10 , train_batches=train_data , test_batches=test_data )

In [0]:
class SEQ_MODEL(tf.keras.Model):
  """
    Recurrent layer followed by two Dense layers (hidden_dim units, then 1 unit).

    NOTE(review): this re-defines SEQ_MODEL; once this cell runs it replaces the
    class of the same name defined earlier in the notebook.
    NOTE(review): `Reccuring_layer` is not defined in the visible part of the
    notebook, so creating an instance fails unless it is defined elsewhere
    -- TODO confirm.
  """

  def __init__(self,cell,is_lstm=False):
    super().__init__()
    self.reccur_layer = Reccuring_layer( cell , is_lstm )

    # dense head applied to the first value returned by 'reccur_layer'
    self.dense1 = tf.keras.layers.Dense(hidden_dim)
    self.dense0 = tf.keras.layers.Dense(1)

  def call(self,inputs):
    """
      inputs.shape => (batch, time_steps , hidden_dim)
      (shape as stated by the original author -- TODO confirm the last dimension)
    """

    # for classification only the first returned value is used;
    # the second one (output at each time step) is ignored
    final_state,_ = self.reccur_layer(inputs)

    return self.dense0( self.dense1(final_state) )