In [11]:
import bisect
import warnings

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

from sklearn.model_selection import KFold, train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import LabelEncoder, MultiLabelBinarizer
from sklearn.metrics import accuracy_score, f1_score


RANDOM_SEED = 42
warnings.filterwarnings("ignore")

In [2]:
class InductiveConformalPredictor():
    def __init__(self, predictor):
        self.predictor = predictor
        
        self._le = LabelEncoder()
        self.classes = self._le.fit_transform(predictor.classes_)

    def fit(self, X, y):
        self.calibration_score = self._uncertainty_conformity_score(X)
        self.calibration_class = self._le.transform(y)
        return self

    def _uncertainty_conformity_score(self, data):
        uncertainty_score = 1 - self.predictor.predict_proba(data)
        return uncertainty_score

    def predict_proba(self, X, mondrian=True):
        conformity_score = self._uncertainty_conformity_score(X)
        conformal_pred = np.zeros(conformity_score.shape)

        for c in self.classes:
            if mondrian:
                calibration_filt = self.calibration_score[
                    self.calibration_class == c
                ]
                calib = calibration_filt[:, c]
            else:
                calib = self.calibration_score[
                    range(len(self.calibration_class)), 
                    self.calibration_class
                ]

            sorted_calib = np.sort(calib)
            conformal_pred[:, c] = [
                float(bisect.bisect(sorted_calib, x))/len(calib)
                for x in conformity_score[:, c]
            ]

        return conformal_pred

    def predict(self, X, mondrian=True, alpha=0.05):
        _conformal_proba = self.predict_proba(X=X, mondrian=mondrian)
        conformal_pred = (_conformal_proba > alpha).astype(int)

        mlb = MultiLabelBinarizer()
        mlb.fit([self._le.classes_])
        pred = mlb.inverse_transform(conformal_pred)

        return pred

In [4]:
class ConformalInferenceExperiment:
    def __init__(self, data, target, n_splits=5, coverage_quantiles=[0.25, 0.5, 0.75], random_state=42):
        self.data = data
        self.target = target
        self.n_splits = n_splits
        self.coverage_quantiles = coverage_quantiles
        self.random_state = random_state

    def get_coverage(self, cfp, X_test, alpha_interval):
        coverage = np.array([
            np.array([len(y_set) == 1
                      for y_set in cfp.predict(X_test, alpha=alpha)]).mean()
            for alpha in alpha_interval
        ])
        return coverage

    def run(self):
        y = self.data[self.target]
        X = self.data.drop(self.target, axis=1)

        kf = KFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)
        model = LogisticRegression(max_iter=1000, random_state=self.random_state)

        metrics = {
            'overall_accuracy': [],
            'overall_f1_score': []
        }
        for q in self.coverage_quantiles:
            metrics[f"top_{int(q * 100)}_accuracy"] = []
            metrics[f"top_{int(q * 100)}_f1_score"] = []

        for train_index, test_index in kf.split(X):
            X_train, X_test = X.iloc[train_index], X.iloc[test_index]
            y_train, y_test = y[train_index], y[test_index]

            X_train, X_calib, y_train, y_calib = train_test_split(
                X_train, y_train, test_size=0.3,
                stratify=y_train, random_state=self.random_state
            )

            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            
            cfp = InductiveConformalPredictor(predictor=model)
            cfp.fit(X_calib, y_calib)

            accuracy = accuracy_score(y_test, y_pred)
            metrics["overall_accuracy"].append(accuracy)

            f1 = f1_score(y_test, y_pred)
            metrics["overall_f1_score"].append(f1)

            alpha_interval = np.arange(0, .9, .01)
            coverage = self.get_coverage(cfp, X_test, alpha_interval)

            for q in self.coverage_quantiles:
                index = np.argmin(np.abs(coverage - q))

                y_test_sets = cfp.predict(X_test, alpha=alpha_interval[index])
                indexes = np.where(np.vectorize(len)(y_test_sets) == 1)

                accuracy = accuracy_score(y_test.values[indexes], y_pred[indexes])
                metrics[f"top_{int(q * 100)}_accuracy"].append(accuracy)

                f1 = f1_score(y_test.values[indexes], y_pred[indexes])
                metrics[f"top_{int(q * 100)}_f1_score"].append(f1)

        return pd.DataFrame(metrics).describe().T[["mean", "std"]]

## Heart Attack

In [12]:
%%time

heart = pd.read_csv("../data/processed/heart.csv")

experiment = ConformalInferenceExperiment(
    data=heart, 
    target="HeartDisease",
    random_state=RANDOM_SEED
)
metrics = experiment.run()
metrics

CPU times: user 1.17 s, sys: 5.68 ms, total: 1.18 s
Wall time: 1.18 s


Unnamed: 0,mean,std
overall_accuracy,0.841987,0.045861
overall_f1_score,0.854634,0.046175
top_25_accuracy,0.943466,0.02438
top_25_f1_score,0.95235,0.018114
top_50_accuracy,0.91744,0.02899
top_50_f1_score,0.929177,0.024686
top_75_accuracy,0.901095,0.034858
top_75_f1_score,0.909748,0.033008


## Australian Credit

In [14]:
%%time

australian = pd.read_csv("../data/processed/australian.csv")

experiment = ConformalInferenceExperiment(
    data=australian, 
    target="14",
    random_state=RANDOM_SEED
)
metrics = experiment.run()
metrics

CPU times: user 1.39 s, sys: 6.16 ms, total: 1.4 s
Wall time: 1.4 s


Unnamed: 0,mean,std
overall_accuracy,0.856522,0.028712
overall_f1_score,0.835903,0.037906
top_25_accuracy,0.942017,0.054252
top_25_f1_score,0.922652,0.088812
top_50_accuracy,0.939122,0.034875
top_50_f1_score,0.930957,0.040429
top_75_accuracy,0.918155,0.022458
top_75_f1_score,0.909308,0.018027


## Diabetes

In [17]:
%%time

diabetes = pd.read_csv("../data/processed/diabetes.csv")

experiment = ConformalInferenceExperiment(
    data=diabetes, 
    target="Outcome",
    random_state=RANDOM_SEED
)
metrics = experiment.run()
metrics

CPU times: user 842 ms, sys: 3.7 ms, total: 846 ms
Wall time: 846 ms


Unnamed: 0,mean,std
overall_accuracy,0.768237,0.024967
overall_f1_score,0.630647,0.032666
top_25_accuracy,0.892794,0.021332
top_25_f1_score,0.856557,0.060112
top_50_accuracy,0.867799,0.036305
top_50_f1_score,0.832877,0.040517
top_75_accuracy,0.797269,0.029223
top_75_f1_score,0.72304,0.029513


## Qsar

In [20]:
%%time

qsar = pd.read_csv("../data/processed/qsar.csv")

experiment = ConformalInferenceExperiment(
    data=qsar, 
    target="Class",
    random_state=RANDOM_SEED
)
metrics = experiment.run()
metrics

CPU times: user 5.45 s, sys: 837 ms, total: 6.29 s
Wall time: 1.38 s


Unnamed: 0,mean,std
overall_accuracy,0.854028,0.022077
overall_f1_score,0.780188,0.046006
top_25_accuracy,0.977493,0.030388
top_25_f1_score,0.95711,0.06385
top_50_accuracy,0.950507,0.016588
top_50_f1_score,0.926547,0.036069
top_75_accuracy,0.902687,0.016916
top_75_f1_score,0.862471,0.025067


## Titanic

In [22]:
%%time

titanic = pd.read_csv("../data/processed/titanic.csv")

experiment = ConformalInferenceExperiment(
    data=titanic, 
    target="Survived",
    random_state=RANDOM_SEED
)
metrics = experiment.run()
metrics

CPU times: user 1.27 s, sys: 3.71 ms, total: 1.27 s
Wall time: 1.27 s


Unnamed: 0,mean,std
overall_accuracy,0.794595,0.029221
overall_f1_score,0.72117,0.052346
top_25_accuracy,0.92213,0.054371
top_25_f1_score,0.900926,0.077954
top_50_accuracy,0.890366,0.024365
top_50_f1_score,0.863451,0.045495
top_75_accuracy,0.845207,0.02334
top_75_f1_score,0.810099,0.044441
