In [None]:
%cd ../
%load_ext autoreload

%autoreload 2

In [None]:
import os
import random
import textwrap as tw
import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None
import matplotlib.pyplot as plt
import seaborn as sns
import umap
import cv2
from numba import jit
from ast import literal_eval

from emv.features.pose import load_poses 
from emv.features.pose_utils import draw_pose, CONNECTIONS, KEYPOINTS_NAMES, ANGLES_ASSOCIATIONS
from emv.features.pose_utils import compute_hips_angles, normalize_angles

# Clustering
from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import DBSCAN, KMeans
from hdbscan import HDBSCAN

# DR
from umap import UMAP
from umap.umap_ import nearest_neighbors
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from trimap import TRIMAP

# Metrics
from sklearn.metrics import pairwise_distances
from coranking import coranking_matrix
from coranking.metrics import trustworthiness, continuity, LCMC
import mantel

In [None]:
local_poses_path = "data/sample_pose_df.csv"
pose_df = load_poses(local_poses_path, filter_poses={})

pose_df = pose_df[pose_df.keypoints.map(lambda x: x[7][2] > 0.5 and x[8][2] > 0.5)]

# Computing features

In [None]:
pose_df["hips_angles"] = pose_df.keypoints.map(lambda x: compute_hips_angles(x)[0])
pose_df["hips_angles"] = pose_df["hips_angles"].map(lambda x: normalize_angles(x))

In [None]:
keypoints_names = [k for k in KEYPOINTS_NAMES if k != "left_hip" and k != "right_hip"]

hips_angles = pd.DataFrame(pose_df["hips_angles"].to_list(), columns = keypoints_names)
hips_angles_means = hips_angles.mean()
hips_angles_std = hips_angles.std()

plt.figure(figsize=(15, 5))
hips_angles.boxplot()
plt.title("Hips angles")
plt.show()

In [None]:
angles = pd.DataFrame(pose_df.angle_vec.tolist(), columns = ANGLES_ASSOCIATIONS.keys())

default_angles = []
for angle in ANGLES_ASSOCIATIONS.keys():
    non_missing_angles = angles[angles[angle] != 0][angle]
    default_angles.append(non_missing_angles.mean())

random_size = 0.0001
pose_df["angle_vec"] = pose_df.angle_vec.map(lambda x: [a if a != 0 else default_angles[i] + random.random() * random_size for i,a in enumerate(x)])

# Embedding

In [None]:
sport = "Weightlifting"
sport_poses = pose_df[pose_df.sport == sport]
print(f"Testing with {len(sport_poses)} poses from {sport}.")

In [None]:
def compute_embeddings(features, reducer, params):
    embeddings = reducer(**params).fit_transform(features)
    pairwise_dist = pairwise_distances(embeddings, metric = "euclidean")
    
    return {
        "reducer": reducer,
        "reducer_params": params,
        "embeddings": embeddings,
        "pairwise_dist": pairwise_dist
    }

In [None]:
def compute_umap_embeddings(features, n_neighbors, min_dist = 0.01, metric = "cosine"):
    knn = nearest_neighbors(features, 
                            n_neighbors=np.max(n_neighbors), 
                            metric=metric,
                            metric_kwds={},
                            angular=False,
                            random_state=None)
    umap_embeddings = []
    for n in n_neighbors:
        embeddings_results = compute_embeddings(features, UMAP, {"n_neighbors": n, "min_dist": min_dist, "metric": metric, "precomputed_knn": knn})
        umap_embeddings.append(embeddings_results)
        
    return umap_embeddings

In [None]:
human_angles = np.array(sport_poses["angle_vec"].tolist())

# UMAP embeddings
n_neighbors = [50, 100, 500]
human_angles_embeddings = compute_umap_embeddings(features = human_angles, n_neighbors = n_neighbors)

# Other embeddings
human_angles_embeddings.append(compute_embeddings(features = human_angles, reducer = PCA, params = {"n_components": 2}))

perps = [5, 10, 50, 100]
for perp in perps:
    human_angles_embeddings.append(compute_embeddings(features = human_angles, reducer = TSNE, params = {"n_components": 2, "metric": "cosine", "perplexity": perp}))
    
# TRIMAP embeddings
n_inliers_values = [10, 20, 50] # Ratio of 2:1:1 for n_inliers:n_outliers:n_random (as recommended in the paper)
for n in n_inliers_values:
    m = int(0.5 * n)
    human_angles_embeddings.append(compute_embeddings(features = human_angles, reducer = TRIMAP, params = {"n_inliers": n, "n_outliers": m, "n_random": m, "distance": "cosine"}))

In [None]:
def format_params(params):
    return ", ".join([f"{k}={v}" for k,v in params.items() if k != "precomputed_knn"])

def plot_embeddings(embeddings_results, fig_title, d = 4):
    n_plots = len(embeddings_results)
    n_cols = 4
    n_rows = int(np.ceil(n_plots / n_cols))
    
    fig, axs = plt.subplots(nrows=n_rows, ncols=n_cols, figsize=(n_cols * d, n_rows * d))
    axs = axs.flatten()
    for i, result in enumerate(embeddings_results):
        coords = result["embeddings"]
        reducer = result["reducer"]
        params = result["reducer_params"]
        axs[i].scatter(coords[:,0], coords[:,1], s=0.1)
        axs[i].set_xticks([])
        axs[i].set_yticks([])
        title = f"{reducer.__name__} - params: {format_params(params)}"
        axs[i].set_title(tw.fill(title, width = 40), fontsize=10)
    [axs[i].axis("off") for i in range(n_plots, n_rows * n_cols)]
    plt.suptitle(fig_title)
    plt.tight_layout()
    plt.show()

In [None]:
plot_embeddings(human_angles_embeddings, fig_title = "Human angles embeddings")

### PCA case

In [None]:
pca_model = PCA(n_components = 2)
pca_model.fit(human_angles)

pca_embeddings = pca_model.transform(human_angles)
pca_components = pd.DataFrame(pca_model.components_.T, columns = ["PC1", "PC2"], index = ANGLES_ASSOCIATIONS.keys())
pca_model.explained_variance_ratio_

In [None]:
pca_components["PC1"].abs().sort_values(ascending = False)

In [None]:
pca_components["PC2"].abs().sort_values(ascending = False)

In [None]:
plt.scatter(pca_embeddings[:,0], pca_embeddings[:,1], s=0.1)
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.title("PCA embeddings")
plt.show()

## Evaluation

### Co-ranking Metrics: Trustworthiness and Continuity

References:
* https://towardsdatascience.com/on-the-validating-umap-embeddings-2c8907588175
* https://github.com/MoritzM00/drcomp/tree/main

In [None]:
ks = [10, 50, 100, 500, 1000]
for result in human_angles_embeddings:
    Q = coranking_matrix(human_angles, result["embeddings"])
    t_values = []
    c_values = []
    for k in ks:
        t_values.append(trustworthiness(Q, min_k = k, max_k = k + 1)[0])
        c_values.append(continuity(Q, min_k = k, max_k = k + 1)[0])
    result["trustworthiness"] = t_values
    result["continuity"] = c_values

In [None]:
linestyles = {"PCA": "-.", "TSNE": "--", "UMAP": ":", "TRIMAP": "-"}
plt.figure(figsize=(10, 5)) 

for i, result in enumerate(human_angles_embeddings):
    plt.plot(ks, result["trustworthiness"], 
             label = f"{result['reducer'].__name__} - {format_params(result['reducer_params'])}", 
             marker = "x", 
             linestyle = linestyles[result["reducer"].__name__])

plt.xlabel("k")
plt.ylabel("Trustworthiness")
plt.title("Trustworthiness of the different embeddings (features: human angles)")
plt.legend(loc = [1.01, 0.2], fontsize = 10)
plt.show()

In [None]:
plt.figure(figsize=(10, 5)) 

for i, result in enumerate(human_angles_embeddings):
    plt.plot(ks, result["continuity"], 
             label = f"{result['reducer'].__name__} - {format_params(result['reducer_params'])}", 
             marker = "x", 
             linestyle = linestyles[result["reducer"].__name__])

plt.xlabel("k")
plt.ylabel("Continuity")
plt.title("Continuity of the different embeddings (features: human angles)")
plt.legend(loc = [1.01, 0.2], fontsize = 10)
plt.show()

### Random Triplet Accuracy

In [None]:
def get_triplets_random(n_features, n_triplets = 1000):
    triplets = [np.random.randint(0, n_features, 3) for _ in range(n_triplets)]
    return triplets
 
def get_triplets(knn, sampling, n_triplets = 1000, n_neighbors = 10):
    initial_points = np.random.randint(0, knn.shape[0], n_triplets)
    triplets = []
    for i in initial_points:
        if sampling == "local": # Sample both j and k from the neighborhood of i
            j, k = np.random.choice(knn[i,1:n_neighbors], 2, replace = False)            
        elif sampling == "mixed": # Sample j from the neighborhood of i and k from outside the neighborhood of i
            j = np.random.choice(knn[i,1:n_neighbors])
            k = np.random.choice(knn[i,n_neighbors:]) 
        elif sampling == "global": # Sample both j and k from outside the neighborhood of i
            j, k = np.random.choice(knn[i,n_neighbors:], 2, replace = False)            
        else:
            raise ValueError("Invalid sampling method. Choose between 'local', 'mixed' or 'global'.")

        triplets.append((i,j,k))
    return triplets

def compute_relative_distances(original_d, embeddings_d, triplets):
    relative_d_original = [np.sign(original_d[i,j] - original_d[i,k]) for i,j,k in triplets]
    relative_d_embedded = [np.sign(embeddings_d[i,j] - embeddings_d[i,k]) for i,j,k in triplets]

    return np.array(relative_d_original), np.array(relative_d_embedded)

In [None]:
def random_triplet_accuracy(data_high_dim, embeddings_result, sampling, n_triplets = 1000, n_repetitions = 10, size_neighborhood = 10):
    original_d = pairwise_distances(data_high_dim, metric = "euclidean")
    dists,knn = NearestNeighbors(n_neighbors=len(data_high_dim) - 1).fit(data_high_dim).kneighbors()
    
    accs = []
    for _ in range(n_repetitions):
        triplets = get_triplets(knn, n_triplets = n_triplets, sampling = sampling, n_neighbors = size_neighborhood)
        relative_d_original, relative_d_embedded = compute_relative_distances(original_d, embeddings_result["pairwise_dist"], triplets)
        acc = np.mean(relative_d_original == relative_d_embedded)
        accs.append(acc)

    mean_acc = np.mean(accs)
    std_acc = np.std(accs)
    
    return mean_acc, std_acc

In [None]:
original_d = pairwise_distances(human_angles, metric="euclidean")

for result in human_angles_embeddings:
    result["triplet_accuracy_local"] = random_triplet_accuracy(human_angles, result, sampling = "local", n_repetitions=100)
    result["triplet_accuracy_mixed"] = random_triplet_accuracy(human_angles, result, sampling = "mixed", n_repetitions=100)
    result["triplet_accuracy_global"] = random_triplet_accuracy(human_angles, result, sampling = "global", n_repetitions=100)

In [None]:
labels = [tw.fill(f"{result['reducer'].__name__} - {format_params(result['reducer_params'])}", width = 20) for result in human_angles_embeddings]

fig, axs = plt.subplots(3, 1, figsize=(20, 15))
for i, sampling in enumerate(["local", "mixed", "global"]):
    for j, result in enumerate(human_angles_embeddings):
        acc, std = result[f"triplet_accuracy_{sampling}"]
        axs[i].errorbar(j, acc, yerr = std, fmt = "o", color = "black")
    axs[i].set_ylabel("Accuracy")
    axs[i].set_title(f"Random triplet accuracy ({sampling} sampling)")
    axs[i].set_xticks(range(len(human_angles_embeddings)), labels, rotation=0)
plt.tight_layout()
plt.show()

### Density Preservation

In [None]:
def local_densities(data, ks, metric = "euclidean"):
    max_k = np.max(ks)
    knn = NearestNeighbors(n_neighbors=max_k+1, metric=metric).fit(data)
    mean_local_densities_at_k = []
    for k in ks:
        distances, _ = knn.kneighbors(data, n_neighbors=k+1)
        avg_distances = np.mean(distances[:, 1:], axis=1)
        mean_local_densities_at_k.append(np.mean(1 / avg_distances))
    return mean_local_densities_at_k

def density_preservation(data_high_dim, data_low_dim, ks, metric = "euclidean"):
    densities_high_dim = local_densities(data_high_dim, ks, metric)
    densities_low_dim = local_densities(data_low_dim, ks, metric)
    
    mean_relative_densities = [np.mean(d_low / d_high) for d_low, d_high in zip(densities_low_dim, densities_high_dim)]
    std_relative_densities = [np.std(d_low / d_high) for d_low, d_high in zip(densities_low_dim, densities_high_dim)]
    return densities_low_dim, mean_relative_densities, std_relative_densities

In [None]:
ks = [10, 50, 100, 500, 1000]
for result in human_angles_embeddings:
    densities_low_dim, mean_relative_densities, std_relative_densities = density_preservation(human_angles, result["embeddings"], ks)
    result["local_densities"] = densities_low_dim
    result["densities_preservation"] = mean_relative_densities, std_relative_densities

In [None]:
random_points_2D = np.random.uniform(-1, 1, (len(human_angles), 2))
random_points_densities = local_densities(random_points_2D, ks)

In [None]:
plt.figure(figsize=(10, 5))
for i, result in enumerate(human_angles_embeddings):
    if result["reducer"] == PCA:
        continue
    plt.plot(ks, result["local_densities"], marker="x", label=f"{result['reducer'].__name__} - {format_params(result['reducer_params'])}", linestyle = linestyles[result["reducer"].__name__])
plt.plot(ks, random_points_densities, marker="x", label="Random points in 2D", linestyle = linestyles["PCA"])
plt.legend(loc = [1.01, 0.2], fontsize = 10)
plt.title("Mean local density of the embedded space")
plt.ylabel("Mean local density at k")
plt.xlabel("k")
plt.xscale("log")
plt.show()

### Pearson Correlation Coefficient (PCC)

In [None]:
from scipy.spatial.distance import pdist

def compute_pcc(data_high_dim, data_low_dim, n_clusters = 100, n_sample = 10000, n_repetitions = 100):
    pccs = []
    if n_sample > len(data_high_dim):
        n_sample = len(data_high_dim)
    for _ in range(n_repetitions):
        sample_idx = np.random.choice(len(data_high_dim), n_sample, replace=False)
        kmeans_high_dim = KMeans(n_clusters=n_clusters).fit(data_high_dim[sample_idx])
        kmeans_low_dim = KMeans(n_clusters=n_clusters).fit(data_low_dim[sample_idx])
        
        clusters_centers_high_dim = kmeans_high_dim.cluster_centers_
        clusters_centers_low_dim = kmeans_low_dim.cluster_centers_
        
        clusters_d_high = pdist(clusters_centers_high_dim, metric="euclidean")
        clusters_d_low = pdist(clusters_centers_low_dim, metric="euclidean")
        
        pcc = mantel.test(clusters_d_high, clusters_d_low, perms=1000, method="pearson")
        pccs.append(pcc.r)
    return np.mean(pccs), np.std(pccs)

In [None]:
from sklearn import datasets

S_points, _ = datasets.make_s_curve(n_samples=1000, noise=0.1, random_state=42)

pca = PCA(n_components=2)
S_pca = pca.fit_transform(S_points)

print(compute_pcc(S_points, S_pca, n_clusters=100, n_sample=1000, n_repetitions=100))

umap_model = UMAP()
S_umap = umap_model.fit_transform(S_points)

print(compute_pcc(S_points, S_umap, n_clusters=100, n_sample=1000, n_repetitions=100))

In [None]:
for result in human_angles_embeddings:
    result["pcc"] = compute_pcc(human_angles, result["embeddings"])

In [None]:
plt.figure(figsize=(20, 5))
for i,result in enumerate(human_angles_embeddings):
    plt.errorbar(i, result["pcc"][0], yerr=result["pcc"][1], fmt="x", color = "black")
plt.xticks(range(len(human_angles_embeddings)), [tw.fill(f"{result['reducer'].__name__} - {format_params(result['reducer_params'])}", width = 20) for result in human_angles_embeddings], rotation=0)
plt.ylabel("PCC")
plt.title("Pearson Correlation Coefficient (PCC) between the clusters in the high and low dimensional spaces")
plt.show()

### Global Score (GS)

In [None]:
def global_score(X, Y):
    """
    From https://github.com/eamid/trimap/blob/master/trimap/trimap_.py
    X: high-dimensional data
    Y: low-dimensional data
    """

    def global_loss_(X, Y):
        X = X - np.mean(X, axis=0)
        Y = Y - np.mean(Y, axis=0)
        A = X.T @ (Y @ np.linalg.inv(Y.T @ Y))
        return np.mean(np.power(X.T - A @ Y.T, 2))

    n_dims = Y.shape[1]
    Y_pca = PCA(n_components=n_dims).fit_transform(X)
    gs_pca = global_loss_(X, Y_pca)
    gs_emb = global_loss_(X, Y)
    return np.exp(-(gs_emb - gs_pca) / gs_pca)

In [None]:
for result in human_angles_embeddings:
    result["global_score"] = global_score(human_angles, result["embeddings"])

In [None]:
plt.figure(figsize=(20, 5))
for i,result in enumerate(human_angles_embeddings):
    plt.bar(i, result["global_score"], color = "black")
plt.xticks(range(len(human_angles_embeddings)), [tw.fill(f"{result['reducer'].__name__} - {format_params(result['reducer_params'])}", width = 20) for result in human_angles_embeddings], rotation=0)
plt.ylabel("GS")
plt.title("Global Score (GS) of the embeddings")
plt.show()

# Clustering

In [None]:
for result in human_angles_embeddings:
    hdscan = HDBSCAN(min_cluster_size=10, min_samples=10, metric="euclidean").fit(result["embeddings"])
    result["clusters_labels"] = hdscan.labels_
    result["clusters_probs"] = hdscan.probabilities_

In [None]:
def visualize_clusters(embeddings_results, fig_title, d = 4):
    n_plots = len(embeddings_results)
    n_cols = 4
    n_rows = int(np.ceil(n_plots / n_cols))
    
    fig, axs = plt.subplots(nrows=n_rows, ncols=n_cols, figsize=(n_cols * d, n_rows * d))
    axs = axs.flatten()
    for i, result in enumerate(embeddings_results):
        coords = result["embeddings"]
        reducer = result["reducer"]
        params = result["reducer_params"]
        labels = result["clusters_labels"]
        probs = result["clusters_probs"]
        
        for cluster in np.unique(labels):
            cluster_mask = labels == cluster
            cluster_coords = coords[cluster_mask]
            cluster_probs = np.clip(probs[cluster_mask], 0.1, 1)
            alpha = 0.5 if cluster != -1 else 0.1
            axs[i].scatter(cluster_coords[:,0], cluster_coords[:,1], s = cluster_probs, alpha = alpha, label=f"Cluster {cluster}")

        axs[i].set_xticks([])
        axs[i].set_yticks([])
        title = f"{reducer.__name__} - params: {format_params(params)}"
        axs[i].set_title(tw.fill(title, width = 40), fontsize=10)
    [axs[i].axis("off") for i in range(n_plots, n_rows * n_cols)]
    plt.suptitle(fig_title)
    plt.tight_layout()
    plt.show()

In [None]:
visualize_clusters(human_angles_embeddings, fig_title = "HDSCAN clustering")

In [None]:
def compute_clusters_distances(data_high_dim, cluster_labels):

    intra_cluster_d = []
    for cluster_id in np.unique(cluster_labels):
        cluster_points = data_high_dim[cluster_labels == cluster_id]
        if len(cluster_points) > 1:  # Ensure there's more than one point in the cluster
            cluster_pair_d = pairwise_distances(cluster_points, metric = "cosine")
            cluster_pair_d = np.triu(cluster_pair_d, k=1)
            mean_d = np.mean(cluster_pair_d)
            std_d = np.std(cluster_pair_d)
            intra_cluster_d.append((mean_d, std_d))

    between_cluster_d = []
    for i, cluster_id1 in enumerate(np.unique(cluster_labels)):
        for cluster_id2 in np.unique(cluster_labels):
            if cluster_id1 != cluster_id2:
                cluster1_points = data_high_dim[cluster_labels == cluster_id1]
                cluster2_points = data_high_dim[cluster_labels == cluster_id2]
                ds = pairwise_distances(cluster1_points, cluster2_points, metric = "cosine")
                mean_d = np.mean(ds)
                std_d = np.std(ds)
                between_cluster_d.append((mean_d, std_d))

    return intra_cluster_d, between_cluster_d

In [None]:
for result in human_angles_embeddings:
    intra_cluster_d, between_cluster_d = compute_clusters_distances(human_angles, result["clusters_labels"])
    result["intra_cluster_d"] = intra_cluster_d
    result["between_cluster_d"] = between_cluster_d

In [None]:
plt.figure(figsize=(20, 5))
plt.boxplot([[d[0] for d in result["intra_cluster_d"]] for result in human_angles_embeddings], positions = range(len(human_angles_embeddings)), showfliers=False)
plt.xticks(range(len(human_angles_embeddings)), [tw.fill(f"{result['reducer'].__name__} - {format_params(result['reducer_params'])}", width = 20) for result in human_angles_embeddings], rotation=0)
plt.ylabel("Mean intra-cluster distance")
plt.title("Intra-cluster distances")
plt.show()

In [None]:
plt.figure(figsize=(20, 5))
plt.boxplot([[d[0] for d in result["between_cluster_d"]] for result in human_angles_embeddings], positions = range(len(human_angles_embeddings)), showfliers=False)
plt.xticks(range(len(human_angles_embeddings)), [tw.fill(f"{result['reducer'].__name__} - {format_params(result['reducer_params'])}", width = 20) for result in human_angles_embeddings], rotation=0)
plt.ylabel("Mean between-cluster distance")
plt.title("Between-cluster distances")
plt.show()

In [None]:
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score

metric = "euclidean"
for result in human_angles_embeddings:
    labels = result["clusters_labels"]
    if len(np.unique(labels)) == 1:
        result["silhouette_score"] = 0
        result["davies_bouldin_score"] = 0
        result["calinski_harabasz_score"] = 0
    else:
        result["silhouette_score"] = silhouette_score(result["embeddings"], labels, metric = metric)
        result["davies_bouldin_score"] = davies_bouldin_score(result["embeddings"], labels)
        result["calinski_harabasz_score"] = calinski_harabasz_score(result["embeddings"], labels)

In [None]:
fig, axs = plt.subplots(3, 1, figsize=(20, 15))
for i, score in enumerate(["silhouette_score", "davies_bouldin_score", "calinski_harabasz_score"]):
    scores = [result[score] for result in human_angles_embeddings]
    axs[i].bar(range(len(human_angles_embeddings)), scores)
    axs[i].set_xticks(range(len(human_angles_embeddings)))
    axs[i].set_xticklabels([tw.fill(f"{result['reducer'].__name__} - {format_params(result['reducer_params'])}", width = 20) for result in human_angles_embeddings], rotation=0)
    axs[i].set_title(score)
plt.tight_layout()
plt.show()