Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
pub mod server;

pub use server::{
create_server, start_server, AppState, ErrorResponse, ListQueriesResponse,
QueryDetailsResponse, RegisterQueryRequest, RegisterQueryResponse, ReplayStatusResponse,
StartReplayRequest, SuccessResponse,
create_server, create_server_with_state, start_server, AppState, ErrorResponse,
ListQueriesResponse, QueryDetailsResponse, QueryResultBroadcast, RegisterQueryRequest,
RegisterQueryResponse, ReplayStatusResponse, StartReplayRequest, SuccessResponse,
};
146 changes: 97 additions & 49 deletions src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

use crate::{
api::janus_api::{JanusApi, JanusApiError, QueryHandle, QueryResult, ResultSource},
parsing::janusql_parser::JanusQLParser,
parsing::rdf_parser,
registry::query_registry::{QueryId, QueryRegistry},
storage::segmented_storage::StreamingSegmentedStorage,
stream_bus::{BrokerType, MqttConfig, StreamBus, StreamBusConfig},
Expand All @@ -30,9 +28,11 @@ use std::{
},
time::Instant,
};
use tokio::sync::mpsc;
use tokio::sync::broadcast;
use tower_http::cors::{Any, CorsLayer};

const RESULT_BROADCAST_CAPACITY: usize = 1024;

/// Request to register a new query
#[derive(Debug, Deserialize)]
pub struct RegisterQueryRequest {
Expand Down Expand Up @@ -139,7 +139,12 @@ pub struct AppState {
pub registry: Arc<QueryRegistry>,
pub storage: Arc<StreamingSegmentedStorage>,
pub replay_state: Arc<Mutex<ReplayState>>,
pub query_handles: Arc<Mutex<HashMap<QueryId, Arc<Mutex<QueryHandle>>>>>,
pub query_streams: Arc<Mutex<HashMap<QueryId, QueryResultBroadcast>>>,
}

#[derive(Clone)]
pub struct QueryResultBroadcast {
pub sender: broadcast::Sender<QueryResult>,
}

pub struct ReplayState {
Expand Down Expand Up @@ -204,30 +209,42 @@ pub fn create_server(
registry: Arc<QueryRegistry>,
storage: Arc<StreamingSegmentedStorage>,
) -> Router {
create_server_with_state(janus_api, registry, storage).0
}

/// Create the HTTP server and return the shared state for testing/integration.
pub fn create_server_with_state(
janus_api: Arc<JanusApi>,
registry: Arc<QueryRegistry>,
storage: Arc<StreamingSegmentedStorage>,
) -> (Router, Arc<AppState>) {
let state = Arc::new(AppState {
janus_api,
registry,
storage,
replay_state: Arc::new(Mutex::new(ReplayState::default())),
query_handles: Arc::new(Mutex::new(HashMap::new())),
query_streams: Arc::new(Mutex::new(HashMap::new())),
});

// Configure CORS
let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any);

Router::new()
let router = Router::new()
.route("/api/queries", post(register_query))
.route("/api/queries", get(list_queries))
.route("/api/queries/:id", get(get_query))
.route("/api/queries/:id", delete(stop_query))
.route("/api/queries/:id", delete(delete_query))
.route("/api/queries/:id/start", post(start_query))
.route("/api/queries/:id/stop", post(stop_query))
.route("/api/queries/:id/results", get(stream_results))
.route("/api/replay/start", post(start_replay))
.route("/api/replay/stop", post(stop_replay))
.route("/api/replay/status", get(replay_status))
.route("/health", get(health_check))
.layer(cors)
.with_state(state)
.with_state(Arc::clone(&state));

(router, state)
}

/// Health check endpoint
Expand Down Expand Up @@ -297,34 +314,59 @@ async fn start_query(
Path(query_id): Path<String>,
) -> Result<Json<SuccessResponse>, ApiError> {
let handle = state.janus_api.start_query(&query_id)?;
let (sender, _) = broadcast::channel(RESULT_BROADCAST_CAPACITY);
let sender_for_forwarder = sender.clone();

std::thread::spawn(move || forward_query_results(handle, sender_for_forwarder));

// Store the handle for WebSocket streaming
state
.query_handles
.query_streams
.lock()
.unwrap()
.insert(query_id.clone(), Arc::new(Mutex::new(handle)));
.insert(query_id.clone(), QueryResultBroadcast { sender });

Ok(Json(SuccessResponse {
message: format!("Query '{}' started successfully", query_id),
}))
}

/// DELETE /api/queries/:id - Stop a running query
/// POST /api/queries/:id/stop - Stop a running query
async fn stop_query(
State(state): State<Arc<AppState>>,
Path(query_id): Path<String>,
) -> Result<Json<SuccessResponse>, ApiError> {
state.janus_api.stop_query(&query_id)?;

// Remove the handle
state.query_handles.lock().unwrap().remove(&query_id);
state.query_streams.lock().unwrap().remove(&query_id);

Ok(Json(SuccessResponse {
message: format!("Query '{}' stopped successfully", query_id),
}))
}

/// DELETE /api/queries/:id - Unregister a query from the registry.
async fn delete_query(
State(state): State<Arc<AppState>>,
Path(query_id): Path<String>,
) -> Result<Json<SuccessResponse>, ApiError> {
if state.janus_api.is_running(&query_id) {
return Err(ApiError::BadRequest(format!(
"Query '{}' is running. Stop it before deleting.",
query_id
)));
}

state
.registry
.unregister(&query_id)
.map_err(|e| ApiError::NotFound(e.to_string()))?;
state.query_streams.lock().unwrap().remove(&query_id);

Ok(Json(SuccessResponse {
message: format!("Query '{}' deleted successfully", query_id),
}))
}

/// WS /api/queries/:id/results - Stream query results via WebSocket
async fn stream_results(
ws: WebSocketUpgrade,
Expand All @@ -336,49 +378,54 @@ async fn stream_results(
return Err(ApiError::NotFound(format!("Query '{}' not found", query_id)));
}

Ok(ws.on_upgrade(move |socket| handle_websocket(socket, state, query_id)))
}

async fn handle_websocket(mut socket: WebSocket, state: Arc<AppState>, query_id: String) {
// Create a channel for results
let (tx, mut rx) = mpsc::unbounded_channel::<QueryResult>();

// Spawn a task to receive results from the query handle
let handles = state.query_handles.clone();
let query_id_clone = query_id.clone();
let sender = state
.query_streams
.lock()
.unwrap()
.get(&query_id)
.map(|stream| stream.sender.clone())
.ok_or_else(|| {
ApiError::BadRequest(format!(
"Query '{}' is not running. Start it before subscribing to results.",
query_id
))
})?;

tokio::spawn(async move {
loop {
// Try to get the query handle
let handle_opt = {
let handles_lock = handles.lock().unwrap();
handles_lock.get(&query_id_clone).cloned()
};
Ok(ws.on_upgrade(move |socket| handle_websocket(socket, sender.subscribe(), query_id)))
}

if let Some(handle_arc) = handle_opt {
let handle = handle_arc.lock().unwrap();
fn forward_query_results(handle: QueryHandle, sender: broadcast::Sender<QueryResult>) {
while let Some(result) = handle.receive() {
let _ = sender.send(result);
}
}

// Non-blocking receive
if let Some(result) = handle.try_receive() {
if tx.send(result).is_err() {
break;
}
async fn handle_websocket(
mut socket: WebSocket,
mut receiver: broadcast::Receiver<QueryResult>,
query_id: String,
) {
loop {
let result = match receiver.recv().await {
Ok(result) => result,
Err(broadcast::error::RecvError::Closed) => break,
Err(broadcast::error::RecvError::Lagged(skipped)) => {
let warning = serde_json::json!({
"query_id": query_id,
"type": "lagged",
"dropped_messages": skipped,
});
if socket.send(Message::Text(warning.to_string())).await.is_err() {
break;
}
} else {
// Query handle not found, wait a bit and retry
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
continue;
}
};

// Small delay to prevent busy waiting
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
});

// Send results to WebSocket
while let Some(result) = rx.recv().await {
let json_result = serde_json::json!({
"query_id": result.query_id,
"timestamp": result.timestamp,
"type": "result",
"source": match result.source {
ResultSource::Historical => "historical",
ResultSource::Live => "live",
Expand Down Expand Up @@ -567,7 +614,8 @@ pub async fn start_server(
println!(" GET /api/queries - List all registered queries");
println!(" GET /api/queries/:id - Get query details");
println!(" POST /api/queries/:id/start - Start executing a query");
println!(" DELETE /api/queries/:id - Stop a running query");
println!(" POST /api/queries/:id/stop - Stop a running query");
println!(" DELETE /api/queries/:id - Delete a stopped query");
println!(" WS /api/queries/:id/results - Stream query results (WebSocket)");
println!(" POST /api/replay/start - Start stream bus replay");
println!(" POST /api/replay/stop - Stop stream bus replay");
Expand Down
Loading
Loading