# Results of clustering

In [1]:
# Load packages
import imageio
import matplotlib
import time
import warnings

import matplotlib.pyplot as plt
import matplotlib.image as maimg
import numpy as np
import pandas as pd
import seaborn as sns

from joblib import parallel_backend
from sklearn import cluster, metrics, mixture, model_selection, preprocessing

from FDApy.preprocessing.dim_reduction.fpca import MFPCA, UFPCA
from FDApy.representation.functional_data import DenseFunctionalData
from FDApy.representation.functional_data import IrregularFunctionalData
from FDApy.representation.functional_data import MultivariateFunctionalData
from FDApy.visualization.plot import plot

from mpl_toolkits.axes_grid1.inset_locator import inset_axes

RECORDING_NUMBER = '00'
VAR = ['x', 'y', 'xVelocity', 'yVelocity', 'xAcceleration', 'yAcceleration']

COLORS = ["#377eb8", "#ff7f00", "#4daf4a",
          "#f781bf", "#a65628", "#984ea3",
          "#999999", "#e41a1c", "#dede00"]
custom_palette = sns.set_palette(sns.color_palette(COLORS))

In [2]:
# Function to read {}_background.png
def read_background(arguments):
    """
    This method reads the background file from rounD data.
    :param arguments: the parsed arguments for the program containing the input path for the background file.
    :return: the background image.
    """
    return imageio.imread(arguments["input_img"])

In [3]:
argv = {
    "input_path": f'./data/{RECORDING_NUMBER}_tracks.csv',
    "input_static_path": f'./data/{RECORDING_NUMBER}_tracksMeta.csv',
    "input_meta_path": f'./data/{RECORDING_NUMBER}_recordingMeta.csv',
    "input_img": f'./data/{RECORDING_NUMBER}_background.png'
}

In [4]:
# Load data
tracks = pd.read_pickle(f'./data/tracks_sub.pkl')
info = pd.read_pickle(f'./data/info.pkl')
meta = pd.read_pickle(f'./data/meta.pkl')
data_fd = pd.read_pickle(f'./data/tracks_smooth_fd.pkl')
tree = pd.read_pickle(f'./data/tree.pkl')
results = pd.read_pickle(f'./data/fcubt_results.pkl')
background = read_background(argv)

In [5]:
idx_labels = {}
for track, label in zip(tracks, results):
    idx_labels[track['trackId']] = label

In [7]:
for idx in np.unique(results):
    os.mkdir(f'./figures/cluster-{idx}')

## Plot functions

In [6]:
# Utility functions
def get_id(tracks, idx):
    """
    This method returns the information about an particular id.
    :param tracks: a list contaning all tracks as dictionary
    :param idx: the index to return
    :return: a dictionary containing information for a particular idx"""
    l = [d for d in tracks if d['trackId'] == idx]
    return l[0]

def norm(*args):
    """
    Compute the norm of vectors
    :param args: vectors
    :return: norm
    """
    return np.sqrt(np.sum([i**2 for i in args], axis=0))

def cart2pol(cart):
    """
    Transform cartesian to polar coordinates.
    :param cart: Nx2 ndarray
    :return: 2 Nx1 ndarrays
    """
    if cart.shape == (2,):
        cart = np.array([cart])

    x = cart[:, 0]
    y = cart[:, 1]

    th = np.arctan2(y, x)
    r = np.sqrt(np.power(x, 2) + np.power(y, 2))
    return th, r

def pol2cart(th, r):
    """
    Transform polar to cartesian coordinates.
    :param th: Nx1 ndarray
    :param r: Nx1 ndarray
    :return: Nx2 ndarray
    """

    x = np.multiply(r, np.cos(th))
    y = np.multiply(r, np.sin(th))

    cart = np.array([x, y]).transpose()
    return cart

def calculate_rotated_bboxes(center_points_x, center_points_y, length, width, rotation=0):
    """
    Calculate bounding box vertices from centroid, width and length.
    :param centroid: center point of bbox
    :param length: length of bbox
    :param width: width of bbox
    :param rotation: rotation of main bbox axis (along length)
    :return:
    """

    centroid = np.array([center_points_x, center_points_y]).transpose()

    centroid = np.array(centroid)
    if centroid.shape == (2,):
        centroid = np.array([centroid])

    # Preallocate
    data_length = centroid.shape[0]
    rotated_bbox_vertices = np.empty((data_length, 4, 2))

    # Calculate rotated bounding box vertices
    rotated_bbox_vertices[:, 0, 0] = -length / 2
    rotated_bbox_vertices[:, 0, 1] = -width / 2

    rotated_bbox_vertices[:, 1, 0] = length / 2
    rotated_bbox_vertices[:, 1, 1] = -width / 2

    rotated_bbox_vertices[:, 2, 0] = length / 2
    rotated_bbox_vertices[:, 2, 1] = width / 2

    rotated_bbox_vertices[:, 3, 0] = -length / 2
    rotated_bbox_vertices[:, 3, 1] = width / 2

    for i in range(4):
        th, r = cart2pol(rotated_bbox_vertices[:, i, :])
        rotated_bbox_vertices[:, i, :] = pol2cart(th + rotation, r).squeeze()
        rotated_bbox_vertices[:, i, :] = rotated_bbox_vertices[:, i, :] + centroid

    return rotated_bbox_vertices

def plot_trajectory_vel(tracks, idx, img, meta_tracks, sdf=10, skip_frame=10):
    """
    This methods plots a particular trajectory given an idx.
    :param tracks: a list containing all tracks as dictionary
    :param idx: the index to plot
    :param img: the background image
    :param sdf: scale down factor (default=10)
    :param skip_frame: number of frames to skip for plotting (defualt=10)
    :return: matplotlib figure
    """
    # Set parameters
    ortho_px_to_meter = meta_tracks['orthoPxToMeter']
    scale_down_factor = sdf
    
    track = get_id(tracks, idx)

    # Define a dictionary for plotting
    track_vis = {}
    track_vis["xCenterVis"] = track['xCenter'][::skip_frame] / ortho_px_to_meter / scale_down_factor
    track_vis["yCenterVis"] = -track['yCenter'][::skip_frame] / ortho_px_to_meter / scale_down_factor
    
    # Define the figure
    fig, axs = plt.subplots(1, 1)
    fig.set_size_inches(18, 8)
    
    p = axs.plot(track_vis["xCenterVis"],
                    track_vis["yCenterVis"],
                    color="grey", linewidth=5, alpha=0.75)
    for x, y, vx, vy in zip(track_vis["xCenterVis"], track_vis["yCenterVis"],
                            track['xVelocity'][::skip_frame], -track['yVelocity'][::skip_frame]):
        p = axs.arrow(x, y, 10*vx, 10*vy, color='red', head_width=10, head_length=10)

    axs.set_axis_off()
    axs.imshow(img)    
    fig.tight_layout()
    return fig, axs

def plot_trajectory_acc(tracks, idx, img, meta_tracks, sdf=10, skip_frame=10):
    """
    This methods plots a particular trajectory given an idx.
    :param tracks: a list containing all tracks as dictionary
    :param idx: the index to plot
    :param img: the background image
    :param sdf: scale down factor (default=10)
    :param skip_frame: number of frames to skip for plotting (defualt=10)
    :return: matplotlib figure
    """
    # Set parameters
    ortho_px_to_meter = meta_tracks['orthoPxToMeter']
    scale_down_factor = sdf
    
    track = get_id(tracks, idx)

    # Define a dictionary for plotting
    track_vis = {}
    track_vis["xCenterVis"] = track['xCenter'][::skip_frame] / ortho_px_to_meter / scale_down_factor
    track_vis["yCenterVis"] = -track['yCenter'][::skip_frame] / ortho_px_to_meter / scale_down_factor
    
    # Define the figure
    fig, axs = plt.subplots(1, 1)
    fig.set_size_inches(18, 8)
    
    q = axs.plot(track_vis["xCenterVis"],
                    track_vis["yCenterVis"],
                    color="grey", linewidth=5, alpha=0.75)
    for x, y, ax, ay in zip(track_vis["xCenterVis"], track_vis["yCenterVis"],
                            track["xAcceleration"][::skip_frame], -track["yAcceleration"][::skip_frame]):
        q = axs.arrow(x, y, 10*ax, 10*ay, color='red', head_width=10, head_length=10)
    axs.set_axis_off()
    axs.imshow(img)
    fig.tight_layout()
    return fig, axs

In [11]:
# Plot with velocities
for idx, labels in idx_labels.items():
    _ = plot_trajectory_vel(tracks, idx, background, meta, sdf=10, skip_frame=10)
    plt.savefig(f'./figures/cluster-{labels}/vel-{idx}.pdf', bbox_inches='tight')

  fig, axs = plt.subplots(1, 1)


In [12]:
# Plot with acceleration
for idx, labels in idx_labels.items():
    _ = plot_trajectory_acc(tracks, idx, background, meta, sdf=10, skip_frame=10)
    plt.savefig(f'./figures/cluster-{labels}/acc-{idx}.png', bbox_inches='tight')

In [7]:
labels_idx = {}
for idx in np.arange(23):
    labels_idx[idx] = np.array([track['trackId'] for track, label in zip(tracks, results) if label==idx])

In [8]:
results_labels = {}
for idx in np.arange(23):
    results_labels[idx] = [data for data, label in zip(data_fd.get_obs(), results) if label==idx]

In [9]:
results_labels2 = {}
for idx in np.arange(23):
    temp = results_labels[idx][0]
    for i in results_labels[idx][1:]:
        temp = temp.concatenate(i)
    results_labels2[idx] = temp

In [10]:
def plot_trajectories_vel(tracks, idx, id_cluster, img, meta_tracks, sdf=10, skip_frame=10):
    """
    This methods plots a particular trajectory given an idx.
    :param tracks: a list containing all tracks as dictionary
    :param idx: the indices to plot
    :param img: the background image
    :param sdf: scale down factor (default=10)
    :param skip_frame: number of frames to skip for plotting (defualt=10)
    :return: matplotlib figure
    """
    # Set parameters
    ortho_px_to_meter = meta_tracks['orthoPxToMeter']
    scale_down_factor = sdf
    
    # Define the figure
    fig, axs = plt.subplots(1, 1)
    fig.set_size_inches(18, 8)
    
    j = 0
    for i in idx:
        track = get_id(tracks, i)
        
        # Define a dictionary for plotting
        track_vis = {}
        track_vis["xCenterVis"] = track['xCenter'][::skip_frame] / ortho_px_to_meter / scale_down_factor
        track_vis["yCenterVis"] = -track['yCenter'][::skip_frame] / ortho_px_to_meter / scale_down_factor
        track_vis["xVelocityVis"] = track['xVelocity'][::skip_frame]
        track_vis["yVelocityVis"] = -track['yVelocity'][::skip_frame]
        
        p = axs.plot(track_vis["xCenterVis"],
                     track_vis["yCenterVis"],
                     color="lightcoral", linewidth=5, alpha=0.75)
        for x, y, vx, vy in zip(track_vis["xCenterVis"], track_vis["yCenterVis"],
                                track_vis["xVelocityVis"], track_vis["yVelocityVis"]):
            p = axs.arrow(x, y, 10*vx, 10*vy, color="red", head_width=10, head_length=10)
        j = j + 1

    axs.imshow(img)
    axs.set_xlim(300, 1200)
    axs.axis('off')
    fig.tight_layout()
    fig.patch.set_visible(False)
    return fig, axs

def plot_trajectories_acc(tracks, idx, id_cluster, img, meta_tracks, sdf=10, skip_frame=10):
    """
    This methods plots a particular trajectory given an idx.
    :param tracks: a list containing all tracks as dictionary
    :param idx: the indices to plot
    :param img: the background image
    :param sdf: scale down factor (default=10)
    :param skip_frame: number of frames to skip for plotting (defualt=10)
    :return: matplotlib figure
    """
    # Set parameters
    ortho_px_to_meter = meta_tracks['orthoPxToMeter']
    scale_down_factor = sdf
    
    # Define the figure
    fig, axs = plt.subplots(1, 1)
    fig.set_size_inches(18, 8)
    
    j = 0
    for i in idx:
        track = get_id(tracks, i)

        # Define a dictionary for plotting
        track_vis = {}
        track_vis["xCenterVis"] = track['xCenter'][::skip_frame] / ortho_px_to_meter / scale_down_factor
        track_vis["yCenterVis"] = -track['yCenter'][::skip_frame] / ortho_px_to_meter / scale_down_factor
        track_vis["xAccelerationVis"] = track['xAcceleration'][::skip_frame]
        track_vis["yAccelerationVis"] = -track['yAcceleration'][::skip_frame]
        
        q = axs.plot(track_vis["xCenterVis"],
                     track_vis["yCenterVis"],
                     color="lightcoral", linewidth=5, alpha=0.75)
        for x, y, ax, ay in zip(track_vis["xCenterVis"], track_vis["yCenterVis"],
                                track_vis["xAccelerationVis"], track_vis["yAccelerationVis"]):
            q = axs.arrow(x, y, 10*ax, 10*ay, color="red", head_width=10, head_length=10)
        j = j + 1
    
    axs.imshow(img)
    axs.set_xlim(300, 1200)
    axs.axis('off')
    fig.tight_layout()
    fig.patch.set_visible(False)
    return fig, axs

def plot_trajectories(tracks, idx, id_cluster, img, meta_tracks, sdf=10, skip_frame=10):
    """
    This methods plots a particular trajectory given an idx.
    :param tracks: a list containing all tracks as dictionary
    :param idx: the indices to plot
    :param img: the background image
    :param sdf: scale down factor (default=10)
    :param skip_frame: number of frames to skip for plotting (defualt=10)
    :return: matplotlib figure
    """
    # Set parameters
    ortho_px_to_meter = meta_tracks['orthoPxToMeter']
    scale_down_factor = sdf
    
    # Define the figure
    fig, axs = plt.subplots(1, 1)
    fig.set_size_inches(18, 8)
    
    j = 0
    for i in idx:
        track = get_id(tracks, i)
        
        norm_vel = norm(track['xVelocity'], track['yVelocity'])
        norm_acc = norm(track['xAcceleration'], track['yAcceleration'])
        # Define a dictionary for plotting
        track_vis = {}
        track_vis["xCenterVis"] = track['xCenter'] / ortho_px_to_meter / scale_down_factor
        track_vis["yCenterVis"] = -track['yCenter'] / ortho_px_to_meter / scale_down_factor
        track_vis["xVelocityVis"] = track['xVelocity'][::skip_frame]
        track_vis["yVelocityVis"] = -track['yVelocity'][::skip_frame]
        track_vis["xAccelerationVis"] = track['xAcceleration'][::skip_frame]
        track_vis["yAccelerationVis"] = -track['yAcceleration'][::skip_frame]
        
        q = axs.scatter(track_vis["xCenterVis"],
                        track_vis["yCenterVis"],
                        c=norm_acc, linewidth=5, alpha=0.75)
        axins = inset_axes(axs, width="2.5%", height="25%", loc='upper left')
        axs.figure.colorbar(q, ax=axs, cax=axins, label='Acceleration norm', orientation='vertical', pad=0)
        for x, y, ax, ay in zip(track_vis["xCenterVis"][::skip_frame], track_vis["yCenterVis"][::skip_frame],
                                track_vis["xVelocityVis"], track_vis["yVelocityVis"]):
            q = axs.arrow(x, y, 10*ax, 10*ay, color="red", head_width=10, head_length=10)
        j = j + 1
    
    axs.imshow(img)
    axs.set_xlim(300, 1200)
    axs.axis('off')
    #fig.tight_layout()
    fig.patch.set_visible(False)
    return fig, axs

def plot_trajectories2(tracks, idx, id_cluster, img, meta_tracks, sdf=10, skip_frame=10):
    """
    This methods plots a particular trajectory given an idx.
    :param tracks: a list containing all tracks as dictionary
    :param idx: the indices to plot
    :param img: the background image
    :param sdf: scale down factor (default=10)
    :param skip_frame: number of frames to skip for plotting (defualt=10)
    :return: matplotlib figure
    """
    # Set parameters
    ortho_px_to_meter = meta_tracks['orthoPxToMeter']
    scale_down_factor = sdf
    
    # Define the figure
    fig, axs = plt.subplots(1, 1)
    fig.set_size_inches(18, 8)
    
    j = 0
    for i in idx:
        track = get_id(tracks, i)
        
        norm_vel = norm(track['xVelocity'], track['yVelocity'])
        norm_acc = norm(track['xAcceleration'], track['yAcceleration'])
        # Define a dictionary for plotting
        track_vis = {}
        track_vis["xCenterVis"] = track['xCenter'] / ortho_px_to_meter / scale_down_factor
        track_vis["yCenterVis"] = -track['yCenter'] / ortho_px_to_meter / scale_down_factor
        track_vis["xVelocityVis"] = track['xVelocity'][::skip_frame]
        track_vis["yVelocityVis"] = -track['yVelocity'][::skip_frame]
        track_vis["xAccelerationVis"] = track['xAcceleration'][::skip_frame]
        track_vis["yAccelerationVis"] = -track['yAcceleration'][::skip_frame]
        
        q = axs.scatter(track_vis["xCenterVis"],
                        track_vis["yCenterVis"], c='grey',
                        s=60*norm_acc, alpha=0.25)

        pws = [1, 2, 3]
        for pw in pws:
            plt.scatter([], [], s=(pw**2)*60, c="k", label=str(pw))
            h, l = plt.gca().get_legend_handles_labels()
        plt.legend(h, l, labelspacing=1.2, title="Acceleration norm", borderpad=1, 
                   frameon=True, framealpha=0.6, edgecolor="k", facecolor="w")
        for x, y, ax, ay in zip(track_vis["xCenterVis"][::skip_frame], track_vis["yCenterVis"][::skip_frame],
                                track_vis["xVelocityVis"], track_vis["yVelocityVis"]):
            q = axs.arrow(x, y, 10*ax, 10*ay, color="red", head_width=10, head_length=10)
        j = j + 1
    
    axs.imshow(img)
    axs.set_xlim(300, 1200)
    axs.axis('off')
    #fig.tight_layout()
    fig.patch.set_visible(False)
    return fig, axs

In [14]:
if not os.path.isdir(f'./figures/clusters'):
    os.mkdir(f'./figures/clusters')
    os.mkdir(f'./figures/clusters/vel')
    os.mkdir(f'./figures/clusters/acc')

In [21]:
for idx in np.arange(23):
    _ = plot_trajectories_vel(tracks, labels_idx[idx], idx, background, meta, sdf=10, skip_frame=15)
    plt.savefig(f'./figures/clusters/vel/cluster-{idx}.pdf', bbox_inches='tight')

In [22]:
for idx in np.arange(23):
    _ = plot_trajectories_acc(tracks, labels_idx[idx], idx, background, meta, sdf=10, skip_frame=15)
    plt.savefig(f'./figures/clusters/acc/cluster-{idx}.pdf', bbox_inches='tight', format='pdf')