In [None]:
import numpy as np
import pandas as pd
import numpy as np
import pandas as pd 
from collections import namedtuple, Counter, defaultdict
import statistics
from sklearn.metrics import precision_recall_fscore_support, accuracy_score, classification_report, confusion_matrix
from sklearn.metrics import brier_score_loss
from scipy.special import softmax
from scipy.optimize import minimize 
import time
from keras.losses import categorical_crossentropy
from os.path import join
import sklearn.metrics as metrics
from sklearn.metrics import log_loss

In [None]:
#This notebook is created to run value analysis with your own model on multi-class datasets
#Use this code to first calibrate your model via temperature scaling, and then obtain the value analysis
#specify the required info here and run the notebook to receive value analysis result for your model
modelName = 'name_of_your_model'
resPath = 'define_the_path_for_results'
data_folder = 'define_the_path_where_you_keep_the_confidence_values_of_your_model_and_the_datasets'
confidencesToVal = 'name_of_the_numpy_array_for_the_confidences_on_validation_set.npy'
dataToVal = 'name_of_validation_set.csv'
confidencesToTest = 'name_of_the_numpy_array_for_the_confidences_on_test_set.npy'
dataToTest ='name_of_test_set.csv'
ground_truth_column = 'specify_the_column_for_ground_truth_in_your_csv_files'
txt = 'specify_the_column_for_text_in_your_csv_files'
datasetName = 'name_of_your_dataset'

In [None]:
#result file
logfile_name = "{}_{}_tscaled".format(datasetName)

#cost-based parameters
Vr = 0.0
Vc = 1.0
Vw_g = list(np.arange(0, -10.1, -1))
t_g = list(np.arange(0, 1.01, 0.01))

In [None]:
logits_val = np.load(data_folder  + confidencesToVal)
y_val_df = pd.read_csv(data_folder + dataToVal)
y_val = y_val_df[ground_truth_column].values

logits_test = np.load(data_folder  + confidencesToTest)
y_test_df = pd.read_csv(data_folder + dataToTest)
y_test = y_test_df[ground_truth_column].values

In [None]:
def softmax(x):
    """
    Compute softmax values for each sets of scores in x.
    
    Parameters:
        x (numpy.ndarray): array containing m samples with n-dimensions (m,n)
    Returns:
        x_softmax (numpy.ndarray) softmaxed values for initial (m,n) array
    """
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum(axis=1, keepdims=1)

def compute_acc_bin(conf_thresh_lower, conf_thresh_upper, conf, pred, true):
    """
    # Computes accuracy and average confidence for bin
    
    Args:
        conf_thresh_lower (float): Lower Threshold of confidence interval
        conf_thresh_upper (float): Upper Threshold of confidence interval
        conf (numpy.ndarray): list of confidences
        pred (numpy.ndarray): list of predictions
        true (numpy.ndarray): list of true labels
    
    Returns:
        (accuracy, avg_conf, len_bin): accuracy of bin, confidence of bin and number of elements in bin.
    """
    filtered_tuples = [x for x in zip(pred, true, conf) if x[2] > conf_thresh_lower and x[2] <= conf_thresh_upper]
    if len(filtered_tuples) < 1:
        return 0,0,0
    else:
        correct = len([x for x in filtered_tuples if x[0] == x[1]])  # How many correct labels
        len_bin = len(filtered_tuples)  # How many elements falls into given bin
        avg_conf = sum([x[2] for x in filtered_tuples]) / len_bin  # Avg confidence of BIN
        accuracy = float(correct)/len_bin  # accuracy of BIN
        return accuracy, avg_conf, len_bin
  

def ECE(conf, pred, true, bin_size = 0.1):

    """
    Expected Calibration Error
    
    Args:
        conf (numpy.ndarray): list of confidences
        pred (numpy.ndarray): list of predictions
        true (numpy.ndarray): list of true labels
        bin_size: (float): size of one bin (0,1)  # TODO should convert to number of bins?
        
    Returns:
        ece: expected calibration error
    """
    
    upper_bounds = np.arange(bin_size, 1+bin_size, bin_size)  # Get bounds of bins
    
    n = len(conf)
    ece = 0  # Starting error
    
    for conf_thresh in upper_bounds:  # Go through bounds and find accuracies and confidences
        acc, avg_conf, len_bin = compute_acc_bin(conf_thresh-bin_size, conf_thresh, conf, pred, true)        
        ece += np.abs(acc-avg_conf)*len_bin/n  # Add weigthed difference to ECE
        
    return ece
        
      
def MCE(conf, pred, true, bin_size = 0.1):

    """
    Maximal Calibration Error
    
    Args:
        conf (numpy.ndarray): list of confidences
        pred (numpy.ndarray): list of predictions
        true (numpy.ndarray): list of true labels
        bin_size: (float): size of one bin (0,1)  # TODO should convert to number of bins?
        
    Returns:
        mce: maximum calibration error
    """
    
    upper_bounds = np.arange(bin_size, 1+bin_size, bin_size)
    
    cal_errors = []
    
    for conf_thresh in upper_bounds:
        acc, avg_conf, _ = compute_acc_bin(conf_thresh-bin_size, conf_thresh, conf, pred, true)
        cal_errors.append(np.abs(acc-avg_conf))
        
    return max(cal_errors)

def evaluate(probs, y_true, verbose = False, normalize = False, bins = 15):
    """
    Evaluate model using various scoring measures: Error Rate, ECE, MCE, NLL
    
    Params:
        probs: a list containing probabilities for all the classes with a shape of (samples, classes)
        y_true: a list containing the actual class labels
        verbose: (bool) are the scores printed out. (default = False)
        normalize: (bool) in case of 1-vs-K calibration, the probabilities need to be normalized.
        bins: (int) - into how many bins are probabilities divided (default = 15)
        
    Returns:
        (error, ece, mce, loss), returns various scoring measures
    """
    
    preds = np.argmax(probs, axis=1)  # Take maximum confidence as prediction
    
    if normalize:
        confs = np.max(probs, axis=1)/np.sum(probs, axis=1)
        # Check if everything below or equal to 1?
    else:
        confs = np.max(probs, axis=1)  # Take only maximum confidence
    
    accuracy = metrics.accuracy_score(y_true, preds) * 100
    error = 100 - accuracy
    
        # Calculate ECE
    ece = ECE(confs, preds, y_true, bin_size = 1/bins)
    # Calculate MCE
    mce = MCE(confs, preds, y_true, bin_size = 1/bins)
    
    loss = log_loss(y_true=y_true, y_pred=probs)
    
    y_prob_true = np.array([probs[i, idx] for i, idx in enumerate(y_true)])  # Probability of positive class
    
    if verbose:
        print("Accuracy:", accuracy)
        print("Error:", error)
        print("ECE:", ece)
        print("MCE:", mce)
        print("Loss:", loss)
    
    return (error, ece, mce, loss)


class TemperatureScaling():
    
    def __init__(self, temp = 1, maxiter = 50, solver = "BFGS"):
        """
        Initialize class
        
        Params:
            temp (float): starting temperature, default 1
            maxiter (int): maximum iterations done by optimizer, however 8 iterations have been maximum.
        """
        self.temp = temp
        self.maxiter = maxiter
        self.solver = solver
    
    def _loss_fun(self, x, probs, true):
        # Calculates the loss using log-loss (cross-entropy loss)
        scaled_probs = self.predict(probs, x)    
        loss = log_loss(y_true=true, y_pred=scaled_probs)
        return loss
    
    # Find the temperature
    def fit(self, logits, true):
        """
        Trains the model and finds optimal temperature
        
        Params:
            logits: the output from neural network for each class (shape [samples, classes])
            true: one-hot-encoding of true labels.
            
        Returns:
            the results of optimizer after minimizing is finished.
        """
        
        true = true.flatten() # Flatten y_val
        opt = minimize(self._loss_fun, x0 = 1, args=(logits, true), options={'maxiter':self.maxiter}, method = self.solver)
        self.temp = opt.x[0]
        
        return opt
        
    def predict(self, logits, temp = None):
        """
        Scales logits based on the temperature and returns calibrated probabilities
        
        Params:
            logits: logits values of data (output from neural network) for each class (shape [samples, classes])
            temp: if not set use temperatures find by model or previously set.
            
        Returns:
            calibrated probabilities (nd.array with shape [samples, classes])
        """
        
        if not temp:
            return softmax(logits/self.temp)
        else:
            return softmax(logits/temp)

class CELoss(object):

    def compute_bin_boundaries(self, probabilities = np.array([])):

        #uniform bin spacing
        if probabilities.size == 0:
            bin_boundaries = np.linspace(0, 1, self.n_bins + 1)
            self.bin_lowers = bin_boundaries[:-1]
            self.bin_uppers = bin_boundaries[1:]
        else:
            #size of bins 
            bin_n = int(self.n_data/self.n_bins)

            bin_boundaries = np.array([])

            probabilities_sort = np.sort(probabilities)  

            for i in range(0,self.n_bins):
                bin_boundaries = np.append(bin_boundaries,probabilities_sort[i*bin_n])
            bin_boundaries = np.append(bin_boundaries,1.0)

            self.bin_lowers = bin_boundaries[:-1]
            self.bin_uppers = bin_boundaries[1:]


    def get_probabilities(self, output, labels, logits):
        #If not probabilities apply softmax!
        if logits:
            self.probabilities = softmax(output, axis=1)
        else:
            self.probabilities = output

        self.labels = labels
        self.confidences = np.max(self.probabilities, axis=1)
        self.predictions = np.argmax(self.probabilities, axis=1)
        self.accuracies = np.equal(self.predictions,labels)

    def binary_matrices(self):
        idx = np.arange(self.n_data)
        #make matrices of zeros
        pred_matrix = np.zeros([self.n_data,self.n_class])
        label_matrix = np.zeros([self.n_data,self.n_class])
        #self.acc_matrix = np.zeros([self.n_data,self.n_class])
        pred_matrix[idx,self.predictions] = 1
        label_matrix[idx,self.labels] = 1

        self.acc_matrix = np.equal(pred_matrix, label_matrix)


    def compute_bins(self, index = None):
        self.bin_prop = np.zeros(self.n_bins)
        self.bin_acc = np.zeros(self.n_bins)
        self.bin_conf = np.zeros(self.n_bins)
        self.bin_score = np.zeros(self.n_bins)

        if index == None:
            confidences = self.confidences
            accuracies = self.accuracies
        else:
            confidences = self.probabilities[:,index]
            accuracies = self.acc_matrix[:,index]


        for i, (bin_lower, bin_upper) in enumerate(zip(self.bin_lowers, self.bin_uppers)):
            # Calculated |confidence - accuracy| in each bin
            in_bin = np.greater(confidences,bin_lower.item()) * np.less_equal(confidences,bin_upper.item())
            self.bin_prop[i] = np.mean(in_bin)

            if self.bin_prop[i].item() > 0:
                self.bin_acc[i] = np.mean(accuracies[in_bin])
                self.bin_conf[i] = np.mean(confidences[in_bin])
                self.bin_score[i] = np.abs(self.bin_conf[i] - self.bin_acc[i])

class MaxProbCELoss(CELoss):
    def loss(self, output, labels, n_bins = 15, logits = True):
        self.n_bins = n_bins
        super().compute_bin_boundaries()
        super().get_probabilities(output, labels, logits)
        super().compute_bins()

#http://people.cs.pitt.edu/~milos/research/AAAI_Calibration.pdf
class ECELoss(MaxProbCELoss):

    def loss(self, output, labels, n_bins = 15, logits = False):
        super().loss(output, labels, n_bins, logits)
        return np.dot(self.bin_prop,self.bin_score)

class MCELoss(MaxProbCELoss):
    
    def loss(self, output, labels, n_bins = 15, logits = True):
        super().loss(output, labels, n_bins, logits)
        return np.max(self.bin_score)

#https://arxiv.org/abs/1905.11001
#Overconfidence Loss (Good in high risk applications where confident but wrong predictions can be especially harmful)
class OELoss(MaxProbCELoss):

    def loss(self, output, labels, n_bins = 15, logits = True):
        super().loss(output, labels, n_bins, logits)
        return np.dot(self.bin_prop,self.bin_conf * np.maximum(self.bin_conf-self.bin_acc,np.zeros(self.n_bins)))


#https://arxiv.org/abs/1904.01685
class SCELoss(CELoss):

    def loss(self, output, labels, n_bins = 15, logits = True):
        sce = 0.0
        self.n_bins = n_bins
        self.n_data = len(output)
        self.n_class = len(output[0])

        super().compute_bin_boundaries()
        super().get_probabilities(output, labels, logits)
        super().binary_matrices()

        for i in range(self.n_class):
            super().compute_bins(i)
            sce += np.dot(self.bin_prop,self.bin_score)

        return sce/self.n_class

class TACELoss(CELoss):

    def loss(self, output, labels, threshold = 0.01, n_bins = 15, logits = True):
        tace = 0.0
        self.n_bins = n_bins
        self.n_data = len(output)
        self.n_class = len(output[0])

        super().get_probabilities(output, labels, logits)
        self.probabilities[self.probabilities < threshold] = 0
        super().binary_matrices()

        for i in range(self.n_class):
            super().compute_bin_boundaries(self.probabilities[:,i]) 
            super().compute_bins(i)
            tace += np.dot(self.bin_prop,self.bin_score)

        return tace/self.n_class

#create TACELoss with threshold fixed at 0
class ACELoss(TACELoss):

    def loss(self, output, labels, n_bins = 15, logits = False):
        return super().loss(output, labels, 0.0 , n_bins, logits)


def cal_results(fn, logits_val, y_val, logits_test, y_test, m_kwargs = {}, approach = "all"):
  
    if approach == "all":            

        y_val = y_val.flatten()

        model = fn(**m_kwargs)

        model.fit(logits_val, y_val)

        probs_val = model.predict(logits_val) 
        probs_test = model.predict(logits_test)
        error, ece, mce, loss = evaluate(probs_test, y_test, verbose=True)
            
        print("Error %f; ece %f; mce %f; loss %f" % evaluate(probs_val, y_val, verbose=False, normalize=True))

            
    else:  # 1-vs-k models
        probs_val = softmax(logits_val)  # Softmax logits
        probs_test = softmax(logits_test)
        K = probs_test.shape[1]
            
        # Go through all the classes
        for k in range(K):
            # Prep class labels (1 fixed true class, 0 other classes)
            y_cal = np.array(y_val == k, dtype="int")[:, 0]

            # Train model
            model = fn(**m_kwargs)
            model.fit(probs_val[:, k], y_cal) # Get only one column with probs for given class "k"

            probs_val[:, k] = model.predict(probs_val[:, k])  # Predict new values based on the fittting
            probs_test[:, k] = model.predict(probs_test[:, k])

            # Replace NaN with 0, as it should be close to zero  # TODO is it needed?
            idx_nan = np.where(np.isnan(probs_test))
            probs_test[idx_nan] = 0

            idx_nan = np.where(np.isnan(probs_val))
            probs_val[idx_nan] = 0

        # Get results for test set
        error, ece, mce, loss = evaluate(probs_test, y_test, verbose=True, normalize=True)
            
        print("Error %f; ece %f; mce %f; loss %f" % evaluate(probs_val, y_val, verbose=False, normalize=True))
            
        
    return probs_val, probs_test

In [None]:
def cost_based_threshold(k):
    t = (k)/(k+1)
    return t

def calculate_value(y_hat_proba, y, t, Vr, Vc, Vw):

    y_pred = np.array([np.where(l == np.amax(l))[0][0] if (np.amax(l) > t) else -1 for l in y_hat_proba])

    # now lets compute the actual value of each prediction
    
    value_vector = np.full(y_pred.shape[0], Vc)

    value_vector[(y_pred != y) & (y_pred != -1)] = Vw
    
    #loss due to asking humans
    value_vector[y_pred == -1] = Vr
    value = np.sum(value_vector) / len(y)

    numOfRejectedSamples = np.count_nonzero(y_pred == -1)
    numOfWrongPredictions = np.count_nonzero((y_pred != y) & (y_pred != -1))
    return value, numOfRejectedSamples, numOfWrongPredictions

def find_optimum_confidence_threshold(y_hat_proba, y, t_list, Vr, Vc, Vw):

    cost_list = {}

    for t in t_list:
        value, _ , __ = calculate_value(y_hat_proba, y, t, Vr, Vc, Vw)
        cost_list["{}".format(t)] = value
    # find t values with maximum value
    maxValue = max(cost_list.values())
    optTList = [float(k) for k, v in cost_list.items() if v == maxValue]
    # pick the one with the lowest confidence
    optimumT = min(optTList)

    return optimumT, cost_list 

#cost based calibration analysis
def cost_based_analysis(y_hat_proba_val, y_val, y_hat_proba_test, y_test, res_path, logfile_name, Vr, Vc, Vw_list, confT_list):

    # create log file
    rc_path = res_path + logfile_name + "_costBased_test.csv"
    with open(rc_path, 'w') as f:
        c = 'Vr, Vc, Vw, k, t, value, rejected, wrong, t_optimal, value_optimal, rejected_opt, wrong_opt'
        f.write(c + '\n')

    for Vw in Vw_list:
        k = (-1)*(Vw / Vc)
        t = cost_based_threshold(k)
        value_test, rej_test, wrong_test = calculate_value(y_hat_proba_test, y_test, t, Vr, Vc, Vw)

        t_optimal, cost_list = find_optimum_confidence_threshold(y_hat_proba_val, y_val, confT_list, Vr, Vc, Vw)
        value_test_opt, rej_test_opt, wrong_test_opt = calculate_value(y_hat_proba_test, y_test, t_optimal, Vr, Vc, Vw)

        with open(rc_path, 'a') as f:
            res_i = '{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}\n'.format(Vr, Vc, Vw, k, t, value_test, rej_test, wrong_test, t_optimal, value_test_opt, rej_test_opt, wrong_test_opt)
            f.write(res_i)  

In [None]:
logits_val_cal, logits_test_cal = cal_results(TemperatureScaling, logits_val, y_val, logits_test, y_test, approach = "1-vs-k")

In [None]:
cost_based_analysis(logits_val, y_val, logits_test, y_test, res_path, logfile_name, Vr, Vc, Vw_g, t_g)