diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index c40326b6..6e434171 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -1,4 +1,7 @@ -use crate::{execution::stats::UpdateStats, prelude::*}; +use crate::{ + execution::{source_indexer::ProcessSourceKeyOptions, stats::UpdateStats}, + prelude::*, +}; use super::stats; use futures::future::try_join_all; @@ -191,14 +194,17 @@ impl SourceUpdateTask { .await?; tokio::spawn(source_context.clone().process_source_key( change.key, - Some(change.key_aux_info), - change.data, update_stats.clone(), concur_permit, Some(move || async move { SharedAckFn::ack(&shared_ack_fn).await }), pool.clone(), + ProcessSourceKeyOptions { + key_aux_info: Some(change.key_aux_info), + source_data: change.data, + ..Default::default() + }, )); } } diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index 3b4abb2b..bff79f34 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -48,6 +48,13 @@ pub struct SourceIndexingContext { pub const NO_ACK: Option Ready>> = None; +#[derive(Default)] +pub struct ProcessSourceKeyOptions { + /// `key_aux_info` is not available for deletions. It must not be provided if `source_data` is `None`. + pub key_aux_info: Option, + pub source_data: Option, +} + impl SourceIndexingContext { pub async fn load( flow: Arc, @@ -100,31 +107,29 @@ impl SourceIndexingContext { }) } - /// `key_aux_info` is not available for deletions. It must not be provided if `source_data` is `None`. pub async fn process_source_key< AckFut: Future> + Send + 'static, AckFn: FnOnce() -> AckFut, >( self: Arc, key: value::KeyValue, - key_aux_info: Option, - source_data: Option, update_stats: Arc, _concur_permit: concur_control::CombinedConcurrencyControllerPermit, ack_fn: Option, pool: PgPool, + options: ProcessSourceKeyOptions, ) { let process = async { let plan = self.flow.get_execution_plan().await?; let import_op = &plan.import_ops[self.source_idx]; let schema = &self.flow.data_schema; - let source_data = match source_data { + let source_data = match options.source_data { Some(source_data) => source_data, None => import_op .executor .get_value( &key, - key_aux_info.as_ref().ok_or_else(|| { + options.key_aux_info.as_ref().ok_or_else(|| { anyhow::anyhow!( "`key_aux_info` must be provided when there's no `source_data`" ) @@ -337,12 +342,14 @@ impl SourceIndexingContext { .await?; join_set.spawn(self.clone().process_source_key( row.key, - Some(row.key_aux_info), - None, update_stats.clone(), concur_permit, NO_ACK, pool.clone(), + ProcessSourceKeyOptions { + key_aux_info: Some(row.key_aux_info), + ..Default::default() + }, )); } } @@ -372,12 +379,14 @@ impl SourceIndexingContext { let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?; join_set.spawn(self.clone().process_source_key( key, - None, - Some(source_data), update_stats.clone(), concur_permit, NO_ACK, pool.clone(), + ProcessSourceKeyOptions { + source_data: Some(source_data), + ..Default::default() + }, )); } while let Some(result) = join_set.join_next().await {