# 1. Imports and Setup
You'll begin by importing necessary libraries, including your GeometricCalibrator library and other standard packages like tensorflow, scikit-learn, or pytorch for the CNN part, and numpy or pandas for data manipulation.

In [1]:
import sys
import os

# Add the parent directory to the sys.path so Python can find the utils module
sys.path.append(os.path.abspath('..'))

In [2]:
import numpy as np
import tensorflow as tf
from sklearn.metrics import accuracy_score
from utils.metrics import ECE_calc
from calibrators.geometric_calibrators import GeometricCalibrator
from utils.logging_config import *

setup_logging()


2024-09-24 13:27:24.184795: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-24 13:27:24.206359: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-24 13:27:24.212900: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


# 2. Step 1: Data Retrieval
This part loads your dataset. It could be MNIST, CIFAR-10, or any dataset you're working with. Depending on the experiment, you may switch between different datasets to test the generality of your method.

In [3]:
from keras.src.datasets.mnist import load_data
from sklearn.model_selection import train_test_split

# Split and save the data for reproducible experiments (to be done once)
# Load MNIST data
(train_X_original, train_y_original), (test_X_original, test_y_original) = load_data()

# Combine train and test data for further splitting
data = np.concatenate((train_X_original, test_X_original), axis=0)
labels = np.concatenate((train_y_original, test_y_original), axis=0)

# Perform the splitting dynamically
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=0)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=0)  # 0.25 * 0.8 = 0.2



# 3. Step 2: Data Augmentation
Here, you'll apply different augmentation techniques (e.g., rotation, noise addition, or translation). Each augmentation method is treated as a different experiment. You'll want to apply these transformations to the training data before passing them through the model.

In [4]:
# Ensure the shape is correct by only adding the channel dimension once
# If you already added a channel, avoid calling expand_dims again
if X_train.ndim == 3:  # Check if the array is missing the channel dimension
    X_train = np.expand_dims(X_train, axis=-1)  # Shape becomes (batch_size, 28, 28, 1)
if X_test.ndim == 3:
    X_test = np.expand_dims(X_test, axis=-1)
if X_val.ndim == 3:
    X_val = np.expand_dims(X_val, axis=-1)

# Augmentation methods
def augment_data(images, method='rotation'):
    if method == 'rotation':
        datagen = tf.keras.preprocessing.image.ImageDataGenerator(rotation_range=90)
    elif method == 'shift':
        datagen = tf.keras.preprocessing.image.ImageDataGenerator(width_shift_range=0.1, height_shift_range=0.1)
    elif method == 'noise':
        noise = np.random.normal(loc=0.0, scale=0.1, size=images.shape)
        return np.clip(images + noise, 0., 1.)
    
    # Ensure images have the correct shape for ImageDataGenerator (rank 4)
    if images.ndim != 4:
        raise ValueError(f"Input to `ImageDataGenerator` should be rank 4. Got shape: {images.shape}")

    datagen.fit(images)
    
    # Use .flow() to create an iterator and retrieve augmented images
    augmented_images = datagen.flow(images, batch_size=len(images), shuffle=False)
    
    # Convert iterator output to Numpy array
    return next(augmented_images)  # Retrieve the first batch of augmented images

# Apply the selected augmentation method
augmented_train_images = augment_data(X_train, method='rotation')

# 4. Step 3: Build Stability Space using CNN + Pooling
You now use the augmented data and pass it through a convolutional neural network with pooling to create the stability space before performing geometric separation.

In [5]:
import os

# Define file path to save the trained model
model_path = 'saved_models/cnn_model.keras'

# Define CNN model with input shape of (28, 28, 1) for grayscale images
def build_cnn_model():
    model = tf.keras.models.Sequential([
        tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),  # Adjusted input shape
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Conv2D(128, (3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')  # 10 classes for MNIST
    ])
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

# Check if model already exists, load it if found, otherwise train a new model
if os.path.exists(model_path):
    print("Loading pre-trained model...")
    cnn_model = tf.keras.models.load_model(model_path)
else:
    print("Training new model...")
    cnn_model = build_cnn_model()
    
    # Train the CNN model
    cnn_model.fit(augmented_train_images, y_train, epochs=10, validation_data=(X_val, y_val))
    
    # Save the trained model
    os.makedirs(os.path.dirname(model_path), exist_ok=True)  # Create directory if not exists
    cnn_model.save(model_path)
    print(f"Model saved at {model_path}")

# Use the trained model to predict probabilities on validation and test data
features_val = cnn_model.predict(X_val)
features_test = cnn_model.predict(X_test)

Loading pre-trained model...
[1m438/438[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 6ms/step
[1m438/438[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 6ms/step


# 5. Step 4: Apply Geometric Calibration
Once you have the features, pass them through your GeometricCalibrator to compute the stability space and perform uncertainty calibration.

In [6]:
import bisect
import concurrent
from itertools import repeat
# import scann
# from annoy import AnnoyIndex
# import nmslib
# import hnswlib
import torch
import numpy as np
from sklearn.decomposition import PCA
import tensorflow as tf
from math import sqrt

import faiss
import numpy as np
from scipy.optimize import optimize, minimize
import h5py
import os
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import ElasticNet, Lasso, LinearRegression
from sklearn.neighbors import NearestNeighbors
from scipy.optimize import curve_fit
from sklearn.isotonic import IsotonicRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
import json
import scipy.stats
from tqdm import tqdm
import torchvision.transforms as transforms
import shutil
import time
import logging
from utils.logging_config import setup_logging
# geometric_calibrators.py
import numpy as np
import logging
from sklearn.isotonic import IsotonicRegression
from tqdm import tqdm

from calibrators.base_calibrator import BaseCalibrator
from utils.utils import StabilitySpace, Compression, calc_balanced_acc
from utils.logging_config import setup_logging
from sklearn.metrics import balanced_accuracy_score


In [14]:
def calc_balanced_acc(stability, y_true, y_pred, round_digits=2, num_classes=2):
    '''
    Returns the dicts of description of stability/separation (balanced accuracy, #num of True samples, #num of samples)

    Parameters:
        stability (list of floats or ndarray): Stability list of calculations
        y_true (list): Actual class labels
        y_pred (list): Predicted class labels
        round_digits (int): Number of decimal places to round the stability values to bin them
        num_classes (int): Number of unique classes in the dataset

    Returns:
        s_bal_acc (dict): {key = normalized unique stability, value = balanced accuracy for that stability}
        s_true (dict): {key = normalized unique stability, value = amount of True samples per this stability for each class}
        s_all (dict): {key = normalized unique stability, value = amount of instances exist for that stability for each class}
    '''
    logger.info("Starting the calculation of balanced accuracy.")
    # Round the stability values to the desired precision to create bins
    rounded_stability = np.round(stability, decimals=round_digits)

    # Get unique stability values and their frequencies
    stab_values, _ = np.unique(rounded_stability, return_counts=True)
    logger.info(f"Unique rounded stability values: {stab_values}")

    # Create dictionaries to count True positives and total samples per class for each stability value
    s_true = {stab: np.zeros(num_classes) for stab in stab_values}  # True positives per class
    s_all = {stab: np.zeros(num_classes) for stab in stab_values}  # Total samples per class

    # Count true positives and total samples
    for stab, true, pred in zip(rounded_stability, y_true, y_pred):
        class_index = int(true)  # Assuming y_true contains class labels from 0 to num_classes-1
        s_all[stab][class_index] += 1
        if true == pred:
            s_true[stab][class_index] += 1

    logger.info("Completed counting true positives and total samples for each stability value.")

    # Calculate balanced accuracy for each stability
    s_bal_acc = {}
    i = 0
    for stab in stab_values:
        if np.sum(s_all[stab]) == 0:  # Avoid division by zero
            s_bal_acc[stab] = 0
        else:
            # Balanced accuracy: average of recall for each class
            class_recalls = [s_true[stab][i] / s_all[stab][i] if s_all[stab][i] != 0 else 0 for i in range(num_classes)]
            s_bal_acc[stab] = np.mean(class_recalls)
        if i % 350 == 0:
            logger.info(f"Stability {stab}: Balanced Accuracy = {s_bal_acc[stab]}")
        i += 1

    return s_bal_acc, s_true, s_all

In [15]:
class StabilitySpace:
    """
    Class to compute stability and geometric values for the input X using various similarity search libraries.
    """

    def __init__(self, X_train, y_train, compression=None, library='faiss', metric='l2', num_labels=None):
        """
        Initialize the stability space by compressing the input data (optional) and setting up nearest neighbor models.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.info(f"Initializing StabilitySpace with {library} library and {metric} metric.")

        self.metric = metric
        self.library = library
        self.num_labels = num_labels or len(set(y_train))
        self.compression = compression

        if self.compression:
            X_train, y_train = self.compression(X_train, y_train)

        self.X_train = X_train
        self.y_train = y_train

        self.logger.info(f"StabilitySpace initialized with {X_train.shape[0]} samples and {self.num_labels} classes.")

        # Initialize nearest neighbor indices for each class
        self.same_nbrs = {}
        self.other_nbrs = {}
        self._build_neighbors_indices()
        # Initialize the stability_calculators dictionary
        self.stability_calculators = {
            'faiss': self._stability_faiss_knn,
            'knn': self._stability_faiss_knn,  # Reuse the same function for KNN
            # Other libraries like 'annoy', 'nmslib', etc. can be added here if needed
        }        

    def _build_neighbors_indices(self):
        """
        Build nearest-neighbor indices for same and other classes.
        """
        self.logger.info("Starting to build nearest-neighbor indices for each class.")
    
        # Reshape X_train if it's multi-dimensional (for example, if it's an image dataset)
        if len(self.X_train.shape) > 2:
            self.logger.debug("Reshaping X_train for FAISS/KNN compatibility.")
            self.X_train = self.X_train.reshape(self.X_train.shape[0], -1).astype('float32')
    
        dim = self.X_train.shape[1]  # Now dim should be the number of features per sample
        self.logger.info(f"Number of features per sample: {dim}")
    
        # Build separate FAISS indices for same and other-class samples
        for label in range(self.num_labels):
            self.logger.info(f"Processing label {label}/{self.num_labels - 1}.")
    
            # Get indices of samples with the same and different labels
            idx_same = np.where(self.y_train == label)[0]
            idx_other = np.where(self.y_train != label)[0]
    
            if self.library == 'faiss':
                self.logger.info(f"Using FAISS for label {label}.")
    
                # Build FAISS index for same-class data
                try:
                    self.same_nbrs[label] = faiss.IndexFlatL2(dim)
                    self.same_nbrs[label].add(self.X_train[idx_same].astype('float32'))
                    self.logger.info(f"FAISS index for same-class built successfully for label {label}.")
                except Exception as e:
                    self.logger.error(f"Error building FAISS index for same-class for label {label}: {e}")
    
                # Build FAISS index for other-class data
                try:
                    self.other_nbrs[label] = faiss.IndexFlatL2(dim)
                    self.other_nbrs[label].add(self.X_train[idx_other].astype('float32'))
                    self.logger.info(f"FAISS index for other-class built successfully for label {label}.")
                except Exception as e:
                    self.logger.error(f"Error building FAISS index for other-class for label {label}: {e}")
    
    def _stability_faiss_knn(self, trainX, testX, train_y, test_y_pred, num_labels, metric='l2'):
        """
        Calculates the stability of the test set using FAISS or KNN (depending on the prebuilt indices).
        """
        logger.info(f"Starting stability calculation using {self.library} with metric: {metric}.")
        predicted_labels = np.argmax(test_y_pred, axis=1)
        stability = np.zeros(len(testX))

        for i in tqdm(range(testX.shape[0]), desc="Calculating Stability", unit="sample"):
            x = testX[i].reshape(1, -1)  # Flatten the test instance
            pred_label = int(predicted_labels[i])

            if self.library == 'faiss':
                _, dist_same = self.same_nbrs[pred_label].search(x, 1)  # Nearest in same class
                _, dist_other = self.other_nbrs[pred_label].search(x, 1)  # Nearest in other classes

            stability[i] = (dist_other[0][0] - dist_same[0][0]) / 2

        logger.info("Completed stability calculation.")
        return stability

    def calc_stab(self, X_test, y_test_pred):
        """
        Calculate stability for the test set.
        """
        if self.compression:
            X_test, _ = self.compression(X_test, None, train=False)

        self.logger.info(f"Calculating stability using {self.library} with metric {self.metric}.")
        stability_calculator = self.stability_calculators.get(self.library)

        if stability_calculator is not None:
            return stability_calculator(self.X_train, X_test, self.y_train, y_test_pred, self.num_labels, metric=self.metric)
        else:
            self.logger.error(f"Unsupported library: {self.library}")
            raise ValueError(f"Unsupported library: {self.library}")
            
    def calc_balanced_accuracy(self, stability, y_true, y_pred):
        """
        Calculate balanced accuracy for each unique stability value in classification using sklearn.
        """
        unique_stabilities = np.unique(stability)
        balanced_accuracies = {}

        for stab in unique_stabilities:
            # Mask for current stability value
            mask = stability == stab
            if np.any(mask):
                # Calculate balanced accuracy for the current stability subset
                balanced_accuracies[stab] = balanced_accuracy_score(y_true[mask], y_pred[mask])

        return balanced_accuracies
        
    def calculate_stability_and_accuracy(self, X_test, y_test_true, y_test_pred):
        """
        A method to calculate both stability and balanced accuracy for the given test dataset.
        """
        stability = self.calc_stab(X_test, y_test_pred)
        balanced_accuracy = self.calc_balanced_accuracy(stability, y_test_true, y_test_pred)

        return stability, balanced_accuracy

In [18]:
setup_logging()
logger = logging.getLogger(__name__)


class GeometricCalibrator(BaseCalibrator):
    """
    Class serving as a wrapper for the geometric calibration method (stability/separation).
    """

    def __init__(self, model, X_train, y_train, fitting_func=None, compression_mode=None, compression_param=None,
                 metric='l2', stability_space=None, library='faiss'):
        """
        Initializes the GeometricCalibrator with a model, stability space, and calibration function.

        Args:
            model: The model to be calibrated (with `predict` and `predict_proba` methods).
            X_train: Training data (flattened images).
            y_train: Training labels.
            fitting_func: Custom fitting function (default: IsotonicRegression).
            compression_mode: Compression mode for data.
            compression_param: Parameter controlling the compression level.
            metric: Distance metric for stability/separation calculations.
            stability_space: Optional custom StabilitySpace instance. If not provided, one is initialized automatically.
            library: The library used for stability calculation (default is 'faiss').
        """
        super().__init__()
        self.model = model
        self.popt = None
        self._fitted = False

        # Determine the number of classes (unique labels in y_train)
        self.num_labels = len(np.unique(y_train))  # Fix: Initialize num_labels based on the training labels

        # Default to IsotonicRegression if no custom fitting function is provided
        self.fitting_func = fitting_func if fitting_func else IsotonicRegression(out_of_bounds="clip")

        # Use provided stability space or create a new one with the default settings
        if stability_space:
            self.stab_space = stability_space  # User provided custom StabilitySpace
            logger.info(f"{self.__class__.__name__}: Using custom StabilitySpace provided by user.")
        else:
            # Automatically initialize StabilitySpace with defaults if not provided
            self.stab_space = StabilitySpace(X_train, y_train,
                                             compression=Compression(compression_mode, compression_param),
                                             library=library, metric=metric)
            logger.info(f"{self.__class__.__name__}: Initialized StabilitySpace with default settings"
                        f" (library: {library}, metric: {metric}).")

        logger.info(f"Initialized {self.__class__.__name__} with model {self.model.__class__.__name__}"
                    f" and fitting function {self.fitting_func.__class__.__name__}.")

    def fit(self, X_val, y_val):
        """
        Fits the calibrator with the validation data using rounded stability and balanced accuracy.
    
        Args:
            X_val: Validation data (flattened images).
            y_val: Validation labels.
        """
        logger.info(f"{self.__class__.__name__}: Fitting with validation data using balanced accuracy and rounded stability.")
    
        try:
            # Step 1: Predict on validation data
            y_pred_val = self.model.predict(X_val)
            y_pred_classes = np.argmax(y_pred_val, axis=1)  # Convert predictions to class labels
    
            # Step 2: Compute stability values based on predictions
            stability_val = self.stab_space.calc_stab(X_val, y_pred_val)
            logger.info(f"Stability values (first 200): {stability_val[:200]}")  # Log a sample of the stability values
    
            # Step 3: Round the stability values for binning
            round_digits = 5  # Precision for rounding stability values
            rounded_stability = np.round(stability_val, decimals=round_digits)
            unique_stabilities = np.unique(rounded_stability)
    
            # Step 4: Calculate accuracy for each unique stability value
            stability_accuracy = {}
            i = 0
            for stab in unique_stabilities:
                indices = np.where(rounded_stability == stab)[0]  # Get indices of points with this stability
                y_true_stab = y_val[indices]  # Get true labels for these points
                y_pred_stab = y_pred_classes[indices]  # Get predicted labels for these points
    
                # Calculate balanced accuracy for this stability value
                if len(np.unique(y_true_stab)) > 1:  # Ensure we have more than one class
                    acc = balanced_accuracy_score(y_true_stab, y_pred_stab)
                else:
                    acc = np.mean(y_true_stab == y_pred_stab)  # Use normal accuracy for single-class stability
    
                stability_accuracy[stab] = acc
                if i % 350 == 0:
                    logger.info(f"Stability {stab}: Accuracy = {acc}")
                i += 1
    
            # Step 5: Prepare calibration data: (rounded_stability, accuracy) pairs
            calibration_data = [(stab, stability_accuracy[stab]) for stab in stability_accuracy]
    
            # Step 6: Fit the provided fitting function (e.g., IsotonicRegression) on the calibration data
            stability_vals, accuracies = zip(*calibration_data)
            self.popt = self.fitting_func.fit(np.array(stability_vals).reshape(-1, 1), np.array(accuracies))
    
            self._fitted = True
            logger.info(f"{self.__class__.__name__}: Successfully fitted using stability-accuracy pairs and {self.fitting_func.__class__.__name__}.")
    
        except Exception as e:
            logger.error(f"{self.__class__.__name__}: Failed to fit with error: {e}")
            raise
            
    def calibrate(self, X_test):
        """
        Calibrates the test data based on the fitted model.
    
        Args:
            X_test: Test data (flattened images).
    
        Returns:
            np.ndarray: Calibrated probability matrix for each image and class.
        """
        if not self._fitted:
            raise ValueError("You must fit the calibrator before using it.")
    
        logger.info(f"{self.__class__.__name__}: Calibrating test data.")
    
        try:
            # Predict on the test data using the trained model (get predicted probabilities for all classes)
            y_test_pred = self.model.predict(X_test)
            y_test_labels = np.argmax(y_test_pred, axis=1)  # Get predicted class labels from probabilities
    
            # Initialize progress bar using tqdm
            num_samples = X_test.shape[0]
            num_classes = y_test_pred.shape[1]
            calibrated_probs = np.zeros((num_samples, num_classes))  # Initialize a matrix to store calibrated probabilities
    
            logger.info(f"Starting calibration for {num_samples} samples and {num_classes} classes.")
    
            # Compute stability for the predicted probabilities
            stability_test = self.stab_space.calc_stab(X_test, y_test_pred)
            logger.info(f"Stability values during calibration (first 10): {stability_test[:10]}")  # Add logging
    
            # Apply the fitted calibration function to the stability values
            calibrated_values = self.popt.predict(stability_test.reshape(-1, 1))
            logger.info(f"Calibrated values (first 10): {calibrated_values[:10]}")  # Add logging
    
            # Distribute the calibrated values across the predicted class
            for i in range(X_test.shape[0]):
                # Assign the calibrated probability to the predicted class label
                calibrated_probs[i, y_test_labels[i]] = calibrated_values[i]
                
                # Distribute remaining probability equally across other classes
                remaining_prob = (1 - calibrated_values[i]) / (self.num_labels - 1)
                for j in range(self.num_labels):
                    if j != y_test_labels[i]:
                        calibrated_probs[i, j] = remaining_prob
    
            # Ensure probabilities are in [0, 1] and sum to 1
            calibrated_probs = np.clip(calibrated_probs, 0, 1)
            calibrated_probs = calibrated_probs / calibrated_probs.sum(axis=1, keepdims=True)
    
            logger.info(f"{self.__class__.__name__}: Calibration successful.")
    
            return calibrated_probs
    
        except Exception as e:
            logger.error(f"{self.__class__.__name__}: Calibration failed with error: {e}")
            raise


In [None]:
# Initialize Geometric Calibrator with FAISS
geo_calibrator_faiss = GeometricCalibrator(model=cnn_model, X_train=augmented_train_images, y_train=y_train, library='faiss')

# Fit the calibrator using FAISS on the validation data
geo_calibrator_faiss.fit(X_val, y_val)

# # Initialize Geometric Calibrator with KNN
# geo_calibrator_knn = GeometricCalibrator(model=cnn_model, X_train=augmented_train_images, y_train=y_train, library='knn')

# # Fit the calibrator using KNN on the validation data
# geo_calibrator_knn.fit(X_val, y_val)

print("Both FAISS and KNN calibrators have been initialized and fitted.")

2024-09-24 14:06:35,729 - INFO - Initialized GeometricCalibrator with n_classes=None, bins=15, temperature=1.0
2024-09-24 14:06:35,731 - INFO - Initializing StabilitySpace with faiss library and l2 metric.
2024-09-24 14:06:35,738 - INFO - StabilitySpace initialized with 42000 samples and 10 classes.
2024-09-24 14:06:35,739 - INFO - Starting to build nearest-neighbor indices for each class.


Running without compression, the shape of X needs to be square


2024-09-24 14:06:39,493 - INFO - Number of features per sample: 784
2024-09-24 14:06:39,494 - INFO - Processing label 0/9.
2024-09-24 14:06:39,495 - INFO - Using FAISS for label 0.
2024-09-24 14:06:41,007 - INFO - FAISS index for same-class built successfully for label 0.
2024-09-24 14:07:12,609 - INFO - FAISS index for other-class built successfully for label 0.
2024-09-24 14:07:12,610 - INFO - Processing label 1/9.
2024-09-24 14:07:12,612 - INFO - Using FAISS for label 1.
2024-09-24 14:07:12,638 - INFO - FAISS index for same-class built successfully for label 1.
2024-09-24 14:07:20,467 - INFO - FAISS index for other-class built successfully for label 1.
2024-09-24 14:07:20,468 - INFO - Processing label 2/9.
2024-09-24 14:07:20,469 - INFO - Using FAISS for label 2.
2024-09-24 14:07:20,489 - INFO - FAISS index for same-class built successfully for label 2.
2024-09-24 14:07:20,809 - INFO - FAISS index for other-class built successfully for label 2.
2024-09-24 14:07:20,810 - INFO - Proce

[1m438/438[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 7ms/step


2024-09-24 14:07:26,552 - INFO - Calculating stability using faiss with metric l2.
2024-09-24 14:07:26,553 - INFO - Starting stability calculation using faiss with metric: l2.


Running without compression, the shape of X needs to be square


Calculating Stability: 100%|██████████| 14000/14000 [04:00<00:00, 58.11sample/s]
2024-09-24 14:11:27,484 - INFO - Completed stability calculation.
2024-09-24 14:11:27,487 - INFO - Stability values (first 200): [10323.  10775.5  6828.  11233.   5364.5 15242.5  9082.   3573.5  4628.
  4009.   8945.  16456.5 17064.  14602.5 10527.   -315.5  9867.   8717.
  3271.5 17258.5  8129.5   452.  16059.   5790.5 13773.   7730.5  4651.
 13388.  18062.5  1359.5 16414.   9206.   9134.5  5459.   -361.5  2337.
  8511.  14318.   7120.5  4837.5   474.  10053.   2009.   1643.5 17095.5
  1501.  10611.  17493.    -89.  15377.5 13833.5   476.5 16573.5  7034.5
  6665.5 15093.   9847.  15710.5  5595.  10347.  14083.  16095.5 14662.5
 17695.5 18497.  16576.5  5516.5 10561.  15031.  17525.5  5984.5  4590.
  -624.  12844.   3362.5  8933.   4318.   6106.5 16106.   7887.   8475.
  6920.   8496.5  3307.5  9301.  14182.5  -193.5  -352.5  8466.5  -214.
   283.5 16627.5  1765.  17464.  -1301.  12370.5  4872.5 14524.5 11

# 6. Step 5: Calibrate the Test Data
After fitting the calibrator, use it to predict and calibrate the probabilities for the test set.

In [16]:
# Calibrate the predictions on the test set using FAISS-based calibrator
calibrated_probs_faiss = geo_calibrator_faiss.calibrate(X_test)

# Get predicted labels by selecting the class with the highest calibrated probability for FAISS
y_test_pred_faiss = np.argmax(calibrated_probs_faiss, axis=1)

# Calculate accuracy for FAISS
accuracy_faiss = accuracy_score(y_test, y_test_pred_faiss)
print(f"Accuracy after calibration with FAISS: {accuracy_faiss}")

# # Calibrate the predictions on the test set using KNN-based calibrator
# calibrated_probs_knn = geo_calibrator_knn.calibrate(X_test)

# # Get predicted labels by selecting the class with the highest calibrated probability for KNN
# y_test_pred_knn = np.argmax(calibrated_probs_knn, axis=1)

# # Calculate accuracy for KNN
# accuracy_knn = accuracy_score(y_test, y_test_pred_knn)
# print(f"Accuracy after calibration with KNN: {accuracy_knn}")


2024-09-23 19:09:05,958 - INFO - GeometricCalibrator: Calibrating test data.


[1m438/438[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 6ms/step


2024-09-23 19:09:09,012 - INFO - Starting calibration for 14000 samples and 10 classes.
2024-09-23 19:09:09,013 - INFO - Calculating stability using faiss with metric l2.
2024-09-23 19:09:09,014 - INFO - Starting stability calculation using faiss with metric: l2.


Running without compression, the shape of X needs to be square


Calculating Stability: 100%|██████████| 14000/14000 [03:07<00:00, 74.57sample/s]
2024-09-23 19:12:16,754 - INFO - Completed stability calculation.
2024-09-23 19:12:16,756 - INFO - Stability values during calibration (first 10): [12427.5  9949.5  5789.    206.  -1659.5 16998.   3926.5  9906.   9403.
 -1217. ]
2024-09-23 19:12:16,759 - INFO - Calibrated values (first 10): [0.11476546 0.11476546 0.11442429 0.11442429 0.104      0.11515334
 0.11442429 0.11476546 0.11476546 0.10757576]
2024-09-23 19:12:16,813 - INFO - GeometricCalibrator: Calibration successful.


Accuracy after calibration with FAISS: 0.9390714285714286


# 7. Step 6: Calculate ECE
Evaluate the calibration by computing the Expected Calibration Error (ECE).

In [17]:
import logging

logger = logging.getLogger(__name__)
def ECE_calc(probs, y_pred, y_real, n_bins=20):
    """
    Expected Calibration Error (ECE) calculation.

    Parameters:
        probs (np.ndarray): Calibrated probabilities for each class.
        y_pred (np.ndarray): Predicted class labels.
        y_real (np.ndarray): True class labels.
        n_bins (int): Number of bins to divide probabilities.

    Returns:
        float: ECE value.
    """
    logger.info("Starting Expected Calibration Error (ECE) calculation.")

    # Select the probabilities of the predicted classes
    confidence_of_pred_class = np.max(probs, axis=1)

    # Bin the confidences
    bin_boundaries = np.linspace(0, 1, n_bins + 1)
    bin_indices = np.digitize(confidence_of_pred_class, bin_boundaries) - 1

    logger.info(f"Bin boundaries: {bin_boundaries}")
    logger.info(f"Bin indices (first 10): {bin_indices[:10]}")
    
    total_error = 0.0
    total_weight = 0.0  # To track the weight distribution

    for i in range(n_bins):
        bin_mask = bin_indices == i
        bin_confidences = confidence_of_pred_class[bin_mask]
        bin_real = y_real[bin_mask]
        bin_pred = y_pred[bin_mask]

        if len(bin_confidences) > 0:
            bin_acc = np.mean(bin_real == bin_pred)
            bin_conf = np.mean(bin_confidences)
            bin_weight = len(bin_confidences) / len(probs)
            total_error += bin_weight * np.abs(bin_acc - bin_conf)
            total_weight += bin_weight

            logger.info(f"Bin {i}:")
            logger.info(f"  Bin size: {len(bin_confidences)}")
            logger.info(f"  Accuracy: {bin_acc}")
            logger.info(f"  Confidence: {bin_conf}")
            logger.info(f"  Bin weight: {bin_weight}")
            logger.info(f"  ECE contribution: {bin_weight * np.abs(bin_acc - bin_conf)}")
        else:
            logger.info(f"Bin {i} is empty.")

    logger.info(f"Total weight: {total_weight}")
    logger.info(f"Final ECE value: {total_error}")
    
    return total_error
# Predict on the test data using the trained model (without calibration)
y_test_pred_raw = cnn_model.predict(X_test)  # This is the predicted probabilities
y_test_pred_labels_raw = np.argmax(y_test_pred_raw, axis=1)  # This is the predicted class labels

# Calculate ECE for the model without calibration (raw probabilities)
ece_value_raw = ECE_calc(y_test_pred_raw, y_test_pred_labels_raw, y_test)  # Pass raw probabilities and labels
print(f"ECE without calibration: {ece_value_raw}")

# Calculate ECE for FAISS-based calibrated probabilities
ece_value_faiss = ECE_calc(calibrated_probs_faiss, y_test_pred_faiss, y_test)  # Pass calibrated probabilities and labels
print(f"ECE after FAISS calibration: {ece_value_faiss}")

print(f"For debug purpose, raw probs: {y_test_pred_raw} calibrated probs: {calibrated_probs_faiss}")
# # Calculate ECE for KNN-based calibrated probabilities
# ece_value_knn = ECE_calc(y_test_pred_knn, calibrated_probs_knn, y_test)
# print(f"ECE after KNN calibration: {ece_value_knn}")


[1m438/438[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 6ms/step


2024-09-23 19:12:19,581 - INFO - Starting Expected Calibration Error (ECE) calculation.
2024-09-23 19:12:19,584 - INFO - Bin boundaries: [0.   0.05 0.1  0.15 0.2  0.25 0.3  0.35 0.4  0.45 0.5  0.55 0.6  0.65
 0.7  0.75 0.8  0.85 0.9  0.95 1.  ]
2024-09-23 19:12:19,584 - INFO - Bin indices (first 10): [19 19 19 19 19 19 19 19 19 19]
2024-09-23 19:12:19,585 - INFO - Bin 0 is empty.
2024-09-23 19:12:19,587 - INFO - Bin 1 is empty.
2024-09-23 19:12:19,588 - INFO - Bin 2 is empty.
2024-09-23 19:12:19,589 - INFO - Bin 3 is empty.
2024-09-23 19:12:19,589 - INFO - Bin 4 is empty.
2024-09-23 19:12:19,591 - INFO - Bin 5:
2024-09-23 19:12:19,591 - INFO -   Bin size: 1
2024-09-23 19:12:19,592 - INFO -   Accuracy: 1.0
2024-09-23 19:12:19,593 - INFO -   Confidence: 0.29476022720336914
2024-09-23 19:12:19,594 - INFO -   Bin weight: 7.142857142857143e-05
2024-09-23 19:12:19,594 - INFO -   ECE contribution: 5.037426948547364e-05
2024-09-23 19:12:19,596 - INFO - Bin 6:
2024-09-23 19:12:19,597 - INFO -  

ECE without calibration: 0.03827404156114375
ECE after FAISS calibration: 0.8246275055230443
For debug purpose, raw probs: [[9.99999940e-01 5.24300392e-15 1.04556974e-10 ... 3.71166476e-13
  5.11456121e-11 2.98514269e-10]
 [5.04472283e-16 3.51001847e-14 6.73396777e-13 ... 4.49320581e-09
  3.75413727e-08 3.08899240e-09]
 [2.44305959e-19 9.99999940e-01 2.11173073e-17 ... 1.20417158e-13
  5.66953626e-15 1.77918609e-16]
 ...
 [1.12077350e-06 5.98498627e-06 8.02869454e-06 ... 9.99950349e-01
  2.81106995e-08 1.42379859e-08]
 [5.48741655e-15 7.35618814e-13 2.69353533e-11 ... 3.35309460e-06
  2.00430563e-07 3.11651771e-09]
 [6.10045001e-12 3.77386803e-18 1.26491837e-16 ... 2.14303473e-21
  1.97911479e-11 4.51686106e-13]] calibrated probs: [[0.11476546 0.09835939 0.09835939 ... 0.09835939 0.09835939 0.09835939]
 [0.09835939 0.09835939 0.09835939 ... 0.09835939 0.09835939 0.09835939]
 [0.0983973  0.11442429 0.0983973  ... 0.0983973  0.0983973  0.0983973 ]
 ...
 [0.0983973  0.0983973  0.0983973  

# 8. Step 7: Run Multiple Experiments
Wrap the entire pipeline in a loop to test different augmentation methods and repeat the experiment multiple times.

In [None]:
augmentation_methods = ['rotation', 'shift', 'noise']
results = {}

for method in augmentation_methods:
    print(f"Running experiment with {method} augmentation...")
    augmented_train_images = augment_data(X_train, method=method)
    
    # Train the CNN model
    cnn_model.fit(augmented_train_images, y_train, epochs=10)
    features_val = cnn_model.predict(X_val)
    features_test = cnn_model.predict(X_test)
    
    # Initialize and fit the geometric calibrator
    geo_calibrator = GeometricCalibrator(model=cnn_model, X_train=features_val, y_train=y_val)
    geo_calibrator.fit(X_val, y_val)
    
    # Calibrate the test set
    calibrated_probs = geo_calibrator.calibrate(X_test)
    y_test_pred = np.argmax(calibrated_probs, axis=1)
    
    # Calculate ECE
    ece_value = ECE_calc(y_test_pred, calibrated_probs, y_test)
    results[method] = ece_value
    print(f"{method} ECE: {ece_value}")

# Find the best augmentation method based on ECE
best_method = min(results, key=results.get)
print(f"Best augmentation method: {best_method} with ECE = {results[best_method]}")