Merged
57 changes: 57 additions & 0 deletions docs/live_extension_function_architecture.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Live Extension Function Architecture

This document describes how Janus executes Janus-specific extension functions for live queries
without modifying the upstream `rsp-rs` crate.

## Flow

```mermaid
flowchart TD
A["JanusQL query"] --> B["JanusQLParser"]
B --> C["Historical windows + SPARQL"]
B --> D["Live windows + RSP-QL"]

D --> E["LiveStreamProcessing"]
E --> F["rsp-rs RSPEngine::initialize()"]
F --> G["rsp-rs stream registry"]
F --> H["rsp-rs CSPARQL windows"]

I["MQTT / live RDF events"] --> G
G --> H

H --> J["Janus window subscriptions"]
J --> K["Merge emitted window content with sibling windows"]
K --> L["Add mirrored static background quads"]
L --> M["Oxigraph Store"]
M --> N["build_evaluator()"]
N --> O["Oxigraph SPARQL execution"]
O --> P["Janus extension functions"]
P --> Q["BindingWithTimestamp / QueryResult"]

C --> R["HistoricalExecutor"]
R --> S["OxigraphAdapter"]
S --> N
```

## Responsibilities

- `rsp-rs`
- stream ingestion
- timestamp-driven window lifecycle
- window materialization
- window closure notifications

- `Janus`
- JanusQL parsing
- historical execution
- live query orchestration
- Janus-specific custom function registration through `build_evaluator()`
- final SPARQL evaluation for both historical and live paths

## Why this design

- Keeps `rsp-rs` minimal and reusable.
- Avoids a Janus-specific fork or API expansion in `rsp-rs`.
- Lets Janus use the same extension-function mechanism on both historical and live queries.
- Intercepts at the materialized-window stage, so Janus does not re-evaluate already-produced live
bindings. Instead, it performs the final SPARQL evaluation itself once per emitted window.
139 changes: 135 additions & 4 deletions src/stream/live_stream_processing.rs
@@ -4,10 +4,14 @@
//! It integrates RSP-QL query execution with Janus's RDFEvent data model.

use crate::core::RDFEvent;
use crate::extensions::query_options::build_evaluator;
use oxigraph::model::{GraphName, NamedNode, Quad, Term};
use rsp_rs::{BindingWithTimestamp, RDFStream, RSPEngine};
use std::collections::HashMap;
use oxigraph::sparql::QueryResults;
use oxigraph::store::Store;
use rsp_rs::{BindingWithTimestamp, RDFStream, RSPEngine, StreamType};
use std::collections::{HashMap, HashSet};
use std::sync::mpsc::{Receiver, RecvError};
use std::sync::{mpsc, Arc, Mutex};

/// Live stream processing engine for RSP-QL queries
pub struct LiveStreamProcessing {
@@ -17,6 +21,8 @@ pub struct LiveStreamProcessing {
streams: HashMap<String, RDFStream>,
/// Result receiver for query results
result_receiver: Option<Receiver<BindingWithTimestamp>>,
/// Static quads mirrored in Janus for Janus-side live query evaluation.
static_data: Arc<Mutex<HashSet<Quad>>>,
/// Flag indicating if processing has started
processing_started: bool,
}
@@ -81,6 +87,7 @@ impl LiveStreamProcessing {
engine,
streams: HashMap::new(),
result_receiver: None,
static_data: Arc::new(Mutex::new(HashSet::new())),
processing_started: false,
})
}
@@ -117,7 +124,7 @@
return Err(LiveStreamProcessingError("Processing already started".to_string()));
}

let receiver = self.engine.start_processing();
let receiver = self.register_live_callbacks()?;
self.result_receiver = Some(receiver);
self.processing_started = true;

@@ -265,7 +272,8 @@
/// * `event` - RDFEvent representing static knowledge
pub fn add_static_data(&mut self, event: RDFEvent) -> Result<(), LiveStreamProcessingError> {
let quad = self.rdf_event_to_quad(&event)?;
self.engine.add_static_data(quad);
self.engine.add_static_data(quad.clone());
self.static_data.lock().unwrap().insert(quad);
Comment on lines +275 to +276

Copilot AI Apr 8, 2026

add_static_data uses self.static_data.lock().unwrap(), which will panic if the mutex is poisoned. Since this is part of the public API surface, it would be better to convert lock poisoning into a LiveStreamProcessingError (or similar) instead of panicking.

Suggested change

-        self.engine.add_static_data(quad.clone());
-        self.static_data.lock().unwrap().insert(quad);
+        let mut static_data = self.static_data.lock().map_err(|_| {
+            LiveStreamProcessingError(
+                "Static data store is unavailable due to a poisoned mutex".to_string(),
+            )
+        })?;
+        static_data.insert(quad.clone());
+        self.engine.add_static_data(quad);

Ok(())
}

@@ -420,6 +428,129 @@
Ok(Quad::new(subject, predicate, object, graph))
}

fn register_live_callbacks(
&self,
) -> Result<Receiver<BindingWithTimestamp>, LiveStreamProcessingError> {
let parsed_query = self.engine.parsed_query().clone();
let sparql_query = Arc::new(parsed_query.sparql_query.clone());
let (tx, rx) = mpsc::channel();

let mut windows = HashMap::new();
for window_def in &parsed_query.s2r {
let window = self.engine.get_window(&window_def.window_name).ok_or_else(|| {
LiveStreamProcessingError(format!(
"Window '{}' not found in engine",
window_def.window_name
))
})?;
windows.insert(window_def.window_name.clone(), window);
}
let windows = Arc::new(windows);
let static_data = Arc::clone(&self.static_data);

for window_def in parsed_query.s2r {
let window_arc = windows.get(&window_def.window_name).cloned().ok_or_else(|| {
LiveStreamProcessingError(format!(
"Window '{}' not available for subscription",
window_def.window_name
))
})?;
let tx_clone = tx.clone();
let sparql_query = Arc::clone(&sparql_query);
let all_windows = Arc::clone(&windows);
let static_data = Arc::clone(&static_data);
let window_name = window_def.window_name.clone();
let window_width = window_def.width;

let mut window = window_arc.lock().unwrap();
window.subscribe(StreamType::RStream, move |mut container| {
let timestamp = container.last_timestamp_changed;

for (other_name, other_window_arc) in all_windows.iter() {
if other_name == &window_name {
continue;
}
if let Ok(other_window) = other_window_arc.lock() {
if let Some(other_container) =
other_window.get_content_from_window(timestamp)
{
for quad in &other_container.elements {
container.add(quad.clone(), timestamp);
}
Comment on lines +473 to +479

Copilot AI Apr 8, 2026

Inside the window subscription callback, this code locks sibling windows (other_window_arc.lock()) while executing a callback for the current window. If rsp-rs invokes callbacks concurrently for different windows, acquiring multiple window locks in iteration order can deadlock (e.g., w1 callback waits on w2 while w2 waits on w1). Consider avoiding cross-window locking in the callback (snapshot contents with try_lock, enforce a global lock ordering, or have rsp-rs emit merged window content upstream).

Suggested change

-                        if let Ok(other_window) = other_window_arc.lock() {
-                            if let Some(other_container) =
-                                other_window.get_content_from_window(timestamp)
-                            {
-                                for quad in &other_container.elements {
-                                    container.add(quad.clone(), timestamp);
-                                }
+                        // Avoid blocking on sibling window locks while running a
+                        // callback for the current window. This prevents
+                        // cross-window lock cycles if rsp-rs invokes callbacks
+                        // concurrently for different windows.
+                        let sibling_elements = if let Ok(other_window) = other_window_arc.try_lock() {
+                            other_window
+                                .get_content_from_window(timestamp)
+                                .map(|other_container| other_container.elements.clone())
+                        } else {
+                            None
+                        };
+                        if let Some(elements) = sibling_elements {
+                            for quad in elements {
+                                container.add(quad, timestamp);

}
}
}

match Self::execute_live_query(
&container,
&sparql_query,
&static_data.lock().unwrap(),
) {
Comment on lines +484 to +488

Copilot AI Apr 8, 2026

static_data.lock().unwrap() is held across execute_live_query(...), which builds an Oxigraph store and runs SPARQL evaluation. Holding the mutex during this potentially expensive work can block add_static_data() and any other callback threads, and unwrap() will panic on a poisoned lock. Prefer cloning/snapshotting the static quads under a short lock (and returning an error on poison) before executing the query.

Suggested change

-                match Self::execute_live_query(
-                    &container,
-                    &sparql_query,
-                    &static_data.lock().unwrap(),
-                ) {
+                let static_snapshot = match static_data.lock() {
+                    Ok(static_quads) => static_quads.clone(),
+                    Err(err) => {
+                        eprintln!("Live Janus static data lock error: {}", err);
+                        return;
+                    }
+                };
+                match Self::execute_live_query(&container, &sparql_query, &static_snapshot) {

Ok(bindings) => {
for binding in bindings {
let result = BindingWithTimestamp {
bindings: binding,
timestamp_from: timestamp,
timestamp_to: timestamp + window_width,
};
let _ = tx_clone.send(result);
}
}
Err(err) => {
eprintln!("Live Janus evaluation error: {}", err);
}
}
});
}

Ok(rx)
}

fn execute_live_query(
container: &rsp_rs::QuadContainer,
query: &str,
static_data: &HashSet<Quad>,
) -> Result<Vec<String>, LiveStreamProcessingError> {
let store = Store::new()
.map_err(|e| LiveStreamProcessingError(format!("Failed to create store: {}", e)))?;

for quad in &container.elements {
store.insert(quad).map_err(|e| {
LiveStreamProcessingError(format!("Failed to insert live quad into store: {}", e))
})?;
}
for quad in static_data {
store.insert(quad).map_err(|e| {
LiveStreamProcessingError(format!(
"Failed to insert static quad into live store: {}",
e
))
})?;
}

let parsed_query = build_evaluator().parse_query(query).map_err(|e| {
LiveStreamProcessingError(format!("Failed to parse live SPARQL: {}", e))
})?;
let results = parsed_query.on_store(&store).execute().map_err(|e| {
LiveStreamProcessingError(format!("Failed to execute live SPARQL: {}", e))
})?;
Comment on lines +514 to +536

Copilot AI Apr 8, 2026

execute_live_query creates a new Store, re-inserts all live/static quads, rebuilds the evaluator, and re-parses the same SPARQL query on every window emission. This is likely to be a major CPU/alloc hot spot for live workloads. Consider caching the parsed query/evaluator once (e.g., build in register_live_callbacks and share via Arc), and reusing a store/dataset or using a more incremental evaluation strategy.


let mut bindings = Vec::new();
if let QueryResults::Solutions(solutions) = results {
for solution in solutions {
let solution = solution.map_err(|e| {
LiveStreamProcessingError(format!(
"Failed to evaluate live solution binding: {}",
e
))
})?;
bindings.push(format!("{:?}", solution));
}
}

Ok(bindings)
}

/// Returns the list of registered stream URIs
pub fn get_registered_streams(&self) -> Vec<String> {
self.streams.keys().cloned().collect()
44 changes: 44 additions & 0 deletions tests/live_stream_integration_test.rs
@@ -272,6 +272,50 @@ fn test_literal_and_uri_objects() {
assert!(!results.is_empty());
}

#[test]
fn test_live_query_with_janus_extension_function() {
let query = r#"
PREFIX ex: <http://example.org/>
PREFIX janus: <https://janus.rs/fn#>
REGISTER RStream <output> AS
SELECT ?sensor ?reading
FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 1000 STEP 500]
WHERE {
WINDOW ex:w1 {
?sensor ex:hasReading ?reading .
FILTER(janus:absolute_threshold_exceeded(?reading, "25", "2"))
}
}
"#;

let mut processor = LiveStreamProcessing::new(query.to_string()).unwrap();
processor.register_stream("http://example.org/stream1").unwrap();
processor.start_processing().unwrap();

processor
.add_event(
"http://example.org/stream1",
RDFEvent::new(
0,
"http://example.org/sensor-pass",
"http://example.org/hasReading",
"30",
"",
),
)
.unwrap();

processor.close_stream("http://example.org/stream1", 3000).unwrap();
thread::sleep(Duration::from_millis(500));

let results = processor.collect_results(None).unwrap();
assert!(
results.iter().any(|result| result.bindings.contains("sensor-pass")),
"Expected at least one live result to pass the Janus extension-function filter, got {:?}",
results
);
Comment on lines +308 to +316

Copilot AI Apr 8, 2026

This test relies on thread::sleep(Duration::from_millis(500)) to wait for asynchronous window closure/results, which can be flaky under load or on slower CI machines. Prefer a bounded poll loop (deadline + try_receive_result()/collect_results(Some(..))) that waits until at least one expected result arrives (or times out with a clear assertion).

}

#[test]
fn test_rapid_event_stream() {
let query = r#"