diff --git a/crates/scouter_dataframe/src/parquet/control/engine.rs b/crates/scouter_dataframe/src/parquet/control/engine.rs index caeb6f56d..028adc7a7 100644 --- a/crates/scouter_dataframe/src/parquet/control/engine.rs +++ b/crates/scouter_dataframe/src/parquet/control/engine.rs @@ -9,7 +9,6 @@ use chrono::{DateTime, Duration, Utc}; use datafusion::logical_expr::{col, lit}; use datafusion::prelude::SessionContext; use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty}; -use scouter_settings::ObjectStorageSettings; use std::sync::Arc; use tokio::sync::RwLock as AsyncRwLock; use tracing::{debug, info, warn}; @@ -118,7 +117,6 @@ pub fn get_pod_id() -> String { pub struct ControlTableEngine { schema: SchemaRef, #[allow(dead_code)] // Used for future vacuum/maintenance operations - object_store: ObjectStore, table: Arc>, ctx: Arc, pod_id: String, @@ -129,13 +127,9 @@ impl ControlTableEngine { /// /// The `pod_id` identifies this instance for distributed locking. In K8s, /// pass the pod hostname (`std::env::var("HOSTNAME")`). 
- pub async fn new( - storage_settings: &ObjectStorageSettings, - pod_id: String, - ) -> Result { - let object_store = ObjectStore::new(storage_settings)?; + pub async fn new(object_store: &ObjectStore, pod_id: String) -> Result { let schema = Arc::new(control_schema()); - let table = build_or_create_control_table(&object_store, schema.clone()).await?; + let table = build_or_create_control_table(object_store, schema.clone()).await?; let ctx = object_store.get_session()?; if let Ok(provider) = table.table_provider().await { @@ -146,7 +140,6 @@ impl ControlTableEngine { Ok(Self { schema, - object_store, table: Arc::new(AsyncRwLock::new(table)), ctx: Arc::new(ctx), pod_id, @@ -399,32 +392,40 @@ impl ControlTableEngine { continue; } - let get_string = |col_name: &str| -> String { - let col = batch.column_by_name(col_name).unwrap(); - let casted = - arrow::compute::cast(col, &DataType::Utf8).expect("cast to Utf8 failed"); - let arr = casted.as_any().downcast_ref::().unwrap(); - arr.value(0).to_string() - }; - - let get_timestamp = |col_name: &str| -> Option> { - let col = batch.column_by_name(col_name).unwrap(); - if col.is_null(0) { - return None; - } - let arr = col + let get_string = |col_name: &'static str| -> Result { + let col = batch + .column_by_name(col_name) + .ok_or(TraceEngineError::DowncastError(col_name))?; + let casted = arrow::compute::cast(col, &DataType::Utf8) + .map_err(TraceEngineError::ArrowError)?; + let arr = casted .as_any() - .downcast_ref::() - .unwrap(); - DateTime::from_timestamp_micros(arr.value(0)) + .downcast_ref::() + .ok_or(TraceEngineError::DowncastError(col_name))?; + Ok(arr.value(0).to_string()) }; - let task_name_val = get_string("task_name"); - let status_val = get_string("status"); - let pod_id_val = get_string("pod_id"); - let claimed_at = get_timestamp("claimed_at").unwrap_or_else(Utc::now); - let completed_at = get_timestamp("completed_at"); - let next_run_at = get_timestamp("next_run_at").unwrap_or_else(Utc::now); + let 
get_timestamp = + |col_name: &'static str| -> Result>, TraceEngineError> { + let col = batch + .column_by_name(col_name) + .ok_or(TraceEngineError::DowncastError(col_name))?; + if col.is_null(0) { + return Ok(None); + } + let arr = col + .as_any() + .downcast_ref::() + .ok_or(TraceEngineError::DowncastError(col_name))?; + Ok(DateTime::from_timestamp_micros(arr.value(0))) + }; + + let task_name_val = get_string("task_name")?; + let status_val = get_string("status")?; + let pod_id_val = get_string("pod_id")?; + let claimed_at = get_timestamp("claimed_at")?.unwrap_or_else(Utc::now); + let completed_at = get_timestamp("completed_at")?; + let next_run_at = get_timestamp("next_run_at")?.unwrap_or_else(Utc::now); return Ok(Some(TaskRecord { task_name: task_name_val, @@ -453,6 +454,17 @@ impl ControlTableEngine { // First, delete the existing row for this task (if any). // On a brand-new table with no data, delete will fail — that's fine. + // Safety: task_name is always an internal constant (e.g. "summary_optimize"). + // The delta-rs delete API takes a String predicate, not a parameterized Expr, + // so we assert the invariant defensively. + debug_assert!( + record + .task_name + .chars() + .all(|c| c.is_alphanumeric() || c == '_'), + "task_name must be alphanumeric + underscore, got: {}", + record.task_name + ); let predicate = format!("task_name = '{}'", record.task_name); let delete_result = table_guard.clone().delete().with_predicate(predicate).await; @@ -470,8 +482,19 @@ impl ControlTableEngine { } *table_guard = updated_table; } - Err(_) => { - // No existing data to delete (new table) — just append + Err(e) => { + let err_msg = e.to_string(); + // On a brand-new table with no data, delete fails because there's nothing + // to remove — this is expected and we fall through to append. + // All other errors are real failures and must be propagated. 
+ if !err_msg.contains("No data") && !err_msg.contains("empty") { + warn!( + "Delete before write_task_update failed unexpectedly: {}", + err_msg + ); + return Err(TraceEngineError::DataTableError(e)); + } + let updated_table = table_guard .clone() .write(vec![batch]) @@ -587,12 +610,16 @@ mod tests { use super::*; use scouter_settings::ObjectStorageSettings; + fn make_test_object_store(storage_settings: &ObjectStorageSettings) -> ObjectStore { + ObjectStore::new(storage_settings).unwrap() + } + fn cleanup() { let storage_settings = ObjectStorageSettings::default(); let current_dir = std::env::current_dir().unwrap(); let storage_path = current_dir.join(storage_settings.storage_root()); if storage_path.exists() { - std::fs::remove_dir_all(storage_path).unwrap(); + let _ = std::fs::remove_dir_all(storage_path); } } @@ -601,7 +628,8 @@ mod tests { cleanup(); let settings = ObjectStorageSettings::default(); - let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?; + let object_store = make_test_object_store(&settings); + let engine = ControlTableEngine::new(&object_store, "pod-1".to_string()).await?; // No tasks should exist yet let due = engine.is_task_due("optimize").await?; @@ -616,7 +644,8 @@ mod tests { cleanup(); let settings = ObjectStorageSettings::default(); - let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?; + let object_store = make_test_object_store(&settings); + let engine = ControlTableEngine::new(&object_store, "pod-1".to_string()).await?; // Claim a new task let claimed = engine.try_claim_task("optimize").await?; @@ -645,7 +674,8 @@ mod tests { cleanup(); let settings = ObjectStorageSettings::default(); - let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?; + let object_store = make_test_object_store(&settings); + let engine = ControlTableEngine::new(&object_store, "pod-1".to_string()).await?; // Claim and release with 0-second interval (immediately due again) let claimed = 
engine.try_claim_task("vacuum").await?; @@ -673,7 +703,8 @@ mod tests { cleanup(); let settings = ObjectStorageSettings::default(); - let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?; + let object_store = make_test_object_store(&settings); + let engine = ControlTableEngine::new(&object_store, "pod-1".to_string()).await?; // Claim two different tasks let claimed_opt = engine.try_claim_task("optimize").await?; diff --git a/crates/scouter_dataframe/src/parquet/tracing/engine.rs b/crates/scouter_dataframe/src/parquet/tracing/engine.rs index 583cdf229..d1ee8b3c8 100644 --- a/crates/scouter_dataframe/src/parquet/tracing/engine.rs +++ b/crates/scouter_dataframe/src/parquet/tracing/engine.rs @@ -200,7 +200,7 @@ impl TraceSpanDBEngine { } else { info!("Empty table at init — deferring SessionContext registration until first write"); } - let control = ControlTableEngine::new(storage_settings, get_pod_id()).await?; + let control = ControlTableEngine::new(&object_store, get_pod_id()).await?; Ok(TraceSpanDBEngine { schema, @@ -324,6 +324,9 @@ impl TraceSpanDBEngine { self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?; self.ctx .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?; + // Ensure the table's object store is registered with the DataFusion session + // so that DeltaScan::scan() can resolve file URLs during query execution. 
+ updated_table.update_datafusion_session(&self.ctx.state())?; *table_guard = updated_table; @@ -351,6 +354,7 @@ impl TraceSpanDBEngine { self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?; self.ctx .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?; + updated_table.update_datafusion_session(&self.ctx.state())?; *table_guard = updated_table; @@ -370,6 +374,7 @@ impl TraceSpanDBEngine { self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?; self.ctx .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?; + updated_table.update_datafusion_session(&self.ctx.state())?; *table_guard = updated_table; @@ -404,6 +409,7 @@ impl TraceSpanDBEngine { self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?; self.ctx .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?; + updated_table.update_datafusion_session(&self.ctx.state())?; *table_guard = updated_table; @@ -495,6 +501,8 @@ impl TraceSpanDBEngine { } } TableCommand::Optimize { respond_to } => { + // Response is sent before vacuum so callers aren't blocked + // on the potentially slow file-deletion pass. 
let _ = respond_to.send(self.optimize_table().await); if let Err(e) = self.vacuum_table(0).await { error!("Post-optimize vacuum failed: {}", e); diff --git a/crates/scouter_dataframe/src/parquet/tracing/service.rs b/crates/scouter_dataframe/src/parquet/tracing/service.rs index ce0a8926e..380a848ba 100644 --- a/crates/scouter_dataframe/src/parquet/tracing/service.rs +++ b/crates/scouter_dataframe/src/parquet/tracing/service.rs @@ -1,6 +1,7 @@ use crate::error::TraceEngineError; use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine}; use crate::parquet::tracing::queries::TraceQueries; +use crate::storage::ObjectStore; use datafusion::prelude::SessionContext; use scouter_settings::ObjectStorageSettings; use scouter_types::TraceSpanRecord; @@ -74,6 +75,9 @@ pub struct TraceSpanService { pub query_service: TraceQueries, /// Shared SessionContext — exposes `trace_spans` registration for TraceSummaryService. pub ctx: Arc, + /// Shared ObjectStore — passed to TraceSummaryService so both engines use the same + /// CachingStore instance, preventing stale reads on cloud backends (GCS/S3). + pub object_store: ObjectStore, } impl TraceSpanService { @@ -100,6 +104,7 @@ impl TraceSpanService { ); let ctx = engine.ctx.clone(); + let object_store = engine.object_store.clone(); let (engine_tx, engine_handle) = engine.start_actor(compaction_interval_hours, retention_days); let (span_tx, span_rx) = mpsc::channel::>(100); @@ -121,6 +126,7 @@ impl TraceSpanService { buffer_handle, query_service: TraceQueries::new(ctx.clone()), ctx, + object_store, }) } @@ -345,7 +351,7 @@ mod tests { let current_dir = std::env::current_dir().unwrap(); let storage_path = current_dir.join(storage_settings.storage_root()); if storage_path.exists() { - std::fs::remove_dir_all(storage_path).unwrap(); + let _ = std::fs::remove_dir_all(storage_path); } } @@ -817,4 +823,142 @@ mod tests { cleanup(); Ok(()) } + + /// Regression test for stale DataFusion session state after Delta Lake writes. 
+ /// + /// Before the fix, `SessionContext` cached file metadata from the first write and + /// subsequent writes were invisible to queries — the session held a stale snapshot + /// of the Delta log. The fix calls `update_datafusion_session()` after each write + /// to re-register the table provider with the latest Delta log state. + #[tokio::test] + async fn test_span_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError> { + cleanup(); + + let storage_settings = ObjectStorageSettings::default(); + let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?; + + let start = Utc::now() - chrono::Duration::hours(1); + let end = Utc::now() + chrono::Duration::hours(1); + + // Write batch #1 (2 spans) + let trace1 = TraceId::from_bytes([0xB0; 16]); + let spans1 = vec![ + make_span( + &trace1, + SpanId::from_bytes([0xB0; 8]), + None, + "svc_vis", + "op1", + vec![], + ), + make_span( + &trace1, + SpanId::from_bytes([0xB1; 8]), + Some(SpanId::from_bytes([0xB0; 8])), + "svc_vis", + "op2", + vec![], + ), + ]; + service.write_spans_direct(spans1).await?; + + let result = service + .query_service + .get_trace_spans( + Some(trace1.as_bytes()), + None, + Some(&start), + Some(&end), + None, + ) + .await?; + assert_eq!( + result.len(), + 2, + "After write #1: expected 2 spans, got {}", + result.len() + ); + + // Write batch #2 (2 more spans, different trace) + let trace2 = TraceId::from_bytes([0xB2; 16]); + let spans2 = vec![ + make_span( + &trace2, + SpanId::from_bytes([0xB2; 8]), + None, + "svc_vis", + "op3", + vec![], + ), + make_span( + &trace2, + SpanId::from_bytes([0xB3; 8]), + Some(SpanId::from_bytes([0xB2; 8])), + "svc_vis", + "op4", + vec![], + ), + ]; + service.write_spans_direct(spans2).await?; + + let result = service + .query_service + .get_trace_spans( + Some(trace2.as_bytes()), + None, + Some(&start), + Some(&end), + None, + ) + .await?; + assert_eq!( + result.len(), + 2, + "After write #2: expected 2 spans for trace2, got 
{} (stale snapshot?)", + result.len() + ); + + // Write batch #3 (2 more spans, third trace) + let trace3 = TraceId::from_bytes([0xB4; 16]); + let spans3 = vec![ + make_span( + &trace3, + SpanId::from_bytes([0xB4; 8]), + None, + "svc_vis", + "op5", + vec![], + ), + make_span( + &trace3, + SpanId::from_bytes([0xB5; 8]), + Some(SpanId::from_bytes([0xB4; 8])), + "svc_vis", + "op6", + vec![], + ), + ]; + service.write_spans_direct(spans3).await?; + + let result = service + .query_service + .get_trace_spans( + Some(trace3.as_bytes()), + None, + Some(&start), + Some(&end), + None, + ) + .await?; + assert_eq!( + result.len(), + 2, + "After write #3: expected 2 spans for trace3, got {} (stale snapshot?)", + result.len() + ); + + service.shutdown().await?; + cleanup(); + Ok(()) + } } diff --git a/crates/scouter_dataframe/src/parquet/tracing/summary.rs b/crates/scouter_dataframe/src/parquet/tracing/summary.rs index 730b6a41e..2bae0ef7a 100644 --- a/crates/scouter_dataframe/src/parquet/tracing/summary.rs +++ b/crates/scouter_dataframe/src/parquet/tracing/summary.rs @@ -9,13 +9,12 @@ use arrow::compute; use arrow::datatypes::*; use arrow_array::Array; use arrow_array::RecordBatch; -use chrono::{DateTime, Datelike, TimeZone, Utc}; +use chrono::{DateTime, Datelike, Utc}; use datafusion::logical_expr::{cast as df_cast, col, lit, SortExpr}; use datafusion::prelude::*; use datafusion::scalar::ScalarValue; use deltalake::operations::optimize::OptimizeType; use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty}; -use scouter_settings::ObjectStorageSettings; use scouter_types::sql::{TraceFilters, TraceListItem}; use scouter_types::{Attribute, TraceCursor, TraceId, TracePaginationResponse, TraceSummaryRecord}; use std::sync::Arc; @@ -171,7 +170,7 @@ impl TraceSummaryBatchBuilder { } let duration = rec .end_time - .map(|end| (end - rec.start_time).num_milliseconds()); + .map(|end| (end - rec.start_time).num_milliseconds().max(0)); match duration { Some(d) => 
self.duration_ms.append_value(d), None => self.duration_ms.append_null(), @@ -378,12 +377,11 @@ impl TraceSummaryDBEngine { /// `trace_spans` and `trace_summaries` live in the same context and can participate in /// JOIN queries. pub async fn new( - storage_settings: &ObjectStorageSettings, + object_store: &ObjectStore, ctx: Arc, ) -> Result { - let object_store = ObjectStore::new(storage_settings)?; let schema = Arc::new(create_summary_schema()); - let delta_table = build_or_create_summary_table(&object_store, schema.clone()).await?; + let delta_table = build_or_create_summary_table(object_store, schema.clone()).await?; // A freshly-created table has no committed Parquet files yet — table_provider() // returns an error in that case. Defer registration until the first write. if let Ok(provider) = delta_table.table_provider().await { @@ -392,7 +390,7 @@ impl TraceSummaryDBEngine { info!("Empty summary table at init — deferring SessionContext registration until first write"); } - let control = ControlTableEngine::new(storage_settings, get_pod_id()).await?; + let control = ControlTableEngine::new(object_store, get_pod_id()).await?; Ok(TraceSummaryDBEngine { schema, @@ -422,13 +420,16 @@ impl TraceSummaryDBEngine { let batch = self.build_batch(records)?; let mut table_guard = self.table.write().await; - - if let Err(e) = table_guard.update_incremental(None).await { - info!("Summary table update skipped (new table): {}", e); - } - - let updated_table = table_guard - .clone() + // update_incremental is intentionally omitted here. + // + // This engine runs as a single-writer actor — no other process commits to this + // Delta table, so the in-memory state is always current. Calling update_incremental + // can mutate table_guard into a corrupted intermediate state before the error + // propagates, producing a DeltaTable whose snapshot may not reflect newly written + // files — causing stale query results until restart. 
+ + let current_table = table_guard.clone(); + let updated_table = current_table .write(vec![batch]) .with_save_mode(deltalake::protocol::SaveMode::Append) .with_partition_columns(vec![PARTITION_DATE_COL.to_string()]) @@ -437,6 +438,7 @@ impl TraceSummaryDBEngine { self.ctx.deregister_table(SUMMARY_TABLE_NAME)?; self.ctx .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?; + updated_table.update_datafusion_session(&self.ctx.state())?; *table_guard = updated_table; info!("Summary table updated with {} records", count); @@ -458,6 +460,7 @@ impl TraceSummaryDBEngine { self.ctx.deregister_table(SUMMARY_TABLE_NAME)?; self.ctx .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?; + updated_table.update_datafusion_session(&self.ctx.state())?; *table_guard = updated_table; Ok(()) } @@ -474,6 +477,7 @@ impl TraceSummaryDBEngine { self.ctx.deregister_table(SUMMARY_TABLE_NAME)?; self.ctx .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?; + updated_table.update_datafusion_session(&self.ctx.state())?; *table_guard = updated_table; Ok(()) } @@ -534,9 +538,10 @@ impl TraceSummaryDBEngine { let _ = respond_to.send(result); } SummaryTableCommand::Optimize { respond_to } => { - // Direct admin request — bypass control table + // Direct admin request — bypass control table. + // Response is sent before vacuum so callers aren't blocked + // on the potentially slow file-deletion pass. 
let _ = respond_to.send(self.optimize_table().await); - // vacuum table if let Err(e) = self.vacuum_table(0).await { error!("Post-optimize vacuum failed: {}", e); } @@ -571,11 +576,11 @@ pub struct TraceSummaryService { impl TraceSummaryService { pub async fn new( - storage_settings: &ObjectStorageSettings, + object_store: &ObjectStore, compaction_interval_hours: u64, ctx: Arc, ) -> Result { - let engine = TraceSummaryDBEngine::new(storage_settings, ctx).await?; + let engine = TraceSummaryDBEngine::new(object_store, ctx).await?; let engine_ctx = engine.ctx.clone(); let (engine_tx, engine_handle) = engine.start_actor(compaction_interval_hours); @@ -1024,11 +1029,28 @@ fn extract_map_attributes(map_array: &MapArray, row_idx: usize) -> Vec().unwrap(); - let keys_arr = compute::cast(struct_array.column(0).as_ref(), &DataType::Utf8).unwrap(); - let keys = keys_arr.as_any().downcast_ref::().unwrap(); - let values_arr = compute::cast(struct_array.column(1).as_ref(), &DataType::Utf8).unwrap(); - let values = values_arr.as_any().downcast_ref::().unwrap(); + let Some(struct_array) = entry.as_any().downcast_ref::() else { + tracing::warn!("extract_map_attributes: failed to downcast to StructArray"); + return Vec::new(); + }; + let Some(keys_arr) = compute::cast(struct_array.column(0).as_ref(), &DataType::Utf8).ok() + else { + tracing::warn!("extract_map_attributes: failed to cast keys to Utf8"); + return Vec::new(); + }; + let Some(keys) = keys_arr.as_any().downcast_ref::() else { + tracing::warn!("extract_map_attributes: failed to downcast keys to StringArray"); + return Vec::new(); + }; + let Some(values_arr) = compute::cast(struct_array.column(1).as_ref(), &DataType::Utf8).ok() + else { + tracing::warn!("extract_map_attributes: failed to cast values to Utf8"); + return Vec::new(); + }; + let Some(values) = values_arr.as_any().downcast_ref::() else { + tracing::warn!("extract_map_attributes: failed to downcast values to StringArray"); + return Vec::new(); + }; 
(0..struct_array.len()) .map(|i| Attribute { @@ -1215,11 +1237,11 @@ fn batches_to_trace_list_items( for i in 0..batch.num_rows() { let trace_id_hex = hex::encode(trace_ids.value(i)); - let start_time = micros_to_datetime(start_times.value(i)); + let start_time = micros_to_datetime(start_times.value(i))?; let end_time = if end_times.is_null(i) { None } else { - Some(micros_to_datetime(end_times.value(i))) + Some(micros_to_datetime(end_times.value(i))?) }; let duration_ms = if durations.is_null(i) { None @@ -1261,10 +1283,10 @@ fn batches_to_trace_list_items( Ok(items) } -fn micros_to_datetime(micros: i64) -> DateTime { - let secs = micros / 1_000_000; - let nanos = ((micros % 1_000_000) * 1_000) as u32; - Utc.timestamp_opt(secs, nanos).unwrap() +fn micros_to_datetime(micros: i64) -> Result, TraceEngineError> { + DateTime::from_timestamp_micros(micros).ok_or(TraceEngineError::InvalidTimestamp( + "out-of-range microsecond timestamp", + )) } #[cfg(test)] @@ -1285,19 +1307,18 @@ mod tests { let current_dir = std::env::current_dir().unwrap(); let storage_path = current_dir.join(storage_settings.storage_root()); if storage_path.exists() { - std::fs::remove_dir_all(storage_path).unwrap(); + let _ = std::fs::remove_dir_all(storage_path); } } + fn make_test_object_store(storage_settings: &ObjectStorageSettings) -> ObjectStore { + ObjectStore::new(storage_settings).unwrap() + } + /// Build a standalone SessionContext for test use (no trace_spans registered). /// Attribute-filter paths that need trace_spans are not exercised in these tests. 
- fn make_test_ctx(storage_settings: &ObjectStorageSettings) -> Arc { - Arc::new( - ObjectStore::new(storage_settings) - .unwrap() - .get_session() - .unwrap(), - ) + fn make_test_ctx(object_store: &ObjectStore) -> Arc { + Arc::new(object_store.get_session().unwrap()) } fn make_summary( @@ -1335,8 +1356,9 @@ mod tests { cleanup(); let storage_settings = ObjectStorageSettings::default(); - let ctx = make_test_ctx(&storage_settings); - let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?; + let object_store = make_test_object_store(&storage_settings); + let ctx = make_test_ctx(&object_store); + let service = TraceSummaryService::new(&object_store, 24, ctx).await?; let s1 = make_summary([1u8; 16], "svc_a", 0, vec![]); let s2 = make_summary([2u8; 16], "svc_b", 0, vec![]); @@ -1379,8 +1401,9 @@ mod tests { cleanup(); let storage_settings = ObjectStorageSettings::default(); - let ctx = make_test_ctx(&storage_settings); - let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?; + let object_store = make_test_object_store(&storage_settings); + let ctx = make_test_ctx(&object_store); + let service = TraceSummaryService::new(&object_store, 24, ctx).await?; let ok_summary = make_summary([3u8; 16], "svc", 0, vec![]); let err_summary = make_summary([4u8; 16], "svc", 2, vec![]); @@ -1457,8 +1480,9 @@ mod tests { cleanup(); let storage_settings = ObjectStorageSettings::default(); - let ctx = make_test_ctx(&storage_settings); - let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?; + let object_store = make_test_object_store(&storage_settings); + let ctx = make_test_ctx(&object_store); + let service = TraceSummaryService::new(&object_store, 24, ctx).await?; let s_alpha = make_summary([5u8; 16], "alpha_service", 0, vec![]); let s_beta = make_summary([6u8; 16], "beta_service", 0, vec![]); @@ -1507,8 +1531,9 @@ mod tests { cleanup(); let storage_settings = ObjectStorageSettings::default(); - let ctx = 
make_test_ctx(&storage_settings); - let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?; + let object_store = make_test_object_store(&storage_settings); + let ctx = make_test_ctx(&object_store); + let service = TraceSummaryService::new(&object_store, 24, ctx).await?; let wanted_id = TraceId::from_bytes([7u8; 16]); let unwanted_id = TraceId::from_bytes([8u8; 16]); @@ -1563,8 +1588,9 @@ mod tests { async fn test_summary_cursor_pagination() -> Result<(), TraceEngineError> { cleanup(); let storage_settings = ObjectStorageSettings::default(); - let ctx = make_test_ctx(&storage_settings); - let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?; + let object_store = make_test_object_store(&storage_settings); + let ctx = make_test_ctx(&object_store); + let service = TraceSummaryService::new(&object_store, 24, ctx).await?; let now = Utc::now(); let summaries: Vec = (0u8..100) @@ -1631,7 +1657,8 @@ mod tests { let shared_ctx = span_service.ctx.clone(); // TraceSummaryService shares the same ctx — JOIN to trace_spans will work - let summary_service = TraceSummaryService::new(&storage_settings, 24, shared_ctx).await?; + let summary_service = + TraceSummaryService::new(&span_service.object_store, 24, shared_ctx).await?; let now = Utc::now(); let kafka_trace = TraceId::from_bytes([70u8; 16]); @@ -1712,7 +1739,8 @@ mod tests { let shared_ctx = span_service.ctx.clone(); // TraceSummaryService shares the same ctx so JOIN path works - let summary_service = TraceSummaryService::new(&storage_settings, 24, shared_ctx).await?; + let summary_service = + TraceSummaryService::new(&span_service.object_store, 24, shared_ctx).await?; let now = Utc::now(); let queue_trace = TraceId::from_bytes([90u8; 16]); @@ -1848,8 +1876,9 @@ mod tests { cleanup(); let storage_settings = ObjectStorageSettings::default(); - let ctx = make_test_ctx(&storage_settings); - let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?; + let object_store = 
make_test_object_store(&storage_settings); + let ctx = make_test_ctx(&object_store); + let service = TraceSummaryService::new(&object_store, 24, ctx).await?; let attrs = vec![Attribute { key: "cloud.region".to_string(), @@ -1890,4 +1919,71 @@ mod tests { cleanup(); Ok(()) } + + /// Regression test: multiple sequential writes must be immediately visible to queries. + /// This catches the stale snapshot bug where re-registration doesn't refresh the + /// DataFusion session's object store, causing queries after subsequent writes to + /// return stale results. + #[tokio::test] + async fn test_summary_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError> + { + cleanup(); + + let storage_settings = ObjectStorageSettings::default(); + let object_store = make_test_object_store(&storage_settings); + let ctx = make_test_ctx(&object_store); + let service = TraceSummaryService::new(&object_store, 24, ctx).await?; + + let start = Utc::now() - chrono::Duration::hours(1); + let end = Utc::now() + chrono::Duration::hours(1); + let filters = TraceFilters { + start_time: Some(start), + end_time: Some(end), + limit: Some(100), + ..Default::default() + }; + + // Write batch #1 (2 summaries) + let s1 = make_summary([0xA0; 16], "svc_vis", 0, vec![]); + let s2 = make_summary([0xA1; 16], "svc_vis", 0, vec![]); + service.write_summaries(vec![s1, s2]).await?; + + let response = service.query_service.get_paginated_traces(&filters).await?; + assert_eq!( + response.items.len(), + 2, + "After write #1: expected 2 items, got {}", + response.items.len() + ); + + // Write batch #2 (2 more summaries) + let s3 = make_summary([0xA2; 16], "svc_vis", 0, vec![]); + let s4 = make_summary([0xA3; 16], "svc_vis", 0, vec![]); + service.write_summaries(vec![s3, s4]).await?; + + let response = service.query_service.get_paginated_traces(&filters).await?; + assert_eq!( + response.items.len(), + 4, + "After write #2: expected 4 items, got {} (stale snapshot?)", + response.items.len() + ); + + 
// Write batch #3 (2 more summaries) + let s5 = make_summary([0xA4; 16], "svc_vis", 0, vec![]); + let s6 = make_summary([0xA5; 16], "svc_vis", 0, vec![]); + service.write_summaries(vec![s5, s6]).await?; + + let response = service.query_service.get_paginated_traces(&filters).await?; + assert_eq!( + response.items.len(), + 6, + "After write #3: expected 6 items, got {} (stale snapshot?)", + response.items.len() + ); + + service.shutdown().await?; + cleanup(); + Ok(()) + } } diff --git a/crates/scouter_server/src/api/routes/trace/route.rs b/crates/scouter_server/src/api/routes/trace/route.rs index 9c55b51e5..655a33ad9 100644 --- a/crates/scouter_server/src/api/routes/trace/route.rs +++ b/crates/scouter_server/src/api/routes/trace/route.rs @@ -294,6 +294,36 @@ pub async fn v1_otel_traces( Ok(Json(TraceReceivedResponse { received: true })) } +#[instrument(skip_all)] +pub async fn debug_recent_traces( + State(data): State>, +) -> Result, (StatusCode, Json)> { + let end_time = chrono::Utc::now(); + let start_time = end_time - chrono::Duration::hours(24); + + let filters = TraceFilters { + start_time: Some(start_time), + end_time: Some(end_time), + limit: Some(10), + ..Default::default() + }; + + let response = data + .trace_summary_service + .query_service + .get_paginated_traces(&filters) + .await + .map_err(|e| { + error!("Failed to get debug recent traces: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ScouterServerError::get_paginated_traces_error(e)), + ) + })?; + + Ok(Json(response)) +} + pub async fn get_trace_router(prefix: &str) -> Result>> { let result = catch_unwind(AssertUnwindSafe(|| { Router::new() @@ -310,6 +340,10 @@ pub async fn get_trace_router(prefix: &str) -> Result>> { ) .route(&format!("{prefix}/trace/metrics"), post(trace_metrics)) .route(&format!("{prefix}/v1/traces"), post(v1_otel_traces)) + .route( + &format!("{prefix}/trace/debug/recent"), + get(debug_recent_traces), + ) // add {id}/spans route for otel compat .route( 
&(format!("{prefix}/v1/traces/") + "{id}/spans"), diff --git a/crates/scouter_server/src/api/setup.rs b/crates/scouter_server/src/api/setup.rs index bb83949c5..3fd599de0 100644 --- a/crates/scouter_server/src/api/setup.rs +++ b/crates/scouter_server/src/api/setup.rs @@ -159,7 +159,7 @@ impl ScouterSetupComponents { let trace_summary_service = Arc::new( TraceSummaryService::new( - &config.storage_settings, + &trace_service.object_store, compaction_hours, trace_service.ctx.clone(), ) diff --git a/py-scouter/examples/tracing/README.md b/py-scouter/examples/tracing/README.md new file mode 100644 index 000000000..793a1c774 --- /dev/null +++ b/py-scouter/examples/tracing/README.md @@ -0,0 +1,47 @@ +# Tracing examples + +These examples show how to instrument a Python application with Scouter tracing. + +## `instrumentor_example.py` + +Demonstrates `ScouterInstrumentor` — the recommended entry point for production +applications. + +`ScouterInstrumentor` registers Scouter as the **global OpenTelemetry +`TracerProvider`**. This means any library that already emits OTel spans +(httpx, SQLAlchemy, FastAPI, gRPC, etc.) will automatically route those spans to +Scouter without per-library configuration. + +### What the example covers + +| Section | Concept | +|---------|---------| +| `instrument()` | Setup at startup — transport, batching, sampling, default attributes | +| `trace.get_tracer()` | Standard OTel API works after instrumentation | +| `get_tracer()` | Scouter tracer with `.span()` decorator | +| Nested spans | Child spans created inside a root span | +| Baggage | Propagating context across service boundaries | +| Default attributes | `env`, `version`, etc. 
stamped on every span automatically | +| `uninstrument()` | Clean shutdown, flushes export queue | + +### Run it + +```bash +# This example assumes you have a Scouter server running +cd py-scouter +uv run python examples/tracing/instrumentor_example.py +``` + +### `ScouterInstrumentor` vs `init_tracer` + +Both register the global OTel provider, so third-party auto-instrumentation +works with either. Prefer `ScouterInstrumentor` at application entrypoints for +its built-in singleton guard; use `init_tracer` / `get_tracer` in libraries. + +| | `ScouterInstrumentor` | `init_tracer` | +|---|---|---| +| Sets global OTel provider | Yes | Yes | +| Third-party auto-instrumentation | Works automatically | Works automatically | +| Decorator support | Via `get_tracer()` | Via `get_tracer()` | +| Singleton guard | Built-in | Manual | +| Recommended for | Application entrypoints | Library / module-level use | diff --git a/py-scouter/examples/tracing/instrumentor_example.py b/py-scouter/examples/tracing/instrumentor_example.py new file mode 100644 index 000000000..a872939a3 --- /dev/null +++ b/py-scouter/examples/tracing/instrumentor_example.py @@ -0,0 +1,154 @@ +""" +ScouterInstrumentor example. + +ScouterInstrumentor registers Scouter as the global OpenTelemetry TracerProvider. +Any library that emits OTel spans (httpx, sqlalchemy, fastapi, etc.) will +automatically route those spans to Scouter — no per-library configuration needed. + +Requirements: + pip install scouter-ml opentelemetry-api + +""" + +from opentelemetry import trace +from scouter.tracing import BatchConfig, ScouterInstrumentor, get_tracer + +# --------------------------------------------------------------------------- +# 1. Instrument once at application startup +# --------------------------------------------------------------------------- +# +# instrument() registers a Scouter-backed TracerProvider as the global OTel +# provider. After this call, trace.get_tracer() anywhere in the process +# returns a tracer backed by Scouter. 
+# +# Key options: +# transport_config – where to send spans (GrpcConfig / HttpConfig). +# Defaults to the SCOUTER_SERVER_URI env var. +# batch_config – controls flush timing (scheduled_delay_ms) and +# queue sizes. Tune for your latency / throughput. +# sample_ratio – fraction of traces to export (1.0 = 100%). +# attributes – key/value pairs stamped on every span produced by +# this provider (env, version, region, etc.). +# +# Call uninstrument() on shutdown to flush the export queue cleanly. + +instrumentor = ScouterInstrumentor() +instrumentor.instrument( + batch_config=BatchConfig(scheduled_delay_ms=500), + sample_ratio=1.0, + attributes={"env": "production", "service.version": "1.2.0"}, +) + +# --------------------------------------------------------------------------- +# 2. Use the standard OTel API — no Scouter import needed in business logic +# --------------------------------------------------------------------------- +# +# Because ScouterInstrumentor set the global provider, the standard OTel +# trace.get_tracer() call returns a tracer backed by Scouter. Third-party +# libraries that are already OTel-instrumented will use this same provider +# transparently. + +otel_tracer = trace.get_tracer("my-service") + + +def call_external_api(url: str) -> dict: + """Simulate an outbound HTTP call with a child span.""" + with otel_tracer.start_as_current_span("http.get", kind=trace.SpanKind.CLIENT) as span: + span.set_attribute("http.url", url) + span.set_attribute("http.method", "GET") + # ... real HTTP call here ... + span.set_attribute("http.status_code", 200) + return {"status": "ok"} + + +def query_database(query: str) -> list: + """Simulate a DB query with a child span.""" + with otel_tracer.start_as_current_span("db.query", kind=trace.SpanKind.CLIENT) as span: + span.set_attribute("db.system", "postgresql") + span.set_attribute("db.statement", query) + # ... real query here ... 
+ return [] + + +# --------------------------------------------------------------------------- +# 3. Use the Scouter Tracer for decorator support +# --------------------------------------------------------------------------- +# +# get_tracer() returns a Scouter Tracer whose .span() decorator automatically +# creates a span around the decorated function. Both the standard OTel API +# and the Scouter tracer export to the same provider set by instrument(). + +scouter_tracer = get_tracer("my-service") + + +@scouter_tracer.span("validate_payload") +def validate_payload(payload: dict) -> bool: + """A simple decorated function — span is created and closed automatically.""" + return bool(payload) + + +def process_request(user_id: int, payload: dict) -> dict: + """Context manager approach — access span attributes directly.""" + with scouter_tracer.start_as_current_span("process_request") as span: + span.set_attribute("user.id", str(user_id)) + span.set_attribute("payload.size", len(str(payload))) + + validate_payload(payload) + result = call_external_api("https://api.example.com/data") + rows = query_database("SELECT * FROM events WHERE user_id = $1") + + span.add_event( + "processing_complete", + {"row_count": len(rows), "api_status": result["status"]}, + ) + return {"user_id": user_id, "rows": len(rows)} + + +# --------------------------------------------------------------------------- +# 4. Root span with baggage +# --------------------------------------------------------------------------- +# +# Baggage is a list of dicts propagated to downstream services via HTTP +# headers. Each dict is a single key/value pair. 
+ + +def handle_incoming_request(request_id: str, user_id: int) -> None: + with scouter_tracer.start_as_current_span( + "handle_request", + baggage=[{"request_id": request_id}], + ) as root: + root.set_attribute("user.id", str(user_id)) + process_request(user_id, {"action": "purchase", "item_id": 42}) + + print(f"trace_id: {root.trace_id}") + print(f"span_id: {root.span_id}") + + +# --------------------------------------------------------------------------- +# 5. Default attributes propagate to every span +# --------------------------------------------------------------------------- +# +# The attributes passed to instrument() ("env", "service.version") are +# automatically stamped on every span — including spans emitted by +# third-party libraries. You do not need to set them manually. + +# --------------------------------------------------------------------------- +# 6. Shutdown +# --------------------------------------------------------------------------- +# +# uninstrument() flushes the export queue and resets the global OTel provider. +# Call this during application shutdown (SIGTERM handler, lifespan event, etc.). + + +def shutdown() -> None: + instrumentor.uninstrument() + + +# --------------------------------------------------------------------------- +# Run +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + handle_incoming_request(request_id="req-abc-123", user_id=7) + shutdown() + print("Done. 
Spans exported to Scouter.") diff --git a/py-scouter/python/scouter/tracing/__init__.py b/py-scouter/python/scouter/tracing/__init__.py index d0189ab39..a99a8d044 100644 --- a/py-scouter/python/scouter/tracing/__init__.py +++ b/py-scouter/python/scouter/tracing/__init__.py @@ -493,7 +493,7 @@ def _instrument(self, **kwargs) -> None: trace._TRACER_PROVIDER_SET_ONCE._done = False # pylint: disable=protected-access trace._TRACER_PROVIDER_SET_ONCE._lock = __import__("threading").Lock() # pylint: disable=protected-access - set_tracer_provider(self._provider) + set_tracer_provider(self._provider) # type: ignore def instrument( self,