diff --git a/src/execution/db_tracking.rs b/src/execution/db_tracking.rs
index a433aeb4..9def9fb0 100644
--- a/src/execution/db_tracking.rs
+++ b/src/execution/db_tracking.rs
@@ -184,6 +184,30 @@ pub async fn precommit_source_tracking_info(
     Ok(())
 }
 
+pub async fn touch_max_process_ordinal(
+    source_id: i32,
+    source_key_json: &serde_json::Value,
+    process_ordinal: i64,
+    db_setup: &TrackingTableSetupState,
+    db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
+) -> Result<()> {
+    let query_str = format!(
+        "INSERT INTO {} AS t (source_id, source_key, max_process_ordinal, staging_target_keys) \
+         VALUES ($1, $2, $3, $4) \
+         ON CONFLICT (source_id, source_key) DO UPDATE SET \
+         max_process_ordinal = GREATEST(t.max_process_ordinal + 1, EXCLUDED.max_process_ordinal)",
+        db_setup.table_name,
+    );
+    sqlx::query(&query_str)
+        .bind(source_id)
+        .bind(source_key_json)
+        .bind(process_ordinal)
+        .bind(sqlx::types::Json(TrackedTargetKeyForSource::default()))
+        .execute(db_executor)
+        .await?;
+    Ok(())
+}
+
 #[derive(sqlx::FromRow, Debug)]
 pub struct SourceTrackingInfoForCommit {
     pub staging_target_keys: sqlx::types::Json<TrackedTargetKeyForSource>,
diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs
index 0e991dca..a1499ba3 100644
--- a/src/execution/row_indexer.rs
+++ b/src/execution/row_indexer.rs
@@ -199,12 +199,13 @@ impl<'a> RowIndexer<'a> {
         src_eval_ctx: &'a SourceRowEvaluationContext<'_>,
         setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
         mode: super::source_indexer::UpdateMode,
+        process_time: chrono::DateTime<chrono::Utc>,
         update_stats: &'a stats::UpdateStats,
         pool: &'a PgPool,
     ) -> Result<Self> {
         Ok(Self {
             source_id: setup_execution_ctx.import_ops[src_eval_ctx.import_op_idx].source_id,
-            process_time: chrono::Utc::now(),
+            process_time,
             source_key_json: serde_json::to_value(src_eval_ctx.key)?,
 
             src_eval_ctx,
@@ -216,10 +217,11 @@ impl<'a> RowIndexer<'a> {
     }
 
     pub async fn update_source_row(
-        &mut self,
+        &self,
         source_version: &SourceVersion,
         source_value: interface::SourceValue,
         source_version_fp: Option<Vec<u8>>,
+        ordinal_touched: &mut bool,
     ) -> Result<SkippedOr<()>> {
         let tracking_setup_state = &self.setup_execution_ctx.setup_state.tracking_table;
         // Phase 1: Check existing tracking info and apply optimizations
@@ -335,6 +337,7 @@
             }),
         )
         .await?;
+        *ordinal_touched = true;
         let precommit_output = match precommit_output {
             SkippedOr::Normal(output) => output,
             SkippedOr::Skipped(v, fp) => return Ok(SkippedOr::Skipped(v, fp)),
         };
@@ -398,7 +401,7 @@
     }
 
     pub async fn try_collapse(
-        &mut self,
+        &self,
         source_version: &SourceVersion,
         content_version_fp: &[u8],
         existing_version: &SourceVersion,
@@ -538,7 +541,7 @@
             .map(|info| info.max_process_ordinal)
             .unwrap_or(0)
             + 1)
-        .max(self.process_time.timestamp_millis());
+        .max(Self::process_ordinal_from_time(self.process_time));
         let existing_process_ordinal =
             tracking_info.as_ref().and_then(|info| info.process_ordinal);
         let mut tracking_info_for_targets = HashMap::::new();
@@ -820,6 +823,10 @@
 
         Ok(())
     }
+
+    pub fn process_ordinal_from_time(process_time: chrono::DateTime<chrono::Utc>) -> i64 {
+        process_time.timestamp_millis()
+    }
 }
 
 pub async fn evaluate_source_entry_with_memory(
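Aside (not part of the patch): two ordinal rules interact here. RowIndexer picks a process ordinal that is at least one past the stored max_process_ordinal and at least the millisecond timestamp of process_time (the -538 hunk), while the new touch_max_process_ordinal upsert advances the stored maximum via GREATEST(t.max_process_ordinal + 1, EXCLUDED.max_process_ordinal). A minimal standalone sketch of both rules follows; the function names are hypothetical and only mirror the expressions above.

// Standalone sketch, not part of the patch. Function names are hypothetical;
// the bodies mirror the Rust and SQL expressions in the hunks above.

/// Ordinal chosen for a fresh update: one past the stored maximum,
/// but never behind the wall clock (process_time in milliseconds).
fn next_process_ordinal(existing_max: Option<i64>, process_time_millis: i64) -> i64 {
    (existing_max.unwrap_or(0) + 1).max(process_time_millis)
}

/// Effect of the ON CONFLICT clause in touch_max_process_ordinal: the stored
/// maximum advances by at least 1 even if the time-derived ordinal is stale.
fn touched_max_process_ordinal(stored_max: i64, time_ordinal: i64) -> i64 {
    (stored_max + 1).max(time_ordinal)
}

fn main() {
    // Wall clock ahead of the stored maximum: the timestamp wins.
    assert_eq!(next_process_ordinal(Some(100), 1_000), 1_000);
    // Clock stalled or went backwards: the ordinal still moves forward.
    assert_eq!(touched_max_process_ordinal(1_000, 900), 1_001);
}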
diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs
index 0c47de1c..024a4c7c 100644
--- a/src/execution/source_indexer.rs
+++ b/src/execution/source_indexer.rs
@@ -63,6 +63,7 @@ pub struct SourceIndexingContext {
     update_sem: Semaphore,
     state: Mutex<SourceIndexingState>,
     setup_execution_ctx: Arc<exec_ctx::FlowSetupExecutionContext>,
+    needs_to_track_rows_to_retry: bool,
 }
 
 pub const NO_ACK: Option<fn() -> Ready<Result<()>>> = None;
@@ -296,6 +297,7 @@ impl SourceIndexingContext {
         Ok(Self {
             flow,
             source_idx,
+            needs_to_track_rows_to_retry: rows_to_retry.is_some(),
             state: Mutex::new(SourceIndexingState {
                 rows,
                 scan_generation,
@@ -331,10 +333,12 @@ impl SourceIndexingContext {
             key: &row_input.key,
             import_op_idx: self.source_idx,
         };
-        let mut row_indexer = row_indexer::RowIndexer::new(
+        let process_time = chrono::Utc::now();
+        let row_indexer = row_indexer::RowIndexer::new(
             &eval_ctx,
             &self.setup_execution_ctx,
             mode,
+            process_time,
             &update_stats,
             &pool,
         )?;
@@ -342,6 +346,7 @@ impl SourceIndexingContext {
         let source_data = row_input.data;
         let mut row_state_operator =
             LocalSourceRowStateOperator::new(&row_input.key, &self.state, &update_stats);
+        let mut ordinal_touched = false;
         let result = {
             let row_state_operator = &mut row_state_operator;
             let row_key = &row_input.key;
@@ -434,7 +439,12 @@ impl SourceIndexingContext {
                 }
 
                 let result = row_indexer
-                    .update_source_row(&source_version, value, content_version_fp.clone())
+                    .update_source_row(
+                        &source_version,
+                        value,
+                        content_version_fp.clone(),
+                        &mut ordinal_touched,
+                    )
                     .await?;
                 if let SkippedOr::Skipped(version, fp) = result {
                     row_state_operator
@@ -449,6 +459,17 @@ impl SourceIndexingContext {
                 row_state_operator.commit();
             } else {
                 row_state_operator.rollback();
+                if !ordinal_touched && self.needs_to_track_rows_to_retry {
+                    let source_key_json = serde_json::to_value(&row_input.key)?;
+                    db_tracking::touch_max_process_ordinal(
+                        self.setup_execution_ctx.import_ops[self.source_idx].source_id,
+                        &source_key_json,
+                        row_indexer::RowIndexer::process_ordinal_from_time(process_time),
+                        &self.setup_execution_ctx.setup_state.tracking_table,
+                        &pool,
+                    )
+                    .await?;
+                }
             }
             result
         };
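Aside (not part of the patch): the rollback branch above bumps max_process_ordinal only when processing failed before any precommit (ordinal_touched is still false) and the context was built with rows to retry. Assuming retry detection compares the stored maximum against the last committed process_ordinal, that touch is what keeps a failed row visible to the next retry scan. A rough, hypothetical sketch of such a predicate:

/// Hypothetical sketch (not taken from the codebase): a tracking row whose
/// max_process_ordinal has moved past its committed process_ordinal, or that
/// has never committed at all, still has unfinished work and should be retried.
fn row_needs_retry(max_process_ordinal: i64, committed_process_ordinal: Option<i64>) -> bool {
    committed_process_ordinal.map_or(true, |committed| max_process_ordinal > committed)
}

fn main() {
    // A failed attempt touched the ordinal but never committed: retry.
    assert!(row_needs_retry(42, Some(41)));
    // Fully committed row: nothing left to do.
    assert!(!row_needs_retry(41, Some(41)));
}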