This is the **6th** Notebook in the clustering pipeline. It shows you how to use a NBclust-like workflow to analyze clustering solutions. NBclust is an R package that evaluates clustering solutions on a handful of clustering metrics to determine the best one.

<u>Note:</u> This is **NOT** a preferred way of evaluating clusters, as it evaluates things like how seperated the clusters are, not their biological relevance or robustness. I am working on a way to pick a solution that is better.

Use <u>***NBclust_env***</u> virtual environment.

In [None]:
# Set this to whatever directory GoodCopy is in

home_dir = "/home/l/lungboy/tadam/Project/"

# Importing Packages and Data

In [None]:
# Importing packages and functions

import numpy as np
import pandas as pd
import umap.umap_ as umap
import matplotlib.pyplot as plt
import sys
import pyckmeans.core.ckmeans as pyckmeans

sys.path.append(home_dir + 'GoodCopy/Functions')

import FunctionsOOPGood as func

In [None]:
# Importing Datasets

data = func.DataSet(empty=True)
data.open_DataSet(home_dir + "GoodCopy/Objects/data_object")

# data_matched = func.MatchedDataSet(empty=True)
# data_matched.open_MatchedDataSet(home_dir + "GoodCopy/Objects/matched_data_saved")

In [None]:
# Importing Labels

agglo_labels = [pd.read_csv(home_dir + "GoodCopy/EnsembleResults/{}_agglo_labels.csv".format(i), index_col= 0) for i in range(2,10)]
all_labels = [pd.read_csv(home_dir + "GoodCopy/EnsembleResults/{}_all_labels.csv".format(i), index_col= 0) for i in range(2,10)]
hdb_labels = [pd.read_csv(home_dir + "GoodCopy/EnsembleResults/{}_hdb_labels.csv".format(i), index_col= 0) for i in range(2,10)]
kmedoids_labels = [pd.read_csv(home_dir + "GoodCopy/EnsembleResults/{}_kmedoids_labels.csv".format(i), index_col= 0) for i in range(2,10)]
snf_hdb_labels = [pd.read_csv(home_dir + "GoodCopy/EnsembleResults/{}_snf_hdb_labels.csv".format(i), index_col= 0) for i in range(2,10)]
snf_spectral_labels =[pd.read_csv(home_dir + "GoodCopy/EnsembleResults/{}_snf_spectral_labels.csv".format(i), index_col= 0) for i in range(2,10)]

labels = agglo_labels + all_labels + hdb_labels + kmedoids_labels + snf_hdb_labels + snf_spectral_labels

In [None]:
# remapping labels, so that they are numbered from 0-n, instead of some where they might be labeled [0,2,3,4] for example/
# This strange numbering would be caused as the cluster ensemble algorithm would find less clusters than the max number.
# This is needed as the following code relies on labels strucutre as [0,1,2,...,n]

def remap_labels(lst):
    # Create a dictionary to store the mapping of old labels to new labels
    label_mapping = {label: new_label for new_label, label in enumerate(set(lst))}
    
    # Create a new list with updated labels
    new_labels = [label_mapping[label] for label in lst]
    
    return new_labels

for i in range(len(labels)):
    labels[i] = remap_labels(np.ravel(labels[i]))

# Average Adjusted Rand Index

Adjusted Rand index to compare how well the clustering solution is representative of all generated solutions.

Here I use the sklearn version of the function. Values closer to 1 are better

In [None]:
from sklearn.metrics import adjusted_rand_score

scores_ari = []

for i in range(len(labels)):
    
    res = 0
    
    for j in range(len(labels)):
       
        if i != j:
            
            res += adjusted_rand_score(labels[i],labels[j])
    
    res /= len(labels) - 1
    
    scores_ari.append(res)

In [None]:
lol = [2,3,4,5,6,7,8,9]

plt.plot(lol,scores_ari[:8], label = "Agglomerative")
plt.plot(lol,scores_ari[8:16], label = "All")
plt.plot(lol,scores_ari[16:24], label = "HDB")
plt.plot(lol,scores_ari[24:32], label = "Kmedoid")
plt.plot(lol,scores_ari[32:40], label = "SNF HDB")
plt.plot(lol,scores_ari[40:48], label = "SNF Spectral")
plt.legend()

# Average Normalized Mutual Information

Normalized Mutual Information to compare how well the clustering solution is representative of all generated solutions.

Here I use the sklearn version of the function. Values closer to 1 are better.

In [None]:
from sklearn.metrics import normalized_mutual_info_score

scores_nmi = []

for i in range(len(labels)):
    
    res = 0
    
    for j in range(len(labels)):
       
        if i != j:
            
            res += normalized_mutual_info_score(labels[i],labels[j])
    
    res /= len(labels) - 1
    
    scores_nmi.append(res)

In [None]:
lol = [2,3,4,5,6,7,8,9]

plt.plot(lol,scores_nmi[:8], label = "Agglomerative")
plt.plot(lol,scores_nmi[8:16], label = "All")
plt.plot(lol,scores_nmi[16:24], label = "HDB")
plt.plot(lol,scores_nmi[24:32], label = "Kmedoid")
plt.plot(lol,scores_nmi[32:40], label = "SNF HDB")
plt.plot(lol,scores_nmi[40:48], label = "SNF Spectral")
plt.legend()

# Silhouette Score
Silhouette score from sklearn, https://scikit-learn.org/stable/modules/generated/sklearn.metrics.silhouette_score.html.

A value between -1 and 1 is given, where 1 is the best. It provides an indication of how well-separated the clusters are and how similar data points are within their own cluster compared to other clusters.

In [None]:
from sklearn.metrics import silhouette_score

sil = []

for i in range(len(labels)):
    sil.append(silhouette_score(data.gower, np.ravel(labels[i]),metric="precomputed"))
    
lol = [2,3,4,5,6,7,8,9]
    
plt.plot(lol,sil[:8], label = "Agglomerative")
plt.plot(lol,sil[8:16], label = "All")
plt.plot(lol,sil[16:24], label = "HDB")
plt.plot(lol,sil[24:32], label = "Kmedoid")
plt.plot(lol,sil[32:40], label = "SNF HDB")
plt.plot(lol,sil[40:48], label = "SNF Spectral")
plt.legend()

# Bayesian Info Criterion
Bayesion info criterion for clustering, taken from pyckmeans package, https://pyckmeans.readthedocs.io/en/latest/pyckmeans.core.html.

A higher value is better. In the context of clustering, BIC assesses the trade-off between the goodness of fit of the model (how well the data fits the clusters) and the complexity of the model (the number of parameters required to define the clusters). It penalizes models with more parameters, discouraging overfitting.

*Note: first plot, using the data in the bic array allows the bic algorithm to compute centroids, while the second plot, using the data in the bic_wmed array allows the bic algorithm to use precomputed medoids. The medoids are computed using the function I wrote in FunctionsOOP, particularily the DataSet.compute_medoids_from_distances() function.*

In [None]:
import pyckmeans.core.ckmeans as pyckmeans

bic = []
bic_wmed = []

for i in range(len(labels)):
    bic.append(pyckmeans.bic_kmeans(np.asarray(data.input_data),np.ravel(labels[i])))
    bic_wmed.append(pyckmeans.bic_kmeans(np.asarray(data.input_data),np.ravel(labels[i]), 
                                         np.asarray(data.compute_medoids_from_distances(np.ravel(labels[i])))))

fig = plt.figure(figsize=(7,7))

lol = [2,3,4,5,6,7,8,9]

plt.plot(lol,bic[:8], label = "Agglomerative")
plt.plot(lol,bic[8:16], label = "All")
plt.plot(lol,bic[16:24], label = "HDB")
plt.plot(lol,bic[24:32], label = "Kmedoid")
plt.plot(lol,bic[32:40], label = "SNF HDB")
plt.plot(lol,bic[40:48], label = "SNF Spectral")
plt.legend()

In [None]:
plt.plot(lol,bic_wmed[:8], label = "Agglomerative")
plt.plot(lol,bic_wmed[8:16], label = "All")
plt.plot(lol,bic_wmed[16:24], label = "HDB")
plt.plot(lol,bic_wmed[24:32], label = "Kmedoid")
plt.plot(lol,bic_wmed[32:40], label = "SNF HDB")
plt.plot(lol,bic_wmed[40:48], label = "SNF Spectral")
plt.legend()

# Gap Statistic

Gap statistic method adjusted from various internet sources + chatGPT!

A higher gap statistic is better. The gap statistic compares the goodness of fit of the clustering solution obtained from the data with the goodness of fit that would be expected from a null reference distribution. It essentially can determine if a clustering solution is statistically significant.

Ouputs of the method:

* gap_value: Gap Statistic. The overall metric returned by this algorithm.
* sdk: Standard Deviation of log dispersion. It provides information about the spread of log dispersions and helps in assessing the reliability of the gap statistic.
* sk: Standard Error of Gap Statistic. It helps quantify the uncertainty associated with the gap statistic and is used for error estimation.
* gap_star: Gap* Statistic. Uses non logarithmic values, so better for smaller datasets.
* sdk_star: Standard Deviation of log dispersion. Same as sdk but for Gap*.
* sk_star: Standard Error of Gap Statistic. Same as sk but for Gap*.


In [None]:
# Functions to compute gap statistic

def _calculate_dispersion(X = pd.DataFrame, labels = np.ndarray, centroids = np.ndarray):
        """
        Calculate the dispersion between actual points and their assigned centroids
        """
        disp = np.sum(
            np.sum(
                [np.abs(inst - centroids[label]) ** 2 for inst, label in zip(X, labels)]
            )
        )
        return disp
    
from sklearn_extra.cluster import KMedoids

def _calculate_gap(X: pd.DataFrame, n_refs: int, n_clusters: int, labels, medoids):
    """
    Calculate the gap value of the given data, n_refs, and number of clusters.
    Return the resutling gap value and n_clusters
    """
    # Holder for reference dispersion results
    ref_dispersions = np.zeros(n_refs)

    # Compute the range of each feature
    X = np.asarray(X)
    a, b = X.min(axis=0, keepdims=True), X.max(axis=0, keepdims=True)

    # For n_references, generate random sample and perform kmeans getting resulting dispersion of each loop
    for i in range(n_refs):
        # Create new random reference set uniformly over the range of each feature
        random_data = np.random.random_sample(size=X.shape) * (b - a) + a

        # Fit to it, getting the centroids and labels, and add to accumulated reference dispersions array.
        centroids, labels = KMedoids(n_clusters).fit(random_data).cluster_centers_, KMedoids(n_clusters).fit(random_data).labels_  # type: Tuple[np.ndarray, np.ndarray]
        dispersion = _calculate_dispersion(
            X=random_data, labels=labels, centroids=centroids
        )
        ref_dispersions[i] = dispersion

    # Fit cluster to original data and create dispersion calc.
    dispersion = _calculate_dispersion(X=X, labels=labels, centroids=medoids)

    # Calculate gap statistic
    ref_log_dispersion = np.mean(np.log(ref_dispersions))
    log_dispersion = np.log(dispersion)
    gap_value = ref_log_dispersion - log_dispersion
    # compute standard deviation
    sdk = np.sqrt(np.mean((np.log(ref_dispersions) - ref_log_dispersion) ** 2.0))
    sk = np.sqrt(1.0 + 1.0 / n_refs) * sdk

    # Calculate Gap* statistic
    # by "A comparison of Gap statistic definitions with and
    # with-out logarithm function"
    # https://core.ac.uk/download/pdf/12172514.pdf
    gap_star = np.mean(ref_dispersions) - dispersion
    sdk_star = np.sqrt(np.mean((ref_dispersions - dispersion) ** 2.0))
    sk_star = np.sqrt(1.0 + 1.0 / n_refs) * sdk_star

    return gap_value, sdk, sk, gap_star, sdk_star, sk_star

In [None]:
# Actually computing the gap statistic. Will probably take 10-20 minutes to run, depending on your n_refs. 
# An ideal n_refs value is above 100, but the higher n_refs, the longer the algorithm will take (O(n) complexity).

gaps = []

for i in range(6):
    for j in range(2,10):
        
        ind = int(j-2 + (i*8))
        #print(ind)
        gaps.append(_calculate_gap(X = np.asarray(data.input_data.copy()), n_refs = 100, 
                                   n_clusters = len(np.unique(labels[ind])),
                                   labels = np.ravel(labels[ind]),
                                   medoids = np.asarray(data.compute_medoids_from_distances(np.ravel(labels[ind])))))

In [None]:
plt.errorbar([2,3,4,5,6,7,8,9],[g[0] for g in gaps[:8]], yerr = [g[1] for g in gaps[:8]], capsize=5, fmt="-o", label="Agglomerative")
plt.errorbar([2,3,4,5,6,7,8,9],[g[0] for g in gaps[8:16]], yerr = [g[1] for g in gaps[8:16]], capsize=5, fmt="-o", label = "All")
plt.errorbar([2,3,4,5,6,7,8,9],[g[0] for g in gaps[16:24]], yerr = [g[1] for g in gaps[16:24]], capsize=5, fmt="-o", label = "HDB")
plt.errorbar([2,3,4,5,6,7,8,9],[g[0] for g in gaps[24:32]], yerr = [g[1] for g in gaps[24:32]], capsize=5, fmt="-o", label  = "Kmedoid")
plt.errorbar([2,3,4,5,6,7,8,9],[g[0] for g in gaps[32:40]], yerr = [g[1] for g in gaps[32:40]], capsize=5, fmt="-o", label = "SNF HDB")
plt.errorbar([2,3,4,5,6,7,8,9],[g[0] for g in gaps[40:48]], yerr = [g[1] for g in gaps[40:48]], capsize=5, fmt="-o", label = "SNF Spectral")
plt.legend()

# Elbow Method

Uses the above *\_calculate_dispersion()* function to compute the intracluster dispersion, which is how spread out points in a cluster are from their medoid. A lower value is better.

In [None]:
elb = []

for i in range(len(labels)):

    elb.append(_calculate_dispersion(np.asarray(data.input_data),np.asarray(labels[i]),np.asarray(data.compute_medoids_from_distances(labels[i]))))
    
plt.plot(lol,elb[:8], label = "Agglomerative")
plt.plot(lol,elb[8:16], label = "All")
plt.plot(lol,elb[16:24], label = "HDB")
plt.plot(lol,elb[24:32], label = "Kmedoid")
plt.plot(lol,elb[32:40], label = "SNF HDB")
plt.plot(lol,elb[40:48], label = "SNF Spectral")
plt.legend()

# Calinski Harabasz

Calinski Harabasz method from scikit learn, https://scikit-learn.org/stable/modules/generated/sklearn.metrics.calinski_harabasz_score.html.

A higher value is better. The score is defined as ratio of the sum of between-cluster dispersion and of within-cluster dispersion.

In [None]:
from sklearn.metrics import calinski_harabasz_score

cal = []

for i in range(len(labels)):
    cal.append(calinski_harabasz_score(data.input_data,labels[i]))
    
plt.plot(lol,cal[:8], label = "Agglomerative")
plt.plot(lol,cal[8:16], label = "All")
plt.plot(lol,cal[16:24], label = "HDB")
plt.plot(lol,cal[24:32], label = "Kmedoid")
plt.plot(lol,cal[32:40], label = "SNF HDB")
plt.plot(lol,cal[40:48], label = "SNF Spectral")
plt.legend()

# Davies Bouldin

Davies Bouldin method from scikit learn, https://scikit-learn.org/stable/modules/generated/sklearn.metrics.davies_bouldin_score.html.

Lower values is better. The score is defined as the average similarity measure of each cluster with its most similar cluster, where similarity is the ratio of within-cluster distances to between-cluster distances. Thus, clusters which are farther apart and less dispersed will result in a better score.

In [None]:
from sklearn.metrics import davies_bouldin_score

dav = []

for i in range(len(labels)):
    dav.append(davies_bouldin_score(data.input_data,labels[i]))
    
plt.plot(lol,dav[:8], label = "Agglomerative")
plt.plot(lol,dav[8:16], label = "All")
plt.plot(lol,dav[16:24], label = "HDB")
plt.plot(lol,dav[24:32], label = "Kmedoid")
plt.plot(lol,dav[32:40], label = "SNF HDB")
plt.plot(lol,dav[40:48], label = "SNF Spectral")
plt.legend()

# Dunn Index

Dunn index, adapted from internet sources + chatGPT.

A higher solution is better. It measures the ratio of the minimum inter-cluster distance to the maximum intra-cluster distance. The Dunn Index aims to capture both the compactness of clusters and the separation between clusters, making it a useful criterion for assessing the clustering structure.

In [None]:
def dunn_index(distances, labels):
    """
    Calculate the Dunn index for a given clustering result.

    Parameters:
    distances (numpy.ndarray): n x n distance matrix, where n is the number of data points.
    labels (numpy.ndarray): Cluster assignments for each data point.

    Returns:
    float: Dunn index value.
    """
    # Number of clusters (unique labels)
    num_clusters = len(np.unique(labels))

    # Calculate the cluster diameters (maximum distance within each cluster)
    cluster_diameters = np.zeros(num_clusters)
    for label in range(num_clusters):
        cluster_data = distances[labels == label][:, labels == label]
        cluster_diameters[label] = np.max(cluster_data)

    # Calculate the inter-cluster distance (minimum distance between clusters)
    inter_cluster_distance = np.min(distances[distances > 0])

    # Calculate the Dunn index
    dunn_index_value = inter_cluster_distance / np.max(cluster_diameters)

    return dunn_index_value

In [None]:
dunn = []

for i in range(len(labels)):
    dunn.append(dunn_index(np.asarray(data.gower),np.asarray(labels[i])))
    
plt.plot(lol,dunn[:8], label = "Agglomerative")
plt.plot(lol,dunn[8:16], label = "All")
plt.plot(lol,dunn[16:24], label = "HDB")
plt.plot(lol,dunn[24:32], label = "Kmedoid")
plt.plot(lol,dunn[32:40], label = "SNF HDB")
plt.plot(lol,dunn[40:48], label = "SNF Spectral")
plt.legend()

# Summary
In this section we add up all the scores for each solution to find the best overall solution

### Ranking based scoring
Here we score based on ranking when compared to other solutions and then sum up ranks to find best overall solution.

**Lower score is better**

In [None]:
sum_df = pd.DataFrame(np.vstack((sil,bic,bic_wmed,dunn,dav, cal, elb,[g[0] for g in gaps], scores_ari, scores_nmi)).T,
                   index=[f'{method} {i}' for method in ["Agglomerative", "All", "HDB", "Kmedoid", "SNF HDB", "SNF Spectral"] for i in range(2,10)],
                columns = ["Silhouette","BIC Centroid","BIC Medoid","Dunn","Davies Bouldin","Calinski Harabazc","Elbow","Gap", "Adjusted Rand Index", "Normalized Mutual Information"])

In [None]:
# Computing a score based on ranking

sum_df.rank().pow({"Silhouette":-1,"BIC Centroid":-1,"BIC Medoid":-1,"Davies Bouldin":1,"Dunn":-1,"Calinski Harabazc":-1,"Elbow":1,"Gap":-1, "Adjusted Rand Index":-1,"Normalized Mutual Information":-1}).mul({"Silhouette":1,"BIC Centroid":1,"BIC Medoid":1,"Davies Bouldin":0.25,"Dunn":1,"Calinski Harabazc":1,"Elbow":1,"Gap":1, "Adjusted Rand Index":1,"Normalized Mutual Information":1}).sum(axis=1).sort_values(ascending=True)

# Smallest score is best solution

### Scaled-based scoring
Here we score based on scaled test statistics and then sum up scaled scores and rank compared to other solutions. 

**Lower score is better.**

In [None]:
# Define custom scaling range (1 to 10)
custom_min = 1
custom_max = 10

df = sum_df.copy()

# Apply Min-Max scaling with custom range
scaled_data = (df - df.min()) / (df.max() - df.min()) * (custom_max - custom_min) + custom_min

In [None]:
scaled_data.rank().pow({"Silhouette":-1,"BIC Centroid":-1,"BIC Medoid":-1,"Davies Bouldin":1,"Dunn":-1,"Calinski Harabazc":-1,"Elbow":1,"Gap":-1, "Adjusted Rand Index":-1,"Normalized Mutual Information":-1}).mul({"Silhouette":1,"BIC Centroid":1,"BIC Medoid":1,"Davies Bouldin":0.25,"Dunn":1,"Calinski Harabazc":1,"Elbow":1,"Gap":1, "Adjusted Rand Index":1,"Normalized Mutual Information":1}).sum(axis=1).sort_values(ascending=True)