In [None]:
# for auto-reloading external modules
# see http://stackoverflow.com/questions/1907993/autoreload-of-modules-in-ipython
%load_ext autoreload
%autoreload 2

In [None]:
import collections

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve, auc
from sklearn.calibration import calibration_curve 

In [None]:
!echo $PWD

In [None]:
# Constants

console=True
real_gt_filepath='../ground_truth.txt'
gt_filepath='../test_gt_yes_no.txt'
predictions_filepath='../../test_results.tsv'  # This should point to the test_results.txt file
examples_to_spit_out=10

# Read Ground Truth and Results

In [None]:
def read_ground_truth(file, label_list, label_separator, columns, label_column_idx=-1, delimiter="\t", to_console=False, examples_to_spit_out=10):
    def convert_to_digit(elem):
        assert isinstance(elem, str)
        digit = -1
        if elem == "yes" or elem == "no":
            digit = 1 if elem == "yes" else 0
        else:
            digit = int(elem)
        return digit


    grouth_truth_labels = []
    examples_per_label = collections.Counter()
    examples_read = 0
    expected_columns=len(columns)
    print(f"Getting ground truth from file: {file}")
    with open(file, "r") as reader:
        while True:
            line_attributes = reader.readline().rstrip().split(delimiter)
            if len(line_attributes) != expected_columns:
                break

            example_labels = line_attributes[label_column_idx]
            example_label_list = example_labels.split(label_separator)
            example_label_list = map(lambda x: convert_to_digit(x), example_label_list)
            example_active_labels = list(set(label_list) & set(example_label_list))
            if to_console and examples_read < examples_to_spit_out:
                line_to_print=""
                for i, col_name in enumerate(columns):
                    if i == len(columns) - 1 or i==label_column_idx:
                        line_to_print+=f" {col_name} -> {line_attributes[i][:100]}"
                    else:
                        line_to_print+=f" {i}) {col_name} = {line_attributes[i][:100]}..."
                print(line_to_print)
            examples_read += 1
            for label in example_active_labels:
                examples_per_label[label] += 1
            grouth_truth_labels.append(example_active_labels if len(example_active_labels) > 1 else example_active_labels[0])
    print(f"Examples per label transformed ground truth: {examples_per_label}. Sum: {sum(examples_per_label.values())}")
    return grouth_truth_labels, examples_per_label

def read_binary_preds(file, gt, delimiter="\t", pos_label_idx=0, threshold=0.5, to_console=False):
    y_preds_scores_pos_label = []
    y_preds_probas = np.empty((0, 2))

    with open(file, "r") as reader:
        while True:
            line_attributes = reader.readline().rstrip().split(delimiter)
            if len(line_attributes) != 2:
                break
            np.testing.assert_almost_equal(float(line_attributes[0]) + float(line_attributes[1]), 1.0)
            if float(line_attributes[pos_label_idx]) >= threshold:
                y_preds_scores_pos_label.append(1)
            else:
                y_preds_scores_pos_label.append(0)

            y_probas = [float(line_attributes[0]), float(line_attributes[1])]
            y_preds_probas = np.append(y_preds_probas, [y_probas], axis=0)
    if to_console:
        for i in range(0, 10):
            print(f"GT / Prediction Pos Label / Probas: {gt[i]} / {y_preds_scores_pos_label[i]} / {y_preds_probas[i]}")

    return y_preds_scores_pos_label, y_preds_probas


In [None]:
gt_labels, examples_per_label = read_ground_truth(gt_filepath, [0, 1], ",", ["url", "title", "body", "labels"], to_console=console, examples_to_spit_out=examples_to_spit_out)

In [None]:
yes_perc = examples_per_label[1]/(examples_per_label[0] + examples_per_label[1])
no_perc = examples_per_label[0]/(examples_per_label[0] + examples_per_label[1])

In [None]:
print(f" Yes/No %: {yes_perc}/{no_perc}")

In [None]:
y_preds_pos_label, y_probas = read_binary_preds(predictions_filepath, gt_labels, to_console=console)

In [None]:
y_probas_pos_label = y_probas[:,0]  # Probability of the "yes" class

In [None]:
y_probas_pos_label

# As Binary problem...

In [None]:
# calculate the values of calibration curve for bin pos_label vs all
prob_true_binary, prob_pred_binary = calibration_curve(gt_labels, y_probas_pos_label, n_bins=10, normalize=False)

def plot_reliability_diagram(prob_true, prob_pred, model_name, ax=None):
    # Plot the calibration curve for ResNet in comparison with what a perfectly calibrated model would look like
    if ax==None:
        fig = plt.figure(figsize=(10, 10))
        ax = plt.gca()
    else:
        plt.sca(ax)
    
    plt.plot([0, 1], [0, 1], color="#FE4A49", linestyle=":", label="Perfectly calibrated model")
    plt.plot(prob_pred, prob_true, "s-", label=model_name, color="#162B37")

    plt.ylabel("Fraction of positives", fontsize=16)
    plt.xlabel("Mean predicted value", fontsize=16,)

    plt.legend(fontsize=16)
    plt.xticks(fontsize=14)
    plt.yticks(fontsize=14)

    plt.grid(True, color="#B2C7D9")

    plt.tight_layout()

In [None]:
prob_true_binary, prob_pred_binary

In [None]:
plot_reliability_diagram(prob_true_binary, prob_pred_binary, "pos class (Yes, 1) vs all")

## Expected calibration error


In [None]:
# complete this function to calculate ece
def ece_calculation_binary(prob_true, prob_pred, bin_sizes):
    ece = 0
    for m in np.arange(len(bin_sizes)):
        ece = ece + (bin_sizes[m] / sum(bin_sizes)) * np.abs(prob_true[m] - prob_pred[m])
    return ece

# print the calculated ece
n_bins_binary = len(prob_true_binary)
pred_hist = np.histogram(a=y_preds_pos_label, range=(0, 1), bins=n_bins_binary)[0]
print(ece_calculation_binary(prob_true_binary, prob_pred_binary, pred_hist))

## Maximum calibration error

In [None]:
def mce_calculation_binary(prob_true, prob_pred, bin_sizes):
    mce = 0
    for m in np.arange(len(bin_sizes)):
        mce = max(mce, np.abs(prob_true[m] - prob_pred[m]))
    return mce

#print the calculated mce
print(mce_calculation_binary(prob_true_binary, prob_pred_binary, pred_hist))

## RMSE calibration error

In [None]:
def rmsce_calculation_binary(prob_true, prob_pred, bin_sizes):
    ### YOUR CODE HERE 
    rmsce = 0
    for m in np.arange(len(bin_sizes)):
        rmsce = rmsce + (bin_sizes[m] / sum(bin_sizes)) * (prob_true[m] - prob_pred[m]) ** 2
    return np.sqrt(rmsce)

# print the calculated rmsce
print(rmsce_calculation_binary(prob_true_binary, prob_pred_binary, pred_hist))

# As Multiclass problem

In [None]:
y_gt_multiclass = np.empty((0, 2))
for y_gt in gt_labels:
    if y_gt==1:
        y_gt_multiclass = np.vstack((y_gt_multiclass, [1,0]))
    else:
        y_gt_multiclass = np.vstack((y_gt_multiclass, [0,1]))

print(y_gt_multiclass.shape)
print(y_gt_multiclass)

In [None]:
def ece_calculation_multiclass(y_true, y_pred):
    ### use calibration_curve and your binary function to complete this function
    ece_bin = []
    for a_class in range(y_true.shape[1]):
        prob_true, prob_pred = calibration_curve(y_true[:, a_class], y_pred[:, a_class], n_bins=10)
        plot_reliability_diagram(prob_true, prob_pred, f"Class {a_class}")        
        bin_sizes = np.histogram(a=y_pred[:, a_class], range=(0, 1), bins=len(prob_true))[0]
        ece_bin.append(ece_calculation_binary(prob_true, prob_pred, bin_sizes))
    ## here we have a choice - do we wish to weight our metric depending on the number
    ## of positive examples in each class, or take an unweighted mean
    
#     return sum(ece_bin*class_weights)/n_classes
    return sum(ece_bin*np.array([yes_perc, no_perc]))/2
#     return np.mean(ece_bin)
        
    
def mce_calculation_multiclass(y_true, y_pred):
    ### use calibration_curve and your binary function to complete this function
    mce_bin = []
    for a_class in range(y_true.shape[1]):
        prob_true, prob_pred = calibration_curve(y_true[:, a_class], y_pred[:, a_class], n_bins=10)
        print(prob_true, prob_pred)
        plot_reliability_diagram(prob_true, prob_pred, f"Class {a_class}")
        bin_sizes = np.histogram(a=y_pred[:, a_class], range=(0, 1), bins=len(prob_true))[0]
        mce_bin.append(mce_calculation_binary(prob_true, prob_pred, bin_sizes))
    ## here we have a choice - do we wish to weight our metric depending on the number
    ## of positive examples in each class, or take an unweighted mean
    
    # return sum(ece_bin*class_weights)/n_classes
    return sum(mce_bin*np.array([yes_perc, no_perc]))/2
#     return np.mean(mce_bin)
    
def rmsce_calculation_multiclass(y_true, y_pred):
    ### use calibration_curve and your binary function to complete this function
    rmsce_bin = []
    for a_class in range(y_true.shape[1]):
        prob_true, prob_pred = calibration_curve(y_true[:, a_class], y_pred[:, a_class], n_bins=10)
        plot_reliability_diagram(prob_true, prob_pred, f"Class {a_class}")
        bin_sizes = np.histogram(a=y_pred[:, a_class], range=(0, 1), bins=len(prob_true))[0]
        rmsce_bin.append(rmsce_calculation_binary(prob_true, prob_pred, bin_sizes))
    ## here we have a choice - do we wish to weight our metric depending on the number
    ## of positive examples in each class, or take an unweighted mean
    
    # return sum(ece_bin*class_weights)/n_classes
    return sum(rmsce_bin*np.array([yes_perc, no_perc]))/2    
#     return np.mean(rmsce_bin)

In [None]:
# Just a check
for x,y in y_probas:
    np.testing.assert_almost_equal(x+y, 1.0, err_msg="Yay!")

In [None]:
ece_calculation_multiclass(y_gt_multiclass, y_probas)

In [None]:
mce_calculation_multiclass(y_gt_multiclass, y_probas)

In [None]:
rmsce_calculation_multiclass(y_gt_multiclass, y_probas)