diff --git a/docs/docs/tutorials/control_flow.mdx b/docs/docs/tutorials/control_flow.mdx new file mode 100644 index 000000000..896996bbd --- /dev/null +++ b/docs/docs/tutorials/control_flow.mdx @@ -0,0 +1,156 @@ +--- +title: 'Control Processing Concurrency Best Practices' +description: "Learn how to tune the concurrency control in CocoIndex to optimize data processing performance, prevent system overload, and ensure stable, efficient pipelines at scale." +image: /img/tutorials/control-flow/cover.png +--- + +![Control Processing Concurrency Tuning Guide](/img/tutorials/control-flow/cover.png) + +CocoIndex’s parallelism model boosts speed by processing multiple data items at once, but **more parallelism isn’t always better**. Left unconstrained, excessive concurrency can strain—or even destabilize—your systems. That’s why CocoIndex includes **built-in concurrency control mechanisms** that strike the right balance between **raw performance** and **system stability**, even at massive scale. + +Processing too many items simultaneously can cause: + +- **Memory exhaustion** – large datasets loaded at once consume massive amounts of RAM. +- **Resource contention** – CPU, disk I/O, and network bandwidth get overwhelmed by competing operations. +- **System instability** – timeouts, degraded performance, or outright crashes. + +Unlike generic concurrency features, CocoIndex lets you: + +- Constrain both data volume (rows) and memory usage (bytes). +- Set limits at multiple layers: global, per source, and per-row iteration. +- Combine controls: *all* specified constraints must be satisfied before processing proceeds. + +This layered approach ensures that resource-heavy sources don’t overwhelm the system, and nested tasks (such as splitting documents into chunks) remain predictable and safe. + +![flow-control](/img/tutorials/control-flow/flow-control.png) + +You can review the full documentation [here](https://cocoindex.io/docs/core/flow_def#control-processing-concurrency). 
CocoIndex powers production pipelines that process data at the scale of millions. :star: [Star us](https://github.com/cocoindex-io/cocoindex) if you like it! + + +## Concurrency Options + +CocoIndex provides two primary settings: + +| Option | Purpose | Unit | +| --- | --- | --- | +| **`max_inflight_rows`** | Maximum number of rows processed concurrently. | rows | +| **`max_inflight_bytes`** | Maximum memory footprint of concurrently processed data (before transformations). | bytes | + +When a limit is reached, CocoIndex **pauses new processing** until some existing work completes. This keeps throughput high without pushing your system past its limits. + +:::note +For simplicity, `max_inflight_bytes` only measures the size of data already in memory before any transformations. It does **not** include temporary memory used during processing steps. +::: + +## Where to Apply Concurrency Controls + + +### 1. Source Level + +Source-level limits control how many rows from a data source are processed simultaneously. This prevents overwhelming your system when ingesting large datasets. + +Source-level control happens at two different granularities: + +- Global, in which all sources across all indexing flows share the same budget. +- Per-source, in which each source has its own budget. + +Both the **global** and the **per-source** limits must pass before a new row is processed, providing two layers of safety. + + +#### Global Concurrency: One Setting to Shield All Flows + +![global-level concurrency](/img/tutorials/control-flow/global-level.png) + +Global limits ensure your system never overshoots safe operating thresholds, even if individual flows attempt higher concurrency. 
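To make the "every limit must pass" rule concrete, here is a minimal sketch of an admission check over several budgets. It is an illustration only, not CocoIndex's actual implementation: `InflightBudget` and `try_admit` are hypothetical names, and the sketch ignores threading and blocking, which a real scheduler would need.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class InflightBudget:
    """Hypothetical budget tracking the rows and bytes currently in flight."""
    max_rows: Optional[int] = None   # None means "no row limit"
    max_bytes: Optional[int] = None  # None means "no byte limit"
    inflight_rows: int = 0
    inflight_bytes: int = 0

    def has_room(self, nbytes: int) -> bool:
        # A new row fits only if both the row and the byte limit still hold.
        rows_ok = self.max_rows is None or self.inflight_rows + 1 <= self.max_rows
        bytes_ok = self.max_bytes is None or self.inflight_bytes + nbytes <= self.max_bytes
        return rows_ok and bytes_ok

    def acquire(self, nbytes: int) -> None:
        self.inflight_rows += 1
        self.inflight_bytes += nbytes

    def release(self, nbytes: int) -> None:
        self.inflight_rows -= 1
        self.inflight_bytes -= nbytes


def try_admit(budgets: List[InflightBudget], nbytes: int) -> bool:
    """Admit one row only if every budget has room; otherwise change nothing."""
    if all(b.has_room(nbytes) for b in budgets):
        for b in budgets:
            b.acquire(nbytes)
        return True
    return False


# One global budget shared by everything, plus a tighter per-source budget.
global_budget = InflightBudget(max_rows=3)
source_budget = InflightBudget(max_rows=2, max_bytes=1000)

# The third row is held back by the per-source row limit,
# even though the global budget still has room.
admitted = [try_admit([global_budget, source_budget], 400) for _ in range(3)]
```

In this sketch a rejected row leaves all counters untouched, so the caller can simply retry once some in-flight row calls `release`. This mirrors the "pause new processing until existing work completes" behavior described earlier.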
+ +Apply system-wide protections via environment variables or programmatically. + +The easiest way is to set environment variables: + +```sh +export COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS=256 +export COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES=1048576 # 1 MiB +``` + +Programmatically, configure the limits when calling `cocoindex.init()`. These settings take precedence over the environment variables: + +```python +import cocoindex +from cocoindex import GlobalExecutionOptions + +cocoindex.init( +    cocoindex.Settings( +        ..., +        global_execution_options=GlobalExecutionOptions( +            source_max_inflight_rows=256, +            source_max_inflight_bytes=1_048_576,  # 1 MiB +        ) +    ) +) +``` + +If you don't set it explicitly, the global max inflight rows limit currently defaults to 1024. + +#### Per-Source Concurrency: Granular Customization + +![per-source concurrency](/img/tutorials/control-flow/per-source.png) + +Set different limits for each source according to workload and data characteristics: + +```python +@cocoindex.flow_def(name="DemoFlow") +def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): +    data_scope["documents"] = flow_builder.add_source( +        DemoSourceSpec(...), +        max_inflight_rows=10, +        max_inflight_bytes=100*1024*1024  # 100 MiB +    ) +``` + + +### 2. Nested Iteration Level Concurrency: Deep Structural Control + +![nested-iteration concurrency](/img/tutorials/control-flow/nested-level.png) + +When processing nested rows, such as each chunk of each document, you can configure the maximum number of concurrent rows, bytes, or both: + +```python +with data_scope["documents"].row() as doc: +    doc["chunks"] = doc["content"].transform(SplitRecursively(...)) +    with doc["chunks"].row(max_inflight_rows=100, max_inflight_bytes=100*1000*1000): +        # Process up to 100 chunks (at most 100 MB) in parallel per document +        ... 
+``` + +### Summary Table: Concurrency Configuration in CocoIndex + +| Level | Configuration Path | Applies To | +| --- | --- | --- | +| Global | Environment variables, or `GlobalExecutionOptions` passed to `cocoindex.init()` | All sources across all flows, sharing one budget | +| Per-Source | Arguments to `FlowBuilder.add_source()` | A specific source in a specific flow | +| Row Iteration | Arguments to `DataSlice.row()`, e.g. `row(max_inflight_rows=...)` | Nested iterations | + + +## Best Practices + +In real incremental pipelines, the processing bottleneck usually sits in a few heavy operations, such as running inference with an AI model locally or via a remote API. It is common to keep more data in memory than can be processed immediately, so that as soon as a busy backend becomes available, new work is ready and the backend stays busy. However, this buffering needs a reasonable bound to prevent memory exhaustion and similar issues. That's where concurrency control comes in. + +![tuning-guide](/img/tutorials/control-flow/tuning-guide.png) + +### Most Situations: Default Is Good Enough +In most cases, the default global source max rows limit (`COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS`: 1024) already fits the situation described above: it loads more than the heavy operations can consume, but stays within a reasonable bound. You don't need to do anything. + +### When and What to Tune +Decrease the limit if the pipeline is not stable enough (e.g., memory overuse or timeouts on certain operations), or increase it if the pipeline is very stable but you want it to go faster. What to tune? + +- In most cases, tuning the global source row limit (`COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS`) will just work. + +- Only touch the more specific knobs when necessary, usually in complex and unbalanced situations. + - On highly unbalanced row (file) size, set the max bytes limit to prevent a small number of abnormally large inputs from overloading the system. 
For example, this helps when your input sizes follow a long-tail distribution rather than a normal distribution. + - On highly unbalanced complexity across sources, set per-source concurrency. This happens when you run multiple flows within the same process, or have multiple sources within the same flow, and they vary in processing complexity (e.g., one source goes through a very heavy and slow model, and another only does simple data movement). + - On high fanout in nested iterations and unbalanced nested rows, set concurrency control options on nested iterations. For example, this applies when each row has a large number of nested rows to process and that number varies significantly from row to row. + +This concurrency control framework gives you **safe, scalable, and customizable flow performance**. You gain flexibility (configure per-flow), control (set global limits), and the confidence to scale CocoIndex flows smoothly across diverse workloads. + +## Support us + +We’re constantly improving our runtime. Please ⭐ star [CocoIndex on GitHub](https://github.com/cocoindex-io/cocoindex) and share it with others. 
diff --git a/docs/static/img/examples/pdf_elements/cover.png b/docs/static/img/examples/pdf_elements/cover.png index 2614de60b..02cc6b32c 100644 Binary files a/docs/static/img/examples/pdf_elements/cover.png and b/docs/static/img/examples/pdf_elements/cover.png differ diff --git a/docs/static/img/tutorials/control-flow/cover.png b/docs/static/img/tutorials/control-flow/cover.png new file mode 100644 index 000000000..596dfedc4 Binary files /dev/null and b/docs/static/img/tutorials/control-flow/cover.png differ diff --git a/docs/static/img/tutorials/control-flow/flow-control.png b/docs/static/img/tutorials/control-flow/flow-control.png new file mode 100644 index 000000000..f64e265a9 Binary files /dev/null and b/docs/static/img/tutorials/control-flow/flow-control.png differ diff --git a/docs/static/img/tutorials/control-flow/global-level.png b/docs/static/img/tutorials/control-flow/global-level.png new file mode 100644 index 000000000..bbca47735 Binary files /dev/null and b/docs/static/img/tutorials/control-flow/global-level.png differ diff --git a/docs/static/img/tutorials/control-flow/nested-level.png b/docs/static/img/tutorials/control-flow/nested-level.png new file mode 100644 index 000000000..74090a9aa Binary files /dev/null and b/docs/static/img/tutorials/control-flow/nested-level.png differ diff --git a/docs/static/img/tutorials/control-flow/per-source.png b/docs/static/img/tutorials/control-flow/per-source.png new file mode 100644 index 000000000..42abed58a Binary files /dev/null and b/docs/static/img/tutorials/control-flow/per-source.png differ diff --git a/docs/static/img/tutorials/control-flow/tuning-guide.png b/docs/static/img/tutorials/control-flow/tuning-guide.png new file mode 100644 index 000000000..2ea5a0145 Binary files /dev/null and b/docs/static/img/tutorials/control-flow/tuning-guide.png differ