Merged
70 changes: 32 additions & 38 deletions docs/docs/core/flow_def.mdx
@@ -151,25 +151,6 @@ If nothing changed during the last refresh cycle, only list operations will be p

:::

#### Concurrency control

You can pass the following arguments to `add_source()` to control the concurrency of the source operation:

* `max_inflight_rows`: the maximum number of concurrent inflight requests for the source operation.
* `max_inflight_bytes`: the maximum number of concurrent inflight bytes for the source operation.

For example:

```py
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    data_scope["documents"] = flow_builder.add_source(
        DemoSourceSpec(...), max_inflight_rows=10, max_inflight_bytes=100*1024*1024)
    ......
```

The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or corresponding [environment variable](/docs/core/settings#list-of-environment-variables).

### Transform

`transform()` method transforms the data slice by a function, which creates another data slice.
@@ -195,7 +176,7 @@ def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataSco
</TabItem>
</Tabs>

### For each row
### For-each-row

If the data slice has [table type](/docs/core/data_types#table-types), you can call `row()` method to obtain a child scope representing each row, to apply operations on each row.

@@ -214,24 +195,6 @@ def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataSco
</TabItem>
</Tabs>

#### Concurrency control

You can pass the following arguments to `row()` to control the concurrency of the for-each operation:

* `max_inflight_rows`: the maximum number of concurrent inflight requests for the for-each operation.
* `max_inflight_bytes`: the maximum number of concurrent inflight bytes for the for-each operation.
We only take the number of bytes from this row before this for-each operation into account.

For example:

```python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
    with data_scope["table1"].row(max_inflight_rows=10, max_inflight_bytes=10*1024*1024) as table1_row:
        # Children operations
        table1_row["field2"] = table1_row["field1"].transform(DemoFunctionSpec(...))
```

### Get a sub field

@@ -380,6 +343,37 @@ doc_embeddings.export(

It will use `Staging__doc_embeddings` as the collection name if the current app namespace is `Staging`, and use `doc_embeddings` if the app namespace is empty.
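
The naming rule above can be sketched in a few lines. This is only an illustration of the behavior described in the sentence; `qualified_name` is a hypothetical helper, not a CocoIndex API, and the `__` separator is taken from the `Staging__doc_embeddings` example.

```python
def qualified_name(app_namespace: str, name: str) -> str:
    """Hypothetical helper: prefix `name` with the app namespace, if one is set."""
    # An empty namespace leaves the name unchanged.
    if not app_namespace:
        return name
    # Otherwise join namespace and name with a double underscore.
    return f"{app_namespace}__{name}"
```

With the values from the text, `qualified_name("Staging", "doc_embeddings")` gives `Staging__doc_embeddings`, and `qualified_name("", "doc_embeddings")` gives `doc_embeddings`.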

### Control Processing Concurrency

You can control the concurrency of the processing by setting the following options:

* `max_inflight_rows`: the maximum number of concurrent inflight requests for the processing.
* `max_inflight_bytes`: the maximum number of concurrent inflight bytes for the processing.

These options can be passed to the following APIs:

* [`FlowBuilder.add_source()`](#import-from-source): The options above control the processing concurrency of multiple rows from a source. A new row will not be loaded into memory if doing so would exceed the limit.

  The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or the corresponding [environment variables](/docs/core/settings#list-of-environment-variables).

* [`DataSlice.row()`](#for-each-row): The options above provide finer-grained control, limiting the processing concurrency of multiple rows within a table at any level.

  `max_inflight_bytes` only counts the bytes that already exist in the current row before any further processing.

For example:

```py
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    data_scope["documents"] = flow_builder.add_source(
        DemoSourceSpec(...), max_inflight_rows=10, max_inflight_bytes=100*1024*1024)

    with data_scope["documents"].row() as doc:
        doc["chunks"] = doc["content"].transform(SplitRecursively(...))
        with doc["chunks"].row(max_inflight_rows=100) as chunk:
            ......
```
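
To make the semantics of the two limits concrete, here is a minimal sketch of a limiter that admits a row only while both the row budget and the byte budget allow it. It is not CocoIndex's implementation: `InflightLimiter`, `process`, and `run` are hypothetical names, the work is simulated with `asyncio.sleep(0)`, and the rule that a single oversized row is still admitted when nothing else is inflight is an assumption made here to avoid deadlock.

```python
import asyncio


class InflightLimiter:
    """Hypothetical helper: gate work on inflight row and byte budgets."""

    def __init__(self, max_rows=None, max_bytes=None):
        self.max_rows = max_rows
        self.max_bytes = max_bytes
        self.rows = 0        # rows currently inflight
        self.bytes = 0       # bytes currently inflight
        self.peak_rows = 0   # highest number of rows seen inflight at once
        self._cond = asyncio.Condition()

    def _fits(self, nbytes):
        if self.max_rows is not None and self.rows >= self.max_rows:
            return False
        # Assumption: admit a row when nothing is inflight, even if it alone
        # exceeds max_bytes, so an oversized row cannot block forever.
        over_bytes = self.max_bytes is not None and self.bytes + nbytes > self.max_bytes
        if over_bytes and self.rows > 0:
            return False
        return True

    async def acquire(self, nbytes):
        async with self._cond:
            await self._cond.wait_for(lambda: self._fits(nbytes))
            self.rows += 1
            self.bytes += nbytes
            self.peak_rows = max(self.peak_rows, self.rows)

    async def release(self, nbytes):
        async with self._cond:
            self.rows -= 1
            self.bytes -= nbytes
            self._cond.notify_all()


async def process(limiter, row):
    nbytes = len(row)
    await limiter.acquire(nbytes)
    try:
        await asyncio.sleep(0)  # stand-in for real per-row work
        return row.upper()
    finally:
        await limiter.release(nbytes)


async def run(rows, max_rows, max_bytes):
    limiter = InflightLimiter(max_rows, max_bytes)
    results = await asyncio.gather(*(process(limiter, r) for r in rows))
    return results, limiter.peak_rows
```

Calling `asyncio.run(run(["a", "bb", "ccc", "dddd"], 2, 100))` processes all four rows, but never more than two at a time. Note that the sketch charges each row only for the bytes it holds when it is admitted, which mirrors the `max_inflight_bytes` note above for `row()`.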

### Target Declarations

Most of the time, a target is created by calling the `export()` method on a collector, and this `export()` call comes with the configuration needed for the target, e.g. options for storage indexes.
2 changes: 1 addition & 1 deletion docs/docs/core/settings.mdx
@@ -112,7 +112,7 @@ If you use the Postgres database hosted by [Supabase](https://supabase.com/), pl
* `source_max_inflight_rows` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations.
* `source_max_inflight_bytes` (type: `int`, optional): The maximum number of concurrent inflight bytes for source operations.

The options provide default values, and can be overridden by arguments passed to `FlowBuilder.add_source()` on a per-source basis ([details](/docs/core/flow_def#concurrency-control)).
The options provide default values, and can be overridden by arguments passed to `FlowBuilder.add_source()` on a per-source basis ([details](/docs/core/flow_def#control-processing-concurrency)).

## List of Environment Variables
