diff --git a/src/execution/db_tracking.rs b/src/execution/db_tracking.rs index a7165203..141e0917 100644 --- a/src/execution/db_tracking.rs +++ b/src/execution/db_tracking.rs @@ -1,6 +1,8 @@ +use crate::prelude::*; + use super::{db_tracking_setup::TrackingTableSetupState, memoization::StoredMemoizationInfo}; use crate::utils::{db::WriteAction, fingerprint::Fingerprint}; -use anyhow::Result; +use futures::Stream; use sqlx::PgPool; /// (target_key, process_ordinal, fingerprint) @@ -41,6 +43,7 @@ pub struct SourceTrackingInfoForPrecommit { pub staging_target_keys: sqlx::types::Json, pub processed_source_ordinal: Option, + pub process_logic_fingerprint: Option>, pub process_ordinal: Option, pub target_keys: Option>, } @@ -52,7 +55,7 @@ pub async fn read_source_tracking_info_for_precommit( db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>, ) -> Result> { let query_str = format!( - "SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2", + "SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, process_logic_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2", db_setup.table_name ); let precommit_tracking_info = sqlx::query_as(&query_str) @@ -178,22 +181,33 @@ pub async fn delete_source_tracking_info( } #[derive(sqlx::FromRow, Debug)] -pub struct SourceTrackingKey { +pub struct TrackedSourceKeyMetadata { pub source_key: serde_json::Value, + pub processed_source_ordinal: Option, + pub process_logic_fingerprint: Option>, } -pub async fn list_source_tracking_keys( - source_id: i32, - db_setup: &TrackingTableSetupState, - pool: &PgPool, -) -> Result> { - let query_str = format!( - "SELECT source_key FROM {} WHERE source_id = $1", +pub struct ListTrackedSourceKeyMetadataState { + query_str: String, +} + +impl ListTrackedSourceKeyMetadataState { + pub fn new() -> Self { + Self { + query_str: String::new(), + } + } + + pub fn list<'a>( + &'a mut self, + source_id: i32, + db_setup: &'a TrackingTableSetupState, + pool: &'a PgPool, + ) -> impl Stream> + 'a { + self.query_str = format!( + "SELECT source_key, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1", db_setup.table_name ); - let keys: Vec = sqlx::query_as(&query_str) - .bind(source_id) - .fetch_all(pool) - .await?; - Ok(keys) + sqlx::query_as(&self.query_str).bind(source_id).fetch(pool) + } } diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index 49b7c171..df81d10f 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -1,7 +1,6 @@ use crate::prelude::*; use futures::future::try_join_all; -use log::error; use sqlx::PgPool; use std::collections::{HashMap, HashSet}; use std::sync::atomic::Ordering::Relaxed; @@ -39,9 +38,79 @@ pub fn extract_primary_key( Ok(key) } -pub enum UnchangedOr { +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)] +pub enum SourceVersionKind { + #[default] + NonExistent, + DifferentLogic, + CurrentLogic, + Deleted, +} + +#[derive(Debug, Clone, Default)] +pub struct SourceVersion { + pub ordinal: Option, + pub kind: SourceVersionKind, +} + +impl SourceVersion { + pub fn from_stored( + stored_ordinal: Option, + stored_fp: &Option>, + curr_fp: Fingerprint, + ) -> Self { + Self { + ordinal: stored_ordinal.map(Ordinal), + kind: match &stored_fp { + Some(stored_fp) => { + if stored_fp.as_slice() == curr_fp.0.as_slice() { + SourceVersionKind::CurrentLogic + } else 
{ + SourceVersionKind::DifferentLogic + } + } + None => SourceVersionKind::NonExistent, + }, + } + } + + pub fn from_current(ordinal: Option) -> Self { + Self { + ordinal, + kind: SourceVersionKind::CurrentLogic, + } + } + + pub fn for_deletion(&self) -> Self { + Self { + ordinal: self.ordinal, + kind: SourceVersionKind::Deleted, + } + } + + pub fn should_skip( + &self, + target: &SourceVersion, + update_stats: Option<&stats::UpdateStats>, + ) -> bool { + let should_skip = match (self.ordinal, target.ordinal) { + (Some(orginal), Some(target_ordinal)) => { + orginal > target_ordinal || (orginal == target_ordinal && self.kind >= target.kind) + } + _ => false, + }; + if should_skip { + if let Some(update_stats) = update_stats { + update_stats.num_skipped.fetch_add(1, Relaxed); + } + } + should_skip + } +} + +pub enum SkippedOr { Normal(T), - Unchanged, + Skipped(SourceVersion), } #[derive(Default)] @@ -80,13 +149,15 @@ struct PrecommitOutput { async fn precommit_source_tracking_info( source_id: i32, source_key_json: &serde_json::Value, - source_ordinal: Option, + source_version: &SourceVersion, + logic_fp: Fingerprint, data: Option>, process_timestamp: &chrono::DateTime, db_setup: &db_tracking_setup::TrackingTableSetupState, export_ops: &[AnalyzedExportOp], + update_stats: &stats::UpdateStats, pool: &PgPool, -) -> Result> { +) -> Result> { let mut txn = pool.begin().await?; let tracking_info = db_tracking::read_source_tracking_info_for_precommit( @@ -96,15 +167,17 @@ async fn precommit_source_tracking_info( &mut *txn, ) .await?; - let tracking_info_exists = tracking_info.is_some(); - if source_ordinal.is_some() - && tracking_info - .as_ref() - .and_then(|info| info.processed_source_ordinal) - > source_ordinal - { - return Ok(UnchangedOr::Unchanged); + if let Some(tracking_info) = &tracking_info { + let existing_source_version = SourceVersion::from_stored( + tracking_info.processed_source_ordinal, + &tracking_info.process_logic_fingerprint, + logic_fp, + ); + if existing_source_version.should_skip(source_version, Some(update_stats)) { + return Ok(SkippedOr::Skipped(existing_source_version)); + } } + let tracking_info_exists = tracking_info.is_some(); let process_ordinal = (tracking_info .as_ref() .map(|info| info.max_process_ordinal) @@ -269,7 +342,7 @@ async fn precommit_source_tracking_info( txn.commit().await?; - Ok(UnchangedOr::Normal(PrecommitOutput { + Ok(SkippedOr::Normal(PrecommitOutput { metadata: PrecommitMetadata { source_entry_exists: data.is_some(), process_ordinal, @@ -283,13 +356,13 @@ async fn precommit_source_tracking_info( async fn commit_source_tracking_info( source_id: i32, source_key_json: &serde_json::Value, - source_ordinal: Option, + source_version: &SourceVersion, logic_fingerprint: &[u8], precommit_metadata: PrecommitMetadata, process_timestamp: &chrono::DateTime, db_setup: &db_tracking_setup::TrackingTableSetupState, pool: &PgPool, -) -> Result> { +) -> Result<()> { let mut txn = pool.begin().await?; let tracking_info = db_tracking::read_source_tracking_info_for_commit( @@ -303,7 +376,7 @@ async fn commit_source_tracking_info( if tracking_info.as_ref().and_then(|info| info.process_ordinal) >= Some(precommit_metadata.process_ordinal) { - return Ok(UnchangedOr::Unchanged); + return Ok(()); } let cleaned_staging_target_keys = tracking_info @@ -345,7 +418,7 @@ async fn commit_source_tracking_info( source_id, source_key_json, cleaned_staging_target_keys, - source_ordinal, + source_version.ordinal.map(|o| o.into()), logic_fingerprint, precommit_metadata.process_ordinal, 
process_timestamp.timestamp_micros(), @@ -363,7 +436,7 @@ async fn commit_source_tracking_info( txn.commit().await?; - Ok(UnchangedOr::Normal(())) + Ok(()) } pub async fn evaluate_source_entry_with_memory( @@ -407,10 +480,11 @@ pub async fn update_source_row( source_op: &AnalyzedSourceOp, schema: &schema::DataSchema, key: &value::KeyValue, - only_for_deletion: bool, + source_value: Option, + source_version: &SourceVersion, pool: &PgPool, - stats: &stats::UpdateStats, -) -> Result<()> { + update_stats: &stats::UpdateStats, +) -> Result> { let source_key_json = serde_json::to_value(key)?; let process_timestamp = chrono::Utc::now(); @@ -423,84 +497,50 @@ pub async fn update_source_row( ) .await?; let already_exists = existing_tracking_info.is_some(); - let (existing_source_ordinal, existing_logic_fingerprint, memoization_info) = - match existing_tracking_info { - Some(info) => ( - info.processed_source_ordinal.map(Ordinal), - info.process_logic_fingerprint, - info.memoization_info.map(|info| info.0).flatten(), - ), - None => Default::default(), - }; - let (source_ordinal, output, stored_mem_info) = if !only_for_deletion { - let source_data = source_op.executor.get_value(key).await?; - let source_ordinal = source_data.as_ref().and_then(|d| d.ordinal); - match (source_ordinal, existing_source_ordinal) { - // TODO: Collapse if the source is not newer and the processing logic is not changed. - (Some(source_ordinal), Some(existing_source_ordinal)) => { - if source_ordinal < existing_source_ordinal - || (source_ordinal == existing_source_ordinal - && existing_logic_fingerprint.as_ref().map(|v| v.as_slice()) - == Some(plan.logic_fingerprint.0.as_slice())) - { - // TODO: We should detect based on finer grain fingerprint. - stats.num_skipped.fetch_add(1, Relaxed); - return Ok(()); - } - } - _ => {} - } - let source_value = match source_data { - Some(d) => d.value.await?, - None => None, - }; - match source_value { - Some(source_value) => { - let evaluation_memory = EvaluationMemory::new( - process_timestamp, - memoization_info, - EvaluationMemoryOptions { - enable_cache: true, - evaluation_only: false, - }, - ); - let output = evaluate_source_entry( - plan, - source_op, - schema, - key, - source_value, - &evaluation_memory, - ) - .await?; - ( - source_ordinal, - Some(output), - evaluation_memory.into_stored()?, - ) + let memoization_info = match existing_tracking_info { + Some(info) => { + let existing_version = SourceVersion::from_stored( + info.processed_source_ordinal, + &info.process_logic_fingerprint, + plan.logic_fingerprint, + ); + if existing_version.should_skip(source_version, Some(update_stats)) { + return Ok(SkippedOr::Skipped(existing_version)); } - None => Default::default(), + info.memoization_info.map(|info| info.0).flatten() } - } else { - Default::default() + None => Default::default(), }; - if already_exists { - if output.is_some() { - stats.num_repreocesses.fetch_add(1, Relaxed); - } else { - stats.num_deletions.fetch_add(1, Relaxed); + let (output, stored_mem_info) = match source_value { + Some(source_value) => { + let evaluation_memory = EvaluationMemory::new( + process_timestamp, + memoization_info, + EvaluationMemoryOptions { + enable_cache: true, + evaluation_only: false, + }, + ); + let output = evaluate_source_entry( + plan, + source_op, + schema, + key, + source_value, + &evaluation_memory, + ) + .await?; + (Some(output), evaluation_memory.into_stored()?) 
} - } else if output.is_some() { - stats.num_insertions.fetch_add(1, Relaxed); - } else { - return Ok(()); - } + None => Default::default(), + }; // Phase 2 (precommit): Update with the memoization info and stage target keys. let precommit_output = precommit_source_tracking_info( source_op.source_id, &source_key_json, - source_ordinal.map(|o| o.into()), + source_version, + plan.logic_fingerprint, output.as_ref().map(|scope_value| PrecommitData { scope_value, memoization_info: &stored_mem_info, @@ -508,12 +548,13 @@ pub async fn update_source_row( &process_timestamp, &plan.tracking_table_setup, &plan.export_ops, + update_stats, pool, ) .await?; let precommit_output = match precommit_output { - UnchangedOr::Normal(output) => output, - UnchangedOr::Unchanged => return Ok(()), + SkippedOr::Normal(output) => output, + SkippedOr::Skipped(source_version) => return Ok(SkippedOr::Skipped(source_version)), }; // Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones. @@ -537,7 +578,7 @@ pub async fn update_source_row( commit_source_tracking_info( source_op.source_id, &source_key_json, - source_ordinal.map(|o| o.into()), + source_version, &plan.logic_fingerprint.0, precommit_output.metadata, &process_timestamp, @@ -546,21 +587,15 @@ pub async fn update_source_row( ) .await?; - Ok(()) -} - -pub(super) async fn update_source_row_with_err_handling( - plan: &ExecutionPlan, - source_op: &AnalyzedSourceOp, - schema: &schema::DataSchema, - key: &value::KeyValue, - only_for_deletion: bool, - pool: &PgPool, - stats: &stats::UpdateStats, -) { - let r = update_source_row(plan, source_op, schema, key, only_for_deletion, pool, stats).await; - if let Err(e) = r { - stats.num_errors.fetch_add(1, Relaxed); - error!("{:?}", e.context("Error in indexing a source row")); + if already_exists { + if output.is_some() { + update_stats.num_repreocesses.fetch_add(1, Relaxed); + } else { + update_stats.num_deletions.fetch_add(1, Relaxed); + } + } else if output.is_some() { + update_stats.num_insertions.fetch_add(1, Relaxed); } + + Ok(SkippedOr::Normal(())) } diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index 111b01a1..ece58575 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -1,73 +1,276 @@ +use std::collections::{hash_map, HashMap}; + use crate::prelude::*; -use super::{db_tracking, row_indexer, stats}; -use futures::future::{join, join_all, try_join_all}; +use super::{ + db_tracking, + row_indexer::{self, SkippedOr, SourceVersion}, + stats, +}; +use futures::future::try_join_all; use sqlx::PgPool; +use tokio::{sync::Semaphore, task::JoinSet}; +struct SourceRowState { + source_version: SourceVersion, + processing_sem: Arc, + touched_generation: usize, +} -async fn update_source( - source_name: &str, - plan: &plan::ExecutionPlan, - source_op: &plan::AnalyzedSourceOp, - schema: &schema::DataSchema, - pool: &PgPool, -) -> Result { - let existing_keys_json = db_tracking::list_source_tracking_keys( - source_op.source_id, - &plan.tracking_table_setup, - pool, - ) - .await?; - - let mut keys = Vec::new(); - let mut rows_stream = source_op - .executor - .list(interface::SourceExecutorListOptions { - include_ordinal: false, - }); - while let Some(rows) = rows_stream.next().await { - keys.extend(rows?.into_iter().map(|row| row.key)); +impl Default for SourceRowState { + fn default() -> Self { + Self { + source_version: SourceVersion::default(), + processing_sem: Arc::new(Semaphore::new(1)), + 
touched_generation: 0, + } } +} + +struct SourceState { + rows: HashMap, + scan_generation: usize, +} +pub struct SourceContext { + flow: Arc, + source_idx: usize, + state: Mutex, +} - let stats = stats::UpdateStats::default(); - let upsert_futs = join_all(keys.iter().map(|key| { - row_indexer::update_source_row_with_err_handling( - plan, source_op, schema, key, false, pool, &stats, - ) - })); - let deleted_keys = existing_keys_json - .into_iter() - .map(|existing_key_json| { - value::Value::::from_json( - existing_key_json.source_key, +impl SourceContext { + pub async fn load( + flow: Arc, + source_idx: usize, + pool: &PgPool, + ) -> Result { + let plan = flow.get_execution_plan().await?; + let source_op = &plan.source_ops[source_idx]; + let mut list_state = db_tracking::ListTrackedSourceKeyMetadataState::new(); + let mut rows = HashMap::new(); + let mut key_metadata_stream = + list_state.list(source_op.source_id, &plan.tracking_table_setup, pool); + let scan_generation = 0; + while let Some(key_metadata) = key_metadata_stream.next().await { + let key_metadata = key_metadata?; + let source_key = value::Value::::from_json( + key_metadata.source_key, &source_op.primary_key_type, )? - .as_key() + .into_key()?; + rows.insert( + source_key, + SourceRowState { + source_version: SourceVersion::from_stored( + key_metadata.processed_source_ordinal, + &key_metadata.process_logic_fingerprint, + plan.logic_fingerprint, + ), + processing_sem: Arc::new(Semaphore::new(1)), + touched_generation: scan_generation, + }, + ); + } + Ok(Self { + flow, + source_idx, + state: Mutex::new(SourceState { + rows, + scan_generation, + }), }) - .filter_ok(|existing_key| !keys.contains(existing_key)) - .collect::>>()?; - let delete_futs = join_all(deleted_keys.iter().map(|key| { - row_indexer::update_source_row_with_err_handling( - plan, source_op, schema, key, true, pool, &stats, - ) - })); - join(upsert_futs, delete_futs).await; + } - Ok(stats::SourceUpdateInfo { - source_name: source_name.to_string(), - stats, - }) + fn process_source_key( + self: Arc, + key: value::KeyValue, + source_version: SourceVersion, + update_stats: Arc, + processing_sem: Arc, + pool: PgPool, + join_set: &mut JoinSet>, + ) { + let fut = async move { + let permit = processing_sem.acquire().await?; + let plan = self.flow.get_execution_plan().await?; + let source_op = &plan.source_ops[self.source_idx]; + let source_value = if source_version.kind == row_indexer::SourceVersionKind::Deleted { + None + } else if let Some(d) = source_op.executor.get_value(&key).await? { + d.value.await? + } else { + // Even if the source version kind is not Deleted, the source value might be gone one polling. + // In this case, we still use the current source version even if it's already stale - actually this version skew + // also happens for update cases and there's no way to keep them always in sync for many sources. + // + // We only need source version <= actual version for value. 
+ None + }; + let schema = &self.flow.data_schema; + let result = row_indexer::update_source_row( + &plan, + source_op, + schema, + &key, + source_value, + &source_version, + &pool, + &update_stats, + ) + .await?; + let target_source_version = if let SkippedOr::Skipped(existing_source_version) = result + { + Some(existing_source_version) + } else if source_version.kind == row_indexer::SourceVersionKind::Deleted { + Some(source_version) + } else { + None + }; + if let Some(target_source_version) = target_source_version { + let mut state = self.state.lock().unwrap(); + let scan_generation = state.scan_generation; + let entry = state.rows.entry(key.clone()); + match entry { + hash_map::Entry::Occupied(mut entry) => { + if !entry + .get() + .source_version + .should_skip(&target_source_version, None) + { + if target_source_version.kind == row_indexer::SourceVersionKind::Deleted + { + entry.remove(); + } else { + let mut_entry = entry.get_mut(); + mut_entry.source_version = target_source_version; + mut_entry.touched_generation = scan_generation; + } + } + } + hash_map::Entry::Vacant(entry) => { + entry.insert(SourceRowState { + source_version: target_source_version, + touched_generation: scan_generation, + ..Default::default() + }); + } + } + } + drop(permit); + anyhow::Ok(()) + }; + join_set.spawn(fut); + } + + fn process_source_key_if_newer( + self: &Arc, + key: value::KeyValue, + source_version: SourceVersion, + update_stats: &Arc, + pool: &PgPool, + join_set: &mut JoinSet>, + ) { + let processing_sem = { + let mut state = self.state.lock().unwrap(); + let scan_generation = state.scan_generation; + let row_state = state.rows.entry(key.clone()).or_default(); + row_state.touched_generation = scan_generation; + if row_state + .source_version + .should_skip(&source_version, Some(&update_stats)) + { + return; + } + row_state.source_version = source_version.clone(); + row_state.processing_sem.clone() + }; + self.clone().process_source_key( + key, + source_version, + update_stats.clone(), + processing_sem, + pool.clone(), + join_set, + ); + } + + async fn update_source(self: &Arc, pool: &PgPool) -> Result { + let plan = self.flow.get_execution_plan().await?; + let source_op = &plan.source_ops[self.source_idx]; + let mut rows_stream = source_op + .executor + .list(interface::SourceExecutorListOptions { + include_ordinal: true, + }); + let mut join_set = JoinSet::new(); + let scan_generation = { + let mut state = self.state.lock().unwrap(); + state.scan_generation += 1; + state.scan_generation + }; + let update_stats = Arc::new(stats::UpdateStats::default()); + while let Some(row) = rows_stream.next().await { + for row in row? 
{ + self.process_source_key_if_newer( + row.key, + SourceVersion::from_current(row.ordinal), + &update_stats, + pool, + &mut join_set, + ); + } + } + while let Some(result) = join_set.join_next().await { + if let Err(e) = (|| anyhow::Ok(result??))() { + error!("{:?}", e.context("Error in indexing a source row")); + } + } + + let deleted_key_versions = { + let mut deleted_key_versions = Vec::new(); + let mut state = self.state.lock().unwrap(); + for (key, row_state) in state.rows.iter_mut() { + if row_state.touched_generation < scan_generation { + deleted_key_versions.push(( + key.clone(), + row_state.source_version.for_deletion(), + row_state.processing_sem.clone(), + )); + } + } + deleted_key_versions + }; + for (key, source_version, processing_sem) in deleted_key_versions { + self.clone().process_source_key( + key, + source_version, + update_stats.clone(), + processing_sem, + pool.clone(), + &mut join_set, + ); + } + while let Some(result) = join_set.join_next().await { + if let Err(e) = (|| anyhow::Ok(result??))() { + error!("{:?}", e.context("Error in deleting a source row")); + } + } + + Ok(stats::SourceUpdateInfo { + source_name: source_op.name.clone(), + stats: Arc::unwrap_or_clone(update_stats), + }) + } } pub async fn update( - plan: &plan::ExecutionPlan, - schema: &schema::DataSchema, + flow: &Arc, pool: &PgPool, ) -> Result { + let plan = flow.get_execution_plan().await?; let source_update_stats = try_join_all( - plan.source_ops - .iter() - .map(|source_op| async move { - update_source(source_op.name.as_str(), plan, source_op, schema, pool).await + (0..plan.source_ops.len()) + .map(|idx| async move { + let source_context = Arc::new(SourceContext::load(flow.clone(), idx, pool).await?); + source_context.update_source(pool).await }) .collect::>(), ) diff --git a/src/execution/stats.rs b/src/execution/stats.rs index eb978b2b..8db36aed 100644 --- a/src/execution/stats.rs +++ b/src/execution/stats.rs @@ -11,6 +11,18 @@ pub struct UpdateStats { pub num_errors: AtomicUsize, } +impl Clone for UpdateStats { + fn clone(&self) -> Self { + Self { + num_skipped: self.num_skipped.load(Relaxed).into(), + num_insertions: self.num_insertions.load(Relaxed).into(), + num_deletions: self.num_deletions.load(Relaxed).into(), + num_repreocesses: self.num_repreocesses.load(Relaxed).into(), + num_errors: self.num_errors.load(Relaxed).into(), + } + } +} + impl std::fmt::Display for UpdateStats { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let num_skipped = self.num_skipped.load(Relaxed); diff --git a/src/ops/sdk.rs b/src/ops/sdk.rs index 0938b0f5..7d1a1e4a 100644 --- a/src/ops/sdk.rs +++ b/src/ops/sdk.rs @@ -1,4 +1,4 @@ -pub use crate::prelude::*; +pub(crate) use crate::prelude::*; use crate::builder::plan::AnalyzedFieldReference; use crate::builder::plan::AnalyzedLocalFieldReference; diff --git a/src/prelude.rs b/src/prelude.rs index 60984ee1..9efd0362 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,13 +1,18 @@ -pub use anyhow::Result; -pub use async_trait::async_trait; -pub use futures::{future::BoxFuture, prelude::*, stream::BoxStream}; -pub use futures::{FutureExt, StreamExt}; -pub use itertools::Itertools; -pub use serde::{Deserialize, Serialize}; -pub use std::sync::Arc; - -pub use crate::base::{schema, spec, value}; -pub use crate::builder::plan; -pub use crate::ops::interface; -pub use crate::service::error::ApiError; -pub use crate::{api_bail, api_error}; +#![allow(unused_imports)] + +pub(crate) use anyhow::Result; +pub(crate) use async_trait::async_trait; 
+pub(crate) use futures::{future::BoxFuture, prelude::*, stream::BoxStream}; +pub(crate) use futures::{FutureExt, StreamExt}; +pub(crate) use itertools::Itertools; +pub(crate) use serde::{Deserialize, Serialize}; +pub(crate) use std::sync::{Arc, Mutex}; + +pub(crate) use crate::base::{schema, spec, value}; +pub(crate) use crate::builder::{self, plan}; +pub(crate) use crate::ops::interface; +pub(crate) use crate::service::error::ApiError; + +pub(crate) use crate::{api_bail, api_error}; + +pub(crate) use log::{debug, error, info, trace, warn}; diff --git a/src/py/mod.rs b/src/py/mod.rs index 61b5836c..597c2cbd 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -114,13 +114,7 @@ impl Flow { let update_info = lib_context .runtime .block_on(async { - let exec_plan = self.0.get_execution_plan().await?; - execution::source_indexer::update( - &exec_plan, - &self.0.data_schema, - &lib_context.pool, - ) - .await + execution::source_indexer::update(&self.0, &lib_context.pool).await }) .into_py_result()?; Ok(IndexUpdateInfo(update_info)) diff --git a/src/service/flows.rs b/src/service/flows.rs index 47689493..c324db57 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -170,8 +170,6 @@ pub async fn update( State(lib_context): State>, ) -> Result, ApiError> { let fl = &lib_context.with_flow_context(&flow_name, |ctx| ctx.flow.clone())?; - let execution_plan = fl.get_execution_plan().await?; - let update_info = - source_indexer::update(&execution_plan, &fl.data_schema, &lib_context.pool).await?; + let update_info = source_indexer::update(&fl, &lib_context.pool).await?; Ok(Json(update_info)) }
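Note (illustrative, not part of the patch): the core of this change is the skip rule in row_indexer.rs, where a stored row version is compared against an incoming one by ordinal first and then by SourceVersionKind (NonExistent < DifferentLogic < CurrentLogic < Deleted). The self-contained sketch below mirrors that rule with simplified stand-in types (plain i64 ordinals instead of the crate's Ordinal, no stats accounting) to show which combinations get skipped and which get reprocessed.

// Simplified sketch of the should_skip rule; types here are stand-ins, not the crate's.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum SourceVersionKind {
    NonExistent,
    DifferentLogic,
    CurrentLogic,
    Deleted,
}

#[derive(Debug, Clone, Copy)]
struct SourceVersion {
    ordinal: Option<i64>,
    kind: SourceVersionKind,
}

impl SourceVersion {
    // `self` is the stored/existing version, `target` is the incoming one.
    // Skip unless the incoming version is strictly newer, or has the same ordinal
    // but a "higher" kind (e.g. Deleted over CurrentLogic, CurrentLogic over DifferentLogic).
    fn should_skip(&self, target: &SourceVersion) -> bool {
        match (self.ordinal, target.ordinal) {
            (Some(existing), Some(incoming)) => {
                existing > incoming || (existing == incoming && self.kind >= target.kind)
            }
            _ => false,
        }
    }
}

fn main() {
    let stored = SourceVersion { ordinal: Some(5), kind: SourceVersionKind::CurrentLogic };

    // Same ordinal, same logic fingerprint: nothing changed, skip.
    assert!(stored.should_skip(&SourceVersion { ordinal: Some(5), kind: SourceVersionKind::CurrentLogic }));
    // Older ordinal arriving late (stale poll): skip.
    assert!(stored.should_skip(&SourceVersion { ordinal: Some(4), kind: SourceVersionKind::CurrentLogic }));
    // Newer ordinal: process.
    assert!(!stored.should_skip(&SourceVersion { ordinal: Some(6), kind: SourceVersionKind::CurrentLogic }));

    // Stored row was processed under a different logic fingerprint: an incoming
    // CurrentLogic version with the same ordinal still triggers reprocessing.
    let stale_logic = SourceVersion { ordinal: Some(5), kind: SourceVersionKind::DifferentLogic };
    assert!(!stale_logic.should_skip(&SourceVersion { ordinal: Some(5), kind: SourceVersionKind::CurrentLogic }));
}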
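Note (illustrative, not part of the patch): source_indexer.rs replaces the old "collect all keys, diff against tracked keys" pass with a scan-generation scheme: each listing pass bumps a generation counter, stamps every row it sees, and rows still carrying an older generation afterwards are the ones to delete. The sketch below shows just that bookkeeping with string keys as placeholders; the patch additionally guards each row with a per-key semaphore and runs the work on a JoinSet.

// Simplified sketch of scan-generation deletion detection; keys and state are stand-ins.
use std::collections::HashMap;

#[derive(Default)]
struct RowState {
    touched_generation: usize,
}

struct SourceState {
    rows: HashMap<String, RowState>,
    scan_generation: usize,
}

impl SourceState {
    // Stamp every listed key with the new generation, then report the keys
    // that were not touched in this pass (i.e. no longer exist in the source).
    fn scan(&mut self, listed_keys: &[&str]) -> Vec<String> {
        self.scan_generation += 1;
        let generation = self.scan_generation;
        for key in listed_keys {
            self.rows.entry(key.to_string()).or_default().touched_generation = generation;
        }
        self.rows
            .iter()
            .filter(|(_, state)| state.touched_generation < generation)
            .map(|(key, _)| key.clone())
            .collect()
    }
}

fn main() {
    let mut state = SourceState { rows: HashMap::new(), scan_generation: 0 };
    assert!(state.scan(&["a", "b"]).is_empty()); // first pass: both rows are new
    let deleted = state.scan(&["a"]);            // second pass: "b" disappeared
    assert_eq!(deleted, vec!["b".to_string()]);
}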