From d1417a005bcd48143ee2d6d72e2d62bf0d2b9c60 Mon Sep 17 00:00:00 2001 From: Kush Bisen Date: Wed, 8 Apr 2026 16:29:05 +0200 Subject: [PATCH] Evaluate live extension functions in Janus --- docs/live_extension_function_architecture.md | 57 ++++++++ src/stream/live_stream_processing.rs | 139 ++++++++++++++++++- tests/live_stream_integration_test.rs | 44 ++++++ 3 files changed, 236 insertions(+), 4 deletions(-) create mode 100644 docs/live_extension_function_architecture.md diff --git a/docs/live_extension_function_architecture.md b/docs/live_extension_function_architecture.md new file mode 100644 index 0000000..933744c --- /dev/null +++ b/docs/live_extension_function_architecture.md @@ -0,0 +1,57 @@ +# Live Extension Function Architecture + +This document describes how Janus executes Janus-specific extension functions for live queries +without modifying the upstream `rsp-rs` crate. + +## Flow + +```mermaid +flowchart TD + A["JanusQL query"] --> B["JanusQLParser"] + B --> C["Historical windows + SPARQL"] + B --> D["Live windows + RSP-QL"] + + D --> E["LiveStreamProcessing"] + E --> F["rsp-rs RSPEngine::initialize()"] + F --> G["rsp-rs stream registry"] + F --> H["rsp-rs CSPARQL windows"] + + I["MQTT / live RDF events"] --> G + G --> H + + H --> J["Janus window subscriptions"] + J --> K["Merge emitted window content with sibling windows"] + K --> L["Add mirrored static background quads"] + L --> M["Oxigraph Store"] + M --> N["build_evaluator()"] + N --> O["Oxigraph SPARQL execution"] + O --> P["Janus extension functions"] + P --> Q["BindingWithTimestamp / QueryResult"] + + C --> R["HistoricalExecutor"] + R --> S["OxigraphAdapter"] + S --> N +``` + +## Responsibilities + +- `rsp-rs` + - stream ingestion + - timestamp-driven window lifecycle + - window materialization + - window closure notifications + +- `Janus` + - JanusQL parsing + - historical execution + - live query orchestration + - Janus-specific custom function registration through `build_evaluator()` + - final SPARQL evaluation for both historical and live paths + +## Why this design + +- Keeps `rsp-rs` minimal and reusable. +- Avoids a Janus-specific fork or API expansion in `rsp-rs`. +- Lets Janus use the same extension-function mechanism on both historical and live queries. +- Intercepts at the materialized-window stage, so Janus does not re-evaluate already-produced live + bindings. Instead, it performs the final SPARQL evaluation itself once per emitted window. diff --git a/src/stream/live_stream_processing.rs b/src/stream/live_stream_processing.rs index 92f876a..a631201 100644 --- a/src/stream/live_stream_processing.rs +++ b/src/stream/live_stream_processing.rs @@ -4,10 +4,14 @@ //! It integrates RSP-QL query execution with Janus's RDFEvent data model. use crate::core::RDFEvent; +use crate::extensions::query_options::build_evaluator; use oxigraph::model::{GraphName, NamedNode, Quad, Term}; -use rsp_rs::{BindingWithTimestamp, RDFStream, RSPEngine}; -use std::collections::HashMap; +use oxigraph::sparql::QueryResults; +use oxigraph::store::Store; +use rsp_rs::{BindingWithTimestamp, RDFStream, RSPEngine, StreamType}; +use std::collections::{HashMap, HashSet}; use std::sync::mpsc::{Receiver, RecvError}; +use std::sync::{mpsc, Arc, Mutex}; /// Live stream processing engine for RSP-QL queries pub struct LiveStreamProcessing { @@ -17,6 +21,8 @@ pub struct LiveStreamProcessing { streams: HashMap, /// Result receiver for query results result_receiver: Option>, + /// Static quads mirrored in Janus for Janus-side live query evaluation. + static_data: Arc>>, /// Flag indicating if processing has started processing_started: bool, } @@ -81,6 +87,7 @@ impl LiveStreamProcessing { engine, streams: HashMap::new(), result_receiver: None, + static_data: Arc::new(Mutex::new(HashSet::new())), processing_started: false, }) } @@ -117,7 +124,7 @@ impl LiveStreamProcessing { return Err(LiveStreamProcessingError("Processing already started".to_string())); } - let receiver = self.engine.start_processing(); + let receiver = self.register_live_callbacks()?; self.result_receiver = Some(receiver); self.processing_started = true; @@ -265,7 +272,8 @@ impl LiveStreamProcessing { /// * `event` - RDFEvent representing static knowledge pub fn add_static_data(&mut self, event: RDFEvent) -> Result<(), LiveStreamProcessingError> { let quad = self.rdf_event_to_quad(&event)?; - self.engine.add_static_data(quad); + self.engine.add_static_data(quad.clone()); + self.static_data.lock().unwrap().insert(quad); Ok(()) } @@ -420,6 +428,129 @@ impl LiveStreamProcessing { Ok(Quad::new(subject, predicate, object, graph)) } + fn register_live_callbacks( + &self, + ) -> Result, LiveStreamProcessingError> { + let parsed_query = self.engine.parsed_query().clone(); + let sparql_query = Arc::new(parsed_query.sparql_query.clone()); + let (tx, rx) = mpsc::channel(); + + let mut windows = HashMap::new(); + for window_def in &parsed_query.s2r { + let window = self.engine.get_window(&window_def.window_name).ok_or_else(|| { + LiveStreamProcessingError(format!( + "Window '{}' not found in engine", + window_def.window_name + )) + })?; + windows.insert(window_def.window_name.clone(), window); + } + let windows = Arc::new(windows); + let static_data = Arc::clone(&self.static_data); + + for window_def in parsed_query.s2r { + let window_arc = windows.get(&window_def.window_name).cloned().ok_or_else(|| { + LiveStreamProcessingError(format!( + "Window '{}' not available for subscription", + window_def.window_name + )) + })?; + let tx_clone = tx.clone(); + let sparql_query = Arc::clone(&sparql_query); + let all_windows = Arc::clone(&windows); + let static_data = Arc::clone(&static_data); + let window_name = window_def.window_name.clone(); + let window_width = window_def.width; + + let mut window = window_arc.lock().unwrap(); + window.subscribe(StreamType::RStream, move |mut container| { + let timestamp = container.last_timestamp_changed; + + for (other_name, other_window_arc) in all_windows.iter() { + if other_name == &window_name { + continue; + } + if let Ok(other_window) = other_window_arc.lock() { + if let Some(other_container) = + other_window.get_content_from_window(timestamp) + { + for quad in &other_container.elements { + container.add(quad.clone(), timestamp); + } + } + } + } + + match Self::execute_live_query( + &container, + &sparql_query, + &static_data.lock().unwrap(), + ) { + Ok(bindings) => { + for binding in bindings { + let result = BindingWithTimestamp { + bindings: binding, + timestamp_from: timestamp, + timestamp_to: timestamp + window_width, + }; + let _ = tx_clone.send(result); + } + } + Err(err) => { + eprintln!("Live Janus evaluation error: {}", err); + } + } + }); + } + + Ok(rx) + } + + fn execute_live_query( + container: &rsp_rs::QuadContainer, + query: &str, + static_data: &HashSet, + ) -> Result, LiveStreamProcessingError> { + let store = Store::new() + .map_err(|e| LiveStreamProcessingError(format!("Failed to create store: {}", e)))?; + + for quad in &container.elements { + store.insert(quad).map_err(|e| { + LiveStreamProcessingError(format!("Failed to insert live quad into store: {}", e)) + })?; + } + for quad in static_data { + store.insert(quad).map_err(|e| { + LiveStreamProcessingError(format!( + "Failed to insert static quad into live store: {}", + e + )) + })?; + } + + let parsed_query = build_evaluator().parse_query(query).map_err(|e| { + LiveStreamProcessingError(format!("Failed to parse live SPARQL: {}", e)) + })?; + let results = parsed_query.on_store(&store).execute().map_err(|e| { + LiveStreamProcessingError(format!("Failed to execute live SPARQL: {}", e)) + })?; + + let mut bindings = Vec::new(); + if let QueryResults::Solutions(solutions) = results { + for solution in solutions { + let solution = solution.map_err(|e| { + LiveStreamProcessingError(format!( + "Failed to evaluate live solution binding: {}", + e + )) + })?; + bindings.push(format!("{:?}", solution)); + } + } + + Ok(bindings) + } + /// Returns the list of registered stream URIs pub fn get_registered_streams(&self) -> Vec { self.streams.keys().cloned().collect() diff --git a/tests/live_stream_integration_test.rs b/tests/live_stream_integration_test.rs index dec4e43..1cf759d 100644 --- a/tests/live_stream_integration_test.rs +++ b/tests/live_stream_integration_test.rs @@ -272,6 +272,50 @@ fn test_literal_and_uri_objects() { assert!(!results.is_empty()); } +#[test] +fn test_live_query_with_janus_extension_function() { + let query = r#" + PREFIX ex: + PREFIX janus: + REGISTER RStream AS + SELECT ?sensor ?reading + FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 1000 STEP 500] + WHERE { + WINDOW ex:w1 { + ?sensor ex:hasReading ?reading . + FILTER(janus:absolute_threshold_exceeded(?reading, "25", "2")) + } + } + "#; + + let mut processor = LiveStreamProcessing::new(query.to_string()).unwrap(); + processor.register_stream("http://example.org/stream1").unwrap(); + processor.start_processing().unwrap(); + + processor + .add_event( + "http://example.org/stream1", + RDFEvent::new( + 0, + "http://example.org/sensor-pass", + "http://example.org/hasReading", + "30", + "", + ), + ) + .unwrap(); + + processor.close_stream("http://example.org/stream1", 3000).unwrap(); + thread::sleep(Duration::from_millis(500)); + + let results = processor.collect_results(None).unwrap(); + assert!( + results.iter().any(|result| result.bindings.contains("sensor-pass")), + "Expected at least one live result to pass the Janus extension-function filter, got {:?}", + results + ); +} + #[test] fn test_rapid_event_stream() { let query = r#"