From e09638ea93c3825c84d41ed0454ce21e5c3eadf3 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Tue, 8 Jul 2025 10:17:33 -0700 Subject: [PATCH] docs(flow-control): revise the doc for flow control --- docs/docs/core/flow_def.mdx | 70 +++++++++++++++++-------------------- docs/docs/core/settings.mdx | 2 +- 2 files changed, 33 insertions(+), 39 deletions(-) diff --git a/docs/docs/core/flow_def.mdx b/docs/docs/core/flow_def.mdx index 1bcc4c24..79a27dac 100644 --- a/docs/docs/core/flow_def.mdx +++ b/docs/docs/core/flow_def.mdx @@ -151,25 +151,6 @@ If nothing changed during the last refresh cycle, only list operations will be p ::: -#### Concurrency control - -You can pass the following arguments to `add_source()` to control the concurrency of the source operation: - -* `max_inflight_rows`: the maximum number of concurrent inflight requests for the source operation. -* `max_inflight_bytes`: the maximum number of concurrent inflight bytes for the source operation. - -For example: - -```py -@cocoindex.flow_def(name="DemoFlow") -def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): - data_scope["documents"] = flow_builder.add_source( - DemoSourceSpec(...), max_inflight_rows=10, max_inflight_bytes=100*1024*1024) - ...... -``` - -The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or corresponding [environment variable](/docs/core/settings#list-of-environment-variables). - ### Transform `transform()` method transforms the data slice by a function, which creates another data slice. @@ -195,7 +176,7 @@ def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataSco -### For each row +### For-each-row If the data slice has [table type](/docs/core/data_types#table-types), you can call `row()` method to obtain a child scope representing each row, to apply operations on each row. 
@@ -214,24 +195,6 @@ def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataSco -#### Concurrency control - -You can pass the following arguments to `row()` to control the concurrency of the for-each operation: - -* `max_inflight_rows`: the maximum number of concurrent inflight requests for the for-each operation. -* `max_inflight_bytes`: the maximum number of concurrent inflight bytes for the for-each operation. - We only take the number of bytes from this row before this for-each operation into account. - -For example: - -```python -@cocoindex.flow_def(name="DemoFlow") -def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): - ... - with data_scope["table1"].row(max_inflight_rows=10, max_inflight_bytes=10*1024*1024) as table1_row: - # Children operations - table1_row["field2"] = table1_row["field1"].transform(DemoFunctionSpec(...)) -``` ### Get a sub field @@ -380,6 +343,37 @@ doc_embeddings.export( It will use `Staging__doc_embeddings` as the collection name if the current app namespace is `Staging`, and use `doc_embeddings` if the app namespace is empty. +### Control Processing Concurrency + +You can control the concurrency of the processing by setting the following options: + +* `max_inflight_rows`: the maximum number of concurrent inflight requests for the processing. +* `max_inflight_bytes`: the maximum number of concurrent inflight bytes for the processing. + +These options can be passed to the following APIs: + +* [`FlowBuilder.add_source()`](#import-from-source): The options above control the processing concurrency of multiple rows from a source. New rows will not be loaded into memory if doing so would exceed the limit. + + The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or the corresponding [environment variables](/docs/core/settings#list-of-environment-variables). 
+ +* [`DataSlice.row()`](#for-each-row): The options above provide finer-grained control, limiting the processing concurrency of multiple rows within a table at any level. + +`max_inflight_bytes` counts only the bytes already present in the current row, before any further processing. + +For example: + +```py +@cocoindex.flow_def(name="DemoFlow") +def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): + data_scope["documents"] = flow_builder.add_source( + DemoSourceSpec(...), max_inflight_rows=10, max_inflight_bytes=100*1024*1024) + + with data_scope["documents"].row() as doc: + doc["chunks"] = doc["content"].transform(SplitRecursively(...)) + with doc["chunks"].row(max_inflight_rows=100) as chunk: + ...... +``` + ### Target Declarations Most time a target is created by calling `export()` method on a collector, and this `export()` call comes with configurations needed for the target, e.g. options for storage indexes. diff --git a/docs/docs/core/settings.mdx b/docs/docs/core/settings.mdx index bcee1105..83c9a586 100644 --- a/docs/docs/core/settings.mdx +++ b/docs/docs/core/settings.mdx @@ -112,7 +112,7 @@ If you use the Postgres database hosted by [Supabase](https://supabase.com/), pl * `source_max_inflight_rows` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations. * `source_max_inflight_bytes` (type: `int`, optional): The maximum number of concurrent inflight bytes for source operations. -The options provide default values, and can be overridden by arguments passed to `FlowBuilder.add_source()` on per-source basis ([details](/docs/core/flow_def#concurrency-control)). +The options provide default values, and can be overridden by arguments passed to `FlowBuilder.add_source()` on a per-source basis ([details](/docs/core/flow_def#control-processing-concurrency)). ## List of Environment Variables