Skip to content

Commit

Permalink
Sum per partition histogram (#512)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym committed Dec 4, 2023
1 parent dc8443d commit 39917cd
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 91 deletions.
20 changes: 11 additions & 9 deletions analysis/tests/parameter_tuning_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_find_candidate_parameters_maximum_number_of_candidates_is_respected_whe

mock_histograms = histograms.DatasetHistograms(mock_l0_histogram, None,
mock_linf_histogram,
None, None, None)
None, None, None, None)
parameters_to_tune = parameter_tuning.ParametersToTune(
max_partitions_contributed=True,
max_contributions_per_partition=True)
Expand All @@ -99,7 +99,7 @@ def test_find_candidate_parameters_more_candidates_for_l_0_when_not_so_many_l_in

mock_histograms = histograms.DatasetHistograms(mock_l0_histogram, None,
mock_linf_histogram,
None, None, None)
None, None, None, None)
parameters_to_tune = parameter_tuning.ParametersToTune(
max_partitions_contributed=True,
max_contributions_per_partition=True)
Expand Down Expand Up @@ -128,7 +128,7 @@ def test_find_candidate_parameters_more_candidates_for_l_inf_when_not_so_many_l_

mock_histograms = histograms.DatasetHistograms(mock_l0_histogram, None,
mock_linf_histogram,
None, None, None)
None, None, None, None)
parameters_to_tune = parameter_tuning.ParametersToTune(
max_partitions_contributed=True,
max_contributions_per_partition=True)
Expand Down Expand Up @@ -185,7 +185,8 @@ def test_find_candidate_parameters_count(self, max_value, max_candidates,
mock_l0_histogram.max_value = mock.Mock(return_value=max_value)

mock_histograms = histograms.DatasetHistograms(mock_l0_histogram, None,
None, None, None, None)
None, None, None, None,
None)
parameters_to_tune = parameter_tuning.ParametersToTune(
max_partitions_contributed=True,
max_contributions_per_partition=False)
Expand Down Expand Up @@ -257,7 +258,8 @@ def test_find_candidate_parameters_sum(self, bins, max_candidates,
mock_linf_sum_contributions_histogram = histograms.Histogram(
histograms.HistogramType.LINF_SUM_CONTRIBUTIONS, bins)
mock_histograms = histograms.DatasetHistograms(
None, None, None, mock_linf_sum_contributions_histogram, None, None)
None, None, None, mock_linf_sum_contributions_histogram, None, None,
None)
parameters_to_tune = parameter_tuning.ParametersToTune(
max_partitions_contributed=False,
min_sum_per_partition=False,
Expand Down Expand Up @@ -286,7 +288,7 @@ def test_find_candidate_parameters_both_l0_and_linf_sum_to_be_tuned(self):

mock_histograms = histograms.DatasetHistograms(
mock_l0_histogram, None, None,
mock_linf_sum_contributions_histogram, None, None)
mock_linf_sum_contributions_histogram, None, None, None)
parameters_to_tune = parameter_tuning.ParametersToTune(
max_partitions_contributed=True,
min_sum_per_partition=False,
Expand Down Expand Up @@ -332,7 +334,7 @@ def test_find_candidate_parameters_generate_linf(
histograms.HistogramType.LINF_CONTRIBUTIONS, bins=[])
mock_histograms = histograms.DatasetHistograms(mock_l0_histogram, None,
mock_linf_histogram,
None, None, None)
None, None, None, None)

mock_find_candidate_from_histogram.return_value = [1, 2]

Expand Down Expand Up @@ -538,7 +540,7 @@ def test_tune_params_validation(self, error_msg,
is_public_partitions: bool):
tune_options = _get_tune_options(metrics)
contribution_histograms = histograms.DatasetHistograms(
None, None, None, None, None, None)
None, None, None, None, None, None, None)
data_extractors = pipeline_dp.DataExtractors(
privacy_id_extractor=lambda _: 0, partition_extractor=lambda _: 0)
public_partitions = [1] if is_public_partitions else None
Expand All @@ -554,7 +556,7 @@ def test_tune_min_sum_per_partition_is_not_supported(self):
min_sum_per_partition=True,
max_sum_per_partition=True))
contribution_histograms = histograms.DatasetHistograms(
None, None, None, None, None, None)
None, None, None, None, None, None, None)
data_extractors = pipeline_dp.DataExtractors(
privacy_id_extractor=lambda _: 0, partition_extractor=lambda _: 0)
with self.assertRaisesRegex(
Expand Down
104 changes: 89 additions & 15 deletions pipeline_dp/dataset_histograms/computing_histograms.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from pipeline_dp import pipeline_backend, pipeline_functions
from pipeline_dp.dataset_histograms import histograms as hist

NUMBER_OF_BUCKETS_IN_LINF_SUM_CONTRIBUTIONS_HISTOGRAM = 10000
NUMBER_OF_BUCKETS_SUM_HISTOGRAM = 10000


def _to_bin_lower_upper_logarithmic(value: int) -> Tuple[int, int]:
Expand Down Expand Up @@ -201,6 +201,7 @@ def _list_to_contribution_histograms(
l0_contributions = l1_contributions = None
linf_contributions = linf_sum_contributions = None
count_per_partition = privacy_id_per_partition_count = None
sum_per_partition_histogram = None
for histogram in histograms:
if histogram.name == hist.HistogramType.L0_CONTRIBUTIONS:
l0_contributions = histogram
Expand All @@ -214,10 +215,14 @@ def _list_to_contribution_histograms(
count_per_partition = histogram
elif histogram.name == hist.HistogramType.COUNT_PRIVACY_ID_PER_PARTITION:
privacy_id_per_partition_count = histogram
elif histogram.name == hist.HistogramType.SUM_PER_PARTITION:
sum_per_partition_histogram = histogram

return hist.DatasetHistograms(l0_contributions, l1_contributions,
linf_contributions, linf_sum_contributions,
count_per_partition,
privacy_id_per_partition_count)
privacy_id_per_partition_count,
sum_per_partition_histogram)


def _to_dataset_histograms(histogram_list,
Expand Down Expand Up @@ -318,10 +323,10 @@ def _compute_linf_sum_contributions_histogram(
This histogram contains: the number of (privacy id, partition_key)-pairs
which have sum of values X_1, X_2, ..., X_n, where X_1 = min_sum,
X_n = one before max sum and n is equal to
NUMBER_OF_BUCKETS_IN_LINF_SUM_CONTRIBUTIONS_HISTOGRAM.
NUMBER_OF_BUCKETS_SUM_HISTOGRAM.
Args:
col: collection with elements (privacy_id, partition_key, value).
col: collection with elements ((privacy_id, partition_key), value).
backend: PipelineBackend to run operations on the collection.
Returns:
1 element collection, which contains the computed hist.Histogram.
Expand All @@ -332,8 +337,7 @@ def _compute_linf_sum_contributions_histogram(
col = backend.values(col, "Drop keys")
# col: (float)
col = backend.to_multi_transformable_collection(col)
lowers = _min_max_lowers(
col, NUMBER_OF_BUCKETS_IN_LINF_SUM_CONTRIBUTIONS_HISTOGRAM, backend)
lowers = _min_max_lowers(col, NUMBER_OF_BUCKETS_SUM_HISTOGRAM, backend)

return _compute_frequency_histogram_helper_with_lowers(
col, backend, hist.HistogramType.LINF_SUM_CONTRIBUTIONS, lowers)
Expand All @@ -355,11 +359,15 @@ def _min_max_lowers(col, number_of_buckets,
"""
min_max_values = pipeline_functions.min_max_elements(
backend, col, "Min and max value in dataset")

# min_max_values: 1 element collection with a pair (min, max)
return backend.map(
min_max_values, lambda min_max: np.linspace(min_max[0], min_max[1],
(number_of_buckets + 1)),
"map to lowers")
def generate_lowers(min_max: Tuple[float, float]) -> List[float]:
min_, max_ = min_max
if min_ == max_:
return [min_, min_]
return list(np.linspace(min_, max_, (number_of_buckets + 1)))

return backend.map(min_max_values, generate_lowers, "map to lowers")


def _compute_partition_count_histogram(
Expand Down Expand Up @@ -417,6 +425,34 @@ def _compute_partition_privacy_id_count_histogram(
col, backend, hist.HistogramType.COUNT_PRIVACY_ID_PER_PARTITION)


def _compute_partition_sum_histogram(col,
                                     backend: pipeline_backend.PipelineBackend):
    """Computes histogram of sum per partition.

    The histogram counts how many partition_keys have a total sum of values
    falling into each of the NUMBER_OF_BUCKETS_SUM_HISTOGRAM equal-width bins
    spanning [min_sum, max_sum) of the dataset.

    Args:
        col: collection with elements ((privacy_id, partition_key), value).
        backend: PipelineBackend to run operations on the collection.

    Returns:
        1 element collection, which contains the computed hist.Histogram.
    """
    # Re-key each contribution by partition key alone; privacy ids are not
    # needed for a per-partition aggregate.
    per_partition = backend.map_tuple(col,
                                      lambda pid_pk, value: (pid_pk[1], value),
                                      "Drop privacy id")
    per_partition = backend.sum_per_key(per_partition,
                                        "Sum of contributions per partition")
    # per_partition: (pk, sum_per_partition)
    sums = backend.values(per_partition, "Drop keys")
    # sums: (float)
    # Materialize once: the collection is consumed twice below (for bin lowers
    # and for the histogram itself).
    sums = backend.to_multi_transformable_collection(sums)
    lowers = _min_max_lowers(sums, NUMBER_OF_BUCKETS_SUM_HISTOGRAM, backend)

    return _compute_frequency_histogram_helper_with_lowers(
        sums, backend, hist.HistogramType.SUM_PER_PARTITION, lowers)


def compute_dataset_histograms(col, data_extractors: pipeline_dp.DataExtractors,
backend: pipeline_backend.PipelineBackend):
"""Computes dataset histograms.
Expand Down Expand Up @@ -464,13 +500,16 @@ def compute_dataset_histograms(col, data_extractors: pipeline_dp.DataExtractors,
partition_count_histogram = _compute_partition_count_histogram(col, backend)
partition_privacy_id_count_histogram = _compute_partition_privacy_id_count_histogram(
col_distinct, backend)
partition_sum_histogram = _compute_partition_sum_histogram(
col_with_values, backend)
# all histograms are 1 element collections which contains ContributionHistogram

# Combine histograms to histograms.DatasetHistograms.
return _to_dataset_histograms([
l0_contributions_histogram, l1_contributions_histogram,
linf_contributions_histogram, linf_sum_contributions_histogram,
partition_count_histogram, partition_privacy_id_count_histogram
partition_count_histogram, partition_privacy_id_count_histogram,
partition_sum_histogram
], backend)


Expand Down Expand Up @@ -560,7 +599,7 @@ def _compute_linf_sum_contributions_histogram_on_preaggregated_data(
This histogram contains: the number of (privacy id, partition_key)-pairs
which have sum of values X_1, X_2, ..., X_n, where X_1 = min_sum,
X_n = one before max sum and n is equal to
NUMBER_OF_BUCKETS_IN_LINF_SUM_CONTRIBUTIONS_HISTOGRAM.
NUMBER_OF_BUCKETS_SUM_HISTOGRAM.
Args:
col: collection with a pre-aggregated dataset, each element is
Expand All @@ -576,8 +615,7 @@ def _compute_linf_sum_contributions_histogram_on_preaggregated_data(
# col: (float,) where each element is the sum of values the
# corresponding privacy_id contributes to the partition.
col = backend.to_multi_transformable_collection(col)
lowers = _min_max_lowers(
col, NUMBER_OF_BUCKETS_IN_LINF_SUM_CONTRIBUTIONS_HISTOGRAM, backend)
lowers = _min_max_lowers(col, NUMBER_OF_BUCKETS_SUM_HISTOGRAM, backend)
# lowers: (float,) where each value defines a lower of a bin in the
# generated histogram.

Expand Down Expand Up @@ -613,6 +651,39 @@ def _compute_partition_count_histogram_on_preaggregated_data(
hist.HistogramType.COUNT_PER_PARTITION)


def _compute_partition_sum_histogram_on_preaggregated_data(
        col, backend: pipeline_backend.PipelineBackend):
    """Computes histogram of sum per partition on pre-aggregated data.

    This histogram contains: the number of partition_keys which have sum of
    values X_1, X_2, ..., X_n, where X_1 = min_sum, X_n = one before max sum and
    n is equal to NUMBER_OF_BUCKETS_SUM_HISTOGRAM.

    Args:
        col: collection with a pre-aggregated dataset, each element is
          (partition_key, (count, sum, n_partitions, n_contributions)).
        backend: PipelineBackend to run operations on the collection.

    Returns:
        1 element collection, which contains the computed histograms.Histogram.
    """
    col = backend.map_values(
        col,
        lambda x: x[1],  # x is (count, sum, n_partitions, n_contributions)
        "Extract sum per partition contribution")
    # col: (pk, sum) — one per-privacy-id sum for each partition key.
    col = backend.sum_per_key(col, "Sum per partition")
    # col: (pk, sum), where each element is the total sum per partition.
    # NOTE(review): sums are presumably floats (the sibling non-preaggregated
    # function documents them as float) — confirm against the input schema.
    col = backend.values(col, "Drop partition keys")
    # col: (sum,) — partition keys are no longer needed, only the sums.
    # Materialized because the collection is consumed twice below.
    col = backend.to_multi_transformable_collection(col)
    lowers = _min_max_lowers(col, NUMBER_OF_BUCKETS_SUM_HISTOGRAM, backend)
    # lowers: (float,) where each value defines a lower of a bin in the
    # generated histogram.

    return _compute_frequency_histogram_helper_with_lowers(
        col, backend, hist.HistogramType.SUM_PER_PARTITION, lowers)


def _compute_partition_privacy_id_count_histogram_on_preaggregated_data(
col, backend: pipeline_backend.PipelineBackend):
"""Computes a histogram of privacy id counts per partition.
Expand Down Expand Up @@ -675,10 +746,13 @@ def compute_dataset_histograms_on_preaggregated_data(
col, backend)
partition_privacy_id_count_histogram = _compute_partition_privacy_id_count_histogram_on_preaggregated_data(
col, backend)
partition_sum_histogram = _compute_partition_sum_histogram_on_preaggregated_data(
col, backend)

# Combine histograms to histograms.DatasetHistograms.
return _to_dataset_histograms([
l0_contributions_histogram, l1_contributions_histogram,
linf_contributions_histogram, linf_sum_contributions_histogram,
partition_count_histogram, partition_privacy_id_count_histogram
partition_count_histogram, partition_privacy_id_count_histogram,
partition_sum_histogram
], backend)
7 changes: 6 additions & 1 deletion pipeline_dp/dataset_histograms/histograms.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class HistogramType(enum.Enum):
LINF_SUM_CONTRIBUTIONS = 'linf_sum_contributions'
COUNT_PER_PARTITION = 'count_per_partition'
COUNT_PRIVACY_ID_PER_PARTITION = 'privacy_id_per_partition_count'
SUM_PER_PARTITION = 'sum_per_partition'


@dataclass
Expand Down Expand Up @@ -112,7 +113,10 @@ def is_integer(self) -> bool:
* Floating histograms, which is used for sums per partition. Which
has the same size bins.
"""
return self.name != HistogramType.LINF_SUM_CONTRIBUTIONS
return self.name not in [
HistogramType.LINF_SUM_CONTRIBUTIONS,
HistogramType.SUM_PER_PARTITION
]

def total_count(self):
return sum([bin.count for bin in self.bins])
Expand Down Expand Up @@ -209,3 +213,4 @@ class DatasetHistograms:
linf_sum_contributions_histogram: Histogram
count_per_partition_histogram: Histogram
count_privacy_id_per_partition: Histogram
sum_per_partition_histogram: Histogram
Loading

0 comments on commit 39917cd

Please sign in to comment.