This colab loads canary observations and computes an estimated epsilon.

--

Copyright 2024 DeepMind Technologies Limited.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

In [None]:
# @title Imports

import functools
import json

import dp_accounting
import matplotlib.pyplot as plt
import numpy as np
import scipy
import math
import jax

from jax_privacy.auditing import canary_score_auditor

# Audit using gradient canaries

In [None]:
# @title Load auditing observations.

TRAIN_METRICS_PATH = ''  # @param {type: 'string'}
with open(TRAIN_METRICS_PATH, 'r') as f:
  train_metrics = json.load(f)

CONFIGS_PATH = ''  # @param {type: 'string'}
with open(CONFIGS_PATH, 'r') as f:
  config = json.load(f)

# Separate observations into in and out of training canaries.
out_canary = []
in_canary = []
for i in range(len(train_metrics['canary_count'])):
  # 'canary_count' is zero if not in training.
  if not train_metrics['canary_count'][i]:
    out_canary.append(train_metrics['canary_dot_prod'][i])
  else:
    in_canary.append(train_metrics['canary_dot_prod'][i])
out_canary, in_canary = np.array(out_canary), np.array(in_canary)

In [None]:
# @title Extract config details
batch_size = config['training']['batch_size']['total']
num_samples = config['data_train']['config']['num_samples']
noise_multiplier = config['training']['dp']['algorithm']['noise_multiplier']
nom_epsilon = config['training']['dp']['auto_tune_target_epsilon']
clipping_norm = config['training']['dp']['clipping_norm']
target_delta = config['training']['dp']['delta']
num_steps = len(out_canary) + len(in_canary)

In [None]:
# @title Define functions to convert observations to epsilons


def pld_epsilon(
    sampling_probability: float,
    noise_multiplier: float,
    steps: int,
    delta: float = 1e-5,
) -> float:
  """Calculates the epsilon value for a differentially private mechanism.

  This function computes the epsilon value for a differentially private
  mechanism
  that uses Poisson sampling and Gaussian noise, using the Privacy Loss
  Distribution (PLD) accountant.

  Args:
    sampling_probability: The probability of sampling an example from the
      dataset.
    noise_multiplier: The noise multiplier for the Gaussian mechanism.
    steps: The number of steps the mechanism is applied.
    delta: The desired delta value for the (epsilon, delta)-differential privacy
      guarantee.

  Returns:
    The epsilon value for the given parameters.
  """
  event = dp_accounting.dp_event.PoissonSampledDpEvent(
      sampling_probability,
      event=dp_accounting.dp_event.GaussianDpEvent(noise_multiplier),
  )
  pld_accountant = dp_accounting.pld.PLDAccountant()
  pld_accountant.compose(event, steps)
  epsilon = pld_accountant.get_epsilon(delta)
  return epsilon


def privacy_boundary_gdp(thr: float, mu: float = 1.0) -> tuple[float, float]:
  """Get GDP tradeoff metrics for a given threshold and mu."""
  fnr = scipy.stats.norm.cdf(thr)
  fpr = 1 - scipy.stats.norm.cdf(thr - mu)
  return fnr, fpr


def find_mu_gdp(eps: float, delta: float = 1e-5) -> float:
  """Get mu GDP parameter for a given epsilon and delta."""
  dp_approx = (
      lambda best_mu: (
          scipy.stats.norm.cdf(-(eps / best_mu) + best_mu / 2)
          - np.exp(eps) * scipy.stats.norm.cdf(-(eps / best_mu) - best_mu / 2)
      )
      - delta
  )
  best_mu = scipy.optimize.bisect(dp_approx, 0.01, 10, xtol=1e-9)
  return best_mu

In [None]:
# @title Get epsilon estimate and nominal upper bound.

auditor = canary_score_auditor.CanaryScoreAuditor(
    in_canary_scores=in_canary,
    out_canary_scores=out_canary
    )

# We find the epsilon privacy parameter for the canary.
nom_canary_epsilon = pld_epsilon(
    sampling_probability=1.0,
    noise_multiplier=noise_multiplier,
    steps=1,
    delta=target_delta,
)

# Get epsilon lower bound.
eps_lb = auditor.epsilon_lower_bound(
    alpha=0.05,
    delta=1e-5)

# Get epsilon lower bound from GDP analysis.
eps_lb_from_gdp = auditor.epsilon_from_gdp(
    alpha=0.05,
    delta=1e-5)


# We turn this into a general estimate for epsilon by estimating the noise multiplier and then composing with an accountant.
estimate_noise_multiplier_fn = (
    lambda nm: eps_lb_from_gdp
    - pld_epsilon(
        sampling_probability=1.0,
        noise_multiplier=nm,
        steps=1,
        delta=target_delta,
    )
)
noise_multiplier_estimate = scipy.optimize.bisect(
    estimate_noise_multiplier_fn, 0, 10, xtol=0.01
)
epsilon_estimate = pld_epsilon(
    sampling_probability=batch_size / num_samples,
    noise_multiplier=noise_multiplier_estimate,
    steps=num_steps,
    delta=target_delta,
)

# Plot a histogram of observations.
plt.hist(out_canary, alpha=0.3, label='Canary out')
plt.hist(in_canary, alpha=0.3, label='Canary in')
plt.xlabel('Observation')
plt.title(
    f'Canary epsilon: {nom_canary_epsilon:.3f} Canary estimate from GDP:'
    f' {eps_lb_from_gdp:.3f} Canary estimate: {eps_lb:.3f}\n'
    f' Epsilon: {nom_epsilon:.3f} Estimate from GDP: {epsilon_estimate:.3f}'
)
plt.legend()
plt.show()

In [None]:
# @title Plot trade-off curve (observed and estimated).

# Boundaries for thresholding.
min_thr = min(min(out_canary), min(in_canary))
max_thr = max(max(out_canary), max(in_canary))
num_thresholds_to_try = 1000  # @param {type: 'integer'}

# Find mu and corresponding fpr and fnr.
best_mu = find_mu_gdp(eps_lb_from_gdp, delta=target_delta)
met = list(
    map(
        functools.partial(privacy_boundary_gdp, mu=best_mu),
        np.linspace(min_thr, max_thr, num_thresholds_to_try),
    )
)
mu_fnrs, mu_fprs = zip(*met)
mu_fnrs, mu_fprs = np.array(mu_fnrs), np.array(mu_fprs)

# Plot observed rates vs estimated rates.
tns, fns = canary_score_auditor._get_tn_fn_counts(in_canary, out_canary)
fps = len(in_canary) - tns
fnrs = fns / len(in_canary)
fprs = fps / len(out_canary)

plt.plot(fprs, fnrs, label='Observation', linestyle='--')
plt.plot(
    np.linspace(0, 1, 50),
    1 - np.linspace(0, 1, 50),
    color='black',
    linestyle='--',
    alpha=0.1,
)
plt.plot(mu_fprs, mu_fnrs, color='g', alpha=0.4)
plt.plot(
    1 - mu_fprs, 1 - mu_fnrs, label=f'{best_mu:.2f}-GDP', color='g', alpha=0.4
)
plt.xlabel('FPR')
plt.ylabel('FNR')
plt.legend()
plt.show()

# Audit using input canaries

Using techniques from Thomas Steinke, Milad Nasr, and Matthew Jagielski. "Privacy auditing with one (1) training run." Advances in Neural Information Processing Systems 36 (2023): 49268-49280. https://arxiv.org/abs/2305.08846

In [None]:
# @title Load auditing observations.

INPUT_CANARY_METRICS_PATH = ''  # @param {type: 'string'}
with open(INPUT_CANARY_METRICS_PATH, 'r') as f:
  input_canary_metrics = json.load(f)

INPUT_CANARY_CONFIGS_PATH = ''  # @param {type: 'string'}
with open(INPUT_CANARY_CONFIGS_PATH, 'r') as f:
  input_canary_config = json.load(f)

target_delta = input_canary_config['training']['dp']['delta']

In [None]:
#@title Methods for computing epsilon (Appendix D of https://arxiv.org/abs/2305.08846)

def p_value_DP_audit(m, r, v, eps, delta):
    """
    Calculates the p-value for a DP audit.

    Args:
        m: Number of examples, each included independently with probability 0.5.
        r: Number of guesses (excluding abstentions).
        v: Number of correct guesses by auditor.
        eps: DP guarantee of null hypothesis.
        delta: DP guarantee of null hypothesis.

    Returns:
        p: The p-value, probability of >= v correct guesses under null hypothesis.
    """
    assert 0 <= v <= r <= m
    assert eps >= 0
    assert 0 <= delta <= 1

    q = 1 / (1 + math.exp(-eps))  # accuracy of eps-DP randomized response
    beta = scipy.stats.binom.sf(v - 1, r, q)  # P[Binomial(r, q) >= v]

    alpha = 0
    sum_val = 0  # P[v > Binomial(r, q) >= vi]

    for i in range(1, v + 1):
      sum_val += scipy.stats.binom.pmf(v-i, r, q)

      if sum_val > i * alpha:
          alpha = sum_val / i

    p = beta + alpha * delta * 2 * m
    return min(p, 1)

def get_eps_audit(m, r, v, delta, p):
    """
    Calculates a lower bound on eps for a DP audit.

    Args:
        m: Number of examples, each included independently with probability 0.5.
        r: Number of guesses (excluding abstentions).
        v: Number of correct guesses by auditor.
        delta: DP guarantee of null hypothesis.
        p: 1 - confidence (e.g., p = 0.05 corresponds to 95%).

    Returns:
        eps_min: Lower bound on eps, i.e., algorithm is not (eps, delta)-DP.
    """
    assert 0 <= v < r <= m
    assert 0 <= delta <= 1
    assert 0 < p < 1

    eps_min = 0  # maintain p_value_DP(eps_min) < p
    eps_max = 1  # maintain p_value_DP(eps_max) >= p

    while p_value_DP_audit(m, r, v, eps_max, delta) < p:
        eps_max += 1

    for _ in range(30):  # binary search
        eps = (eps_min + eps_max) / 2
        if p_value_DP_audit(m, r, v, eps, delta) < p:
            eps_min = eps
        else:
            eps_max = eps

    return eps_min

In [None]:
#@title Convert observations to scores.

# Load in and out canary logits and labels saved from the final evaluation step.
in_canary_logits = np.array(input_canary_metrics['incanaryeval_logits_last'])[-1]
in_canary_labels = np.array(input_canary_metrics['incanaryeval_labels_last'])[-1]
out_canary_logits = np.array(input_canary_metrics['outcanaryeval_logits_last'])[-1]
out_canary_labels = np.array(input_canary_metrics['outcanaryeval_labels_last'])[-1]

# Convert these into a loss score.
in_canary_losses = np.sum(
    -jax.nn.log_softmax(in_canary_logits) * in_canary_labels, axis=-1)
out_canary_losses = np.sum(
    -jax.nn.log_softmax(out_canary_logits) * out_canary_labels, axis=-1)

# Convert losses to scores and compute predictions following Algorithm 1 in
# https://arxiv.org/abs/2305.08846
scores = (
    [(float(l), 1) for l in in_canary_losses] +
    [(float(l), -1) for l in out_canary_losses]
    )
sorted_scores = sorted(scores, key=lambda x: x[0])

num_pos_guesses = 20 #@param
num_neg_guesses = 20 #@param
assert num_pos_guesses + num_neg_guesses <= len(sorted_scores)


# Compute S and T variables from Algorithm 1 in https://arxiv.org/abs/2305.08846
T = (
    [1] * num_pos_guesses +
    [0] * (len(sorted_scores) - num_pos_guesses - num_neg_guesses) +
    [-1] * num_neg_guesses
)
_, S = zip(*sorted_scores)

# Output number of correct guesses.
num_correct_guesses = sum([s*t for s, t in zip(S, T)])

In [None]:
#@title Output epsilon estimate

# p = 1 - confidence
p = 0.05 # @param {type:"slider", min:0, max:1, step:0.001}

eps_lb = get_eps_audit(m=len(sorted_scores),
                       r=num_pos_guesses+num_neg_guesses,
                       v=num_correct_guesses,
                       delta=target_delta, p=p)
print(f'Epsilon lower bound: {eps_lb:.3f}')