From 34452b6a87ddfe16116e2db12c5d8a8bc0e3d4bb Mon Sep 17 00:00:00 2001 From: LJ Date: Wed, 5 Mar 2025 16:25:48 -0800 Subject: [PATCH] Bug fix: for evaluate API, use position in source instead of schema. --- src/execution/evaluator.rs | 4 ++-- src/execution/indexer.rs | 13 +++---------- src/service/flows.rs | 17 +++++++++-------- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index d9220464..507bf30c 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -412,7 +412,7 @@ async fn evaluate_op_scope( pub async fn evaluate_source_entry<'a>( plan: &ExecutionPlan, - source_op_idx: u32, + source_op_idx: usize, schema: &schema::DataSchema, key: &value::KeyValue, cache: Option<&EvaluationCache>, @@ -426,7 +426,7 @@ pub async fn evaluate_source_entry<'a>( schema: &root_schema, }; - let source_op = &plan.source_ops[source_op_idx as usize]; + let source_op = &plan.source_ops[source_op_idx]; let collection_schema = match &root_schema.fields[source_op.output.field_idx as usize] .value_type .typ diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs index 1694742a..0ee493b9 100644 --- a/src/execution/indexer.rs +++ b/src/execution/indexer.rs @@ -415,7 +415,7 @@ async fn commit_source_tracking_info( pub async fn update_source_entry<'a>( plan: &ExecutionPlan, - source_op_idx: u32, + source_op_idx: usize, schema: &schema::DataSchema, key: &value::KeyValue, pool: &PgPool, @@ -528,7 +528,7 @@ pub async fn update_source_entry<'a>( async fn update_source( source_name: &str, plan: &ExecutionPlan, - source_op_idx: u32, + source_op_idx: usize, schema: &schema::DataSchema, pool: &PgPool, ) -> Result { @@ -591,14 +591,7 @@ pub async fn update( .iter() .enumerate() .map(|(source_op_idx, source_op)| async move { - update_source( - source_op.name.as_str(), - plan, - source_op_idx as u32, - schema, - pool, - ) - .await + update_source(source_op.name.as_str(), plan, source_op_idx, schema, pool).await }) .collect::>(), ) diff --git a/src/service/flows.rs b/src/service/flows.rs index d83138b2..bbbfb974 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -122,30 +122,31 @@ pub async fn evaluate_data( let fl = &lib_context.with_flow_context(&flow_name, |ctx| ctx.flow.clone())?; let schema = &fl.data_schema; - let field_idx = schema - .fields + let source_op_idx = fl + .flow_instance + .source_ops .iter() - .position(|f| f.name == query.field) + .position(|source_op| source_op.name == query.field) .ok_or_else(|| { ApiError::new( - &format!("field not found: {}", query.field), + &format!("source field not found: {}", query.field), StatusCode::BAD_REQUEST, ) })?; - - let field_schema = &schema.fields[field_idx]; + let execution_plan = fl.get_execution_plan().await?; + let field_schema = + &schema.fields[execution_plan.source_ops[source_op_idx].output.field_idx as usize]; let collection_schema = match &field_schema.value_type.typ { schema::ValueType::Collection(collection) => collection, _ => api_bail!("field is not a table: {}", query.field), }; - let execution_plan = fl.get_execution_plan().await?; let key_field = collection_schema .key_field() .ok_or_else(|| api_error!("field {} does not have a key", query.field))?; let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?; let data_builder = - evaluator::evaluate_source_entry(&execution_plan, field_idx as u32, &schema, &key, None) + evaluator::evaluate_source_entry(&execution_plan, source_op_idx, &schema, &key, None) .await? .ok_or_else(|| { api_error!("value not found for source at the specified key: {key:?}")