diff --git a/Cargo.lock b/Cargo.lock index 8a13212e..a7dc7f82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1319,6 +1319,7 @@ dependencies = [ "hyper-util", "indenter", "indexmap 2.10.0", + "indicatif", "indoc", "infer 0.19.0", "itertools 0.14.0", @@ -1418,6 +1419,19 @@ dependencies = [ "yaml-rust2", ] +[[package]] +name = "console" +version = "0.15.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "054ccb5b10f9f2cbf51eb355ca1d05c2d279ce1804688d0db74b4733a5aeafd8" +dependencies = [ + "encode_unicode", + "libc", + "once_cell", + "unicode-width", + "windows-sys 0.59.0", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -1868,6 +1882,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encode_unicode" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" + [[package]] name = "encoding_rs" version = "0.8.35" @@ -3031,6 +3051,19 @@ dependencies = [ "serde", ] +[[package]] +name = "indicatif" +version = "0.17.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235" +dependencies = [ + "console", + "number_prefix", + "portable-atomic", + "unicode-width", + "web-time", +] + [[package]] name = "indoc" version = "2.0.6" @@ -3522,6 +3555,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "numpy" version = "0.25.0" @@ -6068,6 +6107,12 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +[[package]] +name = "unicode-width" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" + [[package]] name = "unicode-xid" version = "0.2.6" diff --git a/Cargo.toml b/Cargo.toml index 98866836..b26af946 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ blake2 = "0.10.6" pgvector = { version = "0.4.1", features = ["sqlx", "halfvec"] } phf = { version = "0.12.1", features = ["macros"] } indenter = "0.3.4" +indicatif = "0.17.9" itertools = "0.14.0" derivative = "2.2.0" hex = "0.4.3" diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index 485d8c4e..760219cf 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -8,6 +8,7 @@ use crate::{ use super::stats; use futures::future::try_join_all; +use indicatif::ProgressBar; use sqlx::PgPool; use tokio::{sync::watch, task::JoinSet, time::MissedTickBehavior}; @@ -329,7 +330,47 @@ impl SourceUpdateTask { update_options: super::source_indexer::UpdateOptions, ) -> Result<()> { let update_stats = Arc::new(stats::UpdateStats::default()); - source_indexing_context + + // Spawn periodic stats reporting task if print_stats is enabled + let (reporting_handle, progress_bar) = if self.options.print_stats { + let update_stats_clone = update_stats.clone(); + let update_title_owned = update_title.to_string(); + let flow_name = self.flow.flow_instance.name.clone(); + let import_op_name = self.import_op().name.clone(); + + // Create a progress bar that will overwrite the same line + let pb = ProgressBar::new_spinner(); + pb.set_style( + indicatif::ProgressStyle::default_spinner() + .template("{msg}") + .unwrap(), + ); + let pb_clone = pb.clone(); + + let report_task = async move { + let mut interval = tokio::time::interval(REPORT_INTERVAL); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + interval.tick().await; // Skip first tick + + loop { + interval.tick().await; + let current_stats = update_stats_clone.as_ref().clone(); + if current_stats.has_any_change() { + // Show cumulative stats (always show latest total, not delta) + pb_clone.set_message(format!( + "{}.{} ({update_title_owned}): {}", + flow_name, import_op_name, current_stats + )); + } + } + }; + (Some(tokio::spawn(report_task)), Some(pb)) + } else { + (None, None) + }; + + // Run the actual update + let update_result = source_indexing_context .update(&self.pool, &update_stats, update_options) .await .with_context(|| { @@ -338,12 +379,28 @@ impl SourceUpdateTask { self.flow.flow_instance.name, self.import_op().name ) - })?; + }); + + // Cancel the reporting task if it was spawned + if let Some(handle) = reporting_handle { + handle.abort(); + } + + // Clear the progress bar to ensure final stats appear on a new line + if let Some(pb) = progress_bar { + pb.finish_and_clear(); + } + + // Check update result + update_result?; + if update_stats.has_any_change() { self.status_tx.send_modify(|update| { update.source_updates_num[self.source_idx] += 1; }); } + + // Report final stats self.report_stats(&update_stats, update_title); Ok(()) }