# Mondrian Conformal Prediction with Class-wise Coverage Guarantees
## 1. Overview
This notebook implements Mondrian Conformal Prediction (MCP) for a four-class ordinal classification task. MCP calibrates non-conformity scores separately for each class, so the resulting prediction sets meet the target coverage level within each class rather than only on average across classes.
## 2. Methodology
The non-conformity score is the sum of absolute differences between the cumulative binary encoding of a label and the model's predicted probabilities for that encoding. The method proceeds as follows:

1. Compute a non-conformity score for every calibration example from its cumulative label encoding and the predicted probabilities
2. Group the scores by true class label and calibrate a threshold for each class separately
3. Build prediction sets from the per-class thresholds, so that the desired coverage level holds within each class
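
The three steps above can be sketched as follows. This is a minimal illustration of standard split-conformal Mondrian calibration, not the notebook's own code: the function names `classwise_thresholds` and `prediction_set`, the miscoverage level `alpha`, and the finite-sample rank $\lceil (n_c + 1)(1 - \alpha) \rceil$ are assumptions introduced here. The sketch takes non-conformity scores that have already been computed.

```python
import numpy as np

def classwise_thresholds(scores, labels, num_classes, alpha=0.1):
    """Per-class thresholds; calibration examples are grouped by true label."""
    scores = np.asarray(scores, dtype=float)
    labels = np.asarray(labels, dtype=int)
    # inf means "never exclude this class" (used when calibration data is too scarce)
    thresholds = np.full(num_classes, np.inf)
    for c in range(num_classes):
        s = np.sort(scores[labels == c])
        n = s.size
        if n == 0:
            continue
        # Finite-sample-corrected rank of the (1 - alpha) quantile
        rank = int(np.ceil((n + 1) * (1 - alpha)))
        if rank <= n:
            thresholds[c] = s[rank - 1]
    return thresholds

def prediction_set(candidate_scores, thresholds):
    """candidate_scores[c] is the test example's score under hypothesised label c."""
    return [c for c, (s, t) in enumerate(zip(candidate_scores, thresholds)) if s <= t]

# Hypothetical calibration scores for two classes (illustrative values only)
demo_scores = np.array([0.3, 0.1, 0.2, 0.5, 0.4])
demo_labels = np.array([0, 0, 0, 1, 1])
demo_thresholds = classwise_thresholds(demo_scores, demo_labels, num_classes=2, alpha=0.25)
```

A class with too few calibration examples keeps an infinite threshold, so it is never excluded from a prediction set; this preserves the coverage guarantee at the cost of larger sets.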

The non-conformity score $C_i$ for a calibration example $(x_i, y_i)$ is computed as:
$$
C_i = \sum_{k=1}^{K} \left| y_{i,k} - \hat{h}_{i,k} \right|, \quad i \in X_{\text{calibration}}
$$
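
A vectorised version of this score might look like the sketch below. The helper name `cumulative_scores` is hypothetical, and the code assumes 0-based integer labels and a cumulative binary encoding in which position $k$ (0-based) is 1 when the label is at least $k$.

```python
import numpy as np

def cumulative_scores(probs, labels):
    """Sum of absolute differences between cumulative targets and predictions.

    probs:  array of shape (N, K) with per-position probabilities.
    labels: array of shape (N,) with 0-based integer class labels (assumption).
    """
    probs = np.asarray(probs, dtype=float)
    labels = np.asarray(labels, dtype=int)
    num_classes = probs.shape[1]
    # Row i holds 1 at positions 0..labels[i] and 0 afterwards
    targets = (labels[:, None] >= np.arange(num_classes)).astype(float)
    return np.abs(targets - probs).sum(axis=1)  # shape (N,)

# A perfectly confident, correct prediction has score 0
perfect = cumulative_scores([[1.0, 0.0, 0.0, 0.0]], [0])
```

Summing over the $K$ positions gives one scalar per example, so the scores of each class can be ranked directly.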

## 3. Notation and Example
#### 1) Variable Definitions
Let:

1. $K$ denote the total number of classes
2. $y_{i,k}$ denote the $k$-th entry of the cumulative binary encoding of the true label of the $i$-th instance
3. $\hat{h}_{i,k}$ denote the predicted probability for the $k$-th entry of that encoding for the $i$-th instance

#### 2) Example Illustration
Consider a four-class ordinal case where we represent labels in cumulative binary form:
#### True Labels
$y = [1, 1, 0, 0]$ encodes class 2, the second of four ordinal classes (label `1` under the 0-based labels used in the code below), where:

- 1's indicate the classes up to and including the true class
- 0's indicate the classes above the true class
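
As a small illustration of this encoding, the sketch below converts 0-based labels to cumulative binary vectors and back. The helper names `to_cumulative` and `from_cumulative` are hypothetical, and decoding by counting ones assumes a well-formed vector (a run of ones followed by zeros).

```python
import numpy as np

def to_cumulative(labels, num_classes):
    """Encode 0-based labels: positions up to and including the label are 1."""
    labels = np.asarray(labels, dtype=int)
    return (labels[:, None] >= np.arange(num_classes)).astype(int)

def from_cumulative(encoded):
    """Decode by counting ones; valid only for well-formed cumulative vectors."""
    return np.asarray(encoded).sum(axis=1) - 1

encoded = to_cumulative([0, 1, 2, 3], num_classes=4)
print(encoded)                   # one row per label, lower-triangular pattern of ones
print(from_cumulative(encoded))  # recovers the original labels
```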

#### 3) Predicted Probabilities
$\hat{h} = [0.96, 0.82, 0.45, 0.15]$ represents our model's predictions, where:

- Each value represents the predicted probability for the corresponding position
- Values typically decrease as we move through the cumulative binary representation

#### 4) Mathematical Representation
For this example:

- $K = 4$ (number of positions in the vector)
- $i$ represents a single instance
- Each position $k \in \{1,2,3,4\}$ has a corresponding $y_{i,k}$ and $\hat{h}_{i,k}$

$$
C = |1 - 0.96| + |1 - 0.82| + |0 - 0.45| + |0 - 0.15| = 0.04 + 0.18 + 0.45 + 0.15 = 0.82
$$
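
The arithmetic above can be checked numerically. The sketch below also scores the same prediction against every possible label, which is the kind of quantity a prediction set would be built from. The variable names are illustrative and the labels are assumed to be 0-based.

```python
import numpy as np

h_hat = np.array([0.96, 0.82, 0.45, 0.15])
K = h_hat.size

# Row c is the cumulative encoding of label c: ones up to position c, zeros after
encodings = np.tril(np.ones((K, K)))

# Score of the prediction under each hypothesised label
candidate_scores = np.abs(encodings - h_hat).sum(axis=1)

# Entry 1 corresponds to y = [1, 1, 0, 0] and reproduces the sum above (about 0.82)
print(np.round(candidate_scores, 2))
```

The label whose encoding is closest to the predicted vector gets the smallest score, which here is the true label.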


In [1]:
import numpy as np
import pandas as pd

from torch.utils.data import DataLoader  # needed by the data-loading cell

from training import test_loop
from training import SolarFlSets, HSS2, TSS, F1Pos, HSS_multiclass

In [2]:
class MCP:
    def __init__(self, arr: np.ndarray = None, calset: int = 3):
        """
        Mondrian Conformal Prediction (MCP) Non-Conformity Measure (NCM).
        
        Args:
            arr (np.ndarray): A 2D NumPy array where:
                - Columns 0 to 3: Model outputs from sigmoid (probabilities).
                - Column 4: Predicted class labels (integers).
                - Column 5: Ground truth labels (integers 0-3).
            calset (int): Calibration set index (not used in this function but stored for future use).
        """
        self.arr = arr  # Shape: (N, 6), where N is the number of samples
        self.calset = calset

    def threshold(self, q: float = 0.1):
        """
        Compute non-conformity scores, group them by true class, and
        return one threshold per class.

        Args:
            q (float): Quantile level in [0, 1] of each class's empirical
                score distribution at which the threshold is taken.

        Returns:
            List[float]: Thresholds for classes 0, 1, 2, 3. The per-class
            scores are also stored in self.dist.
        """
        if self.arr is None:
            raise ValueError("Input array (arr) is None. Please provide data.")

        num_classes = 4  # Assuming 4 ordinal classes
        labels = self.arr[:, 5].astype(int)  # Extract ground truth labels (column 5)

        # Convert ground truth labels to cumulative binary form
        binary_labels = (labels[:, None] >= np.arange(num_classes)).astype(np.float32)

        # Non-conformity score: absolute differences summed over the K positions,
        # matching C_i in the formula above. Shape: (N,)
        ncm = np.abs(self.arr[:, :num_classes] - binary_labels).sum(axis=1)

        # Group scores by true class
        class_ncm = [ncm[labels == c] for c in range(num_classes)]
        self.dist = tuple(class_ncm)  # Stored as tuple (class_0, class_1, class_2, class_3)

        # Threshold for each class: the q-quantile of that class's scores
        thres_cls = [np.quantile(scores, q) for scores in self.dist]
        return thres_cls

In [None]:
cal_file = '24image_multi_GOES_classification_Partition3.csv'
test_file = '24image_multi_GOES_classification_Partition4.csv'

        
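# NOTE: train, test, lr, wt, train_list, file_path, img_dir, channel_tag,
# batch_size and oversample_func are not defined in this notebook excerpt;
# they are expected to come from an earlier cell or module.
print('--------------------------------------------------------------------------------')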
print(f'Train: ({train}), Test: {test}')
print(f"Initial learning rate: {lr:.1e}, decay value: {wt:.1e}")
print('--------------------------------------------------------------------------------')

# train set: read every training partition and concatenate once
df_train = pd.concat([pd.read_csv(file_path + partition) for partition in train_list])

# test set and calibration set
df_test = pd.read_csv(file_path + test_file)
df_cal = pd.read_csv(file_path + cal_file)

# string to datetime
df_train['Timestamp'] = pd.to_datetime(df_train['Timestamp'], format = '%Y-%m-%d %H:%M:%S')
df_test['Timestamp'] = pd.to_datetime(df_test['Timestamp'], format = '%Y-%m-%d %H:%M:%S')

# training dataset with over/under sampling
data_training, imbalance_ratio = oversample_func(df = df_train, img_dir = img_dir, channel = channel_tag, norm = True)

# test dataset, then the train and test data loaders
data_testing = SolarFlSets(annotations_df = df_test, img_dir = img_dir, channel = channel_tag, normalization = True)
train_dataloader = DataLoader(data_training, batch_size = batch_size, shuffle = True)  # num_workers = 0, pin_memory = True
test_dataloader = DataLoader(data_testing, batch_size = batch_size, shuffle = False)  # num_workers = 0, pin_memory = True

