<a href="https://colab.research.google.com/github/Flammingo2/TensorFlow/blob/main/homework9b.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [34]:
import random
%load_ext tensorboard
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
import datetime
import copy
%matplotlib notebook

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [35]:
def calc_label(seq, query):
    n1 = tf.reduce_sum(tf.cast(tf.equal(seq, query[0]), tf.int32)) # count frequency of first element
    n2 = tf.reduce_sum(tf.cast(tf.equal(seq, query[1]), tf.int32)) # count frequency of second element 
    
    return tf.constant([int(n1>=n2), int(n2>=n1)])

In [36]:
seq_len = 23

# a generator that yields sequences of length seq_len
# and queries
def sequence_and_query():
  while True:
      seq = tf.random.uniform(shape=(seq_len, 1), minval=0, maxval=10, dtype=tf.int32)
      query = tf.random.uniform(shape=(2,1), minval=0, maxval=10, dtype=tf.int32)
      label = calc_label(seq, query)
      yield seq, query, label


In [37]:
dataset_gen = tf.data.Dataset.from_generator(sequence_and_query,
                                             output_signature=(
                                              tf.TensorSpec(shape=(23,1), dtype=tf.int32),
                                              tf.TensorSpec(shape=(2,1), dtype=tf.int32),
                                              tf.TensorSpec(shape=(2), dtype=tf.int32)))


# We can take a fixed amount of datapoints from our generator dataset
num = 200
data_train_num = num * 64
data_test_num = num//10 * 64

train_dataset = dataset_gen.take(data_train_num).cache()
test_dataset = dataset_gen.take(data_test_num).cache()


In [38]:
for elem in train_dataset.take(1):
  print("Sequence:", "\n", elem[0], "\n")
  print("Query:", "\n", elem[1], "\n")
  print("Label:", "\n", elem[2])

  

Sequence: 
 tf.Tensor(
[[2]
 [4]
 [7]
 [2]
 [0]
 [6]
 [4]
 [0]
 [0]
 [3]
 [4]
 [4]
 [2]
 [4]
 [6]
 [5]
 [6]
 [8]
 [4]
 [4]
 [4]
 [8]
 [6]], shape=(23, 1), dtype=int32) 

Query: 
 tf.Tensor(
[[6]
 [8]], shape=(2, 1), dtype=int32) 

Label: 
 tf.Tensor([1 0], shape=(2,), dtype=int32)


In [39]:
batch_size = 64

# encode data one-hot
train_dataset = train_dataset.map(lambda seq, query, label: 
                                                (tf.one_hot(seq, 10), tf.one_hot(query, 10), label), 
                                      num_parallel_calls=tf.data.experimental.AUTOTUNE).cache()
train_dataset = train_dataset.map(lambda seq, query, label: (seq, tf.reshape(query, (1,-1)), label))
train_dataset = train_dataset.map(lambda seq, query, label: (seq, tf.repeat(query, seq.shape[0], axis=0),label))
train_dataset = train_dataset.map(lambda seq, query, label: (tf.concat((tf.squeeze(seq), query), axis=-1) , label))


test_dataset = test_dataset.map(lambda seq, query, label: 
                                                (tf.one_hot(seq, 10), tf.one_hot(query, 10), label), 
                                      num_parallel_calls=tf.data.experimental.AUTOTUNE).cache()
test_dataset = test_dataset.map(lambda seq, query, label: (seq, tf.reshape(query, (1,-1)), label))
test_dataset = test_dataset.map(lambda seq, query, label: (seq, tf.repeat(query, seq.shape[0], axis=0),label))
test_dataset = test_dataset.map(lambda seq, query, label: (tf.concat((tf.squeeze(seq), query), axis=-1) , label))

# create minibatches
train_dataset = train_dataset.batch(batch_size)
test_dataset = test_dataset.batch(batch_size)

# Prefetch
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)
test_dataset = test_dataset.prefetch(tf.data.experimental.AUTOTUNE)

In [40]:
class LSTM_cell(tf.keras.Model):

  def __init__(self,  unit_number):
    super(LSTM_cell, self).__init__()
    self.forget_gate = tf.keras.layers.Dense(units=unit_number, 
                                             bias_initializer= tf.keras.initializers.ones,
                                             activation='sigmoid')
    self.input_gate = tf.keras.layers.Dense(units=unit_number, activation='sigmoid')
    self.output_gate = tf.keras.layers.Dense(units=unit_number, activation='sigmoid')

    self.candidate_layer = tf.keras.layers.Dense(units=unit_number, activation='tanh')
    self.reshape_layer = tf.keras.layers.Reshape(target_shape = (3,1, unit_number))

  def call(self, last_cell_state, last_hidden_state, input):
    input = tf.concat([last_hidden_state, input], axis = 1)
    #tf.concat((last_hidden_state, input), axis=1)

    #forget gate
    x = tf.math.multiply(self.forget_gate(input), last_cell_state)

    #input gate + candidate gate
    y = tf.math.multiply(self.input_gate(input), self.candidate_layer(input))

    #plus
    new_cell_state = x + y
    x = new_cell_state

    #output gate + multiplication
    output = tf.math.multiply(tf.keras.activations.tanh(x), self.output_gate(input))
    new_hidden_state = output
    return [output, new_cell_state, new_hidden_state]

In [41]:
class LSTM(tf.keras.Model):
  def __init__(self, unit_number = 2):
    super(LSTM, self).__init__()

    self.unit_number = unit_number

    self.readin_layer = tf.keras.layers.Dense(units=30,
                                              activation=tf.keras.activations.sigmoid)

    self.LSTM_cell = LSTM_cell(unit_number)

    self.readout_layer = tf.keras.layers.Dense(units=2, 
                                              activation=tf.keras.activations.softmax)

  def call(self, inputs):
    outputs = []
    cell_state = tf.zeros(shape = (batch_size, self.unit_number), dtype = tf.float32)
    hidden_state = tf.zeros(shape = (batch_size,self.unit_number), dtype = tf.float32)

    for i in range(inputs.shape[1]):
      #print(inputs.shape)
      input = self.readin_layer(inputs[:,i,:])
      #print(input.shape)

      [output, cell_state, hidden_state] = self.LSTM_cell(cell_state,hidden_state, input)
      #input is here as well output of this last timestep
      output = self.readout_layer(output)
      outputs.append(output)
    return outputs

In [42]:
#@tf.function
def train_step(model, data, loss_function, optimizer):
    with tf.GradientTape() as tape:
        pred = model(data[0])
        label = tf.cast(data[1], dtype=tf.float32)
        loss = loss_function(pred[-1], label)
        
        # if the distance between prediction and label is < 0.5 
        # for the respective query elements we say the prediction is correct
        accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.greater(pred[-1],0.5), tf.equal(label,1.0)), dtype=tf.float32))
        gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss, accuracy

def test(model, test_data, loss_function, training = False):
    # Test over complete test data
    
    test_accuracy_aggregator = []
    test_loss_aggregator = []

    for data in test_data:
        pred = model(data[0])
        label = tf.cast(data[1], dtype=tf.float32)
        loss = loss_function(pred[-1], label)
        accuracy =  tf.reduce_mean(tf.cast(tf.equal(tf.greater(pred[-1],0.5), tf.equal(label,1.0)), dtype=tf.float32))
        test_loss_aggregator.append(np.mean(loss.numpy()))
        test_accuracy_aggregator.append(accuracy)
    
    test_loss = np.mean(test_loss_aggregator)
    test_accuracy = np.mean(test_accuracy_aggregator)
      
    return test_loss, test_accuracy

In [43]:
tf.keras.backend.clear_session()

### Hyperparameters
num_epochs = 5
learning_rate = tf.optimizers.schedules.ExponentialDecay(0.001, 5000, 0.97, staircase=True)
running_average_factor = 0.95

model_lstm = LSTM()

# Initialize the loss
binary_entropy_loss = tf.keras.losses.BinaryCrossentropy()

# Initialize the optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate)

# Initialize lists for later visualization.
train_losses = []
train_accuracies = []

test_losses = []
test_accuracies = []

# Train loop for num_epochs epochs.
for epoch in range(num_epochs):
    print('Epoch: __ ' + str(epoch))

    # Training
    running_average_loss = 0
    running_average_accuracy = 0
    for data in train_dataset:
        train_loss, train_accuracy = train_step(model_lstm, data, binary_entropy_loss, optimizer)
        running_average_loss = np.mean((running_average_factor * running_average_loss  + (1 - running_average_factor) * train_loss).numpy())
        running_average_accuracy = np.mean((running_average_factor * running_average_accuracy  + (1 - running_average_factor) * train_accuracy).numpy())
        
    train_losses.append(running_average_loss)
    train_accuracies.append(running_average_accuracy)
   

    # Testing
    test_loss, test_accuracy = test(model_lstm, test_dataset, binary_entropy_loss)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)

    #Print accuracy
    print(test_accuracies[-1])
    print(test_losses[-1])
 

    # Visualize accuracy and loss for training and test data. 
    # One plot training and test loss.
    # One plot training and test accuracy.
    plt.figure()
    line1, = plt.plot(train_losses)
    line2, = plt.plot(test_losses)
    plt.xlabel("Training steps")
    plt.ylabel("Loss")
    plt.legend((line1,line2),("training","test"))
    plt.show()

    plt.figure()
    line1, = plt.plot(test_accuracies)
    plt.xlabel("Training steps")
    plt.ylabel("Accuracy")
    plt.show()

Epoch: __ 0
0.5015625
7.689247


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Epoch: __ 1
0.503125
7.69723


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Epoch: __ 2
0.5046875
7.693631


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Epoch: __ 3
0.4921875
7.7472


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Epoch: __ 4
0.4984375
7.6951003


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>