Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/bin/ampctl/src/cmd/dataset/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ async fn deploy_dataset(
let client = global.build_client().map_err(Error::ClientBuild)?;
let job_id = client
.datasets()
.deploy(dataset_ref, end_block, parallelism, worker_id, verify)
// TODO: Accept retry strategy
.deploy(dataset_ref, end_block, parallelism, worker_id, verify, None)
.await
.map_err(Error::Deploy)?;

Expand Down
2 changes: 2 additions & 0 deletions crates/bin/ampctl/src/cmd/job/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub async fn run(Args { global, kind }: Args) -> Result<(), Error> {
let request = match kind {
JobKind::Gc(gc_args) => CreateJobRequest::Gc {
location_id: gc_args.location_id,
// TODO: Accept retry_strategy from args
retry_strategy: None,
},
};

Expand Down
6 changes: 5 additions & 1 deletion crates/clients/admin/src/datasets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! Provides methods for interacting with the `/datasets` endpoints of the admin API.

use amp_job_core::job_id::JobId;
use amp_job_core::{job_id::JobId, retry_strategy::RetryStrategy};
use amp_worker_core::node_id::{InvalidIdError, NodeId, validate_node_id};
use datasets_common::{
dataset_kind_str::DatasetKindStr, fqn::FullyQualifiedName, hash::Hash, name::Name,
Expand Down Expand Up @@ -274,6 +274,7 @@ impl<'a> DatasetsClient<'a> {
parallelism: u16,
worker_id: Option<NodeSelector>,
verify: bool,
retry_strategy: Option<RetryStrategy>,
) -> Result<JobId, DeployError> {
let namespace = dataset_ref.namespace();
let name = dataset_ref.name();
Expand All @@ -292,6 +293,7 @@ impl<'a> DatasetsClient<'a> {
parallelism,
worker_id,
verify,
retry_strategy,
};

let response = self
Expand Down Expand Up @@ -1412,6 +1414,8 @@ struct DeployRequest {
worker_id: Option<NodeSelector>,
#[serde(skip_serializing_if = "std::ops::Not::not")]
verify: bool,
#[serde(skip_serializing_if = "Option::is_none")]
retry_strategy: Option<RetryStrategy>,
}

/// Input type for dataset registration manifest parameter.
Expand Down
5 changes: 4 additions & 1 deletion crates/clients/admin/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::collections::BTreeMap;

use amp_job_core::job_id::JobId;
use amp_job_core::{job_id::JobId, retry_strategy::RetryStrategy};
use monitoring::logging;

use super::{
Expand Down Expand Up @@ -1361,6 +1361,9 @@ pub enum CreateJobRequest {
Gc {
/// The location ID of the physical table revision to garbage collect.
location_id: i64,
/// The retry strategy for this job.
#[serde(default, skip_serializing_if = "Option::is_none")]
retry_strategy: Option<RetryStrategy>,
},
}

Expand Down
52 changes: 35 additions & 17 deletions crates/controller-admin-jobs/src/datasets/handlers/deploy.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::Debug;

use amp_datasets_registry::error::{ListVersionTagsError, ResolveRevisionError};
use amp_job_core::job_id::JobId;
use amp_job_core::{job_id::JobId, retry_strategy::RetryStrategy};
use amp_job_materialize_datasets_derived::job_descriptor::JobDescriptor as MaterializeDerivedDatasetJobDescriptor;
use amp_job_materialize_datasets_raw::job_descriptor::JobDescriptor as MaterializeRawDatasetJobDescriptor;
use axum::{
Expand Down Expand Up @@ -37,6 +37,7 @@ use crate::{
/// - `parallelism`: Number of parallel workers (default: 1, only for raw datasets)
/// - `worker_id`: Optional worker selector (exact ID or glob pattern)
/// - `verify`: Enable cryptographic verification of EVM block data (default: false)
/// - `retry_strategy`: Optional retry strategy controlling how failed jobs are retried
///
/// ## Response
/// - **202 Accepted**: Job successfully scheduled
Expand Down Expand Up @@ -109,6 +110,7 @@ pub async fn handler(
parallelism,
worker_id,
verify,
retry_strategy,
} = match json {
Ok(Json(request)) => request,
Err(err) => {
Expand Down Expand Up @@ -151,29 +153,33 @@ pub async fn handler(
// Build the job descriptor and compute idempotency key based on dataset kind
let (job_descriptor, idempotency_key) = if dataset.kind() == DerivedDatasetKind {
(
JobDescriptor::from(MaterializeDerivedDatasetJobDescriptor {
end_block: end_block.into(),
dataset_namespace: reference.namespace().clone(),
dataset_name: reference.name().clone(),
manifest_hash: reference.hash().clone(),
}),
JobDescriptor::materialize_derived(
MaterializeDerivedDatasetJobDescriptor {
end_block: end_block.into(),
dataset_namespace: reference.namespace().clone(),
dataset_name: reference.name().clone(),
manifest_hash: reference.hash().clone(),
},
retry_strategy,
),
amp_job_materialize_datasets_derived::job_key::idempotency_key(&reference),
)
} else {
(
MaterializeRawDatasetJobDescriptor {
end_block: end_block.into(),
max_writers: parallelism,
dataset_namespace: reference.namespace().clone(),
dataset_name: reference.name().clone(),
manifest_hash: reference.hash().clone(),
verify,
}
.into(),
JobDescriptor::materialize_raw(
MaterializeRawDatasetJobDescriptor {
end_block: end_block.into(),
max_writers: parallelism,
dataset_namespace: reference.namespace().clone(),
dataset_name: reference.name().clone(),
manifest_hash: reference.hash().clone(),
verify,
},
retry_strategy,
),
amp_job_materialize_datasets_raw::job_key::idempotency_key(&reference),
)
};

// Schedule the extraction job using the scheduler
let job_id = ctx
.scheduler
Expand Down Expand Up @@ -270,6 +276,18 @@ pub struct DeployRequest {
/// Defaults to false if not specified.
#[serde(default)]
pub verify: bool,

/// Optional retry strategy controlling how failed jobs are retried.
///
/// Supports three strategies:
/// - `none`: Never retry — fail immediately on first error.
/// - `bounded`: Retry up to `max_attempts` times with configurable backoff.
/// - `unless_stopped`: Retry indefinitely until the job is explicitly stopped.
///
/// If not specified, the scheduler applies its default strategy.
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(value_type = Option<Object>))]
pub retry_strategy: Option<RetryStrategy>,
}

fn default_parallelism() -> u16 {
Expand Down
33 changes: 27 additions & 6 deletions crates/controller-admin-jobs/src/jobs/handlers/create.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Job creation handler

use amp_job_core::job_id::JobId;
use amp_job_core::{job_id::JobId, retry_strategy::RetryStrategy};
use amp_job_gc::job_descriptor::JobDescriptor as GcJobDescriptor;
use amp_job_materialize_datasets_derived::job_descriptor::JobDescriptor as MaterializeDerivedJobDescriptor;
use amp_job_materialize_datasets_raw::job_descriptor::JobDescriptor as MaterializeRawJobDescriptor;
Expand All @@ -25,14 +25,20 @@ use crate::{
/// or `materialize-derived`). Additional fields depend on the job kind and are
/// defined by the corresponding job descriptor type.
///
/// ## Request Body
/// - `kind`: Job type (`gc`, `materialize-raw`, or `materialize-derived`)
/// - `worker_id`: Optional worker selector (exact ID or glob pattern)
/// - `retry_strategy`: Optional retry strategy controlling how failed jobs are retried
/// - Additional fields vary by job kind (see [`CreateJobRequest`] variants)
///
/// ## Response
/// - **202 Accepted**: Job scheduled successfully
/// - **400 Bad Request**: Invalid request body or unsupported job kind
/// - **400 Bad Request**: Invalid request body, invalid retry strategy, or no workers
/// - **409 Conflict**: An active job with the same idempotency key already exists
/// - **500 Internal Server Error**: Scheduler error
///
/// ## Error Codes
/// - `INVALID_BODY`: The request body is not valid JSON or has missing/invalid fields
/// - `INVALID_BODY`: The request body is not valid JSON, has missing/invalid fields, or has invalid retry strategy parameters
/// - `NO_WORKERS_AVAILABLE`: No active workers to handle the job
/// - `ACTIVE_JOB_CONFLICT`: An active job with the same idempotency key already exists
/// - `SCHEDULER_ERROR`: Internal scheduler failure
Expand Down Expand Up @@ -67,35 +73,38 @@ pub async fn handler(
CreateJobRequest::Gc {
descriptor,
worker_id,
retry_strategy,
} => {
let key = amp_job_gc::job_key::idempotency_key(descriptor.location_id);
let desc = scheduler::JobDescriptor::from(descriptor);
let desc = scheduler::JobDescriptor::gc(descriptor, retry_strategy);
(key, desc, worker_id)
}
CreateJobRequest::MaterializeRaw {
descriptor,
worker_id,
retry_strategy,
} => {
let reference = HashReference::new(
descriptor.dataset_namespace.clone(),
descriptor.dataset_name.clone(),
descriptor.manifest_hash.clone(),
);
let key = amp_job_materialize_datasets_raw::job_key::idempotency_key(&reference);
let desc = scheduler::JobDescriptor::from(descriptor);
let desc = scheduler::JobDescriptor::materialize_raw(descriptor, retry_strategy);
(key, desc, worker_id)
}
CreateJobRequest::MaterializeDerived {
descriptor,
worker_id,
retry_strategy,
} => {
let reference = HashReference::new(
descriptor.dataset_namespace.clone(),
descriptor.dataset_name.clone(),
descriptor.manifest_hash.clone(),
);
let key = amp_job_materialize_datasets_derived::job_key::idempotency_key(&reference);
let desc = scheduler::JobDescriptor::from(descriptor);
let desc = scheduler::JobDescriptor::materialize_derived(descriptor, retry_strategy);
(key, desc, worker_id)
}
};
Expand Down Expand Up @@ -137,6 +146,10 @@ pub enum CreateJobRequest {
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(value_type = Option<String>))]
worker_id: Option<scheduler::NodeSelector>,
/// Optional retry strategy controlling how failed jobs are retried.
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(value_type = Option<Object>))]
retry_strategy: Option<RetryStrategy>,
},
/// Schedule a raw dataset materialization job.
MaterializeRaw {
Expand All @@ -148,6 +161,10 @@ pub enum CreateJobRequest {
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(value_type = Option<String>))]
worker_id: Option<scheduler::NodeSelector>,
/// Optional retry strategy controlling how failed jobs are retried.
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(value_type = Option<Object>))]
retry_strategy: Option<RetryStrategy>,
},
/// Schedule a derived dataset materialization job.
MaterializeDerived {
Expand All @@ -159,6 +176,10 @@ pub enum CreateJobRequest {
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(value_type = Option<String>))]
worker_id: Option<scheduler::NodeSelector>,
/// Optional retry strategy controlling how failed jobs are retried.
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(value_type = Option<Object>))]
retry_strategy: Option<RetryStrategy>,
},
}

Expand Down
90 changes: 75 additions & 15 deletions crates/controller-admin-jobs/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! - Worker information queries
//! - Worker status retrieval

use amp_job_core::{job_id::JobId, status::JobStatus};
use amp_job_core::{job_id::JobId, retry_strategy::RetryStrategy, status::JobStatus};
use amp_worker_core::node_id::{InvalidIdError, NodeId, validate_node_id};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -228,27 +228,87 @@ impl From<&EventDetail<'_>> for JobDescriptor {
}
}

impl From<amp_job_materialize_datasets_raw::job_descriptor::JobDescriptor> for JobDescriptor {
fn from(desc: amp_job_materialize_datasets_raw::job_descriptor::JobDescriptor) -> Self {
let raw: EventDetailOwned = desc.into();
Self(raw.into_inner())
impl JobDescriptor {
/// Builds the scheduler-level descriptor for a raw dataset materialization job.
///
/// The typed `desc` is serialized together with the raw-materialization job
/// kind tag and the optional `retry_strategy`.
///
/// # Panics
///
/// Panics if the combined descriptor cannot be serialized to JSON.
pub fn materialize_raw(
    desc: amp_job_materialize_datasets_raw::job_descriptor::JobDescriptor,
    retry_strategy: Option<RetryStrategy>,
) -> Self {
    let kind = amp_job_materialize_datasets_raw::job_kind::JOB_KIND;
    Self::descriptor(kind, desc, retry_strategy)
}
}

impl From<amp_job_materialize_datasets_derived::job_descriptor::JobDescriptor> for JobDescriptor {
fn from(desc: amp_job_materialize_datasets_derived::job_descriptor::JobDescriptor) -> Self {
let raw: EventDetailOwned = desc.into();
Self(raw.into_inner())
/// Builds the scheduler-level descriptor for a derived dataset materialization job.
///
/// The typed `desc` is serialized together with the derived-materialization
/// job kind tag and the optional `retry_strategy`.
///
/// # Panics
///
/// Panics if the combined descriptor cannot be serialized to JSON.
pub fn materialize_derived(
    desc: amp_job_materialize_datasets_derived::job_descriptor::JobDescriptor,
    retry_strategy: Option<RetryStrategy>,
) -> Self {
    let kind = amp_job_materialize_datasets_derived::job_kind::JOB_KIND;
    Self::descriptor(kind, desc, retry_strategy)
}

/// Builds the scheduler-level descriptor for a garbage collection job.
///
/// The typed `desc` is serialized together with the GC job kind tag and the
/// optional `retry_strategy`.
///
/// # Panics
///
/// Panics if the combined descriptor cannot be serialized to JSON.
pub fn gc(
    desc: amp_job_gc::job_descriptor::JobDescriptor,
    retry_strategy: Option<RetryStrategy>,
) -> Self {
    let kind = amp_job_gc::job_kind::JOB_KIND;
    Self::descriptor(kind, desc, retry_strategy)
}
}

impl From<amp_job_gc::job_descriptor::JobDescriptor> for JobDescriptor {
fn from(desc: amp_job_gc::job_descriptor::JobDescriptor) -> Self {
let raw: EventDetailOwned = desc.into();
Self(raw.into_inner())
/// Wraps a typed job descriptor and the scheduler-level fields into one JSON value.
///
/// `kind` is the job kind tag, `details` is the job-specific descriptor, and
/// `retry_strategy` is the optional retry configuration. All three are
/// serialized through [`Descriptor`] and the resulting raw JSON is stored in
/// the returned value.
///
/// # Panics
///
/// Panics if JSON serialization of the combined descriptor fails.
fn descriptor<D: serde::Serialize>(
    kind: &str,
    details: D,
    retry_strategy: Option<RetryStrategy>,
) -> Self {
    let envelope = Descriptor {
        kind,
        retry_strategy,
        details,
    };

    // Invariant relied on by the `expect` below (not checked here): the
    // concrete descriptor types passed in (`MaterializeRawJobDescriptor`,
    // `MaterializeDerivedJobDescriptor`, `GcJobDescriptor`) are expected to
    // contain only primitive and string fields, whose `Serialize` impls do
    // not fail.
    let serialized = serde_json::value::to_raw_value(&envelope)
        .expect("Descriptor serialization is infallible");
    Self(serialized)
}
}

/// Combines a typed job descriptor with its `kind` tag and scheduler-level
/// fields (retry strategy) into a single JSON object for storage.
#[derive(serde::Serialize)]
struct Descriptor<'a, D> {
kind: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
retry_strategy: Option<RetryStrategy>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make this mandatory rather than an optional `retry_strategy`. I understand that we need this for backward compatibility, but we should require the retry policy and trigger to be present in the job ledger's job details.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracking issue here

#[serde(flatten)]
details: D,
}

/// Errors that can occur when scheduling a dataset dump job
#[derive(Debug, thiserror::Error)]
pub enum ScheduleJobError {
Expand Down
Loading
Loading