diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 703bb6ff..177b6eb6 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -1094,63 +1094,67 @@ pub fn analyze_flow( // TODO: Fill it with a meaningful value. targets: IndexMap::new(), }; - let plan_fut = { - let analyzer_ctx = AnalyzerContext { registry, flow_ctx }; - let mut root_exec_scope = ExecutionScope { - name: ROOT_SCOPE_NAME, - data: &mut root_data_scope, - }; - let source_ops_futs = flow_inst - .source_ops - .iter() - .map(|source_op| { - let existing_source_states = source_states_by_name.get(source_op.name.as_str()); - analyzer_ctx.analyze_source_op( - root_exec_scope.data, - source_op.clone(), - Some(&mut setup_state.metadata), - existing_source_states, - ) - }) - .collect::>>()?; - let op_scope_fut = analyzer_ctx.analyze_op_scope( - &mut root_exec_scope, - &flow_inst.reactive_ops, - RefList::Nil, - )?; - let export_ops_futs = flow_inst - .export_ops - .iter() - .map(|export_op| { - analyzer_ctx.analyze_export_op( - root_exec_scope.data, - export_op.clone(), - Some(&mut setup_state), - &target_states_by_name_type, - ) - }) - .collect::>>()?; - let tracking_table_setup = setup_state.tracking_table.clone(); - async move { - let (source_ops, op_scope, export_ops) = try_join3( - try_join_all(source_ops_futs), - op_scope_fut, - try_join_all(export_ops_futs), + let analyzer_ctx = AnalyzerContext { registry, flow_ctx }; + let mut root_exec_scope = ExecutionScope { + name: ROOT_SCOPE_NAME, + data: &mut root_data_scope, + }; + let source_ops_futs = flow_inst + .source_ops + .iter() + .map(|source_op| { + let existing_source_states = source_states_by_name.get(source_op.name.as_str()); + analyzer_ctx.analyze_source_op( + root_exec_scope.data, + source_op.clone(), + Some(&mut setup_state.metadata), + existing_source_states, ) - .await?; - - Ok(ExecutionPlan { - tracking_table_setup, - logic_fingerprint: vec![0; 8], // TODO: Fill it with a meaningful value automatically - source_ops, - op_scope, - export_ops, - }) - } + }) + .collect::>>()?; + let op_scope_fut = analyzer_ctx.analyze_op_scope( + &mut root_exec_scope, + &flow_inst.reactive_ops, + RefList::Nil, + )?; + let export_ops_futs = flow_inst + .export_ops + .iter() + .map(|export_op| { + analyzer_ctx.analyze_export_op( + root_exec_scope.data, + export_op.clone(), + Some(&mut setup_state), + &target_states_by_name_type, + ) + }) + .collect::>>()?; + + let tracking_table_setup = setup_state.tracking_table.clone(); + let data_schema = root_data_scope.into_data_schema()?; + let logic_fingerprint = Fingerprinter::default() + .with(&flow_inst)? + .with(&data_schema)? + .into_fingerprint(); + let plan_fut = async move { + let (source_ops, op_scope, export_ops) = try_join3( + try_join_all(source_ops_futs), + op_scope_fut, + try_join_all(export_ops_futs), + ) + .await?; + + Ok(ExecutionPlan { + tracking_table_setup, + logic_fingerprint, + source_ops, + op_scope, + export_ops, + }) }; - Ok((root_data_scope.into_data_schema()?, plan_fut, setup_state)) + Ok((data_schema, plan_fut, setup_state)) } pub fn analyze_transient_flow<'a>( diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 43306077..30466027 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -6,7 +6,7 @@ use crate::base::schema::ValueType; use crate::base::value; use crate::execution::db_tracking_setup; use crate::ops::interface::*; -use crate::utils::fingerprint::Fingerprinter; +use crate::utils::fingerprint::{Fingerprint, Fingerprinter}; #[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct AnalyzedLocalFieldReference { @@ -126,7 +126,7 @@ pub struct AnalyzedOpScope { pub struct ExecutionPlan { pub tracking_table_setup: db_tracking_setup::TrackingTableSetupState, - pub logic_fingerprint: Vec, + pub logic_fingerprint: Fingerprint, pub source_ops: Vec, pub op_scope: AnalyzedOpScope, diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs index 453ebbd4..e52c599e 100644 --- a/src/execution/indexer.rs +++ b/src/execution/indexer.rs @@ -22,28 +22,39 @@ use super::evaluator::{evaluate_source_entry, ScopeValueBuilder}; #[derive(Debug, Serialize, Default)] pub struct UpdateStats { + pub num_skipped: AtomicUsize, pub num_insertions: AtomicUsize, pub num_deletions: AtomicUsize, - pub num_already_exists: AtomicUsize, + pub num_repreocesses: AtomicUsize, pub num_errors: AtomicUsize, } impl std::fmt::Display for UpdateStats { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let num_source_rows = self.num_insertions.load(Relaxed) - + self.num_deletions.load(Relaxed) - + self.num_already_exists.load(Relaxed); - write!(f, "{num_source_rows} source rows processed",)?; - if self.num_errors.load(Relaxed) > 0 { - write!(f, " with {} ERRORS", self.num_errors.load(Relaxed))?; + let num_skipped = self.num_skipped.load(Relaxed); + if num_skipped > 0 { + write!(f, "{} rows skipped", num_skipped)?; + } + + let num_insertions = self.num_insertions.load(Relaxed); + let num_deletions = self.num_deletions.load(Relaxed); + let num_reprocesses = self.num_repreocesses.load(Relaxed); + let num_source_rows = num_insertions + num_deletions + num_reprocesses; + if num_source_rows > 0 { + if num_skipped > 0 { + write!(f, ", ")?; + } + write!(f, "{num_source_rows} source rows processed",)?; + + let num_errors = self.num_errors.load(Relaxed); + if num_errors > 0 { + write!(f, " with {num_errors} ERRORS",)?; + } + write!( + f, + ": {num_insertions} added, {num_deletions} removed, {num_reprocesses} repocessed", + )?; } - write!( - f, - ": {} added, {} removed, {} already exists", - self.num_insertions.load(Relaxed), - self.num_deletions.load(Relaxed), - self.num_already_exists.load(Relaxed) - )?; Ok(()) } } @@ -495,8 +506,11 @@ pub async fn update_source_entry( (Some(source_ordinal), Some(existing_source_ordinal)) => { if source_ordinal < existing_source_ordinal || (source_ordinal == existing_source_ordinal - && existing_logic_fingerprint == source_op.) + && existing_logic_fingerprint.as_ref().map(|v| v.as_slice()) + == Some(plan.logic_fingerprint.0.as_slice())) { + // TODO: We should detect based on finer grain fingerprint. + stats.num_skipped.fetch_add(1, Relaxed); return Ok(()); } } @@ -538,7 +552,7 @@ pub async fn update_source_entry( }; if already_exists { if output.is_some() { - stats.num_already_exists.fetch_add(1, Relaxed); + stats.num_repreocesses.fetch_add(1, Relaxed); } else { stats.num_deletions.fetch_add(1, Relaxed); } @@ -590,7 +604,7 @@ pub async fn update_source_entry( source_op.source_id, &source_key_json, source_ordinal.map(|o| o.into()), - &plan.logic_fingerprint, + &plan.logic_fingerprint.0, precommit_output.metadata, &process_timestamp, &plan.tracking_table_setup,