diff --git a/src/execution/stats.rs b/src/execution/stats.rs index 4dab89fc..dfc89366 100644 --- a/src/execution/stats.rs +++ b/src/execution/stats.rs @@ -107,10 +107,6 @@ pub struct UpdateStats { pub num_errors: Counter, /// Processing counters for tracking in-process rows. pub processing: ProcessingCounters, - /// Cumulative total count of items (for display purposes) - /// This represents the actual total after applying additions and deletions - #[serde(skip)] - pub cumulative_total: Counter, } impl UpdateStats { @@ -123,7 +119,6 @@ impl UpdateStats { num_reprocesses: self.num_reprocesses.delta(&base.num_reprocesses), num_errors: self.num_errors.delta(&base.num_errors), processing: self.processing.delta(&base.processing), - cumulative_total: self.cumulative_total.clone(), } } @@ -135,9 +130,6 @@ impl UpdateStats { self.num_reprocesses.merge(&delta.num_reprocesses); self.num_errors.merge(&delta.num_errors); self.processing.merge(&delta.processing); - // Update cumulative total: add insertions, subtract deletions - let net_change = delta.num_insertions.get() - delta.num_deletions.get(); - self.cumulative_total.inc(net_change); } pub fn has_any_change(&self) -> bool { @@ -199,113 +191,96 @@ impl OperationInProcessStats { } } +struct UpdateStatsSegment { + count: i64, + symbol: char, + label: &'static str, + bar_width: usize, +} + +impl UpdateStatsSegment { + pub fn new(count: i64, symbol: char, label: &'static str) -> Self { + Self { + count, + symbol, + label, + bar_width: 0, + } + } +} + +const BAR_WIDTH: usize = 40; + +fn indices_of(_: &[T; N]) -> [usize; N] { + std::array::from_fn(|i| i) +} impl std::fmt::Display for UpdateStats { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut segments = Vec::new(); - let num_insertions = self.num_insertions.get(); - let num_deletions = self.num_deletions.get(); - let num_updates = self.num_updates.get(); - let num_no_change = self.num_no_change.get(); - let num_errors = self.num_errors.get(); + let mut segments: [UpdateStatsSegment; _] = [ + UpdateStatsSegment::new(self.num_insertions.get(), '+', "added"), + UpdateStatsSegment::new(self.num_updates.get(), '~', "updated"), + UpdateStatsSegment::new(self.num_reprocesses.get(), '*', "reprocessed"), + UpdateStatsSegment::new(self.num_deletions.get(), '-', "deleted"), + UpdateStatsSegment::new(self.num_no_change.get(), '.', "no change"), + UpdateStatsSegment::new(self.num_errors.get(), '!', "errors"), + ]; let num_in_process = self.processing.get_in_process(); - let num_reprocesses = self.num_reprocesses.get(); - let total = num_insertions + num_deletions + num_updates + num_no_change + num_reprocesses; + let processed_count = segments.iter().map(|seg| seg.count).sum::(); + let total = num_in_process + processed_count; - // Progress bar segments - if total > 0 { - if num_insertions > 0 { - segments.push((num_insertions, "+", format!("(+{} added)", num_insertions))); - } - if num_deletions > 0 { - segments.push((num_deletions, "-", format!("(-{} removed)", num_deletions))); - } - if num_updates > 0 { - segments.push((num_updates, "~", format!("(~{} updated)", num_updates))); - } - if num_no_change > 0 { - segments.push((num_no_change, " ", "".to_string())); - } + if total <= 0 { + write!(f, "No input data")?; + return Ok(()); } - // Error handling - if num_errors > 0 { - write!(f, "{} rows failed", num_errors)?; - if !segments.is_empty() { - write!(f, "; ")?; + let mut segments_indices = indices_of(&segments); + segments_indices.sort_by_key(|&i| segments[i].count); + + let mut remaining_width = BAR_WIDTH as u64; + let mut remaining_count = total as u64; + for idx in segments_indices.iter() { + let seg = &mut segments[*idx]; + if seg.count > 0 { + if remaining_count == 0 { + error!("remaining_count is 0, but still have segments to process"); + break; + } + let width = std::cmp::max( + // rounded division of remaining_width * seg.count / remaining_count + (remaining_width * (seg.count as u64) + remaining_count / 2) / remaining_count, + 1, + ); + seg.bar_width = width as usize; + remaining_width -= width; + remaining_count -= seg.count as u64; } } - // Progress bar - if !segments.is_empty() { - let mut sorted_segments = segments.clone(); - sorted_segments.sort_by_key(|s| s.0); - sorted_segments.reverse(); - - let bar_width = 40; - let mut bar = String::new(); - - let mut remaining_width = bar_width; - - for (count, segment_type, _) in sorted_segments.iter() { - let segment_width = (*count * bar_width as i64 / total as i64) as usize; - let width = std::cmp::min(segment_width, remaining_width); - if width > 0 { - // Calculate completed and remaining portions - let completed_portion = - (width as f64 * (total - num_in_process) as f64 / total as f64) as usize; - let remaining_portion = width - completed_portion; - - // Add segment with appropriate characters based on type - if completed_portion > 0 { - let completed_char = match *segment_type { - "+" => "█", - "-" => "▓", - "~" => "▒", - _ => "░", - }; - bar.push_str(&completed_char.repeat(completed_portion)); - } - - if remaining_portion > 0 { - let remaining_char = match *segment_type { - "+" => "▒", - "-" => "░", - "~" => "░", - _ => " ", - }; - bar.push_str(&remaining_char.repeat(remaining_portion)); - } - - remaining_width = remaining_width.saturating_sub(width); - } - } - if remaining_width > 0 { - bar.push_str(&" ".repeat(remaining_width)); + write!(f, "[")?; + for segment in segments.iter() { + for _ in 0..segment.bar_width { + write!(f, "{}", segment.symbol)?; } - // Use total from current operations - this represents the actual record count - write!(f, "[{}] {}/{} records ", bar, total - num_in_process, total)?; - - // Add segment labels with different grey shades for each segment type - let mut first = true; - for (_, segment_type, label) in segments.iter() { - if !label.is_empty() { - if !first { - write!(f, " ")?; - } - write!(f, "{}", label)?; - first = false; - } - } - } else { - write!(f, "No changes")?; } - - // In-process info with grey coloring - if num_in_process > 0 { - if !segments.is_empty() { - write!(f, " ")?; + for _ in 0..remaining_width { + write!(f, " ")?; + } + write!(f, "] {processed_count}/{total} source rows")?; + + if processed_count > 0 { + let mut delimiter = ':'; + for seg in segments.iter() { + if seg.count > 0 { + write!( + f, + "{delimiter} ({symbol}) {count} {label}", + count = seg.count, + symbol = seg.symbol, + label = seg.label, + )?; + delimiter = ','; + } } - write!(f, "({} in process)", num_in_process)?; } Ok(()) @@ -575,21 +550,24 @@ mod tests { let stats = UpdateStats::default(); // Test with no activity - assert_eq!(format!("{}", stats), "No changes"); + assert_eq!(format!("{}", stats), "No input data"); // Test with in-process rows (no segments yet, so just shows in-process) stats.processing.start(5); let display = format!("{}", stats); - assert!(display.contains("5 in process")); + assert_eq!( + display, + "[ ] 0/5 source rows" + ); // Test with mixed activity stats.num_insertions.inc(3); stats.num_errors.inc(1); - stats.cumulative_total.inc(3); let display = format!("{}", stats); - assert!(display.contains("1 rows failed")); - assert!(display.contains("(+3 added)")); - assert!(display.contains("5 in process")); + assert_eq!( + display, + "[++++++++++++++!!!! ] 4/9 source rows: (+) 3 added, (!) 1 errors" + ); } #[test]