diff --git a/Cargo.lock b/Cargo.lock index 5644fae1e..c5c2f629f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,6 +36,7 @@ dependencies = [ "amp-worker-core", "amp-worker-datasets-derived", "amp-worker-datasets-raw", + "amp-worker-gc", "async-trait", "axum", "common", @@ -14947,6 +14948,7 @@ dependencies = [ "amp-worker-core", "amp-worker-datasets-derived", "amp-worker-datasets-raw", + "amp-worker-gc", "async-trait", "backon", "chrono", diff --git a/crates/bin/ampctl/src/cmd/job/list.rs b/crates/bin/ampctl/src/cmd/job/list.rs index 84972e5fd..d1119788a 100644 --- a/crates/bin/ampctl/src/cmd/job/list.rs +++ b/crates/bin/ampctl/src/cmd/job/list.rs @@ -133,6 +133,7 @@ fn show_dataset_descriptor(descriptor: &serde_json::Value) -> Option { dataset_name = desc.dataset_name, hash = &desc.manifest_hash.as_str()[..7], )), + JobDescriptor::Gc(desc) => Some(format!("gc location:{}", desc.location_id)), } } diff --git a/crates/services/admin-api/Cargo.toml b/crates/services/admin-api/Cargo.toml index dd87f638e..3219f9953 100644 --- a/crates/services/admin-api/Cargo.toml +++ b/crates/services/admin-api/Cargo.toml @@ -14,6 +14,7 @@ amp-providers-registry = { path = "../../core/providers-registry" } amp-worker-core = { path = "../../core/worker-core" } amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" } amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" } +amp-worker-gc = { path = "../../core/worker-gc" } async-trait.workspace = true axum.workspace = true common = { path = "../../core/common" } diff --git a/crates/services/admin-api/src/scheduler.rs b/crates/services/admin-api/src/scheduler.rs index c093d732d..2861e6e7e 100644 --- a/crates/services/admin-api/src/scheduler.rs +++ b/crates/services/admin-api/src/scheduler.rs @@ -217,6 +217,13 @@ impl From for JobDes } } +impl From for JobDescriptor { + fn from(desc: amp_worker_gc::job_descriptor::JobDescriptor) -> Self { + let raw: EventDetailOwned = desc.into(); + Self(raw.into_inner()) + } +} + /// Errors that can occur when scheduling a dataset dump job #[derive(Debug, thiserror::Error)] pub enum ScheduleJobError { diff --git a/crates/services/worker/Cargo.toml b/crates/services/worker/Cargo.toml index 2bc0ea6ab..9d687b743 100644 --- a/crates/services/worker/Cargo.toml +++ b/crates/services/worker/Cargo.toml @@ -10,6 +10,7 @@ amp-data-store = { path = "../../core/data-store" } amp-worker-core = { path = "../../core/worker-core" } amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" } amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" } +amp-worker-gc = { path = "../../core/worker-gc" } async-trait.workspace = true backon.workspace = true chrono.workspace = true diff --git a/crates/services/worker/src/job.rs b/crates/services/worker/src/job.rs index a9988f48b..6efebf397 100644 --- a/crates/services/worker/src/job.rs +++ b/crates/services/worker/src/job.rs @@ -56,4 +56,5 @@ impl From for metadata_db::jobs::Job { pub enum JobDescriptor { MaterializeRaw(amp_worker_datasets_raw::job_descriptor::JobDescriptor), MaterializeDerived(amp_worker_datasets_derived::job_descriptor::JobDescriptor), + Gc(amp_worker_gc::job_descriptor::JobDescriptor), } diff --git a/crates/services/worker/src/service/job_impl.rs b/crates/services/worker/src/service/job_impl.rs index 68ddb364a..586f99b5a 100644 --- a/crates/services/worker/src/service/job_impl.rs +++ b/crates/services/worker/src/service/job_impl.rs @@ -1,6 +1,6 @@ //! Internal job implementation for the worker service. -use std::{future::Future, sync::Arc}; +use std::sync::Arc; use amp_worker_core::{ ProgressReporter, @@ -16,141 +16,159 @@ use crate::{ events::WorkerProgressReporter, job::JobDescriptor, kafka::proto, service::WorkerJobCtx, }; -/// Create and run a worker job that materializes tables from a dataset. +/// Create and run a worker job. /// -/// This function returns a future that executes the materialization operation. -/// Raw datasets are handled by `amp_worker_datasets_raw` and derived -/// datasets are handled by `amp_worker_datasets_derived`. -pub(super) fn new( +/// This function returns a future that executes the job operation. +/// Raw datasets are handled by `amp_worker_datasets_raw`, derived +/// datasets by `amp_worker_datasets_derived`, and garbage collection +/// by `amp_worker_gc`. +pub(super) async fn new( job_ctx: WorkerJobCtx, job_desc: JobDescriptor, job_id: JobId, -) -> impl Future> { - let reference = match &job_desc { - JobDescriptor::MaterializeRaw(desc) => HashReference::new( - desc.dataset_namespace.clone(), - desc.dataset_name.clone(), - desc.manifest_hash.clone(), - ), - JobDescriptor::MaterializeDerived(desc) => HashReference::new( - desc.dataset_namespace.clone(), - desc.dataset_name.clone(), - desc.manifest_hash.clone(), - ), - }; - - let metrics = job_ctx - .meter - .as_ref() - .map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id))); - - // Create progress reporter for event streaming - // Always create the reporter - NoOpEmitter will discard events if not needed - let progress_reporter: Option> = { - let dataset_info = proto::DatasetInfo { - namespace: reference.namespace().to_string(), - name: reference.name().to_string(), - manifest_hash: reference.hash().to_string(), - }; - Some(Arc::new(WorkerProgressReporter::new( - job_id, - dataset_info, - job_ctx.event_emitter.clone(), - ))) - }; - - let writer: metadata_db::jobs::JobId = job_id.into(); - async move { - match job_desc { - JobDescriptor::MaterializeRaw(desc) => { - let ctx = amp_worker_datasets_raw::job_ctx::Context { - job_id: Some(writer), - config: amp_worker_datasets_raw::job_ctx::Config { - poll_interval: job_ctx.config.poll_interval, - progress_interval: job_ctx - .config - .events_config - .progress_interval - .clone() - .into(), - parquet_writer: (&job_ctx.config.parquet).into(), - }, - metadata_db: job_ctx.metadata_db.clone(), - datasets_cache: job_ctx.datasets_cache.clone(), - ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), - data_store: job_ctx.data_store.clone(), - notification_multiplexer: job_ctx.notification_multiplexer.clone(), - metrics, - progress_reporter, +) -> Result<(), JobError> { + match job_desc { + JobDescriptor::Gc(desc) => { + let ctx = amp_worker_gc::job_ctx::Context { + metadata_db: job_ctx.metadata_db.clone(), + data_store: job_ctx.data_store.clone(), + meter: job_ctx.meter.clone(), + }; + + amp_worker_gc::job_impl::execute(ctx, desc) + .instrument(info_span!("gc_job", %job_id)) + .await + .map_err(JobError::Gc)?; + } + JobDescriptor::MaterializeRaw(desc) => { + let reference = HashReference::new( + desc.dataset_namespace.clone(), + desc.dataset_name.clone(), + desc.manifest_hash.clone(), + ); + let metrics = job_ctx + .meter + .as_ref() + .map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id))); + let progress_reporter: Option> = { + let dataset_info = proto::DatasetInfo { + namespace: reference.namespace().to_string(), + name: reference.name().to_string(), + manifest_hash: reference.hash().to_string(), }; + Some(Arc::new(WorkerProgressReporter::new( + job_id, + dataset_info, + job_ctx.event_emitter.clone(), + ))) + }; + let writer: metadata_db::jobs::JobId = job_id.into(); - amp_worker_datasets_raw::job_impl::execute(ctx, desc, writer) + let ctx = amp_worker_datasets_raw::job_ctx::Context { + job_id: Some(writer), + config: amp_worker_datasets_raw::job_ctx::Config { + poll_interval: job_ctx.config.poll_interval, + progress_interval: job_ctx + .config + .events_config + .progress_interval + .clone() + .into(), + parquet_writer: (&job_ctx.config.parquet).into(), + }, + metadata_db: job_ctx.metadata_db.clone(), + datasets_cache: job_ctx.datasets_cache.clone(), + ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), + data_store: job_ctx.data_store.clone(), + notification_multiplexer: job_ctx.notification_multiplexer.clone(), + metrics, + progress_reporter, + }; + + amp_worker_datasets_raw::job_impl::execute(ctx, desc, writer) .instrument( info_span!("materialize_raw_job", %job_id, dataset = %format!("{reference:#}")), ) .await .map_err(JobError::MaterializeRaw)?; - } - JobDescriptor::MaterializeDerived(desc) => { - let ctx = amp_worker_datasets_derived::job_ctx::Context { - job_id: Some(writer), - config: amp_worker_datasets_derived::job_ctx::Config { - keep_alive_interval: job_ctx.config.keep_alive_interval, - max_mem_mb: job_ctx.config.max_mem_mb, - query_max_mem_mb: job_ctx.config.query_max_mem_mb, - spill_location: job_ctx.config.spill_location.clone(), - progress_interval: job_ctx - .config - .events_config - .progress_interval - .clone() - .into(), - parquet_writer: (&job_ctx.config.parquet).into(), - microbatch_max_interval: job_ctx.config.microbatch_max_interval, - }, - metadata_db: job_ctx.metadata_db.clone(), - datasets_cache: job_ctx.datasets_cache.clone(), - ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), - data_store: job_ctx.data_store.clone(), - isolate_pool: job_ctx.isolate_pool.clone(), - notification_multiplexer: job_ctx.notification_multiplexer.clone(), - metrics, - progress_reporter, + } + JobDescriptor::MaterializeDerived(desc) => { + let reference = HashReference::new( + desc.dataset_namespace.clone(), + desc.dataset_name.clone(), + desc.manifest_hash.clone(), + ); + let metrics = job_ctx + .meter + .as_ref() + .map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id))); + let progress_reporter: Option> = { + let dataset_info = proto::DatasetInfo { + namespace: reference.namespace().to_string(), + name: reference.name().to_string(), + manifest_hash: reference.hash().to_string(), }; + Some(Arc::new(WorkerProgressReporter::new( + job_id, + dataset_info, + job_ctx.event_emitter.clone(), + ))) + }; + let writer: metadata_db::jobs::JobId = job_id.into(); - amp_worker_datasets_derived::job_impl::execute(ctx, desc, writer) + let ctx = amp_worker_datasets_derived::job_ctx::Context { + job_id: Some(writer), + config: amp_worker_datasets_derived::job_ctx::Config { + keep_alive_interval: job_ctx.config.keep_alive_interval, + max_mem_mb: job_ctx.config.max_mem_mb, + query_max_mem_mb: job_ctx.config.query_max_mem_mb, + spill_location: job_ctx.config.spill_location.clone(), + progress_interval: job_ctx + .config + .events_config + .progress_interval + .clone() + .into(), + parquet_writer: (&job_ctx.config.parquet).into(), + microbatch_max_interval: job_ctx.config.microbatch_max_interval, + }, + metadata_db: job_ctx.metadata_db.clone(), + datasets_cache: job_ctx.datasets_cache.clone(), + ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), + data_store: job_ctx.data_store.clone(), + isolate_pool: job_ctx.isolate_pool.clone(), + notification_multiplexer: job_ctx.notification_multiplexer.clone(), + metrics, + progress_reporter, + }; + + amp_worker_datasets_derived::job_impl::execute(ctx, desc, writer) .instrument(info_span!("materialize_derived_job", %job_id, dataset = %format!("{reference:#}"))) .await .map_err(JobError::MaterializeDerived)?; - } } - - Ok(()) } + + Ok(()) } -/// Errors from dataset materialization job execution. +/// Errors from worker job execution. /// -/// Wraps the specific error types from raw and derived dataset materialization operations -/// to provide a unified error type for the worker job system. +/// Wraps the specific error types from each job type to provide a unified +/// error type for the worker job system. #[derive(Debug, thiserror::Error)] pub(crate) enum JobError { - /// Raw dataset materialization operation failed - /// - /// This occurs when the raw dataset extraction and Parquet file writing - /// process encounters an error. Common causes include blockchain client - /// connectivity issues, consistency check failures, and partition task errors. + /// Raw dataset materialization operation failed. #[error("Failed to materialize raw dataset")] MaterializeRaw(#[source] amp_worker_datasets_raw::job_impl::Error), - /// Derived dataset materialization operation failed - /// - /// This occurs when the derived dataset SQL query execution and Parquet - /// file writing process encounters an error. Common causes include query - /// environment creation failures, manifest retrieval errors, and table - /// materialization failures. + /// Derived dataset materialization operation failed. #[error("Failed to materialize derived dataset")] MaterializeDerived(#[source] amp_worker_datasets_derived::job_impl::Error), + + /// Garbage collection job failed. + #[error("Failed to run garbage collection")] + Gc(#[source] amp_worker_gc::job_impl::Error), } impl JobErrorExt for JobError { @@ -158,6 +176,7 @@ impl JobErrorExt for JobError { match self { Self::MaterializeRaw(err) => err.error_code(), Self::MaterializeDerived(err) => err.error_code(), + Self::Gc(err) => err.error_code(), } } } @@ -167,6 +186,7 @@ impl ErrorDetailsProvider for JobError { match self { Self::MaterializeRaw(err) => Some(err), Self::MaterializeDerived(err) => Some(err), + Self::Gc(err) => Some(err), } } } @@ -176,6 +196,7 @@ impl RetryableErrorExt for JobError { match self { Self::MaterializeRaw(err) => err.is_retryable(), Self::MaterializeDerived(err) => err.is_retryable(), + Self::Gc(err) => err.is_retryable(), } } }