Skip to content

Commit

Permalink
Task rewrite: aggregation job handling
Browse files Browse the repository at this point in the history
Adopts `AggregatorTask` and `NewTaskBuilder` across the aggregation job
handling components.

Part of #1524
  • Loading branch information
tgeoghegan committed Sep 29, 2023
1 parent 6976df2 commit 25019ca
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 186 deletions.
4 changes: 3 additions & 1 deletion aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,7 @@ impl VdafOps {
let agg_param = A::AggregationParam::get_decoded(req.aggregation_parameter())?;

let mut accumulator = Accumulator::<SEED_SIZE, Q, A>::new(
Arc::clone(&task),
Arc::new(task.view_for_role()?),
batch_aggregation_shard_count,
agg_param.clone(),
);
Expand Down Expand Up @@ -2065,6 +2065,8 @@ impl VdafOps {
));
}

let task = Arc::new(task.view_for_role()?);

// TODO(#224): don't hold DB transaction open while computing VDAF updates?
// TODO(#224): don't do O(n) network round-trips (where n is the number of prepare steps)
Ok(datastore
Expand Down
13 changes: 5 additions & 8 deletions aggregator/src/aggregator/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use janus_aggregator_core::{
Transaction,
},
query_type::AccumulableQueryType,
task::Task,
task::AggregatorTask,
};
use janus_core::{
report_id::ReportIdChecksumExt,
Expand All @@ -33,7 +33,7 @@ pub struct Accumulator<
Q: AccumulableQueryType,
A: vdaf::Aggregator<SEED_SIZE, 16>,
> {
task: Arc<Task>,
task: Arc<AggregatorTask>,
shard_count: u64,
#[derivative(Debug = "ignore")]
aggregation_parameter: A::AggregationParam,
Expand All @@ -55,7 +55,7 @@ impl<const SEED_SIZE: usize, Q: AccumulableQueryType, A: vdaf::Aggregator<SEED_S
{
/// Creates a new accumulator.
pub fn new(
task: Arc<Task>,
task: Arc<AggregatorTask>,
shard_count: u64,
aggregation_parameter: A::AggregationParam,
) -> Self {
Expand All @@ -80,11 +80,8 @@ impl<const SEED_SIZE: usize, Q: AccumulableQueryType, A: vdaf::Aggregator<SEED_S
client_timestamp: &Time,
output_share: &A::OutputShare,
) -> Result<(), datastore::Error> {
let batch_identifier = Q::to_batch_identifier(
&self.task.view_for_role()?,
partial_batch_identifier,
client_timestamp,
)?;
let batch_identifier =
Q::to_batch_identifier(&self.task, partial_batch_identifier, client_timestamp)?;
let client_timestamp_interval =
Interval::from_time(client_timestamp).map_err(|e| datastore::Error::User(e.into()))?;
let batch_aggregation_fn = || {
Expand Down
4 changes: 2 additions & 2 deletions aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use janus_aggregator_core::{
Transaction,
},
query_type::AccumulableQueryType,
task::Task,
task::AggregatorTask,
};
use janus_core::time::Clock;
use janus_messages::{
Expand All @@ -32,7 +32,7 @@ impl VdafOps {
/// `report_aggregations` with the step `n+1` ping pong messages in `leader_aggregation_job`.
pub(super) async fn step_aggregation_job<const SEED_SIZE: usize, C, Q, A>(
tx: &Transaction<'_, C>,
task: Arc<Task>,
task: Arc<AggregatorTask>,
vdaf: Arc<A>,
batch_aggregation_shard_count: u64,
helper_aggregation_job: AggregationJob<SEED_SIZE, Q, A>,
Expand Down
Loading

0 comments on commit 25019ca

Please sign in to comment.