Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions src/execution/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,14 +413,41 @@ async fn commit_source_tracking_info(
Ok(WithApplyStatus::Normal(()))
}

/// Built an evaluation cache on the existing data.
pub async fn evaluation_cache_on_existing_data(
plan: &ExecutionPlan,
source_op_idx: usize,
key: &value::KeyValue,
pool: &PgPool,
) -> Result<EvaluationCache> {
let source_id = plan.source_ops[source_op_idx].source_id;
let source_key_json = serde_json::to_value(key)?;
let existing_tracking_info = read_source_tracking_info(
source_id,
&source_key_json,
&plan.tracking_table_setup,
pool,
)
.await?;
let process_timestamp = chrono::Utc::now();
let memoization_info = existing_tracking_info
.map(|info| info.memoization_info.map(|info| info.0))
.flatten()
.flatten();
Ok(EvaluationCache::new(
process_timestamp,
memoization_info.map(|info| info.cache),
))
}

pub async fn update_source_entry<'a>(
plan: &ExecutionPlan,
source_op_idx: usize,
schema: &schema::DataSchema,
key: &value::KeyValue,
pool: &PgPool,
) -> Result<()> {
let source_id = plan.source_ops[source_op_idx as usize].source_id;
let source_id = plan.source_ops[source_op_idx].source_id;
let source_key_json = serde_json::to_value(key)?;
let process_timestamp = chrono::Utc::now();

Expand Down Expand Up @@ -532,7 +559,7 @@ async fn update_source(
schema: &schema::DataSchema,
pool: &PgPool,
) -> Result<SourceUpdateInfo> {
let source_op = &plan.source_ops[source_op_idx as usize];
let source_op = &plan.source_ops[source_op_idx];
let (keys, existing_keys_json) = try_join(
source_op.executor.list_keys(),
db_tracking::list_source_tracking_keys(
Expand Down
22 changes: 16 additions & 6 deletions src/service/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,22 @@ pub async fn evaluate_data(
.ok_or_else(|| api_error!("field {} does not have a key", query.field))?;
let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?;

let data_builder =
evaluator::evaluate_source_entry(&execution_plan, source_op_idx, &schema, &key, None)
.await?
.ok_or_else(|| {
api_error!("value not found for source at the specified key: {key:?}")
})?;
let evaluation_cache = indexer::evaluation_cache_on_existing_data(
&execution_plan,
source_op_idx,
&key,
&lib_context.pool,
)
.await?;
let data_builder = evaluator::evaluate_source_entry(
&execution_plan,
source_op_idx,
&schema,
&key,
Some(&evaluation_cache),
)
.await?
.ok_or_else(|| api_error!("value not found for source at the specified key: {key:?}"))?;

Ok(Json(EvaluateDataResponse {
schema: schema.clone(),
Expand Down