diff --git a/cli/src/tasks.rs b/cli/src/tasks.rs
index 69f200dc..5aa109ac 100644
--- a/cli/src/tasks.rs
+++ b/cli/src/tasks.rs
@@ -31,6 +31,8 @@ pub enum TaskAction {
         min_batch_size: u64,
         #[arg(long)]
         max_batch_size: Option<u64>,
+        #[arg(long, requires = "max_batch_size")]
+        batch_time_window_size_seconds: Option<u64>,
         #[arg(long)]
         time_precision: Duration,
         #[arg(long)]
@@ -72,6 +74,7 @@ impl TaskAction {
                 vdaf,
                 min_batch_size,
                 max_batch_size,
+                batch_time_window_size_seconds,
                 time_precision,
                 collector_credential_id,
                 categorical_buckets,
@@ -129,6 +132,7 @@ impl TaskAction {
                     vdaf,
                     min_batch_size,
                     max_batch_size,
+                    batch_time_window_size_seconds,
                     time_precision_seconds,
                     collector_credential_id,
                 };
diff --git a/client/src/task.rs b/client/src/task.rs
index 16c41568..c4356e5b 100644
--- a/client/src/task.rs
+++ b/client/src/task.rs
@@ -10,6 +10,7 @@ pub struct Task {
     pub vdaf: Vdaf,
     pub min_batch_size: u64,
     pub max_batch_size: Option<u64>,
+    pub batch_time_window_size_seconds: Option<u64>,
     #[serde(with = "time::serde::rfc3339")]
     pub created_at: OffsetDateTime,
     #[serde(with = "time::serde::rfc3339")]
@@ -42,6 +43,7 @@ pub struct NewTask {
     pub vdaf: Vdaf,
     pub min_batch_size: u64,
     pub max_batch_size: Option<u64>,
+    pub batch_time_window_size_seconds: Option<u64>,
     pub time_precision_seconds: u64,
     pub collector_credential_id: Uuid,
 }
diff --git a/client/tests/integration/tasks.rs b/client/tests/integration/tasks.rs
index 153ea6c8..c3baf0ce 100644
--- a/client/tests/integration/tasks.rs
+++ b/client/tests/integration/tasks.rs
@@ -1,5 +1,5 @@
 use crate::harness::{assert_eq, test, *};
-use divviup_api::entity::aggregator::Features;
+use divviup_api::entity::aggregator::{Feature, Features};
 use divviup_client::{NewTask, Vdaf};
 
 #[test(harness = with_configured_client)]
@@ -27,6 +27,7 @@ async fn create_task(app: Arc<DivviupApi>, account: Account, client: DivviupClie
             vdaf: Vdaf::Count,
             min_batch_size: fastrand::i64(100..).try_into().unwrap(),
             max_batch_size: None,
+            batch_time_window_size_seconds: None,
             time_precision_seconds: fastrand::u64(60..2592000),
             collector_credential_id: collector_credential.id,
         },
@@ -40,6 +41,46 @@ async fn create_task(app: Arc<DivviupApi>, account: Account, client: DivviupClie
     Ok(())
 }
 
+#[test(harness = with_configured_client)]
+async fn create_task_time_bucketed_fixed_size(
+    app: Arc<DivviupApi>,
+    account: Account,
+    client: DivviupClient,
+) -> TestResult {
+    let (leader, helper) = fixtures::aggregator_pair(&app, &account).await;
+
+    let mut leader = leader.into_active_model();
+    leader.features =
+        ActiveValue::Set(Features::from_iter([Feature::TimeBucketedFixedSize]).into());
+    let leader = leader.update(app.db()).await?;
+
+    let collector_credential = fixtures::collector_credential(&app, &account).await;
+    let time_precision_seconds = fastrand::u64(60..2592000);
+    let min_batch_size = fastrand::i64(100..).try_into().unwrap();
+    let response_task = client
+        .create_task(
+            account.id,
+            NewTask {
+                name: fixtures::random_name(),
+                leader_aggregator_id: leader.id,
+                helper_aggregator_id: helper.id,
+                vdaf: Vdaf::Count,
+                min_batch_size,
+                max_batch_size: Some(min_batch_size),
+                batch_time_window_size_seconds: Some(time_precision_seconds * 2),
+                time_precision_seconds,
+                collector_credential_id: collector_credential.id,
+            },
+        )
+        .await?;
+    let task_from_db = Tasks::find_by_id(&response_task.id)
+        .one(app.db())
+        .await?
+        .unwrap();
+    assert_same_json_representation(&task_from_db, &response_task);
+    Ok(())
+}
+
 #[test(harness = with_configured_client)]
 async fn rename_task(app: Arc<DivviupApi>, account: Account, client: DivviupClient) -> TestResult {
     let task = fixtures::task(&app, &account).await;
diff --git a/documentation/openapi.yml b/documentation/openapi.yml
index b4d9661a..dc1e1fa7 100644
--- a/documentation/openapi.yml
+++ b/documentation/openapi.yml
@@ -290,6 +290,8 @@ paths:
                   type: number
                 max_batch_size:
                   type: number
+                batch_time_window_size_seconds:
+                  type: number
                 expiration:
                   type: string
                   format: date-time
@@ -707,6 +709,9 @@ components:
         max_batch_size:
          type: number
          nullable: true
+        batch_time_window_size_seconds:
+          type: number
+          nullable: true
        time_precision_seconds:
          type: number
        report_count:
diff --git a/migration/src/lib.rs b/migration/src/lib.rs
index cbdec961..79e34463 100644
--- a/migration/src/lib.rs
+++ b/migration/src/lib.rs
@@ -23,6 +23,7 @@ mod m20231012_205810_add_features_to_aggregators;
 mod m20231012_225001_rename_hpke_configs_to_collector_credentials;
 mod m20231012_233117_add_token_hash_to_collector_credential;
 mod m20240214_215101_upload_metrics;
+mod m20240411_195358_time_bucketed_fixed_size;
 
 pub struct Migrator;
diff --git a/migration/src/m20240411_195358_time_bucketed_fixed_size.rs b/migration/src/m20240411_195358_time_bucketed_fixed_size.rs
new file mode 100644
index 00000000..cfc85562
--- /dev/null
+++ b/migration/src/m20240411_195358_time_bucketed_fixed_size.rs
@@ -0,0 +1,39 @@
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                TableAlterStatement::new()
+                    .table(Task::Table)
+                    .add_column(
+                        ColumnDef::new(Task::BatchTimeWindowSize)
+                            .big_integer()
+                            .null(),
+                    )
+                    .to_owned(),
+            )
+            .await
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                TableAlterStatement::new()
+                    .table(Task::Table)
+                    .drop_column(Task::BatchTimeWindowSize)
+                    .to_owned(),
+            )
+            .await
+    }
+}
+
+#[derive(Iden)]
+enum Task {
+    Table,
+    BatchTimeWindowSize,
+}
diff --git a/src/clients/aggregator_client/api_types.rs b/src/clients/aggregator_client/api_types.rs
index bb239f98..a4e99669 100644
--- a/src/clients/aggregator_client/api_types.rs
+++ b/src/clients/aggregator_client/api_types.rs
@@ -4,7 +4,7 @@ use crate::{
             Features, QueryTypeName, QueryTypeNameSet, Role as AggregatorRole, VdafNameSet,
         },
         task::vdaf::{BucketLength, ContinuousBuckets, CountVec, Histogram, Sum, SumVec, Vdaf},
-        Aggregator, Protocol, ProvisionableTask, Task,
+        Aggregator, NewTask, Protocol, ProvisionableTask, Task,
     },
     handler::Error,
 };
@@ -153,44 +153,45 @@ impl From<AggregatorVdaf> for Vdaf {
 #[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub enum QueryType {
     TimeInterval,
-    FixedSize { max_batch_size: u64 },
+    FixedSize {
+        max_batch_size: u64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        batch_time_window_size: Option<u64>,
+    },
 }
 
-impl QueryType {
-    pub fn name(&self) -> QueryTypeName {
-        match self {
-            QueryType::TimeInterval => QueryTypeName::TimeInterval,
-            QueryType::FixedSize { .. } => QueryTypeName::FixedSize,
+impl From<ProvisionableTask> for QueryType {
+    fn from(value: ProvisionableTask) -> Self {
+        if let Some(max_batch_size) = value.max_batch_size {
+            Self::FixedSize {
+                max_batch_size,
+                batch_time_window_size: value.batch_time_window_size_seconds,
+            }
+        } else {
+            Self::TimeInterval
         }
     }
 }
 
-impl From<QueryType> for Option<i64> {
-    fn from(value: QueryType) -> Self {
-        Option::<u64>::from(value).map(|u| u.try_into().unwrap())
-    }
-}
-
-impl From<QueryType> for Option<u64> {
-    fn from(value: QueryType) -> Self {
-        match value {
-            QueryType::TimeInterval => None,
-            QueryType::FixedSize { max_batch_size } => Some(max_batch_size),
+impl From<NewTask> for QueryType {
+    fn from(value: NewTask) -> Self {
+        if let Some(max_batch_size) = value.max_batch_size {
+            Self::FixedSize {
+                max_batch_size,
+                batch_time_window_size: value.batch_time_window_size_seconds,
+            }
+        } else {
+            Self::TimeInterval
         }
     }
 }
 
-impl From<Option<u64>> for QueryType {
-    fn from(value: Option<u64>) -> Self {
-        value.map_or(QueryType::TimeInterval, |max_batch_size| {
-            QueryType::FixedSize { max_batch_size }
-        })
-    }
-}
-
-impl From<Option<i64>> for QueryType {
-    fn from(value: Option<i64>) -> Self {
-        value.map(|i| u64::try_from(i).unwrap()).into()
+impl QueryType {
+    pub fn name(&self) -> QueryTypeName {
+        match self {
+            QueryType::TimeInterval => QueryTypeName::TimeInterval,
+            QueryType::FixedSize { .. } => QueryTypeName::FixedSize,
+        }
     }
 }
 
@@ -294,7 +295,7 @@ impl TaskCreate {
             } else {
                 new_task.leader_aggregator.dap_url.clone().into()
             },
-            query_type: new_task.max_batch_size.into(),
+            query_type: new_task.clone().into(),
             vdaf: new_task.aggregator_vdaf.clone(),
             role,
             max_batch_query_count: 1,
diff --git a/src/entity/aggregator/feature.rs b/src/entity/aggregator/feature.rs
index 530f2b9b..1f59f1ee 100644
--- a/src/entity/aggregator/feature.rs
+++ b/src/entity/aggregator/feature.rs
@@ -5,6 +5,7 @@ use std::collections::HashSet;
 pub enum Feature {
     TokenHash,
     UploadMetrics,
+    TimeBucketedFixedSize,
     #[serde(untagged)]
     Unknown(String),
 }
@@ -79,6 +80,10 @@ mod tests {
             serde_json::from_value::<Features>(json!(["TokenHash", "UploadMetrics"])).unwrap(),
             Features::from_iter([Feature::TokenHash, Feature::UploadMetrics])
         );
+        assert_eq!(
+            serde_json::from_value::<Features>(json!(["TimeBucketedFixedSize"])).unwrap(),
+            Features::from_iter([Feature::TimeBucketedFixedSize])
+        );
     }
 
     #[test]
diff --git a/src/entity/task.rs b/src/entity/task.rs
index 9f2e45f8..c2250b7a 100644
--- a/src/entity/task.rs
+++ b/src/entity/task.rs
@@ -38,6 +38,7 @@ pub struct Model {
     pub vdaf: Json,
     pub min_batch_size: i64,
     pub max_batch_size: Option<i64>,
+    pub batch_time_window_size_seconds: Option<i64>,
     #[serde(with = "time::serde::rfc3339")]
     pub created_at: OffsetDateTime,
     #[serde(with = "time::serde::rfc3339")]
diff --git a/src/entity/task/new_task.rs b/src/entity/task/new_task.rs
index 9419e556..d96e0492 100644
--- a/src/entity/task/new_task.rs
+++ b/src/entity/task/new_task.rs
@@ -1,7 +1,10 @@
 use super::*;
 use crate::{
     clients::aggregator_client::api_types::{AggregatorVdaf, QueryType},
-    entity::{aggregator::Role, Account, CollectorCredential, Protocol},
+    entity::{
+        aggregator::{Feature, Role},
+        Account, CollectorCredential, Protocol,
+    },
     handler::Error,
 };
 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
@@ -30,6 +33,9 @@ pub struct NewTask {
     #[validate(range(min = 0))]
     pub max_batch_size: Option<u64>,
 
+    #[validate(range(min = 0))]
+    pub batch_time_window_size_seconds: Option<u64>,
+
     #[validate(
         required,
         range(
@@ -90,6 +96,26 @@ impl NewTask {
         }
     }
 
+    fn validate_batch_time_window_size(&self, errors: &mut ValidationErrors) {
+        let window = self.batch_time_window_size_seconds;
+        if let Some(window) = window {
+            if self.max_batch_size.is_none() {
+                errors.add(
+                    "batch_time_window_size_seconds",
+                    ValidationError::new("missing-max-batch-size"),
+                );
+            }
+            if let Some(precision) = self.time_precision_seconds {
+                if window % precision != 0 {
+                    errors.add(
+                        "batch_time_window_size_seconds",
+                        ValidationError::new("not-multiple-of-time-precision"),
+                    );
+                }
+            }
+        }
+    }
+
     async fn load_collector_credential(
         &self,
         account: &Account,
@@ -192,6 +218,15 @@ impl NewTask {
             errors.add("helper_aggregator_id", ValidationError::new("role"))
         }
 
+        if self.batch_time_window_size_seconds.is_some()
+            && !leader.features.contains(&Feature::TimeBucketedFixedSize)
+        {
+            errors.add(
+                "leader_aggregator_id",
+                ValidationError::new("time-bucketed-fixed-size-unsupported"),
+            )
+        }
+
         if errors.is_empty() {
             Some((leader, helper, resolved_protocol))
         } else {
@@ -259,7 +294,7 @@ impl NewTask {
         helper: &Aggregator,
         errors: &mut ValidationErrors,
     ) {
-        let name = QueryType::from(self.max_batch_size).name();
+        let name = QueryType::from(self.clone()).name();
         if !leader.query_types.contains(&name) || !helper.query_types.contains(&name) {
             errors.add("max_batch_size", ValidationError::new("not-supported"));
         }
@@ -272,6 +307,7 @@ impl NewTask {
     ) -> Result<ProvisionableTask, Error> {
         let mut errors = Validate::validate(self).err().unwrap_or_default();
         self.validate_min_lte_max(&mut errors);
+        self.validate_batch_time_window_size(&mut errors);
         let aggregators = self.validate_aggregators(&account, db, &mut errors).await;
         let collector_credential = self
             .validate_collector_credential(
@@ -312,6 +348,7 @@ impl NewTask {
             aggregator_vdaf: aggregator_vdaf.unwrap(),
             min_batch_size: self.min_batch_size.unwrap(),
             max_batch_size: self.max_batch_size,
+            batch_time_window_size_seconds: self.batch_time_window_size_seconds,
             expiration: Some(OffsetDateTime::now_utc() + DEFAULT_EXPIRATION_DURATION),
             time_precision_seconds: self.time_precision_seconds.unwrap(),
             collector_credential: collector_credential.unwrap(),
diff --git a/src/entity/task/provisionable_task.rs b/src/entity/task/provisionable_task.rs
index ed9ed3b4..ae27072d 100644
--- a/src/entity/task/provisionable_task.rs
+++ b/src/entity/task/provisionable_task.rs
@@ -1,6 +1,6 @@
 use super::*;
 use crate::{
-    clients::aggregator_client::api_types::{AggregatorVdaf, AuthenticationToken},
+    clients::aggregator_client::api_types::{AggregatorVdaf, AuthenticationToken, QueryType},
     entity::{Account, CollectorCredential, Protocol, Task},
     handler::Error,
     Crypter,
@@ -28,6 +28,7 @@ pub struct ProvisionableTask {
     pub aggregator_vdaf: AggregatorVdaf,
     pub min_batch_size: u64,
     pub max_batch_size: Option<u64>,
+    pub batch_time_window_size_seconds: Option<u64>,
     pub expiration: Option<OffsetDateTime>,
     pub time_precision_seconds: u64,
     pub collector_credential: CollectorCredential,
@@ -67,7 +68,7 @@ impl ProvisionableTask {
             "min_batch_size",
         )?;
         assert_same(
-            &self.max_batch_size.into(),
+            &QueryType::from(self.clone()),
             &response.query_type,
             "query_type",
         )?;
@@ -113,6 +114,10 @@ impl ProvisionableTask {
             vdaf: self.vdaf.into(),
             min_batch_size: self.min_batch_size.try_into()?,
             max_batch_size: self.max_batch_size.map(TryInto::try_into).transpose()?,
+            batch_time_window_size_seconds: self
+                .batch_time_window_size_seconds
+                .map(TryInto::try_into)
+                .transpose()?,
             created_at: OffsetDateTime::now_utc(),
             updated_at: OffsetDateTime::now_utc(),
             time_precision_seconds: self.time_precision_seconds.try_into()?,
diff --git a/test-support/src/fixtures.rs b/test-support/src/fixtures.rs
index e1922948..f690233e 100644
--- a/test-support/src/fixtures.rs
+++ b/test-support/src/fixtures.rs
@@ -112,6 +112,7 @@ pub async fn task(app: &DivviupApi, account: &Account) -> Task {
         vdaf: task::vdaf::Vdaf::Count.into(),
         min_batch_size: 100,
         max_batch_size: Some(200),
+        batch_time_window_size_seconds: None,
         created_at: OffsetDateTime::now_utc(),
         updated_at: OffsetDateTime::now_utc(),
         time_precision_seconds: 60,
diff --git a/tests/integration/new_task.rs b/tests/integration/new_task.rs
index ca68571d..94aabaaf 100644
--- a/tests/integration/new_task.rs
+++ b/tests/integration/new_task.rs
@@ -1,3 +1,4 @@
+use divviup_api::entity::aggregator::{Feature, Features};
 use test_support::{assert_eq, test, *};
 
 pub async fn assert_errors(app: &DivviupApi, new_task: &mut NewTask, field: &str, codes: &[&str]) {
@@ -15,6 +16,20 @@ pub async fn assert_errors(app: &DivviupApi, new_task: &mut NewTask, field: &str
     );
 }
 
+pub async fn assert_no_errors(app: &DivviupApi, new_task: &mut NewTask, field: &str) {
+    let account = fixtures::account(app).await;
+    let errors = new_task
+        .normalize_and_validate(account, app.db())
+        .await
+        .unwrap_err();
+    let errors = errors
+        .field_errors()
+        .get(field)
+        .map(|c| c.iter().map(|error| &error.code).collect::<Vec<_>>())
+        .unwrap_or_default();
+    assert!(errors.is_empty(), "{:?}", errors);
+}
+
 #[test(harness = set_up)]
 async fn batch_size(app: DivviupApi) -> TestResult {
     assert_errors(
@@ -43,6 +58,87 @@ async fn batch_size(app: DivviupApi) -> TestResult {
     Ok(())
 }
 
+#[test(harness = set_up)]
+async fn time_bucketed_fixed_size(app: DivviupApi) -> TestResult {
+    let mut leader = fixtures::aggregator(&app, None).await.into_active_model();
+    leader.role = ActiveValue::Set(Role::Leader);
+    leader.features =
+        ActiveValue::Set(Features::from_iter([Feature::TimeBucketedFixedSize]).into());
+    let leader = leader.update(app.db()).await?;
+
+    let mut helper = fixtures::aggregator(&app, None).await.into_active_model();
+    helper.role = ActiveValue::Set(Role::Helper);
+    let helper = helper.update(app.db()).await?;
+
+    assert_errors(
+        &app,
+        &mut NewTask {
+            leader_aggregator_id: Some(leader.id.to_string()),
+            helper_aggregator_id: Some(helper.id.to_string()),
+            time_precision_seconds: Some(300),
+            max_batch_size: None,
+            batch_time_window_size_seconds: Some(300),
+            ..Default::default()
+        },
+        "batch_time_window_size_seconds",
+        &["missing-max-batch-size"],
+    )
+    .await;
+
+    assert_errors(
+        &app,
+        &mut NewTask {
+            leader_aggregator_id: Some(leader.id.to_string()),
+            helper_aggregator_id: Some(helper.id.to_string()),
+            time_precision_seconds: Some(123),
+            min_batch_size: Some(100),
+            max_batch_size: Some(100),
+            batch_time_window_size_seconds: Some(300),
+            ..Default::default()
+        },
+        "batch_time_window_size_seconds",
+        &["not-multiple-of-time-precision"],
+    )
+    .await;
+
+    assert_no_errors(
+        &app,
+        &mut NewTask {
+            leader_aggregator_id: Some(leader.id.to_string()),
+            helper_aggregator_id: Some(helper.id.to_string()),
+            time_precision_seconds: Some(123),
+            min_batch_size: Some(100),
+            max_batch_size: Some(100),
+            batch_time_window_size_seconds: Some(300),
+            ..Default::default()
+        },
+        "leader_aggregator_id",
+    )
+    .await;
+
+    let mut leader = fixtures::aggregator(&app, None).await.into_active_model();
+    leader.role = ActiveValue::Set(Role::Leader);
+    let leader = leader.update(app.db()).await?;
+
+    assert_errors(
+        &app,
+        &mut NewTask {
+            leader_aggregator_id: Some(leader.id.to_string()),
+            helper_aggregator_id: Some(helper.id.to_string()),
+            time_precision_seconds: Some(300),
+            min_batch_size: Some(100),
+            max_batch_size: Some(100),
+            batch_time_window_size_seconds: Some(300),
+            ..Default::default()
+        },
+        "leader_aggregator_id",
+        &["time-bucketed-fixed-size-unsupported"],
+    )
+    .await;
+
+    Ok(())
+}
+
 #[test(harness = set_up)]
 async fn aggregator_roles(app: DivviupApi) -> TestResult {
     let mut leader = fixtures::aggregator(&app, None).await.into_active_model();
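
Usage note (illustrative, not part of the patch): with these changes, a divviup-client caller opts a task into time-bucketed fixed-size queries by setting batch_time_window_size_seconds alongside max_batch_size. Per the validation added in src/entity/task/new_task.rs, the window must be a multiple of time_precision_seconds and the leader aggregator must advertise the TimeBucketedFixedSize feature. A minimal sketch, where account_id, the aggregator IDs, and collector_credential_id are hypothetical placeholders:

    // Sketch only: all IDs below are placeholders, not values from the patch.
    let task = client
        .create_task(
            account_id,
            NewTask {
                name: "time-bucketed-example".into(),
                leader_aggregator_id,  // leader must have Feature::TimeBucketedFixedSize
                helper_aggregator_id,
                vdaf: Vdaf::Count,
                min_batch_size: 100,
                max_batch_size: Some(100), // required whenever a time window is set
                batch_time_window_size_seconds: Some(600), // 2 * time_precision_seconds
                time_precision_seconds: 300,
                collector_credential_id,
            },
        )
        .await?;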