Support for time bucketed fixed size
inahga committed Apr 12, 2024
1 parent 9a99601 commit e46d404
Showing 13 changed files with 273 additions and 35 deletions.
4 changes: 4 additions & 0 deletions cli/src/tasks.rs
@@ -31,6 +31,8 @@ pub enum TaskAction {
         min_batch_size: u64,
         #[arg(long)]
         max_batch_size: Option<u64>,
+        #[arg(long, requires = "max_batch_size")]
+        batch_time_window_size_seconds: Option<u64>,
         #[arg(long)]
         time_precision: Duration,
         #[arg(long)]
@@ -72,6 +74,7 @@ impl TaskAction {
                 vdaf,
                 min_batch_size,
                 max_batch_size,
+                batch_time_window_size_seconds,
                 time_precision,
                 collector_credential_id,
                 categorical_buckets,
@@ -129,6 +132,7 @@ impl TaskAction {
                 vdaf,
                 min_batch_size,
                 max_batch_size,
+                batch_time_window_size_seconds,
                 time_precision_seconds,
                 collector_credential_id,
             };
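The new flag is declared with `requires = "max_batch_size"`, so clap rejects `--batch-time-window-size-seconds` unless `--max-batch-size` is also given, mirroring the server-side validation added later in this commit. A minimal standalone sketch of that behavior (hypothetical demo binary using clap 4 with the derive feature; not part of this commit):

use clap::Parser;

#[derive(Parser)]
struct Opts {
    #[arg(long)]
    max_batch_size: Option<u64>,
    // `requires` names the sibling argument's id, which defaults to the field name.
    #[arg(long, requires = "max_batch_size")]
    batch_time_window_size_seconds: Option<u64>,
}

fn main() {
    // The window without --max-batch-size is a usage error.
    assert!(Opts::try_parse_from(["demo", "--batch-time-window-size-seconds", "7200"]).is_err());

    // Supplying both parses cleanly.
    let opts = Opts::try_parse_from([
        "demo",
        "--max-batch-size",
        "300",
        "--batch-time-window-size-seconds",
        "7200",
    ])
    .unwrap();
    assert_eq!(opts.batch_time_window_size_seconds, Some(7200));
}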
2 changes: 2 additions & 0 deletions client/src/task.rs
@@ -10,6 +10,7 @@ pub struct Task {
     pub vdaf: Vdaf,
     pub min_batch_size: u64,
     pub max_batch_size: Option<u64>,
+    pub batch_time_window_size_seconds: Option<u64>,
     #[serde(with = "time::serde::rfc3339")]
     pub created_at: OffsetDateTime,
     #[serde(with = "time::serde::rfc3339")]
@@ -42,6 +43,7 @@ pub struct NewTask {
     pub vdaf: Vdaf,
     pub min_batch_size: u64,
     pub max_batch_size: Option<u64>,
+    pub batch_time_window_size_seconds: Option<u64>,
     pub time_precision_seconds: u64,
     pub collector_credential_id: Uuid,
 }
43 changes: 42 additions & 1 deletion client/tests/integration/tasks.rs
@@ -1,5 +1,5 @@
 use crate::harness::{assert_eq, test, *};
-use divviup_api::entity::aggregator::Features;
+use divviup_api::entity::aggregator::{Feature, Features};
 use divviup_client::{NewTask, Vdaf};
 
 #[test(harness = with_configured_client)]
@@ -27,6 +27,7 @@ async fn create_task(app: Arc<DivviupApi>, account: Account, client: DivviupClient) -> TestResult {
             vdaf: Vdaf::Count,
             min_batch_size: fastrand::i64(100..).try_into().unwrap(),
             max_batch_size: None,
+            batch_time_window_size_seconds: None,
             time_precision_seconds: fastrand::u64(60..2592000),
             collector_credential_id: collector_credential.id,
         },
@@ -40,6 +41,46 @@
     Ok(())
 }
 
+#[test(harness = with_configured_client)]
+async fn create_task_time_bucketed_fixed_size(
+    app: Arc<DivviupApi>,
+    account: Account,
+    client: DivviupClient,
+) -> TestResult {
+    let (leader, helper) = fixtures::aggregator_pair(&app, &account).await;
+
+    let mut leader = leader.into_active_model();
+    leader.features =
+        ActiveValue::Set(Features::from_iter([Feature::TimeBucketedFixedSize]).into());
+    let leader = leader.update(app.db()).await?;
+
+    let collector_credential = fixtures::collector_credential(&app, &account).await;
+    let time_precision_seconds = fastrand::u64(60..2592000);
+    let min_batch_size = fastrand::i64(100..).try_into().unwrap();
+    let response_task = client
+        .create_task(
+            account.id,
+            NewTask {
+                name: fixtures::random_name(),
+                leader_aggregator_id: leader.id,
+                helper_aggregator_id: helper.id,
+                vdaf: Vdaf::Count,
+                min_batch_size,
+                max_batch_size: Some(min_batch_size),
+                batch_time_window_size_seconds: Some(time_precision_seconds * 2),
+                time_precision_seconds,
+                collector_credential_id: collector_credential.id,
+            },
+        )
+        .await?;
+    let task_from_db = Tasks::find_by_id(&response_task.id)
+        .one(app.db())
+        .await?
+        .unwrap();
+    assert_same_json_representation(&task_from_db, &response_task);
+    Ok(())
+}
+
 #[test(harness = with_configured_client)]
 async fn rename_task(app: Arc<DivviupApi>, account: Account, client: DivviupClient) -> TestResult {
     let task = fixtures::task(&app, &account).await;
5 changes: 5 additions & 0 deletions documentation/openapi.yml
@@ -290,6 +290,8 @@ paths:
              type: number
            max_batch_size:
              type: number
+           batch_time_window_size_seconds:
+             type: number
            expiration:
              type: string
              format: date-time
@@ -707,6 +709,9 @@ components:
        max_batch_size:
          type: number
          nullable: true
+       batch_time_window_size_seconds:
+         type: number
+         nullable: true
        time_precision_seconds:
          type: number
        report_count:
1 change: 1 addition & 0 deletions migration/src/lib.rs
@@ -23,6 +23,7 @@ mod m20231012_205810_add_features_to_aggregators;
 mod m20231012_225001_rename_hpke_configs_to_collector_credentials;
 mod m20231012_233117_add_token_hash_to_collector_credential;
 mod m20240214_215101_upload_metrics;
+mod m20240411_195358_time_bucketed_fixed_size;
 
 pub struct Migrator;
 
39 changes: 39 additions & 0 deletions migration/src/m20240411_195358_time_bucketed_fixed_size.rs
@@ -0,0 +1,39 @@
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                TableAlterStatement::new()
+                    .table(Task::Table)
+                    .add_column(
+                        ColumnDef::new(Task::BatchTimeWindowSize)
+                            .big_integer()
+                            .null(),
+                    )
+                    .to_owned(),
+            )
+            .await
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                TableAlterStatement::new()
+                    .table(Task::Table)
+                    .drop_column(Task::BatchTimeWindowSize)
+                    .to_owned(),
+            )
+            .await
+    }
+}
+
+#[derive(Iden)]
+enum Task {
+    Table,
+    BatchTimeWindowSize,
+}
61 changes: 31 additions & 30 deletions src/clients/aggregator_client/api_types.rs
Expand Up @@ -4,7 +4,7 @@ use crate::{
Features, QueryTypeName, QueryTypeNameSet, Role as AggregatorRole, VdafNameSet,
},
task::vdaf::{BucketLength, ContinuousBuckets, CountVec, Histogram, Sum, SumVec, Vdaf},
Aggregator, Protocol, ProvisionableTask, Task,
Aggregator, NewTask, Protocol, ProvisionableTask, Task,
},
handler::Error,
};
Expand Down Expand Up @@ -153,44 +153,45 @@ impl From<AggregatorVdaf> for Vdaf {
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum QueryType {
TimeInterval,
FixedSize { max_batch_size: u64 },
FixedSize {
max_batch_size: u64,
#[serde(skip_serializing_if = "Option::is_none")]
batch_time_window_size: Option<u64>,
},
}

impl QueryType {
pub fn name(&self) -> QueryTypeName {
match self {
QueryType::TimeInterval => QueryTypeName::TimeInterval,
QueryType::FixedSize { .. } => QueryTypeName::FixedSize,
impl From<ProvisionableTask> for QueryType {
fn from(value: ProvisionableTask) -> Self {
if let Some(max_batch_size) = value.max_batch_size {
Self::FixedSize {
max_batch_size,
batch_time_window_size: value.batch_time_window_size_seconds,
}
} else {
Self::TimeInterval
}
}
}

impl From<QueryType> for Option<i64> {
fn from(value: QueryType) -> Self {
Option::<u64>::from(value).map(|u| u.try_into().unwrap())
}
}

impl From<QueryType> for Option<u64> {
fn from(value: QueryType) -> Self {
match value {
QueryType::TimeInterval => None,
QueryType::FixedSize { max_batch_size } => Some(max_batch_size),
impl From<NewTask> for QueryType {
fn from(value: NewTask) -> Self {
if let Some(max_batch_size) = value.max_batch_size {
Self::FixedSize {
max_batch_size,
batch_time_window_size: value.batch_time_window_size_seconds,
}
} else {
Self::TimeInterval
}
}
}

impl From<Option<u64>> for QueryType {
fn from(value: Option<u64>) -> Self {
value.map_or(QueryType::TimeInterval, |max_batch_size| {
QueryType::FixedSize { max_batch_size }
})
}
}

impl From<Option<i64>> for QueryType {
fn from(value: Option<i64>) -> Self {
value.map(|i| u64::try_from(i).unwrap()).into()
impl QueryType {
pub fn name(&self) -> QueryTypeName {
match self {
QueryType::TimeInterval => QueryTypeName::TimeInterval,
QueryType::FixedSize { .. } => QueryTypeName::FixedSize,
}
}
}

Expand Down Expand Up @@ -294,7 +295,7 @@ impl TaskCreate {
} else {
new_task.leader_aggregator.dap_url.clone().into()
},
query_type: new_task.max_batch_size.into(),
query_type: new_task.clone().into(),
vdaf: new_task.aggregator_vdaf.clone(),
role,
max_batch_query_count: 1,
Expand Down
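A consequence of the `skip_serializing_if` annotation above: a `FixedSize` query type with no window serializes exactly as it did before this change, so the JSON sent to aggregators that predate the field is unchanged. A self-contained sketch of that round-trip (re-declaring the enum locally and assuming `serde` and `serde_json` as dependencies):

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
enum QueryType {
    TimeInterval,
    FixedSize {
        max_batch_size: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        batch_time_window_size: Option<u64>,
    },
}

fn main() {
    // With no window, the field is omitted: the encoding matches the old
    // `FixedSize { max_batch_size: u64 }` shape byte for byte.
    let plain = QueryType::FixedSize {
        max_batch_size: 100,
        batch_time_window_size: None,
    };
    assert_eq!(
        serde_json::to_string(&plain).unwrap(),
        r#"{"FixedSize":{"max_batch_size":100}}"#
    );

    // With a window, the field appears.
    let bucketed = QueryType::FixedSize {
        max_batch_size: 100,
        batch_time_window_size: Some(7200),
    };
    assert_eq!(
        serde_json::to_string(&bucketed).unwrap(),
        r#"{"FixedSize":{"max_batch_size":100,"batch_time_window_size":7200}}"#
    );

    // Old-shape JSON still deserializes; a missing Option field becomes None.
    let parsed: QueryType =
        serde_json::from_str(r#"{"FixedSize":{"max_batch_size":100}}"#).unwrap();
    assert_eq!(parsed, plain);
}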
5 changes: 5 additions & 0 deletions src/entity/aggregator/feature.rs
@@ -5,6 +5,7 @@ use std::collections::HashSet;
 pub enum Feature {
     TokenHash,
     UploadMetrics,
+    TimeBucketedFixedSize,
     #[serde(untagged)]
     Unknown(String),
 }
@@ -79,6 +80,10 @@ mod tests {
             serde_json::from_value::<Features>(json!(["TokenHash", "UploadMetrics"])).unwrap(),
             Features::from_iter([Feature::TokenHash, Feature::UploadMetrics])
         );
+        assert_eq!(
+            serde_json::from_value::<Features>(json!(["TimeBucketedFixedSize"])).unwrap(),
+            Features::from_iter([Feature::TimeBucketedFixedSize])
+        );
     }
 
     #[test]
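Feature parsing stays forward-compatible because the variant-level `#[serde(untagged)]` attribute (available since serde 1.0.181) makes `Unknown` a fallback for any unrecognized string. A standalone sketch, with `Vec<Feature>` standing in for the crate's `Features` set and a made-up future feature name:

use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq, Eq)]
enum Feature {
    TokenHash,
    UploadMetrics,
    TimeBucketedFixedSize,
    #[serde(untagged)]
    Unknown(String),
}

fn main() {
    // Known names map to their variants; anything else is preserved as
    // Unknown rather than failing deserialization.
    let features: Vec<Feature> =
        serde_json::from_str(r#"["TimeBucketedFixedSize", "SomeFutureFeature"]"#).unwrap();
    assert_eq!(features[0], Feature::TimeBucketedFixedSize);
    assert_eq!(features[1], Feature::Unknown("SomeFutureFeature".into()));
}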
1 change: 1 addition & 0 deletions src/entity/task.rs
@@ -38,6 +38,7 @@ pub struct Model {
     pub vdaf: Json<Vdaf>,
     pub min_batch_size: i64,
     pub max_batch_size: Option<i64>,
+    pub batch_time_window_size_seconds: Option<i64>,
     #[serde(with = "time::serde::rfc3339")]
     pub created_at: OffsetDateTime,
     #[serde(with = "time::serde::rfc3339")]
41 changes: 39 additions & 2 deletions src/entity/task/new_task.rs
@@ -1,7 +1,10 @@
 use super::*;
 use crate::{
     clients::aggregator_client::api_types::{AggregatorVdaf, QueryType},
-    entity::{aggregator::Role, Account, CollectorCredential, Protocol},
+    entity::{
+        aggregator::{Feature, Role},
+        Account, CollectorCredential, Protocol,
+    },
     handler::Error,
 };
 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
@@ -30,6 +33,9 @@ pub struct NewTask {
     #[validate(range(min = 0))]
     pub max_batch_size: Option<u64>,
 
+    #[validate(range(min = 0))]
+    pub batch_time_window_size_seconds: Option<u64>,
+
     #[validate(
         required,
         range(
@@ -90,6 +96,26 @@ impl NewTask {
         }
     }
 
+    fn validate_batch_time_window_size(&self, errors: &mut ValidationErrors) {
+        let window = self.batch_time_window_size_seconds;
+        if let Some(window) = window {
+            if self.max_batch_size.is_none() {
+                errors.add(
+                    "batch_time_window_size_seconds",
+                    ValidationError::new("missing-max-batch-size"),
+                );
+            }
+            if let Some(precision) = self.time_precision_seconds {
+                if window % precision != 0 {
+                    errors.add(
+                        "batch_time_window_size_seconds",
+                        ValidationError::new("not-multiple-of-time-precision"),
+                    );
+                }
+            }
+        }
+    }
+
     async fn load_collector_credential(
         &self,
         account: &Account,
@@ -192,6 +218,15 @@ impl NewTask {
             errors.add("helper_aggregator_id", ValidationError::new("role"))
         }
 
+        if self.batch_time_window_size_seconds.is_some()
+            && !leader.features.contains(&Feature::TimeBucketedFixedSize)
+        {
+            errors.add(
+                "leader_aggregator_id",
+                ValidationError::new("time-bucketed-fixed-size-unsupported"),
+            )
+        }
+
         if errors.is_empty() {
             Some((leader, helper, resolved_protocol))
         } else {
@@ -259,7 +294,7 @@ impl NewTask {
         helper: &Aggregator,
         errors: &mut ValidationErrors,
     ) {
-        let name = QueryType::from(self.max_batch_size).name();
+        let name = QueryType::from(self.clone()).name();
         if !leader.query_types.contains(&name) || !helper.query_types.contains(&name) {
             errors.add("max_batch_size", ValidationError::new("not-supported"));
         }
@@ -272,6 +307,7 @@ impl NewTask {
     ) -> Result<ProvisionableTask, ValidationErrors> {
         let mut errors = Validate::validate(self).err().unwrap_or_default();
         self.validate_min_lte_max(&mut errors);
+        self.validate_batch_time_window_size(&mut errors);
         let aggregators = self.validate_aggregators(&account, db, &mut errors).await;
         let collector_credential = self
             .validate_collector_credential(
@@ -312,6 +348,7 @@ impl NewTask {
             aggregator_vdaf: aggregator_vdaf.unwrap(),
             min_batch_size: self.min_batch_size.unwrap(),
             max_batch_size: self.max_batch_size,
+            batch_time_window_size_seconds: self.batch_time_window_size_seconds,
             expiration: Some(OffsetDateTime::now_utc() + DEFAULT_EXPIRATION_DURATION),
             time_precision_seconds: self.time_precision_seconds.unwrap(),
             collector_credential: collector_credential.unwrap(),
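The added validation boils down to two rules: a batch time window only makes sense for fixed-size tasks (so `max_batch_size` must be present), and the window must be a whole multiple of the time precision, presumably so that report timestamps rounded to the precision land in exactly one bucket; this is also why the integration test above chooses `time_precision_seconds * 2`. A toy restatement of just that arithmetic (not the production code path, which reports through `ValidationErrors`):

// Returns the same error labels the diff uses, for illustration only.
fn window_errors(
    window: Option<u64>,
    max_batch_size: Option<u64>,
    time_precision: Option<u64>,
) -> Vec<&'static str> {
    let mut errors = Vec::new();
    if let Some(window) = window {
        if max_batch_size.is_none() {
            errors.push("missing-max-batch-size");
        }
        if let Some(precision) = time_precision {
            if window % precision != 0 {
                errors.push("not-multiple-of-time-precision");
            }
        }
    }
    errors
}

fn main() {
    // 7200s window over 3600s precision: exactly two precision units per bucket.
    assert!(window_errors(Some(7200), Some(300), Some(3600)).is_empty());
    // No max batch size and a non-multiple window: both checks fire.
    assert_eq!(
        window_errors(Some(5400), None, Some(3600)),
        ["missing-max-batch-size", "not-multiple-of-time-precision"]
    );
}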