Introduce a boolean variable for enabling/disabling cross-partition contribution bounding (#515)
dvadym committed May 17, 2024
1 parent 1333a00 commit afbb949
Showing 5 changed files with 183 additions and 5 deletions.
6 changes: 6 additions & 0 deletions pipeline_dp/aggregate_params.py
@@ -234,6 +234,11 @@ class AggregateParams:
is ignored when public partitions are used.
More details on pre-thresholding are in
https://github.com/google/differential-privacy/blob/main/common_docs/pre_thresholding.md
perform_cross_partition_contribution_bounding: whether to perform cross
partition contribution bounding.
Warning: turn off cross partition contribution bounding only when the
number of contributed partitions per privacy unit is already bounded
by max_partitions_contributed.
"""
metrics: List[Metric]
noise_kind: NoiseKind = NoiseKind.LAPLACE
@@ -254,6 +259,7 @@ class AggregateParams:
partition_selection_strategy: PartitionSelectionStrategy = PartitionSelectionStrategy.TRUNCATED_GEOMETRIC
pre_threshold: Optional[int] = None
post_aggregation_thresholding: bool = False
perform_cross_partition_contribution_bounding: bool = True

@property
def metrics_str(self) -> str:
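For reference, a minimal sketch of constructing AggregateParams with the new flag; the parameter values are illustrative and mirror the tests added below:

    import pipeline_dp

    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
        metrics=[pipeline_dp.Metrics.SUM],
        min_value=0,
        max_value=1,
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        # Safe only if each privacy unit already contributes to at most
        # max_partitions_contributed partitions.
        perform_cross_partition_contribution_bounding=False)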
55 changes: 53 additions & 2 deletions pipeline_dp/contribution_bounders.py
@@ -21,6 +21,12 @@
from pipeline_dp import pipeline_backend
from pipeline_dp import sampling_utils

# TODO(dvadym):
# 1. rename ContributionBounder -> ContributionSampler, because all those
# classes do contribution bounding only by sampling.
# 2. Introduce L0/Linf/L1 sampling in names (the current names are too long
# and not readable).
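# (For reference: L0 bounds how many partitions a privacy unit contributes
# to, Linf bounds how many records it contributes within a single partition,
# and L1 bounds its total number of records.)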


class ContributionBounder(abc.ABC):
"""Interface for objects which perform contribution bounding."""
@@ -76,8 +82,8 @@ def bound_contributions(self, col, params, backend, report_generator,
             "Sample per (privacy_id, partition_key)")
         report_generator.add_stage(
             f"Per-partition contribution bounding: for each privacy_id and each"
-            f"partition, randomly select max(actual_contributions_per_partition"
-            f", {max_contributions_per_partition}) contributions.")
+            f" partition, randomly select max(actual_contributions_per_partitio"
+            f"n, {max_contributions_per_partition}) contributions.")
# ((privacy_id, partition_key), [value])
col = backend.map_values(
col, aggregate_fn,
@@ -195,6 +201,51 @@ def rekey_per_privacy_id_per_partition_key(pid_pk_v_values):
"Apply aggregate_fn after cross-partition contribution bounding")


class LinfSampler(ContributionBounder):
"""Bounds the contribution of privacy_id per partition.
It ensures that each privacy_id contributes to each partition not more than
max_contributions_per_partition records (per-partition contribution
bounding), by performing sampling if needed.
"""

def bound_contributions(self, col, params, backend, report_generator,
aggregate_fn):
col = backend.map_tuple(
col, lambda pid, pk, v: ((pid, pk), v),
"Rekey to ((privacy_id, partition_key), value)")

col = backend.sample_fixed_per_key(
col, params.max_contributions_per_partition,
"Sample per (privacy_id, partition_key)")
# ((privacy_id, partition_key), value)

report_generator.add_stage(
f"Per-partition contribution bounding: for each privacy_id and each"
f" partition, randomly select max(actual_contributions_per_partitio"
f"n, {params.max_contributions_per_partition}) contributions.")

return backend.map_values(
col, aggregate_fn,
"Apply aggregate_fn after cross-partition contribution bounding")


class NoOpSampler(ContributionBounder):
"""Does no sampling."""

def bound_contributions(self, col, params, backend, report_generator,
aggregate_fn):
col = backend.map_tuple(
col, lambda pid, pk, v: ((pid, pk), v),
"Rekey to ((privacy_id, partition_key), value)")
# ((privacy_id, partition_key), value)

col = backend.group_by_key(col, "Group by (privacy_id, partition_key)")
# ((privacy_id, partition_key), [value])

return backend.map_values(col, aggregate_fn, "Apply aggregate_fn")
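By contrast, a sketch under the same assumptions showing that NoOpSampler keeps every contribution and only groups values by (privacy_id, partition_key):

    bounder = contribution_bounders.NoOpSampler()
    result = list(
        bounder.bound_contributions(
            [('pid1', 'pk1', 1), ('pid1', 'pk1', 2)],
            params=(),  # unused: nothing is sampled
            backend=pipeline_dp.LocalBackend(),
            report_generator=report_generator,
            aggregate_fn=lambda values: values))
    # result == [(('pid1', 'pk1'), [1, 2])]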


def collect_values_per_partition_key_per_privacy_id(
col, backend: pipeline_backend.PipelineBackend):
"""Collects values into a list for each privacy_id and partition_key.
14 changes: 11 additions & 3 deletions pipeline_dp/dp_engine.py
@@ -386,10 +386,18 @@ def _create_contribution_bounder(
             return \
                 contribution_bounders.SamplingPerPrivacyIdContributionBounder(
                 )
-        if expects_per_partition_sampling:
-            return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder(
+        if params.perform_cross_partition_contribution_bounding:
+            if expects_per_partition_sampling:
+                return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder(
             )
+            return contribution_bounders.SamplingCrossPartitionContributionBounder(
+            )
-        return contribution_bounders.SamplingCrossPartitionContributionBounder()
+        # no cross partition contribution
+        if expects_per_partition_sampling:
+            return contribution_bounders.LinfSampler()
+        # No sampling, but combiners themselves do per partition contribution
+        # bounding.
+        return contribution_bounders.NoOpSampler()

def _extract_columns(self, col,
data_extractors: pipeline_dp.DataExtractors):
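Taken together, when the earlier max-contributions branch (the SamplingPerPrivacyIdContributionBounder return above) is not taken, _create_contribution_bounder now dispatches as follows:

    cross-partition bounding?  per-partition sampling?  selected bounder
    yes                        yes                      SamplingCrossAndPerPartitionContributionBounder
    yes                        no                       SamplingCrossPartitionContributionBounder
    no                         yes                      LinfSampler
    no                         no                       NoOpSampler (combiners bound per-partition contributions)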
59 changes: 59 additions & 0 deletions tests/contribution_bounders_test.py
@@ -24,6 +24,8 @@
CrossAndPerPartitionContributionParams = collections.namedtuple(
"CrossAndPerPartitionContributionParams",
["max_partitions_contributed", "max_contributions_per_partition"])
PerPartitionContributionParams = collections.namedtuple(
"PerPartitionContributionParams", ["max_contributions_per_partition"])

aggregate_fn = lambda input_value: (len(input_value), np.sum(input_value),
np.sum(np.square(input_value)))
@@ -150,3 +152,60 @@ def test_contribution_bounding_empty_col(self):
bound_result = self._run_contribution_bounding(input, max_contributions)

self.assertEmpty(bound_result)


class LinfSamplerTest(parameterized.TestCase):

def _run_sampling(self, input, max_contributions_per_partition):
params = PerPartitionContributionParams(max_contributions_per_partition)

bounder = contribution_bounders.LinfSampler()
return list(
bounder.bound_contributions(input, params,
pipeline_dp.LocalBackend(),
_create_report_generator(),
lambda x: x))

def test_sampling_applied(self):
input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid2', 'pk1', 3),
('pid2', 'pk1', 4)]
max_contributions_per_partition = 1

bound_result = self._run_sampling(input,
max_contributions_per_partition)
bound_result = dict(bound_result)

# {(privacy_id, partition_key): [values]}
self.assertLen(bound_result, 2)
self.assertLen(bound_result[('pid1', 'pk1')], 1)
self.assertLen(bound_result[('pid2', 'pk1')], 1)

def test_contributions_within_bound_nothing_dropped(self):
input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid1', 'pk1', 3)]
max_contributions_per_partition = 3

bound_result = self._run_sampling(input,
max_contributions_per_partition)

self.assertEqual(bound_result, [(('pid1', 'pk1'), [1, 2, 3])])

def test_empty_col(self):
self.assertEmpty(
self._run_sampling([], max_contributions_per_partition=1))


class NoOpContributionBounderTest(parameterized.TestCase):

def test_no_sampling_applied(self):
input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid2', 'pk1', 3),
('pid3', 'pk2', 4)]
bounder = contribution_bounders.NoOpSampler()
bound_result = bounder.bound_contributions(
input,
params=(),
backend=pipeline_dp.LocalBackend(),
report_generator=_create_report_generator(),
aggregate_fn=lambda x: x)
self.assertEqual(list(bound_result), [(('pid1', 'pk1'), [1, 2]),
(('pid2', 'pk1'), [3]),
(('pid3', 'pk2'), [4])])
54 changes: 54 additions & 0 deletions tests/dp_engine_test.py
@@ -572,6 +572,60 @@ def test_aggregate_computation_graph_per_partition_bounding(
unittest.mock.ANY,
unittest.mock.ANY)

@patch('pipeline_dp.contribution_bounders.LinfSampler.bound_contributions')
def test_aggregate_computation_graph_only_linf_sampling(
self, mock_bound_contributions):
# Arrange
aggregate_params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
metrics=[pipeline_dp.Metrics.SUM],
min_value=0,
max_value=1,
max_partitions_contributed=1,
max_contributions_per_partition=1,
perform_cross_partition_contribution_bounding=False)

engine = self._create_dp_engine_default()
mock_bound_contributions.return_value = []

engine.aggregate(col=[0],
params=aggregate_params,
data_extractors=self._get_default_extractors())

# Assert
mock_bound_contributions.assert_called_with(unittest.mock.ANY,
aggregate_params,
unittest.mock.ANY,
unittest.mock.ANY,
unittest.mock.ANY)

@patch('pipeline_dp.contribution_bounders.NoOpSampler.bound_contributions')
def test_aggregate_computation_graph_no_sampling_for_sum_when_no_cross_partition(
self, mock_bound_contributions):
# Arrange
aggregate_params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
metrics=[pipeline_dp.Metrics.SUM],
min_sum_per_partition=0,
max_sum_per_partition=1,
max_partitions_contributed=1,
max_contributions_per_partition=1,
perform_cross_partition_contribution_bounding=False)

engine = self._create_dp_engine_default()
mock_bound_contributions.return_value = []

engine.aggregate(col=[0],
params=aggregate_params,
data_extractors=self._get_default_extractors())

# Assert
mock_bound_contributions.assert_called_with(unittest.mock.ANY,
aggregate_params,
unittest.mock.ANY,
unittest.mock.ANY,
unittest.mock.ANY)

@patch('pipeline_dp.dp_engine.DPEngine._drop_partitions',)
def test_aggregate_no_partition_filtering_public_partitions(
self, mock_drop_partitions):
