Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bin/ampctl/src/cmd/job/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ fn show_dataset_descriptor(descriptor: &serde_json::Value) -> Option<String> {
dataset_name = desc.dataset_name,
hash = &desc.manifest_hash.as_str()[..7],
)),
JobDescriptor::Gc(desc) => Some(format!("gc location:{}", desc.location_id)),
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/services/admin-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ amp-providers-registry = { path = "../../core/providers-registry" }
amp-worker-core = { path = "../../core/worker-core" }
amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" }
amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" }
amp-worker-gc = { path = "../../core/worker-gc" }
async-trait.workspace = true
axum.workspace = true
common = { path = "../../core/common" }
Expand Down
7 changes: 7 additions & 0 deletions crates/services/admin-api/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ impl From<amp_worker_datasets_derived::job_descriptor::JobDescriptor> for JobDes
}
}

/// Builds a scheduler `JobDescriptor` from a garbage-collection job descriptor.
///
/// The GC descriptor is first converted into an `EventDetailOwned`; the value
/// returned by its `into_inner()` is then wrapped in `Self`.
impl From<amp_worker_gc::job_descriptor::JobDescriptor> for JobDescriptor {
    fn from(gc_desc: amp_worker_gc::job_descriptor::JobDescriptor) -> Self {
        let detail = Into::<EventDetailOwned>::into(gc_desc);
        let inner = detail.into_inner();
        Self(inner)
    }
}

/// Errors that can occur when scheduling a dataset dump job
#[derive(Debug, thiserror::Error)]
pub enum ScheduleJobError {
Expand Down
1 change: 1 addition & 0 deletions crates/services/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ amp-data-store = { path = "../../core/data-store" }
amp-worker-core = { path = "../../core/worker-core" }
amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" }
amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" }
amp-worker-gc = { path = "../../core/worker-gc" }
async-trait.workspace = true
backon.workspace = true
chrono.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/services/worker/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ impl From<Job> for metadata_db::jobs::Job {
pub enum JobDescriptor {
/// Raw dataset materialization job, carrying the descriptor type defined by
/// `amp_worker_datasets_raw`.
MaterializeRaw(amp_worker_datasets_raw::job_descriptor::JobDescriptor),
/// Derived dataset materialization job, carrying the descriptor type defined by
/// `amp_worker_datasets_derived`.
MaterializeDerived(amp_worker_datasets_derived::job_descriptor::JobDescriptor),
/// Garbage collection job, carrying the descriptor type defined by
/// `amp_worker_gc`.
Gc(amp_worker_gc::job_descriptor::JobDescriptor),
}
237 changes: 129 additions & 108 deletions crates/services/worker/src/service/job_impl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Internal job implementation for the worker service.

use std::{future::Future, sync::Arc};
use std::sync::Arc;

use amp_worker_core::{
ProgressReporter,
Expand All @@ -16,148 +16,167 @@ use crate::{
events::WorkerProgressReporter, job::JobDescriptor, kafka::proto, service::WorkerJobCtx,
};

/// Create and run a worker job that materializes tables from a dataset.
/// Create and run a worker job.
///
/// This function returns a future that executes the materialization operation.
/// Raw datasets are handled by `amp_worker_datasets_raw` and derived
/// datasets are handled by `amp_worker_datasets_derived`.
pub(super) fn new(
/// This function returns a future that executes the job operation.
/// Raw datasets are handled by `amp_worker_datasets_raw`, derived
/// datasets by `amp_worker_datasets_derived`, and garbage collection
/// by `amp_worker_gc`.
pub(super) async fn new(
job_ctx: WorkerJobCtx,
job_desc: JobDescriptor,
job_id: JobId,
) -> impl Future<Output = Result<(), JobError>> {
let reference = match &job_desc {
JobDescriptor::MaterializeRaw(desc) => HashReference::new(
desc.dataset_namespace.clone(),
desc.dataset_name.clone(),
desc.manifest_hash.clone(),
),
JobDescriptor::MaterializeDerived(desc) => HashReference::new(
desc.dataset_namespace.clone(),
desc.dataset_name.clone(),
desc.manifest_hash.clone(),
),
};

let metrics = job_ctx
.meter
.as_ref()
.map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id)));

// Create progress reporter for event streaming
// Always create the reporter - NoOpEmitter will discard events if not needed
let progress_reporter: Option<Arc<dyn ProgressReporter>> = {
let dataset_info = proto::DatasetInfo {
namespace: reference.namespace().to_string(),
name: reference.name().to_string(),
manifest_hash: reference.hash().to_string(),
};
Some(Arc::new(WorkerProgressReporter::new(
job_id,
dataset_info,
job_ctx.event_emitter.clone(),
)))
};

let writer: metadata_db::jobs::JobId = job_id.into();
async move {
match job_desc {
JobDescriptor::MaterializeRaw(desc) => {
let ctx = amp_worker_datasets_raw::job_ctx::Context {
job_id: Some(writer),
config: amp_worker_datasets_raw::job_ctx::Config {
poll_interval: job_ctx.config.poll_interval,
progress_interval: job_ctx
.config
.events_config
.progress_interval
.clone()
.into(),
parquet_writer: (&job_ctx.config.parquet).into(),
},
metadata_db: job_ctx.metadata_db.clone(),
datasets_cache: job_ctx.datasets_cache.clone(),
ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(),
data_store: job_ctx.data_store.clone(),
notification_multiplexer: job_ctx.notification_multiplexer.clone(),
metrics,
progress_reporter,
) -> Result<(), JobError> {
match job_desc {
JobDescriptor::Gc(desc) => {
let ctx = amp_worker_gc::job_ctx::Context {
metadata_db: job_ctx.metadata_db.clone(),
data_store: job_ctx.data_store.clone(),
meter: job_ctx.meter.clone(),
};

amp_worker_gc::job_impl::execute(ctx, desc)
.instrument(info_span!("gc_job", %job_id))
.await
.map_err(JobError::Gc)?;
}
JobDescriptor::MaterializeRaw(desc) => {
let reference = HashReference::new(
desc.dataset_namespace.clone(),
desc.dataset_name.clone(),
desc.manifest_hash.clone(),
);
let metrics = job_ctx
.meter
.as_ref()
.map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id)));
let progress_reporter: Option<Arc<dyn ProgressReporter>> = {
let dataset_info = proto::DatasetInfo {
namespace: reference.namespace().to_string(),
name: reference.name().to_string(),
manifest_hash: reference.hash().to_string(),
};
Some(Arc::new(WorkerProgressReporter::new(
job_id,
dataset_info,
job_ctx.event_emitter.clone(),
)))
};
let writer: metadata_db::jobs::JobId = job_id.into();

amp_worker_datasets_raw::job_impl::execute(ctx, desc, writer)
let ctx = amp_worker_datasets_raw::job_ctx::Context {
job_id: Some(writer),
config: amp_worker_datasets_raw::job_ctx::Config {
poll_interval: job_ctx.config.poll_interval,
progress_interval: job_ctx
.config
.events_config
.progress_interval
.clone()
.into(),
parquet_writer: (&job_ctx.config.parquet).into(),
},
metadata_db: job_ctx.metadata_db.clone(),
datasets_cache: job_ctx.datasets_cache.clone(),
ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(),
data_store: job_ctx.data_store.clone(),
notification_multiplexer: job_ctx.notification_multiplexer.clone(),
metrics,
progress_reporter,
};

amp_worker_datasets_raw::job_impl::execute(ctx, desc, writer)
.instrument(
info_span!("materialize_raw_job", %job_id, dataset = %format!("{reference:#}")),
)
.await
.map_err(JobError::MaterializeRaw)?;
}
JobDescriptor::MaterializeDerived(desc) => {
let ctx = amp_worker_datasets_derived::job_ctx::Context {
job_id: Some(writer),
config: amp_worker_datasets_derived::job_ctx::Config {
keep_alive_interval: job_ctx.config.keep_alive_interval,
max_mem_mb: job_ctx.config.max_mem_mb,
query_max_mem_mb: job_ctx.config.query_max_mem_mb,
spill_location: job_ctx.config.spill_location.clone(),
progress_interval: job_ctx
.config
.events_config
.progress_interval
.clone()
.into(),
parquet_writer: (&job_ctx.config.parquet).into(),
microbatch_max_interval: job_ctx.config.microbatch_max_interval,
},
metadata_db: job_ctx.metadata_db.clone(),
datasets_cache: job_ctx.datasets_cache.clone(),
ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(),
data_store: job_ctx.data_store.clone(),
isolate_pool: job_ctx.isolate_pool.clone(),
notification_multiplexer: job_ctx.notification_multiplexer.clone(),
metrics,
progress_reporter,
}
JobDescriptor::MaterializeDerived(desc) => {
let reference = HashReference::new(
desc.dataset_namespace.clone(),
desc.dataset_name.clone(),
desc.manifest_hash.clone(),
);
let metrics = job_ctx
.meter
.as_ref()
.map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id)));
let progress_reporter: Option<Arc<dyn ProgressReporter>> = {
let dataset_info = proto::DatasetInfo {
namespace: reference.namespace().to_string(),
name: reference.name().to_string(),
manifest_hash: reference.hash().to_string(),
};
Some(Arc::new(WorkerProgressReporter::new(
job_id,
dataset_info,
job_ctx.event_emitter.clone(),
)))
};
let writer: metadata_db::jobs::JobId = job_id.into();

amp_worker_datasets_derived::job_impl::execute(ctx, desc, writer)
let ctx = amp_worker_datasets_derived::job_ctx::Context {
job_id: Some(writer),
config: amp_worker_datasets_derived::job_ctx::Config {
keep_alive_interval: job_ctx.config.keep_alive_interval,
max_mem_mb: job_ctx.config.max_mem_mb,
query_max_mem_mb: job_ctx.config.query_max_mem_mb,
spill_location: job_ctx.config.spill_location.clone(),
progress_interval: job_ctx
.config
.events_config
.progress_interval
.clone()
.into(),
parquet_writer: (&job_ctx.config.parquet).into(),
microbatch_max_interval: job_ctx.config.microbatch_max_interval,
},
metadata_db: job_ctx.metadata_db.clone(),
datasets_cache: job_ctx.datasets_cache.clone(),
ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(),
data_store: job_ctx.data_store.clone(),
isolate_pool: job_ctx.isolate_pool.clone(),
notification_multiplexer: job_ctx.notification_multiplexer.clone(),
metrics,
progress_reporter,
};

amp_worker_datasets_derived::job_impl::execute(ctx, desc, writer)
.instrument(info_span!("materialize_derived_job", %job_id, dataset = %format!("{reference:#}")))
.await
.map_err(JobError::MaterializeDerived)?;
}
}

Ok(())
}

Ok(())
}

/// Errors from worker job execution.
///
/// Wraps the specific error types from each job type (raw dataset
/// materialization, derived dataset materialization, and garbage collection)
/// to provide a unified error type for the worker job system.
#[derive(Debug, thiserror::Error)]
pub(crate) enum JobError {
/// Raw dataset materialization operation failed.
///
/// This occurs when the raw dataset extraction and Parquet file writing
/// process encounters an error. Common causes include blockchain client
/// connectivity issues, consistency check failures, and partition task errors.
#[error("Failed to materialize raw dataset")]
MaterializeRaw(#[source] amp_worker_datasets_raw::job_impl::Error),

/// Derived dataset materialization operation failed.
///
/// This occurs when the derived dataset SQL query execution and Parquet
/// file writing process encounters an error. Common causes include query
/// environment creation failures, manifest retrieval errors, and table
/// materialization failures.
#[error("Failed to materialize derived dataset")]
MaterializeDerived(#[source] amp_worker_datasets_derived::job_impl::Error),

/// Garbage collection job failed.
///
/// Wraps the error produced by `amp_worker_gc::job_impl::execute`.
#[error("Failed to run garbage collection")]
Gc(#[source] amp_worker_gc::job_impl::Error),
}

impl JobErrorExt for JobError {
fn error_code(&self) -> &'static str {
match self {
Self::MaterializeRaw(err) => err.error_code(),
Self::MaterializeDerived(err) => err.error_code(),
Self::Gc(err) => err.error_code(),
}
}
}
Expand All @@ -167,6 +186,7 @@ impl ErrorDetailsProvider for JobError {
match self {
Self::MaterializeRaw(err) => Some(err),
Self::MaterializeDerived(err) => Some(err),
Self::Gc(err) => Some(err),
}
}
}
Expand All @@ -176,6 +196,7 @@ impl RetryableErrorExt for JobError {
match self {
Self::MaterializeRaw(err) => err.is_retryable(),
Self::MaterializeDerived(err) => err.is_retryable(),
Self::Gc(err) => err.is_retryable(),
}
}
}
Loading