diff --git a/crates/scouter_dataframe/src/parquet/tracing/engine.rs b/crates/scouter_dataframe/src/parquet/tracing/engine.rs index 6a92fda1..40027bdb 100644 --- a/crates/scouter_dataframe/src/parquet/tracing/engine.rs +++ b/crates/scouter_dataframe/src/parquet/tracing/engine.rs @@ -29,7 +29,7 @@ use std::sync::Arc; use tokio::sync::oneshot; use tokio::sync::{RwLock as AsyncRwLock, mpsc}; use tokio::time::{Duration, interval}; -use tracing::{debug, error, info, instrument}; +use tracing::{Instrument, Level, debug, error, info, instrument, span}; use url::Url; const TRACE_SPAN_TABLE_NAME: &str = "trace_spans"; @@ -38,6 +38,17 @@ const TRACE_SPAN_TABLE_NAME: &str = "trace_spans"; const TASK_OPTIMIZE: &str = "trace_optimize"; const TASK_RETENTION: &str = "trace_retention"; +mod phase0 { + #[allow(dead_code)] + pub mod spans { + pub const DELTA_TABLE_LOAD: &str = "delta.table.load"; + pub const DELTA_SNAPSHOT_REFRESH: &str = "delta.snapshot.refresh"; + pub const DELTA_CATALOG_SWAP: &str = "delta.catalog.swap"; + pub const DELTA_OPTIMIZE: &str = "delta.optimize"; + pub const UPDATE_INCREMENTAL: &str = "update_incremental"; + } +} + /// Days from year-0001 to Unix epoch (1970-01-01), used to convert chrono → Arrow Date32. /// Equivalent to `NaiveDate::from_ymd_opt(1970, 1, 1).unwrap().num_days_from_ce()`. const UNIX_EPOCH_DAYS: i32 = 719_163; @@ -149,6 +160,12 @@ async fn build_or_create_table_inner( Ok(builder) => builder .with_storage_backend(store, table_url.clone()) .load() + .instrument(span!( + Level::INFO, + phase0::spans::DELTA_TABLE_LOAD, + table = TRACE_SPAN_TABLE_NAME, + mode = "probe" + )) .await .is_ok(), Err(_) => false, @@ -168,6 +185,12 @@ async fn build_or_create_table_inner( let mut table = DeltaTableBuilder::from_url(table_url.clone())? .with_storage_backend(store, table_url) .load() + .instrument(span!( + Level::INFO, + phase0::spans::DELTA_TABLE_LOAD, + table = TRACE_SPAN_TABLE_NAME, + mode = "existing" + )) .await?; // Schema evolution: add any columns present in the desired schema but missing from the @@ -254,6 +277,13 @@ impl TraceSpanDBEngine { // A freshly-created table has no committed Parquet files yet — table_provider() // returns an error in that case. Defer registration until the first write populates the log. if let Ok(provider) = delta_table.table_provider().await { + let _span = span!( + Level::INFO, + phase0::spans::DELTA_CATALOG_SWAP, + table = TRACE_SPAN_TABLE_NAME, + reason = "init" + ) + .entered(); catalog.swap(TRACE_SPAN_TABLE_NAME, provider); } else { info!("Empty table at init — deferring catalog registration until first write"); @@ -416,7 +446,15 @@ impl TraceSpanDBEngine { let new_provider = updated_table.table_provider().await?; // Atomic single-step swap — no deregister/register gap where queries see "not found". + let _catalog_span = span!( + Level::INFO, + phase0::spans::DELTA_CATALOG_SWAP, + table = TRACE_SPAN_TABLE_NAME, + reason = "write" + ) + .entered(); self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider); + drop(_catalog_span); // Ensure the table's object store is registered with the DataFusion session // so that DeltaScan::scan() can resolve file URLs during query execution. updated_table.update_datafusion_session(&self.ctx.state())?; @@ -431,21 +469,38 @@ impl TraceSpanDBEngine { let current_table = table_guard.clone(); - let (updated_table, _metrics) = current_table - .optimize() - .with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap()) - .with_type(OptimizeType::ZOrder(vec![ - "start_time".to_string(), - "service_name".to_string(), - ])) - // Bloom filters must be re-specified here — compaction rewrites all Parquet files - // from scratch using these properties. Without this, every compaction cycle - // silently discards all bloom filters on the rewritten files. - .with_writer_properties(Self::build_writer_props()) - .await?; + let optimize_span = span!( + Level::INFO, + phase0::spans::DELTA_OPTIMIZE, + table = TRACE_SPAN_TABLE_NAME + ); + let (updated_table, _metrics) = async { + current_table + .optimize() + .with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap()) + .with_type(OptimizeType::ZOrder(vec![ + "start_time".to_string(), + "service_name".to_string(), + ])) + // Bloom filters must be re-specified here — compaction rewrites all Parquet files + // from scratch using these properties. Without this, every compaction cycle + // silently discards all bloom filters on the rewritten files. + .with_writer_properties(Self::build_writer_props()) + .await + } + .instrument(optimize_span) + .await?; - self.catalog - .swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?); + let new_provider = updated_table.table_provider().await?; + let _catalog_span = span!( + Level::INFO, + phase0::spans::DELTA_CATALOG_SWAP, + table = TRACE_SPAN_TABLE_NAME, + reason = "optimize" + ) + .entered(); + self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider); + drop(_catalog_span); updated_table.update_datafusion_session(&self.ctx.state())?; *table_guard = updated_table; @@ -463,8 +518,16 @@ impl TraceSpanDBEngine { .with_enforce_retention_duration(false) .await?; - self.catalog - .swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?); + let new_provider = updated_table.table_provider().await?; + let _catalog_span = span!( + Level::INFO, + phase0::spans::DELTA_CATALOG_SWAP, + table = TRACE_SPAN_TABLE_NAME, + reason = "vacuum" + ) + .entered(); + self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider); + drop(_catalog_span); updated_table.update_datafusion_session(&self.ctx.state())?; *table_guard = updated_table; @@ -498,8 +561,16 @@ impl TraceSpanDBEngine { cutoff_date ); - self.catalog - .swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?); + let new_provider = updated_table.table_provider().await?; + let _catalog_span = span!( + Level::INFO, + phase0::spans::DELTA_CATALOG_SWAP, + table = TRACE_SPAN_TABLE_NAME, + reason = "expire" + ) + .entered(); + self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider); + drop(_catalog_span); updated_table.update_datafusion_session(&self.ctx.state())?; *table_guard = updated_table; @@ -569,6 +640,7 @@ impl TraceSpanDBEngine { /// This is mainly for multiple pods sharing the same storage. /// Safety: clones the table before calling `update_incremental` so that a failure /// (e.g. "Not a Delta table" on an empty table) leaves the original guard intact. + #[instrument(skip_all, name = "delta.snapshot.refresh", fields(table = TRACE_SPAN_TABLE_NAME))] async fn refresh_table(&self) -> Result<(), TraceEngineError> { let mut table_guard = self.table.write().await; let current_version = table_guard.version(); @@ -576,7 +648,15 @@ impl TraceSpanDBEngine { // Clone before update_incremental — on failure the clone is discarded and the // original guard stays intact, avoiding the corrupted-state bug described at line 301. let mut refreshed = table_guard.clone(); - match refreshed.update_incremental(None).await { + match refreshed + .update_incremental(None) + .instrument(span!( + Level::INFO, + phase0::spans::UPDATE_INCREMENTAL, + table = TRACE_SPAN_TABLE_NAME + )) + .await + { Ok(_) => { if refreshed.version() > current_version { info!( @@ -586,7 +666,15 @@ impl TraceSpanDBEngine { ); let new_provider = refreshed.table_provider().await?; // Atomic swap — no gap between deregister and register. + let _catalog_span = span!( + Level::INFO, + phase0::spans::DELTA_CATALOG_SWAP, + table = TRACE_SPAN_TABLE_NAME, + reason = "refresh" + ) + .entered(); self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider); + drop(_catalog_span); refreshed.update_datafusion_session(&self.ctx.state())?; *table_guard = refreshed; } diff --git a/crates/scouter_dataframe/src/parquet/tracing/queries.rs b/crates/scouter_dataframe/src/parquet/tracing/queries.rs index b00222c3..f1a6838c 100644 --- a/crates/scouter_dataframe/src/parquet/tracing/queries.rs +++ b/crates/scouter_dataframe/src/parquet/tracing/queries.rs @@ -24,7 +24,20 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; -use tracing::{error, info, instrument}; +use tracing::{Instrument, Level, error, info, instrument, span}; + +mod phase0 { + pub mod spans { + pub const TRACE_QUERY_METRICS: &str = "scouter.trace.query.metrics"; + pub const TRACE_QUERY_SPANS: &str = "scouter.trace.query.spans"; + pub const DF_TABLE_RESOLVE: &str = "df.table.resolve"; + pub const DF_LOGICAL_BUILD: &str = "df.logical.build"; + pub const DF_PHYSICAL_PLAN: &str = "df.physical.plan"; + pub const DF_COLLECT: &str = "df.collect"; + pub const ARROW_CONVERT: &str = "arrow.convert"; + pub const TRACE_TREE_BUILD: &str = "trace.tree.build"; + } +} /// Days from year-0001 to Unix epoch (1970-01-01), used to convert chrono → Arrow Date32. const UNIX_EPOCH_DAYS: i32 = 719_163; @@ -128,50 +141,119 @@ struct FlatSpan { struct TraceQueryBuilder { df: DataFrame, + endpoint: &'static str, + table_name: &'static str, } impl TraceQueryBuilder { async fn set_table( ctx: Arc, table_name: &str, + endpoint: &'static str, ) -> Result { let df = ctx .table(table_name) + .instrument(span!( + Level::INFO, + phase0::spans::DF_TABLE_RESOLVE, + endpoint, + table = table_name + )) .await .inspect_err(|e| error!("Failed to load table {}: {}", table_name, e))?; - Ok(Self { df }) + Ok(Self { + df, + endpoint, + table_name: SPAN_TABLE_NAME, + }) } fn select_columns(mut self, columns: &[&str]) -> Result { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = self.endpoint, + table = self.table_name, + phase = "select_columns" + ) + .entered(); self.df = self.df.select_columns(columns)?; Ok(self) } fn add_filter(mut self, expr: Expr) -> Result { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = self.endpoint, + table = self.table_name, + phase = "filter" + ) + .entered(); self.df = self.df.filter(expr)?; Ok(self) } fn add_sort(mut self, sort: Vec) -> Result { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = self.endpoint, + table = self.table_name, + phase = "sort" + ) + .entered(); self.df = self.df.sort(sort)?; Ok(self) } fn with_limit(mut self, n: Option) -> Result { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = self.endpoint, + table = self.table_name, + phase = "limit" + ) + .entered(); self.df = self.df.limit(0, n)?; Ok(self) } async fn execute(self) -> Result, TraceEngineError> { - let batches = self - .df - .collect() - .await - .inspect_err(|e| error!("Failed to collect query results: {}", e))?; + let batches = collect_with_phase0(self.df, self.endpoint, self.table_name).await?; Ok(batches) } } +async fn collect_with_phase0( + df: DataFrame, + endpoint: &'static str, + table_name: &'static str, +) -> Result, TraceEngineError> { + df.clone() + .create_physical_plan() + .instrument(span!( + Level::INFO, + phase0::spans::DF_PHYSICAL_PLAN, + endpoint, + table = table_name + )) + .await + .map_err(TraceEngineError::DatafusionError)?; + + df.collect() + .instrument(span!( + Level::INFO, + phase0::spans::DF_COLLECT, + endpoint, + table = table_name + )) + .await + .inspect_err(|e| error!("Failed to collect query results: {}", e)) + .map_err(TraceEngineError::DatafusionError) +} + /// Extract attributes from a MapArray at a given row index. fn extract_attributes(map_array: &MapArray, row_idx: usize) -> Vec { if map_array.is_null(row_idx) { @@ -822,7 +904,7 @@ impl TraceQueries { /// /// When `trace_id_bytes` is 16 bytes, results are cached for 5 minutes — repeat detail /// clicks (common in the UI) return in <1µs without hitting Delta Lake. - #[instrument(skip_all)] + #[instrument(skip_all, name = "scouter.trace.query.spans")] #[allow(clippy::too_many_arguments)] pub async fn get_trace_spans( &self, @@ -890,7 +972,12 @@ impl TraceQueries { end_time: Option<&DateTime>, limit: Option, ) -> Result, TraceEngineError> { - let mut builder = TraceQueryBuilder::set_table(self.ctx.clone(), SPAN_TABLE_NAME).await?; + let mut builder = TraceQueryBuilder::set_table( + self.ctx.clone(), + SPAN_TABLE_NAME, + phase0::spans::TRACE_QUERY_SPANS, + ) + .await?; // Partition filters FIRST — eliminates whole partition_date=YYYY-MM-DD/ directories // at directory level before any file metadata or Parquet statistics are read. @@ -941,8 +1028,26 @@ impl TraceQueries { batches.len() ); - let flat_spans = batches_to_flat_spans(batches)?; - Ok(build_span_tree(flat_spans)) + let flat_spans = { + let _span = span!( + Level::INFO, + phase0::spans::ARROW_CONVERT, + endpoint = phase0::spans::TRACE_QUERY_SPANS, + table = SPAN_TABLE_NAME + ) + .entered(); + batches_to_flat_spans(batches)? + }; + let spans = { + let _span = span!( + Level::INFO, + phase0::spans::TRACE_TREE_BUILD, + endpoint = phase0::spans::TRACE_QUERY_SPANS + ) + .entered(); + build_span_tree(flat_spans) + }; + Ok(spans) } /// Find committed anchor spans for awaiting eval rows. @@ -991,8 +1096,12 @@ impl TraceQueries { .map(|span_id| lit(ScalarValue::Binary(Some(span_id.as_bytes().to_vec())))) .collect(); - let mut builder = - TraceQueryBuilder::set_table(self.ctx.clone(), SPAN_TABLE_NAME).await?; + let mut builder = TraceQueryBuilder::set_table( + self.ctx.clone(), + SPAN_TABLE_NAME, + phase0::spans::TRACE_QUERY_SPANS, + ) + .await?; builder = builder.add_filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&window_start)))?; builder = builder.add_filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&window_end)))?; builder = builder.add_filter(col(END_TIME_COL).gt_eq(ts_lit(&window_start)))?; @@ -1099,7 +1208,7 @@ impl TraceQueries { /// where `parent_span_id IS NULL`. Service filter applies to root spans only. /// /// `entity_trace_ids` is an optional pre-resolved list of binary trace IDs (16 bytes each). - #[instrument(skip_all)] + #[instrument(skip_all, name = "scouter.trace.query.metrics")] pub async fn get_trace_metrics( &self, request: &scouter_types::TraceMetricsRequest, @@ -1125,18 +1234,36 @@ impl TraceQueries { let mut spans_df = self .ctx .table(SPAN_TABLE_NAME) + .instrument(span!( + Level::INFO, + phase0::spans::DF_TABLE_RESOLVE, + endpoint = phase0::spans::TRACE_QUERY_METRICS, + table = SPAN_TABLE_NAME + )) .await .map_err(TraceEngineError::DatafusionError)?; - // Partition directory pruning — eliminates whole YYYY-MM-DD/ directories before - // DataFusion reads a single file's metadata or Parquet column statistics. - spans_df = spans_df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&request.start_time)))?; - spans_df = spans_df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&request.end_time)))?; - - // Row-group pruning — typed Timestamp(Microsecond, UTC) literals let DataFusion - // use Parquet column min/max stats within the surviving partition directories. - spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&request.start_time)))?; - spans_df = spans_df.filter(col(START_TIME_COL).lt(ts_lit(&request.end_time)))?; + { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_METRICS, + table = SPAN_TABLE_NAME, + phase = "time_filters" + ) + .entered(); + // Partition directory pruning — eliminates whole YYYY-MM-DD/ directories before + // DataFusion reads a single file's metadata or Parquet column statistics. + spans_df = + spans_df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&request.start_time)))?; + spans_df = + spans_df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&request.end_time)))?; + + // Row-group pruning — typed Timestamp(Microsecond, UTC) literals let DataFusion + // use Parquet column min/max stats within the surviving partition directories. + spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&request.start_time)))?; + spans_df = spans_df.filter(col(START_TIME_COL).lt(ts_lit(&request.end_time)))?; + } // ── Phase 3: trace_level — aggregate per-trace ─────────────────────── // @@ -1188,7 +1315,17 @@ impl TraceQueries { max(error_count_case).alias(ERROR_COUNT_COL), ]; - let trace_level_df = spans_df.aggregate(vec![col(TRACE_ID_COL)], agg_exprs)?; + let trace_level_df = { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_METRICS, + table = SPAN_TABLE_NAME, + phase = "trace_level_aggregate" + ) + .entered(); + spans_df.aggregate(vec![col(TRACE_ID_COL)], agg_exprs)? + }; // ── Phase 4: service_filtered — duration_ms, null guard, service filter ── // @@ -1201,9 +1338,19 @@ impl TraceQueries { - df_cast(col(START_TIME_COL), DataType::Int64)) / lit(1000i64); - let mut service_filtered_df = trace_level_df - .filter(col("trace_end").is_not_null())? - .with_column("duration_ms", duration_expr)?; + let mut service_filtered_df = { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_METRICS, + table = SPAN_TABLE_NAME, + phase = "service_filter" + ) + .entered(); + trace_level_df + .filter(col("trace_end").is_not_null())? + .with_column("duration_ms", duration_expr)? + }; if let Some(clause) = &request.clause { let summary_view_seq = METRICS_SUMMARY_VIEW_SEQ.fetch_add(1, Ordering::Relaxed); @@ -1258,7 +1405,17 @@ impl TraceQueries { // Replaces the `bucketed` CTE. // date_trunc(precision_literal, timestamp_expr) — precision is a Utf8 scalar. let bucket_expr = date_trunc(lit(bucket_interval), col(START_TIME_COL)); - let bucketed_df = service_filtered_df.with_column("bucket_start", bucket_expr)?; + let bucketed_df = { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_METRICS, + table = SPAN_TABLE_NAME, + phase = "bucket" + ) + .entered(); + service_filtered_df.with_column("bucket_start", bucket_expr)? + }; // ── Phase 6: Final bucketed aggregation ───────────────────────────── let duration_f64 = df_cast(col("duration_ms"), DataType::Float64); @@ -1267,115 +1424,138 @@ impl TraceQueries { // approx_percentile_cont in DataFusion 52: (SortExpr, percentile, limit: Option) // SortExpr is col.sort(asc, nulls_first); None limit = no row-count cap. - let final_df = bucketed_df - .aggregate( - vec![col("bucket_start")], - vec![ - count(lit(1i64)).alias("trace_count"), - avg(duration_f64.clone()).alias("avg_duration_ms"), - approx_percentile_cont( - duration_f64.clone().sort(true, false), - lit(0.50f64), - None, - ) - .alias("p50_duration_ms"), - approx_percentile_cont( - duration_f64.clone().sort(true, false), - lit(0.95f64), - None, - ) - .alias("p95_duration_ms"), - approx_percentile_cont(duration_f64.sort(true, false), lit(0.99f64), None) - .alias("p99_duration_ms"), - avg(error_rate_case).alias("error_rate"), - ], - )? - .sort(vec![col("bucket_start").sort(true, true)])?; + let final_df = { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_METRICS, + table = SPAN_TABLE_NAME, + phase = "final_aggregate" + ) + .entered(); + bucketed_df + .aggregate( + vec![col("bucket_start")], + vec![ + count(lit(1i64)).alias("trace_count"), + avg(duration_f64.clone()).alias("avg_duration_ms"), + approx_percentile_cont( + duration_f64.clone().sort(true, false), + lit(0.50f64), + None, + ) + .alias("p50_duration_ms"), + approx_percentile_cont( + duration_f64.clone().sort(true, false), + lit(0.95f64), + None, + ) + .alias("p95_duration_ms"), + approx_percentile_cont(duration_f64.sort(true, false), lit(0.99f64), None) + .alias("p99_duration_ms"), + avg(error_rate_case).alias("error_rate"), + ], + )? + .sort(vec![col("bucket_start").sort(true, true)])? + }; - let batches = final_df - .collect() - .await - .map_err(TraceEngineError::DatafusionError)?; + let batches = collect_with_phase0( + final_df, + phase0::spans::TRACE_QUERY_METRICS, + SPAN_TABLE_NAME, + ) + .await?; let mut metrics = Vec::new(); - for batch in &batches { - let schema = batch.schema(); - - // DATE_TRUNC may return Timestamp(Nanosecond) when string literals in the WHERE - // clause cause DataFusion to upcast the column. Cast explicitly to - // Timestamp(Microsecond, UTC) so Arrow handles the ns→µs division correctly, - // regardless of the sub-type returned by the query plan. - let raw_bucket = batch.column(schema.index_of("bucket_start").unwrap()); - let bucket_arr = arrow::compute::cast( - raw_bucket, - &arrow::datatypes::DataType::Timestamp( - arrow::datatypes::TimeUnit::Microsecond, - Some("UTC".into()), - ), + { + let _span = span!( + Level::INFO, + phase0::spans::ARROW_CONVERT, + endpoint = phase0::spans::TRACE_QUERY_METRICS, + table = SPAN_TABLE_NAME ) - .map_err(|e| TraceEngineError::BatchConversion(format!("bucket_start cast: {}", e)))?; - let bucket_col = bucket_arr - .as_any() - .downcast_ref::() - .ok_or_else(|| TraceEngineError::BatchConversion("bucket_start".into()))?; - let count_col = batch - .column(schema.index_of("trace_count").unwrap()) - .as_any() - .downcast_ref::() - .ok_or_else(|| TraceEngineError::BatchConversion("trace_count".into()))?; - let avg_col = batch - .column(schema.index_of("avg_duration_ms").unwrap()) - .as_any() - .downcast_ref::() - .ok_or_else(|| TraceEngineError::BatchConversion("avg_duration_ms".into()))?; - let p50_col = batch - .column(schema.index_of("p50_duration_ms").unwrap()) - .as_any() - .downcast_ref::() - .ok_or_else(|| TraceEngineError::BatchConversion("p50_duration_ms".into()))?; - let p95_col = batch - .column(schema.index_of("p95_duration_ms").unwrap()) - .as_any() - .downcast_ref::() - .ok_or_else(|| TraceEngineError::BatchConversion("p95_duration_ms".into()))?; - let p99_col = batch - .column(schema.index_of("p99_duration_ms").unwrap()) - .as_any() - .downcast_ref::() - .ok_or_else(|| TraceEngineError::BatchConversion("p99_duration_ms".into()))?; - let err_col = batch - .column(schema.index_of("error_rate").unwrap()) - .as_any() - .downcast_ref::() - .ok_or_else(|| TraceEngineError::BatchConversion("error_rate".into()))?; - - for i in 0..batch.num_rows() { - let micros = bucket_col.value(i); - let bucket_start = DateTime::from_timestamp_micros(micros) - .unwrap_or_default() - .with_timezone(&Utc); - - metrics.push(TraceMetricBucket { - bucket_start, - trace_count: count_col.value(i), - avg_duration_ms: avg_col.value(i), - p50_duration_ms: if p50_col.is_null(i) { - None - } else { - Some(p50_col.value(i)) - }, - p95_duration_ms: if p95_col.is_null(i) { - None - } else { - Some(p95_col.value(i)) - }, - p99_duration_ms: if p99_col.is_null(i) { - None - } else { - Some(p99_col.value(i)) - }, - error_rate: err_col.value(i), - }); + .entered(); + for batch in &batches { + let schema = batch.schema(); + + // DATE_TRUNC may return Timestamp(Nanosecond) when string literals in the WHERE + // clause cause DataFusion to upcast the column. Cast explicitly to + // Timestamp(Microsecond, UTC) so Arrow handles the ns→µs division correctly, + // regardless of the sub-type returned by the query plan. + let raw_bucket = batch.column(schema.index_of("bucket_start").unwrap()); + let bucket_arr = arrow::compute::cast( + raw_bucket, + &arrow::datatypes::DataType::Timestamp( + arrow::datatypes::TimeUnit::Microsecond, + Some("UTC".into()), + ), + ) + .map_err(|e| { + TraceEngineError::BatchConversion(format!("bucket_start cast: {}", e)) + })?; + let bucket_col = bucket_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| TraceEngineError::BatchConversion("bucket_start".into()))?; + let count_col = batch + .column(schema.index_of("trace_count").unwrap()) + .as_any() + .downcast_ref::() + .ok_or_else(|| TraceEngineError::BatchConversion("trace_count".into()))?; + let avg_col = batch + .column(schema.index_of("avg_duration_ms").unwrap()) + .as_any() + .downcast_ref::() + .ok_or_else(|| TraceEngineError::BatchConversion("avg_duration_ms".into()))?; + let p50_col = batch + .column(schema.index_of("p50_duration_ms").unwrap()) + .as_any() + .downcast_ref::() + .ok_or_else(|| TraceEngineError::BatchConversion("p50_duration_ms".into()))?; + let p95_col = batch + .column(schema.index_of("p95_duration_ms").unwrap()) + .as_any() + .downcast_ref::() + .ok_or_else(|| TraceEngineError::BatchConversion("p95_duration_ms".into()))?; + let p99_col = batch + .column(schema.index_of("p99_duration_ms").unwrap()) + .as_any() + .downcast_ref::() + .ok_or_else(|| TraceEngineError::BatchConversion("p99_duration_ms".into()))?; + let err_col = batch + .column(schema.index_of("error_rate").unwrap()) + .as_any() + .downcast_ref::() + .ok_or_else(|| TraceEngineError::BatchConversion("error_rate".into()))?; + + for i in 0..batch.num_rows() { + let micros = bucket_col.value(i); + let bucket_start = DateTime::from_timestamp_micros(micros) + .unwrap_or_default() + .with_timezone(&Utc); + + metrics.push(TraceMetricBucket { + bucket_start, + trace_count: count_col.value(i), + avg_duration_ms: avg_col.value(i), + p50_duration_ms: if p50_col.is_null(i) { + None + } else { + Some(p50_col.value(i)) + }, + p95_duration_ms: if p95_col.is_null(i) { + None + } else { + Some(p95_col.value(i)) + }, + p99_duration_ms: if p99_col.is_null(i) { + None + } else { + Some(p99_col.value(i)) + }, + error_rate: err_col.value(i), + }); + } } } @@ -1427,39 +1607,82 @@ impl TraceQueries { let mut spans_df = self .ctx .table(SPAN_TABLE_NAME) + .instrument(span!( + Level::INFO, + phase0::spans::DF_TABLE_RESOLVE, + endpoint = phase0::spans::TRACE_QUERY_SPANS, + table = SPAN_TABLE_NAME + )) .await .map_err(TraceEngineError::DatafusionError)?; - if let Some(start) = filters.start_time { - spans_df = spans_df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?; - spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?; - } - if let Some(end) = filters.end_time { - spans_df = spans_df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?; - spans_df = spans_df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?; + { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_SPANS, + table = SPAN_TABLE_NAME, + phase = "filter_trace_spans" + ) + .entered(); + if let Some(start) = filters.start_time { + spans_df = spans_df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?; + spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?; + } + if let Some(end) = filters.end_time { + spans_df = spans_df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?; + spans_df = spans_df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?; + } + spans_df = spans_df.select_columns(SPAN_COLUMNS)?; + spans_df = spans_df.sort(vec![col(START_TIME_COL).sort(true, true)])?; } - spans_df = spans_df.select_columns(SPAN_COLUMNS)?; - spans_df = spans_df.sort(vec![col(START_TIME_COL).sort(true, true)])?; // ── Phase 4: Inner join — spans filtered to the single matching trace ─ - let result_df = spans_df.join( - first_trace_df, - JoinType::Inner, - &[TRACE_ID_COL], - &["_match_tid"], - None, - )?; - - let batches = result_df - .collect() - .await - .map_err(TraceEngineError::DatafusionError)?; + let result_df = { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_SPANS, + table = SPAN_TABLE_NAME, + phase = "join_matching_trace" + ) + .entered(); + spans_df.join( + first_trace_df, + JoinType::Inner, + &[TRACE_ID_COL], + &["_match_tid"], + None, + )? + }; + + let batches = + collect_with_phase0(result_df, phase0::spans::TRACE_QUERY_SPANS, SPAN_TABLE_NAME) + .await?; if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) { return Ok(Vec::new()); } - let flat_spans = batches_to_flat_spans(batches)?; - Ok(build_span_tree(flat_spans)) + let flat_spans = { + let _span = span!( + Level::INFO, + phase0::spans::ARROW_CONVERT, + endpoint = phase0::spans::TRACE_QUERY_SPANS, + table = SPAN_TABLE_NAME + ) + .entered(); + batches_to_flat_spans(batches)? + }; + let spans = { + let _span = span!( + Level::INFO, + phase0::spans::TRACE_TREE_BUILD, + endpoint = phase0::spans::TRACE_QUERY_SPANS + ) + .entered(); + build_span_tree(flat_spans) + }; + Ok(spans) } } diff --git a/crates/scouter_dataframe/src/parquet/tracing/summary.rs b/crates/scouter_dataframe/src/parquet/tracing/summary.rs index 0f0cba91..840aa9c0 100644 --- a/crates/scouter_dataframe/src/parquet/tracing/summary.rs +++ b/crates/scouter_dataframe/src/parquet/tracing/summary.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use tokio::sync::oneshot; use tokio::sync::{RwLock as AsyncRwLock, mpsc}; use tokio::time::{Duration, interval}; -use tracing::{debug, error, info, instrument}; +use tracing::{Instrument, Level, debug, error, info, instrument, span}; use url::Url; /// Days from CE epoch to Unix epoch (1970-01-01). @@ -40,6 +40,17 @@ const SUMMARY_TABLE_NAME: &str = "trace_summaries"; /// Control table task name for summary compaction coordination. const TASK_SUMMARY_OPTIMIZE: &str = "summary_optimize"; +mod phase0 { + pub mod spans { + pub const TRACE_QUERY_PAGINATED: &str = "scouter.trace.query.paginated"; + pub const DF_TABLE_RESOLVE: &str = "df.table.resolve"; + pub const DF_LOGICAL_BUILD: &str = "df.logical.build"; + pub const DF_PHYSICAL_PLAN: &str = "df.physical.plan"; + pub const DF_COLLECT: &str = "df.collect"; + pub const ARROW_CONVERT: &str = "arrow.convert"; + } +} + // ── Column name constants ──────────────────────────────────────────────────── const TRACE_ID_COL: &str = "trace_id"; const SERVICE_NAME_COL: &str = "service_name"; @@ -783,6 +794,33 @@ pub struct TraceSummaryQueries { const MAX_PAGE_LIMIT: usize = 500; +async fn collect_with_phase0( + df: DataFrame, + endpoint: &'static str, + table_name: &'static str, +) -> Result, TraceEngineError> { + df.clone() + .create_physical_plan() + .instrument(span!( + Level::INFO, + phase0::spans::DF_PHYSICAL_PLAN, + endpoint, + table = table_name + )) + .await + .map_err(TraceEngineError::DatafusionError)?; + + df.collect() + .instrument(span!( + Level::INFO, + phase0::spans::DF_COLLECT, + endpoint, + table = table_name + )) + .await + .map_err(TraceEngineError::DatafusionError) +} + /// Build one summary row per trace over the requested time window. /// /// The summary table can contain multiple rows for a trace as late spans arrive. @@ -796,17 +834,35 @@ pub(crate) async fn deduped_summary_df( use crate::parquet::tracing::queries::{date_lit, ts_lit}; use datafusion::functions_aggregate::expr_fn::{first_value, max, min, sum}; - let mut df = ctx.table(SUMMARY_TABLE_NAME).await?; + let mut df = ctx + .table(SUMMARY_TABLE_NAME) + .instrument(span!( + Level::INFO, + phase0::spans::DF_TABLE_RESOLVE, + endpoint = phase0::spans::TRACE_QUERY_PAGINATED, + table = SUMMARY_TABLE_NAME + )) + .await?; - // Time predicates stay first so Delta Lake can prune partitions and Parquet - // row groups before the aggregation merges summary fragments per trace. - if let Some(start) = time_window.start { - df = df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?; - df = df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?; - } - if let Some(end) = time_window.end { - df = df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?; - df = df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?; + { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_PAGINATED, + table = SUMMARY_TABLE_NAME, + phase = "time_filters" + ) + .entered(); + // Time predicates stay first so Delta Lake can prune partitions and Parquet + // row groups before the aggregation merges summary fragments per trace. + if let Some(start) = time_window.start { + df = df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?; + df = df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?; + } + if let Some(end) = time_window.end { + df = df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?; + df = df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?; + } } let by_span_end: Vec = vec![ @@ -820,8 +876,16 @@ pub(crate) async fn deduped_summary_df( // Duration is derived after aggregation because DataFusion cannot reuse two // aggregate outputs inside another aggregate expression in the same slot. - let df = df - .aggregate( + let df = { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_PAGINATED, + table = SUMMARY_TABLE_NAME, + phase = "dedupe_aggregate" + ) + .entered(); + df.aggregate( vec![col(TRACE_ID_COL)], vec![ min(col(START_TIME_COL)).alias(START_TIME_COL), @@ -850,7 +914,8 @@ pub(crate) async fn deduped_summary_df( DURATION_MS_COL, (col("_max_end_us") - col("_min_start_us")) / lit(1000i64), )? - .drop_columns(&["_max_end_us", "_min_start_us"])?; + .drop_columns(&["_max_end_us", "_min_start_us"])? + }; Ok(df) } @@ -870,6 +935,7 @@ impl TraceSummaryQueries { /// Time filters are pushed into the SQL WHERE clause for partition pruning. /// /// Secondary filters (service, errors, cursor) apply to the deduplicated DataFrame. + #[instrument(skip_all, name = "scouter.trace.query.paginated")] pub async fn get_paginated_traces( &self, filters: &TraceFilters, @@ -896,6 +962,14 @@ impl TraceSummaryQueries { }) .collect::>()?; if !binary_ids.is_empty() { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_PAGINATED, + table = SUMMARY_TABLE_NAME, + phase = "trace_id_filter" + ) + .entered(); df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?; } } @@ -927,6 +1001,14 @@ impl TraceSummaryQueries { .eq(cursor_ts) .and(col(TRACE_ID_COL).lt(cursor_tid))) }; + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_PAGINATED, + table = SUMMARY_TABLE_NAME, + phase = "cursor_filter" + ) + .entered(); df = df.filter(cursor_expr)?; } @@ -948,23 +1030,44 @@ impl TraceSummaryQueries { // ── Sort: DESC for "next", ASC for "previous" ──────────────────────── // "previous" direction fetches the oldest limit+1 items newer than the cursor, // which matches the original Rust post-reversal behavior. - df = if direction == "previous" { - df.sort(vec![ - col(START_TIME_COL).sort(true, true), - col(TRACE_ID_COL).sort(true, true), - ])? - } else { - df.sort(vec![ - col(START_TIME_COL).sort(false, false), - col(TRACE_ID_COL).sort(false, false), - ])? - }; + { + let _span = span!( + Level::INFO, + phase0::spans::DF_LOGICAL_BUILD, + endpoint = phase0::spans::TRACE_QUERY_PAGINATED, + table = SUMMARY_TABLE_NAME, + phase = "sort_limit" + ) + .entered(); + df = if direction == "previous" { + df.sort(vec![ + col(START_TIME_COL).sort(true, true), + col(TRACE_ID_COL).sort(true, true), + ])? + } else { + df.sort(vec![ + col(START_TIME_COL).sort(false, false), + col(TRACE_ID_COL).sort(false, false), + ])? + }; - // ── LIMIT pushed into DataFusion (fetch limit+1 to detect next page) ─ - df = df.limit(0, Some(limit + 1))?; + // ── LIMIT pushed into DataFusion (fetch limit+1 to detect next page) ─ + df = df.limit(0, Some(limit + 1))?; + } - let batches = df.collect().await?; - let mut items = batches_to_trace_list_items(batches)?; + let batches = + collect_with_phase0(df, phase0::spans::TRACE_QUERY_PAGINATED, SUMMARY_TABLE_NAME) + .await?; + let mut items = { + let _span = span!( + Level::INFO, + phase0::spans::ARROW_CONVERT, + endpoint = phase0::spans::TRACE_QUERY_PAGINATED, + table = SUMMARY_TABLE_NAME + ) + .entered(); + batches_to_trace_list_items(batches)? + }; let has_more = items.len() > limit; if has_more { diff --git a/crates/scouter_server/src/api/routes/trace/route.rs b/crates/scouter_server/src/api/routes/trace/route.rs index 31f7192b..1d39abe8 100644 --- a/crates/scouter_server/src/api/routes/trace/route.rs +++ b/crates/scouter_server/src/api/routes/trace/route.rs @@ -31,8 +31,91 @@ use scouter_types::{ use std::collections::HashSet; use std::panic::{AssertUnwindSafe, catch_unwind}; use std::sync::Arc; +use tracing::field::Empty; use tracing::instrument; -use tracing::{debug, error}; +use tracing::{Span, debug, error, info_span}; + +mod phase0 { + #[allow(dead_code)] + pub mod attrs { + pub const TRACE_QUERY_ENDPOINT: &str = "trace.query.endpoint"; + pub const TRACE_QUERY_KIND: &str = "trace.query.kind"; + pub const TRACE_QUERY_HAS_START_TIME: &str = "trace.query.has_start_time"; + pub const TRACE_QUERY_HAS_END_TIME: &str = "trace.query.has_end_time"; + pub const TRACE_QUERY_WINDOW_MS: &str = "trace.query.window_ms"; + pub const TRACE_QUERY_LIMIT: &str = "trace.query.limit"; + pub const TRACE_QUERY_OFFSET: &str = "trace.query.offset"; + pub const TRACE_QUERY_TRACE_ID_PRESENT: &str = "trace.query.trace_id_present"; + pub const TRACE_QUERY_UNBOUNDED: &str = "trace.query.unbounded"; + pub const TRACE_QUERY_CACHE_HIT: &str = "trace.query.cache.hit"; + pub const TRACE_QUERY_CACHE_NAME: &str = "trace.query.cache.name"; + pub const TRACE_QUERY_RESULT_ROWS: &str = "trace.query.result.rows"; + pub const TRACE_QUERY_RESULT_BYTES_ESTIMATE: &str = "trace.query.result.bytes_estimate"; + pub const TRACE_QUERY_TABLE_VERSION: &str = "trace.query.table_version"; + pub const TRACE_QUERY_STORAGE_BACKEND: &str = "trace.query.storage_backend"; + pub const TRACE_QUERY_REFRESH_ORIGIN: &str = "trace.query.refresh_origin"; + } + + pub mod routes { + pub const TRACE_PAGINATED_PATH: &str = "{prefix}/trace/paginated"; + pub const TRACE_SPANS_PATH: &str = "{prefix}/trace/spans"; + pub const TRACE_METRICS_PATH: &str = "{prefix}/trace/metrics"; + pub const V1_TRACE_SPANS_PATH: &str = "{prefix}/v1/traces/{id}/spans"; + pub const V1_TRACES_PATH: &str = "{prefix}/v1/traces"; + } +} + +fn window_ms( + start_time: Option>, + end_time: Option>, +) -> Option { + match (start_time, end_time) { + (Some(start), Some(end)) => Some((end - start).num_milliseconds()), + _ => None, + } +} + +struct TraceQueryAttrs { + endpoint: &'static str, + kind: &'static str, + has_start_time: bool, + has_end_time: bool, + window_ms: Option, + limit: Option, + offset: Option, + trace_id_present: bool, + unbounded: bool, +} + +fn record_trace_query_common(attrs: TraceQueryAttrs) { + let span = Span::current(); + span.record(phase0::attrs::TRACE_QUERY_ENDPOINT, attrs.endpoint); + span.record(phase0::attrs::TRACE_QUERY_KIND, attrs.kind); + span.record( + phase0::attrs::TRACE_QUERY_HAS_START_TIME, + attrs.has_start_time, + ); + span.record(phase0::attrs::TRACE_QUERY_HAS_END_TIME, attrs.has_end_time); + if let Some(window_ms) = attrs.window_ms { + span.record(phase0::attrs::TRACE_QUERY_WINDOW_MS, window_ms); + } + if let Some(limit) = attrs.limit { + span.record(phase0::attrs::TRACE_QUERY_LIMIT, limit); + } + if let Some(offset) = attrs.offset { + span.record(phase0::attrs::TRACE_QUERY_OFFSET, offset); + } + span.record( + phase0::attrs::TRACE_QUERY_TRACE_ID_PRESENT, + attrs.trace_id_present, + ); + span.record(phase0::attrs::TRACE_QUERY_UNBOUNDED, attrs.unbounded); + span.record(phase0::attrs::TRACE_QUERY_STORAGE_BACKEND, "delta"); +} + +fn record_trace_query_result(row_count: usize) { + Span::current().record(phase0::attrs::TRACE_QUERY_RESULT_ROWS, row_count as i64); +} fn invalid_search_query(err: impl std::fmt::Display) -> (StatusCode, Json) { ( @@ -203,13 +286,45 @@ pub async fn get_trace_baggage( tag = "traces", security(("bearer_token" = [])) )] -#[instrument(skip_all)] +#[instrument( + skip_all, + name = "paginated_traces", + fields( + trace.query.endpoint = Empty, + trace.query.kind = Empty, + trace.query.has_start_time = Empty, + trace.query.has_end_time = Empty, + trace.query.window_ms = Empty, + trace.query.limit = Empty, + trace.query.offset = Empty, + trace.query.trace_id_present = Empty, + trace.query.unbounded = Empty, + trace.query.cache.hit = Empty, + trace.query.cache.name = Empty, + trace.query.result.rows = Empty, + trace.query.result.bytes_estimate = Empty, + trace.query.table_version = Empty, + trace.query.storage_backend = Empty, + trace.query.refresh_origin = Empty, + ) +)] pub async fn paginated_traces( State(data): State>, Json(body): Json, ) -> Result, (StatusCode, Json)> { let body = normalize_trace_filters(body)?; validate_filters(&body)?; + record_trace_query_common(TraceQueryAttrs { + endpoint: phase0::routes::TRACE_PAGINATED_PATH, + kind: "paginated", + has_start_time: body.start_time.is_some(), + has_end_time: body.end_time.is_some(), + window_ms: window_ms(body.start_time, body.end_time), + limit: body.limit.map(i64::from), + offset: None, + trace_id_present: body.trace_ids.as_ref().is_some_and(|ids| !ids.is_empty()), + unbounded: body.start_time.is_none() && body.end_time.is_none(), + }); debug!( "paginated_traces: limit={:?} start={:?} end={:?}", body.limit, body.start_time, body.end_time @@ -239,6 +354,8 @@ pub async fn paginated_traces( pagination_response.items.len() ); + record_trace_query_result(pagination_response.items.len()); + let _response_span = info_span!("response.serialize").entered(); Ok(Json(pagination_response)) } @@ -256,12 +373,44 @@ pub async fn paginated_traces( tag = "traces", security(("bearer_token" = [])) )] -#[instrument(skip_all)] +#[instrument( + skip_all, + name = "get_trace_spans_by_id", + fields( + trace.query.endpoint = Empty, + trace.query.kind = Empty, + trace.query.has_start_time = Empty, + trace.query.has_end_time = Empty, + trace.query.window_ms = Empty, + trace.query.limit = Empty, + trace.query.offset = Empty, + trace.query.trace_id_present = Empty, + trace.query.unbounded = Empty, + trace.query.cache.hit = Empty, + trace.query.cache.name = Empty, + trace.query.result.rows = Empty, + trace.query.result.bytes_estimate = Empty, + trace.query.table_version = Empty, + trace.query.storage_backend = Empty, + trace.query.refresh_origin = Empty, + ) +)] pub async fn get_trace_spans_by_id( State(data): State>, Extension(perms): Extension, Path(id): Path, ) -> Result, (StatusCode, Json)> { + record_trace_query_common(TraceQueryAttrs { + endpoint: phase0::routes::V1_TRACE_SPANS_PATH, + kind: "spans_by_id", + has_start_time: false, + has_end_time: false, + window_ms: None, + limit: None, + offset: None, + trace_id_present: true, + unbounded: true, + }); debug!("Getting trace spans for trace_id: {}", id); let trace_id_bytes = TraceId::hex_to_bytes(&id).map_err(|e| { error!("Invalid trace_id hex: {:?}", e); @@ -293,6 +442,8 @@ pub async fn get_trace_spans_by_id( ) })?; + record_trace_query_result(spans.len()); + let _response_span = info_span!("response.serialize").entered(); Ok(Json(TraceSpansResponse { spans: redact_trace_spans_for_permissions(spans, &perms), })) @@ -310,12 +461,44 @@ pub async fn get_trace_spans_by_id( tag = "traces", security(("bearer_token" = [])) )] -#[instrument(skip_all)] +#[instrument( + skip_all, + name = "get_trace_spans", + fields( + trace.query.endpoint = Empty, + trace.query.kind = Empty, + trace.query.has_start_time = Empty, + trace.query.has_end_time = Empty, + trace.query.window_ms = Empty, + trace.query.limit = Empty, + trace.query.offset = Empty, + trace.query.trace_id_present = Empty, + trace.query.unbounded = Empty, + trace.query.cache.hit = Empty, + trace.query.cache.name = Empty, + trace.query.result.rows = Empty, + trace.query.result.bytes_estimate = Empty, + trace.query.table_version = Empty, + trace.query.storage_backend = Empty, + trace.query.refresh_origin = Empty, + ) +)] pub async fn get_trace_spans( State(data): State>, Extension(perms): Extension, Query(params): Query, ) -> Result, (StatusCode, Json)> { + record_trace_query_common(TraceQueryAttrs { + endpoint: phase0::routes::TRACE_SPANS_PATH, + kind: "spans", + has_start_time: params.start_time.is_some(), + has_end_time: params.end_time.is_some(), + window_ms: None, + limit: None, + offset: None, + trace_id_present: true, + unbounded: params.start_time.is_none() && params.end_time.is_none(), + }); debug!( "Getting trace spans for trace_id: {}, service_name: {:?}", params.trace_id, params.service_name, @@ -366,6 +549,8 @@ pub async fn get_trace_spans( ) })?; + record_trace_query_result(spans.len()); + let _response_span = info_span!("response.serialize").entered(); Ok(Json(TraceSpansResponse { spans: redact_trace_spans_for_permissions(spans, &perms), })) @@ -466,12 +651,44 @@ pub async fn query_trace_spans_from_tags( tag = "traces", security(("bearer_token" = [])) )] -#[instrument(skip_all)] +#[instrument( + skip_all, + name = "trace_metrics", + fields( + trace.query.endpoint = Empty, + trace.query.kind = Empty, + trace.query.has_start_time = Empty, + trace.query.has_end_time = Empty, + trace.query.window_ms = Empty, + trace.query.limit = Empty, + trace.query.offset = Empty, + trace.query.trace_id_present = Empty, + trace.query.unbounded = Empty, + trace.query.cache.hit = Empty, + trace.query.cache.name = Empty, + trace.query.result.rows = Empty, + trace.query.result.bytes_estimate = Empty, + trace.query.table_version = Empty, + trace.query.storage_backend = Empty, + trace.query.refresh_origin = Empty, + ) +)] pub async fn trace_metrics( State(data): State>, Json(body): Json, ) -> Result, (StatusCode, Json)> { let body = normalize_metrics_request(body)?; + record_trace_query_common(TraceQueryAttrs { + endpoint: phase0::routes::TRACE_METRICS_PATH, + kind: "metrics", + has_start_time: true, + has_end_time: true, + window_ms: Some((body.end_time - body.start_time).num_milliseconds()), + limit: None, + offset: None, + trace_id_present: false, + unbounded: false, + }); if let Some(clause) = &body.clause { validate_clause(clause) .map_err(|msg| (StatusCode::BAD_REQUEST, Json(ScouterServerError::new(msg))))?; @@ -509,6 +726,8 @@ pub async fn trace_metrics( ) })?; + record_trace_query_result(metrics.len()); + let _response_span = info_span!("response.serialize").entered(); Ok(Json(TraceMetricsResponse { metrics })) } @@ -611,12 +830,44 @@ pub async fn query_spans_from_filters( ), tag = "traces" )] -#[instrument(skip_all)] +#[instrument( + skip_all, + name = "v1_otel_traces", + fields( + trace.query.endpoint = Empty, + trace.query.kind = Empty, + trace.query.has_start_time = Empty, + trace.query.has_end_time = Empty, + trace.query.window_ms = Empty, + trace.query.limit = Empty, + trace.query.offset = Empty, + trace.query.trace_id_present = Empty, + trace.query.unbounded = Empty, + trace.query.cache.hit = Empty, + trace.query.cache.name = Empty, + trace.query.result.rows = Empty, + trace.query.result.bytes_estimate = Empty, + trace.query.table_version = Empty, + trace.query.storage_backend = Empty, + trace.query.refresh_origin = Empty, + ) +)] pub async fn v1_otel_traces( State(data): State>, headers: HeaderMap, body: Bytes, ) -> Result)> { + record_trace_query_common(TraceQueryAttrs { + endpoint: phase0::routes::V1_TRACES_PATH, + kind: "otel_ingest", + has_start_time: false, + has_end_time: false, + window_ms: None, + limit: None, + offset: None, + trace_id_present: false, + unbounded: false, + }); let content_type = headers .get(axum::http::header::CONTENT_TYPE) .and_then(|v| v.to_str().ok()) @@ -640,6 +891,12 @@ pub async fn v1_otel_traces( ))), ) })?; + let span_count = request + .resource_spans + .iter() + .flat_map(|resource| &resource.scope_spans) + .map(|scope| scope.spans.len()) + .sum::(); data.trace_record_tx .try_send(TraceServerRecord { request }) @@ -658,6 +915,12 @@ pub async fn v1_otel_traces( })?; let response_bytes = ExportTraceServiceResponse::default().encode_to_vec(); + record_trace_query_result(span_count); + Span::current().record( + phase0::attrs::TRACE_QUERY_RESULT_BYTES_ESTIMATE, + response_bytes.len() as i64, + ); + let _response_span = info_span!("response.serialize").entered(); Ok(( StatusCode::OK, [(axum::http::header::CONTENT_TYPE, "application/x-protobuf")],