From 3e3b44408b0990c7ee806d15a351b83185508544 Mon Sep 17 00:00:00 2001 From: Onur Yilmaz Date: Thu, 26 Feb 2026 00:57:38 -0500 Subject: [PATCH 1/6] Update ray to 2.54 Signed-off-by: Onur Yilmaz --- .github/workflows/cicd-main.yml | 3 +- pyproject.toml | 3 +- tests/conftest.py | 42 ++++++++++++++++++++++++ tests/stages/text/embedders/test_vllm.py | 6 ++-- tests/utils/test_client_utils.py | 9 +++-- uv.lock | 29 ++++++++-------- 6 files changed, 69 insertions(+), 23 deletions(-) diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml index 968c28511a..4edc4baee9 100644 --- a/.github/workflows/cicd-main.yml +++ b/.github/workflows/cicd-main.yml @@ -93,9 +93,10 @@ jobs: timeout-minutes: 40 run: | uv sync --link-mode copy --locked --extra audio_cpu --extra sdg_cpu --extra text_cpu --extra video_cpu --group test + source .venv/bin/activate FOLDER="${{ matrix.folder }}" FOLDER="${FOLDER/stages-/stages/}" - uv run coverage run --branch --source=nemo_curator -m pytest -v "tests/$FOLDER" -m "not gpu" + coverage run --branch --source=nemo_curator -m pytest -v "tests/$FOLDER" -m "not gpu" - name: Generate report id: check diff --git a/pyproject.toml b/pyproject.toml index 1a555bed3d..668861e9db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ dependencies = [ "openai>=1.0.0", "pandas>=2.1.0", "pyarrow", - "ray[default,data]>=2.50", + "ray[default,data]>=2.54", "torch", "transformers", ] @@ -198,7 +198,6 @@ constraint-dependencies = [ "protobuf>=5.29.6", # Address CVE GHSA-8qvm-5x2c-j2w7 "pyasn1>=0.6.2", # Address CVE GHSA-63vm-454h-vhhq "python-multipart>=0.0.22", # Address CVE GHSA-wp53-j4wj-2cfg - "ray[default,data]>=2.52", # Address CVE GHSA-q279-jhrf-cc6v "starlette>=0.49.1", # Address CVE GHSA-7f5h-v6xp-fcq8 "urllib3>=2.6.3", # Address CVE GHSA-38jv-5279-wg99 "wheel>=0.46.2", # Address CVE GHSA-8rrh-rw8j-w5fx diff --git a/tests/conftest.py b/tests/conftest.py index 2491c0b2d8..72580011c4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,8 +20,10 @@ import os import re +import shutil import socket import subprocess +import time from pathlib import Path from typing import Any @@ -167,6 +169,27 @@ def pytest_ignore_collect(collection_path: Path, config: pytest.Config) -> bool: return False +def _wait_for_head_node_alive(ray_address: str, timeout: int = 120) -> None: + """Wait until the head node registers as ALIVE in GCS. + + In Ray 2.54+, `ray status` succeeds as soon as GCS starts, but the head + node may not have registered as ALIVE yet. ray.init() will fail with + "No node info found matching node ids" until that registration completes. + """ + deadline = time.time() + timeout + while time.time() < deadline: + try: + ray.init(address=ray_address) + ray.shutdown() + logger.info("Head node confirmed ALIVE in GCS") + except Exception: # noqa: BLE001, PERF203 + time.sleep(1) + else: + return + msg = f"Head node at {ray_address} did not become ALIVE in GCS within {timeout} seconds" + raise RuntimeError(msg) + + def _build_ray_command(temp_dir: str, num_cpus: int, num_gpus: int, object_store_memory: int) -> tuple[list[str], int]: """Build the Ray start command with the given configuration.""" ray_port = find_free_port() @@ -241,6 +264,8 @@ def shared_ray_cluster(tmp_path_factory: pytest.TempPathFactory, pytestconfig: p logger.info(f"Starting Ray cluster with {num_gpus} GPUs") logger.info(f"Running Ray command: {' '.join(cmd_to_run)}") + os.environ["RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO"] = "0" + # Use explicit path to ray command for security ray_process = subprocess.Popen(cmd_to_run, shell=False) # noqa: S603 logger.info(f"Started Ray process: {ray_process.pid}") @@ -249,6 +274,23 @@ def shared_ray_cluster(tmp_path_factory: pytest.TempPathFactory, pytestconfig: p os.environ["RAY_ADDRESS"] = ray_address logger.info(f"Set RAY_ADDRESS for tests to: {ray_address}") + # Wait for the cluster to be ready before yielding + deadline = time.time() + 120 + while time.time() < deadline: + result = subprocess.run( # noqa: S603 + [shutil.which("ray") or "ray", "status", "--address", ray_address], + capture_output=True, + check=False, + ) + if result.returncode == 0: + break + time.sleep(1) + else: + msg = f"Ray cluster at {ray_address} did not become ready within 60 seconds" + raise RuntimeError(msg) + + _wait_for_head_node_alive(ray_address) + try: yield ray_address finally: diff --git a/tests/stages/text/embedders/test_vllm.py b/tests/stages/text/embedders/test_vllm.py index a4de199a78..51a7de6307 100644 --- a/tests/stages/text/embedders/test_vllm.py +++ b/tests/stages/text/embedders/test_vllm.py @@ -124,8 +124,10 @@ class _FakeLLM: def __init__(self, model: str, **kwargs: Any) -> None: # noqa: ANN401 captured["llm"] = {"model": model, "kwargs": kwargs} - monkeypatch.setattr("nemo_curator.stages.text.embedders.vllm.snapshot_download", _fake_snapshot_download) - monkeypatch.setattr("nemo_curator.stages.text.embedders.vllm.LLM", _FakeLLM) + import nemo_curator.stages.text.embedders.vllm as _vllm_mod + + monkeypatch.setattr(_vllm_mod, "snapshot_download", _fake_snapshot_download) + monkeypatch.setattr(_vllm_mod, "LLM", _FakeLLM) stage.setup_on_node() diff --git a/tests/utils/test_client_utils.py b/tests/utils/test_client_utils.py index 21765f1884..2554bd2ed1 100644 --- a/tests/utils/test_client_utils.py +++ b/tests/utils/test_client_utils.py @@ -329,6 +329,7 @@ def test_is_remote_url_local_file(self) -> None: def test_is_remote_url_with_mock_fsspec(self, monkeypatch: pytest.MonkeyPatch) -> None: """Test is_remote_url with mocked fsspec to test protocol handling.""" + import nemo_curator.utils.client_utils as _client_utils from nemo_curator.utils.client_utils import is_remote_url # Mock the url_to_fs function @@ -349,7 +350,7 @@ def __init__(self, protocol: str) -> None: else: return MockFS(None), "" - monkeypatch.setattr("nemo_curator.utils.client_utils.url_to_fs", mock_url_to_fs) + monkeypatch.setattr(_client_utils, "url_to_fs", mock_url_to_fs) # Test with mocked protocols assert is_remote_url("s3://bucket/key") @@ -360,6 +361,7 @@ def __init__(self, protocol: str) -> None: def test_is_remote_url_multiple_protocols(self, monkeypatch: pytest.MonkeyPatch) -> None: """Test is_remote_url with filesystem that has multiple protocols.""" + import nemo_curator.utils.client_utils as _client_utils from nemo_curator.utils.client_utils import is_remote_url def mock_url_to_fs(_url: str) -> tuple: @@ -370,13 +372,14 @@ def __init__(self) -> None: return MockFS(), "" - monkeypatch.setattr("nemo_curator.utils.client_utils.url_to_fs", mock_url_to_fs) + monkeypatch.setattr(_client_utils, "url_to_fs", mock_url_to_fs) # Should return True since first protocol is "s3" (remote) assert is_remote_url("s3://bucket/key") def test_is_remote_url_no_protocol(self, monkeypatch: pytest.MonkeyPatch) -> None: """Test is_remote_url with filesystem that has no protocol.""" + import nemo_curator.utils.client_utils as _client_utils from nemo_curator.utils.client_utils import is_remote_url def mock_url_to_fs(_url: str) -> tuple: @@ -387,7 +390,7 @@ def __init__(self) -> None: return MockFS(), "" - monkeypatch.setattr("nemo_curator.utils.client_utils.url_to_fs", mock_url_to_fs) + monkeypatch.setattr(_client_utils, "url_to_fs", mock_url_to_fs) # Should return False since protocol is None assert not is_remote_url("unknown://path") diff --git a/uv.lock b/uv.lock index 7a5bcbd391..494aebf540 100644 --- a/uv.lock +++ b/uv.lock @@ -40,7 +40,6 @@ constraints = [ { name = "protobuf", specifier = ">=5.29.6" }, { name = "pyasn1", specifier = ">=0.6.2" }, { name = "python-multipart", specifier = ">=0.0.22" }, - { name = "ray", extras = ["default", "data"], specifier = ">=2.52" }, { name = "starlette", specifier = ">=0.49.1" }, { name = "urllib3", specifier = ">=2.6.3" }, { name = "wheel", specifier = ">=0.46.2" }, @@ -4776,7 +4775,7 @@ requires-dist = [ { name = "pynvvideocodec", marker = "platform_machine == 'x86_64' and sys_platform != 'darwin' and extra == 'video-cuda12'", specifier = "==2.0.2" }, { name = "raft-dask-cu12", marker = "extra == 'deduplication-cuda12'", specifier = "==25.10.*" }, { name = "rapidsmpf-cu12", marker = "extra == 'deduplication-cuda12'", specifier = "==25.10.*" }, - { name = "ray", extras = ["data", "default"], specifier = ">=2.50" }, + { name = "ray", extras = ["data", "default"], specifier = ">=2.54" }, { name = "resiliparse", marker = "extra == 'text-cpu'" }, { name = "s5cmd", marker = "extra == 'text-cpu'" }, { name = "scikit-learn", marker = "extra == 'deduplication-cuda12'", specifier = "<1.8.0" }, @@ -6931,7 +6930,7 @@ wheels = [ [[package]] name = "ray" -version = "2.52.1" +version = "2.54.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "click" }, @@ -6944,18 +6943,18 @@ dependencies = [ { name = "requests" }, ] wheels = [ - { url = "https://files.pythonhosted.org/packages/d0/19/7882c5918d3af848543ad1000b7da22db0f65fa20da8d371272ee24d41ba/ray-2.52.1-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:993194a8be70540e0f819862031bbf19a64401fbe6c31b42065fd313ba466d34", size = 69385176, upload-time = "2025-11-28T02:22:03.533Z" }, - { url = "https://files.pythonhosted.org/packages/43/e4/e42cc912a657211eca9eb0befe71ffc4b6a209d561e9eaed246255c05c4d/ray-2.52.1-cp310-cp310-manylinux2014_aarch64.whl", hash = "sha256:65bf461fdfe4ffa667c46f9455f8740b2ad6c1fa471b461d5f5cf6b7baf177b5", size = 71253481, upload-time = "2025-11-28T02:22:09.623Z" }, - { url = "https://files.pythonhosted.org/packages/ee/3e/f180102b73157592ab48a160711771728bbbdc77f6a0510a6a7a2ca18818/ray-2.52.1-cp310-cp310-manylinux2014_x86_64.whl", hash = "sha256:b3f9e61b799fb3cc8fd7077a3d2eb676ddfef7db644f6b6a2b657c5c3214cf19", size = 72083695, upload-time = "2025-11-28T02:22:15.281Z" }, - { url = "https://files.pythonhosted.org/packages/f3/b4/f6109cb80f8c3057fb5361d0c76249856cda0872ef36220d9b7f600f1253/ray-2.52.1-cp310-cp310-win_amd64.whl", hash = "sha256:24694e60cdc7770b90f123cc578cabb9d1a231c1fe673b5da0027b118de45846", size = 27169182, upload-time = "2025-11-28T02:22:19.866Z" }, - { url = "https://files.pythonhosted.org/packages/8c/64/688d72f53f7adf582913a1bba95ab9fc3232a144057aec6b6f62cc1c76b4/ray-2.52.1-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:f59e3b2d1a1466ac0778f2c6fac9ccb5f30107d77e3dddd1d60167248d268474", size = 69389239, upload-time = "2025-11-28T02:22:24.803Z" }, - { url = "https://files.pythonhosted.org/packages/0b/c6/ae42db4bc9efd221643abad28d0fcdeecc31d49728f07eb27d2b1e4fcebc/ray-2.52.1-cp311-cp311-manylinux2014_aarch64.whl", hash = "sha256:2b57ef272a2a0a0dbae6d18d70aa541eab620b4fe3b44d50466d3a533c16f9d9", size = 71373439, upload-time = "2025-11-28T02:22:30.132Z" }, - { url = "https://files.pythonhosted.org/packages/40/5e/b000aa0e8189b37a8f2dfb4f589bb78105e9c451ad75424d4e67f03c5c79/ray-2.52.1-cp311-cp311-manylinux2014_x86_64.whl", hash = "sha256:a5a3c268d45060c50cd029979ecc5f1eaaec040b19fa88dd4fe9e927d19ff13e", size = 72201688, upload-time = "2025-11-28T02:22:35.64Z" }, - { url = "https://files.pythonhosted.org/packages/fc/5f/0b2e7bf4e1e80c83aaba789de81f346b6fd5f014223873e22f94e2e1c5d4/ray-2.52.1-cp311-cp311-win_amd64.whl", hash = "sha256:4e8478544fef69a17d865431c0bebdcfeff7c0f76a306f29b73c3bc3cbb0bdb9", size = 27163246, upload-time = "2025-11-28T02:22:40.926Z" }, - { url = "https://files.pythonhosted.org/packages/5c/c5/d5c3b6e28dee2bb6f9029dfcb950f41c2e682b1bf4cdbbbe42bde66f2ea8/ray-2.52.1-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:6831592fedf0a122016f5dab4b67d85fa3d4db3b21f588d18834b5c031396d1c", size = 69374499, upload-time = "2025-11-28T02:22:45.765Z" }, - { url = "https://files.pythonhosted.org/packages/63/9f/a019b66f1d716cfed89edfa6c597c9bffe4eab559042a8495a9c2b2c82ab/ray-2.52.1-cp312-cp312-manylinux2014_aarch64.whl", hash = "sha256:08eb8f5fd55292ba6bee363a32491136a5e54af54e007f81e0603986fbea41a4", size = 71412116, upload-time = "2025-11-28T02:22:51.568Z" }, - { url = "https://files.pythonhosted.org/packages/d7/a5/eaea6f080953dfe1506c4d7b7e16a46536b6ebc9f39703683e0c94e115e0/ray-2.52.1-cp312-cp312-manylinux2014_x86_64.whl", hash = "sha256:843c0108ad72bb7fc6c23a22e29e6099546a5eaad3ad675c78a146d9080f6ec6", size = 72267230, upload-time = "2025-11-28T02:22:56.877Z" }, - { url = "https://files.pythonhosted.org/packages/eb/69/d6cabdd6f3651f380a0cdf90d97b71ec266d6ba06fd2e649e8c878ab08ce/ray-2.52.1-cp312-cp312-win_amd64.whl", hash = "sha256:8045172ad3fcff62b9dab9a4cd2e0991ad0e27fc814fe625a8d3a120306651d6", size = 27144021, upload-time = "2025-11-28T02:23:01.55Z" }, + { url = "https://files.pythonhosted.org/packages/64/13/b86d791b41f33220335eba18fc4841f1ebddae41e562c6a216846404c88d/ray-2.54.0-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:a22937f09ee74a43171df338d84b45ef882c1c05748947ca9d5343a44d4b9379", size = 70097079, upload-time = "2026-02-18T04:04:35.409Z" }, + { url = "https://files.pythonhosted.org/packages/e0/bb/f54980d45ecfd0ceb39b6a966bd64fc0597746af1917d7fe3cbdb9f72752/ray-2.54.0-cp310-cp310-manylinux2014_aarch64.whl", hash = "sha256:1e63e491155695d527513ffe9d33a6aeb3f3cdccb6309adadfd6f8dd7c0300f7", size = 71951024, upload-time = "2026-02-18T04:04:42.817Z" }, + { url = "https://files.pythonhosted.org/packages/b0/b1/8cc4e45a3ce87aabcb70696b448b20840bcbaa5c98bdb4807a2749541fda/ray-2.54.0-cp310-cp310-manylinux2014_x86_64.whl", hash = "sha256:2d140409e4ca06d8d6a06f71d441b53f6edcd930ebe67a6988f652915db81070", size = 72783364, upload-time = "2026-02-18T04:04:48.311Z" }, + { url = "https://files.pythonhosted.org/packages/12/79/7fb2f5698319cd28f0599fc9848a77dd7a64e0d82486c78dd94c6dce5095/ray-2.54.0-cp310-cp310-win_amd64.whl", hash = "sha256:86da6ff60b57394aa47158b2f3fc2616a87492e828983451f04e676b192b49ce", size = 27452281, upload-time = "2026-02-18T04:04:53.252Z" }, + { url = "https://files.pythonhosted.org/packages/08/58/6209b2231947f3c8df09ce1436f1c76c4a11fcafd57c8def852dcbb6d8ef/ray-2.54.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:8e39dd56b47a0a1820d5a5a54385bbe54d1d67e1093736d12d8ed4e99d0fa455", size = 70098998, upload-time = "2026-02-18T04:04:58.801Z" }, + { url = "https://files.pythonhosted.org/packages/ac/29/7871f4206e6b00a9bb784c16dad32ccd01e9df5a93545db92de220eb2871/ray-2.54.0-cp311-cp311-manylinux2014_aarch64.whl", hash = "sha256:491ae56ab80d8822c4eaf4d5bb96dcf32a6231d8d7b76eb8034400eb9be1bb18", size = 72066630, upload-time = "2026-02-18T04:05:04.957Z" }, + { url = "https://files.pythonhosted.org/packages/1d/e8/d2c8ebd9cd945abc817b01ad02a29df78cdb86cd07d764587e16977389d0/ray-2.54.0-cp311-cp311-manylinux2014_x86_64.whl", hash = "sha256:928bb09245a3c6f7c3c113ba8eafc69f948da9602d7f33e8251ecdf97c157615", size = 72895723, upload-time = "2026-02-18T04:05:10.686Z" }, + { url = "https://files.pythonhosted.org/packages/7e/96/a5ea3a149a943475cda1d68fdcdb14c86251826c652c232ae853600ad7e7/ray-2.54.0-cp311-cp311-win_amd64.whl", hash = "sha256:1e786330de55b3ba2228e36ec305381a9b86f0b01a8b6072c5811c3bc4dd9a3d", size = 27448371, upload-time = "2026-02-18T04:05:16.34Z" }, + { url = "https://files.pythonhosted.org/packages/0e/16/45eefb51eb1767342a6dbf41af0b432279e422e56160705fcd1098a7ec53/ray-2.54.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:cf5c33b4b13850ec24a5bd5f9d9e0a8161f8e586bfd297e52913d170dec447fe", size = 70084880, upload-time = "2026-02-18T04:05:22.007Z" }, + { url = "https://files.pythonhosted.org/packages/60/ad/e07aca3637e9c3ec4857ec4366208099cf8488ece8061a9925ba29b66382/ray-2.54.0-cp312-cp312-manylinux2014_aarch64.whl", hash = "sha256:795ae21d6b764245d3f521bc5833446d58569e7dfde9c5777417eb285d87450f", size = 72107346, upload-time = "2026-02-18T04:05:27.999Z" }, + { url = "https://files.pythonhosted.org/packages/9e/b9/cc5ea8460c3dc602e6b7198277a7c59ba2b8929374ab22efa8df9f3deac8/ray-2.54.0-cp312-cp312-manylinux2014_x86_64.whl", hash = "sha256:a972afd5aa3dda99d0b2f369b5f62e5dd95865ab7d37bf2e0a0e0d2cfbd9b325", size = 72967230, upload-time = "2026-02-18T04:05:33.771Z" }, + { url = "https://files.pythonhosted.org/packages/de/d7/744de3b1bb881701330ddcbb2f6efaccd65915d564ece899a3838f9fb105/ray-2.54.0-cp312-cp312-win_amd64.whl", hash = "sha256:2ee074ede491d0aacfa339c003f5d7a15826e1e2a72ce873234ccbc0446e19b3", size = 27427353, upload-time = "2026-02-18T04:05:38.853Z" }, ] [package.optional-dependencies] From c94294fd67051e689e3c28a7b44efa13e580187c Mon Sep 17 00:00:00 2001 From: Onur Yilmaz Date: Thu, 26 Feb 2026 18:27:17 -0500 Subject: [PATCH 2/6] Use ray client Signed-off-by: Onur Yilmaz --- nemo_curator/core/utils.py | 1 + tests/conftest.py | 88 ++++++-------------------------------- 2 files changed, 14 insertions(+), 75 deletions(-) diff --git a/nemo_curator/core/utils.py b/nemo_curator/core/utils.py index 5fb4024394..560fbe2daa 100644 --- a/nemo_curator/core/utils.py +++ b/nemo_curator/core/utils.py @@ -174,6 +174,7 @@ def init_cluster( # noqa: PLR0913 # We set some env vars for Xenna here. This is only used for Xenna clusters. os.environ["XENNA_RAY_METRICS_PORT"] = str(ray_metrics_port) os.environ["XENNA_RESPECT_CUDA_VISIBLE_DEVICES"] = "1" + os.environ["RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO"] = "0" if stdouterr_capture_file: with open(stdouterr_capture_file, "w") as f: diff --git a/tests/conftest.py b/tests/conftest.py index 72580011c4..5f89912aad 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,8 +20,6 @@ import os import re -import shutil -import socket import subprocess import time from pathlib import Path @@ -31,14 +29,9 @@ import ray from loguru import logger -MODALITY_GROUPS = ["text", "image", "video", "audio"] - +from nemo_curator.core.client import RayClient -def find_free_port() -> int: - """Find an available port on the system.""" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("", 0)) - return s.getsockname()[1] +MODALITY_GROUPS = ["text", "image", "video", "audio"] def gpu_available() -> bool: @@ -190,37 +183,6 @@ def _wait_for_head_node_alive(ray_address: str, timeout: int = 120) -> None: raise RuntimeError(msg) -def _build_ray_command(temp_dir: str, num_cpus: int, num_gpus: int, object_store_memory: int) -> tuple[list[str], int]: - """Build the Ray start command with the given configuration.""" - ray_port = find_free_port() - dashboard_port = find_free_port() - ray_client_server_port = find_free_port() - - return [ - "ray", - "start", - "--head", - "--disable-usage-stats", - "--port", - str(ray_port), - "--dashboard-port", - str(dashboard_port), - "--ray-client-server-port", - str(ray_client_server_port), - "--dashboard-host", - "0.0.0.0", # noqa: S104 - "--temp-dir", - str(temp_dir), - "--num-cpus", - str(num_cpus), - "--num-gpus", - str(num_gpus), - "--object-store-memory", - str(object_store_memory), - "--block", - ], ray_port - - @pytest.fixture(scope="session", autouse=True) def shared_ray_cluster(tmp_path_factory: pytest.TempPathFactory, pytestconfig: pytest.Config) -> str: """Set up a shared Ray cluster with dynamic GPU configuration. @@ -248,56 +210,32 @@ def shared_ray_cluster(tmp_path_factory: pytest.TempPathFactory, pytestconfig: p logger.error(error_msg) raise RuntimeError(error_msg) - # Set up Ray configuration values num_cpus = 11 num_gpus = 2 if needs_gpu else 0 object_store_memory = 2 * (1024**3) # 2 GB logger.info(f"Configuring Ray cluster with {'GPU' if needs_gpu else 'CPU-only'} support") - # Create a temporary directory for Ray to avoid conflicts with other instances temp_dir = tmp_path_factory.mktemp("ray") - - # Build and execute Ray command - cmd_to_run, ray_port = _build_ray_command(str(temp_dir), num_cpus, num_gpus, object_store_memory) - - logger.info(f"Starting Ray cluster with {num_gpus} GPUs") - logger.info(f"Running Ray command: {' '.join(cmd_to_run)}") - os.environ["RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO"] = "0" - # Use explicit path to ray command for security - ray_process = subprocess.Popen(cmd_to_run, shell=False) # noqa: S603 - logger.info(f"Started Ray process: {ray_process.pid}") - - ray_address = f"localhost:{ray_port}" - os.environ["RAY_ADDRESS"] = ray_address - logger.info(f"Set RAY_ADDRESS for tests to: {ray_address}") - - # Wait for the cluster to be ready before yielding - deadline = time.time() + 120 - while time.time() < deadline: - result = subprocess.run( # noqa: S603 - [shutil.which("ray") or "ray", "status", "--address", ray_address], - capture_output=True, - check=False, - ) - if result.returncode == 0: - break - time.sleep(1) - else: - msg = f"Ray cluster at {ray_address} did not become ready within 60 seconds" - raise RuntimeError(msg) + ray_client = RayClient( + num_cpus=num_cpus, + num_gpus=num_gpus, + object_store_memory=object_store_memory, + ray_temp_dir=str(temp_dir), + include_dashboard=False, + ) + ray_client.start() - _wait_for_head_node_alive(ray_address) + ray_address = os.environ["RAY_ADDRESS"] + logger.info(f"Ray cluster started at: {ray_address}") try: yield ray_address finally: - # Ensure cleanup happens even if tests fail logger.info("Shutting down Ray cluster") - ray_process.kill() - ray_process.wait() # Wait for process to actually terminate + ray_client.stop() @pytest.fixture From 9e62e0acda7fc9c460e826042e6ac00accda14f1 Mon Sep 17 00:00:00 2001 From: Onur Yilmaz Date: Thu, 26 Feb 2026 18:28:40 -0500 Subject: [PATCH 3/6] Delete wait ray code Signed-off-by: Onur Yilmaz --- tests/conftest.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 5f89912aad..ecc3cb2497 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -162,27 +162,6 @@ def pytest_ignore_collect(collection_path: Path, config: pytest.Config) -> bool: return False -def _wait_for_head_node_alive(ray_address: str, timeout: int = 120) -> None: - """Wait until the head node registers as ALIVE in GCS. - - In Ray 2.54+, `ray status` succeeds as soon as GCS starts, but the head - node may not have registered as ALIVE yet. ray.init() will fail with - "No node info found matching node ids" until that registration completes. - """ - deadline = time.time() + timeout - while time.time() < deadline: - try: - ray.init(address=ray_address) - ray.shutdown() - logger.info("Head node confirmed ALIVE in GCS") - except Exception: # noqa: BLE001, PERF203 - time.sleep(1) - else: - return - msg = f"Head node at {ray_address} did not become ALIVE in GCS within {timeout} seconds" - raise RuntimeError(msg) - - @pytest.fixture(scope="session", autouse=True) def shared_ray_cluster(tmp_path_factory: pytest.TempPathFactory, pytestconfig: pytest.Config) -> str: """Set up a shared Ray cluster with dynamic GPU configuration. From 27c5016e1aaddc53310896bfd7867b88e016aabd Mon Sep 17 00:00:00 2001 From: Onur Yilmaz Date: Thu, 26 Feb 2026 18:29:43 -0500 Subject: [PATCH 4/6] Fix ruff issue Signed-off-by: Onur Yilmaz --- tests/conftest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index ecc3cb2497..e4b0a71c00 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,7 +21,6 @@ import os import re import subprocess -import time from pathlib import Path from typing import Any From 3fcb147b3dc919992b1ade41276dd16e437fa527 Mon Sep 17 00:00:00 2001 From: Onur Yilmaz Date: Tue, 3 Mar 2026 20:25:51 -0500 Subject: [PATCH 5/6] Move env variable to xenna executor Signed-off-by: Onur Yilmaz --- nemo_curator/backends/xenna/executor.py | 5 ++++- nemo_curator/core/utils.py | 2 -- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/nemo_curator/backends/xenna/executor.py b/nemo_curator/backends/xenna/executor.py index 20f6fcd7a4..3c91fbc11d 100644 --- a/nemo_curator/backends/xenna/executor.py +++ b/nemo_curator/backends/xenna/executor.py @@ -136,11 +136,14 @@ def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | Non try: register_loguru_serializer() + # Prevent Ray from overriding accelerator env vars when num_gpus=0, letting Xenna manage them instead. ray.init( ignore_reinit_error=True, runtime_env={ # We need to set this env var to avoid ray from setting CUDA_VISIBLE_DEVICES and let xenna do it - "env_vars": {"RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES": "0"} + "env_vars": {"RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES": "0", + "RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO": "0", + } }, ) # Run the pipeline (this will re-initialize ray but that'll be a no-op and the ray.init above will take precedence) diff --git a/nemo_curator/core/utils.py b/nemo_curator/core/utils.py index 560fbe2daa..9d694054a3 100644 --- a/nemo_curator/core/utils.py +++ b/nemo_curator/core/utils.py @@ -174,8 +174,6 @@ def init_cluster( # noqa: PLR0913 # We set some env vars for Xenna here. This is only used for Xenna clusters. os.environ["XENNA_RAY_METRICS_PORT"] = str(ray_metrics_port) os.environ["XENNA_RESPECT_CUDA_VISIBLE_DEVICES"] = "1" - os.environ["RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO"] = "0" - if stdouterr_capture_file: with open(stdouterr_capture_file, "w") as f: proc = subprocess.Popen( # noqa: S603 From e5fb8f8dbe8e564d63ac153c285adeb002cb00aa Mon Sep 17 00:00:00 2001 From: Onur Yilmaz Date: Tue, 3 Mar 2026 23:49:50 -0500 Subject: [PATCH 6/6] Fix test_max_calls_pid.py import error by replacing build_ray_command with RayClient - Replace subprocess-based build_ray_command with RayClient in single_cpu_ray_cluster fixture - Add OSError to gpu_available() exception handler to handle incompatible nvidia-smi binaries in Docker --- .../ray_data/test_max_calls_pid.py | 21 ++++++++++--------- tests/conftest.py | 3 +-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/backends/experimental/ray_data/test_max_calls_pid.py b/tests/backends/experimental/ray_data/test_max_calls_pid.py index e060eeea77..6c3a258994 100644 --- a/tests/backends/experimental/ray_data/test_max_calls_pid.py +++ b/tests/backends/experimental/ray_data/test_max_calls_pid.py @@ -15,8 +15,6 @@ import math import os import re -import shutil -import subprocess import tempfile import pandas as pd @@ -26,10 +24,10 @@ from nemo_curator.backends.experimental.ray_data.executor import RayDataExecutor from nemo_curator.backends.experimental.utils import RayStageSpecKeys +from nemo_curator.core.client import RayClient from nemo_curator.stages.base import ProcessingStage, Resources from nemo_curator.tasks import DocumentBatch, EmptyTask from tests.backends.utils import capture_logs -from tests.conftest import build_ray_command @pytest.fixture(scope="module") @@ -44,18 +42,21 @@ def single_cpu_ray_cluster(): original_ray_address = os.environ.pop("RAY_ADDRESS", None) temp_dir = tempfile.mkdtemp(prefix="ray1cpu_") - cmd, ray_port = build_ray_command(str(temp_dir), num_cpus=1, num_gpus=0, object_store_memory=2 * (1024**3)) - ray_process = subprocess.Popen(cmd, shell=False) # noqa: S603 + ray_client = RayClient( + num_cpus=1, + num_gpus=0, + object_store_memory=2 * (1024**3), + ray_temp_dir=str(temp_dir), + include_dashboard=False, + ) + ray_client.start() - ray_address = f"localhost:{ray_port}" - os.environ["RAY_ADDRESS"] = ray_address + ray_address = os.environ["RAY_ADDRESS"] try: yield ray_address finally: - ray_process.kill() - ray_process.wait() - shutil.rmtree(temp_dir, ignore_errors=True) + ray_client.stop() if original_ray_address is not None: os.environ["RAY_ADDRESS"] = original_ray_address elif "RAY_ADDRESS" in os.environ: diff --git a/tests/conftest.py b/tests/conftest.py index b7f7460493..5953663bf0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -60,7 +60,7 @@ def gpu_available() -> bool: gpu_count = int(result.stdout.strip()) logger.info(f"Detected {gpu_count} GPU(s) via nvidia-smi") return gpu_count > 0 - except (subprocess.TimeoutExpired, FileNotFoundError, ValueError): + except (subprocess.TimeoutExpired, FileNotFoundError, ValueError, OSError): pass logger.warning("No GPU detected") @@ -195,7 +195,6 @@ def shared_ray_cluster(tmp_path_factory: pytest.TempPathFactory, pytestconfig: p logger.info(f"Configuring Ray cluster with {'GPU' if needs_gpu else 'CPU-only'} support") temp_dir = tmp_path_factory.mktemp("ray") - os.environ["RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO"] = "0" ray_client = RayClient( num_cpus=num_cpus,