Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 48 additions & 7 deletions src/api/janus_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ struct RunningQuery {
historical_handles: Vec<thread::JoinHandle<()>>,
baseline_handle: Option<thread::JoinHandle<()>>,
live_handle: Option<thread::JoinHandle<()>>,
mqtt_subscriber_handle: Option<thread::JoinHandle<()>>,
mqtt_subscriber_handles: Vec<thread::JoinHandle<()>>,
// shutdown sender signals used to stop the workers
shutdown_senders: Vec<Sender<()>>,
// MQTT subscriber instances (for stopping)
Expand Down Expand Up @@ -224,13 +224,13 @@ impl JanusApi {
parsed.baseline.as_ref().map(|baseline| baseline.window_name.clone());
let mut historical_handles = Vec::new();
let mut shutdown_senders = Vec::new();
let status = Arc::new(RwLock::new(
let initial_status =
if !parsed.live_windows.is_empty() && !parsed.historical_windows.is_empty() {
ExecutionStatus::WarmingBaseline
} else {
ExecutionStatus::Running
},
));
};
let status = Arc::new(RwLock::new(initial_status.clone()));

// 4. Spawn historical worker threads (one per historical window)
for (i, window) in parsed.historical_windows.iter().enumerate() {
Expand Down Expand Up @@ -308,7 +308,7 @@ impl JanusApi {

// 5. Spawn live worker thread and MQTT subscribers (if there are live windows)
let mut mqtt_subscribers = Vec::new();
let mut mqtt_subscriber_handle = None;
let mut mqtt_subscriber_handles = Vec::new();
let mut baseline_handle = None;

let live_handle = if !parsed.live_windows.is_empty() && !parsed.rspql_query.is_empty() {
Expand Down Expand Up @@ -354,6 +354,8 @@ impl JanusApi {
let parsed_clone = parsed.clone();
let processor_for_baseline = Arc::clone(&live_processor);
let status_for_baseline = Arc::clone(&status);
let registry_for_baseline = Arc::clone(&self.registry);
let query_id_for_baseline = query_id.clone();
let baseline_mode = effective_baseline_mode;
let baseline_window = effective_baseline_window.clone();
let (baseline_shutdown_tx, baseline_shutdown_rx) = mpsc::channel::<()>();
Expand Down Expand Up @@ -390,12 +392,16 @@ impl JanusApi {
*state = ExecutionStatus::Running;
}
}
let _ = registry_for_baseline
.set_status(&query_id_for_baseline, "Running");
}
Err(err) => {
eprintln!("Async baseline warm-up error: {}", err);
if let Ok(mut state) = status_for_baseline.write() {
*state = ExecutionStatus::Failed(err.to_string());
}
let _ = registry_for_baseline
.set_status(&query_id_for_baseline, format!("Failed({err})"));
}
}
}));
Expand Down Expand Up @@ -431,7 +437,7 @@ impl JanusApi {
});

mqtt_subscribers.push(subscriber);
mqtt_subscriber_handle = Some(sub_handle);
mqtt_subscriber_handles.push(sub_handle);
}

// Spawn live worker thread to receive results
Expand Down Expand Up @@ -470,6 +476,21 @@ impl JanusApi {
None
};

self.registry.increment_execution_count(query_id).map_err(|e| {
JanusApiError::RegistryError(format!(
"Failed to increment execution count for '{}': {}",
query_id, e
))
})?;
self.registry
.set_status(query_id, format!("{:?}", initial_status))
.map_err(|e| {
JanusApiError::RegistryError(format!(
"Failed to update query status for '{}': {}",
query_id, e
))
})?;

// 6. Store running query information
let running = RunningQuery {
metadata,
Expand All @@ -479,7 +500,7 @@ impl JanusApi {
historical_handles,
baseline_handle,
live_handle,
mqtt_subscriber_handle,
mqtt_subscriber_handles,
shutdown_senders,
mqtt_subscribers,
};
Expand All @@ -506,6 +527,7 @@ impl JanusApi {
let running = running_map.remove(query_id).ok_or_else(|| {
JanusApiError::ExecutionError(format!("Query '{}' is not running", query_id))
})?;
drop(running_map);

// Send shutdown signals
for shutdown_tx in running.shutdown_senders {
Expand All @@ -521,6 +543,25 @@ impl JanusApi {
if let Ok(mut status) = running.status.write() {
*status = ExecutionStatus::Stopped;
}
self.registry.set_status(query_id, "Stopped").map_err(|e| {
JanusApiError::RegistryError(format!(
"Failed to update query status for '{}': {}",
query_id, e
))
})?;

for handle in running.historical_handles {
let _ = handle.join();
}
if let Some(handle) = running.baseline_handle {
let _ = handle.join();
}
if let Some(handle) = running.live_handle {
let _ = handle.join();
}
for handle in running.mqtt_subscriber_handles {
let _ = handle.join();
}

Ok(())
}
Expand Down
11 changes: 1 addition & 10 deletions src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,15 +296,6 @@ async fn get_query(
.ok_or_else(|| ApiError::NotFound(format!("Query '{}' not found", query_id)))?;

let is_running = state.janus_api.is_running(&query_id);
let status = if is_running {
state
.janus_api
.get_query_status(&query_id)
.map(|s| format!("{:?}", s))
.unwrap_or_else(|| "Unknown".to_string())
} else {
"Registered".to_string()
};

Ok(Json(QueryDetailsResponse {
query_id: metadata.query_id,
Expand All @@ -313,7 +304,7 @@ async fn get_query(
registered_at: metadata.registered_at,
execution_count: metadata.execution_count,
is_running,
status,
status: metadata.status,
}))
}

Expand Down
16 changes: 16 additions & 0 deletions src/registry/query_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub struct QueryMetadata {
pub baseline_mode: BaselineBootstrapMode,
pub registered_at: u64,
pub execution_count: u64,
pub status: String,
pub subscribers: Vec<QueryId>,
}

Expand Down Expand Up @@ -102,6 +103,7 @@ impl QueryRegistry {
baseline_mode,
registered_at: Self::current_timestamp(),
execution_count: 0,
status: "Registered".to_string(),
subscribers: Vec::new(),
};

Expand Down Expand Up @@ -145,6 +147,20 @@ impl QueryRegistry {
Ok(())
}

pub fn set_status(
&self,
query_id: &QueryId,
status: impl Into<String>,
) -> Result<(), QueryRegistryError> {
let mut queries = self.queries.write().unwrap();
let query = queries
.get_mut(query_id)
.ok_or_else(|| QueryRegistryError::QueryNotFound(query_id.clone()))?;

query.status = status.into();
Ok(())
}

/// To remove a query from the registry
pub fn unregister(&self, query_id: &QueryId) -> Result<QueryMetadata, QueryRegistryError> {
let mut queries = self.queries.write().unwrap();
Expand Down
3 changes: 2 additions & 1 deletion tests/http_server_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ async fn test_stop_route_stops_running_query_and_delete_requires_stop() {
assert!(get_response.status().is_success());
let get_body: Value = get_response.json().await.expect("invalid get response");
assert_eq!(get_body["is_running"], false);
assert_eq!(get_body["status"], "Registered");
assert_eq!(get_body["status"], "Stopped");
assert_eq!(get_body["execution_count"], 1);

let delete_response = server
.client
Expand Down
47 changes: 47 additions & 0 deletions tests/janus_api_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,53 @@ fn test_stop_query() {
);
}

#[test]
fn test_execution_count_and_status_update_across_lifecycle() {
let storage = Arc::new(
StreamingSegmentedStorage::new(StreamingConfig::default())
.expect("Failed to create storage"),
);
let parser = JanusQLParser::new().expect("Failed to create parser");
let registry = Arc::new(QueryRegistry::new());

let api = JanusApi::new(parser, Arc::clone(&registry), storage).expect("Failed to create API");

let janusql = r#"
PREFIX ex: <http://example.org/>
SELECT ?s
FROM NAMED WINDOW ex:w ON STREAM ex:stream1 [RANGE 1000 STEP 200]
WHERE { WINDOW ex:w { ?s ?p ?o } }
"#;

let metadata = api
.register_query("lifecycle_query".into(), janusql)
.expect("Failed to register query");
assert_eq!(metadata.execution_count, 0);
assert_eq!(metadata.status, "Registered");

let _handle = api.start_query(&"lifecycle_query".into()).expect("Failed to start query");

let after_start =
registry.get(&"lifecycle_query".into()).expect("query should exist after start");
assert_eq!(after_start.execution_count, 1);
assert_eq!(after_start.status, "Running");

api.stop_query(&"lifecycle_query".into()).expect("Failed to stop query");

let after_stop =
registry.get(&"lifecycle_query".into()).expect("query should exist after stop");
assert_eq!(after_stop.execution_count, 1);
assert_eq!(after_stop.status, "Stopped");

let _handle = api.start_query(&"lifecycle_query".into()).expect("Failed to restart query");

let after_restart = registry
.get(&"lifecycle_query".into())
.expect("query should exist after restart");
assert_eq!(after_restart.execution_count, 2);
assert_eq!(after_restart.status, "Running");
}

#[test]
fn test_multiple_queries_concurrent() {
let storage = create_test_storage_with_data().expect("Failed to create storage");
Expand Down
Loading