In [None]:
import pandas as pd
import gc
import warnings
from sklearn.manifold import trustworthiness
from scipy.stats import spearmanr
import math
from sklearn.datasets import fetch_openml
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.datasets import load_iris, load_wine, load_digits, load_breast_cancer, fetch_openml
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import precision_score, recall_score, f1_score, pairwise_distances
import scipy.stats as ss
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
import torch
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsRegressor, KNeighborsClassifier
from sklearn.linear_model import RidgeCV, LogisticRegression
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score, f1_score
from typing import Literal, Dict, Tuple, List
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.metrics import pairwise_distances
from scipy.stats import spearmanr, pearsonr
import time
from torchvision import datasets, transforms
from sklearn.metrics import classification_report
from sklearn.exceptions import UndefinedMetricWarning  
from sklearn.metrics import confusion_matrix
from sklearn.neural_network import MLPClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import LinearSVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import RidgeClassifier
from sklearn.ensemble import VotingClassifier


## MDS

In [None]:
from Gini_MDS import GiniMDS

## Functions to train

In [None]:
import numpy as np
import torch
import pandas as pd
from sklearn.manifold import trustworthiness
from sklearn.metrics import silhouette_score


DEVICE = "cuda:0"

def euclidean_distance_matrix(X):
    with torch.no_grad():
        T = torch.as_tensor(X, dtype=torch.float64)
        diff = T[:, None, :] - T[None, :, :]
        D = torch.sqrt(torch.clamp((diff ** 2).sum(dim=-1), min=0.0))
        return D.cpu().numpy().astype(np.float64)

def safe_trustworthiness(X, Z, n_neighbors=5):
    try:
        return float(trustworthiness(X, Z, n_neighbors=n_neighbors, metric="euclidean"))
    except Exception:
        return np.nan

def safe_silhouette(Z, y):
    try:
        y = np.asarray(y)
        if len(np.unique(y)) < 2 or len(y) <= len(np.unique(y)):
            return np.nan
        return float(silhouette_score(Z, y, metric="euclidean"))
    except Exception:
        return np.nan

def run_gini_mds(X, method, nu, n_components=2, device=DEVICE, dtype=torch.float64,
                 gini_mode="rank", max_iter=800, tol=1e-7):
    model = GiniMDS(n_components=int(n_components), nu=float(nu),
                    gini_mode=gini_mode, mds_method=method,
                    dtype=dtype, device=device, max_iter=max_iter, tol=tol, random_state=0)
    Z = model.fit_transform(X=X)
    D_fit = model.train_D_
    s1 = model.stress(D_fit, Z, kind="stress1")
    #sS = model.stress(D_fit, Z, kind="sammon")
    tw = safe_trustworthiness(X, Z)
    return model, Z, s1, tw

def run_euclid_mds(X, method, n_components=2, device=DEVICE, dtype=torch.float64,
                   iterative_on_cpu=True, max_iter=800, tol=1e-7):
    D = euclidean_distance_matrix(X)
    dev = device if method == "cmds" or not iterative_on_cpu else "cpu"
    dt  = dtype if method == "cmds" else torch.float32
    model = GiniMDS(n_components=int(n_components), mds_method=method,
                    dtype=dt, device=dev, max_iter=max_iter, tol=tol, random_state=0)
    Z = model.fit_transform(D=D)
    s1 = model.stress(D, Z, kind="stress1")
    #sS = model.stress(D, Z, kind="sammon")
    tw = safe_trustworthiness(X, Z)
    return model, Z, s1, tw

def neighborhood_hit(Z, y, n_neighbors=10):
    Z = np.asarray(Z); y = np.ravel(y)
    nn = NearestNeighbors(n_neighbors=n_neighbors+1).fit(Z)
    ind = nn.kneighbors(return_distance=False)[:, 1:]  
    return float((y[ind] == y[:, None]).mean())

def shepard_against(D_ref, Z):
    # D_ref: (n,n) distances in original space (fixed for the dataset)
    # Z    : (n,m) embedding
    DZ = pairwise_distances(Z, metric='euclidean')
    iu = np.triu_indices_from(D_ref, k=1)
    x = D_ref[iu]; z = DZ[iu]
    rho_s, _ = spearmanr(x, z)
    rho_p, _ = pearsonr(x, z)
    return float(rho_s), float(rho_p)



# MNIST + K FOLD

In [None]:
LABELS = (5, 6)
DEVICE = "cuda:1"
DTYPE = torch.float64
N_COMPONENTS = 1
RANDOM_STATE = 0
N_PER_CLASS = 250
NOISE_SIGMA = 1

warnings.filterwarnings("ignore", category=UndefinedMetricWarning)

def add_noise(X, sigma=0.2, seed=0):
    rng = np.random.default_rng(seed)
    noisy = X + sigma * rng.standard_normal(X.shape)
    return np.clip(noisy, 0.0, 1.0)

def extract_by_labels(ds, labels=(1, 7)):
    xs, ys = [], []
    labset = set(labels)
    for x, y in ds:
        if y in labset:
            xs.append(x.view(-1).numpy())
            ys.append(int(y))
    X = np.stack(xs, axis=0).astype(np.float32)
    y = np.array(ys, dtype=np.int64)
    return X, y

def stratified_sample(X, y, n_per_class, labels, seed):
    rng = np.random.default_rng(seed)
    Xs, ys = [], []
    for lab in labels:
        idx = np.where(y == lab)[0]
        rng.shuffle(idx)
        Xs.append(X[idx[:n_per_class]])
        ys.append(np.full(n_per_class, lab, dtype=np.int64))
    return np.vstack(Xs), np.hstack(ys)

# MNIST
print(f"Loading MNIST (labels {LABELS})...")
to_tensor = transforms.Compose([transforms.ToTensor()])
mnist_train = datasets.MNIST(root="./data", train=True, download=True, transform=to_tensor)
X_full, y_full = extract_by_labels(mnist_train, labels=LABELS)
print(f"Total samples: {len(X_full)} | Per class: {[np.sum(y_full==lab) for lab in LABELS]}")

# K-Folds
kf = StratifiedKFold(n_splits=10, shuffle=True, random_state=RANDOM_STATE)
acc_euc_folds, acc_gini_folds = [], []
conf_euc_all, conf_gini_all = [], []
y_true_all, y_pred_euc_all, y_pred_gini_all = [], [], []

fold_idx = 1
for train_idx, test_idx in kf.split(X_full, y_full):
    print(f"\n=============")
    print(f" Fold {fold_idx} / 10")
    print(f"===============")

    X_train_raw, X_test_raw = X_full[train_idx], X_full[test_idx]
    y_train_raw, y_test_raw = y_full[train_idx], y_full[test_idx]

    # Limit to 250 samples per class for train/test
    X_train, y_train = stratified_sample(X_train_raw, y_train_raw, N_PER_CLASS, LABELS, seed=fold_idx)
    X_test, y_test = stratified_sample(X_test_raw, y_test_raw, N_PER_CLASS, LABELS, seed=fold_idx + 100)
    print(f"Fold {fold_idx} — X_train: {X_train.shape}, X_test: {X_test.shape}")
    X_train = add_noise(X_train, sigma=NOISE_SIGMA, seed=0)
    X_test  = add_noise(X_test,  sigma=NOISE_SIGMA, seed=1)

    try:
        print(" Running Euclidean MDS...")
        D_train_euc = pairwise_distances(X_train, metric="euclidean")
        euc_cmds = GiniMDS(
            n_components=N_COMPONENTS,
            mds_method="cmds",
            dtype=DTYPE,
            device=DEVICE,
        )
        euc_cmds.fit(D=D_train_euc)
        Z_train_euc = euc_cmds.embedding_
        D_test_train_euc = pairwise_distances(X_test, X_train, metric="euclidean")
        Z_test_euc = euc_cmds.fit_inverse_transform(D_test_train_euc)

        print(" Running Gini MDS grid search...")
        nu_grid = np.arange(1.1, 5, 0.1)
        best = (None, np.inf, None)
        for nu in nu_grid:
            model = GiniMDS(
                n_components=N_COMPONENTS,
                nu=float(nu),
                gini_mode="rank",
                mds_method="cmds",
                dtype=DTYPE,
                device=DEVICE,  # ok for RTX 8000 with 500 samples
                random_state=RANDOM_STATE,
            )
            model.fit(X_train)
            stress = model.stress(model.train_D_, model.embedding_)
            if stress < best[1]:
                best = (nu, stress, model)

        nu_star, _, model_gini = best
        print(f"Best nu for fold {fold_idx}: {nu_star:.2f}")
        Z_train_gini = model_gini.embedding_

        # Nyström for test set (only 1000 samples total)
        X_all = np.vstack([X_train, X_test])
        with torch.no_grad():
            D_all = model_gini._gini_distance_matrix(
                torch.as_tensor(X_all, device=model_gini.device, dtype=model_gini.dtype)
            ).cpu().numpy()
        D_test_train_gini = D_all[len(X_train):, :len(X_train)]
        Z_test_gini = model_gini.fit_inverse_transform(D_test_train_gini)

        # Voting
        def make_voter():
            clf_mlp = MLPClassifier(hidden_layer_sizes=(64,),max_iter=500,random_state=RANDOM_STATE)
            clf_log = LogisticRegression(max_iter=1000,random_state=RANDOM_STATE)
            clf_knn = KNeighborsClassifier(n_neighbors=5)
            clf_lda = LinearDiscriminantAnalysis()
            clf_qda = QuadraticDiscriminantAnalysis()
            clf_svm = LinearSVC(random_state=RANDOM_STATE)
            clf_tree = DecisionTreeClassifier(random_state=RANDOM_STATE)
            clf_nb = GaussianNB()
            clf_ridge = RidgeClassifier(random_state=RANDOM_STATE)

            return VotingClassifier(
                estimators=[
                    ("mlp", clf_mlp),
                    ("logreg", clf_log),
                    ("knn", clf_knn),
                    ("lda", clf_lda),
                    ("qda", clf_qda),
                    ("svm", clf_svm),
                    ("tree", clf_tree),
                    ("ridge", clf_ridge),
                    ("nb", clf_nb),],
                voting="hard")

        voting_euc = make_voter()
        voting_euc.fit(Z_train_euc, y_train)
        y_pred_euc = voting_euc.predict(Z_test_euc)

        voting_gini = make_voter()
        voting_gini.fit(Z_train_gini, y_train)
        y_pred_gini = voting_gini.predict(Z_test_gini)

        # METRICS
        acc_euc = accuracy_score(y_test, y_pred_euc)
        acc_gini = accuracy_score(y_test, y_pred_gini)
        conf_euc = confusion_matrix(y_test, y_pred_euc, labels=LABELS)
        conf_gini = confusion_matrix(y_test, y_pred_gini, labels=LABELS)
        print(f"Fold {fold_idx} — Euclidean acc: {acc_euc:.4f}, Gini acc: {acc_gini:.4f}")

        acc_euc_folds.append(acc_euc)
        acc_gini_folds.append(acc_gini)
        conf_euc_all.append(conf_euc)
        conf_gini_all.append(conf_gini)
        y_true_all.extend(y_test)
        y_pred_euc_all.extend(y_pred_euc)
        y_pred_gini_all.extend(y_pred_gini)

    except Exception as e:
        print(f"Error on fold {fold_idx}: {e}")

    finally:
        # GPU CLEANUP
        try:
            for obj_name in ["euc_cmds", "model_gini"]:
                if obj_name in locals():
                    obj = locals()[obj_name]
                    if hasattr(obj, "embedding_") and torch.is_tensor(obj.embedding_):
                        obj.embedding_ = obj.embedding_.cpu()
                    if hasattr(obj, "train_D_") and torch.is_tensor(obj.train_D_):
                        obj.train_D_ = obj.train_D_.cpu()
                    del obj

            for var in [
                "D_train_euc",
                "D_all",
                "Z_train_euc",
                "Z_test_euc",
                "Z_train_gini",
                "Z_test_gini",
                "X_train",
                "X_test",
                "y_train",
                "y_test",
            ]:
                if var in locals():
                    del locals()[var]

            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.synchronize()
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()

        except Exception as cleanup_error:
            print(f"GPU cleanup issue (ignored safely): {cleanup_error}")

        print(f"GPU memory cleared after fold {fold_idx}")
        fold_idx += 1

# Results
print("\n==========================")
print(" FINAL RESULTS (10 FOLDS)")
print("============================")

if len(acc_euc_folds) > 0 and len(acc_gini_folds) > 0:
    print(f"Mean Euclidean acc: {np.mean(acc_euc_folds):.4f}")
    print(f"Mean Gini acc:      {np.mean(acc_gini_folds):.4f}")

if len(y_true_all) > 0:
    final_conf_euc = np.sum(conf_euc_all, axis=0)
    final_conf_gini = np.sum(conf_gini_all, axis=0)

    print("\n Final Euclidean Confusion Matrix:")
    print(final_conf_euc)
    print("\n Final Gini Confusion Matrix:")
    print(final_conf_gini)

    print("\n Euclidean Classification Report (Aggregated):")
    print(classification_report(y_true_all, y_pred_euc_all, labels=LABELS, digits=4, zero_division=0))

    print("\n Gini Classification Report (Aggregated):")
    print(classification_report(y_true_all, y_pred_gini_all, labels=LABELS, digits=4, zero_division=0))
else:
    print("⚠️ No folds completed successfully — no results to report.")


## MINIST without 10-folds

In [None]:
# MNIST MDS pipeline (parametric labels) with noise injection, GPU device cuda:1
# - Uses your GiniMDS class (already defined in your environment)
# - Handles arbitrary label pairs (e.g., (3,5), (4,6), (1,7))
# - Adds Gaussian noise to MNIST samples
# - Avoids duplicate MDS/Nyström helpers by using GiniMDS(cmds) API
# - Includes safety checks for empty/insufficient samples

import numpy as np
import torch
from sklearn.metrics import pairwise_distances, confusion_matrix, classification_report, accuracy_score
from sklearn.neural_network import MLPClassifier
from torchvision import datasets, transforms

# ----------------------
# Config 
# ----------------------
LABELS = (5, 6)           # <-- change this to any pair, e.g., (4, 6) or (1, 7)
# 4-6 no 1-7 no 1-4 no 3-5 no
# 5-6 ok 6-8 ok

N_PER_CLASS_TRAIN = 250
N_PER_CLASS_TEST  = 250
DEVICE = "cuda:0"
DTYPE  = torch.float64
N_COMPONENTS = 2
RANDOM_STATE = 0
NOISE_SIGMA = 0

# ----------------------
# Data helpers
# ----------------------
def extract_by_labels(ds, labels=(1, 7)):
    xs, ys = [], []
    labset = set(labels)
    for x, y in ds:
        if y in labset:
            xs.append(x.view(-1).numpy())  # 28x28 -> 784
            ys.append(int(y))
    if not xs:
        raise ValueError(f"No samples found for labels {labels}. "
                         f"Make sure the dataset split actually contains them.")
    X = np.stack(xs, axis=0).astype(np.float32)  # values in [0,1]
    y = np.array(ys, dtype=np.int64)
    return X, y

def stratified_sample(X, y, n_per_class, labels=(1, 7), seed=0):
    rng = np.random.default_rng(seed)
    Xs, ys = [], []
    for lab in labels:
        idx = np.where(y == lab)[0]
        if len(idx) < n_per_class:
            raise ValueError(
                f"Not enough samples for label {lab}: requested {n_per_class}, "
                f"but only {len(idx)} available."
            )
        rng.shuffle(idx)
        Xs.append(X[idx[:n_per_class]])
        ys.append(np.full(n_per_class, lab, dtype=np.int64))
    return np.vstack(Xs), np.hstack(ys)

def add_noise(X, sigma=0.2, seed=0):
    rng = np.random.default_rng(seed)
    noisy = X + sigma * rng.standard_normal(X.shape)
    return np.clip(noisy, 0.0, 1.0)

# ----------------------
# Load MNIST and extract labels
# ----------------------
print(f"Loading MNIST and keeping labels {LABELS}...")
to_tensor = transforms.Compose([transforms.ToTensor()])
mnist_train = datasets.MNIST(root="./data", train=True,  download=True, transform=to_tensor)
mnist_test  = datasets.MNIST(root="./data", train=False, download=True, transform=to_tensor)

X_train_full, y_train_full = extract_by_labels(mnist_train, labels=LABELS)
X_test_full,  y_test_full  = extract_by_labels(mnist_test,  labels=LABELS)

X_train, y_train = stratified_sample(X_train_full, y_train_full, N_PER_CLASS_TRAIN, labels=LABELS, seed=0)
X_test,  y_test  = stratified_sample(X_test_full,  y_test_full,  N_PER_CLASS_TEST,  labels=LABELS, seed=1)

# Add Gaussian noise
X_train = add_noise(X_train, sigma=NOISE_SIGMA, seed=0)
X_test  = add_noise(X_test,  sigma=NOISE_SIGMA, seed=1)

n_train = X_train.shape[0]
print(f"Train: {n_train} samples | Test: {X_test.shape[0]} samples (with noise)")

# ----------------------
# Euclidean MDS via GiniMDS('cmds') fit on Euclidean D
# ----------------------
print("Running Euclidean MDS (classical MDS on Euclidean distances) ...")
D_train_euc = pairwise_distances(X_train, metric="euclidean")

euc_cmds = GiniMDS(
    n_components=N_COMPONENTS,
    mds_method='cmds',
    dtype=DTYPE,
    device=DEVICE
)
euc_cmds.fit(D=D_train_euc)
Z_train_euc = euc_cmds.embedding_

D_test_train_euc = pairwise_distances(X_test, X_train, metric="euclidean")
Z_test_euc = euc_cmds.fit_inverse_transform(D_test_train_euc)

# ----------------------
# Gini MDS grid search (fit on X; model computes Gini distances internally)
# ----------------------
print("Running Gini MDS grid search...")
nu_grid = np.arange(1.1, 4.1, 0.1)
best = (None, np.inf, None)  # (nu, stress, model)

for nu in nu_grid:
    model = GiniMDS(
        n_components=N_COMPONENTS,
        nu=float(nu),
        gini_mode='rank',
        mds_method='cmds',
        dtype=DTYPE,
        device=DEVICE,
        random_state=RANDOM_STATE
    )
    model.fit(X_train)
    stress = model.stress(model.train_D_, model.embedding_)
    print(f"nu={nu:.2f} -> stress1={stress:.6f}")
    if stress < best[1]:
        best = (nu, stress, model)

nu_star, _, model_gini = best
Z_train_gini = model_gini.embedding_
print(f"Best nu for Gini MDS: {nu_star:.2f}")

# ----------------------
# Gini Nyström: use Gini distances (same metric as training)
# ----------------------
print("Computing Gini test–train distances for Nyström...")
X_all = np.vstack([X_train, X_test])
with torch.no_grad():
    D_all = model_gini._gini_distance_matrix(
        torch.as_tensor(X_all, device=model_gini.device, dtype=model_gini.dtype)
    ).cpu().numpy()
D_test_train_gini = D_all[n_train:, :n_train]
Z_test_gini = model_gini.fit_inverse_transform(D_test_train_gini)

# ----------------------
# Classify 
# ----------------------
clf_euc = MLPClassifier(hidden_layer_sizes=(64,), max_iter=500, random_state=RANDOM_STATE)
clf_euc.fit(Z_train_euc, y_train)
y_pred_euc = clf_euc.predict(Z_test_euc)

clf_gini = MLPClassifier(hidden_layer_sizes=(64,), max_iter=500, random_state=RANDOM_STATE)
clf_gini.fit(Z_train_gini, y_train)
y_pred_gini = clf_gini.predict(Z_test_gini)

#print("\n Euclidean MDS — Accuracy:", accuracy_score(y_test, y_pred_euc))
#print(classification_report(y_test, y_pred_euc, digits=4))
#print("Confusion matrix (Euclidean):")
#print(confusion_matrix(y_test, y_pred_euc))

#print("\n Gini MDS (nu={:.2f}) — Accuracy:".format(nu_star), accuracy_score(y_test, y_pred_gini))
#print(classification_report(y_test, y_pred_gini, digits=4))
#print("Confusion matrix (Gini):")
#print(confusion_matrix(y_test, y_pred_gini))

# Define individual classifiers
logreg_euc = LogisticRegression(max_iter=1000, random_state=RANDOM_STATE)
knn_euc = KNeighborsClassifier(n_neighbors=5)
logreg_gini = LogisticRegression(max_iter=1000, random_state=RANDOM_STATE)
knn_gini = KNeighborsClassifier(n_neighbors=5)

# Train each base classifier
logreg_euc.fit(Z_train_euc, y_train)
knn_euc.fit(Z_train_euc, y_train)
logreg_gini.fit(Z_train_gini, y_train)
knn_gini.fit(Z_train_gini, y_train)

# Vote Euclidean
voting_euc = VotingClassifier(
    estimators=[
        ('mlp', clf_euc),
        ('logreg', logreg_euc),
        ('knn', knn_euc)
    ],
    voting='hard'
)
voting_euc.fit(Z_train_euc, y_train)
y_pred_vote_euc = voting_euc.predict(Z_test_euc)

print("\n Euclidean VotingClassifier — Accuracy:", accuracy_score(y_test, y_pred_vote_euc))
print(classification_report(y_test, y_pred_vote_euc, digits=4))
print("Confusion matrix (Euclidean Voting):")
print(confusion_matrix(y_test, y_pred_vote_euc))

# Vote Gini
voting_gini = VotingClassifier(
    estimators=[
        ('mlp', clf_gini),
        ('logreg', logreg_gini),
        ('knn', knn_gini)
    ],
    voting='hard'
)
voting_gini.fit(Z_train_gini, y_train)
y_pred_vote_gini = voting_gini.predict(Z_test_gini)

print("\n Gini VotingClassifier — Accuracy:", accuracy_score(y_test, y_pred_vote_gini))
print(classification_report(y_test, y_pred_vote_gini, digits=4))
print("Confusion matrix (Gini Voting):")
print(confusion_matrix(y_test, y_pred_vote_gini))


# Images

In [None]:
import matplotlib.pyplot as plt
import numpy as np

def show_samples_6_and_8(X, y, labels=(6, 8), n_per_class=3):
    """Display n samples of digits 6 and 8: top row = first digit, bottom = second."""
    fig, axes = plt.subplots(2, n_per_class, figsize=(n_per_class * 2, 4))

    for row, label in enumerate(labels):
        idx = np.where(y == label)[0][:n_per_class]
        for col, i in enumerate(idx):
            ax = axes[row, col]
            ax.imshow(X[i].reshape(28, 28), cmap='viridis')
            ax.set_title(f"{label}", fontsize=10)
            ax.axis("off")

    plt.suptitle(f"MNIST samples for digits {labels[0]} and {labels[1]}", fontsize=14)
    plt.tight_layout()
    plt.subplots_adjust(top=0.85)
    plt.show()

    
LABELS = (5, 6)  
X_train_full, y_train_full = extract_by_labels(mnist_train, labels=LABELS)
show_samples_6_and_8(X_train_full, y_train_full, labels=LABELS, n_per_class=3)


In [None]:
import numpy as np
import matplotlib.pyplot as plt

def add_noise(X, sigma=0.2, seed=0):
    """
    Add Gaussian noise to flattened MNIST images.
    X: (n_samples, 784)
    sigma: noise standard deviation
    seed: random seed
    """
    rng = np.random.default_rng(seed)
    noisy = X + sigma * rng.standard_normal(X.shape)
    return np.clip(noisy, 0.0, 1.0)


def show_samples_with_noise(X, y, labels=(6, 8), n_per_class=3, sigma=0.2, seed=0):
    """
    Display clean samples (top) and noisy samples (bottom) for the given labels.
    Top 2 rows = clean images (for each label)
    Bottom 2 rows = noisy images (for each label)
    """
    # Add noise
    X_noisy = add_noise(X, sigma=sigma, seed=seed)

    # Select indices for each label
    idx_clean = []
    for lab in labels:
        idx_lab = np.where(y == lab)[0][:n_per_class]
        idx_clean.extend(idx_lab)

    fig, axes = plt.subplots(2, len(idx_clean), figsize=(len(idx_clean) * 1.8, 4))
    plt.suptitle(
        f"MNIST digits {labels[0]} and {labels[1]}",
        fontsize=14,
    )

    for col, i in enumerate(idx_clean):
        # Top row: clean images
        axes[0, col].imshow(X[i].reshape(28, 28), cmap="viridis")
        axes[0, col].set_title(f"Clean {y[i]}", fontsize=9)
        axes[0, col].axis("off")

        # Bottom row: noisy images
        axes[1, col].imshow(X_noisy[i].reshape(28, 28), cmap="viridis")
        axes[1, col].set_title(f"Noisy {y[i]}", fontsize=9)
        axes[1, col].axis("off")

    plt.tight_layout()
    plt.subplots_adjust(top=0.82)
    plt.show()


# Example usage
LABELS = (5, 6)
X_train_full, y_train_full = extract_by_labels(mnist_train, labels=LABELS)
show_samples_with_noise(X_train_full, y_train_full, labels=LABELS, n_per_class=2, sigma=0.3)


# DIGITS

In [None]:
import numpy as np
from sklearn.datasets import load_digits
from sklearn.metrics import pairwise_distances, confusion_matrix
from sklearn.neural_network import MLPClassifier
import torch

# ----------------------
# Step 1: Load Digits (Digits 1 & 7)
# ----------------------
print("Loading Digits dataset (digits 1 and 7)...")
digits = load_digits()
X_raw = digits.data / 16.0  # Normalize (digits pixel values are 0–16)
y_raw = digits.target

# Keep only digits 1 and 7
mask = (y_raw == 6) | (y_raw == 8)
X = X_raw[mask]
y = y_raw[mask]

# ----------------------
# Step 2: Stratified Sampling
# ----------------------
def stratified_sample(X, y, n_per_class, seed=0):
    rng = np.random.default_rng(seed)
    X_out, y_out = [], []
    for label in [6, 8]:
        idx = np.where(y == label)[0]
        rng.shuffle(idx)
        X_out.append(X[idx[:n_per_class]])
        y_out.append(np.full(n_per_class, label))
    return np.vstack(X_out), np.hstack(y_out)

X_train, y_train = stratified_sample(X, y, 50, seed=0)
X_test, y_test = stratified_sample(X[100:], y[100:], 50, seed=1)

n_train = X_train.shape[0]

# ----------------------
# Step 3: Euclidean MDS (classical) using GiniMDS('cmds') on Euclidean D
# ----------------------
print("Running Euclidean MDS with GiniMDS(cmds)...")
D_train_euc = pairwise_distances(X_train, metric="euclidean")

euc_cmds = GiniMDS(
    n_components=3,
    mds_method='cmds',
    dtype=torch.float64,
    device="cuda:1"   # Force GPU usage
)
euc_cmds.fit(D=D_train_euc)
Z_train_euc = euc_cmds.embedding_

D_test_train_euc = pairwise_distances(X_test, X_train, metric="euclidean")
Z_test_euc = euc_cmds.fit_inverse_transform(D_test_train_euc)

# ----------------------
# Step 4: Gini MDS with ν Grid Search
# ----------------------
print("Running Gini MDS grid search...")
nu_grid = np.arange(1.1, 3.1, 0.1)
best = (None, np.inf, None)  # (nu, stress, model)

for nu in nu_grid:
    model = GiniMDS(
        n_components=3,
        nu=float(nu),
        gini_mode='rank',
        mds_method='cmds',
        dtype=torch.float64,
        device="cuda:1"   # Force GPU usage
    )
    model.fit(X_train)
    stress = model.stress(model.train_D_, model.embedding_)
    print(f"nu={nu:.2f},  stress1={stress:.6f}")
    if stress < best[1]:
        best = (nu, stress, model)

nu_star, _, model_gini = best
Z_train_gini = model_gini.embedding_
print(f" Best nu for Gini MDS: {nu_star:.2f}")

# ----------------------
# Step 5: Project test points using Gini's fit_inverse_transform
# ----------------------
print("Computing Gini test–train distances for Nyström...")
X_all = np.vstack([X_train, X_test])
with torch.no_grad():
    D_all = model_gini._gini_distance_matrix(
        torch.as_tensor(X_all, device=model_gini.device, dtype=model_gini.dtype)
    ).cpu().numpy()
D_test_train_gini = D_all[n_train:, :n_train]
Z_test_gini = model_gini.fit_inverse_transform(D_test_train_gini)

# ----------------------
# Step 6: Train and Evaluate MLP Classifier
# ----------------------
clf_euc = MLPClassifier(hidden_layer_sizes=(32,), max_iter=500, random_state=0)
clf_euc.fit(Z_train_euc, y_train)
y_pred_euc = clf_euc.predict(Z_test_euc)

clf_gini = MLPClassifier(hidden_layer_sizes=(32,), max_iter=500, random_state=0)
clf_gini.fit(Z_train_gini, y_train)
y_pred_gini = clf_gini.predict(Z_test_gini)

print("\n Euclidean MDS — Accuracy:", accuracy_score(y_test, y_pred_euc))
print(classification_report(y_test, y_pred_euc, digits=4))
print("Confusion matrix (Euclidean):")
print(confusion_matrix(y_test, y_pred_euc))

print("\n Gini MDS (nu={:.2f}) — Accuracy:".format(nu_star), accuracy_score(y_test, y_pred_gini))
print(classification_report(y_test, y_pred_gini, digits=4))
print("Confusion matrix (Gini):")
print(confusion_matrix(y_test, y_pred_gini))


In [None]:
import matplotlib.pyplot as plt

def show_digits_6_and_8(X, y, n_each=3):
    """Show n samples of digits 6 and 8 from the dataset."""
    fig, axes = plt.subplots(1, 2 * n_each, figsize=(12, 2))

    idx_6 = np.where(y == 6)[0][:n_each]
    idx_8 = np.where(y == 8)[0][:n_each]

    selected = np.concatenate([idx_6, idx_8])
    for i, idx in enumerate(selected):
        ax = axes[i]
        ax.imshow(X[idx].reshape(8, 8), cmap='gray')  # 8x8 for sklearn digits
        ax.set_title(f"Label: {y[idx]}")
        ax.axis("off")

    plt.suptitle("Digits 6 and 8")
    plt.tight_layout()
    plt.show()
show_digits_6_and_8(X, y)


## Outliers

In [None]:
from pyod.models.iforest import IForest         # robust covariance (Mahalanobis)
from outliers import smirnov_grubbs as grubbs

for ds_name, loader in DATASETS.items():
    X, y = loader()
    X = np.asarray(X, dtype=np.float64)
    clf = IForest(contamination=0.01, random_state=0)          # or IForest(), LOF(), etc.
    clf.fit(X)
    labels = clf.labels_            # 1 = outlier, 0 = inlier
    scores = clf.decision_scores_
    n_outliers = labels.sum()
    #outliers = grubbs.test(X[:, 0], alpha=0.05)
    print(f"{ds_name}\n{n_outliers}")



# Time

In [None]:
import numpy as np
import time
from sklearn.metrics import pairwise_distances
from sklearn.manifold import MDS

# from datasets import load_iono
# from gini_mds import run_gini_mds, run_euclid_mds

# ---- Load and prepare data
X, y = load_bank()
X = np.asarray(X, dtype=np.float64)

# --- Add noise to 5% of X
num_samples = X.shape[0]
num_noisy = int(0.05 * num_samples)
noisy_indices = np.random.choice(num_samples, size=num_noisy, replace=False)
noise_std = 10 * np.std(X)
noise = np.random.normal(loc=0.0, scale=noise_std, size=(num_noisy, X.shape[1]))
X_noisy = X.copy()
X_noisy[noisy_indices] += noise

# --- Reference distances
D_ref = pairwise_distances(X_noisy, metric='euclidean')

# ---- Parameters
nu_grid = np.arange(1.1, 4.0 + 1e-9, 0.1)
gini_method = "cmds"
m = 3  # components

# ---- 1️ Gini MDS grid search for best ν
t0 = time.perf_counter()
best = (None, np.inf)
for nu in nu_grid:
    _, Z_g, s1_g, _ = run_gini_mds(X_noisy, gini_method, nu, n_components=m)
    if s1_g < best[1]:
        best = (nu, s1_g)
t_gini = time.perf_counter() - t0
best_nu = best[0]

# ---- 2️ Euclidean MDS (torch-based, via Gini module)
t1 = time.perf_counter()
_, Z_e_torch, s1_e_torch, _ = run_euclid_mds(X_noisy, "cmds", n_components=m)
t_eucl_torch = time.perf_counter() - t1

# ---- 3️ Euclidean MDS (sklearn)
t2 = time.perf_counter()
std_mds = MDS(n_components=m, dissimilarity='euclidean', random_state=42)
_ = std_mds.fit_transform(X_noisy)
t_sklearn = time.perf_counter() - t2

# ---- Print results
print("\n=== Execution Times (Ionosphere, 3 components) ===")
print(f"Gini MDS (best nu={best_nu:.2f}): {t_gini:.3f} s")
print(f"Euclidean MDS (torch):           {t_eucl_torch:.3f} s")
print(f"Euclidean MDS (sklearn):         {t_sklearn:.3f} s")
