In [23]:
# Mount Google Drive at /content/drive (Colab only); every path used later in
# this notebook (autoencoder model, images, trajectories, outputs) lives under it.
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [24]:
# Put the workshop's `rtp_cnn` folder first on the module search path so the
# local `promp` module can be imported; the import must come after the insert.
import sys
sys.path.insert(0,"/content/drive/MyDrive/Colab-Notebooks/AdvanceRoboticsWorkshop/rtp_cnn/")
from promp import ProMP

In [25]:
# imports
import os
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
import tensorflow.keras.layers as layers
from tensorflow.python.keras.metrics import MeanMetricWrapper
import abc
import matplotlib.pyplot as plt
from skimage.io import imread
from skimage.transform import resize
import random


In [31]:
class RTP_RGBD:
  """Preprocessing, training and plotting code for the RTP-RGBD CNN model.

  Pipeline: colour image -> pre-trained autoencoder bottleneck -> CNN -> ProMP
  weights. The labels are ProMP weights fitted to the recorded joint
  trajectories.

  Constructing an instance loads the autoencoder, encodes the whole dataset
  and builds the CNN, so it performs file I/O on the mounted Drive.
  """

  # Root folder of the workshop on the mounted Google Drive.
  BASE_DIR = "/content/drive/MyDrive/Colab-Notebooks/AdvanceRoboticsWorkshop/"

  def __init__(self):
    """Configure the experiment, load the encoder and dataset, build the model."""

    # ProMP configurations
    self.n_basis = 10
    self.n_dof = 7
    self.n_t = 100

    # Trajectories -> ProMP Weights -> Model Labels
    self.promp = ProMP(n_basis = self.n_basis, n_dof = self.n_dof, n_t = self.n_t )

    # Encoder configurations.
    self.AUTOENCODER_MODEL_PATH = self.BASE_DIR + "autoencoders/model/rtp-rgbd/"
    self.IMAGE_DIR = self.BASE_DIR + "data/rtp-rgbd/color_img/"
    self.IMAGE_RESHAPE = (256, 256, 3)

    # Build a sub-model of the trained autoencoder that stops at the
    # bottleneck layer: input image -> encoder -> bottleneck output -> CNN.
    autoencoder = tf.keras.models.load_model(self.AUTOENCODER_MODEL_PATH)
    bottleneck_output = autoencoder.get_layer("bottleneck").output
    self.encoder = Model(inputs=autoencoder.inputs, outputs=bottleneck_output, name="encoder")

    self.load_dataset()

    # Model configuration
    self.LOSS_DIR = self.BASE_DIR + "rtp_cnn/model/"
    self.MODEL_DIR = self.BASE_DIR + "rtp_cnn/model/"
    self.BATCH_SIZE = 10
    self.EPOCHS = 6000
    self.LR = 1e-7
    self.TEST_SIZE = 10
    self.MODEL_NAME = "rtp_rgbd_cnn_1"
    self.MODEL_WEIGHTS_PATH = os.path.join(self.MODEL_DIR, self.MODEL_NAME, "weights")

    # Keep the built model instead of discarding it, so it can be inspected
    # or used for inference without going through `train`.
    self.model = self.load_model()

  @staticmethod
  def _split_indices(n_samples, train_fraction, seed = None):
    """Randomly partition `range(n_samples)` into disjoint train/test index lists.

    Args:
        n_samples (int): Number of samples in the dataset.
        train_fraction (float): Fraction of samples assigned to the train set
            (the count is truncated with `int`).
        seed (int, optional): If given, a private `random.Random(seed)` is used so
            the split is reproducible; otherwise the global `random` state is used.

    Returns:
        tuple[list[int], list[int]]: Train indices and test indices. Together
        they cover every index exactly once.
    """
    n_train = int(train_fraction * n_samples)
    rng = random if seed is None else random.Random(seed)
    # One shuffle cut in two guarantees the subsets cannot overlap.
    shuffled = rng.sample(range(n_samples), n_samples)
    return shuffled[:n_train], shuffled[n_train:]

  def load_dataset(self, seed = None) -> None:
    """Load the encoded colour images and the ProMP-weight labels.

    Sets `idx_list_train`, `idx_list_test`, `encoded_images`, `TRAJ_DIR` and
    `promp_weights` on the instance.

    Args:
        seed (int, optional): Seed for a reproducible train/test split.
            Defaults to None (uses the global `random` state).
    """

    # Generate file ids based on regions, e.g. "A_001", "B_001", ...
    regions = ["A", "B", "C", "D"]
    total = 10
    # NOTE(review): range(1, total) yields 1..9, i.e. 9 samples per region —
    # confirm whether sample 010 is meant to be excluded.
    id_list = [region + "_" + str(n).zfill(3) for n in range(1, total) for region in regions]

    # 70/30 split into disjoint index sets. Drawing the two sets with
    # independent random samples would let test samples leak into training.
    self.idx_list_train, self.idx_list_test = self._split_indices(len(id_list), 0.7, seed)

    # load encoded images
    self.encoded_images = self.load_encoded_images(self.IMAGE_DIR, id_list, self.encoder, self.IMAGE_RESHAPE)

    # Labels configuration.
    self.TRAJ_DIR = self.BASE_DIR + "data/rtp-rgbd/trajectories/"

    # Load trajectories, keeping only the first `n_dof` entries of the last axis.
    trajectories = self.load_array_from_json(self.TRAJ_DIR, "joint_position", id_list, slicer=np.s_[..., 0:self.n_dof])

    # Convert each trajectory into a flat vector of ProMP weights.
    self.promp_weights = np.zeros((len(trajectories), self.promp.n_basis * self.promp.n_dof))
    for i, trajectory in enumerate(trajectories):
      self.promp_weights[i, :] = self.promp.weights_from_trajectory(trajectory)

  def load_encoded_images(self, img_dir, id_list, encoder, resize_shape = None, normalize = False) -> np.ndarray:
    """Load multiple images by ID from a directory and pass them through an encoder model.

    Args:
        img_dir (str): Path of the directory.
        id_list (list[str]): Image ids; each file is expected at `<img_dir>/<id>.png`.
        encoder (tf.keras.Model): The encoder model.
        resize_shape (tuple[int], optional): If specified, resize all images to this shape. Defaults to None.
        normalize (bool, optional): If True, normalize the image in [-1, 1]. Defaults to False.

    Returns:
        np.ndarray: The encoder outputs stacked along a new leading sample axis.
    """
    encoded = [
      self.load_encoded_image(os.path.join(img_dir, img_id + ".png"), encoder, resize_shape, normalize)
      for img_id in id_list
    ]
    return np.array(encoded)

  def load_encoded_image(self, img_path, encoder, resize_shape = None, normalize = False) -> np.ndarray:
    """Load an image and pass it through an encoder model.

    Args:
        img_path (str): Path of the image.
        encoder (tf.keras.Model): The encoder model.
        resize_shape (tuple[int], optional): If specified, resize the image to this shape. Defaults to None.
        normalize (bool, optional): If True, normalize the image in [-1, 1]. Defaults to False.

    Returns:
        np.ndarray: The encoder output with all singleton axes (including the batch axis) removed.
    """
    img = self.load_image(img_path, resize_shape, normalize)
    # The encoder expects a batch, so add a leading axis of size 1.
    batch = np.expand_dims(img, axis=0)
    return np.squeeze(encoder(batch))

  def load_image(self, img_path, resize_shape = None, normalize = False) -> np.ndarray:
    """Load an image from file.

    Args:
        img_path (str): Path of the image.
        resize_shape (tuple[int], optional): If specified, resize the image to this shape. Defaults to None.
        normalize (bool, optional): If True, normalize the image in [-1, 1]. Defaults to False.

    Returns:
        np.ndarray: The image data.
    """
    img = imread(img_path, pilmode='RGB')
    if resize_shape:
      img = resize(img, resize_shape)
    if normalize:
      img = self.normalize_negative_one(img)
    return img

  def normalize_negative_one(self, img, two_d=True):
    """Min-max rescale an array to the range [-1, 1].

    Args:
        img (np.ndarray): The data to rescale.
        two_d (bool, optional): If True return only the rescaled array; if False
            also return the original minimum and maximum. Defaults to True.

    Returns:
        The rescaled array, or the tuple (rescaled array, min, max) when
        `two_d` is False. A constant input (max == min) maps to all -1
        instead of producing NaNs from a division by zero.
    """
    lowest = np.amin(img)
    highest = np.amax(img)
    span = highest - lowest
    if span == 0:
      # Constant input: there is no range to stretch, so pin it to the lower bound.
      unit_scaled = np.zeros(np.shape(img), dtype=float)
    else:
      unit_scaled = (img - lowest) / span
    rescaled = 2*unit_scaled - 1
    if two_d:
      return rescaled
    return rescaled, lowest, highest

  def load_array_from_json(self, dir_path, json_key, id_list, slicer = None):
    """Load (a slice of) a Numpy array from multiple JSON files in a folder.

    Can be used, for example, to load joint states from multiple trajectory files.

    Args:
        dir_path (str): The path of the directory containing the files.
        json_key (str): The key to be read from each JSON file.
        id_list (list[str]): The ids of the files to be read (`<id>.json`).
        slicer (optional): Any Numpy index expression (e.g. from `np.s_`). If
            specified, every array is indexed with it. Defaults to None.

    Returns:
        list[np.ndarray]: One array per id, in the order of `id_list`.
    """
    data = []
    for file_id in id_list:
      file_path = os.path.join(dir_path, f"{file_id}.json")
      with open(file_path, "r") as file:
        arr = np.array(json.load(file)[json_key])
      # Compare against None explicitly: a falsy slicer such as index 0 is valid.
      if slicer is not None:
        arr = arr[slicer]
      data.append(arr)

    return data

  def load_model(self, name = "RTP"):
    """Build the CNN and set up the optimizer, loss and metrics.

    Sets `optimizer`, `loss`, `metrics_test` and `metrics_train` on the
    instance. Every call creates fresh objects.

    Args:
        name (str, optional): Name given to the Keras model. Defaults to "RTP".

    Returns:
        tf.keras.Model: The uncompiled model mapping a (32, 32, 3) encoded image
        to `n_dof * n_basis` ProMP weights.
    """
    self.optimizer = tf.keras.optimizers.Adam(learning_rate=self.LR)
    self.loss = self.get_joint_loss(self.promp)
    self.metrics_test = [
      RmseJointsPromp(self.promp, promp_space="joint", name="rmse_joints")
    ]
    self.metrics_train = [
      RmseJointsPromp(self.promp, promp_space="joint", name="rmse_joints"),
    ]

    # Regularization is currently disabled (both coefficients are zero).
    l1_reg = 0.0
    l2_reg = 0.0
    l1_l2_reg = tf.keras.regularizers.l1_l2(l1_reg, l2_reg)

    input_layer = layers.Input(shape=(32, 32, 3), name="encoded_image_input")

    # convolution sub-network: three conv/pool/dropout stages, then a final conv.
    x = input_layer
    for n_filters in (32, 16, 8):
      x = layers.Conv2D(n_filters, (3, 3), padding="same", kernel_regularizer=l1_l2_reg)(x)
      x = layers.MaxPooling2D(pool_size=(2, 2))(x)
      x = layers.Dropout(0.25)(x)

    x = layers.Conv2D(4, (3, 3), padding="same", kernel_regularizer=l1_l2_reg)(x)
    x = layers.Flatten(name="feature_vec")(x)

    # fully connected sub-network
    dense_sizes = [64]
    for dense_size in dense_sizes:
      x = layers.Dense(dense_size, activation="relu", kernel_regularizer=l1_l2_reg)(x)

    output_layer = layers.Dense(self.promp.n_dof * self.promp.n_basis, activation="linear")(x)
    return Model(inputs=input_layer, outputs=output_layer, name=name)

  def get_joint_loss(self, promp):
    """Obtain the loss function evaluating the RMSE on the joint trajectories from the full ProMP weights (in TensorFlow).

    Args:
        promp (ProMP): An initialized ProMP instance.

    Returns:
        The loss function, with signature `(promp_true, promp_pred) -> scalar`.
    """

    def joint_loss(promp_true, promp_pred):
      """RMSE loss on the joint trajectory from the full ProMP weights.

      Args:
          promp_true: Ground truth full ProMP weights with shape (n_batch, n_basis * n_dof).
          promp_pred: Predicted full ProMP weights with shape (n_batch, n_basis * n_dof).

      Returns:
          The resulting loss.
      """
      traj_true = promp.trajectory_from_weights_tf(promp_true)
      traj_pred = promp.trajectory_from_weights_tf(promp_pred)
      # RMSE reduced over axis 1 — presumably the time axis; confirm against ProMP.
      rmse = tf.sqrt(tf.reduce_mean(tf.square(traj_true - traj_pred), axis=1))
      # Average over the remaining axes (batches and joints).
      return tf.reduce_mean(rmse)

    return joint_loss

  def get_data(self, indices) :
    """Get the input and output data for the model.

    Args:
        indices (list[int] | np.ndarray): The indices used to slice the dataset.

    Returns:
        tuple[np.ndarray, np.ndarray]: Input and output data for the model.
    """
    return self.encoded_images[indices], self.promp_weights[indices]

  def train(self):
    """Build a fresh model, train it, and save the weights, loss plot and history.

    The trained model is kept in `self.model`.
    """
    model = self.load_model()
    self.model = model
    model.summary(print_fn=print)

    model.compile(optimizer=self.optimizer, loss=self.loss, metrics=self.metrics_train)

    # Load the data.
    print ("Encoded images size: ", self.encoded_images.shape)

    X_train, y_train = self.get_data(self.idx_list_train)
    X_val, y_val = self.get_data(self.idx_list_test)

    # Training.
    history = model.fit(
      X_train, y_train, validation_data=(X_val, y_val),
      epochs=self.EPOCHS, batch_size=self.BATCH_SIZE, shuffle=True, verbose=0
    )

    # Make sure the output folders exist so a long run is not lost at save time.
    os.makedirs(os.path.dirname(self.MODEL_WEIGHTS_PATH), exist_ok=True)
    os.makedirs(self.LOSS_DIR, exist_ok=True)

    model.save_weights(self.MODEL_WEIGHTS_PATH)

    # Plot the loss history.
    self.plot_metric(
      [history.history["loss"], history.history["val_loss"]],
      ["train", "val"], ylabel="loss",
      save_path=os.path.join(self.LOSS_DIR, "loss_{}.png".format(self.MODEL_NAME))
    )

    # Save the actual history values. The history is a dict, so NumPy stores it
    # as a pickled object array (np.load then needs allow_pickle=True).
    np.save(os.path.join(self.LOSS_DIR, 'train_history'), history.history)

  def plot_metric(
      self,
      data,
      legend_labels = None, title = None,
      ylabel = None, xlabel = "epoch",
      yscale = "linear",
      save_path = None, show = False
  ):
    """Plot the history of a metric during a training episode.

    Args:
        data: History of the metric values with shape (n_samples,) or (n_channels, n_samples).
        legend_labels (list[str], optional): Legend labels. Defaults to None.
        title (str, optional): Title of the graph. Defaults to None.
        ylabel (str, optional): Label of the y axis. Defaults to None.
        xlabel (str, optional): Label of the x axis. Defaults to "epoch".
        yscale (str, optional): The y scale of the plot. Defaults to "linear".
        save_path (str, optional): If specified, the file where the plot image is to be saved. Defaults to None.
        show (bool, optional): If True, displays the plot in a window, halting execution. Defaults to False.
    """
    data = np.array(data)
    # Epochs are numbered from 1 along the last axis of `data`.
    epochs = range(1, data.shape[-1]+1)

    fig, ax = plt.subplots()
    # Transpose so each channel becomes one line.
    ax.plot(epochs, np.transpose(data))
    ax.set_yscale(yscale)
    ax.grid(True, which='both')
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    if legend_labels:
      ax.legend(legend_labels)

    if save_path:
      fig.savefig(save_path)
    if show:
      plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)

In [27]:
class PrompTrajMeanMetric(abc.ABC, MeanMetricWrapper):
    """Abstract Keras metric whose inputs are ProMP weights.

    Subclasses implement `metric_fn`, which turns the weights into trajectories
    and scores them; `MeanMetricWrapper` then averages that score over the
    batches of an epoch.
    """

    def __init__(self, promp: ProMP, promp_space: str, name: str, **kwargs):
        """Register `metric_fn` with the wrapper and store the ProMP settings.

        Args:
            promp (ProMP): An initialized ProMP instance used to turn weights into trajectories.
            promp_space (str): The space the ProMP trajectories live in; either "joint" or "task".
            name (str): The name of the metric.
            **kwargs: Forwarded unchanged to `MeanMetricWrapper`.
        """
        # The wrapper takes care of averaging `metric_fn` over the batches.
        super().__init__(fn=self.metric_fn, name=name, **kwargs)

        # Kept so that `metric_fn` can rebuild trajectories from weights.
        self.promp = promp

        allowed_spaces = ("joint", "task")
        assert promp_space in allowed_spaces, "The 'promp_space' parameter must be either 'joint' or 'task'."
        self.promp_space = promp_space

    @abc.abstractmethod
    def metric_fn(self, promp_true, promp_pred):
        """Score one batch of predictions.

        Args:
            promp_true: Ground truth ProMP weights, shape (n_batch, n_dof*n_basis).
            promp_pred: Predicted ProMP weights, shape (n_batch, n_dof*n_basis).

        Returns:
            The value of the metric on this batch.
        """





In [28]:
class RmseJointsPromp(PrompTrajMeanMetric):
    """Root-mean-square error between joint trajectories rebuilt from ProMP weights.

    The per-batch value is averaged over all batches of each epoch.
    """

    def __init__(self, promp: ProMP, promp_space: str, name: str = "rmse_joints", **kwargs):
        super().__init__(promp=promp, promp_space=promp_space, name=name, **kwargs)

    def metric_fn(self, promp_true, promp_pred):
        """Compute the RMSE on one batch.

        Args:
            promp_true: Ground truth ProMP weights, shape (n_batch, n_dof*n_basis).
            promp_pred: Predicted ProMP weights, shape (n_batch, n_dof*n_basis).

        Returns:
            The RMSE value on this batch.

        Raises:
            NotImplementedError: If `promp_space` is "task" (conversion to joint space is missing).
            ValueError: If `promp_space` is neither "joint" nor "task".
        """
        # Rebuild both trajectories from their weights.
        reference = self.promp.trajectory_from_weights_tf(promp_true)
        estimate = self.promp.trajectory_from_weights_tf(promp_pred)

        # Only joint-space trajectories can be scored directly.
        space = self.promp_space
        if space == "task":
            raise NotImplementedError("The method to convert from task to joint space has not been implemented yet!")
        if space != "joint":
            raise ValueError(f"'{self.promp_space}' is not a valid value for 'promp_space'. It must be either 'joint' or 'task'.")

        # Root of the mean squared error along axis 1 ...
        squared_error = tf.square(reference - estimate)
        rmse = tf.sqrt(tf.reduce_mean(squared_error, axis=1))
        # ... then averaged over the remaining axes (joints and batch samples).
        return tf.reduce_mean(rmse)


In [29]:
class MseJointsPromp(PrompTrajMeanMetric):
    """Mean squared error between joint trajectories rebuilt from ProMP weights.

    The per-batch value is averaged over all batches of each epoch.
    """

    def __init__(self, promp: ProMP, promp_space: str, name: str = "mse_joints", **kwargs):
        super().__init__(promp=promp, promp_space=promp_space, name=name, **kwargs)

    def metric_fn(self, promp_true, promp_pred):
        """Compute the MSE on one batch.

        Args:
            promp_true: Ground truth ProMP weights, shape (n_batch, n_dof*n_basis).
            promp_pred: Predicted ProMP weights, shape (n_batch, n_dof*n_basis).

        Returns:
            The MSE value on this batch.

        Raises:
            NotImplementedError: If `promp_space` is "task" (conversion to joint space is missing).
            ValueError: If `promp_space` is neither "joint" nor "task".
        """
        # Rebuild both trajectories from their weights.
        reference = self.promp.trajectory_from_weights_tf(promp_true)
        estimate = self.promp.trajectory_from_weights_tf(promp_pred)

        # Only joint-space trajectories can be scored directly.
        space = self.promp_space
        if space == "task":
            raise NotImplementedError("The method to convert from task to joint space has not been implemented yet!")
        if space != "joint":
            raise ValueError(f"'{self.promp_space}' is not a valid value for 'promp_space'. It must be either 'joint' or 'task'.")

        # Mean squared error along axis 1 ...
        squared_error = tf.square(reference - estimate)
        mse = tf.reduce_mean(squared_error, axis=1)
        # ... then averaged over the remaining axes (joints and batch samples).
        return tf.reduce_mean(mse)

In [None]:
if __name__ == "__main__":

  # Constructing the experiment loads the autoencoder, encodes the dataset and
  # builds the CNN, so this line already performs the Drive I/O.
  exp = RTP_RGBD()
  # Train with verbose=0 (no per-epoch log); weights, loss plot and history are
  # written to the model folder once training finishes.
  exp.train()

Model: "RTP"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 encoded_image_input (InputL  [(None, 32, 32, 3)]      0         
 ayer)                                                           
                                                                 
 conv2d_4 (Conv2D)           (None, 32, 32, 32)        896       
                                                                 
 max_pooling2d_3 (MaxPooling  (None, 16, 16, 32)       0         
 2D)                                                             
                                                                 
 dropout_3 (Dropout)         (None, 16, 16, 32)        0         
                                                                 
 conv2d_5 (Conv2D)           (None, 16, 16, 16)        4624      
                                                                 
 max_pooling2d_4 (MaxPooling  (None, 8, 8, 16)         0       