Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition.workspace = true
rust-version.workspace = true

[dependencies]
apx-common = { path = "../common" }
apx-core = { path = "../core" }
apx-mcp = { path = "../mcp" }
clap.workspace = true
Expand Down
31 changes: 13 additions & 18 deletions crates/cli/src/dev/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use tracing::debug;

use crate::common::resolve_app_dir;
use crate::run_cli_async_helper;
use apx_common::{LogAggregator, Storage, db_path, should_skip_log};
use apx_core::dev::common::{lock_path, read_lock};
use apx_core::flux::{Storage, db_path};
use apx_core::ops::logs::{
DEFAULT_LOG_DURATION, LogAggregator, format_log_record, parse_duration, should_skip_log,
DEFAULT_LOG_DURATION, format_aggregated_record, format_log_record, parse_duration,
since_timestamp_nanos,
};

Expand Down Expand Up @@ -95,16 +95,11 @@ fn read_logs(storage: &Storage, app_path: &str, since_ns: i64) -> Result<(), Str
let mut aggregator = LogAggregator::new();

for record in &filtered {
let timestamp_ns = if record.timestamp_ns == 0 {
record.observed_timestamp_ns
} else {
record.timestamp_ns
};
let timestamp_ms = timestamp_ns / 1_000_000;
let timestamp_ms = record.effective_timestamp_ms();

// Flush expired aggregations before processing this record
for line in aggregator.flush_expired(timestamp_ms, true) {
println!("{line}");
for agg in aggregator.flush_expired(timestamp_ms) {
println!("{}", format_aggregated_record(&agg, true));
}

// Try to aggregate, if not aggregatable print directly
Expand All @@ -114,8 +109,8 @@ fn read_logs(storage: &Storage, app_path: &str, since_ns: i64) -> Result<(), Str
}

// Flush any remaining aggregations
for line in aggregator.flush_all(true) {
println!("{line}");
for agg in aggregator.flush_all() {
println!("{}", format_aggregated_record(&agg, true));
}

Ok(())
Expand Down Expand Up @@ -147,17 +142,17 @@ async fn follow_logs(
_ = tokio::signal::ctrl_c() => {
debug!("Received Ctrl+C, stopping logs stream.");
// Flush remaining aggregations
for line in aggregator.flush_all(true) {
println!("{line}");
for agg in aggregator.flush_all() {
println!("{}", format_aggregated_record(&agg, true));
}
break;
}
_ = tokio::time::sleep(Duration::from_millis(200)) => {
let current_time_ms = Utc::now().timestamp_millis();

// Flush expired aggregations
for line in aggregator.flush_expired(current_time_ms, true) {
println!("{line}");
for agg in aggregator.flush_expired(current_time_ms) {
println!("{}", format_aggregated_record(&agg, true));
}

// Poll for new logs
Expand All @@ -183,8 +178,8 @@ async fn follow_logs(
if server_was_running && !lock_path.exists() {
debug!("Dev server stopped (lockfile removed), exiting logs follow.");
// Flush remaining aggregations
for line in aggregator.flush_all(true) {
println!("{line}");
for agg in aggregator.flush_all() {
println!("{}", format_aggregated_record(&agg, true));
}
println!("\n📭 Dev server stopped.");
break;
Expand Down
5 changes: 4 additions & 1 deletion crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use std::path::PathBuf;
use std::time::Duration;

// Re-export commonly used types
pub use storage::{LogRecord, Storage, db_path, flux_dir};
pub use storage::{
AggregatedRecord, LogAggregator, LogRecord, Storage, db_path, flux_dir, get_aggregation_key,
should_skip_log, source_label,
};

/// Flux port for OTLP HTTP receiver
pub const FLUX_PORT: u16 = 11111;
Expand Down
215 changes: 215 additions & 0 deletions crates/common/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! OpenTelemetry logs in a local SQLite database.

use rusqlite::{Connection, params};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tracing::debug;
Expand Down Expand Up @@ -33,6 +34,220 @@ pub struct LogRecord {
pub span_id: Option<String>,
}

impl LogRecord {
    /// Effective timestamp of this record in milliseconds.
    ///
    /// A `timestamp_ns` of zero (e.g. OpenTelemetry tracing bridge logs)
    /// falls back to `observed_timestamp_ns`.
    pub fn effective_timestamp_ms(&self) -> i64 {
        let nanos = match self.timestamp_ns {
            0 => self.observed_timestamp_ns,
            ts => ts,
        };
        nanos / 1_000_000
    }

    /// Short source label ("app", "ui", "db" or "apx") derived from
    /// `service_name`; a missing service name is looked up as "unknown".
    pub fn source_label(&self) -> &'static str {
        let service = self.service_name.as_deref().unwrap_or("unknown");
        source_label(service)
    }
}

/// Derive a short source label from a service name string.
///
/// Services named `*_app`, `*_ui` and `*_db` map to "app", "ui" and "db";
/// every other name is attributed to apx itself.
pub fn source_label(service_name: &str) -> &'static str {
    const SUFFIX_LABELS: [(&str, &'static str); 3] =
        [("_app", "app"), ("_ui", "ui"), ("_db", "db")];

    SUFFIX_LABELS
        .iter()
        .find(|&&(suffix, _)| service_name.ends_with(suffix))
        .map(|&(_, label)| label)
        .unwrap_or("apx")
}

// ---------------------------------------------------------------------------
// Log aggregation
// ---------------------------------------------------------------------------

/// Time window for aggregating similar messages (in milliseconds).
/// A bucket stays open while matching records keep arriving within this window.
const AGGREGATION_WINDOW_MS: i64 = 2000;

/// Minimum severity level for apx internal logs (DEBUG = 5, skipping TRACE = 1-4).
/// Records from the "_core" service below this level are dropped by `should_skip_log`.
const APX_MIN_SEVERITY: i32 = 5;

/// Check if a log record should be skipped (internal/noisy logs).
///
/// Filters low-severity apx internals, OTLP exporter chatter, HTTP
/// connection-pool noise, and verbose database statement traces.
pub fn should_skip_log(record: &LogRecord) -> bool {
    // Message prefixes that identify pure infrastructure noise.
    const NOISY_PREFIXES: &[&str] = &[
        // OTLP / exporter internals
        "BatchLogProcessor.",
        "ReqwestBlockingClient.",
        "HttpLogsClient.",
        "HttpClient.",
        "Http::connect",
        // HTTP connection-pool chatter
        "starting new connection:",
        "connecting to ",
        "connected to ",
        "reuse idle connection",
        "pooling idle connection",
        // Database statement traces
        "preparing query ",
        "DEBUG: parse ",
        "DEBUG: bind ",
        "executing statement ",
        // Misc verbose internals
        "take? (",
        "wait at most",
        "connection ",
        "event /",
    ];

    let message = record.body.as_deref().unwrap_or("");
    let service_name = record.service_name.as_deref().unwrap_or("");
    let severity_number = record.severity_number.unwrap_or(9);

    // Sub-DEBUG apx internals are never shown.
    if service_name == "_core" && severity_number < APX_MIN_SEVERITY {
        return true;
    }

    // Lines pointing into vendored crate sources.
    if message.contains(".cargo/registry/src/") {
        return true;
    }

    NOISY_PREFIXES
        .iter()
        .any(|prefix| message.starts_with(prefix))
}

/// Get aggregation key for a message if it should be aggregated.
///
/// Only database connect/disconnect chatter is aggregated; everything else
/// returns `None` and is printed directly by the caller.
pub fn get_aggregation_key(record: &LogRecord) -> Option<(String, &'static str)> {
    let service = record.service_name.as_deref().unwrap_or("");

    // Aggregation currently only applies to `*_db` services.
    if !service.ends_with("_db") {
        return None;
    }

    let message = record.body.as_deref().unwrap_or("");

    if message.starts_with("Client connected from") {
        Some((
            format!("{service}_client_connected"),
            "db connections in last 2s",
        ))
    } else if message.starts_with("Client disconnected") {
        Some((
            format!("{service}_client_disconnected"),
            "db disconnections in last 2s",
        ))
    } else {
        None
    }
}

/// A single flushed aggregation bucket.
///
/// Produced by `LogAggregator::flush_expired` / `LogAggregator::flush_all`;
/// consumers format it into a human-readable line.
#[derive(Debug, Clone)]
pub struct AggregatedRecord {
    // Number of records folded into the bucket (always > 1 when flushed).
    pub count: usize,
    // Timestamp (ms) of the first record in the bucket.
    pub timestamp_ms: i64,
    // Static message template, e.g. "db connections in last 2s".
    pub template: &'static str,
    // Service the aggregated records came from.
    pub service_name: String,
}

/// Internal bucket used by [`LogAggregator`].
#[derive(Debug)]
struct AggBucket {
    // How many records have been folded into this bucket so far.
    count: usize,
    // Effective timestamp (ms) of the first record; becomes
    // `AggregatedRecord::timestamp_ms` on flush.
    first_ts_ms: i64,
    // Effective timestamp (ms) of the most recent record; drives expiry.
    last_ts_ms: i64,
    // Static message template chosen by `get_aggregation_key`.
    template: &'static str,
    // Service name of the first record added to the bucket.
    service_name: String,
}

/// Tracks aggregated messages within time windows.
///
/// Buckets are keyed by the aggregation key from `get_aggregation_key` and
/// flushed once no matching record has arrived for `AGGREGATION_WINDOW_MS`.
#[derive(Debug, Default)]
pub struct LogAggregator {
    // Open (not yet flushed) buckets, keyed by aggregation key.
    buckets: HashMap<String, AggBucket>,
}

// `AggBucket` derives `Debug` above so that `LogAggregator`'s derived `Debug`
// (which prints the full bucket map) compiles.

impl LogAggregator {
    /// Create an empty aggregator with no open buckets.
    pub fn new() -> Self {
        Self::default()
    }

    /// Try to aggregate the record. Returns `true` if it was aggregated
    /// (in which case the caller should not print the record itself).
    pub fn add(&mut self, record: &LogRecord) -> bool {
        let Some((key, template)) = get_aggregation_key(record) else {
            return false;
        };

        let timestamp_ms = record.effective_timestamp_ms();

        // `or_insert_with` defers the bucket construction (and the
        // service-name allocation) to the first record of each bucket,
        // instead of allocating a String on every aggregated record.
        let entry = self.buckets.entry(key).or_insert_with(|| AggBucket {
            count: 0,
            first_ts_ms: timestamp_ms,
            last_ts_ms: timestamp_ms,
            template,
            service_name: record.service_name.as_deref().unwrap_or("").to_string(),
        });
        entry.count += 1;
        entry.last_ts_ms = timestamp_ms;

        true
    }

    /// Flush buckets whose last timestamp is older than `current_time_ms` by
    /// more than the aggregation window. Only returns buckets with count > 1.
    // NOTE(review): an expired bucket with count == 1 is discarded without
    // ever being emitted, so a lone aggregatable record is lost entirely —
    // confirm this is the intended trade-off.
    pub fn flush_expired(&mut self, current_time_ms: i64) -> Vec<AggregatedRecord> {
        let mut output = Vec::new();

        // Single pass: `retain` removes expired buckets in place, avoiding
        // the collect-keys-then-remove second pass and the per-key clones.
        self.buckets.retain(|_, bucket| {
            let expired = current_time_ms - bucket.last_ts_ms > AGGREGATION_WINDOW_MS;
            if expired && bucket.count > 1 {
                output.push(AggregatedRecord {
                    count: bucket.count,
                    timestamp_ms: bucket.first_ts_ms,
                    template: bucket.template,
                    // The bucket is being removed anyway, so move the string
                    // out instead of cloning it.
                    service_name: std::mem::take(&mut bucket.service_name),
                });
            }
            !expired
        });

        output
    }

    /// Flush all remaining buckets. Only returns buckets with count > 1.
    pub fn flush_all(&mut self) -> Vec<AggregatedRecord> {
        // `drain` empties the map and yields owned buckets, so no clones are
        // needed and no separate `clear()` call is required.
        self.buckets
            .drain()
            .filter(|(_, bucket)| bucket.count > 1)
            .map(|(_, bucket)| AggregatedRecord {
                count: bucket.count,
                timestamp_ms: bucket.first_ts_ms,
                template: bucket.template,
                service_name: bucket.service_name,
            })
            .collect()
    }
}

/// Thread-safe storage handle.
#[derive(Clone)]
pub struct Storage {
Expand Down
Loading