config: Standardize time suffixes (#3288)
* config: Standardize time suffixes

* Add TODOs for later cleanup
inahga committed Jul 10, 2024
1 parent c4994ec commit b6fb2a5
Showing 9 changed files with 171 additions and 143 deletions.
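
As a hedged illustration of the renaming pattern in this commit (not code from the repository): the new `_s` field names stay backward compatible because `#[serde(alias = ...)]` still accepts the legacy `_secs` keys. `DemoConfig` below is a hypothetical stand-in for the Janus config structs.

```rust
// Sketch only: a stand-in struct showing how a legacy `_secs` key keeps
// deserializing after the rename to `_s`, via a serde alias.
use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq, Eq)]
struct DemoConfig {
    #[serde(alias = "tasks_update_frequency_secs")]
    tasks_update_frequency_s: u64,
}

fn main() {
    // Both the old and the new key names parse to the same field.
    let old: DemoConfig = serde_yaml::from_str("tasks_update_frequency_secs: 3600").unwrap();
    let new: DemoConfig = serde_yaml::from_str("tasks_update_frequency_s: 3600").unwrap();
    assert_eq!(old, new);
}
```
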
aggregator/src/binaries/aggregation_job_creator.rs (15 changes: 9 additions & 6 deletions)

@@ -16,8 +16,8 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
         ctx.datastore,
         ctx.meter,
         ctx.config.batch_aggregation_shard_count,
-        Duration::from_secs(ctx.config.tasks_update_frequency_secs),
-        Duration::from_secs(ctx.config.aggregation_job_creation_interval_secs),
+        Duration::from_secs(ctx.config.tasks_update_frequency_s),
+        Duration::from_secs(ctx.config.aggregation_job_creation_interval_s),
         ctx.config.min_aggregation_job_size,
         ctx.config.max_aggregation_job_size,
         ctx.config.aggregation_job_creation_report_window,
@@ -66,6 +66,7 @@ impl BinaryOptions for Options {
 ///
 /// let _decoded: Config = serde_yaml::from_str(yaml_config).unwrap();
 /// ```
+// TODO(#3293): remove aliases during next breaking changes window.
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct Config {
     #[serde(flatten)]
@@ -76,9 +77,11 @@ pub struct Config {
     /// the cost of collection.
     pub batch_aggregation_shard_count: u64,
     /// How frequently we look for new tasks to start creating aggregation jobs for, in seconds.
-    pub tasks_update_frequency_secs: u64,
+    #[serde(alias = "tasks_update_frequency_secs")]
+    pub tasks_update_frequency_s: u64,
     /// How frequently we attempt to create new aggregation jobs for each task, in seconds.
-    pub aggregation_job_creation_interval_secs: u64,
+    #[serde(alias = "aggregation_job_creation_interval_secs")]
+    pub aggregation_job_creation_interval_s: u64,
     /// The minimum number of client reports to include in an aggregation job. Applies to the
     /// "current" batch only; historical batches will create aggregation jobs of any size, on the
     /// theory that almost all reports will have be received for these batches already.
@@ -132,8 +135,8 @@ mod tests {
                 max_transaction_retries: default_max_transaction_retries(),
             },
             batch_aggregation_shard_count: 32,
-            tasks_update_frequency_secs: 3600,
-            aggregation_job_creation_interval_secs: 60,
+            tasks_update_frequency_s: 3600,
+            aggregation_job_creation_interval_s: 60,
             min_aggregation_job_size: 100,
             max_aggregation_job_size: 500,
             aggregation_job_creation_report_window: 5000,
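
A further hedged sketch, again using a stand-in struct rather than the real config: serde aliases only affect deserialization, so anything that re-serializes a config emits the canonical new `_s` key rather than the legacy alias.

```rust
// Sketch only: `#[serde(alias)]` is read-side compatibility; serialization
// always uses the canonical field name.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct DemoConfig {
    #[serde(alias = "tasks_update_frequency_secs")]
    tasks_update_frequency_s: u64,
}

fn main() {
    let cfg: DemoConfig =
        serde_yaml::from_str("tasks_update_frequency_secs: 3600").unwrap();
    let out = serde_yaml::to_string(&cfg).unwrap();
    // The emitted YAML uses the new key; the legacy spelling is gone.
    assert!(out.contains("tasks_update_frequency_s:"));
    assert!(!out.contains("tasks_update_frequency_secs:"));
}
```
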
aggregator/src/binaries/aggregation_job_driver.rs (27 changes: 13 additions & 14 deletions)

@@ -22,34 +22,33 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
         reqwest::Client::builder()
             .user_agent(CLIENT_USER_AGENT)
             .timeout(Duration::from_secs(
-                ctx.config.job_driver_config.http_request_timeout_secs,
+                ctx.config.job_driver_config.http_request_timeout_s,
             ))
             .connect_timeout(Duration::from_secs(
                 ctx.config
                     .job_driver_config
-                    .http_request_connection_timeout_secs,
+                    .http_request_connection_timeout_s,
             ))
             .build()
             .context("couldn't create HTTP client")?,
         ctx.config.job_driver_config.retry_config(),
         &ctx.meter,
         ctx.config.batch_aggregation_shard_count,
     ));
-    let lease_duration =
-        Duration::from_secs(ctx.config.job_driver_config.worker_lease_duration_secs);
+    let lease_duration = Duration::from_secs(ctx.config.job_driver_config.worker_lease_duration_s);
 
     // Start running.
     let job_driver = Arc::new(JobDriver::new(
         ctx.clock,
         TokioRuntime,
         ctx.meter,
         ctx.stopper,
-        Duration::from_secs(ctx.config.job_driver_config.job_discovery_interval_secs),
+        Duration::from_secs(ctx.config.job_driver_config.job_discovery_interval_s),
         ctx.config.job_driver_config.max_concurrent_job_workers,
         Duration::from_secs(
             ctx.config
                 .job_driver_config
-                .worker_lease_clock_skew_allowance_secs,
+                .worker_lease_clock_skew_allowance_s,
         ),
         aggregation_job_driver
             .make_incomplete_job_acquirer_callback(Arc::clone(&datastore), lease_duration),
@@ -161,16 +160,16 @@ mod tests {
                 max_transaction_retries: default_max_transaction_retries(),
             },
             job_driver_config: JobDriverConfig {
-                job_discovery_interval_secs: 10,
+                job_discovery_interval_s: 10,
                 max_concurrent_job_workers: 10,
-                worker_lease_duration_secs: 600,
-                worker_lease_clock_skew_allowance_secs: 60,
+                worker_lease_duration_s: 600,
+                worker_lease_clock_skew_allowance_s: 60,
                 maximum_attempts_before_failure: 5,
-                http_request_timeout_secs: 10,
-                http_request_connection_timeout_secs: 30,
-                retry_initial_interval_millis: 1000,
-                retry_max_interval_millis: 30_000,
-                retry_max_elapsed_time_millis: 300_000,
+                http_request_timeout_s: 10,
+                http_request_connection_timeout_s: 30,
+                retry_initial_interval_ms: 1000,
+                retry_max_interval_ms: 30_000,
+                retry_max_elapsed_time_ms: 300_000,
            },
            batch_aggregation_shard_count: 32,
            taskprov_config: TaskprovConfig::default(),
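
For clarity, a small sketch of the unit convention the new suffixes encode (the struct below is a stand-in, not `JobDriverConfig` itself): `_s` fields feed `Duration::from_secs`, and `_ms` fields feed `Duration::from_millis`.

```rust
// Sketch only: the suffix indicates which Duration constructor the value feeds.
use std::time::Duration;

struct DemoJobDriverConfig {
    http_request_timeout_s: u64,
    retry_initial_interval_ms: u64,
}

fn main() {
    let cfg = DemoJobDriverConfig {
        http_request_timeout_s: 10,
        retry_initial_interval_ms: 1000,
    };
    let timeout = Duration::from_secs(cfg.http_request_timeout_s);
    let retry_initial = Duration::from_millis(cfg.retry_initial_interval_ms);
    assert_eq!(timeout, Duration::from_secs(10));
    assert_eq!(retry_initial, Duration::from_millis(1000));
}
```
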
aggregator/src/binaries/aggregator.rs (9 changes: 5 additions & 4 deletions)

@@ -386,8 +386,9 @@ pub struct Config {
     /// Defines how long to cache tasks for, in seconds. This affects how often the aggregator
     /// becomes aware of task parameter changes. If unspecified, default is defined by
     /// [`TASK_AGGREGATOR_CACHE_DEFAULT_TTL`]. You shouldn't normally have to specify this.
-    #[serde(default)]
-    pub task_cache_ttl_seconds: Option<u64>,
+    // TODO(#3293): remove this alias during next breaking changes window.
+    #[serde(default, alias = "task_cache_ttl_seconds")]
+    pub task_cache_ttl_s: Option<u64>,
 
     /// Defines how many tasks can be cached. This affects how much memory the aggregator might use
     /// to store cached tasks. If unspecified, default is defined by
@@ -464,7 +465,7 @@ impl Config {
                 .as_deref()
                 .map(parse_pem_ec_private_key)
                 .transpose()?,
-            task_cache_ttl: match self.task_cache_ttl_seconds {
+            task_cache_ttl: match self.task_cache_ttl_s {
                Some(ttl) => Duration::from_secs(ttl),
                None => TASK_AGGREGATOR_CACHE_DEFAULT_TTL,
            },
@@ -601,7 +602,7 @@ mod tests {
            task_counter_shard_count: 64,
            taskprov_config: TaskprovConfig::default(),
            global_hpke_configs_refresh_interval: Some(42),
-            task_cache_ttl_seconds: None,
+            task_cache_ttl_s: None,
            task_cache_capacity: None,
            log_forbidden_mutations: Some(PathBuf::from("/tmp/events")),
            require_global_hpke_keys: true,
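
A hedged sketch of the optional-TTL fallback seen in the hunk above: when `task_cache_ttl_s` is unset, the aggregator falls back to a compile-time default. The constant's value below is assumed purely for illustration, not the real `TASK_AGGREGATOR_CACHE_DEFAULT_TTL`.

```rust
// Sketch only: fall back to a default TTL when the config leaves it unset.
use std::time::Duration;

// Assumed value for illustration; the real default lives in the aggregator crate.
const TASK_AGGREGATOR_CACHE_DEFAULT_TTL: Duration = Duration::from_secs(600);

fn task_cache_ttl(task_cache_ttl_s: Option<u64>) -> Duration {
    match task_cache_ttl_s {
        Some(ttl) => Duration::from_secs(ttl),
        None => TASK_AGGREGATOR_CACHE_DEFAULT_TTL,
    }
}

fn main() {
    assert_eq!(task_cache_ttl(Some(42)), Duration::from_secs(42));
    assert_eq!(task_cache_ttl(None), TASK_AGGREGATOR_CACHE_DEFAULT_TTL);
}
```
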
aggregator/src/binaries/collection_job_driver.rs (54 changes: 30 additions & 24 deletions)

@@ -22,40 +22,39 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
         reqwest::Client::builder()
             .user_agent(CLIENT_USER_AGENT)
             .timeout(Duration::from_secs(
-                ctx.config.job_driver_config.http_request_timeout_secs,
+                ctx.config.job_driver_config.http_request_timeout_s,
             ))
             .connect_timeout(Duration::from_secs(
                 ctx.config
                     .job_driver_config
-                    .http_request_connection_timeout_secs,
+                    .http_request_connection_timeout_s,
             ))
             .build()
             .context("couldn't create HTTP client")?,
         ctx.config.job_driver_config.retry_config(),
         &ctx.meter,
         ctx.config.batch_aggregation_shard_count,
         RetryStrategy::new(
-            Duration::from_secs(ctx.config.min_collection_job_retry_delay_secs),
-            Duration::from_secs(ctx.config.max_collection_job_retry_delay_secs),
+            Duration::from_secs(ctx.config.min_collection_job_retry_delay_s),
+            Duration::from_secs(ctx.config.max_collection_job_retry_delay_s),
             ctx.config.collection_job_retry_delay_exponential_factor,
         )
         .context("Couldn't create collection retry strategy")?,
     ));
-    let lease_duration =
-        Duration::from_secs(ctx.config.job_driver_config.worker_lease_duration_secs);
+    let lease_duration = Duration::from_secs(ctx.config.job_driver_config.worker_lease_duration_s);
 
     // Start running.
     let job_driver = Arc::new(JobDriver::new(
         ctx.clock,
         TokioRuntime,
         ctx.meter,
         ctx.stopper,
-        Duration::from_secs(ctx.config.job_driver_config.job_discovery_interval_secs),
+        Duration::from_secs(ctx.config.job_driver_config.job_discovery_interval_s),
         ctx.config.job_driver_config.max_concurrent_job_workers,
         Duration::from_secs(
             ctx.config
                 .job_driver_config
-                .worker_lease_clock_skew_allowance_secs,
+                .worker_lease_clock_skew_allowance_s,
         ),
         collection_job_driver
             .make_incomplete_job_acquirer_callback(Arc::clone(&datastore), lease_duration),
@@ -115,6 +114,7 @@ impl BinaryOptions for Options {
 ///
 /// let _decoded: Config = serde_yaml::from_str(yaml_config).unwrap();
 /// ```
+// TODO(#3293): remove aliases during next breaking changes window.
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub struct Config {
     #[serde(flatten)]
@@ -129,13 +129,19 @@ pub struct Config {
 
     /// The minimum duration to wait, in seconds, before retrying a collection job that has been
     /// stepped but was not ready yet because not all included reports had finished aggregation.
-    #[serde(default = "Config::default_min_collection_job_retry_delay_secs")]
-    pub min_collection_job_retry_delay_secs: u64,
+    #[serde(
+        default = "Config::default_min_collection_job_retry_delay_s",
+        alias = "min_collection_job_retry_delay_secs"
+    )]
+    pub min_collection_job_retry_delay_s: u64,
 
     /// The maximum duration to wait, in seconds, before retrying a collection job that has been
     /// stepped but was not ready yet because not all included reports had finished aggregation.
-    #[serde(default = "Config::default_max_collection_job_retry_delay_secs")]
-    pub max_collection_job_retry_delay_secs: u64,
+    #[serde(
+        default = "Config::default_max_collection_job_retry_delay_s",
+        alias = "max_collection_job_retry_delay_secs"
+    )]
+    pub max_collection_job_retry_delay_s: u64,
 
     /// The exponential factor to use when computing a retry delay when retrying a collection job
     /// that has been stepped but was not ready yet because not all included reports had finished
@@ -145,11 +151,11 @@ pub struct Config {
 }
 
 impl Config {
-    fn default_min_collection_job_retry_delay_secs() -> u64 {
+    fn default_min_collection_job_retry_delay_s() -> u64 {
         600
     }
 
-    fn default_max_collection_job_retry_delay_secs() -> u64 {
+    fn default_max_collection_job_retry_delay_s() -> u64 {
         3600
     }
 
@@ -196,20 +202,20 @@ mod tests {
                 max_transaction_retries: default_max_transaction_retries(),
             },
             job_driver_config: JobDriverConfig {
-                job_discovery_interval_secs: 10,
+                job_discovery_interval_s: 10,
                 max_concurrent_job_workers: 10,
-                worker_lease_duration_secs: 600,
-                worker_lease_clock_skew_allowance_secs: 60,
+                worker_lease_duration_s: 600,
+                worker_lease_clock_skew_allowance_s: 60,
                 maximum_attempts_before_failure: 5,
-                http_request_timeout_secs: 10,
-                http_request_connection_timeout_secs: 30,
-                retry_initial_interval_millis: 1000,
-                retry_max_interval_millis: 30_000,
-                retry_max_elapsed_time_millis: 300_000,
+                http_request_timeout_s: 10,
+                http_request_connection_timeout_s: 30,
+                retry_initial_interval_ms: 1000,
+                retry_max_interval_ms: 30_000,
+                retry_max_elapsed_time_ms: 300_000,
            },
            batch_aggregation_shard_count: 32,
-            min_collection_job_retry_delay_secs: 600,
-            max_collection_job_retry_delay_secs: 3600,
+            min_collection_job_retry_delay_s: 600,
+            max_collection_job_retry_delay_s: 3600,
            collection_job_retry_delay_exponential_factor: 1.25,
        })
    }
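
To make the three retry knobs above concrete, here is a hedged sketch of an exponential backoff clamped to the configured bounds. This illustrates the shape those parameters describe; it is not Janus's `RetryStrategy` implementation.

```rust
// Sketch only: delay = min * factor^attempt, capped at max.
use std::time::Duration;

fn retry_delay(min: Duration, max: Duration, factor: f64, attempt: u32) -> Duration {
    let delay_s = min.as_secs_f64() * factor.powi(attempt as i32);
    Duration::from_secs_f64(delay_s.min(max.as_secs_f64()))
}

fn main() {
    // Values taken from the test config above: 600 s min, 3600 s max, factor 1.25.
    let (min, max, factor) = (Duration::from_secs(600), Duration::from_secs(3600), 1.25);
    assert_eq!(retry_delay(min, max, factor, 0), Duration::from_secs(600));
    // Later attempts grow by 1.25x per step until they hit the one-hour cap.
    assert!(retry_delay(min, max, factor, 10) <= max);
}
```
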
aggregator/src/binary_utils.rs (4 changes: 2 additions & 2 deletions)

@@ -72,7 +72,7 @@ pub async fn database_pool(db_config: &DbConfig, db_password: Option<&str>) -> R
         database_config.password(pass);
     }
 
-    let connection_pool_timeout = Duration::from_secs(db_config.connection_pool_timeouts_secs);
+    let connection_pool_timeout = Duration::from_secs(db_config.connection_pool_timeouts_s);
 
     let conn_mgr = if let Some(ref path) = db_config.tls_trust_store_path {
         let root_store = load_pem_trust_store(path).context("failed to load TLS trust store")?;
@@ -706,7 +706,7 @@ mod tests {
            url: format!("postgres://postgres@127.0.0.1:{port}/postgres?sslmode=require")
                .parse()
                .unwrap(),
-            connection_pool_timeouts_secs: 5,
+            connection_pool_timeouts_s: 5,
            connection_pool_max_size: None,
            check_schema_version: false,
            tls_trust_store_path: Some("tests/tls_files/rootCA.pem".into()),

(The remaining four changed files in this commit are not shown here.)