In [183]:
import os
import numpy as np
import matplotlib.pyplot as plt
import json

from scipy.stats import ttest_ind
from sklearn.metrics import roc_auc_score
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

In [222]:
def get_feature_vector(index, p=7):
    # feature_points = np.array([9, 29])
    # illuminant = np.load(f'illuminant_features/{p}/{index}.npy')
    # R = illuminant[:, 0][feature_points]
    # G = illuminant[:, 1][feature_points]
    # B = illuminant[:, 2][feature_points]
    
    # feature1 = R[1] - R[0]
    # feature2 = G[1] - G[0]
    # feature3 = np.std(B)
    # feature4 = feature1 * feature2 * feature3

    feature_points = np.arange(0, 40, 1)
    feature_vector = np.load(f'illuminant_features/{index}.npy').sum(axis=1)[feature_points]
    feature1 = np.mean(feature_vector)
    feature2 = np.std(feature_vector)
    feature3 = np.max(feature_vector)
    feature4 = np.min(feature_vector)
    
    return np.array([feature1, feature2, feature3, feature4])

def get_p_feature_vector(index):
    # feature_points = np.array([9, 29])
    feature_vector = []
    for p in [1, 3, 7, 10]:
        # 0: sigma = 0.5, 1: sigma = 1, 2:sigma = 1.5
        feature_vector.append(np.load(f'illuminant_features/{p}/{index}.npy').sum(axis=1)[1])

    feature_vector = np.array(feature_vector)
    feature1 = np.mean(feature_vector)
    feature2 = np.var(feature_vector)
    feature3 = np.max(feature_vector)
    feature4 = np.min(feature_vector)
    return np.array([feature1, feature2, feature3, feature4])

In [None]:
with open('arr_reference.json', 'r') as f:
    ref = json.load(f)
    
others = ref['others']
worst25 = ref['worst25']

In [197]:
def single_point_discriminability(data_all: np.ndarray,
                                  data_worst: np.ndarray,
                                  feature_nums: np.ndarray,
                                  output_dir: str = None) -> list:
    """
    Evaluate and visualize univariate discriminability at each sigma.

    For each sigma value, this function:
      1. Computes Cohen's d between `data_all` and `data_worst` at that sigma.
      2. Performs an independent t-test (unequal variance) and returns the p-value.
      3. Computes ROC AUC treating `data_worst` as the positive class.
      4. Plots overlaid histograms of the two distributions with annotations.

    Parameters
    ----------
    data_all : np.ndarray, shape (n_all, n_sigmas)
        Feature values for the entire dataset across different sigma values.
    data_worst : np.ndarray, shape (n_worst, n_sigmas)
        Feature values for the worst-25% subset across the same sigma values.
    sigmas : np.ndarray, shape (n_sigmas,)
        The sigma values corresponding to the columns of data_all and data_worst.
    output_dir : str, optional
        Directory path to save each histogram plot. If None, plots display interactively.

    Returns
    -------
    metrics : list of dicts
        A list where each entry corresponds to one sigma and contains:
          - 'sigma': the sigma value
          - 'cohen_d': Cohen's d effect size
          - 'p_value': two-sided p-value from t-test
          - 'auc': ROC AUC score

    Example
    -------
    >>> metrics = single_point_discriminability(
    ...     data_all=all_feats,              # shape (513, 21)
    ...     data_worst=worst25_feats,         # shape (128, 21)
    ...     sigmas=np.linspace(0, 2, 21),
    ...     output_dir='figures')
    """
    n_all, _ = data_all.shape
    n_worst, _ = data_worst.shape
    combined = np.vstack([data_all, data_worst])
    labels = np.concatenate([np.zeros(n_all), np.ones(n_worst)])

    metrics = []
    os.makedirs(output_dir, exist_ok=True) if output_dir else None

    for idx, feature_nums in enumerate(feature_nums):
        x_all = data_all[:, idx]
        x_worst = data_worst[:, idx]

        # Cohen's d
        var_all = x_all.var(ddof=1)
        var_worst = x_worst.var(ddof=1)
        pooled_sd = np.sqrt(((n_all - 1)*var_all + (n_worst - 1)*var_worst) / (n_all + n_worst - 2))
        cohen_d = (x_all.mean() - x_worst.mean()) / pooled_sd

        # t-test (unequal variance)
        t_stat, p_value = ttest_ind(x_all, x_worst, equal_var=False)

        # AUC (worst25 as positive class)
        auc = roc_auc_score(labels, combined[:, idx])

        metrics.append({
            'feature_nums': feature_nums,
            'cohen_d': cohen_d,
            'p_value': p_value,
            'auc': auc
        })

        # Plot distributions
        plt.figure()
        plt.hist(x_all, bins=30, alpha=0.5, label='all')
        plt.hist(x_worst, bins=30, alpha=0.5, label='worst25')
        plt.title(f"sigma={feature_nums:.2f} | d={cohen_d:.2f}, p={p_value:.3f}, AUC={auc:.3f}")
        plt.xlabel('Illuminant feature')
        plt.ylabel('Count')
        plt.legend()

        if output_dir:
            fname = os.path.join(output_dir, f"feature_nums_{feature_nums:.2f}.png")
            plt.savefig(fname, bbox_inches='tight')
            plt.close()
        else:
            plt.show()

    return metrics


In [224]:
other_features = []
for i in others:
    feature_vector = get_p_feature_vector(i)
    other_features.append(feature_vector)
    
worst25_features = []
for i in worst25:
    feature_vector = get_p_feature_vector(i)
    worst25_features.append(feature_vector)

os.makedirs(f'./figs/', exist_ok=True)
single_point_discriminability(
        data_all=np.array(other_features),
        data_worst=np.array(worst25_features),
        feature_nums=np.array([1, 2, 3, 4]),
        output_dir=f'./figs/')

  cohen_d = (x_all.mean() - x_worst.mean()) / pooled_sd


[{'feature_nums': 1,
  'cohen_d': -0.35440858236572526,
  'p_value': 0.0006502859954185577,
  'auc': 0.6197037337662338},
 {'feature_nums': 2, 'cohen_d': nan, 'p_value': nan, 'auc': 0.5},
 {'feature_nums': 3,
  'cohen_d': -0.35440858236572526,
  'p_value': 0.0006502859954185577,
  'auc': 0.6197037337662338},
 {'feature_nums': 4,
  'cohen_d': -0.35440858236572526,
  'p_value': 0.0006502859954185577,
  'auc': 0.6197037337662338}]

In [230]:
other_features = []
for i in others:
    feature_vector = get_feature_vector(i, p=7)
    other_features.append(feature_vector)
    
worst25_features = []
for i in worst25:
    feature_vector = get_feature_vector(i, p=7)
    worst25_features.append(feature_vector)

single_point_discriminability(
        data_all=np.array(other_features),
        data_worst=np.array(worst25_features),
        feature_nums=np.array([1, 2, 3, 4]),
        output_dir=f'./')

[{'feature_nums': 1,
  'cohen_d': -0.37928431009039354,
  'p_value': 0.006882092089552756,
  'auc': 0.6261831848552338},
 {'feature_nums': 2,
  'cohen_d': -0.6139777283160801,
  'p_value': 0.0017935657631015255,
  'auc': 0.644244153674833},
 {'feature_nums': 3,
  'cohen_d': -0.4455512168017853,
  'p_value': 0.0013626844883303298,
  'auc': 0.6508560690423162},
 {'feature_nums': 4,
  'cohen_d': -0.21828466907754995,
  'p_value': 0.11812817279781472,
  'auc': 0.5626739977728284}]

In [181]:
def train_2d_classifier(data_all: np.ndarray,
                        data_worst: np.ndarray,
                        sigmas: np.ndarray,
                        sigma_pair: tuple,
                        test_size: float = 0.2,
                        random_state: int = 42):
    """
    Train a 2D logistic regression classifier using two sigma features.

    Parameters
    ----------
    data_all : np.ndarray, shape (n_all, n_sigmas)
    data_worst : np.ndarray, shape (n_worst, n_sigmas)
    sigmas : np.ndarray, shape (n_sigmas,)
    sigma_pair : tuple of two floats
        The sigma values to use as the 2D features.
    test_size : float
        Proportion of data to use for test split.
    random_state : int
        Random seed for reproducibility.

    Returns
    -------
    model : sklearn.linear_model.LogisticRegression
        Trained logistic regression model.
    indices : tuple of two ints
        Column indices of the chosen sigma values.
    accuracy : float
        Classification accuracy on the test split.

    Example
    -------
    >>> model, indices, acc = train_2d_classifier(
    ...     all_feats, worst_feats,
    ...     sigmas=np.linspace(0,2,21),
    ...     sigma_pair=(0.5, 1.5)
    ... )
    """
    # map sigma values to nearest indices
    # idx1 = int(np.argmin(np.abs(sigmas - sigma_pair[0])))
    # idx2 = int(np.argmin(np.abs(sigmas - sigma_pair[1])))

    # X_all = data_all[:, [idx1, idx2]]
    # X_worst = data_worst[:, [idx1, idx2]]
    X_all = data_all
    X_worst = data_worst

    X = np.vstack([X_all, X_worst])
    y = np.concatenate([np.zeros(len(X_all)), np.ones(len(X_worst))])

    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        test_size=test_size,
        random_state=random_state
    )
    model = LogisticRegression()
    model.fit(X_train, y_train)
    accuracy = model.score(X_test, y_test)
    print(f"2D classifier test accuracy: {accuracy:.3f}")
    return model


def classify_point(model, point: tuple) -> tuple:
    """
    Classify a single 2D point using the trained model.

    Parameters
    ----------
    model : trained sklearn classifier
    point : tuple of two floats
        Feature values corresponding to the chosen sigma pair.

    Returns
    -------
    label : int
        Predicted class (0 for all, 1 for worst25).
    prob : float
        Predicted probability of the positive class (worst25).

    Example
    -------
    >>> label, prob = classify_point(model, (0.545, 0.544))
    """
    prob = model.predict_proba([point])[0, 1]
    label = int(model.predict([point])[0])
    return label, prob


def plot_decision_boundary(model,
                           data_all: np.ndarray,
                           data_worst: np.ndarray,
                           sigmas: np.ndarray,
                           sigma_pair: tuple,
                           grid_steps: int = 200,
                           output_path: str = None):
    """
    Plot decision boundary of the 2D classifier with scatter data.

    Parameters
    ----------
    model : trained sklearn classifier
    data_all : np.ndarray, shape (n_all, n_sigmas)
    data_worst : np.ndarray, shape (n_worst, n_sigmas)
    sigmas : np.ndarray, shape (n_sigmas,)
    sigma_pair : tuple of two floats
    grid_steps : int
        Resolution of the meshgrid.
    output_path : str, optional
        If provided, save the figure; otherwise show interactively.
    """
    idx1 = int(np.argmin(np.abs(sigmas - sigma_pair[0])))
    idx2 = int(np.argmin(np.abs(sigmas - sigma_pair[1])))

    X1_all = data_all[:, idx1]
    X2_all = data_all[:, idx2]
    X1_w = data_worst[:, idx1]
    X2_w = data_worst[:, idx2]

    # create meshgrid
    x_min = min(X1_all.min(), X1_w.min())
    x_max = max(X1_all.max(), X1_w.max())
    y_min = min(X2_all.min(), X2_w.min())
    y_max = max(X2_all.max(), X2_w.max())
    xx, yy = np.meshgrid(
        np.linspace(x_min, x_max, grid_steps),
        np.linspace(y_min, y_max, grid_steps)
    )
    grid = np.c_[xx.ravel(), yy.ravel()]

    Z = model.predict(grid).reshape(xx.shape)

    plt.figure(figsize=(6, 5))
    plt.contourf(xx, yy, Z, alpha=0.3)
    plt.scatter(X1_all, X2_all, label='all', s=20, edgecolor='k')
    plt.scatter(X1_w, X2_w, label='worst25', s=20, edgecolor='k')
    plt.xlabel(f'Feature at σ={sigmas[idx1]:.2f}')
    plt.ylabel(f'Feature at σ={sigmas[idx2]:.2f}')
    plt.legend()
    if output_path:
        plt.savefig(output_path, bbox_inches='tight')
        plt.close()
    else:
        plt.show()
