From 7c8db2523f9e004609e67f4a5fae4e2dbd5b6281 Mon Sep 17 00:00:00 2001
From: Jiangzhou He
Date: Wed, 2 Jul 2025 12:44:13 -0700
Subject: [PATCH] fix(stats): make stats more interpretable

Report stats per kind of work ("batch update", "interval refresh",
"change stream") instead of a throttled delta of the shared
`source_update_stats`.

Each batch update / interval refresh now collects into its own
`UpdateStats` and reports it once finished. Change-stream stats are
accumulated separately; every REPORT_INTERVAL their delta since the
last report is printed, but only when a counter other than
`num_no_change` moved. Everything reported is merged back into
`source_update_stats`.

---
 src/execution/live_updater.rs | 161 ++++++++++------------------
 src/execution/stats.rs        |  29 ++++--
 2 files changed, 99 insertions(+), 91 deletions(-)

diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs
index 3ee390f7..a2a23692 100644
--- a/src/execution/live_updater.rs
+++ b/src/execution/live_updater.rs
@@ -3,7 +3,6 @@ use crate::prelude::*;
 use super::stats;
 use futures::future::try_join_all;
 use sqlx::PgPool;
-use std::time::Instant;
 use tokio::{task::JoinSet, time::MissedTickBehavior};
 
 pub struct FlowLiveUpdater {
@@ -22,12 +21,6 @@ pub struct FlowLiveUpdaterOptions {
     pub print_stats: bool,
 }
 
-struct StatsReportState {
-    last_report_time: Option<Instant>,
-    last_stats: stats::UpdateStats,
-}
-
-const MIN_REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
 const REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
 
 struct SharedAckFn {
@@ -74,32 +67,19 @@ async fn update_source(
 
     let import_op = &plan.import_ops[source_idx];
 
-    let stats_report_state = Mutex::new(StatsReportState {
-        last_report_time: None,
-        last_stats: source_update_stats.as_ref().clone(),
-    });
-    let report_stats = || {
-        let new_stats = source_update_stats.as_ref().clone();
-        let now = Instant::now();
-        let delta = {
-            let mut state = stats_report_state.lock().unwrap();
-            if let Some(last_report_time) = state.last_report_time {
-                if now.duration_since(last_report_time) < MIN_REPORT_INTERVAL {
-                    return;
-                }
-            }
-            let delta = new_stats.delta(&state.last_stats);
-            if delta.is_zero() {
-                return;
-            }
-            state.last_stats = new_stats;
-            state.last_report_time = Some(now);
-            delta
-        };
+    // Merge `stats` into `source_update_stats`, then print/trace it labeled with `kind`.
+    let report_stats = |stats: &stats::UpdateStats, kind: &str| {
+        source_update_stats.merge(stats);
         if options.print_stats {
-            println!("{}.{}: {}", flow.flow_instance.name, import_op.name, delta);
+            println!(
+                "{}.{} ({kind}): {}",
+                flow.flow_instance.name, import_op.name, stats
+            );
         } else {
-            trace!("{}.{}: {}", flow.flow_instance.name, import_op.name, delta);
+            trace!(
+                "{}.{} ({kind}): {}",
+                flow.flow_instance.name, import_op.name, stats
+            );
         }
     };
 
@@ -108,68 +88,79 @@ async fn update_source(
     // Deal with change streams.
     if options.live_mode {
         if let Some(change_stream) = import_op.executor.change_stream().await? {
-            let pool = pool.clone();
-            let source_update_stats = source_update_stats.clone();
+            let change_stream_stats = Arc::new(stats::UpdateStats::default());
             futs.push(
-                async move {
-                    let mut change_stream = change_stream;
-                    let retry_options = retryable::RetryOptions {
-                        max_retries: None,
-                        initial_backoff: std::time::Duration::from_secs(5),
-                        max_backoff: std::time::Duration::from_secs(60),
-                    };
-                    loop {
-                        // Workaround as AsyncFnMut isn't mature yet.
-                        // Should be changed to use AsyncFnMut once it is.
-                        let change_stream = tokio::sync::Mutex::new(&mut change_stream);
-                        let change_msg = retryable::run(
-                            || async {
-                                let mut change_stream = change_stream.lock().await;
-                                change_stream
-                                    .next()
-                                    .await
-                                    .transpose()
-                                    .map_err(retryable::Error::always_retryable)
-                            },
-                            &retry_options,
-                        )
-                        .await?;
-                        let change_msg = if let Some(change_msg) = change_msg {
-                            change_msg
-                        } else {
-                            break;
+                {
+                    let change_stream_stats = change_stream_stats.clone();
+                    let pool = pool.clone();
+                    async move {
+                        let mut change_stream = change_stream;
+                        let retry_options = retryable::RetryOptions {
+                            max_retries: None,
+                            initial_backoff: std::time::Duration::from_secs(5),
+                            max_backoff: std::time::Duration::from_secs(60),
                         };
-                        let ack_fn = change_msg.ack_fn.map(|ack_fn| {
-                            Arc::new(Mutex::new(SharedAckFn::new(
-                                change_msg.changes.iter().len(),
-                                ack_fn,
-                            )))
-                        });
-                        for change in change_msg.changes {
-                            let ack_fn = ack_fn.clone();
-                            tokio::spawn(source_context.clone().process_source_key(
-                                change.key,
-                                change.data,
-                                source_update_stats.clone(),
-                                ack_fn.map(|ack_fn| {
-                                    move || async move { SharedAckFn::ack(&ack_fn).await }
-                                }),
-                                pool.clone(),
-                            ));
+                        loop {
+                            // Workaround as AsyncFnMut isn't mature yet.
+                            // Should be changed to use AsyncFnMut once it is.
+                            let change_stream = tokio::sync::Mutex::new(&mut change_stream);
+                            let change_msg = retryable::run(
+                                || async {
+                                    let mut change_stream = change_stream.lock().await;
+                                    change_stream
+                                        .next()
+                                        .await
+                                        .transpose()
+                                        .map_err(retryable::Error::always_retryable)
+                                },
+                                &retry_options,
+                            )
+                            .await?;
+                            let change_msg = if let Some(change_msg) = change_msg {
+                                change_msg
+                            } else {
+                                break;
+                            };
+                            let ack_fn = change_msg.ack_fn.map(|ack_fn| {
+                                Arc::new(Mutex::new(SharedAckFn::new(
+                                    change_msg.changes.iter().len(),
+                                    ack_fn,
+                                )))
+                            });
+                            for change in change_msg.changes {
+                                let ack_fn = ack_fn.clone();
+                                tokio::spawn(source_context.clone().process_source_key(
+                                    change.key,
+                                    change.data,
+                                    change_stream_stats.clone(),
+                                    ack_fn.map(|ack_fn| {
+                                        move || async move { SharedAckFn::ack(&ack_fn).await }
+                                    }),
+                                    pool.clone(),
+                                ));
+                            }
                         }
+                        Ok(())
                     }
-                    Ok(())
                 }
                 .boxed(),
             );
+
             futs.push(
                 async move {
                     let mut interval = tokio::time::interval(REPORT_INTERVAL);
+                    let mut last_change_stream_stats = change_stream_stats.as_ref().clone();
                     interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
                     interval.tick().await;
                     loop {
                         interval.tick().await;
-                        report_stats();
+                        let curr_change_stream_stats = change_stream_stats.as_ref().clone();
+                        let delta = curr_change_stream_stats.delta(&last_change_stream_stats);
+                        // Report only deltas with real changes, advancing the baseline each time.
+                        if delta.has_any_change() {
+                            report_stats(&delta, "change stream");
+                            last_change_stream_stats = curr_change_stream_stats;
+                        }
                     }
                 }
                 .boxed(),
@@ -178,11 +169,11 @@ async fn update_source(
     }
 
     // The main update loop.
-    let source_update_stats = source_update_stats.clone();
     futs.push(
         async move {
-            source_context.update(&pool, &source_update_stats).await?;
-            report_stats();
+            let update_stats = Arc::new(stats::UpdateStats::default());
+            source_context.update(&pool, &update_stats).await?;
+            report_stats(&update_stats, "batch update");
 
             if let (true, Some(refresh_interval)) = (
                 options.live_mode,
@@ -193,8 +184,10 @@ async fn update_source(
                 interval.tick().await;
                 loop {
                     interval.tick().await;
-                    source_context.update(&pool, &source_update_stats).await?;
-                    report_stats();
+
+                    let update_stats = Arc::new(stats::UpdateStats::default());
+                    source_context.update(&pool, &update_stats).await?;
+                    report_stats(&update_stats, "interval refresh");
                 }
             }
             Ok(())
diff --git a/src/execution/stats.rs b/src/execution/stats.rs
index 4a58c05b..64e814a6 100644
--- a/src/execution/stats.rs
+++ b/src/execution/stats.rs
@@ -24,6 +24,11 @@ impl Counter {
     pub fn into_inner(self) -> i64 {
         self.0.into_inner()
     }
+
+    /// Adds the current value of `delta` to this counter.
+    pub fn merge(&self, delta: &Self) {
+        self.0.fetch_add(delta.get(), Relaxed);
+    }
 }
 
 impl AddAssign for Counter {
@@ -74,13 +79,23 @@ impl UpdateStats {
         }
     }
 
-    pub fn is_zero(&self) -> bool {
-        self.num_no_change.get() == 0
-            && self.num_insertions.get() == 0
-            && self.num_deletions.get() == 0
-            && self.num_updates.get() == 0
-            && self.num_reprocesses.get() == 0
-            && self.num_errors.get() == 0
+    /// Adds each counter of `delta` to the matching counter of `self`.
+    pub fn merge(&self, delta: &Self) {
+        self.num_no_change.merge(&delta.num_no_change);
+        self.num_insertions.merge(&delta.num_insertions);
+        self.num_deletions.merge(&delta.num_deletions);
+        self.num_updates.merge(&delta.num_updates);
+        self.num_reprocesses.merge(&delta.num_reprocesses);
+        self.num_errors.merge(&delta.num_errors);
+    }
+
+    /// Returns whether any counter other than `num_no_change` is positive.
+    pub fn has_any_change(&self) -> bool {
+        self.num_insertions.get() > 0
+            || self.num_deletions.get() > 0
+            || self.num_updates.get() > 0
+            || self.num_reprocesses.get() > 0
+            || self.num_errors.get() > 0
     }
 }
 