diff --git a/src/builder/analyzed_flow.rs b/src/builder/analyzed_flow.rs index 9dab7d2e..64a5522c 100644 --- a/src/builder/analyzed_flow.rs +++ b/src/builder/analyzed_flow.rs @@ -20,7 +20,9 @@ impl AnalyzedFlow { flow_instance_ctx: Arc, ) -> Result { let (data_schema, setup_state, execution_plan_fut) = - analyzer::analyze_flow(&flow_instance, flow_instance_ctx.clone()).await?; + analyzer::analyze_flow(&flow_instance, flow_instance_ctx.clone()) + .await + .with_context(|| format!("analyzing flow `{}`", flow_instance.name))?; let execution_plan = async move { shared_ok(Arc::new( execution_plan_fut.await.map_err(SharedError::new)?, diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index b1a403c7..bea3e737 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -382,7 +382,8 @@ impl SourceIndexingContext { error!( "{:?}", e.context(format!( - "Error in processing row from source `{source}` with key: {key}", + "Error in processing row from flow `{flow}` source `{source}` with key: {key}", + flow = self.flow.flow_instance.name, source = self.flow.flow_instance.import_ops[self.source_idx].name, key = row_input.key, ))