Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 70 additions & 39 deletions crates/scouter_dataframe/src/parquet/control/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use chrono::{DateTime, Duration, Utc};
use datafusion::logical_expr::{col, lit};
use datafusion::prelude::SessionContext;
use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
use scouter_settings::ObjectStorageSettings;
use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -118,7 +117,6 @@ pub fn get_pod_id() -> String {
pub struct ControlTableEngine {
schema: SchemaRef,
#[allow(dead_code)] // Used for future vacuum/maintenance operations
object_store: ObjectStore,
table: Arc<AsyncRwLock<DeltaTable>>,
ctx: Arc<SessionContext>,
pod_id: String,
Expand All @@ -129,13 +127,9 @@ impl ControlTableEngine {
///
/// The `pod_id` identifies this instance for distributed locking. In K8s,
/// pass the pod hostname (`std::env::var("HOSTNAME")`).
pub async fn new(
storage_settings: &ObjectStorageSettings,
pod_id: String,
) -> Result<Self, TraceEngineError> {
let object_store = ObjectStore::new(storage_settings)?;
pub async fn new(object_store: &ObjectStore, pod_id: String) -> Result<Self, TraceEngineError> {
let schema = Arc::new(control_schema());
let table = build_or_create_control_table(&object_store, schema.clone()).await?;
let table = build_or_create_control_table(object_store, schema.clone()).await?;
let ctx = object_store.get_session()?;

if let Ok(provider) = table.table_provider().await {
Expand All @@ -146,7 +140,6 @@ impl ControlTableEngine {

Ok(Self {
schema,
object_store,
table: Arc::new(AsyncRwLock::new(table)),
ctx: Arc::new(ctx),
pod_id,
Expand Down Expand Up @@ -399,32 +392,40 @@ impl ControlTableEngine {
continue;
}

let get_string = |col_name: &str| -> String {
let col = batch.column_by_name(col_name).unwrap();
let casted =
arrow::compute::cast(col, &DataType::Utf8).expect("cast to Utf8 failed");
let arr = casted.as_any().downcast_ref::<StringArray>().unwrap();
arr.value(0).to_string()
};

let get_timestamp = |col_name: &str| -> Option<DateTime<Utc>> {
let col = batch.column_by_name(col_name).unwrap();
if col.is_null(0) {
return None;
}
let arr = col
let get_string = |col_name: &'static str| -> Result<String, TraceEngineError> {
let col = batch
.column_by_name(col_name)
.ok_or(TraceEngineError::DowncastError(col_name))?;
let casted = arrow::compute::cast(col, &DataType::Utf8)
.map_err(TraceEngineError::ArrowError)?;
let arr = casted
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
DateTime::from_timestamp_micros(arr.value(0))
.downcast_ref::<StringArray>()
.ok_or(TraceEngineError::DowncastError(col_name))?;
Ok(arr.value(0).to_string())
};

let task_name_val = get_string("task_name");
let status_val = get_string("status");
let pod_id_val = get_string("pod_id");
let claimed_at = get_timestamp("claimed_at").unwrap_or_else(Utc::now);
let completed_at = get_timestamp("completed_at");
let next_run_at = get_timestamp("next_run_at").unwrap_or_else(Utc::now);
let get_timestamp =
|col_name: &'static str| -> Result<Option<DateTime<Utc>>, TraceEngineError> {
let col = batch
.column_by_name(col_name)
.ok_or(TraceEngineError::DowncastError(col_name))?;
if col.is_null(0) {
return Ok(None);
}
let arr = col
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.ok_or(TraceEngineError::DowncastError(col_name))?;
Ok(DateTime::from_timestamp_micros(arr.value(0)))
};

let task_name_val = get_string("task_name")?;
let status_val = get_string("status")?;
let pod_id_val = get_string("pod_id")?;
let claimed_at = get_timestamp("claimed_at")?.unwrap_or_else(Utc::now);
let completed_at = get_timestamp("completed_at")?;
let next_run_at = get_timestamp("next_run_at")?.unwrap_or_else(Utc::now);

return Ok(Some(TaskRecord {
task_name: task_name_val,
Expand Down Expand Up @@ -453,6 +454,17 @@ impl ControlTableEngine {

// First, delete the existing row for this task (if any).
// On a brand-new table with no data, delete will fail — that's fine.
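// Safety: task_name is always an internal constant (e.g. "summary_optimize").
// The delta-rs delete API takes a String predicate, not a parameterized Expr,
// so we check the invariant defensively. Note that `debug_assert!` is compiled
// out of release builds, so this guards development and test runs only.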
debug_assert!(
record
.task_name
.chars()
.all(|c| c.is_alphanumeric() || c == '_'),
"task_name must be alphanumeric + underscore, got: {}",
record.task_name
);
let predicate = format!("task_name = '{}'", record.task_name);
let delete_result = table_guard.clone().delete().with_predicate(predicate).await;

Expand All @@ -470,8 +482,19 @@ impl ControlTableEngine {
}
*table_guard = updated_table;
}
Err(_) => {
// No existing data to delete (new table) — just append
Err(e) => {
let err_msg = e.to_string();
// On a brand-new table with no data, delete fails because there's nothing
// to remove — this is expected and we fall through to append.
// All other errors are real failures and must be propagated.
if !err_msg.contains("No data") && !err_msg.contains("empty") {
warn!(
"Delete before write_task_update failed unexpectedly: {}",
err_msg
);
return Err(TraceEngineError::DataTableError(e));
}

let updated_table = table_guard
.clone()
.write(vec![batch])
Expand Down Expand Up @@ -587,12 +610,16 @@ mod tests {
use super::*;
use scouter_settings::ObjectStorageSettings;

/// Test helper: builds an `ObjectStore` from the given storage settings.
///
/// Panics if `ObjectStore::new` returns an error, which is acceptable in
/// tests where a misconfigured store should abort the test immediately.
fn make_test_object_store(storage_settings: &ObjectStorageSettings) -> ObjectStore {
    let store = ObjectStore::new(storage_settings);
    store.unwrap()
}

/// Test helper: removes the storage directory used by the default
/// `ObjectStorageSettings`, if it exists.
///
/// The directory is resolved by joining `storage_root()` onto the current
/// working directory. Removal is best-effort: a failure to delete is ignored
/// rather than panicking.
///
/// Panics only if the current working directory cannot be determined.
fn cleanup() {
    let storage_settings = ObjectStorageSettings::default();
    let current_dir = std::env::current_dir().unwrap();
    let storage_path = current_dir.join(storage_settings.storage_root());
    if storage_path.exists() {
        // Deliberately discard the result: the directory may disappear between
        // the `exists()` check and the removal (presumably when tests sharing
        // this directory run concurrently — confirm), and that must not fail
        // the calling test. Borrow the path so it is not moved.
        let _ = std::fs::remove_dir_all(&storage_path);
    }
}

Expand All @@ -601,7 +628,8 @@ mod tests {
cleanup();

let settings = ObjectStorageSettings::default();
let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;
let object_store = make_test_object_store(&settings);
let engine = ControlTableEngine::new(&object_store, "pod-1".to_string()).await?;

// No tasks should exist yet
let due = engine.is_task_due("optimize").await?;
Expand All @@ -616,7 +644,8 @@ mod tests {
cleanup();

let settings = ObjectStorageSettings::default();
let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;
let object_store = make_test_object_store(&settings);
let engine = ControlTableEngine::new(&object_store, "pod-1".to_string()).await?;

// Claim a new task
let claimed = engine.try_claim_task("optimize").await?;
Expand Down Expand Up @@ -645,7 +674,8 @@ mod tests {
cleanup();

let settings = ObjectStorageSettings::default();
let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;
let object_store = make_test_object_store(&settings);
let engine = ControlTableEngine::new(&object_store, "pod-1".to_string()).await?;

// Claim and release with 0-second interval (immediately due again)
let claimed = engine.try_claim_task("vacuum").await?;
Expand Down Expand Up @@ -673,7 +703,8 @@ mod tests {
cleanup();

let settings = ObjectStorageSettings::default();
let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;
let object_store = make_test_object_store(&settings);
let engine = ControlTableEngine::new(&object_store, "pod-1".to_string()).await?;

// Claim two different tasks
let claimed_opt = engine.try_claim_task("optimize").await?;
Expand Down
10 changes: 9 additions & 1 deletion crates/scouter_dataframe/src/parquet/tracing/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl TraceSpanDBEngine {
} else {
info!("Empty table at init — deferring SessionContext registration until first write");
}
let control = ControlTableEngine::new(storage_settings, get_pod_id()).await?;
let control = ControlTableEngine::new(&object_store, get_pod_id()).await?;

Ok(TraceSpanDBEngine {
schema,
Expand Down Expand Up @@ -324,6 +324,9 @@ impl TraceSpanDBEngine {
self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
self.ctx
.register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
// Ensure the table's object store is registered with the DataFusion session
// so that DeltaScan::scan() can resolve file URLs during query execution.
updated_table.update_datafusion_session(&self.ctx.state())?;

*table_guard = updated_table;

Expand Down Expand Up @@ -351,6 +354,7 @@ impl TraceSpanDBEngine {
self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
self.ctx
.register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
updated_table.update_datafusion_session(&self.ctx.state())?;

*table_guard = updated_table;

Expand All @@ -370,6 +374,7 @@ impl TraceSpanDBEngine {
self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
self.ctx
.register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
updated_table.update_datafusion_session(&self.ctx.state())?;

*table_guard = updated_table;

Expand Down Expand Up @@ -404,6 +409,7 @@ impl TraceSpanDBEngine {
self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
self.ctx
.register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
updated_table.update_datafusion_session(&self.ctx.state())?;

*table_guard = updated_table;

Expand Down Expand Up @@ -495,6 +501,8 @@ impl TraceSpanDBEngine {
}
}
TableCommand::Optimize { respond_to } => {
// Response is sent before vacuum so callers aren't blocked
// on the potentially slow file-deletion pass.
let _ = respond_to.send(self.optimize_table().await);
if let Err(e) = self.vacuum_table(0).await {
error!("Post-optimize vacuum failed: {}", e);
Expand Down
Loading
Loading