In [17]:
import sys

sys.path.append("..")
from sklearn.model_selection import train_test_split

import numpy as np
from nonconformist.base import ClassifierAdapter
from nonconformist.cp import IcpClassifier
from nonconformist.nc import ClassifierNc, NcFactory

from ex4.ada_boost import BoostedSeqTree
from ex5.util import parse_dataset

# Dataset to run on: keep exactly one DATASET_PATH assignment uncommented.
# The trailing note on each line is the line count recorded for that file.
# DATASET_PATH = "../datasets/pioneer.txt"  # 160 lines
# DATASET_PATH = "../datasets/auslan2.txt"  # 200 lines
# DATASET_PATH = "../datasets/context.txt"  # 240 lines
# DATASET_PATH = "../datasets/aslbu.txt"  # 424 lines
# DATASET_PATH = "../datasets/skating.txt"  # 530 lines
DATASET_PATH = "../datasets/reuters.txt"  # 1010 lines
# DATASET_PATH = "../datasets/webkb.txt"  # 3667 lines
# DATASET_PATH = "../datasets/news.txt"  # 4976 lines
# DATASET_PATH = "../datasets/unix.txt"  # 5472 lines

# Third positional argument handed to the wrapped model's fit() by
# MyClassifierAdapter.fit (presumably the number of boosting rounds --
# TODO confirm against ex4.ada_boost).
ITERATIONS = 10

# Significance level passed to the conformal predictors in predict().
SIGNIFICANCE = 0.05

In [18]:

class MyClassifierAdapter(ClassifierAdapter):
    """Adapter that lets a ``BoostedSeqTree`` be driven through ``ClassifierAdapter``.

    Both ``fit`` and ``predict`` are overridden so that every call is forwarded
    to the wrapped model's own ``fit`` / ``predict_prob`` methods.
    """

    def __init__(self, model: BoostedSeqTree):
        # No extra fit parameters are forwarded to the base adapter.
        super().__init__(model, None)

    def fit(self, x, y):
        """Fit the wrapped model on ``x`` / ``y``, passing ``ITERATIONS`` as third argument."""
        self.model.fit(x, y, ITERATIONS)

    def predict(self, x):
        """Return one ``[P(+1), P(-1)]`` row per sample in ``x``.

        Each sample is scored twice with ``predict_prob``: first for label
        ``1``, then for label ``-1``.
        """
        # NOTE(review): columns are ordered [+1, -1]; confirm this matches the
        # label/column convention the nonconformity function expects.
        rows = []
        for sample in x:
            positive = self.model.predict_prob(sample, 1)
            negative = self.model.predict_prob(sample, -1)
            rows.append(np.array([positive, negative]))
        return np.array(rows)


In [19]:
def custom_train_test_split(X, Y, ratio=0.8):
    """Split ``X`` / ``Y`` into train and test parts, one class at a time.

    For every distinct label in ``Y`` (visited in ``np.unique`` order) the
    samples of that class are cut at ``int(ratio * n_class)``: the leading
    part goes to the training set, the remainder to the test set. Nothing is
    shuffled, so the original order inside each class is kept.

    Args:
        X: NumPy array of samples; it is indexed with a boolean mask.
        Y: NumPy array of labels aligned with ``X``.
        ratio: Fraction of each class assigned to the training part.

    Returns:
        ``(X_train, X_test, Y_train, Y_test)``, each the concatenation of the
        per-class pieces in class order.
    """
    train_chunks, test_chunks = [], []
    train_labels, test_labels = [], []

    for label in np.unique(Y):
        members = X[Y == label]
        cut = int(ratio * len(members))
        head, tail = members[:cut], members[cut:]

        train_chunks.append(head)
        test_chunks.append(tail)
        # Every member of this group carries `label`, so the label arrays are
        # generated rather than sliced out of Y.
        train_labels.append(np.full(len(head), label))
        test_labels.append(np.full(len(tail), label))

    return (
        np.concatenate(train_chunks),
        np.concatenate(test_chunks),
        np.concatenate(train_labels),
        np.concatenate(test_labels),
    )


In [20]:
def fit(X, Y, random_state=None):
    """Train and calibrate one one-vs-rest conformal classifier per class.

    For every distinct label in ``Y`` the labels are rewritten to ``1`` (this
    class) / ``-1`` (any other class), the data is split 80/20 into a proper
    training set and a calibration set, a ``BoostedSeqTree`` wrapped in an
    ``IcpClassifier`` is fitted on the former and calibrated on the latter.

    Args:
        X: Samples, aligned with ``Y``.
        Y: Class labels.
        random_state: Optional seed forwarded to ``train_test_split`` so the
            train/calibration split can be reproduced. ``None`` (the default)
            keeps the split random.

    Returns:
        ``(PHI, bsts)``: two dicts keyed by class label, holding the calibrated
        ``IcpClassifier`` and the underlying ``BoostedSeqTree`` respectively.

    Raises:
        ValueError: If the training or calibration part does not contain both
            positive and negative examples (also raised by the stratified
            split itself when a class is too small to be split).
    """
    PHI = {}
    bsts = {}
    labels = np.asarray(Y)
    classes = np.unique(labels)

    print("Classes:", classes)

    for c in classes:
        print("class:", c)
        # One-vs-rest relabelling: +1 for the current class, -1 for the rest.
        Y_cl = np.where(labels == c, 1, -1)

        # split the dataset into train and calibration sets; stratifying keeps
        # the +1/-1 proportions in both parts, so a rare class cannot end up
        # entirely on one side by bad luck.
        X_t, X_c, Y_t, Y_c = train_test_split(
            X, Y_cl, test_size=0.2, stratify=Y_cl, random_state=random_state
        )

        # make sure that both sets contain positive and negative examples
        if len(np.unique(Y_t)) != 2 or len(np.unique(Y_c)) != 2:
            raise ValueError("Failed to split correctly")

        # train the model
        print(f"Training model for class {c}")
        bst = BoostedSeqTree()
        model = MyClassifierAdapter(bst)
        nc = ClassifierNc(model)
        icp = IcpClassifier(nc)

        print("Fitting Classifier")
        icp.fit(X_t, Y_t)

        print(f"Calibrating model for class {c}")
        icp.calibrate(X_c, Y_c)
        bsts[c] = bst
        PHI[c] = icp

    return PHI, bsts

In [21]:
dataset = parse_dataset(DATASET_PATH)

# divide training and test sets
# custom_train_test_split returns (X_train, X_test, Y_train, Y_test); unpack in
# that order so the larger (ratio=0.8) share is the one used for training and
# the held-out remainder is the one used for testing.
X, X_test, Y, Y_test = custom_train_test_split(dataset[:, 0], dataset[:, 1])

PHI, bsts = fit(X, Y)

Classes: ['acq' 'crude' 'earn' 'trade']
class: acq
Training model for class acq
Fitting Classifier


KeyboardInterrupt: 

In [None]:
# save to pickle the models
import datetime
import pickle

timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
models_path = f"models_{DATASET_PATH.split('/')[-1]}_{timestamp}.pkl"

# Serialise in memory first: if pickling fails, no truncated .pkl file is left
# behind on disk.
# NOTE(review): this cell has been seen to fail with "Can't pickle local object
# 'BaseIcp.__init__.<locals>.<lambda>'", i.e. the conformal predictors in PHI
# hold a lambda that the standard pickle module cannot serialise.
payload = pickle.dumps((PHI, bsts))

# The with-block closes the file; no explicit close() is needed afterwards.
with open(models_path, "wb") as f:
    f.write(payload)

AttributeError: Can't pickle local object 'BaseIcp.__init__.<locals>.<lambda>'

In [None]:
def predict(x):
    """Query every per-class model in ``PHI`` / ``bsts`` for ``x`` and print the results.

    For each class the conformal predictor is evaluated on all of ``x`` at
    ``SIGNIFICANCE``; the underlying boosted model is then queried on the
    first sample ``x[0]`` only (hard prediction plus the probabilities for
    labels ``1`` and ``-1``). Everything is printed as it is computed.

    Returns:
        ``np.ndarray`` stacking the conformal predictions, one entry per class
        in ``PHI`` iteration order.
    """
    regions = []
    for label, conformal in PHI.items():
        print(f"Predicting for class {label}")
        region = conformal.predict(x, significance=SIGNIFICANCE)
        regions.append(region)
        print(f"Conformal prediction: {region}")

        booster = bsts[label]
        print(booster)

        # The raw model diagnostics below only look at the first sample.
        first = x[0]
        hard_label = booster.predict(first)
        print(f"Class prediction: {hard_label}")

        prob_pos = booster.predict_prob(first, 1)
        print(f"Probability prediction positive: {prob_pos}")

        prob_neg = booster.predict_prob(first, -1)
        print(f"Probability prediction negative: {prob_neg}")

    return np.array(regions)

In [None]:
# Turn every held-out sequence into an array of its elements.
# NOTE(review): if the sequences differ in length this builds a ragged array,
# which recent NumPy versions reject unless dtype=object is given -- confirm
# the shape of the data.
test_data = np.array([np.array(list(seq)) for seq in X_test])
print(test_data.shape)

banner = "###########################################"
for idx, sample in enumerate(test_data):
    print(banner)
    print("Predicting", sample)
    # predict() expects a batch, so wrap the single sample in a length-1 array.
    predict(np.array([sample]))
    print("real:", Y_test[idx])
    print(banner)