From fcc3b2fe93c5d1a34574f3f1ee1226f37df433dc Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Mon, 7 Jul 2025 17:29:14 -0700 Subject: [PATCH] feat(flow-control): do flow control during evaluation too --- src/execution/dumper.rs | 4 ++++ src/execution/evaluator.rs | 5 +++++ src/execution/source_indexer.rs | 9 --------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs index bc22e3c3..52cfcd12 100644 --- a/src/execution/dumper.rs +++ b/src/execution/dumper.rs @@ -136,6 +136,10 @@ impl<'a> Dumper<'a> { key: value::KeyValue, file_path: PathBuf, ) -> Result<()> { + let _permit = import_op + .concurrency_controller + .acquire(concur_control::BYTES_UNKNOWN_YET) + .await?; let mut collected_values_buffer = Vec::new(); let (exports, error) = match self .evaluate_source_entry(import_op_idx, import_op, &key, &mut collected_values_buffer) diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index e6516ded..ad18a7ab 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -464,6 +464,11 @@ pub async fn evaluate_source_entry( source_value: value::FieldValues, memory: &EvaluationMemory, ) -> Result { + let _permit = src_eval_ctx + .import_op + .concurrency_controller + .acquire_bytes_with_reservation(|| source_value.estimated_byte_size()) + .await?; let root_schema = &src_eval_ctx.schema.schema; let root_scope_value = ScopeValueBuilder::new(root_schema.fields.len()); let root_scope_entry = ScopeEntry::new( diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index 11451bb6..a2469735 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -163,15 +163,6 @@ impl SourceIndexingContext { { let _processing_permit = processing_sem.acquire().await?; - let _concur_permit = match &source_data.value { - interface::SourceValue::Existence(value) => { - import_op - .concurrency_controller - .acquire_bytes_with_reservation(|| value.estimated_byte_size()) - .await? - } - interface::SourceValue::NonExistence => None, - }; let result = row_indexer::update_source_row( &SourceRowEvaluationContext { plan: &plan,