29 changes: 29 additions & 0 deletions docs/docs/core/flow_def.mdx
@@ -158,6 +158,16 @@ You can pass the following arguments to `add_source()` to control the concurrenc
* `max_inflight_rows`: the maximum number of rows in flight (i.e. being processed concurrently) for the source operation.
* `max_inflight_bytes`: the maximum total size, in bytes, of in-flight rows for the source operation.

For example:

```py
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
data_scope["documents"] = flow_builder.add_source(
DemoSourceSpec(...), max_inflight_rows=10, max_inflight_bytes=100*1024*1024)
    ...
```

The default values can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or the corresponding [environment variables](/docs/core/settings#list-of-environment-variables).
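For instance, a process-wide default could be set through an environment variable before the flow is defined. The variable names below are assumptions for illustration only; the settings documentation linked above lists the authoritative names.

```python
import os

# Assumed names for illustration; consult the settings documentation for the
# exact environment variables backing DefaultExecutionOptions.
os.environ.setdefault("COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS", "10")
os.environ.setdefault("COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES", str(100 * 1024 * 1024))
```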

### Transform
@@ -204,6 +214,25 @@ def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataSco
</TabItem>
</Tabs>

#### Concurrency control

You can pass the following arguments to `row()` to control the concurrency of the for-each operation:

* `max_inflight_rows`: the maximum number of rows in flight (i.e. being processed concurrently) for the for-each operation.
* `max_inflight_bytes`: the maximum total size, in bytes, of in-flight rows for the for-each operation.
  Only the bytes of each row's fields produced before this for-each operation are counted.

For example:

```python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
...
with data_scope["table1"].row(max_inflight_rows=10, max_inflight_bytes=10*1024*1024) as table1_row:
# Children operations
table1_row["field2"] = table1_row["field1"].transform(DemoFunctionSpec(...))
```
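The same options are also accepted by `for_each()`, which wraps `row()` (see the `python/cocoindex/flow.py` change in this pull request). A minimal sketch reusing the names from the example above, with `DemoFunctionSpec` and `table1` as the same placeholders as before:

```python
def process_row(table1_row: cocoindex.DataScope) -> None:
    # Children operations, identical to the body of the `with` block above.
    table1_row["field2"] = table1_row["field1"].transform(DemoFunctionSpec(...))

data_scope["table1"].for_each(
    process_row, max_inflight_rows=10, max_inflight_bytes=10*1024*1024)
```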

### Get a sub field

If the data slice has `Struct` type, you can obtain a data slice on a specific sub field of it, similar to getting a field of a data scope.
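A minimal sketch, assuming `field1` from the example above has `Struct` type with a hypothetical sub field named `text`:

```python
# `text` is a hypothetical sub field name, used here only for illustration.
text_slice = table1_row["field1"]["text"]
table1_row["field2"] = text_slice.transform(DemoFunctionSpec(...))
```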
32 changes: 28 additions & 4 deletions python/cocoindex/flow.py
@@ -198,18 +198,42 @@ def __getitem__(self, field_name: str) -> DataSlice[T]:
raise KeyError(field_name)
return DataSlice(_DataSliceState(self._state.flow_builder_state, field_slice))

def row(self) -> DataScope:
def row(
self,
/,
*,
max_inflight_rows: int | None = None,
max_inflight_bytes: int | None = None,
) -> DataScope:
"""
Return a scope representing each row of the table.
"""
row_scope = self._state.engine_data_slice.table_row_scope()
row_scope = self._state.flow_builder_state.engine_flow_builder.for_each(
self._state.engine_data_slice,
execution_options=dump_engine_object(
_ExecutionOptions(
max_inflight_rows=max_inflight_rows,
max_inflight_bytes=max_inflight_bytes,
),
),
)
return DataScope(self._state.flow_builder_state, row_scope)

def for_each(self, f: Callable[[DataScope], None]) -> None:
def for_each(
self,
f: Callable[[DataScope], None],
/,
*,
max_inflight_rows: int | None = None,
max_inflight_bytes: int | None = None,
) -> None:
"""
Apply a function to each row of the collection.
"""
with self.row() as scope:
with self.row(
max_inflight_rows=max_inflight_rows,
max_inflight_bytes=max_inflight_bytes,
) as scope:
f(scope)

def transform(
6 changes: 6 additions & 0 deletions src/base/spec.rs
@@ -255,7 +255,10 @@ impl SpecFormatter for OpSpec {

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ExecutionOptions {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_inflight_rows: Option<usize>,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_inflight_bytes: Option<usize>,
}

@@ -327,6 +330,9 @@ pub struct ForEachOpSpec {
/// Mapping that provides a table to apply reactive operations to.
pub field_path: FieldPath,
pub op_scope: ReactiveOpScope,

#[serde(default)]
pub execution_options: ExecutionOptions,
}

impl ForEachOpSpec {
38 changes: 24 additions & 14 deletions src/base/value.rs
@@ -14,6 +14,14 @@ use serde::{
};
use std::{collections::BTreeMap, ops::Deref, sync::Arc};

pub trait EstimatedByteSize: Sized {
fn estimated_detached_byte_size(&self) -> usize;

fn estimated_byte_size(&self) -> usize {
self.estimated_detached_byte_size() + std::mem::size_of::<Self>()
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RangeValue {
pub start: usize,
@@ -855,7 +863,7 @@ impl<VS> Value<VS> {
}
}

impl Value<ScopeValue> {
impl<VS: EstimatedByteSize> Value<VS> {
pub fn estimated_byte_size(&self) -> usize {
std::mem::size_of::<Self>()
+ match self {
@@ -885,6 +893,16 @@ pub struct FieldValues<VS = ScopeValue> {
pub fields: Vec<Value<VS>>,
}

impl<VS: EstimatedByteSize> EstimatedByteSize for FieldValues<VS> {
fn estimated_detached_byte_size(&self) -> usize {
self.fields
.iter()
.map(Value::<VS>::estimated_byte_size)
.sum::<usize>()
+ self.fields.len() * std::mem::size_of::<Value<VS>>()
}
}

impl serde::Serialize for FieldValues {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
self.fields.serialize(serializer)
@@ -954,23 +972,15 @@ where
}
}

impl FieldValues<ScopeValue> {
fn estimated_detached_byte_size(&self) -> usize {
self.fields
.iter()
.map(Value::estimated_byte_size)
.sum::<usize>()
+ self.fields.len() * std::mem::size_of::<Value<ScopeValue>>()
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScopeValue(pub FieldValues);

pub fn estimated_byte_size(&self) -> usize {
self.estimated_detached_byte_size() + std::mem::size_of::<Self>()
impl EstimatedByteSize for ScopeValue {
fn estimated_detached_byte_size(&self) -> usize {
self.0.estimated_detached_byte_size()
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScopeValue(pub FieldValues);

impl Deref for ScopeValue {
type Target = FieldValues;

6 changes: 6 additions & 0 deletions src/builder/analyzer.rs
@@ -807,13 +807,19 @@ impl AnalyzerContext {
analyzed_op_scope_fut
};
let op_name = reactive_op.name.clone();

let exec_options = foreach_op.execution_options.clone();
async move {
Ok(AnalyzedReactiveOp::ForEach(AnalyzedForEachOp {
local_field_ref,
op_scope: analyzed_op_scope_fut
.await
.with_context(|| format!("Analyzing foreach op: {op_name}"))?,
name: op_name,
concurrency_controller: concur_control::ConcurrencyController::new(
exec_options.max_inflight_rows,
exec_options.max_inflight_bytes,
),
}))
}
.boxed()
103 changes: 57 additions & 46 deletions src/builder/flow_builder.rs
@@ -156,24 +156,6 @@ impl DataSlice {
data_type: field_schema.value_type.clone().into(),
}))
}

pub fn table_row_scope(&self) -> PyResult<OpScopeRef> {
let field_path = match self.value.as_ref() {
spec::ValueMapping::Field(v) => &v.field_path,
_ => return Err(PyException::new_err("expect field path")),
};
let num_parent_layers = self.scope.ancestors().count();
let scope_name = format!(
"{}_{}",
field_path.last().map_or("", |s| s.as_str()),
num_parent_layers
);
let (_, sub_op_scope) = self
.scope
.new_foreach_op_scope(scope_name, field_path)
.into_py_result()?;
Ok(OpScopeRef(sub_op_scope))
}
}

impl DataSlice {
@@ -383,6 +365,48 @@ impl FlowBuilder {
Ok(())
}

#[pyo3(signature = (data_slice, execution_options=None))]
pub fn for_each(
&mut self,
data_slice: DataSlice,
execution_options: Option<py::Pythonized<spec::ExecutionOptions>>,
) -> PyResult<OpScopeRef> {
let parent_scope = &data_slice.scope;
let field_path = match data_slice.value.as_ref() {
spec::ValueMapping::Field(v) => &v.field_path,
_ => return Err(PyException::new_err("expect field path")),
};
let num_parent_layers = parent_scope.ancestors().count();
let scope_name = format!(
"{}_{}",
field_path.last().map_or("", |s| s.as_str()),
num_parent_layers
);
let (_, child_op_scope) = parent_scope
.new_foreach_op_scope(scope_name.clone(), field_path)
.into_py_result()?;

let reactive_op = spec::NamedSpec {
name: format!(".for_each.{}", self.next_generated_op_id),
spec: spec::ReactiveOpSpec::ForEach(spec::ForEachOpSpec {
field_path: field_path.clone(),
op_scope: spec::ReactiveOpScope {
name: scope_name,
ops: vec![],
},
execution_options: execution_options
.map(|o| o.into_inner())
.unwrap_or_default(),
}),
};
self.next_generated_op_id += 1;
self.get_mut_reactive_ops(parent_scope)
.into_py_result()?
.push(reactive_op);

Ok(OpScopeRef(child_op_scope))
}

#[pyo3(signature = (kind, op_spec, args, target_scope, name))]
pub fn transform(
&mut self,
@@ -428,7 +452,9 @@ impl FlowBuilder {
.into_py_result()?;
std::mem::drop(analyzed);

self.get_mut_reactive_ops(op_scope).push(reactive_op);
self.get_mut_reactive_ops(op_scope)
.into_py_result()?
.push(reactive_op);

let result = Self::last_field_to_data_slice(op_scope).into_py_result()?;
Ok(result)
@@ -476,7 +502,9 @@ impl FlowBuilder {
.into_py_result()?;
std::mem::drop(analyzed);

self.get_mut_reactive_ops(common_scope).push(reactive_op);
self.get_mut_reactive_ops(common_scope)
.into_py_result()?
.push(reactive_op);

let collector_schema = CollectorSchema::from_fields(
fields
@@ -741,27 +769,19 @@ impl FlowBuilder {
fn get_mut_reactive_ops<'a>(
&'a mut self,
op_scope: &OpScope,
) -> &'a mut Vec<spec::NamedSpec<spec::ReactiveOpSpec>> {
Self::get_mut_reactive_ops_internal(
op_scope,
&mut self.reactive_ops,
&mut self.next_generated_op_id,
)
) -> Result<&'a mut Vec<spec::NamedSpec<spec::ReactiveOpSpec>>> {
Self::get_mut_reactive_ops_internal(op_scope, &mut self.reactive_ops)
}

fn get_mut_reactive_ops_internal<'a>(
op_scope: &OpScope,
root_reactive_ops: &'a mut Vec<spec::NamedSpec<spec::ReactiveOpSpec>>,
next_generated_op_id: &mut usize,
) -> &'a mut Vec<spec::NamedSpec<spec::ReactiveOpSpec>> {
match &op_scope.parent {
) -> Result<&'a mut Vec<spec::NamedSpec<spec::ReactiveOpSpec>>> {
let result = match &op_scope.parent {
None => root_reactive_ops,
Some((parent_op_scope, field_path)) => {
let parent_reactive_ops = Self::get_mut_reactive_ops_internal(
parent_op_scope,
root_reactive_ops,
next_generated_op_id,
);
let parent_reactive_ops =
Self::get_mut_reactive_ops_internal(parent_op_scope, root_reactive_ops)?;
// Reuse the last foreach if matched, otherwise create a new one.
match parent_reactive_ops.last() {
Some(spec::NamedSpec {
@@ -771,24 +791,15 @@ impl FlowBuilder {
&& foreach_spec.op_scope.name == op_scope.name => {}

_ => {
parent_reactive_ops.push(spec::NamedSpec {
name: format!(".foreach.{}", next_generated_op_id),
spec: spec::ReactiveOpSpec::ForEach(spec::ForEachOpSpec {
field_path: field_path.clone(),
op_scope: spec::ReactiveOpScope {
name: op_scope.name.clone(),
ops: vec![],
},
}),
});
*next_generated_op_id += 1;
api_bail!("already out of op scope `{}`", op_scope.name);
}
}
match &mut parent_reactive_ops.last_mut().unwrap().spec {
spec::ReactiveOpSpec::ForEach(foreach_spec) => &mut foreach_spec.op_scope.ops,
_ => unreachable!(),
}
}
}
};
Ok(result)
}
}
1 change: 1 addition & 0 deletions src/builder/plan.rs
@@ -82,6 +82,7 @@ pub struct AnalyzedForEachOp {
pub name: String,
pub local_field_ref: AnalyzedLocalFieldReference,
pub op_scope: AnalyzedOpScope,
pub concurrency_controller: concur_control::ConcurrencyController,
}

pub struct AnalyzedCollectOp {