In [1]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.neighbors import kneighbors_graph
from scipy.sparse.csgraph import connected_components
from tensorflow import keras
import numpy as np
from sklearn.manifold import MDS
from scipy.linalg import pinv, pinvh

import torch
import torch.nn as nn
from torch.distributions.normal import Normal
import umap

from torch.utils.data import DataLoader
from torchvision import datasets, transforms

from sklearn.metrics import silhouette_samples
from scipy.sparse.csgraph import floyd_warshall

In [2]:
%run model.py

In [3]:
%matplotlib qt

In [4]:
np.random.seed(42)

In [5]:
def construct_mutual_knn_graph(X, k=5):
    """
    Construct a mutual k-NN graph and weight edges using a Gaussian kernel.

    Parameters:
        X (np.ndarray): Data matrix where rows are samples.
        k (int): Number of neighbors for k-NN.
        sigma (float): Gaussian kernel parameter.

    Returns:
        adj_matrix (np.ndarray): Symmetric weighted adjacency matrix.
    """

    # Compute k-NN graph (sparse adjacency matrix)
    knn_graph = kneighbors_graph(X, n_neighbors=k, mode='distance', include_self=False)
    knn_distances = knn_graph.toarray()

    # Symmetrize to get mutual k-NN graph
    mutual_knn = np.maximum(knn_distances, knn_distances.T)

    # return adj_matrix
    return mutual_knn

In [6]:
def is_graph_connected(adj_matrix):
    """
    Check if the graph is fully connected.

    Parameters:
        adj_matrix (np.ndarray): Weighted adjacency matrix.

    Returns:
        connected (bool): True if the graph is fully connected, False otherwise.
    """
    # Use scipy's connected_components to check connectivity
    n_components, _ = connected_components(csgraph=adj_matrix, directed=False)
    return n_components == 1

In [7]:
# def compute_commute_time_distance(knn_graph):
#     """
#     Compute the Commute Time Distance for a given k-NN graph.

#     Parameters:
#         knn_graph (np.ndarray): Adjacency matrix of the k-NN graph (NxN).
#                                 The graph should be symmetric and connected.

#     Returns:
#         np.ndarray: Commute time distance matrix (NxN).
#     """
#     # Ensure the graph is symmetric
#     if not np.allclose(knn_graph, knn_graph.T):
#         raise ValueError("Input graph must be symmetric.")

#     # Compute the degree matrix
#     degrees = np.sum(knn_graph, axis=1)
#     degree_matrix = np.diag(degrees)
#     print('Degree Matrix Computed')

#     # Compute the Laplacian matrix
#     laplacian = degree_matrix - knn_graph
#     print('Laplacian Matrix Computed')
    

#     # Compute the pseudoinverse of the Laplacian
#     laplacian_pinv = pinv(laplacian)
#     print('Laplacian Pseudoinverse Computed')

#     n = knn_graph.shape[0]
#     ctd_matrix = np.zeros((n, n))

#     # Compute the commute time distance matrix
#     for i in range(n):
#         for j in range(n):
#             ctd_matrix[i, j] = laplacian_pinv[i, i] + laplacian_pinv[j, j] - 2 * laplacian_pinv[i, j]

#     # ensure vtd_matrix is symmetric
#     ctd_matrix = np.maximum(ctd_matrix, ctd_matrix.T)
    
#     return ctd_matrix.squeeze()

In [8]:
def compute_commute_time_distance(knn_graph):
    """
    Compute the Commute Time Distance for a given k-NN graph.

    Parameters:
        knn_graph (np.ndarray): Adjacency matrix of the k-NN graph (NxN).
                                The graph should be symmetric and connected.

    Returns:
        np.ndarray: Commute time distance matrix (NxN).
    """
    # Ensure the graph is symmetric
    if not np.allclose(knn_graph, knn_graph.T):
        raise ValueError("Input graph must be symmetric.")
    
    # Compute the degree matrix
    degrees = np.sum(knn_graph, axis=1)
    degree_matrix = np.diag(degrees)

    # Compute the Laplacian matrix
    laplacian = degree_matrix - knn_graph

    # Compute the pseudoinverse of the Laplacian
    laplacian_pinv = pinvh(laplacian)  # Use pinvh for symmetric matrices

    # Vectorized computation of commute time distance
    diag_elements = np.diag(laplacian_pinv)
    ctd_matrix = diag_elements[:, None] + diag_elements[None, :] - 2 * laplacian_pinv

    # Return the CTD matrix
    return ctd_matrix

In [9]:
def compute_shortest_path_distance(knn_graph):
    """
    Compute the Shortest Path Distance for a given k-NN graph.

    Parameters:
        knn_graph (np.ndarray): Adjacency matrix of the k-NN graph (NxN).
                                The graph should be symmetric and connected.
                                Non-edges should have a weight of 0.

    Returns:
        np.ndarray: Shortest path distance matrix (NxN).
    """
    # Ensure the graph is symmetric
    if not np.allclose(knn_graph, knn_graph.T):
        raise ValueError("Input graph must be symmetric.")

    # Replace zeros (non-edges) with infinity for Floyd-Warshall algorithm
    graph = np.where(knn_graph > 0, knn_graph, np.inf)
    np.fill_diagonal(graph, 0)  # Set diagonal to 0 (distance to self is zero)

    # Compute shortest path distances
    shortest_path_distances = floyd_warshall(csgraph=graph, directed=False)

    return shortest_path_distances

In [10]:
def plot_reduced_data_3D(reduced_data, test_labels):
    """
    Plots reduced data points in 3D, colored by their labels.

    Parameters:
        reduced_data (np.ndarray): Reduced data with 3 dimensions.
        test_labels (np.ndarray): Labels corresponding to data points.
    """
    unique_labels = np.unique(test_labels)
    colors = plt.cm.get_cmap('tab10', len(unique_labels))  # Use a colormap with enough colors

    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')

    for i, label in enumerate(unique_labels):
        idx = test_labels == label
        ax.scatter(
            reduced_data[idx, 0], reduced_data[idx, 1], reduced_data[idx, 2],
            color=colors(i), label=f"Label {label}", s=5, alpha=0.8
        )

    ax.set_title("3D Plot of Reduced Data")
    ax.set_xlabel("Dimension 1")
    ax.set_ylabel("Dimension 2")
    ax.set_zlabel("Dimension 3")
    ax.legend()

    plt.show()
    

In [11]:
def plot_reduced_data_2D(reduced_data, test_labels):
    """
    Plots reduced data points in 2D, colored by their labels.

    Parameters:
        reduced_data (np.ndarray): Reduced data with 2 dimensions.
        test_labels (np.ndarray): Labels corresponding to data points.
    """
    unique_labels = np.unique(test_labels)
    colors = plt.cm.get_cmap('tab10', len(unique_labels))  # Use a colormap with enough colors

    plt.figure(figsize=(10, 8))

    for i, label in enumerate(unique_labels):
        idx = test_labels == label
        plt.scatter(
            reduced_data[idx, 0], reduced_data[idx, 1],
            color=colors(i), label=f"Label {label}", s=5, alpha=0.8
        )

    plt.title("2D Plot of Reduced Data")
    plt.xlabel("Dimension 1")
    plt.ylabel("Dimension 2")
    plt.legend()

    plt.show()

In [12]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import imageio
import os

def plot_reduced_data_3D_gif(reduced_data, test_labels, output_path="3D_plot_classes.gif"):
    """
    Creates a 3D plot of reduced data points, adding one class at a time,
    and saves the result as a GIF.

    Parameters:
        reduced_data (np.ndarray): Reduced data with 3 dimensions.
        test_labels (np.ndarray): Labels corresponding to data points.
        output_path (str): Path to save the generated GIF.
    """
    unique_labels = np.unique(test_labels)
    colors = plt.cm.get_cmap('tab10', len(unique_labels))  # Use a colormap with enough colors

    frames_dir = "temp_frames"
    os.makedirs(frames_dir, exist_ok=True)

    # Create 3D plot
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')

    # Configure plot
    ax.set_title("3D Plot of Reduced Data")
    ax.set_xlabel("Dimension 1")
    ax.set_ylabel("Dimension 2")
    ax.set_zlabel("Dimension 3")

    frame_paths = []

    # Add classes one by one
    for i, label in enumerate(unique_labels):
        idx = test_labels == label
        ax.scatter(
            reduced_data[idx, 0], reduced_data[idx, 1], reduced_data[idx, 2],
            color=colors(i), label=f"Label {label}", s=5, alpha=0.8
        )
        ax.legend()

        # Save the current frame
        frame_path = os.path.join(frames_dir, f"frame_{i}.png")
        plt.savefig(frame_path)
        frame_paths.append(frame_path)

    # Create GIF
    images = [imageio.imread(frame_path) for frame_path in frame_paths]
    imageio.mimsave(output_path, images, fps=1)

    # Cleanup temporary frames
    for frame_path in frame_paths:
        os.remove(frame_path)
    os.rmdir(frames_dir)

    print(f"GIF saved to {output_path}")

In [13]:
import numpy as np
import matplotlib.pyplot as plt
import imageio
import os

def plot_reduced_data_2D_gif(reduced_data, test_labels, output_path="2D_plot_classes.gif"):
    """
    Creates a 2D plot of reduced data points, adding one class at a time,
    and saves the result as a GIF.

    Parameters:
        reduced_data (np.ndarray): Reduced data with 2 dimensions.
        test_labels (np.ndarray): Labels corresponding to data points.
        output_path (str): Path to save the generated GIF.
    """
    unique_labels = np.unique(test_labels)
    colors = plt.cm.get_cmap('tab10', len(unique_labels))  # Use a colormap with enough colors

    frames_dir = "temp_frames"
    os.makedirs(frames_dir, exist_ok=True)

    frame_paths = []

    # Create 2D plot
    plt.figure(figsize=(10, 8))

    for i, label in enumerate(unique_labels):
        idx = test_labels == label
        plt.scatter(
            reduced_data[idx, 0], reduced_data[idx, 1],
            color=colors(i), label=f"Label {label}", s=5, alpha=0.8
        )
        plt.title("2D Plot of Reduced Data")
        plt.xlabel("Dimension 1")
        plt.ylabel("Dimension 2")
        plt.legend()

        # Save the current frame
        frame_path = os.path.join(frames_dir, f"frame_{i}.png")
        plt.savefig(frame_path)
        frame_paths.append(frame_path)

    # Create GIF
    images = [imageio.imread(frame_path) for frame_path in frame_paths]
    imageio.mimsave(output_path, images, fps=1)

    # Cleanup temporary frames
    for frame_path in frame_paths:
        os.remove(frame_path)
    os.rmdir(frames_dir)

    print(f"GIF saved to {output_path}")


In [14]:
def get_latent(model, data_loader, device):
    model.eval()
    latents = []
    labels = []
    with torch.no_grad():
        for _, (data, label) in enumerate(data_loader):
          mu, _ = model.encoder(data.to(device))
          latents.append(mu.cpu())
          labels.append(label.cpu())

    latents = torch.cat(latents, dim=0).numpy()
    labels = torch.cat(labels, dim=0).numpy()
    
    return latents, labels

In [15]:
def umap_reduction(latents, n_neighbors=15, min_dist=0.1, n_components=2):
    # Initialize UMAP model
    # umap_model = umap.UMAP(n_neighbors=n_neighbors, min_dist=min_dist, n_components=n_components)
    
    reducer = umap.UMAP(n_neighbors=n_neighbors, min_dist=min_dist, n_components=n_components)
    embedding = reducer.fit_transform(latents)
    
    return embedding

In [18]:
def umap_reduction_3D(latents, n_neighbors=15, min_dist=0.1, n_components=3):
    # Initialize UMAP model
    # umap_model = umap.UMAP(n_neighbors=n_neighbors, min_dist=min_dist, n_components=n_components)
    
    reducer = umap.UMAP(n_neighbors=n_neighbors, min_dist=min_dist, n_components=n_components)
    embedding = reducer.fit_transform(latents)
    
    return embedding

In [16]:
z_dim = 8

model_name = "vae_8new_layout.pth"

vae = VAE(z_dim=z_dim)
vae.load_state_dict(torch.load(model_name))

  vae.load_state_dict(torch.load(model_name))


<All keys matched successfully>

In [19]:
transform = transforms.Compose([transforms.ToTensor()])
testset = datasets.MNIST('.', download=True, train=False, transform=transform)
test_loader = DataLoader(testset, batch_size=64, shuffle=True)

In [20]:
latent_variables, lables = get_latent(vae, test_loader,
                       device='cuda' if torch.cuda.is_available() else 'cpu')

In [31]:
# Randomly sample 1000 images
n_samples = 1000
sampled_indices = np.random.choice(latent_variables.shape[0], n_samples, replace=False)

latent_variables = latent_variables[sampled_indices]
lables = lables[sampled_indices]

In [32]:
latent_variables.shape

(1000, 8)

In [33]:
# Reduce dimensions to 2D using UMAP
latents_2d = umap_reduction(latent_variables)

In [21]:
latents_3d = umap_reduction_3D(latent_variables)

In [22]:
plot_reduced_data_3D(latents_3d, lables)

In [34]:
plot_reduced_data_2D(latents_2d, lables)

In [58]:
plot_reduced_data_2D_gif(latents_2d, lables, output_path="2D_classes.gif")

GIF saved to 2D_classes.gif


In [60]:
knn_latent = construct_mutual_knn_graph(latent_variables, k=3)

In [61]:
if is_graph_connected(knn_latent):
    print('Graph is connected')
else:
    print('Graph is not connected')

Graph is connected


In [62]:
# compute commute time distance
ctd_latent = compute_commute_time_distance(knn_latent)

In [63]:
# compute shortest path distance
spd_latent = compute_shortest_path_distance(knn_latent)

In [64]:
# # store spd_latent and ctd_latent in text files
# np.save('spd_latent_reduced_data.npy', spd_latent)
# np.save('ctd_latent_reduced_data.npy', ctd_latent)

In [65]:
# # load ctd_latent
# ctd_latent = np.load('ctd_latent.npy')
# spd_latent = np.load('spd_latent.npy')

In [66]:
# Perform MDS on the commute time distance matrix
mds_3D = MDS(n_components=3, dissimilarity='precomputed')
reduced_latent_ctd_3D = mds_3D.fit_transform(ctd_latent)

# store reduced_latent_3D in a text file
# np.save('reduced_latent_ctd_3D.npy', reduced_latent_ctd_3D)

In [None]:
# reduced_latent_ctd_3D = np.load("reduced_latent_ctd_3D.npy")

In [67]:
plot_reduced_data_3D(reduced_latent_ctd_3D, lables)

In [68]:
plot_reduced_data_3D_gif(reduced_latent_ctd_3D, lables, output_path="3D_classes.gif")

GIF saved to 3D_classes.gif


In [69]:
# Perform MDS on the commute time distance matrix
mds_2D = MDS(n_components=2, dissimilarity='precomputed')
reduced_latent_ctd_2D = mds_2D.fit_transform(ctd_latent)

# store reduced_latent_2D in a text file
# np.save('reduced_latent_ctd_2D.npy', reduced_latent_ctd_2D)

: 

In [53]:
reduced_latent_ctd_2D = np.load('reduced_latent_ctd_2D.npy')

FileNotFoundError: [Errno 2] No such file or directory: 'reduced_latent_ctd_2D.npy'

In [54]:
plot_reduced_data_2D(reduced_latent_ctd_2D, lables)

In [59]:
plot_reduced_data_2D_gif(reduced_latent_ctd_2D, lables, output_path="2D_classes.gif")

GIF saved to 2D_classes.gif


In [26]:
mds_3D = MDS(n_components=3, dissimilarity='precomputed')
reduced_latent_spd_3D = mds_3D.fit_transform(spd_latent)

# store reduced_latent_3D in a text file
np.save('reduced_latent_spd_3D.npy', reduced_latent_spd_3D)

In [27]:
reduced_latent_spd_3D = np.load('reduced_latent_spd_3D.npy')

In [29]:
plot_reduced_data_3D(reduced_latent_spd_3D, lables)

In [16]:
mds_2D = MDS(n_components=2, dissimilarity='precomputed')
reduced_latent_spd_2D = mds_2D.fit_transform(spd_latent)

# store reduced_latent_2D in a text file
np.save('reduced_latent_spd_2D.npy', reduced_latent_spd_2D)

In [17]:
reduced_latent_spd_2D = np.load("reduced_latent_spd_2D.npy")

In [23]:
plot_reduced_data_2D(reduced_latent_spd_2D, lables)

In [24]:
def silhouette_plot_from_distance_matrix(distance_matrix, labels, save_path):
    """
    Function to create a Silhouette Plot using a precomputed distance matrix.
    
    Parameters:
    - distance_matrix: Precomputed pairwise distance matrix.
    - labels: Cluster labels for each point (from clustering).
    - save_path: File path to save the silhouette plot.
    """
    # Ensure distance matrix is square
    assert distance_matrix.shape[0] == distance_matrix.shape[1], "Distance matrix must be square"

    n_clusters = len(np.unique(labels))
    silhouette_vals = silhouette_samples(distance_matrix, labels, metric="precomputed")
    
    # Initialize plot
    fig, ax = plt.subplots(figsize=(8, 6))
    
    # Variables to keep track of the silhouette value ranges
    y_lower = 10  # Starting point for the first silhouette bar

    # Define a list of tab colors (enough for up to 10 clusters)
    tab_colors = ['tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:gray', 'tab:olive', 'tab:cyan', 'tab:pink', 'tab:purple', 'tab:brown']

    # Plot silhouette values for each cluster
    for i in range(n_clusters):
        # Aggregate silhouette scores for samples in the current cluster
        ith_cluster_silhouette_values = silhouette_vals[labels == i]
        ith_cluster_silhouette_values.sort()

        # Number of points in the current cluster
        size_cluster_i = ith_cluster_silhouette_values.shape[0]
        y_upper = y_lower + size_cluster_i

        # Use tab colors for the clusters
        color = tab_colors[i % len(tab_colors)]
        
        # Fill plot for each cluster
        ax.fill_betweenx(np.arange(y_lower, y_upper),
                         0, ith_cluster_silhouette_values,
                         facecolor=color, edgecolor=color, alpha=0.7)

        # Calculate and annotate average silhouette score for the cluster
        avg_silhouette_score = np.mean(ith_cluster_silhouette_values)
        ax.text(0.05, y_lower + 0.5 * size_cluster_i, f'{avg_silhouette_score:.2f}', 
                color='black', fontsize=10, weight='bold')

        # Label cluster
        ax.text(-0.05, y_lower + 0.5 * size_cluster_i, str(i))

        # Move the y_lower boundary for the next cluster
        y_lower = y_upper + 10  # 10 for spacing between clusters

    # Configure the plot
    ax.set_title("Silhouette Plot for the Clusters (Precomputed Distance Matrix)")
    ax.set_xlabel("Silhouette Coefficient Values")
    ax.set_ylabel("Cluster Label")

    # Draw a vertical line for average silhouette score across all points
    ax.axvline(x=np.mean(silhouette_vals), color="red", linestyle="--", label='Average Silhouette Score')
    ax.legend()

    ax.set_yticks([])  # Clear the y-axis labels/ticks
    ax.set_xlim([-1, 1])  # Silhouette values range between -1 and 1

    # Save the plot
    plt.tight_layout()
    plt.savefig(save_path)
    plt.show()

In [26]:
silhouette_plot_from_distance_matrix(ctd_latent, lables, 'silhouette_plot.png')

In [27]:
def silhouette_plot(data, labels, save_path):
    """
    Function to create a Silhouette Plot with fixed tab colors and average silhouette scores.
    
    Parameters:
    - data: The dataset (after PCA transformation).
    - labels: Cluster labels for each point (from GMM clustering).
    """
    n_clusters = len(np.unique(labels))
    silhouette_vals = silhouette_samples(data, labels)
    
    # Initialize plot
    fig, ax = plt.subplots(figsize=(8, 6))
    
    # Variables to keep track of the silhouette value ranges
    y_lower = 10  # Starting point for the first silhouette bar

    # Define a list of tab colors (enough for up to 10 clusters)
    tab_colors = ['tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:gray', 'tab:olive', 'tab:cyan', 'tab:pink', 'tab:purple', 'tab:brown']

    # Plot silhouette values for each cluster
    for i in range(n_clusters):
        # Aggregate silhouette scores for samples in the current cluster
        ith_cluster_silhouette_values = silhouette_vals[labels == i]
        ith_cluster_silhouette_values.sort()

        # Number of points in the current cluster
        size_cluster_i = ith_cluster_silhouette_values.shape[0]
        y_upper = y_lower + size_cluster_i

        # Use tab colors for the clusters
        color = tab_colors[i % len(tab_colors)]
        
        # Fill plot for each cluster
        ax.fill_betweenx(np.arange(y_lower, y_upper),
                         0, ith_cluster_silhouette_values,
                         facecolor=color, edgecolor=color, alpha=0.7)

        # Calculate and annotate average silhouette score for the cluster
        avg_silhouette_score = np.mean(ith_cluster_silhouette_values)
        ax.text(0.05, y_lower + 0.5 * size_cluster_i, f'{avg_silhouette_score:.2f}', 
                color='black', fontsize=10, weight='bold')

        # Label cluster
        ax.text(-0.05, y_lower + 0.5 * size_cluster_i, str(i))

        # Move the y_lower boundary for the next cluster
        y_lower = y_upper + 10  # 10 for spacing between clusters

    # Configure the plot
    ax.set_title("Silhouette Plot for the Clusters")
    ax.set_xlabel("Silhouette Coefficient Values")
    ax.set_ylabel("Cluster Label")

    # Draw a vertical line for average silhouette score across all points
    ax.axvline(x=np.mean(silhouette_vals), color="red", linestyle="--", label='Average Silhouette Score')
    ax.legend()

    ax.set_yticks([])  # Clear the y-axis labels/ticks
    ax.set_xlim([-1, 1])  # Silhouette values range between -1 and 1

    # Save the plot
    plt.tight_layout()
    plt.savefig(save_path)
    plt.show()

In [28]:
silhouette_plot(latent_variables, lables, 'silhouette_plot.png')