diff --git a/README.md b/README.md index 324ac70b8..11ea57d57 100644 --- a/README.md +++ b/README.md @@ -16,19 +16,19 @@ These schemas enable CloudAI to be flexible and compatible with different system ## Support matrix -|Test|Slurm|Kubernetes (experimental)|Standalone| -|---|---|---|---| -|ChakraReplay|✅|❌|❌| -|GPT|✅|❌|❌| -|Grok|✅|❌|❌| -|NCCL|✅|✅|❌| -|NeMo Launcher|✅|❌|❌| -|NeMo Run|✅|❌|❌| -|Nemotron|✅|❌|❌| -|Sleep|✅|✅|✅| -|UCC|✅|❌|❌| -|SlurmContainer|✅|❌|❌| -|MegatronRun (experimental)|✅|❌|❌| +|Test|Slurm|Kubernetes|RunAI|Standalone| +|---|---|---|---|---| +|ChakraReplay|✅|❌|❌|❌| +|GPT|✅|❌|❌|❌| +|Grok|✅|❌|❌|❌| +|NCCL|✅|✅|✅|❌| +|NeMo Launcher|✅|❌|❌|❌| +|NeMo Run|✅|❌|❌|❌| +|Nemotron|✅|❌|❌|❌| +|Sleep|✅|✅|❌|✅| +|UCC|✅|❌|❌|❌| +|SlurmContainer|✅|❌|❌|❌| +|MegatronRun (experimental)|✅|❌|❌|❌| ## Set Up Access to the Private NGC Registry diff --git a/USER_GUIDE.md b/USER_GUIDE.md index bd5e408e1..20aaddaf5 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -89,7 +89,7 @@ ntasks_per_node = 8 [partitions.<partition_name>] name = "<partition_name>" ``` -Replace `<partition_name>` with the name of the partition you want to use. You can find the partition name by running `sinfo` on the cluster. +Replace `<partition_name>` with the name of the partition you want to use. You can find the partition name by running `sinfo` on the cluster. #### Step 4: Install Test Requirements Once all configs are ready, it is time to install the test requirements. This is done once, so you can run multiple experiments without reinstalling the requirements. This step requires the system config file from step 3. @@ -237,12 +237,33 @@ cache_docker_images_locally = true - **output_path**: Defines the default path where outputs are stored. Whenever a user runs a test scenario, a new subdirectory will be created under this path. - **default_partition**: Specifies the default partition where jobs are scheduled. - **partitions**: Describes the available partitions and nodes within those partitions. - - **[optional] groups**: Within the same partition, users can define groups of nodes. The group concept can be used to allocate nodes from specific groups in a test scenario schema. For instance, this feature is useful for specifying topology awareness. Groups represents logical partitioning of nodes and users are responsible for ensuring no overlap across groups. + - **[optional] groups**: Within the same partition, users can define groups of nodes. The group concept can be used to allocate nodes from specific groups in a test scenario schema. For instance, this feature is useful for specifying topology awareness. Groups represent a logical partitioning of nodes, and users are responsible for ensuring no overlap across groups. - **mpi**: Indicates the Process Management Interface (PMI) implementation to be used for inter-process communication. - **gpus_per_node** and **ntasks_per_node**: These are Slurm arguments passed to the `sbatch` script and `srun`. - **cache_docker_images_locally**: Specifies whether CloudAI should cache remote Docker images locally during installation. If set to `true`, CloudAI will cache the Docker images, enabling local access without needing to download them each time a test is run. This approach saves network bandwidth but requires more disk capacity. If set to `false`, CloudAI will allow Slurm to download the Docker images as needed when they are not cached locally by Slurm. - **global_env_vars**: Lists the environment variables that will be applied globally whenever tests are run.
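+For illustration, a `global_env_vars` table at the end of a system schema could look like the following sketch (the NCCL values mirror the example RunAI system config below and are illustrative, not required): + +```toml +[global_env_vars] +NCCL_IB_GID_INDEX = "3" +NCCL_IB_TIMEOUT = "20" +```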
+## Describing a System for the RunAI Scheduler +When using RunAI as the scheduler, you need to specify additional fields in the system schema TOML file. Below is the list of required fields and how to set them: + +```toml +name = "runai-cluster" +scheduler = "runai" + +install_path = "./install" +output_path = "./results" + +base_url = "http://runai.example.com" # The URL of your RunAI system, typically the same one used for the web interface. +user_email = "your_email" # The email address used to log into the RunAI system. +app_id = "your_app_id" # Obtained by creating an application in the RunAI web interface. +app_secret = "your_app_secret" # Obtained together with the app_id. +project_id = "your_project_id" # Project ID assigned or created in the RunAI system (usually an integer). +cluster_id = "your_cluster_id" # Cluster ID in UUID format (e.g., a69928cc-ccaa-48be-bda9-482440f4d855). +``` +- After logging into the RunAI web interface, navigate to Access → Applications and create a new application to obtain `app_id` and `app_secret`. +- Use your assigned project and cluster IDs. Contact your administrator if they are not available. +- All other fields follow the same semantics as in the Slurm system schema (e.g., `install_path`, `output_path`). + ## Describing a Test Scenario in the Test Scenario Schema A test scenario is a set of tests with specific dependencies between them. A test scenario is described in a TOML schema file. This is an example of a test scenario file: ```toml diff --git a/conf/common/system/example_runai_cluster.toml b/conf/common/system/example_runai_cluster.toml new file mode 100644 index 000000000..b8a610523 --- /dev/null +++ b/conf/common/system/example_runai_cluster.toml @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
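+ +# NOTE: the credential values below (base_url, user_email, app_id, app_secret, +# project_id, cluster_id) are placeholders; replace them with the values for +# your own RunAI deployment, as described in the USER_GUIDE.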
+ +name = "example-runai-cluster" +scheduler = "runai" + +install_path = "./install_dir" +output_path = "./results" +monitor_interval = 1 + +base_url = "http://runai.example.com" +user_email = "your_email" +app_id = "your_app_id" +app_secret = "your_app_secret" +project_id = "your_project_id" +cluster_id = "your_cluster_id" + +[global_env_vars] +NCCL_IB_GID_INDEX = "3" +NCCL_IB_TIMEOUT = "20" +NCCL_IB_QPS_PER_CONNECTION = "4" diff --git a/pyproject.toml b/pyproject.toml index 2c847be45..630118fd8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "kubernetes==30.1.0", "pydantic==2.8.2", "jinja2==3.1.6", + "websockets==15.0.1", ] [project.scripts] cloudai = "cloudai.__main__:main" diff --git a/requirements.txt b/requirements.txt index 08ca0bbd5..c12664a11 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ tbparse==0.0.8 toml==0.10.2 kubernetes==30.1.0 pydantic==2.8.2 -jinja2==3.1.6 \ No newline at end of file +jinja2==3.1.6 +websockets==15.0.1 diff --git a/src/cloudai/__init__.py b/src/cloudai/__init__.py index da538564e..8e2345d0d 100644 --- a/src/cloudai/__init__.py +++ b/src/cloudai/__init__.py @@ -48,15 +48,18 @@ from ._core.test_template_strategy import TestTemplateStrategy from .installer.kubernetes_installer import KubernetesInstaller from .installer.lsf_installer import LSFInstaller +from .installer.runai_installer import RunAIInstaller from .installer.slurm_installer import SlurmInstaller from .installer.standalone_installer import StandaloneInstaller from .parser import Parser from .runner.kubernetes.kubernetes_runner import KubernetesRunner from .runner.lsf.lsf_runner import LSFRunner +from .runner.runai.runai_runner import RunAIRunner from .runner.slurm.slurm_runner import SlurmRunner from .runner.standalone.standalone_runner import StandaloneRunner from .systems.kubernetes.kubernetes_system import KubernetesSystem from .systems.lsf.lsf_system import LSFSystem +from .systems.runai.runai_system import RunAISystem from .systems.slurm.slurm_system import SlurmSystem from .systems.standalone_system import StandaloneSystem from .workloads.chakra_replay import ( @@ -91,6 +94,7 @@ NcclTestJobStatusRetrievalStrategy, NcclTestKubernetesJsonGenStrategy, NcclTestPerformanceReportGenerationStrategy, + NcclTestRunAIJsonGenStrategy, NcclTestSlurmCommandGenStrategy, ) from .workloads.nemo_launcher import ( @@ -126,6 +130,7 @@ Registry().add_runner("kubernetes", KubernetesRunner) Registry().add_runner("standalone", StandaloneRunner) Registry().add_runner("lsf", LSFRunner) +Registry().add_runner("runai", RunAIRunner) Registry().add_strategy( CommandGenStrategy, [StandaloneSystem], [SleepTestDefinition], SleepStandaloneCommandGenStrategy @@ -134,6 +139,7 @@ Registry().add_strategy(CommandGenStrategy, [SlurmSystem], [SleepTestDefinition], SleepSlurmCommandGenStrategy) Registry().add_strategy(JsonGenStrategy, [KubernetesSystem], [SleepTestDefinition], SleepKubernetesJsonGenStrategy) Registry().add_strategy(JsonGenStrategy, [KubernetesSystem], [NCCLTestDefinition], NcclTestKubernetesJsonGenStrategy) +Registry().add_strategy(JsonGenStrategy, [RunAISystem], [NCCLTestDefinition], NcclTestRunAIJsonGenStrategy) Registry().add_strategy(GradingStrategy, [SlurmSystem], [NCCLTestDefinition], NcclTestGradingStrategy) Registry().add_strategy( @@ -164,6 +170,7 @@ [GPTTestDefinition, GrokTestDefinition, NemotronTestDefinition], JaxToolboxSlurmCommandGenStrategy, ) + Registry().add_strategy( JobIdRetrievalStrategy, [SlurmSystem], @@ -184,8 +191,8 @@ 
Registry().add_strategy( JobIdRetrievalStrategy, [StandaloneSystem], [SleepTestDefinition], StandaloneJobIdRetrievalStrategy ) - Registry().add_strategy(JobIdRetrievalStrategy, [LSFSystem], [SleepTestDefinition], LSFJobIdRetrievalStrategy) + Registry().add_strategy( JobStatusRetrievalStrategy, [KubernetesSystem], @@ -221,10 +228,16 @@ Registry().add_strategy( JobStatusRetrievalStrategy, [StandaloneSystem], [SleepTestDefinition], DefaultJobStatusRetrievalStrategy ) - Registry().add_strategy( JobStatusRetrievalStrategy, [LSFSystem], [SleepTestDefinition], DefaultJobStatusRetrievalStrategy ) +Registry().add_strategy( + JobStatusRetrievalStrategy, + [RunAISystem], + [NCCLTestDefinition], + DefaultJobStatusRetrievalStrategy, +) + Registry().add_strategy(CommandGenStrategy, [SlurmSystem], [UCCTestDefinition], UCCTestSlurmCommandGenStrategy) Registry().add_strategy(GradingStrategy, [SlurmSystem], [ChakraReplayTestDefinition], ChakraReplayGradingStrategy) @@ -239,11 +252,13 @@ Registry().add_installer("standalone", StandaloneInstaller) Registry().add_installer("kubernetes", KubernetesInstaller) Registry().add_installer("lsf", LSFInstaller) +Registry().add_installer("runai", RunAIInstaller) Registry().add_system("slurm", SlurmSystem) Registry().add_system("standalone", StandaloneSystem) Registry().add_system("kubernetes", KubernetesSystem) Registry().add_system("lsf", LSFSystem) +Registry().add_system("runai", RunAISystem) Registry().add_test_definition("UCCTest", UCCTestDefinition) Registry().add_test_definition("NcclTest", NCCLTestDefinition) @@ -298,6 +313,7 @@ "PythonExecutable", "ReportGenerationStrategy", "Reporter", + "RunAISystem", "Runner", "System", "SystemConfigParsingError", diff --git a/src/cloudai/installer/runai_installer.py b/src/cloudai/installer/runai_installer.py new file mode 100644 index 000000000..20f75fe33 --- /dev/null +++ b/src/cloudai/installer/runai_installer.py @@ -0,0 +1,48 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
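+ +# Note: this installer is intentionally a no-op. Every operation below only +# logs the request and reports success, since nothing is staged locally for +# RunAI systems.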
+ +import logging + +from cloudai import BaseInstaller, Installable, InstallStatusResult +from cloudai.systems.runai.runai_system import RunAISystem + + +class RunAIInstaller(BaseInstaller): + """Installer for RunAI systems.""" + + def __init__(self, system: RunAISystem): + """Initialize the RunAIInstaller with a system object.""" + super().__init__(system) + + def _check_prerequisites(self) -> InstallStatusResult: + logging.info("Checking prerequisites for RunAI installation.") + return InstallStatusResult(True) + + def install_one(self, item: Installable) -> InstallStatusResult: + logging.info(f"Installing {item} for RunAI.") + return InstallStatusResult(True) + + def uninstall_one(self, item: Installable) -> InstallStatusResult: + logging.info(f"Uninstalling {item} for RunAI.") + return InstallStatusResult(True) + + def is_installed_one(self, item: Installable) -> InstallStatusResult: + logging.info(f"Checking if {item} is installed for RunAI.") + return InstallStatusResult(True) + + def mark_as_installed_one(self, item: Installable) -> InstallStatusResult: + logging.info(f"Marking {item} as installed for RunAI.") + return InstallStatusResult(True) diff --git a/src/cloudai/runner/runai/__init__.py b/src/cloudai/runner/runai/__init__.py new file mode 100644 index 000000000..de3a7588e --- /dev/null +++ b/src/cloudai/runner/runai/__init__.py @@ -0,0 +1,15 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/cloudai/runner/runai/runai_job.py b/src/cloudai/runner/runai/runai_job.py new file mode 100644 index 000000000..b496d0f30 --- /dev/null +++ b/src/cloudai/runner/runai/runai_job.py @@ -0,0 +1,27 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
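+ +# A RunAI job is identified by the workload ID returned by the RunAI API at +# submission time; `status` carries the training's ActualPhase as reported by +# the API (see RunAIRunner._submit_test).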
+ +from dataclasses import dataclass + +from cloudai import BaseJob +from cloudai.systems.runai.runai_training import ActualPhase + + +@dataclass +class RunAIJob(BaseJob): + """A job class for execution on a RunAI system.""" + + status: ActualPhase diff --git a/src/cloudai/runner/runai/runai_runner.py b/src/cloudai/runner/runai/runai_runner.py new file mode 100644 index 000000000..07a7f27fb --- /dev/null +++ b/src/cloudai/runner/runai/runai_runner.py @@ -0,0 +1,54 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import cast + +from cloudai import BaseJob, BaseRunner, TestRun +from cloudai.systems.runai.runai_system import RunAISystem + +from .runai_job import RunAIJob + + +class RunAIRunner(BaseRunner): + """Class to manage and execute workloads using the RunAI platform.""" + + def _submit_test(self, tr: TestRun) -> RunAIJob: + logging.info(f"Running test: {tr.name}") + tr.output_path = self.get_job_output_path(tr) + job_spec = tr.test.test_template.gen_json(tr) + logging.debug(f"Generated JSON for test {tr.name}: {job_spec}") + + if self.mode == "run": + runai_system = cast(RunAISystem, self.system) + training = runai_system.create_training(job_spec) + job = RunAIJob(test_run=tr, id=training.workload_id, status=training.actual_phase) + logging.info(f"Submitted RunAI job: {job.id}") + return job + else: + raise RuntimeError("Invalid mode for submitting a test.") + + async def job_completion_callback(self, job: BaseJob) -> None: + runai_system = cast(RunAISystem, self.system) + job = cast(RunAIJob, job) + workload_id = str(job.id) + runai_system.get_workload_events(workload_id, job.test_run.output_path / "events.txt") + await runai_system.store_logs(workload_id, job.test_run.output_path / "stdout.txt") + + def kill_job(self, job: BaseJob) -> None: + runai_system = cast(RunAISystem, self.system) + job = cast(RunAIJob, job) + runai_system.delete_training(str(job.id)) diff --git a/src/cloudai/systems/__init__.py b/src/cloudai/systems/__init__.py index 34fa0ff12..e6221f48a 100644 --- a/src/cloudai/systems/__init__.py +++ b/src/cloudai/systems/__init__.py @@ -16,12 +16,14 @@ from .kubernetes.kubernetes_system import KubernetesSystem from .lsf.lsf_system import LSFSystem +from .runai import RunAISystem from .slurm.slurm_system import SlurmSystem from .standalone_system import StandaloneSystem __all__ = [ "KubernetesSystem", "LSFSystem", + "RunAISystem", "SlurmSystem", "StandaloneSystem", ] diff --git a/src/cloudai/systems/runai/__init__.py b/src/cloudai/systems/runai/__init__.py new file mode 100644 index 000000000..b54bb637a --- /dev/null +++ b/src/cloudai/systems/runai/__init__.py @@ -0,0 +1,19 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .runai_system import RunAISystem + +__all__ = ["RunAISystem"] diff --git a/src/cloudai/systems/runai/runai_cluster.py b/src/cloudai/systems/runai/runai_cluster.py new file mode 100644 index 000000000..d39a2a1ac --- /dev/null +++ b/src/cloudai/systems/runai/runai_cluster.py @@ -0,0 +1,83 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from enum import Enum +from typing import Any, ClassVar, Dict, List, Optional + +from pydantic import BaseModel, ConfigDict, Field + + +class ClusterState(Enum): + """Enumeration of possible cluster states.""" + + WAITING_TO_CONNECT = "WaitingToConnect" + CONNECTED = "Connected" + DISCONNECTED = "Disconnected" + MISSING_PREREQUISITES = "MissingPrerequisites" + SERVICE_ISSUES = "ServiceIssues" + UNKNOWN = "Unknown" + + @classmethod + def from_str(cls, value: str) -> ClusterState: + return cls(cls._value2member_map_.get(value, cls.UNKNOWN)) + + +class Status(BaseModel): + """Represent the status of a RunAI cluster.""" + + state: ClusterState = Field(default=ClusterState.UNKNOWN) + conditions: List[Dict[str, Any]] = Field(default_factory=list) + operands: Dict[str, Any] = Field(default_factory=dict) + platform: Optional[Dict[str, Any]] = Field(default=None) + config: Optional[Dict[str, Any]] = Field(default=None) + dependencies: Dict[str, Any] = Field(default_factory=dict) + + +class RunAICluster(BaseModel): + """Represent a RunAI cluster with its associated data and operations.""" + + uuid: str = Field(default="", alias="uuid") + tenant_id: int = Field(default=0, alias="tenantId") + name: str = Field(default="", alias="name") + created_at: str = Field(default="", alias="createdAt") + domain: Optional[str] = Field(default=None, alias="domain") + version: Optional[str] = Field(default=None, alias="version") + updated_at: Optional[str] = Field(default=None, alias="updatedAt") + deleted_at: Optional[str] = Field(default=None, alias="deletedAt") + last_liveness: Optional[str] = Field(default=None, alias="lastLiveness") + delete_requested_at: Optional[str] = Field(default=None, alias="deleteRequestedAt") + + status: Status = Field(default_factory=Status, alias="status") + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + def is_connected(self) -> 
bool: + return self.status.state == ClusterState.CONNECTED + + def get_kubernetes_version(self) -> Optional[str]: + if not self.status.platform: + return None + return self.status.platform.get("kubeVersion") + + def __repr__(self) -> str: + """Return a string representation of the RunAICluster object.""" + return ( + f"RunAICluster(name={self.name!r}, uuid={self.uuid!r}, tenant_id={self.tenant_id}, " + f"state={self.status.state.value!r}, created_at={self.created_at!r}, version={self.version!r}, " + f"domain={self.domain!r})" + ) diff --git a/src/cloudai/systems/runai/runai_event.py b/src/cloudai/systems/runai/runai_event.py new file mode 100644 index 000000000..09e6def94 --- /dev/null +++ b/src/cloudai/systems/runai/runai_event.py @@ -0,0 +1,52 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +from typing import ClassVar + +from pydantic import BaseModel, ConfigDict, Field + + +class InvolvedObject(BaseModel): + """Represent an object involved in a RunAI event.""" + + uid: str = Field(default="", alias="uid") + kind: str = Field(default="", alias="kind") + name: str = Field(default="", alias="name") + namespace: str = Field(default="", alias="namespace") + + +class RunAIEvent(BaseModel): + """Represent a RunAI event.""" + + created_at: datetime = Field(alias="createdAt") + id: str = Field(default="", alias="id") + type: str = Field(default="", alias="type") + cluster_id: str = Field(default="", alias="clusterId") + message: str = Field(default="", alias="message") + reason: str = Field(default="", alias="reason") + source: str = Field(default="", alias="source") + involved_object: InvolvedObject = Field(default_factory=InvolvedObject, alias="involvedObject") + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + def __repr__(self) -> str: + """Return a string representation of the RunAIEvent.""" + return ( + f"RunAIEvent(created_at={self.created_at}, id={self.id}, type={self.type}, " + f"cluster_id={self.cluster_id}, message={self.message}, reason={self.reason}, " + f"source={self.source}, involved_object={self.involved_object})" + ) diff --git a/src/cloudai/systems/runai/runai_node.py b/src/cloudai/systems/runai/runai_node.py new file mode 100644 index 000000000..9c6a2d7b1 --- /dev/null +++ b/src/cloudai/systems/runai/runai_node.py @@ -0,0 +1,73 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from typing import Any, ClassVar, Dict, List, Optional + +from pydantic import BaseModel, ConfigDict, Field, model_validator + + +class NodeStatus(Enum): + """Enumeration of possible node statuses.""" + + READY = "Ready" + NOT_READY = "NotReady" + UNKNOWN = "Unknown" + + @classmethod + def from_str(cls, value: str) -> "NodeStatus": + return NodeStatus(cls._value2member_map_.get(value, cls.UNKNOWN)) + + +class RunAINode(BaseModel): + """Represent a node in the RunAI cluster.""" + + status: NodeStatus = Field(default=NodeStatus.UNKNOWN, alias="status") + conditions: List[Dict[str, Any]] = Field(default_factory=list, alias="conditions") + taints: List[Dict[str, Any]] = Field(default_factory=list, alias="taints") + node_pool: str = Field(default="", alias="nodePool") + created_at: str = Field(default="", alias="createdAt") + + gpu_type: Optional[str] = Field(default=None, alias="gpuType") + gpu_count: Optional[int] = Field(default=None, alias="gpuCount") + nvlink_domain_uid: Optional[str] = Field(default=None, alias="nvLinkDomainUid") + nvlink_clique_id: Optional[str] = Field(default=None, alias="nvLinkCliqueId") + + name: str = Field(default="", alias="name") + id: str = Field(default="", alias="id") + cluster_uuid: str = Field(default="", alias="clusterUuid") + updated_at: str = Field(default="", alias="updatedAt") + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + @model_validator(mode="before") + @classmethod + def extract_gpu_info(cls, values): + # Flatten the nested "gpuInfo" payload into top-level aliased fields. + gpu_info = values.pop("gpuInfo", None) + if gpu_info and isinstance(gpu_info, dict): + values["gpuType"] = gpu_info.get("gpuType") + values["gpuCount"] = gpu_info.get("gpuCount") + values["nvLinkDomainUid"] = gpu_info.get("nvLinkDomainUid") + values["nvLinkCliqueId"] = gpu_info.get("nvLinkCliqueId") + return values + + def __repr__(self) -> str: + """Return a string representation of the RunAINode object.""" + return ( + f"RunAINode(name={self.name!r}, id={self.id!r}, cluster_uuid={self.cluster_uuid!r}, " + f"node_pool={self.node_pool!r}, gpu_type={self.gpu_type!r}, gpu_count={self.gpu_count}, " + f"nvlink_domain_uid={self.nvlink_domain_uid!r}, nvlink_clique_id={self.nvlink_clique_id!r}, " + f"status={self.status.value!r})" + ) diff --git a/src/cloudai/systems/runai/runai_project.py b/src/cloudai/systems/runai/runai_project.py new file mode 100644 index 000000000..c00d79615 --- /dev/null +++ b/src/cloudai/systems/runai/runai_project.py @@ -0,0 +1,65 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from typing import Any, ClassVar, Dict, List, Optional + +from pydantic import BaseModel, ConfigDict, Field + + +class ProjectPhase(Enum): + """Enumeration of possible project phases.""" + + READY = "ready" + CREATING = "creating" + + +class RunAIProject(BaseModel): + """Represent a RunAI Project.""" + + description: str = Field(default="", alias="description") + scheduling_rules: Optional[Dict[str, Any]] = Field(default_factory=dict, alias="schedulingRules") + default_node_pools: Optional[List[str]] = Field(default_factory=list, alias="defaultNodePools") + node_types: Dict[str, Any] = Field(default_factory=dict, alias="nodeTypes") + resources: List[Dict[str, Any]] = Field(default_factory=list, alias="resources") + name: str = Field(default="", alias="name") + cluster_id: str = Field(default="", alias="clusterId") + id: str = Field(default="", alias="id") + parent_id: str = Field(default="", alias="parentId") + requested_namespace: Optional[str] = Field(default=None, alias="requestedNamespace") + enforce_runai_scheduler: bool = Field(default=False, alias="enforceRunaiScheduler") + status: Dict[str, Any] = Field(default_factory=dict, alias="status") + total_resources: Dict[str, Any] = Field(default_factory=dict, alias="totalResources") + created_at: str = Field(default="", alias="createdAt") + updated_at: str = Field(default="", alias="updatedAt") + created_by: str = Field(default="", alias="createdBy") + updated_by: str = Field(default="", alias="updatedBy") + parent: Optional[Dict[str, Any]] = Field(default=None, alias="parent") + effective: Dict[str, Any] = Field(default_factory=dict, alias="effective") + overtime_data: Dict[str, Any] = Field(default_factory=dict, alias="overtimeData") + range_24h_data: Optional[Dict[str, Any]] = Field(default=None, alias="range24hData") + range_7d_data: Optional[Dict[str, Any]] = Field(default=None, alias="range7dData") + range_30d_data: Optional[Dict[str, Any]] = Field(default=None, alias="range30dData") + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + def __repr__(self) -> str: + """Prettify project output.""" + phase: str = self.status.get("phase", "") + return ( + f"RunAIProject(name={self.name!r}, id={self.id!r}, cluster_id={self.cluster_id!r}, " + f"created_by={self.created_by!r}, phase={phase!r})" + ) diff --git a/src/cloudai/systems/runai/runai_pvc.py b/src/cloudai/systems/runai/runai_pvc.py new file mode 100644 index 000000000..49ac6e68c --- /dev/null +++ b/src/cloudai/systems/runai/runai_pvc.py @@ -0,0 +1,89 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
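+ +# Pydantic models mirroring the PVC asset payloads returned by the RunAI REST +# API (meta/spec/clusterInfo); snake_case attributes map to the API's +# camelCase keys via field aliases.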
+ +from typing import Any, ClassVar, Dict, List, Optional + +from pydantic import BaseModel, ConfigDict, Field + + +class PVCMeta(BaseModel): + """Represent the metadata of a Persistent Volume Claim (PVC).""" + + id: str = Field(default="", alias="id") + name: str = Field(default="", alias="name") + kind: str = Field(default="", alias="kind") + scope: str = Field(default="", alias="scope") + cluster_id: str = Field(default="", alias="clusterId") + department_id: Optional[str] = Field(default=None, alias="departmentId") + tenant_id: int = Field(default=-1, alias="tenantId") + created_by: str = Field(default="", alias="createdBy") + created_at: str = Field(default="", alias="createdAt") + updated_by: str = Field(default="", alias="updatedBy") + updated_at: str = Field(default="", alias="updatedAt") + deleted_at: Optional[str] = Field(default=None, alias="deletedAt") + deleted_by: Optional[str] = Field(default=None, alias="deletedBy") + description: str = Field(default="", alias="description") + auto_delete: bool = Field(default=False, alias="autoDelete") + project_id: Optional[int] = Field(default=None, alias="projectId") + workload_supported_types: Optional[Dict[str, Any]] = Field(default=None, alias="workloadSupportedTypes") + project_name: Optional[str] = Field(default=None, alias="projectName") + update_count: Optional[int] = Field(default=None, alias="updateCount") + + +class PVCClaimInfo(BaseModel): + """Represent the claim information of a Persistent Volume Claim (PVC).""" + + access_modes: Optional[Dict[str, bool]] = Field(default=None, alias="accessModes") + size: Optional[str] = Field(default=None, alias="size") + storage_class: Optional[str] = Field(default=None, alias="storageClass") + volume_mode: Optional[str] = Field(default=None, alias="volumeMode") + + +class PVCSpec(BaseModel): + """Represent the specification of a Persistent Volume Claim (PVC).""" + + claim_name: str = Field(default="", alias="claimName") + path: str = Field(default="", alias="path") + read_only: bool = Field(default=False, alias="readOnly") + ephemeral: bool = Field(default=False, alias="ephemeral") + existing_pvc: bool = Field(default=False, alias="existingPvc") + claim_info: Optional[PVCClaimInfo] = Field(default=None, alias="claimInfo") + + +class PVCClusterInfo(BaseModel): + """Represent the cluster information of a Persistent Volume Claim (PVC).""" + + resources: List[Dict[str, Any]] = Field(default_factory=list, alias="resources") + + +class RunAIPVC(BaseModel): + """Represent a Persistent Volume Claim (PVC) in the RunAI cluster.""" + + meta: PVCMeta = Field(alias="meta") + spec: PVCSpec = Field(alias="spec") + cluster_info: PVCClusterInfo = Field(alias="clusterInfo") + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + def __repr__(self) -> str: + """Return a string representation of the RunAIPVC object.""" + return ( + f"RunAIPVC(name={self.meta.name!r}, id={self.meta.id!r}, " + f"created_by={self.meta.created_by!r}, scope={self.meta.scope!r}, " + f"cluster_id={self.meta.cluster_id!r}, project_id={self.meta.project_id}, " + f"size={(self.spec.claim_info.size if self.spec.claim_info else None)!r}, " + f"storage_class={(self.spec.claim_info.storage_class if self.spec.claim_info else None)!r}, " + f"path={self.spec.path!r}, claim_name={self.spec.claim_name!r})" + ) diff --git a/src/cloudai/systems/runai/runai_rest_client.py b/src/cloudai/systems/runai/runai_rest_client.py new file mode 100644 index 000000000..b2ed8958d --- /dev/null +++
b/src/cloudai/systems/runai/runai_rest_client.py @@ -0,0 +1,520 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import ssl +from pathlib import Path +from typing import Any, Dict, Optional + +import requests +from websockets.legacy.client import connect + + +class RunAIRestClient: + """ + Client to interact with the RunAI REST API endpoints. + + REST API documentation can be found at https://api-docs.run.ai/latest/ + """ + + def __init__(self, base_url: str, app_id: str, app_secret: str) -> None: + """ + Initialize the client and automatically retrieve the access token. + + To generate `app_id` and `app_secret`, create a new application in the target RunAI cluster with a unique name. + Upon creation, the Client ID and Client secret will be returned, which correspond to `app_id` and + `app_secret` respectively. + """ + self.app_id = app_id + self.app_secret = app_secret + self.base_url = base_url.rstrip("/") + self.session = requests.Session() + self.access_token = self._get_access_token() + self.session.headers.update( + { + "Accept": "application/json", + "Content-Type": "application/json", + "Authorization": f"Bearer {self.access_token}", + } + ) + + # --- Private utility method --- + def _request( + self, + method: str, + path: str, + params: Optional[Dict[str, Any]] = None, + data: Optional[Dict[str, Any]] = None, + ) -> Any: + """Make an HTTP request and return JSON.""" + url: str = f"{self.base_url}{path}" + try: + response = requests.request(method, url, headers=self.session.headers, params=params, json=data) + response.raise_for_status() + return response.json() if response.text else None + except requests.exceptions.HTTPError as http_err: + logging.error(f"HTTP error occurred: {http_err}") + logging.error(f"Status Code: {http_err.response.status_code}") + logging.error(f"Response Content: {http_err.response.text}") + raise + except Exception as err: + logging.error(f"An error occurred: {err}") + raise + + def _get_access_token(self) -> str: + """Retrieve an access token using AppId and AppSecret.""" + url = f"{self.base_url}/api/v1/token" + payload = {"grantType": "app_token", "AppId": self.app_id, "AppSecret": self.app_secret} + + try: + response = self.session.post(url, json=payload) + response.raise_for_status() + data = response.json() + token = data.get("accessToken") + if not token: + raise ValueError("access_token not found in response") + return token + except requests.exceptions.HTTPError as http_err: + logging.error(f"HTTP error occurred while retrieving access token: {http_err}") + logging.error(f"Status Code: {http_err.response.status_code}") + logging.error(f"Response Content: {http_err.response.text}") + raise + except Exception as err: + logging.error(f"An error occurred while retrieving access token: {err}") + raise + + # ============================ Clusters 
============================ + def get_clusters(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get clusters.""" + return self._request("GET", "/api/v1/clusters", params=params) + + def create_cluster(self, cluster_data: Dict[str, Any]) -> Any: + """Create cluster.""" + return self._request("POST", "/api/v1/clusters", data=cluster_data) + + def get_cluster(self, cluster_uuid: str, params: Optional[Dict[str, Any]] = None) -> Any: + """Get cluster by UUID.""" + return self._request("GET", f"/api/v1/clusters/{cluster_uuid}", params=params) + + def update_cluster(self, cluster_uuid: str, cluster_data: Dict[str, Any]) -> Any: + """Update cluster by UUID.""" + return self._request("PUT", f"/api/v1/clusters/{cluster_uuid}", data=cluster_data) + + def delete_cluster(self, cluster_uuid: str, params: Optional[Dict[str, Any]] = None) -> Any: + """Delete cluster by UUID.""" + return self._request("DELETE", f"/api/v1/clusters/{cluster_uuid}", params=params) + + def get_cluster_metrics(self, cluster_uuid: str, params: Dict[str, Any]) -> Any: + """Get cluster metrics.""" + return self._request("GET", f"/api/v1/clusters/{cluster_uuid}/metrics", params=params) + + def get_cluster_install_info(self, cluster_uuid: str, params: Dict[str, Any]) -> Any: + """Get cluster install info.""" + return self._request("GET", f"/api/v1/clusters/{cluster_uuid}/cluster-install-info", params=params) + + # ============================ Node Pools ============================ + def get_node_pools(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get node pools.""" + return self._request("GET", "/api/v1/node-pools", params=params) + + def count_node_pools(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Count node pools.""" + return self._request("GET", "/api/v1/node-pools/count", params=params) + + def create_node_pool(self, payload: Dict[str, Any]) -> Any: + """Create node pool.""" + return self._request("POST", "/api/v1/node-pools", data=payload) + + def get_node_pool(self, nodepool_id: str) -> Any: + """Get node pool by ID.""" + return self._request("GET", f"/api/v1/node-pools/{nodepool_id}") + + def delete_node_pool(self, nodepool_id: str) -> Any: + """Delete node pool.""" + return self._request("DELETE", f"/api/v1/node-pools/{nodepool_id}") + + def update_node_pool(self, nodepool_id: str, payload: Dict[str, Any]) -> Any: + """Update node pool.""" + return self._request("PUT", f"/api/v1/node-pools/{nodepool_id}", data=payload) + + def patch_node_pool(self, nodepool_id: str, payload: Dict[str, Any]) -> Any: + """Patch node pool.""" + return self._request("PATCH", f"/api/v1/node-pools/{nodepool_id}", data=payload) + + # ============================ Nodes ============================ + def get_nodes(self, cluster_uuid: str, params: Optional[Dict[str, Any]] = None) -> Any: + """Get nodes.""" + return self._request("GET", f"/api/v1/clusters/{cluster_uuid}/nodes", params=params) + + def get_node_telemetry(self, params: Dict[str, Any]) -> Any: + """Get node telemetry.""" + return self._request("GET", "/api/v1/nodes/telemetry", params=params) + + def get_node_metrics(self, node_id: str, params: Dict[str, Any]) -> Any: + """Get node metrics.""" + return self._request("GET", f"/api/v1/nodes/{node_id}/metrics", params=params) + + # ============================ Projects ============================ + def get_projects(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get projects.""" + return self._request("GET", "/api/v1/org-unit/projects", params=params) + + def create_project(self, 
project_data: Dict[str, Any]) -> Any: + """Create project.""" + return self._request("POST", "/api/v1/org-unit/projects", data=project_data) + + def update_project(self, project_id: str, project_data: Dict[str, Any]) -> Any: + """Update project.""" + return self._request("PUT", f"/api/v1/org-unit/projects/{project_id}", data=project_data) + + def delete_project(self, project_id: str) -> Any: + """Delete project.""" + return self._request("DELETE", f"/api/v1/org-unit/projects/{project_id}") + + def get_project_metrics(self, project_id: str, params: Dict[str, Any]) -> Any: + """Get project metrics.""" + return self._request("GET", f"/api/v1/org-unit/projects/{project_id}/metrics", params=params) + + def update_project_resources(self, project_id: str, payload: Dict[str, Any]) -> Any: + """Update project resources.""" + return self._request("PUT", f"/api/v1/org-unit/projects/{project_id}/resources", data=payload) + + def patch_project_resources(self, project_id: str, payload: Dict[str, Any]) -> Any: + """Patch project resources.""" + return self._request("PATCH", f"/api/v1/org-unit/projects/{project_id}/resources", data=payload) + + # ============================ Tenant Settings ============================ + def get_tenant_settings(self) -> Any: + """Get tenant settings.""" + return self._request("GET", "/v1/k8s/setting") + + def update_tenant_setting(self, key: str, value: Any) -> Any: + """Update tenant setting.""" + payload: Dict[str, Any] = {"key": key, "value": value} + return self._request("PUT", "/v1/k8s/setting", data=payload) + + def get_tenant_setting(self, setting_key: str) -> Any: + """Get tenant setting by key.""" + return self._request("GET", f"/v1/k8s/setting/{setting_key}") + + # ============================ User Applications ============================ + def get_user_applications(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get user applications.""" + return self._request("GET", "/api/v1/user-applications", params=params) + + def create_user_application(self, payload: Dict[str, Any]) -> Any: + """Create user application.""" + return self._request("POST", "/api/v1/user-applications", data=payload) + + def get_user_application(self, app_id: str) -> Any: + """Get user application by ID.""" + return self._request("GET", f"/api/v1/user-applications/{app_id}") + + def delete_user_application(self, app_id: str) -> Any: + """Delete user application.""" + return self._request("DELETE", f"/api/v1/user-applications/{app_id}") + + def regenerate_user_app_secret(self, app_id: str) -> Any: + """Regenerate user application secret.""" + return self._request("POST", f"/api/v1/user-applications/{app_id}/secret") + + # ============================ Deployments ============================ + def get_deployments(self, cluster_uuid: str) -> Any: + """Get deployments.""" + return self._request("GET", f"/v1/k8s/clusters/{cluster_uuid}/deployments") + + def get_deployment(self, cluster_uuid: str, deployment_id: str) -> Any: + """Get deployment by ID.""" + return self._request("GET", f"/v1/k8s/clusters/{cluster_uuid}/deployments/{deployment_id}") + + def create_deployment(self, cluster_uuid: str, payload: Dict[str, Any]) -> Any: + """Create deployment.""" + return self._request("POST", f"/v1/k8s/clusters/{cluster_uuid}/deployments", data=payload) + + def update_deployment(self, cluster_uuid: str, deployment_id: str, payload: Dict[str, Any]) -> Any: + """Update deployment.""" + return self._request("PUT", f"/v1/k8s/clusters/{cluster_uuid}/deployments/{deployment_id}", data=payload) + + 
def delete_deployment(self, cluster_uuid: str, deployment_id: str) -> Any: + """Delete deployment.""" + return self._request("DELETE", f"/v1/k8s/clusters/{cluster_uuid}/deployments/{deployment_id}") + + # ============================ Events ============================ + def get_workload_events(self, workload_id: str, offset: int = 0, limit: int = 100, sort_order: str = "asc") -> Any: + """Retrieve workload events.""" + params = { + "offset": offset, + "limit": limit, + "sortOrder": sort_order, + } + return self._request("GET", f"/api/v1/workloads/{workload_id}/events", params=params) + + # ============================ Pods ============================ + def get_workload_pods(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get workload pods.""" + return self._request("GET", "/api/v1/workloads/pods", params=params) + + def get_workload_pods_count(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get workload pods count.""" + return self._request("GET", "/api/v1/workloads/pods/count", params=params) + + def get_pod_metrics(self, workload_id: str, pod_id: str, params: Dict[str, Any]) -> Any: + """Get pod metrics.""" + return self._request("GET", f"/api/v1/workloads/{workload_id}/pods/{pod_id}/metrics", params=params) + + # ============================ Trainings ============================ + def create_training(self, payload: Dict[str, Any]) -> Any: + """Create training.""" + return self._request("POST", "/api/v1/workloads/trainings", data=payload) + + def delete_training(self, workload_id: str) -> Any: + """Delete training.""" + return self._request("DELETE", f"/api/v1/workloads/trainings/{workload_id}") + + def get_training(self, workload_id: str) -> Any: + """Get training.""" + return self._request("GET", f"/api/v1/workloads/trainings/{workload_id}") + + def suspend_training(self, workload_id: str) -> Any: + """Suspend training.""" + return self._request("POST", f"/api/v1/workloads/trainings/{workload_id}/suspend") + + def resume_training(self, workload_id: str) -> Any: + """Resume training.""" + return self._request("POST", f"/api/v1/workloads/trainings/{workload_id}/resume") + + # ============================ Inferences ============================ + def get_inference(self, workload_id: str) -> Any: + """Get inference workload.""" + return self._request("GET", f"/api/v1/workloads/inferences/{workload_id}") + + def create_inference(self, payload: Dict[str, Any]) -> Any: + """Create inference.""" + return self._request("POST", "/api/v1/workloads/inferences", data=payload) + + def update_inference_spec(self, workload_id: str, payload: Dict[str, Any]) -> Any: + """Update inference spec.""" + return self._request("PATCH", f"/api/v1/workloads/inferences/{workload_id}", data=payload) + + def delete_inference(self, workload_id: str) -> Any: + """Delete inference.""" + return self._request("DELETE", f"/api/v1/workloads/inferences/{workload_id}") + + def get_inference_metrics(self, workload_id: str, params: Dict[str, Any]) -> Any: + """Get inference metrics.""" + return self._request("GET", f"/api/v1/workloads/inferences/{workload_id}/metrics", params=params) + + def get_inference_pod_metrics(self, workload_id: str, pod_id: str, params: Dict[str, Any]) -> Any: + """Get inference pod metrics.""" + return self._request("GET", f"/api/v1/workloads/inferences/{workload_id}/pods/{pod_id}/metrics", params=params) + + # ============================ Storage Classes ============================ + def get_storage_classes_v1(self, cluster_uuid: str, include_none: bool
= False) -> Any: + """Get storage classes (v1).""" + params: Dict[str, Any] = {"includeNone": include_none} + return self._request("GET", f"/v1/k8s/clusters/{cluster_uuid}/storage-classes", params=params) + + def get_storage_classes_v2(self, params: Dict[str, Any]) -> Any: + """Get storage classes (v2).""" + return self._request("GET", "/api/v2/storage-classes", params=params) + + # ============================ NFS Assets ============================ + def get_nfs_assets(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get NFS assets.""" + return self._request("GET", "/api/v1/asset/datasource/nfs", params=params) + + def create_nfs_asset(self, payload: Dict[str, Any]) -> Any: + """Create NFS asset.""" + return self._request("POST", "/api/v1/asset/datasource/nfs", data=payload) + + def get_nfs_asset(self, asset_id: str) -> Any: + """Get NFS asset by ID.""" + return self._request("GET", f"/api/v1/asset/datasource/nfs/{asset_id}") + + def update_nfs_asset(self, asset_id: str, payload: Dict[str, Any]) -> Any: + """Update NFS asset.""" + return self._request("PUT", f"/api/v1/asset/datasource/nfs/{asset_id}", data=payload) + + def delete_nfs_asset(self, asset_id: str) -> Any: + """Delete NFS asset.""" + return self._request("DELETE", f"/api/v1/asset/datasource/nfs/{asset_id}") + + # ============================ PVC Assets ============================ + def get_pvc_assets(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get PVC assets.""" + return self._request("GET", "/api/v1/asset/datasource/pvc", params=params) + + def create_pvc_asset(self, payload: Dict[str, Any]) -> Any: + """Create PVC asset.""" + return self._request("POST", "/api/v1/asset/datasource/pvc", data=payload) + + def get_pvc_asset(self, asset_id: str) -> Any: + """Get PVC asset by ID.""" + return self._request("GET", f"/api/v1/asset/datasource/pvc/{asset_id}") + + def update_pvc_asset(self, asset_id: str, payload: Dict[str, Any]) -> Any: + """Update PVC asset.""" + return self._request("PUT", f"/api/v1/asset/datasource/pvc/{asset_id}", data=payload) + + def delete_pvc_asset(self, asset_id: str) -> Any: + """Delete PVC asset.""" + return self._request("DELETE", f"/api/v1/asset/datasource/pvc/{asset_id}") + + # ============================ Registry Assets ============================ + def get_registry_assets(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get registry assets.""" + return self._request("GET", "/api/v1/asset/registries", params=params) + + def create_registry_asset(self, payload: Dict[str, Any]) -> Any: + """Create registry asset.""" + return self._request("POST", "/api/v1/asset/registries", data=payload) + + def get_registry_asset(self, asset_id: str) -> Any: + """Get registry asset by ID.""" + return self._request("GET", f"/api/v1/asset/registries/{asset_id}") + + def update_registry_asset(self, asset_id: str, payload: Dict[str, Any]) -> Any: + """Update registry asset.""" + return self._request("PUT", f"/api/v1/asset/registries/{asset_id}", data=payload) + + def delete_registry_asset(self, asset_id: str) -> Any: + """Delete registry asset.""" + return self._request("DELETE", f"/api/v1/asset/registries/{asset_id}") + + def get_registry_repositories(self, asset_id: str, params: Optional[Dict[str, Any]] = None) -> Any: + """Get registry repositories.""" + return self._request("GET", f"/api/v1/asset/registries/{asset_id}/repositories", params=params) + + def get_registry_repository_tags( + self, asset_id: str, repository: str, params: Optional[Dict[str, Any]] = None + ) -> Any: + 
"""Get registry repository tags.""" + query_params = params or {} + query_params["repository"] = repository + return self._request("GET", f"/api/v1/asset/registries/{asset_id}/repositories/tags", params=query_params) + + # ============================ S3 Assets ============================ + def get_s3_assets(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get S3 assets.""" + return self._request("GET", "/api/v1/asset/datasource/s3", params=params) + + def create_s3_asset(self, payload: Dict[str, Any]) -> Any: + """Create S3 asset.""" + return self._request("POST", "/api/v1/asset/datasource/s3", data=payload) + + def get_s3_asset(self, asset_id: str) -> Any: + """Get S3 asset by ID.""" + return self._request("GET", f"/api/v1/asset/datasource/s3/{asset_id}") + + def update_s3_asset(self, asset_id: str, payload: Dict[str, Any]) -> Any: + """Update S3 asset.""" + return self._request("PUT", f"/api/v1/asset/datasource/s3/{asset_id}", data=payload) + + def delete_s3_asset(self, asset_id: str) -> Any: + """Delete S3 asset.""" + return self._request("DELETE", f"/api/v1/asset/datasource/s3/{asset_id}") + + # ============================ ConfigMap Assets ============================ + def get_configmap_assets(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get ConfigMap assets.""" + return self._request("GET", "/api/v1/asset/datasource/config-map", params=params) + + def create_configmap_asset(self, payload: Dict[str, Any]) -> Any: + """Create ConfigMap asset.""" + return self._request("POST", "/api/v1/asset/datasource/config-map", data=payload) + + def get_configmap_asset(self, asset_id: str) -> Any: + """Get ConfigMap asset by ID.""" + return self._request("GET", f"/api/v1/asset/datasource/config-map/{asset_id}") + + def update_configmap_asset(self, asset_id: str, payload: Dict[str, Any]) -> Any: + """Update ConfigMap asset.""" + return self._request("PUT", f"/api/v1/asset/datasource/config-map/{asset_id}", data=payload) + + def delete_configmap_asset(self, asset_id: str) -> Any: + """Delete ConfigMap asset.""" + return self._request("DELETE", f"/api/v1/asset/datasource/config-map/{asset_id}") + + # ============================ Secret Assets ============================ + def get_secret_assets(self, params: Optional[Dict[str, Any]] = None) -> Any: + """Get Secret assets.""" + return self._request("GET", "/api/v1/asset/datasource/secrets", params=params) + + def create_secret_asset(self, payload: Dict[str, Any]) -> Any: + """Create Secret asset.""" + return self._request("POST", "/api/v1/asset/datasource/secrets", data=payload) + + def get_secret_asset(self, asset_id: str) -> Any: + """Get Secret asset by ID.""" + return self._request("GET", f"/api/v1/asset/datasource/secrets/{asset_id}") + + def update_secret_asset(self, asset_id: str, payload: Dict[str, Any]) -> Any: + """Update Secret asset.""" + return self._request("PUT", f"/api/v1/asset/datasource/secrets/{asset_id}", data=payload) + + def delete_secret_asset(self, asset_id: str) -> Any: + """Delete Secret asset.""" + return self._request("DELETE", f"/api/v1/asset/datasource/secrets/{asset_id}") + + # ============================ Policies ============================ + def get_training_policy(self, params: Dict[str, Any]) -> Any: + """Get training policy.""" + return self._request("GET", "/api/v2/policy/trainings", params=params) + + def update_training_policy(self, payload: Dict[str, Any], validate_only: bool = False) -> Any: + """Update training policy.""" + params: Dict[str, Any] = {"validateOnly": validate_only} 
+        return self._request("PATCH", "/api/v2/policy/trainings", params=params, data=payload)
+
+    def overwrite_training_policy(self, payload: Dict[str, Any], validate_only: bool = False) -> Any:
+        """Overwrite training policy."""
+        params: Dict[str, Any] = {"validateOnly": validate_only} if validate_only else {}
+        return self._request("PUT", "/api/v2/policy/trainings", params=params, data=payload)
+
+    # ============================ Cluster API ============================
+    def is_cluster_api_available(self, cluster_domain: str) -> bool:
+        url = f"{cluster_domain}/cluster-api/status"
+        headers = {
+            "Authorization": f"Bearer {self.access_token}",
+            "User-Agent": "runai-cli/2.21.3-saas.4 sdk/2.74.5 go/go1.23.7 darwin/arm64",
+            "Accept-Encoding": "gzip",
+            "Content-Length": "0",
+        }
+        # Use a finite timeout (30 s is an arbitrary safeguard) so an
+        # unreachable cluster cannot hang the availability check.
+        response = requests.get(url, headers=headers, timeout=30)
+        return "OK" in response.text
+
+    async def fetch_training_logs(
+        self, cluster_domain: str, project_name: str, training_task_name: str, output_file_path: Path
+    ) -> None:
+        if not self.is_cluster_api_available(cluster_domain):
+            logging.error("Cluster API status check failed.")
+            return
+
+        cluster_domain = cluster_domain.replace("https://", "wss://")
+        url = f"{cluster_domain}/cluster-api/api/v1/{project_name}/workloads/training/runai/{training_task_name}/logs"
+        headers = {
+            "Authorization": f"Bearer {self.access_token}",
+            "User-Agent": "Go-http-client/1.1",
+            "Accept-Encoding": "gzip",
+        }
+
+        # Build the unverified context via the public ssl API instead of the
+        # private ssl._create_unverified_context() helper; the behavior is the
+        # same (certificate verification is intentionally disabled here).
+        ssl_context = ssl.create_default_context()
+        ssl_context.check_hostname = False
+        ssl_context.verify_mode = ssl.CERT_NONE
+        async with connect(url, extra_headers=headers, ssl=ssl_context) as websocket:
+            with output_file_path.open("w") as log_file:
+                async for message in websocket:
+                    if isinstance(message, bytes):
+                        message = message.decode("utf-8")
+                    log_file.write(message)
diff --git a/src/cloudai/systems/runai/runai_system.py b/src/cloudai/systems/runai/runai_system.py
new file mode 100644
index 000000000..455f18c84
--- /dev/null
+++ b/src/cloudai/systems/runai/runai_system.py
@@ -0,0 +1,212 @@
+# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
+# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+import requests
+from pydantic import BaseModel, ConfigDict, PrivateAttr
+
+from cloudai import BaseJob
+
+from ..._core.system import System
+from .runai_cluster import RunAICluster
+from .runai_event import RunAIEvent
+from .runai_node import RunAINode
+from .runai_project import RunAIProject
+from .runai_pvc import RunAIPVC
+from .runai_rest_client import RunAIRestClient
+from .runai_training import ActualPhase, RunAITraining
+
+
+class RunAISystem(BaseModel, System):
+    """
+    RunAISystem integrates with the RunAI platform to manage and monitor jobs and nodes.
+
+    Attributes:
+        name (str): The name of the RunAI system.
+        install_path (Path): The installation path for RunAI.
+        output_path (Path): The path where output is stored.
+        base_url (str): The base URL for the RunAI API.
+        user_email (str): Email of the RunAI user; used to filter the projects returned by get_projects().
+        app_id (str): The application ID for authentication.
+        app_secret (str): The application secret for authentication.
+        project_id (str): ID of the RunAI project under which workloads are submitted.
+        cluster_id (str): ID of the RunAI cluster to which workloads are submitted.
+        scheduler (str): The scheduler type, default is "runai".
+        monitor_interval (int): Interval, in seconds, between job status polls.
+        global_env_vars (Dict[str, Any]): Global environment variables to be passed to jobs.
+        nodes (List[RunAINode]): List of nodes in the RunAI cluster.
+    """
+
+    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
+
+    name: str
+    scheduler: str = "runai"
+    install_path: Path
+    output_path: Path
+    global_env_vars: Dict[str, Any] = {}
+    monitor_interval: int = 60
+    base_url: str
+    user_email: str
+    app_id: str
+    app_secret: str
+    project_id: str
+    cluster_id: str
+    nodes: List[RunAINode] = []
+    _api_client: Optional[RunAIRestClient] = PrivateAttr(default=None)
+
+    @property
+    def api_client(self) -> RunAIRestClient:
+        if self._api_client is None:
+            self._api_client = RunAIRestClient(
+                base_url=self.base_url,
+                app_id=self.app_id,
+                app_secret=self.app_secret,
+            )
+        return self._api_client
+
+    @api_client.setter
+    def api_client(self, api_client: RunAIRestClient) -> None:
+        self._api_client = api_client
+
+    def update(self):
+        pass
+
+    def is_job_running(self, job: BaseJob) -> bool:
+        """Return True if the specified job is running in the RunAI cluster."""
+        training_data = self.api_client.get_training(str(job.id))
+        training = RunAITraining(**training_data)
+        return training.actual_phase == ActualPhase.RUNNING
+
+    def is_job_completed(self, job: BaseJob) -> bool:
+        """Return True if the specified job is completed in the RunAI cluster."""
+        training_data = self.api_client.get_training(str(job.id))
+        training = RunAITraining(**training_data)
+        return training.actual_phase == ActualPhase.COMPLETED
+
+    def kill(self, job: BaseJob) -> None:
+        """Terminate a job in the RunAI cluster."""
+        # _request() returns the parsed JSON body rather than a Response object
+        # and raises requests.exceptions.HTTPError on non-2xx status codes (see
+        # tests/runai/test_runai_rest_client.py), so success or failure is
+        # signaled by an exception, not by a status_code attribute.
+        try:
+            self.api_client.delete_training(str(job.id))
+            logging.debug(f"Job {job.id} successfully terminated.")
+        except requests.exceptions.HTTPError as err:
+            logging.error(f"Failed to terminate job {job.id}: {err}")
+
+    # ============================ Clusters ============================
+    def get_clusters(self) -> List[RunAICluster]:
+        """Fetch and return clusters as RunAICluster objects."""
+        clusters_data = self.api_client.get_clusters()
+        return [RunAICluster(**data) for data in clusters_data.get("clusters", [])]
+
+    # ============================ Projects ============================
+    def get_projects(self) -> List[RunAIProject]:
+        """Fetch and return projects as RunAIProject objects."""
+        projects_data = self.api_client.get_projects()
+        projects = [RunAIProject(**data) for data in projects_data.get("projects", [])]
+        return [project for project in projects if project.created_by == self.user_email]
+
+    def create_project(self, project_data: Dict[str, Any]) -> RunAIProject:
+        """Create a project and return it as a RunAIProject object."""
+        created_data = self.api_client.create_project(project_data)
+        return RunAIProject(**created_data)
+
+    def update_project(self, project_id: str, project_data: Dict[str, Any]) -> RunAIProject:
+        """Update a project and return the updated RunAIProject object."""
+        updated_data = self.api_client.update_project(project_id, project_data)
+        return RunAIProject(**updated_data)
+
+    def delete_project(self, project_id: str) -> None:
+        """Delete a project by its ID."""
+        self.api_client.delete_project(project_id)
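+
+    # Illustrative usage (a hypothetical sketch, not part of this module):
+    #
+    #     if system.is_job_running(job):        # actual_phase == Running
+    #         ...
+    #     elif system.is_job_completed(job):    # actual_phase == Completed
+    #         ...
+    #
+    # where `job` is a BaseJob whose id is a RunAI workload ID; both helpers
+    # fetch the workload via get_training() and inspect RunAITraining.actual_phase.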
"""Fetch and return PVC assets as RunAIPVC objects.""" + pvc_data = self.api_client.get_pvc_assets() + return [RunAIPVC(**pvc) for pvc in pvc_data.get("entries", [])] + + def create_pvc_asset(self, payload: Dict[str, Any]) -> RunAIPVC: + """Create a PVC asset and return it as a RunAIPVC object.""" + pvc_data = self.api_client.create_pvc_asset(payload) + return RunAIPVC(**pvc_data) + + def delete_pvc_asset(self, asset_id: str) -> None: + """Delete a PVC asset by its ID.""" + self.api_client.delete_pvc_asset(asset_id) + + # ============================ Events ============================ + def get_workload_events( + self, workload_id: str, output_file_path: Path, offset: int = 0, limit: int = 100, sort_order: str = "asc" + ) -> None: + """Retrieve workload events and write them to a file.""" + response = self.api_client.get_workload_events(workload_id, offset=offset, limit=limit, sort_order=sort_order) + events_data = response.get("events", []) + + events: List[RunAIEvent] = [RunAIEvent(**event_data) for event_data in events_data] + + with output_file_path.open("w") as file: + for event in events: + file.write(f"{event}\n") + + # ============================ Trainings ============================ + def create_training(self, training_data: Dict[str, Any]) -> RunAITraining: + """Create a training and return it as a RunAITraining object.""" + training_data = self.api_client.create_training(training_data) + return RunAITraining(**training_data) + + def delete_training(self, workload_id: str) -> None: + """Delete a training by its ID.""" + self.api_client.delete_training(workload_id) + + def get_training(self, workload_id: str) -> Any: + """Get a training by its ID.""" + return self.api_client.get_training(workload_id) + + def suspend_training(self, workload_id: str) -> None: + """Suspend a training by its ID.""" + self.api_client.suspend_training(workload_id) + + def resume_training(self, workload_id: str) -> None: + """Resume a training by its ID.""" + self.api_client.resume_training(workload_id) + + # ============================ Logs ============================ + async def store_logs(self, workload_id: str, output_file_path: Path): + """Store logs for a given workload.""" + training_data = self.api_client.get_training(workload_id) + training = RunAITraining(**training_data) + cluster_id = training.cluster_id + + projects_data = self.api_client.get_projects() + projects = [RunAIProject(**data) for data in projects_data.get("projects", [])] + project = next((p for p in projects if p.id == self.project_id), None) + + if not project: + logging.error(f"Project with ID {self.project_id} not found.") + return + + clusters_data = self.api_client.get_clusters() + clusters = [RunAICluster(**data) for data in clusters_data] + cluster = next((c for c in clusters if c.uuid == cluster_id), None) + + if not cluster: + logging.error(f"Cluster with ID {cluster_id} not found.") + return + + cluster_domain = cluster.domain + + if not cluster_domain: + logging.error(f"Domain for cluster {cluster_id} not found.") + return + + await self.api_client.fetch_training_logs(cluster_domain, project.name, training.name, output_file_path) diff --git a/src/cloudai/systems/runai/runai_training.py b/src/cloudai/systems/runai/runai_training.py new file mode 100644 index 000000000..683808f99 --- /dev/null +++ b/src/cloudai/systems/runai/runai_training.py @@ -0,0 +1,123 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +from enum import Enum +from typing import ClassVar, List, Optional + +from pydantic import BaseModel, Field +from pydantic.config import ConfigDict + + +class WorkloadPhase(str, Enum): + """The phase of the workload.""" + + RUNNING = "Running" + STOPPED = "Stopped" + DELETED = "Deleted" + + +class ActualPhase(str, Enum): + """The actual phase of the workload.""" + + CREATING = "Creating" + INITIALIZING = "Initializing" + RESUMING = "Resuming" + PENDING = "Pending" + DELETING = "Deleting" + RUNNING = "Running" + UPDATING = "Updating" + STOPPED = "Stopped" + STOPPING = "Stopping" + DEGRADED = "Degraded" + FAILED = "Failed" + COMPLETED = "Completed" + TERMINATING = "Terminating" + UNKNOWN = "Unknown" + + +class Compute(BaseModel): + """Represent the compute resources requested for the workload.""" + + gpu_devices_request: Optional[int] = Field(default=None, alias="gpuDevicesRequest") + gpu_request_type: Optional[str] = Field(default=None, alias="gpuRequestType") + gpu_portion_request: Optional[float] = Field(default=None, alias="gpuPortionRequest") + gpu_portion_limit: Optional[float] = Field(default=None, alias="gpuPortionLimit") + gpu_memory_request: Optional[str] = Field(default=None, alias="gpuMemoryRequest") + gpu_memory_limit: Optional[str] = Field(default=None, alias="gpuMemoryLimit") + mig_profile: Optional[str] = Field(default=None, alias="migProfile") + cpu_core_request: Optional[float] = Field(default=None, alias="cpuCoreRequest") + cpu_core_limit: Optional[float] = Field(default=None, alias="cpuCoreLimit") + cpu_memory_request: Optional[str] = Field(default=None, alias="cpuMemoryRequest") + cpu_memory_limit: Optional[str] = Field(default=None, alias="cpuMemoryLimit") + large_shm_request: Optional[bool] = Field(default=None, alias="largeShmRequest") + + +class StoragePVC(BaseModel): + """Represent the storage resources requested for the workload.""" + + claim_name: str = Field(default="", alias="claimName") + path: str = Field(default="", alias="path") + existing_pvc: Optional[bool] = Field(default=False, alias="existingPvc") + + +class Storage(BaseModel): + """Represent the storage configuration for the workload.""" + + pvc: List[StoragePVC] = Field(default_factory=list, alias="pvc") + + +class Security(BaseModel): + """Represent the security settings for the workload.""" + + run_as_uid: Optional[int] = Field(default=None, alias="runAsUid") + run_as_gid: Optional[int] = Field(default=None, alias="runAsGid") + allow_privilege_escalation: Optional[bool] = Field(default=None, alias="allowPrivilegeEscalation") + host_ipc: Optional[bool] = Field(default=None, alias="hostIpc") + host_network: Optional[bool] = Field(default=None, alias="hostNetwork") + + +class TrainingSpec(BaseModel): + """Represent the specification of the training workload.""" + + command: Optional[str] = Field(default=None, alias="command") + args: Optional[str] = Field(default=None, alias="args") + 
image: Optional[str] = Field(default=None, alias="image") + restart_policy: Optional[str] = Field(default=None, alias="restartPolicy") + node_pools: List[str] = Field(default_factory=list, alias="nodePools") + compute: Optional[Compute] = Field(default=None, alias="compute") + storage: Optional[Storage] = Field(default=None, alias="storage") + security: Optional[Security] = Field(default=None, alias="security") + completions: Optional[int] = Field(default=None, alias="completions") + parallelism: Optional[int] = Field(default=None, alias="parallelism") + + +class RunAITraining(BaseModel): + """Represent a training workload in the RunAI system.""" + + name: str = Field(alias="name") + requested_name: str = Field(alias="requestedName") + workload_id: str = Field(alias="workloadId") + project_id: str = Field(alias="projectId") + department_id: Optional[str] = Field(default=None, alias="departmentId") + cluster_id: str = Field(alias="clusterId") + created_by: str = Field(alias="createdBy") + created_at: datetime = Field(alias="createdAt") + deleted_at: Optional[datetime] = Field(default=None, alias="deletedAt") + desired_phase: WorkloadPhase = Field(alias="desiredPhase") + actual_phase: ActualPhase = Field(alias="actualPhase") + spec: TrainingSpec = Field(alias="spec") + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") diff --git a/src/cloudai/workloads/nccl_test/__init__.py b/src/cloudai/workloads/nccl_test/__init__.py index 2d7f8e699..38b53e2cd 100644 --- a/src/cloudai/workloads/nccl_test/__init__.py +++ b/src/cloudai/workloads/nccl_test/__init__.py @@ -20,6 +20,7 @@ from .nccl import NCCLCmdArgs, NCCLTestDefinition from .performance_report_generation_strategy import NcclTestPerformanceReportGenerationStrategy from .prediction_report_generation_strategy import NcclTestPredictionReportGenerationStrategy +from .runai_json_gen_strategy import NcclTestRunAIJsonGenStrategy from .slurm_command_gen_strategy import NcclTestSlurmCommandGenStrategy __all__ = [ @@ -30,5 +31,6 @@ "NcclTestKubernetesJsonGenStrategy", "NcclTestPerformanceReportGenerationStrategy", "NcclTestPredictionReportGenerationStrategy", + "NcclTestRunAIJsonGenStrategy", "NcclTestSlurmCommandGenStrategy", ] diff --git a/src/cloudai/workloads/nccl_test/runai_json_gen_strategy.py b/src/cloudai/workloads/nccl_test/runai_json_gen_strategy.py new file mode 100644 index 000000000..ef4141362 --- /dev/null +++ b/src/cloudai/workloads/nccl_test/runai_json_gen_strategy.py @@ -0,0 +1,59 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
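+
+# Illustrative sketch (not part of the module): for a two-node all_reduce_perf
+# run, gen_json() below yields a RunAI training payload roughly of this shape;
+# the concrete values here are hypothetical examples.
+#
+#     {
+#         "name": "nccl-test-20250101000000",
+#         "useGivenNameAsPrefix": False,
+#         "projectId": "<RunAISystem.project_id>",
+#         "clusterId": "<RunAISystem.cluster_id>",
+#         "spec": {
+#             "command": "all_reduce_perf",
+#             "args": "--nthreads 4 --ngpus 2 --minbytes 32M ...",
+#             "image": "<installed Docker image path>",
+#             "compute": {"gpuDevicesRequest": 8},
+#             "parallelism": 2,
+#             "completions": 2,
+#         },
+#     }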
+
+from datetime import datetime, timezone
+from typing import Any, Dict, cast
+
+from cloudai import JsonGenStrategy, TestRun
+from cloudai.systems.runai.runai_system import RunAISystem
+from cloudai.workloads.nccl_test import NCCLTestDefinition
+
+
+class NcclTestRunAIJsonGenStrategy(JsonGenStrategy):
+    """JSON generation strategy for NCCL tests on RunAI systems."""
+
+    def gen_json(self, tr: TestRun) -> Dict[Any, Any]:
+        runai_system = cast(RunAISystem, self.system)
+        tdef: NCCLTestDefinition = cast(NCCLTestDefinition, tr.test.test_definition)
+        project_id = runai_system.project_id
+        cluster_id = runai_system.cluster_id
+
+        # datetime.utcnow() is deprecated; build the timestamp from an explicit
+        # UTC-aware datetime instead. The formatted value is identical.
+        postfix = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
+        name = f"nccl-test-{postfix}"
+
+        training_payload = {
+            "name": name,
+            "useGivenNameAsPrefix": False,
+            "projectId": project_id,
+            "clusterId": cluster_id,
+            "spec": {
+                "command": tdef.cmd_args.subtest_name,
+                "args": " ".join(
+                    [
+                        f"--{arg} {getattr(tdef.cmd_args, arg)}"
+                        for arg in tdef.cmd_args.model_dump()
+                        if arg not in {"docker_image_url", "subtest_name"}
+                    ]
+                    + ([tr.test.extra_cmd_args] if tr.test.extra_cmd_args else [])
+                ),
+                "image": tdef.docker_image.installed_path,
+                "compute": {"gpuDevicesRequest": 8},
+                "parallelism": tr.num_nodes,
+                "completions": tr.num_nodes,
+            },
+        }
+
+        return training_payload
diff --git a/tests/conftest.py b/tests/conftest.py
index d2fff6de9..447a68c2a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -21,6 +21,7 @@
 import pytest
 
 from cloudai import TestDefinition
+from cloudai.systems.runai.runai_system import RunAISystem
 from cloudai.systems.slurm.slurm_system import SlurmGroup, SlurmPartition, SlurmSystem
 
 
@@ -58,6 +59,25 @@ def slurm_system(tmp_path: Path) -> SlurmSystem:
     return system
 
 
+@pytest.fixture
+def runai_system(tmp_path: Path) -> RunAISystem:
+    system = RunAISystem(
+        name="test_runai_system",
+        install_path=tmp_path / "install",
+        output_path=tmp_path / "output",
+        base_url="http://runai.example.com",
+        app_id="test_app_id",
+        app_secret="test_app_secret",
+        project_id="test_project_id",
+        cluster_id="test_cluster_id",
+        scheduler="runai",
+        global_env_vars={},
+        monitor_interval=60,
+        user_email="test_user@example.com",
+    )
+    return system
+
+
 class MyTestDefinition(TestDefinition):
     @property
     def installables(self):
diff --git a/tests/json_gen_strategy/test_nccl_runai_json_gen_strategy.py b/tests/json_gen_strategy/test_nccl_runai_json_gen_strategy.py
new file mode 100644
index 000000000..ca212841e
--- /dev/null
+++ b/tests/json_gen_strategy/test_nccl_runai_json_gen_strategy.py
@@ -0,0 +1,126 @@
+# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
+# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
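+
+# NOTE: the `runai_system` fixture used throughout this module is defined in
+# tests/conftest.py; it builds a RunAISystem with placeholder credentials
+# (project_id="test_project_id", cluster_id="test_cluster_id").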
+ +from unittest.mock import Mock + +import pytest + +from cloudai._core.test import Test +from cloudai._core.test_scenario import TestRun +from cloudai.systems.runai.runai_system import RunAISystem +from cloudai.workloads.nccl_test import NCCLCmdArgs, NCCLTestDefinition, NcclTestRunAIJsonGenStrategy + + +class TestNcclTestRunAIJsonGenStrategy: + @pytest.fixture + def json_gen_strategy(self, runai_system: RunAISystem) -> NcclTestRunAIJsonGenStrategy: + return NcclTestRunAIJsonGenStrategy(runai_system, {}) + + def test_gen_json(self, json_gen_strategy: NcclTestRunAIJsonGenStrategy) -> None: + cmd_args = NCCLCmdArgs.model_validate({"subtest_name": "all_reduce_perf", "docker_image_url": "fake_image_url"}) + nccl = NCCLTestDefinition(name="name", description="desc", test_template_name="tt", cmd_args=cmd_args) + t = Test(test_definition=nccl, test_template=Mock()) + tr = TestRun(name="t1", test=t, nodes=["node1", "node2"], num_nodes=2) + + json_payload = json_gen_strategy.gen_json(tr) + + assert json_payload["projectId"] == "test_project_id" + assert json_payload["clusterId"] == "test_cluster_id" + assert json_payload["spec"]["command"] == "all_reduce_perf" + assert json_payload["spec"]["image"] == "fake_image_url" + assert json_payload["spec"]["parallelism"] == 2 + assert json_payload["spec"]["completions"] == 2 + + def test_gen_json_with_cmd_args(self, json_gen_strategy: NcclTestRunAIJsonGenStrategy) -> None: + cmd_args = NCCLCmdArgs.model_validate( + { + "subtest_name": "all_reduce_perf", + "docker_image_url": "fake_image_url", + "nthreads": "4", + "ngpus": "2", + "minbytes": "32M", + "maxbytes": "64M", + "stepbytes": "1M", + "op": "sum", + "datatype": "float", + "root": "0", + "iters": "20", + "warmup_iters": "5", + "agg_iters": "1", + "average": "1", + "parallel_init": "0", + "check": "1", + "blocking": "0", + "cudagraph": "0", + } + ) + nccl = NCCLTestDefinition(name="name", description="desc", test_template_name="tt", cmd_args=cmd_args) + t = Test(test_definition=nccl, test_template=Mock()) + tr = TestRun(name="t1", test=t, nodes=["node1", "node2"], num_nodes=2) + + json_payload = json_gen_strategy.gen_json(tr) + + assert json_payload["projectId"] == "test_project_id" + assert json_payload["clusterId"] == "test_cluster_id" + assert json_payload["spec"]["command"] == "all_reduce_perf" + + expected_args = [ + "--nthreads 4", + "--ngpus 2", + "--minbytes 32M", + "--maxbytes 64M", + "--stepbytes 1M", + "--op sum", + "--datatype float", + "--root 0", + "--iters 20", + "--warmup_iters 5", + "--agg_iters 1", + "--average 1", + "--parallel_init 0", + "--check 1", + "--blocking 0", + "--cudagraph 0", + ] + + for arg in expected_args: + assert arg in json_payload["spec"]["args"] + + assert json_payload["spec"]["image"] == "fake_image_url" + assert json_payload["spec"]["parallelism"] == 2 + assert json_payload["spec"]["completions"] == 2 + + def test_gen_json_with_extra_cmd_args(self, json_gen_strategy: NcclTestRunAIJsonGenStrategy) -> None: + cmd_args = NCCLCmdArgs.model_validate({"subtest_name": "all_reduce_perf", "docker_image_url": "fake_image_url"}) + nccl = NCCLTestDefinition( + name="name", + description="desc", + test_template_name="tt", + cmd_args=cmd_args, + extra_cmd_args={"--extra-arg": "value"}, + ) + t = Test(test_definition=nccl, test_template=Mock()) + tr = TestRun(name="t1", test=t, nodes=["node1", "node2"], num_nodes=2) + + json_payload = json_gen_strategy.gen_json(tr) + + assert json_payload["projectId"] == "test_project_id" + assert json_payload["clusterId"] == "test_cluster_id" 
+ assert json_payload["spec"]["command"] == "all_reduce_perf" + assert "--extra-arg value" in json_payload["spec"]["args"] + assert json_payload["spec"]["image"] == "fake_image_url" + assert json_payload["spec"]["parallelism"] == 2 + assert json_payload["spec"]["completions"] == 2 diff --git a/tests/runai/test_runai_cluster.py b/tests/runai/test_runai_cluster.py new file mode 100644 index 000000000..f8b360783 --- /dev/null +++ b/tests/runai/test_runai_cluster.py @@ -0,0 +1,87 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict + +import pytest + +from cloudai.systems.runai.runai_cluster import ClusterState, RunAICluster + + +@pytest.fixture +def cluster_data() -> Dict[str, Any]: + return { + "createdAt": "2025-01-01T00:00:00Z", + "domain": "https://dummy-cluster.example.com", + "lastLiveness": "2025-01-02T00:00:00Z", + "name": "dummy-cluster-name", + "status": { + "state": "Disconnected", + "conditions": [], + "platform": {"kubeVersion": "v0.0.0-dummy", "type": "dummy-platform"}, + "config": {}, + "dependencies": {}, + "operands": {}, + }, + "tenantId": 9999, + "updatedAt": "2025-01-01T00:00:00Z", + "uuid": "00000000-0000-0000-0000-000000000000", + "version": "0.0.0", + } + + +def test_cluster_state_enum() -> None: + assert ClusterState.from_str("Connected") == ClusterState.CONNECTED + assert ClusterState.from_str("Disconnected") == ClusterState.DISCONNECTED + assert ClusterState.from_str("UnknownState") == ClusterState.UNKNOWN + + +def test_cluster_initialization(cluster_data: Dict[str, Any]) -> None: + cluster = RunAICluster(**cluster_data) + assert cluster.name == cluster_data["name"] + assert cluster.uuid == cluster_data["uuid"] + assert cluster.tenant_id == cluster_data["tenantId"] + assert cluster.domain == cluster_data["domain"] + assert cluster.version == cluster_data["version"] + assert cluster.created_at == cluster_data["createdAt"] + assert cluster.updated_at == cluster_data["updatedAt"] + assert cluster.last_liveness == cluster_data["lastLiveness"] + assert cluster.status.state == ClusterState.DISCONNECTED + + +def test_cluster_state_disconnected(cluster_data: Dict[str, Any]) -> None: + cluster = RunAICluster(**cluster_data) + assert not cluster.is_connected() + + +def test_cluster_state_connected(cluster_data: Dict[str, Any]) -> None: + cluster_data["status"]["state"] = "Connected" + cluster = RunAICluster(**cluster_data) + assert cluster.is_connected() + + +def test_cluster_kubernetes_version(cluster_data: Dict[str, Any]) -> None: + cluster = RunAICluster(**cluster_data) + assert cluster.get_kubernetes_version() == "v0.0.0-dummy" + + +def test_repr(cluster_data: Dict[str, Any]) -> None: + cluster = RunAICluster(**cluster_data) + output = repr(cluster) + assert isinstance(output, str) + assert cluster.name is not None and cluster.name in output + assert cluster.uuid is not None 
and cluster.uuid in output + assert cluster.version is not None and cluster.version in output diff --git a/tests/runai/test_runai_event.py b/tests/runai/test_runai_event.py new file mode 100644 index 000000000..e424d1a68 --- /dev/null +++ b/tests/runai/test_runai_event.py @@ -0,0 +1,87 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime, timezone +from typing import Any, Dict + +import pytest + +from cloudai.systems.runai.runai_event import InvolvedObject, RunAIEvent + + +@pytest.fixture +def involved_object_data() -> Dict[str, Any]: + return {"uid": "abc-123", "kind": "Pod", "name": "example-pod", "namespace": "default"} + + +@pytest.fixture +def runai_event_data(involved_object_data: Dict[str, Any]) -> Dict[str, Any]: + return { + "createdAt": "2025-04-01T12:00:00Z", + "id": "event-001", + "type": "Warning", + "clusterId": "cluster-123", + "message": "GPU resource exhausted", + "reason": "OutOfGpu", + "source": "scheduler", + "involvedObject": involved_object_data, + } + + +def test_involved_object_initialization(involved_object_data: Dict[str, Any]) -> None: + obj = InvolvedObject(**involved_object_data) + assert obj.uid == involved_object_data["uid"] + assert obj.kind == involved_object_data["kind"] + assert obj.name == involved_object_data["name"] + assert obj.namespace == involved_object_data["namespace"] + + +def test_involved_object_defaults() -> None: + obj = InvolvedObject() + assert obj.uid == "" + assert obj.kind == "" + assert obj.name == "" + assert obj.namespace == "" + + +def test_runai_event_initialization(runai_event_data: Dict[str, Any]) -> None: + runai_event_data["createdAt"] = datetime.fromisoformat(runai_event_data["createdAt"].replace("Z", "+00:00")) + runai_event_data["involvedObject"] = InvolvedObject(**runai_event_data["involvedObject"]) + + event = RunAIEvent(**runai_event_data) + expected_datetime = datetime(2025, 4, 1, 12, 0, 0, tzinfo=timezone.utc) + assert event.created_at == expected_datetime + assert event.id == runai_event_data["id"] + assert event.type == runai_event_data["type"] + assert event.cluster_id == runai_event_data["clusterId"] + assert event.message == runai_event_data["message"] + assert event.reason == runai_event_data["reason"] + assert event.source == runai_event_data["source"] + assert isinstance(event.involved_object, InvolvedObject) + assert event.involved_object.uid == runai_event_data["involvedObject"].uid + + +@pytest.mark.parametrize( + "field,expected", [("id", ""), ("type", ""), ("cluster_id", ""), ("message", ""), ("reason", ""), ("source", "")] +) +def test_runai_event_defaults(field: str, expected: str) -> None: + minimal_data = { + "createdAt": datetime.fromisoformat("2025-04-01T12:00:00Z".replace("Z", "+00:00")), + "involvedObject": {}, + } + event = RunAIEvent(**minimal_data) + assert getattr(event, field) == expected + assert 
isinstance(event.involved_object, InvolvedObject) diff --git a/tests/runai/test_runai_node.py b/tests/runai/test_runai_node.py new file mode 100644 index 000000000..bb35413eb --- /dev/null +++ b/tests/runai/test_runai_node.py @@ -0,0 +1,82 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from cloudai.systems.runai.runai_node import NodeStatus, RunAINode + + +@pytest.mark.parametrize( + "input_status, expected_status", + [ + ("Ready", NodeStatus.READY), + ("NotReady", NodeStatus.NOT_READY), + ("Unknown", NodeStatus.UNKNOWN), + ("GarbageStatus", NodeStatus.UNKNOWN), + ("", NodeStatus.UNKNOWN), + (None, NodeStatus.UNKNOWN), + ], +) +def test_node_status_parsing(input_status, expected_status) -> None: + status = NodeStatus.from_str(input_status if input_status is not None else "") + assert status == expected_status + + +@pytest.fixture +def sample_node_data() -> dict: + return { + "status": "Ready", + "conditions": [{"type": "MemoryPressure", "reason": "KubeletHasSufficientMemory"}], + "taints": [{"key": "dedicated", "value": "gpu", "effect": "NoSchedule"}], + "nodePool": "gpu-pool", + "createdAt": "2025-01-01T00:00:00Z", + "gpuInfo": { + "gpuType": "NVIDIA-A100", + "gpuCount": 8, + "nvLinkDomainUid": "domain-1234", + "nvLinkCliqueId": "clique-5678", + }, + "name": "node-1", + "id": "node-uuid", + "clusterUuid": "cluster-uuid", + "updatedAt": "2025-01-02T00:00:00Z", + } + + +def test_runai_node_properties(sample_node_data) -> None: + node = RunAINode(**sample_node_data) + + assert node.status == NodeStatus.READY + assert node.conditions == sample_node_data["conditions"] + assert node.taints == sample_node_data["taints"] + assert node.node_pool == "gpu-pool" + assert node.created_at == "2025-01-01T00:00:00Z" + assert node.gpu_type == "NVIDIA-A100" + assert node.gpu_count == 8 + assert node.nvlink_domain_uid == "domain-1234" + assert node.nvlink_clique_id == "clique-5678" + assert node.name == "node-1" + assert node.id == "node-uuid" + assert node.cluster_uuid == "cluster-uuid" + assert node.updated_at == "2025-01-02T00:00:00Z" + + +def test_runai_node_repr(sample_node_data) -> None: + node = RunAINode(**sample_node_data) + text = repr(node) + assert "RunAINode(name='node-1'" in text + assert "gpu_count=8" in text + assert "status='Ready'" in text diff --git a/tests/runai/test_runai_project.py b/tests/runai/test_runai_project.py new file mode 100644 index 000000000..df4dcc727 --- /dev/null +++ b/tests/runai/test_runai_project.py @@ -0,0 +1,95 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict + +import pytest + +from cloudai.systems.runai.runai_project import RunAIProject + + +@pytest.fixture +def example_data() -> Dict[str, Any]: + return { + "description": "string", + "schedulingRules": { + "interactiveJobTimeLimitSeconds": 100, + "interactiveJobMaxIdleDurationSeconds": 100, + "interactiveJobPreemptIdleDurationSeconds": 100, + "trainingJobMaxIdleDurationSeconds": 100, + "trainingJobTimeLimitSeconds": 100, + }, + "defaultNodePools": ["string"], + "nodeTypes": { + "training": ["string"], + "workspace": ["string"], + "names": {"property1": "string", "property2": "string"}, + }, + "resources": [ + { + "nodePool": {"id": 22, "name": "default"}, + "gpu": {"deserved": 1000, "limit": 0, "overQuotaWeight": 2}, + "cpu": {"deserved": 1000, "limit": 0, "overQuotaWeight": 2}, + "memory": {"deserved": 1000, "limit": 0, "overQuotaWeight": 2, "units": "Mib"}, + "priority": "Normal", + } + ], + "name": "organization1", + "clusterId": "71f69d83-ba66-4822-adf5-55ce55efd210", + "requestedNamespace": "runai-proj1", + "enforceRunaiScheduler": True, + "parentId": "53a9228e-a722-420d-a102-9dc90da2efca", + } + + +def test_project_initialization(example_data: Dict[str, Any]) -> None: + project = RunAIProject(**example_data) + assert project.name == "organization1" + assert project.cluster_id == "71f69d83-ba66-4822-adf5-55ce55efd210" + assert project.description == "string" + assert project.scheduling_rules == { + "interactiveJobTimeLimitSeconds": 100, + "interactiveJobMaxIdleDurationSeconds": 100, + "interactiveJobPreemptIdleDurationSeconds": 100, + "trainingJobMaxIdleDurationSeconds": 100, + "trainingJobTimeLimitSeconds": 100, + } + assert project.default_node_pools == ["string"] + assert project.node_types == { + "training": ["string"], + "workspace": ["string"], + "names": {"property1": "string", "property2": "string"}, + } + assert project.resources == [ + { + "nodePool": {"id": 22, "name": "default"}, + "gpu": {"deserved": 1000, "limit": 0, "overQuotaWeight": 2}, + "cpu": {"deserved": 1000, "limit": 0, "overQuotaWeight": 2}, + "memory": {"deserved": 1000, "limit": 0, "overQuotaWeight": 2, "units": "Mib"}, + "priority": "Normal", + } + ] + assert project.requested_namespace == "runai-proj1" + assert project.enforce_runai_scheduler is True + assert project.parent_id == "53a9228e-a722-420d-a102-9dc90da2efca" + + +def test_repr_output(example_data: Dict[str, Any]) -> None: + project = RunAIProject(**example_data) + output = repr(project) + assert "RunAIProject" in output + assert "organization1" in output + assert "71f69d83-ba66-4822-adf5-55ce55efd210" in output diff --git a/tests/runai/test_runai_pvc.py b/tests/runai/test_runai_pvc.py new file mode 100644 index 000000000..96549afbf --- /dev/null +++ b/tests/runai/test_runai_pvc.py @@ -0,0 +1,117 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict + +import pytest + +from cloudai.systems.runai.runai_pvc import RunAIPVC + + +@pytest.fixture +def raw_pvc() -> Dict[str, Any]: + return { + "meta": { + "id": "pvc-123", + "name": "test-pvc", + "kind": "PersistentVolumeClaim", + "scope": "namespace", + "clusterId": "cluster-001", + "tenantId": 42, + "createdAt": "2024-01-01T00:00:00Z", + "updatedAt": "2024-01-02T00:00:00Z", + "deletedAt": None, + "createdBy": "user-1", + "updatedBy": "user-2", + "description": "test pvc", + "autoDelete": True, + "projectId": 1001, + }, + "spec": { + "claimName": "claim-001", + "path": "/mnt/data", + "readOnly": False, + "ephemeral": True, + "existingPvc": True, + "claimInfo": { + "accessModes": {"ReadWriteOnce": True}, + "size": "10Gi", + "storageClass": "standard", + "volumeMode": "Filesystem", + }, + }, + "clusterInfo": {"resources": [{"cpu": "4", "memory": "16Gi"}]}, + } + + +def test_pvc_initialization(raw_pvc: Dict[str, Any]) -> None: + pvc = RunAIPVC(**raw_pvc) + assert pvc.meta.name == "test-pvc" + assert pvc.meta.description == "test pvc" + assert pvc.meta.scope == "namespace" + assert pvc.meta.cluster_id == "cluster-001" + assert pvc.meta.department_id is None + assert pvc.meta.project_id == 1001 + assert pvc.meta.auto_delete is True + assert pvc.meta.workload_supported_types is None + assert pvc.meta.id == "pvc-123" + assert pvc.meta.kind == "PersistentVolumeClaim" + assert pvc.meta.tenant_id == 42 + assert pvc.meta.created_by == "user-1" + assert pvc.meta.created_at == "2024-01-01T00:00:00Z" + assert pvc.meta.updated_by == "user-2" + assert pvc.meta.updated_at == "2024-01-02T00:00:00Z" + assert pvc.meta.deleted_at is None + assert pvc.meta.deleted_by is None + assert pvc.meta.project_name is None + assert pvc.meta.update_count is None + assert pvc.spec.claim_name == "claim-001" + assert pvc.spec.path == "/mnt/data" + assert pvc.spec.read_only is False + assert pvc.spec.ephemeral is True + assert pvc.spec.existing_pvc is True + if pvc.spec.claim_info: + assert pvc.spec.claim_info.access_modes == {"ReadWriteOnce": True} + assert pvc.spec.claim_info.size == "10Gi" + assert pvc.spec.claim_info.storage_class == "standard" + assert pvc.spec.claim_info.volume_mode == "Filesystem" + assert pvc.cluster_info.resources == [{"cpu": "4", "memory": "16Gi"}] + + +@pytest.mark.parametrize( + "optional_fields", + [ + {}, + {"ephemeral": False, "existingPvc": False}, + {"claimInfo": {"accessModes": None, "size": None, "storageClass": None, "volumeMode": None}}, + ], +) +def test_pvc_optional_fields(raw_pvc: Dict[str, Any], optional_fields: Dict[str, Any]) -> None: + raw_pvc_mod = raw_pvc.copy() + raw_pvc_mod["spec"] = {**raw_pvc["spec"], **optional_fields} + pvc = RunAIPVC(**raw_pvc_mod) + assert isinstance(pvc, RunAIPVC) + + +def test_pvc_repr(raw_pvc: Dict[str, Any]) -> None: + pvc = RunAIPVC(**raw_pvc) + expected = ( + "RunAIPVC(name='test-pvc', id='pvc-123', created_by='user-1', " + "scope='namespace', cluster_id='cluster-001', project_id=1001, " + "size='10Gi', storage_class='standard', path='/mnt/data', " + "claim_name='claim-001')" + ) + assert repr(pvc) == expected 
diff --git a/tests/runai/test_runai_rest_client.py b/tests/runai/test_runai_rest_client.py new file mode 100644 index 000000000..515d8f790 --- /dev/null +++ b/tests/runai/test_runai_rest_client.py @@ -0,0 +1,138 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict, Optional, Tuple + +import pytest +import requests +from requests import Response + +from cloudai.systems.runai.runai_rest_client import RunAIRestClient + + +class DummyResponse(Response): + def __init__(self, json_data: Dict[str, Any], status_code: int = 200) -> None: + super().__init__() + self._json: Dict[str, Any] = json_data + self.status_code: int = status_code + self._content = b"non-empty" if json_data else b"" + + def json( + self, + *, + cls: Optional[Any] = None, + object_hook: Optional[Any] = None, + parse_float: Optional[Any] = None, + parse_int: Optional[Any] = None, + parse_constant: Optional[Any] = None, + object_pairs_hook: Optional[Any] = None, + **kwds: Any, + ) -> Dict[str, Any]: + _ = (cls, object_hook, parse_float, parse_int, parse_constant, object_pairs_hook, kwds) + return self._json + + +@pytest.fixture +def dummy_token_response() -> DummyResponse: + return DummyResponse({"accessToken": "dummy_token"}) + + +@pytest.fixture +def dummy_client(monkeypatch: pytest.MonkeyPatch, dummy_token_response: DummyResponse) -> RunAIRestClient: + def fake_post(self: requests.Session, url: str, json: Dict[str, Any]) -> DummyResponse: + return dummy_token_response + + monkeypatch.setattr(requests.Session, "post", fake_post) + return RunAIRestClient("http://dummy", "app_id", "app_secret") + + +def test_init_access_token(dummy_client: RunAIRestClient) -> None: + assert dummy_client.access_token == "dummy_token" + assert "Bearer dummy_token" in str(dummy_client.session.headers.get("Authorization", "")) + + +def test_get_access_token_error(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_post(self: requests.Session, url: str, json: Dict[str, Any]) -> DummyResponse: + return DummyResponse({}, 200) + + monkeypatch.setattr(requests.Session, "post", fake_post) + with pytest.raises(ValueError): + RunAIRestClient("http://dummy", "app_id", "app_secret") + + +def test_request_success(dummy_client: RunAIRestClient, monkeypatch: pytest.MonkeyPatch) -> None: + def fake_request( + method: str, + url: str, + headers: Dict[str, Any], + params: Optional[Dict[str, Any]] = None, + json: Optional[Dict[str, Any]] = None, + ) -> DummyResponse: + return DummyResponse({"result": "ok"}, 200) + + monkeypatch.setattr(requests, "request", fake_request) + result = dummy_client._request("GET", "/test", params={"a": 1}) + assert result == {"result": "ok"} + + +def test_request_http_error(dummy_client: RunAIRestClient, monkeypatch: pytest.MonkeyPatch) -> None: + def fake_request( + method: str, + url: str, + headers: Dict[str, Any], + params: 
Optional[Dict[str, Any]] = None, + json: Optional[Dict[str, Any]] = None, + ) -> DummyResponse: + return DummyResponse({"error": "fail"}, 400) + + monkeypatch.setattr(requests, "request", fake_request) + with pytest.raises(requests.exceptions.HTTPError): + dummy_client._request("GET", "/error") + + +@pytest.mark.parametrize( + "method_name, args, kwargs, expected_method, expected_path", + [ + ("get_clusters", (), {"params": {"key": "value"}}, "GET", "/api/v1/clusters"), + ("create_cluster", ({"name": "test"},), {}, "POST", "/api/v1/clusters"), + ], +) +def test_endpoints( + dummy_client: RunAIRestClient, + monkeypatch: pytest.MonkeyPatch, + method_name: str, + args: Tuple[Any, ...], + kwargs: Dict[str, Any], + expected_method: str, + expected_path: str, +) -> None: + def fake_request( + method: str, path: str, params: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + return {"method": method, "path": path, "params": params, "data": data} + + monkeypatch.setattr(dummy_client, "_request", fake_request) + result = getattr(dummy_client, method_name)(*args, **kwargs) + assert result["method"] == expected_method + assert result["path"] == expected_path + if "params" in kwargs: + assert result["params"] == kwargs["params"] + else: + assert result["params"] is None + if args and expected_method in {"POST", "PUT", "PATCH"}: + assert result["data"] == args[0] + else: + assert result["data"] is None diff --git a/tests/runai/test_runai_training.py b/tests/runai/test_runai_training.py new file mode 100644 index 000000000..62a1a1682 --- /dev/null +++ b/tests/runai/test_runai_training.py @@ -0,0 +1,103 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
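+
+# NOTE: the fixture below mirrors the camelCase key shape of a RunAI trainings
+# API response; pydantic maps those keys onto snake_case attributes through the
+# Field(alias=...) declarations in runai_training.py.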
+ +from datetime import datetime +from typing import Any, Dict + +import pytest + +from cloudai.systems.runai.runai_training import ActualPhase, RunAITraining, TrainingSpec, WorkloadPhase + + +@pytest.fixture +def training_data() -> Dict[str, Any]: + return { + "name": "test-training", + "requestedName": "test-training-request", + "workloadId": "workload-123", + "projectId": "project-456", + "clusterId": "cluster-789", + "createdBy": "user@example.com", + "createdAt": "2025-01-01T00:00:00Z", + "deletedAt": None, + "desiredPhase": "Running", + "actualPhase": "Initializing", + "spec": { + "command": "echo Hello World", + "args": "--verbose", + "image": "test-image", + "restartPolicy": "Always", + "nodePools": ["pool1", "pool2"], + "compute": { + "gpuDevicesRequest": 1, + "cpuCoreRequest": 2.0, + }, + "storage": {"pvc": [{"claimName": "pvc1", "path": "/mnt/data"}]}, + "security": {"runAsUid": 1000, "allowPrivilegeEscalation": False}, + "completions": 1, + "parallelism": 1, + }, + } + + +def test_training_initialization(training_data: Dict[str, Any]) -> None: + training = RunAITraining(**training_data) + assert training.name == training_data["name"] + assert training.requested_name == training_data["requestedName"] + assert training.workload_id == training_data["workloadId"] + assert training.project_id == training_data["projectId"] + assert training.cluster_id == training_data["clusterId"] + assert training.created_by == training_data["createdBy"] + assert training.created_at == datetime.fromisoformat(training_data["createdAt"].replace("Z", "+00:00")) + assert training.deleted_at is None + assert training.desired_phase == WorkloadPhase.RUNNING + assert training.actual_phase == ActualPhase.INITIALIZING + assert isinstance(training.spec, TrainingSpec) + + +@pytest.mark.parametrize( + "desired_phase, actual_phase", + [ + ("Running", "Initializing"), + ("Stopped", "Failed"), + ("Deleted", "Terminating"), + ], +) +def test_training_phases(training_data: Dict[str, Any], desired_phase: str, actual_phase: str) -> None: + training_data["desiredPhase"] = desired_phase + training_data["actualPhase"] = actual_phase + training = RunAITraining(**training_data) + assert training.desired_phase == WorkloadPhase(desired_phase) + assert training.actual_phase == ActualPhase(actual_phase) + + +def test_training_specification(training_data: Dict[str, Any]) -> None: + training = RunAITraining(**training_data) + spec = training.spec + assert spec is not None + assert spec.command == training_data["spec"]["command"] + assert spec.args == training_data["spec"]["args"] + assert spec.image == training_data["spec"]["image"] + assert spec.restart_policy == training_data["spec"]["restartPolicy"] + assert spec.node_pools == training_data["spec"]["nodePools"] + assert spec.compute is not None + assert spec.compute.gpu_devices_request == training_data["spec"]["compute"]["gpuDevicesRequest"] + assert spec.compute.cpu_core_request == training_data["spec"]["compute"]["cpuCoreRequest"] + assert spec.storage is not None + assert spec.storage.pvc[0].claim_name == training_data["spec"]["storage"]["pvc"][0]["claimName"] + assert spec.security is not None + assert spec.security.run_as_uid == training_data["spec"]["security"]["runAsUid"] + assert spec.security.allow_privilege_escalation == training_data["spec"]["security"]["allowPrivilegeEscalation"] diff --git a/tests/test_init.py b/tests/test_init.py index 93f549222..fd698b3ce 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -25,10 +25,12 @@ ) from cloudai._core.reporter 
import PerTestReporter, StatusReporter from cloudai.installer.lsf_installer import LSFInstaller +from cloudai.installer.runai_installer import RunAIInstaller from cloudai.installer.slurm_installer import SlurmInstaller from cloudai.installer.standalone_installer import StandaloneInstaller from cloudai.systems.kubernetes.kubernetes_system import KubernetesSystem from cloudai.systems.lsf.lsf_system import LSFSystem +from cloudai.systems.runai.runai_system import RunAISystem from cloudai.systems.slurm.slurm_system import SlurmSystem from cloudai.systems.standalone_system import StandaloneSystem from cloudai.workloads.chakra_replay import ( @@ -53,6 +55,7 @@ NcclTestGradingStrategy, NcclTestJobStatusRetrievalStrategy, NcclTestKubernetesJsonGenStrategy, + NcclTestRunAIJsonGenStrategy, NcclTestSlurmCommandGenStrategy, ) from cloudai.workloads.nemo_launcher import ( @@ -84,7 +87,8 @@ def test_systems(): assert "slurm" in parsers assert "kubernetes" in parsers assert "lsf" in parsers - assert len(parsers) == 4 + assert "runai" in parsers + assert len(parsers) == 5 def test_runners(): @@ -93,7 +97,8 @@ def test_runners(): assert "slurm" in runners assert "kubernetes" in runners assert "lsf" in runners - assert len(runners) == 4 + assert "runai" in runners + assert len(runners) == 5 ALL_STRATEGIES = { @@ -146,8 +151,10 @@ def test_runners(): (JobStatusRetrievalStrategy, SlurmSystem, MegatronRunTestDefinition): DefaultJobStatusRetrievalStrategy, (JobStatusRetrievalStrategy, StandaloneSystem, SleepTestDefinition): DefaultJobStatusRetrievalStrategy, (JobStatusRetrievalStrategy, LSFSystem, SleepTestDefinition): DefaultJobStatusRetrievalStrategy, + (JobStatusRetrievalStrategy, RunAISystem, NCCLTestDefinition): DefaultJobStatusRetrievalStrategy, (JsonGenStrategy, KubernetesSystem, NCCLTestDefinition): NcclTestKubernetesJsonGenStrategy, (JsonGenStrategy, KubernetesSystem, SleepTestDefinition): SleepKubernetesJsonGenStrategy, + (JsonGenStrategy, RunAISystem, NCCLTestDefinition): NcclTestRunAIJsonGenStrategy, } @@ -168,10 +175,11 @@ def strategy2str(key: tuple) -> str: def test_installers(): installers = Registry().installers_map - assert len(installers) == 4 + assert len(installers) == 5 assert installers["standalone"] == StandaloneInstaller assert installers["slurm"] == SlurmInstaller assert installers["lsf"] == LSFInstaller + assert installers["runai"] == RunAIInstaller def test_definitions():