code being adapted from: https://scikit-learn.org/stable/auto_examples/cluster/plot_cluster_comparison.html#sphx-glr-auto-examples-cluster-plot-cluster-comparison-py

In [None]:
import time
import warnings
from itertools import cycle, islice

import matplotlib.pyplot as plt
import numpy as np
import plotly.express as px

from sklearn import cluster, datasets, mixture
from sklearn.neighbors import kneighbors_graph
from sklearn.preprocessing import StandardScaler
from sklearn.covariance import EmpiricalCovariance

In [None]:
import seaborn as sns

In [None]:
# mean = np.zeros(num_feat)

# cov = [[1, 0], [0, 1]]

# x = np.random.multivariate_normal(mean, cov, (3, 3))

In [None]:
%run blob_distCurve_metrics.ipynb

In [None]:
n_features = 3

In [None]:
# ellips_homosk_clusters[1]

In [None]:
def make_heteroskedastic_anisotropic_blobs_nd(
    n_samples=1000,
    n_features=3,
    cluster_std=1.0,
    transforms=None,
    random_state=seed,
):
    
    if isinstance(cluster_std, float):
        num_center = 1
    elif isinstance(cluster_std, list):
        num_center = len(cluster_std)
    else:
        # raise error
        return

    X, y = datasets.make_blobs(
        n_samples=n_samples,
        n_features=n_features,
        cluster_std=cluster_std,
        random_state=random_state,
        centers=num_center
    )

    rng = np.random.RandomState(random_state)
    unique_labels = np.unique(y)

    if transforms is None:
        transforms = {
            label: rng.randn(n_features, n_features)*3.0
            for label in unique_labels
        }

    X_transformed = np.zeros_like(X)
    for label in unique_labels:
        T = transforms[label]
        X_transformed[y == label] = X[y == label] @ T.T

    return X_transformed, y, transforms


In [None]:
def generate_noisy_nd_grid(grid_points_per_dim=10, n_dims=3, spacing=1.0, noise_std=0.1, seed=None):

    rng = np.random.default_rng(seed)

    if isinstance(grid_points_per_dim, int):
        grid_points_per_dim = [grid_points_per_dim] * n_dims
    elif len(grid_points_per_dim) != n_dims:
        raise ValueError("Length of grid_points_per_dim must match n_dims")

    # Create clean grid
    grid_pts = [np.linspace(0, (n - 1) * spacing, n) for n in grid_points_per_dim]
    mesh = np.meshgrid(*grid_pts, indexing='ij')
    grid = np.stack(mesh, axis=-1).reshape(-1, n_dims)

    # Add Gaussian noise
    # TODO: VARY NOISE / HETEROSKEDASTIC
    noise = rng.normal(scale=noise_std, size=grid.shape)
    noisy_grid = grid + noise

    return noisy_grid


In [None]:
def noisy_torus(n_samples=1000, R=2, r=1, noise=0.2, random_state=None, center=(0, 0, 0)):
    rng = np.random.default_rng(random_state)
    
    # Sample angles u, v uniformly
    u = rng.uniform(0, 2 * np.pi, n_samples)
    v = rng.uniform(0, 2 * np.pi, n_samples)
    
    # Parametric equations
    x = (R + r * np.cos(v)) * np.cos(u)
    y = (R + r * np.cos(v)) * np.sin(u)
    z = r * np.sin(v)
    
    # Add Gaussian noise if requested
    if noise > 0:
        x += rng.normal(0, noise, n_samples)
        y += rng.normal(0, noise, n_samples)
        z += rng.normal(0, noise, n_samples)
    
    return np.stack([x, y, z], axis=1)


In [None]:
def noisy_concentric_spheres(n_samples=1000,n_features=3,n_spheres=4,noise=0.2, center=(0, 0, 0), radii_arr=None):
    
    if radii_arr is None:
        scale_factor = 50
        radii_arr = np.linspace(1, n_spheres*scale_factor,n_spheres + 1)

    n_samples_per_shell = int(n_samples / n_spheres)
    rng = np.random.default_rng(seed)

    X_list = []

    for i, r in enumerate(radii_arr):

        # sample spherical coords
        theta = rng.uniform(0, 2 * np.pi, n_samples_per_shell)
        phi = np.arccos(rng.uniform(-1, 1, n_samples_per_shell))
        
        r_noisy = r + rng.normal(0, noise, n_samples_per_shell)
        
        r_noisy_x = r_noisy
        r_noisy_y = r_noisy
        r_noisy_z = r_noisy

        # convert to feature vector
        x = r_noisy_x * np.sin(phi) * np.cos(theta)
        y = r_noisy_y * np.sin(phi) * np.sin(theta)
        z = r_noisy_z * np.cos(phi)
        
        shell_points = np.stack([x, y, z], axis=1)
        X_list.append(shell_points)
    
    X = np.vstack(X_list)

    return X

In [None]:
from sklearn.feature_selection import mutual_info_regression
from sklearn.covariance import EmpiricalCovariance, MinCovDet

def continuous_mi_matrix(X,dim=1):
    n = X.shape[1]
    mi_matrix = np.zeros((n, n))
    
    for i in range(n):
        y = X[:, i]
        for j in range(n):
            if i == j:
                mi_matrix[i, j] = 0  # MI with itself isn't informative
            else:
                mi_matrix[i, j] = mutual_info_regression(X[:, [j]], y, discrete_features=False)[0]
    
    return mi_matrix


In [None]:
## SEVER THE DATA IN SOME WAY -> EG DROPOUTS / REMOVE POINTS RANDOMLY

In [None]:
def rotate_3d_points(X, theta1, theta2, theta3):

    Rx = np.array([
        [1, 0, 0],
        [0, np.cos(theta1), -np.sin(theta1)],
        [0, np.sin(theta1),  np.cos(theta1)]
    ])
    
    Ry = np.array([
        [ np.cos(theta2), 0, np.sin(theta2)],
        [0, 1, 0],
        [-np.sin(theta2), 0, np.cos(theta2)]
    ])
    
    Rz = np.array([
        [np.cos(theta3), -np.sin(theta3), 0],
        [np.sin(theta3),  np.cos(theta3), 0],
        [0, 0, 1]
    ])
    
    R = Rz @ Ry @ Rx
    
    # Apply rotation
    X_rotated = X @ R.T
    return X_rotated

In [None]:
## ADD: paraboloids

In [None]:

gaus_homosk_blobs = datasets.make_blobs(n_samples=n_samples,n_features=n_features, random_state=seed)

gaus_heterosk_blobs = datasets.make_blobs(
    n_samples=n_samples,n_features=n_features, cluster_std=[2.0, 2.5, 0.5], random_state=seed
)

swissrolldata = datasets.make_swiss_roll(
    n_samples=n_samples,random_state=seed,noise=0.2
)


In [None]:
iso_gaus = datasets.make_blobs(n_samples=n_samples,n_features=n_features, random_state=seed,cluster_std=[1.0],centers=1)

In [None]:
ellips_homosk_clusters = make_heteroskedastic_anisotropic_blobs_nd(cluster_std=[1.0,1.0,1.0])

In [None]:
ellips_homosk_cluster_one = make_heteroskedastic_anisotropic_blobs_nd(cluster_std=[1.0])

In [None]:
ellips_heterosk_clusters = make_heteroskedastic_anisotropic_blobs_nd(cluster_std=[1.0,0.3,2.0])

In [None]:
noisy_concentric_sph = noisy_concentric_spheres()

In [None]:
noisy_torus = noisy_torus()

In [None]:
gauss_noise_grid = generate_noisy_nd_grid()

In [None]:
S_points, S_color = datasets.make_s_curve(n_samples, random_state=seed)

In [None]:
dataset_lst = [
    (gaus_homosk_blobs[0],{'name' : 'gaussian_homoskedastic',
                           'class_label' : gaus_homosk_blobs[1]},),
    (gaus_heterosk_blobs[0], {'name' : 'gaussian_heteroskedastic',
                           'class_label' : gaus_heterosk_blobs[1]}),
    (swissrolldata[0], {'name' : 'swissroll'}),
    (ellips_homosk_clusters[0],{'name' : 'ellipsoid_homoskedastic',
                                'class_label' : ellips_homosk_clusters[1]}),
    (ellips_heterosk_clusters[0],{'name' : 'ellipsoid_heteroskedastic',
                                'class_label' : ellips_heterosk_clusters[1]}),
    (ellips_homosk_cluster_one[0],{'name' : 'aniso_gaus',}),
    (iso_gaus[0],{'name' : 'iso_gaus',}),
    (gauss_noise_grid,{'name' : 'noisy_grid',
                        }),
    (noisy_concentric_sph,{'name' : 'noisy_spheres',
                        }),
    (noisy_torus,{'name' : 'noisy_torus',
                        }),
    (S_points,{'name' : 's_curve',
                        }),

]


In [None]:

for i_dataset, (dataset_val, params_dict) in enumerate(dataset_lst):
    X = dataset_val

    y = None

    # X = rotate_3d_points(X, np.pi/3, np.pi/4, 0)

    if 'class_label' in params_dict.keys():
        y = params_dict['class_label']


    plot_3d_scatters(X,'')
    plot_dist_distribution(X,point='global_median_centroid',dist_scale='none')
    # fig.suptitle(params_dict['name'], fontsize=16)

In [None]:
dataset_lst = [
    (gaus_homosk_blobs[0],{'name' : 'gaussian_homoskedastic',
                           'class_label' : gaus_homosk_blobs[1]},),
    (gaus_heterosk_blobs[0], {'name' : 'gaussian_heteroskedastic',
                           'class_label' : gaus_heterosk_blobs[1]}),
    (swissrolldata[0], {'name' : 'swissroll'}),
    (ellips_homosk_clusters[0],{'name' : 'ellipsoid_homoskedastic',
                                'class_label' : ellips_homosk_clusters[1]}),
    (ellips_heterosk_clusters[0],{'name' : 'ellipsoid_heteroskedastic',
                                'class_label' : ellips_heterosk_clusters[1]}),
    (gauss_noise_grid,{'name' : 'noisy_grid',
                        }),
    (noisy_concentric_sph,{'name' : 'noisy_spheres',
                        }),
    (noisy_torus,{'name' : 'noisy_torus',
                        }),
    (S_points,{'name' : 's_curve',
                        }),

]

for i_dataset, (dataset_val, params_dict) in enumerate(dataset_lst):
    X = dataset_val

    y = None

    X = rotate_3d_points(X, np.pi/3, np.pi/4, 0)

    if 'class_label' in params_dict.keys():
        y = params_dict['class_label']

    def scatter_helper(ax, xs, ys, zs=None, class_label = None):

        if zs is not None:
            if class_label is not None:
                sc = ax.scatter(xs, ys, zs, c=class_label, cmap='tab10', s=10)
            else:
                sc = ax.scatter(xs, ys, zs, s=10)
            return sc

        else:
            if class_label is not None:
                sc = ax.scatter(xs, ys, c=class_label, cmap='tab10', s=10)
            else:
                sc = ax.scatter(xs, ys, s=10)
            return sc
        
        return None

    # normalize dataset
    X = StandardScaler().fit_transform(X)

    # fig = plt.figure()    
    fig, axs = plt.subplots(1, 4, figsize=(15, 3), subplot_kw={})

    axs[0] = fig.add_subplot(1, 4, 1, projection='3d')
    xs,ys,zs = X[:, 0], X[:, 1], X[:, 2]
    scatter_helper(axs[0],xs,ys,zs,y)
    axs[0].set_title(params_dict['name'])

    # 2D scatter plots
    scatter_helper(axs[1],xs,ys,None,y)
    axs[1].set_title('0 vs 1')

    scatter_helper(axs[2],ys,zs,None,y)
    axs[2].set_title('1 vs 2')

    scatter_helper(axs[3],xs,zs,None,y)
    axs[3].set_title('0 vs 2')
        
    # plt.xlim(-2.5, 2.5)
    # plt.ylim(-2.5, 2.5)

    plt.tight_layout()
    plt.show()

    fig1, axs1 = plt.subplots(1, 5, figsize=(15, 3), subplot_kw={})

    # fit a MCD robust estimator to data
    cov_mat = MinCovDet().fit(X)
    # cov_mat = np.cov(X, rowvar=False)
    mask = np.triu(np.ones_like(cov_mat.covariance_, dtype=bool), k=1)  
    # mask = np.ones()  
    sns.heatmap(cov_mat.covariance_, mask=mask,ax=axs1[0], cbar=True, square=True,
                annot=True, fmt=".2f", cmap="coolwarm")
    axs1[0].set_title('Covariance MCD')

    # fit a MCD robust estimator to data
    cov_mat = EmpiricalCovariance().fit(X)
    # cov_mat = np.cov(X, rowvar=False)
    mask = np.triu(np.ones_like(cov_mat.covariance_, dtype=bool), k=1)  
    # mask = np.ones()  
    sns.heatmap(cov_mat.covariance_, mask=mask,ax=axs1[1], cbar=True, square=True,
                annot=True, fmt=".2f", cmap="coolwarm")
    axs1[1].set_title('Covariance Empirical')

    mi_mtx = continuous_mi_matrix(X)
    sns.heatmap(mi_mtx, ax=axs1[4], cbar=True, square=True,
                annot=True, fmt=".2f", cmap="coolwarm")
    axs1[4].set_title('MI')

    corr_coef = stats.spearmanr(X, axis=0).statistic
    sns.heatmap(corr_coef, mask=mask,ax=axs1[2], cbar=True, square=True,
                annot=True, fmt=".2f", cmap="coolwarm")
    axs1[2].set_title('SpearmanR')

    # kernel_fn = 'sigmoid'
    # corr_coef =  pairwise.pairwise_kernels(X.T, metric=kernel_fn)

    kernel_fn = 'cosine'
    corr_coef =  pairwise.cosine_similarity(X.T)

    sns.heatmap(corr_coef, mask=mask,ax=axs1[3], cbar=True, square=True,
                annot=True, fmt=".2f", cmap="coolwarm")
    axs1[3].set_title(f'Pairwise {kernel_fn}')

    plt.tight_layout()
    plt.show()



In [None]:
import scipy.stats as stats
from sklearn.metrics import pairwise

In [None]:
# X[:,0].shape