Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 108 additions & 20 deletions crates/scouter_dataframe/src/parquet/tracing/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::{RwLock as AsyncRwLock, mpsc};
use tokio::time::{Duration, interval};
use tracing::{debug, error, info, instrument};
use tracing::{Instrument, Level, debug, error, info, instrument, span};
use url::Url;

const TRACE_SPAN_TABLE_NAME: &str = "trace_spans";
Expand All @@ -38,6 +38,17 @@ const TRACE_SPAN_TABLE_NAME: &str = "trace_spans";
const TASK_OPTIMIZE: &str = "trace_optimize";
const TASK_RETENTION: &str = "trace_retention";

mod phase0 {
#[allow(dead_code)]
pub mod spans {
pub const DELTA_TABLE_LOAD: &str = "delta.table.load";
pub const DELTA_SNAPSHOT_REFRESH: &str = "delta.snapshot.refresh";
pub const DELTA_CATALOG_SWAP: &str = "delta.catalog.swap";
pub const DELTA_OPTIMIZE: &str = "delta.optimize";
pub const UPDATE_INCREMENTAL: &str = "update_incremental";
}
}

/// Days from year-0001 to Unix epoch (1970-01-01), used to convert chrono → Arrow Date32.
/// Equivalent to `NaiveDate::from_ymd_opt(1970, 1, 1).unwrap().num_days_from_ce()`.
const UNIX_EPOCH_DAYS: i32 = 719_163;
Expand Down Expand Up @@ -149,6 +160,12 @@ async fn build_or_create_table_inner(
Ok(builder) => builder
.with_storage_backend(store, table_url.clone())
.load()
.instrument(span!(
Level::INFO,
phase0::spans::DELTA_TABLE_LOAD,
table = TRACE_SPAN_TABLE_NAME,
mode = "probe"
))
.await
.is_ok(),
Err(_) => false,
Expand All @@ -168,6 +185,12 @@ async fn build_or_create_table_inner(
let mut table = DeltaTableBuilder::from_url(table_url.clone())?
.with_storage_backend(store, table_url)
.load()
.instrument(span!(
Level::INFO,
phase0::spans::DELTA_TABLE_LOAD,
table = TRACE_SPAN_TABLE_NAME,
mode = "existing"
))
.await?;

// Schema evolution: add any columns present in the desired schema but missing from the
Expand Down Expand Up @@ -254,6 +277,13 @@ impl TraceSpanDBEngine {
// A freshly-created table has no committed Parquet files yet — table_provider()
// returns an error in that case. Defer registration until the first write populates the log.
if let Ok(provider) = delta_table.table_provider().await {
let _span = span!(
Level::INFO,
phase0::spans::DELTA_CATALOG_SWAP,
table = TRACE_SPAN_TABLE_NAME,
reason = "init"
)
.entered();
catalog.swap(TRACE_SPAN_TABLE_NAME, provider);
} else {
info!("Empty table at init — deferring catalog registration until first write");
Expand Down Expand Up @@ -416,7 +446,15 @@ impl TraceSpanDBEngine {

let new_provider = updated_table.table_provider().await?;
// Atomic single-step swap — no deregister/register gap where queries see "not found".
let _catalog_span = span!(
Level::INFO,
phase0::spans::DELTA_CATALOG_SWAP,
table = TRACE_SPAN_TABLE_NAME,
reason = "write"
)
.entered();
self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider);
drop(_catalog_span);
// Ensure the table's object store is registered with the DataFusion session
// so that DeltaScan::scan() can resolve file URLs during query execution.
updated_table.update_datafusion_session(&self.ctx.state())?;
Expand All @@ -431,21 +469,38 @@ impl TraceSpanDBEngine {

let current_table = table_guard.clone();

let (updated_table, _metrics) = current_table
.optimize()
.with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap())
.with_type(OptimizeType::ZOrder(vec![
"start_time".to_string(),
"service_name".to_string(),
]))
// Bloom filters must be re-specified here — compaction rewrites all Parquet files
// from scratch using these properties. Without this, every compaction cycle
// silently discards all bloom filters on the rewritten files.
.with_writer_properties(Self::build_writer_props())
.await?;
let optimize_span = span!(
Level::INFO,
phase0::spans::DELTA_OPTIMIZE,
table = TRACE_SPAN_TABLE_NAME
);
let (updated_table, _metrics) = async {
current_table
.optimize()
.with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap())
.with_type(OptimizeType::ZOrder(vec![
"start_time".to_string(),
"service_name".to_string(),
]))
// Bloom filters must be re-specified here — compaction rewrites all Parquet files
// from scratch using these properties. Without this, every compaction cycle
// silently discards all bloom filters on the rewritten files.
.with_writer_properties(Self::build_writer_props())
.await
}
.instrument(optimize_span)
.await?;

self.catalog
.swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?);
let new_provider = updated_table.table_provider().await?;
let _catalog_span = span!(
Level::INFO,
phase0::spans::DELTA_CATALOG_SWAP,
table = TRACE_SPAN_TABLE_NAME,
reason = "optimize"
)
.entered();
self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider);
drop(_catalog_span);
updated_table.update_datafusion_session(&self.ctx.state())?;

*table_guard = updated_table;
Expand All @@ -463,8 +518,16 @@ impl TraceSpanDBEngine {
.with_enforce_retention_duration(false)
.await?;

self.catalog
.swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?);
let new_provider = updated_table.table_provider().await?;
let _catalog_span = span!(
Level::INFO,
phase0::spans::DELTA_CATALOG_SWAP,
table = TRACE_SPAN_TABLE_NAME,
reason = "vacuum"
)
.entered();
self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider);
drop(_catalog_span);
updated_table.update_datafusion_session(&self.ctx.state())?;

*table_guard = updated_table;
Expand Down Expand Up @@ -498,8 +561,16 @@ impl TraceSpanDBEngine {
cutoff_date
);

self.catalog
.swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?);
let new_provider = updated_table.table_provider().await?;
let _catalog_span = span!(
Level::INFO,
phase0::spans::DELTA_CATALOG_SWAP,
table = TRACE_SPAN_TABLE_NAME,
reason = "expire"
)
.entered();
self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider);
drop(_catalog_span);
updated_table.update_datafusion_session(&self.ctx.state())?;

*table_guard = updated_table;
Expand Down Expand Up @@ -569,14 +640,23 @@ impl TraceSpanDBEngine {
/// This is mainly for multiple pods sharing the same storage.
/// Safety: clones the table before calling `update_incremental` so that a failure
/// (e.g. "Not a Delta table" on an empty table) leaves the original guard intact.
#[instrument(skip_all, name = "delta.snapshot.refresh", fields(table = TRACE_SPAN_TABLE_NAME))]
async fn refresh_table(&self) -> Result<(), TraceEngineError> {
let mut table_guard = self.table.write().await;
let current_version = table_guard.version();

// Clone before update_incremental — on failure the clone is discarded and the
// original guard stays intact, avoiding the corrupted-state bug described at line 301.
let mut refreshed = table_guard.clone();
match refreshed.update_incremental(None).await {
match refreshed
.update_incremental(None)
.instrument(span!(
Level::INFO,
phase0::spans::UPDATE_INCREMENTAL,
table = TRACE_SPAN_TABLE_NAME
))
.await
{
Ok(_) => {
if refreshed.version() > current_version {
info!(
Expand All @@ -586,7 +666,15 @@ impl TraceSpanDBEngine {
);
let new_provider = refreshed.table_provider().await?;
// Atomic swap — no gap between deregister and register.
let _catalog_span = span!(
Level::INFO,
phase0::spans::DELTA_CATALOG_SWAP,
table = TRACE_SPAN_TABLE_NAME,
reason = "refresh"
)
.entered();
self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider);
drop(_catalog_span);
refreshed.update_datafusion_session(&self.ctx.state())?;
*table_guard = refreshed;
}
Expand Down
Loading
Loading