diff --git a/pipeline_dp/aggregate_params.py b/pipeline_dp/aggregate_params.py
index ff777f24..7d78b3ee 100644
--- a/pipeline_dp/aggregate_params.py
+++ b/pipeline_dp/aggregate_params.py
@@ -234,6 +234,11 @@ class AggregateParams:
         is ignored when public partitions are used.
         More details on pre-thresholding are in
         https://github.com/google/differential-privacy/blob/main/common_docs/pre_thresholding.md
+      perform_cross_partition_contribution_bounding: whether to perform
+        cross-partition contribution bounding.
+        Warning: turn off cross-partition contribution bounding only when the
+        number of partitions each privacy unit contributes to is already
+        bounded by max_partitions_contributed.
     """
     metrics: List[Metric]
     noise_kind: NoiseKind = NoiseKind.LAPLACE
@@ -254,6 +259,7 @@ class AggregateParams:
     partition_selection_strategy: PartitionSelectionStrategy = PartitionSelectionStrategy.TRUNCATED_GEOMETRIC
     pre_threshold: Optional[int] = None
     post_aggregation_thresholding: bool = False
+    perform_cross_partition_contribution_bounding: bool = True
 
     @property
     def metrics_str(self) -> str:
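The new `perform_cross_partition_contribution_bounding` flag is plain dataclass state; the engine reads it when choosing a contribution bounder (see the dp_engine.py hunk below). A minimal sketch of setting it from user code follows; every value besides the new flag is illustrative and not part of this diff:

```python
import pipeline_dp

# Illustrative values; only perform_cross_partition_contribution_bounding
# is introduced by this change.
params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
    metrics=[pipeline_dp.Metrics.SUM],
    max_partitions_contributed=1,
    max_contributions_per_partition=1,
    min_value=0,
    max_value=1,
    # Only safe when each privacy unit is already known to contribute to
    # at most max_partitions_contributed partitions.
    perform_cross_partition_contribution_bounding=False)
```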
+ """ + + def bound_contributions(self, col, params, backend, report_generator, + aggregate_fn): + col = backend.map_tuple( + col, lambda pid, pk, v: ((pid, pk), v), + "Rekey to ((privacy_id, partition_key), value)") + + col = backend.sample_fixed_per_key( + col, params.max_contributions_per_partition, + "Sample per (privacy_id, partition_key)") + # ((privacy_id, partition_key), value) + + report_generator.add_stage( + f"Per-partition contribution bounding: for each privacy_id and each" + f" partition, randomly select max(actual_contributions_per_partitio" + f"n, {params.max_contributions_per_partition}) contributions.") + + return backend.map_values( + col, aggregate_fn, + "Apply aggregate_fn after cross-partition contribution bounding") + + +class NoOpSampler(ContributionBounder): + """Does no sampling.""" + + def bound_contributions(self, col, params, backend, report_generator, + aggregate_fn): + col = backend.map_tuple( + col, lambda pid, pk, v: ((pid, pk), v), + "Rekey to ((privacy_id, partition_key), value)") + # ((privacy_id, partition_key), value) + + col = backend.group_by_key(col, "Group by (privacy_id, partition_key)") + # ((privacy_id, partition_key), [value]) + + return backend.map_values(col, aggregate_fn, "Apply aggregate_fn") + + def collect_values_per_partition_key_per_privacy_id( col, backend: pipeline_backend.PipelineBackend): """Collects values into a list for each privacy_id and partition_key. diff --git a/pipeline_dp/dp_engine.py b/pipeline_dp/dp_engine.py index 346dc7bf..902e4dff 100644 --- a/pipeline_dp/dp_engine.py +++ b/pipeline_dp/dp_engine.py @@ -386,10 +386,18 @@ def _create_contribution_bounder( return \ contribution_bounders.SamplingPerPrivacyIdContributionBounder( ) - if expects_per_partition_sampling: - return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder( + if params.perform_cross_partition_contribution_bounding: + if expects_per_partition_sampling: + return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder( + ) + return contribution_bounders.SamplingCrossPartitionContributionBounder( ) - return contribution_bounders.SamplingCrossPartitionContributionBounder() + # no cross partition contribution + if expects_per_partition_sampling: + return contribution_bounders.LinfSampler() + # No sampling, but combiners themselves do per partition contribution + # bounding. 
diff --git a/pipeline_dp/dp_engine.py b/pipeline_dp/dp_engine.py
index 346dc7bf..902e4dff 100644
--- a/pipeline_dp/dp_engine.py
+++ b/pipeline_dp/dp_engine.py
@@ -386,10 +386,18 @@ def _create_contribution_bounder(
             return \
                 contribution_bounders.SamplingPerPrivacyIdContributionBounder(
                 )
-        if expects_per_partition_sampling:
-            return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder(
+        if params.perform_cross_partition_contribution_bounding:
+            if expects_per_partition_sampling:
+                return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder(
+                )
+            return contribution_bounders.SamplingCrossPartitionContributionBounder(
             )
-        return contribution_bounders.SamplingCrossPartitionContributionBounder()
+        # No cross-partition contribution bounding.
+        if expects_per_partition_sampling:
+            return contribution_bounders.LinfSampler()
+        # No sampling, but the combiners themselves do per-partition
+        # contribution bounding.
+        return contribution_bounders.NoOpSampler()
 
     def _extract_columns(self, col,
                          data_extractors: pipeline_dp.DataExtractors):
diff --git a/tests/contribution_bounders_test.py b/tests/contribution_bounders_test.py
index 494bfbe0..eafb81df 100644
--- a/tests/contribution_bounders_test.py
+++ b/tests/contribution_bounders_test.py
@@ -24,6 +24,8 @@ CrossAndPerPartitionContributionParams = collections.namedtuple(
     "CrossAndPerPartitionContributionParams",
     ["max_partitions_contributed", "max_contributions_per_partition"])
 
+PerPartitionContributionParams = collections.namedtuple(
+    "PerPartitionContributionParams", ["max_contributions_per_partition"])
 
 aggregate_fn = lambda input_value: (len(input_value), np.sum(input_value),
                                     np.sum(np.square(input_value)))
@@ -150,3 +152,60 @@ def test_contribution_bounding_empty_col(self):
         bound_result = self._run_contribution_bounding(input, max_contributions)
 
         self.assertEmpty(bound_result)
+
+
+class LinfSamplerTest(parameterized.TestCase):
+
+    def _run_sampling(self, input, max_contributions_per_partition):
+        params = PerPartitionContributionParams(max_contributions_per_partition)
+
+        bounder = contribution_bounders.LinfSampler()
+        return list(
+            bounder.bound_contributions(input, params,
+                                        pipeline_dp.LocalBackend(),
+                                        _create_report_generator(),
+                                        lambda x: x))
+
+    def test_sampling_applied(self):
+        input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid2', 'pk1', 3),
+                 ('pid2', 'pk1', 4)]
+        max_contributions_per_partition = 1
+
+        bound_result = self._run_sampling(input,
+                                          max_contributions_per_partition)
+        bound_result = dict(bound_result)
+
+        # ((privacy_id, partition_key), [values])
+        self.assertLen(bound_result, 2)
+        self.assertLen(bound_result[('pid1', 'pk1')], 1)
+        self.assertLen(bound_result[('pid2', 'pk1')], 1)
+
+    def test_contributions_within_bound_nothing_dropped(self):
+        input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid1', 'pk1', 3)]
+        max_contributions_per_partition = 3
+
+        bound_result = self._run_sampling(input,
+                                          max_contributions_per_partition)
+
+        self.assertEqual(bound_result, [(('pid1', 'pk1'), [1, 2, 3])])
+
+    def test_empty_col(self):
+        self.assertEmpty(
+            self._run_sampling([], max_contributions_per_partition=1))
+
+
+class NoOpContributionBounderTest(parameterized.TestCase):
+
+    def test_no_sampling_applied(self):
+        input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid2', 'pk1', 3),
+                 ('pid3', 'pk2', 4)]
+        bounder = contribution_bounders.NoOpSampler()
+        bound_result = bounder.bound_contributions(
+            input,
+            params=(),
+            backend=pipeline_dp.LocalBackend(),
+            report_generator=_create_report_generator(),
+            aggregate_fn=lambda x: x)
+        self.assertEqual(list(bound_result), [(('pid1', 'pk1'), [1, 2]),
+                                              (('pid2', 'pk1'), [3]),
+                                              (('pid3', 'pk2'), [4])])
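The four-way branching introduced in the dp_engine.py hunk above can be read as a small decision table. A hedged restatement follows, with a hypothetical `select_bounder` helper returning class names as strings (the real `_create_contribution_bounder` returns bounder instances and also has an earlier branch for `params.max_contributions`):

```python
def select_bounder(perform_cross_partition_contribution_bounding: bool,
                   expects_per_partition_sampling: bool) -> str:
    """Hypothetical restatement of the new bounder-selection logic."""
    if perform_cross_partition_contribution_bounding:
        if expects_per_partition_sampling:
            # L0 (cross-partition) and Linf (per-partition) sampling.
            return "SamplingCrossAndPerPartitionContributionBounder"
        # L0 sampling only.
        return "SamplingCrossPartitionContributionBounder"
    if expects_per_partition_sampling:
        # Linf sampling only.
        return "LinfSampler"
    # No sampling; the combiners bound per-partition contributions themselves.
    return "NoOpSampler"
```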
diff --git a/tests/dp_engine_test.py b/tests/dp_engine_test.py
index c713db60..6169ad9b 100644
--- a/tests/dp_engine_test.py
+++ b/tests/dp_engine_test.py
@@ -572,6 +572,60 @@ def test_aggregate_computation_graph_per_partition_bounding(
                                                     unittest.mock.ANY,
                                                     unittest.mock.ANY)
 
+    @patch('pipeline_dp.contribution_bounders.LinfSampler.bound_contributions')
+    def test_aggregate_computation_graph_only_linf_sampling(
+            self, mock_bound_contributions):
+        # Arrange
+        aggregate_params = pipeline_dp.AggregateParams(
+            noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
+            metrics=[pipeline_dp.Metrics.SUM],
+            min_value=0,
+            max_value=1,
+            max_partitions_contributed=1,
+            max_contributions_per_partition=1,
+            perform_cross_partition_contribution_bounding=False)
+
+        engine = self._create_dp_engine_default()
+        mock_bound_contributions.return_value = []
+
+        engine.aggregate(col=[0],
+                         params=aggregate_params,
+                         data_extractors=self._get_default_extractors())
+
+        # Assert
+        mock_bound_contributions.assert_called_with(unittest.mock.ANY,
+                                                    aggregate_params,
+                                                    unittest.mock.ANY,
+                                                    unittest.mock.ANY,
+                                                    unittest.mock.ANY)
+
+    @patch('pipeline_dp.contribution_bounders.NoOpSampler.bound_contributions')
+    def test_aggregate_computation_graph_no_sampling_for_sum_when_no_cross_partition(
+            self, mock_bound_contributions):
+        # Arrange
+        aggregate_params = pipeline_dp.AggregateParams(
+            noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
+            metrics=[pipeline_dp.Metrics.SUM],
+            min_sum_per_partition=0,
+            max_sum_per_partition=1,
+            max_partitions_contributed=1,
+            max_contributions_per_partition=1,
+            perform_cross_partition_contribution_bounding=False)
+
+        engine = self._create_dp_engine_default()
+        mock_bound_contributions.return_value = []
+
+        engine.aggregate(col=[0],
+                         params=aggregate_params,
+                         data_extractors=self._get_default_extractors())
+
+        # Assert
+        mock_bound_contributions.assert_called_with(unittest.mock.ANY,
+                                                    aggregate_params,
+                                                    unittest.mock.ANY,
+                                                    unittest.mock.ANY,
+                                                    unittest.mock.ANY)
+
     @patch('pipeline_dp.dp_engine.DPEngine._drop_partitions',)
     def test_aggregate_no_partition_filtering_public_partitions(
             self, mock_drop_partitions):
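For completeness, a hedged end-to-end sketch in the spirit of the tests above, showing the flag flowing through `DPEngine.aggregate`. It assumes PipelineDP's public API as of this change; the budget values, input rows, and public partition list are illustrative:

```python
import pipeline_dp

# Illustrative end-to-end run on the local backend.
backend = pipeline_dp.LocalBackend()
budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1.0,
                                                      total_delta=1e-6)
engine = pipeline_dp.DPEngine(budget_accountant, backend)

params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
    metrics=[pipeline_dp.Metrics.SUM],
    max_partitions_contributed=1,
    max_contributions_per_partition=1,
    min_value=0,
    max_value=1,
    perform_cross_partition_contribution_bounding=False)

data_extractors = pipeline_dp.DataExtractors(
    privacy_id_extractor=lambda row: row[0],
    partition_extractor=lambda row: row[1],
    value_extractor=lambda row: row[2])

rows = [('pid1', 'pk1', 1), ('pid2', 'pk1', 1)]
# Public partitions sidestep private partition selection in this toy example.
result = engine.aggregate(rows, params, data_extractors,
                          public_partitions=['pk1'])
budget_accountant.compute_budgets()
print(list(result))
```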