From 948a38889ff416efea1aa02de21a58f2919fb2ad Mon Sep 17 00:00:00 2001 From: LJ Date: Fri, 2 May 2025 15:07:24 -0700 Subject: [PATCH] feat(stats): polish wording for stats and add a flow change bucket --- src/execution/row_indexer.rs | 20 ++++++++++++++------ src/execution/stats.rs | 28 +++++++++++++++++----------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index b5039d59..38a301e1 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -100,7 +100,7 @@ impl SourceVersion { }; if should_skip { if let Some(update_stats) = update_stats { - update_stats.num_skipped.inc(1); + update_stats.num_no_change.inc(1); } } should_skip @@ -491,8 +491,7 @@ pub async fn update_source_row( pool, ) .await?; - let already_exists = existing_tracking_info.is_some(); - let memoization_info = match existing_tracking_info { + let (memoization_info, existing_version) = match existing_tracking_info { Some(info) => { let existing_version = SourceVersion::from_stored( info.processed_source_ordinal, @@ -502,7 +501,10 @@ pub async fn update_source_row( if existing_version.should_skip(source_version, Some(update_stats)) { return Ok(SkippedOr::Skipped(existing_version)); } - info.memoization_info.and_then(|info| info.0) + ( + info.memoization_info.and_then(|info| info.0), + Some(existing_version), + ) } None => Default::default(), }; @@ -592,9 +594,15 @@ pub async fn update_source_row( ) .await?; - if already_exists { + if let Some(existing_version) = existing_version { if output.is_some() { - update_stats.num_repreocesses.inc(1); + if source_version.ordinal.is_none() + || source_version.ordinal != existing_version.ordinal + { + update_stats.num_updates.inc(1); + } else { + update_stats.num_reprocesses.inc(1); + } } else { update_stats.num_deletions.inc(1); } diff --git a/src/execution/stats.rs b/src/execution/stats.rs index 6b016f4d..4a58c05b 100644 --- a/src/execution/stats.rs +++ b/src/execution/stats.rs @@ -52,29 +52,34 @@ impl std::fmt::Debug for Counter { #[derive(Debug, Serialize, Default, Clone)] pub struct UpdateStats { - pub num_skipped: Counter, + pub num_no_change: Counter, pub num_insertions: Counter, pub num_deletions: Counter, - pub num_repreocesses: Counter, + /// Number of source rows that were updated. + pub num_updates: Counter, + /// Number of source rows that were reprocessed because of logic change. + pub num_reprocesses: Counter, pub num_errors: Counter, } impl UpdateStats { pub fn delta(&self, base: &Self) -> Self { UpdateStats { - num_skipped: self.num_skipped.delta(&base.num_skipped), + num_no_change: self.num_no_change.delta(&base.num_no_change), num_insertions: self.num_insertions.delta(&base.num_insertions), num_deletions: self.num_deletions.delta(&base.num_deletions), - num_repreocesses: self.num_repreocesses.delta(&base.num_repreocesses), + num_updates: self.num_updates.delta(&base.num_updates), + num_reprocesses: self.num_reprocesses.delta(&base.num_reprocesses), num_errors: self.num_errors.delta(&base.num_errors), } } pub fn is_zero(&self) -> bool { - self.num_skipped.get() == 0 + self.num_no_change.get() == 0 && self.num_insertions.get() == 0 && self.num_deletions.get() == 0 - && self.num_repreocesses.get() == 0 + && self.num_updates.get() == 0 + && self.num_reprocesses.get() == 0 && self.num_errors.get() == 0 } } @@ -87,18 +92,19 @@ impl std::fmt::Display for UpdateStats { messages.push(format!("{num_errors} source rows FAILED")); } - let num_skipped = self.num_skipped.get(); + let num_skipped = self.num_no_change.get(); if num_skipped > 0 { - messages.push(format!("{} source rows SKIPPED", num_skipped)); + messages.push(format!("{} source rows NO CHANGE", num_skipped)); } let num_insertions = self.num_insertions.get(); let num_deletions = self.num_deletions.get(); - let num_reprocesses = self.num_repreocesses.get(); - let num_source_rows = num_insertions + num_deletions + num_reprocesses; + let num_updates = self.num_updates.get(); + let num_reprocesses = self.num_reprocesses.get(); + let num_source_rows = num_insertions + num_deletions + num_updates + num_reprocesses; if num_source_rows > 0 { messages.push(format!( - "{num_source_rows} source rows processed: {num_insertions} ADDED, {num_deletions} REMOVED, {num_reprocesses} REPROCESSED", + "{num_source_rows} source rows processed ({num_insertions} ADDED, {num_deletions} REMOVED, {num_updates} UPDATED, {num_reprocesses} REPROCESSED on flow change)", )); }