In [None]:
!pip install numpy pandas seaborn tqdm scipy scikit-learn privacy-meter tensorflow

In [6]:
import warnings

import pandas as pd
import numpy as np
import seaborn as sns
import tensorflow as tf

from tqdm import tqdm as tqdm
from scipy import stats

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, log_loss, roc_curve, roc_auc_score, auc

0


## Train a Target Model

### Get and prepare the data

In [2]:
# Dataset: https://figshare.le.ac.uk/articles/dataset/Myocardial_infarction_complications_Database/12045261/3
# Myocardial infarction complications
df = pd.read_csv("https://figshare.le.ac.uk/ndownloader/files/23581310")
print(df.shape)

# The last 12 columns are treated as the complication (outcome) columns.
# NOTE(review): this relies on the column order of the downloaded file -- confirm
# against the dataset description.
complications = df.columns[-12:]
target_complication = "ZSN"  # Chronic heart failure
# Columns in which more than 10% of the values are missing are dropped entirely;
# afterwards every row that still contains a missing value is dropped.
freq_na_cols = df.columns[df.isna().mean(axis=0) > 0.1]
df = df.drop(columns=freq_na_cols).dropna()
print(df.shape)

# Features: everything except the complication columns. Label: the chosen complication.
X = df.drop(columns=complications)
y = df[target_complication]
# Three disjoint splits:
#   * train      (25%)   -- used to fit the target model ("members"),
#   * test       (~25%)  -- held-out records ("non-members"),
#   * population (~50%)  -- extra held-out records used as the reference population.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.75, random_state=42)
X_test, X_population, y_test, y_population = train_test_split(X_test, y_test, test_size = 0.666, random_state=42)
print(len(X_train), len(X_test), len(X_population))

(1700, 124)
(1074, 107)
268 269 537


### Fit a simple classifier

In [3]:
def train_model(X_train, y_train, seed=42):
    """Fit a random-forest classifier and return the fitted estimator.

    Parameters
    ----------
    X_train : feature matrix passed to ``RandomForestClassifier.fit``.
    y_train : labels passed to ``RandomForestClassifier.fit``.
    seed : value used as the forest's ``random_state``.

    Returns
    -------
    The fitted ``RandomForestClassifier``.
    """
    forest_params = {
        "n_estimators": 80,
        "min_samples_split": 2,
        "max_depth": 10,
        "random_state": seed,
    }
    classifier = RandomForestClassifier(**forest_params)
    classifier.fit(X_train, y_train)
    return classifier
    
# The model under attack: fitted on the training split with the default seed.
target_model = train_model(X_train, y_train)

In [None]:
# The classifier's accuracy vs. the majority-class baseline (always predicting the
# more frequent label). The baseline formula assumes `y_test` holds binary 0/1
# labels -- TODO confirm. Author's observation: we are doing a bit better than the baseline.
print(f"Baseline: {max(y_test.mean(), 1 - y_test.mean()):0.2f}")
print(f"Our test-score: {target_model.score(X_test, y_test):0.2f}" )
print(f"Our population-score: {target_model.score(X_population, y_population):0.2f}" )

In [None]:
from privacy_meter.dataset import Dataset

# create the target model's dataset
# 'train' holds the records the target model was fitted on, 'test' the held-out ones.
train_ds = {'x': X_train, 'y': y_train}
test_ds = {'x': X_test, 'y': y_test}
target_dataset = Dataset(
    data_dict={'train': train_ds, 'test': test_ds},
    default_input='x', default_output='y'
)

# create the reference dataset
# It is built from the population split, which is disjoint from both train and test.
population_ds = {'x': X_population, 'y': y_population}
reference_dataset = Dataset(
    # this is the default mapping that a Metric will look for
    # in a reference dataset
    data_dict={'train': population_ds},
    default_input='x', default_output='y'
)

In [None]:
from privacy_meter.model import TensorflowModel
from privacy_meter.audit import Audit, MetricEnum
from privacy_meter.audit_report import ROCCurveReport, SignalHistogramReport
from privacy_meter.constants import InferenceGame
from privacy_meter.information_source import InformationSource

loss_fn = tf.keras.losses.CategoricalCrossentropy()

# Keep the privacy_meter wrapper under its OWN name. Rebinding `target_model` here
# would hide the scikit-learn classifier from every later cell that calls
# `target_model.predict_proba(...)`, and re-running this cell would wrap the
# wrapper again instead of the classifier.
# NOTE(review): `TensorflowModel` is given a scikit-learn RandomForestClassifier
# here; confirm that privacy_meter supports this model type before running an Audit.
wrapped_target_model = TensorflowModel(model_obj=target_model, loss_fn=loss_fn)

# Signals computed from the target model on its own train/test data.
target_info_source = InformationSource(
    models=[wrapped_target_model],
    datasets=[target_dataset]
)

# Signals computed from the same target model on the reference (population) data.
reference_info_source = InformationSource(
    models=[wrapped_target_model],
    datasets=[reference_dataset]
)

## Measuring Population-Wise Privacy Leakage

In [None]:
def measure_membership_vulnerability(vals_train, vals_test, target_fpr=0.01):
    """Run a threshold attack: predict "member" when the attack feature exceeds a threshold.

    Every observed feature value is tried as a threshold. Among the thresholds
    whose false-positive rate on ``vals_test`` does not exceed ``target_fpr``,
    the first one (in input order) reaching the highest true-positive rate on
    ``vals_train`` is selected.

    Parameters
    ----------
    vals_train : array-like of attack feature values for members (training records).
    vals_test : array-like of attack feature values for non-members.
    target_fpr : maximum tolerated false-positive rate of the attack.

    Returns
    -------
    membership_labels : integer array, 1 for each train value followed by 0 for
        each test value.
    best_preds : boolean array aligned with ``membership_labels``; True where the
        attack predicts "member". All False when no threshold yields a positive
        TPR within the FPR budget.
    """
    vals_train = np.asarray(vals_train)
    vals_test = np.asarray(vals_test)
    vals = np.concatenate([vals_train, vals_test])
    membership_labels = np.concatenate([
        np.ones(len(vals_train), dtype=int),
        np.zeros(len(vals_test), dtype=int),
    ])

    best_tpr = 0.0
    # Boolean from the start so both return paths yield the same dtype.
    best_preds = np.zeros(vals.shape, dtype=bool)

    # We find a threshold which maximizes attack TPR for a given level of FPR.
    for threshold in vals:
        tpr = (vals_train > threshold).mean()
        fpr = (vals_test > threshold).mean()
        if fpr <= target_fpr and tpr > best_tpr:
            best_tpr = tpr
            best_preds = vals > threshold

    return membership_labels, best_preds

In [None]:
def logit_scale(y_true, y_pred, eps=1e-16):
    """
    Logit scaling from https://arxiv.org/abs/2112.03570

    For each row, takes the predicted probability ``p`` of the true class and
    returns ``log(p / (1 - p))``, with both the denominator and the odds
    clipped from below at ``eps`` so the result is always finite.

    Parameters
    ----------
    y_true : integer class index per row, or a single scalar class index that
        is applied to every row of ``y_pred`` (Python ``int`` or NumPy integer).
    y_pred : 2-D array-like of class probabilities, one row per example and one
        column per class.
    eps : lower clipping bound used to avoid division by zero and ``log(0)``.

    Returns
    -------
    1-D float array with one logit-scaled confidence per entry of ``y_true``.
    """
    y_pred = np.asarray(y_pred)
    # Broadcast any scalar label (not only np.integer) to one label per row.
    if np.ndim(y_true) == 0:
        y_true = np.full(len(y_pred), y_true)
    y_true = np.asarray(y_true)

    if len(y_true) == 0:
        return np.array([])

    # Probability assigned to the true class of each row.
    pred = y_pred[np.arange(len(y_true)), y_true]
    odds = pred / np.clip(1 - pred, eps, np.inf)
    return np.log(np.clip(odds, eps, np.inf))

In [None]:
def visualize_vals(train_vals, test_vals):
    """Plot overlaid histograms of an attack feature for train vs. test records.

    Both sets of values are stacked into one long-format frame with a
    ``membership`` column ("train" / "test") and drawn with ``sns.displot``;
    each group is normalised separately and a rug plot is added.

    Returns the object returned by ``sns.displot``.
    """
    train_frame = pd.DataFrame({"val": train_vals}).assign(membership="train")
    test_frame = pd.DataFrame({"val": test_vals}).assign(membership="test")
    long_frame = pd.concat([train_frame, test_frame])

    plot_options = dict(
        x="val",
        hue="membership",
        kind="hist",
        stat="probability",
        rug=True,
        common_norm=False,
    )
    return sns.displot(data=long_frame, **plot_options)

In [None]:
# Extract the features for the membership inference attack.
preds_train = target_model.predict_proba(X_train)
preds_test = target_model.predict_proba(X_test)

logits_train = logit_scale(y_train, preds_train)
logits_test = logit_scale(y_test, preds_test)

In [None]:
# Visualize the features. If it is possible to tell train data from test data, then
# our model is vulnerable to membership inference. Look for the two histograms
# (train vs. test) separating from each other.
visualize_vals(logits_train, logits_test)

### What's the membership attack TPR at a given FPR?

In [None]:
# Maximum false-positive rate the attack is allowed to have.
target_fpr = 0.5
membership_labels, membership_preds = measure_membership_vulnerability(
    logits_train, logits_test, target_fpr
)
# TPR = fraction of true members (label 1) that the attack flags as members.
print(f"Attack TPR = {(membership_preds[membership_labels == 1] == 1).mean():.2%} @ FPR = {target_fpr:.2%}")

### ROC curve of the membership attack

In [None]:
# In principle, we do not even need to implement the threshold selection in `measure_membership_vulnerability`.
# Here I am just using roc_curve from scikit-learn.
# The scores are concatenated train-first, test-second, which matches the order
# of `membership_labels` returned by `measure_membership_vulnerability`.

fpr, tpr, _ = roc_curve(membership_labels, np.concatenate([logits_train, logits_test]))
roc_auc = auc(fpr, tpr)

roc_df = pd.DataFrame({'FPR': fpr, 'TPR': tpr})

# Attack ROC curve, plus the diagonal of a random-guessing attack for reference.
sns.lineplot(x='FPR', y='TPR', data=roc_df, label=f"AUC = {roc_auc:0.2f}", errorbar=None)
sns.lineplot(x=[0, 1], y=[0, 1], color='grey', linestyle='--')

## Measuring True Privacy Leakage

In [None]:
def compute_feature_func(func, models):
    """Apply ``func`` to every model and stack the results into a NumPy array.

    Parameters
    ----------
    func : callable taking a single model and returning its feature value(s).
    models : iterable of models.

    Returns
    -------
    ``np.ndarray`` whose first axis follows the order of ``models``.
    """
    per_model_features = list(map(func, models))
    return np.array(per_model_features)

In [None]:
num_examples_to_attack = 50
num_ref_models = 10
# Lower bound for the estimated standard deviation of the "out" logits. When all
# reference models produce the same logit the spread is exactly 0, and a normal
# distribution with zero scale yields NaN probabilities.
min_logit_std = 1e-6

# Collect some arbitrary target examples to attack.
# Each entry is (index, feature row, label, is_member).
examples_to_attack = []

# ...half from the training data.
for index in X_train.index[:num_examples_to_attack // 2]:
    examples_to_attack.append((index, X_train.loc[index], y_train.loc[index], 1))

# ...half from the test data.
for index in X_test.index[:num_examples_to_attack // 2]:
    examples_to_attack.append((index, X_test.loc[index], y_test.loc[index], 0))

result = []

# Reference models for targets that are NOT in the training data. Dropping such a
# target leaves the training set unchanged, so these models are the same for
# every non-member and are trained only once (lazily).
shared_out_models = None

# Now run the re-training attacks!
# The loop variables are deliberately not called `x` / `y`, so the module-level
# label Series `y` is not overwritten by this cell.
for index, x_target, y_target, is_member in tqdm(examples_to_attack):
    # First, train a bunch of models without the target example (if it is in fact part of the training data)
    if index in X_train.index:
        out_models = [
            train_model(
                X_train.drop(index=[index], errors="ignore"),
                y_train.drop(index=[index], errors="ignore"),
                seed=seed,
            )
            for seed in range(num_ref_models)
        ]
    else:
        if shared_out_models is None:
            shared_out_models = [
                train_model(X_train, y_train, seed=seed)
                for seed in range(num_ref_models)
            ]
        out_models = shared_out_models

    # Compute the attack features.
    # Warnings are silenced because a single row is passed to `predict_proba`
    # without the feature names the models were fitted with.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        preds_in = compute_feature_func(lambda model: model.predict_proba([x_target])[0], [target_model])
        preds_out = compute_feature_func(lambda model: model.predict_proba([x_target])[0], out_models)

    logit_in = logit_scale(y_target, preds_in)[0]
    logits_out = logit_scale(y_target, preds_out)

    # Next, we run a parametric test. We assume that "out" logits are Gaussian-distributed,
    # so compute their mean and standard deviation.
    logits_out_mean = np.mean(logits_out)
    # scipy's `norm` is parameterised by the standard deviation (`scale`), not the variance.
    logits_out_std = max(np.std(logits_out), min_logit_std)

    # The parametric test is computing the probability that the "out" logits are less than "in" logit,
    # which means that we predict the target record as a member:
    #
    #   Pr[logit_out <= logit_in], where logit_out ~ Normal(mean, std^2) with mean and std
    #   estimated from reference models.
    #
    # See https://arxiv.org/abs/2112.03570, Eq. (4)
    prob = stats.norm(loc=logits_out_mean, scale=logits_out_std).cdf(logit_in)

    result.append(dict(
        target_index=index,
        is_member=is_member,
        prob=prob,
    ))

In [None]:
# ROC curve of the re-training attack: `is_member` is the ground truth and `prob`
# (the estimated Pr[logit_out <= logit_in]) is the attack score for each target.
fpr, tpr, _ = roc_curve(pd.DataFrame(result).is_member, pd.DataFrame(result).prob)
roc_auc = auc(fpr, tpr)

roc_df = pd.DataFrame({'FPR': fpr, 'TPR': tpr})

# Attack ROC curve, plus the diagonal of a random-guessing attack for reference.
sns.lineplot(x='FPR', y='TPR', data=roc_df, label=f"AUC = {roc_auc:0.2f}", errorbar=None)
sns.lineplot(x=[0, 1], y=[0, 1], color='grey', linestyle='--')