In [1]:
import tensorflow as tf
import pandas as pd
import numpy as np 

In [None]:
def integration_task(seq_len, num_samples):

    for _ in range(num_samples):

        x = tf.random.normal((seq_len, 1))
        target = tf.expand_dims(tf.cast(tf.math.reduce_sum(x) > 0, tf.int16), -1)

        yield x, target

In [None]:
seq_len = 10
num_samples = 80000

def my_integration_wrapper():

  for x,y in integration_task(seq_len, num_samples):
      yield x,y 



In [None]:
data = tf.data.Dataset.from_generator(my_integration_wrapper, output_signature=(
                                            tf.TensorSpec(shape=(seq_len,1), dtype= tf.float32), 
                                            tf.TensorSpec(shape=(1,), dtype=tf.int16)))


In [None]:
def prepare(ds):   
    # Prepare data for model

    # cache
    ds = ds.cache()
    # shuffle, batch, prefetch our dataset
    ds = ds.shuffle(5000)
    ds = ds.batch(256)
    ds = ds.prefetch(128)
    return ds

In [None]:
# prepare data
ds = prepare(data)

train_ds = ds.take(72000)
test_ds = ds.skip(72000).take(8000)

In [None]:
from tensorflow.keras.layers import Layer
from tensorflow.keras.layers import Dense, Conv2D, MaxPool2D, BatchNormalization, Activation, GlobalAvgPool2D

In [None]:
#class LSTM_Cell(Layer):
#
#    def __init__(self, units):
#        super(LSTM_Cell, self).__init__()
#
#        self.units = units
#
#        # forget gate
#        self.f_gate = Dense(units, activation="sigmoid", bias_initializer='ones')
#
#        # input gate 
#        self.i_gate = Dense(units, activation="tanh")
#
#        # candidate gate
#        self.c_gate = Dense(units, activation="sigmoid")
#
#        # output gate
#        self.o_gate = Dense(units, activation="sigmoid")
#
#        def call(self, x, states):
#
#            # current cell state and hidden cell state
#            hidden_state, cell_state = states
#
#            # put input and hidden state in a tensor 
#            input = tf.concat((x, hidden_state), axis=-1)
#
#            # compute forget gate output
#            forget = self.f_gate(input)
#
#            # update the currentcell state
#            cell_state = cell_state * forget
#
#            # compute input_gate output * candidate gate output 
#            candidate = self.i_gate(input) * self.c_gate(input)
#
#            # update cell state
#            cell_state = cell_state + candidate 
#
#            # compute output
#            output = tf.tanh(cell_state) * self.o_gate(input)
#
#            return output, (output, cell_state)

class LSTM_Cell(Layer):
  def __init__(self, units):
    super(LSTM_Cell, self).__init__()
    self.units = units
    self.forget_Gate = tf.keras.layers.Dense(units, activation=tf.nn.sigmoid, bias_initializer=tf.keras.initializers.Ones)
    self.candidates = tf.keras.layers.Dense(units, activation=tf.nn.tanh, kernel_initializer='orthogonal')
    self.candidates_gate = tf.keras.layers.Dense(units, activation=tf.nn.sigmoid, kernel_initializer='orthogonal')
    self.out_gate = tf.keras.layers.Dense(units, activation=tf.nn.sigmoid, kernel_initializer='orthogonal')
  
  @tf.function
  def call(self, x, states):
    hidden_state, cell_state = states
    concat_input = tf.concat((x, hidden_state), axis=-1)
    cell_state = cell_state*self.forget_Gate(concat_input)


    update = self.candidates(concat_input)*self.candidates_gate(concat_input)
    cell_state = cell_state + update
    out = tf.nn.tanh(cell_state)*self.out_gate(concat_input)
    return out, (out, cell_state)

In [None]:
class LSTM_Layer(Layer):
    def __init__(self, cell):
        super(LSTM_Layer, self).__init__()
        self.cell = cell
    @tf.function
    def call(self, x, states):
        
        seq_len = tf.shape(x)[1]
        #print(seq_len)

        output = tf.TensorArray(dtype=tf.float32, size=seq_len, clear_after_read=True)

        for i in tf.range(seq_len):
            #print(self.cell(x[:, i, :], states))
            out, states = self.cell(x[:,i,:], states)
            output = output.write(i, out)
        result = output.stack()
        #print(result)
        # perm=[1,0,2]?
        result = tf.transpose(result, perm=[1,0,2])
            
        return result
    
    def zero_state(self, batch_size):

        return (tf.zeros((batch_size, self.cell.units)), tf.zeros((batch_size, self.cell.units)))

In [None]:
class LSTM_Model(tf.keras.Model):

   def __init__(self):
       super(LSTM_Model, self).__init__()

       self.read_in = Dense(64, activation='sigmoid')

       self.dense_layer = Dense(32)

       self.lstm_layer = LSTM_Layer(LSTM_Cell(2))

       self.out = Dense(units=1, activation='sigmoid')
   @tf.function
   def call(self, x):
       batch_size = tf.shape(x)[0]
       #print(batch_size)
       x = self.read_in(x)
       zero_state = self.lstm_layer.zero_state(batch_size)
       x = self.lstm_layer.call(x, zero_state)
       x = self.out(x)

       return x

In [None]:
def train_step(model, input, target, loss_function, optimizer):
  # loss_object and optimizer_object are instances of respective tensorflow classes
  with tf.GradientTape() as tape:
    prediction = model(input)
    loss = loss_function(target, prediction[:,-1,:])
    gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss


def test(model, test_data, loss_function):
  # test over complete test data

  test_accuracy_aggregator = []
  test_loss_aggregator = []

  for (input, target) in test_data:
    prediction = model(input)
    sample_test_loss = loss_function(target, prediction)
    sample_test_accuracy =  np.round(target) == np.round(prediction[:,-1,:])
    sample_test_accuracy = np.mean(sample_test_accuracy)
    test_loss_aggregator.append(sample_test_loss.numpy())
    test_accuracy_aggregator.append(np.mean(sample_test_accuracy))

  test_loss = tf.reduce_mean(test_loss_aggregator)
  test_accuracy = tf.reduce_mean(test_accuracy_aggregator)

  return test_loss, test_accuracy

In [None]:
tf.keras.backend.clear_session()

train_dataset = train_ds 
test_dataset = test_ds 

### Hyperparameters
num_epochs = 10
learning_rate = 0.001

# Initialize the model.
model = LSTM_Model()
# Initialize the loss: binary cross entropy. Check out 'tf.keras.losses'.
cross_entropy_loss = tf.keras.losses.BinaryCrossentropy()
# Initialize the optimizer: Nadam with default parameters
optimizer = tf.keras.optimizers.Adam(learning_rate)

# Initialize lists for later visualization.
train_losses = []

test_losses = []
test_accuracies = []

#testing once before we begin
test_loss, test_accuracy = test(model, test_dataset, cross_entropy_loss)
test_losses.append(test_loss)
test_accuracies.append(test_accuracy)

#check how model performs on train data once before we begin
train_loss, _ = test(model, train_dataset, cross_entropy_loss)
train_losses.append(train_loss)

# We train for num_epochs.
for epoch in range(num_epochs):
    print(f'Epoch {str(epoch)} ::: Accuracy: {test_accuracies[epoch]} ::: Loss:{train_losses[epoch]}')
    #training (and checking in with training)
    epoch_loss_agg = []
    for input,target in train_dataset:
        train_loss = train_step(model, input, target, cross_entropy_loss, optimizer)
        epoch_loss_agg.append(train_loss)
    
    #track training loss
    train_losses.append(tf.reduce_mean(epoch_loss_agg))

    #testing, so we can track accuracy and test loss
    test_loss, test_accuracy = test(model, test_dataset, cross_entropy_loss)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)
print('Final accuracy:', test_accuracies[epoch])  

Epoch 0 ::: Accuracy: nan ::: Loss:0.6965903639793396
Epoch 1 ::: Accuracy: nan ::: Loss:0.42759719491004944
Epoch 2 ::: Accuracy: nan ::: Loss:0.2515907883644104
Epoch 3 ::: Accuracy: nan ::: Loss:0.20742614567279816
Epoch 4 ::: Accuracy: nan ::: Loss:0.1821606606245041
Epoch 5 ::: Accuracy: nan ::: Loss:0.16359546780586243
Epoch 6 ::: Accuracy: nan ::: Loss:0.1500256359577179
Epoch 7 ::: Accuracy: nan ::: Loss:0.13891752064228058
Epoch 8 ::: Accuracy: nan ::: Loss:0.12931491434574127
Epoch 9 ::: Accuracy: nan ::: Loss:0.12094012647867203
Final accuracy: tf.Tensor(nan, shape=(), dtype=float32)
