Add a smoke test to check that noise is added
divergentdave committed Jul 12, 2024
1 parent f5c550e commit 8de3694
Showing 2 changed files with 122 additions and 31 deletions.
81 changes: 54 additions & 27 deletions integration_tests/tests/integration/common.rs
@@ -224,6 +224,33 @@ pub async fn verify_aggregate_generic<V>(
 ) where
     V: vdaf::Client<16> + vdaf::Collector + InteropClientEncoding,
     V::AggregateResult: PartialEq,
+{
+    let (report_count, aggregate_result) = collect_aggregate_result_generic(
+        task_parameters,
+        leader_port,
+        vdaf,
+        before_timestamp,
+        &test_case.aggregation_parameter,
+    )
+    .await;
+
+    assert_eq!(
+        report_count,
+        u64::try_from(test_case.measurements.len()).unwrap()
+    );
+    assert_eq!(aggregate_result, test_case.aggregate_result);
+}
+
+pub async fn collect_aggregate_result_generic<V>(
+    task_parameters: &TaskParameters,
+    leader_port: u16,
+    vdaf: V,
+    before_timestamp: Time,
+    aggregation_parameter: &V::AggregationParam,
+) -> (u64, V::AggregateResult)
+where
+    V: vdaf::Client<16> + vdaf::Collector + InteropClientEncoding,
+    V::AggregateResult: PartialEq,
 {
     let leader_endpoint = task_parameters
         .endpoint_fragments
@@ -247,7 +274,7 @@ pub async fn verify_aggregate_generic<V>(
         .unwrap();
 
     // Send a collect request and verify that we got the correct result.
-    match &task_parameters.query_type {
+    let (report_count, aggregate_result) = match &task_parameters.query_type {
         QueryType::TimeInterval => {
             let batch_interval = Interval::new(
                 before_timestamp
@@ -259,43 +286,42 @@ pub async fn verify_aggregate_generic<V>(
             )
             .unwrap();
 
-            let collection = collect_generic(
+            let collection_1 = collect_generic(
                 &collector,
                 Query::new_time_interval(batch_interval),
-                &test_case.aggregation_parameter,
+                aggregation_parameter,
             )
             .await
             .unwrap();
 
-            assert_eq!(
-                collection.report_count(),
-                u64::try_from(test_case.measurements.len()).unwrap()
-            );
-            assert_eq!(collection.aggregate_result(), &test_case.aggregate_result);
-
             // Collect again to verify that collections can be repeated.
-            let collection = collect_generic(
+            let collection_2 = collect_generic(
                 &collector,
                 Query::new_time_interval(batch_interval),
-                &test_case.aggregation_parameter,
+                aggregation_parameter,
             )
             .await
             .unwrap();
 
+            assert_eq!(collection_1.report_count(), collection_2.report_count());
             assert_eq!(
-                collection.report_count(),
-                u64::try_from(test_case.measurements.len()).unwrap()
+                collection_1.aggregate_result(),
+                collection_2.aggregate_result()
             );
-            assert_eq!(collection.aggregate_result(), &test_case.aggregate_result);
+
+            (
+                collection_2.report_count(),
+                collection_2.aggregate_result().clone(),
+            )
         }
         QueryType::FixedSize { .. } => {
             let mut requests = 0;
-            let collection = loop {
+            let collection_1 = loop {
                 requests += 1;
                 let collection_res = collect_generic::<_, FixedSize>(
                     &collector,
                     Query::new_fixed_size(FixedSizeQuery::CurrentBatch),
-                    &test_case.aggregation_parameter,
+                    aggregation_parameter,
                 )
                 .await;
                 match collection_res {
@@ -313,29 +339,30 @@ pub async fn verify_aggregate_generic<V>(
                 }
             };
 
-            let batch_id = *collection.partial_batch_selector().batch_id();
-            assert_eq!(
-                collection.report_count(),
-                u64::try_from(test_case.measurements.len()).unwrap()
-            );
-            assert_eq!(collection.aggregate_result(), &test_case.aggregate_result);
+            let batch_id = *collection_1.partial_batch_selector().batch_id();
 
             // Collect again to verify that collections can be repeated.
-            let collection = collect_generic(
+            let collection_2 = collect_generic(
                 &collector,
                 Query::new_fixed_size(FixedSizeQuery::ByBatchId { batch_id }),
-                &test_case.aggregation_parameter,
+                aggregation_parameter,
             )
             .await
             .unwrap();
 
+            assert_eq!(collection_1.report_count(), collection_2.report_count());
             assert_eq!(
-                collection.report_count(),
-                u64::try_from(test_case.measurements.len()).unwrap()
+                collection_1.aggregate_result(),
+                collection_2.aggregate_result()
             );
-            assert_eq!(collection.aggregate_result(), &test_case.aggregate_result);
+
+            (
+                collection_2.report_count(),
+                collection_2.aggregate_result().clone(),
+            )
         }
-    }
+    };
+    (report_count, aggregate_result)
 }
 
 pub async fn submit_measurements_and_verify_aggregate(
72 changes: 68 additions & 4 deletions integration_tests/tests/integration/janus.rs
@@ -1,7 +1,9 @@
 use crate::{
     common::{
-        build_test_task, submit_measurements_and_verify_aggregate,
-        submit_measurements_and_verify_aggregate_varying_aggregation_parameter, TestContext,
+        build_test_task, collect_aggregate_result_generic,
+        submit_measurements_and_verify_aggregate,
+        submit_measurements_and_verify_aggregate_varying_aggregation_parameter,
+        submit_measurements_generic, TestContext,
     },
     initialize_rustls,
 };
@@ -16,8 +18,13 @@ use janus_integration_tests::{client::ClientBackend, janus::JanusInProcess, Task
 #[cfg(feature = "testcontainer")]
 use janus_interop_binaries::test_util::generate_network_name;
 use janus_messages::Role;
-use prio::vdaf::dummy;
-use std::time::Duration;
+use prio::{
+    dp::{
+        distributions::PureDpDiscreteLaplace, DifferentialPrivacyStrategy, PureDpBudget, Rational,
+    },
+    vdaf::{dummy, prio3::Prio3},
+};
+use std::{iter, time::Duration};
 
 /// A pair of Janus instances, running in containers, against which integration tests may be run.
 #[cfg(feature = "testcontainer")]
@@ -451,3 +458,60 @@ async fn janus_in_process_one_round_with_agg_param_time_interval() {
     )
     .await;
 }
+
+#[tokio::test(flavor = "multi_thread")]
+async fn janus_in_process_histogram_dp_noise() {
+    static TEST_NAME: &str = "janus_in_process_histogram_dp_noise";
+    const HISTOGRAM_LENGTH: usize = 100;
+    const CHUNK_LENGTH: usize = 10;
+
+    install_test_trace_subscriber();
+    initialize_rustls();
+
+    let epsilon = Rational::from_unsigned(1u128, 10u128).unwrap();
+    let janus_pair = JanusInProcessPair::new(TaskBuilder::new(
+        QueryType::TimeInterval,
+        VdafInstance::Prio3Histogram {
+            length: HISTOGRAM_LENGTH,
+            chunk_length: CHUNK_LENGTH,
+            dp_strategy: vdaf_dp_strategies::Prio3Histogram::PureDpDiscreteLaplace(
+                PureDpDiscreteLaplace::from_budget(PureDpBudget::new(epsilon)),
+            ),
+        },
+    ))
+    .await;
+    let vdaf = Prio3::new_histogram_multithreaded(2, HISTOGRAM_LENGTH, CHUNK_LENGTH).unwrap();
+
+    let total_measurements: usize = janus_pair
+        .task_parameters
+        .min_batch_size
+        .try_into()
+        .unwrap();
+    let measurements = iter::repeat(0).take(total_measurements).collect::<Vec<_>>();
+    let client_implementation = ClientBackend::InProcess
+        .build(
+            TEST_NAME,
+            &janus_pair.task_parameters,
+            (janus_pair.leader.port(), janus_pair.helper.port()),
+            vdaf.clone(),
+        )
+        .await
+        .unwrap();
+    let before_timestamp = submit_measurements_generic(&measurements, &client_implementation).await;
+    let (report_count, aggregate_result) = collect_aggregate_result_generic(
+        &janus_pair.task_parameters,
+        janus_pair.leader.port(),
+        vdaf,
+        before_timestamp,
+        &(),
+    )
+    .await;
+    assert_eq!(report_count, janus_pair.task_parameters.min_batch_size);
+
+    let mut un_noised_result = [0u128; HISTOGRAM_LENGTH];
+    un_noised_result[0] = report_count.into();
+    // Smoke test: Just confirm that some noise was added. Since epsilon is small, the noise will be
+    // large (drawn from Laplace_Z(20) + Laplace_Z(20)), and it is highly unlikely that all 100
+    // noise values will be zero simultaneously.
+    assert_ne!(aggregate_result, un_noised_result);
+}
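
For intuition on why the final assert_ne! is sound, here is a rough back-of-the-envelope sketch (hypothetical, not part of the commit). Taking the in-test comment at face value, each bucket's total noise is the sum of two independent discrete Laplace draws of scale t = 20 (presumably 2/epsilon for epsilon = 1/10, reflecting the histogram's L1 sensitivity of 2). For that distribution, P(X = 0) = (1 - e^(-1/t)) / (1 + e^(-1/t)) ≈ 0.025, the sum of two draws is zero with probability p_zero^2 * coth(1/t) ≈ 0.0125, and all 100 buckets carry zero noise with probability about 0.0125^100 ≈ 10^(-190):

// Back-of-the-envelope estimate (hypothetical, not part of the commit): the
// probability that the smoke test's `assert_ne!` fails spuriously because every
// bucket's total noise is exactly zero. Scale t = 20 is taken from the comment
// in the test ("Laplace_Z(20) + Laplace_Z(20)").
fn main() {
    let t: f64 = 20.0;
    // P(X = 0) for a discrete Laplace distribution with scale t.
    let p_zero = (1.0 - (-1.0 / t).exp()) / (1.0 + (-1.0 / t).exp());
    // P(X + Y = 0) for two independent draws: sum over k of
    // P(X = k) * P(Y = -k), which works out to p_zero^2 * coth(1/t).
    let p_sum_zero = p_zero * p_zero / (1.0 / t).tanh();
    // Probability that all 100 independent bucket noises are zero at once.
    let log10_all_zero = 100.0 * p_sum_zero.log10();
    println!("P(one draw = 0)         ≈ {p_zero:.4}"); // ≈ 0.0250
    println!("P(bucket noise sum = 0) ≈ {p_sum_zero:.4}"); // ≈ 0.0125
    println!("P(all 100 buckets = 0)  ≈ 10^{log10_all_zero:.0}"); // ≈ 10^-190
}

In other words, a spurious failure of this smoke test is astronomically unlikely; a real failure indicates that no noise was added.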
