Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix setting partition selection field in utility analysis result #511

Merged
merged 3 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions analysis/tests/utility_analysis_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,11 @@ def test_multi_parameters(self):

multi_param = analysis.MultiParameterConfiguration(
max_partitions_contributed=[1, 2],
max_contributions_per_partition=[1, 2])
max_contributions_per_partition=[1, 2],
partition_selection_strategy=[
pipeline_dp.PartitionSelectionStrategy.TRUNCATED_GEOMETRIC,
pipeline_dp.PartitionSelectionStrategy.GAUSSIAN_THRESHOLDING
])

# The input collection has 1 privacy id, which contributes to 2 partitions,
# 1 and 2 times respectively.
Expand All @@ -253,8 +257,6 @@ def test_multi_parameters(self):
partition_extractor=lambda x: x[1],
value_extractor=lambda x: 0)

public_partitions = ["pk0", "pk1"]

output, _ = analysis.perform_utility_analysis(
col=input,
backend=pipeline_dp.LocalBackend(),
Expand All @@ -264,7 +266,6 @@ def test_multi_parameters(self):
aggregate_params=aggregate_params,
multi_param_configuration=multi_param),
data_extractors=data_extractors,
public_partitions=public_partitions,
)

utility_reports = list(output)
Expand All @@ -273,16 +274,16 @@ def test_multi_parameters(self):
self.assertLen(utility_reports, 2) # one report per each configuration.

# Check the parameter configuration
expected_noise_std = [3.02734375, 8.56262117843085]
expected_noise_std = [5.9765625, 16.904271487740903]
expected_l0_error = [-0.5, 0]
expected_partition_info = metrics.PartitionsInfo(
public_partitions=True,
num_dataset_partitions=2,
num_non_public_partitions=0,
num_empty_partitions=0)
for i_configuration, report in enumerate(utility_reports):
self.assertEqual(report.configuration_index, i_configuration)
self.assertEqual(report.partitions_info, expected_partition_info)
self.assertFalse(report.partitions_info.public_partitions)
self.assertEqual(report.partitions_info.num_dataset_partitions, 2)
self.assertEqual(report.partitions_info.num_dataset_partitions, 2)
self.assertEqual(
report.partitions_info.strategy,
multi_param.partition_selection_strategy[i_configuration])
self.assertLen(report.metric_errors, 1) # metrics for COUNT
errors = report.metric_errors[0]
self.assertEqual(errors.metric, pipeline_dp.Metrics.COUNT)
Expand Down
31 changes: 16 additions & 15 deletions analysis/utility_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,6 @@ def perform_utility_analysis(
"Compute cross-partition metrics")
# ((configuration_index, bucket), UtilityReport)

if public_partitions is None:
# Add partition selection strategy for private partitions.
strategies = data_structures.get_partition_selection_strategy(options)

def add_partition_selection_strategy(report: metrics.UtilityReport):
    """Return a copy of `report` with its partition selection strategy set.

    The strategy is taken from `strategies` at `report.configuration_index`.
    """
    # Beam does not allow mutating the input of a map function, so the
    # strategy is written to a deep copy rather than to `report` itself.
    annotated = copy.deepcopy(report)
    config_index = annotated.configuration_index
    annotated.partitions_info.strategy = strategies[config_index]
    return annotated

cross_partition_metrics = backend.map_values(
cross_partition_metrics, add_partition_selection_strategy,
"Add Partition Selection Strategy")

cross_partition_metrics = backend.map_tuple(
cross_partition_metrics, lambda key, value: (key[0], (key[1], value)),
"Rekey")
Expand All @@ -140,6 +125,22 @@ def add_partition_selection_strategy(report: metrics.UtilityReport):
# (configuration_index, Iterable[(bucket, UtilityReport)])
result = backend.map_tuple(cross_partition_metrics, _group_utility_reports,
"Group utility reports")
if public_partitions is None:
# Add partition selection strategy for private partitions.
strategies = data_structures.get_partition_selection_strategy(options)

def add_partition_selection_strategy(report: metrics.UtilityReport):
    """Return a copy of `report` annotated with its selection strategy.

    The strategy is looked up in `strategies` by
    `report.configuration_index`. It is written both to the report's own
    `partitions_info` and to the `partitions_info` of the report held by
    every entry of `report.utility_report_histogram`.
    """
    # Beam does not allow mutating input arguments in map, so work on a copy.
    report = copy.deepcopy(report)
    strategy = strategies[report.configuration_index]
    report.partitions_info.strategy = strategy
    # Named `histogram_bin` (not `bin`) to avoid shadowing the builtin bin().
    for histogram_bin in report.utility_report_histogram:
        histogram_bin.report.partitions_info.strategy = strategy
    return report

result = backend.map(result, add_partition_selection_strategy,
"Add Partition Selection Strategy")
# result: (UtilityReport)
# result: (UtilityReport)
return result, per_partition_result

Expand Down
Loading