# Clustering Stability Test

In this notebook, we test the stability of clustering result for each clustering algorithms we used. This is done by boostrapping the data and calculate the adjusted rand index between the original clustering result and the boostrapped clustering result.

# Boostrap Dataset

In [1]:
import pandas as pd

survey_df = pd.read_csv("../data/survey.csv")
uids = set(survey_df["uid"])

data = pd.read_csv("../data/trial.csv")
data = data[data["uid"].isin(uids)]

# Normalize the position, the canvas size is 1000 x 600
data["x"] = data["x"] / 1000
data["y"] = data["y"] / 600

In [2]:
from collections import defaultdict
import random
from sklearn.utils import resample
import numpy as np


random.seed(42)


def boostrap_distance_matrix() -> dict[tuple[int, int], list[float]]:
    distances = defaultdict(list)
    bootstrapped_uids = resample(list(uids))

    for uid in bootstrapped_uids:
        # Get data of this participant
        uid_data = data[data["uid"] == uid]

        for trial in range(1, 7):
            # Get data of this trial
            trial_data = uid_data[uid_data["trial_id"] == trial]
            records = trial_data.to_dict("records")

            for i in range(len(records)):
                for j in range(i + 1, len(records)):
                    # Get object pair tuple, make sure the smaller id is always first
                    object_pair = (records[i]["object_id"], records[j]["object_id"])

                    # Skip if the object pair is the same
                    # This might happen when the participant submit the same trial multiple times
                    # and changed the position of some objects
                    if object_pair[0] == object_pair[1]:
                        continue

                    if object_pair[0] > object_pair[1]:
                        object_pair = (object_pair[1], object_pair[0])

                    # Calculate the distance between object i and j
                    p1 = np.array([records[i]["x"], records[i]["y"]])
                    p2 = np.array([records[j]["x"], records[j]["y"]])
                    distance = np.linalg.norm(p1 - p2)

                    # Add the distance to the list
                    distances[object_pair].append(distance)

    # Get the average distance of each object pair
    average_distances = {
        object_pair: np.mean(distances[object_pair]) for object_pair in distances
    }

    distance_matrix = np.zeros((30, 30))
    for (i, j), d in average_distances.items():
        distance_matrix[i - 1, j - 1] = d
        distance_matrix[j - 1, i - 1] = d

    return distance_matrix

In [3]:
from tqdm.autonotebook import tqdm

N_SIMULATIONS = 1000

all_boostrapped_distance_matrices = [
    boostrap_distance_matrix() for _ in tqdm(range(N_SIMULATIONS))
]

  from tqdm.autonotebook import tqdm


  0%|          | 0/1000 [00:00<?, ?it/s]

# Convert to MDS embeddings

In [4]:
from sklearn.manifold import MDS

BEST_K = 3


def get_mds_embedding(distance_matrix):
    mds = MDS(n_components=BEST_K, dissimilarity="precomputed", random_state=42)
    return mds.fit_transform(distance_matrix)

In [5]:
all_mds_embeddings = [
    get_mds_embedding(matrix) for matrix in all_boostrapped_distance_matrices
]

In [6]:
all_mds_distances_by_object_pair = []
all_mds_distance_matrix = []

for n in range(N_SIMULATIONS):
    mds_distance_matrix = np.zeros((30, 30))
    mds_distance_by_object_pair = {}

    object_embeddings = all_mds_embeddings[n]
    for i in range(30):
        for j in range(30):
            p1 = object_embeddings[i]
            p2 = object_embeddings[j]
            mds_distance_matrix[i, j] = np.linalg.norm(p1 - p2)

            if i < j:
                mds_distance_by_object_pair[(i + 1, j + 1)] = mds_distance_matrix[i, j]

    all_mds_distances_by_object_pair.append(mds_distance_by_object_pair)
    all_mds_distance_matrix.append(mds_distance_matrix)

# Stability Measure

In [7]:
from sklearn.metrics import adjusted_rand_score
from itertools import combinations


def clustering_stability(clustering_results):
    pairwise_ari = []
    for c1, c2 in combinations(clustering_results, 2):
        pairwise_ari.append(adjusted_rand_score(c1, c2))

    print(f"Averaged ARI: {np.mean(pairwise_ari)}")

# KMeans Clustering

In [8]:
from sklearn.cluster import KMeans
from kneed import KneeLocator

BEST_N_CLUSTERS = 7


def get_kmeans_clusters(object_embeddings):
    kmeans = KMeans(n_clusters=BEST_N_CLUSTERS, random_state=42)
    kmeans.fit(object_embeddings)
    return list(kmeans.labels_)

In [9]:
kmeans_clustering_results = [
    get_kmeans_clusters(object_embeddings) for object_embeddings in all_mds_embeddings
]

In [10]:
clustering_stability(kmeans_clustering_results)

Averaged ARI: 0.5484932022228171


# Hierarchical clustering

In [14]:
from scipy.cluster.hierarchy import linkage, fcluster

THRESHOLD = 1.1


def get_linkage_clusters(distance_matrix):
    linkage_matrix = linkage(distance_matrix, method="ward", optimal_ordering=True)
    linkage_clusters = fcluster(linkage_matrix, THRESHOLD, criterion="distance")
    linkage_clusters = [
        l - 1 for l in linkage_clusters
    ]  # offset by 1 to be consistent with other clustering results

    linkage_categories = [set() for _ in range(max(linkage_clusters) + 1)]
    for i, label in enumerate(linkage_clusters):
        linkage_categories[label].add(i + 1)

    return linkage_clusters

In [15]:
linkage_clustering_results = [
    get_linkage_clusters(distance_matrix) for distance_matrix in all_mds_distance_matrix
]

  linkage_matrix = linkage(distance_matrix, method="ward", optimal_ordering=True)


In [16]:
clustering_stability(linkage_clustering_results)

Averaged ARI: 0.573709592440113
