diff --git a/crates/core/metadata-db/migrations/20260304000000_add_jobs_status.sql b/crates/core/metadata-db/migrations/20260304000000_add_jobs_status.sql new file mode 100644 index 000000000..6276cbcc8 --- /dev/null +++ b/crates/core/metadata-db/migrations/20260304000000_add_jobs_status.sql @@ -0,0 +1,32 @@ +-- ============================================================= +-- Migration: Add jobs_status projection table +-- ============================================================= +-- 1. Create jobs_status table (one row per job, latest state) +-- 2. Backfill from jobs table +-- 3. Add foreign key constraint +-- 4. Add indexes for common query patterns +-- ============================================================= + +-- 1. Create jobs_status table +CREATE TABLE IF NOT EXISTS jobs_status ( + job_id BIGINT NOT NULL, + node_id TEXT NOT NULL, + status TEXT NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT timezone('UTC', now()), + CONSTRAINT jobs_status_pkey PRIMARY KEY (job_id) +); + +-- 2. Backfill from existing jobs +INSERT INTO jobs_status (job_id, node_id, status, updated_at) +SELECT id, node_id, status, updated_at +FROM jobs +ON CONFLICT (job_id) DO NOTHING; + +-- 3. Add foreign key +ALTER TABLE jobs_status + ADD CONSTRAINT fk_jobs_status_job_id + FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE; + +-- 4. Add indexes for common query patterns +CREATE INDEX IF NOT EXISTS idx_jobs_status_status ON jobs_status (status); +CREATE INDEX IF NOT EXISTS idx_jobs_status_node_id_status ON jobs_status (node_id, status); diff --git a/crates/core/metadata-db/src/error.rs b/crates/core/metadata-db/src/error.rs index cc86675e7..35ff0a8ac 100644 --- a/crates/core/metadata-db/src/error.rs +++ b/crates/core/metadata-db/src/error.rs @@ -149,7 +149,7 @@ pub enum Error { /// /// See `JobStatusUpdateError` for specific transition validation errors. 
#[error("Job status update error: {0}")] - JobStatusUpdate(#[source] crate::jobs::JobStatusUpdateError), + JobStatusUpdate(#[source] crate::job_status::JobStatusUpdateError), /// Failed to get active physical table by location ID /// diff --git a/crates/core/metadata-db/src/job_events/tests/it_job_events.rs b/crates/core/metadata-db/src/job_events/tests/it_job_events.rs index 5fae6a49c..a8312756f 100644 --- a/crates/core/metadata-db/src/job_events/tests/it_job_events.rs +++ b/crates/core/metadata-db/src/job_events/tests/it_job_events.rs @@ -5,15 +5,11 @@ use pgtemp::PgTempDB; use crate::{ config::DEFAULT_POOL_MAX_CONNECTIONS, job_events, - jobs::{self, JobDescriptorRaw, JobStatus}, + jobs::JobStatus, + tests::common::{raw_descriptor, register_job}, workers::{self, WorkerInfo, WorkerNodeId}, }; -fn raw_descriptor(value: &serde_json::Value) -> JobDescriptorRaw<'static> { - let raw = serde_json::value::to_raw_value(value).expect("Failed to serialize to raw value"); - JobDescriptorRaw::from_owned_unchecked(raw) -} - #[tokio::test] async fn register_inserts_event() { //* Given @@ -29,14 +25,9 @@ async fn register_inserts_event() { .expect("Failed to register worker"); let job_desc = raw_descriptor(&serde_json::json!({"test": "event"})); - let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc) - .await - .expect("Failed to insert job"); //* When - job_events::register(&conn, job_id, &worker_id, JobStatus::Scheduled) - .await - .expect("Failed to register event"); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; //* Then let attempts = job_events::get_attempts_for_job(&conn, job_id) @@ -62,14 +53,9 @@ async fn get_attempts_returns_only_scheduled_events() { .expect("Failed to register worker"); let job_desc = raw_descriptor(&serde_json::json!({"test": "filter"})); - let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc) - .await - .expect("Failed to insert job"); // Insert a mix of 
event types - job_events::register(&conn, job_id, &worker_id, JobStatus::Scheduled) - .await - .expect("Failed to register SCHEDULED"); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; job_events::register(&conn, job_id, &worker_id, JobStatus::Running) .await .expect("Failed to register RUNNING"); @@ -91,34 +77,6 @@ async fn get_attempts_returns_only_scheduled_events() { assert_eq!(attempts[1].retry_index, 1); } -#[tokio::test] -async fn get_attempts_returns_empty_for_no_events() { - //* Given - let temp_db = PgTempDB::new(); - let conn = - crate::connect_pool_with_retry(&temp_db.connection_uri(), DEFAULT_POOL_MAX_CONNECTIONS) - .await - .expect("Failed to connect to metadata db"); - - let worker_id = WorkerNodeId::from_ref_unchecked("test-worker"); - workers::register(&conn, worker_id.clone(), WorkerInfo::default()) - .await - .expect("Failed to register worker"); - - let job_desc = raw_descriptor(&serde_json::json!({"test": "empty"})); - let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc) - .await - .expect("Failed to insert job"); - - //* When - let attempts = job_events::get_attempts_for_job(&conn, job_id) - .await - .expect("Failed to get attempts"); - - //* Then - assert!(attempts.is_empty()); -} - #[tokio::test] async fn get_attempts_scoped_to_job() { //* Given @@ -135,23 +93,13 @@ async fn get_attempts_scoped_to_job() { let job_desc = raw_descriptor(&serde_json::json!({"test": "scope"})); - let job_id_1 = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc) - .await - .expect("Failed to insert job 1"); - let job_id_2 = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc) - .await - .expect("Failed to insert job 2"); - // Insert events for both jobs - job_events::register(&conn, job_id_1, &worker_id, JobStatus::Scheduled) - .await - .expect("Failed to register event for job 1"); + let job_id_1 = register_job(&conn, &job_desc, &worker_id, None).await; 
job_events::register(&conn, job_id_1, &worker_id, JobStatus::Scheduled) .await .expect("Failed to register second event for job 1"); - job_events::register(&conn, job_id_2, &worker_id, JobStatus::Scheduled) - .await - .expect("Failed to register event for job 2"); + + let job_id_2 = register_job(&conn, &job_desc, &worker_id, None).await; //* When let attempts_1 = job_events::get_attempts_for_job(&conn, job_id_1) diff --git a/crates/core/metadata-db/src/job_status.rs b/crates/core/metadata-db/src/job_status.rs new file mode 100644 index 000000000..6a9c32f4d --- /dev/null +++ b/crates/core/metadata-db/src/job_status.rs @@ -0,0 +1,427 @@ +//! Job status projection operations for the read-optimized `jobs_status` table. +//! +//! This module provides the public API for managing the current-state projection +//! of jobs. The `jobs_status` table maintains one row per job reflecting its latest +//! state, updated transactionally alongside every event appended to `job_events`. +//! +//! State transitions are enforced via conditional updates that validate the current +//! status before applying changes, preventing invalid transitions. + +use crate::{Error, db::Executor, jobs::JobId, workers::WorkerNodeId}; + +pub(crate) mod sql; + +/// Register an initial status projection for a job +#[tracing::instrument(skip(exe), err)] +pub async fn register<'c, E>( + exe: E, + job_id: impl Into + std::fmt::Debug, + node_id: &WorkerNodeId<'_>, + status: JobStatus, +) -> Result<(), Error> +where + E: Executor<'c>, +{ + sql::insert(exe, job_id.into(), node_id, status) + .await + .map_err(Error::Database) +} + +/// Update job status to StopRequested +/// +/// This function will only update the job status if it's currently in a valid state +/// to be stopped (Scheduled or Running). If the job is already stopping, this is +/// considered success (idempotent behavior). If the job is in a terminal state +/// (Stopped, Completed, Failed), this returns a conflict error. 
+/// +/// Returns `Ok(true)` if the status actually changed, `Ok(false)` if already in a +/// stop-related state (idempotent no-op), or an error if the job doesn't exist, is +/// in a terminal state, or if there's a database error. +/// +/// **Note:** This function does not send notifications. The caller is responsible for +/// calling `send_job_notification` after successful status update if worker notification +/// is required. +#[tracing::instrument(skip(exe), err)] +pub async fn request_stop<'c, E>( + exe: E, + job_id: impl Into + std::fmt::Debug, +) -> Result +where + E: Executor<'c>, +{ + // Try to update job status + match sql::update_status_if_any_state( + exe, + job_id.into(), + &[JobStatus::Running, JobStatus::Scheduled], + JobStatus::StopRequested, + ) + .await + { + Ok(()) => Ok(true), + // Check if the job is already stopping (idempotent behavior) + Err(JobStatusUpdateError::StateConflict { + actual: JobStatus::StopRequested | JobStatus::Stopping, + .. + }) => Ok(false), + Err(err) => Err(Error::JobStatusUpdate(err)), + } +} + +/// Conditionally marks a job as `RUNNING` only if it's currently `SCHEDULED` +/// +/// This provides idempotent behavior - if the job is already running, completed, or failed, +/// the appropriate error will be returned indicating the state conflict. +#[tracing::instrument(skip(exe), err)] +pub async fn mark_running<'c, E>( + exe: E, + id: impl Into + std::fmt::Debug, +) -> Result<(), Error> +where + E: Executor<'c>, +{ + sql::update_status_if_any_state(exe, id.into(), &[JobStatus::Scheduled], JobStatus::Running) + .await + .map_err(Error::JobStatusUpdate) +} + +/// Conditionally marks a job as `STOPPING` only if it's currently `STOP_REQUESTED` +/// +/// This is typically used by workers to acknowledge a stop request. 
+#[tracing::instrument(skip(exe), err)] +pub async fn mark_stopping<'c, E>( + exe: E, + id: impl Into + std::fmt::Debug, +) -> Result<(), Error> +where + E: Executor<'c>, +{ + sql::update_status_if_any_state( + exe, + id.into(), + &[JobStatus::StopRequested], + JobStatus::Stopping, + ) + .await + .map_err(Error::JobStatusUpdate) +} + +/// Conditionally marks a job as `STOPPED` only if it's currently `STOPPING` +/// +/// This provides proper state transition from stopping to stopped. +#[tracing::instrument(skip(exe), err)] +pub async fn mark_stopped<'c, E>( + exe: E, + id: impl Into + std::fmt::Debug, +) -> Result<(), Error> +where + E: Executor<'c>, +{ + sql::update_status_if_any_state(exe, id.into(), &[JobStatus::Stopping], JobStatus::Stopped) + .await + .map_err(Error::JobStatusUpdate) +} + +/// Conditionally marks a job as `COMPLETED` only if it's currently `RUNNING` +/// +/// This ensures jobs can only be completed from a running state. +#[tracing::instrument(skip(exe), err)] +pub async fn mark_completed<'c, E>( + exe: E, + id: impl Into + std::fmt::Debug, +) -> Result<(), Error> +where + E: Executor<'c>, +{ + sql::update_status_if_any_state(exe, id.into(), &[JobStatus::Running], JobStatus::Completed) + .await + .map_err(Error::JobStatusUpdate) +} + +/// Conditionally marks a job as `ERROR` from either `RUNNING` or `SCHEDULED` states +/// +/// This is used for recoverable failures where retry attempts can be made. +/// +/// Jobs can fail from either scheduled (startup failure) or running (runtime failure) states. 
+#[tracing::instrument(skip(exe), err)] +pub async fn mark_failed_recoverable<'c, E>( + exe: E, + id: impl Into + std::fmt::Debug, +) -> Result<(), Error> +where + E: Executor<'c>, +{ + sql::update_status_if_any_state( + exe, + id.into(), + &[JobStatus::Scheduled, JobStatus::Running], + JobStatus::Error, + ) + .await + .map_err(Error::JobStatusUpdate) +} + +/// Conditionally marks a job as `FATAL` from either `RUNNING` or `SCHEDULED` states +/// +/// This is used for unrecoverable failures where retry attempts should not be made. +/// +/// Jobs can fail from either scheduled (startup failure) or running (runtime failure) states. +#[tracing::instrument(skip(exe), err)] +pub async fn mark_failed_fatal<'c, E>( + exe: E, + id: impl Into + std::fmt::Debug, +) -> Result<(), Error> +where + E: Executor<'c>, +{ + sql::update_status_if_any_state( + exe, + id.into(), + &[JobStatus::Scheduled, JobStatus::Running], + JobStatus::Fatal, + ) + .await + .map_err(Error::JobStatusUpdate) +} + +/// Reschedule a failed job for retry +/// +/// Updates the job status to SCHEDULED in the `jobs_status` projection and assigns +/// the job to the given worker node. +/// +/// The caller is responsible for transaction management (commit/rollback) and for +/// inserting the corresponding event into `job_events`. +/// +/// This function does not send notifications. The caller is responsible +/// for sending notifications after successful rescheduling. 
+#[tracing::instrument(skip(exe), err)] +pub async fn reschedule<'c, E>( + exe: E, + job_id: impl Into + std::fmt::Debug, + new_node_id: impl Into> + std::fmt::Debug, +) -> Result<(), Error> +where + E: Executor<'c>, +{ + let job_id = job_id.into(); + let new_node_id = new_node_id.into(); + + sql::reschedule(exe, job_id, &new_node_id) + .await + .map_err(Error::Database) +} + +/// Error type for conditional job status updates +#[derive(Debug, thiserror::Error)] +pub enum JobStatusUpdateError { + /// The job was not found in the `jobs_status` projection + /// + /// This occurs when attempting to update a job that does not have a status entry. + #[error("Job not found")] + NotFound, + + /// The job exists but its current status does not match any of the expected states + /// + /// This occurs when a status transition is attempted but the job is in + /// an unexpected state (e.g., trying to mark a completed job as running). + #[error("Job state conflict: expected one of {expected:?}, but found {actual}")] + StateConflict { + expected: Vec, + actual: JobStatus, + }, + + /// A database error occurred during the status update query + /// + /// This occurs when the underlying SQL query fails due to connection issues, + /// constraint violations, or other database-level errors. + #[error("Database error during status update")] + Database(#[source] sqlx::Error), +} + +/// Job status enumeration and related implementations +/// +/// Represents the current status of a job +/// +/// This status is used to track the progress of a job, but it is not +/// guaranteed to be up to date. It is responsibility of the caller to +/// confirm the status of the job before proceeding. +/// +/// The status is stored as a `TEXT` column in the database. If the fetched +/// status is not one of the valid values in the enum, the `UNKNOWN` status is +/// returned. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum JobStatus { + /// Job is being scheduled. 
+ /// + /// This is the initial state of a job. + /// + /// The scheduler has added the job to the queue, but the job has not + /// yet been picked up by the worker node. + #[default] + Scheduled, + + /// Job is running + /// + /// The job has been picked up by the worker node and is being executed. + Running, + + /// Job has finished successfully + /// + /// This is a terminal state. + Completed, + + /// Job has stopped + /// + /// The worker node has stopped the job as requested by the scheduler. + /// + /// This is a terminal state. + Stopped, + + /// Job has been requested to stop + /// + /// The scheduler has requested the job to stop. The worker will stop + /// the job as soon as possible. + StopRequested, + + /// Job is stopping + /// + /// The worker node acknowledged the stop request and will stop the job + /// as soon as possible. + Stopping, + + /// Job has failed with a recoverable error + /// + /// A recoverable error occurred while running the job. The job may succeed if retried. + /// + /// This is a terminal state. + Error, + + /// Job has failed with a fatal error + /// + /// A fatal error occurred while running the job. The job will not succeed if retried. + /// + /// This is a terminal state. + Fatal, + + /// Unknown status + /// + /// This is an invalid status, and should never happen. Although + /// it is possible to happen if the worker node version is different + /// from the version of the scheduler. 
+ Unknown, +} + +impl JobStatus { + /// Convert the [`JobStatus`] to a string + pub fn as_str(&self) -> &'static str { + match self { + Self::Scheduled => "SCHEDULED", + Self::Running => "RUNNING", + Self::Completed => "COMPLETED", + Self::Stopped => "STOPPED", + Self::StopRequested => "STOP_REQUESTED", + Self::Stopping => "STOPPING", + Self::Error => "ERROR", + Self::Fatal => "FATAL", + Self::Unknown => "UNKNOWN", + } + } + + /// Returns true if the job status is terminal (cannot be changed further) + /// + /// Terminal states are final states where the job lifecycle has ended: + /// - `Completed`: Job finished successfully + /// - `Stopped`: Job was stopped by request + /// - `Failed`: Job encountered an error and failed + /// + /// Non-terminal states can still transition to other states + pub fn is_terminal(&self) -> bool { + matches!( + self, + Self::Completed | Self::Stopped | Self::Error | Self::Fatal + ) + } + + /// Returns an array of all terminal job statuses + /// + /// These are the statuses that represent completed job lifecycles + /// and can be safely deleted from the system. + pub fn terminal_statuses() -> [JobStatus; 4] { + [Self::Completed, Self::Stopped, Self::Error, Self::Fatal] + } + + /// Returns an array of all non-terminal (active) job statuses + /// + /// These are the statuses that represent jobs still in progress + /// and should be monitored or managed. 
+ pub fn non_terminal_statuses() -> [JobStatus; 4] { + [ + Self::Scheduled, + Self::Running, + Self::StopRequested, + Self::Stopping, + ] + } +} + +impl std::str::FromStr for JobStatus { + type Err = std::convert::Infallible; + + fn from_str(s: &str) -> Result { + // Use `eq_ignore_ascii_case` to make the comparison case-insensitive + match s { + s if s.eq_ignore_ascii_case("SCHEDULED") => Ok(Self::Scheduled), + s if s.eq_ignore_ascii_case("RUNNING") => Ok(Self::Running), + s if s.eq_ignore_ascii_case("COMPLETED") => Ok(Self::Completed), + s if s.eq_ignore_ascii_case("STOPPED") => Ok(Self::Stopped), + s if s.eq_ignore_ascii_case("STOP_REQUESTED") => Ok(Self::StopRequested), + s if s.eq_ignore_ascii_case("STOPPING") => Ok(Self::Stopping), + s if s.eq_ignore_ascii_case("ERROR") => Ok(Self::Error), + s if s.eq_ignore_ascii_case("FATAL") => Ok(Self::Fatal), + _ => Ok(Self::Unknown), // Default to Unknown for Infallible + } + } +} + +impl std::fmt::Display for JobStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +impl sqlx::Type for JobStatus { + fn type_info() -> sqlx::postgres::PgTypeInfo { + sqlx::postgres::PgTypeInfo::with_name("TEXT") + } +} + +impl sqlx::postgres::PgHasArrayType for JobStatus { + fn array_type_info() -> sqlx::postgres::PgTypeInfo { + sqlx::postgres::PgTypeInfo::with_name("TEXT[]") + } +} + +impl<'r> sqlx::Decode<'r, sqlx::Postgres> for JobStatus { + fn decode( + value: ::ValueRef<'r>, + ) -> Result { + let value: &str = sqlx::Decode::::decode(value)?; + // Since FromStr::Err is Infallible, unwrap is safe. 
+ Ok(value.parse().unwrap()) + } +} + +impl<'q> sqlx::Encode<'q, sqlx::Postgres> for JobStatus { + fn encode_by_ref( + &self, + buf: &mut ::ArgumentBuffer<'q>, + ) -> Result { + sqlx::Encode::::encode_by_ref(&self.as_str(), buf) + } +} + +/// In-tree integration tests +#[cfg(test)] +mod tests { + mod it_job_status; +} diff --git a/crates/core/metadata-db/src/job_status/sql.rs b/crates/core/metadata-db/src/job_status/sql.rs new file mode 100644 index 000000000..154545783 --- /dev/null +++ b/crates/core/metadata-db/src/job_status/sql.rs @@ -0,0 +1,132 @@ +//! Internal SQL operations for the `jobs_status` projection table. + +use sqlx::{Executor, Postgres}; + +use crate::{ + job_status::{JobStatus, JobStatusUpdateError}, + jobs::JobId, + workers::WorkerNodeId, +}; + +/// Idempotently insert a new job status projection +/// +/// Uses `ON CONFLICT DO NOTHING` to match the idempotent behavior of `jobs::sql::insert`. +/// When a job is re-registered with the same idempotency key, the existing status row is +/// preserved unchanged. +pub async fn insert<'c, E>( + exe: E, + job_id: JobId, + node_id: &WorkerNodeId<'_>, + status: JobStatus, +) -> Result<(), sqlx::Error> +where + E: Executor<'c, Database = Postgres>, +{ + let query = indoc::indoc! {r#" + INSERT INTO jobs_status (job_id, node_id, status, updated_at) + VALUES ($1, $2, $3, (timezone('UTC', now()))) + ON CONFLICT (job_id) DO NOTHING + "#}; + sqlx::query(query) + .bind(job_id) + .bind(node_id) + .bind(status) + .execute(exe) + .await?; + Ok(()) +} + +/// Update the status of a job with multiple possible expected original states +/// +/// This function will only update the job status if the job exists in `jobs_status` +/// and currently has one of the expected original statuses. +/// If the job doesn't exist, returns `JobStatusUpdateError::NotFound`. +/// If the job exists but has a different status, returns `JobStatusUpdateError::StateConflict`. 
+pub async fn update_status_if_any_state<'c, E>( + exe: E, + id: JobId, + expected_statuses: &[JobStatus], + new_status: JobStatus, +) -> Result<(), JobStatusUpdateError> +where + E: Executor<'c, Database = Postgres>, +{ + /// Internal structure to hold the result of the update operation + #[derive(Debug, sqlx::FromRow)] + struct UpdateResult { + updated_id: Option, + original_status: Option, + } + + let query = indoc::indoc! {r#" + WITH target_job AS ( + SELECT job_id, status + FROM jobs_status + WHERE job_id = $1 + ), + target_job_update AS ( + UPDATE jobs_status + SET status = $3, updated_at = timezone('UTC', now()) + WHERE job_id = $1 AND status = ANY($2) + RETURNING job_id + ) + SELECT + target_job_update.job_id AS updated_id, + target_job.status AS original_status + FROM target_job + LEFT JOIN target_job_update ON target_job.job_id = target_job_update.job_id + "#}; + + let result: Option = sqlx::query_as(query) + .bind(id) + .bind(expected_statuses) + .bind(new_status) + .fetch_optional(exe) + .await + .map_err(JobStatusUpdateError::Database)?; + + match result { + Some(UpdateResult { + updated_id: Some(_), + .. + }) => Ok(()), + Some(UpdateResult { + updated_id: None, + original_status: Some(status), + }) => Err(JobStatusUpdateError::StateConflict { + expected: expected_statuses.to_vec(), + actual: status, + }), + _ => Err(JobStatusUpdateError::NotFound), + } +} + +/// Reschedule a failed job by updating its status to SCHEDULED in `jobs_status` +/// +/// Updates the status and assigns the job to the specified worker node. +pub async fn reschedule<'c, E>( + exe: E, + job_id: JobId, + new_node_id: &WorkerNodeId<'_>, +) -> Result<(), sqlx::Error> +where + E: Executor<'c, Database = Postgres>, +{ + let query = indoc::indoc! 
{r#" + UPDATE jobs_status + SET + status = $3, + node_id = $2, + updated_at = timezone('UTC', now()) + WHERE job_id = $1 + "#}; + + sqlx::query(query) + .bind(job_id) + .bind(new_node_id) + .bind(JobStatus::Scheduled) + .execute(exe) + .await?; + + Ok(()) +} diff --git a/crates/core/metadata-db/src/job_status/tests/it_job_status.rs b/crates/core/metadata-db/src/job_status/tests/it_job_status.rs new file mode 100644 index 000000000..33481a786 --- /dev/null +++ b/crates/core/metadata-db/src/job_status/tests/it_job_status.rs @@ -0,0 +1,443 @@ +//! Integration tests for the job_status module + +use crate::{ + job_status, + jobs::{self, JobStatus}, + tests::common::{TEST_WORKER_ID, raw_descriptor, register_job, setup_test_db}, + workers::{self, WorkerInfo, WorkerNodeId}, +}; + +#[tokio::test] +async fn register_inserts_status_row() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "register"})); + + //* When + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + + //* Then + let job = jobs::get_by_id(&conn, job_id) + .await + .expect("Failed to get job") + .expect("Job should exist"); + assert_eq!(job.status, JobStatus::Scheduled); + assert_eq!(job.node_id.as_str(), TEST_WORKER_ID); +} + +#[tokio::test] +async fn mark_running_from_scheduled_succeeds() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "mark_running"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + + //* When + job_status::mark_running(&conn, job_id) + .await + .expect("Failed to mark job running"); + + //* Then + let job = jobs::get_by_id(&conn, job_id) + .await + .expect("Failed to get job") + .expect("Job should exist"); + assert_eq!(job.status, JobStatus::Running); +} + +#[tokio::test] 
+async fn mark_running_from_running_returns_state_conflict() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "conflict"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + job_status::mark_running(&conn, job_id) + .await + .expect("Failed to mark job running"); + + //* When + let result = job_status::mark_running(&conn, job_id).await; + + //* Then + let err = result.expect_err("Should return error for already running job"); + assert!( + matches!( + err, + crate::error::Error::JobStatusUpdate( + job_status::JobStatusUpdateError::StateConflict { .. } + ) + ), + "Expected StateConflict, got: {err:?}" + ); +} + +#[tokio::test] +async fn mark_running_nonexistent_job_returns_not_found() { + //* Given + let (_db, conn) = setup_test_db().await; + + //* When + let result = job_status::mark_running(&conn, jobs::JobId::from_i64_unchecked(999999)).await; + + //* Then + let err = result.expect_err("Should return error for nonexistent job"); + assert!( + matches!( + err, + crate::error::Error::JobStatusUpdate(job_status::JobStatusUpdateError::NotFound) + ), + "Expected NotFound, got: {err:?}" + ); +} + +#[tokio::test] +async fn mark_completed_from_running_succeeds() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "completed"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + job_status::mark_running(&conn, job_id) + .await + .expect("Failed to mark running"); + + //* When + job_status::mark_completed(&conn, job_id) + .await + .expect("Failed to mark completed"); + + //* Then + let job = jobs::get_by_id(&conn, job_id) + .await + .expect("Failed to get job") + .expect("Job should exist"); + assert_eq!(job.status, JobStatus::Completed); +} + +#[tokio::test] 
+async fn mark_completed_from_scheduled_returns_state_conflict() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "completed_conflict"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + + //* When + let result = job_status::mark_completed(&conn, job_id).await; + + //* Then + let err = result.expect_err("Should return error"); + assert!( + matches!( + err, + crate::error::Error::JobStatusUpdate( + job_status::JobStatusUpdateError::StateConflict { .. } + ) + ), + "Expected StateConflict, got: {err:?}" + ); +} + +#[tokio::test] +async fn request_stop_from_scheduled_succeeds() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "stop_scheduled"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + + //* When + let changed = job_status::request_stop(&conn, job_id) + .await + .expect("Failed to request stop"); + + //* Then + assert!(changed, "Status should have changed"); + let job = jobs::get_by_id(&conn, job_id) + .await + .expect("Failed to get job") + .expect("Job should exist"); + assert_eq!(job.status, JobStatus::StopRequested); +} + +#[tokio::test] +async fn request_stop_from_running_succeeds() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "stop_running"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + job_status::mark_running(&conn, job_id) + .await + .expect("Failed to mark running"); + + //* When + let changed = job_status::request_stop(&conn, job_id) + .await + .expect("Failed to request stop"); + + //* Then + assert!(changed, "Status should have changed"); + let job = 
jobs::get_by_id(&conn, job_id) + .await + .expect("Failed to get job") + .expect("Job should exist"); + assert_eq!(job.status, JobStatus::StopRequested); +} + +#[tokio::test] +async fn request_stop_is_idempotent_when_already_stop_requested() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "stop_idempotent"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + let first = job_status::request_stop(&conn, job_id) + .await + .expect("Failed to request stop"); + assert!(first, "First call should indicate status changed"); + + //* When — second stop request should succeed (idempotent) + let second = job_status::request_stop(&conn, job_id) + .await + .expect("Repeated stop request should be idempotent"); + + //* Then + assert!(!second, "Second call should indicate no status change"); +} + +#[tokio::test] +async fn request_stop_from_stopping_returns_false() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "stop_from_stopping"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + job_status::request_stop(&conn, job_id) + .await + .expect("Failed to request stop"); + job_status::mark_stopping(&conn, job_id) + .await + .expect("Failed to mark stopping"); + + //* When + let changed = job_status::request_stop(&conn, job_id) + .await + .expect("request_stop from Stopping should not error"); + + //* Then + assert!(!changed, "Should indicate no status change from Stopping"); +} + +#[tokio::test] +async fn request_stop_from_completed_returns_error() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "stop_completed"})); + let job_id = 
register_job(&conn, &job_desc, &worker_id, None).await; + job_status::mark_running(&conn, job_id) + .await + .expect("Failed to mark running"); + job_status::mark_completed(&conn, job_id) + .await + .expect("Failed to mark completed"); + + //* When + let result = job_status::request_stop(&conn, job_id).await; + + //* Then + assert!(result.is_err(), "Should not stop a completed job"); +} + +#[tokio::test] +async fn full_stop_lifecycle() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "lifecycle"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + + //* When — walk through the full stop lifecycle + job_status::mark_running(&conn, job_id) + .await + .expect("Scheduled → Running"); + + job_status::request_stop(&conn, job_id) + .await + .expect("Running → StopRequested"); + + job_status::mark_stopping(&conn, job_id) + .await + .expect("StopRequested → Stopping"); + + job_status::mark_stopped(&conn, job_id) + .await + .expect("Stopping → Stopped"); + + //* Then + let job = jobs::get_by_id(&conn, job_id) + .await + .expect("Failed to get job") + .expect("Job should exist"); + assert_eq!(job.status, JobStatus::Stopped); +} + +#[tokio::test] +async fn mark_stopping_from_running_returns_state_conflict() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "stopping_conflict"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + job_status::mark_running(&conn, job_id) + .await + .expect("Failed to mark running"); + + //* When — mark_stopping requires StopRequested, not Running + let result = job_status::mark_stopping(&conn, job_id).await; + + //* Then + assert!(result.is_err(), "Should not transition Running → Stopping"); +} + +#[tokio::test] +async fn 
mark_stopped_from_scheduled_returns_state_conflict() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "stopped_conflict"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + + //* When — mark_stopped requires Stopping, not Scheduled + let result = job_status::mark_stopped(&conn, job_id).await; + + //* Then + assert!(result.is_err(), "Should not transition Scheduled → Stopped"); +} + +#[tokio::test] +async fn mark_failed_recoverable_from_running_succeeds() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "error_running"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + job_status::mark_running(&conn, job_id) + .await + .expect("Failed to mark running"); + + //* When + job_status::mark_failed_recoverable(&conn, job_id) + .await + .expect("Failed to mark error"); + + //* Then + let job = jobs::get_by_id(&conn, job_id) + .await + .expect("Failed to get job") + .expect("Job should exist"); + assert_eq!(job.status, JobStatus::Error); +} + +#[tokio::test] +async fn mark_failed_recoverable_from_scheduled_succeeds() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "error_scheduled"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + + //* When + job_status::mark_failed_recoverable(&conn, job_id) + .await + .expect("Failed to mark error"); + + //* Then + let job = jobs::get_by_id(&conn, job_id) + .await + .expect("Failed to get job") + .expect("Job should exist"); + assert_eq!(job.status, JobStatus::Error); +} + +#[tokio::test] +async fn mark_failed_fatal_from_running_succeeds() { + //* Given + let 
(_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "fatal_running"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + job_status::mark_running(&conn, job_id) + .await + .expect("Failed to mark running"); + + //* When + job_status::mark_failed_fatal(&conn, job_id) + .await + .expect("Failed to mark fatal"); + + //* Then + let job = jobs::get_by_id(&conn, job_id) + .await + .expect("Failed to get job") + .expect("Job should exist"); + assert_eq!(job.status, JobStatus::Fatal); +} + +#[tokio::test] +async fn mark_failed_fatal_from_completed_returns_state_conflict() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let job_desc = raw_descriptor(&serde_json::json!({"test": "fatal_completed"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + job_status::mark_running(&conn, job_id) + .await + .expect("Failed to mark running"); + job_status::mark_completed(&conn, job_id) + .await + .expect("Failed to mark completed"); + + //* When + let result = job_status::mark_failed_fatal(&conn, job_id).await; + + //* Then + assert!(result.is_err(), "Should not mark completed job as fatal"); +} + +#[tokio::test] +async fn reschedule_updates_status_and_worker() { + //* Given + let (_db, conn) = setup_test_db().await; + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + + let new_worker_id = WorkerNodeId::from_ref_unchecked("worker-2"); + workers::register(&conn, new_worker_id.clone(), WorkerInfo::default()) + .await + .expect("Failed to register worker-2"); + + let job_desc = raw_descriptor(&serde_json::json!({"test": "reschedule"})); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; + + // Move to Error state so reschedule makes sense + job_status::mark_failed_recoverable(&conn, job_id) + .await + 
.expect("Failed to mark error"); + + //* When + job_status::reschedule(&conn, job_id, &new_worker_id) + .await + .expect("Failed to reschedule"); + + //* Then + let job = jobs::get_by_id(&conn, job_id) + .await + .expect("Failed to get job") + .expect("Job should exist"); + assert_eq!(job.status, JobStatus::Scheduled); + assert_eq!(job.node_id.as_str(), "worker-2"); +} diff --git a/crates/core/metadata-db/src/jobs.rs b/crates/core/metadata-db/src/jobs.rs index 25c6bfb4e..448ddf945 100644 --- a/crates/core/metadata-db/src/jobs.rs +++ b/crates/core/metadata-db/src/jobs.rs @@ -7,24 +7,27 @@ use sqlx::types::chrono::{DateTime, Utc}; mod job_descriptor; mod job_id; -mod job_status; pub(crate) mod sql; pub use self::{ job_descriptor::{JobDescriptorRaw, JobDescriptorRawOwned}, job_id::JobId, - job_status::JobStatus, sql::JobWithRetryInfo, }; +pub use crate::job_status::JobStatus; use crate::{ db::Executor, error::Error, - job_events, + job_events, job_status, manifests::ManifestHash, workers::{WorkerNodeId, WorkerNodeIdOwned}, }; -/// Register a job in the queue with the default status (Scheduled) +/// Register a job in the queue +/// +/// Creates the job record with its descriptor. The caller is responsible for +/// separately registering the status via [`job_status::register`] and the event +/// via [`job_events::register`]. /// /// **Note:** This function does not send notifications. The caller is responsible for /// calling `send_job_notification` after successful job registration if worker notification @@ -39,50 +42,11 @@ where E: Executor<'c>, { let job_desc = job_desc.into(); - sql::insert_with_default_status(exe, node_id.into(), &job_desc) + sql::insert(exe, &node_id.into(), &job_desc) .await .map_err(Error::Database) } -/// Update job status to StopRequested -/// -/// This function will only update the job status if it's currently in a valid state -/// to be stopped (Scheduled or Running). 
If the job is already stopping, this is -/// considered success (idempotent behavior). If the job is in a terminal state -/// (Stopped, Completed, Failed), this returns a conflict error. -/// -/// Returns an error if the job doesn't exist, is in a terminal state, or if there's a database error. -/// -/// **Note:** This function does not send notifications. The caller is responsible for -/// calling `send_job_notification` after successful status update if worker notification -/// is required. -#[tracing::instrument(skip(exe), err)] -pub async fn request_stop<'c, E>( - exe: E, - job_id: impl Into + std::fmt::Debug, -) -> Result<(), Error> -where - E: Executor<'c>, -{ - // Try to update job status - match sql::update_status_if_any_state( - exe, - job_id.into(), - &[JobStatus::Running, JobStatus::Scheduled], - JobStatus::StopRequested, - ) - .await - { - Ok(()) => Ok(()), - // Check if the job is already stopping (idempotent behavior) - Err(JobStatusUpdateError::StateConflict { - actual: JobStatus::StopRequested | JobStatus::Stopping, - .. - }) => Ok(()), - Err(err) => Err(Error::JobStatusUpdate(err)), - } -} - /// List jobs with cursor-based pagination support, optionally filtered by status /// /// Uses cursor-based pagination where `last_job_id` is the ID of the last job @@ -215,122 +179,6 @@ where .map_err(Error::Database) } -/// Conditionally marks a job as `RUNNING` only if it's currently `SCHEDULED` -/// -/// This provides idempotent behavior - if the job is already running, completed, or failed, -/// the appropriate error will be returned indicating the state conflict. 
-#[tracing::instrument(skip(exe), err)] -pub async fn mark_running<'c, E>( - exe: E, - id: impl Into + std::fmt::Debug, -) -> Result<(), Error> -where - E: Executor<'c>, -{ - sql::update_status_if_any_state(exe, id.into(), &[JobStatus::Scheduled], JobStatus::Running) - .await - .map_err(Error::JobStatusUpdate) -} - -/// Conditionally marks a job as `STOPPING` only if it's currently `STOP_REQUESTED` -/// -/// This is typically used by workers to acknowledge a stop request. -#[tracing::instrument(skip(exe), err)] -pub async fn mark_stopping<'c, E>( - exe: E, - id: impl Into + std::fmt::Debug, -) -> Result<(), Error> -where - E: Executor<'c>, -{ - sql::update_status_if_any_state( - exe, - id.into(), - &[JobStatus::StopRequested], - JobStatus::Stopping, - ) - .await - .map_err(Error::JobStatusUpdate) -} - -/// Conditionally marks a job as `STOPPED` only if it's currently `STOPPING` -/// -/// This provides proper state transition from stopping to stopped. -#[tracing::instrument(skip(exe), err)] -pub async fn mark_stopped<'c, E>( - exe: E, - id: impl Into + std::fmt::Debug, -) -> Result<(), Error> -where - E: Executor<'c>, -{ - sql::update_status_if_any_state(exe, id.into(), &[JobStatus::Stopping], JobStatus::Stopped) - .await - .map_err(Error::JobStatusUpdate) -} - -/// Conditionally marks a job as `COMPLETED` only if it's currently `RUNNING` -/// -/// This ensures jobs can only be completed from a running state. -#[tracing::instrument(skip(exe), err)] -pub async fn mark_completed<'c, E>( - exe: E, - id: impl Into + std::fmt::Debug, -) -> Result<(), Error> -where - E: Executor<'c>, -{ - sql::update_status_if_any_state(exe, id.into(), &[JobStatus::Running], JobStatus::Completed) - .await - .map_err(Error::JobStatusUpdate) -} - -/// Conditionally marks a job as `ERROR` from either `RUNNING` or `SCHEDULED` states -/// -/// This is used for recoverable failures where retry attempts can be made. 
-/// -/// Jobs can fail from either scheduled (startup failure) or running (runtime failure) states. -#[tracing::instrument(skip(exe), err)] -pub async fn mark_failed_recoverable<'c, E>( - exe: E, - id: impl Into + std::fmt::Debug, -) -> Result<(), Error> -where - E: Executor<'c>, -{ - sql::update_status_if_any_state( - exe, - id.into(), - &[JobStatus::Scheduled, JobStatus::Running], - JobStatus::Error, - ) - .await - .map_err(Error::JobStatusUpdate) -} - -/// Conditionally marks a job as `FATAL` from either `RUNNING` or `SCHEDULED` states -/// -/// This is used for unrecoverable failures where retry attempts should not be made. -/// -/// Jobs can fail from either scheduled (startup failure) or running (runtime failure) states. -#[tracing::instrument(skip(exe), err)] -pub async fn mark_failed_fatal<'c, E>( - exe: E, - id: impl Into + std::fmt::Debug, -) -> Result<(), Error> -where - E: Executor<'c>, -{ - sql::update_status_if_any_state( - exe, - id.into(), - &[JobStatus::Scheduled, JobStatus::Running], - JobStatus::Fatal, - ) - .await - .map_err(Error::JobStatusUpdate) -} - /// Delete a job by ID if it's in a terminal state /// /// This function will only delete the job if it exists and is in a terminal state @@ -384,9 +232,9 @@ where /// Reschedule a failed job for retry with atomically tracked attempt /// -/// This function performs two operations using the provided transaction: -/// 1. Updates job status to SCHEDULED and assigns to the given worker node -/// 2. Inserts job attempt record with given retry_index +/// This function performs two operations using the provided transaction: +/// 1. Updates job status to SCHEDULED in `jobs_status` and assigns to the given worker node +/// 2. Inserts a SCHEDULED event into `job_events` /// /// If any operation fails, the error is returned and no further operations are attempted. /// The caller is responsible for transaction management (commit/rollback). 
@@ -407,35 +255,15 @@ pub async fn reschedule( let job_id = job_id.into(); let new_node_id = new_node_id.into(); - // Update job status to SCHEDULED and assign to worker - sql::reschedule(&mut *tx, job_id, &new_node_id) - .await - .map_err(Error::Database)?; + // Update job status to SCHEDULED in jobs_status and assign to worker + job_status::reschedule(&mut *tx, job_id, &new_node_id).await?; // Insert job event record - job_events::sql::insert(&mut *tx, job_id, &new_node_id, JobStatus::Scheduled) - .await - .map_err(Error::Database)?; + job_events::register(&mut *tx, job_id, &new_node_id, JobStatus::Scheduled).await?; Ok(()) } -/// Error type for conditional job status updates -#[derive(Debug, thiserror::Error)] -pub enum JobStatusUpdateError { - #[error("Job not found")] - NotFound, - - #[error("Job state conflict: expected one of {expected:?}, but found {actual}")] - StateConflict { - expected: Vec, - actual: JobStatus, - }, - - #[error("Database error: {0}")] - Database(#[source] sqlx::Error), -} - /// Represents a job with its metadata and associated node. #[derive(Debug, Clone, sqlx::FromRow)] pub struct Job { diff --git a/crates/core/metadata-db/src/jobs/job_status.rs b/crates/core/metadata-db/src/jobs/job_status.rs deleted file mode 100644 index 1c40e1481..000000000 --- a/crates/core/metadata-db/src/jobs/job_status.rs +++ /dev/null @@ -1,181 +0,0 @@ -//! Job status enumeration and related implementations - -/// Represents the current status of a job -/// -/// This status is used to track the progress of a job, but it is not -/// guaranteed to be up to date. It is responsibility of the caller to -/// confirm the status of the job before proceeding. -/// -/// The status is stored as a `TEXT` column in the database. If the fetched -/// status is not one of the valid values in the enum, the `UNKNOWN` status is -/// returned. -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum JobStatus { - /// Job is being scheduled. 
- /// - /// This is the initial state of a job. - /// - /// The scheduler has added the job to the queue, but the job has not - /// yet been picked up by the worker node. - #[default] - Scheduled, - - /// Job is running - /// - /// The job has been picked up by the worker node and is being executed. - Running, - - /// Job has finished successfully - /// - /// This is a terminal state. - Completed, - - /// Job has stopped - /// - /// The worker node has stopped the job as requested by the scheduler. - /// - /// This is a terminal state. - Stopped, - - /// Job has been requested to stop - /// - /// The scheduler has requested the job to stop. The worker will stop - /// the job as soon as possible. - StopRequested, - - /// Job is stopping - /// - /// The worker node acknowledged the stop request and will stop the job - /// as soon as possible. - Stopping, - - /// Job has failed with a recoverable error - /// - /// A recoverable error occurred while running the job. The job may succeed if retried. - /// - /// This is a terminal state. - Error, - - /// Job has failed with a fatal error - /// - /// A fatal error occurred while running the job. The job will not succeed if retried. - /// - /// This is a terminal state. - Fatal, - - /// Unknown status - /// - /// This is an invalid status, and should never happen. Although - /// it is possible to happen if the worker node version is different - /// from the version of the scheduler. 
- Unknown, -} - -impl JobStatus { - /// Convert the [`JobStatus`] to a string - pub fn as_str(&self) -> &'static str { - match self { - Self::Scheduled => "SCHEDULED", - Self::Running => "RUNNING", - Self::Completed => "COMPLETED", - Self::Stopped => "STOPPED", - Self::StopRequested => "STOP_REQUESTED", - Self::Stopping => "STOPPING", - Self::Error => "ERROR", - Self::Fatal => "FATAL", - Self::Unknown => "UNKNOWN", - } - } - - /// Returns true if the job status is terminal (cannot be changed further) - /// - /// Terminal states are final states where the job lifecycle has ended: - /// - `Completed`: Job finished successfully - /// - `Stopped`: Job was stopped by request - /// - `Failed`: Job encountered an error and failed - /// - /// Non-terminal states can still transition to other states - pub fn is_terminal(&self) -> bool { - matches!( - self, - Self::Completed | Self::Stopped | Self::Error | Self::Fatal - ) - } - - /// Returns an array of all terminal job statuses - /// - /// These are the statuses that represent completed job lifecycles - /// and can be safely deleted from the system. - pub fn terminal_statuses() -> [JobStatus; 4] { - [Self::Completed, Self::Stopped, Self::Error, Self::Fatal] - } - - /// Returns an array of all non-terminal (active) job statuses - /// - /// These are the statuses that represent jobs still in progress - /// and should be monitored or managed. 
- pub fn non_terminal_statuses() -> [JobStatus; 4] { - [ - Self::Scheduled, - Self::Running, - Self::StopRequested, - Self::Stopping, - ] - } -} - -impl std::str::FromStr for JobStatus { - type Err = std::convert::Infallible; - - fn from_str(s: &str) -> Result { - // Use `eq_ignore_ascii_case` to make the comparison case-insensitive - match s { - s if s.eq_ignore_ascii_case("SCHEDULED") => Ok(Self::Scheduled), - s if s.eq_ignore_ascii_case("RUNNING") => Ok(Self::Running), - s if s.eq_ignore_ascii_case("COMPLETED") => Ok(Self::Completed), - s if s.eq_ignore_ascii_case("STOPPED") => Ok(Self::Stopped), - s if s.eq_ignore_ascii_case("STOP_REQUESTED") => Ok(Self::StopRequested), - s if s.eq_ignore_ascii_case("STOPPING") => Ok(Self::Stopping), - s if s.eq_ignore_ascii_case("ERROR") => Ok(Self::Error), - s if s.eq_ignore_ascii_case("FATAL") => Ok(Self::Fatal), - _ => Ok(Self::Unknown), // Default to Unknown for Infallible - } - } -} - -impl std::fmt::Display for JobStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(self.as_str()) - } -} - -impl sqlx::Type for JobStatus { - fn type_info() -> sqlx::postgres::PgTypeInfo { - sqlx::postgres::PgTypeInfo::with_name("TEXT") - } -} - -impl sqlx::postgres::PgHasArrayType for JobStatus { - fn array_type_info() -> sqlx::postgres::PgTypeInfo { - sqlx::postgres::PgTypeInfo::with_name("TEXT[]") - } -} - -impl<'r> sqlx::Decode<'r, sqlx::Postgres> for JobStatus { - fn decode( - value: ::ValueRef<'r>, - ) -> Result { - let value: &str = sqlx::Decode::::decode(value)?; - // Since FromStr::Err is Infallible, unwrap is safe. 
- Ok(value.parse().unwrap()) - } -} - -impl<'q> sqlx::Encode<'q, sqlx::Postgres> for JobStatus { - fn encode_by_ref( - &self, - buf: &mut ::ArgumentBuffer<'q>, - ) -> Result { - sqlx::Encode::::encode_by_ref(&self.as_str(), buf) - } -} diff --git a/crates/core/metadata-db/src/jobs/sql.rs b/crates/core/metadata-db/src/jobs/sql.rs index 44bbe9e58..168b21aa0 100644 --- a/crates/core/metadata-db/src/jobs/sql.rs +++ b/crates/core/metadata-db/src/jobs/sql.rs @@ -2,12 +2,10 @@ use sqlx::{Executor, Postgres}; -use super::{ - Job, JobStatusUpdateError, job_descriptor::JobDescriptorRaw, job_id::JobId, - job_status::JobStatus, -}; +use super::{Job, job_descriptor::JobDescriptorRaw, job_id::JobId}; use crate::{ datasets::{DatasetName, DatasetNamespace}, + job_status::JobStatus, manifests::ManifestHash, workers::WorkerNodeId, }; @@ -31,118 +29,40 @@ pub struct JobWithRetryInfo { /// Insert a new job into the queue /// -/// The job will be assigned to the given worker node with the specified status. +/// Creates the job record with its descriptor and timestamp. The job's status +/// and worker assignment are stored in the `jobs_status` projection table via +/// a separate insert. pub async fn insert<'c, E>( exe: E, - node_id: WorkerNodeId<'_>, + node_id: &WorkerNodeId<'_>, descriptor: &JobDescriptorRaw<'_>, - status: JobStatus, ) -> Result where E: Executor<'c, Database = Postgres>, { let query = indoc::indoc! {r#" - INSERT INTO jobs (node_id, descriptor, status, created_at, updated_at) - VALUES ($1, $2, $3, (timezone('UTC', now())), (timezone('UTC', now()))) + INSERT INTO jobs (node_id, descriptor, created_at) + VALUES ($1, $2, (timezone('UTC', now()))) RETURNING id "#}; let res = sqlx::query_scalar(query) - .bind(&node_id) + .bind(node_id) .bind(descriptor) - .bind(status) .fetch_one(exe) .await?; Ok(res) } -/// Insert a new job into the queue with the default status -/// -/// The job will be assigned to the given worker node with the default status (Scheduled). 
-#[inline] -pub async fn insert_with_default_status<'c, E>( - exe: E, - node_id: WorkerNodeId<'_>, - descriptor: &JobDescriptorRaw<'_>, -) -> Result -where - E: Executor<'c, Database = Postgres>, -{ - insert(exe, node_id, descriptor, JobStatus::default()).await -} - -/// Update the status of a job with multiple possible expected original states -/// -/// This function will only update the job status if the job exists and currently has -/// one of the expected original statuses. If the job doesn't exist, returns `UpdateJobStatusError::NotFound`. -/// If the job exists but has a different status than any of the expected ones, returns `UpdateJobStatusError::StateConflict`. -pub async fn update_status_if_any_state<'c, E>( - exe: E, - id: JobId, - expected_statuses: &[JobStatus], - new_status: JobStatus, -) -> Result<(), JobStatusUpdateError> -where - E: Executor<'c, Database = Postgres>, -{ - /// Internal structure to hold the result of the update operation - #[derive(Debug, sqlx::FromRow)] - struct UpdateResult { - updated_id: Option, - original_status: Option, - } - - let query = indoc::indoc! {r#" - WITH target_job AS ( - SELECT id, status - FROM jobs - WHERE id = $1 - ), - target_job_update AS ( - UPDATE jobs - SET status = $3, updated_at = timezone('UTC', now()) - WHERE id = $1 AND status = ANY($2) - RETURNING id - ) - SELECT - target_job_update.id AS updated_id, - target_job.status AS original_status - FROM target_job - LEFT JOIN target_job_update ON target_job.id = target_job_update.id - "#}; - - let result: Option = sqlx::query_as(query) - .bind(id) - .bind(expected_statuses) - .bind(new_status) - .fetch_optional(exe) - .await - .map_err(JobStatusUpdateError::Database)?; - - match result { - Some(UpdateResult { - updated_id: Some(_), - .. 
- }) => Ok(()), - Some(UpdateResult { - updated_id: None, - original_status: Some(status), - }) => Err(JobStatusUpdateError::StateConflict { - expected: expected_statuses.to_vec(), - actual: status, - }), - _ => Err(JobStatusUpdateError::NotFound), - } -} - /// Get a job by its ID pub async fn get_by_id<'c, E>(exe: E, id: JobId) -> Result, sqlx::Error> where E: Executor<'c, Database = Postgres>, { let query = indoc::indoc! {r#" - SELECT id, node_id, status, descriptor, created_at, updated_at - FROM jobs - WHERE id = $1 + SELECT j.id, js.node_id, js.status, j.descriptor, j.created_at, js.updated_at + FROM jobs j + INNER JOIN jobs_status js ON j.id = js.job_id + WHERE j.id = $1 "#}; let res = sqlx::query_as(query).bind(id).fetch_optional(exe).await?; Ok(res) @@ -159,15 +79,16 @@ where { let query = indoc::indoc! {r#" SELECT - id, - node_id, - status, - descriptor, - created_at, - updated_at - FROM jobs - WHERE node_id = $1 AND status = ANY($2) - ORDER BY id ASC + j.id, + js.node_id, + js.status, + j.descriptor, + j.created_at, + js.updated_at + FROM jobs j + INNER JOIN jobs_status js ON j.id = js.job_id + WHERE js.node_id = $1 AND js.status = ANY($2) + ORDER BY j.id ASC "#}; let res = sqlx::query_as(query) .bind(node_id) @@ -192,12 +113,13 @@ where let query = indoc::indoc! {r#" SELECT DISTINCT j.id, - j.node_id, - j.status, + js.node_id, + js.status, j.descriptor, j.created_at, - j.updated_at + js.updated_at FROM jobs j + INNER JOIN jobs_status js ON j.id = js.job_id INNER JOIN physical_table_revisions ptr ON j.id = ptr.writer INNER JOIN physical_tables pt ON pt.active_revision_id = ptr.id WHERE pt.manifest_hash = $1 @@ -224,17 +146,18 @@ where { let query = indoc::indoc! 
{r#" SELECT - id, - node_id, - status, - descriptor, - created_at, - updated_at - FROM jobs - WHERE descriptor->>'dataset_namespace' = $1 - AND descriptor->>'dataset_name' = $2 - AND descriptor->>'manifest_hash' = $3 - ORDER BY id ASC + j.id, + js.node_id, + js.status, + j.descriptor, + j.created_at, + js.updated_at + FROM jobs j + INNER JOIN jobs_status js ON j.id = js.job_id + WHERE j.descriptor->>'dataset_namespace' = $1 + AND j.descriptor->>'dataset_name' = $2 + AND j.descriptor->>'manifest_hash' = $3 + ORDER BY j.id ASC "#}; let res = sqlx::query_as(query) .bind(&dataset_namespace) @@ -248,6 +171,7 @@ where /// Delete a job by ID if it matches any of the specified statuses /// /// This function will only delete the job if it exists and is in one of the specified statuses. +/// The `jobs_status` row is deleted automatically via ON DELETE CASCADE. /// Returns true if a job was deleted, false otherwise. pub async fn delete_by_id_and_statuses<'c, E, const N: usize>( exe: E, @@ -259,7 +183,9 @@ where { let query = indoc::indoc! {r#" DELETE FROM jobs - WHERE id = $1 AND status = ANY($2) + WHERE id = $1 AND id IN ( + SELECT job_id FROM jobs_status WHERE status = ANY($2) + ) "#}; let result = sqlx::query(query) @@ -274,6 +200,7 @@ where /// Delete all jobs that match any of the specified statuses /// /// This function deletes all jobs that are in one of the specified statuses. +/// The `jobs_status` rows are deleted automatically via ON DELETE CASCADE. /// Returns the number of jobs that were deleted. pub async fn delete_by_status<'c, E, const N: usize>( exe: E, @@ -284,7 +211,9 @@ where { let query = indoc::indoc! {r#" DELETE FROM jobs - WHERE status = ANY($1) + WHERE id IN ( + SELECT job_id FROM jobs_status WHERE status = ANY($1) + ) "#}; let result = sqlx::query(query).bind(statuses).execute(exe).await?; @@ -309,14 +238,15 @@ where None => { let query = indoc::indoc! 
{r#" SELECT - id, - node_id, - status, - descriptor, - created_at, - updated_at - FROM jobs - ORDER BY id DESC + j.id, + js.node_id, + js.status, + j.descriptor, + j.created_at, + js.updated_at + FROM jobs j + INNER JOIN jobs_status js ON j.id = js.job_id + ORDER BY j.id DESC LIMIT $1 "#}; @@ -325,15 +255,16 @@ where Some(statuses) => { let query = indoc::indoc! {r#" SELECT - id, - node_id, - status, - descriptor, - created_at, - updated_at - FROM jobs - WHERE status = ANY($2) - ORDER BY id DESC + j.id, + js.node_id, + js.status, + j.descriptor, + j.created_at, + js.updated_at + FROM jobs j + INNER JOIN jobs_status js ON j.id = js.job_id + WHERE js.status = ANY($2) + ORDER BY j.id DESC LIMIT $1 "#}; @@ -365,15 +296,16 @@ where None => { let query = indoc::indoc! {r#" SELECT - id, - node_id, - status, - descriptor, - created_at, - updated_at - FROM jobs - WHERE id < $2 - ORDER BY id DESC + j.id, + js.node_id, + js.status, + j.descriptor, + j.created_at, + js.updated_at + FROM jobs j + INNER JOIN jobs_status js ON j.id = js.job_id + WHERE j.id < $2 + ORDER BY j.id DESC LIMIT $1 "#}; @@ -386,15 +318,16 @@ where Some(statuses) => { let query = indoc::indoc! {r#" SELECT - id, - node_id, - status, - descriptor, - created_at, - updated_at - FROM jobs - WHERE id < $2 AND status = ANY($3) - ORDER BY id DESC + j.id, + js.node_id, + js.status, + j.descriptor, + j.created_at, + js.updated_at + FROM jobs j + INNER JOIN jobs_status js ON j.id = js.job_id + WHERE j.id < $2 AND js.status = ANY($3) + ORDER BY j.id DESC LIMIT $1 "#}; @@ -425,17 +358,18 @@ where let query = indoc::indoc! 
{r#" SELECT j.id, - j.node_id, - j.status, + js.node_id, + js.status, j.descriptor, j.created_at, - j.updated_at, + js.updated_at, COUNT(je.id)::int4 AS next_retry_index FROM jobs j + INNER JOIN jobs_status js ON j.id = js.job_id LEFT JOIN job_events je ON j.id = je.job_id AND je.event_type = $1 - WHERE j.status = $2 - GROUP BY j.id, j.node_id, j.status, j.descriptor, j.created_at, j.updated_at - HAVING j.updated_at + INTERVAL '1 second' * POW(2, COUNT(je.id))::bigint + WHERE js.status = $2 + GROUP BY j.id, js.node_id, js.status, j.descriptor, j.created_at, js.updated_at + HAVING js.updated_at + INTERVAL '1 second' * POW(2, COUNT(je.id))::bigint <= timezone('UTC', now()) ORDER BY j.id ASC "#}; @@ -446,40 +380,3 @@ where .fetch_all(exe) .await } - -/// Reschedule a failed job for retry -/// -/// This function updates the job status to SCHEDULED and assigns it to a worker node. -/// 1. Sets status to SCHEDULED -/// 2. Assigns the job to the specified worker node -/// 3. Updates the updated_at timestamp -/// -/// Note: Retry tracking is handled via the job_events table. -/// -/// Returns an error if the job doesn't exist or if the database operation fails. -pub async fn reschedule<'c, E>( - exe: E, - job_id: JobId, - new_node_id: &WorkerNodeId<'_>, -) -> Result<(), sqlx::Error> -where - E: Executor<'c, Database = Postgres>, -{ - let query = indoc::indoc! 
{r#" - UPDATE jobs - SET - status = $3, - node_id = $2, - updated_at = timezone('UTC', now()) - WHERE id = $1 - "#}; - - sqlx::query(query) - .bind(job_id) - .bind(new_node_id) - .bind(JobStatus::Scheduled) - .execute(exe) - .await?; - - Ok(()) -} diff --git a/crates/core/metadata-db/src/jobs/tests/it_jobs.rs b/crates/core/metadata-db/src/jobs/tests/it_jobs.rs index b711eeb74..9a40e57b6 100644 --- a/crates/core/metadata-db/src/jobs/tests/it_jobs.rs +++ b/crates/core/metadata-db/src/jobs/tests/it_jobs.rs @@ -4,8 +4,9 @@ use pgtemp::PgTempDB; use crate::{ config::DEFAULT_POOL_MAX_CONNECTIONS, - job_events, - jobs::{self, JobDescriptorRaw, JobStatus}, + job_events, job_status, + jobs::{self, JobStatus}, + tests::common::{raw_descriptor, register_job}, workers::{self, WorkerInfo, WorkerNodeId}, }; @@ -33,9 +34,7 @@ async fn register_job_creates_with_scheduled_status() { let job_desc = raw_descriptor(&job_desc_json); //* When - let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc) - .await - .expect("Failed to schedule job"); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; //* Then let job = jobs::sql::get_by_id(&conn, job_id) @@ -72,35 +71,14 @@ async fn get_jobs_for_node_filters_by_node_id() { // Register jobs let job_desc1 = raw_descriptor(&serde_json::json!({ "job": 1 })); - let job_id1 = jobs::sql::insert( - &conn, - worker_id_main.clone(), - &job_desc1, - JobStatus::default(), - ) - .await - .expect("Failed to register job 1"); + let job_id1 = register_job(&conn, &job_desc1, &worker_id_main, None).await; let job_desc2 = raw_descriptor(&serde_json::json!({ "job": 2 })); - let job_id2 = jobs::sql::insert( - &conn, - worker_id_main.clone(), - &job_desc2, - JobStatus::default(), - ) - .await - .expect("Failed to register job 2"); + let job_id2 = register_job(&conn, &job_desc2, &worker_id_main, None).await; // Register a job for a different worker to ensure it's not retrieved let job_desc_other = 
raw_descriptor(&serde_json::json!({ "job": "other" })); - let job_id_other = jobs::sql::insert( - &conn, - worker_id_other.clone(), - &job_desc_other, - JobStatus::default(), - ) - .await - .expect("Failed to register job for other worker"); + let job_id_other = register_job(&conn, &job_desc_other, &worker_id_other, None).await; //* When let jobs_list = jobs::sql::get_by_node_id_and_statuses( @@ -150,35 +128,19 @@ async fn get_jobs_for_node_filters_by_status() { // Active jobs let job_id_scheduled = - jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Scheduled) - .await - .expect("Failed to register job_id_scheduled"); + register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Scheduled)).await; - let job_id_running = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Running) - .await - .expect("Failed to register job_id_running"); + let job_id_running = register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Running)).await; // Terminal state jobs (should not be retrieved) - jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Completed) - .await - .expect("Failed to register completed job"); + register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Completed)).await; - jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Error) - .await - .expect("Failed to register failed (recoverable) job"); + register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Error)).await; - let job_id_stop_requested = jobs::sql::insert( - &conn, - worker_id.clone(), - &job_desc, - JobStatus::StopRequested, - ) - .await - .expect("Failed to register job_id_stop_requested"); + let job_id_stop_requested = + register_job(&conn, &job_desc, &worker_id, Some(JobStatus::StopRequested)).await; - jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Stopped) - .await - .expect("Failed to register stopped job"); + register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Stopped)).await; //* When let active_jobs = 
jobs::sql::get_by_node_id_and_statuses( @@ -235,9 +197,7 @@ async fn get_job_by_id_returns_job() { }); let job_desc = raw_descriptor(&job_desc_json); - let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc) - .await - .expect("Failed to register job"); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; //* When let job = jobs::sql::get_by_id(&conn, job_id) @@ -275,9 +235,7 @@ async fn get_job_includes_timestamps() { }); let job_desc = raw_descriptor(&job_desc_json); - let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc) - .await - .expect("Failed to register job"); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; //* When let job = jobs::sql::get_by_id(&conn, job_id) @@ -336,9 +294,7 @@ async fn list_jobs_first_page_respects_limit() { "table": "test-table", })); - let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc) - .await - .expect("Failed to register job"); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; job_ids.push(job_id); // Small delay to ensure different timestamps @@ -383,9 +339,7 @@ async fn list_jobs_next_page_uses_cursor() { "table": "test-table", })); - let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc) - .await - .expect("Failed to register job"); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; all_job_ids.push(job_id); // Small delay to ensure different timestamps @@ -437,9 +391,7 @@ async fn delete_by_id_and_statuses_deletes_matching_job() { let job_desc = raw_descriptor(&serde_json::json!({"test": "job"})); - let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc) - .await - .expect("Failed to insert job"); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; //* When let deleted = jobs::sql::delete_by_id_and_statuses(&conn, job_id, [JobStatus::Scheduled]) @@ -472,9 +424,7 
@@ async fn delete_by_id_and_statuses_does_not_delete_wrong_status() { let job_desc = raw_descriptor(&serde_json::json!({"test": "job"})); - let job_id = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Running) - .await - .expect("Failed to insert job"); + let job_id = register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Running)).await; //* When let deleted = jobs::sql::delete_by_id_and_statuses(&conn, job_id, [JobStatus::Scheduled]) @@ -509,15 +459,9 @@ async fn delete_by_status_deletes_all_matching_jobs() { let job_desc = raw_descriptor(&serde_json::json!({"test": "job"})); // Create 3 jobs, 2 will be Completed, 1 will be Running - let job_id1 = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Completed) - .await - .expect("Failed to insert job 1"); - let job_id2 = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Completed) - .await - .expect("Failed to insert job 2"); - let job_id3 = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Running) - .await - .expect("Failed to insert job 3"); + let job_id1 = register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Completed)).await; + let job_id2 = register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Completed)).await; + let job_id3 = register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Running)).await; //* When let deleted_count = jobs::sql::delete_by_status(&conn, [JobStatus::Completed]) @@ -567,18 +511,10 @@ async fn delete_by_statuses_deletes_jobs_with_any_matching_status() { let job_desc = raw_descriptor(&serde_json::json!({"test": "job"})); // Create 4 jobs with different statuses - let job_id1 = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Completed) - .await - .expect("Failed to insert job 1"); - let job_id2 = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Error) - .await - .expect("Failed to insert job 2"); - let job_id3 = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, 
JobStatus::Stopped) - .await - .expect("Failed to insert job 3"); - let job_id4 = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Running) - .await - .expect("Failed to insert job 4"); + let job_id1 = register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Completed)).await; + let job_id2 = register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Error)).await; + let job_id3 = register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Stopped)).await; + let job_id4 = register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Running)).await; //* When let deleted_count = jobs::sql::delete_by_status( @@ -637,9 +573,7 @@ async fn get_failed_jobs_ready_for_retry_returns_eligible_jobs() { let job_desc = raw_descriptor(&serde_json::json!({"test": "job"})); // Create a failed (recoverable) job - let job_id = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Error) - .await - .expect("Failed to insert job"); + let job_id = register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Error)).await; // Wait longer than initial backoff (1 second) tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; @@ -673,9 +607,7 @@ async fn get_failed_jobs_ready_for_retry_excludes_not_ready() { let job_desc = raw_descriptor(&serde_json::json!({"test": "job"})); // Create a failed (recoverable) job - jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Error) - .await - .expect("Failed to insert job"); + register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Error)).await; //* When (immediately check, before backoff expires) let ready_jobs = jobs::sql::get_failed_jobs_ready_for_retry(&conn) @@ -704,9 +636,7 @@ async fn get_failed_jobs_calculates_retry_index_from_attempts() { let job_desc = raw_descriptor(&serde_json::json!({"test": "job"})); // Create a failed (recoverable) job - let job_id = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Error) - .await - .expect("Failed to insert job"); + let job_id = 
register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Error)).await; // Insert 3 SCHEDULED events (simulating initial schedule + 2 retries) for _ in 0..3 { @@ -747,9 +677,7 @@ async fn get_failed_jobs_handles_missing_attempts() { let job_desc = raw_descriptor(&serde_json::json!({"test": "job"})); // Create a failed (recoverable) job WITHOUT any SCHEDULED events (edge case) - let job_id = jobs::sql::insert(&conn, worker_id.clone(), &job_desc, JobStatus::Error) - .await - .expect("Failed to insert job"); + let job_id = register_job(&conn, &job_desc, &worker_id, Some(JobStatus::Error)).await; // Wait longer than initial backoff (1 second for retry_index 0) tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; @@ -786,12 +714,10 @@ async fn reschedule_updates_status_and_worker() { let job_desc = raw_descriptor(&serde_json::json!({"test": "job"})); - let job_id = jobs::sql::insert(&conn, worker_id1.clone(), &job_desc, JobStatus::Error) - .await - .expect("Failed to insert job"); + let job_id = register_job(&conn, &job_desc, &worker_id1, Some(JobStatus::Error)).await; //* When - jobs::sql::reschedule(&conn, job_id, &worker_id2) + job_status::sql::reschedule(&conn, job_id, &worker_id2) .await .expect("Failed to reschedule job"); @@ -804,9 +730,3 @@ async fn reschedule_updates_status_and_worker() { assert_eq!(job.node_id, worker_id2); assert!(job.updated_at > job.created_at); } - -/// Helper to create a [`JobDescriptorRaw`] from a [`serde_json::Value`]. 
-fn raw_descriptor(value: &serde_json::Value) -> JobDescriptorRaw<'static> { - let raw = serde_json::value::to_raw_value(value).expect("Failed to serialize to raw value"); - JobDescriptorRaw::from_owned_unchecked(raw) -} diff --git a/crates/core/metadata-db/src/lib.rs b/crates/core/metadata-db/src/lib.rs index 6634d58cf..0629a7006 100644 --- a/crates/core/metadata-db/src/lib.rs +++ b/crates/core/metadata-db/src/lib.rs @@ -11,6 +11,7 @@ mod error; pub mod files; pub mod gc; pub mod job_events; +pub mod job_status; pub mod jobs; pub mod manifests; pub mod notification_multiplexer; diff --git a/crates/core/metadata-db/src/physical_table/sql.rs b/crates/core/metadata-db/src/physical_table/sql.rs index 6336bbadd..bfb89ffd1 100644 --- a/crates/core/metadata-db/src/physical_table/sql.rs +++ b/crates/core/metadata-db/src/physical_table/sql.rs @@ -187,10 +187,11 @@ where SELECT pt.table_name, ptr.writer AS job_id, - j.status AS job_status + js.status AS job_status FROM physical_tables pt JOIN physical_table_revisions ptr ON ptr.id = pt.active_revision_id LEFT JOIN jobs j ON ptr.writer = j.id + LEFT JOIN jobs_status js ON j.id = js.job_id WHERE pt.manifest_hash = $1 ORDER BY pt.table_name "#}; diff --git a/crates/core/metadata-db/src/physical_table_revision/sql.rs b/crates/core/metadata-db/src/physical_table_revision/sql.rs index 6d12894c1..e1527715a 100644 --- a/crates/core/metadata-db/src/physical_table_revision/sql.rs +++ b/crates/core/metadata-db/src/physical_table_revision/sql.rs @@ -138,13 +138,14 @@ where -- Writer job fields (optional) j.id AS writer_job_id, - j.node_id AS writer_job_node_id, - j.status AS writer_job_status, + js.node_id AS writer_job_node_id, + js.status AS writer_job_status, j.descriptor AS writer_job_descriptor, j.created_at AS writer_job_created_at, - j.updated_at AS writer_job_updated_at + js.updated_at AS writer_job_updated_at FROM physical_table_revisions ptr LEFT JOIN jobs j ON ptr.writer = j.id + LEFT JOIN jobs_status js ON j.id = js.job_id 
WHERE ptr.id = $1 "}; diff --git a/crates/core/metadata-db/src/physical_table_revision/tests/it_crud.rs b/crates/core/metadata-db/src/physical_table_revision/tests/it_crud.rs index cda207125..2306731d8 100644 --- a/crates/core/metadata-db/src/physical_table_revision/tests/it_crud.rs +++ b/crates/core/metadata-db/src/physical_table_revision/tests/it_crud.rs @@ -4,7 +4,7 @@ use crate::{ datasets::{DatasetName, DatasetNamespace}, db::Connection, error::Error, - jobs::{self, JobId}, + jobs::{self, JobId, JobStatus}, manifests::ManifestHash, physical_table::{self, TableName}, physical_table_revision::{self, LocationId, TablePath}, @@ -196,9 +196,7 @@ async fn get_by_location_id_with_details_returns_revision_with_writer_when_assig serde_json::value::to_raw_value(&serde_json::json!({"operation": "write"})) .expect("Failed to serialize job description"), ); - let job_id = jobs::sql::insert_with_default_status(&mut conn, worker_id, &job_desc) - .await - .expect("Failed to register job"); + let job_id = register_job(&mut conn, &job_desc, &worker_id).await; let table_name = TableName::from_ref_unchecked("writer_table"); let path = TablePath::from_ref_unchecked("test-dataset/writer_table/writer-assigned-revision"); @@ -489,6 +487,25 @@ async fn register_table_and_revision( Ok(revision_id) } +// TODO: Import from tests::common once this file is migrated from Connection to MetadataDb pool. +/// Local helper to register a job using a `&mut Connection` (3-step: insert job → event → status). 
+async fn register_job( + conn: &mut Connection, + job_desc: &jobs::JobDescriptorRaw<'_>, + worker_id: &WorkerNodeId<'_>, +) -> JobId { + let job_id = jobs::register(&mut *conn, worker_id, job_desc) + .await + .expect("Failed to register job"); + crate::job_events::register(&mut *conn, job_id, worker_id, JobStatus::Scheduled) + .await + .expect("Failed to register job event"); + crate::job_status::register(&mut *conn, job_id, worker_id, JobStatus::Scheduled) + .await + .expect("Failed to register job status"); + job_id +} + #[tokio::test] async fn assign_job_writer_assigns_job_to_multiple_locations() { //* Given @@ -508,7 +525,7 @@ async fn assign_job_writer_assigns_job_to_multiple_locations() { // Create a worker and job let worker_id = WorkerNodeId::from_ref_unchecked("test-writer-worker"); - let worker_info = WorkerInfo::default(); // {} + let worker_info = WorkerInfo::default(); workers::register(&mut conn, &worker_id, worker_info) .await .expect("Failed to register worker"); @@ -517,9 +534,7 @@ async fn assign_job_writer_assigns_job_to_multiple_locations() { serde_json::value::to_raw_value(&serde_json::json!({"operation": "write"})) .expect("Failed to serialize job description"), ); - let job_id = jobs::sql::insert_with_default_status(&mut conn, worker_id, &job_desc) - .await - .expect("Failed to register job"); + let job_id = register_job(&mut conn, &job_desc, &worker_id).await; let table_name = TableName::from_ref_unchecked("output_table"); diff --git a/crates/core/metadata-db/src/tests/common.rs b/crates/core/metadata-db/src/tests/common.rs new file mode 100644 index 000000000..13ec79207 --- /dev/null +++ b/crates/core/metadata-db/src/tests/common.rs @@ -0,0 +1,60 @@ +use pgtemp::PgTempDB; + +use crate::{ + config::DEFAULT_POOL_MAX_CONNECTIONS, + job_events, job_status, + jobs::{self, JobDescriptorRaw, JobStatus}, + workers::{self, WorkerInfo, WorkerNodeId}, +}; + +/// Default worker ID used by [`setup_test_db`]. 
+pub const TEST_WORKER_ID: &str = "test-worker"; + +/// Helper to create a [`JobDescriptorRaw`] from a [`serde_json::Value`]. +pub fn raw_descriptor(value: &serde_json::Value) -> JobDescriptorRaw<'static> { + let raw = serde_json::value::to_raw_value(value).expect("Failed to serialize to raw value"); + JobDescriptorRaw::from_owned_unchecked(raw) +} + +/// Set up a test database with a single registered worker ([`TEST_WORKER_ID`]). +/// +/// Returns the temp DB handle (must be kept alive) and the connection pool. +/// Use [`TEST_WORKER_ID`] to reference the pre-registered worker. +pub async fn setup_test_db() -> (PgTempDB, crate::MetadataDb) { + let temp_db = PgTempDB::new(); + let conn = + crate::connect_pool_with_retry(&temp_db.connection_uri(), DEFAULT_POOL_MAX_CONNECTIONS) + .await + .expect("Failed to connect to metadata db"); + + let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + workers::register(&conn, worker_id, WorkerInfo::default()) + .await + .expect("Failed to register worker"); + + (temp_db, conn) +} + +/// Helper to register a job with its event and status in a single transaction. +/// +/// Performs the 3-step atomic registration: insert job → register event → register status. 
+pub async fn register_job(
+    conn: &crate::MetadataDb,
+    job_desc: &JobDescriptorRaw<'_>,
+    worker_id: &WorkerNodeId<'_>,
+    status: Option<JobStatus>,
+) -> jobs::JobId {
+    let status = status.unwrap_or(JobStatus::Scheduled);
+    let mut tx = conn.begin_txn().await.expect("Failed to begin transaction");
+    let job_id = jobs::register(&mut tx, worker_id, job_desc)
+        .await
+        .expect("Failed to register job");
+    job_events::register(&mut tx, job_id, worker_id, status)
+        .await
+        .expect("Failed to register job event");
+    job_status::register(&mut tx, job_id, worker_id, status)
+        .await
+        .expect("Failed to register job status");
+    tx.commit().await.expect("Failed to commit transaction");
+    job_id
+}
diff --git a/crates/core/metadata-db/src/tests/it_workers_events.rs b/crates/core/metadata-db/src/tests/it_workers_events.rs
index 1cd1b1605..a360dddf2 100644
--- a/crates/core/metadata-db/src/tests/it_workers_events.rs
+++ b/crates/core/metadata-db/src/tests/it_workers_events.rs
@@ -6,6 +6,7 @@ use pgtemp::PgTempDB;
 use crate::{
     config::DEFAULT_POOL_MAX_CONNECTIONS,
     jobs::{JobId, JobStatus},
+    tests::common::register_job,
     workers::{self, WorkerInfo, WorkerNodeId},
 };
@@ -52,9 +53,7 @@ async fn schedule_job_and_receive_notification() {
     //* When
     // Register the job
-    let job_id = crate::jobs::register(&conn, &worker_id, &job_desc)
-        .await
-        .expect("Failed to register job");
+    let job_id = register_job(&conn, &job_desc, &worker_id, None).await;
     // Send notification to the worker
     workers::send_job_notif(
diff --git a/crates/core/metadata-db/src/tests/it_workers_jobs.rs b/crates/core/metadata-db/src/tests/it_workers_jobs.rs
index e03e4b3e0..5cf7c8562 100644
--- a/crates/core/metadata-db/src/tests/it_workers_jobs.rs
+++ b/crates/core/metadata-db/src/tests/it_workers_jobs.rs
@@ -5,6 +5,7 @@ use pgtemp::PgTempDB;
 use crate::{
     config::DEFAULT_POOL_MAX_CONNECTIONS,
     jobs::{self, JobStatus},
+    tests::common::{raw_descriptor, register_job},
     workers::{self, WorkerInfo, WorkerNodeId},
 };
@@ -36,9 
+37,7 @@ async fn schedule_and_retrieve_job() { //* When // Register the job - let job_id = jobs::register(&conn, &worker_id, &job_desc) - .await - .expect("Failed to register job"); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; // Get the job let job = jobs::get_by_id(&conn, job_id) @@ -78,9 +77,7 @@ async fn pagination_traverses_all_jobs_ordered() { "table": "test-table", })); - let job_id = jobs::register(&conn, &worker_id, &job_desc) - .await - .expect("Failed to register job"); + let job_id = register_job(&conn, &job_desc, &worker_id, None).await; created_job_ids.push(job_id); } @@ -114,9 +111,3 @@ async fn pagination_traverses_all_jobs_ordered() { assert!(all_jobs[i].id > all_jobs[i + 1].id); } } - -/// Helper to create a [`jobs::JobDescriptorRaw`] from a [`serde_json::Value`]. -fn raw_descriptor(value: &serde_json::Value) -> jobs::JobDescriptorRaw<'static> { - let raw = serde_json::value::to_raw_value(value).expect("Failed to serialize to raw value"); - jobs::JobDescriptorRaw::from_owned_unchecked(raw) -} diff --git a/crates/core/metadata-db/src/tests/mod.rs b/crates/core/metadata-db/src/tests/mod.rs index c844cba43..a8cb3a3ba 100644 --- a/crates/core/metadata-db/src/tests/mod.rs +++ b/crates/core/metadata-db/src/tests/mod.rs @@ -2,3 +2,5 @@ mod it_txn; mod it_workers_events; mod it_workers_heartbeat; mod it_workers_jobs; + +pub(crate) mod common; diff --git a/crates/services/admin-api/src/scheduler.rs b/crates/services/admin-api/src/scheduler.rs index 3a95e5448..8e1a62c73 100644 --- a/crates/services/admin-api/src/scheduler.rs +++ b/crates/services/admin-api/src/scheduler.rs @@ -191,6 +191,15 @@ pub enum ScheduleJobError { #[error("specified worker '{0}' not found or inactive")] WorkerNotAvailable(NodeId), + /// Failed to begin transaction for schedule operation + /// + /// This occurs when: + /// - Database connection pool is exhausted + /// - Database connection fails or is lost + /// - Transaction initialization encounters an 
error + #[error("failed to begin transaction")] + BeginTransaction(#[source] metadata_db::Error), + /// Failed to register job in the metadata database /// /// This occurs when: @@ -200,6 +209,15 @@ pub enum ScheduleJobError { #[error("failed to register job: {0}")] RegisterJob(#[source] metadata_db::Error), + /// Failed to register job status in the metadata database + /// + /// This occurs when: + /// - Job status insertion into database fails + /// - Unique constraint violation on job ID + /// - Connection is lost during job status registration + #[error("failed to register job status: {0}")] + RegisterJobStatus(#[source] metadata_db::Error), + /// Failed to register job event in the metadata database /// /// This occurs when: @@ -208,15 +226,6 @@ pub enum ScheduleJobError { #[error("failed to register job event: {0}")] RegisterJobEvent(#[source] metadata_db::Error), - /// Failed to begin transaction for schedule operation - /// - /// This occurs when: - /// - Database connection pool is exhausted - /// - Database connection fails or is lost - /// - Transaction initialization encounters an error - #[error("failed to begin transaction")] - BeginTransaction(#[source] metadata_db::Error), - /// Failed to send job notification to worker /// /// This occurs when: diff --git a/crates/services/controller/src/scheduler.rs b/crates/services/controller/src/scheduler.rs index eb754ad4a..115fe237d 100644 --- a/crates/services/controller/src/scheduler.rs +++ b/crates/services/controller/src/scheduler.rs @@ -40,7 +40,7 @@ use datasets_common::{ hash::Hash, hash_reference::HashReference, name::Name, namespace::Namespace, }; use metadata_db::{ - Error as MetadataDbError, MetadataDb, jobs::JobStatusUpdateError, workers::Worker, + Error as MetadataDbError, MetadataDb, job_status::JobStatusUpdateError, workers::Worker, }; use monitoring::logging; use rand::seq::IndexedRandom as _; @@ -144,6 +144,10 @@ impl Scheduler { .map(Into::into) .map_err(ScheduleJobError::RegisterJob)?; + 
metadata_db::job_status::register(&mut tx, job_id, &node_id, JobStatus::Scheduled.into()) + .await + .map_err(ScheduleJobError::RegisterJobStatus)?; + metadata_db::job_events::register(&mut tx, job_id, &node_id, JobStatus::Scheduled.into()) .await .map_err(ScheduleJobError::RegisterJobEvent)?; @@ -171,14 +175,8 @@ impl Scheduler { .await .map_err(StopJobError::BeginTransaction)?; - // Fetch the job to get its node_id and validate it exists - let job = metadata_db::jobs::get_by_id(&mut tx, &job_id) - .await - .map_err(StopJobError::GetJob)? - .ok_or(StopJobError::JobNotFound)?; - // Attempt to stop the job - metadata_db::jobs::request_stop(&mut tx, &job_id) + let status_changed = metadata_db::job_status::request_stop(&mut tx, &job_id) .await .map_err(|err| match err { MetadataDbError::JobStatusUpdate(JobStatusUpdateError::NotFound) => { @@ -201,19 +199,30 @@ impl Scheduler { other => StopJobError::UpdateJobStatus(other), })?; - metadata_db::job_events::register( - &mut tx, - job_id, - &job.node_id, - JobStatus::StopRequested.into(), - ) - .await - .map_err(StopJobError::RegisterJobEvent)?; - - // Notify the worker about the stop request (within the transaction) - metadata_db::workers::send_job_notif(&mut tx, job.node_id, &JobNotification::stop(job_id)) + if status_changed { + let job = metadata_db::jobs::get_by_id(&mut tx, &job_id) + .await + .map_err(StopJobError::GetJob)? 
+ .ok_or(StopJobError::JobNotFound)?; + + metadata_db::job_events::register( + &mut tx, + job_id, + &job.node_id, + JobStatus::StopRequested.into(), + ) + .await + .map_err(StopJobError::RegisterJobEvent)?; + + // Notify the worker about the stop request (within the transaction) + metadata_db::workers::send_job_notif( + &mut tx, + job.node_id, + &JobNotification::stop(job_id), + ) .await .map_err(StopJobError::SendNotification)?; + } // Commit the transaction tx.commit().await.map_err(StopJobError::CommitTransaction)?; diff --git a/crates/services/worker/src/service/job_queue.rs b/crates/services/worker/src/service/job_queue.rs index 75a719513..c8fb66464 100644 --- a/crates/services/worker/src/service/job_queue.rs +++ b/crates/services/worker/src/service/job_queue.rs @@ -124,7 +124,7 @@ impl JobQueue { let node_id = WorkerNodeId::from(node_id); (async || { let mut tx = self.metadata_db.begin_txn().await?; - metadata_db::jobs::mark_running(&mut tx, job_id).await?; + metadata_db::job_status::mark_running(&mut tx, job_id).await?; metadata_db::job_events::register(&mut tx, job_id, &node_id, JobStatus::Running) .await?; tx.commit().await?; @@ -159,7 +159,7 @@ impl JobQueue { let node_id = WorkerNodeId::from(node_id); (async || { let mut tx = self.metadata_db.begin_txn().await?; - metadata_db::jobs::mark_stopping(&mut tx, job_id).await?; + metadata_db::job_status::mark_stopping(&mut tx, job_id).await?; metadata_db::job_events::register(&mut tx, job_id, &node_id, JobStatus::Stopping) .await?; tx.commit().await?; @@ -194,7 +194,7 @@ impl JobQueue { let node_id = WorkerNodeId::from(node_id); (async || { let mut tx = self.metadata_db.begin_txn().await?; - metadata_db::jobs::mark_stopped(&mut tx, job_id).await?; + metadata_db::job_status::mark_stopped(&mut tx, job_id).await?; metadata_db::job_events::register(&mut tx, job_id, &node_id, JobStatus::Stopped) .await?; tx.commit().await?; @@ -231,7 +231,7 @@ impl JobQueue { let node_id = WorkerNodeId::from(node_id); (async || { 
let mut tx = self.metadata_db.begin_txn().await?; - metadata_db::jobs::mark_completed(&mut tx, job_id).await?; + metadata_db::job_status::mark_completed(&mut tx, job_id).await?; metadata_db::job_events::register(&mut tx, job_id, &node_id, JobStatus::Completed) .await?; tx.commit().await?; @@ -270,7 +270,7 @@ impl JobQueue { if fatal { (async || { let mut tx = self.metadata_db.begin_txn().await?; - metadata_db::jobs::mark_failed_fatal(&mut tx, job_id).await?; + metadata_db::job_status::mark_failed_fatal(&mut tx, job_id).await?; metadata_db::job_events::register(&mut tx, job_id, &node_id, JobStatus::Fatal) .await?; tx.commit().await?; @@ -290,7 +290,7 @@ impl JobQueue { } else { (async || { let mut tx = self.metadata_db.begin_txn().await?; - metadata_db::jobs::mark_failed_recoverable(&mut tx, job_id).await?; + metadata_db::job_status::mark_failed_recoverable(&mut tx, job_id).await?; metadata_db::job_events::register(&mut tx, job_id, &node_id, JobStatus::Error) .await?; tx.commit().await?;