Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 33 additions & 45 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,52 +1,40 @@
# Rust build artifacts
target/
# Rust
/target/
Cargo.lock

# Debug symbols
**/*.rs.bk
*.pdb

# Backup files
*~
*.swp
*.swo
*.swn
# Test data and logs
test_data/
server.log
docker/mosquitto/log/
docker/mosquitto/data/
data/

# Python
*.pyc
__pycache__/
*.py[cod]
*$py.class

# Dashboard build artifacts
janus-dashboard/dist/
janus-dashboard/node_modules/
janus-dashboard/.vscode/

# macOS
.DS_Store
.AppleDouble
.LSOverride

# IDE and editor directories
.idea/
# Editor directories
.vscode/
*.iml
.zed/

# Environment files
.env
.env.local
.env.*.local

# Test coverage
*.profraw
*.profdata
coverage/
tarpaulin-report.html

# Documentation build
target/doc/

# Temporary files
tmp/
temp/

# OS specific
.DS_Store
Thumbs.db

# RDF Store data
fuseki-config/databases/

# Docker volumes
*.db
*.db-shm
*.db-wal
.idea/
*.swp
*.swo
*~

# Data for the Benchmarking
/data/
# Temporary debug/test files
debug_*.py
tests/reproduction_test.rs
tests/user_query_repro.rs
10 changes: 0 additions & 10 deletions .zed/settings.json

This file was deleted.

38 changes: 33 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@ readme = "README.md"
regex = "1.0"
serde = { version = "1.0", features = ["derive"] }
bincode = "1.0"
rsp-rs = "0.2.1"
rsp-rs = "0.3.5"
oxigraph = "0.5"
rumqttc = "0.25.1"
rumqttc = { version = "0.25.1", default-features = false }
serde_json = "1.0.145"

[target.'cfg(not(windows))'.dependencies]
rdkafka = "0.38.0"
tokio = { version = "1.48.0", features = ["full"] }
ctrlc = "3.5.1"
clap = { version = "4.5", features = ["derive"] }
axum = { version = "0.7", features = ["ws"] }
tower-http = { version = "0.5", features = ["cors", "trace"] }
tokio-tungstenite = "0.21"
reqwest = { version = "0.11", features = ["json"] }
futures-util = "0.3"

[lib]
name = "janus"
Expand All @@ -30,6 +35,29 @@ path = "src/lib.rs"
name = "janus"
path = "src/main.rs"

[[bin]]
name = "http_server"
path = "src/bin/http_server.rs"

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] }

[[bench]]
name = "storage_write"
harness = false

[[bench]]
name = "historical_fixed"
harness = false

[[bench]]
name = "historical_sliding"
harness = false

[[bench]]
name = "live_injection"
harness = false

[profile.release]
opt-level = 3
lto = true
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ check: fmt-check lint ## Run all checks (formatting and linting)

ci-check: ## Run full CI/CD checks locally before pushing
@echo "$(BLUE)Running CI/CD checks...$(NC)"
@./ci-check.sh
@./scripts/ci-check.sh

clean: ## Clean build artifacts
@echo "$(BLUE)Cleaning build artifacts...$(NC)"
Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ Before pushing to the repository, run the CI/CD checks locally:
make ci-check

# Or use the script directly
./ci-check.sh
```
./scripts/ci-check.sh
```

This will run:
- **rustfmt** - Code formatting check
Expand Down
75 changes: 75 additions & 0 deletions START_HERE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Janus HTTP API - START HERE

## Quick Start (30 seconds)

```bash
# 1. Setup (one time)
./scripts/test_setup.sh

# 2. Start MQTT
docker-compose up -d mosquitto

# 3. Start Server
cargo run --bin http_server

# 4. Open Dashboard
open examples/demo_dashboard.html
```

Then click: **Start Replay** → **Start Query**

## What This Does

1. **Start Replay**: Loads RDF data from `data/sensors.nq`, publishes to MQTT, stores locally
2. **Start Query**: Executes a JanusQL query, streams results via WebSocket to dashboard

## Documentation

- **QUICK_REFERENCE.md** - One-page cheat sheet
- **RUNTIME_FIX_SUMMARY.md** - How the runtime issue was fixed
- **COMPLETE_SOLUTION.md** - Full implementation details
- **SETUP_GUIDE.md** - Detailed setup instructions
- **README_HTTP_API.md** - Complete API documentation
- **FINAL_TEST.md** - Verification steps

## Key Points

✅ **No more runtime panics** - Fixed by spawning StreamBus in separate thread
✅ **Correct JanusQL syntax** - All examples updated to match parser
✅ **MQTT integration** - Full broker setup with Docker Compose
✅ **Two-button demo** - Interactive dashboard for easy testing
✅ **Production-ready** - Stable, tested, documented

⚠️ **Known limitation**: Replay metrics show status but not event counts (acceptable trade-off)

## Troubleshooting

```bash
# Server won't start (port in use)
lsof -ti:8080 | xargs kill -9

# MQTT not running
docker-compose up -d mosquitto

# Check if working
curl http://localhost:8080/health
```

## Success Indicators

When everything works correctly:
1. Server starts with clean output (no panics)
2. Dashboard shows "Connected to Janus HTTP API server"
3. Replay button → Status changes to "Running"
4. Query button → WebSocket connects, results appear
5. Results tagged as "historical" or "live"

## Need Help?

1. Read **QUICK_REFERENCE.md** for common commands
2. Check **FINAL_TEST.md** for verification steps
3. See **RUNTIME_FIX_SUMMARY.md** if you see panics
4. Review **SETUP_GUIDE.md** for detailed instructions

---

**Everything is ready. Just run the Quick Start commands above!** 🚀
79 changes: 79 additions & 0 deletions benches/historical_fixed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use janus::{
execution::historical_executor::HistoricalExecutor,
parsing::janusql_parser::{WindowDefinition, WindowType},
querying::oxigraph_adapter::OxigraphAdapter,
storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig},
};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::time::{SystemTime, UNIX_EPOCH};

// Process-wide monotonic counter; combined with a timestamp it makes each
// benchmark's temporary storage path unique, so parallel setups never collide.
static COUNTER: AtomicU64 = AtomicU64::new(0);

/// Build a `StreamingConfig` pointing at a segment directory that is unique
/// for this process run.
///
/// Uniqueness comes from a nanosecond timestamp plus the process-wide
/// `COUNTER`, so concurrent benchmark setups never share a /tmp directory.
fn unique_config() -> StreamingConfig {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    StreamingConfig {
        segment_base_path: format!("/tmp/janus_bench_fixed_{}_{}", nanos, seq),
        // Limits are generous so benchmark writes stay in the batch buffer
        // rather than triggering flushes mid-measurement.
        max_batch_events: 1_000_000,
        max_batch_age_seconds: 3600,
        max_batch_bytes: 1_000_000_000,
        sparse_interval: 64,
        entries_per_index_block: 256,
    }
}

/// Populate a fresh storage with `n` events at timestamps [1000, 1000 + n)
/// and build a fixed historical window covering exactly that range.
/// The events stay in the in-memory batch buffer — no flush is needed
/// before querying.
fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
    let count = n as u64;
    let storage = StreamingSegmentedStorage::new(unique_config()).unwrap();

    for i in 0..count {
        // Rotate across 5 subjects and 10 values so query results are
        // non-trivial without growing with n.
        let subject = format!("http://example.org/sensor{}", i % 5);
        let value = format!("{}", 20 + (i % 10));
        storage
            .write_rdf(
                1_000 + i,
                &subject,
                "http://saref.etsi.org/core/hasValue",
                &value,
                "http://example.org/graph",
            )
            .unwrap();
    }

    let window = WindowDefinition {
        window_name: "w".to_string(),
        stream_name: "http://example.org/stream".to_string(),
        width: count,
        slide: count,
        offset: None,
        start: Some(1_000),
        // Last written timestamp is 1000 + n - 1.
        end: Some(1_000 + count - 1),
        window_type: WindowType::HistoricalFixed,
    };

    (Arc::new(storage), window)
}

// Catch-all query: matches every triple in the window, so result size
// scales directly with the number of events written in `setup`.
const SPARQL: &str = "SELECT ?s ?p ?o WHERE { ?s ?p ?o }";

/// Benchmark fixed-window historical query execution over pre-written events.
///
/// For each input size, storage is repopulated from scratch in the
/// `iter_batched` setup closure, which criterion excludes from the
/// measurement. The executor (including its Oxigraph adapter) is also built
/// in the setup phase, so only `execute_fixed_window` itself is timed —
/// previously adapter/executor construction was inside the timed closure and
/// skewed the per-query numbers.
fn historical_fixed(c: &mut Criterion) {
    let mut group = c.benchmark_group("historical/fixed_window");

    for &n in &[100usize, 1_000, 10_000] {
        group.bench_with_input(BenchmarkId::new("events", n), &n, |b, &n| {
            b.iter_batched(
                || {
                    let (storage, window) = setup(n);
                    // Construct the executor outside the timed section: we
                    // want to measure query execution, not adapter startup.
                    let executor =
                        HistoricalExecutor::new(storage, OxigraphAdapter::new());
                    (executor, window)
                },
                |(executor, window)| {
                    black_box(executor.execute_fixed_window(&window, SPARQL).unwrap())
                },
                criterion::BatchSize::SmallInput,
            );
        });
    }

    group.finish();
}

// Register the benchmark with criterion's custom harness (Cargo.toml sets
// `harness = false` for this bench target, so criterion supplies `main`).
criterion_group!(benches, historical_fixed);
criterion_main!(benches);
Loading
Loading