diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs
index 57be0c16..59ba94e2 100644
--- a/src/builder/analyzer.rs
+++ b/src/builder/analyzer.rs
@@ -969,14 +969,38 @@ impl AnalyzerContext {
             op_futs.push(self.analyze_reactive_op(op_scope, reactive_op).await?);
         }
         let collector_len = op_scope.states.lock().unwrap().collectors.len();
+        let scope_qualifier = self.build_scope_qualifier(op_scope);
         let result_fut = async move {
             Ok(AnalyzedOpScope {
                 reactive_ops: try_join_all(op_futs).await?,
                 collector_len,
+                scope_qualifier,
             })
         };
         Ok(result_fut)
     }
+
+    fn build_scope_qualifier(&self, op_scope: &Arc<OpScope>) -> String {
+        let mut scope_names = Vec::new();
+        let mut current_scope = op_scope.as_ref();
+
+        // Walk up the parent chain to collect scope names
+        while let Some((parent, _)) = &current_scope.parent {
+            scope_names.push(current_scope.name.as_str());
+            current_scope = parent.as_ref();
+        }
+
+        // Reverse to get the correct order (root to leaf)
+        scope_names.reverse();
+
+        // Build the qualifier string, e.g. `scope1.scope2.`
+        let mut result = String::new();
+        for name in scope_names {
+            result.push_str(name);
+            result.push('.');
+        }
+        result
+    }
 }
 
 pub fn build_flow_instance_context(
@@ -1059,6 +1083,7 @@ pub async fn analyze_flow(
             let target_factory = get_target_factory(&target_kind)?;
             let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
                 target_factory,
+                target_kind: target_kind.clone(),
                 op_idx: op_ids.export_op_ids,
             };
             export_ops_futs.extend(
diff --git a/src/builder/plan.rs b/src/builder/plan.rs
index 2aa878d5..88d08c60 100644
--- a/src/builder/plan.rs
+++ b/src/builder/plan.rs
@@ -117,6 +117,7 @@ pub struct AnalyzedExportOp {
 
 pub struct AnalyzedExportTargetOpGroup {
     pub target_factory: Arc<dyn ExportTargetFactory>,
+    pub target_kind: String,
     pub op_idx: Vec<usize>,
 }
 
@@ -129,6 +130,7 @@ pub enum AnalyzedReactiveOp {
 
 pub struct AnalyzedOpScope {
     pub reactive_ops: Vec<AnalyzedReactiveOp>,
     pub collector_len: usize,
+    pub scope_qualifier: String,
 }
 
 pub struct ExecutionPlan {
diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs
index 71319378..471c9995 100644
--- a/src/execution/evaluator.rs
+++ b/src/execution/evaluator.rs
@@ -322,6 +322,7 @@ async fn evaluate_child_op_scope(
     child_scope_entry: ScopeEntry<'_>,
     concurrency_controller: &concur_control::ConcurrencyController,
     memory: &EvaluationMemory,
+    operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
 ) -> Result<()> {
     let _permit = concurrency_controller
         .acquire(Some(|| {
@@ -333,32 +334,46 @@ async fn evaluate_child_op_scope(
                 .sum()
         }))
         .await?;
-    evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), memory)
-        .await
-        .with_context(|| {
-            format!(
-                "Evaluating in scope with key {}",
-                match child_scope_entry.key.key() {
-                    Some(k) => k.to_string(),
-                    None => "()".to_string(),
-                }
-            )
-        })
+    evaluate_op_scope(
+        op_scope,
+        scoped_entries.prepend(&child_scope_entry),
+        memory,
+        operation_in_process_stats,
+    )
+    .await
+    .with_context(|| {
+        format!(
+            "Evaluating in scope with key {}",
+            match child_scope_entry.key.key() {
+                Some(k) => k.to_string(),
+                None => "()".to_string(),
+            }
+        )
+    })
 }
 
 async fn evaluate_op_scope(
     op_scope: &AnalyzedOpScope,
     scoped_entries: RefList<'_, &ScopeEntry<'_>>,
     memory: &EvaluationMemory,
+    operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
 ) -> Result<()> {
     let head_scope = *scoped_entries.head().unwrap();
     for reactive_op in op_scope.reactive_ops.iter() {
         match reactive_op {
             AnalyzedReactiveOp::Transform(op) => {
+                // Track transform operation start
+                if let Some(ref op_stats) = operation_in_process_stats {
+                    let transform_key =
+                        format!("transform/{}{}", op_scope.scope_qualifier, op.name);
+                    op_stats.start_processing(&transform_key, 1);
+                }
+
                 let mut input_values = Vec::with_capacity(op.inputs.len());
                 input_values
                     .extend(assemble_input_values(&op.inputs, scoped_entries).collect::<Vec<_>>());
-                if op.function_exec_info.enable_cache {
+
+                let result = if op.function_exec_info.enable_cache {
                     let output_value_cell = memory.get_cache_entry(
                         || {
                             Ok(op
@@ -382,7 +397,16 @@ async fn evaluate_op_scope(
                     .await
                     .and_then(|v| head_scope.define_field(&op.output, &v))
                 }
-                .with_context(|| format!("Evaluating Transform op `{}`", op.name,))?
+                .with_context(|| format!("Evaluating Transform op `{}`", op.name));
+
+                // Track transform operation completion
+                if let Some(ref op_stats) = operation_in_process_stats {
+                    let transform_key =
+                        format!("transform/{}{}", op_scope.scope_qualifier, op.name);
+                    op_stats.finish_processing(&transform_key, 1);
+                }
+
+                result?
             }
 
             AnalyzedReactiveOp::ForEach(op) => {
@@ -408,6 +432,7 @@ async fn evaluate_op_scope(
                                 ),
                                 &op.concurrency_controller,
                                 memory,
+                                operation_in_process_stats,
                             )
                         })
                         .collect::<Vec<_>>(),
@@ -425,6 +450,7 @@ async fn evaluate_op_scope(
                                 ),
                                 &op.concurrency_controller,
                                 memory,
+                                operation_in_process_stats,
                             )
                         })
                         .collect::<Vec<_>>(),
@@ -443,6 +469,7 @@ async fn evaluate_op_scope(
                                 ),
                                 &op.concurrency_controller,
                                 memory,
+                                operation_in_process_stats,
                             )
                         })
                         .collect::<Vec<_>>(),
@@ -509,6 +536,7 @@ pub async fn evaluate_source_entry(
     src_eval_ctx: &SourceRowEvaluationContext<'_>,
     source_value: value::FieldValues,
     memory: &EvaluationMemory,
+    operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
 ) -> Result {
     let _permit = src_eval_ctx
         .import_op
@@ -556,6 +584,7 @@ pub async fn evaluate_source_entry(
         &src_eval_ctx.plan.op_scope,
         RefList::Nil.prepend(&root_scope_entry),
         memory,
+        operation_in_process_stats,
     )
     .await?;
     let collected_values = root_scope_entry
@@ -604,6 +633,7 @@ pub async fn evaluate_transient_flow(
         &flow.execution_plan.op_scope,
         RefList::Nil.prepend(&root_scope_entry),
         &eval_memory,
+        None, // No operation stats for transient flows
     )
     .await?;
     let output_value = assemble_value(
diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs
index 13dde89e..8299f94b 100644
--- a/src/execution/live_updater.rs
+++ b/src/execution/live_updater.rs
@@ -27,6 +27,8 @@ pub struct FlowLiveUpdater {
     flow_ctx: Arc<FlowContext>,
     join_set: Mutex<Option<JoinSet<Result<()>>>>,
     stats_per_task: Vec<Arc<stats::UpdateStats>>,
+    /// Global tracking of in-process rows per operation
+    pub operation_in_process_stats: Arc<stats::OperationInProcessStats>,
 
     recv_state: tokio::sync::Mutex<UpdateReceiveState>,
     num_remaining_tasks_rx: watch::Receiver<usize>,
@@ -83,6 +85,7 @@ struct SourceUpdateTask {
     plan: Arc<ExecutionPlan>,
     execution_ctx: Arc>,
     source_update_stats: Arc<stats::UpdateStats>,
+    operation_in_process_stats: Arc<stats::OperationInProcessStats>,
     pool: PgPool,
     options: FlowLiveUpdaterOptions,
@@ -137,6 +140,7 @@ impl SourceUpdateTask {
             let change_stream_stats = change_stream_stats.clone();
             let pool = self.pool.clone();
             let status_tx = self.status_tx.clone();
+            let operation_in_process_stats = self.operation_in_process_stats.clone();
             async move {
                 let mut change_stream = change_stream;
                 let retry_options = retryable::RetryOptions {
@@ -203,6 +207,7 @@ impl SourceUpdateTask {
                             },
                             super::source_indexer::UpdateMode::Normal,
                             update_stats.clone(),
+                            Some(operation_in_process_stats.clone()),
                             concur_permit,
                             Some(move || async move {
                                 SharedAckFn::ack(&shared_ack_fn).await
@@ -328,6 +333,7 @@ impl FlowLiveUpdater {
         let mut join_set = JoinSet::new();
         let mut stats_per_task = Vec::new();
 
+        let operation_in_process_stats = Arc::new(stats::OperationInProcessStats::default());
         for source_idx in 0..plan.import_ops.len() {
             let source_update_stats = Arc::new(stats::UpdateStats::default());
@@ -337,6 +343,7 @@ impl FlowLiveUpdater {
                 plan: plan.clone(),
                 execution_ctx: execution_ctx.clone(),
                 source_update_stats: source_update_stats.clone(),
+                operation_in_process_stats: operation_in_process_stats.clone(),
                 pool: pool.clone(),
                 options: options.clone(),
                 status_tx: status_tx.clone(),
@@ -345,10 +352,12 @@ impl FlowLiveUpdater {
             join_set.spawn(source_update_task.run());
             stats_per_task.push(source_update_stats);
         }
+
         Ok(Self {
             flow_ctx,
             join_set: Mutex::new(Some(join_set)),
             stats_per_task,
+            operation_in_process_stats,
             recv_state: tokio::sync::Mutex::new(UpdateReceiveState {
                 status_rx,
                 last_num_source_updates: vec![0; plan.import_ops.len()],
diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs
index d135ee7b..59cf310c 100644
--- a/src/execution/row_indexer.rs
+++ b/src/execution/row_indexer.rs
@@ -183,6 +183,7 @@ pub struct RowIndexer<'a> {
     setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
     mode: super::source_indexer::UpdateMode,
    update_stats: &'a stats::UpdateStats,
+    operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
     pool: &'a PgPool,
 
     source_id: i32,
@@ -201,6 +202,7 @@ impl<'a> RowIndexer<'a> {
         mode: super::source_indexer::UpdateMode,
         process_time: chrono::DateTime<Utc>,
         update_stats: &'a stats::UpdateStats,
+        operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
         pool: &'a PgPool,
     ) -> Result<Self> {
         Ok(Self {
@@ -212,6 +214,7 @@ impl<'a> RowIndexer<'a> {
             setup_execution_ctx,
             mode,
             update_stats,
+            operation_in_process_stats,
             pool,
         })
     }
@@ -311,9 +314,13 @@ impl<'a> RowIndexer<'a> {
                 },
             );
-            let output =
-                evaluate_source_entry(self.src_eval_ctx, source_value, &evaluation_memory)
-                    .await?;
+            let output = evaluate_source_entry(
+                self.src_eval_ctx,
+                source_value,
+                &evaluation_memory,
+                self.operation_in_process_stats,
+            )
+            .await?;
             let mut stored_info = evaluation_memory.into_stored()?;
             if tracking_setup_state.has_fast_fingerprint_column {
                 (Some(output), stored_info, content_version_fp)
@@ -368,9 +375,27 @@ impl<'a> RowIndexer<'a> {
                 })
                 .collect();
             (!mutations_w_ctx.is_empty()).then(|| {
-                export_op_group
-                    .target_factory
-                    .apply_mutation(mutations_w_ctx)
+                let export_key = format!("export/{}", export_op_group.target_kind);
+                let operation_in_process_stats = self.operation_in_process_stats;
+
+                async move {
+                    // Track export operation start
+                    if let Some(ref op_stats) = operation_in_process_stats {
+                        op_stats.start_processing(&export_key, 1);
+                    }
+
+                    let result = export_op_group
+                        .target_factory
+                        .apply_mutation(mutations_w_ctx)
+                        .await;
+
+                    // Track export operation completion
+                    if let Some(ref op_stats) = operation_in_process_stats {
+                        op_stats.finish_processing(&export_key, 1);
+                    }
+
+                    result
+                }
             })
         });
@@ -875,7 +900,7 @@ pub async fn evaluate_source_entry_with_memory(
         .ok_or_else(|| anyhow::anyhow!("value not returned"))?;
     let output = match source_value {
         interface::SourceValue::Existence(source_value) => {
-            Some(evaluate_source_entry(src_eval_ctx, source_value, &memory).await?)
+            Some(evaluate_source_entry(src_eval_ctx, source_value, &memory, None).await?)
        }
         interface::SourceValue::NonExistence => None,
     };
diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs
index 7545ebf1..0dca6777 100644
--- a/src/execution/source_indexer.rs
+++ b/src/execution/source_indexer.rs
@@ -317,15 +317,28 @@ impl SourceIndexingContext {
         row_input: ProcessSourceRowInput,
         mode: UpdateMode,
         update_stats: Arc<stats::UpdateStats>,
+        operation_in_process_stats: Option<Arc<stats::OperationInProcessStats>>,
         _concur_permit: concur_control::CombinedConcurrencyControllerPermit,
         ack_fn: Option,
         pool: PgPool,
     ) {
+        // Store operation name for tracking cleanup
+        let operation_name = {
+            let plan_result = self.flow.get_execution_plan().await;
+            match plan_result {
+                Ok(plan) => format!("import/{}", plan.import_ops[self.source_idx].name),
+                Err(_) => "import/unknown".to_string(),
+            }
+        };
+
         let process = async {
+            // Track row start before any fallible step, so the unconditional `end(1)` below stays balanced
+            update_stats.processing.start(1);
+
             let plan = self.flow.get_execution_plan().await?;
             let import_op = &plan.import_ops[self.source_idx];
             let schema = &self.flow.data_schema;
             let eval_ctx = SourceRowEvaluationContext {
                 plan: &plan,
                 import_op,
@@ -334,12 +347,16 @@ impl SourceIndexingContext {
                 import_op_idx: self.source_idx,
             };
             let process_time = chrono::Utc::now();
+            let operation_in_process_stats_cloned = operation_in_process_stats.clone();
             let row_indexer = row_indexer::RowIndexer::new(
                 &eval_ctx,
                 &self.setup_execution_ctx,
                 mode,
                 process_time,
                 &update_stats,
+                operation_in_process_stats_cloned
+                    .as_ref()
+                    .map(|s| s.as_ref()),
                 &pool,
             )?;
 
@@ -347,6 +364,9 @@ impl SourceIndexingContext {
             let mut row_state_operator =
                 LocalSourceRowStateOperator::new(&row_input.key, &self.state, &update_stats);
             let mut ordinal_touched = false;
+
+            let operation_in_process_stats_for_async = operation_in_process_stats.clone();
+            let operation_name_for_async = operation_name.clone();
             let result = {
                 let row_state_operator = &mut row_state_operator;
                 let row_key = &row_input.key;
@@ -399,6 +419,9 @@ impl SourceIndexingContext {
                             (ordinal, source_data.content_version_fp, value)
                         }
                         _ => {
+                            if let Some(ref op_stats) = operation_in_process_stats_for_async {
+                                op_stats.start_processing(&operation_name_for_async, 1);
+                            }
                             let data = import_op
                                 .executor
                                 .get_value(
@@ -415,6 +438,9 @@ impl SourceIndexingContext {
                                     },
                                 )
                                 .await?;
+                            if let Some(ref op_stats) = operation_in_process_stats_for_async {
+                                op_stats.finish_processing(&operation_name_for_async, 1);
+                            }
                             (
                                 data.ordinal.ok_or_else(|| {
                                     anyhow::anyhow!("ordinal is not available")
@@ -474,7 +500,12 @@ impl SourceIndexingContext {
             result
         };
         let process_and_ack = async {
-            process.await?;
+            let result = process.await;
+
+            // Track that we're finishing processing this row (regardless of success/failure)
+            update_stats.processing.end(1);
+
+            result?;
             if let Some(ack_fn) = ack_fn {
                 ack_fn().await?;
             }
@@ -603,6 +634,7 @@ impl SourceIndexingContext {
                     },
                     update_options.mode,
                     update_stats.clone(),
+                    None, // operation_in_process_stats
                     concur_permit,
                     NO_ACK,
                     pool.clone(),
@@ -642,6 +674,7 @@ impl SourceIndexingContext {
                     },
                     update_options.mode,
                    update_stats.clone(),
+                    None, // operation_in_process_stats
                    concur_permit,
                    NO_ACK,
                    pool.clone(),
diff --git a/src/execution/stats.rs b/src/execution/stats.rs
index fb1f94af..88f5207a 100644
--- a/src/execution/stats.rs
+++ b/src/execution/stats.rs
@@ -54,6 +54,47 @@ impl std::fmt::Debug for Counter {
     }
 }
 
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct ProcessingCounters {
+    /// Total number of processing operations started.
+    pub num_starts: Counter,
+    /// Total number of processing operations ended.
+    pub num_ends: Counter,
+}
+
+impl ProcessingCounters {
+    /// Start processing the specified number of items.
+    pub fn start(&self, count: i64) {
+        self.num_starts.inc(count);
+    }
+
+    /// End processing the specified number of items.
+    pub fn end(&self, count: i64) {
+        self.num_ends.inc(count);
+    }
+
+    /// Get the current number of items being processed (starts - ends).
+    /// Reads `num_ends` first so concurrent updates cannot drive the result negative.
+    pub fn get_in_process(&self) -> i64 {
+        let ends = self.num_ends.get();
+        let starts = self.num_starts.get();
+        starts - ends
+    }
+
+    /// Calculate the delta between this and a base ProcessingCounters.
+    pub fn delta(&self, base: &Self) -> Self {
+        ProcessingCounters {
+            num_starts: self.num_starts.delta(&base.num_starts),
+            num_ends: self.num_ends.delta(&base.num_ends),
+        }
+    }
+
+    /// Merge a delta into this ProcessingCounters.
+    pub fn merge(&self, delta: &Self) {
+        self.num_starts.merge(&delta.num_starts);
+        self.num_ends.merge(&delta.num_ends);
+    }
+}
+
 #[derive(Debug, Serialize, Default, Clone)]
 pub struct UpdateStats {
     pub num_no_change: Counter,
@@ -64,6 +105,8 @@ pub struct UpdateStats {
     /// Number of source rows that were reprocessed because of logic change.
     pub num_reprocesses: Counter,
     pub num_errors: Counter,
+    /// Processing counters for tracking in-process rows.
+    pub processing: ProcessingCounters,
 }
 
 impl UpdateStats {
@@ -75,6 +118,7 @@ impl UpdateStats {
             num_updates: self.num_updates.delta(&base.num_updates),
             num_reprocesses: self.num_reprocesses.delta(&base.num_reprocesses),
             num_errors: self.num_errors.delta(&base.num_errors),
+            processing: self.processing.delta(&base.processing),
         }
     }
 
@@ -85,6 +129,7 @@ impl UpdateStats {
         self.num_updates.merge(&delta.num_updates);
         self.num_reprocesses.merge(&delta.num_reprocesses);
         self.num_errors.merge(&delta.num_errors);
+        self.processing.merge(&delta.processing);
     }
 
     pub fn has_any_change(&self) -> bool {
@@ -96,6 +141,56 @@ impl UpdateStats {
     }
 }
 
+/// Per-operation tracking of in-process row counts.
+#[derive(Debug, Default)]
+pub struct OperationInProcessStats {
+    /// Maps operation names to their processing counters.
+    operation_counters: std::sync::RwLock<std::collections::HashMap<String, ProcessingCounters>>,
+}
+
+impl OperationInProcessStats {
+    /// Start processing rows for the specified operation.
+    pub fn start_processing(&self, operation_name: &str, count: i64) {
+        let mut counters = self.operation_counters.write().unwrap();
+        let counter = counters.entry(operation_name.to_string()).or_default();
+        counter.start(count);
+    }
+
+    /// Finish processing rows for the specified operation. A read lock suffices: `Counter` is atomic.
+    pub fn finish_processing(&self, operation_name: &str, count: i64) {
+        let counters = self.operation_counters.read().unwrap();
+        if let Some(counter) = counters.get(operation_name) {
+            counter.end(count);
+        }
+    }
+
+    /// Get the current in-process count for a specific operation.
+    pub fn get_operation_in_process_count(&self, operation_name: &str) -> i64 {
+        let counters = self.operation_counters.read().unwrap();
+        counters
+            .get(operation_name)
+            .map_or(0, |counter| counter.get_in_process())
+    }
+
+    /// Get a snapshot of all operation in-process counts.
+    pub fn get_all_operations_in_process(&self) -> std::collections::HashMap<String, i64> {
+        let counters = self.operation_counters.read().unwrap();
+        counters
+            .iter()
+            .map(|(name, counter)| (name.clone(), counter.get_in_process()))
+            .collect()
+    }
+
+    /// Get the total in-process count across all operations.
+    pub fn get_total_in_process_count(&self) -> i64 {
+        let counters = self.operation_counters.read().unwrap();
+        counters
+            .values()
+            .map(|counter| counter.get_in_process())
+            .sum()
+    }
+}
+
 impl std::fmt::Display for UpdateStats {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         let mut messages = Vec::new();
@@ -136,6 +231,11 @@ impl std::fmt::Display for UpdateStats {
             ));
         }
 
+        let num_in_process = self.processing.get_in_process();
+        if num_in_process > 0 {
+            messages.push(format!("{num_in_process} source rows IN PROCESS"));
+        }
+
         if !messages.is_empty() {
             write!(f, "{}", messages.join("; "))?;
         } else {
@@ -171,3 +271,390 @@ impl std::fmt::Display for IndexUpdateInfo {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::sync::Arc;
+    use std::thread;
+
+    #[test]
+    fn test_processing_counters() {
+        let counters = ProcessingCounters::default();
+
+        // Initially should be zero
+        assert_eq!(counters.get_in_process(), 0);
+        assert_eq!(counters.num_starts.get(), 0);
+        assert_eq!(counters.num_ends.get(), 0);
+
+        // Start processing some items
+        counters.start(5);
+        assert_eq!(counters.get_in_process(), 5);
+        assert_eq!(counters.num_starts.get(), 5);
+        assert_eq!(counters.num_ends.get(), 0);
+
+        // Start processing more items
+        counters.start(3);
+        assert_eq!(counters.get_in_process(), 8);
+        assert_eq!(counters.num_starts.get(), 8);
+        assert_eq!(counters.num_ends.get(), 0);
+
+        // End processing some items
+        counters.end(2);
+        assert_eq!(counters.get_in_process(), 6);
+        assert_eq!(counters.num_starts.get(), 8);
+        assert_eq!(counters.num_ends.get(), 2);
+
+        // End processing remaining items
+        counters.end(6);
+        assert_eq!(counters.get_in_process(), 0);
+        assert_eq!(counters.num_starts.get(), 8);
+        assert_eq!(counters.num_ends.get(), 8);
+    }
+
+    #[test]
+    fn test_processing_counters_delta_and_merge() {
+        let base = ProcessingCounters::default();
+        let current = ProcessingCounters::default();
+
+        // Set up base state
+        base.start(5);
+        base.end(2);
+
+        // Set up current state
+        current.start(12);
+        current.end(4);
+
+        // Calculate delta
+        let delta = current.delta(&base);
+        assert_eq!(delta.num_starts.get(), 7); // 12 - 5
+        assert_eq!(delta.num_ends.get(), 2); // 4 - 2
+        assert_eq!(delta.get_in_process(), 5); // 7 - 2
+
+        // Test merge
+        let merged = ProcessingCounters::default();
+        merged.start(10);
+        merged.end(3);
+        merged.merge(&delta);
+        assert_eq!(merged.num_starts.get(), 17); // 10 + 7
+        assert_eq!(merged.num_ends.get(), 5); // 3 + 2
+        assert_eq!(merged.get_in_process(), 12); // 17 - 5
+    }
+
+    #[test]
+    fn test_update_stats_in_process_tracking() {
+        let stats = UpdateStats::default();
+
+        // Initially should be zero
+        assert_eq!(stats.processing.get_in_process(), 0);
+
+        // Start processing some rows
+        stats.processing.start(5);
+        assert_eq!(stats.processing.get_in_process(), 5);
+
+        // Start processing more rows
+        stats.processing.start(3);
+        assert_eq!(stats.processing.get_in_process(), 8);
+
+        // Finish processing some rows
+        stats.processing.end(2);
+        assert_eq!(stats.processing.get_in_process(), 6);
+
+        // Finish processing remaining rows
+        stats.processing.end(6);
+        assert_eq!(stats.processing.get_in_process(), 0);
+    }
+
+    #[test]
+    fn test_update_stats_thread_safety() {
+        let stats = Arc::new(UpdateStats::default());
+        let mut handles = Vec::new();
+
+        // Spawn multiple threads that concurrently increment and decrement
+        for i in 0..10 {
+            let stats_clone = Arc::clone(&stats);
+            let handle = thread::spawn(move || {
+                // Each thread processes 100 rows
+                stats_clone.processing.start(100);
+
+                // Simulate some work
+                thread::sleep(std::time::Duration::from_millis(i * 10));
+
+                // Finish processing
+                stats_clone.processing.end(100);
+            });
+            handles.push(handle);
+        }
+
+        // Wait for all threads to complete
+        for handle in handles {
+            handle.join().unwrap();
+        }
+
+        // Should be back to zero
+        assert_eq!(stats.processing.get_in_process(), 0);
+    }
+
+    #[test]
+    fn test_operation_in_process_stats() {
+        let op_stats = OperationInProcessStats::default();
+
+        // Initially should be zero for all operations
+        assert_eq!(op_stats.get_operation_in_process_count("op1"), 0);
+        assert_eq!(op_stats.get_total_in_process_count(), 0);
+
+        // Start processing rows for different operations
+        op_stats.start_processing("op1", 5);
+        op_stats.start_processing("op2", 3);
+
+        assert_eq!(op_stats.get_operation_in_process_count("op1"), 5);
+        assert_eq!(op_stats.get_operation_in_process_count("op2"), 3);
+        assert_eq!(op_stats.get_total_in_process_count(), 8);
+
+        // Get all operations snapshot
+        let all_ops = op_stats.get_all_operations_in_process();
+        assert_eq!(all_ops.len(), 2);
+        assert_eq!(all_ops.get("op1"), Some(&5));
+        assert_eq!(all_ops.get("op2"), Some(&3));
+
+        // Finish processing some rows
+        op_stats.finish_processing("op1", 2);
+        assert_eq!(op_stats.get_operation_in_process_count("op1"), 3);
+        assert_eq!(op_stats.get_total_in_process_count(), 6);
+
+        // Finish processing all remaining rows
+        op_stats.finish_processing("op1", 3);
+        op_stats.finish_processing("op2", 3);
+        assert_eq!(op_stats.get_total_in_process_count(), 0);
+    }
+
+    #[test]
+    fn test_operation_in_process_stats_thread_safety() {
+        let op_stats = Arc::new(OperationInProcessStats::default());
+        let mut handles = Vec::new();
+
+        // Spawn threads for different operations
+        for i in 0..5 {
+            let op_stats_clone = Arc::clone(&op_stats);
+            let op_name = format!("operation_{}", i);
+
+            let handle = thread::spawn(move || {
+                // Each operation processes 50 rows
+                op_stats_clone.start_processing(&op_name, 50);
+
+                // Simulate some work
+                thread::sleep(std::time::Duration::from_millis(i * 20));
+
+                // Finish processing
+                op_stats_clone.finish_processing(&op_name, 50);
+            });
+            handles.push(handle);
+        }
+
+        // Wait for all threads to complete
+        for handle in handles {
+            handle.join().unwrap();
+        }
+
+        // Should be back to zero
+        assert_eq!(op_stats.get_total_in_process_count(), 0);
+    }
+
+    #[test]
+    fn test_update_stats_merge_with_in_process() {
+        let stats1 = UpdateStats::default();
+        let stats2 = UpdateStats::default();
+
+        // Set up different counts
+        stats1.processing.start(10);
+        stats1.num_insertions.inc(5);
+
+        stats2.processing.start(15);
+        stats2.num_updates.inc(3);
+
+        // Merge stats2 into stats1
+        stats1.merge(&stats2);
+
+        // Check that all counters were merged correctly
+        assert_eq!(stats1.processing.get_in_process(), 25); // 10 + 15
+        assert_eq!(stats1.num_insertions.get(), 5);
+        assert_eq!(stats1.num_updates.get(), 3);
+    }
+
+    #[test]
+    fn test_update_stats_delta_with_in_process() {
+        let base = UpdateStats::default();
+        let current = UpdateStats::default();
+
+        // Set up base state
+        base.processing.start(5);
+        base.num_insertions.inc(2);
+
+        // Set up current state
+        current.processing.start(12);
+        current.num_insertions.inc(7);
+        current.num_updates.inc(3);
+
+        // Calculate delta
+        let delta = current.delta(&base);
+
+        // Check that delta contains the differences
+        assert_eq!(delta.processing.get_in_process(), 7); // 12 - 5
+        assert_eq!(delta.num_insertions.get(), 5); // 7 - 2
+        assert_eq!(delta.num_updates.get(), 3); // 3 - 0
+    }
+
+    #[test]
+    fn test_update_stats_display_with_in_process() {
+        let stats = UpdateStats::default();
+
+        // Test with no activity
+        assert_eq!(format!("{}", stats), "No changes");
+
+        // Test with in-process rows
+        stats.processing.start(5);
+        assert!(format!("{}", stats).contains("5 source rows IN PROCESS"));
+
+        // Test with mixed activity
+        stats.num_insertions.inc(3);
+        stats.num_errors.inc(1);
+        let display = format!("{}", stats);
+        assert!(display.contains("1 source rows FAILED"));
+        assert!(display.contains("3 source rows processed"));
+        assert!(display.contains("5 source rows IN PROCESS"));
+    }
+
+    #[test]
+    fn test_granular_operation_tracking_integration() {
+        let op_stats = OperationInProcessStats::default();
+
+        // Simulate import operations
+        op_stats.start_processing("import_users", 5);
+        op_stats.start_processing("import_orders", 3);
+
+        // Simulate transform operations
+        op_stats.start_processing("transform_user_data", 4);
+        op_stats.start_processing("transform_order_data", 2);
+
+        // Simulate export operations
+        op_stats.start_processing("export_to_postgres", 3);
+        op_stats.start_processing("export_to_elasticsearch", 2);
+
+        // Check individual operation counts
+        assert_eq!(op_stats.get_operation_in_process_count("import_users"), 5);
+        assert_eq!(
+            op_stats.get_operation_in_process_count("transform_user_data"),
+            4
+        );
+        assert_eq!(
+            op_stats.get_operation_in_process_count("export_to_postgres"),
+            3
+        );
+
+        // Check total count across all operations
+        assert_eq!(op_stats.get_total_in_process_count(), 19); // 5+3+4+2+3+2
+
+        // Check snapshot of all operations
+        let all_ops = op_stats.get_all_operations_in_process();
+        assert_eq!(all_ops.len(), 6);
+        assert_eq!(all_ops.get("import_users"), Some(&5));
+        assert_eq!(all_ops.get("transform_user_data"), Some(&4));
+        assert_eq!(all_ops.get("export_to_postgres"), Some(&3));
+
+        // Finish some operations
+        op_stats.finish_processing("import_users", 2);
+        op_stats.finish_processing("transform_user_data", 4);
+        op_stats.finish_processing("export_to_postgres", 1);
+
+        // Verify counts after completion
+        assert_eq!(op_stats.get_operation_in_process_count("import_users"), 3); // 5 - 2
+        assert_eq!(
+            op_stats.get_operation_in_process_count("transform_user_data"),
+            0
+        ); // 4 - 4
+        assert_eq!(
+            op_stats.get_operation_in_process_count("export_to_postgres"),
+            2
+        ); // 3 - 1
+        assert_eq!(op_stats.get_total_in_process_count(), 12); // 3+3+0+2+2+2
+    }
+
+    #[test]
+    fn test_operation_tracking_with_realistic_pipeline() {
+        let op_stats = OperationInProcessStats::default();
+
+        // Simulate a realistic processing pipeline scenario
+        // Import phase: start processing 100 rows
+        op_stats.start_processing("users_import", 100);
+        assert_eq!(op_stats.get_total_in_process_count(), 100);
+
+        // Transform phase: as imports finish, transforms start
+        for i in 0..100 {
+            // Each imported row triggers a transform
+            if i % 10 == 0 {
+                // Complete an import batch every 10 items
+                op_stats.finish_processing("users_import", 10);
+            }
+
+            // Start a transform for each item
+            op_stats.start_processing("user_transform", 1);
+
+            // Some transforms complete quickly
+            if i % 5 == 0 {
+                op_stats.finish_processing("user_transform", 1);
+            }
+        }
+
+        // Verify intermediate state
+        assert_eq!(op_stats.get_operation_in_process_count("users_import"), 0); // all imports finished
+        assert_eq!(
+            op_stats.get_operation_in_process_count("user_transform"),
+            80
+        ); // 100 started - 20 finished
+
+        // Export phase: as transforms finish, exports start
+        for i in 0..80 {
+            op_stats.finish_processing("user_transform", 1);
+            op_stats.start_processing("user_export", 1);
+
+            // Some exports complete
+            if i % 3 == 0 {
+                op_stats.finish_processing("user_export", 1);
+            }
+        }
+
+        // Final verification
+        assert_eq!(op_stats.get_operation_in_process_count("users_import"), 0);
+        assert_eq!(op_stats.get_operation_in_process_count("user_transform"), 0);
+        assert_eq!(op_stats.get_operation_in_process_count("user_export"), 53); // 80 started - 27 finished (i % 3 == 0 holds for 27 values of i in 0..80)
+        assert_eq!(op_stats.get_total_in_process_count(), 53);
+    }
+
+    #[test]
+    fn test_operation_tracking_cumulative_behavior() {
+        let op_stats = OperationInProcessStats::default();
+
+        // Test that operation tracking maintains cumulative behavior for delta calculations
+        let snapshot1 = OperationInProcessStats::default();
+
+        // Initial state
+        op_stats.start_processing("test_op", 10);
+        op_stats.finish_processing("test_op", 3);
+
+        // Simulate taking a snapshot (in real code this would involve cloning counters);
+        // for testing, we manually recreate the "previous" state
+        snapshot1.start_processing("test_op", 10);
+        snapshot1.finish_processing("test_op", 3);
+
+        // Continue processing
+        op_stats.start_processing("test_op", 5);
+        op_stats.finish_processing("test_op", 2);
+
+        // Verify the cumulative nature:
+        // op_stats now holds starts=15, ends=5, in_process=10;
+        // snapshot1 holds starts=10, ends=3, in_process=7;
+        // a delta would therefore be starts=5, ends=2, net change=3.
+
+        assert_eq!(op_stats.get_operation_in_process_count("test_op"), 10); // 15 - 5
+    }
+}
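
For reviewers, a minimal usage sketch of the new `OperationInProcessStats` API added in `src/execution/stats.rs`. The `log_in_process_rows` helper and the one-second polling interval are illustrative assumptions and not part of this diff; only the `OperationInProcessStats` type, its accessors, and the key formats (`import/<source>`, `transform/<scope qualifier><op>`, `export/<target kind>`) come from the change itself.

```rust
use std::sync::Arc;
use std::time::Duration;

use crate::execution::stats::OperationInProcessStats;

/// Hypothetical monitoring loop: polls the shared per-operation counters
/// that `FlowLiveUpdater` now exposes as `operation_in_process_stats`.
async fn log_in_process_rows(stats: Arc<OperationInProcessStats>) {
    loop {
        // Keys follow the qualifiers built in this diff, e.g.
        // "import/my_source", "transform/my_scope.my_op", "export/postgres".
        for (op, count) in stats.get_all_operations_in_process() {
            if count > 0 {
                println!("{op}: {count} rows in process");
            }
        }
        if stats.get_total_in_process_count() == 0 {
            break; // nothing in flight; stop polling
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```

Because `ProcessingCounters` keeps cumulative `num_starts`/`num_ends` rather than a single gauge, the same `delta`/`merge` pattern used by `UpdateStats` works for periodic reporting, and a reader can never observe a negative in-process count.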