In [2]:
import sys
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy import linalg
from scipy.sparse.linalg import eigsh
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans

# Add project root to path
sys.path.append(os.path.abspath('../../'))

from spatial_fdr_evaluation.data.adbench_loader import load_from_ADbench
from spatial_fdr_evaluation.methods.kernels import compute_kernel_matrix, estimate_length_scale

sns.set_context("notebook", font_scale=1.2)
sns.set_style("whitegrid")
%matplotlib inline

In [None]:
def analyze_full_dataset(dataset_name, sigma_factor=1.0, n_eigs=15):
    """
    Loads full dataset, computes kernel, and checks spectral gap.

    Parameters
    ----------
    dataset_name : str
        Name handed to ``load_from_ADbench``; the ``'X_train'`` entry of the
        returned mapping is used as the feature matrix.
    sigma_factor : float
        Multiplier applied to the median-heuristic length scale before the
        RBF kernel is built.
    n_eigs : int
        How many leading eigenpairs of the normalized affinity to inspect.
        Capped at the number of samples; must be at least 2.

    Returns
    -------
    dict
        'X' (standardized features), 'K' (kernel matrix), 'evals' / 'evecs'
        (leading eigenpairs of D^-1/2 K D^-1/2, sorted by descending
        eigenvalue), 'optimal_k' (position of the largest eigengap),
        'sigma' (length scale actually used) and 'max_gap'.

    Raises
    ------
    ValueError
        If fewer than two eigenvalues are available, since an eigengap is
        undefined in that case.
    """
    if n_eigs < 2:
        raise ValueError("n_eigs must be at least 2 to compute an eigengap")

    print(f"--- Processing {dataset_name} ---")

    # 1. Load & Scale Full Data
    data = load_from_ADbench(dataset_name)
    X_full = StandardScaler().fit_transform(data['X_train'])

    # 2. Compute Global Kernel
    base_sigma = estimate_length_scale(X_full, method='median')
    sigma = base_sigma * sigma_factor
    print(f"  N={len(X_full)}, D={X_full.shape[1]}")
    print(f"  Sigma: {sigma:.4f}")

    K_full = compute_kernel_matrix(X_full, kernel_type='rbf', length_scale=sigma)

    # 3. Spectral Eigengap (Check for Blocks)
    # Normalized Affinity: D^-1/2 K D^-1/2
    degrees = np.array(K_full.sum(axis=1)).flatten()
    degrees = np.maximum(degrees, 1e-10)  # guard against division by zero
    inv_sqrt_deg = 1.0 / np.sqrt(degrees)
    if type(K_full) is np.ndarray:
        # Row/column scaling by broadcasting: same values as the diagonal
        # matrix product, without allocating an N x N diagonal matrix or
        # paying for two dense N x N matrix multiplications.
        A_norm = K_full * inv_sqrt_deg[:, None] * inv_sqrt_deg[None, :]
    else:
        # Other matrix types keep the generic matrix-product form, because
        # `*` is not guaranteed to be element-wise for them.
        D_inv_sqrt = np.diag(inv_sqrt_deg)
        A_norm = D_inv_sqrt @ K_full @ D_inv_sqrt

    n_points = A_norm.shape[0]
    n_top = min(n_eigs, n_points)
    if n_top < 2:
        raise ValueError("need at least 2 samples to compute an eigengap")

    # Get largest eigenvalues (equivalent to smallest Laplacian eigenvalues)
    evals = evecs = None
    if n_top < n_points:  # the iterative solver requires k < N
        try:
            evals, evecs = eigsh(A_norm, k=n_top, which='LA')
        except Exception as exc:
            # Deliberately not a bare `except`: Ctrl-C must still interrupt.
            print(f"  eigsh failed ({exc!r}); falling back to dense eigh.")
    if evals is None:
        evals, evecs = linalg.eigh(A_norm)
        evals, evecs = evals[-n_top:], evecs[:, -n_top:]

    # Sort descending
    order = np.argsort(evals)[::-1]
    evals = evals[order]
    evecs = evecs[:, order]

    # Gap analysis (on Laplacian scale: 1 - lambda)
    gaps = np.diff(1 - evals)
    optimal_k = int(np.argmax(gaps)) + 1
    max_gap = gaps[optimal_k - 1]

    print(f"  Max Eigengap: {max_gap:.4f} at k={optimal_k}")

    return {
        'X': X_full,
        'K': K_full,
        'evals': evals,
        'evecs': evecs,
        'optimal_k': optimal_k,
        'sigma': sigma,
        'max_gap': max_gap
    }

# Run the full-dataset spectral analysis on '2_annthyroid' with the default
# sigma_factor; the returned dict is consumed by subsample_experiment below.
global_res = analyze_full_dataset('2_annthyroid')

In [None]:
def subsample_experiment(global_res, n_total=500, h1_ratio=0.1, random_state=None):
    """
    1. Defines Signal Region on FULL dataset.
    2. Subsamples to get the requested H1/H0 ratio.

    Parameters
    ----------
    global_res : dict
        Needs the keys 'X', 'K', 'evecs' and 'optimal_k' (as produced by
        ``analyze_full_dataset``).
    n_total : int
        Requested size of the evaluation set.
    h1_ratio : float
        Requested share of H1 (signal) points.
    random_state : int or None
        Seed for the cluster pick, the draws and the shuffle. ``None`` keeps
        using the global NumPy random state.

    Returns
    -------
    dict
        'X' and 'K' restricted to the sampled points, 'true_labels'
        (0 = H1 / alternative, 1 = H0 / null) aligned with 'indices', and
        'indices' (row positions in the full dataset). When a candidate pool
        is smaller than requested, all of it is taken and the result holds
        fewer than ``n_total`` points.
    """
    X = global_res['X']
    optimal_k = global_res['optimal_k']
    evecs = global_res['evecs']

    # The module-level functions and RandomState share the method names used
    # below, so either can serve as the generator.
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    # 1. Define Signal Block on Full Data
    # Cluster the leading spectral embedding to obtain candidate blocks
    kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10)
    labels_full = kmeans.fit_predict(evecs[:, :optimal_k])

    # Pick a random cluster to be the "Signal Source"
    signal_cluster_id = rng.randint(optimal_k)
    h1_candidates = np.where(labels_full == signal_cluster_id)[0]
    h0_candidates = np.where(labels_full != signal_cluster_id)[0]

    # 2. Subsample
    n_h1 = int(n_total * h1_ratio)
    n_h0 = n_total - n_h1

    # Clamp to what is available: sampling without replacement cannot draw
    # more points than a pool contains.
    if len(h1_candidates) < n_h1:
        print(f"Warning: Signal cluster too small ({len(h1_candidates)}), taking all.")
        n_h1 = len(h1_candidates)
    if len(h0_candidates) < n_h0:
        print(f"Warning: Null pool too small ({len(h0_candidates)}), taking all.")
        n_h0 = len(h0_candidates)

    idx_h1 = rng.choice(h1_candidates, n_h1, replace=False)
    idx_h0 = rng.choice(h0_candidates, n_h0, replace=False)

    final_indices = np.concatenate([idx_h1, idx_h0])
    # Shuffle so H1s aren't all at the start
    rng.shuffle(final_indices)

    # 3. Create Final Experiment Set
    X_sub = X[final_indices]

    # Slice the full kernel rather than recomputing it: faster, and keeps the
    # length scale estimated on the full dataset.
    K_sub = global_res['K'][np.ix_(final_indices, final_indices)]

    # Labels: 0 = H1 (Alternative), 1 = H0 (Null), aligned with final_indices.
    # Sized from the indices actually drawn, which can be fewer than n_total
    # after clamping.
    true_labels = np.ones(len(final_indices), dtype=int)
    true_labels[np.isin(final_indices, idx_h1)] = 0

    return {
        'X': X_sub,
        'K': K_sub,
        'true_labels': true_labels,
        'indices': final_indices
    }

# Draw a 500-point evaluation set with a requested 10% H1 share.
# NOTE(review): the draw relies on the global NumPy RNG, which none of the
# visible cells seed, so the sampled subset changes from run to run.
experiment = subsample_experiment(global_res, n_total=500, h1_ratio=0.1)

In [None]:
def test_kernel_separation(experiment):
    """
    Validates if H1 points are actually closer to each other than to H0s.
    """
    K = experiment['K']
    labels = experiment['true_labels']
    
    h1_idx = np.where(labels == 0)[0]
    h0_idx = np.where(labels == 1)[0]
    
    # Extract Pairwise Similarities
    # H1-H1 (exclude diagonal self-similarity)
    k_h1_h1 = K[np.ix_(h1_idx, h1_idx)]
    sim_h1_h1 = k_h1_h1[~np.eye(k_h1_h1.shape[0], dtype=bool)]
    
    # H0-H0
    k_h0_h0 = K[np.ix_(h0_idx, h0_idx)]
    sim_h0_h0 = k_h0_h0[~np.eye(k_h0_h0.shape[0], dtype=bool)]
    
    # H1-H0 (Across)
    k_h1_h0 = K[np.ix_(h1_idx, h0_idx)]
    sim_h1_h0 = k_h1_h0.flatten()
    
    # Plot
    plt.figure(figsize=(10, 6))
    data_to_plot = [sim_h1_h1, sim_h0_h0, sim_h1_h0]
    plt.boxplot(data_to_plot, labels=['H1-H1 (Within Signal)', 'H0-H0 (Within Noise)', 'H1-H0 (Across)'])
    plt.title("Kernel Similarity Distribution")
    plt.ylabel("Kernel Value $K(x,y)$")
    
    # Quantify
    mean_h1 = np.mean(sim_h1_h1)
    mean_across = np.mean(sim_h1_h0)
    print(f"Mean H1-H1 Similarity: {mean_h1:.4f}")
    print(f"Mean Across Similarity: {mean_across:.4f}")
    
    if mean_h1 > mean_across + 0.1: # Threshold is arbitrary, but gap should be visible
        print("PASS: H1 points are significantly closer to each other.")
    else:
        print("FAIL/WARNING: H1 points are not distinct from background.")

# Sanity-check the sampled set: prints the mean H1-H1 and H1-H0 kernel
# similarities with a PASS / FAIL verdict and draws the box plot.
test_kernel_separation(experiment)

In [None]:
def generate_and_visualize(experiment, n_neighbors=10, random_state=None):
    """
    Simulates p-values for the evaluation set and plots them against a
    kernel-based isolation score.

    Null points (label 1) get Uniform(0, 1) p-values; alternative points
    (label 0) get Beta(0.1, 1) p-values, which concentrate near zero.

    Parameters
    ----------
    experiment : dict
        Needs 'K' (square kernel matrix) and 'true_labels' (0 = H1, 1 = H0)
        aligned with the rows of 'K'.
    n_neighbors : int
        Number of most-similar other points averaged into the isolation
        score. Capped at the number of other points, so the zeroed
        self-similarity never enters the average.
    random_state : int or None
        Seed for the simulated p-values. ``None`` keeps using the global
        NumPy random state.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If ``n_neighbors`` is smaller than 1.
    """
    if n_neighbors < 1:
        raise ValueError("n_neighbors must be at least 1")

    labels = experiment['true_labels']
    K = experiment['K']
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    is_null = labels == 1
    is_alt = labels == 0

    # Generate P-values (nulls first, then alternatives)
    p_values = np.zeros(len(labels))
    p_values[is_null] = rng.uniform(0, 1, size=is_null.sum())  # Nulls
    p_values[is_alt] = rng.beta(0.1, 1, size=is_alt.sum())     # Alts

    # Oracle View (Isolation Plot)
    # Isolation = 1 - mean similarity to the n_neighbors most similar OTHER
    # points, computed for ALL points so the separation is visible.
    K_nodiag = K.copy()  # copy so the caller's kernel keeps its diagonal
    np.fill_diagonal(K_nodiag, 0)

    n_points = K_nodiag.shape[0]
    k = min(n_neighbors, n_points - 1)
    if k > 0:
        # Sort each row descending and average its k largest similarities
        nearest_sims = np.sort(K_nodiag, axis=1)[:, ::-1]
        top_k_sim = np.mean(nearest_sims[:, :k], axis=1)
    else:
        # A lone point has no neighbours: treat it as fully isolated.
        top_k_sim = np.zeros(n_points)
    isolation = 1 - top_k_sim

    # Small offset keeps log10 finite when a simulated p-value underflows to 0.
    log_floor = 1e-10
    log_p = np.log10(p_values + log_floor)

    # Plot
    plt.figure(figsize=(10, 6))
    plt.scatter(isolation[is_null], log_p[is_null],
                c='blue', alpha=0.3, label='H0 (Null)')
    plt.scatter(isolation[is_alt], log_p[is_alt],
                c='red', alpha=0.8, label='H1 (Signal)')

    plt.xlabel("Kernel Isolation (1 - Sim)")
    plt.ylabel("log10(p-value)")
    plt.legend()
    plt.title("Evaluation Set: Difficulty Check")
    plt.show()

# Simulate p-values for the sampled set and plot them against kernel isolation.
generate_and_visualize(experiment)