Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{execution::stats::UpdateStats, prelude::*};
use crate::{
execution::{source_indexer::ProcessSourceKeyOptions, stats::UpdateStats},
prelude::*,
};

use super::stats;
use futures::future::try_join_all;
Expand Down Expand Up @@ -191,14 +194,17 @@ impl SourceUpdateTask {
.await?;
tokio::spawn(source_context.clone().process_source_key(
change.key,
Some(change.key_aux_info),
change.data,
update_stats.clone(),
concur_permit,
Some(move || async move {
SharedAckFn::ack(&shared_ack_fn).await
}),
pool.clone(),
ProcessSourceKeyOptions {
key_aux_info: Some(change.key_aux_info),
source_data: change.data,
..Default::default()
},
));
}
}
Expand Down
27 changes: 18 additions & 9 deletions src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ pub struct SourceIndexingContext {

pub const NO_ACK: Option<fn() -> Ready<Result<()>>> = None;

#[derive(Default)]
pub struct ProcessSourceKeyOptions {
/// `key_aux_info` is not available for deletions. It must not be provided if `source_data` is `None`.
pub key_aux_info: Option<serde_json::Value>,
pub source_data: Option<interface::SourceData>,
}

impl SourceIndexingContext {
pub async fn load(
flow: Arc<builder::AnalyzedFlow>,
Expand Down Expand Up @@ -100,31 +107,29 @@ impl SourceIndexingContext {
})
}

/// `key_aux_info` is not available for deletions. It must not be provided if `source_data` is `None`.
pub async fn process_source_key<
AckFut: Future<Output = Result<()>> + Send + 'static,
AckFn: FnOnce() -> AckFut,
>(
self: Arc<Self>,
key: value::KeyValue,
key_aux_info: Option<serde_json::Value>,
source_data: Option<interface::SourceData>,
update_stats: Arc<stats::UpdateStats>,
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
ack_fn: Option<AckFn>,
pool: PgPool,
options: ProcessSourceKeyOptions,
) {
let process = async {
let plan = self.flow.get_execution_plan().await?;
let import_op = &plan.import_ops[self.source_idx];
let schema = &self.flow.data_schema;
let source_data = match source_data {
let source_data = match options.source_data {
Some(source_data) => source_data,
None => import_op
.executor
.get_value(
&key,
key_aux_info.as_ref().ok_or_else(|| {
options.key_aux_info.as_ref().ok_or_else(|| {
anyhow::anyhow!(
"`key_aux_info` must be provided when there's no `source_data`"
)
Expand Down Expand Up @@ -337,12 +342,14 @@ impl SourceIndexingContext {
.await?;
join_set.spawn(self.clone().process_source_key(
row.key,
Some(row.key_aux_info),
None,
update_stats.clone(),
concur_permit,
NO_ACK,
pool.clone(),
ProcessSourceKeyOptions {
key_aux_info: Some(row.key_aux_info),
..Default::default()
},
));
}
}
Expand Down Expand Up @@ -372,12 +379,14 @@ impl SourceIndexingContext {
let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?;
join_set.spawn(self.clone().process_source_key(
key,
None,
Some(source_data),
update_stats.clone(),
concur_permit,
NO_ACK,
pool.clone(),
ProcessSourceKeyOptions {
source_data: Some(source_data),
..Default::default()
},
));
}
while let Some(result) = join_set.join_next().await {
Expand Down
Loading