Fix flaky tests #3294

Merged · 5 commits · Jul 10, 2024

Changes from all commits:

@@ -13,12 +13,9 @@ use crate::aggregator::{
 use assert_matches::assert_matches;
 use futures::future::try_join_all;
 use janus_aggregator_core::{
-    datastore::{
-        self,
-        models::{
-            AggregationJob, AggregationJobState, BatchAggregation, BatchAggregationState,
-            ReportAggregation, ReportAggregationState,
-        },
+    datastore::models::{
+        AggregationJob, AggregationJobState, BatchAggregation, BatchAggregationState,
+        ReportAggregation, ReportAggregationState,
     },
     task::{test_util::TaskBuilder, QueryType, VerifyKey},
     test_util::noop_meter,

@@ -265,6 +262,9 @@ async fn aggregate_init() {
         if helper_task.hpke_keys().contains_key(hpke_config.id()) {
             continue;
         }
+        if hpke_keypair.config().id() == hpke_config.id() {
+            continue;
+        }
         break hpke_config;
     };
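
Context for the added guard: the HttpHandlerTest fixture provisions a global HPKE keypair whose config ID is randomly generated, and the loop above keeps drawing HPKE configs until it finds one the helper task doesn't recognize. Before this fix, a drawn config could collide with that global keypair's ID, so a report meant to be undecryptable occasionally decrypted fine and the test flaked. A rough sketch of the failure odds (illustrative arithmetic, not Janus code):

```rust
/// Chance that at least one of `draws` uniformly random u8 config IDs
/// collides with one fixed ID (the fixture's global keypair).
fn collision_probability(draws: u32) -> f64 {
    1.0 - (255.0_f64 / 256.0).powi(draws as i32)
}

fn main() {
    // ~0.4% per draw: rare in one run, routine across thousands of CI runs.
    println!("{:.4}", collision_probability(1));
}
```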

@@ -717,6 +717,7 @@ async fn aggregate_init_with_reports_encrypted_by_task_specific_key() {
         clock,
         ephemeral_datastore: _ephemeral_datastore,
         datastore,
+        hpke_keypair,
         ..
     } = HttpHandlerTest::new().await;

@@ -734,28 +735,23 @@
         aggregation_param,
     );

-    // Same ID as the task to test having both keys to choose from.
-    let global_hpke_keypair_same_id =
-        HpkeKeypair::test_with_id((*helper_task.current_hpke_key().config().id()).into());
-    datastore
-        .run_unnamed_tx(|tx| {
-            let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone();
-            Box::pin(async move {
-                // Leave these in the PENDING state--they should still be decryptable.
-                match tx
-                    .put_global_hpke_keypair(&global_hpke_keypair_same_id)
-                    .await
-                {
-                    // Prevent test flakes in case a colliding key is already in the datastore.
-                    // The test context code randomly generates an ID so there's a chance for
-                    // collision.
-                    Ok(_) | Err(datastore::Error::MutationTargetAlreadyExists) => Ok(()),
-                    Err(err) => Err(err),
-                }
-            })
-        })
-        .await
-        .unwrap();
+    // Same ID as the task to test having both keys to choose from. (skip if there is already a
+    // global keypair with the same ID set up by the fixture)
+    if helper_task.current_hpke_key().config().id() != hpke_keypair.config().id() {
+        let global_hpke_keypair_same_id =
+            HpkeKeypair::test_with_id((*helper_task.current_hpke_key().config().id()).into());
+        datastore
+            .run_unnamed_tx(|tx| {
+                let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone();
+                Box::pin(async move {
+                    // Leave these in the PENDING state--they should still be decryptable.
+                    tx.put_global_hpke_keypair(&global_hpke_keypair_same_id)
+                        .await
+                })
+            })
+            .await
+            .unwrap();
+    }

     // Create new handler _after_ the keys have been inserted so that they come pre-cached.
     let handler = aggregator_handler(

A contributor commented on the removed error-matching code:

    🤦 ah I see, there is no ON CONFLICT DO NOTHING in put_global_hpke_keypair() so if there's a conflict it'll fail with a harder-to-match postgres uniqueness constraint violation error.
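
The missing clause the comment refers to would look something like the following. This is a hypothetical sketch using tokio_postgres, not the actual Janus datastore code; the table and column names are invented for illustration:

```rust
use tokio_postgres::Client;

// Hypothetical: insert a global HPKE keypair, silently skipping duplicates.
// Without ON CONFLICT DO NOTHING, a duplicate config_id surfaces as a raw
// uniqueness-constraint violation that callers must pattern-match awkwardly.
async fn put_global_hpke_keypair_if_absent(
    client: &Client,
    config_id: i16,     // HPKE config IDs are u8; widened for storage here
    config: &[u8],      // serialized HpkeConfig
    private_key: &[u8], // serialized private key
) -> Result<(), tokio_postgres::Error> {
    client
        .execute(
            "INSERT INTO global_hpke_keys (config_id, config, private_key)
             VALUES ($1, $2, $3)
             ON CONFLICT (config_id) DO NOTHING",
            &[&config_id, &config, &private_key],
        )
        .await?;
    Ok(())
}
```

The PR takes the less invasive route: the tests now skip the insert entirely when the IDs already coincide, leaving put_global_hpke_keypair()'s semantics unchanged.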

aggregator/src/aggregator/http_handlers/tests/hpke_config.rs (16 changes: 3 additions & 13 deletions)

@@ -113,21 +113,10 @@ async fn global_hpke_config() {
         clock,
         ephemeral_datastore: _ephemeral_datastore,
         datastore,
+        hpke_keypair: first_hpke_keypair,
         ..
     } = HttpHandlerTest::new().await;

-    // Retrieve the global keypair from the test fixture.
-    let first_hpke_keypair = datastore
-        .run_unnamed_tx(|tx| {
-            Box::pin(async move {
-                Ok(tx.get_global_hpke_keypairs().await.unwrap()[0]
-                    .hpke_keypair()
-                    .clone())
-            })
-        })
-        .await
-        .unwrap();
-
     let aggregator = Arc::new(
         crate::aggregator::Aggregator::new(
             datastore.clone(),

@@ -162,7 +151,8 @@
     check_hpke_config_is_usable(&hpke_config_list, &first_hpke_keypair);

     // Insert an inactive HPKE config.
-    let second_hpke_keypair = HpkeKeypair::test_with_id(2);
+    let first_hpke_keypair_id = u8::from(*first_hpke_keypair.config().id());
+    let second_hpke_keypair = HpkeKeypair::test_with_id(first_hpke_keypair_id.wrapping_add(1));
     datastore
         .run_unnamed_tx(|tx| {
             let keypair = second_hpke_keypair.clone();
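
The fixture's keypair now carries a random config ID, so the previously hard-coded test_with_id(2) could collide with it. Deriving the second ID from the first guarantees the two differ across the whole u8 ID space, including wraparound at 255. A minimal illustration with plain u8 arithmetic (not Janus API):

```rust
fn main() {
    for first_id in 0u8..=255 {
        // wrapping_add wraps 255 -> 0 instead of panicking in debug builds,
        // and id + 1 (mod 256) is never equal to id itself.
        let second_id = first_id.wrapping_add(1);
        assert_ne!(first_id, second_id);
    }
    println!("all 256 IDs yield a distinct neighbor");
}
```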

aggregator/src/aggregator/upload_tests.rs (46 changes: 21 additions & 25 deletions)

@@ -7,7 +7,6 @@ use assert_matches::assert_matches;
 use futures::future::try_join_all;
 use janus_aggregator_core::{
     datastore::{
-        self,
         models::{CollectionJob, CollectionJobState, TaskUploadCounter},
         test_util::{ephemeral_datastore, EphemeralDatastore},
         Datastore,

@@ -254,9 +253,11 @@ async fn upload_wrong_hpke_config_id() {
     let leader_task = task.leader_view().unwrap();
     let report = create_report(&leader_task, &hpke_keypair, clock.now());

+    let mut hpke_keys = leader_task.hpke_keys().clone();
+    hpke_keys.insert(*hpke_keypair.config().id(), hpke_keypair);
     let unused_hpke_config_id = (0..)
         .map(HpkeConfigId::from)
-        .find(|id| !leader_task.hpke_keys().contains_key(id))
+        .find(|id| !hpke_keys.contains_key(id))
         .unwrap();

     let report = Report::new(
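
The fix folds the fixture's global keypair into the key map before scanning for an unused config ID; the old scan consulted only the task's keys, so it could return the global keypair's ID, and a report the test expects to be rejected for an unknown HPKE config would actually be decryptable. A self-contained sketch of the scan (a HashMap stands in for the task's key map; not the Janus API):

```rust
use std::collections::HashMap;

/// Find the first u8 config ID absent from every known key set.
fn unused_id(known_keys: &HashMap<u8, ()>) -> Option<u8> {
    (0u8..=255).find(|id| !known_keys.contains_key(id))
}

fn main() {
    let mut known = HashMap::new();
    known.insert(0, ()); // a task key
    known.insert(7, ()); // the fixture's global key, merged in by the fix
    assert_eq!(unused_id(&known), Some(1));
}
```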

@@ -497,6 +498,7 @@ async fn upload_report_encrypted_with_task_specific_key() {
         task,
         datastore,
         ephemeral_datastore: _ephemeral_datastore,
+        hpke_keypair,
         ..
     } = UploadTest::new(Config {
         max_upload_batch_size: 1000,

@@ -507,30 +509,24 @@
     let leader_task = task.leader_view().unwrap();

     // Insert a global keypair with the same ID as the task to test having both keys to choose
-    // from.
-    let global_hpke_keypair_same_id =
-        HpkeKeypair::test_with_id((*leader_task.current_hpke_key().config().id()).into());
-
-    datastore
-        .run_unnamed_tx(|tx| {
-            let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone();
-            Box::pin(async move {
-                // Leave these in the PENDING state--they should still be decryptable.
-                match tx
-                    .put_global_hpke_keypair(&global_hpke_keypair_same_id)
-                    .await
-                {
-                    // Prevent test flakes in case a colliding key is already in the datastore.
-                    // The test context code randomly generates an ID so there's a chance for
-                    // collision.
-                    Ok(_) | Err(datastore::Error::MutationTargetAlreadyExists) => Ok(()),
-                    Err(err) => Err(err),
-                }
-            })
-        })
-        .await
-        .unwrap();
-    aggregator.refresh_caches().await.unwrap();
+    // from. (skip if there is already a global keypair with the same ID set up by the fixture)
+    if leader_task.current_hpke_key().config().id() != hpke_keypair.config().id() {
+        let global_hpke_keypair_same_id =
+            HpkeKeypair::test_with_id((*leader_task.current_hpke_key().config().id()).into());
+
+        datastore
+            .run_unnamed_tx(|tx| {
+                let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone();
+                Box::pin(async move {
+                    // Leave these in the PENDING state--they should still be decryptable.
+                    tx.put_global_hpke_keypair(&global_hpke_keypair_same_id)
+                        .await
+                })
+            })
+            .await
+            .unwrap();
+        aggregator.refresh_caches().await.unwrap();
+    }

     let report = create_report(&leader_task, leader_task.current_hpke_key(), clock.now());
     aggregator

aggregator/src/cache.rs (21 changes: 10 additions & 11 deletions)

@@ -432,11 +432,12 @@ mod tests {
         .await
         .unwrap();

-    // That change shouldn't be reflected yet because we've cached the previous task.
+    // At this point, the above change may or may not be reflected yet, because we've cached the
+    // previous task.
     let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap();
-    assert_eq!(
-        task_aggregator.task.task_expiration(),
-        task.task_expiration()
+    assert!(
+        (task_aggregator.task.task_expiration() == task.task_expiration())
+            || (task_aggregator.task.task_expiration() == Some(&new_expiration))
     );

     // Unfortunately, because moka doesn't provide any facility for a fake clock, we have to resort
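
Because moka's TTL is driven by the wall clock, the cache entry may or may not have expired by the time the assertion runs, so the test now accepts either the stale or the refreshed expiration instead of pinning one. The two hunks below apply the same reasoning. A generic sketch of this tolerant-assertion pattern (illustrative, not Janus code):

```rust
use std::fmt::Debug;

/// Accept either of the two values a TTL-raced read may legally observe.
fn assert_stale_or_fresh<T: PartialEq + Debug>(observed: &T, stale: &T, fresh: &T) {
    assert!(
        observed == stale || observed == fresh,
        "observed {observed:?}, expected cached {stale:?} or refreshed {fresh:?}"
    );
}

fn main() {
    assert_stale_or_fresh(&10, &10, &20); // still cached
    assert_stale_or_fresh(&20, &10, &20); // already refreshed
}
```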

@@ -482,9 +483,6 @@
     // A wild task appears!
     datastore.put_aggregator_task(&task).await.unwrap();

-    // We shouldn't see the new task yet.
-    assert!(task_aggregators.get(task.id()).await.unwrap().is_none());
-
     // Unfortunately, because moka doesn't provide any facility for a fake clock, we have to resort
     // to sleeps to test TTL functionality.
     sleep(Duration::from_secs(1)).await;

@@ -508,11 +506,12 @@
         .await
         .unwrap();

-    // That change shouldn't be reflected yet because we've cached the previous run.
+    // At this point, the above change may or may not be reflected yet because we've cached the
+    // previous value.
     let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap();
-    assert_eq!(
-        task_aggregator.task.task_expiration(),
-        task.task_expiration()
+    assert!(
+        (task_aggregator.task.task_expiration() == task.task_expiration())
+            || (task_aggregator.task.task_expiration() == Some(&new_expiration))
     );

     sleep(Duration::from_secs(1)).await;