In [1]:
from sklearn.metrics import adjusted_rand_score
import numpy as np
import os
from sklearn.cluster import AgglomerativeClustering

%matplotlib qt5

import matplotlib.pyplot as plt


In [2]:
def palette_for_label_map(no_cluster, unclassified=False, mask=False):
    """
    Build a discrete colour palette and the matching colormap for a label map.

    Cluster colours are taken from matplotlib's ``tab20`` followed by
    ``tab20b`` (40 colours in total). When more than 40 clusters are requested
    the colours are reused cyclically, so the palette always contains one
    entry per cluster.

    The optional special entries are inserted at the FRONT of the palette:

    * ``mask=True`` puts "white" at index 0 (masked-out pixels);
    * ``unclassified=True`` puts "black" directly before the cluster colours
      (index 0, or index 1 when ``mask`` is also set).

    Args:
        no_cluster (int): Number of real clusters.
        unclassified (bool): Add a black entry for unclassified pixels.
        mask (bool): Add a white entry for masked-out pixels.

    Returns:
        tuple: ``(palette, CustomCmap)`` - the seaborn colour palette and the
        ``matplotlib.colors.ListedColormap`` built from the same colour list.
    """
    import seaborn as sns
    import matplotlib

    # 20 colours from tab20 followed by 20 colours from tab20b.
    base_colors = [plt.cm.tab20(i) for i in range(20)]
    base_colors += [plt.cm.tab20b(i) for i in range(20)]

    # Cycle through the base colours so that requesting more clusters than
    # base colours never yields a palette that is shorter than no_cluster.
    custom_palette = [base_colors[i % len(base_colors)] for i in range(no_cluster)]

    if unclassified:
        custom_palette.insert(0, "black")  # UNCLASSIFIED
    if mask:
        custom_palette.insert(0, "white")  # MASKED OUT

    CustomCmap = matplotlib.colors.ListedColormap(custom_palette)
    palette = sns.color_palette(palette=custom_palette)
    return (palette, CustomCmap)

def include_scalebar(dp):
    """
    Add a scale bar to the current matplotlib axes.

    The size of one pixel is read from ``dp.axes_manager[0].scale`` and handed
    to ``matplotlib_scalebar`` with "nm" as the unit string.

    Args:
        dp: Object exposing ``axes_manager[0].scale``
            (presumably a HyperSpy-style signal - confirm against the caller).
    """
    from matplotlib_scalebar.scalebar import ScaleBar

    # Appearance of the bar: white, frameless, in the lower-left corner with
    # the length label drawn above it.
    bar_style = dict(
        length_fraction=0.25,
        width_fraction=0.015,
        location="lower left",
        frameon=False,
        color="w",
        scale_loc="top",
        border_pad=0.5,
    )

    pixel_size = dp.axes_manager[0].scale
    bar = ScaleBar(pixel_size, "nm", **bar_style)
    plt.gca().add_artist(bar)


def plot_label_map(labels_highest_soft_spatial, no_cluster_soft, unclassified=False, scalebar=False, dp=None):
    """
    Show a 2D label map in a new 15 x 15 inch figure using a discrete colormap.

    Args:
        labels_highest_soft_spatial (array-like): 2D array of labels to display.
        no_cluster_soft (int): Number of real clusters; sets the number of
            cluster colours in the colormap.
        unclassified (bool): Reserve a black entry at the start of the
            colormap for unclassified pixels.
        scalebar (bool): Draw a scale bar on the image.
        dp: Object passed to ``include_scalebar`` to calibrate the scale bar.
            Only used when ``scalebar`` is true. When omitted, a notebook-level
            variable named ``dp`` is used if one exists.

    Raises:
        ValueError: If ``scalebar`` is requested but no ``dp`` is available.
    """
    # The original body referenced an undefined name `dp`. Resolve it up front
    # so that a missing calibration object fails with a clear message before
    # any figure is created.
    if scalebar and dp is None:
        dp = globals().get("dp")
        if dp is None:
            raise ValueError(
                "scalebar=True requires the `dp` argument (object providing axes_manager[0].scale)."
            )

    _, CustomCmap = palette_for_label_map(
        no_cluster_soft, unclassified=unclassified, mask=False
    )

    plt.figure(figsize=(15,15))
    plt.imshow(labels_highest_soft_spatial, cmap=CustomCmap)

    if scalebar:
        include_scalebar(dp)

    plt.xticks([])
    plt.yticks([])
    plt.tight_layout()
    plt.show()

In [3]:


def load_clustering_results(folder_path):
    """
    Read every ``.npy`` label file in a folder and stack the results row-wise.

    Files are visited in sorted filename order and each path is printed as it
    is loaded. Arrays with more than one dimension are flattened to 1D before
    being stacked.

    Args:
        folder_path (str): Folder containing the ``.npy`` clustering label files.

    Returns:
        np.ndarray: Array with one row of labels per loaded file.
    """
    npy_names = [
        name for name in sorted(os.listdir(folder_path)) if name.endswith(".npy")
    ]

    rows = []
    for name in npy_names:
        full_path = os.path.join(folder_path, name)
        print(full_path)
        loaded = np.load(full_path)
        # Column/row shaped label arrays are reduced to a flat vector.
        rows.append(loaded.flatten() if loaded.ndim > 1 else loaded)

    return np.array(rows)

# Every backslash is escaped explicitly. The previous literal relied on "\Z",
# "\T" and "\d" being passed through as unrecognised escape sequences, which
# newer Python versions warn about. The resulting string is unchanged.
labels_path = "C:\\Users\\Zhi Quan\\Dropbox (Personal)\\Jupyter backup\\TRISO\\2025\\data\\feature_engi_labels\\"
labels = load_clustering_results(labels_path)

C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_labels\0_control_labels.npy
C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_labels\1_std_dev_thres_labels.npy
C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_labels\2_bina_labels.npy
C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_labels\3_rv_labels.npy
C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_labels\4_aa_labels.npy
C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_labels\5_ift_labels.npy


In [4]:
from sklearn.metrics import adjusted_rand_score
import numpy as np
import os

def load_membership_weights(file_paths, file_format='npy'):
    """
    Load membership-weight matrices from disk.

    Parameters:
        file_paths (str or list of str): A directory (every file ending in
            ``.<file_format>`` is loaded, in sorted order), a single file path,
            or a list of file paths.
        file_format (str): 'npy' (``np.load``), or 'csv' / 'txt'
            (comma-delimited ``np.loadtxt``). Default is 'npy'.

    Returns:
        list of np.ndarray: One loaded array per file, in loading order.

    Raises:
        ValueError: If no paths are given or found, a path is not an existing
            file, or ``file_format`` is not supported.
    """
    # Normalise the input to a list of file paths.
    if isinstance(file_paths, str):
        if os.path.isdir(file_paths):
            suffix = f".{file_format}"
            file_paths = sorted(
                os.path.join(file_paths, name)
                for name in os.listdir(file_paths)
                if name.endswith(suffix)
            )
        else:
            file_paths = [file_paths]

    if not file_paths:
        raise ValueError("No valid file paths provided or found in the directory.")

    def _read(path):
        # Existence is checked before the format, matching the order in which
        # errors are reported per file.
        if not os.path.isfile(path):
            raise ValueError(f"File not found: {path}")
        if file_format == 'npy':
            print(path)
            return np.load(path)
        if file_format in ('csv', 'txt'):
            return np.loadtxt(path, delimiter=',')
        raise ValueError(f"Unsupported file format: {file_format}")

    return [_read(path) for path in file_paths]


# Every backslash is escaped explicitly. The previous literal relied on "\Z",
# "\T" and "\d" being passed through as unrecognised escape sequences, which
# newer Python versions warn about. The resulting string is unchanged.
memberships_path = "C:\\Users\\Zhi Quan\\Dropbox (Personal)\\Jupyter backup\\TRISO\\2025\\data\\feature_engi_memberships\\"
clusters_all_memberships_list = load_membership_weights(memberships_path)


C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_memberships\0_control_memberships.npy
C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_memberships\1_std_dev_thres_memberships.npy
C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_memberships\2_bina_memberships.npy
C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_memberships\3_rv_memberships.npy
C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_memberships\4_aa_memberships.npy
C:\Users\Zhi Quan\Dropbox (Personal)\Jupyter backup\TRISO\2025\data\feature_engi_memberships\5_ift_memberships.npy


In [5]:
# For every clustering run, keep only the largest membership weight of each
# row of its membership matrix, then stack the runs into one 2D array.
clusters_single_membership_list = np.array(
    [
        np.array([np.max(sample_weights) for sample_weights in run_memberships])
        for run_memberships in clusters_all_memberships_list
    ]
)
clusters_single_membership_list.shape

(6, 25600)

In [6]:
from sklearn.preprocessing import Normalizer
# Normalizer works row-wise, so the array is transposed first: the values of
# each column of the original array (one column per sample) are L1-normalised
# together, and the result is transposed back to the original layout.
clusters_single_membership_list = (
    Normalizer(norm='l1')
    .fit_transform(clusters_single_membership_list.T)
    .T
)


# Soft label implementation (working)

In [7]:
import numpy as np
from scipy.sparse import coo_matrix
from sklearn.preprocessing import MinMaxScaler

def soft_label_consensus(cluster_runs_list, label_memberships):
    """
    Build a consensus distance matrix from hard labels and membership weights.

    For every clustering run and every cluster with at least two members, the
    pairwise weight of two samples is the product of their membership values.
    For each pair of samples the largest such weight over all runs is kept;
    pairs that never share a cluster keep a weight of 0. The weight matrix is
    then min-max scaled and converted to a distance as ``1 - scaled weight``.

    Notes:
        * Every distinct value in a label array is treated as a cluster.
        * Clusters with a single member are skipped, so such a sample receives
          no self-weight from that run.
        * ``MinMaxScaler`` scales each column independently, so the returned
          matrix is in general not symmetric.
          NOTE(review): confirm that per-column scaling is intended here.

    Args:
        cluster_runs_list (list of np.ndarray): Clustering label arrays, one
            1D array per clustering run.
        label_memberships (list of np.ndarray): Membership value arrays, one
            per run, indexed by sample like the matching label array.

    Returns:
        np.ndarray: ``(n_samples, n_samples)`` consensus distance matrix.
    """
    n_samples = len(cluster_runs_list[0])

    # Largest pairwise weight seen so far for every pair of samples.
    # (An additional, never-used n x n co-occurrence matrix used to be
    # allocated here; it has been dropped to save memory.)
    max_pairwise_weights = np.zeros((n_samples, n_samples), dtype=float)

    for clustering, memberships in zip(cluster_runs_list, label_memberships):
        for label in np.unique(clustering):
            cluster_indices = np.where(clustering == label)[0]

            # Skip small clusters
            if len(cluster_indices) <= 1:
                continue

            cluster_memberships = memberships[cluster_indices]
            pairwise_weights = np.outer(cluster_memberships, cluster_memberships)

            # np.ix_ addresses the cluster's sub-block without materialising
            # two full index grids. np.fmax keeps the stored value when a new
            # weight is NaN, matching the previous `>=` masked update.
            block = np.ix_(cluster_indices, cluster_indices)
            max_pairwise_weights[block] = np.fmax(
                max_pairwise_weights[block], pairwise_weights
            )

    max_pairwise_weights = MinMaxScaler().fit_transform(max_pairwise_weights)

    # Convert similarity to distance
    distance_matrix = 1 - max_pairwise_weights

    return distance_matrix


In [17]:
# `clusters_single_membership_list_norm` is never defined in this notebook; the
# L1-normalised memberships are stored back into
# `clusters_single_membership_list` by the normalisation cell above.
distance_matrix = soft_label_consensus(labels, clusters_single_membership_list)

## Notes

HDBSCAN does not work well with the distance matrix created above; use agglomerative clustering instead.

In [None]:
# Average-linkage agglomerative clustering on the precomputed consensus
# distances. No cluster count is fixed; merging stops at a linkage distance
# of 0.8.
hierarchical = AgglomerativeClustering(
    n_clusters=None,
    distance_threshold=0.8,
    linkage='average',
    metric='precomputed',
)
hierarchical.fit(distance_matrix)  # distance_matrix holds dissimilarities
consensus_labels = hierarchical.labels_

print(np.unique(consensus_labels))

In [23]:
# Write the consensus labels to 'consensus_labels.npy' (relative path, so the
# file lands in the current working directory).
np.save(file='consensus_labels.npy', arr=consensus_labels)

In [None]:
# Display the consensus labels as a 160 x 160 image with the axis ticks hidden.
plt.imshow(np.reshape(consensus_labels, (160, 160)), cmap='tab20')
plt.xticks(ticks=[])
plt.yticks(ticks=[])

## V3 weighted (working)

In [7]:
import numpy as np

def soft_label_consensus_with_subclusters(cluster_runs_list, label_memberships, penalization_weight=0.5):
    """
    Compute a consensus distance matrix that favours finer sub-clusters by
    down-weighting pairs of samples that share a cluster in only some runs.

    For every run and every cluster with at least two members, the product of
    the two samples' membership values is accumulated per pair, together with
    a count of the runs in which the pair shares a cluster. Both are divided
    by the number of runs, giving an averaged weight ``w`` and a fraction of
    runs ``f``. The similarity is ``w * (1 - |f - 1| * penalization_weight)``,
    which is min-max scaled and returned as ``1 - scaled similarity``.

    Notes:
        * Clusters with a single member are skipped, so such a sample gets no
          self-contribution from that run.
        * ``MinMaxScaler`` scales each column independently, so the returned
          matrix is in general not symmetric.
          NOTE(review): confirm that per-column scaling is intended here.

    Args:
        cluster_runs_list (list of np.ndarray): Clustering label arrays from
            different runs, one 1D array per run.
        label_memberships (list of np.ndarray): Membership value arrays, one
            per run, indexed by sample like the matching label array.
        penalization_weight (float): Strength of the penalty for pairs that do
            not share a cluster in every run. With 0 the penalty is disabled
            and the similarity is the averaged pairwise weight alone.

    Returns:
        np.ndarray: ``(n_samples, n_samples)`` consensus distance matrix.
    """
    from sklearn.preprocessing import MinMaxScaler

    n_runs = len(cluster_runs_list)
    n_samples = len(cluster_runs_list[0])

    # Summed pairwise weights and number of runs in which a pair co-occurs.
    co_occurrence_matrix = np.zeros((n_samples, n_samples), dtype=float)
    run_contribution_matrix = np.zeros((n_samples, n_samples), dtype=float)

    for clustering, memberships in zip(cluster_runs_list, label_memberships):
        for label in np.unique(clustering):
            cluster_indices = np.where(clustering == label)[0]
            if len(cluster_indices) <= 1:
                continue

            cluster_memberships = memberships[cluster_indices]

            # np.ix_ addresses the cluster's sub-block without materialising
            # two full index grids for every cluster.
            block = np.ix_(cluster_indices, cluster_indices)
            co_occurrence_matrix[block] += np.outer(cluster_memberships, cluster_memberships)
            run_contribution_matrix[block] += 1

    # Normalize both accumulators by the number of cluster runs
    co_occurrence_matrix /= n_runs
    run_contribution_matrix /= n_runs

    # Variability penalty 1 - |f - 1| * weight, evaluated in place (same
    # operations in the same order as the one-line expression) so that no
    # additional n x n temporaries are allocated.
    penalty = run_contribution_matrix
    penalty -= 1
    np.abs(penalty, out=penalty)
    penalty *= penalization_weight
    np.subtract(1, penalty, out=penalty)
    co_occurrence_matrix *= penalty  # now holds the penalized similarity

    # Normalize similarity matrix
    normalized_similarity = MinMaxScaler().fit_transform(co_occurrence_matrix)

    # Convert similarity to distance
    distance_matrix = 1 - normalized_similarity

    return distance_matrix

In [14]:
import gc
import hdbscan

# # Delete unnecessary variables to free up memory
# del distance_matrix_sub
# del clust
# gc.collect()

# Penalisation weight for this run. `value` was not defined anywhere in the
# notebook; 0.3 is the value shown in this cell's recorded output.
value = 0.3

distance_matrix_sub = soft_label_consensus_with_subclusters(labels, clusters_single_membership_list, penalization_weight=value)

clust = hdbscan.HDBSCAN(min_cluster_size=250, min_samples=25, metric='precomputed')
clust.fit(distance_matrix_sub)

print(f'{value} : {len(np.unique(clust.labels_))}')

# HDBSCAN labels noise points -1. Reserve the black "unclassified" colour only
# when noise is actually present, so that a noise-free result does not lose
# one cluster colour or paint a real cluster black.
has_noise = bool(np.any(clust.labels_ == -1))
n_real_clusters = len(np.unique(clust.labels_)) - int(has_noise)
plot_label_map(clust.labels_.reshape(160,160), no_cluster_soft=n_real_clusters, unclassified=has_noise)

# plt.savefig(f'{value}_weight_label_map_hdbscan.png')
# plt.close()


0.3 : 13


In [21]:
# Show the per-sample values of clust.probabilities_ as a 160 x 160 image,
# with a colour bar for the value scale.
plt.imshow(np.reshape(clust.probabilities_, (160, 160)))
plt.colorbar()

<matplotlib.colorbar.Colorbar at 0x20299901fd0>