Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl SourceVersion {
};
if should_skip {
if let Some(update_stats) = update_stats {
update_stats.num_skipped.inc(1);
update_stats.num_no_change.inc(1);
}
}
should_skip
Expand Down Expand Up @@ -491,8 +491,7 @@ pub async fn update_source_row(
pool,
)
.await?;
let already_exists = existing_tracking_info.is_some();
let memoization_info = match existing_tracking_info {
let (memoization_info, existing_version) = match existing_tracking_info {
Some(info) => {
let existing_version = SourceVersion::from_stored(
info.processed_source_ordinal,
Expand All @@ -502,7 +501,10 @@ pub async fn update_source_row(
if existing_version.should_skip(source_version, Some(update_stats)) {
return Ok(SkippedOr::Skipped(existing_version));
}
info.memoization_info.and_then(|info| info.0)
(
info.memoization_info.and_then(|info| info.0),
Some(existing_version),
)
}
None => Default::default(),
};
Expand Down Expand Up @@ -592,9 +594,15 @@ pub async fn update_source_row(
)
.await?;

if already_exists {
if let Some(existing_version) = existing_version {
if output.is_some() {
update_stats.num_repreocesses.inc(1);
if source_version.ordinal.is_none()
|| source_version.ordinal != existing_version.ordinal
{
update_stats.num_updates.inc(1);
} else {
update_stats.num_reprocesses.inc(1);
}
} else {
update_stats.num_deletions.inc(1);
}
Expand Down
28 changes: 17 additions & 11 deletions src/execution/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,34 @@ impl std::fmt::Debug for Counter {

#[derive(Debug, Serialize, Default, Clone)]
pub struct UpdateStats {
pub num_skipped: Counter,
pub num_no_change: Counter,
pub num_insertions: Counter,
pub num_deletions: Counter,
pub num_repreocesses: Counter,
/// Number of source rows that were updated.
pub num_updates: Counter,
/// Number of source rows that were reprocessed because of a logic change.
pub num_reprocesses: Counter,
pub num_errors: Counter,
}

impl UpdateStats {
pub fn delta(&self, base: &Self) -> Self {
UpdateStats {
num_skipped: self.num_skipped.delta(&base.num_skipped),
num_no_change: self.num_no_change.delta(&base.num_no_change),
num_insertions: self.num_insertions.delta(&base.num_insertions),
num_deletions: self.num_deletions.delta(&base.num_deletions),
num_repreocesses: self.num_repreocesses.delta(&base.num_repreocesses),
num_updates: self.num_updates.delta(&base.num_updates),
num_reprocesses: self.num_reprocesses.delta(&base.num_reprocesses),
num_errors: self.num_errors.delta(&base.num_errors),
}
}

pub fn is_zero(&self) -> bool {
self.num_skipped.get() == 0
self.num_no_change.get() == 0
&& self.num_insertions.get() == 0
&& self.num_deletions.get() == 0
&& self.num_repreocesses.get() == 0
&& self.num_updates.get() == 0
&& self.num_reprocesses.get() == 0
&& self.num_errors.get() == 0
}
}
Expand All @@ -87,18 +92,19 @@ impl std::fmt::Display for UpdateStats {
messages.push(format!("{num_errors} source rows FAILED"));
}

let num_skipped = self.num_skipped.get();
let num_skipped = self.num_no_change.get();
if num_skipped > 0 {
messages.push(format!("{} source rows SKIPPED", num_skipped));
messages.push(format!("{} source rows NO CHANGE", num_skipped));
}

let num_insertions = self.num_insertions.get();
let num_deletions = self.num_deletions.get();
let num_reprocesses = self.num_repreocesses.get();
let num_source_rows = num_insertions + num_deletions + num_reprocesses;
let num_updates = self.num_updates.get();
let num_reprocesses = self.num_reprocesses.get();
let num_source_rows = num_insertions + num_deletions + num_updates + num_reprocesses;
if num_source_rows > 0 {
messages.push(format!(
"{num_source_rows} source rows processed: {num_insertions} ADDED, {num_deletions} REMOVED, {num_reprocesses} REPROCESSED",
"{num_source_rows} source rows processed ({num_insertions} ADDED, {num_deletions} REMOVED, {num_updates} UPDATED, {num_reprocesses} REPROCESSED on flow change)",
));
}

Expand Down