Task rewrite: aggregate shares
Adopts `AggregatorTask` and `NewTaskBuilder` across the helper's
aggregate share handlers and the leader's collection job driver.

Part of #1524
tgeoghegan committed Sep 29, 2023
1 parent c877dd5 commit 055ed90
Showing 3 changed files with 25 additions and 16 deletions.
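The pattern running through all three files is the split between the full `Task` and a role-scoped `AggregatorTask` view. Before the per-file diffs, here is a minimal sketch of that split with assumed shapes rather than Janus's real definitions (the actual types also carry VDAF, HPKE, and endpoint configuration):

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub enum Role {
        Leader,
        Helper,
    }

    /// The slice of task state one aggregator needs for its role.
    #[derive(Clone, Debug)]
    pub struct AggregatorTask {
        pub id: u64,
        pub role: Role,
    }

    /// Full task description, including which role this process plays.
    pub struct Task {
        pub id: u64,
        pub configured_role: Option<Role>,
    }

    impl Task {
        /// View for whichever role this process is configured as (compare
        /// `task.view_for_role()?` in the first hunk below).
        pub fn view_for_role(&self) -> Result<AggregatorTask, String> {
            let role = self
                .configured_role
                .ok_or_else(|| "task has no configured role".to_string())?;
            Ok(AggregatorTask { id: self.id, role })
        }

        /// Leader-specific view; errors if this is not a leader task.
        pub fn leader_view(&self) -> Result<AggregatorTask, String> {
            match self.configured_role {
                Some(Role::Leader) => self.view_for_role(),
                _ => Err("task is not a leader task".to_string()),
            }
        }
    }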
aggregator/src/aggregator.rs (2 changes: 1 addition & 1 deletion)
@@ -2892,7 +2892,7 @@ impl VdafOps {

        let (helper_aggregate_share, report_count, checksum) =
            compute_aggregate_share::<SEED_SIZE, Q, A>(
-               &task,
+               &task.view_for_role()?,
                &batch_aggregations,
            )
            .await
aggregator/src/aggregator/aggregate_share.rs (4 changes: 2 additions & 2 deletions)
@@ -1,7 +1,7 @@
 //! Implements functionality for computing & validating aggregate shares.

 use super::Error;
-use janus_aggregator_core::{datastore::models::BatchAggregation, task::Task};
+use janus_aggregator_core::{datastore::models::BatchAggregation, task::AggregatorTask};
 use janus_core::report_id::ReportIdChecksumExt;
 use janus_messages::{query_type::QueryType, ReportIdChecksum};
 use prio::vdaf::{self, Aggregatable};
@@ -16,7 +16,7 @@ pub(crate) async fn compute_aggregate_share<
     Q: QueryType,
     A: vdaf::Aggregator<SEED_SIZE, 16>,
 >(
-    task: &Task,
+    task: &AggregatorTask,
     batch_aggregations: &[BatchAggregation<SEED_SIZE, Q, A>],
 ) -> Result<(A::AggregateShare, u64, ReportIdChecksum), Error> {
     // At the moment we construct an aggregate share (either handling AggregateShareReq in the
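With `compute_aggregate_share` now taking `&AggregatorTask`, callers convert to the role view before calling instead of passing the full task. A call-shape sketch reusing the assumed types from the first sketch (the real function is async and generic over `SEED_SIZE`, `Q`, and `A`):

    fn compute_aggregate_share_sketch(task: &AggregatorTask) -> Result<u64, String> {
        // Stand-in for combining batch aggregations and folding checksums.
        Ok(task.id)
    }

    fn helper_aggregate_share_handler(task: &Task) -> Result<u64, String> {
        // Mirrors the aggregator.rs hunk: convert to the role view up front,
        // so a task with no configured role fails before any aggregation work.
        compute_aggregate_share_sketch(&task.view_for_role()?)
    }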
aggregator/src/aggregator/collection_job_driver.rs (35 changes: 22 additions & 13 deletions)
@@ -132,7 +132,7 @@ impl CollectionJobDriver {
                // TODO(#224): Consider fleshing out `AcquiredCollectionJob` to include a `Task`,
                // `A::AggregationParam`, etc. so that we don't have to do more DB queries here.
                let task = tx
-                   .get_task(lease.leased().task_id())
+                   .get_aggregator_task(lease.leased().task_id())
                    .await?
                    .ok_or_else(|| {
                        datastore::Error::User(
@@ -161,7 +161,7 @@
                let batch_aggregations: Vec<_> =
                    Q::get_batch_aggregations_for_collection_identifier(
                        tx,
-                       &task.leader_view()?,
+                       &task,
                        vdaf.as_ref(),
                        collection_job.batch_identifier(),
                        collection_job.aggregation_parameter(),
@@ -177,7 +177,7 @@
                // transactionally to avoid the possibility of overwriting other transactions'
                // updates to batch aggregations.
                let empty_batch_aggregations = empty_batch_aggregations(
-                   &task.leader_view()?,
+                   &task,
                    batch_aggregation_shard_count,
                    collection_job.batch_identifier(),
                    collection_job.aggregation_parameter(),
@@ -224,7 +224,9 @@
        let resp_bytes = send_request_to_helper(
            &self.http_client,
            Method::POST,
-           task.aggregate_shares_uri()?,
+           task.aggregate_shares_uri()?.ok_or_else(|| {
+               Error::InvalidConfiguration("task is not leader and has no aggregate share URI")
+           })?,
            AGGREGATE_SHARES_ROUTE,
            AggregateShareReq::<TimeInterval>::MEDIA_TYPE,
            req,
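The extra `ok_or_else` reflects `aggregate_shares_uri()` apparently now returning `Result<Option<Url>, _>` on the role-scoped task: only a leader-role view knows the helper's endpoint, so the caller must map `None` to an explicit error. A standalone sketch of that accessor pattern, with assumed field and error names:

    use url::Url;

    #[derive(Debug)]
    enum Error {
        InvalidConfiguration(&'static str),
        Url(url::ParseError),
    }

    struct AggregatorTask {
        // Only leader-role views carry the peer's endpoint in this sketch.
        helper_aggregator_endpoint: Option<Url>,
    }

    impl AggregatorTask {
        fn aggregate_shares_uri(&self) -> Result<Option<Url>, Error> {
            self.helper_aggregator_endpoint
                .as_ref()
                .map(|endpoint| endpoint.join("aggregate_shares").map_err(Error::Url))
                .transpose()
        }
    }

    fn aggregate_shares_request_uri(task: &AggregatorTask) -> Result<Url, Error> {
        // Same shape as the hunk above: unwrap the Result with `?`, then turn
        // a missing URI into an explicit configuration error.
        task.aggregate_shares_uri()?.ok_or(Error::InvalidConfiguration(
            "task is not leader and has no aggregate share URI",
        ))
    }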
Expand Down Expand Up @@ -541,7 +543,10 @@ mod tests {
test_util::ephemeral_datastore,
Datastore,
},
task::{test_util::TaskBuilder, QueryType, Task},
task::{
test_util::{NewTaskBuilder as TaskBuilder, Task},
QueryType,
},
test_util::noop_meter,
};
use janus_core::{
@@ -557,7 +562,7 @@
    use janus_messages::{
        problem_type::DapProblemType, query_type::TimeInterval, AggregateShare, AggregateShareReq,
        AggregationJobStep, BatchSelector, Duration, HpkeCiphertext, HpkeConfigId, Interval, Query,
-       ReportIdChecksum, Role,
+       ReportIdChecksum,
    };
    use prio::codec::{Decode, Encode};
    use rand::random;
@@ -575,11 +580,13 @@
        CollectionJob<0, TimeInterval, dummy_vdaf::Vdaf>,
    ) {
        let time_precision = Duration::from_seconds(500);
-       let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Leader)
+       let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake)
            .with_helper_aggregator_endpoint(server.url().parse().unwrap())
            .with_time_precision(time_precision)
            .with_min_batch_size(10)
            .build();
+
+       let leader_task = task.leader_view().unwrap();
        let batch_interval = Interval::new(clock.now(), Duration::from_seconds(2000)).unwrap();
        let aggregation_param = AggregationParam(0);
@@ -595,9 +602,9 @@
        let lease = datastore
            .run_tx(|tx| {
                let (clock, task, collection_job) =
-                   (clock.clone(), task.clone(), collection_job.clone());
+                   (clock.clone(), leader_task.clone(), collection_job.clone());
                Box::pin(async move {
-                   tx.put_task(&task).await?;
+                   tx.put_aggregator_task(&task).await?;

                    tx.put_collection_job::<0, TimeInterval, dummy_vdaf::Vdaf>(&collection_job)
                        .await?;
@@ -714,12 +721,14 @@
        let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);

        let time_precision = Duration::from_seconds(500);
-       let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Leader)
+       let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake)
            .with_helper_aggregator_endpoint(server.url().parse().unwrap())
            .with_time_precision(time_precision)
            .with_min_batch_size(10)
            .build();
-       let agg_auth_token = task.aggregator_auth_token().unwrap();
+
+       let leader_task = task.leader_view().unwrap();
+       let agg_auth_token = task.aggregator_auth_token();
        let batch_interval = Interval::new(clock.now(), Duration::from_seconds(2000)).unwrap();
        let aggregation_param = AggregationParam(0);
        let report_timestamp = clock
@@ -729,10 +738,10 @@

        let (collection_job_id, lease) = ds
            .run_tx(|tx| {
-               let task = task.clone();
+               let task = leader_task.clone();
                let clock = clock.clone();
                Box::pin(async move {
-                   tx.put_task(&task).await?;
+                   tx.put_aggregator_task(&task).await?;

                    for offset in [0, 500, 1000, 1500] {
                        tx.put_batch(&Batch::<0, TimeInterval, dummy_vdaf::Vdaf>::new(
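All of the test changes follow one recipe: build with the renamed `NewTaskBuilder` (no `Role` constructor argument), derive the leader's view with `leader_view()`, and store that view via `put_aggregator_task`. A condensed, self-contained sketch of the recipe (builder internals and field names are assumptions):

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum Role {
        Leader,
    }

    #[derive(Clone, Debug)]
    struct AggregatorTask {
        id: u64,
        role: Role,
    }

    struct Task {
        id: u64,
        min_batch_size: u64,
    }

    impl Task {
        fn leader_view(&self) -> Result<AggregatorTask, String> {
            Ok(AggregatorTask { id: self.id, role: Role::Leader })
        }
    }

    struct TaskBuilder {
        id: u64,
        min_batch_size: u64,
    }

    impl TaskBuilder {
        // Note: no Role argument, unlike the builder the tests replaced.
        fn new(id: u64) -> Self {
            Self { id, min_batch_size: 0 }
        }

        fn with_min_batch_size(mut self, n: u64) -> Self {
            self.min_batch_size = n;
            self
        }

        fn build(self) -> Task {
            Task { id: self.id, min_batch_size: self.min_batch_size }
        }
    }

    fn main() {
        // Build the full task, then extract and store only the leader view
        // (stand-in for tx.put_aggregator_task(&leader_task) in the diff).
        let task = TaskBuilder::new(42).with_min_batch_size(10).build();
        let leader_task = task.leader_view().unwrap();
        assert_eq!(leader_task.role, Role::Leader);
        println!(
            "stored leader view of task {} (min batch size {})",
            leader_task.id, task.min_batch_size
        );
    }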
