In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
from typing import List, Dict, Tuple

In [3]:
# One-off split of the raw CSV into stratified train/val/test files (kept commented out for reference)
# df = pd.read_csv("data/heart_2020_cleaned.csv")
# train, val = train_test_split(df, test_size=0.3, random_state=42, stratify=df["HeartDisease"])
# val, test = train_test_split(val, test_size=0.5, random_state=42, stratify=val["HeartDisease"])
# train.to_csv("data/heart_train.csv", index=False)
# val.to_csv("data/heart_val.csv", index=False)
# test.to_csv("data/heart_test.csv", index=False)

In [2]:
# Read the train / validation / test CSV files, in that order.
split_paths = ('data/heart_train.csv', 'data/heart_val.csv', 'data/heart_test.csv')
train, val, test = (pd.read_csv(csv_path) for csv_path in split_paths)

In [3]:
# Preprocessing data

# Columns handled as categories (passed to the one-hot encoding step).
categorical_features = [
    "Smoking", "AlcoholDrinking", "Stroke", "DiffWalking",
    "Sex", "AgeCategory", "Race", "Diabetic",
    "PhysicalActivity", "GenHealth", "Asthma", "KidneyDisease",
    "SkinCancer",
]

# Columns handled as continuous values. The misspelled variable name is kept
# on purpose: other cells refer to it under this exact spelling.
continuos_features = ["BMI", "PhysicalHealth", "MentalHealth"]

# Name of the label column.
target_variable = "HeartDisease"

def data_preprocessing(data: pd.DataFrame, categorical_features: List[str], continuous_features: List[str], target_variable: str):
    """One-hot encode the model features and pull out target / protected columns.

    Args:
        data: Input frame. It is copied, never modified.
        categorical_features: Columns expanded with ``pd.get_dummies``
            (the first level of each column is dropped).
        continuous_features: Columns carried over unchanged.
        target_variable: Name of the label column.

    Returns:
        Tuple ``(encoded, frame, target, sex, age, race)``: the encoded feature
        frame, the copy of ``data``, and the raw values of the target column
        and of the ``"Sex"``, ``"AgeCategory"`` and ``"Race"`` columns.
    """
    frame = data.copy()

    # Raw values of the protected attributes, read before any encoding.
    sex, age, race = (frame[column].values for column in ("Sex", "AgeCategory", "Race"))

    labels = frame[target_variable].values

    # Keep only the modelling columns, then expand the categorical ones.
    feature_columns = categorical_features + continuous_features
    encoded = pd.get_dummies(frame[feature_columns], columns=categorical_features, drop_first=True)

    return encoded, frame, labels, sex, age, race

#df_processed, df_original, target, sex, age, race = data_preprocessing(df, categorical_features=categorical_features, continuous_features=continuos_features, target_variable=target_variable)
# Both splits go through exactly the same preprocessing settings.
preprocessing_kwargs = dict(
    categorical_features=categorical_features,
    continuous_features=continuos_features,
    target_variable=target_variable,
)
train_processed, train_original, train_target, train_sex, train_age, train_race = data_preprocessing(train, **preprocessing_kwargs)
val_processed, val_original, val_target, val_sex, val_age, val_race = data_preprocessing(val, **preprocessing_kwargs)

# Each split is one-hot encoded on its own, so a category level that is absent
# from (or unique to) one split would silently give the two frames different
# columns. Fail loudly here instead of feeding misaligned features to a model.
assert list(val_processed.columns) == list(train_processed.columns), (
    "train/val one-hot columns differ: "
    f"{sorted(set(train_processed.columns) ^ set(val_processed.columns))}"
)

In [9]:
# just renaming stuff
# X_train, X_val = train_processed.values, val_processed.values
# Short aliases for the target arrays of each split.
y_train = train_target
y_val = val_target

In [5]:
# Fairness metric function
def equailized_odds(preds, groups, test, positive_label=True):
    """Print and return the positive-prediction rate per (true label, group) pair.

    For every combination of a true label and a group, the score is the share
    of rows in that combination whose prediction equals ``positive_label``.
    Comparing the scores of different groups under the same true label is the
    equalized-odds check.

    Args:
        preds: Iterable of predictions.
        groups: Iterable of group memberships (e.g. a protected attribute).
        test: Iterable of true labels.
        positive_label: Prediction value counted as a positive prediction.
            Defaults to ``True``, which matches the previous behaviour.

    Returns:
        Dict mapping ``'T: <label>, G: <group>'`` to the rate rounded to three
        decimals. A combination with no rows gets ``nan`` (the earlier code
        raised ``ZeroDivisionError`` there).
    """
    print('Equalized Odds')
    df = pd.DataFrame(list(zip(preds, groups, test)), columns=['preds', 'groups', 'test'])
    scores = {}
    for target in df['test'].unique():
        for group in df['groups'].unique():
            selection = df.loc[(df['test'] == target) & (df['groups'] == group)]
            if len(selection) == 0:
                # No rows with this label in this group: the rate is undefined.
                score = float('nan')
            else:
                positives = int((selection['preds'] == positive_label).sum())
                score = round(positives / len(selection), 3)
            print(f'Target [{target}] and group [{group}]: {score} ')
            scores[f'T: {target}, G: {group}'] = score
    return scores

In [6]:
# Train whitebox model

# Scale continuous variables
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
# scaler = ColumnTransformer([('scaler', StandardScaler(), continuos_features)], remainder='passthrough')

# Whitebox model
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report
from sklearn import tree
import matplotlib.pyplot as plt
from sklearn.pipeline import Pipeline

# whitebox_model = Pipeline(steps=[('scaler', scaler), ('clf', DecisionTreeClassifier(min_samples_split = 0.01, min_samples_leaf= 0.01, max_features="auto", max_depth = 5, criterion = "gini", random_state = 42))])
# # whitebox_model = Pipeline(steps=[('scaler', scaler), ('clf', DecisionTreeClassifier(random_state = 42))])

# whitebox_model.fit(X_train, y_train)
# y_pred_whitebox = whitebox_model.predict(X_val)
# y_pred_proba_whitebox = whitebox_model.predict_proba(X_val)

# print(classification_report(y_val, y_pred_whitebox))

# # plot tree
# plt.figure(figsize=(25,20))  # set plot size (denoted in inches)
# tree.plot_tree(whitebox_model['clf'], fontsize=9, feature_names=df_processed.columns)
# plt.show()

# Optuna Study

In [7]:
from typing import Callable, List

import numpy as np
from sklearn import metrics

import optuna

# Set the threshold of Optuna's logger to the raw numeric level 1.
# NOTE(review): Optuna exposes named levels (e.g. optuna.logging.INFO); 1 is lower than
# any of them, so presumably every message is emitted — confirm this is intended.
optuna.logging.set_verbosity(1)

class Objective:
    """Optuna objective: fit a decision tree and score it on a validation split.

    Args:
        X_train: Training feature matrix.
        X_val: Validation feature matrix.
        y_train: Training labels.
        y_val: Validation labels.
        evaluation_func: Metric called as ``evaluation_func(y_true, y_pred)``,
            the argument order used by ``sklearn.metrics`` functions. Its
            return value is the value of the trial.
    """

    def __init__(
        self,
        X_train: np.ndarray,
        X_val: np.ndarray,
        y_train: np.ndarray,
        y_val: np.ndarray,
        evaluation_func: Callable,
    ):
        self.X_train = X_train
        self.X_val = X_val
        self.y_train = y_train
        self.y_val = y_val
        self.evaluation_func = evaluation_func

    def __call__(self, trial) -> float:
        """Sample hyper-parameters from ``trial``, fit the tree, return its score.

        Optuna calls this method once per trial.
        """
        # Search space. ``suggest_float(..., log=True)`` replaces the deprecated
        # ``suggest_loguniform`` and samples from the same log-uniform range.
        params = {
            "criterion": trial.suggest_categorical("criterion", ["gini", "entropy"]),
            "max_depth": trial.suggest_int("max_depth", 5, 15, step=2),
            "min_samples_split": trial.suggest_float("min_samples_split", 1e-3, 0.01, log=True),
            "min_samples_leaf": trial.suggest_float("min_samples_leaf", 1e-3, 0.01, log=True),
        }

        # Fixed random_state so that a given set of params always yields the same tree.
        whitebox_model = DecisionTreeClassifier(**params, random_state=42).fit(self.X_train, self.y_train)

        preds: np.ndarray = whitebox_model.predict(self.X_val)

        # Ground truth first, predictions second. The reversed order goes
        # unnoticed with accuracy but swaps asymmetric metrics such as
        # precision and recall.
        return self.evaluation_func(self.y_val, preds)

  from .autonotebook import tqdm as notebook_tqdm


In [10]:
# Run optimization
# Scale the continuous columns; every other column is passed through as-is.
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
scaler = ColumnTransformer([('scaler', StandardScaler(), continuos_features)], remainder='passthrough')
X_train = scaler.fit_transform(train_processed)  # scaling statistics come from the training split only
X_val = scaler.transform(val_processed)

# Define objective
objective = Objective(X_train, X_val, y_train, y_val, metrics.accuracy_score)

# Seed the sampler so that a fresh "Restart & Run All" suggests the same
# hyper-parameters. TPESampler is Optuna's default sampler, so only the seed is new.
study = optuna.create_study(direction="maximize", sampler=optuna.samplers.TPESampler(seed=42))

# Trials run sequentially: with parallel trials the order in which they draw from
# the sampler is non-deterministic, which defeats the seed. Raise n_jobs only if
# speed matters more than reproducibility.
study.optimize(objective, n_trials=5, n_jobs=1, show_progress_bar=True)

[I 2022-05-20 17:13:57,310] A new study created in memory with name: no-name-154f7b3e-c2e9-437b-87ff-c67f5ed70b7a


: 

: 

# EDA

In [None]:
plt.rcParams["figure.figsize"] = (20,20)

# Checking for association between variables
from dython import nominal

data_test = train_original.copy()

# Replace each categorical column by integer codes (one code per distinct value).
data_test[categorical_features] = data_test[categorical_features].apply(lambda x : pd.factorize(x)[0])

# The codes above are arbitrary integers. dython's default nominal_columns='auto'
# detects categorical columns by dtype, so it would treat these columns as numeric
# and correlate the codes themselves. Declare them (and the target) nominal
# explicitly so that categorical association measures are used instead.
nominal.associations(data_test, nominal_columns=categorical_features + [target_variable])