Basic utilities for the other notebooks.

In [None]:
%cd "/content/drive/MyDrive/Colab Notebooks"

In [None]:
import numpy as np
import cv2
from PIL import Image
import os
import PIL
import tensorflow as tf
import tensorflow_datasets as tfds
import glob
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import json

### Utils

In [None]:
def plot_results(history: tf.keras.callbacks.History, n_epochs=None):
    """
    Plots training and validation loss of a given experiment.
    :param history: Keras `History` object whose `history` dict contains the
    'loss' and 'val_loss' entries.
    :param n_epochs: Number of initial epochs to plot. If None (or 0), takes its
    value as the length of the history; larger values are clamped to it.
    """
    train_loss = history.history['loss']
    validation_loss = history.history['val_loss']
    # Clamp to the recorded epochs: a larger `n_epochs` would make the x values
    # longer than the sliced loss values and `ax.plot` would fail.
    recorded_epochs = min(len(train_loss), len(validation_loss))
    n_epochs = min(n_epochs or recorded_epochs, recorded_epochs)
    epochs = np.arange(1, n_epochs + 1)  # epochs are shown 1-based
    fig, ax = plt.subplots(1, 1)
    ax.plot(epochs, train_loss[:n_epochs], label='Training Loss')
    ax.plot(epochs, validation_loss[:n_epochs], label='Validation Loss')
    ax.set_xlabel('Epochs')
    ax.set_ylabel('Loss')
    # Without a legend the `label` arguments above are never displayed.
    ax.legend()


In [None]:
def plot_metrics(
    history: tf.keras.callbacks.History, start_epoch=0, end_epoch=None,
    metrics: str | list[str] = 'loss', data_labels = (('Training Loss', 'Validation Loss'),),
    axes_labels = (('Epochs', 'Value'),)
):
  """
  Plots a specified set of metrics monitored by keras for training and validation,
  each one in a separate subplot (one row per metric).
  :param history: Keras `History` object; for every metric name `m` in `metrics`
  its `history` dict must contain both `m` and `val_m`.
  :param start_epoch: First (included) epoch whose metric values are plotted.
  :param end_epoch: Last (excluded) epoch whose metric values are plotted.
  If None (or 0), the length of the recorded 'loss' values is used.
  :param metrics: Metric name or list of metric names as they are registered in `history`.
  :param data_labels: (training label, validation label) pairs, one per figure,
  in the same order as the metric names in `metrics`.
  :param axes_labels: (x label, y label) pairs, one per figure, in the same order
  as the metric names in `metrics`.
  Only as many metrics are plotted as there are entries in the shortest of
  `metrics`, `data_labels` and `axes_labels`.
  """
  metrics = [metrics] if isinstance(metrics, str) else metrics
  # squeeze=False makes `subplots` always return a 2D array of axes; by default a
  # single metric would yield a bare Axes object, which cannot be indexed.
  fig, axes = plt.subplots(len(metrics), 1, squeeze=False)
  end_epoch = end_epoch or len(history.history['loss'])
  epochs = np.arange(start_epoch, end_epoch)
  for metric_ax, metric, metric_data_labels, metric_axes_labels \
    in zip(axes[:, 0], metrics, data_labels, axes_labels):
    metric_ax.set_xlabel(metric_axes_labels[0])
    metric_ax.set_ylabel(metric_axes_labels[1])
    metric_ax.plot(epochs, history.history[metric][start_epoch:end_epoch], label=metric_data_labels[0])
    metric_ax.plot(epochs, history.history[f'val_{metric}'][start_epoch:end_epoch], label=metric_data_labels[1])
    metric_ax.legend()

In [None]:
def plot_relative_errors(predictions, targets, epsilon=1e-8, max_error=1, nbins=20):
  """
  Plots relative errors adjusted by a small constant to avoid divisions by 0.
  Relative errors are defined as: ||y_i + epsilon - ypred_i||/||y_i + epsilon||,
  computed separately for each of the first three columns (x, y, z).
  One histogram per coordinate is drawn; the number of samples before and after
  pruning, and the mean and standard deviation of the pruned errors, are printed.
  The printed statistics include errors above `max_error`, which fall outside
  the histogram range.
  :param predictions: Predicted values (e.g. validation or test set), with at
  least three columns.
  :param targets: Ground truth values, same shape as `predictions`.
  :param epsilon: Constant to add to `targets` when calculating relative errors.
  :param max_error: Maximum value for the relative errors to be plotted.
  :param nbins: Number of bins for the histogram plot.
  """
  predictions, targets = np.asarray(predictions), np.asarray(targets)
  relative_errors = np.abs(predictions - targets - epsilon) / np.abs(targets + epsilon)
  per_axis_errors = [relative_errors[:, k] for k in range(3)]
  print(*(len(errors) for errors in per_axis_errors))
  # Prune the cases of a zero target (multiplies by 10**6 the error)
  per_axis_errors = [
      errors[targets[:, k] != 0] for k, errors in enumerate(per_axis_errors)
  ]
  print(*(len(errors) for errors in per_axis_errors))
  # `nbins` bins need `nbins + 1` edges; `linspace(0, max_error, nbins)` would
  # only produce `nbins - 1` bins.
  bin_edges = np.linspace(0, max_error, nbins + 1)
  fig, ax = plt.subplots(3, 1)
  for k, (axis_name, errors) in enumerate(zip('xyz', per_axis_errors)):
    ax[k].hist(errors, bins=bin_edges, label=f'Relative Error ({axis_name})')
    ax[k].set_xlabel('Error')
    ax[k].set_ylabel('Frequency')
    # Show the label passed to `hist`, which is otherwise never displayed.
    ax[k].legend()
  for axis_name, errors in zip('xyz', per_axis_errors):
    print(f'{axis_name}-axis: mean = {np.mean(errors)}, std = {np.std(errors)}')

The **`Mean Euclidean Error`**:

In [None]:
def mean_euclidean_error(y_true, y_pred):
  """
  Computes the Euclidean distance between targets and predictions along the last axis.
  No averaging is done here: one distance per sample is returned.
  (Presumably Keras averages these when the function is used as a loss or
  metric - to be confirmed against the calling notebooks.)
  :param y_true: Ground truth values.
  :param y_pred: Predicted values, with a shape compatible with `y_true`.
  """
  residual = y_true - y_pred
  sum_of_squares = tf.math.reduce_sum(tf.math.square(residual), axis=-1)
  return tf.math.sqrt(sum_of_squares)

### Dataset Retrieval
The original dataset is made up of $513$ `full-hd` ($1920 \times 1080$) images in `png` format. While in this format a single image requires $\approx 100$ KB of memory, a `numpy` array that contains the image in `float32` datatype will require at least $1920 \times 1080 \times 3 \times 4 \approx 24$ MB of memory, hence the whole dataset would require $\approx 12$ GB, which makes loading it at full resolution unfeasible.

For simplicity, we therefore define a `get_dataset()` function that resizes all the images to a percentage of the original size through the `PIL.Image.Image.resize()` method. By default we operate with a size of $240 \times 135$ ($12.5\%$ of the original width and height).

In [None]:
def get_dataset(
    image_data_path='data/image/ws_0.5', motion_data_path='data/motion/ws_0.5.npz',
    resize=True, force_resize=True, target_size_perc=0.125, rescale_coordinates=1,
    target_size=None,
  ): # by default 135 x 240
  """
  Returns the already loaded dataset with the images in the given size.

  Resized copies of the images are written to (and then read back from) the
  `resized_<height>x<width>` subdirectory of `image_data_path`.

  Parameters:
    image_data_path (str): Path to the directory containing the image data. Default is 'data/image/ws_0.5'.
    motion_data_path (str): Path to the motion data file. Default is 'data/motion/ws_0.5.npz'.
    resize (bool): Flag to enable resizing of the images when the resized directory does not exist yet.
      Ignored while `force_resize` is True. Default is True.
    force_resize (bool): Flag to force image resizing even if resized images already exist. Default is True.
    target_size_perc (float): Percentage of the target image size relative to the original (1080 x 1920) size.
      Used only when `target_size` is not given. Default is 0.125.
    rescale_coordinates (float): Scaling factor for the tip position coordinates. Default is 1.
    target_size (tuple): Tuple specifying the target height and width of the images. Default is None.

  Returns:
    tuple: A tuple containing two elements:
        - images (numpy.ndarray): A float32 numpy array of shape (num_images, height, width, 3)
          with pixel values divided by 255.
        - tip_pos (numpy.ndarray): A numpy array of shape (num_images, 3) representing the rescaled tip positions.
  """
  if target_size:
    target_height, target_width = target_size
  else:
    target_height, target_width = int(1080 * target_size_perc), int(1920 * target_size_perc)
  resized_dir_path = os.path.join(image_data_path, f'resized_{target_height}x{target_width}')
  # Resizes images to given size
  if force_resize or (resize and not os.path.exists(resized_dir_path)):
    os.makedirs(resized_dir_path, exist_ok=True)
    # NOTE(review): the source files are ordered lexicographically, so the index
    # given to each resized image matches the frame order only if the original
    # file names sort that way (e.g. zero-padded numbers) - confirm.
    # NOTE(review): files left in the directory by a previous run with more
    # images are not removed and would be loaded below - confirm whether this can happen.
    source_pngs = sorted(glob.glob(f'{image_data_path}/*.png'))
    for i, png in enumerate(source_pngs):
      # The context manager closes the source file once the resized copy is saved.
      with PIL.Image.open(png) as image:
        resized_image = image.resize((target_width, target_height))
        resized_image.save(os.path.join(resized_dir_path, f'{i}.png'), "PNG")
  # Resized files are named `<index>.png`: sort them numerically by that index.
  # `os.path.basename` keeps this independent of the platform path separator.
  resized_pngs = sorted(
      glob.glob(f'{resized_dir_path}/*.png'),
      key=lambda path: int(os.path.basename(path).split('.')[0]),
  )
  images = np.zeros((len(resized_pngs), target_height, target_width, 3), dtype=np.float32)
  for i, png in enumerate(resized_pngs):
    with PIL.Image.open(png) as image:
      # Force three channels so that RGBA, grayscale or palette PNGs also fit the buffer.
      images[i] = np.asarray(image.convert('RGB'))
  # Close the .npz archive as soon as the positions have been extracted.
  with np.load(motion_data_path) as motion_data:
    tip_pos = rescale_coordinates * motion_data['position_rod1'][:, [2, 0, 1], -1]  # all positions of last node ([2,0,1] converts to xyz)
  # Normalize in place to avoid allocating a second full-size array.
  images /= 255.0
  return images, tip_pos

Finally, a train-validation-test split function for Holdout-based model selection.

In [None]:
def holdout_split(train_perc: float, eval_perc: float, test_perc: float, images, tip_pos):
  """
  Splits images and tip positions into training, validation and test sets.
  Both splits use a fixed random state (0).
  :param train_perc: Fraction of the data for the training set.
  :param eval_perc: Fraction of the data for the validation set.
  :param test_perc: Fraction of the data for the test set (passed as `test_size`
  of the first split).
  :param images: Input samples.
  :param tip_pos: Targets, aligned with `images`.
  :return: Three (images, tip_pos) pairs: training, validation and test.
  """
  # First cut: separate the test set from the development data.
  dev_x, test_x, dev_y, test_y = train_test_split(
      images, tip_pos, test_size=test_perc, random_state=0
  )
  # Second cut: the validation share is expressed relative to the development data only.
  eval_fraction = eval_perc / (eval_perc + train_perc)
  train_x, eval_x, train_y, eval_y = train_test_split(
      dev_x, dev_y, test_size=eval_fraction, random_state=0
  )
  train_set = (train_x, train_y)
  eval_set = (eval_x, eval_y)
  test_set = (test_x, test_y)
  return train_set, eval_set, test_set