
Add ray data for image #1610

Merged
oyilmaz-nvidia merged 6 commits into NVIDIA-NeMo:main from oyilmaz-nvidia:onur/ray-data-for-image
Mar 23, 2026

Conversation

@oyilmaz-nvidia
Contributor

@oyilmaz-nvidia oyilmaz-nvidia commented Mar 16, 2026

Description

This PR adds a few changes to test and benchmark Ray Data for image workflows.

Fanouts

Why IS_FANOUT_STAGE on ImageReaderStage:

ImageReaderStage.process() returns list[ImageBatch] — for each .tar file it reads, DALI may produce multiple batches. In Ray Data, all those batches from one tar end up in the same block after map_batches. Without IS_FANOUT_STAGE, all of them get sent to the same downstream embedding actor, killing parallelism. The flag triggers repartition(target_num_rows_per_block=1) in the adapter, splitting them into individual blocks so each ImageBatch can be picked up by any available ImageEmbeddingStage actor independently.

It's the same reason FilePartitioningStage has it — it also returns a list[FileGroupTask].
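Conceptually, the adapter's handling of the flag works like this (a minimal pure-Python sketch; the `map_batches`/`repartition` behavior is simulated and all helper names here are hypothetical stand-ins, not the real adapter API):

```python
# Simulation of why IS_FANOUT_STAGE matters. Ray Data itself is not used;
# these helpers only mimic its blocking behavior.

def map_batches(blocks, fn):
    # Like Ray Data's map_batches: all outputs produced from one input
    # block stay together in the same output block.
    return [fn(block) for block in blocks]

def repartition_one_row_per_block(blocks):
    # Like repartition(target_num_rows_per_block=1): split every block
    # into single-item blocks so each can be scheduled independently.
    return [[item] for block in blocks for item in block]

def read_tar(tar_name):
    # Stand-in for ImageReaderStage.process(): one tar -> N batches.
    return [f"{tar_name}-batch{i}" for i in range(4)]

tars = ["a.tar", "b.tar"]
blocks = map_batches(tars, read_tar)
assert len(blocks) == 2  # without fanout: only 2 downstream tasks

blocks = repartition_one_row_per_block(blocks)
assert len(blocks) == 8  # with fanout: 8 independently schedulable batches
```

Without the repartition step, at most one embedding actor per tar file gets work; with it, every `ImageBatch` becomes its own block.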

Should You Add More Fanouts?

For the standard pipeline, no.

Every other image stage returns a single task — confirmed:

| Stage | process() return type | Fanout needed? |
| --- | --- | --- |
| ImageReaderStage | list[ImageBatch] | ✅ added |
| ImageEmbeddingStage | ImageBatch | no |
| ImageAestheticFilterStage | ImageBatch | no |
| ImageNSFWFilterStage | ImageBatch | no |
| ImageWriterStage | FileGroupTask | no |
| ConvertImageBatchToDocumentBatch | DocumentBatch | no |
| DeduplicationRemovalStage | ImageBatch | no |

ImageReaderStage is the only image stage that fans out (1 tar → N batches), so it's the only one that needs the flag.
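Stage-side, opting in is a one-method override. The sketch below models the pattern with a stand-in enum so it runs standalone; the real `RayStageSpecKeys` lives in `nemo_curator.backends.experimental.utils`, and the real `ImageReaderStage` of course does far more than this:

```python
from enum import Enum
from typing import Any

# Hypothetical stand-in for nemo_curator's RayStageSpecKeys; only the
# one key relevant to this PR is modeled.
class RayStageSpecKeys(str, Enum):
    IS_FANOUT_STAGE = "is_fanout_stage"

class ImageReaderStage:
    """Sketch of the override this PR adds."""

    def ray_stage_spec(self) -> dict[str, Any]:
        # Tell the Ray Data adapter to repartition after this stage,
        # because process() returns list[ImageBatch] (1 tar -> N batches).
        return {RayStageSpecKeys.IS_FANOUT_STAGE: True}

assert ImageReaderStage().ray_stage_spec()[RayStageSpecKeys.IS_FANOUT_STAGE] is True
```

Any future stage whose `process()` returns a list of tasks would follow the same pattern.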

Benchmarking:

The Xenna and Ray Data runtimes for the image curation benchmark are almost the same.

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

Signed-off-by: Onur Yilmaz <oyilmaz@nvidia.com>
@copy-pr-bot

copy-pr-bot bot commented Mar 16, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

Signed-off-by: Onur Yilmaz <oyilmaz@nvidia.com>
@oyilmaz-nvidia
Contributor Author

/ok to test ae3bf46

@greptile-apps
Contributor

greptile-apps bot commented Mar 17, 2026

Greptile Summary

This PR enables Ray Data support for the image curation pipeline by adding a ray_stage_spec() override on ImageReaderStage that signals the Ray Data adapter to repartition after each tar is processed, restoring inter-stage parallelism that would otherwise collapse when multiple ImageBatch objects from one tar land in the same Ray block. A matching nightly benchmark entry (image_curation_raydata) is added alongside the existing Xenna benchmark to track Ray Data performance parity.

Key changes:

  • ImageReaderStage.ray_stage_spec() returns {IS_FANOUT_STAGE: True}, mirroring the pattern already established in FilePartitioningStage, so the Ray Data adapter calls repartition(target_num_rows_per_block=1) after this stage.
  • benchmarking/nightly-benchmark.yaml: old image_curation entry renamed to image_curation_xenna; new image_curation_raydata entry added with --executor=ray_data, identical resource config (64 CPUs, 4 GPUs), and the same correctness thresholds (exact_value: 3800, min_value: 3.0 images/sec).
  • Minor: the multi-line id_prefix ternary in _read_tars_with_dali was condensed to one line (this was noted in a prior review thread).

Confidence Score: 4/5

  • Safe to merge with one minor gap: the new ray_stage_spec() method has no unit test, leaving the fanout contract unverified by the test suite.
  • The IS_FANOUT_STAGE pattern is well-established in the codebase (FilePartitioningStage uses it identically) and the adapter's consumption of the flag is already tested there. The benchmark YAML changes are additive and low-risk. The only gap is that the test file for ImageReaderStage doesn't assert the new method's return value, despite the checklist claiming test coverage.
  • tests/stages/image/io/test_image_reader.py — missing assertion for the new ray_stage_spec() fanout flag.

Important Files Changed

Filename Overview
nemo_curator/stages/image/io/image_reader.py Adds ray_stage_spec() override returning IS_FANOUT_STAGE: True to enable proper parallelism when running under the Ray Data executor; also reformats the id_prefix ternary to a single line. The fanout logic follows the existing pattern in FilePartitioningStage. No test covers the new ray_stage_spec() return value.
benchmarking/nightly-benchmark.yaml Renames the existing image_curation entry to image_curation_xenna and adds an equivalent image_curation_raydata entry that passes --executor=ray_data. Both entries share identical resource requests, thresholds, and correctness requirements (min_value: 3.0, exact_value: 3800).

Sequence Diagram

sequenceDiagram
    participant RDE as RayDataExecutor
    participant Adapter as RayDataStageAdapter
    participant IRS as ImageReaderStage
    participant DS as Ray Dataset

    RDE->>Adapter: process_dataset(dataset)
    Adapter->>IRS: ray_stage_spec()
    IRS-->>Adapter: {IS_FANOUT_STAGE: True}
    Adapter->>DS: map_batches(ImageReaderStage.process)<br/>(1 FileGroupTask → N ImageBatches)
    DS-->>Adapter: dataset with N batches<br/>in same block
    Adapter->>DS: repartition(target_num_rows_per_block=1)
    DS-->>Adapter: N separate blocks (one per ImageBatch)
    Adapter->>RDE: return repartitioned dataset
    Note over DS: Each block dispatched<br/>independently to any<br/>ImageEmbeddingStage actor

Comments Outside Diff (1)

  1. tests/stages/image/io/test_image_reader.py, lines 124-130 (link)

    P2 Missing test for ray_stage_spec()

    The new ray_stage_spec() method is the core behavioral change in this PR — it marks ImageReaderStage as a fanout stage so the Ray Data adapter triggers repartition() after it. The existing test suite covers inputs(), outputs(), resources, and process(), but there is no test asserting that ray_stage_spec() returns {RayStageSpecKeys.IS_FANOUT_STAGE: True}.

    Since the PR checklist states "New or Existing tests cover these changes," a simple assertion like the following would satisfy that requirement:

    def test_ray_stage_spec_is_fanout() -> None:
        from nemo_curator.backends.experimental.utils import RayStageSpecKeys
        from nemo_curator.stages.image.io.image_reader import ImageReaderStage
    
        with patch("torch.cuda.is_available", return_value=False):
            stage = ImageReaderStage(dali_batch_size=2, verbose=False)
    
        spec = stage.ray_stage_spec()
        assert spec.get(RayStageSpecKeys.IS_FANOUT_STAGE) is True

    Without this, a future refactor that accidentally drops the flag (or the method) would go undetected.

Last reviewed commit: "Update nightly-bench..."

id_prefix = (
    tar_paths[0].stem
    if len(tar_paths) == 1
    else f"group_{tar_paths[0].stem}_x{len(tar_paths)}"
)
id_prefix = tar_paths[0].stem if len(tar_paths) == 1 else f"group_{tar_paths[0].stem}_x{len(tar_paths)}"
Contributor


P2 Long line reduces readability

The original multi-line ternary was reformatted into a single ~105-character line. This makes the line harder to scan and likely exceeds the project's line-length limit. The original multi-line form was clearer — consider reverting to it:

Suggested change
id_prefix = tar_paths[0].stem if len(tar_paths) == 1 else f"group_{tar_paths[0].stem}_x{len(tar_paths)}"
id_prefix = (
    tar_paths[0].stem
    if len(tar_paths) == 1
    else f"group_{tar_paths[0].stem}_x{len(tar_paths)}"
)

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Comment thread benchmarking/nightly-benchmark.yaml Outdated
Comment on lines +580 to +581
- metric: throughput_images_per_sec
min_value: 2.5
Contributor


P2 Throughput threshold inconsistency with PR description

The PR description states "The runtimes of Xenna and Ray Data for image curation benchmarking are almost the same," yet the min_value for throughput_images_per_sec is set 20% lower for Ray Data (2.5) than for Xenna (3.0). If the two executors truly perform comparably, consider aligning the floor values — or document in a comment why a lower bound is intentional (e.g., to account for Ray Data cold-start overhead in CI).

@oyilmaz-nvidia
Contributor Author

/ok to test 1c2df7e

Comment on lines +56 to +61
def ray_stage_spec(self) -> dict[str, Any]:
    """Ray stage specification for this stage."""
    return {
        RayStageSpecKeys.IS_FANOUT_STAGE: True,
    }

Contributor


This makes sense, thanks for adding it.

Contributor

@suiyoubi suiyoubi left a comment


Thanks @oyilmaz-nvidia, is the perf for Ray Data comparable to using Xenna? I see the min_value is set differently.

Signed-off-by: Onur Yilmaz <35306097+oyilmaz-nvidia@users.noreply.github.com>
@oyilmaz-nvidia
Contributor Author

@suiyoubi Changed the min value to the default, and the runtime is similar to Xenna.

@oyilmaz-nvidia
Contributor Author

/ok to test 590d503


4 participants