From 59813276c9989cb839c819fb398e81f562f784b9 Mon Sep 17 00:00:00 2001 From: Kush Bisen Date: Thu, 9 Apr 2026 16:22:18 +0200 Subject: [PATCH 1/2] runtime: harden query lifecycle state handling --- src/api/janus_api.rs | 57 +++++++++++++++++++++++---- src/http/server.rs | 11 +----- src/registry/query_registry.rs | 16 ++++++++ tests/http_server_integration_test.rs | 3 +- tests/janus_api_integration_test.rs | 54 +++++++++++++++++++++++++ 5 files changed, 123 insertions(+), 18 deletions(-) diff --git a/src/api/janus_api.rs b/src/api/janus_api.rs index aaebd25..9ea3866 100644 --- a/src/api/janus_api.rs +++ b/src/api/janus_api.rs @@ -99,7 +99,7 @@ struct RunningQuery { historical_handles: Vec>, baseline_handle: Option>, live_handle: Option>, - mqtt_subscriber_handle: Option>, + mqtt_subscriber_handles: Vec>, // shutdown sender signals used to stop the workers shutdown_senders: Vec>, // MQTT subscriber instances (for stopping) @@ -224,13 +224,13 @@ impl JanusApi { parsed.baseline.as_ref().map(|baseline| baseline.window_name.clone()); let mut historical_handles = Vec::new(); let mut shutdown_senders = Vec::new(); - let status = Arc::new(RwLock::new( + let initial_status = if !parsed.live_windows.is_empty() && !parsed.historical_windows.is_empty() { ExecutionStatus::WarmingBaseline } else { ExecutionStatus::Running - }, - )); + }; + let status = Arc::new(RwLock::new(initial_status.clone())); // 4. Spawn historical worker threads (one per historical window) for (i, window) in parsed.historical_windows.iter().enumerate() { @@ -308,7 +308,7 @@ impl JanusApi { // 5. 
Spawn live worker thread and MQTT subscribers (if there are live windows) let mut mqtt_subscribers = Vec::new(); - let mut mqtt_subscriber_handle = None; + let mut mqtt_subscriber_handles = Vec::new(); let mut baseline_handle = None; let live_handle = if !parsed.live_windows.is_empty() && !parsed.rspql_query.is_empty() { @@ -354,6 +354,8 @@ impl JanusApi { let parsed_clone = parsed.clone(); let processor_for_baseline = Arc::clone(&live_processor); let status_for_baseline = Arc::clone(&status); + let registry_for_baseline = Arc::clone(&self.registry); + let query_id_for_baseline = query_id.clone(); let baseline_mode = effective_baseline_mode; let baseline_window = effective_baseline_window.clone(); let (baseline_shutdown_tx, baseline_shutdown_rx) = mpsc::channel::<()>(); @@ -390,12 +392,18 @@ impl JanusApi { *state = ExecutionStatus::Running; } } + let _ = + registry_for_baseline.set_status(&query_id_for_baseline, "Running"); } Err(err) => { eprintln!("Async baseline warm-up error: {}", err); if let Ok(mut state) = status_for_baseline.write() { *state = ExecutionStatus::Failed(err.to_string()); } + let _ = registry_for_baseline.set_status( + &query_id_for_baseline, + format!("Failed({err})"), + ); } } })); @@ -431,7 +439,7 @@ impl JanusApi { }); mqtt_subscribers.push(subscriber); - mqtt_subscriber_handle = Some(sub_handle); + mqtt_subscriber_handles.push(sub_handle); } // Spawn live worker thread to receive results @@ -470,6 +478,21 @@ impl JanusApi { None }; + self.registry.increment_execution_count(query_id).map_err(|e| { + JanusApiError::RegistryError(format!( + "Failed to increment execution count for '{}': {}", + query_id, e + )) + })?; + self.registry + .set_status(query_id, format!("{:?}", initial_status)) + .map_err(|e| { + JanusApiError::RegistryError(format!( + "Failed to update query status for '{}': {}", + query_id, e + )) + })?; + // 6. 
Store running query information let running = RunningQuery { metadata, @@ -479,7 +502,7 @@ impl JanusApi { historical_handles, baseline_handle, live_handle, - mqtt_subscriber_handle, + mqtt_subscriber_handles, shutdown_senders, mqtt_subscribers, }; @@ -506,6 +529,7 @@ impl JanusApi { let running = running_map.remove(query_id).ok_or_else(|| { JanusApiError::ExecutionError(format!("Query '{}' is not running", query_id)) })?; + drop(running_map); // Send shutdown signals for shutdown_tx in running.shutdown_senders { @@ -521,6 +545,25 @@ impl JanusApi { if let Ok(mut status) = running.status.write() { *status = ExecutionStatus::Stopped; } + self.registry.set_status(query_id, "Stopped").map_err(|e| { + JanusApiError::RegistryError(format!( + "Failed to update query status for '{}': {}", + query_id, e + )) + })?; + + for handle in running.historical_handles { + let _ = handle.join(); + } + if let Some(handle) = running.baseline_handle { + let _ = handle.join(); + } + if let Some(handle) = running.live_handle { + let _ = handle.join(); + } + for handle in running.mqtt_subscriber_handles { + let _ = handle.join(); + } Ok(()) } diff --git a/src/http/server.rs b/src/http/server.rs index 8c1fead..babc977 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -296,15 +296,6 @@ async fn get_query( .ok_or_else(|| ApiError::NotFound(format!("Query '{}' not found", query_id)))?; let is_running = state.janus_api.is_running(&query_id); - let status = if is_running { - state - .janus_api - .get_query_status(&query_id) - .map(|s| format!("{:?}", s)) - .unwrap_or_else(|| "Unknown".to_string()) - } else { - "Registered".to_string() - }; Ok(Json(QueryDetailsResponse { query_id: metadata.query_id, @@ -313,7 +304,7 @@ async fn get_query( registered_at: metadata.registered_at, execution_count: metadata.execution_count, is_running, - status, + status: metadata.status, })) } diff --git a/src/registry/query_registry.rs b/src/registry/query_registry.rs index 3f0d716..1837410 100644 --- 
a/src/registry/query_registry.rs +++ b/src/registry/query_registry.rs @@ -15,6 +15,7 @@ pub struct QueryMetadata { pub baseline_mode: BaselineBootstrapMode, pub registered_at: u64, pub execution_count: u64, + pub status: String, pub subscribers: Vec, } @@ -102,6 +103,7 @@ impl QueryRegistry { baseline_mode, registered_at: Self::current_timestamp(), execution_count: 0, + status: "Registered".to_string(), subscribers: Vec::new(), }; @@ -145,6 +147,20 @@ impl QueryRegistry { Ok(()) } + pub fn set_status( + &self, + query_id: &QueryId, + status: impl Into<String>, + ) -> Result<(), QueryRegistryError> { + let mut queries = self.queries.write().unwrap(); + let query = queries + .get_mut(query_id) + .ok_or_else(|| QueryRegistryError::QueryNotFound(query_id.clone()))?; + + query.status = status.into(); + Ok(()) + } + /// To remove a query from the registry pub fn unregister(&self, query_id: &QueryId) -> Result { let mut queries = self.queries.write().unwrap(); diff --git a/tests/http_server_integration_test.rs b/tests/http_server_integration_test.rs index c0fbb8a..e27ea30 100644 --- a/tests/http_server_integration_test.rs +++ b/tests/http_server_integration_test.rs @@ -226,7 +226,8 @@ async fn test_stop_route_stops_running_query_and_delete_requires_stop() { assert!(get_response.status().is_success()); let get_body: Value = get_response.json().await.expect("invalid get response"); assert_eq!(get_body["is_running"], false); - assert_eq!(get_body["status"], "Registered"); + assert_eq!(get_body["status"], "Stopped"); + assert_eq!(get_body["execution_count"], 1); let delete_response = server .client diff --git a/tests/janus_api_integration_test.rs b/tests/janus_api_integration_test.rs index 1a52003..2228d64 100644 --- a/tests/janus_api_integration_test.rs +++ b/tests/janus_api_integration_test.rs @@ -355,6 +355,60 @@ fn test_stop_query() { ); } +#[test] +fn test_execution_count_and_status_update_across_lifecycle() { + let storage = Arc::new( + 
StreamingSegmentedStorage::new(StreamingConfig::default()) + .expect("Failed to create storage"), + ); + let parser = JanusQLParser::new().expect("Failed to create parser"); + let registry = Arc::new(QueryRegistry::new()); + + let api = + JanusApi::new(parser, Arc::clone(&registry), storage).expect("Failed to create API"); + + let janusql = r#" + PREFIX ex: + SELECT ?s + FROM NAMED WINDOW ex:w ON STREAM ex:stream1 [RANGE 1000 STEP 200] + WHERE { WINDOW ex:w { ?s ?p ?o } } + "#; + + let metadata = api + .register_query("lifecycle_query".into(), janusql) + .expect("Failed to register query"); + assert_eq!(metadata.execution_count, 0); + assert_eq!(metadata.status, "Registered"); + + let _handle = api + .start_query(&"lifecycle_query".into()) + .expect("Failed to start query"); + + let after_start = registry + .get(&"lifecycle_query".into()) + .expect("query should exist after start"); + assert_eq!(after_start.execution_count, 1); + assert_eq!(after_start.status, "Running"); + + api.stop_query(&"lifecycle_query".into()).expect("Failed to stop query"); + + let after_stop = registry + .get(&"lifecycle_query".into()) + .expect("query should exist after stop"); + assert_eq!(after_stop.execution_count, 1); + assert_eq!(after_stop.status, "Stopped"); + + let _handle = api + .start_query(&"lifecycle_query".into()) + .expect("Failed to restart query"); + + let after_restart = registry + .get(&"lifecycle_query".into()) + .expect("query should exist after restart"); + assert_eq!(after_restart.execution_count, 2); + assert_eq!(after_restart.status, "Running"); +} + #[test] fn test_multiple_queries_concurrent() { let storage = create_test_storage_with_data().expect("Failed to create storage"); From bedce0a6a8107e5c1e5d5f91c235968a3b81d194 Mon Sep 17 00:00:00 2001 From: Kush Bisen Date: Thu, 9 Apr 2026 16:28:49 +0200 Subject: [PATCH 2/2] style: format runtime lifecycle changes --- src/api/janus_api.rs | 10 ++++------ tests/janus_api_integration_test.rs | 21 +++++++-------------- 2
files changed, 11 insertions(+), 20 deletions(-) diff --git a/src/api/janus_api.rs b/src/api/janus_api.rs index 9ea3866..86ae114 100644 --- a/src/api/janus_api.rs +++ b/src/api/janus_api.rs @@ -392,18 +392,16 @@ impl JanusApi { *state = ExecutionStatus::Running; } } - let _ = - registry_for_baseline.set_status(&query_id_for_baseline, "Running"); + let _ = registry_for_baseline + .set_status(&query_id_for_baseline, "Running"); } Err(err) => { eprintln!("Async baseline warm-up error: {}", err); if let Ok(mut state) = status_for_baseline.write() { *state = ExecutionStatus::Failed(err.to_string()); } - let _ = registry_for_baseline.set_status( - &query_id_for_baseline, - format!("Failed({err})"), - ); + let _ = registry_for_baseline + .set_status(&query_id_for_baseline, format!("Failed({err})")); } } })); diff --git a/tests/janus_api_integration_test.rs b/tests/janus_api_integration_test.rs index 2228d64..ad6db96 100644 --- a/tests/janus_api_integration_test.rs +++ b/tests/janus_api_integration_test.rs @@ -364,8 +364,7 @@ fn test_execution_count_and_status_update_across_lifecycle() { let parser = JanusQLParser::new().expect("Failed to create parser"); let registry = Arc::new(QueryRegistry::new()); - let api = - JanusApi::new(parser, Arc::clone(&registry), storage).expect("Failed to create API"); + let api = JanusApi::new(parser, Arc::clone(&registry), storage).expect("Failed to create API"); let janusql = r#" PREFIX ex: @@ -380,27 +379,21 @@ fn test_execution_count_and_status_update_across_lifecycle() { assert_eq!(metadata.execution_count, 0); assert_eq!(metadata.status, "Registered"); - let _handle = api - .start_query(&"lifecycle_query".into()) - .expect("Failed to start query"); + let _handle = api.start_query(&"lifecycle_query".into()).expect("Failed to start query"); - let after_start = registry - .get(&"lifecycle_query".into()) - .expect("query should exist after start"); + let after_start = + registry.get(&"lifecycle_query".into()).expect("query should exist after
start"); assert_eq!(after_start.execution_count, 1); assert_eq!(after_start.status, "Running"); api.stop_query(&"lifecycle_query".into()).expect("Failed to stop query"); - let after_stop = registry - .get(&"lifecycle_query".into()) - .expect("query should exist after stop"); + let after_stop = + registry.get(&"lifecycle_query".into()).expect("query should exist after stop"); assert_eq!(after_stop.execution_count, 1); assert_eq!(after_stop.status, "Stopped"); - let _handle = api - .start_query(&"lifecycle_query".into()) - .expect("Failed to restart query"); + let _handle = api.start_query(&"lifecycle_query".into()).expect("Failed to restart query"); let after_restart = registry .get(&"lifecycle_query".into())