diff --git a/src/execution/db_tracking.rs b/src/execution/db_tracking.rs index 00d14cc6..a7165203 100644 --- a/src/execution/db_tracking.rs +++ b/src/execution/db_tracking.rs @@ -9,26 +9,21 @@ pub type TrackedTargetKey = (serde_json::Value, i64, Option); pub type TrackedTargetKeyForSource = Vec<(i32, Vec)>; #[derive(sqlx::FromRow, Debug)] -pub struct SourceTrackingInfo { - pub max_process_ordinal: i64, - pub staging_target_keys: sqlx::types::Json, +pub struct SourceTrackingInfoForProcessing { pub memoization_info: Option>>, pub processed_source_ordinal: Option, pub process_logic_fingerprint: Option>, - pub process_ordinal: Option, - pub process_time_micros: Option, - pub target_keys: Option>, } -pub async fn read_source_tracking_info( +pub async fn read_source_tracking_info_for_processing( source_id: i32, source_key_json: &serde_json::Value, db_setup: &TrackingTableSetupState, pool: &PgPool, -) -> Result> { +) -> Result> { let query_str = format!( - "SELECT max_process_ordinal, staging_target_keys, memoization_info, processed_source_ordinal, process_logic_fingerprint, process_ordinal, process_time_micros, target_keys FROM {} WHERE source_id = $1 AND source_key = $2", + "SELECT memoization_info, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1 AND source_key = $2", db_setup.table_name ); let tracking_info = sqlx::query_as(&query_str) diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs index e52c599e..16358b64 100644 --- a/src/execution/indexer.rs +++ b/src/execution/indexer.rs @@ -8,7 +8,7 @@ use sqlx::PgPool; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; -use super::db_tracking::{self, read_source_tracking_info, TrackedTargetKey}; +use super::db_tracking::{self, read_source_tracking_info_for_processing, TrackedTargetKey}; use super::db_tracking_setup; use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoizationInfo}; use crate::base::schema; @@ -442,7 +442,7 @@ pub async fn evaluate_source_entry_with_memory( ) -> Result> { let stored_info = if options.enable_cache || !options.evaluation_only { let source_key_json = serde_json::to_value(key)?; - let existing_tracking_info = read_source_tracking_info( + let existing_tracking_info = read_source_tracking_info_for_processing( source_op.source_id, &source_key_json, &plan.tracking_table_setup, @@ -481,7 +481,7 @@ pub async fn update_source_entry( let process_timestamp = chrono::Utc::now(); // Phase 1: Evaluate with memoization info. - let existing_tracking_info = read_source_tracking_info( + let existing_tracking_info = read_source_tracking_info_for_processing( source_op.source_id, &source_key_json, &plan.tracking_table_setup,