In [1]:
import math
import numpy as np

def set_size(width, fraction=1, subplots=(1, 1), ratio=0.618) -> dict:
    """Build ``figsize``/``dpi`` keyword arguments for a LaTeX-sized figure.

    Args:
        width: Document width in points, or one of the preset names
            ``'thesis'`` / ``'beamer'``.
        fraction: Fraction of that width the figure should occupy.
        subplots: ``(rows, columns)`` of the subplot grid; the height is
            scaled by ``rows / columns``.
        ratio: Height-to-width ratio of a single subplot (the default is
            approximately the golden ratio).

    Returns:
        A dict with ``'figsize'`` (``[width_in, height_in]`` in inches) and
        ``'dpi'`` (always 72), ready to be unpacked into ``plt.subplots``.
    """
    preset_widths_pt = {'thesis': 426.79135, 'beamer': 307.28987}
    is_preset = isinstance(width, str) and width in preset_widths_pt
    width_pt = preset_widths_pt[width] if is_preset else width

    # 72.27 points per inch is the TeX definition of a point.
    inches_per_pt = 1 / 72.27

    width_in = width_pt * fraction * inches_per_pt
    height_in = width_in * ratio * (subplots[0] / subplots[1])

    return {'figsize': [width_in, height_in], 'dpi': 72}

def generate_rdp_orders():
    """Return a sorted array of unique integer orders on a geometric grid.

    Candidates are ``int(1.07 ** i + 1)`` for ``i = 0 .. floor(log(1000, 1.07))``.
    ``np.unique`` then removes the duplicates produced by the small
    exponents and returns the remaining values in ascending order.
    """
    growth = 1.07
    last_exponent = int(math.floor(math.log(1000, growth)))
    candidates = []
    for exponent in range(last_exponent + 1):
        candidates.append(int(growth ** exponent + 1))
    return np.unique(candidates)


def F_dp(r, s, weights, values):
    """Evaluate a knapsack-style sum of products by dynamic programming.

    The result is the sum, over every subset of the first ``r`` items whose
    total weight does not exceed ``s``, of the product of the chosen items'
    values. The empty subset contributes 1.

    Args:
        r: Number of items (taken from the front of ``weights``/``values``).
        s: Weight capacity.
        weights: Per-item weights, used as list offsets (so integers).
        values: Per-item multiplicative values.

    Returns:
        The accumulated sum for ``r`` items and capacity ``s``.
    """
    # Row for "no items considered": only the empty subset, worth 1.
    previous = [1] * (s + 1)

    for item in range(r):
        weight = weights[item]
        value = values[item]
        current = []
        for capacity in range(s + 1):
            # Subsets that skip this item ...
            total = previous[capacity]
            if weight <= capacity:
                # ... plus subsets that take it, when it fits.
                total = total + value * previous[capacity - weight]
            current.append(total)
        previous = current

    return previous[s]


def calculate_optcomp(k, epsilons, deltas, a_g, eps_0, delta_g, a_i):
    """Return the signed gap ``left - right`` for a candidate global level.

    With ``epsilon_g = a_g * eps_0`` and ``B = (sum(a_i) - a_g) // 2``::

        left  = (prod(e^eps_i) * F_dp(k, B, a_i, e^-eps_i)
                 - e^epsilon_g * F_dp(k, B, a_i, e^eps_i)) / prod(1 + e^eps_i)
        right = 1 - (1 - delta_g) / prod(1 - delta_i)

    NOTE(review): this looks like the test used by approximate optimal
    composition of differential-privacy guarantees -- confirm against the
    reference this notebook follows.

    Args:
        k: Number of composed mechanisms (items passed to ``F_dp``).
        epsilons: Per-mechanism epsilon values.
        deltas: Per-mechanism delta values.
        a_g: Candidate integer multiplier of ``eps_0`` for the global epsilon.
        eps_0: Discretisation unit for epsilon.
        delta_g: Global delta.
        a_i: Integer weights, one per mechanism, passed to ``F_dp``.

    Returns:
        ``left - right`` as defined above.
    """
    exp_pos = [np.exp(e) for e in epsilons]
    exp_neg = [np.exp(-e) for e in epsilons]

    # Capacity of the knapsack-style sums evaluated by F_dp.
    capacity = (np.sum(a_i) - a_g) // 2
    f_neg = F_dp(k, capacity, a_i, exp_neg)
    f_pos = F_dp(k, capacity, a_i, exp_pos)

    numerator = np.prod(exp_pos) * f_neg - np.exp(a_g * eps_0) * f_pos
    denominator = np.prod([1 + x for x in exp_pos])
    left_side = numerator / denominator

    one_minus_deltas = np.prod([1 - d for d in deltas])
    right_side = 1 - ((1 - delta_g) / one_minus_deltas)

    return left_side - right_side


def binary_search_epsilon_g(eps_0, k, epsilons, deltas, delta_g, a_i):
    """Binary-search the integer multiplier ``a_g`` using ``calculate_optcomp``.

    The search runs over integers in ``[1, sum(a_i)]``. A negative gap moves
    the upper bound below the midpoint, a positive gap moves the lower bound
    above it, and any other outcome returns the midpoint immediately.

    Returns:
        ``(a_g, a_g * eps_0)``. If no midpoint produced an exact hit, ``a_g``
        is ``(lo + hi) // 2 + 1`` for the final bounds, which equals the
        final lower bound because the loop ends with ``hi == lo - 1``.
    """
    lo, hi = 1, sum(a_i)
    while lo <= hi:
        mid = (lo + hi) // 2
        gap = calculate_optcomp(k, epsilons, deltas, mid, eps_0, delta_g, a_i)
        if gap < 0:
            hi = mid - 1
            continue
        if gap > 0:
            lo = mid + 1
            continue
        # Neither negative nor positive: treat as an exact hit.
        return mid, mid * eps_0

    a_g = (lo + hi) // 2 + 1
    return a_g, a_g * eps_0


def compute_privacy_cost_one_step(noise_multiplier, sample_rate, delta):
    """Compute the ``(epsilon, delta)`` cost of a single step.

    The un-sampled epsilon is ``sqrt(2 * ln(1.25 / delta)) / noise_multiplier``.
    It is then mapped to ``ln(1 + sample_rate * (e^eps - 1))`` and ``delta`` is
    scaled by ``sample_rate``.

    NOTE(review): these have the form of the classical Gaussian-mechanism
    bound followed by amplification by subsampling -- confirm.

    Returns:
        Tuple ``(eps, delta)`` after accounting for ``sample_rate``.
    """
    unsampled_eps = math.sqrt(2 * math.log(1.25 / delta)) / noise_multiplier
    sampled_eps = np.log(1 + sample_rate * (np.exp(unsampled_eps) - 1))
    return sampled_eps, delta * sample_rate

def compute_privacy_cost_all_step(rounds,
                                  steps,
                                  recover_rounds,
                                  recover_steps,
                                  initial_noise_multiplier,
                                  sample_rate,
                                  delta,
                                  noise_config):
    """Compute the per-step privacy costs for a whole run.

    The noise schedule is selected by ``noise_config['type']``:

    * ``'CN'``: every one of the ``rounds * steps + recover_rounds *
      recover_steps`` steps uses ``initial_noise_multiplier``.
    * ``'SN'``: the ``rounds * steps`` main steps use
      ``initial_noise_multiplier``; the ``recover_rounds * recover_steps``
      recovery steps use it multiplied by ``noise_config['decay_rate']``.
    * ``'IPLN'``: step ``i`` (0-based, over all steps) uses
      ``initial_noise_multiplier / (1 + i) ** noise_config['decay_rate']``.

    Returns:
        ``(privacy_costs, deltas)``: two lists with one entry per step.

    Raises:
        ValueError: If ``noise_config['type']`` is not one of the above.
    """
    schedule = noise_config['type']
    if schedule not in ('CN', 'SN', 'IPLN'):
        raise ValueError("The noise type should be chosen from 'CN','SN','IPLN'.")

    if schedule == 'IPLN':
        total_steps = int(rounds * steps + recover_rounds * recover_steps)
        per_step = [
            compute_privacy_cost_one_step(
                initial_noise_multiplier / (1 + step) ** noise_config['decay_rate'],
                sample_rate,
                delta,
            )
            for step in range(total_steps)
        ]
        return [cost for cost, _ in per_step], [d for _, d in per_step]

    # 'CN' and 'SN' both start from the cost at the initial noise level.
    base_eps, base_delta = compute_privacy_cost_one_step(
        initial_noise_multiplier, sample_rate, delta)

    if schedule == 'CN':
        total_steps = int(rounds * steps + recover_rounds * recover_steps)
        return [base_eps] * total_steps, [base_delta] * total_steps

    # 'SN': main phase at the initial noise, recovery phase at the scaled noise.
    main_steps = int(rounds * steps)
    privacy_costs = [base_eps] * main_steps
    deltas = [base_delta] * main_steps

    recover_sigma = initial_noise_multiplier * noise_config['decay_rate']
    recover_eps, recover_delta = compute_privacy_cost_one_step(
        recover_sigma, sample_rate, delta)
    recover_count = int(recover_rounds * recover_steps)
    privacy_costs += [recover_eps] * recover_count
    deltas += [recover_delta] * recover_count

    return privacy_costs, deltas

def get_privacy_spent(privacy_costs, deltas, delta_g, eta):
    """Compute the global privacy loss of a sequence of per-step costs.

    Each per-step epsilon is rounded up to an integer multiple ``a_i`` of the
    unit ``eps_0 = ln(1 + beta)``, where
    ``beta = eta / (n * (1 + mean(privacy_costs)) + 1)`` and ``n`` is the
    number of steps. The discretised costs are then handed to
    ``binary_search_epsilon_g``.

    Returns:
        ``(a_g, epsilon_g)`` as returned by ``binary_search_epsilon_g``.
    """
    step_count = len(privacy_costs)
    eps_mean = sum(privacy_costs) / step_count
    beta = eta / (step_count * (1 + eps_mean) + 1)
    eps_0 = np.log(1 + beta)

    # Integer multipliers (rounded up) and the matching discretised epsilons.
    scale = 1 / beta + 1
    multipliers = [math.ceil(eps_i * scale) for eps_i in privacy_costs]
    discretised = [eps_0 * a_i for a_i in multipliers]

    a_g, epsilon_g = binary_search_epsilon_g(
        eps_0, step_count, discretised, deltas, delta_g, multipliers)
    return a_g, epsilon_g


# Upper limit for the doubling search below; beyond it the target budget is
# treated as unreachable.
MAX_SIGMA = 1e6


def get_noise_multiplier_with_fed_dp(
        target_epsilon: float,
        rounds: int = 50,
        steps: int = 5,
        recover_rounds: int = 25,
        recover_steps: int = 2,
        sample_rate: float = 0.25,
        delta: float = 0.001,
        delta_g: float = 0.1,
        eta: float = 0.5,
        noise_config: dict = None,
        eps_tolerance: float = 0.1,
        noise_tolerance: float = 0.01
) -> float:
    r"""
    Computes the initial noise multiplier $\sigma_0$ to reach a total budget at the end of iterations

    The search has two phases. First ``sigma_high`` is doubled (starting
    from 5, so the first value tried is 10) until the spent epsilon no longer
    exceeds ``target_epsilon``. Then ``[sigma_low, sigma_high]`` is bisected,
    keeping ``sigma_high`` on the side whose spent epsilon is below the
    target, until the spent epsilon is within ``eps_tolerance`` of the target
    or the interval is narrower than ``noise_tolerance``.

    Args:
        target_epsilon: Global privacy budget to reach.
        rounds, steps, recover_rounds, recover_steps, sample_rate, delta:
            Forwarded unchanged to ``compute_privacy_cost_all_step``.
        delta_g, eta: Forwarded unchanged to ``get_privacy_spent``.
        noise_config: Noise schedule description, e.g. ``{'type': 'CN'}`` or
            ``{'type': 'IPLN', 'decay_rate': 0.5}``. Required despite the
            ``None`` default kept for signature compatibility.
        eps_tolerance: Acceptable gap between the target and spent epsilon.
        noise_tolerance: Minimum width of the bisection interval.

    Returns:
        The smallest noise multiplier found whose spent epsilon does not
        exceed ``target_epsilon``.

    Raises:
        TypeError: If ``noise_config`` is not provided.
        ValueError: If the doubling phase exceeds ``MAX_SIGMA``.
    """
    # Fail fast with a readable message instead of the opaque
    # "'NoneType' object is not subscriptable" raised deeper in the call chain.
    if noise_config is None:
        raise TypeError(
            "noise_config is required, e.g. {'type': 'CN'} or "
            "{'type': 'IPLN', 'decay_rate': 0.5}."
        )

    def spent_epsilon(sigma):
        """Global epsilon spent when the run starts at noise level ``sigma``."""
        privacy_costs, deltas = compute_privacy_cost_all_step(
            rounds=rounds,
            steps=steps,
            recover_rounds=recover_rounds,
            recover_steps=recover_steps,
            initial_noise_multiplier=sigma,
            sample_rate=sample_rate,
            delta=delta,
            noise_config=noise_config
        )
        _, eps = get_privacy_spent(privacy_costs, deltas, delta_g, eta)
        return eps

    eps_high = float("inf")
    sigma_low, sigma_high = 0, 5

    # Phase 1: grow sigma_high until it satisfies the budget.
    while eps_high > target_epsilon:
        sigma_high = 2 * sigma_high
        # Check the limit before the (expensive) evaluation: a value beyond
        # MAX_SIGMA is rejected regardless of the epsilon it would yield.
        if sigma_high > MAX_SIGMA:
            raise ValueError("The privacy budget is too low.")
        eps_high = spent_epsilon(sigma_high)

    # Phase 2: bisect towards the smallest sigma that still meets the budget.
    while target_epsilon - eps_high > eps_tolerance and sigma_high - sigma_low > noise_tolerance:
        sigma = (sigma_low + sigma_high) / 2
        eps_g = spent_epsilon(sigma)

        if eps_g < target_epsilon:
            sigma_high = sigma
            eps_high = eps_g
        else:
            sigma_low = sigma

    return sigma_high

In [12]:
# Settings
# Shared experiment parameters; the cells below pass them as the arguments of
# the same name to get_noise_multiplier_with_fed_dp.
rounds = 50
steps = 2
# Target privacy budgets (epsilon) to sweep: from 1.0 towards 30.0 in steps of
# 0.1. The plotting cells index this array up to position 289.
budgets = np.arange(1.0, 30.0, 0.1)
# NOTE(review): client_rate is not referenced by any other visible cell.
client_rate = 1.0
# Per-step sampling rate and delta used when computing each step's cost.
sample_rate = 0.01
delta = 0.0001
# Global delta and the eta parameter used by get_privacy_spent.
delta_g = 0.01
eta = 0.5

In [7]:
# For every target budget, find the initial noise multiplier required by the
# IPLN schedule with decay rate 0.5 (no recovery rounds).
ipln_sweep_kwargs = dict(
    rounds=rounds,
    steps=steps,
    recover_rounds=0,
    recover_steps=0,
    sample_rate=sample_rate,
    delta=delta,
    delta_g=delta_g,
    eta=eta,
    noise_config={'type': 'IPLN', 'decay_rate': 0.5},
)
example = []
for b in budgets:
    noise_multiplier = get_noise_multiplier_with_fed_dp(target_epsilon=b, **ipln_sweep_kwargs)
    example.append(noise_multiplier)

In [None]:
# Curve fitting: compare candidate models of the initial noise multiplier
# (``example``) as a function of the privacy budget (``budgets``).
from scipy.optimize import curve_fit
from sklearn.metrics import r2_score
import matplotlib.pyplot as plt
import matplotlib
all_popts, all_r2 = [], []
# Each entry is (model function, label formatter, initial guess for curve_fit).
# Every coefficient after the first is formatted with the "+" flag so its sign
# is rendered exactly once: "+0.50" or "-0.50", never "0.50" glued to the
# previous term and never "+-0.50".
fit_func = {
    'liner1': (
        lambda x, a, b: a*x + b,
        lambda a, b: f"{a:.2f}" + r"$\varepsilon$" + f"{b:+.2f}",
        [1.0, 1.0],
    ),
    'liner2': (
        lambda x, a, b, c: a*x**2 + b*x + c,
        lambda a, b, c: f"{a:.2f}" + r"$\varepsilon^2$" + f"{b:+.2f}" + r"$\varepsilon$" + f"{c:+.2f}",
        [1.0, 1.0, 1.0],
    ),
    'exp': (
        lambda x, a, b, c: a * np.exp(b * x) + c,
        lambda a, b, c: f"{a:.2f}" + "$e^" + r"{" + f"{b:.2f}" + r"\varepsilon}$" + f"{c:+.2f}",
        [10.0, 0.0, 1.0],
    ),
    'log': (
        lambda x, a, b: a * np.log(x) + b,
        lambda a, b: f"{a:.2f}log(" + r"$\varepsilon$" + f"){b:+.2f}",
        [1.0, 1.0],
    ),
    'law': (
        lambda x, a, b, c: a * x**b + c,
        lambda a, b, c: f"{a:.2f}" + r"$\varepsilon^{" + f"{b:.2f}" + r"}$" + f"{c:+.2f}",
        [5.0, -1.0, 1.0],
    ),
}

keys = fit_func.keys()

# Fitted parameters and R^2 score per model name.
popts_dict = dict.fromkeys(keys, 0)
r2_dict = dict.fromkeys(keys, 0)
for key, (func, func_str, p0) in fit_func.items():
    popt, _ = curve_fit(func, budgets, example, p0=p0, maxfev=2000)
    r2 = r2_score(example, func(budgets, *popt))
    popts_dict[key] = popt
    r2_dict[key] = r2
    print(f'The R-Squared value of the best-fit curve {func_str(*popt)}:', r2)
all_popts.append(popts_dict)
all_r2.append(r2_dict)


# Plot
# Font type 42 embeds TrueType fonts in PDF/PS output.
matplotlib.rcParams['pdf.fonttype'] = 42
matplotlib.rcParams['ps.fonttype'] = 42
matplotlib.rcParams.update({'font.family': 'serif',
                            'font.serif': 'Times new Roman',})
legend_style = dict(prop={'style': 'normal', 'size': 21, 'weight': "normal"})
label_style = dict(fontdict={'family': 'serif', 'fontname': 'Times new Roman', 'size': 30, 'weight': "normal"}, labelpad=8)
tick_style = dict(axis="both", direction="in", labelsize=28)
title_style = dict(fontdict={'family': 'serif', 'fontname': 'Times new Roman', 'size': 28, 'weight': "bold"})

# One line style per fitted model, in the order of fit_func.
linestyles =['-.',(0, (2, 1, 8, 1)),'--',':','-']

plt.close('all')
fig, ax = plt.subplots(1, 1, **set_size(700, ratio=0.73))
# Only a hand-picked subset of the observations is drawn as scatter points;
# the indices assume ``budgets``/``example`` have at least 290 entries.
idx = [1, 7, 16, 26, 39, 54, 69, 84, 99, 114, 129, 144, 159, 174, 189, 204, 219, 234, 249, 264, 277, 289]
e = [example[i] for i in idx]
b = [budgets[i] for i in idx]
ax.scatter(
    x=b,
    y=e,
    s=180,
    label=f'Observations\n' + r'($\varepsilon, n_0$)',
    color='grey',
)
for i, (key, (func, func_str, p0)) in enumerate(fit_func.items()):
    ax.plot(budgets, func(budgets, *all_popts[0][key]),
            label=func_str(*all_popts[0][key])+f"\n($R^2$={all_r2[0][key]:5.4f})",
            linestyle=linestyles[i], linewidth=4.5)
ax.legend( loc='upper right',
           ncol=2,
           bbox_to_anchor=(1.01, 1.015),
           columnspacing=0.5,
           handletextpad=0.3,
           **legend_style)
ax.set_ylim(6.5, 19)
ax.set_xlabel(r'Privacy budget $\varepsilon$', **label_style)
ax.set_ylabel(r'Initial noise multiplier $\sigma_0$', **label_style)
ax.tick_params(**tick_style)
ax.grid(linestyle=':', color='0.6')
plt.show()

In [13]:
# Initial noise multiplier per target budget, for a constant-noise schedule
# and for IPLN schedules with several decay rates.
#
# Each legend label maps to the noise configuration it stands for, so the
# decay rate no longer has to be parsed back out of the label text. Labels
# are raw strings: in a normal string "\g" is an invalid escape sequence,
# which Python warns about and plans to reject.
schedule_configs = {
    'Constant': {'type': 'CN'},
    r'IPLN $\gamma$ = 0.1': {'type': 'IPLN', 'decay_rate': 0.1},
    r'IPLN $\gamma$ = 0.2': {'type': 'IPLN', 'decay_rate': 0.2},
    r'IPLN $\gamma$ = 0.3': {'type': 'IPLN', 'decay_rate': 0.3},
    r'IPLN $\gamma$ = 0.4': {'type': 'IPLN', 'decay_rate': 0.4},
    r'IPLN $\gamma$ = 0.5': {'type': 'IPLN', 'decay_rate': 0.5},
}

# Label -> list of noise multipliers, one per entry of ``budgets``.
examples = {}
for key, schedule_config in schedule_configs.items():
    examples[key] = [
        get_noise_multiplier_with_fed_dp(
            target_epsilon=b,
            rounds=rounds,
            steps=steps,
            recover_rounds=0,
            recover_steps=0,
            sample_rate=sample_rate,
            delta=delta,
            delta_g=delta_g,
            eta=eta,
            noise_config=schedule_config,
        )
        for b in budgets
    ]

In [None]:
# Various decay rate $\gamma$: fit a power law a * eps^b + c to every schedule
# in ``examples`` and plot the fits over a subset of the observations.
from scipy.optimize import curve_fit
from sklearn.metrics import r2_score
import matplotlib.pyplot as plt
import matplotlib

func = lambda x, a, b, c: a * x**b + c
# The offset uses the "+" format flag so a negative value is rendered as
# "-0.50" rather than "+-0.50".
func_str = lambda a, b, c: f"{a:.2f}"+ r"$\varepsilon^{" + f"{b:.2f}" + r"}$" + f"{c:+.2f}"
p0 = [5.0, -1.0, 1.0]

# Schedule label -> (fitted parameters, R^2 score).
res_dict = dict()
for name, exam in examples.items():
    popt, _ = curve_fit(func, budgets, exam, p0=p0, maxfev=2000)
    r2 = r2_score(exam, func(budgets, *popt))
    res_dict[name] = (popt, r2)
    print(f'{name}: The R-Squared value of the best-fit curve {func_str(*popt)}:', r2)

# Plot
# Font type 42 embeds TrueType fonts in PDF/PS output.
matplotlib.rcParams['pdf.fonttype'] = 42
matplotlib.rcParams['ps.fonttype'] = 42
matplotlib.rcParams.update({'font.family': 'serif',
                            'font.serif': 'Times new Roman',})
legend_style = dict(prop={'style': 'normal', 'size': 21, 'weight': "normal"})
label_style = dict(fontdict={'family': 'serif', 'fontname': 'Times new Roman', 'size': 30, 'weight': "normal"}, labelpad=8)
tick_style = dict(axis="both", direction="in", labelsize=28)
title_style = dict(fontdict={'family': 'serif', 'fontname': 'Times new Roman', 'size': 28, 'weight': "bold"})

# NOTE(review): defined but not used below -- every curve is drawn solid.
linestyles =['-.',(0, (4, 2, 10, 2)),'--',':',(0, (2, 1, 8, 1)),'-']

plt.close('all')
fig, ax = plt.subplots(1, 1, **set_size(700, ratio=0.73))
# Only a hand-picked subset of the observations is drawn as scatter points;
# the indices assume ``budgets`` and every series have at least 290 entries.
idx = [1, 7, 16, 26, 39, 54, 69, 84, 99, 114, 129, 144, 159, 174, 189, 204, 219, 234, 249, 264, 277, 289]
for exam in examples.values():
    e = [exam[i] for i in idx]
    b = [budgets[i] for i in idx]
    ax.scatter(
        x=b,
        y=e,
        s=180,
    )
for key, (popt, r2) in res_dict.items():
    print(popt)
    ax.plot(budgets, func(budgets, *popt),
            label=key+f"\n($R^2$={r2:5.4f})",
            linestyle='-', linewidth=4.5)
ax.legend( loc='upper right',
           ncol=2,
           bbox_to_anchor=(1.015, 1.02),
           columnspacing=0.5,
           handletextpad=0.3,
           handlelength=1.4,
           **legend_style)
ax.set_yticks([2, 6, 10, 14, 18])
ax.set_xlabel(r'Privacy budget $\varepsilon$', **label_style)
ax.set_ylabel(r'Initial noise multiplier $\sigma_0$', **label_style)
ax.tick_params(**tick_style)
ax.grid(linestyle=':', color='0.6')
plt.show()