From 4aba6d75e74d4eb3f313728f0a659131ef63ba5f Mon Sep 17 00:00:00 2001 From: LJ Date: Wed, 19 Mar 2025 19:37:53 -0700 Subject: [PATCH] Make error location caused by functions more clear. --- src/base/value.rs | 22 ++++++++++++++++++++++ src/execution/evaluator.rs | 25 +++++++++++++++++++------ src/execution/indexer.rs | 5 ++--- 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/src/base/value.rs b/src/base/value.rs index 06d8a467..6dc0ddc9 100644 --- a/src/base/value.rs +++ b/src/base/value.rs @@ -133,6 +133,28 @@ impl serde::Serialize for KeyValue { } } +impl std::fmt::Display for KeyValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + KeyValue::Bytes(v) => write!(f, "{}", BASE64_STANDARD.encode(v)), + KeyValue::Str(v) => write!(f, "\"{}\"", v.escape_default()), + KeyValue::Bool(v) => write!(f, "{}", v), + KeyValue::Int64(v) => write!(f, "{}", v), + KeyValue::Range(v) => write!(f, "[{}, {})", v.start, v.end), + KeyValue::Struct(v) => { + write!( + f, + "[{}]", + v.iter() + .map(|v| v.to_string()) + .collect::>() + .join(", ") + ) + } + } + } +} + impl KeyValue { fn parts_from_str( values_iter: &mut impl Iterator, diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index 9222c38d..e2a9de70 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -1,7 +1,7 @@ use std::sync::{Mutex, OnceLock}; use std::{borrow::Cow, collections::BTreeMap}; -use anyhow::{bail, Ok, Result}; +use anyhow::{bail, Context, Ok, Result}; use futures::future::try_join_all; use crate::builder::{plan::*, AnalyzedTransientFlow}; @@ -298,7 +298,17 @@ async fn evaluate_child_op_scope( child_scope_entry: ScopeEntry<'_>, cache: Option<&EvaluationCache>, ) -> Result<()> { - evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), cache).await + evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), cache) + .await + .with_context(|| { + format!( + "Evaluating in scope with key {}", + match child_scope_entry.key.key() { + Some(k) => k.to_string(), + None => "()".to_string(), + } + ) + }) } async fn evaluate_op_scope( @@ -331,7 +341,8 @@ async fn evaluate_op_scope( let output_value = evaluate_with_cell(output_value_cell.as_ref(), move || { op.executor.evaluate(input_values) }) - .await?; + .await + .with_context(|| format!("Evaluating Transform op `{}`", op.name,))?; head_scope.define_field(&op.output, &output_value)?; } @@ -339,7 +350,7 @@ async fn evaluate_op_scope( let target_field_schema = head_scope.get_field_schema(&op.local_field_ref)?; let collection_schema = match &target_field_schema.value_type.typ { schema::ValueType::Collection(cs) => cs, - _ => panic!("Expect target field to be a collection"), + _ => bail!("Expect target field to be a collection"), }; let target_field = head_scope.get_value_field_builder(&op.local_field_ref); @@ -391,10 +402,12 @@ async fn evaluate_op_scope( }) .collect::>(), _ => { - panic!("Target field type is expected to be a collection"); + bail!("Target field type is expected to be a collection"); } }; - try_join_all(task_futs).await?; + try_join_all(task_futs) + .await + .with_context(|| format!("Evaluating ForEach op `{}`", op.name,))?; } AnalyzedReactiveOp::Collect(op) => { diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs index af9520d6..e5cd6417 100644 --- a/src/execution/indexer.rs +++ b/src/execution/indexer.rs @@ -1,6 +1,6 @@ use anyhow::Result; use futures::future::{join_all, try_join, try_join_all}; -use log::{debug, error}; +use log::error; use serde::Serialize; use sqlx::PgPool; use std::collections::{HashMap, HashSet}; @@ -597,8 +597,7 @@ async fn update_source( let num_errors = join_all(all_keys_set.into_iter().map(|key| async move { let result = update_source_entry(plan, source_op_idx, schema, &key, pool).await; if let Err(e) = result { - error!("Error indexing source row: {}", e); - debug!("Detailed error: {:?}", e); + error!("{:?}", e.context("Error in indexing a source row")); 1 } else { 0