Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
940 changes: 690 additions & 250 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = ["crates/common", "crates/agent", "crates/studio", "crates/core", "crates/mcp", "crates/cli", "crates/databricks_sdk", "crates/apx"]
members = ["crates/common", "crates/agent", "crates/studio", "crates/core", "crates/mcp", "crates/cli", "crates/databricks_sdk", "crates/apx", "crates/db"]
resolver = "2"

[workspace.package]
Expand All @@ -15,6 +15,7 @@ apx-core = { path = "crates/core" }
apx-mcp = { path = "crates/mcp" }
apx-cli = { path = "crates/cli" }
apx-databricks-sdk = { path = "crates/databricks_sdk" }
apx-db = { path = "crates/db" }

# Serialization
serde = { version = "1.0.228", features = ["derive"] }
Expand All @@ -24,18 +25,19 @@ toml = "0.8.20"
toml_edit = "0.24.0"

# Database
rusqlite = { version = "0.38.0", features = ["bundled"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite"] }

# Async runtime
tokio = { version = "1.49", features = ["rt-multi-thread", "macros", "sync", "process", "io-util", "signal", "io-std", "net"] }
tokio-stream = "0.1.17"
tokio-util = { version = "0.7", features = ["io"] }
futures-util = "0.3.31"
rmcp = { version = "0.15", features = ["server", "transport-io", "schemars"] }

# Web server
axum = { version = "0.8.8", features = ["ws"] }
reqwest = { version = "0.13.1", features = ["blocking", "json", "stream"] }
tokio-tungstenite = { version = "0.28", features = ["connect", "handshake", "stream", "rustls-tls-webpki-roots"] }
reqwest = { version = "0.13.1", default-features = false, features = ["blocking", "json", "stream", "native-tls", "http2", "charset", "system-proxy"] }
tokio-tungstenite = { version = "0.28", features = ["connect", "handshake", "stream", "native-tls"] }

# OpenTelemetry
opentelemetry = "0.29"
Expand Down
1 change: 1 addition & 0 deletions crates/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ path = "src/main.rs"

[dependencies]
apx-common.workspace = true
apx-db.workspace = true
axum.workspace = true
tokio.workspace = true
serde.workspace = true
Expand Down
19 changes: 11 additions & 8 deletions crates/agent/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
//! This module implements an Axum HTTP server that receives OpenTelemetry logs
//! via OTLP HTTP protocol, supporting both JSON and Protobuf content types.

use apx_common::{FLUX_PORT, LogRecord, Storage};
use apx_common::{FLUX_PORT, LogRecord};
use apx_db::LogsDb;
use axum::{
Router,
body::Bytes,
Expand All @@ -21,7 +22,7 @@ use tracing::{debug, error, info};
/// Application state shared across handlers.
#[derive(Clone, Debug)]
struct AppState {
storage: Storage,
storage: LogsDb,
}

/// Run the flux server (entry point for `apx-agent`).
Expand All @@ -34,7 +35,9 @@ pub async fn run_server() -> Result<(), String> {
eprintln!("[{now}] Flux daemon starting...");

// Open storage
let storage = Storage::open()?;
let storage = LogsDb::open()
.await
.map_err(|e| format!("Storage error: {e}"))?;
eprintln!("[{now}] Storage initialized");

// Start cleanup scheduler as a background task
Expand All @@ -49,14 +52,14 @@ pub async fn run_server() -> Result<(), String> {

/// Periodic cleanup loop that runs within the daemon process.
/// Deletes logs older than 7 days every hour.
async fn run_cleanup_loop(storage: Storage) {
async fn run_cleanup_loop(storage: LogsDb) {
// Cleanup interval: 1 hour
let interval = Duration::from_secs(60 * 60);

info!("Cleanup scheduler started (interval: 1 hour, retention: 7 days)");

// Run initial cleanup
match storage.cleanup_old_logs() {
match storage.cleanup_old_logs().await {
Ok(deleted) if deleted > 0 => info!("Initial cleanup: removed {} old log records", deleted),
Ok(_) => debug!("Initial cleanup: no old records to remove"),
Err(e) => error!("Initial cleanup failed: {}", e),
Expand All @@ -65,7 +68,7 @@ async fn run_cleanup_loop(storage: Storage) {
loop {
tokio::time::sleep(interval).await;

match storage.cleanup_old_logs() {
match storage.cleanup_old_logs().await {
Ok(deleted) if deleted > 0 => {
info!("Cleanup: removed {} old log records", deleted);
}
Expand All @@ -80,7 +83,7 @@ async fn run_cleanup_loop(storage: Storage) {
}

/// Start the flux HTTP server with the given storage.
async fn run_http_server(storage: Storage) -> Result<(), String> {
async fn run_http_server(storage: LogsDb) -> Result<(), String> {
let state = AppState { storage };

let app = Router::new()
Expand Down Expand Up @@ -143,7 +146,7 @@ async fn handle_logs(

debug!("Received {} log records", records.len());

match state.storage.insert_logs(&records) {
match state.storage.insert_logs(&records).await {
Ok(count) => {
debug!("Stored {} log records", count);
StatusCode::OK
Expand Down
1 change: 1 addition & 0 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ rust-version.workspace = true
[dependencies]
apx-common = { path = "../common" , version = "0.3.0-rc1" }
apx-core = { path = "../core" , version = "0.3.0-rc1" }
apx-db.workspace = true
apx-databricks-sdk = { path = "../databricks_sdk" , version = "0.3.0-rc1" }
apx-mcp = { path = "../mcp" , version = "0.3.0-rc1" }
clap.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions crates/cli/src/__generate_openapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ pub struct GenerateOpenapiArgs {
pub app_dir: PathBuf,
}

pub fn run(args: GenerateOpenapiArgs) -> i32 {
match generate_openapi(&args.app_dir) {
pub async fn run(args: GenerateOpenapiArgs) -> i32 {
match generate_openapi(&args.app_dir).await {
Ok(()) => {
println!("regenerated");
0
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn run_inner(args: BuildArgs) -> Result<(), String> {
fs::write(build_dir.join(".gitignore"), "*\n")
.map_err(|err| format!("Failed to write build .gitignore: {err}"))?;

generate_openapi(&app_path)?;
generate_openapi(&app_path).await?;

if args.skip_ui_build {
println!("Skipping UI build");
Expand Down
25 changes: 14 additions & 11 deletions crates/cli/src/dev/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ use tracing::debug;

use crate::common::resolve_app_dir;
use crate::run_cli_async_helper;
use apx_common::{LogAggregator, Storage, db_path, should_skip_log};
use apx_common::{LogAggregator, should_skip_log};
use apx_core::dev::common::{lock_path, read_lock};
use apx_core::ops::logs::{
DEFAULT_LOG_DURATION, format_aggregated_record, format_log_record, parse_duration,
since_timestamp_nanos,
};
use apx_db::LogsDb;

#[derive(Args, Debug, Clone)]
pub struct LogsArgs {
Expand Down Expand Up @@ -59,15 +60,17 @@ async fn run_async(args: LogsArgs) -> Result<(), String> {
}

// Check if database exists
let db_path = db_path()?;
let db_path = apx_db::logs_db_path()?;
if !db_path.exists() {
println!("⚠️ No logs database found at {}\n", db_path.display());
println!("Logs will appear here once the dev server is started and produces output.");
return Ok(());
}

// Open storage
let storage = Storage::open().map_err(|e| format!("Failed to open logs database: {e}"))?;
let storage = LogsDb::open()
.await
.map_err(|e| format!("Failed to open logs database: {e}"))?;

let duration = parse_duration(&args.duration)?;
let since_ns = since_timestamp_nanos(duration);
Expand All @@ -76,13 +79,13 @@ async fn run_async(args: LogsArgs) -> Result<(), String> {
println!("📜 Streaming logs... (Ctrl+C to stop)\n");
follow_logs(&storage, &app_path_canonical, since_ns, &lock_path).await
} else {
read_logs(&storage, &app_path_canonical, since_ns)
read_logs(&storage, &app_path_canonical, since_ns).await
}
}

/// Read logs from database, filtered by app path and timestamp
fn read_logs(storage: &Storage, app_path: &str, since_ns: i64) -> Result<(), String> {
let records = storage.query_logs(Some(app_path), since_ns, None)?;
async fn read_logs(storage: &LogsDb, app_path: &str, since_ns: i64) -> Result<(), String> {
let records = storage.query_logs(Some(app_path), since_ns, None).await?;

let filtered: Vec<_> = records.iter().filter(|r| !should_skip_log(r)).collect();

Expand Down Expand Up @@ -118,18 +121,18 @@ fn read_logs(storage: &Storage, app_path: &str, since_ns: i64) -> Result<(), Str

/// Follow logs for new entries
async fn follow_logs(
storage: &Storage,
storage: &LogsDb,
app_path: &str,
since_ns: i64,
lock_path: &std::path::Path,
) -> Result<(), String> {
use chrono::Utc;

// First, read existing logs
read_logs(storage, app_path, since_ns)?;
read_logs(storage, app_path, since_ns).await?;

// Track last seen ID for incremental queries
let mut last_id = storage.get_latest_id()?;
let mut last_id = storage.get_latest_id().await?;

// Track if server was initially running
let server_was_running = lock_path.exists();
Expand All @@ -156,7 +159,7 @@ async fn follow_logs(
}

// Poll for new logs
let new_records = storage.query_logs_after_id(Some(app_path), last_id)?;
let new_records = storage.query_logs_after_id(Some(app_path), last_id).await?;

for record in &new_records {
if !should_skip_log(record) {
Expand All @@ -168,7 +171,7 @@ async fn follow_logs(
}

// Update last_id
if let Ok(new_id) = storage.get_latest_id()
if let Ok(new_id) = storage.get_latest_id().await
&& new_id > last_id
{
last_id = new_id;
Expand Down
68 changes: 36 additions & 32 deletions crates/cli/src/dev/mcp.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::run_cli_async_helper;
use apx_core::common::run_preflight_checks;
use apx_core::components::new_cache_state;
use apx_core::databricks_sdk_doc::fetch_latest_sdk_version;
use apx_core::interop::get_databricks_sdk_version;
use apx_mcp::server::{AppContext, IndexState, SdkIndexParams, build_server};
use apx_db::DevDb;
use apx_mcp::context::{AppContext, IndexState, SdkIndexParams};
use apx_mcp::server::run_server;
use clap::Args;
use std::sync::Arc;
use tokio::sync::{Mutex, broadcast};
Expand All @@ -12,11 +14,6 @@ pub struct McpArgs {}

pub async fn run(_args: McpArgs) -> i32 {
run_cli_async_helper(|| async {
let app_dir = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));

// Run preflight checks (installs deps if needed)
run_preflight_checks(&app_dir).await?;

// Create shutdown channel
let (shutdown_tx, _) = broadcast::channel::<()>(1);

Expand All @@ -27,18 +24,28 @@ pub async fn run(_args: McpArgs) -> i32 {
let cache_state = new_cache_state();

// Get SDK version via subprocess before spawning async task
const DEFAULT_SDK_VERSION: &str = "0.89.0";
let sdk_version = match get_databricks_sdk_version() {
Ok(version) => {
if let Some(ref v) = version {
tracing::info!("Found Databricks SDK version: {}", v);
} else {
tracing::debug!("Databricks SDK not installed");
}
version
Ok(Some(v)) => {
tracing::info!("Found Databricks SDK version: {}", v);
v
}
Err(e) => {
tracing::warn!("Failed to get Databricks SDK version: {}", e);
None
Ok(None) | Err(_) => {
tracing::info!("SDK not detected locally, fetching latest version from GitHub");
match fetch_latest_sdk_version().await {
Ok(v) => {
tracing::info!("Latest SDK version from GitHub: {}", v);
v
}
Err(e) => {
tracing::warn!(
"Failed to fetch latest SDK version: {}. Using default {}",
e,
DEFAULT_SDK_VERSION
);
DEFAULT_SDK_VERSION.to_string()
}
}
}
};

Expand All @@ -49,22 +56,19 @@ pub async fn run(_args: McpArgs) -> i32 {
sdk_doc_index: Arc::clone(&sdk_doc_index),
};

// Build server with SDK params - all indexing happens sequentially in one task
let server = build_server(
AppContext {
app_dir,
sdk_doc_index,
cache_state,
index_state,
shutdown_tx: shutdown_tx.clone(),
},
Some(sdk_params),
);

server
.run_stdio(shutdown_tx)
let dev_db = DevDb::open()
.await
.map_err(|e| format!("MCP server error: {e}"))
.map_err(|e| format!("Failed to open dev database: {e}"))?;

let ctx = AppContext {
dev_db,
sdk_doc_index,
cache_state,
index_state,
shutdown_tx: shutdown_tx.clone(),
};

run_server(ctx, Some(sdk_params)).await
})
.await
}
3 changes: 2 additions & 1 deletion crates/cli/src/dev/restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::PathBuf;

use crate::common::resolve_app_dir;
use crate::run_cli_async_helper;
use apx_core::common::OutputMode;
use apx_core::ops::dev::restart_dev_server;

#[derive(Args, Debug, Clone)]
Expand All @@ -21,6 +22,6 @@ pub async fn run(args: RestartArgs) -> i32 {
async fn run_inner(args: RestartArgs) -> Result<(), String> {
let app_dir = resolve_app_dir(args.app_path);

restart_dev_server(&app_dir).await?;
restart_dev_server(&app_dir, OutputMode::Interactive).await?;
Ok(())
}
6 changes: 4 additions & 2 deletions crates/cli/src/dev/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::PathBuf;

use crate::common::resolve_app_dir;
use crate::run_cli_async_helper;
use apx_core::common::OutputMode;
use apx_core::ops::dev::stop_dev_server;
use apx_core::ops::dev::{spawn_server, start_dev_server};

Expand Down Expand Up @@ -46,7 +47,7 @@ pub async fn run(args: StartArgs) -> i32 {

async fn run_detached(args: StartArgs) -> Result<(), String> {
let app_dir = resolve_app_dir(args.app_path);
let _ = start_dev_server(&app_dir).await?;
let _ = start_dev_server(&app_dir, OutputMode::Interactive).await?;
Ok(())
}

Expand All @@ -58,6 +59,7 @@ async fn run_attached(args: StartArgs) -> Result<(), String> {
None,
args.skip_credentials_validation,
args.timeout,
OutputMode::Interactive,
)
.await?;

Expand All @@ -71,6 +73,6 @@ async fn run_attached(args: StartArgs) -> Result<(), String> {
// Run logs command (will return on Ctrl+C)
let _ = super::logs::run(logs_args).await;

stop_dev_server(&app_dir).await?;
stop_dev_server(&app_dir, OutputMode::Interactive).await?;
Ok(())
}
Loading