Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/cicd-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ jobs:
timeout-minutes: 40
run: |
uv sync --link-mode copy --locked --extra audio_cpu --extra sdg_cpu --extra text_cpu --extra video_cpu --group test
source .venv/bin/activate
FOLDER="${{ matrix.folder }}"
FOLDER="${FOLDER/stages-/stages/}"
uv run coverage run --branch --source=nemo_curator -m pytest -v "tests/$FOLDER" -m "not gpu"
coverage run --branch --source=nemo_curator -m pytest -v "tests/$FOLDER" -m "not gpu"

- name: Generate report
id: check
Expand Down
5 changes: 4 additions & 1 deletion nemo_curator/backends/xenna/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,14 @@ def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | Non

try:
register_loguru_serializer()
# Prevent Ray from overriding accelerator env vars when num_gpus=0, letting Xenna manage them instead.
ray.init(
ignore_reinit_error=True,
runtime_env={
# We need to set this env var to prevent Ray from setting CUDA_VISIBLE_DEVICES and let Xenna set it instead
"env_vars": {"RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES": "0"}
"env_vars": {"RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES": "0",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abhinavg4 in your PR for the Xenna bump, can you check whether we need to get rid of RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES?

"RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO": "0",
}
},
)
# Run the pipeline (this will re-initialize ray but that'll be a no-op and the ray.init above will take precedence)
Expand Down
1 change: 0 additions & 1 deletion nemo_curator/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ def init_cluster( # noqa: PLR0913
# We set some env vars for Xenna here. This is only used for Xenna clusters.
os.environ["XENNA_RAY_METRICS_PORT"] = str(ray_metrics_port)
os.environ["XENNA_RESPECT_CUDA_VISIBLE_DEVICES"] = "1"

if stdouterr_capture_file:
with open(stdouterr_capture_file, "w") as f:
proc = subprocess.Popen( # noqa: S603
Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ dependencies = [
"openai>=1.0.0",
"pandas>=2.1.0",
"pyarrow",
"ray[default,data]>=2.50",
"ray[default,data]>=2.54",
"torch",
"transformers",
]
Expand Down Expand Up @@ -212,7 +212,6 @@ constraint-dependencies = [
"protobuf>=5.29.6", # Address CVE GHSA-8qvm-5x2c-j2w7
"pyasn1>=0.6.2", # Address CVE GHSA-63vm-454h-vhhq
"python-multipart>=0.0.22", # Address CVE GHSA-wp53-j4wj-2cfg
"ray[default,data]>=2.52", # Address CVE GHSA-q279-jhrf-cc6v
"starlette>=0.49.1", # Address CVE GHSA-7f5h-v6xp-fcq8
"urllib3>=2.6.3", # Address CVE GHSA-38jv-5279-wg99
"wheel>=0.46.2", # Address CVE GHSA-8rrh-rw8j-w5fx
Expand Down
21 changes: 11 additions & 10 deletions tests/backends/experimental/ray_data/test_max_calls_pid.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import math
import os
import re
import shutil
import subprocess
import tempfile

import pandas as pd
Expand All @@ -26,10 +24,10 @@

from nemo_curator.backends.experimental.ray_data.executor import RayDataExecutor
from nemo_curator.backends.experimental.utils import RayStageSpecKeys
from nemo_curator.core.client import RayClient
from nemo_curator.stages.base import ProcessingStage, Resources
from nemo_curator.tasks import DocumentBatch, EmptyTask
from tests.backends.utils import capture_logs
from tests.conftest import build_ray_command


@pytest.fixture(scope="module")
Expand All @@ -44,18 +42,21 @@ def single_cpu_ray_cluster():
original_ray_address = os.environ.pop("RAY_ADDRESS", None)

temp_dir = tempfile.mkdtemp(prefix="ray1cpu_")
cmd, ray_port = build_ray_command(str(temp_dir), num_cpus=1, num_gpus=0, object_store_memory=2 * (1024**3))
ray_process = subprocess.Popen(cmd, shell=False) # noqa: S603
ray_client = RayClient(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this change!

num_cpus=1,
num_gpus=0,
object_store_memory=2 * (1024**3),
ray_temp_dir=str(temp_dir),
include_dashboard=False,
)
ray_client.start()

ray_address = f"localhost:{ray_port}"
os.environ["RAY_ADDRESS"] = ray_address
ray_address = os.environ["RAY_ADDRESS"]

try:
yield ray_address
finally:
ray_process.kill()
ray_process.wait()
shutil.rmtree(temp_dir, ignore_errors=True)
ray_client.stop()
if original_ray_address is not None:
os.environ["RAY_ADDRESS"] = original_ray_address
elif "RAY_ADDRESS" in os.environ:
Expand Down
71 changes: 14 additions & 57 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import os
import re
import socket
import subprocess
from pathlib import Path
from typing import Any
Expand All @@ -29,14 +28,9 @@
import ray
from loguru import logger

MODALITY_GROUPS = ["text", "image", "video", "audio"]

from nemo_curator.core.client import RayClient

def find_free_port() -> int:
"""Find an available port on the system."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
return s.getsockname()[1]
MODALITY_GROUPS = ["text", "image", "video", "audio"]


def gpu_available() -> bool:
Expand Down Expand Up @@ -66,7 +60,7 @@ def gpu_available() -> bool:
gpu_count = int(result.stdout.strip())
logger.info(f"Detected {gpu_count} GPU(s) via nvidia-smi")
return gpu_count > 0
except (subprocess.TimeoutExpired, FileNotFoundError, ValueError):
except (subprocess.TimeoutExpired, FileNotFoundError, ValueError, OSError):
pass

logger.warning("No GPU detected")
Expand Down Expand Up @@ -167,37 +161,6 @@ def pytest_ignore_collect(collection_path: Path, config: pytest.Config) -> bool:
return False


def build_ray_command(temp_dir: str, num_cpus: int, num_gpus: int, object_store_memory: int) -> tuple[list[str], int]:
"""Build the Ray start command with the given configuration."""
ray_port = find_free_port()
dashboard_port = find_free_port()
ray_client_server_port = find_free_port()

return [
"ray",
"start",
"--head",
"--disable-usage-stats",
"--port",
str(ray_port),
"--dashboard-port",
str(dashboard_port),
"--ray-client-server-port",
str(ray_client_server_port),
"--dashboard-host",
"0.0.0.0", # noqa: S104
"--temp-dir",
str(temp_dir),
"--num-cpus",
str(num_cpus),
"--num-gpus",
str(num_gpus),
"--object-store-memory",
str(object_store_memory),
"--block",
], ray_port


@pytest.fixture(scope="session", autouse=True)
def shared_ray_cluster(tmp_path_factory: pytest.TempPathFactory, pytestconfig: pytest.Config) -> str:
"""Set up a shared Ray cluster with dynamic GPU configuration.
Expand Down Expand Up @@ -225,37 +188,31 @@ def shared_ray_cluster(tmp_path_factory: pytest.TempPathFactory, pytestconfig: p
logger.error(error_msg)
raise RuntimeError(error_msg)

# Set up Ray configuration values
num_cpus = 11
num_gpus = 2 if needs_gpu else 0
object_store_memory = 2 * (1024**3) # 2 GB

logger.info(f"Configuring Ray cluster with {'GPU' if needs_gpu else 'CPU-only'} support")

# Create a temporary directory for Ray to avoid conflicts with other instances
temp_dir = tmp_path_factory.mktemp("ray")

# Build and execute Ray command
cmd_to_run, ray_port = build_ray_command(str(temp_dir), num_cpus, num_gpus, object_store_memory)

logger.info(f"Starting Ray cluster with {num_gpus} GPUs")
logger.info(f"Running Ray command: {' '.join(cmd_to_run)}")

# Use explicit path to ray command for security
ray_process = subprocess.Popen(cmd_to_run, shell=False) # noqa: S603
logger.info(f"Started Ray process: {ray_process.pid}")
ray_client = RayClient(
num_cpus=num_cpus,
num_gpus=num_gpus,
object_store_memory=object_store_memory,
ray_temp_dir=str(temp_dir),
include_dashboard=False,
)
ray_client.start()

ray_address = f"localhost:{ray_port}"
os.environ["RAY_ADDRESS"] = ray_address
logger.info(f"Set RAY_ADDRESS for tests to: {ray_address}")
ray_address = os.environ["RAY_ADDRESS"]
logger.info(f"Ray cluster started at: {ray_address}")
Comment on lines +199 to +209
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this 🙏


try:
yield ray_address
finally:
# Ensure cleanup happens even if tests fail
logger.info("Shutting down Ray cluster")
ray_process.kill()
ray_process.wait() # Wait for process to actually terminate
ray_client.stop()


@pytest.fixture
Expand Down
6 changes: 4 additions & 2 deletions tests/stages/text/embedders/test_vllm.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ class _FakeLLM:
def __init__(self, model: str, **kwargs: Any) -> None: # noqa: ANN401
captured["llm"] = {"model": model, "kwargs": kwargs}

monkeypatch.setattr("nemo_curator.stages.text.embedders.vllm.snapshot_download", _fake_snapshot_download)
monkeypatch.setattr("nemo_curator.stages.text.embedders.vllm.LLM", _FakeLLM)
import nemo_curator.stages.text.embedders.vllm as _vllm_mod

monkeypatch.setattr(_vllm_mod, "snapshot_download", _fake_snapshot_download)
monkeypatch.setattr(_vllm_mod, "LLM", _FakeLLM)

stage.setup_on_node()

Expand Down
9 changes: 6 additions & 3 deletions tests/utils/test_client_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ def test_is_remote_url_local_file(self) -> None:

def test_is_remote_url_with_mock_fsspec(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Test is_remote_url with mocked fsspec to test protocol handling."""
import nemo_curator.utils.client_utils as _client_utils
from nemo_curator.utils.client_utils import is_remote_url

# Mock the url_to_fs function
Expand All @@ -349,7 +350,7 @@ def __init__(self, protocol: str) -> None:
else:
return MockFS(None), ""

monkeypatch.setattr("nemo_curator.utils.client_utils.url_to_fs", mock_url_to_fs)
monkeypatch.setattr(_client_utils, "url_to_fs", mock_url_to_fs)

# Test with mocked protocols
assert is_remote_url("s3://bucket/key")
Expand All @@ -360,6 +361,7 @@ def __init__(self, protocol: str) -> None:

def test_is_remote_url_multiple_protocols(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Test is_remote_url with filesystem that has multiple protocols."""
import nemo_curator.utils.client_utils as _client_utils
from nemo_curator.utils.client_utils import is_remote_url

def mock_url_to_fs(_url: str) -> tuple:
Expand All @@ -370,13 +372,14 @@ def __init__(self) -> None:

return MockFS(), ""

monkeypatch.setattr("nemo_curator.utils.client_utils.url_to_fs", mock_url_to_fs)
monkeypatch.setattr(_client_utils, "url_to_fs", mock_url_to_fs)

# Should return True since first protocol is "s3" (remote)
assert is_remote_url("s3://bucket/key")

def test_is_remote_url_no_protocol(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Test is_remote_url with filesystem that has no protocol."""
import nemo_curator.utils.client_utils as _client_utils
from nemo_curator.utils.client_utils import is_remote_url

def mock_url_to_fs(_url: str) -> tuple:
Expand All @@ -387,7 +390,7 @@ def __init__(self) -> None:

return MockFS(), ""

monkeypatch.setattr("nemo_curator.utils.client_utils.url_to_fs", mock_url_to_fs)
monkeypatch.setattr(_client_utils, "url_to_fs", mock_url_to_fs)

# Should return False since protocol is None
assert not is_remote_url("unknown://path")
29 changes: 14 additions & 15 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading