From 89706f8d268a5c458c7c37e370ed367d7fd75449 Mon Sep 17 00:00:00 2001
From: David Cook
Date: Wed, 10 Jul 2024 11:07:50 -0500
Subject: [PATCH 1/5] Fix flake in global_hpke_config test

---
 .../http_handlers/tests/hpke_config.rs | 16 +++-------------
 1 file changed, 3 insertions(+), 13 deletions(-)

diff --git a/aggregator/src/aggregator/http_handlers/tests/hpke_config.rs b/aggregator/src/aggregator/http_handlers/tests/hpke_config.rs
index 744a766f6..11659f73c 100644
--- a/aggregator/src/aggregator/http_handlers/tests/hpke_config.rs
+++ b/aggregator/src/aggregator/http_handlers/tests/hpke_config.rs
@@ -113,21 +113,10 @@ async fn global_hpke_config() {
         clock,
         ephemeral_datastore: _ephemeral_datastore,
         datastore,
+        hpke_keypair: first_hpke_keypair,
         ..
     } = HttpHandlerTest::new().await;
 
-    // Retrieve the global keypair from the test fixture.
-    let first_hpke_keypair = datastore
-        .run_unnamed_tx(|tx| {
-            Box::pin(async move {
-                Ok(tx.get_global_hpke_keypairs().await.unwrap()[0]
-                    .hpke_keypair()
-                    .clone())
-            })
-        })
-        .await
-        .unwrap();
-
     let aggregator = Arc::new(
         crate::aggregator::Aggregator::new(
             datastore.clone(),
@@ -162,7 +151,8 @@ async fn global_hpke_config() {
     check_hpke_config_is_usable(&hpke_config_list, &first_hpke_keypair);
 
     // Insert an inactive HPKE config.
-    let second_hpke_keypair = HpkeKeypair::test_with_id(2);
+    let first_hpke_keypair_id = u8::from(*first_hpke_keypair.config().id());
+    let second_hpke_keypair = HpkeKeypair::test_with_id(first_hpke_keypair_id.wrapping_add(1));
     datastore
         .run_unnamed_tx(|tx| {
             let keypair = second_hpke_keypair.clone();

From 4af2e6e311e0ed47858e9043c5534164cdb334dc Mon Sep 17 00:00:00 2001
From: David Cook
Date: Wed, 10 Jul 2024 11:12:12 -0500
Subject: [PATCH 2/5] Fix flake in upload_wrong_hpke_config_id test

---
 aggregator/src/aggregator/upload_tests.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/aggregator/src/aggregator/upload_tests.rs b/aggregator/src/aggregator/upload_tests.rs
index 9dcdb406a..cc4083def 100644
--- a/aggregator/src/aggregator/upload_tests.rs
+++ b/aggregator/src/aggregator/upload_tests.rs
@@ -254,9 +254,11 @@ async fn upload_wrong_hpke_config_id() {
     let leader_task = task.leader_view().unwrap();
     let report = create_report(&leader_task, &hpke_keypair, clock.now());
 
+    let mut hpke_keys = leader_task.hpke_keys().clone();
+    hpke_keys.insert(*hpke_keypair.config().id(), hpke_keypair);
     let unused_hpke_config_id = (0..)
         .map(HpkeConfigId::from)
-        .find(|id| !leader_task.hpke_keys().contains_key(id))
+        .find(|id| !hpke_keys.contains_key(id))
         .unwrap();
 
     let report = Report::new(

From c56f9162c5b7afc3837994010ba6f7e476a826f2 Mon Sep 17 00:00:00 2001
From: David Cook
Date: Wed, 10 Jul 2024 11:19:40 -0500
Subject: [PATCH 3/5] Fix flake in aggregate_init test

---
 .../src/aggregator/http_handlers/tests/aggregation_job_init.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs b/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs
index 6d3fdd766..b2baca82b 100644
--- a/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs
+++ b/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs
@@ -265,6 +265,9 @@ async fn aggregate_init() {
         if helper_task.hpke_keys().contains_key(hpke_config.id()) {
             continue;
         }
+        if hpke_keypair.config().id() == hpke_config.id() {
+            continue;
+        }
         break hpke_config;
     };
 

From 349503317aa2677c5428de5c5f80e6a58b1c35c2 Mon Sep 17 00:00:00 2001
From: David Cook
Date: Wed, 10 Jul 2024 11:27:32 -0500
Subject: [PATCH 4/5] Fix handling of error from put_global_hpke_keypair

put_global_hpke_keypair() cannot return MutationTargetAlreadyExists, so
matching on that error variant was dead code. Rather than change the
method, this commit updates the two affected tests to skip calling it
when it isn't needed, and removes the match on the
MutationTargetAlreadyExists error variant.
---
 .../tests/aggregation_job_init.rs         | 47 ++++++++-----------
 aggregator/src/aggregator/upload_tests.rs | 42 +++++++----------
 2 files changed, 38 insertions(+), 51 deletions(-)

diff --git a/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs b/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs
index b2baca82b..a952a1c3a 100644
--- a/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs
+++ b/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs
@@ -13,12 +13,9 @@ use crate::aggregator::{
 use assert_matches::assert_matches;
 use futures::future::try_join_all;
 use janus_aggregator_core::{
-    datastore::{
-        self,
-        models::{
-            AggregationJob, AggregationJobState, BatchAggregation, BatchAggregationState,
-            ReportAggregation, ReportAggregationState,
-        },
+    datastore::models::{
+        AggregationJob, AggregationJobState, BatchAggregation, BatchAggregationState,
+        ReportAggregation, ReportAggregationState,
     },
     task::{test_util::TaskBuilder, QueryType, VerifyKey},
     test_util::noop_meter,
@@ -720,6 +717,7 @@ async fn aggregate_init_with_reports_encrypted_by_task_specific_key() {
         clock,
         ephemeral_datastore: _ephemeral_datastore,
         datastore,
+        hpke_keypair,
         ..
     } = HttpHandlerTest::new().await;
 
@@ -737,28 +735,23 @@ async fn aggregate_init_with_reports_encrypted_by_task_specific_key() {
         aggregation_param,
     );
 
-    // Same ID as the task to test having both keys to choose from.
-    let global_hpke_keypair_same_id =
-        HpkeKeypair::test_with_id((*helper_task.current_hpke_key().config().id()).into());
-    datastore
-        .run_unnamed_tx(|tx| {
-            let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone();
-            Box::pin(async move {
-                // Leave these in the PENDING state--they should still be decryptable.
-                match tx
-                    .put_global_hpke_keypair(&global_hpke_keypair_same_id)
-                    .await
-                {
-                    // Prevent test flakes in case a colliding key is already in the datastore.
-                    // The test context code randomly generates an ID so there's a chance for
-                    // collision.
-                    Ok(_) | Err(datastore::Error::MutationTargetAlreadyExists) => Ok(()),
-                    Err(err) => Err(err),
-                }
+    // Same ID as the task to test having both keys to choose from. (skip if there is already a
+    // global keypair with the same ID set up by the fixture)
+    if helper_task.current_hpke_key().config().id() != hpke_keypair.config().id() {
+        let global_hpke_keypair_same_id =
+            HpkeKeypair::test_with_id((*helper_task.current_hpke_key().config().id()).into());
+        datastore
+            .run_unnamed_tx(|tx| {
+                let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone();
+                Box::pin(async move {
+                    // Leave these in the PENDING state--they should still be decryptable.
+                    tx.put_global_hpke_keypair(&global_hpke_keypair_same_id)
+                        .await
+                })
             })
-        })
-        .await
-        .unwrap();
+            .await
+            .unwrap();
+    }
 
     // Create new handler _after_ the keys have been inserted so that they come pre-cached.
     let handler = aggregator_handler(
diff --git a/aggregator/src/aggregator/upload_tests.rs b/aggregator/src/aggregator/upload_tests.rs
index cc4083def..e8bd13403 100644
--- a/aggregator/src/aggregator/upload_tests.rs
+++ b/aggregator/src/aggregator/upload_tests.rs
@@ -7,7 +7,6 @@ use assert_matches::assert_matches;
 use futures::future::try_join_all;
 use janus_aggregator_core::{
     datastore::{
-        self,
         models::{CollectionJob, CollectionJobState, TaskUploadCounter},
         test_util::{ephemeral_datastore, EphemeralDatastore},
         Datastore,
@@ -499,6 +498,7 @@ async fn upload_report_encrypted_with_task_specific_key() {
         task,
         datastore,
         ephemeral_datastore: _ephemeral_datastore,
+        hpke_keypair,
         ..
     } = UploadTest::new(Config {
         max_upload_batch_size: 1000,
@@ -509,30 +509,24 @@ async fn upload_report_encrypted_with_task_specific_key() {
     let leader_task = task.leader_view().unwrap();
 
     // Insert a global keypair with the same ID as the task to test having both keys to choose
-    // from.
-    let global_hpke_keypair_same_id =
-        HpkeKeypair::test_with_id((*leader_task.current_hpke_key().config().id()).into());
-
-    datastore
-        .run_unnamed_tx(|tx| {
-            let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone();
-            Box::pin(async move {
-                // Leave these in the PENDING state--they should still be decryptable.
-                match tx
-                    .put_global_hpke_keypair(&global_hpke_keypair_same_id)
-                    .await
-                {
-                    // Prevent test flakes in case a colliding key is already in the datastore.
-                    // The test context code randomly generates an ID so there's a chance for
-                    // collision.
-                    Ok(_) | Err(datastore::Error::MutationTargetAlreadyExists) => Ok(()),
-                    Err(err) => Err(err),
-                }
+    // from. (skip if there is already a global keypair with the same ID set up by the fixture)
+    if leader_task.current_hpke_key().config().id() != hpke_keypair.config().id() {
+        let global_hpke_keypair_same_id =
+            HpkeKeypair::test_with_id((*leader_task.current_hpke_key().config().id()).into());
+
+        datastore
+            .run_unnamed_tx(|tx| {
+                let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone();
+                Box::pin(async move {
+                    // Leave these in the PENDING state--they should still be decryptable.
+                    tx.put_global_hpke_keypair(&global_hpke_keypair_same_id)
+                        .await
+                })
             })
-        })
-        .await
-        .unwrap();
-    aggregator.refresh_caches().await.unwrap();
+            .await
+            .unwrap();
+        aggregator.refresh_caches().await.unwrap();
+    }
 
     let report = create_report(&leader_task, leader_task.current_hpke_key(), clock.now());
     aggregator

From 354b6be6fd305b208e7ed18b11ee01c4e1a23005 Mon Sep 17 00:00:00 2001
From: David Cook
Date: Wed, 10 Jul 2024 11:47:43 -0500
Subject: [PATCH 5/5] Remove racy checks in task cache tests

---
 aggregator/src/cache.rs | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/aggregator/src/cache.rs b/aggregator/src/cache.rs
index a34c72ad4..9b6de39bc 100644
--- a/aggregator/src/cache.rs
+++ b/aggregator/src/cache.rs
@@ -432,11 +432,12 @@ mod tests {
         .await
         .unwrap();
-        // That change shouldn't be reflected yet because we've cached the previous task.
+        // At this point, the above change may or may not be reflected yet, because we've cached the
+        // previous task.
         let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap();
-        assert_eq!(
-            task_aggregator.task.task_expiration(),
-            task.task_expiration()
+        assert!(
+            (task_aggregator.task.task_expiration() == task.task_expiration())
+                || (task_aggregator.task.task_expiration() == Some(&new_expiration))
         );
         // Unfortunately, because moka doesn't provide any facility for a fake clock, we have to resort
         // to sleeps to test TTL functionality.
         sleep(Duration::from_secs(1)).await;
@@ -482,9 +483,6 @@ mod tests {
         // A wild task appears!
         datastore.put_aggregator_task(&task).await.unwrap();
 
-        // We shouldn't see the new task yet.
-        assert!(task_aggregators.get(task.id()).await.unwrap().is_none());
-
         // Unfortunately, because moka doesn't provide any facility for a fake clock, we have to resort
         // to sleeps to test TTL functionality.
         sleep(Duration::from_secs(1)).await;
@@ -508,11 +506,12 @@ mod tests {
         .await
         .unwrap();
 
-        // That change shouldn't be reflected yet because we've cached the previous run.
+        // At this point, the above change may or may not be reflected yet because we've cached the
+        // previous value.
         let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap();
-        assert_eq!(
-            task_aggregator.task.task_expiration(),
-            task.task_expiration()
+        assert!(
+            (task_aggregator.task.task_expiration() == task.task_expiration())
+                || (task_aggregator.task.task_expiration() == Some(&new_expiration))
         );
 
         sleep(Duration::from_secs(1)).await;
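
Reviewer note, not part of the patches: patches 1 through 3 all address the same failure mode. The test fixture now provisions a global HPKE keypair whose config ID is chosen at random, so any test that hard-codes a config ID, or derives a "fresh" one without accounting for the fixture's key, collides with it roughly 1 time in 256. The sketch below shows the collision-avoidance pattern in isolation; it is a self-contained approximation, where HpkeConfigId is a hypothetical stand-in for the janus_messages type (DAP encodes the ID as a single byte) and unused_hpke_config_id mirrors the scan that patch 2 switches upload_wrong_hpke_config_id to.

    use std::collections::HashSet;

    // Hypothetical stand-in for janus_messages::HpkeConfigId. DAP encodes
    // the config ID as one byte, so there are only 256 possible values.
    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct HpkeConfigId(u8);

    impl From<u8> for HpkeConfigId {
        fn from(id: u8) -> Self {
            Self(id)
        }
    }

    // Scan the full u8 range for an ID no known keypair uses, instead of
    // hard-coding one. The set of "taken" IDs must include the fixture's
    // randomly chosen global keypair, or the test flakes intermittently.
    fn unused_hpke_config_id(taken: &HashSet<HpkeConfigId>) -> Option<HpkeConfigId> {
        (0..=u8::MAX).map(HpkeConfigId::from).find(|id| !taken.contains(id))
    }

    fn main() {
        // A task-specific key at ID 0, plus a fixture-provisioned global
        // keypair at a random ID (57 stands in for the random choice).
        let taken = HashSet::from([HpkeConfigId::from(0), HpkeConfigId::from(57)]);

        let unused = unused_hpke_config_id(&taken).expect("all 256 IDs taken");
        assert!(!taken.contains(&unused));

        // Patch 1 uses the complementary trick: derive the second ID from
        // the first with wrapping_add(1), which can never equal the original.
        let second = HpkeConfigId::from(57u8.wrapping_add(1));
        assert_ne!(second, HpkeConfigId::from(57));
    }

Patch 5 takes the opposite approach: where the race against the moka cache cannot be sidestepped, the assertions are widened to accept either the stale or the refreshed value, and the remaining sleeps handle TTL expiry.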