diff --git a/crates/bin/ampctl/src/cmd/dataset/deploy.rs b/crates/bin/ampctl/src/cmd/dataset/deploy.rs index e26a0bb82..61cad87f1 100644 --- a/crates/bin/ampctl/src/cmd/dataset/deploy.rs +++ b/crates/bin/ampctl/src/cmd/dataset/deploy.rs @@ -134,7 +134,8 @@ async fn deploy_dataset( let client = global.build_client().map_err(Error::ClientBuild)?; let job_id = client .datasets() - .deploy(dataset_ref, end_block, parallelism, worker_id, verify) + // TODO: Accept retry strategy + .deploy(dataset_ref, end_block, parallelism, worker_id, verify, None) .await .map_err(Error::Deploy)?; diff --git a/crates/bin/ampctl/src/cmd/job/create.rs b/crates/bin/ampctl/src/cmd/job/create.rs index 65a845960..6b5133ec6 100644 --- a/crates/bin/ampctl/src/cmd/job/create.rs +++ b/crates/bin/ampctl/src/cmd/job/create.rs @@ -15,6 +15,8 @@ pub async fn run(Args { global, kind }: Args) -> Result<(), Error> { let request = match kind { JobKind::Gc(gc_args) => CreateJobRequest::Gc { location_id: gc_args.location_id, + // TODO: Accept retry_strategy from args + retry_strategy: None, }, }; diff --git a/crates/clients/admin/src/datasets.rs b/crates/clients/admin/src/datasets.rs index eee67a3cb..e018c167a 100644 --- a/crates/clients/admin/src/datasets.rs +++ b/crates/clients/admin/src/datasets.rs @@ -2,7 +2,7 @@ //! //! Provides methods for interacting with the `/datasets` endpoints of the admin API. 
-use amp_job_core::job_id::JobId; +use amp_job_core::{job_id::JobId, retry_strategy::RetryStrategy}; use amp_worker_core::node_id::{InvalidIdError, NodeId, validate_node_id}; use datasets_common::{ dataset_kind_str::DatasetKindStr, fqn::FullyQualifiedName, hash::Hash, name::Name, @@ -274,6 +274,7 @@ impl<'a> DatasetsClient<'a> { parallelism: u16, worker_id: Option, verify: bool, + retry_strategy: Option, ) -> Result { let namespace = dataset_ref.namespace(); let name = dataset_ref.name(); @@ -292,6 +293,7 @@ impl<'a> DatasetsClient<'a> { parallelism, worker_id, verify, + retry_strategy, }; let response = self @@ -1412,6 +1414,8 @@ struct DeployRequest { worker_id: Option, #[serde(skip_serializing_if = "std::ops::Not::not")] verify: bool, + #[serde(skip_serializing_if = "Option::is_none")] + retry_strategy: Option, } /// Input type for dataset registration manifest parameter. diff --git a/crates/clients/admin/src/jobs.rs b/crates/clients/admin/src/jobs.rs index b2151fd4c..0fcf114fd 100644 --- a/crates/clients/admin/src/jobs.rs +++ b/crates/clients/admin/src/jobs.rs @@ -4,7 +4,7 @@ use std::collections::BTreeMap; -use amp_job_core::job_id::JobId; +use amp_job_core::{job_id::JobId, retry_strategy::RetryStrategy}; use monitoring::logging; use super::{ @@ -1361,6 +1361,9 @@ pub enum CreateJobRequest { Gc { /// The location ID of the physical table revision to garbage collect. location_id: i64, + /// The retry strategy for this job. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + retry_strategy: Option, }, } diff --git a/crates/controller-admin-jobs/src/datasets/handlers/deploy.rs b/crates/controller-admin-jobs/src/datasets/handlers/deploy.rs index 46f4b2941..9d10f27a8 100644 --- a/crates/controller-admin-jobs/src/datasets/handlers/deploy.rs +++ b/crates/controller-admin-jobs/src/datasets/handlers/deploy.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use amp_datasets_registry::error::{ListVersionTagsError, ResolveRevisionError}; -use amp_job_core::job_id::JobId; +use amp_job_core::{job_id::JobId, retry_strategy::RetryStrategy}; use amp_job_materialize_datasets_derived::job_descriptor::JobDescriptor as MaterializeDerivedDatasetJobDescriptor; use amp_job_materialize_datasets_raw::job_descriptor::JobDescriptor as MaterializeRawDatasetJobDescriptor; use axum::{ @@ -37,6 +37,7 @@ use crate::{ /// - `parallelism`: Number of parallel workers (default: 1, only for raw datasets) /// - `worker_id`: Optional worker selector (exact ID or glob pattern) /// - `verify`: Enable cryptographic verification of EVM block data (default: false) +/// - `retry_strategy`: Optional retry strategy controlling how failed jobs are retried /// /// ## Response /// - **202 Accepted**: Job successfully scheduled @@ -109,6 +110,7 @@ pub async fn handler( parallelism, worker_id, verify, + retry_strategy, } = match json { Ok(Json(request)) => request, Err(err) => { @@ -151,29 +153,33 @@ pub async fn handler( // Build the job descriptor and compute idempotency key based on dataset kind let (job_descriptor, idempotency_key) = if dataset.kind() == DerivedDatasetKind { ( - JobDescriptor::from(MaterializeDerivedDatasetJobDescriptor { - end_block: end_block.into(), - dataset_namespace: reference.namespace().clone(), - dataset_name: reference.name().clone(), - manifest_hash: reference.hash().clone(), - }), + JobDescriptor::materialize_derived( + MaterializeDerivedDatasetJobDescriptor { + end_block: end_block.into(), + 
dataset_namespace: reference.namespace().clone(), + dataset_name: reference.name().clone(), + manifest_hash: reference.hash().clone(), + }, + retry_strategy, + ), amp_job_materialize_datasets_derived::job_key::idempotency_key(&reference), ) } else { ( - MaterializeRawDatasetJobDescriptor { - end_block: end_block.into(), - max_writers: parallelism, - dataset_namespace: reference.namespace().clone(), - dataset_name: reference.name().clone(), - manifest_hash: reference.hash().clone(), - verify, - } - .into(), + JobDescriptor::materialize_raw( + MaterializeRawDatasetJobDescriptor { + end_block: end_block.into(), + max_writers: parallelism, + dataset_namespace: reference.namespace().clone(), + dataset_name: reference.name().clone(), + manifest_hash: reference.hash().clone(), + verify, + }, + retry_strategy, + ), amp_job_materialize_datasets_raw::job_key::idempotency_key(&reference), ) }; - // Schedule the extraction job using the scheduler let job_id = ctx .scheduler @@ -270,6 +276,18 @@ pub struct DeployRequest { /// Defaults to false if not specified. #[serde(default)] pub verify: bool, + + /// Optional retry strategy controlling how failed jobs are retried. + /// + /// Supports three strategies: + /// - `none`: Never retry — fail immediately on first error. + /// - `bounded`: Retry up to `max_attempts` times with configurable backoff. + /// - `unless_stopped`: Retry indefinitely until the job is explicitly stopped. + /// + /// If not specified, the scheduler applies its default strategy. + #[serde(default)] + #[cfg_attr(feature = "utoipa", schema(value_type = Option))] + pub retry_strategy: Option, } fn default_parallelism() -> u16 { diff --git a/crates/controller-admin-jobs/src/jobs/handlers/create.rs b/crates/controller-admin-jobs/src/jobs/handlers/create.rs index 27328b03a..62e354cc8 100644 --- a/crates/controller-admin-jobs/src/jobs/handlers/create.rs +++ b/crates/controller-admin-jobs/src/jobs/handlers/create.rs @@ -1,6 +1,6 @@ //! 
Job creation handler -use amp_job_core::job_id::JobId; +use amp_job_core::{job_id::JobId, retry_strategy::RetryStrategy}; use amp_job_gc::job_descriptor::JobDescriptor as GcJobDescriptor; use amp_job_materialize_datasets_derived::job_descriptor::JobDescriptor as MaterializeDerivedJobDescriptor; use amp_job_materialize_datasets_raw::job_descriptor::JobDescriptor as MaterializeRawJobDescriptor; @@ -25,14 +25,20 @@ use crate::{ /// or `materialize-derived`). Additional fields depend on the job kind and are /// defined by the corresponding job descriptor type. /// +/// ## Request Body +/// - `kind`: Job type (`gc`, `materialize-raw`, or `materialize-derived`) +/// - `worker_id`: Optional worker selector (exact ID or glob pattern) +/// - `retry_strategy`: Optional retry strategy controlling how failed jobs are retried +/// - Additional fields vary by job kind (see [`CreateJobRequest`] variants) +/// /// ## Response /// - **202 Accepted**: Job scheduled successfully -/// - **400 Bad Request**: Invalid request body or unsupported job kind +/// - **400 Bad Request**: Invalid request body, invalid retry strategy, or no workers /// - **409 Conflict**: An active job with the same idempotency key already exists /// - **500 Internal Server Error**: Scheduler error /// /// ## Error Codes -/// - `INVALID_BODY`: The request body is not valid JSON or has missing/invalid fields +/// - `INVALID_BODY`: The request body is not valid JSON, has missing/invalid fields, or has invalid retry strategy parameters /// - `NO_WORKERS_AVAILABLE`: No active workers to handle the job /// - `ACTIVE_JOB_CONFLICT`: An active job with the same idempotency key already exists /// - `SCHEDULER_ERROR`: Internal scheduler failure @@ -67,14 +73,16 @@ pub async fn handler( CreateJobRequest::Gc { descriptor, worker_id, + retry_strategy, } => { let key = amp_job_gc::job_key::idempotency_key(descriptor.location_id); - let desc = scheduler::JobDescriptor::from(descriptor); + let desc = 
scheduler::JobDescriptor::gc(descriptor, retry_strategy); (key, desc, worker_id) } CreateJobRequest::MaterializeRaw { descriptor, worker_id, + retry_strategy, } => { let reference = HashReference::new( descriptor.dataset_namespace.clone(), @@ -82,12 +90,13 @@ pub async fn handler( descriptor.manifest_hash.clone(), ); let key = amp_job_materialize_datasets_raw::job_key::idempotency_key(&reference); - let desc = scheduler::JobDescriptor::from(descriptor); + let desc = scheduler::JobDescriptor::materialize_raw(descriptor, retry_strategy); (key, desc, worker_id) } CreateJobRequest::MaterializeDerived { descriptor, worker_id, + retry_strategy, } => { let reference = HashReference::new( descriptor.dataset_namespace.clone(), @@ -95,7 +104,7 @@ pub async fn handler( descriptor.manifest_hash.clone(), ); let key = amp_job_materialize_datasets_derived::job_key::idempotency_key(&reference); - let desc = scheduler::JobDescriptor::from(descriptor); + let desc = scheduler::JobDescriptor::materialize_derived(descriptor, retry_strategy); (key, desc, worker_id) } }; @@ -137,6 +146,10 @@ pub enum CreateJobRequest { #[serde(default)] #[cfg_attr(feature = "utoipa", schema(value_type = Option))] worker_id: Option, + /// Optional retry strategy controlling how failed jobs are retried. + #[serde(default)] + #[cfg_attr(feature = "utoipa", schema(value_type = Option))] + retry_strategy: Option, }, /// Schedule a raw dataset materialization job. MaterializeRaw { @@ -148,6 +161,10 @@ pub enum CreateJobRequest { #[serde(default)] #[cfg_attr(feature = "utoipa", schema(value_type = Option))] worker_id: Option, + /// Optional retry strategy controlling how failed jobs are retried. + #[serde(default)] + #[cfg_attr(feature = "utoipa", schema(value_type = Option))] + retry_strategy: Option, }, /// Schedule a derived dataset materialization job. 
MaterializeDerived { @@ -159,6 +176,10 @@ pub enum CreateJobRequest { #[serde(default)] #[cfg_attr(feature = "utoipa", schema(value_type = Option))] worker_id: Option, + /// Optional retry strategy controlling how failed jobs are retried. + #[serde(default)] + #[cfg_attr(feature = "utoipa", schema(value_type = Option))] + retry_strategy: Option, }, } diff --git a/crates/controller-admin-jobs/src/scheduler.rs b/crates/controller-admin-jobs/src/scheduler.rs index 093213b3c..0028655e1 100644 --- a/crates/controller-admin-jobs/src/scheduler.rs +++ b/crates/controller-admin-jobs/src/scheduler.rs @@ -23,7 +23,7 @@ //! - Worker information queries //! - Worker status retrieval -use amp_job_core::{job_id::JobId, status::JobStatus}; +use amp_job_core::{job_id::JobId, retry_strategy::RetryStrategy, status::JobStatus}; use amp_worker_core::node_id::{InvalidIdError, NodeId, validate_node_id}; use async_trait::async_trait; use chrono::{DateTime, Utc}; @@ -228,27 +228,87 @@ impl From<&EventDetail<'_>> for JobDescriptor { } } -impl From for JobDescriptor { - fn from(desc: amp_job_materialize_datasets_raw::job_descriptor::JobDescriptor) -> Self { - let raw: EventDetailOwned = desc.into(); - Self(raw.into_inner()) +impl JobDescriptor { + /// Construct a scheduler descriptor for a raw dataset materialization job. + /// + /// # Panics + /// + /// Panics if the descriptor cannot be serialized to JSON. In practice this + /// cannot happen because the concrete descriptor types contain only + /// primitive and string fields. 
+ pub fn materialize_raw( + desc: amp_job_materialize_datasets_raw::job_descriptor::JobDescriptor, + retry_strategy: Option, + ) -> Self { + Self::descriptor( + amp_job_materialize_datasets_raw::job_kind::JOB_KIND, + desc, + retry_strategy, + ) } -} -impl From for JobDescriptor { - fn from(desc: amp_job_materialize_datasets_derived::job_descriptor::JobDescriptor) -> Self { - let raw: EventDetailOwned = desc.into(); - Self(raw.into_inner()) + /// Construct a scheduler descriptor for a derived dataset materialization job. + /// + /// # Panics + /// + /// Panics if the descriptor cannot be serialized to JSON. In practice this + /// cannot happen because the concrete descriptor types contain only + /// primitive and string fields. + pub fn materialize_derived( + desc: amp_job_materialize_datasets_derived::job_descriptor::JobDescriptor, + retry_strategy: Option, + ) -> Self { + Self::descriptor( + amp_job_materialize_datasets_derived::job_kind::JOB_KIND, + desc, + retry_strategy, + ) + } + + /// Construct a scheduler descriptor for a garbage collection job. + /// + /// # Panics + /// + /// Panics if the descriptor cannot be serialized to JSON. In practice this + /// cannot happen because the concrete descriptor types contain only + /// primitive and string fields. + pub fn gc( + desc: amp_job_gc::job_descriptor::JobDescriptor, + retry_strategy: Option, + ) -> Self { + Self::descriptor(amp_job_gc::job_kind::JOB_KIND, desc, retry_strategy) } -} -impl From for JobDescriptor { - fn from(desc: amp_job_gc::job_descriptor::JobDescriptor) -> Self { - let raw: EventDetailOwned = desc.into(); - Self(raw.into_inner()) + /// Serialize a job descriptor together with scheduler-level fields. 
+ fn descriptor( + kind: &str, + details: D, + retry_strategy: Option, + ) -> Self { + // Invariant: all concrete descriptor types (`MaterializeRawJobDescriptor`, + // `MaterializeDerivedJobDescriptor`, `GcJobDescriptor`) contain only + // primitive and string fields whose `Serialize` impls are infallible. + let raw = serde_json::value::to_raw_value(&Descriptor { + kind, + retry_strategy, + details, + }) + .expect("Descriptor serialization is infallible"); + Self(raw) } } +/// Combines a typed job descriptor with its `kind` tag and scheduler-level +/// fields (retry strategy) into a single JSON object for storage. +#[derive(serde::Serialize)] +struct Descriptor<'a, D> { + kind: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + retry_strategy: Option, + #[serde(flatten)] + details: D, +} + /// Errors that can occur when scheduling a dataset dump job #[derive(Debug, thiserror::Error)] pub enum ScheduleJobError { diff --git a/crates/job-core/src/retry_strategy.rs b/crates/job-core/src/retry_strategy.rs index ac9457ecd..e2cee420c 100644 --- a/crates/job-core/src/retry_strategy.rs +++ b/crates/job-core/src/retry_strategy.rs @@ -4,12 +4,14 @@ //! The metadata-db layer stores this as opaque JSON; this module provides the //! typed interpretation used by the scheduler. -use std::time::Duration; +use std::{fmt, time::Duration}; + +use serde::de; /// Retry strategy for a job, deserialized from the descriptor's `retry_strategy` field. /// /// Each variant carries only the parameters relevant to that strategy. -#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] #[serde(tag = "strategy", rename_all = "snake_case")] pub enum RetryStrategy { /// Never retry — transition to FATAL on first failure. @@ -17,8 +19,8 @@ pub enum RetryStrategy { /// Retry up to `max_attempts` times, then mark FATAL. Bounded { - /// Maximum number of retry attempts (required). 
- max_attempts: u32, + /// Maximum number of retry attempts (required, >= 1). + max_attempts: MaxAttempts, /// Backoff configuration. Defaults to exponential (base=1s, mult=2.0). #[serde(default)] backoff: Backoff, @@ -32,14 +34,84 @@ pub enum RetryStrategy { }, } +impl RetryStrategy { + /// Parse a `RetryStrategy` from a job descriptor JSON string. + /// + /// Extracts the `retry_strategy` field from the descriptor object and + /// deserializes it. Returns `None` if the field is missing or malformed. + pub fn from_descriptor(descriptor_json: &str) -> Option { + #[derive(serde::Deserialize)] + struct Extract { + retry_strategy: Option, + } + serde_json::from_str::(descriptor_json) + .ok()? + .retry_strategy + } + + /// Returns `true` if retries are exhausted for the given attempt index (0-based). + pub fn is_exhausted(&self, attempt_index: u32) -> bool { + match self { + RetryStrategy::None => true, + RetryStrategy::Bounded { max_attempts, .. } => attempt_index >= max_attempts.get(), + RetryStrategy::UnlessStopped { .. } => false, + } + } + + /// Returns `true` if the backoff variant is `ExponentialWithJitter`. + pub fn needs_jitter(&self) -> bool { + matches!( + self, + RetryStrategy::Bounded { + backoff: Backoff::ExponentialWithJitter { .. }, + .. + } | RetryStrategy::UnlessStopped { + backoff: Backoff::ExponentialWithJitter { .. } + } + ) + } + + /// Compute the backoff delay for a given attempt index (0-based). + /// Does NOT include jitter — caller applies jitter for `ExponentialWithJitter`. + pub fn compute_delay(&self, attempt_index: u32) -> Duration { + match self { + RetryStrategy::None => Duration::ZERO, + RetryStrategy::Bounded { backoff, .. } | RetryStrategy::UnlessStopped { backoff } => { + backoff.compute_delay(attempt_index) + } + } + } +} + +impl Default for RetryStrategy { + /// Matches current behavior: unlimited retries with exponential backoff. 
+ fn default() -> Self { + Self::UnlessStopped { + backoff: Backoff::default(), + } + } +} + +impl fmt::Display for RetryStrategy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RetryStrategy::None => write!(f, "none"), + RetryStrategy::Bounded { max_attempts, .. } => { + write!(f, "bounded(max_attempts={})", max_attempts.get()) + } + RetryStrategy::UnlessStopped { .. } => write!(f, "unless_stopped"), + } + } +} + /// Backoff algorithm with variant-specific parameters. -#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum Backoff { /// Fixed delay: `delay = base_delay_secs` for every attempt. Fixed { - #[serde(default = "default_base_delay_secs")] - base_delay_secs: u64, + #[serde(default = "BaseDelaySecs::default")] + base_delay_secs: BaseDelaySecs, }, /// Exponential: `delay = base_delay_secs * multiplier ^ attempt_index`. @@ -55,119 +127,279 @@ pub enum Backoff { }, } +impl Backoff { + /// Compute the backoff delay for a given attempt index (0-based). + /// Does NOT include jitter — caller applies jitter for `ExponentialWithJitter`. + fn compute_delay(&self, attempt_index: u32) -> Duration { + match self { + Backoff::Fixed { base_delay_secs } => Duration::from_secs(base_delay_secs.get()), + Backoff::Exponential { params } | Backoff::ExponentialWithJitter { params } => { + params.compute_delay(attempt_index) + } + } + } +} + +impl Default for Backoff { + fn default() -> Self { + Self::Exponential { + params: ExponentialParams::default(), + } + } +} + /// Shared parameters for exponential backoff variants. -#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +/// +/// Cross-field invariant: when `max_delay_secs` is present it must be +/// `>= base_delay_secs`. This is enforced by the custom `Deserialize` impl +/// and the `new()` constructor. 
+#[derive(Debug, Clone, Default, PartialEq, serde::Serialize)] pub struct ExponentialParams { - #[serde(default = "default_base_delay_secs")] - pub base_delay_secs: u64, - #[serde(default = "default_multiplier")] - pub multiplier: f64, + base_delay_secs: BaseDelaySecs, + multiplier: Multiplier, /// Optional cap on computed delay (seconds). #[serde(default, skip_serializing_if = "Option::is_none")] - pub max_delay_secs: Option, + max_delay_secs: Option, } -fn default_base_delay_secs() -> u64 { - 1 -} +/// Errors when constructing [`ExponentialParams`] with invalid values +#[derive(Debug, thiserror::Error)] +pub enum ExponentialParamsError { + /// The `base_delay_secs` value is invalid + #[error("invalid base delay")] + BaseDelay(#[source] BaseDelaySecsError), -fn default_multiplier() -> f64 { - 2.0 + /// The `multiplier` value is invalid + #[error("invalid multiplier")] + Multiplier(#[source] MultiplierError), + + /// The `max_delay_secs` cap is less than `base_delay_secs` + /// + /// Such a cap would truncate even the first attempt's delay below the configured base delay. + #[error("max_delay_secs ({max}) must be >= base_delay_secs ({base})")] + MaxDelayBelowBase { + /// The provided max delay cap + max: u64, + /// The provided base delay + base: u64, + }, } impl ExponentialParams { + /// Create a new `ExponentialParams` with the given values. + /// + /// Returns an error if any field is invalid or `max_delay_secs < base_delay_secs`. 
+ pub fn new( + base_delay_secs: u64, + multiplier: f64, + max_delay_secs: Option, + ) -> Result { + let base = + BaseDelaySecs::new(base_delay_secs).map_err(ExponentialParamsError::BaseDelay)?; + let mult = Multiplier::new(multiplier).map_err(ExponentialParamsError::Multiplier)?; + + if let Some(max) = max_delay_secs + && max < base_delay_secs + { + return Err(ExponentialParamsError::MaxDelayBelowBase { + max, + base: base_delay_secs, + }); + } + + Ok(Self { + base_delay_secs: base, + multiplier: mult, + max_delay_secs, + }) + } + + /// Returns the base delay in seconds. + pub fn base_delay_secs(&self) -> u64 { + self.base_delay_secs.get() + } + + /// Returns the multiplier. + pub fn multiplier(&self) -> f64 { + self.multiplier.get() + } + + /// Returns the optional maximum delay cap in seconds. + pub fn max_delay_secs(&self) -> Option { + self.max_delay_secs + } + fn compute_delay(&self, attempt_index: u32) -> Duration { - let base = self.base_delay_secs as f64; + let base = self.base_delay_secs.get() as f64; let cap = self.max_delay_secs.map(|c| c as f64).unwrap_or(f64::MAX); // Clamp exponent to avoid f64 overflow → infinity → Duration panic. let exp = attempt_index.min(63) as i32; - let raw = base * self.multiplier.powi(exp); + let raw = base * self.multiplier.get().powi(exp); Duration::from_secs_f64(raw.clamp(0.0, cap)) } } -impl Default for ExponentialParams { - fn default() -> Self { - Self { - base_delay_secs: default_base_delay_secs(), - multiplier: default_multiplier(), - max_delay_secs: None, +impl Eq for ExponentialParams {} + +/// Helper for deserializing `ExponentialParams` with cross-field validation. +impl<'de> de::Deserialize<'de> for ExponentialParams { + fn deserialize>(deserializer: D) -> Result { + /// Raw helper that mirrors the wire format without validation. 
+ #[derive(serde::Deserialize)] + struct Raw { + #[serde(default = "BaseDelaySecs::default")] + base_delay_secs: BaseDelaySecs, + #[serde(default = "Multiplier::default")] + multiplier: Multiplier, + #[serde(default)] + max_delay_secs: Option, + } + + let raw = Raw::deserialize(deserializer)?; + + if let Some(max) = raw.max_delay_secs + && max < raw.base_delay_secs.get() + { + return Err(de::Error::custom( + "max_delay_secs must be >= base_delay_secs", + )); } + + Ok(Self { + base_delay_secs: raw.base_delay_secs, + multiplier: raw.multiplier, + max_delay_secs: raw.max_delay_secs, + }) } } -impl Default for Backoff { - fn default() -> Self { - Self::Exponential { - params: ExponentialParams::default(), +/// Maximum retry attempts for a bounded strategy. Must be at least 1. +/// +/// Validates on construction and deserialization. +#[derive(Debug, Clone, Copy, Eq, PartialEq, serde::Serialize)] +#[serde(transparent)] +pub struct MaxAttempts(u32); + +/// The provided `max_attempts` value is zero +/// +/// A bounded retry strategy requires at least one attempt to be meaningful. +#[derive(Debug, thiserror::Error)] +#[error("max_attempts must be at least 1, got {value}")] +pub struct MaxAttemptsError { + value: u32, +} + +impl MaxAttempts { + /// Create a new `MaxAttempts`, returning an error if the value is zero. + pub fn new(value: u32) -> Result { + if value == 0 { + return Err(MaxAttemptsError { value }); } + Ok(Self(value)) + } + + /// Returns the inner value. + pub fn get(self) -> u32 { + self.0 } } -impl Default for RetryStrategy { - /// Matches current behavior: unlimited retries with exponential backoff. - fn default() -> Self { - Self::UnlessStopped { - backoff: Backoff::default(), - } +impl<'de> de::Deserialize<'de> for MaxAttempts { + fn deserialize>(deserializer: D) -> Result { + let value = u32::deserialize(deserializer)?; + Self::new(value).map_err(de::Error::custom) } } -impl Backoff { - /// Compute the backoff delay for a given attempt index (0-based). 
- /// Does NOT include jitter — caller applies jitter for `ExponentialWithJitter`. - fn compute_delay(&self, attempt_index: u32) -> Duration { - match self { - Backoff::Fixed { base_delay_secs } => Duration::from_secs(*base_delay_secs), - Backoff::Exponential { params } | Backoff::ExponentialWithJitter { params } => { - params.compute_delay(attempt_index) - } +/// Base delay in seconds. Must be at least 1. +/// +/// Validates on construction and deserialization — invalid values are rejected +/// at parse time rather than via a post-hoc `validate()` call. +#[derive(Debug, Clone, Copy, Eq, PartialEq, serde::Serialize)] +#[serde(transparent)] +pub struct BaseDelaySecs(u64); + +/// The provided `base_delay_secs` value is zero +/// +/// A delay of zero seconds is meaningless for backoff configuration. +#[derive(Debug, thiserror::Error)] +#[error("base_delay_secs must be at least 1, got {value}")] +pub struct BaseDelaySecsError { + value: u64, +} + +impl BaseDelaySecs { + /// Create a new `BaseDelaySecs`, returning an error if the value is zero. + pub fn new(value: u64) -> Result { + if value == 0 { + return Err(BaseDelaySecsError { value }); } + Ok(Self(value)) + } + + /// Returns the inner value. + pub fn get(self) -> u64 { + self.0 } } -impl RetryStrategy { - /// Parse a `RetryStrategy` from a job descriptor JSON string. - /// - /// Extracts the `retry_strategy` field from the descriptor object and - /// deserializes it. Returns `None` if the field is missing or malformed. - pub fn from_descriptor(descriptor_json: &str) -> Option { - let value: serde_json::Value = serde_json::from_str(descriptor_json).ok()?; - let retry_obj = value.get("retry_strategy")?; - serde_json::from_value(retry_obj.clone()).ok() +impl Default for BaseDelaySecs { + fn default() -> Self { + Self(1) } +} - /// Returns `true` if retries are exhausted for the given attempt index (0-based). 
- pub fn is_exhausted(&self, attempt_index: u32) -> bool { - match self { - RetryStrategy::None => true, - RetryStrategy::Bounded { max_attempts, .. } => attempt_index >= *max_attempts, - RetryStrategy::UnlessStopped { .. } => false, +impl<'de> de::Deserialize<'de> for BaseDelaySecs { + fn deserialize>(deserializer: D) -> Result { + let value = u64::deserialize(deserializer)?; + Self::new(value).map_err(de::Error::custom) + } +} + +/// Exponential backoff multiplier. Must be finite and positive. +/// +/// Validates on construction and deserialization. +#[derive(Debug, Clone, Copy, PartialEq, serde::Serialize)] +#[serde(transparent)] +pub struct Multiplier(f64); + +/// The provided `multiplier` value is not a finite positive number +/// +/// This occurs when the multiplier is NaN, infinite, zero, or negative. +#[derive(Debug, thiserror::Error)] +#[error("multiplier must be a finite positive number, got {value}")] +pub struct MultiplierError { + value: f64, +} + +impl Multiplier { + /// Create a new `Multiplier`, returning an error if the value is not finite + /// and positive. + pub fn new(value: f64) -> Result { + if !value.is_finite() || value <= 0.0 { + return Err(MultiplierError { value }); } + Ok(Self(value)) } - /// Returns `true` if the backoff variant is `ExponentialWithJitter`. - pub fn needs_jitter(&self) -> bool { - matches!( - self, - RetryStrategy::Bounded { - backoff: Backoff::ExponentialWithJitter { .. }, - .. - } | RetryStrategy::UnlessStopped { - backoff: Backoff::ExponentialWithJitter { .. } - } - ) + /// Returns the inner value. + pub fn get(self) -> f64 { + self.0 } +} - /// Compute the backoff delay for a given attempt index (0-based). - /// Does NOT include jitter — caller applies jitter for `ExponentialWithJitter`. - pub fn compute_delay(&self, attempt_index: u32) -> Duration { - match self { - RetryStrategy::None => Duration::ZERO, - RetryStrategy::Bounded { backoff, .. 
} | RetryStrategy::UnlessStopped { backoff } => { - backoff.compute_delay(attempt_index) - } - } +impl Default for Multiplier { + fn default() -> Self { + Self(2.0) + } +} + +impl Eq for Multiplier {} + +impl<'de> de::Deserialize<'de> for Multiplier { + fn deserialize>(deserializer: D) -> Result { + let value = f64::deserialize(deserializer)?; + Self::new(value).map_err(de::Error::custom) } } @@ -216,7 +448,9 @@ mod tests { fn compute_delay_with_fixed_backoff_returns_constant_delay() { //* Given let strategy = RetryStrategy::UnlessStopped { - backoff: Backoff::Fixed { base_delay_secs: 5 }, + backoff: Backoff::Fixed { + base_delay_secs: BaseDelaySecs::new(5).unwrap(), + }, }; //* When @@ -241,10 +475,7 @@ mod tests { //* Given let strategy = RetryStrategy::UnlessStopped { backoff: Backoff::Exponential { - params: ExponentialParams { - max_delay_secs: Some(60), - ..ExponentialParams::default() - }, + params: ExponentialParams::new(1, 2.0, Some(60)).unwrap(), }, }; @@ -264,10 +495,7 @@ mod tests { //* Given let strategy = RetryStrategy::UnlessStopped { backoff: Backoff::Exponential { - params: ExponentialParams { - max_delay_secs: Some(3600), - ..ExponentialParams::default() - }, + params: ExponentialParams::new(1, 2.0, Some(3600)).unwrap(), }, }; @@ -311,7 +539,7 @@ mod tests { fn is_exhausted_with_bounded_strategy_respects_max_attempts() { //* Given let strategy = RetryStrategy::Bounded { - max_attempts: 3, + max_attempts: MaxAttempts::new(3).unwrap(), backoff: Backoff::default(), }; @@ -348,9 +576,9 @@ mod tests { assert_eq!( strategy, Some(RetryStrategy::Bounded { - max_attempts: 5, + max_attempts: MaxAttempts::new(5).unwrap(), backoff: Backoff::Fixed { - base_delay_secs: 10 + base_delay_secs: BaseDelaySecs::new(10).unwrap(), }, }), "should parse retry_strategy from descriptor JSON" @@ -385,7 +613,7 @@ mod tests { fn needs_jitter_with_exponential_with_jitter_returns_true() { //* Given let strategy = RetryStrategy::Bounded { - max_attempts: 3, + max_attempts: 
MaxAttempts::new(3).unwrap(), backoff: Backoff::ExponentialWithJitter { params: ExponentialParams::default(), }, @@ -402,7 +630,7 @@ mod tests { fn needs_jitter_with_exponential_returns_false() { //* Given let strategy = RetryStrategy::Bounded { - max_attempts: 3, + max_attempts: MaxAttempts::new(3).unwrap(), backoff: Backoff::Exponential { params: ExponentialParams::default(), }, @@ -419,7 +647,9 @@ mod tests { fn needs_jitter_with_fixed_returns_false() { //* Given let strategy = RetryStrategy::UnlessStopped { - backoff: Backoff::Fixed { base_delay_secs: 5 }, + backoff: Backoff::Fixed { + base_delay_secs: BaseDelaySecs::new(5).unwrap(), + }, }; //* Then @@ -429,17 +659,119 @@ mod tests { ); } + // --- Deserialization rejection tests (replaces validate() tests) --- + + #[test] + fn deserialize_rejects_zero_max_attempts() { + //* Given + let json = r#"{"strategy":"bounded","max_attempts":0}"#; + + //* When + let result = serde_json::from_str::(json); + + //* Then + let err = result.expect_err("zero max_attempts should be rejected at parse time"); + assert!( + err.to_string().contains("max_attempts must be at least 1"), + "unexpected error: {err}" + ); + } + + #[test] + fn deserialize_rejects_nan_multiplier() { + //* Given — NaN is not valid JSON, so we test via the constructor + let result = Multiplier::new(f64::NAN); + + //* Then + assert!( + result.is_err(), + "NaN multiplier should be rejected at construction" + ); + } + + #[test] + fn deserialize_rejects_negative_multiplier() { + //* Given + let json = r#"{"strategy":"bounded","max_attempts":3,"backoff":{"kind":"exponential","multiplier":-1.0}}"#; + + //* When + let result = serde_json::from_str::(json); + + //* Then + let err = result.expect_err("negative multiplier should be rejected at parse time"); + assert!( + err.to_string() + .contains("multiplier must be a finite positive number"), + "unexpected error: {err}" + ); + } + + #[test] + fn deserialize_rejects_zero_base_delay() { + //* Given + let json = + 
r#"{"strategy":"unless_stopped","backoff":{"kind":"fixed","base_delay_secs":0}}"#; + + //* When + let result = serde_json::from_str::(json); + + //* Then + let err = result.expect_err("zero base delay should be rejected at parse time"); + assert!( + err.to_string() + .contains("base_delay_secs must be at least 1"), + "unexpected error: {err}" + ); + } + + #[test] + fn deserialize_rejects_max_delay_less_than_base_delay() { + //* Given + let json = r#"{"strategy":"bounded","max_attempts":3,"backoff":{"kind":"exponential","base_delay_secs":60,"max_delay_secs":10}}"#; + + //* When + let result = serde_json::from_str::(json); + + //* Then + let err = result.expect_err("max_delay < base_delay should be rejected at parse time"); + assert!( + err.to_string() + .contains("max_delay_secs must be >= base_delay_secs"), + "unexpected error: {err}" + ); + } + + #[test] + fn deserialize_accepts_valid_bounded_strategy() { + //* Given + let json = r#"{"strategy":"bounded","max_attempts":5,"backoff":{"kind":"exponential","base_delay_secs":1,"multiplier":2.0,"max_delay_secs":60}}"#; + + //* When + let result = serde_json::from_str::(json); + + //* Then + assert!(result.is_ok(), "valid strategy should parse successfully"); + } + + #[test] + fn deserialize_accepts_none_strategy() { + //* Given + let json = r#"{"strategy":"none"}"#; + + //* When + let result = serde_json::from_str::(json); + + //* Then + assert!(result.is_ok(), "none strategy should parse successfully"); + } + #[test] fn retry_strategy_serde_round_trip_preserves_values() { //* Given let strategy = RetryStrategy::Bounded { - max_attempts: 5, + max_attempts: MaxAttempts::new(5).unwrap(), backoff: Backoff::ExponentialWithJitter { - params: ExponentialParams { - base_delay_secs: 2, - multiplier: 3.0, - max_delay_secs: Some(120), - }, + params: ExponentialParams::new(2, 3.0, Some(120)).unwrap(), }, }; diff --git a/docs/schemas/openapi/admin.spec.json b/docs/schemas/openapi/admin.spec.json index bd3184c7b..2083a4781 100644 
--- a/docs/schemas/openapi/admin.spec.json +++ b/docs/schemas/openapi/admin.spec.json @@ -295,7 +295,7 @@ "datasets" ], "summary": "Handler for the `POST /datasets/{namespace}/{name}/versions/{revision}/deploy` endpoint", - "description": "Schedules a data extraction job for the specified dataset revision.\n\n## Path Parameters\n- `namespace`: Dataset namespace\n- `name`: Dataset name\n- `revision`: Revision (version, hash, \"latest\", or \"dev\")\n\n## Request Body\n- `end_block`: End block configuration (null for continuous, \"latest\", number, or negative offset)\n- `parallelism`: Number of parallel workers (default: 1, only for raw datasets)\n- `worker_id`: Optional worker selector (exact ID or glob pattern)\n- `verify`: Enable cryptographic verification of EVM block data (default: false)\n\n## Response\n- **202 Accepted**: Job successfully scheduled\n- **400 Bad Request**: Invalid path parameters or request body\n- **404 Not Found**: Dataset or revision not found\n- **500 Internal Server Error**: Database or scheduler error\n\n## Error Codes\n- `INVALID_PATH`: Invalid path parameters (namespace, name, or revision)\n- `INVALID_BODY`: Invalid request body (malformed JSON or missing required fields)\n- `DATASET_NOT_FOUND`: The specified dataset or revision does not exist\n- `LIST_VERSION_TAGS_ERROR`: Failed to list version tags from dataset store\n- `RESOLVE_REVISION_ERROR`: Failed to resolve revision to manifest hash\n- `GET_DATASET_ERROR`: Failed to load dataset from store\n- `WORKER_NOT_AVAILABLE`: Specified worker not found or inactive\n- `SCHEDULER_ERROR`: Failed to schedule extraction job\n\n## Behavior\nThis endpoint schedules a data extraction job for a dataset:\n1. Resolves the revision to find the corresponding version tag\n2. Loads the full dataset configuration from the dataset store\n3. Schedules an extraction job with the specified parameters\n4. 
Returns job ID for tracking\n\nThe revision parameter supports four types:\n- Semantic version (e.g., \"1.2.3\") - uses that specific version\n- \"latest\" - resolves to the highest semantic version\n- \"dev\" - resolves to the development version tag\n- Manifest hash (SHA256 hash) - finds the version that points to this hash\n\nJobs are executed asynchronously by worker nodes. Use the returned job ID\nto track progress via the jobs endpoints.", + "description": "Schedules a data extraction job for the specified dataset revision.\n\n## Path Parameters\n- `namespace`: Dataset namespace\n- `name`: Dataset name\n- `revision`: Revision (version, hash, \"latest\", or \"dev\")\n\n## Request Body\n- `end_block`: End block configuration (null for continuous, \"latest\", number, or negative offset)\n- `parallelism`: Number of parallel workers (default: 1, only for raw datasets)\n- `worker_id`: Optional worker selector (exact ID or glob pattern)\n- `verify`: Enable cryptographic verification of EVM block data (default: false)\n- `retry_strategy`: Optional retry strategy controlling how failed jobs are retried\n\n## Response\n- **202 Accepted**: Job successfully scheduled\n- **400 Bad Request**: Invalid path parameters or request body\n- **404 Not Found**: Dataset or revision not found\n- **500 Internal Server Error**: Database or scheduler error\n\n## Error Codes\n- `INVALID_PATH`: Invalid path parameters (namespace, name, or revision)\n- `INVALID_BODY`: Invalid request body (malformed JSON or missing required fields)\n- `DATASET_NOT_FOUND`: The specified dataset or revision does not exist\n- `LIST_VERSION_TAGS_ERROR`: Failed to list version tags from dataset store\n- `RESOLVE_REVISION_ERROR`: Failed to resolve revision to manifest hash\n- `GET_DATASET_ERROR`: Failed to load dataset from store\n- `WORKER_NOT_AVAILABLE`: Specified worker not found or inactive\n- `SCHEDULER_ERROR`: Failed to schedule extraction job\n\n## Behavior\nThis endpoint schedules a data extraction job 
for a dataset:\n1. Resolves the revision to find the corresponding version tag\n2. Loads the full dataset configuration from the dataset store\n3. Schedules an extraction job with the specified parameters\n4. Returns job ID for tracking\n\nThe revision parameter supports four types:\n- Semantic version (e.g., \"1.2.3\") - uses that specific version\n- \"latest\" - resolves to the highest semantic version\n- \"dev\" - resolves to the development version tag\n- Manifest hash (SHA256 hash) - finds the version that points to this hash\n\nJobs are executed asynchronously by worker nodes. Use the returned job ID\nto track progress via the jobs endpoints.", "operationId": "deploy_dataset", "parameters": [ { @@ -916,7 +916,7 @@ "jobs" ], "summary": "Handler for the `POST /jobs` endpoint", - "description": "Schedules a new job for execution by an available worker. The job type is\ndetermined by the `kind` field in the request body (`gc`, `materialize-raw`,\nor `materialize-derived`). Additional fields depend on the job kind and are\ndefined by the corresponding job descriptor type.\n\n## Response\n- **202 Accepted**: Job scheduled successfully\n- **400 Bad Request**: Invalid request body or unsupported job kind\n- **409 Conflict**: An active job with the same idempotency key already exists\n- **500 Internal Server Error**: Scheduler error\n\n## Error Codes\n- `INVALID_BODY`: The request body is not valid JSON or has missing/invalid fields\n- `NO_WORKERS_AVAILABLE`: No active workers to handle the job\n- `ACTIVE_JOB_CONFLICT`: An active job with the same idempotency key already exists\n- `SCHEDULER_ERROR`: Internal scheduler failure", + "description": "Schedules a new job for execution by an available worker. The job type is\ndetermined by the `kind` field in the request body (`gc`, `materialize-raw`,\nor `materialize-derived`). 
Additional fields depend on the job kind and are\ndefined by the corresponding job descriptor type.\n\n## Request Body\n- `kind`: Job type (`gc`, `materialize-raw`, or `materialize-derived`)\n- `worker_id`: Optional worker selector (exact ID or glob pattern)\n- `retry_strategy`: Optional retry strategy controlling how failed jobs are retried\n- Additional fields vary by job kind (see [`CreateJobRequest`] variants)\n\n## Response\n- **202 Accepted**: Job scheduled successfully\n- **400 Bad Request**: Invalid request body, invalid retry strategy, or no workers\n- **409 Conflict**: An active job with the same idempotency key already exists\n- **500 Internal Server Error**: Scheduler error\n\n## Error Codes\n- `INVALID_BODY`: The request body is not valid JSON, has missing/invalid fields, or has invalid retry strategy parameters\n- `NO_WORKERS_AVAILABLE`: No active workers to handle the job\n- `ACTIVE_JOB_CONFLICT`: An active job with the same idempotency key already exists\n- `SCHEDULER_ERROR`: Internal scheduler failure", "operationId": "jobs_create", "requestBody": { "content": { @@ -2652,6 +2652,13 @@ { "type": "object", "properties": { + "retry_strategy": { + "type": [ + "object", + "null" + ], + "description": "Optional retry strategy controlling how failed jobs are retried." + }, "worker_id": { "type": [ "string", @@ -2688,6 +2695,13 @@ { "type": "object", "properties": { + "retry_strategy": { + "type": [ + "object", + "null" + ], + "description": "Optional retry strategy controlling how failed jobs are retried." + }, "worker_id": { "type": [ "string", @@ -2724,6 +2738,13 @@ { "type": "object", "properties": { + "retry_strategy": { + "type": [ + "object", + "null" + ], + "description": "Optional retry strategy controlling how failed jobs are retried." 
+ }, "worker_id": { "type": [ "string", @@ -2962,6 +2983,13 @@ "description": "Number of parallel workers to run\n\nEach worker will be responsible for an equal number of blocks.\nFor example, if extracting blocks 0-10,000,000 with parallelism=10,\neach worker will handle a contiguous section of 1 million blocks.\n\nOnly applicable to raw datasets (EVM RPC, Firehose, etc.).\nDerived datasets ignore this parameter.\n\nDefaults to 1 if not specified.", "minimum": 0 }, + "retry_strategy": { + "type": [ + "object", + "null" + ], + "description": "Optional retry strategy controlling how failed jobs are retried.\n\nSupports three strategies:\n- `none`: Never retry — fail immediately on first error.\n- `bounded`: Retry up to `max_attempts` times with configurable backoff.\n- `unless_stopped`: Retry indefinitely until the job is explicitly stopped.\n\nIf not specified, the scheduler applies its default strategy." + }, "verify": { "type": "boolean", "description": "Enable cryptographic verification of EVM block data during extraction.\n\nWhen enabled, verifies block hashes, transaction roots, and receipt roots\nbefore writing data to storage. Only applicable to EVM raw datasets.\nVerification failures are retryable errors.\n\nDefaults to false if not specified." diff --git a/tests/src/testlib/fixtures/ampctl.rs b/tests/src/testlib/fixtures/ampctl.rs index e0b97a5c0..24e77b25e 100644 --- a/tests/src/testlib/fixtures/ampctl.rs +++ b/tests/src/testlib/fixtures/ampctl.rs @@ -214,6 +214,7 @@ impl Ampctl { parallelism.unwrap_or(1), worker_id, verify, + None, ) .await .map_err(Into::into) diff --git a/tests/src/tests/it_admin_api_datasets_deploy.rs b/tests/src/tests/it_admin_api_datasets_deploy.rs index 2e7391edd..6e7ccfef9 100644 --- a/tests/src/tests/it_admin_api_datasets_deploy.rs +++ b/tests/src/tests/it_admin_api_datasets_deploy.rs @@ -1,11 +1,14 @@ -//! Integration tests for dataset redeployment idempotency +//! Integration tests for dataset deployment via the admin API. //! 
-//! Verifies that deploying the same dataset version multiple times returns the same -//! job ID (idempotency key deduplication) and that re-deploying with a different -//! end_block while the job is still active does not create a new job. +//! Verifies redeployment idempotency and retry_strategy round-tripping through +//! the admin API: the field is persisted in the job descriptor when provided +//! and absent when omitted. use amp_client_admin::{self as client, end_block::EndBlock}; -use amp_job_core::job_id::JobId; +use amp_job_core::{ + job_id::JobId, + retry_strategy::{Backoff, ExponentialParams, MaxAttempts, RetryStrategy}, +}; use datasets_common::reference::Reference; use crate::testlib::ctx::TestCtxBuilder; @@ -38,6 +41,68 @@ async fn redeploy_with_different_end_block_returns_same_job_id() { ); } +// --- Retry strategy tests --- + +#[tokio::test] +async fn deploy_with_retry_strategy_persists_in_descriptor() { + //* Given + let ctx = TestCtx::setup("deploy_retry_bounded").await; + + let retry_strategy = RetryStrategy::Bounded { + max_attempts: MaxAttempts::new(10).unwrap(), + backoff: Backoff::Exponential { + params: ExponentialParams::new(1, 3.0, None).unwrap(), + }, + }; + + //* When + let job_id = ctx + .deploy_dataset_with_retry(Some(EndBlock::Latest), Some(retry_strategy.clone())) + .await + .expect("deployment should succeed"); + + //* Then + let job = ctx + .ampctl_client + .jobs() + .get(&job_id) + .await + .expect("get job should succeed") + .expect("job should exist"); + + let rs: RetryStrategy = serde_json::from_value(job.descriptor["retry_strategy"].clone()) + .expect("retry_strategy should deserialize"); + assert_eq!(rs, retry_strategy, "persisted strategy should match input"); +} + +#[tokio::test] +async fn deploy_without_retry_strategy_omits_field() { + //* Given + let ctx = TestCtx::setup("deploy_retry_omitted").await; + + //* When + let job_id = ctx + .deploy_dataset(Some(EndBlock::Latest)) + .await + .expect("deployment should succeed"); 
+ + //* Then + let job = ctx + .ampctl_client + .jobs() + .get(&job_id) + .await + .expect("get job should succeed") + .expect("job should exist"); + + assert!( + job.descriptor.get("retry_strategy").is_none(), + "retry_strategy should be absent from descriptor" + ); +} + +// --- Test helpers --- + struct TestCtx { ctx: crate::testlib::ctx::TestCtx, dataset_ref: Reference, @@ -82,10 +147,18 @@ impl TestCtx { async fn deploy_dataset( &self, end_block: Option, + ) -> Result { + self.deploy_dataset_with_retry(end_block, None).await + } + + async fn deploy_dataset_with_retry( + &self, + end_block: Option, + retry_strategy: Option, ) -> Result { self.ampctl_client .datasets() - .deploy(&self.dataset_ref, end_block, 1, None, false) + .deploy(&self.dataset_ref, end_block, 1, None, false, retry_strategy) .await } } diff --git a/tests/src/tests/it_admin_api_datasets_list_jobs.rs b/tests/src/tests/it_admin_api_datasets_list_jobs.rs index f0d884e64..f84c775d7 100644 --- a/tests/src/tests/it_admin_api_datasets_list_jobs.rs +++ b/tests/src/tests/it_admin_api_datasets_list_jobs.rs @@ -211,7 +211,7 @@ impl TestCtx { ); self.ampctl_client .datasets() - .deploy(&reference, None, 1, None, false) + .deploy(&reference, None, 1, None, false, None) .await } diff --git a/tests/src/tests/it_admin_api_datasets_stop_job.rs b/tests/src/tests/it_admin_api_datasets_stop_job.rs index 11a142a39..af63e253b 100644 --- a/tests/src/tests/it_admin_api_datasets_stop_job.rs +++ b/tests/src/tests/it_admin_api_datasets_stop_job.rs @@ -301,7 +301,7 @@ impl TestCtx { ) -> Result { self.ampctl_client .datasets() - .deploy(&self.dataset_ref, end_block, 1, None, false) + .deploy(&self.dataset_ref, end_block, 1, None, false, None) .await } @@ -330,7 +330,7 @@ impl TestCtx { ); self.ampctl_client .datasets() - .deploy(&reference, None, 1, None, false) + .deploy(&reference, None, 1, None, false, None) .await } diff --git a/tests/src/tests/it_admin_api_job_delete.rs b/tests/src/tests/it_admin_api_job_delete.rs 
index 448cc86ef..3c2c15bb4 100644 --- a/tests/src/tests/it_admin_api_job_delete.rs +++ b/tests/src/tests/it_admin_api_job_delete.rs @@ -406,7 +406,7 @@ impl TestCtx { ); self.ampctl_client .datasets() - .deploy(&reference, None, 1, None, false) + .deploy(&reference, None, 1, None, false, None) .await }