<a href="https://colab.research.google.com/github/JacopoBartoli/vas_regression/blob/main/test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1) Install packages and organize imports.
In this section we install the required packages and import them.
We also define the paths used throughout the notebook and mount Google Drive.

In [None]:
# Install TensorFlow Addons; it is imported as `tfa` in the next cell.
!pip install tensorflow-addons

In [None]:
import tensorflow as tf
import tensorflow_addons as tfa
import tensorflow_datasets as tfds
import tensorboard
import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
import time, datetime, math, io, sklearn.preprocessing, itertools
from sklearn.metrics import confusion_matrix

Useful paths.

In [None]:
# Google Drive locations used by this notebook. The drive is mounted at
# '/content/gdrive' in a later cell, so these paths only resolve after mounting.

# Directory containing the dataset .csv files. The trailing slash matters:
# the dataset file name is appended to it by plain string concatenation.
DATASET_DIR = '/content/gdrive/My Drive/IVA/data/'
# Root directory of the TensorBoard logs (no trailing slash).
LOGS_DIR = '/content/gdrive/My Drive/IVA/logs'
# Directory of the training checkpoints. It is not referenced in the visible
# part of this notebook.
CHECKPOINT_DIR = '/content/gdrive/My Drive/IVA/checkpoint/train'
# Directory containing the saved models (no trailing slash).
MODEL_DIR = '/content/gdrive/My Drive/IVA/model'

Mount the drive.

In [None]:
# Mount Google Drive at '/content/gdrive' so that the paths defined in the
# previous cell resolve. The dataset must be reachable under DATASET_DIR on
# the mounted drive.
# NOTE(review): the original note referred to linking instructions "explained
# above", but no such instructions are visible in this notebook.
from google.colab import drive
drive.mount('/content/gdrive')

# 2) Manage the data.
In this section we manipulate and extract the data.

## 2.1) Load the test set.

Define the name of the dataset used for
testing.

The data in the .csv file can have a variable number of features, but three columns are always required: 'Sequenza', 'Frame' and 'Label'.
The first is the id of a sequence, the second is the id of a frame, and the third is the label of each frame.


Each row of the file must represent a frame, and all the frames of the same sequence must have the same label.



In [None]:
# File name of the test dataset; it is appended to DATASET_DIR when loading.
TEST_SET = 'test-velocity-66-sampled.csv'

Load the test set from a .csv file.

In [None]:
# Read the test set into a DataFrame and show its first five rows.
df = pd.read_csv(f'{DATASET_DIR}{TEST_SET}')
print(df.head(5))

## 2.2) Divide the labels from the data.

The labels and the data are stored in separate data structures.


In [None]:
# Number of distinct label values minus one. It is used later to scale values
# from the 0-1 representation back to class indices for the confusion matrix.
number_of_labels = len(set(df['Label'].tolist())) - 1

# Drop the per-frame id column.
df_test = df.drop(columns=['Frame'])

# Split the labels from the remaining columns: keep the 'Label' column on its
# own, then remove it from the data.
lbl_test = df_test['Label']
df_test = df_test.drop(columns=['Label'])

In [None]:
# Preview the data after the 'Frame' and 'Label' columns have been removed.
print(df_test.head())

## 2.3) Preprocessing of the sequences.

In the dataset each row represents a frame of a sequence, and every frame of a sequence has the same label. We preprocess the data to obtain a dataset with a single label per sequence (not one per frame), in which each item represents a whole sequence rather than a single frame.


In [None]:
# The data and the labels must be passed as two separate objects.
def preprocessing_sequences(data, lbl):
  """Regroup frame-level rows into one array and one label per sequence.

  Sequences and labels are both produced by grouping on the 'Sequenza'
  column, in ascending order of sequence id, so the i-th array always
  corresponds to the i-th label even when the sequence ids have gaps.

  Args:
    data: DataFrame with one row per frame. It must contain a 'Sequenza'
      column holding the sequence id; every other column is kept as a
      feature.
    lbl: Series with one label per frame, sharing the index of `data`.

  Returns:
    A tuple `(sequences, labels)`. `sequences` is a list with one float64
    array of shape (frames, features) per sequence. `labels` is a 1-D array
    with the mean of the frame labels of each sequence (equal to the
    sequence label when all its frames share the same label).
  """
  # One label per sequence: average the frame labels of each sequence id.
  labels = lbl.groupby(data['Sequenza']).mean().values

  # One array per sequence. Iterating the groups (sorted by id by default)
  # avoids creating empty sequences for ids missing from the data, and
  # to_numpy converts all the frames of a sequence in a single step.
  sequences = []
  for _, frames in tqdm(data.groupby('Sequenza')):
    features = frames.drop(['Sequenza'], axis=1)
    sequences.append(features.to_numpy(dtype=np.float64))

  return sequences, labels


In [None]:
# Regroup the frame-level table by sequence. Both names are rebound: df_test
# becomes a list with one array per sequence and lbl_test an array with one
# label per sequence.
df_test, lbl_test = preprocessing_sequences(df_test, lbl_test)

## 2.4) Create and manage the test dataset.


In [None]:
# Build a dataset of (sequence, label) pairs, one element per sequence.
# NOTE(review): from_tensor_slices converts the list into a single tensor, so
# this presumably requires every sequence to have the same number of frames -
# confirm against the data.
ds_test = tf.data.Dataset.from_tensor_slices((df_test, lbl_test))

In [None]:
# Number of dataset elements per batch.
BATCH_SIZE = 1
# Size of the buffer used when shuffling the dataset.
BUFFER_SIZE = 5000
# Seed passed to the shuffle.
random_seed = 1337

Function to apply some preprocessing when making batches.

In [None]:
def make_batches(ds):
  """Return `ds` cached, shuffled, batched and prefetched, in that order.

  The shuffle uses a buffer of BUFFER_SIZE elements seeded with random_seed,
  batches contain BATCH_SIZE elements, and the prefetch depth is left to
  tf.data.AUTOTUNE.
  """
  cached = ds.cache()
  shuffled = cached.shuffle(BUFFER_SIZE, seed=random_seed)
  batched = shuffled.batch(BATCH_SIZE)
  return batched.prefetch(tf.data.AUTOTUNE)

Now we split the test set into batches.

In [None]:
# Batched view of the test dataset, iterated by the evaluation loop.
test_batches = make_batches(ds_test)

# 3) Evaluation.

## 3.1) Set the error metric.

In [None]:
# Mean absolute error, a metric meant for regression.
# For classification other metrics will be needed.
# NOTE: Keras metric objects are stateful; every call accumulates into the
# running result until the metric is reset.
error_object = tf.keras.metrics.MeanAbsoluteError()

## 3.2) Custom implementation of the error function. 
Add a way to customize the error function.

In [None]:
def error_function(real, pred):
  """Return the mean absolute error between `real` and `pred`.

  Args:
    real: ground-truth values of the current batch.
    pred: model predictions for the same batch.

  Returns:
    The value of `error_object` computed on this batch only.
  """
  # Keras metrics are stateful: calling one accumulates into its running
  # totals and returns the result over everything seen since the last reset.
  # Reset first, so the returned value describes this batch alone instead of
  # a cumulative mean that would later be averaged a second time.
  error_object.reset_states()

  return error_object(real, pred)

Create the metrics object.

In [None]:
# Running mean of the values returned by error_function during evaluation.
test_error = tf.keras.metrics.Mean(name='test_error')

## 3.3) Set the paths for Tensorboard.
The test_log_dir must be associated with a valid train_log_dir through their shared timestamp.

In [None]:
# Timestamp identifying the training run whose model is evaluated here. The
# value encoded below is 2021-10-14 at 08:34:12.
# NOTE(review): the original comment said 8:34:14, which does not match the
# code - confirm which timestamp is the right one.
current_time = datetime.datetime(2021, 10, 14, 8, 34, 12).strftime("%Y%m%d-%H%M%S")
# The test logs are written under the directory of that run.
test_log_dir = LOGS_DIR + '/gradient_tape/' + current_time + '/test'
test_summary_writer = tf.summary.create_file_writer(test_log_dir)

### 3.3.1) Define some utility functions for confusion matrix visualization inside Tensorboard.

In [None]:
# Class names for the confusion matrix: 0.0, 1.0, ..., number_of_labels.
# Listing them is possible only because this regression task has a finite
# number of labels.
tag_list = list(map(float, range(number_of_labels + 1)))

Utility functions for confusion matrix visualization.

In [None]:
def plot_to_image(figure):
  """Converts the matplotlib plot specified by 'figure' to a PNG image and
  returns it. The supplied figure is closed and inaccessible after this call.

  Args:
    figure: the matplotlib figure to render.

  Returns:
    The decoded image as a tensor with 4 channels and a leading batch
    dimension of size 1.
  """
  # Save the plot to a PNG in memory. Saving through the figure itself
  # (rather than plt.savefig) guarantees that the figure passed in is the one
  # rendered, whatever pyplot currently considers the active figure.
  buf = io.BytesIO()
  figure.savefig(buf, format='png')
  # Closing the figure prevents it from being displayed directly inside
  # the notebook.
  plt.close(figure)
  buf.seek(0)
  # Convert PNG buffer to TF image
  image = tf.image.decode_png(buf.getvalue(), channels=4)
  # Add the batch dimension
  image = tf.expand_dims(image, 0)
  return image

def plot_confusion_matrix(cm, class_names):
  """
  Returns a matplotlib figure containing the plotted confusion matrix.

  The cell colors follow the raw counts in `cm`. The number written in each
  cell is the count divided by the total of its row, rounded to two decimals;
  rows whose total is zero are divided by one instead, so they show 0.0.

  Args:
    cm (array, shape = [n, n]): a confusion matrix of integer classes
    class_names (array, shape = [n]): names of the classes, used as tick labels

  Returns:
    The matplotlib figure holding the plot.
  """
  # Draw on an explicit figure/axes pair instead of relying on pyplot's
  # implicit "current figure" state.
  figure, ax = plt.subplots(figsize=(8, 8))
  image = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
  ax.set_title("Confusion matrix")
  figure.colorbar(image, ax=ax)
  tick_marks = np.arange(len(class_names))
  ax.set_xticks(tick_marks)
  ax.set_xticklabels(class_names, rotation=45)
  ax.set_yticks(tick_marks)
  ax.set_yticklabels(class_names)

  # Compute the labels from the normalized confusion matrix.
  # Rows that sum to zero are divided by one to avoid a division by zero.
  row_totals = cm.sum(axis=1)[:, np.newaxis]
  row_totals = np.where(row_totals != 0, row_totals, 1)

  labels = np.around(cm.astype('float') / row_totals, decimals=2)

  # Use white text if squares are dark; otherwise black.
  threshold = cm.max() / 2.
  for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
    color = "white" if cm[i, j] > threshold else "black"
    ax.text(j, i, labels[i, j], horizontalalignment="center", color=color)

  ax.set_ylabel('True label')
  ax.set_xlabel('Predicted label')
  # Adjust the layout only after the axis labels exist, so that they are
  # taken into account and not clipped.
  figure.tight_layout()
  return figure

## 3.4) Load the model.
Since it's a custom model, it can't be loaded from an .h5 file.

In [None]:
# List the contents of the model directory (the same path as MODEL_DIR).
!ls -l '/content/gdrive/My Drive/IVA/model'

In [None]:
# Load the trained model saved under <MODEL_DIR>/<current_time>/transformers,
# where current_time is the timestamp of the training run. The path is built
# from MODEL_DIR so it stays in sync with the paths defined at the top of the
# notebook; it resolves to the same location as before.
transformer = keras.models.load_model(MODEL_DIR + '/' + current_time + '/transformers')

## 3.5) Evaluate the model.

In [None]:
def test_step(inp,tar):
  """Run the model on one batch in inference mode and record its error.

  Args:
    inp: input batch fed to `transformer`.
    tar: target values of the batch.

  Returns:
    The predictions of the model for `inp`.
  """
  predictions = transformer(inp, training=False)

  # Feed the error of this batch to the running test_error metric.
  test_error(error_function(tar, predictions))

  return predictions


In [None]:
# Evaluate the model on the whole test set and log the results to Tensorboard:
# the mean error, the distributions of predictions and ground truth, and the
# confusion matrix.
start = time.time()

test_error.reset_states()

# Needed for histogram visualization.
predictions_histogram = []
labels_histogram = []
y_pred, y_true = [], []  # Needed for the confusion matrix

for (batch, (inp, tar)) in enumerate(test_batches):
    predictions = test_step(inp, tar)

    # Flatten to 1-D so that every collected item is a scalar, whatever
    # trailing dimensions the model output carries.
    flat_predictions = np.ravel(predictions)
    flat_targets = np.ravel(tar)

    # Save the values for the histograms of predictions and ground truth.
    predictions_histogram = np.hstack((predictions_histogram, flat_predictions))
    labels_histogram = np.hstack((labels_histogram, flat_targets))

    # Transform back the label value, from the 0 to 1 representation,
    # It's a simple scale back. Predictions are clipped to the valid class
    # range so that an out-of-range prediction is counted in the nearest
    # class instead of being left out of the confusion matrix.
    y_pred.extend(np.clip(np.around(flat_predictions * number_of_labels),
                          0, number_of_labels))
    y_true.extend(np.around(flat_targets * number_of_labels))

# Build confusion matrix. sklearn expects (y_true, y_pred): rows are the true
# labels and columns the predicted ones, matching the axis labels of the plot.
# Passing `labels` fixes the matrix to one row/column per entry of tag_list,
# even for classes that never occur in this test set.
cm = confusion_matrix(y_true, y_pred, labels=tag_list)
figure = plot_confusion_matrix(cm, class_names=tag_list)
cm_image = plot_to_image(figure)

with test_summary_writer.as_default():
    tf.summary.scalar('Error', test_error.result(), step=0)
    tf.summary.histogram('Predictions distribution', predictions_histogram, step=0)
    tf.summary.histogram('Ground Truth distribution', labels_histogram, step=0)
    tf.summary.image('Confusion Matrix', cm_image, step=0)