# Clustering

In [None]:
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
from sklearn.metrics import silhouette_score
from sklearn.model_selection import train_test_split
from sklearn.metrics import pairwise_distances
import numpy as np
import pandas as pd

This notebook provides functions to compute and evaluate Clustering. 2 methods are implemented; K-means and Gaussian Mixture Models (GMM).

The different functionalities are the following:

- Function to fit clusters: fit_cluster
    This function handles 2 types of clustering: K-means and GMM. It necessitates as parameter the number of cluster (resp. guassian mixtures for GMM).

- Function to predict clusters: predict_cluster
    This function applies the model on a sample to attribute clusters.

- Functions to evaluate the clusters on the training sample and validation set: evaluate_cluster_train and evaluate_cluster_validation
    These functions compute the following metrics: 
    - Silhouette score
    - Intra-cluster inertia: measures the sum of squares of distances between each data point and its assigned cluster centroid.
    - Extra-cluster inertia: measures the total dispersion of data points relative to their centroids.
    - Davies-Bouldin index: measures the average similarity between each cluster and its most similar clusters. A lower value indicates better separation between clusters.

- Function to optimize the hyperparameter: gridsearch_cluster_hyperparametres
    This function aims at selectinig the optimal number of clusters for K-means and Gaussian Mixtures for GMMs.

- Function to compute a Cross-Validation: train_cluster_CV
    This function:
    - Split the sample into a training set (80%) and a test one (20%)
    - Train the clustering task on the training set (function fit_cluster)
    - The model is then applied to both training and validation sets (function predict_cluster)
    - The different metrics are computed on both samples (functions evaluate_cluster_train and evaluate_cluster_validation)
    - 2 Dataframes are returned (one on the training and on the validation test) containing for each fold the different metrics

- Function to visualize the clusters through PCA or t-SNE dimension reduction techniques: PCA_caracteristic
    This function is developed for maximum 5 clusters. It takes as input a dataframe containing the different attributes of the samples (categorical variable, for example Chemiotherapy Y/N or type of Chemiotherapy = {adjuvant, neoadjuvant, both}). For each attribute, a plot is displayed emphasizing the distribution of the different values of attributes regarding the cluster the sample belong to.

## Fit and Predict the clusters

In [None]:
def fit_cluster(data, method, k):
    '''
    Parameters
    ----------
    data : array
        Input values.
    method : string
        Clustering method to apply {'kmeans', 'GMM'}. Default = 'kmeans'.
    k : int
        For kmeans = number of clusters. For Gaussian Mixture = number of components. Default =2.
    
    Returns
    -------
    model : Sklearn model
        Fitted model.
    '''

    if method == 'GMM':
        model = GaussianMixture(n_components=k)

    else :
        model = KMeans(n_clusters=k, random_state=42)

    model.fit(data)

    return model



In [1]:
def predict_cluster(model, data) :
    '''
    Parameters
    ----------
    model : Sklearn model
        Fitted model to apply.
    data : array
        Input values.
    
    Returns
    -------
    labels : array
        The predicted cluster per sample.
    '''
    
    labels = model.predict(data)
    return labels

## Evaluate the clusters

- Silhouette score
- Intra-cluster inertia: measures the sum of squares of distances between each data point and its assigned cluster centroid.
- Extra-cluster inertia: measures the total dispersion of data points relative to their centroids.
- Davies-Bouldin index: measures the average similarity between each cluster and its most similar clusters. A lower value indicates better separation between clusters.

In [2]:
def evaluate_cluster_train(model, data, label) :
    '''
    Parameters
    ----------
    model : Sklearn model
        Fitted model to apply.
    data : array
        Training sample.
    label : array
        The predicted cluster per sample.
    
    Return
    ------
    dict of evaluations on validation sample : 
        Kmeans and GMM : silhouette score, inertia intra clusters and extra clusters, Davies-Bouldin index
        Agglomeration Clustering : silhouette
    '''

    silhouette = silhouette_score(data, label)

    if str(type(model)) == "<class 'sklearn.cluster._kmeans.KMeans'>" :
        k = model.n_clusters
        cluster_centers_train = model.cluster_centers_
    
    else :
        k = model.n_components
        cluster_centers_train = []
        for lab in range(k):
            cluster_centers_train.append(np.mean(data[label== lab], axis=0))
        cluster_centers_train = np.array(cluster_centers_train)

    
    inertie_intra_cluster = 0
    for i in range(k):
        cluster_points = data[label == i]
        if len(cluster_points) > 0:
            cluster_centroid = cluster_centers_train[i]
            inertie_intra_cluster += np.sum((cluster_points - cluster_centroid) ** 2)

    inertie_extra_cluster = np.sum(np.min(np.linalg.norm(data - cluster_centers_train[label], axis=1) ** 2))

    intra_cluster_distances = []
    for i in range(k):
        if np.sum(label == i) == 0:
            intra_cluster_distances.append(0)  
        else:
            intra_cluster_distances.append(np.mean(pairwise_distances(data[label == i], [cluster_centers_train[i]], metric='euclidean')))
    cluster_distances = pairwise_distances(cluster_centers_train, metric='euclidean')
    np.fill_diagonal(cluster_distances, np.inf)
    db_scores = [(intra_cluster_distances[i] + intra_cluster_distances[j]) / cluster_distances[i, j]
                for i in range(k) for j in range(k)]
    
    db_score_mean, db_score_min, db_score_max = np.mean(db_scores), np.min(db_scores), np.max(db_scores)

    return {'silhouette' : round(silhouette,2), 'inertia_intra' : int(inertie_intra_cluster), 'inertia_extra' : round(inertie_extra_cluster,0), 'db_mean' : round(db_score_mean,2), 'db_min':round(db_score_min,2), 'db_max':round(db_score_max,2)}

In [8]:
def evaluate_cluster_validation(model, data_train, data_test, label) :
    '''
    Parameters
    ----------
    model : Sklearn model
        Fitted model to apply.
    data : array
        Validation sample.
    label : array
        The predicted cluster per sample.
    
    Return
    ------
    dict of evaluations on validation sample : silhouette score, inertia intra clusters and extra clusters, Davies-Bouldin index
    '''

    if str(type(model)) == "<class 'sklearn.cluster._kmeans.KMeans'>" :
        k = model.n_clusters
        cluster_centers_train = model.cluster_centers_
        labels_train = model.labels_

    else :
        k = model.n_components
        labels_train = model.predict(data_train)
        cluster_centers_train = []
        for lab in range(k):
            cluster_centers_train.append(np.mean(data_train[labels_train == lab], axis=0))
        cluster_centers_train = np.array(cluster_centers_train)

    silhouette = silhouette_score(data_test, label)

    inertie_intra_cluster = 0
    for i in range(k):
        cluster_points = data_test[label == i]
        if len(cluster_points) > 0:
            cluster_centroid = cluster_centers_train[i]
            inertie_intra_cluster += np.sum((cluster_points - cluster_centroid) ** 2)

    inertie_extra_cluster = np.sum(np.min(np.linalg.norm(data_test - cluster_centers_train[label], axis=1) ** 2))

    intra_cluster_distances = []
    for i in range(k):
        if np.sum(label == i) == 0:
            intra_cluster_distances.append(0) 
        else:
            intra_cluster_distances.append(np.mean(pairwise_distances(data_test[label == i], [cluster_centers_train[i]], metric='euclidean')))
    cluster_distances = pairwise_distances(cluster_centers_train, metric='euclidean')
    np.fill_diagonal(cluster_distances, np.inf)
    db_scores = [(intra_cluster_distances[i] + intra_cluster_distances[j]) / cluster_distances[i, j]
                for i in range(k) for j in range(k)]
    
    db_score_mean, db_score_min, db_score_max = np.mean(db_scores), np.min(db_scores), np.max(db_scores)

    return {'silhouette' : silhouette, 'inertia_intra' : int(inertie_intra_cluster), 'inertia_extra' : round(inertie_extra_cluster,0), 'db_mean' : round(db_score_mean,2), 'db_min':round(db_score_min,2), 'db_max':round(db_score_max,2)}

## Gridsearch

Selection of the optimal number of clusters for K-means and Gaussian Mixtures for GMMs.

In [None]:
def gridsearch_cluster_hyperparametres(method, data, max_clusters=10, nb_folds = 5):
    '''
    Parameters
    ----------
    method : string
        Clustering method to apply {'kmeans', 'GMM'}. Default = 'kmeans'.
    data : array
        Input values.
    max_clusters : int
        Maximum number of clusters to test. Default = 10.
    nb_folds : int 
        Number of folds for the Cross Validation. Default = 5.
    
    Returns
    -------
    df : DataFrame
        For each number of clusters tested, the averaged silhouette scores (both on training and test samples) over the nb_folds cross validation.
    '''
    

    df = pd.DataFrame()
    j=0
   
    for k in range(2, max_clusters+1):
        silhouette_train_list, silhouette_test_list = [],[]
        for i in range(nb_folds) :
            X_train, X_test = train_test_split(data, test_size=0.2)
            model = fit_cluster(X_train, method, k)
            labels_train = predict_cluster(model, X_train)
            labels_test = predict_cluster(model, X_test)
            silhouette_train_list.append(silhouette_score(X_train, labels_train))
            silhouette_test_list.append(silhouette_score(X_test, labels_test))

        df.loc[j, 'k'], df.loc[j, 'Silhouette_train'], df.loc[j, 'Silhouette_test'] = k, round(np.mean(silhouette_train_list),4), round(np.mean(silhouette_test_list),4)
        j+=1
        
    return df

## Train and evaluate with Cross Validation

In [None]:
def train_cluster_CV(method, data, k, nb_folds = 5):
    '''
    Parameters
    ----------
    method : string
        Clustering method to apply {'kmeans', 'GMM'}. Default = 'kmeans'.
    data : array
        Input values.
    k : int
        Numver of clusters.
    nb_folds : int 
        Number of folds for the Cross Validation. Default = 5.
    
    Returns
    -------
    df : DataFrame
        For each clusters fitted (number of folds), the averaged metrics (both on training and test samples) over the nb_folds cross validation.
    '''
    
    df_train, df_test = pd.DataFrame(columns=['Silhouette', 'Inertia_intra', 'Inertia_extra', 'DB_mean', 'DB_min', 'DB_max']), pd.DataFrame(columns=['Silhouette', 'Inertia_intra', 'Inertia_extra', 'DB_mean', 'DB_min', 'DB_max'])

    for i in range(nb_folds) :
        X_train, X_test = train_test_split(data, test_size=0.3)
        model = fit_cluster(X_train, method, k)
        labels_train = predict_cluster(model, X_train)
        labels_test = predict_cluster(model, X_test)
        eval_train = evaluate_cluster_train(model, X_train, labels_train)
        eval_test = evaluate_cluster_validation(model, X_train, X_test, labels_test)
        df_train.loc[i, 'Silhouette'], df_train.loc[i, 'Inertia_intra'], df_train.loc[i, 'Inertia_extra'], df_train.loc[i, 'DB_mean'], df_train.loc[i, 'DB_min'], df_train.loc[i, 'DB_max'] = eval_train['silhouette'], eval_train['inertia_intra'], eval_train['inertia_extra'], eval_train['db_mean'], eval_train['db_min'], eval_train['db_max']
        df_test.loc[i, 'Silhouette'], df_test.loc[i, 'Inertia_intra'], df_test.loc[i, 'Inertia_extra'], df_test.loc[i, 'DB_mean'], df_test.loc[i, 'DB_min'], df_test.loc[i, 'DB_max'] = eval_test['silhouette'], eval_test['inertia_intra'], eval_test['inertia_extra'], eval_test['db_mean'], eval_test['db_min'], eval_test['db_max']

    return df_train, df_test

## Visualization Analysis

In [None]:
def PCA_caracteristic(data, columns_list, k=2, path='', loc_legend2 = 'best'):
    '''
    This function is only suitable for 2 clusters and 2 dimensional PCA or T-SNE.

    Parameters
    ----------
    data : DataFrame
        DataFrame containing the clusters 'Cluster', the two pricipal components resulting form a PCA or a T-SNE in two dimensions 'PC1', 'PC2' and the caracteristics of the population, on which we apply the visualization.
    colums_list : list
        List containing the name of the columns (ie. variables) on which we want to realize the plot.
    k : int
        Number of clusters.
    path : str
        Path and name of the resulting figure(s).
    loc_legend2 : str
        Localization for the legend of the different unique values for the characteristics of the population (a variable, ie. a column). 
    '''

    liste_markers = ['o', 's', '^', 'd', 'X']

    for col in columns_list : 

        fig, axs = plt.subplots(ncols=3, nrows=math.ceil((len(np.unique(data[col]))+1)/3),figsize=(15, 4*math.ceil((len(np.unique(data[col]))+1)/3)))

        if (len(np.unique(data[col]))+1)%3 != 0 :
            if math.floor((len(np.unique(data[col]))+1)/3)==1:
                fig.delaxes(axs[math.ceil((len(np.unique(data[col]))+1)/3)-1, -1])
            else :
                for l in range(1,math.floor((len(np.unique(data[col]))+1)/3)):
                    fig.delaxes(axs[math.ceil((len(np.unique(data[col]))+1)/3)-1, -l])

        if len(np.unique(data[col])) == 2 :
            colors = ['mediumblue', 'darkorange']
        elif len(np.unique(data[col])) == 3 :
            colors = ['mediumblue', 'limegreen', 'gold']
        elif len(np.unique(data[col])) > 5 :
            colors = ['mediumblue', 'teal', 'limegreen', 'gold', 'darkorange', 'deeppink', 'darkviolet', 'chocolate']
        else :
            colors = ['mediumblue', 'teal', 'limegreen', 'gold', 'darkorange']
        

        for i, ax in enumerate(axs.ravel()):
            if i <= len(np.unique(data[col])) :
                if i == 0:
                    for cluster in range(k):
                        if cluster == 0 :
                            # Cluster 1
                            j = 0
                            cluster_data = data[data['Cluster'] == cluster]
                            for var in data[col].unique():
                                var_data = cluster_data[cluster_data[col] == var]
                                ax.scatter(var_data['PC1'], var_data['PC2'], label=f'{var}', c=colors[j], marker=liste_markers[cluster], alpha=0.6)
                                j += 1

                        else :
                            # Clusters 2 à k
                            j = 0
                            cluster_data = data[data['Cluster'] == cluster]
                            for var in data[col].unique():
                                var_data = cluster_data[cluster_data[col] == var]
                                ax.scatter(var_data['PC1'], var_data['PC2'], c=colors[j], marker=liste_markers[cluster], alpha=0.6)
                                j += 1

                    ax2 = ax.twinx()
                    ax2.get_yaxis().set_visible(False)
                    for cluster in range(k) :
                        ax2.plot(np.NaN, np.NaN, linestyle="none", marker=liste_markers[cluster], markersize=7, markerfacecolor="black", label='Cluster ' + str(cluster+1))

                    # Création de la légende pour les BC Sub Type
                    list_caracteristic = []
                    for cc, var in enumerate(data[col].unique()):
                        list_caracteristic.append(plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=colors[cc], markersize=7, label=var, alpha=0.6))

                    # Affichage des légendes
                    ax.set_xlabel('PC1')
                    ax.set_ylabel('PC2')
                    #ax.set_title(col)
                    if loc_legend2 != 'best':
                        ax.legend(handles=list_caracteristic, title=col, fontsize=7, title_fontsize=8, loc=loc_legend2)
                    else :
                        ax.legend(handles=list_caracteristic, title=col, fontsize=7, title_fontsize=8)
                    ax2.legend(loc='upper left', title='Clusters', fontsize=7, title_fontsize=8)

                    ax.grid(False)
                    
                    plt.savefig(path+'_'+col+'.PNG')

                else :
                    for cluster in range(k):
                        if cluster == 0 :
                            # Cluster 1
                            list_caracteristic = []
                            cluster_data = data[data['Cluster'] == cluster]
                            for var in data[col].unique():
                                var_data = cluster_data[cluster_data[col] == var]
                                if var == data[col].unique()[i-1] :
                                    ax.scatter(var_data['PC1'], var_data['PC2'], label=f'{var}', c='mediumblue', marker=liste_markers[cluster], alpha=0.8)
                                    list_caracteristic.append(plt.Line2D([0], [0], marker=liste_markers[cluster], color='w', markerfacecolor='mediumblue', markersize=7, label=var, alpha=0.8))
                                else :
                                    ax.scatter(var_data['PC1'], var_data['PC2'], label=f'{var}', c='gray', marker=liste_markers[cluster], alpha=0.4)
                                    list_caracteristic.append(plt.Line2D([0], [0], marker=liste_markers[cluster], color='w', markerfacecolor='gray', markersize=7, label=var, alpha=0.4))

                        else :
                            # Clusters 2 à k
                            cluster_data = data[data['Cluster'] == cluster]
                            for var in data[col].unique():
                                var_data = cluster_data[cluster_data[col] == var]
                                if var == data[col].unique()[i-1] :
                                    ax.scatter(var_data['PC1'], var_data['PC2'], label=f'{var}', c='mediumblue', marker=liste_markers[cluster], alpha=0.8)
                                else :
                                    ax.scatter(var_data['PC1'], var_data['PC2'], label=f'{var}', c='gray', marker=liste_markers[cluster], alpha=0.4)

                        ax2 = ax.twinx()
                        ax2.get_yaxis().set_visible(False)
                        for cluster in range(k) :
                            ax2.plot(np.NaN, np.NaN, linestyle="none", marker=liste_markers[cluster], markersize=7, markerfacecolor="black", label='Cluster ' + str(cluster+1))

                    ax.set_xlabel('PC1')
                    ax.set_ylabel('PC2')
                    #ax.set_title(col)
                    if loc_legend2 != 'best':
                        ax.legend(handles=list_caracteristic, title=col, fontsize=7, title_fontsize=8, loc=loc_legend2)
                    else :
                        ax.legend(handles=list_caracteristic, title=col, fontsize=7, title_fontsize=8)
                    ax2.legend(loc='upper left', title='Clusters', fontsize=7, title_fontsize=8)

                    ax.grid(False)

                    plt.savefig(path+'_'+col+'.PNG')