In [9]:
import numpy as np
from sklearn.datasets import make_classification, load_iris, load_wine, load_breast_cancer
from sklearn.model_selection import train_test_split
import torch
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from simple_model import DyadOneHotPairDataset, DyadRankingModel, create_dyads

class ConformalPredictor:

    def __init__(self, model, alpha=0.05):
        self.model = model
        self.alpha = alpha

    def fit(self, X, y, cal_size=0.33,**kwargs):
        X_train, X_cal, y_train, y_cal = train_test_split(X,y,test_size=cal_size)
        self.model.fit(X_train, y_train,**kwargs)
        y_pred_cal = self.model.predict_proba(X_cal)
        self.scores = 1 - y_pred_cal[np.arange(len(y_cal)), y_cal]
        n = len(self.scores)
        self.threshold = np.quantile(self.scores, np.ceil((n+1)*(1-self.alpha))/n, method="inverted_cdf")

    def predict_set(self,X):
        y_probas = self.model.predict_proba(X)
        pred_sets = []
        for y_proba in y_probas:
            pred_set = np.where(1 - y_proba <= self.threshold)[0]
            pred_sets.append(pred_set)
        return pred_sets
    


class ConformalRankingPredictor:
    def __init__(self, num_classes, alpha=0.05):
        self.num_classes = num_classes
        self.alpha = 0.05

    def fit(self, X, y, cal_size=0.33, hidden_dim=16, **kwargs):
        X_train, X_cal, y_train, y_cal = train_test_split(X,y,test_size=cal_size)
        self.model = DyadRankingModel(input_dim = X_train.shape[1] + y.max()+1, hidden_dim=16)
        self.model.fit(X_train, y_train,**kwargs)
        
        # # here we typically compute non conformity scores. For the ranker
        # # we use the predicted latent skill value
        
        # y_pred_cal = self.model.predict_proba(X_cal)
        # self.scores = 1 - y_pred_cal[np.arange(len(y_cal)), y_cal]
        cal_dyads = create_dyads(X_cal, y_cal, self.num_classes)
        with torch.no_grad():
            self.scores = self.model(cal_dyads).detach().numpy()
        n = len(self.scores)
        # TODO check alpha here
        self.threshold = np.quantile(self.scores, np.ceil((n+1)*(self.alpha))/n, method="inverted_cdf")

    def predict_set(self,X):
                
        y_skills = self.model.predict_class_skills(X)
        

        pred_sets = []
        for y_skill in y_skills:
            pred_set = np.where(y_skill > self.threshold)[0]
            pred_sets.append(pred_set)
        return pred_sets

In [10]:
# X, y = make_classification(n_samples=5000, n_informative = 5, n_classes=2, n_features=40, n_redundant=0, n_repeated=0)
X,y = load_breast_cancer(return_X_y=True)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)

In [11]:
from simple_model import ClassifierModel, DyadRankingModel
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier


# clf = ClassifierModel(input_dim = X_train.shape[1], hidden_dim=16, output_dim=y.max()+1)
clf = RandomForestClassifier()

cp = ConformalPredictor(clf, alpha=0.05)

cp.fit(X_train, y_train)
pred_sets_clf = cp.predict_set(X_test)

# crp = ConformalRankingPredictor(num_classes=y_train.max()+1, alpha= 0.05)
# crp.fit(X_train,y_train, num_epochs=1000, learning_rate=0.001)

# pred_sets_rnk = crp.predict_set(X_test)


In [12]:

coverage_clf = np.mean([y_test[i] in pred_sets_clf[i] for i in range(len(y_test))])
efficiency_clf = np.mean([len(pred_sets_clf[i]) for i in range(len(y_test))])

# coverage_rnk = np.mean([y_test[i] in pred_sets_rnk[i] for i in range(len(y_test))])
# efficiency_rnk = np.mean([len(pred_sets_rnk[i]) for i in range(len(y_test))])

y_test_clf = cp.model.predict(X_test)
# y_test_rnk = crp.model.predict_class(X_test)

print(f"Accuracy clf {accuracy_score(y_test_clf, y_test)}")
# print(f"Accuracy rnk {accuracy_score(y_test_rnk, y_test)}")

print(f"Coverage clf {coverage_clf} efficiency clf {efficiency_clf}")
# print(f"Coverage rnk {coverage_rnk} efficiency rnk {efficiency_rnk}")


Accuracy clf 0.9627659574468085
Coverage clf 0.9468085106382979 efficiency clf 0.9787234042553191


In [13]:
y_test

array([0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0,
       1, 0, 0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 0,
       1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1,
       0, 0, 1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1,
       1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 0, 1,
       1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1,
       1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1,
       1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1,
       1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 1])