Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/base/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,28 @@ impl serde::Serialize for KeyValue {
}
}

impl std::fmt::Display for KeyValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
KeyValue::Bytes(v) => write!(f, "{}", BASE64_STANDARD.encode(v)),
KeyValue::Str(v) => write!(f, "\"{}\"", v.escape_default()),
KeyValue::Bool(v) => write!(f, "{}", v),
KeyValue::Int64(v) => write!(f, "{}", v),
KeyValue::Range(v) => write!(f, "[{}, {})", v.start, v.end),
KeyValue::Struct(v) => {
write!(
f,
"[{}]",
v.iter()
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(", ")
)
}
}
}
}

impl KeyValue {
fn parts_from_str(
values_iter: &mut impl Iterator<Item = String>,
Expand Down
25 changes: 19 additions & 6 deletions src/execution/evaluator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::{Mutex, OnceLock};
use std::{borrow::Cow, collections::BTreeMap};

use anyhow::{bail, Ok, Result};
use anyhow::{bail, Context, Ok, Result};
use futures::future::try_join_all;

use crate::builder::{plan::*, AnalyzedTransientFlow};
Expand Down Expand Up @@ -298,7 +298,17 @@ async fn evaluate_child_op_scope(
child_scope_entry: ScopeEntry<'_>,
cache: Option<&EvaluationCache>,
) -> Result<()> {
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), cache).await
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), cache)
.await
.with_context(|| {
format!(
"Evaluating in scope with key {}",
match child_scope_entry.key.key() {
Some(k) => k.to_string(),
None => "()".to_string(),
}
)
})
}

async fn evaluate_op_scope(
Expand Down Expand Up @@ -331,15 +341,16 @@ async fn evaluate_op_scope(
let output_value = evaluate_with_cell(output_value_cell.as_ref(), move || {
op.executor.evaluate(input_values)
})
.await?;
.await
.with_context(|| format!("Evaluating Transform op `{}`", op.name,))?;
head_scope.define_field(&op.output, &output_value)?;
}

AnalyzedReactiveOp::ForEach(op) => {
let target_field_schema = head_scope.get_field_schema(&op.local_field_ref)?;
let collection_schema = match &target_field_schema.value_type.typ {
schema::ValueType::Collection(cs) => cs,
_ => panic!("Expect target field to be a collection"),
_ => bail!("Expect target field to be a collection"),
};

let target_field = head_scope.get_value_field_builder(&op.local_field_ref);
Expand Down Expand Up @@ -391,10 +402,12 @@ async fn evaluate_op_scope(
})
.collect::<Vec<_>>(),
_ => {
panic!("Target field type is expected to be a collection");
bail!("Target field type is expected to be a collection");
}
};
try_join_all(task_futs).await?;
try_join_all(task_futs)
.await
.with_context(|| format!("Evaluating ForEach op `{}`", op.name,))?;
}

AnalyzedReactiveOp::Collect(op) => {
Expand Down
5 changes: 2 additions & 3 deletions src/execution/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use futures::future::{join_all, try_join, try_join_all};
use log::{debug, error};
use log::error;
use serde::Serialize;
use sqlx::PgPool;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -597,8 +597,7 @@ async fn update_source(
let num_errors = join_all(all_keys_set.into_iter().map(|key| async move {
let result = update_source_entry(plan, source_op_idx, schema, &key, pool).await;
if let Err(e) = result {
error!("Error indexing source row: {}", e);
debug!("Detailed error: {:?}", e);
error!("{:?}", e.context("Error in indexing a source row"));
1
} else {
0
Expand Down