<a href="https://colab.research.google.com/github/anniewit/IANNWTF-2020/blob/main/RNN_solution.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#import
import tensorflow as tf
import numpy as np
import tensorflow_datasets as tfds
from tensorflow.keras.layers import Layer
from tensorflow.keras import  Model
import time
import matplotlib.pyplot as plt

In [None]:
# The gate layout follows Chris Olah's description of the LSTM.
class LSTM_Cell(Layer):
  """Single-time-step LSTM cell assembled from four Dense layers.

  Every gate reads the concatenation of the current input and the previous
  hidden state along the last axis.

  Args:
    units: Output width of each of the four Dense layers, and therefore of
      the hidden state and cell state this cell returns.
  """

  def __init__(self, units):
    super().__init__()
    self.units = units
    # The forget gate is the only layer with a non-default bias: it starts
    # at one. It keeps the default kernel initializer.
    self.forget_Gate = tf.keras.layers.Dense(
        units,
        activation=tf.nn.sigmoid,
        bias_initializer=tf.keras.initializers.Ones)
    # Candidate values (tanh) and the gate deciding how much of them is added.
    self.candidates = tf.keras.layers.Dense(
        units, activation=tf.nn.tanh, kernel_initializer='orthogonal')
    self.candidates_gate = tf.keras.layers.Dense(
        units, activation=tf.nn.sigmoid, kernel_initializer='orthogonal')
    # Gate deciding how much of the squashed cell state is exposed.
    self.out_gate = tf.keras.layers.Dense(
        units, activation=tf.nn.sigmoid, kernel_initializer='orthogonal')

  def call(self, x, states):
    """Advance the cell by one time step.

    Args:
      x: Input for the current time step; it is concatenated with the
        hidden state on the last axis (presumably [batch, features]).
      states: Pair `(hidden_state, cell_state)` from the previous step.

    Returns:
      `(out, (out, cell_state))`: the new hidden state, followed by the
      state pair to feed into the next step.
    """
    prev_hidden, prev_cell = states
    gate_input = tf.concat((x, prev_hidden), axis=-1)

    # Scale the old cell state by the forget gate.
    retained = prev_cell * self.forget_Gate(gate_input)

    # Add the gated candidate values.
    proposal = self.candidates(gate_input)
    admitted = proposal * self.candidates_gate(gate_input)
    new_cell = retained + admitted

    # The hidden state is the squashed cell state scaled by the output gate.
    new_hidden = tf.nn.tanh(new_cell) * self.out_gate(gate_input)
    return new_hidden, (new_hidden, new_cell)

class LSTM(Layer):
  """Runs a recurrent cell over every time step of a batch of sequences.

  Args:
    cell: Object called as `cell(step_input, states)` that returns
      `(step_output, new_states)` and exposes a `units` attribute.
  """

  def __init__(self, cell):
    super().__init__()
    self.cell = cell

  def call(self, x, states):
    """Apply the cell step by step and collect its outputs.

    Args:
      x: Input of shape [batch_size, time_steps, size].
      states: Initial states handed to the cell at the first step.

    Returns:
      The per-step cell outputs stacked and transposed to batch-major
      order, i.e. [batch_size, time_steps, ...].
    """
    num_steps = tf.shape(x)[1]
    # A TensorArray is only needed in graph mode.
    collected = tf.TensorArray(
        dtype=tf.float32, size=num_steps, clear_after_read=True)
    for step in tf.range(num_steps):
      step_out, states = self.cell(x[:, step, :], states)
      collected = collected.write(step, step_out)
    # stack() puts time on the leading axis; swap it with the batch axis.
    time_major = collected.stack()
    return tf.transpose(time_major, perm=[1, 0, 2])

  def zero_state(self, batch_size):
    """Return an all-zero `(hidden_state, cell_state)` pair.

    Each tensor has shape (batch_size, cell.units).
    """
    state_shape = (batch_size, self.cell.units)
    return (tf.zeros(state_shape), tf.zeros(state_shape))
    




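Generate the data set for the counting task: each sample is a sequence of random digits together with two distinct candidate digits, and the target says which of the two candidates occurs more often in the sequence.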

In [None]:
def count_task(len, num_samples):
  """Generate samples for the "which digit occurs more often" task.

  Each sample consists of a random digit sequence and two distinct candidate
  digits. All digits are drawn with `np.random.randint(0, 10)`, i.e. from
  0 to 9.

  Args:
    len: Number of digits per sequence. (The name shadows the builtin
      inside this function.)
    num_samples: Number of samples to yield.

  Yields:
    Tuples `(input, context, target)`:
      input: uint8 array holding the `len` drawn digits.
      context: array holding the two candidate digits.
      target: 0 if the first candidate occurs strictly more often than the
        second one, otherwise 1 (ties therefore give 1).
  """
  for _ in range(num_samples):
    first = np.random.randint(0, 10)
    # Redraw until the second candidate differs from the first.
    while True:
      second = np.random.randint(0, 10)
      if second != first:
        break

    digits = [np.random.randint(0, 10) for _ in range(len)]
    first_count = digits.count(first)
    second_count = digits.count(second)
    target = 0 if first_count > second_count else 1

    sequence = np.asarray(digits, dtype=np.uint8)
    context = np.asarray((first, second))
    yield sequence, context, target


# Number of digits in every generated sequence.
SEQ_LEN = 25


def my_count_task():
  """Zero-argument generator over 80000 `count_task` samples of length SEQ_LEN."""
  yield from count_task(SEQ_LEN, 80000)




In [None]:
class my_model(Model):
  """Two Dense input layers, the custom LSTM, and a sigmoid read-out.

  The read-out is applied to the LSTM output of every time step, so the
  model returns one value between 0 and 1 per step.
  """

  def __init__(self):
    super().__init__()
    # Per-step feature extraction ahead of the recurrent layer.
    self.in_layer = tf.keras.layers.Dense(64, activation=tf.nn.sigmoid)
    self.in_layer_2 = tf.keras.layers.Dense(32)
    # Recurrent part: a cell with 2 units wrapped by the sequence layer.
    self.lstm_cell = LSTM_Cell(2)
    self.lstm = LSTM(self.lstm_cell)
    # Single sigmoid unit for the binary decision.
    self.out = tf.keras.layers.Dense(1, activation=tf.nn.sigmoid)

  def call(self, x):
    """Run the forward pass.

    Args:
      x: Batch of sequences; axis 0 is used as the batch dimension and the
        LSTM iterates over axis 1.

    Returns:
      The read-out layer applied to the LSTM output of every time step.
    """
    batch_size = tf.shape(x)[0]
    features = self.in_layer_2(self.in_layer(x))
    # The LSTM always starts from all-zero hidden and cell states.
    initial_states = self.lstm.zero_state(batch_size)
    hidden_sequence = self.lstm(features, initial_states)
    return self.out(hidden_sequence)




In [None]:
@tf.function
def train_step(model, input, target, loss_function, optimizer, training=True):
  """Run one optimisation step and report loss and accuracy for the batch.

  Only the prediction at the last time step is compared with the target.

  Args:
    model: Callable model returning predictions of shape [batch, time, 1].
    input: Batch of model inputs.
    target: Batch of labels; a trailing axis is added so it lines up with
      the last-step prediction (presumably [batch] -> [batch, 1]).
    loss_function: Callable invoked as `loss_function(target, prediction)`.
    optimizer: Optimizer whose `apply_gradients` updates the model.
    training: Currently unused; kept so the signature stays compatible.
      NOTE(review): gradients are applied regardless of this flag.

  Returns:
    `(loss, acc)`: the loss value and the fraction of rounded last-step
    predictions that equal the target.
  """
  target = tf.expand_dims(target, axis=-1)
  # A non-persistent tape is enough because gradient() is called exactly
  # once; its resources are released right after that call.
  with tf.GradientTape() as tape:
    prediction = model(input)
    last_step = prediction[:, -1, :]
    loss = loss_function(target, last_step)
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  hits = tf.cast(tf.round(last_step) == target, dtype=tf.float32)
  acc = tf.reduce_mean(hits)
  return loss, acc

In [None]:
def preprocess_inputs(series, context, seq_len):
  """Build the per-step model input from a digit series and its candidates.

  Args:
    series: Digit sequence; one-hot encoded with depth 10.
    context: The two candidate digits; one-hot encoded with depth 10 and
      flattened into a single row of 20 values.
    seq_len: Number of times the context row is repeated along axis 0
      (presumably the length of `series`, so the concat lines up).

  Returns:
    The one-hot series with the repeated context row appended on the last
    axis (10 + 20 = 30 features per step).
  """
  digits_one_hot = tf.one_hot(series, 10)
  context_row = tf.reshape(tf.one_hot(context, 10), (1, 20))
  # Every time step sees the same candidate information.
  context_per_step = tf.repeat(context_row, seq_len, axis=0)
  return tf.concat((digits_one_hot, context_per_step), axis=-1)

In [None]:
# Training configuration.
batch_size = 256
iters = 100  # number of full passes over `data`

model = my_model()
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
loss_function = tf.losses.BinaryCrossentropy()

# Input pipeline: generated samples -> (model input, float target) pairs,
# shuffled within a 1000-element buffer, batched and prefetched.
data = (
    tf.data.Dataset.from_generator(
        my_count_task, output_types=(tf.uint8, tf.uint8, tf.uint8))
    .map(lambda series, context, label: (
        preprocess_inputs(series, context, SEQ_LEN),
        tf.cast(label, dtype=tf.float32)))
    .shuffle(buffer_size=1000)
    .batch(batch_size=batch_size)
    .prefetch(20)
)

# NOTE: the loop variables `iter` and `input` shadow builtins; the names are
# kept because they are cell-level globals that later code may read.
for iter in range(iters):
  loss_agg, acc_agg = [], []
  for input, target in data:
    loss, acc = train_step(model, input, target, loss_function, optimizer)
    loss_agg.append(loss)
    acc_agg.append(acc)
  # Report the mean batch loss and mean batch accuracy of this pass.
  print("Loss %2.5f ::: Accuracy %2.5f" % (np.mean(loss_agg), np.mean(acc_agg)))




Loss 0.67731 ::: Accuracy 0.58971
Loss 0.67767 ::: Accuracy 0.58823
Loss 0.67702 ::: Accuracy 0.59046
Loss 0.67712 ::: Accuracy 0.59012
Loss 0.67690 ::: Accuracy 0.59042
Loss 0.67748 ::: Accuracy 0.58868
Loss 0.67748 ::: Accuracy 0.58825
Loss 0.67675 ::: Accuracy 0.58988
Loss 0.67596 ::: Accuracy 0.58944
Loss 0.66713 ::: Accuracy 0.60302
Loss 0.65466 ::: Accuracy 0.61485
Loss 0.64633 ::: Accuracy 0.62454
Loss 0.63831 ::: Accuracy 0.63012
Loss 0.63413 ::: Accuracy 0.63567
Loss 0.62947 ::: Accuracy 0.64597
Loss 0.62750 ::: Accuracy 0.64482
Loss 0.62575 ::: Accuracy 0.64896
Loss 0.62058 ::: Accuracy 0.65137
Loss 0.61834 ::: Accuracy 0.65518
Loss 0.61686 ::: Accuracy 0.65679
Loss 0.61542 ::: Accuracy 0.65609
Loss 0.61276 ::: Accuracy 0.65918
Loss 0.61140 ::: Accuracy 0.66138
Loss 0.60849 ::: Accuracy 0.66375
Loss 0.60645 ::: Accuracy 0.66843
Loss 0.60310 ::: Accuracy 0.66864
Loss 0.60198 ::: Accuracy 0.67015
Loss 0.59874 ::: Accuracy 0.67195
Loss 0.59762 ::: Accuracy 0.67491
Loss 0.58968 :