diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs
index f54603eb..f1811e08 100644
--- a/src/execution/live_updater.rs
+++ b/src/execution/live_updater.rs
@@ -3,8 +3,9 @@ use std::time::Instant;
 use crate::prelude::*;
 
 use super::stats;
+use futures::future::try_join_all;
 use sqlx::PgPool;
-use tokio::task::JoinSet;
+use tokio::{task::JoinSet, time::MissedTickBehavior};
 
 pub struct FlowLiveUpdater {
     flow_ctx: Arc<FlowContext>,
@@ -22,6 +23,14 @@ pub struct FlowLiveUpdaterOptions {
     pub print_stats: bool,
 }
 
+struct StatsReportState {
+    last_report_time: Option<Instant>,
+    last_stats: stats::UpdateStats,
+}
+
+const MIN_REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
+const REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
+
 async fn update_source(
     flow_ctx: Arc<FlowContext>,
     plan: Arc<plan::ExecutionPlan>,
@@ -35,44 +44,102 @@ async fn update_source(
         .await?;
     let import_op = &plan.import_ops[source_idx];
-    let maybe_print_stats = |stats: &stats::UpdateStats| {
+
+    let stats_report_state = Mutex::new(StatsReportState {
+        last_report_time: None,
+        last_stats: source_update_stats.as_ref().clone(),
+    });
+    let report_stats = || {
+        let new_stats = source_update_stats.as_ref().clone();
+        let now = Instant::now();
+        let delta = {
+            let mut state = stats_report_state.lock().unwrap();
+            if let Some(last_report_time) = state.last_report_time {
+                if now.duration_since(last_report_time) < MIN_REPORT_INTERVAL {
+                    return;
+                }
+            }
+            let delta = new_stats.delta(&state.last_stats);
+            if delta.is_zero() {
+                return;
+            }
+            state.last_stats = new_stats;
+            state.last_report_time = Some(now);
+            delta
+        };
         if options.print_stats {
             println!(
                 "{}.{}: {}",
-                flow_ctx.flow.flow_instance.name, import_op.name, stats
+                flow_ctx.flow.flow_instance.name, import_op.name, delta
             );
         } else {
             trace!(
                 "{}.{}: {}",
                 flow_ctx.flow.flow_instance.name,
                 import_op.name,
-                stats
+                delta
             );
         }
     };
 
-    let mut update_start = Instant::now();
-    source_context.update(&pool, &source_update_stats).await?;
-    maybe_print_stats(&source_update_stats);
-
-    if let (true, Some(refresh_interval)) = (
-        options.live_mode,
-        import_op.refresh_options.refresh_interval,
-    ) {
-        let mut last_stats = source_update_stats.as_ref().clone();
-        loop {
-            let elapsed = update_start.elapsed();
-            if elapsed < refresh_interval {
-                tokio::time::sleep(refresh_interval - elapsed).await;
+    let mut futs: Vec<BoxFuture<'_, Result<()>>> = Vec::new();
+
+    // Deal with change streams.
+    if let (true, Some(change_stream)) = (options.live_mode, import_op.executor.change_stream()) {
+        let pool = pool.clone();
+        let source_update_stats = source_update_stats.clone();
+        futs.push(
+            async move {
+                let mut change_stream = change_stream;
+                while let Some(change) = change_stream.next().await {
+                    source_context
+                        .process_change(change, &pool, &source_update_stats)
+                        .map(tokio::spawn);
+                }
+                Ok(())
             }
-            update_start = Instant::now();
+            .boxed(),
+        );
+        futs.push(
+            async move {
+                let mut interval = tokio::time::interval(REPORT_INTERVAL);
+                interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+                interval.tick().await;
+                loop {
+                    interval.tick().await;
+                    report_stats();
+                }
+            }
+            .boxed(),
+        );
+    }
+
+    // The main update loop.
+    let source_update_stats = source_update_stats.clone();
+    futs.push(
+        async move {
             source_context.update(&pool, &source_update_stats).await?;
+            report_stats();
 
-            let this_stats = source_update_stats.as_ref().clone();
-            maybe_print_stats(&this_stats.delta(&last_stats));
-            last_stats = this_stats;
+            if let (true, Some(refresh_interval)) = (
+                options.live_mode,
+                import_op.refresh_options.refresh_interval,
+            ) {
+                let mut interval = tokio::time::interval(refresh_interval);
+                interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+                interval.tick().await;
+                loop {
+                    interval.tick().await;
+                    source_context.update(&pool, &source_update_stats).await?;
+                    report_stats();
+                }
+            }
+            Ok(())
         }
-    }
+        .boxed(),
+    );
+
+    try_join_all(futs).await?;
 
     Ok(())
 }
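For reference, a minimal self-contained sketch of the concurrency pattern the new `update_source` uses: heterogeneous async loops are boxed into `BoxFuture`s, collected into a `Vec`, and driven together with `try_join_all`, while `tokio::time::interval` with `MissedTickBehavior::Delay` paces the periodic work. This is not the crate's code; the task bodies and the bounded loop (so the example terminates) are stand-ins.

use anyhow::Result;
use futures::future::{try_join_all, BoxFuture, FutureExt};
use tokio::time::{interval, Duration, MissedTickBehavior};

#[tokio::main]
async fn main() -> Result<()> {
    let mut futs: Vec<BoxFuture<'static, Result<()>>> = Vec::new();

    // A one-shot task, standing in for the initial full update.
    futs.push(
        async {
            println!("initial update done");
            anyhow::Ok(())
        }
        .boxed(),
    );

    // A periodic task, standing in for the refresh/report loops. `Delay`
    // keeps ticks spaced by the full period even when an iteration runs
    // long; the first tick completes immediately, so it is consumed once
    // before entering the loop, as in the diff above.
    futs.push(
        async {
            let mut ticker = interval(Duration::from_millis(100));
            ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
            ticker.tick().await; // immediate first tick
            for _ in 0..3 {
                // bounded here only so the example exits
                ticker.tick().await;
                println!("periodic refresh");
            }
            anyhow::Ok(())
        }
        .boxed(),
    );

    // Drive all loops concurrently; the first error aborts the join.
    try_join_all(futs).await?;
    Ok(())
}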
diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs
index 7f78823e..d4c3e62e 100644
--- a/src/execution/source_indexer.rs
+++ b/src/execution/source_indexer.rs
@@ -1,4 +1,4 @@
-use crate::prelude::*;
+use crate::{ops::interface::SourceValueChange, prelude::*};
 
 use sqlx::PgPool;
 use std::collections::{hash_map, HashMap};
@@ -6,7 +6,7 @@ use tokio::{sync::Semaphore, task::JoinSet};
 use super::{
     db_tracking,
-    row_indexer::{self, SkippedOr, SourceVersion},
+    row_indexer::{self, SkippedOr, SourceVersion, SourceVersionKind},
     stats,
 };
 
 struct SourceRowIndexingState {
@@ -78,21 +78,23 @@ impl SourceIndexingContext {
         })
     }
 
-    fn process_source_key(
+    async fn process_source_key(
         self: Arc<Self>,
         key: value::KeyValue,
         source_version: SourceVersion,
+        value: Option<value::FieldValues>,
         update_stats: Arc<stats::UpdateStats>,
         processing_sem: Arc<Semaphore>,
         pool: PgPool,
-        join_set: &mut JoinSet<Result<()>>,
     ) {
-        let fut = async move {
+        let process = async move {
            let permit = processing_sem.acquire().await?;
            let plan = self.flow.get_execution_plan().await?;
            let import_op = &plan.import_ops[self.source_idx];
            let source_value = if source_version.kind == row_indexer::SourceVersionKind::Deleted {
                None
+            } else if let Some(value) = value {
+                Some(value)
            } else {
                // Even if the source version kind is not Deleted, the source value might be gone one polling.
                // In this case, we still use the current source version even if it's already stale - actually this version skew
@@ -154,17 +156,19 @@ impl SourceIndexingContext {
             drop(permit);
             anyhow::Ok(())
         };
-        join_set.spawn(fut);
+        if let Err(e) = process.await {
+            error!("{:?}", e.context("Error in processing a source row"));
+        }
     }
 
     fn process_source_key_if_newer(
         self: &Arc<Self>,
         key: value::KeyValue,
         source_version: SourceVersion,
+        value: Option<value::FieldValues>,
         update_stats: &Arc<stats::UpdateStats>,
         pool: &PgPool,
-        join_set: &mut JoinSet<Result<()>>,
-    ) {
+    ) -> Option<impl Future<Output = ()> + Send + 'static> {
         let processing_sem = {
             let mut state = self.state.lock().unwrap();
             let scan_generation = state.scan_generation;
@@ -174,19 +178,19 @@ impl SourceIndexingContext {
                 .source_version
                 .should_skip(&source_version, Some(&update_stats))
             {
-                return;
+                return None;
             }
             row_state.source_version = source_version.clone();
             row_state.processing_sem.clone()
         };
-        self.clone().process_source_key(
+        Some(self.clone().process_source_key(
             key,
             source_version,
+            value,
             update_stats.clone(),
             processing_sem,
             pool.clone(),
-            join_set,
-        );
+        ))
     }
 
     pub async fn update(
@@ -212,15 +216,18 @@ impl SourceIndexingContext {
                 self.process_source_key_if_newer(
                     row.key,
                     SourceVersion::from_current(row.ordinal),
+                    None,
                     update_stats,
                     pool,
-                    &mut join_set,
-                );
+                )
+                .map(|fut| join_set.spawn(fut));
             }
         }
 
         while let Some(result) = join_set.join_next().await {
-            if let Err(e) = (|| anyhow::Ok(result??))() {
-                error!("{:?}", e.context("Error in indexing a source row"));
+            if let Err(e) = result {
+                if !e.is_cancelled() {
+                    error!("{:?}", e);
+                }
             }
         }
@@ -239,21 +246,45 @@ impl SourceIndexingContext {
             deleted_key_versions
         };
         for (key, source_version, processing_sem) in deleted_key_versions {
-            self.clone().process_source_key(
+            join_set.spawn(self.clone().process_source_key(
                 key,
                 source_version,
+                None,
                 update_stats.clone(),
                 processing_sem,
                 pool.clone(),
-                &mut join_set,
-            );
+            ));
         }
         while let Some(result) = join_set.join_next().await {
-            if let Err(e) = (|| anyhow::Ok(result??))() {
-                error!("{:?}", e.context("Error in deleting a source row"));
+            if let Err(e) = result {
+                if !e.is_cancelled() {
+                    error!("{:?}", e);
+                }
             }
         }
 
         Ok(())
     }
+
+    pub fn process_change(
+        self: &Arc<Self>,
+        change: interface::SourceChange,
+        pool: &PgPool,
+        update_stats: &Arc<stats::UpdateStats>,
+    ) -> Option<impl Future<Output = ()> + Send + 'static> {
+        let (source_version_kind, value) = match change.value {
+            SourceValueChange::Upsert(value) => (SourceVersionKind::CurrentLogic, value),
+            SourceValueChange::Delete => (SourceVersionKind::Deleted, None),
+        };
+        self.process_source_key_if_newer(
+            change.key,
+            SourceVersion {
+                ordinal: change.ordinal,
+                kind: source_version_kind,
+            },
+            value,
+            update_stats,
+            pool,
+        )
+    }
 }
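A hedged sketch (names are illustrative, not this crate's API) of the spawning pattern `update` now follows: the per-row helper returns `Some(future)` only when the row actually needs work, the caller spawns whatever is produced onto a `JoinSet`, and join errors are logged only when they are panics rather than cancellations, mirroring the `!e.is_cancelled()` filter above.

use std::future::Future;
use tokio::task::JoinSet;

// Stand-in for process_source_key_if_newer: even row ids are treated as
// already up to date and yield no work.
fn maybe_process(row: u32) -> Option<impl Future<Output = ()> + Send + 'static> {
    if row % 2 == 0 {
        None
    } else {
        Some(async move {
            println!("processed row {row}");
        })
    }
}

#[tokio::main]
async fn main() {
    let mut join_set = JoinSet::new();
    for row in 0..6u32 {
        // Equivalent to the diff's `.map(|fut| join_set.spawn(fut))`:
        // only spawn when a future was actually produced.
        if let Some(fut) = maybe_process(row) {
            join_set.spawn(fut);
        }
    }
    while let Some(result) = join_set.join_next().await {
        if let Err(e) = result {
            // A JoinError is either a panic or a cancellation; cancelled
            // tasks are expected during shutdown and not worth logging.
            if !e.is_cancelled() {
                eprintln!("task failed: {e:?}");
            }
        }
    }
}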
diff --git a/src/execution/stats.rs b/src/execution/stats.rs
index a2943068..c31963e1 100644
--- a/src/execution/stats.rs
+++ b/src/execution/stats.rs
@@ -69,6 +69,14 @@ impl UpdateStats {
             num_errors: self.num_errors.delta(&base.num_errors),
         }
     }
+
+    pub fn is_zero(&self) -> bool {
+        self.num_skipped.get() == 0
+            && self.num_insertions.get() == 0
+            && self.num_deletions.get() == 0
+            && self.num_repreocesses.get() == 0
+            && self.num_errors.get() == 0
+    }
 }
 
 impl std::fmt::Display for UpdateStats {
@@ -97,6 +105,10 @@ impl std::fmt::Display for UpdateStats {
                 ": {num_insertions} added, {num_deletions} removed, {num_reprocesses} repocessed",
             )?;
         }
+
+        if num_skipped == 0 && num_source_rows == 0 {
+            write!(f, "no changes")?;
+        }
         Ok(())
     }
 }
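To show where `delta`/`is_zero` fit, here is a hedged, simplified model of the throttling that `report_stats` in the live updater builds on top of them: the last reported snapshot and timestamp sit behind a `Mutex`, and a report is emitted only when the minimum interval has passed and the delta since the last report is non-empty. A plain `i64` stands in for `UpdateStats`.

use std::sync::Mutex;
use std::time::{Duration, Instant};

const MIN_REPORT_INTERVAL: Duration = Duration::from_secs(5);

struct ReportState {
    last_report_time: Option<Instant>,
    last_count: i64,
}

// Print a progress line at most once per MIN_REPORT_INTERVAL, and only when
// something actually changed since the last report.
fn maybe_report(state: &Mutex<ReportState>, current_count: i64) {
    let now = Instant::now();
    let delta = {
        let mut state = state.lock().unwrap();
        if let Some(last) = state.last_report_time {
            if now.duration_since(last) < MIN_REPORT_INTERVAL {
                return; // reported too recently
            }
        }
        let delta = current_count - state.last_count;
        if delta == 0 {
            return; // the is_zero() case: nothing changed, stay quiet
        }
        state.last_count = current_count;
        state.last_report_time = Some(now);
        delta
    };
    println!("{delta} rows processed since last report");
}

fn main() {
    let state = Mutex::new(ReportState {
        last_report_time: None,
        last_count: 0,
    });
    maybe_report(&state, 3); // prints "3 rows processed since last report"
    maybe_report(&state, 3); // suppressed: zero delta
    maybe_report(&state, 7); // suppressed: within MIN_REPORT_INTERVAL
}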
diff --git a/src/ops/interface.rs b/src/ops/interface.rs
index 49cc41fd..644442b2 100644
--- a/src/ops/interface.rs
+++ b/src/ops/interface.rs
@@ -46,12 +46,17 @@ pub struct SourceRowMetadata {
     pub ordinal: Option<Ordinal>,
 }
 
-pub struct SourceChange<'a> {
+pub enum SourceValueChange {
+    /// None means value unavailable in this change - needs a separate poll by get_value() API.
+    Upsert(Option<FieldValues>),
+    Delete,
+}
+
+pub struct SourceChange {
     /// Last update/deletion ordinal. None means unavailable.
     pub ordinal: Option<Ordinal>,
     pub key: KeyValue,
-    /// None means a deletion. None within the `BoxFuture` means the item is gone when polling.
-    pub value: Option<BoxFuture<'a, Result<Option<FieldValues>>>>,
+    pub value: SourceValueChange,
 }
 
 #[derive(Debug, Default)]
@@ -70,7 +75,7 @@ pub trait SourceExecutor: Send + Sync {
     // Get the value for the given key.
     async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>>;
 
-    fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange<'a>>> {
+    fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange>> {
         None
     }
 }
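Finally, a hedged sketch of what a connector-side change stream might look like under the new shape. `KeyValue`, `Ordinal`, `FieldValues`, and the `SourceExecutor` trait are this crate's types and are replaced here by simplified stand-ins; the point is the contract: a boxed stream of changes where `Upsert(None)` means the indexer must fetch the value itself via `get_value()`, and `Delete` carries no value at all.

use futures::stream::{self, BoxStream, StreamExt};

// Simplified stand-ins for KeyValue / Ordinal / FieldValues.
type Key = String;
type Fields = Vec<String>;

enum ValueChange {
    // None: the notification did not carry the new value; poll it separately.
    Upsert(Option<Fields>),
    Delete,
}

struct Change {
    ordinal: Option<u64>,
    key: Key,
    value: ValueChange,
}

// A real connector would translate its own notification feed into this
// stream; a fixed Vec keeps the sketch self-contained.
fn change_stream() -> Option<BoxStream<'static, Change>> {
    let changes = vec![
        Change { ordinal: Some(1), key: "a".into(), value: ValueChange::Upsert(Some(vec!["v1".into()])) },
        Change { ordinal: Some(2), key: "b".into(), value: ValueChange::Upsert(None) },
        Change { ordinal: Some(3), key: "a".into(), value: ValueChange::Delete },
    ];
    Some(stream::iter(changes).boxed())
}

#[tokio::main]
async fn main() {
    if let Some(mut changes) = change_stream() {
        while let Some(change) = changes.next().await {
            match change.value {
                ValueChange::Upsert(Some(_)) => {
                    println!("ordinal {:?}: upsert {} (value carried inline)", change.ordinal, change.key)
                }
                ValueChange::Upsert(None) => {
                    println!("ordinal {:?}: upsert {} (fetch value via get_value)", change.ordinal, change.key)
                }
                ValueChange::Delete => {
                    println!("ordinal {:?}: delete {}", change.ordinal, change.key)
                }
            }
        }
    }
}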