In [48]:
import itertools
import pandas as pd
import numpy as np
from scipy.stats import chi2_contingency
import onnxruntime as rt

In [49]:
# constants and parameters

# Input datasets (relative paths, resolved against the current working directory)
data_path = '../data/300k_dataset.csv'
data_path_train = '../data/investigation_train_large_checked.csv'

# ONNX model files that the fairness tests load
good_model_path = '../model/good_model.onnx'
biased_model_path = '../model/biased_model.onnx'

# Columns whose value combinations define the groups compared by the fairness tests
features = [
    'persoon_leeftijd_bij_onderzoek',
    'persoon_geslacht_vrouw',
    'contacten_onderwerp_no_show',
    'persoonlijke_eigenschappen_spreektaal',
]

In [50]:
# functions

# Helper function to calculate false positive and false negative rates
def calculate_error_rates(group_data, true_label_column, prediction_column):
    """Return the false positive and false negative rates of one group.

    Args:
        group_data: Table (e.g. a DataFrame) holding the rows of a single group.
        true_label_column: Name of the column compared against 0/1 as ground truth.
        prediction_column: Name of the column compared against 0/1 as prediction.

    Returns:
        A ``(false_positive_rate, false_negative_rate)`` tuple. A rate is
        reported as 0 when its denominator is empty (no actual negatives for
        the FPR, no actual positives for the FNR).
    """
    actual = group_data[true_label_column]
    predicted = group_data[prediction_column]

    actual_pos, actual_neg = actual == 1, actual == 0
    predicted_pos, predicted_neg = predicted == 1, predicted == 0

    # Confusion-matrix cells
    hits = np.sum(actual_pos & predicted_pos)
    correct_rejections = np.sum(actual_neg & predicted_neg)
    false_alarms = np.sum(actual_neg & predicted_pos)
    misses = np.sum(actual_pos & predicted_neg)

    negatives = false_alarms + correct_rejections
    positives = misses + hits

    # Default to 0 so a group lacking negatives/positives does not divide by zero
    fpr = 0
    if negatives > 0:
        fpr = false_alarms / negatives

    fnr = 0
    if positives > 0:
        fnr = misses / positives

    return fpr, fnr

# Helper function to calculate the predictive parity for each group
def calculate_predictive_parity(group_data, true_label_column, prediction_column):
    """Return the precision of the positive predictions within one group.

    Args:
        group_data: Table (e.g. a DataFrame) holding the rows of a single group.
        true_label_column: Name of the column compared against 0/1 as ground truth.
        prediction_column: Name of the column compared against 1 as prediction.

    Returns:
        ``tp / (tp + fp)``, or 0 when the group has no counted positive
        predictions (``tp + fp == 0``).
    """
    labels = group_data[true_label_column]
    flagged = group_data[prediction_column] == 1

    correct_flags = np.sum((labels == 1) & flagged)  # true positives
    wrong_flags = np.sum((labels == 0) & flagged)    # false positives
    flagged_total = correct_flags + wrong_flags

    if flagged_total > 0:
        return correct_flags / flagged_total
    return 0

# Function to perform the combinational test for Equalized Odds
def evaluate_equalized_odds(data, predictions, features, true_label_column, fairness_threshold=0.05):
    """Check Equalized Odds across every observed combination of ``features``.

    Rows are grouped by their values in the ``features`` columns. The false
    positive and false negative rates of each group are computed with
    ``calculate_error_rates`` and the largest gap between any two groups is
    compared with ``fairness_threshold``.

    Note: ``calculate_error_rates`` reports 0 for a rate whose denominator is
    empty, so very small groups can dominate the maximum difference.

    Args:
        data: DataFrame containing the feature columns and ``true_label_column``.
            It is not modified.
        predictions: Model predictions aligned with the rows of ``data``.
        features: Names of the columns whose value combinations define groups.
        true_label_column: Name of the ground-truth label column.
        fairness_threshold: Largest rate gap still regarded as fair (exclusive).

    Returns:
        ``(max_fp_diff, max_fn_diff, fairness_result)`` where ``fairness_result``
        is truthy only if both gaps are below ``fairness_threshold``.

    Raises:
        ValueError: If ``data`` has no rows.
    """
    # Attach the predictions to a copy so the caller's DataFrame gains no columns.
    scored = data.assign(predictions=predictions)
    if len(scored) == 0:
        raise ValueError("cannot evaluate Equalized Odds on empty data")

    feature_list = list(features)
    if feature_list:
        # groupby visits only the combinations that actually occur, in one pass,
        # instead of enumerating the cartesian product of all unique values.
        # dropna=False keeps rows with missing feature values as their own group.
        groups = scored.groupby(feature_list, sort=False, dropna=False)
    else:
        # No grouping columns: the whole table forms a single group.
        groups = [((), scored)]

    # Error rates per observed feature combination
    error_rates = {
        combination: calculate_error_rates(group_data, true_label_column, 'predictions')
        for combination, group_data in groups
    }

    fp_rates = [rates[0] for rates in error_rates.values()]
    fn_rates = [rates[1] for rates in error_rates.values()]

    # The largest pairwise absolute difference equals max - min.
    max_fp_diff = max(fp_rates) - min(fp_rates)
    max_fn_diff = max(fn_rates) - min(fn_rates)

    # Fair only when both gaps stay below the threshold
    fp_fair = max_fp_diff < fairness_threshold
    fn_fair = max_fn_diff < fairness_threshold
    fairness_result = fp_fair and fn_fair

    return max_fp_diff, max_fn_diff, fairness_result

# Function to perform the combinational test for Predictive Parity
def evaluate_predictive_parity(data, predictions, features, true_label_column, fairness_threshold=0.05):
    """Check Predictive Parity across every observed combination of ``features``.

    Rows are grouped by their values in the ``features`` columns. The precision
    of each group is computed with ``calculate_predictive_parity`` and the
    largest gap between any two groups is compared with ``fairness_threshold``.

    Note: ``calculate_predictive_parity`` reports 0 for a group without counted
    positive predictions, so very small groups can dominate the maximum
    difference.

    Args:
        data: DataFrame containing the feature columns and ``true_label_column``.
            It is not modified.
        predictions: Model predictions aligned with the rows of ``data``.
        features: Names of the columns whose value combinations define groups.
        true_label_column: Name of the ground-truth label column.
        fairness_threshold: Largest precision gap still regarded as fair
            (exclusive).

    Returns:
        ``(max_parity_diff, parity_fair)`` where ``parity_fair`` is truthy only
        if the gap is below ``fairness_threshold``.

    Raises:
        ValueError: If ``data`` has no rows.
    """
    # Attach the predictions to a copy so the caller's DataFrame gains no columns.
    scored = data.assign(predictions=predictions)
    if len(scored) == 0:
        raise ValueError("cannot evaluate Predictive Parity on empty data")

    feature_list = list(features)
    if feature_list:
        # groupby visits only the combinations that actually occur, in one pass,
        # instead of enumerating the cartesian product of all unique values.
        # dropna=False keeps rows with missing feature values as their own group.
        groups = scored.groupby(feature_list, sort=False, dropna=False)
    else:
        # No grouping columns: the whole table forms a single group.
        groups = [((), scored)]

    # Precision per observed feature combination
    predictive_parity_values = {
        combination: calculate_predictive_parity(group_data, true_label_column, 'predictions')
        for combination, group_data in groups
    }

    # The largest pairwise absolute difference equals max - min.
    precisions = list(predictive_parity_values.values())
    max_parity_diff = max(precisions) - min(precisions)

    parity_fair = max_parity_diff < fairness_threshold

    return max_parity_diff, parity_fair

# Combinational fairness test driver. Despite "equalized_odds" in its name (kept so
# existing cells keep working), it evaluates Predictive Parity.
def combinational_test_equalized_odds(data_path: str, model_path: str, features: list):
    """Score a CSV dataset with an ONNX model and evaluate Predictive Parity.

    The CSV is read once. The 'Ja' and 'Nee' columns are dropped, the remaining
    columns are cast to float32, and every column except the 'checked' label is
    fed to the model. The first model output is used as the predictions.

    Args:
        data_path: Path of the CSV file containing the features and 'checked'.
        model_path: Path of the ONNX model to load.
        features: Names of the columns whose value combinations define the
            groups compared by ``evaluate_predictive_parity``.

    Returns:
        ``(max_parity_diff, fairness_result)`` exactly as returned by
        ``evaluate_predictive_parity``.
    """
    true_label_column = 'checked'

    # Parse the CSV a single time; the model input is the same table minus the label.
    data_with_labels = pd.read_csv(data_path).drop(['Ja', 'Nee'], axis=1).astype(np.float32)
    data = data_with_labels.drop(columns=[true_label_column])

    # Load the ONNX model
    session = rt.InferenceSession(model_path)
    input_name = session.get_inputs()[0].name

    # Run the model to get predictions for all data
    predictions = session.run(None, {input_name: data.values.astype(np.float32)})[0]
    print(f"the shape of prediction is: {predictions.shape}")
    print(f"the type of prediction is: {type(predictions)}")

    # Evaluate Predictive Parity over the feature combinations
    max_parity_diff, fairness_result = evaluate_predictive_parity(data_with_labels, predictions, features, true_label_column)

    # Return the evaluation results
    return max_parity_diff, fairness_result

In [51]:
# Predictive Parity check for the model stored at good_model_path
max_parity_diff, fairness_result = combinational_test_equalized_odds(
    data_path, good_model_path, features
)
print(f"maximum predictive parity difference is: {max_parity_diff}")
print(
    "The model meets the fairness criteria for Predictive Parity"
    if fairness_result
    else "The model does not meet the fairness criteria for Predictive Parity"
)

the shape of prediction is: (300000,)
the type of prediction is: <class 'numpy.ndarray'>
maximum predictive parity difference is: 1.0
The model does not meet the fairness criteria for Predictive Parity


In [52]:
# Predictive Parity check for the model stored at biased_model_path
max_parity_diff, fairness_result = combinational_test_equalized_odds(
    data_path, biased_model_path, features
)
print(f"maximum predictive parity difference is: {max_parity_diff}")
print(
    "The model meets the fairness criteria for Predictive Parity"
    if fairness_result
    else "The model does not meet the fairness criteria for Predictive Parity"
)

the shape of prediction is: (300000,)
the type of prediction is: <class 'numpy.ndarray'>
maximum predictive parity difference is: 1.0
The model does not meet the fairness criteria for Predictive Parity
