diff --git a/docs/docs/core/flow_def.mdx b/docs/docs/core/flow_def.mdx index 41fd27f9..1bcc4c24 100644 --- a/docs/docs/core/flow_def.mdx +++ b/docs/docs/core/flow_def.mdx @@ -158,6 +158,16 @@ You can pass the following arguments to `add_source()` to control the concurrenc * `max_inflight_rows`: the maximum number of concurrent inflight requests for the source operation. * `max_inflight_bytes`: the maximum number of concurrent inflight bytes for the source operation. +For example: + +```py +@cocoindex.flow_def(name="DemoFlow") +def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): + data_scope["documents"] = flow_builder.add_source( + DemoSourceSpec(...), max_inflight_rows=10, max_inflight_bytes=100*1024*1024) + ...... +``` + The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or corresponding [environment variable](/docs/core/settings#list-of-environment-variables). ### Transform @@ -204,6 +214,25 @@ def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataSco +#### Concurrency control + +You can pass the following arguments to `row()` to control the concurrency of the for-each operation: + +* `max_inflight_rows`: the maximum number of concurrent inflight requests for the for-each operation. +* `max_inflight_bytes`: the maximum number of concurrent inflight bytes for the for-each operation. + We only take the number of bytes from this row before this for-each operation into account. + +For example: + +```python +@cocoindex.flow_def(name="DemoFlow") +def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): + ... + with data_scope["table1"].row(max_inflight_rows=10, max_inflight_bytes=10*1024*1024) as table1_row: + # Children operations + table1_row["field2"] = table1_row["field1"].transform(DemoFunctionSpec(...)) +``` + ### Get a sub field If the data slice has `Struct` type, you can obtain a data slice on a specific sub field of it, similar to getting a field of a data scope. diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 498e068c..5ce6a0cb 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -198,18 +198,42 @@ def __getitem__(self, field_name: str) -> DataSlice[T]: raise KeyError(field_name) return DataSlice(_DataSliceState(self._state.flow_builder_state, field_slice)) - def row(self) -> DataScope: + def row( + self, + /, + *, + max_inflight_rows: int | None = None, + max_inflight_bytes: int | None = None, + ) -> DataScope: """ Return a scope representing each row of the table. """ - row_scope = self._state.engine_data_slice.table_row_scope() + row_scope = self._state.flow_builder_state.engine_flow_builder.for_each( + self._state.engine_data_slice, + execution_options=dump_engine_object( + _ExecutionOptions( + max_inflight_rows=max_inflight_rows, + max_inflight_bytes=max_inflight_bytes, + ), + ), + ) return DataScope(self._state.flow_builder_state, row_scope) - def for_each(self, f: Callable[[DataScope], None]) -> None: + def for_each( + self, + f: Callable[[DataScope], None], + /, + *, + max_inflight_rows: int | None = None, + max_inflight_bytes: int | None = None, + ) -> None: """ Apply a function to each row of the collection. """ - with self.row() as scope: + with self.row( + max_inflight_rows=max_inflight_rows, + max_inflight_bytes=max_inflight_bytes, + ) as scope: f(scope) def transform( diff --git a/src/base/spec.rs b/src/base/spec.rs index 8ecf3b23..340cdc3b 100644 --- a/src/base/spec.rs +++ b/src/base/spec.rs @@ -255,7 +255,10 @@ impl SpecFormatter for OpSpec { #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct ExecutionOptions { + #[serde(default, skip_serializing_if = "Option::is_none")] pub max_inflight_rows: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] pub max_inflight_bytes: Option, } @@ -327,6 +330,9 @@ pub struct ForEachOpSpec { /// Mapping that provides a table to apply reactive operations to. pub field_path: FieldPath, pub op_scope: ReactiveOpScope, + + #[serde(default)] + pub execution_options: ExecutionOptions, } impl ForEachOpSpec { diff --git a/src/base/value.rs b/src/base/value.rs index 49c7d949..f594b117 100644 --- a/src/base/value.rs +++ b/src/base/value.rs @@ -14,6 +14,14 @@ use serde::{ }; use std::{collections::BTreeMap, ops::Deref, sync::Arc}; +pub trait EstimatedByteSize: Sized { + fn estimated_detached_byte_size(&self) -> usize; + + fn estimated_byte_size(&self) -> usize { + self.estimated_detached_byte_size() + std::mem::size_of::() + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct RangeValue { pub start: usize, @@ -855,7 +863,7 @@ impl Value { } } -impl Value { +impl Value { pub fn estimated_byte_size(&self) -> usize { std::mem::size_of::() + match self { @@ -885,6 +893,16 @@ pub struct FieldValues { pub fields: Vec>, } +impl EstimatedByteSize for FieldValues { + fn estimated_detached_byte_size(&self) -> usize { + self.fields + .iter() + .map(Value::::estimated_byte_size) + .sum::() + + self.fields.len() * std::mem::size_of::>() + } +} + impl serde::Serialize for FieldValues { fn serialize(&self, serializer: S) -> Result { self.fields.serialize(serializer) @@ -954,23 +972,15 @@ where } } -impl FieldValues { - fn estimated_detached_byte_size(&self) -> usize { - self.fields - .iter() - .map(Value::estimated_byte_size) - .sum::() - + self.fields.len() * std::mem::size_of::>() - } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ScopeValue(pub FieldValues); - pub fn estimated_byte_size(&self) -> usize { - self.estimated_detached_byte_size() + std::mem::size_of::() +impl EstimatedByteSize for ScopeValue { + fn estimated_detached_byte_size(&self) -> usize { + self.0.estimated_detached_byte_size() } } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct ScopeValue(pub FieldValues); - impl Deref for ScopeValue { type Target = FieldValues; diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 217f1351..9c06517b 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -807,6 +807,8 @@ impl AnalyzerContext { analyzed_op_scope_fut }; let op_name = reactive_op.name.clone(); + + let exec_options = foreach_op.execution_options.clone(); async move { Ok(AnalyzedReactiveOp::ForEach(AnalyzedForEachOp { local_field_ref, @@ -814,6 +816,10 @@ impl AnalyzerContext { .await .with_context(|| format!("Analyzing foreach op: {op_name}"))?, name: op_name, + concurrency_controller: concur_control::ConcurrencyController::new( + exec_options.max_inflight_rows, + exec_options.max_inflight_bytes, + ), })) } .boxed() diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index db5bf8a2..de1c0319 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -156,24 +156,6 @@ impl DataSlice { data_type: field_schema.value_type.clone().into(), })) } - - pub fn table_row_scope(&self) -> PyResult { - let field_path = match self.value.as_ref() { - spec::ValueMapping::Field(v) => &v.field_path, - _ => return Err(PyException::new_err("expect field path")), - }; - let num_parent_layers = self.scope.ancestors().count(); - let scope_name = format!( - "{}_{}", - field_path.last().map_or("", |s| s.as_str()), - num_parent_layers - ); - let (_, sub_op_scope) = self - .scope - .new_foreach_op_scope(scope_name, field_path) - .into_py_result()?; - Ok(OpScopeRef(sub_op_scope)) - } } impl DataSlice { @@ -383,6 +365,48 @@ impl FlowBuilder { Ok(()) } + #[pyo3(signature = (data_slice, execution_options=None))] + pub fn for_each( + &mut self, + data_slice: DataSlice, + execution_options: Option>, + ) -> PyResult { + let parent_scope = &data_slice.scope; + let field_path = match data_slice.value.as_ref() { + spec::ValueMapping::Field(v) => &v.field_path, + _ => return Err(PyException::new_err("expect field path")), + }; + let num_parent_layers = parent_scope.ancestors().count(); + let scope_name = format!( + "{}_{}", + field_path.last().map_or("", |s| s.as_str()), + num_parent_layers + ); + let (_, child_op_scope) = parent_scope + .new_foreach_op_scope(scope_name.clone(), field_path) + .into_py_result()?; + + let reactive_op = spec::NamedSpec { + name: format!(".for_each.{}", self.next_generated_op_id), + spec: spec::ReactiveOpSpec::ForEach(spec::ForEachOpSpec { + field_path: field_path.clone(), + op_scope: spec::ReactiveOpScope { + name: scope_name, + ops: vec![], + }, + execution_options: execution_options + .map(|o| o.into_inner()) + .unwrap_or_default(), + }), + }; + self.next_generated_op_id += 1; + self.get_mut_reactive_ops(parent_scope) + .into_py_result()? + .push(reactive_op); + + Ok(OpScopeRef(child_op_scope)) + } + #[pyo3(signature = (kind, op_spec, args, target_scope, name))] pub fn transform( &mut self, @@ -428,7 +452,9 @@ impl FlowBuilder { .into_py_result()?; std::mem::drop(analyzed); - self.get_mut_reactive_ops(op_scope).push(reactive_op); + self.get_mut_reactive_ops(op_scope) + .into_py_result()? + .push(reactive_op); let result = Self::last_field_to_data_slice(op_scope).into_py_result()?; Ok(result) @@ -476,7 +502,9 @@ impl FlowBuilder { .into_py_result()?; std::mem::drop(analyzed); - self.get_mut_reactive_ops(common_scope).push(reactive_op); + self.get_mut_reactive_ops(common_scope) + .into_py_result()? + .push(reactive_op); let collector_schema = CollectorSchema::from_fields( fields @@ -741,27 +769,19 @@ impl FlowBuilder { fn get_mut_reactive_ops<'a>( &'a mut self, op_scope: &OpScope, - ) -> &'a mut Vec> { - Self::get_mut_reactive_ops_internal( - op_scope, - &mut self.reactive_ops, - &mut self.next_generated_op_id, - ) + ) -> Result<&'a mut Vec>> { + Self::get_mut_reactive_ops_internal(op_scope, &mut self.reactive_ops) } fn get_mut_reactive_ops_internal<'a>( op_scope: &OpScope, root_reactive_ops: &'a mut Vec>, - next_generated_op_id: &mut usize, - ) -> &'a mut Vec> { - match &op_scope.parent { + ) -> Result<&'a mut Vec>> { + let result = match &op_scope.parent { None => root_reactive_ops, Some((parent_op_scope, field_path)) => { - let parent_reactive_ops = Self::get_mut_reactive_ops_internal( - parent_op_scope, - root_reactive_ops, - next_generated_op_id, - ); + let parent_reactive_ops = + Self::get_mut_reactive_ops_internal(parent_op_scope, root_reactive_ops)?; // Reuse the last foreach if matched, otherwise create a new one. match parent_reactive_ops.last() { Some(spec::NamedSpec { @@ -771,17 +791,7 @@ impl FlowBuilder { && foreach_spec.op_scope.name == op_scope.name => {} _ => { - parent_reactive_ops.push(spec::NamedSpec { - name: format!(".foreach.{}", next_generated_op_id), - spec: spec::ReactiveOpSpec::ForEach(spec::ForEachOpSpec { - field_path: field_path.clone(), - op_scope: spec::ReactiveOpScope { - name: op_scope.name.clone(), - ops: vec![], - }, - }), - }); - *next_generated_op_id += 1; + api_bail!("already out of op scope `{}`", op_scope.name); } } match &mut parent_reactive_ops.last_mut().unwrap().spec { @@ -789,6 +799,7 @@ impl FlowBuilder { _ => unreachable!(), } } - } + }; + Ok(result) } } diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 7bf55419..d3ae3acf 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -82,6 +82,7 @@ pub struct AnalyzedForEachOp { pub name: String, pub local_field_ref: AnalyzedLocalFieldReference, pub op_scope: AnalyzedOpScope, + pub concurrency_controller: concur_control::ConcurrencyController, } pub struct AnalyzedCollectOp { diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index ad18a7ab..d6de0f85 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -3,6 +3,7 @@ use crate::prelude::*; use anyhow::{Context, Ok}; use futures::future::try_join_all; +use crate::base::value::EstimatedByteSize; use crate::builder::{AnalyzedTransientFlow, plan::*}; use crate::py::IntoPyResult; use crate::{ @@ -18,6 +19,15 @@ pub struct ScopeValueBuilder { pub fields: Vec>>, } +impl value::EstimatedByteSize for ScopeValueBuilder { + fn estimated_detached_byte_size(&self) -> usize { + self.fields + .iter() + .map(|f| f.get().map_or(0, |v| v.estimated_byte_size())) + .sum() + } +} + impl From<&ScopeValueBuilder> for value::ScopeValue { fn from(val: &ScopeValueBuilder) -> Self { value::ScopeValue(value::FieldValues { @@ -295,8 +305,19 @@ async fn evaluate_child_op_scope( op_scope: &AnalyzedOpScope, scoped_entries: RefList<'_, &ScopeEntry<'_>>, child_scope_entry: ScopeEntry<'_>, + concurrency_controller: &concur_control::ConcurrencyController, memory: &EvaluationMemory, ) -> Result<()> { + let _permit = concurrency_controller + .acquire(Some(|| { + child_scope_entry + .value + .fields + .iter() + .map(|f| f.get().map_or(0, |v| v.estimated_byte_size())) + .sum() + })) + .await?; evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), memory) .await .with_context(|| { @@ -363,6 +384,7 @@ async fn evaluate_op_scope( &table_schema.row, &op.op_scope, ), + &op.concurrency_controller, memory, ) }) @@ -379,6 +401,7 @@ async fn evaluate_op_scope( &table_schema.row, &op.op_scope, ), + &op.concurrency_controller, memory, ) }) @@ -396,6 +419,7 @@ async fn evaluate_op_scope( &table_schema.row, &op.op_scope, ), + &op.concurrency_controller, memory, ) })