In [5]:
# Importing substantial libraries
import os

%matplotlib inline

import torch
import torch.nn as nn
import numpy as np

import random
import matplotlib.pyplot as plt
import pandas as pd
from config import get_cfg_defaults

from kale.utils.download import download_file_by_url
from kale.loaddata.image_access import read_dicom_images
from kale.interpret import visualize
from kale.embed.mpca import MPCA

from sklearn.model_selection import train_test_split
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
import matplotlib.lines as mlines
import scipy.integrate as integrate

from sklearn.svm import SVC
from sklearn.model_selection import cross_validate
from sklearn.metrics import brier_score_loss
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report
from sklearn.metrics import log_loss
#from kale.pipeline.mpca_trainer import MPCATrainer
from sklearn.calibration import CalibratedClassifierCV

In [6]:
# =============================================================================
# Author: Shuo Zhou, szhou20@sheffield.ac.uk
#         Haiping Lu, h.lu@sheffield.ac.uk or hplu@ieee.org
# =============================================================================

"""Implementation of MPCA->Feature Selection->Linear SVM/LogisticRegression Pipeline

References:
    [1] Swift, A. J., Lu, H., Uthoff, J., Garg, P., Cogliano, M., Taylor, J., ... & Kiely, D. G. (2020). A machine
    learning cardiac magnetic resonance approach to extract disease features and automate pulmonary arterial
    hypertension diagnosis. European Heart Journal-Cardiovascular Imaging.
    [2] Song, X., Meng, L., Shi, Q., & Lu, H. (2015, October). Learning tensor-based features for whole-brain fMRI
    classification. In International Conference on Medical Image Computing and Computer-Assisted Intervention
    (pp. 613-620). Springer, Cham.
    [3] Lu, H., Plataniotis, K. N., & Venetsanopoulos, A. N. (2008). MPCA: Multilinear principal component analysis of
    tensor objects. IEEE Transactions on Neural Networks, 19(1), 18-39.
"""

import logging

import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.feature_selection import f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.svm import LinearSVC, SVC
from sklearn.utils.validation import check_is_fitted



param_c_grids = list(np.logspace(-4, 2, 7))
classifiers = {
    "svc": [SVC, {"kernel": ["linear"], "C": param_c_grids}],
    "linear_svc": [LinearSVC, {"C": param_c_grids}],
    "lr": [LogisticRegression, {"C": param_c_grids}],
}

# k-fold cross validation used for grid search, i.e. searching for optimal value of C
default_search_params = {"cv": 5}
default_mpca_params = {"var_ratio": 0.97, "return_vector": True}


class MPCATrainer(BaseEstimator, ClassifierMixin):
    def __init__(
        self, classifier="svc", classifier_params="auto", mpca_params=None, n_features=None, search_params=None
    ):
        """Trainer of pipeline: MPCA->Feature selection->Classifier

        Args:
            classifier (str, optional): Available classifier options: {"svc", "linear_svc", "lr"}, where "svc" trains a
                support vector classifier, supports both linear and non-linear kernels, optimizes with library "libsvm";
                "linear_svc" trains a support vector classifier with linear kernel only, and optimizes with library
                "liblinear", which suppose to be faster and better in handling large number of samples; and "lr" trains
                a classifier with logistic regression. Defaults to "svc".
            classifier_params (dict, optional): Parameters of classifier. Defaults to 'auto'.
            mpca_params (dict, optional): Parameters of MPCA. Defaults to None.
            n_features (int, optional): Number of features for feature selection. Defaults to None, i.e. all features
                after dimension reduction will be used.
            search_params (dict, optional): Parameters of grid search. Defaults to None.

        """
        if classifier not in ["svc", "linear_svc", "lr"]:
            error_msg = "Valid classifier should be 'svc', 'linear_svc', or 'lr', but given %s" % classifier
            logging.error(error_msg)
            raise ValueError(error_msg)

        self.classifier = classifier
        # init mpca object
        if mpca_params is None:
            self.mpca_params = default_mpca_params
        else:
            self.mpca_params = mpca_params
        self.mpca = MPCA(**self.mpca_params)
        # init feature selection parameters
        self.n_features = n_features
        self.feature_order = None
        # init classifier object
        if search_params is None:
            self.search_params = default_search_params
        else:
            self.search_params = search_params

        self.auto_classifier_param = False
        if classifier_params == "auto":
            self.auto_classifier_param = True
            clf_param_grid = classifiers[classifier][1]
            self.grid_search = GridSearchCV(
                classifiers[classifier][0](), param_grid=clf_param_grid, **self.search_params
            )
            self.clf = None
        elif isinstance(classifier_params, dict):
            self.clf = classifiers[classifier][0](**classifier_params)
        else:
            error_msg = "Invalid classifier parameter type"
            logging.error(error_msg)
            raise ValueError(error_msg)

        self.classifier_params = classifier_params

    def fit(self, x, y):
        """Fit a pipeline with the given data x and labels y

        Args:
            x (array-like tensor): input data, shape (n_samples, I_1, I_2, ..., I_N)
            y (array-like): data labels, shape (n_samples, )

        Returns:
            self
        """
        # fit mpca
        self.mpca.fit(x)
        self.mpca.set_params(**{"return_vector": True})
        self.x_transformed = self.mpca.transform(x)

        # feature selection
        if self.n_features is None:
            self.n_features = self.x_transformed.shape[1]
            self.feature_order = self.mpca.idx_order
        else:
            f_score, p_val = f_classif(self.x_transformed, y)
            self.feature_order = (-1 * f_score).argsort()
        self.x_transformed = self.x_transformed[:, self.feature_order][:, : self.n_features]

        # fit classifier
        if self.auto_classifier_param:
            self.grid_search.param_grid["C"].append(1 / x.shape[0])
            self.grid_search.fit(self.x_transformed, y)
            self.clf = self.grid_search.best_estimator_
        if self.classifier == "svc":
            self.clf.set_params(**{"probability": True})

        self.clf.fit(self.x_transformed, y)

    def predict(self, x):
        """Predict the labels for the given data x

        Args:
            x (array-like tensor): input data, shape (n_samples, I_1, I_2, ..., I_N)

        Returns:
            array-like: Predicted labels, shape (n_samples, )
        """
        return self.clf.predict(self._extract_feature(x))

    def decision_function(self, x):
        """Decision scores of each class for the given data x

        Args:
            x (array-like tensor): input data, shape (n_samples, I_1, I_2, ..., I_N)

        Returns:
            array-like: decision scores, shape (n_samples,) for binary case, else (n_samples, n_class)
        """
        return self.clf.decision_function(self._extract_feature(x))

    def predict_proba(self, x):
        """Probability of each class for the given data x. Not supported by "linear_svc".

        Args:
            x (array-like tensor): input data, shape (n_samples, I_1, I_2, ..., I_N)

        Returns:
            array-like: probabilities, shape (n_samples, n_class)
        """
        if self.classifier == "linear_svc":
            error_msg = "Linear SVC does not support computing probability."
            logging.error(error_msg)
            raise ValueError(error_msg)
        return self.clf.predict_proba(self._extract_feature(x))

    def _extract_feature(self, x):
        """Extracting features for the given data x with MPCA->Feature selection

        Args:
            x (array-like tensor): input data, shape (n_samples, I_1, I_2, ..., I_N)

        Returns:
            array-like: n_new, shape (n_samples, n_features)
        """
        check_is_fitted(self.clf)
        self.x_transformed = self.mpca.transform(x)

        return self.x_transformed[:, self.feature_order][:, : self.n_features]

In [9]:
cfg_path = "pykale/examples/cmri_mpca/configs/tutorial_svc.yaml" # Path to `.yaml` config file

cfg = get_cfg_defaults()
cfg.merge_from_file(cfg_path)
cfg.freeze()
print(cfg)

KeyError: 'Non-existent config key: DAN'

In [None]:
base_dir = cfg.DATASET.BASE_DIR
file_format = cfg.DATASET.FILE_FORAMT
download_file_by_url(cfg.DATASET.SOURCE, cfg.DATASET.ROOT, "%s.%s" % (base_dir, file_format), file_format)

In [None]:
img_path = os.path.join(cfg.DATASET.ROOT, base_dir, cfg.DATASET.IMG_DIR)
images = read_dicom_images(img_path, sort_instance=True, sort_patient=True)

mask_path = os.path.join(cfg.DATASET.ROOT, base_dir, cfg.DATASET.MASK_DIR)
mask = read_dicom_images(mask_path, sort_instance=True)

In [None]:
# Plotting all images from a random subject
i = random.randint(0, 178)

fig = plt.figure(figsize=(15,15))
for j in range(1,21):
    fig.add_subplot(4,5, j)
    plt.imshow(images[i][j-1], cmap="gray")



In [None]:
fig = plt.figure(figsize=(3,3))
plt.imshow(images[0][0], cmap='gray')
plt.savefig('image_org', format='jpg')

In [None]:
images.shape # (179, 20, 64, 64)

In [None]:
landmark_path = os.path.join(cfg.DATASET.ROOT, base_dir, cfg.DATASET.LANDMARK_FILE)
landmark_df = pd.read_csv(landmark_path, index_col="Subject")  # read .csv file as dataframe
landmarks = landmark_df.iloc[:, :6].values
y = landmark_df["Group"].values
y[np.where(y != 0)] = 1 

In [None]:
#visualisation with landmarks

visualize.plot_multi_images(
    images[:, 0, ...], marker_locs=landmarks, im_kwargs=dict(cfg.IM_KWARGS), marker_kwargs=dict(cfg.MARKER_KWARGS)
).show()

In [None]:
from kale.prepdata.image_transform import mask_img_stack, normalize_img_stack, reg_img_stack, rescale_img_stack

In [None]:
img_reg, max_dist = reg_img_stack(images.copy(), landmarks)

In [None]:
#Masking
img_masked = mask_img_stack(img_reg.copy(), mask[0, 0, ...])

In [None]:
#Plotting masked images
i = random.randint(0, 178)

fig = plt.figure(figsize=(15,15))
for j in range(1,21):
    fig.add_subplot(4,5, j)
    plt.imshow(img_masked[i][j-1], cmap="gray")

In [None]:
fig = plt.figure(figsize=(3,3))
plt.imshow(img_masked[0][0], cmap='gray')
plt.savefig('image_masked', format='jpg')

In [None]:
img_rescaled = rescale_img_stack(img_masked.copy(), scale=1 / 2)

In [None]:
fig = plt.figure(figsize=(3,3))
plt.imshow(img_rescaled[0][0], cmap='gray')
plt.savefig('image_rescaled', format='jpg')

In [None]:
img_rescaled.shape # (179, 20, 32, 32) -> rescaling would not be necessary for Deep Learning model

In [None]:
img_norm = normalize_img_stack(img_rescaled.copy())

In [None]:
fig = plt.figure(figsize=(3,3))
plt.imshow(img_norm[0][0], cmap='gray')
plt.savefig('image_norm', format='jpg')

In [None]:
img_norm.shape

In [None]:
# Training

x = img_norm.copy() # creating copy to avoid modifying original images due to the pointer nature
trainer = MPCATrainer(classifier='svc', n_features=200)
cv_results = cross_validate(trainer, x, y, cv=10, scoring=["accuracy", "roc_auc"], n_jobs=1)

In [None]:
cv_results

In [None]:
print(f"Averaged test accuracy: {cv_results['test_accuracy'].mean()}")
print(f"Averaged test area under ROC curve: {cv_results['test_roc_auc'].mean()}")

In [None]:
from sklearn.metrics import roc_curve, auc
from sklearn.model_selection import KFold
from sklearn.calibration import calibration_curve

In [None]:
z = img_norm
mpca = MPCA(var_ratio=0.97)

In [None]:
z_projected = mpca.fit_transform(z)

In [None]:
z_projected = mpca.transform(z)

In [None]:
z_projected.shape

In [None]:
img_mpca = mpca.transform(z)
plt.imshow(img_mpca[0][0], cmap="gray")


In [None]:
clf = SVC(probability=True, gamma='auto')
clf.get_params().keys()

In [None]:
nsamples, nbatches, nx, ny = img_mpca.shape
img_mpca.shape

In [None]:
d2_img_mpca = np.reshape(img_mpca, (nsamples, -1))
y.shape

In [None]:
X_train, X_test, y_train, y_test = train_test_split(d2_img_mpca, y, test_size=0.2)
params = {'kernel': ('linear', 'rbf'), 'C':[1, 10, 20]}
grd = GridSearchCV(clf, params, scoring='roc_auc', cv=10)
grd.fit(X_train, y_train)

clf = SVC(probability=True, gamma='auto', kernel='linear', C=1)

In [None]:
pd.DataFrame(grd.cv_results_)

In [None]:
#cv_results = KFold(n_splits=10, shuffle=True, )
clf.fit(X_train, y_train)
y_hat = clf.predict(X_test)
y_hat

In [None]:

fpr, tpr, thresholds = roc_curve(y_test, y_hat, pos_label=1)
roc_auc = auc(fpr, tpr)
plt.figure()
lw=2
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve (area ={0:.2f})'.format(roc_auc))
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()

In [None]:
# plotting calibration curve
y_probs = clf.predict_proba(X_test)[:,1]
prob_true, prob_pred = calibration_curve(y_test, y_probs, n_bins=10, normalize=True)

In [None]:
prob_pred

In [None]:
prob_true

In [None]:
fig, ax = plt.subplots()
plt.plot(prob_pred, prob_true)
line = mlines.Line2D([0, 1], [0, 1], color='black')
ax.add_line(line)
ax.set_xlabel("Probability of positives")
ax.set_ylabel("Fraction of positives")
fig.suptitle("Calibration curve for SVC for cMRI classification")

In [None]:
uncalibrated_report = classification_report(y_test, y_hat)
print(uncalibrated_report)

In [None]:
brier_score_loss(y_test, y_probs)

In [None]:
log_loss(y_test, y_probs)

In [None]:
from sklearn.metrics import balanced_accuracy_score

In [None]:
balanced_accuracy_score(y_test, y_hat)

In [None]:
# calibrating the classifier
from sklearn.calibration import CalibratedClassifierCV
clf = SVC(probability=True, gamma='auto', kernel='linear', C=1)

In [None]:
calibrated_clf = CalibratedClassifierCV(base_estimator=clf, cv=10, method="sigmoid")

In [None]:
calibrated_clf.fit(X_train, y_train)

In [None]:
X_train.shape

In [None]:
y_hat_calb = calibrated_clf.predict(X_test)
y_probs_calb = calibrated_clf.predict_proba(X_test)[:,1]
prob_true, prob_pred = calibration_curve(y_test, y_probs, n_bins=10, normalize=True)

In [None]:
fig, ax = plt.subplots()
plt.plot(prob_pred, prob_true)
line = mlines.Line2D([0, 1], [0, 1], color='black')
ax.add_line(line)
ax.set_xlabel("Probability of positives")
ax.set_ylabel("Fraction of positives")
fig.suptitle("Calibration curve for Calibrated SVC for cMRI classification (Platt scaling method)")

In [None]:
unique, counts = np.unique(y_train, return_counts=True)

In [None]:
counts[0]/(counts[0] + counts[1])

In [None]:
calibrated_report = classification_report(y_test, y_hat_calb)
print(calibrated_report)

In [None]:
brier_score_loss(y_test, y_probs_calb)

In [None]:
log_loss(y_test, y_probs_calb)

In [None]:
balanced_accuracy_score(y_test, y_hat_calb)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

In [None]:
trainer = MPCATrainer(classifier='svc', n_features=None)
trainer.fit(X_train, y_train)
#cv_results = cross_validate(trainer, x, y, cv=10, scoring=["accuracy", "roc_auc"], n_jobs=1)

In [None]:
#print(f"Averaged test accuracy: {cv_results['test_accuracy'].mean()}")
#print(f"Averaged test area under ROC curve: {cv_results['test_roc_auc'].mean()}")

In [None]:
y_probs = trainer.predict_proba(X_test)[:,1]

In [None]:
prob_true, prob_pred = calibration_curve(y_test, y_probs, n_bins=10, normalize=True)

In [None]:
fig, ax = plt.subplots()
plt.plot(prob_pred, prob_true)
line = mlines.Line2D([0, 1], [0, 1], color='black')
ax.add_line(line)
plt.grid()
ax.set_xlabel("Probability of positives")
ax.set_ylabel("Fraction of positives")
fig.suptitle("Calibration curve for uncalibrated SVC for cMRI classification (MPCATrainer)")

In [None]:
calibrated_trainer = MPCATrainer(classifier='svc', n_features=None)
calibrated_trainer.fit(X_train, y_train)
#cv_results = cross_validate(calibrated_trainer, X_train, y_train, cv=10, scoring=["accuracy", "roc_auc"], n_jobs=1)

y_probs = calibrated_trainer.predict_proba(X_test)[:, 1]

In [None]:
calibrated_trainer.x_transformed

In [None]:
cv_results

In [None]:
print(f"Averaged test accuracy: {cv_results['test_accuracy'].mean()}")
print(f"Averaged test area under ROC curve: {cv_results['test_roc_auc'].mean()}")

In [None]:
from sklearn.linear_model import LogisticRegression

In [None]:
class PlattScaling:
    
    def transform(self, y_probs, model, y_test):
        self.lr = LogisticRegression()
        self.lr.fit(model.x_transformed, y_test)
        print(self.lr.coef_.shape)
        output = np.multiply(y_probs, self.lr.coef_) + self.lr.intercept_
        output = 1/(1 + np.exp(output))
        
        return output
    
    def calibrate(self, model, y_probs, y_test):
        #self.coef = model.coef_[0]
        #self.intercept = model.intercept_
        y_probs_calibrated = self.transform(y_probs, model, y_test)
        
        return y_probs_calibrated
    
    def expected_error(self, prob_true, prob_pred, y_probs):
        ese = np.sum(abs(prob_true - prob_pred) * ((len(prob_true) + 1) / len(y_probs)))
        
        return ese
    
    def calibration(self, y_test, model, y_probs):
        y_probs_calibrated = self.transform(y_probs, model, y_test)
        prob_true, prob_pred = calibration_curve(y_test, y_probs_calibrated, n_bins=10, normalize=True)
        ese = self.expected_error(prob_true, prob_pred, y_probs)
        
        return prob_true, prob_pred, ese

In [None]:
platt = PlattScaling()

In [None]:
# Here starts the best calibration that I could implement.

In [None]:
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2, shuffle=True)
X_train_sub, X_val, y_train_sub, y_val = train_test_split(X_train, y_train, stratify=y_train, shuffle=True)

In [None]:
uncalibrated_trainer = MPCATrainer(classifier='svc', n_features=None)
calibrated_trainer = MPCATrainer(classifier='svc', n_features=None)

In [None]:
uncalibrated_trainer.fit(X_train, y_train)
calibrated_trainer.fit(X_train_sub, y_train_sub)

In [None]:
calibrated_trainer.clf = CalibratedClassifierCV(calibrated_trainer.clf, cv='prefit', method='sigmoid')
calibrated_trainer.fit(X_val, y_val)

In [None]:
y_probs_uncalb = uncalibrated_trainer.predict_proba(X_test)[:, 1]
y_probs_calb = calibrated_trainer.predict_proba(X_test)[:, 1]

In [None]:
prob_true_uncalb, prob_pred_uncalb = calibration_curve(y_test, y_probs_uncalb, n_bins=10, normalize=True)
prob_true_calb, prob_pred_calb = calibration_curve(y_test, y_probs_calb, n_bins=10, normalize=True)

In [None]:
ese_uncalb = platt.expected_error(prob_true_uncalb, prob_pred_uncalb, y_probs_uncalb)
ese_calb = platt.expected_error(prob_true_calb, prob_pred_calb, y_probs_calb)

In [None]:
fig, ax = plt.subplots(figsize=(15,15))
plt.plot(prob_pred_uncalb, prob_true_uncalb, label="Uncalibrated")
plt.plot(prob_pred_calb, prob_true_calb, label="Calibrated")
line = mlines.Line2D([0, 1], [0, 1], color='black')
ax.add_line(line)
plt.grid()
ax.set_xlabel("Probability of positives")
ax.set_ylabel("Fraction of positives")
ax.legend()
plt.text(0.7,0.25, f"Uncalibrated Expected error = {ese_uncalb:.3f}")
plt.text(0.7,0.2, f"Calibrated Expected error = {ese_calb:.3f}")
fig.suptitle("Calibration curves for SVC (MPCATrainer)", fontsize=14)
fig.savefig("Calibrated vs uncalibrated", format='jpg')

In [None]:
calibrated_trainer.x_transformed.shape

In [None]:
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=123)

In [None]:
expected_errors = []
for i in range(1, 1001, 10):
    trainer = MPCATrainer(classifier='svc', n_features=i)
    trainer.fit(X_train, y_train)
    y_probs = trainer.predict_proba(X_test)[:, 1]
    
    prob_true, prob_pred = calibration_curve(y_test, y_probs, n_bins=10, normalize=True)
    ese = platt.expected_error(prob_true, prob_pred, y_probs)
    
    expected_errors.append(ese)
    

In [None]:
domain = range(1, 1001, 10)
fig, ax = plt.subplots(figsize=(13,15))
plt.plot(domain, expected_errors)
ax.grid()
ax.set_xlabel("Number of selected features")
ax.set_ylabel("Expected Calibration Error")
fig.suptitle("Impact of number of selected features on reliability", fontsize=14)
plt.savefig("impact1", format="jpg")

In [None]:
index = expected_errors.index(min(expected_errors))

In [None]:
index

In [None]:
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=1)

In [None]:
expected_errors = []
for i in range(1, 1001, 10):
    trainer = MPCATrainer(classifier='svc', n_features=i)
    trainer.fit(X_train, y_train)
    y_probs = trainer.predict_proba(X_test)[:, 1]
    
    prob_true, prob_pred = calibration_curve(y_test, y_probs, n_bins=10, normalize=True)
    ese = platt.expected_error(prob_true, prob_pred, y_probs)
    
    expected_errors.append(ese)

In [None]:
domain = range(1, 1001, 10)
fig, ax = plt.subplots(figsize=(13,15))
plt.plot(domain, expected_errors)
ax.grid()
ax.set_xlabel("Number of selected features")
ax.set_ylabel("Expected Calibration Error")
fig.suptitle("Impact of number of selected features on reliability", fontsize=14)
plt.savefig("impact2", format="jpg")

In [None]:
index = expected_errors.index(min(expected_errors))

In [None]:
index

In [None]:
y

In [None]:
import seaborn as sns
from collections import Counter

In [None]:
count = Counter(y)
count

In [None]:
sns.set(rc={'figure.figsize':(11.7,8.27)})
sns.histplot(y)

In [None]:
# imbalanced dataset, claimed also due to the poor performance of algorithm on classification of 0 class data

In [None]:
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=23)


In [None]:
uncalibrated_trainer = MPCATrainer(classifier='svc', n_features=300)
calibrated_trainer = MPCATrainer(classifier='svc', n_features=300)
calibrated_trainer.clf = CalibratedClassifierCV(calibrated_trainer.clf, cv=10, method='sigmoid')

In [None]:
uncalibrated_trainer.fit(X_train, y_train)
calibrated_trainer.fit(X_train, y_train)

In [None]:
y_hat_uncalb = uncalibrated_trainer.predict(X_test)
y_hat_calb = calibrated_trainer.predict(X_test)

In [None]:
y_uncalb_probs = uncalibrated_trainer.predict_proba(X_test)
y_calb_probs = calibrated_trainer.predict_proba(X_test)

In [None]:
y_probs_uncalb = uncalibrated_trainer.predict_proba(X_test)[:, 1]
y_probs_calb = calibrated_trainer.predict_proba(X_test)[:, 1]

In [None]:
print(classification_report(y_test, y_hat_uncalb))

In [None]:
print(classification_report(y_test, y_hat_calb))

In [None]:
y_hat_uncalb

In [None]:
y_hat_calb

In [None]:
fig, ax = plt.subplots(figsize=(15,15))
plt.plot(prob_pred_uncalb, prob_true_uncalb, label="Uncalibrated")
plt.plot(prob_pred_calb, prob_true_calb, label="Calibrated")
line = mlines.Line2D([0, 1], [0, 1], color='black')
ax.add_line(line)
plt.grid()
ax.set_xlabel("Probability of positives")
ax.set_ylabel("Fraction of positives")
ax.legend()
plt.text(0.7,0.25, f"Uncalibrated Expected error = {ese_uncalb:.3f}")
plt.text(0.7,0.2, f"Calibrated Expected error = {ese_calb:.3f}")
fig.suptitle("Calibration curves for SVC (MPCATrainer)", fontsize=14)
fig.savefig("Calibrated vs uncalibrated", format='jpg')

Hypothesis: Creating additional images through data augmentation of deficient class will increase the probabilistic estimation of an algorithm due to removal of the imbalanced characteristics of the dataset.


In [None]:
x.shape

In [None]:
import torchvision.transforms as transforms
from PIL import Image
import cv2

In [None]:
!pip install albumentations
import albumentations as A

In [None]:
plt.imshow(x[0][0])
plt.show()

Now, some data augmentation is going to be implemented.


In [None]:
transform = A.Compose(
    [
        A.Rotate(limit=40, p=0.9, border_mode=cv2.BORDER_CONSTANT),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.1)
    ]
)

In [None]:
# Creating an array of synthesised images
# Create a mechanism which would choose an index of image with label '0' so that augmented images would come from image of this class.
augmented_images = 0
img = []
labels = []
for k in range(50):
    idx = np.where(y==0)[0][k]
    image = X_train[idx][0]
    images_list = [image]
    label = y[idx]
    for i in range(19):
        augmentations = transform(image=image)
        augmented_img = augmentations["image"]
        images_list.append(augmented_img)
    labels.append(label)
    img.append(images_list)
augmented_images = np.array(img)

In [None]:
augmented_images.shape, len(labels)

In [None]:
plt.imshow(augmented_images[9][0])
plt.show()

In [None]:
y_aug = np.array(labels)

In [None]:
y_aug

In [None]:
y_aug = y_aug.flatten()
y_aug.shape

In [None]:
y_train.reshape((y_train.shape[0], 1))
y_train.shape

In [None]:
#for j in range(augmented_images.shape[0]):
X_train_aug = np.concatenate((X_train, augmented_images), axis=0)
y_train_aug = np.concatenate((y_train, y_aug), axis=0)

In [None]:
X_train_aug.shape, y_train_aug.shape

In [None]:
augmented_images.shape

In [None]:
def expected_calibration_error(y_predicted: np.ndarray, y_probs: np.ndarray, y_test: np.ndarray, n_bins=10) -> float:
    n = y_test.shape[0]
    
    if y_test.ndim > 1:
        y_test = np.argmax(y_test)
    y_pred_idx = np.argmax(y_predicted)
    acc = np.zeros(n_bins) 
    conf = np.zeros(n_bins)
    Bm = np.zeros(n_bins)
    
    for m in range(n_bins):
        a = m / n_bins
        b = (m+1) / n_bins
        for i in range(n):
            if y_predicted[i] > a and y_predicted[i] <= b:
                Bm[m] += 1
                if y_predicted[i] == y_test[i]:
                    acc[m] += 1
                conf[m] += y_probs[i]
        if Bm[m] != 0:
            acc[m] = acc[m] / Bm[m]
            conf[m] = conf[m] / Bm[m]
    
    epsilon = 0
    for m in range(n_bins):
        epsilon += Bm[m] * np.abs((acc[m] - conf[m]))
    return epsilon / np.sum(Bm[m])    
   
    

In [None]:
uncalibrated_report = classification_report(y_test, y_hat_uncalb)
print(uncalibrated_report)

In [None]:
calibrated_report = classification_report(y_test, y_hat_calb)
print(calibrated_report)

In [None]:
expected_calibration_error(y_hat_calb, y_calb_probs[:, 1], y_test), expected_calibration_error(y_hat_calb, y_uncalb_probs[:, 1], y_test)

This implementation of expected calibration error is much better and accurate. Although there are still cases of data shuffling data calibation will not help (or even make the error large than for uncalibrated trainer. However, it's still worth doing that.

In [None]:
brier_score_loss(y_test, y_calb_probs[:, 0]), brier_score_loss(y_test, y_uncalb_probs[:, 0])

In [None]:
brier_score_loss(y_test, y_calb_probs[:, 1]), brier_score_loss(y_test, y_uncalb_probs[:, 1])

In [None]:
uncalibrated_trainer = MPCATrainer(classifier='svc', n_features=300)
calibrated_trainer = MPCATrainer(classifier='svc', n_features=300)
calibrated_trainer.clf = CalibratedClassifierCV(calibrated_trainer.clf, cv=10, method='sigmoid')

In [None]:
uncalibrated_trainer.fit(X_train_aug, y_train_aug)
calibrated_trainer.fit(X_train_aug, y_train_aug)

In [None]:
y_hat_uncalb = uncalibrated_trainer.predict(X_test)
y_hat_calb = calibrated_trainer.predict(X_test)

In [None]:
y_uncalb_probs = uncalibrated_trainer.predict_proba(X_test)
y_calb_probs = calibrated_trainer.predict_proba(X_test)

In [None]:
print(classification_report(y_test, y_hat_calb))

In [None]:
print(classification_report(y_test, y_hat_uncalb))

In [None]:
expected_calibration_error(y_hat_calb, y_calb_probs[:, 1], y_test), expected_calibration_error(y_hat_calb, y_uncalb_probs[:, 1], y_test)

In [None]:
brier_score_loss(y_test, y_calb_probs[:, 0]), brier_score_loss(y_test, y_uncalb_probs[:, 0])

In [None]:
brier_score_loss(y_test, y_calb_probs[:, 1]), brier_score_loss(y_test, y_uncalb_probs[:, 1])

In [None]:
expected_errors = []
accuracies = []
for i in range(1, 1001, 10):
    trainer = MPCATrainer(classifier='svc', n_features=i)
    trainer.fit(X_train, y_train)
    y_hat = trainer.predict(X_test)
    y_probs = trainer.predict_proba(X_test)[:, 1]
    
    prob_true, prob_pred = calibration_curve(y_test, y_probs, n_bins=10, normalize=True)
    ese = expected_calibration_error(y_hat, y_probs, y_test)
    acc = balanced_accuracy_score(y_test, y_hat)
    
    accuracies.append(acc)
    expected_errors.append(ese)

In [None]:
domain = range(1, 1001, 10)
fig, ax = plt.subplots(figsize=(13,15))
plt.plot(domain, expected_errors)
ax.grid()
ax.set_xlabel("Number of selected features")
ax.set_ylabel("Expected Calibration Error")
fig.suptitle("Impact of number of selected features on reliability", fontsize=14)

In [None]:
domain = range(1, 1001, 10)
fig, ax = plt.subplots(figsize=(13,15))
plt.plot(domain, accuracies)
ax.grid()
ax.set_xlabel("Number of selected features")
ax.set_ylabel("Accuracy of an uncalibrated trainer")
fig.suptitle("Impact of number of selected features on reliability", fontsize=14)

In [None]:
expected_errors[17], accuracies[17]

In [None]:
accuracies