In [1]:
import os
import csv
# warning filters
import warnings
warnings.filterwarnings("ignore", message="Pandas requires version")
warnings.filterwarnings("ignore", message="A NumPy version >=")

# general imports
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# anonymity library
import pycanon
from anjana.anonymity import k_anonymity

# ML models
from xgboost import XGBClassifier as XGB

# our generic functions
from utils import get_metrics, get_generalization_levels, get_train_test_data

# our data-specific functions
from utils import clean_process_data, get_hierarchies
import config_experiments as cfg

def write_results_to_csv(values, header=False):
    """Write the results to a csv file."""

    file_path = "results/test.csv"
    # Check if the file exists and is empty
    file_exists = os.path.isfile(file_path)
    file_empty = os.stat(file_path).st_size == 0 if file_exists else True

    with open(file_path, mode='a', newline='') as scores_file:
        scores_writer = csv.writer(scores_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        if header and file_empty:# Write header if specified and file is empty
            scores_writer.writerow(["SEED", "dataset", "protected_att", "target", "method", "k_parameter", "anon_parameter", "SPD", "EOD", "MAD", "PED", "PRD", "ACC", "f1", "Precision", "Recall", "ROC_AUC", "CM"])
        if not header: # Write the actual values
            scores_writer.writerow(values)

# Main execution
write_results_to_csv([], header=True)

# Define the parameters
dataset = 'bank'
method = 'k-anonymity'

# Get parameters from config file
supp_level = cfg.supp_level[1]
lst_k = cfg.lst_k
max_seed = 1
test_size = cfg.test_size
if dataset == 'adult':
    lst_threshold_target = cfg.adult_threshold_target
else:
    lst_threshold_target = [None]

# Loop over several threshold targets (same dataset but with different Y distribution)
for threshold_target in lst_threshold_target:
    print(f"Threshold target: {threshold_target}")

    # read data
    if dataset == 'adult':

        # Sensitive/target and protected attributes
        sens_att = "income"
        protected_att = "gender"
        
        # Read and process the data
        data = clean_process_data(pd.read_csv("adult_reconstruction.csv"), dataset, sens_att, protected_att, threshold_target)

    elif dataset == 'bank':

        # Sensitive/target and protected attributes
        sens_att = "y"
        protected_att = "age"
        
        # Read and process the data
        data = clean_process_data(pd.read_csv("bank-full.csv", delimiter=';'), dataset, sens_att, protected_att, threshold_target)

    # Import/defining the hierarquies for each quasi-identifier. 
    hierarchies = get_hierarchies(data, dataset)

    # Define the quasi-identifiers and the sensitive/protected attribute
    quasi_ident = list(set(data.columns) - {protected_att} - {sens_att})

    # Loop over several seeds
    SEED = 0
    while SEED < max_seed:
        print(f"SEED: {SEED}")

        # Loop over several k values
        for k in lst_k:
            print(f"k: {k}")

            try:
                # Split into train and test data
                train_data, test_data = train_test_split(data, test_size=test_size, random_state=SEED)

                # Anonymize data
                train_data_anon = k_anonymity(train_data, [], quasi_ident, k, supp_level, hierarchies)
                if 'index' in train_data_anon.columns:
                    del train_data_anon['index'] 

                # Assert that the level of k-anonymity is at least k
                actual_k_anonymity = pycanon.anonymity.k_anonymity(train_data_anon, quasi_ident)
                assert actual_k_anonymity >= k, f"k-anonymity constraint not met: Expected >= {k}, but got {actual_k_anonymity}"

                if k > 1:
                    # Get generalization levels of the training set to apply the same to the test set
                    generalization_levels = get_generalization_levels(train_data_anon, quasi_ident, hierarchies)

                    # Apply the same generalization levels to the test data (Except for the protected attribute: for fairness measurements)
                    for col in set(quasi_ident) - {protected_att}:
                        level = generalization_levels.get(col)
                        
                        if level is not None:
                            # Retrieve the mapping dictionary for this level
                            hierarchy_mapping = dict(zip(hierarchies[col][0], hierarchies[col][level]))
                            
                            # Apply the mapping to the test data
                            test_data[col] = test_data[col].map(hierarchy_mapping)

                # Separate features and target
                X_train, y_train, X_test, y_test = get_train_test_data(train_data_anon, test_data, sens_att)

                # Train the model
                model = XGB(random_state=SEED, n_jobs=-1)
                model.fit(X_train, y_train)

                # Get fairness/utility metrics
                df_fm = test_data.copy()
                df_fm['y_pred'] = np.round(model.predict(X_test)).reshape(-1).astype(int)
                dic_metrics = get_metrics(df_fm, protected_att, sens_att)
                print(dic_metrics)

                # Write results to csv
                write_results_to_csv([SEED, dataset + "_" + str(threshold_target), protected_att, sens_att, method, k, k] + list(dic_metrics.values()))

            except Exception as e:
                    print(f"An error occurred for SEED {SEED}, k {k}: {e}")
                    continue
        
        SEED += 1
        print('-------------------------------------------------------------\n')
    print('=============================================================\n')

Threshold target: None
SEED: 0
k: 1
The data verifies k-anonymity with k=1
{'SPD': -0.25787328569407647, 'EOD': -0.20987032184371435, 'MAD': 0.22722797640212733, 'PED': -0.18074063313039002, 'PRD': 0.03652919501133778, 'ACC': 0.8953848422294308, 'f1': 0.4005069708491762, 'Precision': 0.6139896373056994, 'Recall': 0.29717868338557996, 'ROC_AUC': 0.63614051555861, 'CM': array([[11671,   298],
       [ 1121,   474]], dtype=int64)}
k: 2
{'SPD': -0.24802092533330272, 'EOD': -0.24155076261506417, 'MAD': 0.2262831247682079, 'PED': -0.17175324384854185, 'PRD': -0.01146973419700692, 'ACC': 0.8898554998525509, 'f1': 0.3401060070671378, 'Precision': 0.5754857997010463, 'Recall': 0.2413793103448276, 'ROC_AUC': 0.6088256732190342, 'CM': array([[11685,   284],
       [ 1210,   385]], dtype=int64)}
k: 3
{'SPD': -0.19968676997426868, 'EOD': -0.19250151179197741, 'MAD': 0.24596299354996576, 'PED': -0.1453998634169035, 'PRD': 0.07113870717374426, 'ACC': 0.8900766735476261, 'f1': 0.273037542662116, 'Prec

  _warn_prf(average, modifier, msg_start, len(result))


{'SPD': -0.14969135802469136, 'EOD': -0.1859504132231405, 'MAD': 0.27950546551506605, 'PED': -0.12807881773399016, 'PRD': -0.4639175257731959, 'ACC': 0.8818932468298437, 'f1': 0.053191489361702135, 'Precision': 0.4639175257731959, 'Recall': 0.02821316614420063, 'ROC_AUC': 0.5119343046862703, 'CM': array([[11917,    52],
       [ 1550,    45]], dtype=int64)}
k: 50


KeyboardInterrupt: 

In [16]:
pd.cut(data["balance"], bins=[-float("inf"), 500, 5000, float("inf")]).astype(str)[0]

'(500.0, 5000.0]'

In [3]:
train_data

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
24951,1,management,married,primary,no,1021,no,no,unknown,18,nov,205,1,-1,0,unknown,0
22129,1,blue-collar,married,unknown,no,196,no,no,cellular,21,aug,168,2,-1,0,unknown,0
986,1,admin.,married,secondary,no,159,yes,no,unknown,7,may,216,2,-1,0,unknown,0
14999,1,management,married,primary,no,1880,yes,no,cellular,17,jul,63,6,-1,0,unknown,0
4027,1,technician,divorced,tertiary,no,647,yes,no,unknown,16,may,512,6,-1,0,unknown,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
30403,1,management,single,tertiary,no,995,no,no,cellular,5,feb,39,1,-1,0,unknown,0
21243,1,management,married,tertiary,no,750,yes,no,cellular,18,aug,233,12,-1,0,unknown,0
42613,1,management,married,tertiary,no,323,no,no,cellular,11,jan,261,2,-1,0,unknown,1
43567,0,retired,married,secondary,no,616,no,no,cellular,27,apr,149,2,182,1,failure,0


In [4]:
k_anonymity(train_data, quasi_ident, 2, supp_level, hierarchies)

BeartypeCallHintParamViolation: Function anjana.anonymity._k_anonymity.k_anonymity() parameter quasi_ident=2 violates type hint typing.Union[list, numpy.ndarray], as int 2 not <protocol "numpy.ndarray"> or list.