From 6746861196bf0901a8a575673a3c85b7f17f370f Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Thu, 30 Oct 2025 12:13:53 -0700 Subject: [PATCH] feat: report elapsed time for processing --- src/execution/live_updater.rs | 40 ++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index c5748680..e69c16d1 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -10,6 +10,7 @@ use super::stats; use futures::future::try_join_all; use indicatif::ProgressBar; use sqlx::PgPool; +use std::fmt::Write; use tokio::{sync::watch, task::JoinSet, time::MissedTickBehavior}; use tokio_util::task::AbortOnDropHandle; @@ -251,7 +252,7 @@ impl SourceUpdateTask { let curr_change_stream_stats = change_stream_stats.as_ref().clone(); let delta = curr_change_stream_stats.delta(&last_change_stream_stats); if delta.has_any_change() { - task.report_stats(&delta, "change stream"); + task.report_stats(&delta, "change stream", None); last_change_stream_stats = curr_change_stream_stats; } } @@ -303,22 +304,30 @@ impl SourceUpdateTask { Ok(()) } - fn report_stats(&self, stats: &stats::UpdateStats, update_title: &str) { + fn report_stats( + &self, + stats: &stats::UpdateStats, + update_title: &str, + start_time: Option, + ) { self.source_update_stats.merge(stats); + let mut message = format!( + "{}.{} ({update_title}): {stats}", + self.flow.flow_instance.name, + self.import_op().name + ); + if let Some(start_time) = start_time { + write!( + &mut message, + " [processing time: {:.3}s]", + start_time.elapsed().as_secs_f64() + ) + .expect("Failed to write to message"); + } if self.options.print_stats { - println!( - "{}.{} ({update_title}): {}", - self.flow.flow_instance.name, - self.import_op().name, - stats - ); + println!("{message}"); } else { - trace!( - "{}.{} ({update_title}): {}", - self.flow.flow_instance.name, - self.import_op().name, - stats - ); + trace!("{message}"); } } @@ -328,6 +337,7 @@ impl SourceUpdateTask { update_title: &str, update_options: super::source_indexer::UpdateOptions, ) -> Result<()> { + let now = std::time::Instant::now(); let update_stats = Arc::new(stats::UpdateStats::default()); // Spawn periodic stats reporting task if print_stats is enabled @@ -403,7 +413,7 @@ impl SourceUpdateTask { } // Report final stats - self.report_stats(&update_stats, update_title); + self.report_stats(&update_stats, update_title, Some(now)); Ok(()) }