Support for time bucketed fixed size #968

Merged · 5 commits · Apr 15, 2024
1 change: 1 addition & 0 deletions app/src/ApiClient.ts
@@ -72,6 +72,7 @@ export interface Task {
updated_at: string;
expiration: string | null;
max_batch_size: number | null;
batch_time_window_size_seconds: number | null;
collector_credential_id: string;
report_counter_interval_collected: number;
report_counter_decode_failure: number;
58 changes: 47 additions & 11 deletions app/src/tasks/TaskDetail/TaskPropertyTable.tsx
@@ -44,18 +44,54 @@ export default function TaskPropertyTable() {
</Await>
</Suspense>
</ListGroup.Item>
-  <ListGroup.Item>
-    Query Type:{" "}
-    <Suspense fallback={<Placeholder animation="glow" xs={6} />}>
-      <Await resolve={task}>
-        {(task) =>
-          typeof task.max_batch_size === "number"
-            ? `Fixed maximum batch size ${task.max_batch_size}`
-            : "Time Interval"
-        }
-      </Await>
-    </Suspense>
-  </ListGroup.Item>
+  <Suspense
+    fallback={
+      <ListGroup.Item>
+        <Placeholder animation="glow" xs={6} />
+      </ListGroup.Item>
+    }
+  >
+    <Await resolve={task}>
+      {(task) => {
+        let queryType;
+        let maxBatchSize;
+        let batchTimeWindowSize;
+
+        if (typeof task.max_batch_size === "number") {
+          maxBatchSize = (
+            <ListGroup.Item>
+              Maximum Batch Size: {task.max_batch_size}
+            </ListGroup.Item>
+          );
+
+          if (typeof task.batch_time_window_size_seconds === "number") {
+            queryType = "Time-bucketed Fixed Size";
+
+            batchTimeWindowSize = (
+              <ListGroup.Item>
+                Batch Time Window Size:{" "}
+                {humanizeDuration(
+                  1000 * task.batch_time_window_size_seconds,
+                )}
+              </ListGroup.Item>
+            );
+          } else {
+            queryType = "Fixed Size";
+          }
+        } else {
+          queryType = "Time Interval";
+        }
+
+        return (
+          <>
+            <ListGroup.Item>Query Type: {queryType}</ListGroup.Item>
+            {batchTimeWindowSize}
+            {maxBatchSize}
+          </>
+        );
+      }}
+    </Await>
+  </Suspense>
<ListGroup.Item>
Minimum Batch Size:{" "}
<Suspense fallback={<Placeholder animation="glow" xs={6} />}>
6 changes: 6 additions & 0 deletions cli/src/tasks.rs
@@ -31,6 +31,8 @@ pub enum TaskAction {
min_batch_size: u64,
#[arg(long)]
max_batch_size: Option<u64>,
#[arg(long, requires = "max_batch_size")]
batch_time_window_size: Option<Duration>,
#[arg(long)]
time_precision: Duration,
#[arg(long)]
@@ -72,6 +74,7 @@ impl TaskAction {
vdaf,
min_batch_size,
max_batch_size,
batch_time_window_size,
time_precision,
collector_credential_id,
categorical_buckets,
@@ -121,6 +124,8 @@
};

let time_precision_seconds = time_precision.as_secs();
let batch_time_window_size_seconds =
batch_time_window_size.map(|window| window.as_secs());

let task = NewTask {
name,
@@ -129,6 +134,7 @@
vdaf,
min_batch_size,
max_batch_size,
batch_time_window_size_seconds,
time_precision_seconds,
collector_credential_id,
};
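The new flag is only meaningful alongside a maximum batch size, and clap's `requires` attribute enforces that pairing at parse time. A minimal, self-contained sketch of the relationship (clap 4 with the `derive` feature assumed; the real flag takes a `Duration`, simplified here to whole seconds):

use clap::Parser;

#[derive(Parser, Debug)]
struct CreateTask {
    #[arg(long)]
    max_batch_size: Option<u64>,
    // clap rejects this flag unless --max-batch-size is also present.
    #[arg(long, requires = "max_batch_size")]
    batch_time_window_size_seconds: Option<u64>,
}

fn main() {
    // Accepted: both flags supplied together.
    let ok = CreateTask::parse_from([
        "cli",
        "--max-batch-size",
        "300",
        "--batch-time-window-size-seconds",
        "7200",
    ]);
    assert_eq!(ok.batch_time_window_size_seconds, Some(7200));

    // Rejected: the window flag on its own violates the `requires` constraint.
    let err = CreateTask::try_parse_from(["cli", "--batch-time-window-size-seconds", "7200"]);
    assert!(err.is_err());
}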
2 changes: 2 additions & 0 deletions client/src/task.rs
@@ -10,6 +10,7 @@ pub struct Task {
pub vdaf: Vdaf,
pub min_batch_size: u64,
pub max_batch_size: Option<u64>,
pub batch_time_window_size_seconds: Option<u64>,
#[serde(with = "time::serde::rfc3339")]
pub created_at: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")]
@@ -42,6 +43,7 @@ pub struct NewTask {
pub vdaf: Vdaf,
pub min_batch_size: u64,
pub max_batch_size: Option<u64>,
pub batch_time_window_size_seconds: Option<u64>,
pub time_precision_seconds: u64,
pub collector_credential_id: Uuid,
}
43 changes: 42 additions & 1 deletion client/tests/integration/tasks.rs
@@ -1,5 +1,5 @@
use crate::harness::{assert_eq, test, *};
-use divviup_api::entity::aggregator::Features;
+use divviup_api::entity::aggregator::{Feature, Features};
use divviup_client::{NewTask, Vdaf};

#[test(harness = with_configured_client)]
@@ -27,6 +27,7 @@ async fn create_task(app: Arc<DivviupApi>, account: Account, client: DivviupClie
vdaf: Vdaf::Count,
min_batch_size: fastrand::i64(100..).try_into().unwrap(),
max_batch_size: None,
batch_time_window_size_seconds: None,
time_precision_seconds: fastrand::u64(60..2592000),
collector_credential_id: collector_credential.id,
},
@@ -40,6 +41,46 @@
Ok(())
}

#[test(harness = with_configured_client)]
async fn create_task_time_bucketed_fixed_size(
app: Arc<DivviupApi>,
account: Account,
client: DivviupClient,
) -> TestResult {
let (leader, helper) = fixtures::aggregator_pair(&app, &account).await;

let mut leader = leader.into_active_model();
leader.features =
ActiveValue::Set(Features::from_iter([Feature::TimeBucketedFixedSize]).into());
let leader = leader.update(app.db()).await?;

let collector_credential = fixtures::collector_credential(&app, &account).await;
let time_precision_seconds = fastrand::u64(60..2592000);
let min_batch_size = fastrand::i64(100..).try_into().unwrap();
let response_task = client
.create_task(
account.id,
NewTask {
name: fixtures::random_name(),
leader_aggregator_id: leader.id,
helper_aggregator_id: helper.id,
vdaf: Vdaf::Count,
min_batch_size,
max_batch_size: Some(min_batch_size),
batch_time_window_size_seconds: Some(time_precision_seconds * 2),
time_precision_seconds,
collector_credential_id: collector_credential.id,
},
)
.await?;
let task_from_db = Tasks::find_by_id(&response_task.id)
.one(app.db())
.await?
.unwrap();
assert_same_json_representation(&task_from_db, &response_task);
Ok(())
}

#[test(harness = with_configured_client)]
async fn rename_task(app: Arc<DivviupApi>, account: Account, client: DivviupClient) -> TestResult {
let task = fixtures::task(&app, &account).await;
5 changes: 5 additions & 0 deletions documentation/openapi.yml
@@ -290,6 +290,8 @@ paths:
type: number
max_batch_size:
type: number
batch_time_window_size_seconds:
type: number
expiration:
type: string
format: date-time
@@ -707,6 +709,9 @@ components:
max_batch_size:
type: number
nullable: true
batch_time_window_size_seconds:
type: number
nullable: true
time_precision_seconds:
type: number
report_count:
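For reference, a sketch of a task-creation request body carrying the new field. The field names follow the `NewTask` struct above; all values, including the UUIDs and the vdaf encoding, are illustrative placeholders, not confirmed wire format:

use serde_json::json;

fn main() {
    // Illustrative body only; names come from NewTask, values are made up.
    let body = json!({
        "name": "example-task",
        "leader_aggregator_id": "00000000-0000-0000-0000-000000000001",
        "helper_aggregator_id": "00000000-0000-0000-0000-000000000002",
        "vdaf": { "type": "count" },
        "min_batch_size": 300,
        "max_batch_size": 300,
        // The new optional field: a two-hour window, expressed in seconds.
        "batch_time_window_size_seconds": 7200,
        "time_precision_seconds": 3600,
        "collector_credential_id": "00000000-0000-0000-0000-000000000003"
    });
    println!("{}", serde_json::to_string_pretty(&body).unwrap());
}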
2 changes: 2 additions & 0 deletions migration/src/lib.rs
@@ -23,6 +23,7 @@ mod m20231012_205810_add_features_to_aggregators;
mod m20231012_225001_rename_hpke_configs_to_collector_credentials;
mod m20231012_233117_add_token_hash_to_collector_credential;
mod m20240214_215101_upload_metrics;
mod m20240411_195358_time_bucketed_fixed_size;

pub struct Migrator;

@@ -53,6 +54,7 @@ impl MigratorTrait for Migrator {
Box::new(m20231012_225001_rename_hpke_configs_to_collector_credentials::Migration),
Box::new(m20231012_233117_add_token_hash_to_collector_credential::Migration),
Box::new(m20240214_215101_upload_metrics::Migration),
Box::new(m20240411_195358_time_bucketed_fixed_size::Migration),
]
}
}
39 changes: 39 additions & 0 deletions migration/src/m20240411_195358_time_bucketed_fixed_size.rs
@@ -0,0 +1,39 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
TableAlterStatement::new()
.table(Task::Table)
.add_column(
ColumnDef::new(Task::BatchTimeWindowSizeSeconds)
.big_integer()
.null(),
)
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
TableAlterStatement::new()
.table(Task::Table)
.drop_column(Task::BatchTimeWindowSizeSeconds)
.to_owned(),
)
.await
}
}

#[derive(Iden)]
enum Task {
Table,
BatchTimeWindowSizeSeconds,
}
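Migrations registered in the list above run in order; a minimal sketch of applying them with sea-orm-migration (the connection setup and the DATABASE_URL environment variable are assumptions about local configuration, not part of this PR):

use migration::Migrator;
use sea_orm_migration::MigratorTrait;

#[tokio::main]
async fn main() -> Result<(), sea_orm::DbErr> {
    // Connection string is an assumption about local configuration.
    let url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let db = sea_orm::Database::connect(url.as_str()).await?;
    // Applies every pending migration, including
    // m20240411_195358_time_bucketed_fixed_size.
    Migrator::up(&db, None).await
}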
37 changes: 6 additions & 31 deletions src/clients/aggregator_client/api_types.rs
@@ -153,7 +153,11 @@ impl From<AggregatorVdaf> for Vdaf {
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum QueryType {
TimeInterval,
-    FixedSize { max_batch_size: u64 },
+    FixedSize {
+        max_batch_size: u64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        batch_time_window_size: Option<u64>,
+    },
}

impl QueryType {
@@ -165,35 +169,6 @@ impl QueryType {
}
}

-impl From<QueryType> for Option<i64> {
-    fn from(value: QueryType) -> Self {
-        Option::<u64>::from(value).map(|u| u.try_into().unwrap())
-    }
-}
-
-impl From<QueryType> for Option<u64> {
-    fn from(value: QueryType) -> Self {
-        match value {
-            QueryType::TimeInterval => None,
-            QueryType::FixedSize { max_batch_size } => Some(max_batch_size),
-        }
-    }
-}
-
-impl From<Option<u64>> for QueryType {
-    fn from(value: Option<u64>) -> Self {
-        value.map_or(QueryType::TimeInterval, |max_batch_size| {
-            QueryType::FixedSize { max_batch_size }
-        })
-    }
-}
-
-impl From<Option<i64>> for QueryType {
-    fn from(value: Option<i64>) -> Self {
-        value.map(|i| u64::try_from(i).unwrap()).into()
-    }
-}
-
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum AuthenticationToken {
@@ -294,7 +269,7 @@ impl TaskCreate {
} else {
new_task.leader_aggregator.dap_url.clone().into()
},
-            query_type: new_task.max_batch_size.into(),
+            query_type: new_task.query_type(),
vdaf: new_task.aggregator_vdaf.clone(),
role,
max_batch_query_count: 1,
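The `new_task.query_type()` call above replaces the four deleted `From` conversions, but the helper itself is outside this diff. A hypothetical reconstruction of the mapping it presumably performs (the enum is copied from this file; the free-function form is for illustration only):

use serde::{Deserialize, Serialize};

#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum QueryType {
    TimeInterval,
    FixedSize {
        max_batch_size: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        batch_time_window_size: Option<u64>,
    },
}

// Hypothetical stand-in for new_task.query_type(): no maximum batch size
// means time-interval; otherwise fixed-size, optionally time-bucketed.
fn query_type(max_batch_size: Option<u64>, batch_time_window_size: Option<u64>) -> QueryType {
    match max_batch_size {
        None => QueryType::TimeInterval,
        Some(max_batch_size) => QueryType::FixedSize {
            max_batch_size,
            batch_time_window_size,
        },
    }
}

fn main() {
    // A plain fixed-size task omits the window field on the wire...
    println!("{}", serde_json::to_string(&query_type(Some(300), None)).unwrap());
    // ...while a time-bucketed fixed-size task carries it.
    println!("{}", serde_json::to_string(&query_type(Some(300), Some(7200))).unwrap());
}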
5 changes: 5 additions & 0 deletions src/entity/aggregator/feature.rs
@@ -5,6 +5,7 @@ use std::collections::HashSet;
pub enum Feature {
TokenHash,
UploadMetrics,
TimeBucketedFixedSize,
#[serde(untagged)]
Unknown(String),
}
@@ -79,6 +80,10 @@ mod tests {
serde_json::from_value::<Features>(json!(["TokenHash", "UploadMetrics"])).unwrap(),
Features::from_iter([Feature::TokenHash, Feature::UploadMetrics])
);
assert_eq!(
serde_json::from_value::<Features>(json!(["TimeBucketedFixedSize"])).unwrap(),
Features::from_iter([Feature::TimeBucketedFixedSize])
);
}

#[test]
1 change: 1 addition & 0 deletions src/entity/task.rs
@@ -38,6 +38,7 @@ pub struct Model {
pub vdaf: Json<Vdaf>,
pub min_batch_size: i64,
pub max_batch_size: Option<i64>,
pub batch_time_window_size_seconds: Option<i64>,
#[serde(with = "time::serde::rfc3339")]
pub created_at: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")]