src/execution/db_tracking.rs (5 additions, 1 deletion)

@@ -294,6 +294,8 @@ pub struct TrackedSourceKeyMetadata {
     pub processed_source_ordinal: Option<i64>,
     pub processed_source_fp: Option<Vec<u8>>,
     pub process_logic_fingerprint: Option<Vec<u8>>,
+    pub max_process_ordinal: Option<i64>,
+    pub process_ordinal: Option<i64>,
 }
 
 pub struct ListTrackedSourceKeyMetadataState {
@@ -314,7 +316,9 @@ impl ListTrackedSourceKeyMetadataState {
         pool: &'a PgPool,
     ) -> impl Stream<Item = Result<TrackedSourceKeyMetadata, sqlx::Error>> + 'a {
         self.query_str = format!(
-            "SELECT source_key, processed_source_ordinal, {}, process_logic_fingerprint FROM {} WHERE source_id = $1",
+            "SELECT \
+             source_key, processed_source_ordinal, {}, process_logic_fingerprint, max_process_ordinal, process_ordinal \
+             FROM {} WHERE source_id = $1",
             if db_setup.has_fast_fingerprint_column {
                 "processed_source_fp"
             } else {
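Note: the two new ordinal columns make it possible to spot rows whose last
processing attempt never completed. Below is a minimal sketch of consuming
them, assuming `max_process_ordinal` records the latest attempted processing
and `process_ordinal` the latest completed one (the diff itself only shows the
comparison); the table name `tracking`, the `i32` source id, and the
`OrdinalPair` helper are hypothetical:

```rust
use futures::TryStreamExt;
use sqlx::PgPool;

// Hypothetical trimmed-down row type; the real code streams the full
// `TrackedSourceKeyMetadata`, assumed here to derive `sqlx::FromRow`.
#[derive(sqlx::FromRow)]
struct OrdinalPair {
    max_process_ordinal: Option<i64>,
    process_ordinal: Option<i64>,
}

// Count rows whose last processing attempt did not complete.
async fn count_incomplete(pool: &PgPool, source_id: i32) -> Result<usize, sqlx::Error> {
    let mut rows = sqlx::query_as::<_, OrdinalPair>(
        "SELECT max_process_ordinal, process_ordinal FROM tracking WHERE source_id = $1",
    )
    .bind(source_id)
    .fetch(pool);

    let mut n = 0;
    while let Some(row) = rows.try_next().await? {
        // `Option<i64>` orders `None` below any `Some`, so a claimed but
        // never-committed row (`process_ordinal` still NULL) counts too.
        if row.max_process_ordinal > row.process_ordinal {
            n += 1;
        }
    }
    Ok(n)
}
```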
src/execution/source_indexer.rs (20 additions, 0 deletions)

@@ -48,6 +48,12 @@ impl Default for SourceRowIndexingState {
 struct SourceIndexingState {
     rows: HashMap<value::KeyValue, SourceRowIndexingState>,
     scan_generation: usize,
+
+    // Set of row keys to retry.
+    // Used for sources where we don't proactively scan all input rows during
+    // refresh: keys of rows that failed in the last processing are kept here
+    // so they can be retried later. `None` if the source doesn't need this.
+    rows_to_retry: Option<HashSet<value::KeyValue>>,
 }
 
 pub struct SourceIndexingContext {
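To make the lifecycle of `rows_to_retry` concrete, here is a self-contained
sketch of the invariant the rest of this diff maintains: a key is removed from
the set when the row starts advancing to a new source version, and re-inserted
when processing fails. Types are simplified (`String` keys instead of
`value::KeyValue`), and `on_advance`/`on_failure` are illustrative names, not
functions from this PR:

```rust
use std::collections::HashSet;

// Simplified stand-in for `SourceIndexingState`.
struct State {
    // `Some` only for sources that don't proactively re-scan all rows.
    rows_to_retry: Option<HashSet<String>>,
}

impl State {
    // A row is about to be (re)processed: clear any stale retry marker.
    fn on_advance(&mut self, key: &str) {
        if let Some(set) = &mut self.rows_to_retry {
            set.remove(key);
        }
    }

    // Processing failed: remember the key so a later pass can retry it.
    fn on_failure(&mut self, key: &str) {
        if let Some(set) = &mut self.rows_to_retry {
            set.insert(key.to_owned());
        }
    }
}

fn main() {
    let mut state = State { rows_to_retry: Some(HashSet::new()) };
    state.on_failure("row-1");
    assert!(state.rows_to_retry.as_ref().unwrap().contains("row-1"));
    state.on_advance("row-1"); // retry begins; the marker is cleared
    assert!(state.rows_to_retry.as_ref().unwrap().is_empty());
}
```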
@@ -113,6 +119,10 @@ impl<'a> LocalSourceRowStateOperator<'a> {
         let mut state = self.indexing_state.lock().unwrap();
         let touched_generation = state.scan_generation;
 
+        if let Some(rows_to_retry) = &mut state.rows_to_retry {
+            rows_to_retry.remove(self.key);
+        }
+
         if self.last_source_version == Some(source_version) {
             return Ok(RowStateAdvanceOutcome::Noop);
         }
@@ -211,6 +221,9 @@ impl<'a> LocalSourceRowStateOperator<'a> {
         } else {
             indexing_state.rows.remove(self.key);
         }
+        if let Some(rows_to_retry) = &mut indexing_state.rows_to_retry {
+            rows_to_retry.insert(self.key.clone());
+        }
     }
 }
 
@@ -244,6 +257,7 @@ impl SourceIndexingContext {
         let import_op = &plan.import_ops[source_idx];
         let mut list_state = db_tracking::ListTrackedSourceKeyMetadataState::new();
         let mut rows = HashMap::new();
+        let mut rows_to_retry: Option<HashSet<value::KeyValue>> = None;
         let scan_generation = 0;
         {
             let mut key_metadata_stream = list_state.list(
@@ -257,6 +271,11 @@
                 key_metadata.source_key,
                 &import_op.primary_key_schema,
             )?;
+            if let Some(rows_to_retry) = &mut rows_to_retry {
+                if key_metadata.max_process_ordinal > key_metadata.process_ordinal {
+                    rows_to_retry.insert(source_pk.clone());
+                }
+            }
             rows.insert(
                 source_pk,
                 SourceRowIndexingState {
@@ -280,6 +299,7 @@
             state: Mutex::new(SourceIndexingState {
                 rows,
                 scan_generation,
+                rows_to_retry,
             }),
             pending_update: Mutex::new(None),
             update_sem: Semaphore::new(1),
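One subtlety in the startup check above: `max_process_ordinal` and
`process_ordinal` are compared directly as `Option<i64>` values. Rust's
derived ordering on `Option` places `None` below every `Some`, which is
exactly what makes the comparison pick up rows that were claimed but never
committed. A quick sketch:

```rust
fn main() {
    // Both ordinals present: plain integer comparison.
    assert!(Some(3_i64) > Some(2_i64));
    // Claimed but never committed: `process_ordinal` is still NULL/None,
    // so the row is scheduled for retry.
    assert!(Some(1_i64) > None::<i64>);
    // Never claimed at all: no retry.
    assert!(!(None::<i64> > None::<i64>));
}
```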