diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs index cf07ace2..90746a55 100644 --- a/src/execution/indexer.rs +++ b/src/execution/indexer.rs @@ -116,6 +116,7 @@ struct TrackingInfoForTarget<'a> { mutation: ExportTargetMutation, } +#[derive(Debug)] struct PrecommitData<'a> { scope_value: &'a ScopeValueBuilder, memoization_info: &'a MemoizationInfo, @@ -174,10 +175,12 @@ async fn precommit_source_tracking_info( .or_default() .export_op = Some(export_op); } + + // Collect `tracking_info_for_targets` from existing tracking info. if let Some(info) = tracking_info { let sqlx::types::Json(staging_target_keys) = info.staging_target_keys; for (target_id, keys_info) in staging_target_keys.into_iter() { - let mut target_info = TrackingInfoForTarget::default(); + let target_info = tracking_info_for_targets.entry(target_id).or_default(); for key_info in keys_info.into_iter() { target_info .existing_staging_keys_info @@ -185,7 +188,6 @@ async fn precommit_source_tracking_info( .or_default() .push((key_info.1, key_info.2)); } - tracking_info_for_targets.insert(target_id, target_info); } if let Some(sqlx::types::Json(target_keys)) = info.target_keys { @@ -378,7 +380,6 @@ async fn commit_source_tracking_info( .collect::>() }) .unwrap_or_default(); - if !precommit_metadata.source_entry_exists && cleaned_staging_target_keys.is_empty() { // TODO: When we support distributed execution in the future, we'll need to leave a tombstone for a while // to prevent an earlier update causing the record reappear because of out-of-order processing. @@ -532,21 +533,17 @@ pub async fn update_source_entry( // Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones. let mut target_mutations = precommit_output.target_mutations; - let apply_futs = plan - .export_ops - .iter() - .filter_map(|export_op| { - target_mutations - .remove(&export_op.target_id) - .and_then(|mutation| { - if !mutation.is_empty() { - Some(export_op.executor.apply_mutation(mutation)) - } else { - None - } - }) - }) - .collect::>(); + let apply_futs = plan.export_ops.iter().filter_map(|export_op| { + target_mutations + .remove(&export_op.target_id) + .and_then(|mutation| { + if !mutation.is_empty() { + Some(export_op.executor.apply_mutation(mutation)) + } else { + None + } + }) + }); // TODO: Handle errors. try_join_all(apply_futs).await?;