From b59d6f7e7a6005de301c2484aab9e3246ddf4acc Mon Sep 17 00:00:00 2001
From: Jiangzhou He
Date: Sat, 13 Sep 2025 14:29:17 -0700
Subject: [PATCH] feat(rows-to-retry): optionally maintain a list of row keys to retry

---
 src/execution/db_tracking.rs    |  6 +++++-
 src/execution/source_indexer.rs | 20 ++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/src/execution/db_tracking.rs b/src/execution/db_tracking.rs
index 55623e5f..a433aeb4 100644
--- a/src/execution/db_tracking.rs
+++ b/src/execution/db_tracking.rs
@@ -294,6 +294,8 @@ pub struct TrackedSourceKeyMetadata {
     pub processed_source_ordinal: Option<i64>,
     pub processed_source_fp: Option<Vec<u8>>,
     pub process_logic_fingerprint: Option<Vec<u8>>,
+    pub max_process_ordinal: Option<i64>,
+    pub process_ordinal: Option<i64>,
 }
 
 pub struct ListTrackedSourceKeyMetadataState {
@@ -314,7 +316,9 @@ impl ListTrackedSourceKeyMetadataState {
         pool: &'a PgPool,
     ) -> impl Stream<Item = Result<TrackedSourceKeyMetadata>> + 'a {
         self.query_str = format!(
-            "SELECT source_key, processed_source_ordinal, {}, process_logic_fingerprint FROM {} WHERE source_id = $1",
+            "SELECT \
+             source_key, processed_source_ordinal, {}, process_logic_fingerprint, max_process_ordinal, process_ordinal \
+             FROM {} WHERE source_id = $1",
             if db_setup.has_fast_fingerprint_column {
                 "processed_source_fp"
             } else {
diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs
index 92f23da9..0c47de1c 100644
--- a/src/execution/source_indexer.rs
+++ b/src/execution/source_indexer.rs
@@ -48,6 +48,12 @@ impl Default for SourceRowIndexingState {
 struct SourceIndexingState {
     rows: HashMap<value::KeyValue, SourceRowIndexingState>,
     scan_generation: usize,
+
+    // Set of row keys to retry.
+    // This is for sources where we don't proactively scan all input rows during refresh:
+    // we keep the keys of rows that failed in the last processing, so they can be retried later.
+    // It's `None` if this failure-retry mechanism isn't needed.
+    rows_to_retry: Option<HashSet<value::KeyValue>>,
 }
 
 pub struct SourceIndexingContext {
@@ -113,6 +119,10 @@ impl<'a> LocalSourceRowStateOperator<'a> {
         let mut state = self.indexing_state.lock().unwrap();
         let touched_generation = state.scan_generation;
 
+        if let Some(rows_to_retry) = &mut state.rows_to_retry {
+            rows_to_retry.remove(self.key);
+        }
+
         if self.last_source_version == Some(source_version) {
             return Ok(RowStateAdvanceOutcome::Noop);
         }
@@ -211,6 +221,9 @@
             } else {
                 indexing_state.rows.remove(self.key);
             }
+            if let Some(rows_to_retry) = &mut indexing_state.rows_to_retry {
+                rows_to_retry.insert(self.key.clone());
+            }
         }
     }
 
@@ -244,6 +257,7 @@ impl SourceIndexingContext {
         let import_op = &plan.import_ops[source_idx];
         let mut list_state = db_tracking::ListTrackedSourceKeyMetadataState::new();
        let mut rows = HashMap::new();
+        let mut rows_to_retry: Option<HashSet<value::KeyValue>> = None;
         let scan_generation = 0;
         {
             let mut key_metadata_stream = list_state.list(
@@ -257,6 +271,11 @@
                     key_metadata.source_key,
                     &import_op.primary_key_schema,
                 )?;
+                if let Some(rows_to_retry) = &mut rows_to_retry {
+                    if key_metadata.max_process_ordinal > key_metadata.process_ordinal {
+                        rows_to_retry.insert(source_pk.clone());
+                    }
+                }
                 rows.insert(
                     source_pk,
                     SourceRowIndexingState {
@@ -280,6 +299,7 @@
             state: Mutex::new(SourceIndexingState {
                 rows,
                 scan_generation,
+                rows_to_retry,
             }),
             pending_update: Mutex::new(None),
             update_sem: Semaphore::new(1),
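
Note on the retry check (illustrative, not part of the patch): during load, a row is marked
for retry when `key_metadata.max_process_ordinal > key_metadata.process_ordinal`. Assuming
both columns are read as `Option<i64>`, the comparison relies on Rust's derived ordering for
`Option`, under which `None < Some(_)`. A minimal standalone sketch follows; the helper name
`needs_retry` is hypothetical and only mirrors the predicate used in the load loop:

    // Illustrative sketch: the Option<i64> comparison used in the patch's load loop.
    // Under Option's derived ordering, None < Some(_), so a row with a recorded
    // max_process_ordinal but a missing or older process_ordinal evaluates to "retry".
    fn needs_retry(max_process_ordinal: Option<i64>, process_ordinal: Option<i64>) -> bool {
        max_process_ordinal > process_ordinal
    }

    fn main() {
        assert!(needs_retry(Some(3), None));     // Some(3) > None
        assert!(needs_retry(Some(3), Some(2)));  // 3 > 2
        assert!(!needs_retry(Some(3), Some(3)));
        assert!(!needs_retry(None, None));
    }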