Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 87 additions & 109 deletions src/execution/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ pub struct UpdateStats {
pub num_errors: Counter,
/// Processing counters for tracking in-process rows.
pub processing: ProcessingCounters,
/// Cumulative total count of items (for display purposes)
/// This represents the actual total after applying additions and deletions
#[serde(skip)]
pub cumulative_total: Counter,
}

impl UpdateStats {
Expand All @@ -123,7 +119,6 @@ impl UpdateStats {
num_reprocesses: self.num_reprocesses.delta(&base.num_reprocesses),
num_errors: self.num_errors.delta(&base.num_errors),
processing: self.processing.delta(&base.processing),
cumulative_total: self.cumulative_total.clone(),
}
}

Expand All @@ -135,9 +130,6 @@ impl UpdateStats {
self.num_reprocesses.merge(&delta.num_reprocesses);
self.num_errors.merge(&delta.num_errors);
self.processing.merge(&delta.processing);
// Update cumulative total: add insertions, subtract deletions
let net_change = delta.num_insertions.get() - delta.num_deletions.get();
self.cumulative_total.inc(net_change);
}

pub fn has_any_change(&self) -> bool {
Expand Down Expand Up @@ -199,113 +191,96 @@ impl OperationInProcessStats {
}
}

struct UpdateStatsSegment {
count: i64,
symbol: char,
label: &'static str,
bar_width: usize,
}

impl UpdateStatsSegment {
pub fn new(count: i64, symbol: char, label: &'static str) -> Self {
Self {
count,
symbol,
label,
bar_width: 0,
}
}
}

const BAR_WIDTH: usize = 40;

fn indices_of<T, const N: usize>(_: &[T; N]) -> [usize; N] {
std::array::from_fn(|i| i)
}
impl std::fmt::Display for UpdateStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut segments = Vec::new();
let num_insertions = self.num_insertions.get();
let num_deletions = self.num_deletions.get();
let num_updates = self.num_updates.get();
let num_no_change = self.num_no_change.get();
let num_errors = self.num_errors.get();
let mut segments: [UpdateStatsSegment; _] = [
UpdateStatsSegment::new(self.num_insertions.get(), '+', "added"),
UpdateStatsSegment::new(self.num_updates.get(), '~', "updated"),
UpdateStatsSegment::new(self.num_reprocesses.get(), '*', "reprocessed"),
UpdateStatsSegment::new(self.num_deletions.get(), '-', "deleted"),
UpdateStatsSegment::new(self.num_no_change.get(), '.', "no change"),
UpdateStatsSegment::new(self.num_errors.get(), '!', "errors"),
];
let num_in_process = self.processing.get_in_process();
let num_reprocesses = self.num_reprocesses.get();
let total = num_insertions + num_deletions + num_updates + num_no_change + num_reprocesses;
let processed_count = segments.iter().map(|seg| seg.count).sum::<i64>();
let total = num_in_process + processed_count;

// Progress bar segments
if total > 0 {
if num_insertions > 0 {
segments.push((num_insertions, "+", format!("(+{} added)", num_insertions)));
}
if num_deletions > 0 {
segments.push((num_deletions, "-", format!("(-{} removed)", num_deletions)));
}
if num_updates > 0 {
segments.push((num_updates, "~", format!("(~{} updated)", num_updates)));
}
if num_no_change > 0 {
segments.push((num_no_change, " ", "".to_string()));
}
if total <= 0 {
write!(f, "No input data")?;
return Ok(());
}

// Error handling
if num_errors > 0 {
write!(f, "{} rows failed", num_errors)?;
if !segments.is_empty() {
write!(f, "; ")?;
let mut segments_indices = indices_of(&segments);
segments_indices.sort_by_key(|&i| segments[i].count);

let mut remaining_width = BAR_WIDTH as u64;
let mut remaining_count = total as u64;
for idx in segments_indices.iter() {
let seg = &mut segments[*idx];
if seg.count > 0 {
if remaining_count == 0 {
error!("remaining_count is 0, but still have segments to process");
break;
}
let width = std::cmp::max(
// rounded division of remaining_width * seg.count / remaining_count
(remaining_width * (seg.count as u64) + remaining_count / 2) / remaining_count,
1,
);
seg.bar_width = width as usize;
remaining_width -= width;
remaining_count -= seg.count as u64;
}
}

// Progress bar
if !segments.is_empty() {
let mut sorted_segments = segments.clone();
sorted_segments.sort_by_key(|s| s.0);
sorted_segments.reverse();

let bar_width = 40;
let mut bar = String::new();

let mut remaining_width = bar_width;

for (count, segment_type, _) in sorted_segments.iter() {
let segment_width = (*count * bar_width as i64 / total as i64) as usize;
let width = std::cmp::min(segment_width, remaining_width);
if width > 0 {
// Calculate completed and remaining portions
let completed_portion =
(width as f64 * (total - num_in_process) as f64 / total as f64) as usize;
let remaining_portion = width - completed_portion;

// Add segment with appropriate characters based on type
if completed_portion > 0 {
let completed_char = match *segment_type {
"+" => "█",
"-" => "▓",
"~" => "▒",
_ => "░",
};
bar.push_str(&completed_char.repeat(completed_portion));
}

if remaining_portion > 0 {
let remaining_char = match *segment_type {
"+" => "▒",
"-" => "░",
"~" => "░",
_ => " ",
};
bar.push_str(&remaining_char.repeat(remaining_portion));
}

remaining_width = remaining_width.saturating_sub(width);
}
}
if remaining_width > 0 {
bar.push_str(&" ".repeat(remaining_width));
write!(f, "[")?;
for segment in segments.iter() {
for _ in 0..segment.bar_width {
write!(f, "{}", segment.symbol)?;
}
// Use total from current operations - this represents the actual record count
write!(f, "[{}] {}/{} records ", bar, total - num_in_process, total)?;

// Add segment labels with different grey shades for each segment type
let mut first = true;
for (_, segment_type, label) in segments.iter() {
if !label.is_empty() {
if !first {
write!(f, " ")?;
}
write!(f, "{}", label)?;
first = false;
}
}
} else {
write!(f, "No changes")?;
}

// In-process info with grey coloring
if num_in_process > 0 {
if !segments.is_empty() {
write!(f, " ")?;
for _ in 0..remaining_width {
write!(f, " ")?;
}
write!(f, "] {processed_count}/{total} source rows")?;

if processed_count > 0 {
let mut delimiter = ':';
for seg in segments.iter() {
if seg.count > 0 {
write!(
f,
"{delimiter} ({symbol}) {count} {label}",
count = seg.count,
symbol = seg.symbol,
label = seg.label,
)?;
delimiter = ',';
}
}
write!(f, "({} in process)", num_in_process)?;
}

Ok(())
Expand Down Expand Up @@ -575,21 +550,24 @@ mod tests {
let stats = UpdateStats::default();

// Test with no activity
assert_eq!(format!("{}", stats), "No changes");
assert_eq!(format!("{}", stats), "No input data");

// Test with in-process rows (no segments yet, so just shows in-process)
stats.processing.start(5);
let display = format!("{}", stats);
assert!(display.contains("5 in process"));
assert_eq!(
display,
"[ ] 0/5 source rows"
);

// Test with mixed activity
stats.num_insertions.inc(3);
stats.num_errors.inc(1);
stats.cumulative_total.inc(3);
let display = format!("{}", stats);
assert!(display.contains("1 rows failed"));
assert!(display.contains("(+3 added)"));
assert!(display.contains("5 in process"));
assert_eq!(
display,
"[++++++++++++++!!!! ] 4/9 source rows: (+) 3 added, (!) 1 errors"
);
}

#[test]
Expand Down
Loading