From 7715b51aa0aaa28281b5ab5ea0fda75df693f39f Mon Sep 17 00:00:00 2001 From: DavdaJames Date: Thu, 2 Oct 2025 18:20:58 +0530 Subject: [PATCH 1/7] feat/additional-counter-numofrows-processed --- src/execution/live_updater.rs | 26 ++++ src/execution/source_indexer.rs | 28 +++- src/execution/stats.rs | 263 ++++++++++++++++++++++++++++++++ 3 files changed, 316 insertions(+), 1 deletion(-) diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index 13dde89e..cdc6f77c 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -27,6 +27,8 @@ pub struct FlowLiveUpdater { flow_ctx: Arc, join_set: Mutex>>>, stats_per_task: Vec>, + /// Global tracking of in-process rows per operation + operation_in_process_stats: Arc, recv_state: tokio::sync::Mutex, num_remaining_tasks_rx: watch::Receiver, @@ -83,6 +85,7 @@ struct SourceUpdateTask { plan: Arc, execution_ctx: Arc>, source_update_stats: Arc, + operation_in_process_stats: Arc, pool: PgPool, options: FlowLiveUpdaterOptions, @@ -137,6 +140,7 @@ impl SourceUpdateTask { let change_stream_stats = change_stream_stats.clone(); let pool = self.pool.clone(); let status_tx = self.status_tx.clone(); + let operation_in_process_stats = self.operation_in_process_stats.clone(); async move { let mut change_stream = change_stream; let retry_options = retryable::RetryOptions { @@ -203,6 +207,7 @@ impl SourceUpdateTask { }, super::source_indexer::UpdateMode::Normal, update_stats.clone(), + Some(operation_in_process_stats.clone()), concur_permit, Some(move || async move { SharedAckFn::ack(&shared_ack_fn).await @@ -328,6 +333,7 @@ impl FlowLiveUpdater { let mut join_set = JoinSet::new(); let mut stats_per_task = Vec::new(); + let operation_in_process_stats = Arc::new(stats::OperationInProcessStats::default()); for source_idx in 0..plan.import_ops.len() { let source_update_stats = Arc::new(stats::UpdateStats::default()); @@ -337,6 +343,7 @@ impl FlowLiveUpdater { plan: plan.clone(), execution_ctx: execution_ctx.clone(), source_update_stats: source_update_stats.clone(), + operation_in_process_stats: operation_in_process_stats.clone(), pool: pool.clone(), options: options.clone(), status_tx: status_tx.clone(), @@ -345,10 +352,12 @@ impl FlowLiveUpdater { join_set.spawn(source_update_task.run()); stats_per_task.push(source_update_stats); } + Ok(Self { flow_ctx, join_set: Mutex::new(Some(join_set)), stats_per_task, + operation_in_process_stats, recv_state: tokio::sync::Mutex::new(UpdateReceiveState { status_rx, last_num_source_updates: vec![0; plan.import_ops.len()], @@ -406,6 +415,23 @@ impl FlowLiveUpdater { } } + /// Get the total number of rows currently being processed across all operations. + pub fn get_total_in_process_count(&self) -> i64 { + self.operation_in_process_stats.get_total_in_process_count() + } + + /// Get the number of rows currently being processed for a specific operation. + pub fn get_operation_in_process_count(&self, operation_name: &str) -> i64 { + self.operation_in_process_stats + .get_operation_in_process_count(operation_name) + } + + /// Get a snapshot of all operation in-process counts. + pub fn get_all_operations_in_process(&self) -> std::collections::HashMap { + self.operation_in_process_stats + .get_all_operations_in_process() + } + pub async fn next_status_updates(&self) -> Result { let mut recv_state = self.recv_state.lock().await; let recv_state = &mut *recv_state; diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index 7545ebf1..b0b3400b 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -317,15 +317,31 @@ impl SourceIndexingContext { row_input: ProcessSourceRowInput, mode: UpdateMode, update_stats: Arc, + operation_in_process_stats: Option>, _concur_permit: concur_control::CombinedConcurrencyControllerPermit, ack_fn: Option, pool: PgPool, ) { + // Store operation name for tracking cleanup + let operation_name = { + let plan_result = self.flow.get_execution_plan().await; + match plan_result { + Ok(plan) => plan.import_ops[self.source_idx].name.clone(), + Err(_) => "unknown".to_string(), + } + }; + let process = async { let plan = self.flow.get_execution_plan().await?; let import_op = &plan.import_ops[self.source_idx]; let schema = &self.flow.data_schema; + // Track that we're starting to process this row + update_stats.start_processing(1); + if let Some(ref op_stats) = operation_in_process_stats { + op_stats.start_processing(&import_op.name, 1); + } + let eval_ctx = SourceRowEvaluationContext { plan: &plan, import_op, @@ -474,7 +490,15 @@ impl SourceIndexingContext { result }; let process_and_ack = async { - process.await?; + let result = process.await; + + // Track that we're finishing processing this row (regardless of success/failure) + update_stats.finish_processing(1); + if let Some(ref op_stats) = operation_in_process_stats { + op_stats.finish_processing(&operation_name, 1); + } + + result?; if let Some(ack_fn) = ack_fn { ack_fn().await?; } @@ -603,6 +627,7 @@ impl SourceIndexingContext { }, update_options.mode, update_stats.clone(), + None, // operation_in_process_stats concur_permit, NO_ACK, pool.clone(), @@ -642,6 +667,7 @@ impl SourceIndexingContext { }, update_options.mode, update_stats.clone(), + None, // operation_in_process_stats concur_permit, NO_ACK, pool.clone(), diff --git a/src/execution/stats.rs b/src/execution/stats.rs index fb1f94af..9ae3a1bd 100644 --- a/src/execution/stats.rs +++ b/src/execution/stats.rs @@ -64,6 +64,8 @@ pub struct UpdateStats { /// Number of source rows that were reprocessed because of logic change. pub num_reprocesses: Counter, pub num_errors: Counter, + /// Number of source rows currently being processed. + pub num_in_process: Counter, } impl UpdateStats { @@ -75,6 +77,7 @@ impl UpdateStats { num_updates: self.num_updates.delta(&base.num_updates), num_reprocesses: self.num_reprocesses.delta(&base.num_reprocesses), num_errors: self.num_errors.delta(&base.num_errors), + num_in_process: self.num_in_process.delta(&base.num_in_process), } } @@ -85,6 +88,7 @@ impl UpdateStats { self.num_updates.merge(&delta.num_updates); self.num_reprocesses.merge(&delta.num_reprocesses); self.num_errors.merge(&delta.num_errors); + self.num_in_process.merge(&delta.num_in_process); } pub fn has_any_change(&self) -> bool { @@ -94,6 +98,70 @@ impl UpdateStats { || self.num_reprocesses.get() > 0 || self.num_errors.get() > 0 } + + /// Start processing the specified number of rows. + /// Increments the in-process counter and is called when beginning row processing. + pub fn start_processing(&self, count: i64) { + self.num_in_process.inc(count); + } + + /// Finish processing the specified number of rows. + /// Decrements the in-process counter and is called when row processing completes. + pub fn finish_processing(&self, count: i64) { + self.num_in_process.inc(-count); + } + + /// Get the current number of rows being processed. + pub fn get_in_process_count(&self) -> i64 { + self.num_in_process.get() + } +} + +/// Per-operation tracking of in-process row counts. +#[derive(Debug, Default)] +pub struct OperationInProcessStats { + /// Maps operation names to their current in-process row counts. + operation_counters: std::sync::RwLock>, +} + +impl OperationInProcessStats { + /// Start processing rows for the specified operation. + pub fn start_processing(&self, operation_name: &str, count: i64) { + let mut counters = self.operation_counters.write().unwrap(); + let counter = counters.entry(operation_name.to_string()).or_default(); + counter.inc(count); + } + + /// Finish processing rows for the specified operation. + pub fn finish_processing(&self, operation_name: &str, count: i64) { + let counters = self.operation_counters.write().unwrap(); + if let Some(counter) = counters.get(operation_name) { + counter.inc(-count); + } + } + + /// Get the current in-process count for a specific operation. + pub fn get_operation_in_process_count(&self, operation_name: &str) -> i64 { + let counters = self.operation_counters.read().unwrap(); + counters + .get(operation_name) + .map_or(0, |counter| counter.get()) + } + + /// Get a snapshot of all operation in-process counts. + pub fn get_all_operations_in_process(&self) -> std::collections::HashMap { + let counters = self.operation_counters.read().unwrap(); + counters + .iter() + .map(|(name, counter)| (name.clone(), counter.get())) + .collect() + } + + /// Get the total in-process count across all operations. + pub fn get_total_in_process_count(&self) -> i64 { + let counters = self.operation_counters.read().unwrap(); + counters.values().map(|counter| counter.get()).sum() + } } impl std::fmt::Display for UpdateStats { @@ -136,6 +204,11 @@ impl std::fmt::Display for UpdateStats { )); } + let num_in_process = self.num_in_process.get(); + if num_in_process > 0 { + messages.push(format!("{num_in_process} source rows IN PROCESS")); + } + if !messages.is_empty() { write!(f, "{}", messages.join("; "))?; } else { @@ -171,3 +244,193 @@ impl std::fmt::Display for IndexUpdateInfo { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use std::thread; + + #[test] + fn test_update_stats_in_process_tracking() { + let stats = UpdateStats::default(); + + // Initially should be zero + assert_eq!(stats.get_in_process_count(), 0); + + // Start processing some rows + stats.start_processing(5); + assert_eq!(stats.get_in_process_count(), 5); + + // Start processing more rows + stats.start_processing(3); + assert_eq!(stats.get_in_process_count(), 8); + + // Finish processing some rows + stats.finish_processing(2); + assert_eq!(stats.get_in_process_count(), 6); + + // Finish processing remaining rows + stats.finish_processing(6); + assert_eq!(stats.get_in_process_count(), 0); + } + + #[test] + fn test_update_stats_thread_safety() { + let stats = Arc::new(UpdateStats::default()); + let mut handles = Vec::new(); + + // Spawn multiple threads that concurrently increment and decrement + for i in 0..10 { + let stats_clone = Arc::clone(&stats); + let handle = thread::spawn(move || { + // Each thread processes 100 rows + stats_clone.start_processing(100); + + // Simulate some work + thread::sleep(std::time::Duration::from_millis(i * 10)); + + // Finish processing + stats_clone.finish_processing(100); + }); + handles.push(handle); + } + + // Wait for all threads to complete + for handle in handles { + handle.join().unwrap(); + } + + // Should be back to zero + assert_eq!(stats.get_in_process_count(), 0); + } + + #[test] + fn test_operation_in_process_stats() { + let op_stats = OperationInProcessStats::default(); + + // Initially should be zero for all operations + assert_eq!(op_stats.get_operation_in_process_count("op1"), 0); + assert_eq!(op_stats.get_total_in_process_count(), 0); + + // Start processing rows for different operations + op_stats.start_processing("op1", 5); + op_stats.start_processing("op2", 3); + + assert_eq!(op_stats.get_operation_in_process_count("op1"), 5); + assert_eq!(op_stats.get_operation_in_process_count("op2"), 3); + assert_eq!(op_stats.get_total_in_process_count(), 8); + + // Get all operations snapshot + let all_ops = op_stats.get_all_operations_in_process(); + assert_eq!(all_ops.len(), 2); + assert_eq!(all_ops.get("op1"), Some(&5)); + assert_eq!(all_ops.get("op2"), Some(&3)); + + // Finish processing some rows + op_stats.finish_processing("op1", 2); + assert_eq!(op_stats.get_operation_in_process_count("op1"), 3); + assert_eq!(op_stats.get_total_in_process_count(), 6); + + // Finish processing all remaining rows + op_stats.finish_processing("op1", 3); + op_stats.finish_processing("op2", 3); + assert_eq!(op_stats.get_total_in_process_count(), 0); + } + + #[test] + fn test_operation_in_process_stats_thread_safety() { + let op_stats = Arc::new(OperationInProcessStats::default()); + let mut handles = Vec::new(); + + // Spawn threads for different operations + for i in 0..5 { + let op_stats_clone = Arc::clone(&op_stats); + let op_name = format!("operation_{}", i); + + let handle = thread::spawn(move || { + // Each operation processes 50 rows + op_stats_clone.start_processing(&op_name, 50); + + // Simulate some work + thread::sleep(std::time::Duration::from_millis(i * 20)); + + // Finish processing + op_stats_clone.finish_processing(&op_name, 50); + }); + handles.push(handle); + } + + // Wait for all threads to complete + for handle in handles { + handle.join().unwrap(); + } + + // Should be back to zero + assert_eq!(op_stats.get_total_in_process_count(), 0); + } + + #[test] + fn test_update_stats_merge_with_in_process() { + let stats1 = UpdateStats::default(); + let stats2 = UpdateStats::default(); + + // Set up different counts + stats1.start_processing(10); + stats1.num_insertions.inc(5); + + stats2.start_processing(15); + stats2.num_updates.inc(3); + + // Merge stats2 into stats1 + stats1.merge(&stats2); + + // Check that all counters were merged correctly + assert_eq!(stats1.get_in_process_count(), 25); // 10 + 15 + assert_eq!(stats1.num_insertions.get(), 5); + assert_eq!(stats1.num_updates.get(), 3); + } + + #[test] + fn test_update_stats_delta_with_in_process() { + let base = UpdateStats::default(); + let current = UpdateStats::default(); + + // Set up base state + base.start_processing(5); + base.num_insertions.inc(2); + + // Set up current state + current.start_processing(12); + current.num_insertions.inc(7); + current.num_updates.inc(3); + + // Calculate delta + let delta = current.delta(&base); + + // Check that delta contains the differences + assert_eq!(delta.get_in_process_count(), 7); // 12 - 5 + assert_eq!(delta.num_insertions.get(), 5); // 7 - 2 + assert_eq!(delta.num_updates.get(), 3); // 3 - 0 + } + + #[test] + fn test_update_stats_display_with_in_process() { + let stats = UpdateStats::default(); + + // Test with no activity + assert_eq!(format!("{}", stats), "No changes"); + + // Test with in-process rows + stats.start_processing(5); + assert!(format!("{}", stats).contains("5 source rows IN PROCESS")); + + // Test with mixed activity + stats.num_insertions.inc(3); + stats.num_errors.inc(1); + let display = format!("{}", stats); + assert!(display.contains("1 source rows FAILED")); + assert!(display.contains("3 source rows processed")); + assert!(display.contains("5 source rows IN PROCESS")); + } +} From 03893b74f1043e0ede1622107687156a85a7f8d7 Mon Sep 17 00:00:00 2001 From: DavdaJames Date: Thu, 2 Oct 2025 23:17:10 +0530 Subject: [PATCH 2/7] fix: now uses ProcessingCounters struct, to make it cummulative --- src/execution/stats.rs | 149 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 132 insertions(+), 17 deletions(-) diff --git a/src/execution/stats.rs b/src/execution/stats.rs index 9ae3a1bd..7cb00843 100644 --- a/src/execution/stats.rs +++ b/src/execution/stats.rs @@ -54,6 +54,55 @@ impl std::fmt::Debug for Counter { } } +#[derive(Debug, Serialize, Default, Clone)] +pub struct ProcessingCounters { + /// Total number of processing operations started. + pub num_starts: Counter, + /// Total number of processing operations ended. + pub num_ends: Counter, +} + +impl ProcessingCounters { + /// Start processing the specified number of items. + pub fn start(&self, count: i64) { + self.num_starts.inc(count); + } + + /// End processing the specified number of items. + pub fn end(&self, count: i64) { + self.num_ends.inc(count); + } + + /// Get the current number of items being processed (starts - ends). + pub fn get_in_process(&self) -> i64 { + self.num_starts.get() - self.num_ends.get() + } + + /// Get the total number of processing operations started. + pub fn get_total_starts(&self) -> i64 { + self.num_starts.get() + } + + /// Get the total number of processing operations ended. + pub fn get_total_ends(&self) -> i64 { + self.num_ends.get() + } + + /// Calculate the delta between this and a base ProcessingCounters. + pub fn delta(&self, base: &Self) -> Self { + ProcessingCounters { + num_starts: self.num_starts.delta(&base.num_starts), + num_ends: self.num_ends.delta(&base.num_ends), + } + } + + /// Merge a delta into this ProcessingCounters. + pub fn merge(&self, delta: &Self) { + self.num_starts.merge(&delta.num_starts); + self.num_ends.merge(&delta.num_ends); + } +} + #[derive(Debug, Serialize, Default, Clone)] pub struct UpdateStats { pub num_no_change: Counter, @@ -64,8 +113,8 @@ pub struct UpdateStats { /// Number of source rows that were reprocessed because of logic change. pub num_reprocesses: Counter, pub num_errors: Counter, - /// Number of source rows currently being processed. - pub num_in_process: Counter, + /// Processing counters for tracking in-process rows. + pub processing: ProcessingCounters, } impl UpdateStats { @@ -77,7 +126,7 @@ impl UpdateStats { num_updates: self.num_updates.delta(&base.num_updates), num_reprocesses: self.num_reprocesses.delta(&base.num_reprocesses), num_errors: self.num_errors.delta(&base.num_errors), - num_in_process: self.num_in_process.delta(&base.num_in_process), + processing: self.processing.delta(&base.processing), } } @@ -88,7 +137,7 @@ impl UpdateStats { self.num_updates.merge(&delta.num_updates); self.num_reprocesses.merge(&delta.num_reprocesses); self.num_errors.merge(&delta.num_errors); - self.num_in_process.merge(&delta.num_in_process); + self.processing.merge(&delta.processing); } pub fn has_any_change(&self) -> bool { @@ -100,28 +149,28 @@ impl UpdateStats { } /// Start processing the specified number of rows. - /// Increments the in-process counter and is called when beginning row processing. + /// Increments the processing start counter. pub fn start_processing(&self, count: i64) { - self.num_in_process.inc(count); + self.processing.start(count); } /// Finish processing the specified number of rows. - /// Decrements the in-process counter and is called when row processing completes. + /// Increments the processing end counter. pub fn finish_processing(&self, count: i64) { - self.num_in_process.inc(-count); + self.processing.end(count); } /// Get the current number of rows being processed. pub fn get_in_process_count(&self) -> i64 { - self.num_in_process.get() + self.processing.get_in_process() } } /// Per-operation tracking of in-process row counts. #[derive(Debug, Default)] pub struct OperationInProcessStats { - /// Maps operation names to their current in-process row counts. - operation_counters: std::sync::RwLock>, + /// Maps operation names to their processing counters. + operation_counters: std::sync::RwLock>, } impl OperationInProcessStats { @@ -129,14 +178,14 @@ impl OperationInProcessStats { pub fn start_processing(&self, operation_name: &str, count: i64) { let mut counters = self.operation_counters.write().unwrap(); let counter = counters.entry(operation_name.to_string()).or_default(); - counter.inc(count); + counter.start(count); } /// Finish processing rows for the specified operation. pub fn finish_processing(&self, operation_name: &str, count: i64) { let counters = self.operation_counters.write().unwrap(); if let Some(counter) = counters.get(operation_name) { - counter.inc(-count); + counter.end(count); } } @@ -145,7 +194,7 @@ impl OperationInProcessStats { let counters = self.operation_counters.read().unwrap(); counters .get(operation_name) - .map_or(0, |counter| counter.get()) + .map_or(0, |counter| counter.get_in_process()) } /// Get a snapshot of all operation in-process counts. @@ -153,14 +202,17 @@ impl OperationInProcessStats { let counters = self.operation_counters.read().unwrap(); counters .iter() - .map(|(name, counter)| (name.clone(), counter.get())) + .map(|(name, counter)| (name.clone(), counter.get_in_process())) .collect() } /// Get the total in-process count across all operations. pub fn get_total_in_process_count(&self) -> i64 { let counters = self.operation_counters.read().unwrap(); - counters.values().map(|counter| counter.get()).sum() + counters + .values() + .map(|counter| counter.get_in_process()) + .sum() } } @@ -204,7 +256,7 @@ impl std::fmt::Display for UpdateStats { )); } - let num_in_process = self.num_in_process.get(); + let num_in_process = self.processing.get_in_process(); if num_in_process > 0 { messages.push(format!("{num_in_process} source rows IN PROCESS")); } @@ -251,6 +303,69 @@ mod tests { use std::sync::Arc; use std::thread; + #[test] + fn test_processing_counters() { + let counters = ProcessingCounters::default(); + + // Initially should be zero + assert_eq!(counters.get_in_process(), 0); + assert_eq!(counters.get_total_starts(), 0); + assert_eq!(counters.get_total_ends(), 0); + + // Start processing some items + counters.start(5); + assert_eq!(counters.get_in_process(), 5); + assert_eq!(counters.get_total_starts(), 5); + assert_eq!(counters.get_total_ends(), 0); + + // Start processing more items + counters.start(3); + assert_eq!(counters.get_in_process(), 8); + assert_eq!(counters.get_total_starts(), 8); + assert_eq!(counters.get_total_ends(), 0); + + // End processing some items + counters.end(2); + assert_eq!(counters.get_in_process(), 6); + assert_eq!(counters.get_total_starts(), 8); + assert_eq!(counters.get_total_ends(), 2); + + // End processing remaining items + counters.end(6); + assert_eq!(counters.get_in_process(), 0); + assert_eq!(counters.get_total_starts(), 8); + assert_eq!(counters.get_total_ends(), 8); + } + + #[test] + fn test_processing_counters_delta_and_merge() { + let base = ProcessingCounters::default(); + let current = ProcessingCounters::default(); + + // Set up base state + base.start(5); + base.end(2); + + // Set up current state + current.start(12); + current.end(4); + + // Calculate delta + let delta = current.delta(&base); + assert_eq!(delta.get_total_starts(), 7); // 12 - 5 + assert_eq!(delta.get_total_ends(), 2); // 4 - 2 + assert_eq!(delta.get_in_process(), 5); // 7 - 2 + + // Test merge + let merged = ProcessingCounters::default(); + merged.start(10); + merged.end(3); + merged.merge(&delta); + assert_eq!(merged.get_total_starts(), 17); // 10 + 7 + assert_eq!(merged.get_total_ends(), 5); // 3 + 2 + assert_eq!(merged.get_in_process(), 12); // 17 - 5 + } + #[test] fn test_update_stats_in_process_tracking() { let stats = UpdateStats::default(); From 1449612138b741731a41567822c5dc131443fead Mon Sep 17 00:00:00 2001 From: DavdaJames Date: Fri, 3 Oct 2025 00:32:51 +0530 Subject: [PATCH 3/7] refactored according to reviews --- src/execution/evaluator.rs | 52 +++++-- src/execution/row_indexer.rs | 48 ++++++- src/execution/source_indexer.rs | 5 +- src/execution/stats.rs | 231 +++++++++++++++++++++++--------- 4 files changed, 253 insertions(+), 83 deletions(-) diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index 71319378..bd5caa2b 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -322,6 +322,7 @@ async fn evaluate_child_op_scope( child_scope_entry: ScopeEntry<'_>, concurrency_controller: &concur_control::ConcurrencyController, memory: &EvaluationMemory, + operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>, ) -> Result<()> { let _permit = concurrency_controller .acquire(Some(|| { @@ -333,32 +334,44 @@ async fn evaluate_child_op_scope( .sum() })) .await?; - evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), memory) - .await - .with_context(|| { - format!( - "Evaluating in scope with key {}", - match child_scope_entry.key.key() { - Some(k) => k.to_string(), - None => "()".to_string(), - } - ) - }) + evaluate_op_scope( + op_scope, + scoped_entries.prepend(&child_scope_entry), + memory, + operation_in_process_stats, + ) + .await + .with_context(|| { + format!( + "Evaluating in scope with key {}", + match child_scope_entry.key.key() { + Some(k) => k.to_string(), + None => "()".to_string(), + } + ) + }) } async fn evaluate_op_scope( op_scope: &AnalyzedOpScope, scoped_entries: RefList<'_, &ScopeEntry<'_>>, memory: &EvaluationMemory, + operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>, ) -> Result<()> { let head_scope = *scoped_entries.head().unwrap(); for reactive_op in op_scope.reactive_ops.iter() { match reactive_op { AnalyzedReactiveOp::Transform(op) => { + // Track transform operation start + if let Some(ref op_stats) = operation_in_process_stats { + op_stats.start_processing(&op.name, 1); + } + let mut input_values = Vec::with_capacity(op.inputs.len()); input_values .extend(assemble_input_values(&op.inputs, scoped_entries).collect::>()); - if op.function_exec_info.enable_cache { + + let result = if op.function_exec_info.enable_cache { let output_value_cell = memory.get_cache_entry( || { Ok(op @@ -382,7 +395,14 @@ async fn evaluate_op_scope( .await .and_then(|v| head_scope.define_field(&op.output, &v)) } - .with_context(|| format!("Evaluating Transform op `{}`", op.name,))? + .with_context(|| format!("Evaluating Transform op `{}`", op.name,)); + + // Track transform operation completion + if let Some(ref op_stats) = operation_in_process_stats { + op_stats.finish_processing(&op.name, 1); + } + + result? } AnalyzedReactiveOp::ForEach(op) => { @@ -408,6 +428,7 @@ async fn evaluate_op_scope( ), &op.concurrency_controller, memory, + operation_in_process_stats, ) }) .collect::>(), @@ -425,6 +446,7 @@ async fn evaluate_op_scope( ), &op.concurrency_controller, memory, + operation_in_process_stats, ) }) .collect::>(), @@ -443,6 +465,7 @@ async fn evaluate_op_scope( ), &op.concurrency_controller, memory, + operation_in_process_stats, ) }) .collect::>(), @@ -509,6 +532,7 @@ pub async fn evaluate_source_entry( src_eval_ctx: &SourceRowEvaluationContext<'_>, source_value: value::FieldValues, memory: &EvaluationMemory, + operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>, ) -> Result { let _permit = src_eval_ctx .import_op @@ -556,6 +580,7 @@ pub async fn evaluate_source_entry( &src_eval_ctx.plan.op_scope, RefList::Nil.prepend(&root_scope_entry), memory, + operation_in_process_stats, ) .await?; let collected_values = root_scope_entry @@ -604,6 +629,7 @@ pub async fn evaluate_transient_flow( &flow.execution_plan.op_scope, RefList::Nil.prepend(&root_scope_entry), &eval_memory, + None, // No operation stats for transient flows ) .await?; let output_value = assemble_value( diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index d135ee7b..34ea8d64 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -183,6 +183,7 @@ pub struct RowIndexer<'a> { setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext, mode: super::source_indexer::UpdateMode, update_stats: &'a stats::UpdateStats, + operation_in_process_stats: Option<&'a stats::OperationInProcessStats>, pool: &'a PgPool, source_id: i32, @@ -201,6 +202,7 @@ impl<'a> RowIndexer<'a> { mode: super::source_indexer::UpdateMode, process_time: chrono::DateTime, update_stats: &'a stats::UpdateStats, + operation_in_process_stats: Option<&'a stats::OperationInProcessStats>, pool: &'a PgPool, ) -> Result { Ok(Self { @@ -212,6 +214,7 @@ impl<'a> RowIndexer<'a> { setup_execution_ctx, mode, update_stats, + operation_in_process_stats, pool, }) } @@ -311,9 +314,13 @@ impl<'a> RowIndexer<'a> { }, ); - let output = - evaluate_source_entry(self.src_eval_ctx, source_value, &evaluation_memory) - .await?; + let output = evaluate_source_entry( + self.src_eval_ctx, + source_value, + &evaluation_memory, + self.operation_in_process_stats, + ) + .await?; let mut stored_info = evaluation_memory.into_stored()?; if tracking_setup_state.has_fast_fingerprint_column { (Some(output), stored_info, content_version_fp) @@ -368,9 +375,36 @@ impl<'a> RowIndexer<'a> { }) .collect(); (!mutations_w_ctx.is_empty()).then(|| { - export_op_group - .target_factory - .apply_mutation(mutations_w_ctx) + // Track export operation start + if let Some(ref op_stats) = self.operation_in_process_stats { + for export_op_idx in &export_op_group.op_idx { + let export_op = &self.src_eval_ctx.plan.export_ops[*export_op_idx]; + op_stats.start_processing(&export_op.name, 1); + } + } + + let export_op_names: Vec = export_op_group + .op_idx + .iter() + .map(|idx| self.src_eval_ctx.plan.export_ops[*idx].name.clone()) + .collect(); + let operation_in_process_stats = self.operation_in_process_stats; + + async move { + let result = export_op_group + .target_factory + .apply_mutation(mutations_w_ctx) + .await; + + // Track export operation completion + if let Some(ref op_stats) = operation_in_process_stats { + for export_op_name in &export_op_names { + op_stats.finish_processing(export_op_name, 1); + } + } + + result + } }) }); @@ -875,7 +909,7 @@ pub async fn evaluate_source_entry_with_memory( .ok_or_else(|| anyhow::anyhow!("value not returned"))?; let output = match source_value { interface::SourceValue::Existence(source_value) => { - Some(evaluate_source_entry(src_eval_ctx, source_value, &memory).await?) + Some(evaluate_source_entry(src_eval_ctx, source_value, &memory, None).await?) } interface::SourceValue::NonExistence => None, }; diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index b0b3400b..005c3b79 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -337,7 +337,7 @@ impl SourceIndexingContext { let schema = &self.flow.data_schema; // Track that we're starting to process this row - update_stats.start_processing(1); + update_stats.processing.start(1); if let Some(ref op_stats) = operation_in_process_stats { op_stats.start_processing(&import_op.name, 1); } @@ -356,6 +356,7 @@ impl SourceIndexingContext { mode, process_time, &update_stats, + operation_in_process_stats.as_ref().map(|s| s.as_ref()), &pool, )?; @@ -493,7 +494,7 @@ impl SourceIndexingContext { let result = process.await; // Track that we're finishing processing this row (regardless of success/failure) - update_stats.finish_processing(1); + update_stats.processing.end(1); if let Some(ref op_stats) = operation_in_process_stats { op_stats.finish_processing(&operation_name, 1); } diff --git a/src/execution/stats.rs b/src/execution/stats.rs index 7cb00843..88f5207a 100644 --- a/src/execution/stats.rs +++ b/src/execution/stats.rs @@ -75,17 +75,9 @@ impl ProcessingCounters { /// Get the current number of items being processed (starts - ends). pub fn get_in_process(&self) -> i64 { - self.num_starts.get() - self.num_ends.get() - } - - /// Get the total number of processing operations started. - pub fn get_total_starts(&self) -> i64 { - self.num_starts.get() - } - - /// Get the total number of processing operations ended. - pub fn get_total_ends(&self) -> i64 { - self.num_ends.get() + let ends = self.num_ends.get(); + let starts = self.num_starts.get(); + starts - ends } /// Calculate the delta between this and a base ProcessingCounters. @@ -147,23 +139,6 @@ impl UpdateStats { || self.num_reprocesses.get() > 0 || self.num_errors.get() > 0 } - - /// Start processing the specified number of rows. - /// Increments the processing start counter. - pub fn start_processing(&self, count: i64) { - self.processing.start(count); - } - - /// Finish processing the specified number of rows. - /// Increments the processing end counter. - pub fn finish_processing(&self, count: i64) { - self.processing.end(count); - } - - /// Get the current number of rows being processed. - pub fn get_in_process_count(&self) -> i64 { - self.processing.get_in_process() - } } /// Per-operation tracking of in-process row counts. @@ -309,32 +284,32 @@ mod tests { // Initially should be zero assert_eq!(counters.get_in_process(), 0); - assert_eq!(counters.get_total_starts(), 0); - assert_eq!(counters.get_total_ends(), 0); + assert_eq!(counters.num_starts.get(), 0); + assert_eq!(counters.num_ends.get(), 0); // Start processing some items counters.start(5); assert_eq!(counters.get_in_process(), 5); - assert_eq!(counters.get_total_starts(), 5); - assert_eq!(counters.get_total_ends(), 0); + assert_eq!(counters.num_starts.get(), 5); + assert_eq!(counters.num_ends.get(), 0); // Start processing more items counters.start(3); assert_eq!(counters.get_in_process(), 8); - assert_eq!(counters.get_total_starts(), 8); - assert_eq!(counters.get_total_ends(), 0); + assert_eq!(counters.num_starts.get(), 8); + assert_eq!(counters.num_ends.get(), 0); // End processing some items counters.end(2); assert_eq!(counters.get_in_process(), 6); - assert_eq!(counters.get_total_starts(), 8); - assert_eq!(counters.get_total_ends(), 2); + assert_eq!(counters.num_starts.get(), 8); + assert_eq!(counters.num_ends.get(), 2); // End processing remaining items counters.end(6); assert_eq!(counters.get_in_process(), 0); - assert_eq!(counters.get_total_starts(), 8); - assert_eq!(counters.get_total_ends(), 8); + assert_eq!(counters.num_starts.get(), 8); + assert_eq!(counters.num_ends.get(), 8); } #[test] @@ -352,8 +327,8 @@ mod tests { // Calculate delta let delta = current.delta(&base); - assert_eq!(delta.get_total_starts(), 7); // 12 - 5 - assert_eq!(delta.get_total_ends(), 2); // 4 - 2 + assert_eq!(delta.num_starts.get(), 7); // 12 - 5 + assert_eq!(delta.num_ends.get(), 2); // 4 - 2 assert_eq!(delta.get_in_process(), 5); // 7 - 2 // Test merge @@ -361,8 +336,8 @@ mod tests { merged.start(10); merged.end(3); merged.merge(&delta); - assert_eq!(merged.get_total_starts(), 17); // 10 + 7 - assert_eq!(merged.get_total_ends(), 5); // 3 + 2 + assert_eq!(merged.num_starts.get(), 17); // 10 + 7 + assert_eq!(merged.num_ends.get(), 5); // 3 + 2 assert_eq!(merged.get_in_process(), 12); // 17 - 5 } @@ -371,23 +346,23 @@ mod tests { let stats = UpdateStats::default(); // Initially should be zero - assert_eq!(stats.get_in_process_count(), 0); + assert_eq!(stats.processing.get_in_process(), 0); // Start processing some rows - stats.start_processing(5); - assert_eq!(stats.get_in_process_count(), 5); + stats.processing.start(5); + assert_eq!(stats.processing.get_in_process(), 5); // Start processing more rows - stats.start_processing(3); - assert_eq!(stats.get_in_process_count(), 8); + stats.processing.start(3); + assert_eq!(stats.processing.get_in_process(), 8); // Finish processing some rows - stats.finish_processing(2); - assert_eq!(stats.get_in_process_count(), 6); + stats.processing.end(2); + assert_eq!(stats.processing.get_in_process(), 6); // Finish processing remaining rows - stats.finish_processing(6); - assert_eq!(stats.get_in_process_count(), 0); + stats.processing.end(6); + assert_eq!(stats.processing.get_in_process(), 0); } #[test] @@ -400,13 +375,13 @@ mod tests { let stats_clone = Arc::clone(&stats); let handle = thread::spawn(move || { // Each thread processes 100 rows - stats_clone.start_processing(100); + stats_clone.processing.start(100); // Simulate some work thread::sleep(std::time::Duration::from_millis(i * 10)); // Finish processing - stats_clone.finish_processing(100); + stats_clone.processing.end(100); }); handles.push(handle); } @@ -417,7 +392,7 @@ mod tests { } // Should be back to zero - assert_eq!(stats.get_in_process_count(), 0); + assert_eq!(stats.processing.get_in_process(), 0); } #[test] @@ -491,17 +466,17 @@ mod tests { let stats2 = UpdateStats::default(); // Set up different counts - stats1.start_processing(10); + stats1.processing.start(10); stats1.num_insertions.inc(5); - stats2.start_processing(15); + stats2.processing.start(15); stats2.num_updates.inc(3); // Merge stats2 into stats1 stats1.merge(&stats2); // Check that all counters were merged correctly - assert_eq!(stats1.get_in_process_count(), 25); // 10 + 15 + assert_eq!(stats1.processing.get_in_process(), 25); // 10 + 15 assert_eq!(stats1.num_insertions.get(), 5); assert_eq!(stats1.num_updates.get(), 3); } @@ -512,11 +487,11 @@ mod tests { let current = UpdateStats::default(); // Set up base state - base.start_processing(5); + base.processing.start(5); base.num_insertions.inc(2); // Set up current state - current.start_processing(12); + current.processing.start(12); current.num_insertions.inc(7); current.num_updates.inc(3); @@ -524,7 +499,7 @@ mod tests { let delta = current.delta(&base); // Check that delta contains the differences - assert_eq!(delta.get_in_process_count(), 7); // 12 - 5 + assert_eq!(delta.processing.get_in_process(), 7); // 12 - 5 assert_eq!(delta.num_insertions.get(), 5); // 7 - 2 assert_eq!(delta.num_updates.get(), 3); // 3 - 0 } @@ -537,7 +512,7 @@ mod tests { assert_eq!(format!("{}", stats), "No changes"); // Test with in-process rows - stats.start_processing(5); + stats.processing.start(5); assert!(format!("{}", stats).contains("5 source rows IN PROCESS")); // Test with mixed activity @@ -548,4 +523,138 @@ mod tests { assert!(display.contains("3 source rows processed")); assert!(display.contains("5 source rows IN PROCESS")); } + + #[test] + fn test_granular_operation_tracking_integration() { + let op_stats = OperationInProcessStats::default(); + + // Simulate import operations + op_stats.start_processing("import_users", 5); + op_stats.start_processing("import_orders", 3); + + // Simulate transform operations + op_stats.start_processing("transform_user_data", 4); + op_stats.start_processing("transform_order_data", 2); + + // Simulate export operations + op_stats.start_processing("export_to_postgres", 3); + op_stats.start_processing("export_to_elasticsearch", 2); + + // Check individual operation counts + assert_eq!(op_stats.get_operation_in_process_count("import_users"), 5); + assert_eq!( + op_stats.get_operation_in_process_count("transform_user_data"), + 4 + ); + assert_eq!( + op_stats.get_operation_in_process_count("export_to_postgres"), + 3 + ); + + // Check total count across all operations + assert_eq!(op_stats.get_total_in_process_count(), 19); // 5+3+4+2+3+2 + + // Check snapshot of all operations + let all_ops = op_stats.get_all_operations_in_process(); + assert_eq!(all_ops.len(), 6); + assert_eq!(all_ops.get("import_users"), Some(&5)); + assert_eq!(all_ops.get("transform_user_data"), Some(&4)); + assert_eq!(all_ops.get("export_to_postgres"), Some(&3)); + + // Finish some operations + op_stats.finish_processing("import_users", 2); + op_stats.finish_processing("transform_user_data", 4); + op_stats.finish_processing("export_to_postgres", 1); + + // Verify counts after completion + assert_eq!(op_stats.get_operation_in_process_count("import_users"), 3); // 5-2 + assert_eq!( + op_stats.get_operation_in_process_count("transform_user_data"), + 0 + ); // 4-4 + assert_eq!( + op_stats.get_operation_in_process_count("export_to_postgres"), + 2 + ); // 3-1 + assert_eq!(op_stats.get_total_in_process_count(), 12); // 3+3+0+2+2+2 + } + + #[test] + fn test_operation_tracking_with_realistic_pipeline() { + let op_stats = OperationInProcessStats::default(); + + // Simulate a realistic processing pipeline scenario + // Import phase: Start processing 100 rows + op_stats.start_processing("users_import", 100); + assert_eq!(op_stats.get_total_in_process_count(), 100); + + // Transform phase: As import finishes, transform starts + for i in 0..100 { + // Each imported row triggers a transform + if i % 10 == 0 { + // Complete import batch every 10 items + op_stats.finish_processing("users_import", 10); + } + + // Start transform for each item + op_stats.start_processing("user_transform", 1); + + // Some transforms complete quickly + if i % 5 == 0 { + op_stats.finish_processing("user_transform", 1); + } + } + + // Verify intermediate state + assert_eq!(op_stats.get_operation_in_process_count("users_import"), 0); // All imports finished + assert_eq!( + op_stats.get_operation_in_process_count("user_transform"), + 80 + ); // 100 started - 20 finished + + // Export phase: As transforms finish, exports start + for i in 0..80 { + op_stats.finish_processing("user_transform", 1); + op_stats.start_processing("user_export", 1); + + // Some exports complete + if i % 3 == 0 { + op_stats.finish_processing("user_export", 1); + } + } + + // Final verification + assert_eq!(op_stats.get_operation_in_process_count("users_import"), 0); + assert_eq!(op_stats.get_operation_in_process_count("user_transform"), 0); + assert_eq!(op_stats.get_operation_in_process_count("user_export"), 53); // 80 - 27 (80/3 rounded down) + assert_eq!(op_stats.get_total_in_process_count(), 53); + } + + #[test] + fn test_operation_tracking_cumulative_behavior() { + let op_stats = OperationInProcessStats::default(); + + // Test that operation tracking maintains cumulative behavior for delta calculations + let snapshot1 = OperationInProcessStats::default(); + + // Initial state + op_stats.start_processing("test_op", 10); + op_stats.finish_processing("test_op", 3); + + // Simulate taking a snapshot (in real code, this would involve cloning counters) + // For testing, will manually create the "previous" state + snapshot1.start_processing("test_op", 10); + snapshot1.finish_processing("test_op", 3); + + // Continue processing + op_stats.start_processing("test_op", 5); + op_stats.finish_processing("test_op", 2); + + // Verify cumulative nature + // op_stats should have: starts=15, ends=5, in_process=10 + // snapshot1 should have: starts=10, ends=3, in_process=7 + // Delta would be: starts=5, ends=2, net_change=3 + + assert_eq!(op_stats.get_operation_in_process_count("test_op"), 10); // 15-5 + } } From 8421d957e2d46a13fe96da14370686a4a06b3408 Mon Sep 17 00:00:00 2001 From: DavdaJames Date: Fri, 3 Oct 2025 09:09:54 +0530 Subject: [PATCH 4/7] using target_kind for group identification --- src/builder/analyzer.rs | 1 + src/builder/plan.rs | 1 + src/execution/evaluator.rs | 12 +++++++----- src/execution/live_updater.rs | 19 +------------------ src/execution/row_indexer.rs | 16 ++++------------ src/execution/source_indexer.rs | 7 ++++--- 6 files changed, 18 insertions(+), 38 deletions(-) diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 57be0c16..4522bf04 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -1059,6 +1059,7 @@ pub async fn analyze_flow( let target_factory = get_target_factory(&target_kind)?; let analyzed_target_op_group = AnalyzedExportTargetOpGroup { target_factory, + target_kind: target_kind.clone(), op_idx: op_ids.export_op_ids, }; export_ops_futs.extend( diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 2aa878d5..61b2ac63 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -117,6 +117,7 @@ pub struct AnalyzedExportOp { pub struct AnalyzedExportTargetOpGroup { pub target_factory: Arc, + pub target_kind: String, pub op_idx: Vec, } diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index bd5caa2b..ed6f3cb6 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -322,7 +322,7 @@ async fn evaluate_child_op_scope( child_scope_entry: ScopeEntry<'_>, concurrency_controller: &concur_control::ConcurrencyController, memory: &EvaluationMemory, - operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>, + operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>, ) -> Result<()> { let _permit = concurrency_controller .acquire(Some(|| { @@ -356,7 +356,7 @@ async fn evaluate_op_scope( op_scope: &AnalyzedOpScope, scoped_entries: RefList<'_, &ScopeEntry<'_>>, memory: &EvaluationMemory, - operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>, + operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>, ) -> Result<()> { let head_scope = *scoped_entries.head().unwrap(); for reactive_op in op_scope.reactive_ops.iter() { @@ -364,7 +364,8 @@ async fn evaluate_op_scope( AnalyzedReactiveOp::Transform(op) => { // Track transform operation start if let Some(ref op_stats) = operation_in_process_stats { - op_stats.start_processing(&op.name, 1); + let transform_key = format!("transform/{}", op.name); + op_stats.start_processing(&transform_key, 1); } let mut input_values = Vec::with_capacity(op.inputs.len()); @@ -399,7 +400,8 @@ async fn evaluate_op_scope( // Track transform operation completion if let Some(ref op_stats) = operation_in_process_stats { - op_stats.finish_processing(&op.name, 1); + let transform_key = format!("transform/{}", op.name); + op_stats.finish_processing(&transform_key, 1); } result? @@ -532,7 +534,7 @@ pub async fn evaluate_source_entry( src_eval_ctx: &SourceRowEvaluationContext<'_>, source_value: value::FieldValues, memory: &EvaluationMemory, - operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>, + operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>, ) -> Result { let _permit = src_eval_ctx .import_op diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index cdc6f77c..8299f94b 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -28,7 +28,7 @@ pub struct FlowLiveUpdater { join_set: Mutex>>>, stats_per_task: Vec>, /// Global tracking of in-process rows per operation - operation_in_process_stats: Arc, + pub operation_in_process_stats: Arc, recv_state: tokio::sync::Mutex, num_remaining_tasks_rx: watch::Receiver, @@ -415,23 +415,6 @@ impl FlowLiveUpdater { } } - /// Get the total number of rows currently being processed across all operations. - pub fn get_total_in_process_count(&self) -> i64 { - self.operation_in_process_stats.get_total_in_process_count() - } - - /// Get the number of rows currently being processed for a specific operation. - pub fn get_operation_in_process_count(&self, operation_name: &str) -> i64 { - self.operation_in_process_stats - .get_operation_in_process_count(operation_name) - } - - /// Get a snapshot of all operation in-process counts. - pub fn get_all_operations_in_process(&self) -> std::collections::HashMap { - self.operation_in_process_stats - .get_all_operations_in_process() - } - pub async fn next_status_updates(&self) -> Result { let mut recv_state = self.recv_state.lock().await; let recv_state = &mut *recv_state; diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index 34ea8d64..9e72b628 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -377,17 +377,11 @@ impl<'a> RowIndexer<'a> { (!mutations_w_ctx.is_empty()).then(|| { // Track export operation start if let Some(ref op_stats) = self.operation_in_process_stats { - for export_op_idx in &export_op_group.op_idx { - let export_op = &self.src_eval_ctx.plan.export_ops[*export_op_idx]; - op_stats.start_processing(&export_op.name, 1); - } + let export_key = format!("export/{}", export_op_group.target_kind); + op_stats.start_processing(&export_key, 1); } - let export_op_names: Vec = export_op_group - .op_idx - .iter() - .map(|idx| self.src_eval_ctx.plan.export_ops[*idx].name.clone()) - .collect(); + let export_key = format!("export/{}", export_op_group.target_kind); let operation_in_process_stats = self.operation_in_process_stats; async move { @@ -398,9 +392,7 @@ impl<'a> RowIndexer<'a> { // Track export operation completion if let Some(ref op_stats) = operation_in_process_stats { - for export_op_name in &export_op_names { - op_stats.finish_processing(export_op_name, 1); - } + op_stats.finish_processing(&export_key, 1); } result diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index 005c3b79..e45cd32f 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -326,8 +326,8 @@ impl SourceIndexingContext { let operation_name = { let plan_result = self.flow.get_execution_plan().await; match plan_result { - Ok(plan) => plan.import_ops[self.source_idx].name.clone(), - Err(_) => "unknown".to_string(), + Ok(plan) => format!("import/{}", plan.import_ops[self.source_idx].name), + Err(_) => "import/unknown".to_string(), } }; @@ -339,7 +339,8 @@ impl SourceIndexingContext { // Track that we're starting to process this row update_stats.processing.start(1); if let Some(ref op_stats) = operation_in_process_stats { - op_stats.start_processing(&import_op.name, 1); + let import_key = format!("import/{}", import_op.name); + op_stats.start_processing(&import_key, 1); } let eval_ctx = SourceRowEvaluationContext { From 339d41d32524b17557d61f4319998f3d487f1d52 Mon Sep 17 00:00:00 2001 From: DavdaJames Date: Fri, 3 Oct 2025 14:54:56 +0530 Subject: [PATCH 5/7] scope issue fixed, trackin range fixed, moved block to async closure --- src/builder/analyzer.rs | 27 +++++++++++++++++++++++++++ src/builder/plan.rs | 1 + src/execution/evaluator.rs | 6 ++++-- src/execution/row_indexer.rs | 11 +++++------ src/execution/source_indexer.rs | 21 +++++++++++++-------- 5 files changed, 50 insertions(+), 16 deletions(-) diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 4522bf04..c9206ef6 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -969,14 +969,41 @@ impl AnalyzerContext { op_futs.push(self.analyze_reactive_op(op_scope, reactive_op).await?); } let collector_len = op_scope.states.lock().unwrap().collectors.len(); + let scope_qualifier = self.build_scope_qualifier(op_scope); let result_fut = async move { Ok(AnalyzedOpScope { reactive_ops: try_join_all(op_futs).await?, collector_len, + scope_qualifier, }) }; Ok(result_fut) } + + fn build_scope_qualifier(&self, op_scope: &Arc) -> String { + let mut scope_names = Vec::new(); + let mut current_scope = Some(op_scope.as_ref()); + + // Walk up the parent chain to collect scope names + while let Some(scope) = current_scope { + if let Some((parent, _)) = &scope.parent { + scope_names.push(scope.name.clone()); + current_scope = Some(parent.as_ref()); + } else { + break; + } + } + + // Reverse to get the correct order (root to leaf) + scope_names.reverse(); + + // Build the qualifier string: "" for root, "name." for single level, "parent.child." for nested + if scope_names.is_empty() { + String::new() + } else { + format!("{}.", scope_names.join(".")) + } + } } pub fn build_flow_instance_context( diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 61b2ac63..88d08c60 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -130,6 +130,7 @@ pub enum AnalyzedReactiveOp { pub struct AnalyzedOpScope { pub reactive_ops: Vec, pub collector_len: usize, + pub scope_qualifier: String, } pub struct ExecutionPlan { diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index ed6f3cb6..471c9995 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -364,7 +364,8 @@ async fn evaluate_op_scope( AnalyzedReactiveOp::Transform(op) => { // Track transform operation start if let Some(ref op_stats) = operation_in_process_stats { - let transform_key = format!("transform/{}", op.name); + let transform_key = + format!("transform/{}{}", op_scope.scope_qualifier, op.name); op_stats.start_processing(&transform_key, 1); } @@ -400,7 +401,8 @@ async fn evaluate_op_scope( // Track transform operation completion if let Some(ref op_stats) = operation_in_process_stats { - let transform_key = format!("transform/{}", op.name); + let transform_key = + format!("transform/{}{}", op_scope.scope_qualifier, op.name); op_stats.finish_processing(&transform_key, 1); } diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index 9e72b628..59cf310c 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -375,16 +375,15 @@ impl<'a> RowIndexer<'a> { }) .collect(); (!mutations_w_ctx.is_empty()).then(|| { - // Track export operation start - if let Some(ref op_stats) = self.operation_in_process_stats { - let export_key = format!("export/{}", export_op_group.target_kind); - op_stats.start_processing(&export_key, 1); - } - let export_key = format!("export/{}", export_op_group.target_kind); let operation_in_process_stats = self.operation_in_process_stats; async move { + // Track export operation start + if let Some(ref op_stats) = operation_in_process_stats { + op_stats.start_processing(&export_key, 1); + } + let result = export_op_group .target_factory .apply_mutation(mutations_w_ctx) diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index e45cd32f..0dca6777 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -338,10 +338,6 @@ impl SourceIndexingContext { // Track that we're starting to process this row update_stats.processing.start(1); - if let Some(ref op_stats) = operation_in_process_stats { - let import_key = format!("import/{}", import_op.name); - op_stats.start_processing(&import_key, 1); - } let eval_ctx = SourceRowEvaluationContext { plan: &plan, @@ -351,13 +347,16 @@ impl SourceIndexingContext { import_op_idx: self.source_idx, }; let process_time = chrono::Utc::now(); + let operation_in_process_stats_cloned = operation_in_process_stats.clone(); let row_indexer = row_indexer::RowIndexer::new( &eval_ctx, &self.setup_execution_ctx, mode, process_time, &update_stats, - operation_in_process_stats.as_ref().map(|s| s.as_ref()), + operation_in_process_stats_cloned + .as_ref() + .map(|s| s.as_ref()), &pool, )?; @@ -365,6 +364,9 @@ impl SourceIndexingContext { let mut row_state_operator = LocalSourceRowStateOperator::new(&row_input.key, &self.state, &update_stats); let mut ordinal_touched = false; + + let operation_in_process_stats_for_async = operation_in_process_stats.clone(); + let operation_name_for_async = operation_name.clone(); let result = { let row_state_operator = &mut row_state_operator; let row_key = &row_input.key; @@ -417,6 +419,9 @@ impl SourceIndexingContext { (ordinal, source_data.content_version_fp, value) } _ => { + if let Some(ref op_stats) = operation_in_process_stats_for_async { + op_stats.start_processing(&operation_name_for_async, 1); + } let data = import_op .executor .get_value( @@ -433,6 +438,9 @@ impl SourceIndexingContext { }, ) .await?; + if let Some(ref op_stats) = operation_in_process_stats_for_async { + op_stats.finish_processing(&operation_name_for_async, 1); + } ( data.ordinal.ok_or_else(|| { anyhow::anyhow!("ordinal is not available") @@ -496,9 +504,6 @@ impl SourceIndexingContext { // Track that we're finishing processing this row (regardless of success/failure) update_stats.processing.end(1); - if let Some(ref op_stats) = operation_in_process_stats { - op_stats.finish_processing(&operation_name, 1); - } result?; if let Some(ack_fn) = ack_fn { From c4ab79bb848a328d52bb08f65058548b40714106 Mon Sep 17 00:00:00 2001 From: DavdaJames Date: Sat, 4 Oct 2025 01:36:41 +0530 Subject: [PATCH 6/7] removed assigning current_scope as Option --- src/builder/analyzer.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index c9206ef6..4c64b350 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -982,13 +982,13 @@ impl AnalyzerContext { fn build_scope_qualifier(&self, op_scope: &Arc) -> String { let mut scope_names = Vec::new(); - let mut current_scope = Some(op_scope.as_ref()); + let mut current_scope = op_scope.as_ref(); // Walk up the parent chain to collect scope names - while let Some(scope) = current_scope { - if let Some((parent, _)) = &scope.parent { - scope_names.push(scope.name.clone()); - current_scope = Some(parent.as_ref()); + loop { + if let Some((parent, _)) = ¤t_scope.parent { + scope_names.push(current_scope.name.clone()); + current_scope = parent.as_ref(); } else { break; } From a48492f06e3a33b568d1c88553d922bb8e9ff762 Mon Sep 17 00:00:00 2001 From: DavdaJames Date: Sat, 4 Oct 2025 08:59:20 +0530 Subject: [PATCH 7/7] minor changes as required --- src/builder/analyzer.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 4c64b350..59ba94e2 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -985,24 +985,21 @@ impl AnalyzerContext { let mut current_scope = op_scope.as_ref(); // Walk up the parent chain to collect scope names - loop { - if let Some((parent, _)) = ¤t_scope.parent { - scope_names.push(current_scope.name.clone()); - current_scope = parent.as_ref(); - } else { - break; - } + while let Some((parent, _)) = ¤t_scope.parent { + scope_names.push(current_scope.name.as_str()); + current_scope = parent.as_ref(); } // Reverse to get the correct order (root to leaf) scope_names.reverse(); - // Build the qualifier string: "" for root, "name." for single level, "parent.child." for nested - if scope_names.is_empty() { - String::new() - } else { - format!("{}.", scope_names.join(".")) + // Build the qualifier string + let mut result = String::new(); + for name in scope_names { + result.push_str(&name); + result.push('.'); } + result } }