From f4992359eb07fa8b209595623e739de8cd19ec13 Mon Sep 17 00:00:00 2001
From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com>
Date: Fri, 8 May 2026 07:01:07 +0530
Subject: [PATCH] test(workflow_ops): unit tests for module helpers

Cover build_metadata_json escaping and unicode handling, the
parse_step_metadata success and error paths, status_to_py's canonical
strings, and cascade_skip_pending_nodes' status transitions and job
cancellation against in-memory SQLite storages.
---
 .../src/py_queue/workflow_ops/mod.rs | 245 ++++++++++++++++++
 1 file changed, 245 insertions(+)

diff --git a/crates/taskito-python/src/py_queue/workflow_ops/mod.rs b/crates/taskito-python/src/py_queue/workflow_ops/mod.rs
index c7eb43e..cf5e69e 100644
--- a/crates/taskito-python/src/py_queue/workflow_ops/mod.rs
+++ b/crates/taskito-python/src/py_queue/workflow_ops/mod.rs
@@ -122,3 +122,248 @@ pub(super) fn cascade_skip_pending_nodes(
     }
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use taskito_core::job::{now_millis, NewJob};
+    use taskito_core::storage::sqlite::SqliteStorage;
+    use taskito_workflows::{
+        WorkflowDefinition, WorkflowRun, WorkflowSqliteStorage, WorkflowStorageBackend,
+    };
+
+    fn make_storages() -> (StorageBackend, WorkflowStorageBackend) {
+        let sql = SqliteStorage::in_memory().unwrap();
+        let backend = StorageBackend::Sqlite(sql.clone());
+        let wf = WorkflowSqliteStorage::new(sql).unwrap();
+        let wf_backend = WorkflowStorageBackend::Sqlite(wf);
+        (backend, wf_backend)
+    }
+
+    fn enqueue_test_job(storage: &StorageBackend, task_name: &str) -> String {
+        let job = storage
+            .enqueue(NewJob {
+                queue: "default".to_string(),
+                task_name: task_name.to_string(),
+                payload: vec![1, 2, 3],
+                priority: 0,
+                scheduled_at: now_millis(),
+                max_retries: 3,
+                timeout_ms: 300_000,
+                unique_key: None,
+                metadata: None,
+                depends_on: vec![],
+                expires_at: None,
+                result_ttl_ms: None,
+                namespace: None,
+            })
+            .unwrap();
+        job.id
+    }
+
+    fn seed_run(wf_storage: &WorkflowStorageBackend) -> String {
+        let now = now_millis();
+        let definition = WorkflowDefinition {
+            id: uuid::Uuid::now_v7().to_string(),
+            name: "audit_pipeline".to_string(),
+            version: 1,
+            dag_data: vec![],
+            step_metadata: HashMap::new(),
+            created_at: now,
+        };
+        wf_storage.create_workflow_definition(&definition).unwrap();
+
+        let run = WorkflowRun {
+            id: uuid::Uuid::now_v7().to_string(),
+            definition_id: definition.id,
+            params: None,
+            state: WorkflowState::Running,
+            started_at: Some(now),
+            completed_at: None,
+            error: None,
+            parent_run_id: None,
+            parent_node_name: None,
+            created_at: now,
+        };
+        let run_id = run.id.clone();
+        wf_storage.create_workflow_run(&run).unwrap();
+        run_id
+    }
+
+    fn seed_node(
+        wf_storage: &WorkflowStorageBackend,
+        run_id: &str,
+        node_name: &str,
+        status: WorkflowNodeStatus,
+        job_id: Option<String>,
+    ) -> WorkflowNode {
+        let node = WorkflowNode {
+            id: uuid::Uuid::now_v7().to_string(),
+            run_id: run_id.to_string(),
+            node_name: node_name.to_string(),
+            job_id,
+            status,
+            result_hash: None,
+            fan_out_count: None,
+            fan_in_data: None,
+            started_at: None,
+            completed_at: None,
+            error: None,
+        };
+        wf_storage.create_workflow_node(&node).unwrap();
+        node
+    }
+
+    fn fetch_node(
+        wf_storage: &WorkflowStorageBackend,
+        run_id: &str,
+        node_name: &str,
+    ) -> WorkflowNode {
+        wf_storage
+            .get_workflow_nodes(run_id)
+            .unwrap()
+            .into_iter()
+            .find(|n| n.node_name == node_name)
+            .unwrap()
+    }
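+
+    // The fixtures above are shared by every test; the cases below cover the
+    // pure JSON/string helpers first, then the storage-backed skip cascade.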
+    #[test]
+    fn build_metadata_json_round_trips_special_characters() {
+        let json = build_metadata_json("run-1", "node\\with\"quotes\nand\ttabs");
+        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
+        assert_eq!(v["workflow_run_id"], "run-1");
+        assert_eq!(v["workflow_node_name"], "node\\with\"quotes\nand\ttabs");
+    }
+
+    #[test]
+    fn build_metadata_json_preserves_unicode_node_names() {
+        let json = build_metadata_json("run-2", "ノード/ステップ");
+        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
+        assert_eq!(v["workflow_node_name"], "ノード/ステップ");
+    }
+
+    #[test]
+    fn parse_step_metadata_round_trips_minimal_payload() {
+        let json = r#"{
+            "extract": {
+                "task_name": "task_extract",
+                "queue": null,
+                "args_template": null,
+                "kwargs_template": null,
+                "max_retries": null,
+                "timeout_ms": null,
+                "priority": null,
+                "fan_out": null,
+                "fan_in": null,
+                "condition": null
+            }
+        }"#;
+        let map = parse_step_metadata(json).unwrap();
+        assert_eq!(map.len(), 1);
+        assert_eq!(map["extract"].task_name, "task_extract");
+    }
+
+    #[test]
+    fn parse_step_metadata_rejects_invalid_json() {
+        // PyValueError construction needs a live Python interpreter.
+        pyo3::prepare_freethreaded_python();
+        let err = parse_step_metadata("not-json").unwrap_err();
+        assert!(err.to_string().contains("invalid step_metadata JSON"));
+    }
+
+    #[test]
+    fn status_to_py_returns_canonical_strings() {
+        assert_eq!(status_to_py(WorkflowState::Pending), "pending");
+        assert_eq!(status_to_py(WorkflowState::Running), "running");
+        assert_eq!(status_to_py(WorkflowState::Completed), "completed");
+        assert_eq!(status_to_py(WorkflowState::Failed), "failed");
+        assert_eq!(status_to_py(WorkflowState::Cancelled), "cancelled");
+    }
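+
+    // Cascade invariants under test: only Pending/Ready nodes flip to
+    // Skipped, and only the jobs attached to those nodes are cancelled.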
+    #[test]
+    fn cascade_skip_skips_pending_and_ready_only() {
+        let (storage, wf_storage) = make_storages();
+        let run_id = seed_run(&wf_storage);
+        seed_node(&wf_storage, &run_id, "p", WorkflowNodeStatus::Pending, None);
+        seed_node(&wf_storage, &run_id, "r", WorkflowNodeStatus::Ready, None);
+        seed_node(
+            &wf_storage,
+            &run_id,
+            "running",
+            WorkflowNodeStatus::Running,
+            None,
+        );
+        seed_node(
+            &wf_storage,
+            &run_id,
+            "done",
+            WorkflowNodeStatus::Completed,
+            None,
+        );
+
+        let nodes = wf_storage.get_workflow_nodes(&run_id).unwrap();
+        cascade_skip_pending_nodes(&storage, &wf_storage, &run_id, &nodes).unwrap();
+
+        assert_eq!(
+            fetch_node(&wf_storage, &run_id, "p").status,
+            WorkflowNodeStatus::Skipped,
+        );
+        assert_eq!(
+            fetch_node(&wf_storage, &run_id, "r").status,
+            WorkflowNodeStatus::Skipped,
+        );
+        assert_eq!(
+            fetch_node(&wf_storage, &run_id, "running").status,
+            WorkflowNodeStatus::Running,
+        );
+        assert_eq!(
+            fetch_node(&wf_storage, &run_id, "done").status,
+            WorkflowNodeStatus::Completed,
+        );
+    }
+
+    #[test]
+    fn cascade_skip_cancels_pending_node_jobs() {
+        let (storage, wf_storage) = make_storages();
+        let run_id = seed_run(&wf_storage);
+
+        let pending_job_id = enqueue_test_job(&storage, "task_pending");
+        let running_job_id = enqueue_test_job(&storage, "task_running");
+        seed_node(
+            &wf_storage,
+            &run_id,
+            "p",
+            WorkflowNodeStatus::Pending,
+            Some(pending_job_id.clone()),
+        );
+        seed_node(
+            &wf_storage,
+            &run_id,
+            "running",
+            WorkflowNodeStatus::Running,
+            Some(running_job_id.clone()),
+        );
+
+        let nodes = wf_storage.get_workflow_nodes(&run_id).unwrap();
+        cascade_skip_pending_nodes(&storage, &wf_storage, &run_id, &nodes).unwrap();
+
+        let pending_job = storage.get_job(&pending_job_id).unwrap().unwrap();
+        assert_eq!(pending_job.status.wire_name(), "Cancelled");
+
+        let running_job = storage.get_job(&running_job_id).unwrap().unwrap();
+        assert_ne!(running_job.status.wire_name(), "Cancelled");
+    }
+
+    #[test]
+    fn cascade_skip_is_a_noop_for_empty_node_slice() {
+        let (storage, wf_storage) = make_storages();
+        let run_id = seed_run(&wf_storage);
+        cascade_skip_pending_nodes(&storage, &wf_storage, &run_id, &[]).unwrap();
+        assert!(wf_storage.get_workflow_nodes(&run_id).unwrap().is_empty());
+    }
+}