From f2406aa25d17473894e612162c3d01e83754ca15 Mon Sep 17 00:00:00 2001
From: LJ
Date: Fri, 21 Mar 2025 17:01:17 -0700
Subject: [PATCH] Make indexing update stats more consistent with what really
 happened

---
 src/execution/indexer.rs | 126 ++++++++++++++++++++++++---------------
 1 file changed, 77 insertions(+), 49 deletions(-)

diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs
index 199a1c21..cf07ace2 100644
--- a/src/execution/indexer.rs
+++ b/src/execution/indexer.rs
@@ -1,9 +1,11 @@
 use anyhow::Result;
-use futures::future::{join_all, try_join, try_join_all};
+use futures::future::{join, join_all, try_join, try_join_all};
+use itertools::Itertools;
 use log::error;
 use serde::Serialize;
 use sqlx::PgPool;
 use std::collections::{HashMap, HashSet};
+use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
 
 use super::db_tracking::{self, read_source_tracking_info, TrackedTargetKey};
 use super::db_tracking_setup;
@@ -20,23 +22,27 @@ use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};
 
 #[derive(Debug, Serialize, Default)]
 pub struct UpdateStats {
-    pub num_insertions: usize,
-    pub num_deletions: usize,
-    pub num_already_exists: usize,
-    pub num_errors: usize,
+    pub num_insertions: AtomicUsize,
+    pub num_deletions: AtomicUsize,
+    pub num_already_exists: AtomicUsize,
+    pub num_errors: AtomicUsize,
 }
 
 impl std::fmt::Display for UpdateStats {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let num_source_rows = self.num_insertions + self.num_deletions + self.num_already_exists;
+        let num_source_rows = self.num_insertions.load(Relaxed)
+            + self.num_deletions.load(Relaxed)
+            + self.num_already_exists.load(Relaxed);
         write!(f, "{num_source_rows} source rows processed",)?;
-        if self.num_errors > 0 {
-            write!(f, " with {} ERRORS", self.num_errors)?;
+        if self.num_errors.load(Relaxed) > 0 {
+            write!(f, " with {} ERRORS", self.num_errors.load(Relaxed))?;
         }
         write!(
             f,
             ": {} added, {} removed, {} already exists",
-            self.num_insertions, self.num_deletions, self.num_already_exists
+            self.num_insertions.load(Relaxed),
+            self.num_deletions.load(Relaxed),
+            self.num_already_exists.load(Relaxed)
         )?;
         Ok(())
     }
@@ -442,7 +448,9 @@ pub async fn update_source_entry(
     source_op_idx: usize,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
+    only_for_deletion: bool,
     pool: &PgPool,
+    stats: &UpdateStats,
 ) -> Result<()> {
     let source_id = plan.source_ops[source_op_idx].source_id;
     let source_key_json = serde_json::to_value(key)?;
@@ -464,11 +472,22 @@ pub async fn update_source_entry(
         .flatten();
     let evaluation_cache =
         EvaluationCache::new(process_timestamp, memoization_info.map(|info| info.cache));
-    let value_builder =
-        evaluate_source_entry(plan, source_op_idx, schema, key, Some(&evaluation_cache)).await?;
+    let value_builder = if !only_for_deletion {
+        evaluate_source_entry(plan, source_op_idx, schema, key, Some(&evaluation_cache)).await?
+    } else {
+        None
+    };
+    let exists = value_builder.is_some();
 
-    // Didn't exist and still doesn't exist. No need to apply any changes.
-    if !already_exists && value_builder.is_none() {
+    if already_exists {
+        if exists {
+            stats.num_already_exists.fetch_add(1, Relaxed);
+        } else {
+            stats.num_deletions.fetch_add(1, Relaxed);
+        }
+    } else if exists {
+        stats.num_insertions.fetch_add(1, Relaxed);
+    } else {
         return Ok(());
     }
 
@@ -548,6 +567,31 @@ pub async fn update_source_entry(
     Ok(())
 }
 
+async fn update_source_entry_with_err_handling(
+    plan: &ExecutionPlan,
+    source_op_idx: usize,
+    schema: &schema::DataSchema,
+    key: &value::KeyValue,
+    only_for_deletion: bool,
+    pool: &PgPool,
+    stats: &UpdateStats,
+) {
+    let r = update_source_entry(
+        plan,
+        source_op_idx,
+        schema,
+        key,
+        only_for_deletion,
+        pool,
+        stats,
+    )
+    .await;
+    if let Err(e) = r {
+        stats.num_errors.fetch_add(1, Relaxed);
+        error!("{:?}", e.context("Error in indexing a source row"));
+    }
+}
+
 async fn update_source(
     source_name: &str,
     plan: &ExecutionPlan,
@@ -566,45 +610,29 @@ async fn update_source(
     )
     .await?;
 
-    let num_new_keys = keys.len();
-    let mut num_updates = 0;
-    let mut num_deletions = 0;
-
-    let mut all_keys_set = keys.into_iter().collect::<HashSet<_>>();
-    for existing_key_json in existing_keys_json.into_iter() {
-        let existing_key = value::Value::<ScopeValueBuilder>::from_json(
-            existing_key_json.source_key,
-            &source_op.primary_key_type,
-        )?;
-        let inserted = all_keys_set.insert(existing_key.as_key()?);
-        if inserted {
-            num_deletions += 1;
-        } else {
-            num_updates += 1;
-        }
-    }
-
-    let num_errors = join_all(all_keys_set.into_iter().map(|key| async move {
-        let result = update_source_entry(plan, source_op_idx, schema, &key, pool).await;
-        if let Err(e) = result {
-            error!("{:?}", e.context("Error in indexing a source row"));
-            1
-        } else {
-            0
-        }
-    }))
-    .await
-    .iter()
-    .sum();
+    let stats = UpdateStats::default();
+    let upsert_futs = join_all(keys.iter().map(|key| {
+        update_source_entry_with_err_handling(plan, source_op_idx, schema, key, false, pool, &stats)
+    }));
+    let deleted_keys = existing_keys_json
+        .into_iter()
+        .map(|existing_key_json| {
+            value::Value::<ScopeValueBuilder>::from_json(
+                existing_key_json.source_key,
+                &source_op.primary_key_type,
+            )?
+            .as_key()
+        })
+        .filter_ok(|existing_key| !keys.contains(existing_key))
+        .collect::<Result<Vec<_>>>()?;
+    let delete_futs = join_all(deleted_keys.iter().map(|key| {
+        update_source_entry_with_err_handling(plan, source_op_idx, schema, key, true, pool, &stats)
+    }));
+    join(upsert_futs, delete_futs).await;
 
     Ok(SourceUpdateInfo {
         source_name: source_name.to_string(),
-        stats: UpdateStats {
-            num_insertions: num_new_keys - num_updates,
-            num_deletions,
-            num_already_exists: num_updates,
-            num_errors,
-        },
+        stats,
     })
 }
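
Reviewer note (not part of the patch): switching the UpdateStats fields from usize to AtomicUsize is what lets all the concurrently polled futures share a single &UpdateStats, since fetch_add takes &self. A minimal, self-contained sketch of that pattern, assuming only the futures crate; the names Stats and process_one are illustrative, not from the codebase:

use futures::executor::block_on;
use futures::future::join_all;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

#[derive(Default)]
struct Stats {
    num_insertions: AtomicUsize,
    num_errors: AtomicUsize,
}

async fn process_one(key: u32, stats: &Stats) {
    // fetch_add works through a shared reference (interior mutability),
    // so many futures can update one Stats without a Mutex or &mut.
    if key % 3 == 0 {
        stats.num_errors.fetch_add(1, Relaxed);
    } else {
        stats.num_insertions.fetch_add(1, Relaxed);
    }
}

fn main() {
    let stats = Stats::default();
    // Every future borrows `stats`; join_all drives them all to completion
    // inside this scope, so no Arc is needed.
    block_on(join_all((0..10u32).map(|k| process_one(k, &stats))));
    // Relaxed ordering suffices: the counters are independent and are only
    // read after the join completes.
    println!(
        "insertions={}, errors={}",
        stats.num_insertions.load(Relaxed),
        stats.num_errors.load(Relaxed),
    );
}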
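
A second sketch, for the deleted_keys pipeline: Itertools::filter_ok applies a predicate to the Ok values of an iterator over Results while passing any Err through, so a later collect() can still surface the first error. The parsing example below is illustrative only; itertools is the dependency the patch already adds:

use itertools::Itertools;

fn main() {
    let raw = ["1", "2", "3", "4"];
    // map yields Result<i32, ParseIntError> items; filter_ok keeps only the
    // Ok values matching the predicate, and collecting into Result stops at
    // the first Err, the same shape as deleted_keys in the patch.
    let evens: Result<Vec<i32>, _> = raw
        .iter()
        .map(|s| s.parse::<i32>())
        .filter_ok(|n| n % 2 == 0)
        .collect();
    assert_eq!(evens.unwrap(), vec![2, 4]);
}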