-
Notifications
You must be signed in to change notification settings - Fork 218
Ray Pool Executor #1415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
lbliii
merged 4 commits into
NVIDIA-NeMo:main
from
lbliii:llane/ray-actor-pool-executor
Jan 26, 2026
Merged
Ray Pool Executor #1415
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,7 +53,7 @@ results = pipeline.run(executor) | |
|
|
||
| ### `XennaExecutor` (recommended) | ||
|
|
||
| `XennaExecutor` is the production-ready executor that uses Cosmos-Xenna, a Ray-based execution engine optimized for distributed data processing. Xenna provides native streaming support, automatic resource scaling, and built-in fault tolerance. It's the recommended choice for most production workloads, especially for video and multimodal pipelines. | ||
| `XennaExecutor` uses Cosmos-Xenna, a Ray-based execution engine optimized for distributed data processing. Xenna provides native streaming support, automatic resource scaling, and built-in fault tolerance. This executor is the recommended choice for most workloads, especially for video and multimodal pipelines. | ||
|
|
||
| **Key Features**: | ||
| - **Streaming execution**: Process data incrementally as it arrives, reducing memory requirements | ||
|
|
@@ -108,75 +108,69 @@ results = pipeline.run(executor) | |
|
|
||
| For more details, refer to the official [NVIDIA Cosmos-Xenna project](https://github.com/nvidia-cosmos/cosmos-xenna/tree/main). | ||
|
|
||
| ### `RayDataExecutor` | ||
| ### `RayActorPoolExecutor` | ||
|
|
||
| `RayDataExecutor` uses Ray Data, a scalable data processing library built on Ray Core. Ray Data provides a familiar DataFrame-like API for distributed data transformations. This executor is experimental and best suited for large-scale batch processing tasks that benefit from Ray Data's optimized data loading and transformation pipelines. | ||
| `RayActorPoolExecutor` uses Ray's ActorPool for efficient distributed processing with fine-grained resource management. This executor creates pools of Ray actors per stage, enabling better load balancing and fault tolerance through Ray's native mechanisms. Deduplication workflows automatically use this executor for GPU-accelerated stages. | ||
|
|
||
| **Key Features**: | ||
| - **Ray Data API**: Leverages Ray Data's optimized data processing primitives | ||
| - **Scalable transformations**: Efficient map-batch operations across distributed workers | ||
| - **Experimental status**: API and performance characteristics may change | ||
| - **ActorPool-based execution**: Creates dedicated actor pools per stage for optimal resource utilization | ||
| - **Load balancing**: Uses `map_unordered` for efficient work distribution across actors | ||
| - **RAFT support**: Native integration with [RAFT](https://github.com/rapidsai/raft) (RAPIDS Analytics Framework Toolbox) for GPU-accelerated clustering and nearest-neighbor operations | ||
| - **Head node exclusion**: Optional `ignore_head_node` parameter to reserve the Ray cluster's [head node](https://docs.ray.io/en/latest/cluster/key-concepts.html#head-node) for coordination tasks only | ||
|
|
||
| :::{dropdown} Example: Fuzzy Deduplication | ||
| :icon: code-square | ||
|
|
||
| ```python | ||
| from nemo_curator.backends.experimental.ray_data import RayDataExecutor | ||
| from nemo_curator.stages.deduplication.fuzzy.workflow import FuzzyDeduplicationWorkflow | ||
|
|
||
| workflow = FuzzyDeduplicationWorkflow( | ||
| input_path="/data/documents", | ||
| cache_path="/data/cache", | ||
| output_path="/data/output", | ||
| text_field="text", | ||
| perform_removal=True, | ||
| num_bands=20, | ||
| minhashes_per_band=13, | ||
| ) | ||
|
|
||
| executor = RayDataExecutor() | ||
| results = pipeline.run(executor) | ||
| # The workflow automatically uses RayActorPoolExecutor for GPU-accelerated stages | ||
| results = workflow.run() | ||
| ``` | ||
|
|
||
| :::{note}`RayDataExecutor` currently has limited configuration options. For more control over execution, consider using `XennaExecutor` or `RayActorPoolExecutor`. | ||
| For more details, refer to {ref}`Text Deduplication <text-process-data-dedup>`. | ||
| ::: | ||
|
|
||
| ### `RayActorPoolExecutor` | ||
| ### `RayDataExecutor` | ||
|
|
||
| Executor using Ray Actor pools for custom distributed processing patterns such as deduplication. | ||
| `RayDataExecutor` uses Ray Data, a scalable data processing library built on Ray Core. Ray Data provides a familiar DataFrame-like API for distributed data transformations. This executor is best suited for large-scale text processing tasks that benefit from Ray Data's optimized data loading and transformation pipelines. | ||
|
|
||
| **Key Features**: | ||
| - **Ray Data API**: Leverages Ray Data's optimized data processing primitives | ||
| - **Scalable transformations**: Efficient map-batch operations across distributed workers | ||
|
|
||
| ```python | ||
| from nemo_curator.backends.experimental.ray_actor_pool import RayActorPoolExecutor | ||
| from nemo_curator.backends.experimental.ray_data import RayDataExecutor | ||
|
|
||
| executor = RayActorPoolExecutor() | ||
| executor = RayDataExecutor() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can mention |
||
| results = pipeline.run(executor) | ||
| ``` | ||
|
|
||
| ## Ray Executors in Practice | ||
|
|
||
| Ray-based executors provide enhanced scalability and performance for large-scale data processing tasks. They're beneficial for: | ||
| Ray-based executors provide enhanced scalability and performance for large-scale data processing tasks. These executors are beneficial for: | ||
|
|
||
| - **Large-scale classification tasks**: Distributed inference across multi-GPU setups | ||
| - **Deduplication workflows**: Parallel processing of document similarity computations | ||
| - **Resource-intensive stages**: Automatic scaling based on computational demands | ||
|
|
||
| ### When to Use Ray Executors | ||
|
|
||
| Consider Ray executors when: | ||
|
|
||
| - Processing datasets that exceed single-machine capacity | ||
| - Running GPU-intensive stages (classifiers, embedding models, etc.) | ||
| - Needing automatic fault tolerance and recovery | ||
| - Scaling across multi-node clusters | ||
|
|
||
| ### Ray vs. Xenna Executors | ||
|
|
||
| | Feature | XennaExecutor | Ray Executors | | ||
| |---------|---------------|---------------| | ||
| | **Maturity** | Production-ready | Experimental | | ||
| | **Streaming** | Native support | Limited | | ||
| | **Resource Management** | Optimized for video/multimodal | General-purpose | | ||
| | **Fault Tolerance** | Built-in | Ray-native | | ||
| | **Scaling** | Auto-scaling | Manual configuration | | ||
|
|
||
| **Recommendation**: Use `XennaExecutor` for production workloads and Ray executors for experimental large-scale processing. | ||
|
|
||
| :::{note} | ||
| Ray executors emit an experimental warning as the API and performance characteristics may change. | ||
| ::: | ||
|
|
||
| ## Choosing a Backend | ||
|
|
||
| Both options can deliver strong performance; choose based on API fit and maturity: | ||
| All executors can deliver strong performance; choose based on your workload requirements: | ||
|
|
||
| - **`XennaExecutor`**: Default for most workloads due to maturity and extensive real-world usage (including video pipelines); supports streaming and batch execution with auto-scaling. | ||
| - **Ray Executors (experimental)**: Use Ray Data API for scalable data processing; the interface is still experimental and may change. | ||
| - **`RayActorPoolExecutor`**: Automatically used for deduplication workflows; provides GPU-accelerated processing with RAFT integration. | ||
| - **`RayDataExecutor`**: Best for batch data transformations using Ray Data's DataFrame-like API. | ||
|
|
||
| ## Minimal End-to-End example | ||
|
|
||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing import example for
RayActorPoolExecutorUnlike
XennaExecutorandRayDataExecutor, there's no code example showing how to importRayActorPoolExecutor. Based on the codebase, the correct import is:Consider adding an import example here for consistency and to help users.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!