From 60ab811a0541dd5e4a27b57b5afa6b885b240720 Mon Sep 17 00:00:00 2001 From: Kush Bisen Date: Thu, 9 Apr 2026 11:42:09 +0200 Subject: [PATCH] Harden HTTP query lifecycle and result streaming --- src/http/mod.rs | 6 +- src/http/server.rs | 146 ++++++++---- tests/http_server_integration_test.rs | 331 ++++++++++++++++++++++++++ 3 files changed, 431 insertions(+), 52 deletions(-) create mode 100644 tests/http_server_integration_test.rs diff --git a/src/http/mod.rs b/src/http/mod.rs index cd9a938..91989d9 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -8,7 +8,7 @@ pub mod server; pub use server::{ - create_server, start_server, AppState, ErrorResponse, ListQueriesResponse, - QueryDetailsResponse, RegisterQueryRequest, RegisterQueryResponse, ReplayStatusResponse, - StartReplayRequest, SuccessResponse, + create_server, create_server_with_state, start_server, AppState, ErrorResponse, + ListQueriesResponse, QueryDetailsResponse, QueryResultBroadcast, RegisterQueryRequest, + RegisterQueryResponse, ReplayStatusResponse, StartReplayRequest, SuccessResponse, }; diff --git a/src/http/server.rs b/src/http/server.rs index 62c83d7..bd8d0bc 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -5,8 +5,6 @@ use crate::{ api::janus_api::{JanusApi, JanusApiError, QueryHandle, QueryResult, ResultSource}, - parsing::janusql_parser::JanusQLParser, - parsing::rdf_parser, registry::query_registry::{QueryId, QueryRegistry}, storage::segmented_storage::StreamingSegmentedStorage, stream_bus::{BrokerType, MqttConfig, StreamBus, StreamBusConfig}, @@ -30,9 +28,11 @@ use std::{ }, time::Instant, }; -use tokio::sync::mpsc; +use tokio::sync::broadcast; use tower_http::cors::{Any, CorsLayer}; +const RESULT_BROADCAST_CAPACITY: usize = 1024; + /// Request to register a new query #[derive(Debug, Deserialize)] pub struct RegisterQueryRequest { @@ -139,7 +139,12 @@ pub struct AppState { pub registry: Arc, pub storage: Arc, pub replay_state: Arc>, - pub query_handles: Arc>>>>, + pub query_streams: Arc>>, +} + +#[derive(Clone)] +pub struct QueryResultBroadcast { + pub sender: broadcast::Sender, } pub struct ReplayState { @@ -204,30 +209,42 @@ pub fn create_server( registry: Arc, storage: Arc, ) -> Router { + create_server_with_state(janus_api, registry, storage).0 +} + +/// Create the HTTP server and return the shared state for testing/integration. +pub fn create_server_with_state( + janus_api: Arc, + registry: Arc, + storage: Arc, +) -> (Router, Arc) { let state = Arc::new(AppState { janus_api, registry, storage, replay_state: Arc::new(Mutex::new(ReplayState::default())), - query_handles: Arc::new(Mutex::new(HashMap::new())), + query_streams: Arc::new(Mutex::new(HashMap::new())), }); // Configure CORS let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any); - Router::new() + let router = Router::new() .route("/api/queries", post(register_query)) .route("/api/queries", get(list_queries)) .route("/api/queries/:id", get(get_query)) - .route("/api/queries/:id", delete(stop_query)) + .route("/api/queries/:id", delete(delete_query)) .route("/api/queries/:id/start", post(start_query)) + .route("/api/queries/:id/stop", post(stop_query)) .route("/api/queries/:id/results", get(stream_results)) .route("/api/replay/start", post(start_replay)) .route("/api/replay/stop", post(stop_replay)) .route("/api/replay/status", get(replay_status)) .route("/health", get(health_check)) .layer(cors) - .with_state(state) + .with_state(Arc::clone(&state)); + + (router, state) } /// Health check endpoint @@ -297,34 +314,59 @@ async fn start_query( Path(query_id): Path, ) -> Result, ApiError> { let handle = state.janus_api.start_query(&query_id)?; + let (sender, _) = broadcast::channel(RESULT_BROADCAST_CAPACITY); + let sender_for_forwarder = sender.clone(); + + std::thread::spawn(move || forward_query_results(handle, sender_for_forwarder)); - // Store the handle for WebSocket streaming state - .query_handles + .query_streams .lock() .unwrap() - .insert(query_id.clone(), Arc::new(Mutex::new(handle))); + .insert(query_id.clone(), QueryResultBroadcast { sender }); Ok(Json(SuccessResponse { message: format!("Query '{}' started successfully", query_id), })) } -/// DELETE /api/queries/:id - Stop a running query +/// POST /api/queries/:id/stop - Stop a running query async fn stop_query( State(state): State>, Path(query_id): Path, ) -> Result, ApiError> { state.janus_api.stop_query(&query_id)?; - // Remove the handle - state.query_handles.lock().unwrap().remove(&query_id); + state.query_streams.lock().unwrap().remove(&query_id); Ok(Json(SuccessResponse { message: format!("Query '{}' stopped successfully", query_id), })) } +/// DELETE /api/queries/:id - Unregister a query from the registry. +async fn delete_query( + State(state): State>, + Path(query_id): Path, +) -> Result, ApiError> { + if state.janus_api.is_running(&query_id) { + return Err(ApiError::BadRequest(format!( + "Query '{}' is running. Stop it before deleting.", + query_id + ))); + } + + state + .registry + .unregister(&query_id) + .map_err(|e| ApiError::NotFound(e.to_string()))?; + state.query_streams.lock().unwrap().remove(&query_id); + + Ok(Json(SuccessResponse { + message: format!("Query '{}' deleted successfully", query_id), + })) +} + /// WS /api/queries/:id/results - Stream query results via WebSocket async fn stream_results( ws: WebSocketUpgrade, @@ -336,49 +378,54 @@ async fn stream_results( return Err(ApiError::NotFound(format!("Query '{}' not found", query_id))); } - Ok(ws.on_upgrade(move |socket| handle_websocket(socket, state, query_id))) -} - -async fn handle_websocket(mut socket: WebSocket, state: Arc, query_id: String) { - // Create a channel for results - let (tx, mut rx) = mpsc::unbounded_channel::(); - - // Spawn a task to receive results from the query handle - let handles = state.query_handles.clone(); - let query_id_clone = query_id.clone(); + let sender = state + .query_streams + .lock() + .unwrap() + .get(&query_id) + .map(|stream| stream.sender.clone()) + .ok_or_else(|| { + ApiError::BadRequest(format!( + "Query '{}' is not running. Start it before subscribing to results.", + query_id + )) + })?; - tokio::spawn(async move { - loop { - // Try to get the query handle - let handle_opt = { - let handles_lock = handles.lock().unwrap(); - handles_lock.get(&query_id_clone).cloned() - }; + Ok(ws.on_upgrade(move |socket| handle_websocket(socket, sender.subscribe(), query_id))) +} - if let Some(handle_arc) = handle_opt { - let handle = handle_arc.lock().unwrap(); +fn forward_query_results(handle: QueryHandle, sender: broadcast::Sender) { + while let Some(result) = handle.receive() { + let _ = sender.send(result); + } +} - // Non-blocking receive - if let Some(result) = handle.try_receive() { - if tx.send(result).is_err() { - break; - } +async fn handle_websocket( + mut socket: WebSocket, + mut receiver: broadcast::Receiver, + query_id: String, +) { + loop { + let result = match receiver.recv().await { + Ok(result) => result, + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(skipped)) => { + let warning = serde_json::json!({ + "query_id": query_id, + "type": "lagged", + "dropped_messages": skipped, + }); + if socket.send(Message::Text(warning.to_string())).await.is_err() { + break; } - } else { - // Query handle not found, wait a bit and retry - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + continue; } + }; - // Small delay to prevent busy waiting - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - } - }); - - // Send results to WebSocket - while let Some(result) = rx.recv().await { let json_result = serde_json::json!({ "query_id": result.query_id, "timestamp": result.timestamp, + "type": "result", "source": match result.source { ResultSource::Historical => "historical", ResultSource::Live => "live", @@ -567,7 +614,8 @@ pub async fn start_server( println!(" GET /api/queries - List all registered queries"); println!(" GET /api/queries/:id - Get query details"); println!(" POST /api/queries/:id/start - Start executing a query"); - println!(" DELETE /api/queries/:id - Stop a running query"); + println!(" POST /api/queries/:id/stop - Stop a running query"); + println!(" DELETE /api/queries/:id - Delete a stopped query"); println!(" WS /api/queries/:id/results - Stream query results (WebSocket)"); println!(" POST /api/replay/start - Start stream bus replay"); println!(" POST /api/replay/stop - Stop stream bus replay"); diff --git a/tests/http_server_integration_test.rs b/tests/http_server_integration_test.rs new file mode 100644 index 0000000..c0fbb8a --- /dev/null +++ b/tests/http_server_integration_test.rs @@ -0,0 +1,331 @@ +use futures_util::StreamExt; +use janus::{ + api::janus_api::{JanusApi, QueryResult, ResultSource}, + http::server::{create_server_with_state, AppState, QueryResultBroadcast}, + parsing::janusql_parser::JanusQLParser, + registry::query_registry::QueryRegistry, + storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig}, +}; +use reqwest::Client; +use serde_json::{json, Value}; +use std::{collections::HashMap, sync::Arc}; +use tempfile::TempDir; +use tokio::{ + net::TcpListener, + sync::broadcast, + task::JoinHandle, + time::{sleep, Duration}, +}; +use tokio_tungstenite::{connect_async, tungstenite::Error as WsError}; + +struct TestServer { + base_url: String, + ws_base_url: String, + client: Client, + state: Arc, + _temp_dir: TempDir, + server_task: JoinHandle<()>, +} + +impl Drop for TestServer { + fn drop(&mut self) { + self.server_task.abort(); + } +} + +fn historical_query(query_id: &str) -> Value { + json!({ + "query_id": query_id, + "janusql": r#" + PREFIX ex: + + SELECT ?sensor ?temp + + FROM NAMED WINDOW ex:hist ON LOG ex:historicalAccl [START 1000 END 2000] + + WHERE { + WINDOW ex:hist { ?sensor ex:temperature ?temp } + } + "# + }) +} + +async fn spawn_test_server() -> TestServer { + let temp_dir = TempDir::new().expect("failed to create temp dir"); + let mut storage = StreamingSegmentedStorage::new(StreamingConfig { + segment_base_path: temp_dir.path().to_string_lossy().into_owned(), + max_batch_events: 10, + max_batch_age_seconds: 60, + max_batch_bytes: 1024 * 1024, + sparse_interval: 10, + entries_per_index_block: 100, + }) + .expect("failed to create storage"); + storage.start_background_flushing(); + storage + .write_rdf( + 1_000, + "http://example.org/sensor1", + "http://example.org/temperature", + "21", + "http://example.org/sensors", + ) + .expect("failed to write rdf"); + storage.flush().expect("failed to flush storage"); + + let storage = Arc::new(storage); + let registry = Arc::new(QueryRegistry::new()); + let janus_api = Arc::new( + JanusApi::new( + JanusQLParser::new().expect("failed to create parser"), + Arc::clone(®istry), + Arc::clone(&storage), + ) + .expect("failed to create api"), + ); + + let (app, state) = create_server_with_state(janus_api, registry, storage); + let listener = TcpListener::bind("127.0.0.1:0").await.expect("failed to bind listener"); + let addr = listener.local_addr().expect("failed to read local addr"); + let server_task = tokio::spawn(async move { + axum::serve(listener, app).await.expect("test server crashed"); + }); + + sleep(Duration::from_millis(50)).await; + + TestServer { + base_url: format!("http://{}", addr), + ws_base_url: format!("ws://{}", addr), + client: Client::new(), + state, + _temp_dir: temp_dir, + server_task, + } +} + +#[tokio::test] +async fn test_health_endpoint() { + let server = spawn_test_server().await; + + let response = server + .client + .get(format!("{}/health", server.base_url)) + .send() + .await + .expect("health request failed"); + + assert!(response.status().is_success()); + let body: Value = response.json().await.expect("invalid health response"); + assert_eq!(body["message"], "Janus HTTP API is running"); +} + +#[tokio::test] +async fn test_query_lifecycle_register_list_get_delete() { + let server = spawn_test_server().await; + + let register_response = server + .client + .post(format!("{}/api/queries", server.base_url)) + .json(&historical_query("http_lifecycle")) + .send() + .await + .expect("register request failed"); + assert!(register_response.status().is_success()); + + let list_response = server + .client + .get(format!("{}/api/queries", server.base_url)) + .send() + .await + .expect("list request failed"); + assert!(list_response.status().is_success()); + let list_body: Value = list_response.json().await.expect("invalid list response"); + assert_eq!(list_body["total"], 1); + assert_eq!(list_body["queries"][0], "http_lifecycle"); + + let get_response = server + .client + .get(format!("{}/api/queries/http_lifecycle", server.base_url)) + .send() + .await + .expect("get request failed"); + assert!(get_response.status().is_success()); + let get_body: Value = get_response.json().await.expect("invalid query response"); + assert_eq!(get_body["query_id"], "http_lifecycle"); + assert_eq!(get_body["is_running"], false); + assert_eq!(get_body["status"], "Registered"); + + let delete_response = server + .client + .delete(format!("{}/api/queries/http_lifecycle", server.base_url)) + .send() + .await + .expect("delete request failed"); + assert!(delete_response.status().is_success()); + + let missing_response = server + .client + .get(format!("{}/api/queries/http_lifecycle", server.base_url)) + .send() + .await + .expect("missing get request failed"); + assert_eq!(missing_response.status(), reqwest::StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn test_stop_route_stops_running_query_and_delete_requires_stop() { + let server = spawn_test_server().await; + + let register_response = server + .client + .post(format!("{}/api/queries", server.base_url)) + .json(&historical_query("stop_delete")) + .send() + .await + .expect("register request failed"); + assert!(register_response.status().is_success()); + + let start_response = server + .client + .post(format!("{}/api/queries/stop_delete/start", server.base_url)) + .send() + .await + .expect("start request failed"); + assert!(start_response.status().is_success()); + + let delete_while_running = server + .client + .delete(format!("{}/api/queries/stop_delete", server.base_url)) + .send() + .await + .expect("delete running request failed"); + assert_eq!(delete_while_running.status(), reqwest::StatusCode::BAD_REQUEST); + let error_body: Value = delete_while_running.json().await.expect("invalid error response"); + assert!( + error_body["error"] + .as_str() + .expect("error should be a string") + .contains("Stop it before deleting"), + "unexpected delete error body: {error_body:?}" + ); + + let stop_response = server + .client + .post(format!("{}/api/queries/stop_delete/stop", server.base_url)) + .send() + .await + .expect("stop request failed"); + assert!(stop_response.status().is_success()); + + let get_response = server + .client + .get(format!("{}/api/queries/stop_delete", server.base_url)) + .send() + .await + .expect("get request failed"); + assert!(get_response.status().is_success()); + let get_body: Value = get_response.json().await.expect("invalid get response"); + assert_eq!(get_body["is_running"], false); + assert_eq!(get_body["status"], "Registered"); + + let delete_response = server + .client + .delete(format!("{}/api/queries/stop_delete", server.base_url)) + .send() + .await + .expect("delete request failed"); + assert!(delete_response.status().is_success()); +} + +#[tokio::test] +async fn test_results_websocket_requires_running_query() { + let server = spawn_test_server().await; + + let register_response = server + .client + .post(format!("{}/api/queries", server.base_url)) + .json(&historical_query("ws_not_started")) + .send() + .await + .expect("register request failed"); + assert!(register_response.status().is_success()); + + let ws_result = + connect_async(format!("{}/api/queries/ws_not_started/results", server.ws_base_url)).await; + + match ws_result { + Err(WsError::Http(response)) => { + assert_eq!(response.status(), axum::http::StatusCode::BAD_REQUEST); + } + other => panic!("expected websocket http error, got {other:?}"), + } +} + +#[tokio::test] +async fn test_results_websocket_broadcasts_to_multiple_subscribers() { + let server = spawn_test_server().await; + + let register_response = server + .client + .post(format!("{}/api/queries", server.base_url)) + .json(&historical_query("ws_broadcast")) + .send() + .await + .expect("register request failed"); + assert!(register_response.status().is_success()); + + let (sender, _) = broadcast::channel(16); + server + .state + .query_streams + .lock() + .unwrap() + .insert("ws_broadcast".to_string(), QueryResultBroadcast { sender: sender.clone() }); + + let (mut first_socket, _) = + connect_async(format!("{}/api/queries/ws_broadcast/results", server.ws_base_url)) + .await + .expect("first websocket should connect"); + let (mut second_socket, _) = + connect_async(format!("{}/api/queries/ws_broadcast/results", server.ws_base_url)) + .await + .expect("second websocket should connect"); + + let mut bindings = HashMap::new(); + bindings.insert("sensor".to_string(), "http://example.org/sensor1".to_string()); + bindings.insert("temp".to_string(), "21".to_string()); + + sender + .send(QueryResult { + query_id: "ws_broadcast".to_string(), + timestamp: 1_234, + source: ResultSource::Historical, + bindings: vec![bindings], + }) + .expect("send to subscribers should succeed"); + + let first_message = tokio::time::timeout(Duration::from_secs(2), first_socket.next()) + .await + .expect("timed out waiting for first subscriber") + .expect("first websocket closed unexpectedly") + .expect("first websocket message failed"); + let second_message = tokio::time::timeout(Duration::from_secs(2), second_socket.next()) + .await + .expect("timed out waiting for second subscriber") + .expect("second websocket closed unexpectedly") + .expect("second websocket message failed"); + + let first_body = parse_ws_json(first_message); + let second_body = parse_ws_json(second_message); + + assert_eq!(first_body["query_id"], "ws_broadcast"); + assert_eq!(first_body["type"], "result"); + assert_eq!(first_body["source"], "historical"); + assert_eq!(first_body["bindings"][0]["sensor"], "http://example.org/sensor1"); + assert_eq!(first_body, second_body); +} + +fn parse_ws_json(message: tokio_tungstenite::tungstenite::Message) -> Value { + let text = message.into_text().expect("websocket payload should be text"); + serde_json::from_str(&text).expect("websocket message should be valid json") +}