In [None]:
import json

import pandas as pd

import gudhi as gd
import gudhi.wasserstein as wasserstein
import gudhi.hera as hera

from collections import defaultdict
from tqdm import tqdm

from itertools import combinations, combinations_with_replacement, product

import ripserplusplus as rpp

from scipy.spatial import distance_matrix
from scipy.stats import pearsonr
from sklearn.preprocessing import StandardScaler

from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

import numpy as np
import torch
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
cmap = 'viridis'

# Metrics

In [None]:
dataset_name = 'MNIST'
version = 'd16'
#version = 'd256_K32_N32_A_v1'

models = {
     'PCA':'PCA',
     'UMAP':'UMAP',
    'Basic AutoEncoder':'AE',
    'Topological AutoEncoder':'TopoAE',
    'RTD AutoEncoder H1':'RTD-AE',
    'NSA AutoEncoder':'NSA-AE',
    'LNSA AutoEncoder':'LNSA-AE',
    'GNSA AutoEncoder':'GNSA-AE',
}

## Calculate distance matrix

In [None]:
import torch

def pdist_gpu(a, b, device = 'cuda'):
    A = torch.tensor(a, dtype = torch.float64)
    B = torch.tensor(b, dtype = torch.float64)

    size = (A.shape[0] + B.shape[0]) * A.shape[1] / 1e9
    max_size = 0.2

    if size > max_size:
        parts = int(size / max_size) + 1
    else:
        parts = 1

    pdist = np.zeros((A.shape[0], B.shape[0]))
    At = A.to(device)

    for p in range(parts):
        i1 = int(p * B.shape[0] / parts)
        i2 = int((p + 1) * B.shape[0] / parts)
        i2 = min(i2, B.shape[0])

        Bt = B[i1:i2].to(device)
        pt = torch.cdist(At, Bt)
        pdist[:, i1:i2] = pt.cpu()

        del Bt, pt
        torch.cuda.empty_cache()

    del At

    return pdist

def zero_out_diagonal(distances):# make 0 on diagonal
    return distances * (np.ones_like(distances) - np.eye(*distances.shape))

In [None]:
if 'COIL' in dataset_name:
    data = np.load(f'data/{dataset_name}/prepared/data.npy')
else:
    data = np.load(f'data/{dataset_name}/prepared/train_data.npy')
data = data.reshape(data.shape[0], -1)
ids = np.random.choice(np.arange(len(data)), size=min(30000, len(data)), replace=False)
data = data[ids]

In [None]:
torch.cuda.empty_cache()

In [None]:
original_distances = pdist_gpu(data, data)

In [None]:
original_distances = zero_out_diagonal(original_distances)

## Pearson correlation for pairwise distances

In [None]:
try:
    if 'COIL' in dataset_name:
        labels = np.load(f'data/{dataset_name}/prepared/labels.npy')
    else:
        labels = np.load(f'data/{dataset_name}/prepared/train_labels.npy')    
except FileNotFoundError:
    labels = np.load(f'data/{dataset_name}/prepared/train_data.npy')

def get_distances(data):
    data = data.reshape(data.shape[0], -1)
    distances = distance_matrix(data, data)
    distances = distances[np.triu(np.ones_like(distances), k=1) > 0]
    return distances
 # take only different 

In [None]:
labels.shape

In [None]:
results = {}
for model_name in models:
    try:
        latent = np.load(f'data/{dataset_name}/{model_name}_latent_output_{version}.npy')[ids]
        print(latent.shape)
        latent_distances = pdist_gpu(latent, latent)
    except FileNotFoundError:
        latent = None
        continue
    results[model_name] = pearsonr(
        latent_distances[np.triu(np.ones_like(original_distances), k=1) > 0], 
        original_distances[np.triu(np.ones_like(original_distances), k=1) > 0])[0]

In [None]:
results

## Triplet accuracy

In [None]:
def triplet_accuracy(input_data, latent_data, triplets=None):
    # calculate distance matricies
    input_data = input_data.reshape(input_data.shape[0], -1)
    input_distances = zero_out_diagonal(pdist_gpu(input_data, input_data))
    latent_data = latent_data.reshape(latent_data.shape[0], -1)
    latent_distances = zero_out_diagonal(pdist_gpu(latent_data, latent_data))
    # generate triplets
    if triplets is None:
        triplets = np.asarray(list(combinations(range(len(input_data)), r=3)))
    i_s = triplets[:, 0]
    j_s = triplets[:, 1]
    k_s = triplets[:, 2]
    acc = (np.logical_xor(
        input_distances[i_s, j_s] < input_distances[i_s, k_s], 
        latent_distances[i_s, j_s] < latent_distances[i_s, k_s]
    ) == False)
    acc = np.mean(acc.astype(np.int32))
    return acc


def avg_triplet_accuracy(input_data, latent_data, batch_size=128, n_runs=20):
    # average over batches
    accs = []
    triplets = np.asarray(list(combinations(range(min(batch_size, len(input_data))), r=3)))
    if batch_size > len(input_data):
        accs.append(triplet_accuracy(input_data, latent_data, triplets=triplets))
        return accs
    for _ in range(n_runs):
        ids = np.random.choice(np.arange(len(input_data)), size=batch_size, replace=False)
        accs.append(triplet_accuracy(input_data[ids], latent_data[ids], triplets=triplets))
    return accs

In [None]:
if 'COIL' in dataset_name:
    input_data = np.load(f'data/{dataset_name}/prepared/data.npy')
else:
    input_data = np.load(f'data/{dataset_name}/prepared/train_data.npy')
for model_name in models:
    try:
        latent_data = np.load(f'data/{dataset_name}/{model_name}_latent_output_{version}.npy')
        print(latent_data.shape)
    except FileNotFoundError:
        continue
    accs = avg_triplet_accuracy(input_data, latent_data, batch_size=150, n_runs=10)
    print(f"Model: {model_name}, triplet acc: ${np.mean(accs):.3f} \pm {np.std(accs):.3f}$")

# RTD

Switch to ripser++ from ArGentum

In [None]:
from src.loss import RTDLoss
import torch

In [None]:
n_runs = 10
batch_size = 200

loss = RTDLoss(dim=1, engine='ripser')
if 'COIL' in dataset_name:
    data = np.load(f'data/{dataset_name}/prepared/data.npy')
else:
    data = np.load(f'data/{dataset_name}/prepared/train_data.npy')

# data = np.load(f'data/{dataset_name}/prepared/train_data.npy')
data = data.reshape(len(data), -1)
print(data.shape)
if batch_size > len(data):
    n_runs=1
    
max_dim = 1
results = defaultdict(list)

for i in tqdm(range(n_runs)):
    ids = np.random.choice(np.arange(0, len(data)), size=min(batch_size, len(data)), replace=False)
    
    x = data[ids]
    x_distances = distance_matrix(x, x)
    x_distances = x_distances/np.percentile(x_distances.flatten(), 90)
    
    for model_name in models:
        try:
            z = np.load(f'data/{dataset_name}/{model_name}_latent_output_{version}.npy')
        except FileNotFoundError:
            try:
                z = np.load(f'data/{dataset_name}/{model_name}_latent_output.npy')
            except FileNotFoundError:
                continue
        z = z.reshape(len(z), -1)[ids]
        z_distances = distance_matrix(z, z)
        z_distances = z_distances/np.percentile(z_distances.flatten(), 90)
        print(f'Calculating RTD for: {model_name}')
        with torch.no_grad():
            _, _, value = loss(torch.tensor(x_distances), torch.tensor(z_distances))
        results[model_name].append(value.item())

In [None]:
names = [
    'PCA', 
    'UMAP', 
    'Basic AutoEncoder', 
    'Topological AutoEncoder', 
    'RTD AutoEncoder H1',
    'NSA AutoEncoder',
    'GNSA AutoEncoder',
    'LNSA AutoEncoder',
]
for model_name in names:
    if model_name in results:
        print(f"{model_name}: ${np.mean(results[model_name]):.2f} \pm {np.std(results[model_name]):.2f}$")

# Tripet acc. between cluster centers

In [None]:
def get_cluster_distances(data, labels):
    clusters = []
    if len(data.shape) > 2:
        data = data.reshape(data.shape[0], -1)
    for l in np.sort(np.unique(labels)):
        clusters.append(np.mean(data[labels == l], axis=0))
    clusters = np.asarray(clusters)
    return distance_matrix(clusters, clusters)

In [None]:
#data = np.load(f'data/{dataset_name}/prepared/train_data.npy')
if 'COIL' in dataset_name:
    data = np.load(f'data/{dataset_name}/prepared/data.npy')
    labels = np.load(f'data/{dataset_name}/prepared/labels.npy')
else:
    data = np.load(f'data/{dataset_name}/prepared/train_data.npy')
    labels = np.load(f'data/{dataset_name}/prepared/train_labels.npy')


original_distances = get_cluster_distances(data, labels)

In [None]:
def triplet_accuracy_between_clusters(original_distances, latent_distances):
    triplets = np.asarray(list(combinations(range(len(original_distances)), r=3)))
    i_s = triplets[:, 0]
    j_s = triplets[:, 1]
    k_s = triplets[:, 2]
    acc = (np.logical_xor(
        original_distances[i_s, j_s] < original_distances[i_s, k_s], 
        latent_distances[i_s, j_s] < latent_distances[i_s, k_s]
    ) == False)
    return acc

def triplet_accuracy_between_clusters_(original_distances, latent_distances):
    ids = range(len(original_distances))
    triplets = np.asarray(list(product(ids, ids, ids)))
    i_s = triplets[:, 0]
    j_s = triplets[:, 1]
    k_s = triplets[:, 2]
    acc = (np.logical_xor(
        original_distances[i_s, j_s] < original_distances[i_s, k_s], 
        latent_distances[i_s, j_s] < latent_distances[i_s, k_s]
    ) == False)
    return acc

In [None]:
for model_name in models:
    try:
        latent_data = np.load(f'data/{dataset_name}/{model_name}_latent_output_{version}.npy')
        print(latent_data.shape)
    except FileNotFoundError:
        continue
    latent_distances = get_cluster_distances(latent_data, labels)
    accs = triplet_accuracy_between_clusters_(original_distances, latent_distances)
    print(f"Model: {model_name}, triplet acc: ${np.mean(accs):.3f} \pm {np.std(accs):.3f}$")

### MSE Between X and X hat

In [None]:
from torch import nn
criterion = nn.MSELoss()

In [None]:
if dataset_name in ['COIL-20','COIL-100']:
    data = np.load(f'data/{dataset_name}/prepared/data.npy')
    data = data.reshape(len(data), -1)
    labels = np.load(f'data/{dataset_name}/prepared/labels.npy')
else:
    data = np.load(f'data/{dataset_name}/prepared/train_data.npy')
    data = data.reshape(len(data), -1)
    labels = np.load(f'data/{dataset_name}/prepared/train_labels.npy')

In [None]:
scaler = {
    "Basic AutoEncoder":255,
    "RTD AutoEncoder H1":255,
    "LNSA AutoEncoder":255,
    "NSA AutoEncoder":255,
    "GNSA AutoEncoder":255,
    "Topological AutoEncoder":255,
}

In [None]:
for model_name in models:
    try:
        output_data = np.load(f'data/{dataset_name}/{model_name}_final_output_{version}.npy')
    except FileNotFoundError:
        continue
    accs = criterion(torch.tensor(data)/scaler[model_name], torch.tensor(output_data))
    print(f"Model: {model_name}, MSE value: {accs:e}")

#### Test MSE

In [None]:
#No Test MSE for COIL Datasets

data = np.load(f'data/{dataset_name}/prepared/test_data.npy')
data = data.reshape(len(data), -1)
labels = np.load(f'data/{dataset_name}/prepared/test_labels.npy')

In [None]:
for model_name in models:
    try:
        output_data = np.load(f'data/{dataset_name}/{model_name}_final_output_{version}_test.npy')
    except FileNotFoundError:
        continue
    accs = criterion(torch.tensor(data)/scaler[model_name], torch.tensor(output_data))
    print(f"Model: {model_name}, MSE value: {accs:e}")

### Calculate GNSA

In [None]:
from src.loss import NSALoss

In [None]:
criterion = NSALoss()

In [None]:
n_runs = 10
batch_size = 2000

loss = criterion

if dataset_name in ['COIL-20','COIL-100']:
    data = np.load(f'data/{dataset_name}/prepared/data.npy')
    data = data.reshape(len(data), -1)
else:
    data = np.load(f'data/{dataset_name}/prepared/train_data.npy')
    data = data.reshape(len(data), -1)
data = data/255

if batch_size > len(data):
    n_runs=1
    
results = defaultdict(list)

for i in tqdm(range(n_runs)):
    ids = np.random.choice(np.arange(0, len(data)), size=min(batch_size, len(data)), replace=False)
    
    x = data[ids]
    x = x/255
    
    for model_name in models:
        try:
            z = np.load(f'data/{dataset_name}/{model_name}_latent_output_{version}.npy')
        except FileNotFoundError:
            try:
                z = np.load(f'data/{dataset_name}/{model_name}_latent_output.npy')
            except FileNotFoundError:
                continue
        z = z.reshape(len(z), -1)[ids]
        print(f'Calculating NSA for: {model_name}')
        with torch.no_grad():
            value = loss(torch.tensor(x), torch.tensor(z))
        results[model_name].append(value.item())

In [None]:
names = [
    'PCA', 
    'UMAP', 
    'Basic AutoEncoder', 
    'Topological AutoEncoder', 
    'RTD AutoEncoder H1',
    'NSA AutoEncoder',
    'LNSA AutoEncoder',
    'GNSA AutoEncoder',
]
for model_name in names:
    if model_name in results:
        print(f"{model_name}: ${np.mean(results[model_name]):.4f} \pm {np.std(results[model_name]):.2f}$")

### LNSA

In [None]:
from src.loss import LID_NSALoss_v1

In [None]:
criterion = LID_NSALoss_v1(k=100,full=False)

In [None]:
n_runs = 10
batch_size = 4000

loss = criterion

if dataset_name in ['COIL-20','COIL-100']:
    data = np.load(f'data/{dataset_name}/prepared/data.npy')
    data = data.reshape(len(data), -1)
else:
    data = np.load(f'data/{dataset_name}/prepared/train_data.npy')
    data = data.reshape(len(data), -1)
data = data/255

if batch_size > len(data):
    n_runs=1
    
results = defaultdict(list)

for i in tqdm(range(n_runs)):
    ids = np.random.choice(np.arange(0, len(data)), size=min(batch_size, len(data)), replace=False)
    
    x = data[ids]
    x = x/255
    
    for model_name in models:
        try:
            z = np.load(f'data/{dataset_name}/{model_name}_latent_output_{version}.npy')
        except FileNotFoundError:
            try:
                z = np.load(f'data/{dataset_name}/{model_name}_latent_output.npy')
            except FileNotFoundError:
                continue
        z = z.reshape(len(z), -1)[ids]
        print(f'Calculating NSA for: {model_name}')
        with torch.no_grad():
            value1 = loss(torch.tensor(x), torch.tensor(z))
            value2 = loss(torch.tensor(z), torch.tensor(x))
        results[model_name].append((value1.item()+value2.item())/2)

In [None]:
names = [
    'PCA', 
    'UMAP', 
    'Basic AutoEncoder', 
    'Topological AutoEncoder', 
    'RTD AutoEncoder H1',
    'NSA AutoEncoder',
    'LNSA AutoEncoder',
    'GNSA AutoEncoder',
]
for model_name in names:
    if model_name in results:
        print(f"{model_name}: ${np.mean(results[model_name]):.4f} \pm {np.std(results[model_name]):.2f}$")

## K-NN Consistency

How many of the k nearest neighbors are common between all the points in the two spaces

In [None]:
import numpy as np
from sklearn.neighbors import NearestNeighbors
import random

def calculate_knn_consistency(space1, space2, k=5, num_points=10):
    """
    Calculate the K-NN consistency between two spaces.

    :param space1: numpy array of shape (n_samples, n_features_space1)
    :param space2: numpy array of shape (n_samples, n_features_space2)
    :param k: number of nearest neighbors to consider
    :return: consistency score
    """
    # Ensure the number of points is the same in both spaces
    assert space1.shape[0] == space2.shape[0], "The two spaces must have the same number of points"

    indices = random.sample(range(space1.shape[0]), num_points)

    # Fit K-NN on both spaces
    nn_space1 = NearestNeighbors(n_neighbors=k+1).fit(space1)
    nn_space2 = NearestNeighbors(n_neighbors=k+1).fit(space2)

    # Find K nearest neighbors in space1
    consistency_count = 0
    for i in indices:
        # Find K nearest neighbors in both spaces for the selected points
        neighbors_space1 = nn_space1.kneighbors([space1[i]], return_distance=False)[0]
        neighbors_space2 = nn_space2.kneighbors([space2[i]], return_distance=False)[0]
        # print(neighbors_space1)
        # print(neighbors_space2)
        # Exclude the point itself and calculate consistency
        common_neighbors = set(neighbors_space1[1:]).intersection(neighbors_space2[1:])
        consistency_count += len(common_neighbors)

    # _, indices_space1 = nn_space1.kneighbors(space1)

    # # Find K nearest neighbors in space2 for the corresponding points
    # _, indices_space2 = nn_space2.kneighbors(space2)

    # # Calculate consistency
    # consistency_count = 0
    # for i in range(space1.shape[0]):
    #     # Use set intersection to find common neighbors; exclude the point itself (hence indices starting from 1)
    #     common_neighbors = set(indices_space1[i, 1:]).intersection(indices_space2[i, 1:])
    #     consistency_count += len(common_neighbors)

    #consistency_score = consistency_count / (space1.shape[0] * k)
    consistency_score = consistency_count / (num_points * k)
    return consistency_score

In [None]:
if dataset_name in ['COIL-20','COIL-100']:
    data = np.load(f'data/{dataset_name}/prepared/data.npy')
    data = data.reshape(len(data), -1)
else:
    data = np.load(f'data/{dataset_name}/prepared/train_data.npy')
    data = data.reshape(len(data), -1)
data = data/255

In [None]:
for model_name in models:
    try:
        latent_data = np.load(f'data/{dataset_name}/{model_name}_latent_output_{version}.npy')
    except FileNotFoundError:
        continue
    print(data.shape)
    print(latent_data.shape)
    consistency_score = calculate_knn_consistency(data, latent_data, k=100, num_points=500)
    print(f"Model: {model_name}, k-NN consistency: {consistency_score:.3f}")

## Local Continuity Meta-Criterion (Not Used)
 
The Local Continuity Meta-Criterion (LCMC) is a metric used to evaluate the quality of dimensionality reduction techniques, particularly in terms of how well local neighborhoods are preserved in the reduced dimensionality space. To compute the LCMC, you typically compare the ranks of distances in the original high-dimensional space with the ranks in the reduced low-dimensional space.

In [None]:
import numpy as np
from scipy.spatial import distance
from scipy.stats import spearmanr

def calculate_lcmc(original_space, reduced_space, k, num_points=2000):
    """
    Calculate the Local Continuity Meta-Criterion (LCMC).

    :param original_space: numpy array of shape (n_samples, n_features_original)
    :param reduced_space: numpy array of shape (n_samples, n_features_reduced)
    :param k: number of nearest neighbors to consider
    :return: lcmc score
    """
    indices = random.sample(range(original_space.shape[0]), num_points)

    # Calculate pairwise distances in both spaces
    selected_original = original_space[indices]
    selected_reduced = reduced_space[indices]

    # Compute pairwise distances from selected points to all points
    dist_original = distance.cdist(selected_original, original_space)
    dist_reduced = distance.cdist(selected_reduced, reduced_space)

    # dist_original = distance.squareform(distance.pdist(original_space))
    # dist_reduced = distance.squareform(distance.pdist(reduced_space))


    
    # # Ranking distances for each point
    # ranks_original = np.argsort(dist_original, axis=1)[:, 1:k+1]
    # ranks_reduced = np.argsort(dist_reduced, axis=1)[:, 1:k+1]

    # # Calculate LCMC
    # correlations = []
    # for i in range(original_space.shape[0]):
    #     # Spearman rank correlation for the k-nearest neighbors
    #     rank_corr, _ = spearmanr(ranks_original[i], ranks_reduced[i])
    #     correlations.append(rank_corr)

    correlations = []
    for i in range(len(indices)):
        # Get ranks of the k nearest neighbors
        neighbors_original = np.argsort(dist_original[i])[:k + 1]
        neighbors_reduced = np.argsort(dist_reduced[i])[:k + 1]

        # Spearman rank correlation
        rank_corr, _ = spearmanr(neighbors_original, neighbors_reduced)
        correlations.append(rank_corr)
    
    lcmc_score = np.nanmean(correlations)
    return lcmc_score

In [None]:
if dataset_name in ['COIL-20','COIL-100']:
    data = np.load(f'data/{dataset_name}/prepared/data.npy')
    data = data.reshape(len(data), -1)
else:
    data = np.load(f'data/{dataset_name}/prepared/train_data.npy')
    data = data.reshape(len(data), -1)
data = data/255

In [None]:
for model_name in models:
    try:
        latent_data = np.load(f'data/{dataset_name}/{model_name}_latent_output_{version}.npy')
    except FileNotFoundError:
        continue
    print(data.shape)
    print(latent_data.shape)
    lcmc_score = calculate_lcmc(data, latent_data, k=30)
    print(f"Model: {model_name}, LCMC Score: {lcmc_score:.3f}")

In [None]:
# Example usage
# original_space = np.array([...]) # Your high-dimensional data
# reduced_space = np.array([...])  # Your reduced-dimensional data
# k = 5 # Number of nearest neighbors
