In [1]:
import logging
import random

import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

# Dataset 1: Heart Disease

In [2]:
# Progress message announcing that dataset 1 is about to be loaded.
# NOTE(review): no logging configuration (e.g. logging.basicConfig) is visible in this notebook; with the default
# root-logger level (WARNING) an INFO message is not shown -- confirm that logging is configured elsewhere.
logging.info('Loading dataset 1: heart disease (continuous features) ...')

Load dataset one: heart disease

In [3]:
# Column names assigned to the heart disease CSV, in file order. 'heart_disease_label' is the outcome column.
all_features_num = [
    'sex', 'age', 'education', 'smoker', 'cigs_per_day', 'bp_meds', 'prevalent_stroke', 'prevelant_hyp',
    'diabetes', 'total_chol', 'sys_bp', 'dia_bp', 'bmi', 'heart_rate', 'glucose', 'heart_disease_label',
]

# Path of the CSV file, relative to the notebook.
filename_num = '../data/framingham.csv'

# The column names are supplied explicitly through names= instead of being read from the file.
data_num = pd.read_csv(filepath_or_buffer=filename_num, names=all_features_num)

For this dataset we only look at numerical data, so we drop the categorical columns. We also drop the column "education", for which there is no feature description on Kaggle: https://www.kaggle.com/dileep070/heart-disease-prediction-using-logistic-regression

In [4]:
# Keep only the numerical columns: remove the categorical ones and "education" in a single drop call.
data_num = data_num.drop(
    columns=['sex', 'smoker', 'bp_meds', 'prevalent_stroke', 'prevelant_hyp', 'diabetes', 'education']
)

data_num.head(5)

Unnamed: 0,age,cigs_per_day,total_chol,sys_bp,dia_bp,bmi,heart_rate,glucose,heart_disease_label
0,39,0.0,195.0,106.0,70.0,26.97,80.0,77.0,0
1,46,0.0,250.0,121.0,81.0,28.73,95.0,76.0,0
2,48,20.0,245.0,127.5,80.0,25.34,75.0,70.0,0
3,61,30.0,225.0,150.0,95.0,28.58,65.0,103.0,1
4,46,23.0,285.0,130.0,84.0,23.1,85.0,85.0,0


Remove any rows that are missing data. Afterwards there should be no more entries with NaN values. We also drop any duplicate rows.

In [5]:
# Remove incomplete rows first, then exact duplicates, and finally store every remaining column as float.
data_num = (
    data_num
    .dropna()
    .drop_duplicates()
    .astype(float)
)

# Sanity check: the number of missing values per column (expected to be zero everywhere).
data_num.isna().sum()

age                    0
cigs_per_day           0
total_chol             0
sys_bp                 0
dia_bp                 0
bmi                    0
heart_rate             0
glucose                0
heart_disease_label    0
dtype: int64

In [6]:
# Name of the label column of dataset 1.
outcome_name_num = 'heart_disease_label'

# Every remaining feature column of dataset 1 is treated as continuous.
continuous_features_num = [
    'age',
    'cigs_per_day',
    'total_chol',
    'sys_bp',
    'dia_bp',
    'bmi',
    'heart_rate',
    'glucose',
]

# Dataset 2: Census Income (categorical)

In [7]:
# Progress message announcing that dataset 2 is about to be loaded.
# NOTE(review): no logging configuration (e.g. logging.basicConfig) is visible in this notebook; with the default
# root-logger level (WARNING) an INFO message is not shown -- confirm that logging is configured elsewhere.
logging.info('Loading dataset 2: census income (categorical features) ...')

Load dataset two: census income

In [8]:
# Column names assigned to the census income CSV, in file order. 'label' is the raw income class.
all_features_cat = [
    'age', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital_status', 'occupation',
    'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_per_week', 'native_country', 'label',
]

# Path of the CSV file, relative to the notebook.
filename_cat = '../data/adult.data.csv'

# The column names are supplied explicitly through names= instead of being read from the file.
data_cat = pd.read_csv(filepath_or_buffer=filename_cat, names=all_features_cat)

Missing values in the columns workclass, native_country and occupation are marked with the string " ?" (including the leading space). Rows that contain such a value need to be removed.

In [9]:
# Missing entries are stored as ' ?' (with a leading space). Keep only the rows in which none of the three
# affected columns holds that marker.
data_cat = data_cat[
    data_cat[['workclass', 'native_country', 'occupation']].ne(' ?').all(axis=1)
]

We will only use the categorical features of this dataset. Remove continuous columns:

In [10]:
# Only the categorical features are used: remove all continuous columns in a single drop call.
data_cat = data_cat.drop(
    columns=['age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']
)

data_cat.head(3)

Unnamed: 0,workclass,education,marital_status,occupation,relationship,race,sex,native_country,label
0,State-gov,Bachelors,Never-married,Adm-clerical,Not-in-family,White,Male,United-States,<=50K
1,Self-emp-not-inc,Bachelors,Married-civ-spouse,Exec-managerial,Husband,White,Male,United-States,<=50K
2,Private,HS-grad,Divorced,Handlers-cleaners,Not-in-family,White,Male,United-States,<=50K


Transform workclass, education, marital_status, occupation, relationship, race and native_country into label-encoded features (sex is binary-encoded in the next step):

In [11]:
# Append one integer-coded "<column>_encoded" column per categorical feature (each fitted with its own
# LabelEncoder), then remove the original string columns.
data_cat = (
    data_cat
    .assign(**{
        f'{column}_encoded': LabelEncoder().fit_transform(data_cat[column])
        for column in ('workclass', 'education', 'marital_status', 'occupation',
                       'relationship', 'race', 'native_country')
    })
    .drop(columns=['workclass', 'education', 'marital_status', 'occupation',
                   'relationship', 'race', 'native_country'])
)

data_cat.head(3)

Unnamed: 0,sex,label,workclass_encoded,education_encoded,marital_status_encoded,occupation_encoded,relationship_encoded,race_encoded,native_country_encoded
0,Male,<=50K,5,9,4,0,1,4,38
1,Male,<=50K,4,9,2,3,0,4,38
2,Male,<=50K,2,11,0,5,1,4,38


Transform label and sex into binary encoding:

In [12]:
# Add the binary columns "female" (1 for ' Female') and "income" (1 for ' >50K'), then remove the string columns
# they were derived from. The keys carry the leading space that the raw values have.
data_cat = (
    data_cat
    .assign(
        female=data_cat['sex'].map({' Male': 0, ' Female': 1}),
        income=data_cat['label'].map({' <=50K': 0, ' >50K': 1}),
    )
    .drop(columns=['sex', 'label'])
)

data_cat.head(3)

Unnamed: 0,workclass_encoded,education_encoded,marital_status_encoded,occupation_encoded,relationship_encoded,race_encoded,native_country_encoded,female,income
0,5,9,4,0,1,4,38,0,0
1,4,9,2,3,0,4,38,0,0
2,2,11,0,5,1,4,38,0,0


Drop duplicates.

In [13]:
# Remove exact duplicate rows. At this point only the encoded categorical columns and the binary "female" and
# "income" columns remain, so two rows are duplicates when all of those values coincide.
data_cat = data_cat.drop_duplicates()

In [14]:
# Name of the label column of dataset 2.
outcome_name_cat = 'income'

# Dataset 2 is used with categorical features only, so there are no continuous feature names.
continuous_features_cat = list()

# Membership Inference and Training Data Extraction Experiment Framework

In [1]:
class XaiPrivacyExperiment():
    """Generic framework for an XAI and data privacy experiment (membership inference or training data extraction)
    
    Attributes
    ----------
    rs, rng
        Random states for numpy
    data
        Pandas dataframe of the dataset that the experiment is executed on. Contains features and labels.
    continuous_features : list[str]
        The continuous feature names of the dataset.
    categorical_features : list[str]
        The categorical feature names of the dataset.
    outcome_name : str
        The name of the column that contains the labels.
    features
        Pandas dataframe that only contains the feature values of all samples (not labels).
    labels
        Pandas dataframe that only contains the labels of all samples (not features).
        
    Methods
    -------
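    membership_inference_experiment(repetitions: int, model, model_access: bool)
        Executes membership inference experiment
    training_data_extraction_experiment(stop_after: None or int, model, model_access: bool)
        Executes training data extraction experiment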
    train_explainer(data_train, model):
        Trains the explainer on the given data and model (abstract method).
    membership_inference_attack_model_access(explainer, samples_df, model):
        Executes membership inference attack with access to the model
    membership_inference_attack_no_model_access(explainer, samples_df):
        Executes membership inference attack without access to the model
    training_data_extraction_model_access(explainer, stop_after, feature_format, rng, model):
        Executes training data extraction attack with access to the model
    training_data_extraction_no_model_access(explainer, stop_after, feature_format, rng):
        Executes training data extraction attack without access to the model
    
    """
    
    def __init__(self, data, continuous_features, outcome_name, random_state: int):
        """        
        Parameters
        ----------
        data
            Pandas dataframe of the dataset that the experiment is executed on. Contains features and labels.
        continuous_features : list[str]
            The continuous feature names of the dataset.
        outcome_name : str
            The name of the column that contains the labels.
        random_state: int
            The seed for all random actions during the experiment (such as drawing test samples)
        """
        # create random state from seed. This will be used for all random actions (such as drawing test samples)
        self.rs = np.random.RandomState(seed=random_state)
        self.rng = np.random.default_rng(random_state)
        random.seed(random_state)
        
        self.data = data
        self.continuous_features = continuous_features
        self.outcome_name = outcome_name
    
        # split dataset into features and labels.
        self.features = self.data.drop(outcome_name, axis=1)
        self.labels = self.data[outcome_name]
        
        # names of the categorical features
        self.categorical_features = self.features.columns.difference(continuous_features)
    
    def membership_inference_experiment(self, repetitions: int, model, model_access: bool):
        """Executes membership inference experiment
        
        Executes the membership inference experiment with the dataset that this object was instantiated with. Trains given
        model on half the dataset and tests accuracy, precision and recall of the implemented membership inference attack.
        If model_access is True, the attack method with the parameter "model" is used (the attacker has access to the model).
        Otherwise, the attack method without that parameter is used (the attacker has no access to the model).
        
        Parameters
        ----------
        repetitions : int
            Number of test samples that the membership inference attack is attempted on. Should not be greater the len(data)
        model
            The untrained model used in the experiment
        model_access : bool
            Whether the membership inference attack is executed with attacker access to the model or without.
        """
        # create pipeline that transforms categorical features to one hot encoding
        model = self._model_pipeline(model)

        # split data into two halves (one is used for training and inference, the other only for inference)
        # data_train contains all training samples with labels, while x_train only contains the samples without labels
        # and y_train only contains the labels without features.
        data_train, x_train, y_train, x_ctrl, y_ctrl = self._split_data()

        # train classifier on training data
        model = model.fit(x_train, y_train)

        # train explainer on training data and classifier
        explainer = self.train_explainer(data_train, model)
        
        # draw test samples from training and control data. record each sample's membership in training data.
        samples_df, actual_membership = self._draw_test_samples(repetitions, x_train, x_ctrl)
            
        # infer membership using membership inference attack against the explainer
        if model_access:
            inferred_membership = self.membership_inference_attack_model_access(explainer, samples_df, model)
        else:
            inferred_membership = self.membership_inference_attack_no_model_access(explainer, samples_df)

        # calculate accuracy, precision and recall
        return self._calc_accuracy_precision_recall(repetitions, actual_membership, inferred_membership)

    def training_data_extraction_experiment(self, stop_after: None or int, model, model_access: bool):
        """Executes training data extraction experiment
        
        Executes the training data extraction experiment with the dataset that this object was instantiated with. Trains given
        model on dataset and tests precision and recall of the implemented training data extraction attack.
        If model_access is True, the attack method with the parameter "model" is used (the attacker has access to the model).
        Otherwise, the attack method without that parameter is used (the attacker has no access to the model).
        
        Parameters
        ----------
        stop_after : None or int
            The amount of samples that the attack is meant to attempt extraction for. If None, the attack attempts to extract
            the entire dataset.
        model
            The untrained model used in the experiment.
        model_access : bool
            Whether the attack is executed with attacker access to the model or without.
        """
        # create pipeline that transforms categorical features to one hot encoding
        model = self._model_pipeline(model)

        # train classifier on dataset
        model = model.fit(self.features, self.labels)

        # train explainer on training data and classifier
        explainer = self.train_explainer(self.data, model)
        
        # generate the feature format information that is available to the attacker
        feature_format = self._generate_feature_info(self.features, self.continuous_features)
            
        # extract samples using training data extraction attack against the explainer
        if model_access:
            extracted_samples = self.training_data_extraction_model_access(explainer, stop_after, feature_format, self.rng, model)
        else:
            extracted_samples = self.training_data_extraction_no_model_access(explainer, stop_after, feature_format, self.rng)

        # compare the extracted samples to the training data -> number of accurate extractions
        accurate_samples, num_extracted_samples, all_samples = self._compare_data(extracted_samples, self.data, stop_after)
            
        # calculate precision and recall
        return self._calc_precision_recall_tde(accurate_samples, num_extracted_samples, all_samples)
    
    def _model_pipeline(self, model):
        if len(self.categorical_features) > 0:
            # Define transformer to transform categorical features into one-hot encoding
            categorical_transformer = Pipeline(steps=[
                ('onehot', OneHotEncoder(handle_unknown='ignore'))
            ])

            transformations = ColumnTransformer(transformers=[
                ('cat', categorical_transformer, self.categorical_features)
            ])


            return Pipeline(steps=[('preprocessor', transformations),
                                  ('classifier', model)])
        else:
            # if there are no categorical features, then nothing needs to be transformed
            return model
    
    def _split_data(self):
        # split data into two halves. One is used for training, the other as control data that is not part of the training data.
        # this control data will be needed as test samples that do not belong to the training data.
        idx_mid = int(self.features.shape[0] / 2)

        data_train = self.data.iloc[idx_mid:, :]
        
        x_train = self.features.iloc[idx_mid:, :]
        y_train = self.labels.iloc[idx_mid:]
        
        x_ctrl = self.features.iloc[:idx_mid, :]
        y_ctrl = self.labels.iloc[:idx_mid]
        
        return data_train, x_train, y_train, x_ctrl, y_ctrl
    
    def _draw_test_samples(self, repetitions, x_train, x_ctrl):
        # create new dataframe that will hold all test samples for the experiment
        samples_df = pd.DataFrame(columns=list(self.features.columns.values))
        
        # record each test samples actual membership. If the sample comes from the training data -> True. If the sample comes
        # from the control data -> False.
        sample_membership = np.empty(repetitions)

        # half the test samples come from the training data, the other half from the control data
        for i in range(repetitions):
            if i % 2 == 0:
                # choose sample from training data.
                sample = x_train.sample(random_state=self.rs)
                sample_membership[i] = True
                logging.debug('%s taken from training data' % sample.to_numpy())
            else:
                # choose sample from control data.
                sample = x_ctrl.sample(random_state=self.rs)
                sample_membership[i] = False
                logging.debug('%s taken from control data' % sample.to_numpy())

            samples_df = samples_df.append(sample, ignore_index=True)
            
        return samples_df, sample_membership
    
    @staticmethod
    def _calc_accuracy_precision_recall(repetitions, actual_membership, inferred_membership):
        samples_in_training_data = np.count_nonzero(actual_membership)
        samples_not_in_training_data = repetitions - samples_in_training_data

        pred_positives = np.count_nonzero(inferred_membership)

        correct_predictions = np.count_nonzero(np.equal(inferred_membership, actual_membership))
        true_positives = np.count_nonzero(inferred_membership[actual_membership == True])

        accuracy = correct_predictions / repetitions
        if pred_positives > 0:
            precision = true_positives / pred_positives
        else:
            # If the attack predicted membership for no test sample then precision cannot be calculated
            precision = float("NaN")
        recall = true_positives / samples_in_training_data
        
        print(f'Accuracy: {accuracy}, precision: {precision}, recall: {recall}')
        
        return accuracy, precision, recall
    
    @staticmethod
    def _generate_feature_info(features, continuous_features):
        feature_information = []
        
        features_np = features.to_numpy()
        
        # Get the minimum and maximum value for all continuous features in the training data.
        # Get the categories for all categorical features.
        for i, feature_name in enumerate(features.columns.values):
            this_feature = {'name': feature_name}

            if feature_name in continuous_features:
                this_feature['isCont'] = True

                this_feature['min'] = np.amin(features_np[:, i])
                this_feature['max'] = np.amax(features_np[:, i])

            else:
                this_feature['isCont'] = False

                this_feature['categories'] = features[feature_name].unique()

            feature_information.append(this_feature)
            
        return feature_information
    
    @staticmethod
    def _compare_data(extracted_samples, actual_samples, stop_after: None or int):
        # convert data to numpy so that comparison becomes simpler
        extracted_samples = extracted_samples.to_numpy().astype(float)
        actual_samples = actual_samples.to_numpy().astype(float)
        
        # If only the features (without the labels) were extracted, then the labels are cut off from the actual_samples array
        # in order to be able to compare the two arrays
        if actual_samples.shape[1] > extracted_samples.shape[1]:
            actual_samples = actual_samples[:,:-1]
            
        # drop duplicates from the extracted samples and from the actual samples to get accurate precision/recall
        extracted_samples = np.unique(extracted_samples, axis=0)
        actual_samples = np.unique(actual_samples, axis=0)
        
        # all_samples is the maximum amount of samples that could have been extracted during this attack
        # If stop_after is None, it means the attack attempted to extracted all samples in the training data.
        # Otherwise the attack stopped after the first stop_after training samples.
        if stop_after is None:
            all_samples = len(actual_samples)
        else:
            all_samples = stop_after
        
        num_extracted_samples = extracted_samples.shape[0]
        num_accurate_samples = 0
        
        for extracted_sample in extracted_samples:
            logging.debug(f'Extracted sample: {extracted_sample}')

            # Get all indices of the extracted sample in the given training data. features_np == row creates a boolean array 
            # with True if the cells match and False otherwise. all(axis=1) returns for each row if all elements in the row 
            # are True. np.where returns an array of indices where the boolean array contains the value True.
            close_values = np.isclose(actual_samples, extracted_sample)
            close_rows = close_values.all(axis=1)
            indices_of_sample = np.where(close_rows)[0]

            if indices_of_sample.shape[0] > 0:
                logging.debug(f'Appears in training data at indices {indices_of_sample}')
                num_accurate_samples += 1
            else:
                logging.debug('Does not appear in training data')
        
        return num_accurate_samples, num_extracted_samples, all_samples
    
    @staticmethod
    def _calc_precision_recall_tde(accurate_samples, num_extracted_samples, all_samples):
        # Percentage of extracted samples that actually appears within the training data
        if num_extracted_samples > 0:
            precision = accurate_samples / num_extracted_samples
        else:
            # If the attack did not extract a single sample then precision cannot be calculated
            precision = float("NaN")

        recall = accurate_samples / all_samples
        
        print(f'Number of extracted samples: {num_extracted_samples}')
        print(f'Number of accurate extracted samples: {accurate_samples}')
        print(f'Precision: {precision}, recall: {recall}')
        
        return precision, recall
    
    def train_explainer(self, data_train, model):
        """Trains the explainer on the given data and model

        Abstract method that must be implemented by subclass. Returns the explainer. The returned object is
        passed on unchanged to the attack methods of this class.

        Parameters
        ----------
        data_train
            The training data (features and labels).
        model
            The trained model that will be explained by the explainer. If the dataset has categorical features,
            this is the pipeline that one-hot encodes them before the classifier.

        Raises
        ------
        NotImplementedError
            Must be implemented by subclass.
        """
        
        raise NotImplementedError
    
    @staticmethod
    def membership_inference_attack_model_access(explainer, samples_df, model):
        """Executes membership inference attack with access to the model
        
        Abstract method that must be implemented by subclass. Executes the attack against the explainer with access to the 
        model. Infers membership for each sample in samples_df. Returns a numpy array with boolean values indicating the 
        inferred membership of each test sample (True = inferred to be part of the training data). Must be the same length 
        as samples_df.
        
        Parameters
        ----------
        explainer
            The explainer or explanation that will be attacked (the object returned by train_explainer).
        samples_df
            A pandas dataframe that contains the feature values of all test samples.
        model
            The trained model that is explained by the explainer.
            
        Raises
        ------
        NotImplementedError
            Must be implemented by subclass.
        """
        raise NotImplementedError
    
    @staticmethod
    def membership_inference_attack_no_model_access(explainer, samples_df):
        """Executes membership inference attack without access to the model
        
        Abstract method that must be implemented by subclass. Executes the attack against the explainer without access to the 
        model. Infers membership for each sample in samples_df. Returns a numpy array with boolean values indicating the 
        inferred membership of each test sample (True = inferred to be part of the training data). Must be the same length 
        as samples_df.
        
        Parameters
        ----------
        explainer
            The explainer or explanation that will be attacked (the object returned by train_explainer).
        samples_df
            A pandas dataframe that contains the feature values of all test samples.
            
        Raises
        ------
        NotImplementedError
            Must be implemented by subclass.
        """
        raise NotImplementedError
        
    @staticmethod
    def training_data_extraction_model_access(explainer, stop_after, feature_format, rng, model):
        """Executes training data extraction attack with access to the model
        
        Abstract method that must be implemented by subclass. Executes the attack against the explainer with access to the 
        model. Attempts to extract stop_after samples. If stop_after is None, attempt to extract all samples. 
        Returns a dataframe containing all extracted samples. The dataframe may contain the features only or the features
        followed by the label; it is compared against the training data after conversion to float.
        
        Parameters
        ----------
        explainer
            The explainer or explanation that will be attacked (the object returned by train_explainer).
        stop_after : None or int
            The amount of samples that the attack is meant to attempt extraction for. If None, the attack attempts to extract
            the entire dataset.
        feature_format
            A list with one dictionary per feature, in column order. Each dictionary holds 'name' and 'isCont'; continuous
            features additionally hold 'min' and 'max', categorical features hold 'categories'.
        rng
            Numpy rng object (numpy.random.default_rng seeded with the experiment's random_state) that can be used for
            reproducible random decisions.
        model
            The trained model that is explained by the explainer.
            
        Raises
        ------
        NotImplementedError
            Must be implemented by subclass.
        """
        raise NotImplementedError
        
    @staticmethod
    def training_data_extraction_no_model_access(explainer, stop_after, feature_format, rng):
        """Executes training data extraction attack without access to the model
        
        Abstract method that must be implemented by subclass. Executes the attack against the explainer without access to the 
        model. Attempts to extract stop_after samples. If stop_after is None, attempt to extract all samples. 
        Returns a dataframe containing all extracted samples. The dataframe may contain the features only or the features
        followed by the label; it is compared against the training data after conversion to float.
        
        Parameters
        ----------
        explainer
            The explainer or explanation that will be attacked (the object returned by train_explainer).
        stop_after : None or int
            The amount of samples that the attack is meant to attempt extraction for. If None, the attack attempts to extract
            the entire dataset.
        feature_format
            A list with one dictionary per feature, in column order. Each dictionary holds 'name' and 'isCont'; continuous
            features additionally hold 'min' and 'max', categorical features hold 'categories'.
        rng
            Numpy rng object (numpy.random.default_rng seeded with the experiment's random_state) that can be used for
            reproducible random decisions.
            
        Raises
        ------
        NotImplementedError
            Must be implemented by subclass.
        """
        raise NotImplementedError