In [None]:
#import required libraries
import os
!pip install dice_ml
import dice_ml
from dice_ml.utils import helpers
import pandas as pd
import numpy as np
import pickle
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_curve
from sklearn.metrics import classification_report
from raiutils.exceptions import UserConfigValidationException
from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import RandomizedSearchCV
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

In [None]:
# Load the pre-computed train/test partitions (same splitting used for MC-SVDD)
# and rename the total-cholesterol column to its short name.
_chol_alias = {"tot_chol_mmol_L_": "totChol"}
df_train = pd.read_csv("./Train.csv").rename(columns=_chol_alias)
df_test = pd.read_csv('./Test.csv').rename(columns=_chol_alias)

# Shift the 'sogliaFRS' column down by one in both partitions (test first, then train).
for _partition in (df_test, df_train):
    _partition['sogliaFRS'] = _partition['sogliaFRS'] - 1

# Columns 0..12 are used as model inputs, column 13 as the target.
x_train, y_train = df_train.iloc[:, 0:13], df_train.iloc[:, 13]
x_test, y_test = df_test.iloc[:, 0:13], df_test.iloc[:, 13]


In [None]:
# Stack the train and test partitions into one frame (original row indices are kept).
outcome_name = "sogliaFRS"
df = pd.concat([df_train, df_test])
target = df[outcome_name]
X = df.drop(columns=["sogliaFRS"])

# Columns treated as numerical; every other column of X is treated as categorical.
numerical = ['SBP', 'DBP', 'BMI', 'LDL', 'HDL', 'TRIG', 'FBS', 'AgeAtOnset', 'totChol']
categorical = X.columns.difference(numerical)

In [None]:
# Describe the combined frame to DiCE: which columns are continuous and
# which column holds the outcome.
d = dice_ml.Data(
    outcome_name=outcome_name,
    continuous_features=numerical,
    dataframe=df,
)

In [None]:
def compute_fpr_multiclass(confusion_matrix):
    """Compute the one-vs-rest false positive rate (FPR) of every class.

    Parameters
    ----------
    confusion_matrix : array-like of shape (n_classes, n_classes)
        Square matrix of counts. False positives of class ``i`` are taken from
        column ``i`` (off-diagonal), so rows are the true classes and columns
        the predicted classes.

    Returns
    -------
    list of float
        ``FP / (FP + TN)`` for each class, in matrix order. The entry is
        ``nan`` for a class that has no negative samples (``FP + TN == 0``).

    Raises
    ------
    ValueError
        If the input is not a square two-dimensional matrix.
    """
    counts = np.asarray(confusion_matrix, dtype=float)
    if counts.ndim != 2 or counts.shape[0] != counts.shape[1]:
        raise ValueError(
            "confusion_matrix must be a square 2-D array, got shape %s" % (counts.shape,)
        )

    diagonal = np.diag(counts)
    predicted_totals = counts.sum(axis=0)  # column sums
    actual_totals = counts.sum(axis=1)     # row sums

    false_positives = predicted_totals - diagonal
    # Everything that is neither in row i nor in column i of the matrix.
    true_negatives = counts.sum() - (actual_totals + predicted_totals - diagonal)
    negatives = false_positives + true_negatives

    # Leave nan where the rate is undefined instead of triggering a 0/0 warning.
    rates = np.full(counts.shape[0], np.nan)
    np.divide(false_positives, negatives, out=rates, where=negatives > 0)
    return rates.tolist()

In [None]:
# Preprocessing: standardise the numerical columns and one-hot encode the
# categorical ones (categories unseen at fit time are ignored at transform time).
numeric_transformer = Pipeline(steps=[
    ('scaler', StandardScaler())])

categorical_transformer = Pipeline(steps=[
    ('onehot', OneHotEncoder(handle_unknown='ignore'))])

transformations = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numerical),
        ('cat', categorical_transformer, categorical)])

# Hyper-parameter candidates for the SVC step.
kernels = ['linear', 'poly', 'rbf', 'sigmoid']
degrees = [2, 3, 5]
# NOTE(review): np.logspace(0.01, 1, 5) spans 10**0.01 .. 10**1 (about 1.02 to 10);
# confirm this is the intended range for C.
Cs = np.logspace(0.01, 1, 5)

random_grid = {'classifier__kernel': kernels,
               'classifier__degree': degrees,
               'classifier__C': Cs}

# Every candidate list is finite, so the search cannot draw more distinct
# settings than the grid contains; cap n_iter at the grid size.
n_candidates = int(np.prod([len(values) for values in random_grid.values()]))

# probability=True makes SVC fit an internal cross-validated calibration that
# shuffles the data; seed it so predict_proba is reproducible across runs.
clf = Pipeline(steps=[('preprocessor', transformations),
                      ('classifier', SVC(probability=True, random_state=42))])
clf = RandomizedSearchCV(clf, param_distributions=random_grid,
                         n_iter=min(100, n_candidates), cv=3, verbose=2,
                         random_state=42, n_jobs=-1, refit=True)

In [None]:
# Run the hyper-parameter search; `model` is the fitted search object.
model = clf.fit(x_train, y_train)

# --- Training split: class probabilities, hard predictions, confusion matrix ---
y_probs_tr = clf.predict_proba(x_train)
y_pred_tr = clf.predict(x_train)
# Pass labels explicitly so the matrix rows/columns always line up with display_labels.
cm_tr = confusion_matrix(y_train, y_pred_tr, labels=model.classes_)
print("Confusion Matrix-TRAIN (before):")
disp = ConfusionMatrixDisplay(confusion_matrix=cm_tr,
                              display_labels=model.classes_)
disp.plot()
plt.show()

# --- Test split: class probabilities, hard predictions, confusion matrix ---
y_probs = clf.predict_proba(x_test)
y_pred = clf.predict(x_test)
cm_ts = confusion_matrix(y_test, y_pred, labels=model.classes_)
print("Confusion Matrix-TEST (before):")
disp = ConfusionMatrixDisplay(confusion_matrix=cm_ts,
                              display_labels=model.classes_)
disp.plot()
plt.show()

# Compute the FPR for each class from the test confusion matrix.
# (The previous code passed `cm`, which is not assigned until a later cell.)
fpr = compute_fpr_multiclass(cm_ts)
print("False Positive Rate (FPR) for each class:")
for idx, rate in enumerate(fpr):
    print(f"Class {idx}: {rate}")

# Disabled experiment: everything below is wrapped in a string literal, so none
# of it is executed. It sketches per-class decision thresholds picked from ROC
# curves (adjust_thresholds) and a prediction rule built on them
# (predict_with_thresholds), followed by re-plotting the confusion matrices.
# NOTE(review): before re-enabling, revisit (a) the two `compute_fpr_multiclass(cm)`
# style references to `cm`, which this cell never assigns, (b) the `+ 3` added to
# the argmax in the fallback branch, and (c) thresholds being derived from y_test.
# NOTE(review): as the last expression of the cell, the literal may be echoed as
# cell output — confirm whether that is wanted.
'''
# Function to adjust decision thresholds for controlling FPR
def adjust_thresholds(y_test, y_probs, target_fpr):
    thresholds = {}
    for i in range(y_probs.shape[1]):  # Iterate over each class
        fpr, tpr, thresholds_ = roc_curve((y_test == i).astype(int), y_probs[:, i])
        # Find the threshold where FPR is just below the target FPR
        optimal_idx = np.where(fpr <= target_fpr)[0][-1]
        thresholds[i] = thresholds_[optimal_idx]
    return thresholds

# Adjust thresholds to keep FPR under 0.1 for each class
target_fpr = 0.1
thresholds = adjust_thresholds(y_test, y_probs, target_fpr)
print("Adjusted thresholds:")
print(thresholds)

# Predict with adjusted thresholds
def predict_with_thresholds(y_probs, thresholds):
    control_class1 = []
    control_class2 = []
    adjusted_preds = np.zeros(y_probs.shape[0])
    for i in range(y_probs.shape[0]): #iterate over each test record
        class_probs = y_probs[i]  #probabilities for each output class

        class_preds = (class_probs >= [thresholds[j] for j in range(len(class_probs))]).astype(int) #class_preds vecttor 1x3 con 1 se la prob è sopra soglia per una certa classe e 0 altrimenti
        #print('class_preds')
        #print(class_preds)
        if np.sum(class_preds) == 0:
            adjusted_preds[i] = np.argmax(class_probs) + 3 # Default to the highest probability class if no threshold met
            control_class1.append(adjusted_preds[i])
        else:
            adjusted_preds[i] = np.argmax(class_preds)  # Otherwise, choose the first class meeting the threshold
            control_class2.append(adjusted_preds[i])
    return adjusted_preds, control_class1, control_class2

adjusted_preds_tr, control_class1_tr, control_class2_tr = predict_with_thresholds(y_probs_tr, thresholds)
cm_tr = confusion_matrix(y_train,adjusted_preds_tr)
print("Confusion Matrix-TRAIN (after):")
disp = ConfusionMatrixDisplay(confusion_matrix=cm_tr,
                               display_labels=model.classes_)
disp.plot()
plt.show()
adjusted_preds, control_class1, control_class2 = predict_with_thresholds(y_probs, thresholds)
# Generate the confusion matrix
cm_ts = confusion_matrix(y_test,adjusted_preds)
print("Confusion Matrix-TEST (after):")
disp = ConfusionMatrixDisplay(confusion_matrix=cm_ts,
                               display_labels=model.classes_)
disp.plot()
plt.show()
# Compute the FPR for each class
fpr = compute_fpr_multiclass(cm)
print("False Positive Rate (FPR) for each class:")
for idx, rate in enumerate(fpr):
    print(f"Class {idx}: {rate}")

#print(classification_report(y_test, adjusted_preds))
'''

In [None]:
# Persist the fitted classification model to disk as a pickle file.
model_pkl_file = "./svc_model.pkl"

with open(model_pkl_file, mode='wb') as pkl_out:
    pickle.dump(model, pkl_out)

In [None]:
# Report the fitted model's score on the test split, rounded to three decimals.
test_score = model.score(x_test, y_test)
print("best svc from random search: %.3f" % test_score)

In [None]:
#model_pkl_file = "svc_model.pkl"

# load model from pickle file
#with open(model_pkl_file, 'rb') as file:
    #model = pickle.load(file)
# Hand the trained model to DiCE through its sklearn backend wrapper.
backend = 'sklearn'
m = dice_ml.Model(backend=backend, model=model)
# Build the DiCE explainer with the genetic search method
# (default-method alternative, currently unused: exp = dice_ml.Dice(d, m)).
exp_genetic = dice_ml.Dice(d, m, method="genetic")

In [None]:
# Predict the test split and plot its confusion matrix, with rows and columns
# ordered as in model.classes_.
predictions = model.predict(x_test)
class_labels = model.classes_
cm = confusion_matrix(y_test, predictions, labels=class_labels)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels)
disp.plot()
plt.show()

In [None]:
# Test features side by side with the true label ('real') and the model's
# prediction ('pred'); x_test itself is left unmodified.
test_df = x_test.assign(real=y_test, pred=predictions)
test_df

In [None]:
# Keep only the test points whose predicted class is 2 ('high').
predicted_high = test_df['pred'] == 2
test_high = test_df.loc[predicted_high, :]
test_high

In [None]:
# Ensure a RESULTS folder exists next to the notebook. The previous code
# targeted '/RESULTS' at the filesystem root, which normally needs elevated
# permissions; exist_ok avoids a separate existence check.
os.makedirs('./RESULTS', exist_ok=True)

# Save the factual instances, i.e. the test points predicted in class 'high'.
# (The previous code referenced `test_3`, which is not assigned in this notebook;
# `test_high` is the frame of class-2 predictions built earlier.)
test_high.to_csv('./factuals_DiCE.csv')

In [None]:
# Generate counterfactual examples of class high->medium: one counterfactual
# per test point predicted 'high', with desired class 1.
# The query instances are the first 13 columns of `test_high` (the 'real' and
# 'pred' columns appended after them are excluded). The previous code used
# `test_3`, which is not assigned in this notebook.
# Only the listed features may change, each within the given range.
# NOTE(review): every bound is divided by a per-feature constant (224, 132, ...),
# presumably the scale used to normalise that feature in the CSVs — confirm.
dice_gen32 = exp_genetic.generate_counterfactuals(test_high.iloc[:, 0:13],
                                                  total_CFs=1, desired_class=1,
                                                  features_to_vary=["SBP","DBP","BMI","LDL","HDL","TRIG","FBS","totChol"],
                                                  permitted_range={'SBP': [x / 224 for x in [90, 140]],
                                                                   'DBP': [x / 132 for x in [60, 90]],
                                                                   'BMI': [x / 87.90 for x in [18.5, 35]],
                                                                   'LDL': [x / 7 for x in [1.5, 5]],
                                                                   'HDL': [x / 4.62 for x in [1, 2.5]],
                                                                   'TRIG': [x / 10.36 for x in [0.5,5.7]],
                                                                   'FBS': [x / 33.8 for x in [3.2, 7]],
                                                                   'totChol': [x / 17.79 for x in [0.5,6.22]]})

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 681/681 [27:44<00:00,  2.44s/it]


In [None]:
# Generate counterfactual examples of class high->low: one counterfactual
# per test point predicted 'high', with desired class 0.
# The query instances are the first 13 columns of `test_high` (the 'real' and
# 'pred' columns appended after them are excluded). The previous code used
# `test_3`, which is not assigned in this notebook.
# Only the listed features may change, each within the given range.
# NOTE(review): every bound is divided by a per-feature constant (224, 132, ...),
# presumably the scale used to normalise that feature in the CSVs — confirm.
dice_gen31 = exp_genetic.generate_counterfactuals(test_high.iloc[:, 0:13],
                                                  total_CFs=1, desired_class=0,
                                                  features_to_vary=["SBP","DBP","BMI","LDL","HDL","TRIG","FBS","totChol"],
                                                  permitted_range={'SBP': [x / 224 for x in [90, 140]],
                                                                   'DBP': [x / 132 for x in [60, 90]],
                                                                   'BMI': [x / 87.90 for x in [18.5, 35]],
                                                                   'LDL': [x / 7 for x in [1.5, 5]],
                                                                   'HDL': [x / 4.62 for x in [1, 2.5]],
                                                                   'TRIG': [x / 10.36 for x in [0.5,5.7]],
                                                                   'FBS': [x / 33.8 for x in [3.2, 7]],
                                                                   'totChol': [x / 17.79 for x in [0.5,6.22]]})

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 681/681 [28:48<00:00,  2.54s/it]


In [None]:
# Visualize the counterfactual explanations stored in dice_gen31
# (the run generated with desired_class=0, i.e. high->low).
dice_gen31.visualize_as_dataframe()

In [None]:
# Visualize the counterfactual explanations stored in dice_gen32
# (the run generated with desired_class=1, i.e. high->medium).
dice_gen32.visualize_as_dataframe()

In [None]:
# Save generated counterfactual examples (high->medium run) to disk.
# One entry per explained instance: its counterfactual frame, or a single
# all-NaN placeholder row (same columns as `df`) when none was found.
# Iterating over cf_examples_list directly removes the dependency on the
# unassigned `test_3`, and concatenating once avoids growing a frame in a loop.
cf_frames_32 = [
    example.final_cfs_df
    if example.final_cfs_df is not None
    else pd.DataFrame(np.nan, index=[position], columns=df.columns)
    for position, example in enumerate(dice_gen32.cf_examples_list)
]

# pd.concat rejects an empty list, so fall back to an empty frame with the expected columns.
if cf_frames_32:
    counterfactuals_genetic32 = pd.concat(cf_frames_32, ignore_index=True)
else:
    counterfactuals_genetic32 = pd.DataFrame(columns=df.columns)

counterfactuals_genetic32.to_csv('./cf_DiCE_2.csv')

In [None]:
# Save generated counterfactual examples (high->low run) to disk.
# One entry per explained instance: its counterfactual frame, or a single
# all-NaN placeholder row (same columns as `df`) when none was found.
# Iterating over cf_examples_list directly removes the dependency on the
# unassigned `test_3`, and concatenating once avoids growing a frame in a loop.
cf_frames_31 = [
    example.final_cfs_df
    if example.final_cfs_df is not None
    else pd.DataFrame(np.nan, index=[position], columns=df.columns)
    for position, example in enumerate(dice_gen31.cf_examples_list)
]

# pd.concat rejects an empty list, so fall back to an empty frame with the expected columns.
if cf_frames_31:
    counterfactuals_genetic31 = pd.concat(cf_frames_31, ignore_index=True)
else:
    counterfactuals_genetic31 = pd.DataFrame(columns=df.columns)

counterfactuals_genetic31.to_csv('./cf_DiCE_1.csv')