<a href="https://colab.research.google.com/github/Efefefef/IANNwTF-UniOsnabrueck/blob/main/homework07/homework07.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# HOMEWORK 07

In [1]:
import tensorflow_datasets as tfds
import tensorflow as tf
from keras.layers import Dense, Conv2D, AveragePooling2D, TimeDistributed, LSTM, GlobalAvgPool2D, AbstractRNNCell
from keras.initializers import Orthogonal
from tqdm import tqdm
import datetime

## Prepare Dataset

In [2]:
# Load MNIST dataset
def load_data():
    (train_ds, test_ds), ds_info = tfds.load('mnist', split=['train', 'test'], as_supervised=True, with_info=True)
    return (train_ds, test_ds), ds_info

In [3]:
# Creating new target
def new_target_fnc(ds, window_size):
  l = list()
  for i, elem in enumerate(ds):
    if (i % window_size) == 0:
      l.append(int(elem[1]))
    else:
      if (i % 2) == 0:
        l.append(int(l[i-1] + elem[1]))
      else:
        l.append(int(l[i-1] - elem[1]))
  return l

# Preprocessing data
def preprocess(data, batch_size, window_size):
  new_targets = new_target_fnc(data, window_size)
  new_targets = tf.data.Dataset.from_tensor_slices(new_targets)
  data = tf.data.Dataset.zip((data, new_targets))
  data = data.map(lambda img, new_target: (img[0], new_target))
  data = data.map(lambda img, target: (img, tf.one_hot(target, depth=10)))

  data = data.map(lambda img, target: (tf.cast(img, tf.float32), target))
  data = data.map(lambda img, target: ((img/128.)-1., target))

  data = data.batch(window_size, drop_remainder=True)
  data = data.batch(batch_size, drop_remainder=True)
  data = data.cache().shuffle(1000).prefetch(tf.data.AUTOTUNE)
  return data

## Prepare Model

In [4]:
# CNN Model
class CNN(tf.keras.Model):
  def __init__(self, optimizer, loss_function, input_shape):
    super().__init__()
    # input conv1 = 28x28x1
    self.conv1 = TimeDistributed(Conv2D(24, 3, activation='relu', padding='valid'), input_shape=input_shape)
    # output conv1 = 28x28x24
    self.conv2 = TimeDistributed(Conv2D(24, 3, activation='relu', padding='valid'))
    # output conv2 = 28x28x24
    self.pooling1 = TimeDistributed(AveragePooling2D())
    # output pooling1 = 14x14x24
    self.conv3 = TimeDistributed(Conv2D(24, 3, activation='relu', padding='valid'))
    # output conv3 = 14x14x24
    self.conv4 = TimeDistributed(Conv2D(24, 3, activation='relu', padding='valid'))
    # output conv3 = 14x14x24
    self.globalpooling = TimeDistributed(GlobalAvgPool2D())
    # output globalpooling = 7x7x24
    self.out = TimeDistributed(Dense(10, activation='softmax'))

    self.optimizer = optimizer
    self.loss_function = loss_function

    self.metrics_list = [
        tf.keras.metrics.CategoricalAccuracy(name="accuracy"),
        tf.keras.metrics.Mean(name="loss")
    ]

  @tf.function
  def __call__(self, x, training=False):
    x = self.conv1(x)
    x = self.conv2(x)
    x = self.pooling1(x)
    x = self.conv3(x)
    x = self.conv4(x)

    x = self.globalpooling(x)
    x = self.out(x)
    return x

  # reset all metrics
  def reset_metrics(self):
      for metric in self.metrics:
          metric.reset_states()

  @tf.function
  def train_step(self, data):
      image, label = data

      with tf.GradientTape() as tape:
          prediction = self(image, training = True)
          loss = self.loss_function(label, prediction)

      gradients = tape.gradient(loss, self.trainable_variables)
      self.optimizer.apply_gradients(zip(gradients,self.trainable_variables))
      self.metrics[0].update_state(label, prediction)
      self.metrics[1].update_state(loss)

  @tf.function
  def test_step(self, data):
      image, label = data
      prediction = self(image, training = False)
      loss = self.loss_function(label, prediction)
      self.metrics[0].update_state(label, prediction)
      self.metrics[1].update_state(loss)

In [5]:
# RNN Cell
class RNNCell(AbstractRNNCell):
  def __init__(self, rec_unit_1, rec_unit_2, **kwargs):
    super().__init__(**kwargs)

    self.rec_unit_1 = rec_unit_1
    self.rec_unit_2 = rec_unit_2

    self.linear_1 = Dense(rec_unit_1)
    self.linear_2 = Dense(rec_unit_2)

    # 1st recurrent layer 
    self.rec_layer_1 = Dense(rec_unit_1, 
                             kernel_initializer= Orthogonal(gain=1, seed=None),
                             activation= tf.nn.tanh)
    
    # layer normalisation for trainability
    self.layer_norm_1 = tf.keras.layers.LayerNormalization()

    # 2nd recurrent layer
    self.rec_layer_2 = Dense(rec_unit_2, 
                             kernel_initializer= Orthogonal(gain=1, seed=None),
                             activation= tf.nn.tanh)
    
    # layer normalisation for trainability
    self.layer_norm_2 = tf.keras.layers.LayerNormalization()


    @property
    def state_size(self):
        return [tf.TensorShape([self.rec_unit_1]),
                tf.TensorShape([self.rec_unit_2])]

    @property
    def output_size(self):
        return [tf.TensorShape([self.rec_unit_2])]
    
    def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
        return [tf.zeros([self.rec_unit_1]), 
                tf.zeros([self.rec_unit_2])]

    def call(self, inputs, states):
        # unpack the states
        state_layer_1 = states[0]
        state_layer_2 = states[1]
        
        # linearly project input
        x = self.linear_1(inputs) + state_layer_1
        
        # apply first recurrent kernel
        new_state_layer_1 = self.rec_layer_1(x)
        
        # apply layer norm
        x = self.layer_norm_1(new_state_layer_1)
        
        # linearly project output of layer norm
        x = self.linear_2(x) + state_layer_2
        
        # apply second recurrent layer
        new_state_layer_2 = self.rec_layer_2(x)
        
        # apply second layer's layer norm
        x = self.layer_norm_2(new_state_layer_2)
        
        # return output and the list of new states of the layers
        return x, [new_state_layer_1, new_state_layer_2]
    
    def get_config(self):
        return {"recurrent_units_1": self.rec_unit_1, 
                "recurrent_units_2": self.rec_unit_2}

In [6]:
# RNN Model
class RNNModel(tf.keras.Model):
  def __init__(self):
    super().__init__()
        
    self.rnn_cell = RNNCell(rec_unit_1=24,
                            rec_unit_2=48)
        
    # return_sequences collects and returns the output 
    #    of the rnn_cell for all time-steps
    # unroll unrolls the network for speed (at the cost of memory)
    self.rnn_layer = tf.keras.layers.RNN(self.rnn_cell, 
                                         return_sequences=True, # we need to know every output in each step
                                                                # as we use it for calculation in next state
                                         unroll=True) 
        
    self.output_layer = tf.keras.layers.Dense(37, activation="softmax")
    self.metrics_list = [
                 tf.keras.metrics.CategoricalAccuracy(name="accuracy"),
                 tf.keras.metrics.Mean(name="loss")]

  @property
  def metrics(self):
    return self.metrics_list
    
  def reset_metrics(self):
     for metric in self.metrics:
       metric.reset_state()
        
  def call(self, sequence, training=False):
        
    rnn_output = self.rnn_layer(sequence)
        
    return self.output_layer(rnn_output)

  def train_step(self, data):   
    """
    Standard train_step method, assuming we use model.compile(optimizer, loss, ...)
    """
        
    sequence, label = data
    with tf.GradientTape() as tape:
      output = self(sequence, training=True)
      loss = self.compiled_loss(label, output, regularization_losses=self.losses)
    gradients = tape.gradient(loss, self.trainable_variables)
        
    self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
     
    self.metrics[0].update_state(loss)
    self.metrics[1].update_state(label, output)
        
    return {m.name : m.result() for m in self.metrics}
    
  def test_step(self, data):      
    """
    Standard test_step method, assuming we use model.compile(optimizer, loss, ...)
    """
        
    sequence, label = data
    output = self(sequence, training=False)
    loss = self.compiled_loss(label, output, regularization_losses=self.losses)
                
    self.metrics[0].update_state(loss)
    self.metrics[1].update_state(label, output)
        
    return {m.name : m.result() for m in self.metrics}

## Prepare Training Loop for CNN

In [7]:
# Training Loop for CNN model
def training_loop(model, train_ds, test_ds, epoch, train_summary_writer, test_summary_writer, save_path):
    for epoch in range (epochs):
        model.reset_metrics()

        for data in tqdm(train_ds, position=0, leave=True):
            model.train_step(data)

        with train_summary_writer.as_default():
            tf.summary.scalar(model.metrics[0].name, model.metrics[0].result(), step=epoch)
            tf.summary.scalar(model.metrics[1].name, model.metrics[1].result(), step=epoch)
        
        print("Epoch: ", epoch+1)
        print("Loss: ", model.metrics[1].result().numpy(), "Accuracy: ", model.metrics[0].result().numpy(), "(Train)")
        model.reset_metrics()

        for data in test_ds:
            model.test_step(data)

        with test_summary_writer.as_default():
            tf.summary.scalar(model.metrics[0].name, model.metrics[0].result(), step=epoch)
            tf.summary.scalar(model.metrics[1].name, model.metrics[1].result(), step=epoch)

        print("Loss: ", model.metrics[1].result().numpy(), "Accuracy: ", model.metrics[0].result().numpy(), "(Test)")
    
    model.save_weights(save_path)

## Training the CNN model

In [8]:
batch_size = 32
window_size = 4
(train_ds,test_ds), ds_info = load_data()
train_ds = preprocess(train_ds, batch_size, window_size)
test_ds = preprocess(test_ds, batch_size, window_size)

# for data in train_ds.take(1):
    # print(data[0].shape, data[1])

optimizer = tf.keras.optimizers.Adam()
loss_function = tf.keras.losses.CategoricalCrossentropy()
cnn = CNN(optimizer=optimizer, loss_function=loss_function, input_shape=(window_size, 28, 28, 1))
epochs = 3

current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
save_path = f"models/{current_time}"
train_log_path = f"logs/{current_time}/train"
test_log_path = f"logs/{current_time}/test"
train_summary_writer = tf.summary.create_file_writer(train_log_path)
test_summary_writer = tf.summary.create_file_writer(test_log_path)
training_loop(cnn, train_ds, test_ds, epochs, train_summary_writer, test_summary_writer, save_path)

100%|██████████| 468/468 [01:13<00:00,  6.39it/s]


Epoch:  1
Loss:  1.4767942 Accuracy:  0.23584402 (Train)
Loss:  1.3960503 Accuracy:  0.2996795 (Test)


100%|██████████| 468/468 [01:01<00:00,  7.60it/s]


Epoch:  2
Loss:  1.3744901 Accuracy:  0.2852898 (Train)
Loss:  1.3652352 Accuracy:  0.2960737 (Test)


100%|██████████| 468/468 [01:12<00:00,  6.45it/s]


Epoch:  3
Loss:  1.3453008 Accuracy:  0.2946214 (Train)
Loss:  1.3499708 Accuracy:  0.29757613 (Test)


In [28]:
# Extracting output from CNN model
cnn_output_train = cnn.predict(train_ds)

# Generating input label from training
X_train= list(map(lambda x: x[0], train_ds))
y_train= list(map(lambda x: x[1], train_ds))



In [29]:
print(cnn_output_train.shape)
print(len(y_train))
print(len(X_train))

(14976, 4, 10)
468
468


In [22]:
# Initiating RNN model
rnn = RNNModel()

# Compiling the rnn model
rnn.compile(optimizer, loss_function)

In [23]:
# Training RNN Model 
EXPERIMENT_NAME = "RNN_model"
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
logging_callback = tf.keras.callbacks.TensorBoard(log_dir=f"./logs/{EXPERIMENT_NAME}/{current_time}")

In [25]:
# Training RNN Model using fit
history = rnn.fit(x= cnn_output_train,
                  y = y_train,
                  validation_data=test_ds,
                  initial_epoch=2,
                  epochs=6,
                  callbacks=[logging_callback])

ValueError: ignored

## Plotting the result

In [None]:
# Plotting RNN model from History
plt.plot(history.history["loss"])
plt.plot(history.history["val_loss"])
plt.legend(labels=["training","validation"])
plt.xlabel("Epoch")
plt.ylabel("Categorical Crossentropy Loss")
plt.show()