From 601024ce1de9ddac886389189335cf8c6da0b484 Mon Sep 17 00:00:00 2001 From: LJ Date: Wed, 5 Mar 2025 16:30:52 -0800 Subject: [PATCH 1/2] Cleanup unnecessary casts. --- src/execution/indexer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs index 0ee493b9..cd5bea2c 100644 --- a/src/execution/indexer.rs +++ b/src/execution/indexer.rs @@ -420,7 +420,7 @@ pub async fn update_source_entry<'a>( key: &value::KeyValue, pool: &PgPool, ) -> Result<()> { - let source_id = plan.source_ops[source_op_idx as usize].source_id; + let source_id = plan.source_ops[source_op_idx].source_id; let source_key_json = serde_json::to_value(key)?; let process_timestamp = chrono::Utc::now(); @@ -532,7 +532,7 @@ async fn update_source( schema: &schema::DataSchema, pool: &PgPool, ) -> Result { - let source_op = &plan.source_ops[source_op_idx as usize]; + let source_op = &plan.source_ops[source_op_idx]; let (keys, existing_keys_json) = try_join( source_op.executor.list_keys(), db_tracking::list_source_tracking_keys( From 091774373392bd3276c10c608ddac52d3f844cdd Mon Sep 17 00:00:00 2001 From: LJ Date: Wed, 5 Mar 2025 16:44:37 -0800 Subject: [PATCH 2/2] Enable evaluation cache in the readonly evaluation API. --- src/execution/indexer.rs | 27 +++++++++++++++++++++++++++ src/service/flows.rs | 22 ++++++++++++++++------ 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs index cd5bea2c..5aecf9d0 100644 --- a/src/execution/indexer.rs +++ b/src/execution/indexer.rs @@ -413,6 +413,33 @@ async fn commit_source_tracking_info( Ok(WithApplyStatus::Normal(())) } +/// Built an evaluation cache on the existing data. +pub async fn evaluation_cache_on_existing_data( + plan: &ExecutionPlan, + source_op_idx: usize, + key: &value::KeyValue, + pool: &PgPool, +) -> Result { + let source_id = plan.source_ops[source_op_idx].source_id; + let source_key_json = serde_json::to_value(key)?; + let existing_tracking_info = read_source_tracking_info( + source_id, + &source_key_json, + &plan.tracking_table_setup, + pool, + ) + .await?; + let process_timestamp = chrono::Utc::now(); + let memoization_info = existing_tracking_info + .map(|info| info.memoization_info.map(|info| info.0)) + .flatten() + .flatten(); + Ok(EvaluationCache::new( + process_timestamp, + memoization_info.map(|info| info.cache), + )) +} + pub async fn update_source_entry<'a>( plan: &ExecutionPlan, source_op_idx: usize, diff --git a/src/service/flows.rs b/src/service/flows.rs index bbbfb974..92c4f9a5 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -145,12 +145,22 @@ pub async fn evaluate_data( .ok_or_else(|| api_error!("field {} does not have a key", query.field))?; let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?; - let data_builder = - evaluator::evaluate_source_entry(&execution_plan, source_op_idx, &schema, &key, None) - .await? - .ok_or_else(|| { - api_error!("value not found for source at the specified key: {key:?}") - })?; + let evaluation_cache = indexer::evaluation_cache_on_existing_data( + &execution_plan, + source_op_idx, + &key, + &lib_context.pool, + ) + .await?; + let data_builder = evaluator::evaluate_source_entry( + &execution_plan, + source_op_idx, + &schema, + &key, + Some(&evaluation_cache), + ) + .await? + .ok_or_else(|| api_error!("value not found for source at the specified key: {key:?}"))?; Ok(Json(EvaluateDataResponse { schema: schema.clone(),