25 changes: 25 additions & 0 deletions src/builder/analyzer.rs
@@ -969,14 +969,38 @@ impl AnalyzerContext {
op_futs.push(self.analyze_reactive_op(op_scope, reactive_op).await?);
}
let collector_len = op_scope.states.lock().unwrap().collectors.len();
let scope_qualifier = self.build_scope_qualifier(op_scope);
let result_fut = async move {
Ok(AnalyzedOpScope {
reactive_ops: try_join_all(op_futs).await?,
collector_len,
scope_qualifier,
})
};
Ok(result_fut)
}

fn build_scope_qualifier(&self, op_scope: &Arc<OpScope>) -> String {
let mut scope_names = Vec::new();
let mut current_scope = op_scope.as_ref();

// Walk up the parent chain to collect scope names
while let Some((parent, _)) = &current_scope.parent {
scope_names.push(current_scope.name.as_str());
current_scope = parent.as_ref();
}

// Reverse to get the correct order (root to leaf)
scope_names.reverse();

// Build the qualifier string
let mut result = String::new();
for name in scope_names {
result.push_str(&name);
result.push('.');
}
result
}
}

pub fn build_flow_instance_context(
@@ -1059,6 +1083,7 @@ pub async fn analyze_flow(
let target_factory = get_target_factory(&target_kind)?;
let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
target_factory,
target_kind: target_kind.clone(),
op_idx: op_ids.export_op_ids,
};
export_ops_futs.extend(
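The new `build_scope_qualifier` helper walks from the given scope up to the root, collecting scope names and joining them with trailing dots; the parentless root contributes nothing. A minimal standalone sketch of the same traversal, using a hypothetical parent-linked `Scope` type and made-up scope names rather than the real `OpScope`:

```rust
use std::sync::Arc;

// Hypothetical stand-in for `OpScope`: a name plus an optional parent link.
struct Scope {
    name: String,
    parent: Option<Arc<Scope>>,
}

// Same traversal as `build_scope_qualifier`: collect names from leaf to root
// (the parentless root is skipped), then reverse and join with trailing dots.
fn scope_qualifier(mut scope: &Scope) -> String {
    let mut names = Vec::new();
    while let Some(parent) = &scope.parent {
        names.push(scope.name.as_str());
        scope = parent.as_ref();
    }
    names.reverse();
    names.iter().map(|n| format!("{n}.")).collect()
}

fn main() {
    let root = Arc::new(Scope { name: "_root".into(), parent: None });
    let documents = Arc::new(Scope { name: "documents".into(), parent: Some(root) });
    let chunks = Scope { name: "chunks".into(), parent: Some(documents) };
    // The root contributes nothing; nested scopes yield "documents.chunks.".
    assert_eq!(scope_qualifier(&chunks), "documents.chunks.");
}
```

For a transform named `embed` inside that scope chain, the evaluator changes below combine this qualifier into the key `transform/documents.chunks.embed`.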
2 changes: 2 additions & 0 deletions src/builder/plan.rs
@@ -117,6 +117,7 @@ pub struct AnalyzedExportOp {

pub struct AnalyzedExportTargetOpGroup {
pub target_factory: Arc<dyn TargetFactory + Send + Sync>,
pub target_kind: String,
pub op_idx: Vec<usize>,
}

@@ -129,6 +130,7 @@ pub enum AnalyzedReactiveOp {
pub struct AnalyzedOpScope {
pub reactive_ops: Vec<AnalyzedReactiveOp>,
pub collector_len: usize,
pub scope_qualifier: String,
}

pub struct ExecutionPlan {
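The evaluator and indexer changes below thread through a `stats::OperationInProcessStats`, whose definition is not part of this diff. A rough sketch of a gauge type with the `start_processing`/`finish_processing` surface used here, assuming per-key atomic counters behind a mutexed map; the count argument type and the `snapshot` accessor are assumptions:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex};

/// Tracks how many rows are currently in process, keyed by an operation
/// label such as "transform/documents.chunks.embed" or "export/postgres".
#[derive(Default)]
pub struct OperationInProcessStats {
    in_process: Mutex<HashMap<String, Arc<AtomicI64>>>,
}

impl OperationInProcessStats {
    fn counter(&self, key: &str) -> Arc<AtomicI64> {
        let mut map = self.in_process.lock().unwrap();
        map.entry(key.to_string()).or_default().clone()
    }

    /// Record that `count` rows entered this operation.
    pub fn start_processing(&self, key: &str, count: i64) {
        self.counter(key).fetch_add(count, Ordering::Relaxed);
    }

    /// Record that `count` rows left this operation.
    pub fn finish_processing(&self, key: &str, count: i64) {
        self.counter(key).fetch_sub(count, Ordering::Relaxed);
    }

    /// Snapshot of the current in-process counts, e.g. for reporting.
    pub fn snapshot(&self) -> HashMap<String, i64> {
        self.in_process
            .lock()
            .unwrap()
            .iter()
            .map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
            .collect()
    }
}
```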
56 changes: 43 additions & 13 deletions src/execution/evaluator.rs
@@ -322,6 +322,7 @@ async fn evaluate_child_op_scope(
child_scope_entry: ScopeEntry<'_>,
concurrency_controller: &concur_control::ConcurrencyController,
memory: &EvaluationMemory,
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
) -> Result<()> {
let _permit = concurrency_controller
.acquire(Some(|| {
@@ -333,32 +334,46 @@
.sum()
}))
.await?;
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), memory)
.await
.with_context(|| {
format!(
"Evaluating in scope with key {}",
match child_scope_entry.key.key() {
Some(k) => k.to_string(),
None => "()".to_string(),
}
)
})
evaluate_op_scope(
op_scope,
scoped_entries.prepend(&child_scope_entry),
memory,
operation_in_process_stats,
)
.await
.with_context(|| {
format!(
"Evaluating in scope with key {}",
match child_scope_entry.key.key() {
Some(k) => k.to_string(),
None => "()".to_string(),
}
)
})
}

async fn evaluate_op_scope(
op_scope: &AnalyzedOpScope,
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
memory: &EvaluationMemory,
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
) -> Result<()> {
let head_scope = *scoped_entries.head().unwrap();
for reactive_op in op_scope.reactive_ops.iter() {
match reactive_op {
AnalyzedReactiveOp::Transform(op) => {
// Track transform operation start
if let Some(ref op_stats) = operation_in_process_stats {
let transform_key =
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
op_stats.start_processing(&transform_key, 1);
}

let mut input_values = Vec::with_capacity(op.inputs.len());
input_values
.extend(assemble_input_values(&op.inputs, scoped_entries).collect::<Vec<_>>());
if op.function_exec_info.enable_cache {

let result = if op.function_exec_info.enable_cache {
let output_value_cell = memory.get_cache_entry(
|| {
Ok(op
@@ -382,7 +397,16 @@ async fn evaluate_op_scope(
.await
.and_then(|v| head_scope.define_field(&op.output, &v))
}
.with_context(|| format!("Evaluating Transform op `{}`", op.name,))?
.with_context(|| format!("Evaluating Transform op `{}`", op.name,));

// Track transform operation completion
if let Some(ref op_stats) = operation_in_process_stats {
let transform_key =
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
op_stats.finish_processing(&transform_key, 1);
}

result?
}

AnalyzedReactiveOp::ForEach(op) => {
@@ -408,6 +432,7 @@
),
&op.concurrency_controller,
memory,
operation_in_process_stats,
)
})
.collect::<Vec<_>>(),
@@ -425,6 +450,7 @@
),
&op.concurrency_controller,
memory,
operation_in_process_stats,
)
})
.collect::<Vec<_>>(),
@@ -443,6 +469,7 @@
),
&op.concurrency_controller,
memory,
operation_in_process_stats,
)
})
.collect::<Vec<_>>(),
@@ -509,6 +536,7 @@ pub async fn evaluate_source_entry(
src_eval_ctx: &SourceRowEvaluationContext<'_>,
source_value: value::FieldValues,
memory: &EvaluationMemory,
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
) -> Result<EvaluateSourceEntryOutput> {
let _permit = src_eval_ctx
.import_op
@@ -556,6 +584,7 @@ pub async fn evaluate_source_entry(
&src_eval_ctx.plan.op_scope,
RefList::Nil.prepend(&root_scope_entry),
memory,
operation_in_process_stats,
)
.await?;
let collected_values = root_scope_entry
@@ -604,6 +633,7 @@ pub async fn evaluate_transient_flow(
&flow.execution_plan.op_scope,
RefList::Nil.prepend(&root_scope_entry),
&eval_memory,
None, // No operation stats for transient flows
)
.await?;
let output_value = assemble_value(
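In the Transform arm above, the key is built as `transform/<scope qualifier><op name>`, the start/finish calls bracket both the cached and uncached evaluation paths, and the error is only propagated after `finish_processing` runs. The same bracketing pattern pulled into a small hypothetical helper, reusing the sketched stats type rather than anything from this codebase:

```rust
use std::future::Future;

// Hypothetical helper isolating the bracketing pattern used for Transform
// ops above: count a row in, run the work, count it out, and only then
// propagate any error. `OperationInProcessStats` is the sketch from above,
// not the real type.
async fn with_in_process_tracking<T, F>(
    stats: Option<&OperationInProcessStats>,
    key: &str,
    work: F,
) -> anyhow::Result<T>
where
    F: Future<Output = anyhow::Result<T>>,
{
    if let Some(stats) = stats {
        stats.start_processing(key, 1);
    }
    let result = work.await;
    if let Some(stats) = stats {
        stats.finish_processing(key, 1);
    }
    result
}
```

As in the diff, a panic or cancellation between the two calls would leave the gauge incremented; covering that case would take a drop guard.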
9 changes: 9 additions & 0 deletions src/execution/live_updater.rs
@@ -27,6 +27,8 @@ pub struct FlowLiveUpdater {
flow_ctx: Arc<FlowContext>,
join_set: Mutex<Option<JoinSet<Result<()>>>>,
stats_per_task: Vec<Arc<stats::UpdateStats>>,
/// Global tracking of in-process rows per operation
pub operation_in_process_stats: Arc<stats::OperationInProcessStats>,
recv_state: tokio::sync::Mutex<UpdateReceiveState>,
num_remaining_tasks_rx: watch::Receiver<usize>,

@@ -83,6 +85,7 @@ struct SourceUpdateTask {
plan: Arc<plan::ExecutionPlan>,
execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::FlowExecutionContext>>,
source_update_stats: Arc<stats::UpdateStats>,
operation_in_process_stats: Arc<stats::OperationInProcessStats>,
pool: PgPool,
options: FlowLiveUpdaterOptions,

@@ -137,6 +140,7 @@ impl SourceUpdateTask {
let change_stream_stats = change_stream_stats.clone();
let pool = self.pool.clone();
let status_tx = self.status_tx.clone();
let operation_in_process_stats = self.operation_in_process_stats.clone();
async move {
let mut change_stream = change_stream;
let retry_options = retryable::RetryOptions {
@@ -203,6 +207,7 @@ impl SourceUpdateTask {
},
super::source_indexer::UpdateMode::Normal,
update_stats.clone(),
Some(operation_in_process_stats.clone()),
concur_permit,
Some(move || async move {
SharedAckFn::ack(&shared_ack_fn).await
@@ -328,6 +333,7 @@ impl FlowLiveUpdater {

let mut join_set = JoinSet::new();
let mut stats_per_task = Vec::new();
let operation_in_process_stats = Arc::new(stats::OperationInProcessStats::default());

for source_idx in 0..plan.import_ops.len() {
let source_update_stats = Arc::new(stats::UpdateStats::default());
@@ -337,6 +343,7 @@
plan: plan.clone(),
execution_ctx: execution_ctx.clone(),
source_update_stats: source_update_stats.clone(),
operation_in_process_stats: operation_in_process_stats.clone(),
pool: pool.clone(),
options: options.clone(),
status_tx: status_tx.clone(),
@@ -345,10 +352,12 @@
join_set.spawn(source_update_task.run());
stats_per_task.push(source_update_stats);
}

Ok(Self {
flow_ctx,
join_set: Mutex::new(Some(join_set)),
stats_per_task,
operation_in_process_stats,
recv_state: tokio::sync::Mutex::new(UpdateReceiveState {
status_rx,
last_num_source_updates: vec![0; plan.import_ops.len()],
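`operation_in_process_stats` is a `pub` field on `FlowLiveUpdater`, so a caller driving a live update can read the gauge directly. A hypothetical read, assuming the `snapshot` helper from the sketch above rather than a real accessor:

```rust
// Hypothetical: `updater` is a FlowLiveUpdater driving a live update.
// The field is public in this diff; `snapshot()` is from the sketch above.
for (operation, in_flight) in updater.operation_in_process_stats.snapshot() {
    println!("{operation}: {in_flight} rows in process");
}
```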
39 changes: 32 additions & 7 deletions src/execution/row_indexer.rs
@@ -183,6 +183,7 @@ pub struct RowIndexer<'a> {
setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
mode: super::source_indexer::UpdateMode,
update_stats: &'a stats::UpdateStats,
operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
pool: &'a PgPool,

source_id: i32,
@@ -201,6 +202,7 @@ impl<'a> RowIndexer<'a> {
mode: super::source_indexer::UpdateMode,
process_time: chrono::DateTime<chrono::Utc>,
update_stats: &'a stats::UpdateStats,
operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
pool: &'a PgPool,
) -> Result<Self> {
Ok(Self {
Expand All @@ -212,6 +214,7 @@ impl<'a> RowIndexer<'a> {
setup_execution_ctx,
mode,
update_stats,
operation_in_process_stats,
pool,
})
}
@@ -311,9 +314,13 @@ impl<'a> RowIndexer<'a> {
},
);

let output =
evaluate_source_entry(self.src_eval_ctx, source_value, &evaluation_memory)
.await?;
let output = evaluate_source_entry(
self.src_eval_ctx,
source_value,
&evaluation_memory,
self.operation_in_process_stats,
)
.await?;
let mut stored_info = evaluation_memory.into_stored()?;
if tracking_setup_state.has_fast_fingerprint_column {
(Some(output), stored_info, content_version_fp)
@@ -368,9 +375,27 @@ impl<'a> RowIndexer<'a> {
})
.collect();
(!mutations_w_ctx.is_empty()).then(|| {
export_op_group
.target_factory
.apply_mutation(mutations_w_ctx)
let export_key = format!("export/{}", export_op_group.target_kind);
let operation_in_process_stats = self.operation_in_process_stats;

async move {
// Track export operation start
if let Some(ref op_stats) = operation_in_process_stats {
op_stats.start_processing(&export_key, 1);
}

let result = export_op_group
.target_factory
.apply_mutation(mutations_w_ctx)
.await;

// Track export operation completion
if let Some(ref op_stats) = operation_in_process_stats {
op_stats.finish_processing(&export_key, 1);
}

result
}
})
});

@@ -875,7 +900,7 @@ pub async fn evaluate_source_entry_with_memory(
.ok_or_else(|| anyhow::anyhow!("value not returned"))?;
let output = match source_value {
interface::SourceValue::Existence(source_value) => {
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory).await?)
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory, None).await?)
}
interface::SourceValue::NonExistence => None,
};
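With exports keyed as `export/<target kind>` here and transforms keyed as `transform/<scope qualifier><op name>` in the evaluator, the gauge ends up with two key families. An illustrative run against the sketched stats type, with made-up operation names:

```rust
// Illustrative only: made-up flow with a "documents" scope, an "embed"
// transform inside it, and a "postgres" export target. Uses the
// `OperationInProcessStats` sketch above.
fn main() {
    let stats = OperationInProcessStats::default();

    // A row enters the transform and an export mutation is in flight.
    stats.start_processing("transform/documents.embed", 1);
    stats.start_processing("export/postgres", 1);
    assert_eq!(stats.snapshot().get("export/postgres"), Some(&1i64));

    // Both finish; the gauges drop back to zero.
    stats.finish_processing("transform/documents.embed", 1);
    stats.finish_processing("export/postgres", 1);
    assert_eq!(stats.snapshot().get("export/postgres"), Some(&0i64));
}
```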