In [6]:
import pandas as pd
import numpy as np
from sklearn.metrics import confusion_matrix
import scipy.stats as stats

# Optimisation
from pymoo.model.repair import Repair
from pymoo.model.problem import Problem
from pymoo.optimize import minimize
from pymoo.algorithms.nsga2 import NSGA2
from pymoo.factory import *
from pymoo.model.sampling import Sampling

ModuleNotFoundError: No module named 'pymoo'

In [None]:
# Load the results of each classifier separately so that several classifiers
# can be combined, instead of feeding in the result of only one classifier.
df1 = pd.read_csv("nn.csv")
df2 = pd.read_csv("nb.csv")
dfs = [df1, df2]

# Column names, reject label and probability range used by the functions below.
inputs = dict(
    id_column="id",
    ground_truth_column="ground_truth",
    reject_label="REJECT",
    min=0,
    max=1,
)

# Core functions

In [None]:
def calculate_matches(all_probabilities, lower, upper):
    """Flag each probability as not-match, match and reject.

    Parameters
    ----------
    all_probabilities : sequence of arrays, one entry per classifier.
    lower : value(s) below which a probability counts as "not match".
    upper : value(s) at or above which a probability counts as "match".

    Returns
    -------
    Boolean array stacked as ``[not_match, match, rejects]``. The per-classifier
    flags are OR-ed together, and ``rejects`` is set wherever ``not_match`` and
    ``match`` agree (both set or both clear).
    """
    below = [np.less(scores, lower) for scores in all_probabilities]
    above = [np.greater_equal(scores, upper) for scores in all_probabilities]

    not_match, match = below[0], above[0]
    for extra_below, extra_above in zip(below[1:], above[1:]):
        not_match = np.logical_or(not_match, extra_below)
        match = np.logical_or(match, extra_above)

    # Equal flags means "neither" or "both": in either case nothing decisive.
    rejects = np.equal(not_match, match)
    return np.stack([not_match, match, rejects])

def apply_thresholds(all_probabilities, lower, upper):
    """Turn probabilities into class indices: 0 = not match, 1 = match, 2 = reject.

    Parameters
    ----------
    all_probabilities : sequence of arrays, one entry per classifier.
    lower, upper : thresholds forwarded unchanged to ``calculate_matches``.

    Returns
    -------
    Integer array with the shape of one probability array.

    A reject takes precedence: when both the not-match and match flags are set
    (classifiers disagree, or ``lower`` is above ``upper``) the result is 2.
    Picking the first set flag would silently report such cells as 0.
    """
    _not_match, match, rejects = calculate_matches(all_probabilities, lower, upper)
    # Where rejects is clear exactly one of not_match / match is set, so
    # "match, otherwise not match" covers the remaining cells.
    return np.where(rejects, 2, np.where(match, 1, 0)).astype(np.intp)
 
def calculate_confusion_matrices(gt, pred):
    """Build one 3x3 confusion matrix per column of ``gt`` / ``pred``.

    Parameters
    ----------
    gt : 2-D array of ground-truth class indices, one column per label.
    pred : 2-D array of predicted class indices with the same layout.
        Class indices are 0 (not match), 1 (match) and 2 (reject).

    Returns
    -------
    Integer array of shape ``(gt.shape[1], 3, 3)``. Rows are ground truth and
    columns are predictions, both ordered match, not match, reject.

    Raises
    ------
    ValueError
        If a column contains a class index other than 0, 1 or 2.
    """
    class_ids = [0, 1, 2]
    # Position k of the output holds class order[k]: match, not match, reject.
    order = [1, 0, 2]

    # Builtin int: the np.int alias was removed in NumPy 1.24.
    result = np.zeros((gt.shape[1], 3, 3), dtype=int)
    for i, (truth, predicted) in enumerate(zip(gt.T, pred.T)):
        seen = np.union1d(truth, predicted)
        if not np.isin(seen, class_ids).all():
            raise ValueError("Matrix should be 3x3, error in input labels")

        # Fixing the label set always yields a 3x3 matrix, even when a class
        # never occurs in this column, so no shape-dependent padding is needed.
        matrix = confusion_matrix(truth, predicted, labels=class_ids)
        result[i] = matrix[np.ix_(order, order)]
    return result

def normalise_probs_in_place(df, inputs, labels):
    """Rescale probability columns to the 0-1 range, modifying the frame(s) in place.

    Parameters
    ----------
    df : a DataFrame, or a list/tuple of DataFrames (one per classifier).
    inputs : dict with ``"min"``, ``"max"`` and ``"reject_label"`` entries.
    labels : iterable of probability column names.

    Returns
    -------
    None. With ``min == 0`` and ``max == 1`` nothing is changed. With
    ``min == 0`` and ``max == 100`` each label column is divided by 100.

    Raises
    ------
    ValueError
        For any other ``min`` / ``max`` combination.
    """
    low, high = inputs["min"], inputs["max"]
    if low == 0 and high == 1:
        return
    if not (low == 0 and high == 100):
        raise ValueError("Normalisation rule not specified")

    frames = df if isinstance(df, (list, tuple)) else [df]
    for frame in frames:
        for label in labels:
            if label == inputs["reject_label"]:
                continue
            column = frame[label]
            # A column whose values all lie in [0, 1] is treated as already
            # normalised, so running the cell twice does not divide twice.
            if column.dropna().between(0, 1).all():
                continue
            frame[label] = column / 100

def derive_probabilities(dfs, inputs):
    """Extract the probability columns of every classifier frame.

    Parameters
    ----------
    dfs : list of DataFrames, one per classifier.
    inputs : dict of column names. When it contains ``"probability_column"``
        it must also contain ``"target_label"``.

    Returns
    -------
    List of DataFrames whose id column is named ``"id"``.

    * With ``"probability_column"``: a one-element list built from ``dfs[0]``
      only, holding the target-label probability and the id.
    * Otherwise: one frame per entry of ``dfs`` with the id and one column per
      label, keeping the first row of every id.
    """
    if "probability_column" in inputs:
        # Tagged ##NA## by the original author; only the first frame is read.
        source = dfs[0]
        probabilities = pd.DataFrame()
        probabilities[inputs["target_label"]] = source[inputs["probability_column"]]
        probabilities["id"] = source[inputs["id_column"]]
        # Return a list here too, so both branches hand back the same type.
        return [probabilities]

    columns = [inputs["id_column"], *retrieve_labels(dfs[0], inputs)]
    all_probabilities = []
    for df in dfs:
        probabilities = df[columns].rename(columns={inputs["id_column"]: "id"})
        # drop_duplicates returns a new frame; its result has to be kept.
        all_probabilities.append(probabilities.drop_duplicates("id"))
    return all_probabilities

def retrieve_labels(df, inputs):
    """Return the distinct ground-truth labels of ``df``, stripped and sorted.

    The column is looked up through ``inputs["ground_truth_column"]``.
    """
    truth = df[inputs["ground_truth_column"]]
    cleaned = truth.str.strip()
    return cleaned.sort_values().unique()

def check_labels_have_columns(df, labels):
    """Tell whether the labels found among ``df``'s columns number ``len(labels)``."""
    present = set(labels).intersection(df.columns)
    return len(labels) == len(present)

def prepare_labels(df, inputs):
    """Return the sorted labels to work with, read from the first frame.

    Parameters
    ----------
    df : sequence of DataFrames; only ``df[0]`` is inspected.
    inputs : dict of settings. An optional ``"target_label"`` restricts the
        result to that label.

    Raises
    ------
    ValueError
        If some label has no column of the same name in ``df[0]`` and
        ``inputs`` has no ``"probability_column"``.
    """
    first_frame = df[0]
    labels = retrieve_labels(first_frame, inputs)

    if "target_label" in inputs:
        wanted = inputs["target_label"]
        labels = [label for label in labels if label == wanted]

    columns_ok = check_labels_have_columns(first_frame, labels)
    if not columns_ok and "probability_column" not in inputs:
        raise ValueError("Labels do not have column names for probabilities")

    return sorted(labels)


def prepare_ground_truth(df, inputs, mapping):
    """Build the ground-truth indicator frame, one row per id.

    Parameters
    ----------
    df : DataFrame holding the id and ground-truth columns named in ``inputs``.
    inputs : dict with ``"id_column"`` and ``"ground_truth_column"``; an
        optional ``"target_label"`` switches to single-label mode.
    mapping : dict from label to column position (used without a target label).

    Returns
    -------
    DataFrame with an ``"id"`` column plus 0/1 indicator columns.

    * Without ``"target_label"``: one column per ``mapping`` key; a cell is 1
      when that label appears in any row of the id. Ids keep their order of
      first appearance. Rows whose id is missing are skipped by the grouping.
    * With ``"target_label"``: a single column that is 1 where the stripped
      ground truth equals the target label.
      NOTE(review): this branch keeps one value per input row but assigns the
      unique ids, so it appears to require unique ids — confirm.

    Raises
    ------
    KeyError
        If a ground-truth label is not a key of ``mapping``.
    """
    id_column = inputs["id_column"]
    truth_column = inputs["ground_truth_column"]

    if "target_label" not in inputs:
        ids = []
        rows = []
        # One pass over the grouped frame instead of re-filtering it per id.
        for a_id, truth_values in df.groupby(id_column, sort=False)[truth_column]:
            row = np.zeros(len(mapping), dtype=int)
            row[[mapping[s.strip()] for s in truth_values]] = 1
            ids.append(a_id)
            rows.append(row)
        # Passing the columns up front also gives a well-formed empty frame.
        ground_truth = pd.DataFrame(rows, columns=list(mapping.keys()))
    else:
        target = inputs["target_label"]
        ids = df[id_column].unique()
        ground_truth = pd.DataFrame()
        # Strip before comparing, as every other label lookup in this file does.
        ground_truth[target] = (df[truth_column].str.strip() == target) * 1
    ground_truth["id"] = ids
    return ground_truth


def get_objective(all_probabilities, ground_truth, lower_thresholds, upper_thresholds):
    """
    Threshold the probabilities and score them against the ground truth.

    returns: (all confusion matrices, thresholded predictions)
    """
    decisions = apply_thresholds(all_probabilities, lower_thresholds, upper_thresholds)
    return calculate_confusion_matrices(ground_truth, decisions), decisions

def summarise_results(all_results):
    """
    Sum a stack of 3x3 matrices into four totals.

    returns Summary array of true match ([0, 0] cells),
    false match ([1, 0] cells), missed match ([0, 1] cells),
    and rejects (everything in column 2).
    """
    true_match = all_results[:, 0, 0].sum()
    false_match = all_results[:, 1, 0].sum()
    missed_match = all_results[:, 0, 1].sum()
    rejects = all_results[:, :, 2].sum()
    return np.array([true_match, false_match, missed_match, rejects])

# Usage guide 

In [None]:

# Labels (read from the first classifier frame)
labels = prepare_labels(dfs, inputs)

# Probabilities
normalise_probs_in_place(dfs, inputs, labels)
probabilities = derive_probabilities(dfs, inputs)

# Ground truth
mapping = dict(zip(labels, range(len(labels))))
ground_truth = prepare_ground_truth(dfs[0], inputs, mapping)

# NumPy arrays consumed by the thresholding and optimisation cells below
np_probs = [frame[labels].to_numpy() for frame in probabilities]
np_ground_truth = ground_truth[labels].to_numpy()
print(np_probs[0].shape)
print(np_ground_truth.shape)

In [None]:

# NOTE(review): the lower threshold (0.75) is above the upper one (0.25) here;
# confirm this ordering is intended for the demonstration.
low_threshold = np.full(shape=2, fill_value=0.75, dtype=np.float64)
up_threshold = np.full(shape=2, fill_value=0.25, dtype=np.float64)

all_matrices, thresholded = get_objective(
    np_probs, np_ground_truth, low_threshold, up_threshold
)
all_matrices



In [None]:
# Summary order: true match, false match, missed match, rejects.
summarise_results(all_matrices)

# Cost calculations

In [None]:
# Cost model used by calculate_cost: "matrix" is divided by "portion_size" and
# multiplied element-wise with a confusion matrix rescaled to "estimate_size".
# The confusion matrices built above order rows and columns as
# match, not match, reject.
params = {
  "cost": {
    "matrix" : [[50,500, 50],[5000,50,50], [0,0,0]],
    "portion_size" : 1000,
    "estimate_size":10000    # Estimate the cost for classifying this many items
  }
}

def calculate_cost(predicted_matrix, cost_weights, portion_size, estimate_size):
    """Estimate the cost of each confusion-matrix cell for ``estimate_size`` items.

    Parameters
    ----------
    predicted_matrix : array-like of counts (a confusion matrix).
    cost_weights : array-like of costs, each expressed per ``portion_size`` items.
    portion_size : number of items that one entry of ``cost_weights`` pays for.
    estimate_size : number of items to extrapolate the counts to.

    Returns
    -------
    Float array: counts rescaled so they sum to roughly ``estimate_size``
    (rounded per cell), multiplied element-wise by the per-item cost. An
    all-zero ``predicted_matrix`` gives an all-zero result instead of NaN.
    """
    counts = np.asarray(predicted_matrix, dtype=np.double)
    cost_per_prediction = np.asarray(cost_weights, dtype=np.double) / portion_size

    total = counts.sum()
    if total == 0:
        # Nothing was classified, so there is nothing to extrapolate from.
        return np.zeros(np.broadcast(counts, cost_per_prediction).shape)

    scaled_matrix = np.round(counts / total * estimate_size)
    return scaled_matrix * cost_per_prediction

# Usage guide: cost breakdown for the first confusion matrix only.
calculate_cost(all_matrices[0], params["cost"]["matrix"], params["cost"]["portion_size"], params["cost"]["estimate_size"])

In [None]:
def calculate_all_costs(all_matrices, cost_weights, portion_size, estimate_size):
    """Apply ``calculate_cost`` to every matrix and stack the 3x3 results.

    Returns a float array of shape ``(len(all_matrices), 3, 3)``.
    """
    n_matrices = len(all_matrices)
    cost_results = np.zeros((n_matrices, 3, 3))
    for index in range(n_matrices):
        cost_results[index] = calculate_cost(
            all_matrices[index], cost_weights, portion_size, estimate_size
        )
    return cost_results

# Usage guide: one 3x3 cost matrix per confusion matrix.
all_costs = calculate_all_costs(all_matrices, params["cost"]["matrix"], params["cost"]["portion_size"], params["cost"]["estimate_size"])
all_costs

In [None]:
# Total costs, in the summarise_results order: true match, false match, missed match, rejects.
summarise_results(all_costs)

# Auto-Recommender

In [None]:
class MyRepair(Repair):
    """Repair step that rounds every individual's variables to two decimals."""

    def _do(self, problem, pop, **kwargs):
        for position in range(len(pop)):
            individual = pop[position]
            individual.X = np.round(individual.X, 2)
        return pop

class FindThresholds(Problem):
    """Four-objective problem scoring a (lower, upper) threshold pair.

    Objectives, in order: negated true matches, false matches, missed matches
    and rejects, as returned by ``summarise_results``.
    """

    def __init__(self, all_probabilities, ground_truth, **kwargs):
        """Store the data and declare one variable per probability column.

        all_probabilities : sequence of 2-D arrays, one per classifier.
        ground_truth : array passed through to ``get_objective``.
        **kwargs : accepted but not used.
            NOTE(review): they are not forwarded to ``Problem``; confirm whether
            options such as ``parallelizable`` were meant to take effect.
        """
        self.all_probabilities = all_probabilities
        self.ground_truth = ground_truth
        n_var = all_probabilities[0].shape[1]
        # Uses the `np` imported at the top of the file and the builtin int:
        # `anp` is never imported explicitly here, and the `.int` alias was
        # removed in NumPy 1.24.
        # NOTE(review): type_var is int although the thresholds are sampled and
        # rounded as real values in [0, 1] — confirm the intended variable type.
        super().__init__(n_var=n_var,
                         n_constr=0,
                         n_obj=4,
                         xl=np.zeros((n_var,), dtype=int),
                         xu=np.ones((n_var,), dtype=int),
                         type_var=int,
                         elementwise_evaluation=True)

    def _evaluate(self, X, out, *args, **kwargs):
        """Evaluate one solution: ``X[0]`` is the lower and ``X[1]`` the upper threshold.

        NOTE(review): only the first two variables are read, while ``n_var`` is
        the number of probability columns — this assumes at least two columns.
        """
        lower, upper = X[0], X[1]
        all_matrices, _thresholded = get_objective(self.all_probabilities, self.ground_truth, lower, upper)

        true_matches, false_matches, missed_matches, rejects = summarise_results(all_matrices)

        # True matches need to be maximised, pymoo only minimises.
        out["F"] = np.array(
            [[-true_matches, false_matches, missed_matches, rejects]],
            dtype=np.double,
        )
        


In [None]:
class BetaDistribution(Sampling):
    """Initial sampling that draws every variable from a Beta(4, 2) distribution."""

    def __init__(self):
        super().__init__()

    def _do(self, problem, n_samples, **kwargs):
        # One row per sample, one column per problem variable.
        shape = (n_samples, problem.n_var)
        return stats.beta.rvs(4, 2, size=shape)
    
# NSGA-II set-up: candidates start from BetaDistribution samples and are rounded
# to two decimals by MyRepair.
algorithm = NSGA2(
    pop_size=40,
    n_offsprings=10,
    repair=MyRepair(),
    sampling= BetaDistribution(), #get_sampling("int_lhs"),
    crossover=get_crossover("real_sbx", prob=0.9, eta=15),
    mutation=get_mutation("real_pm", eta=20),
)    

# NOTE(review): FindThresholds.__init__ accepts **kwargs but never reads them, so
# `parallelizable` has no effect here — confirm whether it should be forwarded.
problem = FindThresholds(np_probs, np_ground_truth, parallelizable=("threads", 4))

In [None]:
import timeit

results = None

def value():
    """Run the optimisation for 10 generations and store the outcome in ``results``."""
    global results
    # A fixed seed makes the stochastic search repeatable across reruns.
    results = minimize(problem,
               algorithm,
               ("n_gen", 10),
               seed=1,
               save_history=True,
               verbose=True)

# Displays the wall-clock seconds taken by a single optimisation run.
timeit.timeit(value,number=1)

In [None]:
# Objective values from the optimiser result (presumably one row per solution).
F = results.F
# Weights follow the objective order set in FindThresholds._evaluate: negated
# true matches, false matches, missed matches, rejects (rejects weighted 0).
weights = np.array([0.5,0.25,0.25,0])
# Index of the solution with the smallest weighted sum of objectives.
I = get_decomposition("weighted-sum").do(F, weights).argmin()

# Details of Final Results

In [5]:
print("Values from the final population ")
# Variables ("X") and objective values ("F") read from the final population.
results.pop.get("X"), results.pop.get("F")

Values from the final population 


NameError: name 'results' is not defined

In [3]:
# Solution picked by the weighted-sum step above (index I).
# NOTE(review): unpacking into two names assumes each solution has exactly two
# variables, i.e. two probability columns — confirm.
lower, upper = results.X[I]
print("Optimisation Results")
print("Best Lower Threshold:", lower)
print("Best Upper Threshold:", upper)

NameError: name 'results' is not defined

In [4]:
# Re-score the data with the selected thresholds, one value per label column.
low_threshold = np.full(shape=2, fill_value=lower, dtype=np.float64)
up_threshold = np.full(shape=2, fill_value=upper, dtype=np.float64)

# Summary order: true match, false match, missed match, rejects
all_matrices, thresholded = get_objective(
    np_probs, np_ground_truth, low_threshold, up_threshold
)
true_match, false_match, missed_match, rejects = summarise_results(all_matrices)

print("True Match:", true_match)
print("False Match:", false_match)
print("Missed Match:", missed_match)
print("Rejects:", rejects)

NameError: name 'lower' is not defined