From ebd72b030102a1e4ad235ca20f008fb5e4cd4859 Mon Sep 17 00:00:00 2001 From: Al Rigazzi Date: Mon, 11 Oct 2021 22:31:12 +0200 Subject: [PATCH] Ray Cluster features (#50) * Adds notebook to run head_node. * Add Ray notebook which can start cluster head node. * Workarounds for Ray/Jupyter resource problems. * Working setup for XC. * Insert CPU control for SLURM Ray. * Adds Ray cluster setup. * Refactor RayWorker and RayHead. * Adds batch capabilities to Ray Slurm launcher. * Add preamble to batch_settings * Fixes some issues on CCM for Ray * Redirect cmd server output for Ray. * Avoid Ray workers overlapping with head. * Change Slurm configuration for Ray. * Remove command server and use Ray client. * Add ray cluster for PBS batch. * Adds PBS functionalities for Ray cluster. * Adds Ray starter script. * Add new starter for Ray * Fix batch args bug for RayCluster * Add preamble mechanism to BatchSettings. * Remove old ray starter script. * Fixes tests for new interface. * Fix entity utils test. * Delete issues.md * Add local launcher and test for Ray. * Delete 05_starting_ray.ipynb Removes unused tutorial. * Delete manual-start.sh Remove unused script. * Delete start-head.sh Remove unused script. * Delete start-worker.sh Remove unused script. * Update requirements. * Remove check for slurm launcher and rely on exception. * Address reviewer's comments. Add ray_args. * Modify Ray tutorial. * Merge branch. * Adds Ray to manifest. * Fix to raystarter.py * Add manifest.ray_clusters to exp.stop() * Removes egg files. * Fixes wrong option for Ray and aprun * Address review, add flexibility to ray cluster * Add API functions to RayCluster * Fix for internal ray args * Add dashboard port to raystarter args. * Apply styling. * Add tests for ray on slurm * Fix slurm in alloc ray test * ADD new information to README and OA tutorial The readme has been updated with new examples and usage patterns. 
A new Online Analysis example has been created with a Lattice Boltzmann simulation that will show users how to perform streaming analysis with SmartSim. More to come. * Link Online Analysis example into docs Formatting in the README, added the OA example to the docs and converted to RST * Add visualization to README * Add PBS tests and pass ray_exe to raystarter * Remove expand_exe option * Remove duplicate function * Remove unused ray template * Add egg-info to gitignore * Remove expand_exe from RayCluster * Fix exe_path * Adds ray API to docs * Move set_cpus out of launch tests. * Fix characters in options for PBSOrchestrator * Fix non utf8 chars in options * Fix Cobalt options * Fix options in AprunSettings. * Allow multiple trials for Ray tests * Fix ray launch summary * Address local launcher for Ray The local launcher for Ray was broken on Mac because of how Ray does IP lookups for local addresses. The local launcher functionality has been taken out, and Slurm and PBS are now the only supported launchers. Minor cleanup on RayCluster classes as well. Changed inheritance structure from model to SmartSimEntity * Make RayCluster closer to SmartSim paradigm * Modifies starter to bind dashboard to all interfaces * Revert port for dashboard, update github wf * Add password option to RayCluster * Remove output from Notebook * Remove log level env from ray notebook * Adapt ray tests. * Bump up ray version * Remove unused launch branch, fix ray summary * Remove unused launcher branch * Apply styling * Remove notebook output * Remove block_in_batch feature * Fix ALPS regression introduced in this branch * Fix settings * Fix docstring * Add ignore flag for ray batch tests. * Change ray_started args * Update docstrings * Update TODO list in raycluster.py * Add disclaimer to notebook, license to raycluster * Make RayCluster error more useful * Add ray.shutdown to tests. * Add interface to Ray PBS tests. 
Apply styling * Remove useless _vars from RayCluster * Remove unused attributes from RayCluster * Remove ray_head variable * Fix new variables for RayCluster * Make RayCluster functions static * Modify notebook * Update Ray path to exp * Update docs, removed unused function * Extend wait time for Ray head log * Remove node retrieval for Ray * Update notebook and summary for RayCluster * Fixes to Ray docs * Add interface to Ray tests * Apply styling * Address reviewer's comments * Apply styling * Add SSH tunneling instructions to Ray notebook * Change workers parameter to num_nodes for clarity * Update Ray tests * Add review's suggestions and dashboard host fix * Update Ray tutorial * Restrict dashboard option to head only * Correct typo in Ray notebook * Add some info to notebook. * Fix typo in notebook Co-authored-by: Sam Partee --- .github/workflows/run_local_tests.yml | 4 +- .gitignore | 1 + README.md | 78 +++ doc/api/smartsim_api.rst | 18 + doc/index.rst | 2 + setup.cfg | 2 + smart | 2 +- smartsim/control/controller.py | 11 +- smartsim/control/manifest.py | 23 +- smartsim/entity/entity.py | 2 - smartsim/entity/entityList.py | 3 +- smartsim/exp/ray/__init__.py | 1 + smartsim/exp/ray/raycluster.py | 474 ++++++++++++++++++ smartsim/exp/ray/raystarter.py | 83 +++ smartsim/experiment.py | 37 +- smartsim/generation/generator.py | 1 + smartsim/settings/alpsSettings.py | 42 +- smartsim/settings/slurmSettings.py | 41 +- smartsim/utils/__init__.py | 2 +- smartsim/utils/helpers.py | 12 + tests/backends/run_sklearn_onnx.py | 1 + tests/backends/run_tf.py | 2 +- tests/backends/run_torch.py | 1 + tests/full_wlm/with_ray/test_ray_pbs_batch.py | 75 +++ .../full_wlm/with_ray/test_ray_slurm_batch.py | 72 +++ tests/on_wlm/test_launch_orc_cobalt.py | 3 - tests/on_wlm/test_launch_orc_pbs.py | 3 - tests/on_wlm/test_launch_orc_slurm.py | 3 - tests/test_configs/smartredis/consumer.py | 1 + tests/test_configs/smartredis/producer.py | 1 + tests/test_manifest.py | 28 +- 
tests/test_smartredis.py | 11 +- tests/with_ray/on_wlm/test_ray_pbs.py | 75 +++ tests/with_ray/on_wlm/test_ray_slurm.py | 122 +++++ tests/with_ray/test_ray.py | 107 ++++ .../01_getting_started.ipynb | 2 +- .../05_starting_ray_builtin.ipynb | 279 +++++++++++ 37 files changed, 1584 insertions(+), 41 deletions(-) create mode 100644 smartsim/exp/ray/__init__.py create mode 100644 smartsim/exp/ray/raycluster.py create mode 100644 smartsim/exp/ray/raystarter.py create mode 100644 tests/full_wlm/with_ray/test_ray_pbs_batch.py create mode 100644 tests/full_wlm/with_ray/test_ray_slurm_batch.py create mode 100644 tests/with_ray/on_wlm/test_ray_pbs.py create mode 100644 tests/with_ray/on_wlm/test_ray_slurm.py create mode 100644 tests/with_ray/test_ray.py create mode 100644 tutorials/05_starting_ray/05_starting_ray_builtin.ipynb diff --git a/.github/workflows/run_local_tests.yml b/.github/workflows/run_local_tests.yml index e291c27f5..507a1f457 100644 --- a/.github/workflows/run_local_tests.yml +++ b/.github/workflows/run_local_tests.yml @@ -47,13 +47,13 @@ jobs: if: matrix.python-version != '3.9' run: | echo "$(brew --prefix)/opt/make/libexec/gnubin" >> $GITHUB_PATH - python -m pip install -vvv .[dev,ml] + python -m pip install -vvv .[dev,ml,ray] - name: Install SmartSim if: matrix.python-version == '3.9' run: | echo "$(brew --prefix)/opt/make/libexec/gnubin" >> $GITHUB_PATH - python -m pip install -vvv .[dev] + python -m pip install -vvv .[dev,ray] - name: Install ML Runtimes with Smart (with pt and tf) if: contains(matrix.os, 'macos') diff --git a/.gitignore b/.gitignore index 9533e7e4d..628db5c53 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ __pycache__ .pytest_cache/ .coverage* htmlcov +smartsim.egg-info # Dependencies third-party diff --git a/README.md b/README.md index 039c6a82b..d25fdb6bc 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,9 @@ independently. 
- [Local Launch](#local-launch) - [Interactive Launch](#interactive-launch) - [Batch Launch](#batch-launch) + - [Ray](#ray) + - [Ray on Slurm](#ray-on-slurm) + - [Ray on PBS](#ray-on-pbs) - [SmartRedis](#smartredis) - [Tensors](#tensors) - [Datasets](#datasets) @@ -288,6 +291,7 @@ python hello_ensemble_pbs.py # Infrastructure Library Applications - Orchestrator - In-memory data store and Machine Learning Inference (Redis + RedisAI) + - Ray - Distributed Reinforcement Learning (RL), Hyperparameter Optimization (HPO) ## Redis + RedisAI @@ -415,6 +419,80 @@ exp.stop(db_cluster) python run_db_pbs_batch.py ``` +----- +## Ray + +Ray is a distributed computation framework that supports a number of applications: + - RLlib - Distributed Reinforcement Learning (RL) + - RaySGD - Distributed Training + - Ray Tune - Hyperparameter Optimization (HPO) + - Ray Serve - ML/DL inference +as well as integrations with frameworks like Modin, Mars, Dask, and Spark. + +Historically, Ray has not been well supported on HPC systems. A few examples exist, +but none are well maintained. Because SmartSim already has launchers for HPC systems, +launching Ray through SmartSim is a relatively simple task. + +### Ray on Slurm + +Below is an example of how to launch a Ray cluster on a Slurm system and connect to it. +In this example, we set `batch=True`, which means that the cluster will be started +by requesting an allocation through Slurm. If this code is run within a sufficiently large +interactive allocation, setting `batch=False` will spin up the Ray cluster on the +allocated nodes. 
+ +```Python +import ray + +from smartsim import Experiment +from smartsim.exp.ray import RayCluster + +exp = Experiment("ray-cluster", launcher='slurm') +# 3 workers + 1 head node = 4-node cluster +cluster = RayCluster(name="ray-cluster", run_args={}, + ray_args={"num-cpus": 24}, + launcher='slurm', num_nodes=4, batch=True) + +exp.generate(cluster, overwrite=True) +exp.start(cluster, block=False, summary=True) + +# Connect to the Ray cluster +ray.util.connect(cluster.get_head_address()+":10001") + +# +``` + + +### Ray on PBS + +Below is an example of how to launch a Ray cluster on a PBS system and connect to it. +In this example, we set `batch=True`, which means that the cluster will be started +by requesting an allocation through PBS. If this code is run within a sufficiently large +interactive allocation, setting `batch=False` will spin up the Ray cluster on the +allocated nodes. + +```Python +import ray + +from smartsim import Experiment +from smartsim.exp.ray import RayCluster + +exp = Experiment("ray-cluster", launcher='pbs') +# 3 workers + 1 head node = 4-node cluster +cluster = RayCluster(name="ray-cluster", run_args={}, + ray_args={"num-cpus": 24}, + launcher='pbs', num_nodes=4, batch=True) + +exp.generate(cluster, overwrite=True) +exp.start(cluster, block=False, summary=True) + +# Connect to the Ray cluster +ray.util.connect(cluster.get_head_address()+":10001") + +# +``` + + ------ # SmartRedis diff --git a/doc/api/smartsim_api.rst b/doc/api/smartsim_api.rst index c5cbfc5e3..f146b6f6c 100644 --- a/doc/api/smartsim_api.rst +++ b/doc/api/smartsim_api.rst @@ -490,3 +490,21 @@ Slurm .. automodule:: smartsim.launcher.slurm.slurm :members: + + +Ray +=== + +.. currentmodule:: smartsim.exp.ray + +.. _ray_api: + +``RayCluster`` is used to launch a Ray cluster, + either as a batch job or within an interactive allocation. + +.. 
autoclass:: RayCluster + :show-inheritance: + :members: + :inherited-members: + :undoc-members: + :exclude-members: batch set_path type diff --git a/doc/index.rst b/doc/index.rst index 2a90c7a0c..42a198c68 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -14,6 +14,8 @@ :caption: Tutorials tutorials/01_getting_started/01_getting_started + tutorials/03_online_analysis/03_online_analysis + tutorials/05_starting_ray/05_starting_ray tutorials/using_clients tutorials/lattice_boltz_analysis tutorials/inference diff --git a/setup.cfg b/setup.cfg index 79e1e25e6..767a3d6eb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -66,6 +66,8 @@ doc= sphinx-fortran==1.1.1 nbsphinx>=0.8.2 +ray= + ray>=1.6 [options.packages.find] exclude = diff --git a/smart b/smart index 226c038cf..988eaff91 100755 --- a/smart +++ b/smart @@ -245,7 +245,7 @@ def clean(install_path, _all=False): rai_path = lib_path.joinpath("redisai.so") if rai_path.is_file(): rai_path.unlink() - print("Succesfully removed existing RedisAI installation") + print("Successfully removed existing RedisAI installation") backend_path = lib_path.joinpath("backends") if backend_path.is_dir(): diff --git a/smartsim/control/controller.py b/smartsim/control/controller.py index 53a012668..78df428e9 100644 --- a/smartsim/control/controller.py +++ b/smartsim/control/controller.py @@ -247,9 +247,11 @@ def init_launcher(self, launcher): elif launcher == "pbs": self._launcher = PBSLauncher() self._jobs.set_launcher(self._launcher) + # Init Cobalt launcher elif launcher == "cobalt": self._launcher = CobaltLauncher() self._jobs.set_launcher(self._launcher) + # Init LSF launcher elif launcher == "lsf": self._launcher = LSFLauncher() self._jobs.set_launcher(self._launcher) @@ -275,10 +277,13 @@ def _launch(self, manifest): raise SmartSimError(msg) self._launch_orchestrator(orchestrator) + for rc in manifest.ray_clusters: + rc._update_workers() + # create all steps prior to launch steps = [] - - for elist in manifest.ensembles: + all_entity_lists 
= manifest.ensembles + manifest.ray_clusters + for elist in all_entity_lists: if elist.batch: batch_step = self._create_batch_job_step(elist) steps.append((batch_step, elist)) @@ -498,7 +503,7 @@ def _orchestrator_launch_wait(self, orchestrator): if not self._jobs.actively_monitoring: self._jobs.check_jobs() - # _jobs.get_status aquires JM lock for main thread, no need for locking + # _jobs.get_status acquires JM lock for main thread, no need for locking statuses = self.get_entity_list_status(orchestrator) if all([stat == STATUS_RUNNING for stat in statuses]): ready = True diff --git a/smartsim/control/manifest.py b/smartsim/control/manifest.py index 4710be60c..d7bc2d5cc 100644 --- a/smartsim/control/manifest.py +++ b/smartsim/control/manifest.py @@ -28,11 +28,12 @@ from ..entity import EntityList, SmartSimEntity from ..error import SmartSimError from ..error.errors import SmartSimError +from ..exp.ray import RayCluster # List of types derived from EntityList which require specific behavior # A corresponding property needs to exist (like db for Orchestrator), # otherwise they will not be accessible -entity_list_exception_types = [Orchestrator] +entity_list_exception_types = [Orchestrator, RayCluster] class Manifest: @@ -83,6 +84,26 @@ def ensembles(self): return _ensembles + @property + def ray_clusters(self): + _ray_cluster = [] + for deployable in self._deployables: + if isinstance(deployable, RayCluster): + _ray_cluster.append(deployable) + return _ray_cluster + + @property + def all_entity_lists(self): + """All entity lists, including ensembles and + exceptional ones like Orchestrator and Ray Clusters + """ + _all_entity_lists = self.ray_clusters + self.ensembles + db = self.db + if db is not None: + _all_entity_lists.append(db) + + return _all_entity_lists + def _check_names(self, deployables): used = [] for deployable in deployables: diff --git a/smartsim/entity/entity.py b/smartsim/entity/entity.py index 5ea989d4d..8f786686c 100644 --- 
a/smartsim/entity/entity.py +++ b/smartsim/entity/entity.py @@ -24,8 +24,6 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import os.path as osp - class SmartSimEntity: def __init__(self, name, path, run_settings): diff --git a/smartsim/entity/entityList.py b/smartsim/entity/entityList.py index 11840d045..efb1a8799 100644 --- a/smartsim/entity/entityList.py +++ b/smartsim/entity/entityList.py @@ -43,8 +43,7 @@ def batch(self): try: if self.batch_settings: return True - else: - return False + return False # local orchestrator cannot launch with batches except AttributeError: return False diff --git a/smartsim/exp/ray/__init__.py b/smartsim/exp/ray/__init__.py new file mode 100644 index 000000000..d7d9665b5 --- /dev/null +++ b/smartsim/exp/ray/__init__.py @@ -0,0 +1 @@ +from .raycluster import RayCluster, parse_ray_head_node_address diff --git a/smartsim/exp/ray/raycluster.py b/smartsim/exp/ray/raycluster.py new file mode 100644 index 000000000..5b0fbb4d4 --- /dev/null +++ b/smartsim/exp/ray/raycluster.py @@ -0,0 +1,474 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. 
+# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import os +import re +import time as _time +import uuid + +from ...entity import EntityList, SmartSimEntity +from ...error import SSConfigError, SSUnsupportedError +from ...settings import AprunSettings, QsubBatchSettings, SbatchSettings, SrunSettings +from ...utils import delete_elements, get_logger +from ...utils.helpers import expand_exe_path, init_default + +logger = get_logger(__name__) + + +class RayCluster(EntityList): + """Entity used to run a Ray cluster on a given number of hosts. One Ray node is + launched on each host, and the first host is used to launch the head node. + + :param name: The name of the entity. + :type name: str + :param path: path to output, error, and configuration files + :type path: str + :param ray_port: Port at which the head node will be running. + :type ray_port: int + :param ray_args: Arguments to be passed to Ray executable as `--key=value`, or `--key` if `value` is set to `None`. + :type ray_args: dict[str,str] + :param num_nodes: Number of hosts, includes 1 head node and all worker nodes. + :type num_nodes: int + :param run_args: Arguments to pass to launcher to specify details such as partition, time, and so on. 
+ :type run_args: dict[str,str] + :param batch_args: Additional batch arguments passed to launcher when running batch jobs. + :type batch_args: dict[str,str] + :param launcher: Name of launcher to use for starting the cluster. + :type launcher: str + :param interface: Name of network interface the cluster nodes should bind to. + :type interface: str + :param alloc: ID of allocation to run on, only used if launcher is Slurm and allocation is + obtained with ``ray.slurm.get_allocation`` + :type alloc: int + :param batch: Whether cluster should be launched as batch file, ignored when ``launcher`` is `local` + :type batch: bool + :param time: The walltime the cluster will be running for + :type time: str + :param password: Password to use for Redis server, which is passed as `--redis_password` to `ray start`. + Can be set to + - `auto`: a strong password will be generated internally + - a string: it will be used as password + - `None`: the default Ray password will be used. + Defaults to `auto` + :type password: str + """ + + def __init__( + self, + name, + path=os.getcwd(), + ray_port=6789, + ray_args=None, + num_nodes=1, + run_args=None, + batch_args=None, + launcher="local", + batch=False, + time="01:00:00", + interface="ipogif0", + alloc=None, + password="auto", + **kwargs, + ): + launcher = launcher.lower() + if launcher not in ["slurm", "pbs"]: + raise SSUnsupportedError( + "Only Slurm and PBS launchers are supported by RayCluster" + ) + + if password: + if password == "auto": + self._ray_password = str(uuid.uuid4()) + else: + self._ray_password = password + else: + self._ray_password = None + + if num_nodes < 1: + raise ValueError("Number of nodes must be larger than 0.") + + self.alloc = None + self.batch_settings = None + self._hosts = None + + run_args = init_default({}, run_args, dict) + batch_args = init_default({}, batch_args, dict) + ray_args = init_default({}, ray_args, dict) + + self._ray_args = ray_args + super().__init__( + name=name, + path=path, + 
ray_args=ray_args, + run_args=run_args, + ray_port=ray_port, + launcher=launcher, + interface=interface, + alloc=alloc, + num_nodes=num_nodes, + **kwargs, + ) + if batch: + self._build_batch_settings(num_nodes, time, batch_args, launcher) + self.ray_head_address = None + + @property + def batch(self): + try: + if self.batch_settings: + return True + return False + except AttributeError: + return False + + def _initialize_entities(self, **kwargs): + + ray_port = kwargs.get("ray_port", 6789) + launcher = kwargs.get("launcher", "slurm") + ray_args = kwargs.get("ray_args", None) + run_args = kwargs.get("run_args", None) + interface = kwargs.get("interface", "ipogif0") + num_nodes = kwargs.get("num_nodes", 0) + alloc = kwargs.get("alloc", None) + + ray_head = RayHead( + name="ray_head", + path=self.path, + ray_password=self._ray_password, + ray_port=ray_port, + launcher=launcher, + run_args=run_args, + ray_args=ray_args, + interface=interface, + alloc=alloc, + ) + + self.entities.append(ray_head) + + for worker_id in range(num_nodes - 1): + worker_model = RayWorker( + name=f"ray_worker_{worker_id}", + path=self.path, + run_args=run_args, + ray_port=ray_port, + ray_password=self._ray_password, + ray_args=ray_args, + interface=interface, + launcher=launcher, + alloc=alloc, + ) + self.entities.append(worker_model) + + def _build_batch_settings(self, num_nodes, time, batch_args, launcher): + if launcher == "pbs": + self.batch_settings = QsubBatchSettings( + nodes=num_nodes, time=time, batch_args=batch_args + ) + elif launcher == "slurm": + self.batch_settings = SbatchSettings( + nodes=num_nodes, time=time, batch_args=batch_args + ) + else: + raise SSUnsupportedError("Only PBS and Slurm launchers are supported") + + def get_head_address(self): + """Return address of head node + + If address has not been initialized, returns None + + :returns: Address of head node + :rtype: str + """ + if not self.ray_head_address: + self.ray_head_address = parse_ray_head_node_address( + 
os.path.join(self.entities[0].path, self.entities[0].name + ".out") + ) + return self.ray_head_address + + def get_dashboard_address(self): + """Returns dashboard address + + The format is : + + :returns: Dashboard address + :rtype: str + """ + return self.get_head_address() + ":" + str(self.entities[0].dashboard_port) + + def _update_workers(self): + """Update worker args before launching them.""" + for worker in range(1, len(self.entities)): + self.entities[worker].set_head_log( + f"{os.path.join(self.entities[0].path, self.entities[0].name)}.out" + ) + + +def find_ray_exe(): + """Find ray executable in current path.""" + try: + ray_exe = expand_exe_path("ray") + return ray_exe + except SSConfigError as e: + raise SSConfigError("Could not find ray executable") from e + + +def find_ray_stater_script(): + """Find location of script used to start Ray nodes.""" + dir_path = os.path.dirname(os.path.realpath(__file__)) + return f"{dir_path}/raystarter.py" + + +def parse_ray_head_node_address(head_log): + """Get the ray head node host address from the log file produced + by the head process. + + :param head_log: full path to log file of head node + :return: address of the head host + :rtype: str + """ + + max_attempts = 12 + attempts = 0 + while not os.path.isfile(head_log): + _time.sleep(5) + attempts += 1 + if attempts == max_attempts: + raise RuntimeError(f"Could not find Ray cluster head log file {head_log}") + + attempts = 0 + head_ip = None + while head_ip is None: + _time.sleep(5) + with open(head_log) as fp: + line = fp.readline() + while line: + plain_line = re.sub("\033\\[([0-9]+)(;[0-9]+)*m", "", line) + if "Local node IP:" in plain_line: + matches = re.search(r"(?<=Local node IP: ).*", plain_line) + head_ip = matches.group() + break + line = fp.readline() + attempts += 1 + if attempts == max_attempts: + raise RuntimeError( + f"Could not find Ray cluster head address in log file {head_log}." 
+ ) + + return head_ip + + +class RayHead(SmartSimEntity): + def __init__( + self, + name, + path, + ray_password, + ray_port=6789, + run_args=None, + ray_args=None, + launcher="slurm", + interface="ipogif0", + alloc=None, + dash_port=8265, + ): + self.dashboard_port = dash_port + self.batch_settings = None + self.files = None + + run_args = init_default({}, run_args, dict) + ray_args = init_default({}, ray_args, dict) + + ray_exe_args = self._build_ray_exe_args( + ray_port, ray_password, interface, ray_args + ) + + run_settings = self._build_run_settings(launcher, alloc, run_args, ray_exe_args) + super().__init__(name, path, run_settings) + + def _build_ray_exe_args(self, ray_port, ray_password, interface, ray_args): + + # python script that launches ray head node + starter_script = find_ray_stater_script() + ray_starter_args = [ + starter_script, + f"+port={ray_port}", + f"+ifname={interface}", + f"+ray-exe={find_ray_exe()}", + f"+head", + ] + + if ray_password: + ray_starter_args += [f"+redis-password={ray_password}"] + + if "dashboard-port" in ray_args: + self.dashboard_port = int(ray_args["dashboard-port"]) + ray_starter_args += [f"+dashboard-port={self.dashboard_port}"] + + used = ["block", "redis-password", "start", "head", "port", "dashboard-port"] + extra_ray_args = [] + for key, value in ray_args.items(): + if key not in used: + extra_ray_args += [f"+ray-args=--{key}={value}"] + ray_starter_args += extra_ray_args + + return " ".join(ray_starter_args) + + def _build_run_settings(self, launcher, alloc, run_args, ray_exe_args): + + if launcher == "slurm": + run_settings = self._build_srun_settings(alloc, run_args, ray_exe_args) + elif launcher == "pbs": + run_settings = self._build_pbs_settings(run_args, ray_exe_args) + else: + raise SSUnsupportedError("Only slurm, and pbs launchers are supported.") + + run_settings.set_tasks(1) + run_settings.set_tasks_per_node(1) + return run_settings + + def _build_pbs_settings(self, run_args, ray_args): + + 
aprun_settings = AprunSettings("python", exe_args=ray_args, run_args=run_args) + aprun_settings.set_tasks(1) + + return aprun_settings + + def _build_srun_settings(self, alloc, run_args, ray_args): + + delete_elements(run_args, ["oversubscribe"]) + + run_args["unbuffered"] = None + + srun_settings = SrunSettings( + "python", + exe_args=ray_args, + run_args=run_args, + alloc=alloc, + ) + srun_settings.set_nodes(1) + return srun_settings + + +class RayWorker(SmartSimEntity): + def __init__( + self, + name, + path, + ray_password, + ray_port, + run_args=None, + ray_args=None, + interface="ipogif0", + launcher="slurm", + alloc=None, + ): + + self.batch_settings = None + self.files = None + + run_args = init_default({}, run_args, dict) + ray_args = init_default({}, ray_args, dict) + + ray_exe_args = self._build_ray_exe_args( + ray_password, ray_args, ray_port, interface + ) + + run_settings = self._build_run_settings(launcher, alloc, run_args, ray_exe_args) + super().__init__(name, path, run_settings) + + @property + def batch(self): + return False + + def set_head_log(self, head_log): + """Set head log file (with full path) + + The head log file is used by the worker to discover + the head IP address. This function is called by + RayCluster before the cluster is launched. 
+ """ + self.run_settings.add_exe_args([f"+head-log={head_log}"]) + + def _build_ray_exe_args(self, ray_password, ray_args, ray_port, interface): + + # python script that launches ray node + starter_script = find_ray_stater_script() + ray_starter_args = [ + starter_script, + f"+ray-exe={find_ray_exe()}", + f"+port={ray_port}", + f"+ifname={interface}", + ] + if ray_password: + ray_starter_args += [f"+redis-password={ray_password}"] + + used = [ + "block", + "redis-password", + "start", + "head", + "port", + "dashboard-port", + "dashboard-host", + ] + extra_ray_args = [] + for key, value in ray_args.items(): + if key not in used: + extra_ray_args += [f"+ray-args=--{key}={value}"] + ray_starter_args += extra_ray_args + + return " ".join(ray_starter_args) + + def _build_run_settings(self, launcher, alloc, run_args, ray_exe_args): + + if launcher == "slurm": + run_settings = self._build_srun_settings(alloc, run_args, ray_exe_args) + elif launcher == "pbs": + run_settings = self._build_pbs_settings(run_args, ray_exe_args) + else: + raise SSUnsupportedError("Only slurm, and pbs launchers are supported.") + + run_settings.set_tasks(1) + return run_settings + + def _build_pbs_settings(self, run_args, ray_args): + + aprun_settings = AprunSettings("python", exe_args=ray_args, run_args=run_args) + + return aprun_settings + + def _build_srun_settings(self, alloc, run_args, ray_args): + delete_elements(run_args, ["oversubscribe"]) + run_args["unbuffered"] = None + + srun_settings = SrunSettings( + "python", + exe_args=ray_args, + run_args=run_args, + alloc=alloc, + ) + srun_settings.set_nodes(1) + return srun_settings diff --git a/smartsim/exp/ray/raystarter.py b/smartsim/exp/ray/raystarter.py new file mode 100644 index 000000000..5da213534 --- /dev/null +++ b/smartsim/exp/ray/raystarter.py @@ -0,0 +1,83 @@ +import argparse +import os +from subprocess import PIPE, STDOUT, Popen + +import psutil + +from smartsim.exp.ray import parse_ray_head_node_address +from 
smartsim.utils.helpers import get_ip_from_interface + + +def get_lb_interface_name(): + """Use psutil to get loopback interface name""" + net_if_addrs = list(psutil.net_if_addrs()) + for interface in net_if_addrs: + if interface.startswith("lo"): + return interface + raise OSError("Could not find loopback interface name") + + +os.environ["PYTHONUNBUFFERED"] = "1" + +parser = argparse.ArgumentParser( + prefix_chars="+", description="SmartSim Ray head launcher" +) +parser.add_argument( + "+port", type=int, help="Port used by Ray to start the Redis server at" +) +parser.add_argument("+head", action="store_true") +parser.add_argument("+redis-password", type=str, help="Password of Redis cluster") +parser.add_argument( + "+ray-args", action="append", help="Additional arguments to start Ray" +) +parser.add_argument("+dashboard-port", type=str, help="Ray dashboard port") +parser.add_argument("+ray-exe", type=str, help="Ray executable", default="ray") +parser.add_argument("+ifname", type=str, help="Interface name", default="lo") +parser.add_argument("+head-log", type=str, help="Head node log") +args = parser.parse_args() + +if not args.head and not args.head_log: + # parser.error prints usage and exits; argparse.ArgumentError cannot be + # raised with a message alone + parser.error( + "Ray starter needs +head or +head-log to start head or worker nodes respectively" + ) + + +def current_ip(interface="lo"): + if interface == "lo": + loopback = get_lb_interface_name() + return get_ip_from_interface(loopback) + else: + return get_ip_from_interface(interface) + + +RAY_IP = current_ip(args.ifname) + +cliargs = [ + args.ray_exe, + "start", + "--head" + if args.head + else f"--address={parse_ray_head_node_address(args.head_log)}:{args.port}", + "--block", + f"--node-ip-address={RAY_IP}", +] + +if args.ray_args: + cliargs += args.ray_args + if args.head and not any([arg.startswith("--dashboard-host") for arg in args.ray_args]): + cliargs += [f"--dashboard-host={RAY_IP}"] + +if args.redis_password: + cliargs += [f"--redis-password={args.redis_password}"] + +if args.head: 
+ cliargs += [f"--port={args.port}", f"--dashboard-port={args.dashboard_port}"] + + +cmd = " ".join(cliargs) +print(f"Ray Command: {cmd}") + +p = Popen(cliargs, stdout=PIPE, stderr=STDOUT) + +for line in iter(p.stdout.readline, b""): + print(line.decode("utf-8").rstrip(), flush=True) diff --git a/smartsim/experiment.py b/smartsim/experiment.py index 3bfa376e2..454c16ad1 100644 --- a/smartsim/experiment.py +++ b/smartsim/experiment.py @@ -118,11 +118,8 @@ def stop(self, *args): stop_manifest = Manifest(*args) for entity in stop_manifest.models: self._control.stop_entity(entity) - for entity_list in stop_manifest.ensembles: + for entity_list in stop_manifest.all_entity_lists: self._control.stop_entity_list(entity_list) - orchestrator = stop_manifest.db - if orchestrator: - self._control.stop_entity_list(orchestrator) except SmartSimError as e: logger.error(e) raise @@ -400,6 +397,7 @@ def sprint(p): models = manifest.models ensembles = manifest.ensembles orchestrator = manifest.db + ray_clusters = manifest.ray_clusters header = colorize("=== LAUNCH SUMMARY ===", color="cyan", bold=True) exname = colorize("Experiment: " + self.name, color="green", bold=True) @@ -463,6 +461,37 @@ def sprint(p): batch = colorize(f"Launching as batch: {orchestrator.batch}", color="green") sprint(f"{batch}") sprint(f"{size}") + if ray_clusters: + sprint(colorize("=== RAY CLUSTERS ===", color="cyan", bold=True)) + for rc in ray_clusters: + name = colorize(rc.name, color="green", bold=True) + num_models = colorize("# of nodes: " + str(len(rc)), color="green") + if rc.batch: + batch_settings = colorize( + "Ray batch Settings: \n" + str(rc.batch_settings), + color="green", + ) + head_run_settings = colorize( + "Ray head run Settings: \n" + str(rc.entities[0].run_settings), + color="green", + ) + run_settings = head_run_settings + if len(rc) > 1: + worker_run_settings = colorize( + "\nRay worker run Settings: \n" + + str(rc.entities[1].run_settings), + color="green", + ) + run_settings += 
worker_run_settings + batch = colorize(f"Launching as batch: {rc.batch}", color="green") + + sprint(f"{name}") + sprint(f"{num_models}") + sprint(f"{batch}") + if rc.batch: + sprint(f"{batch_settings}") + sprint(f"{run_settings}") + sprint("\n") sprint("\n") diff --git a/smartsim/generation/generator.py b/smartsim/generation/generator.py index e1a394ee8..0a69685f0 100644 --- a/smartsim/generation/generator.py +++ b/smartsim/generation/generator.py @@ -87,6 +87,7 @@ def generate_experiment(self, *args): self._gen_orc_dir(generator_manifest.db) self._gen_entity_list_dir(generator_manifest.ensembles) self._gen_entity_dirs(generator_manifest.models) + self._gen_entity_list_dir(generator_manifest.ray_clusters) def set_tag(self, tag, regex=None): """Set the tag used for tagging input files diff --git a/smartsim/settings/alpsSettings.py b/smartsim/settings/alpsSettings.py index ad658bf2d..f60db95f5 100644 --- a/smartsim/settings/alpsSettings.py +++ b/smartsim/settings/alpsSettings.py @@ -28,7 +28,13 @@ class AprunSettings(RunSettings): - def __init__(self, exe, exe_args=None, run_args=None, env_vars=None): + def __init__( + self, + exe, + exe_args=None, + run_args=None, + env_vars=None, + ): """Settings to run job with ``aprun`` command ``AprunSettings`` can be used for both the `pbs` and `cobalt` @@ -44,7 +50,11 @@ def __init__(self, exe, exe_args=None, run_args=None, env_vars=None): :type env_vars: dict[str, str], optional """ super().__init__( - exe, exe_args, run_command="aprun", run_args=run_args, env_vars=env_vars + exe, + exe_args, + run_command="aprun", + run_args=run_args, + env_vars=env_vars, ) self.mpmd = [] @@ -104,6 +114,21 @@ def set_hostlist(self, host_list): raise TypeError("host_list argument must be list of strings") self.run_args["node-list"] = ",".join(host_list) + def set_excluded_hosts(self, host_list): + """Specify a list of hosts to exclude for launching this job + + :param host_list: hosts to exclude + :type host_list: list[str] + :raises 
TypeError: + """ + if isinstance(host_list, str): + host_list = [host_list.strip()] + if not isinstance(host_list, list): + raise TypeError("host_list argument must be a list of strings") + if not all([isinstance(host, str) for host in host_list]): + raise TypeError("host_list argument must be list of strings") + self.run_args["exclude-node-list"] = ",".join(host_list) + def format_run_args(self): """Return a list of ALPS formatted run arguments @@ -138,3 +163,16 @@ def format_env_vars(self): for name, value in self.env_vars.items(): formatted += ["-e", name + "=" + str(value)] return formatted + + def set_walltime(self, walltime): + """Set the walltime of the job + + format = "HH:MM:SS" + + :param walltime: wall time + :type walltime: str + """ + h_m_s = walltime.split(":") + self.run_args["t"] = str( + int(h_m_s[0]) * 3600 + int(h_m_s[1]) * 60 + int(h_m_s[2]) + ) diff --git a/smartsim/settings/slurmSettings.py b/smartsim/settings/slurmSettings.py index 0487ca8c0..9a7b036e8 100644 --- a/smartsim/settings/slurmSettings.py +++ b/smartsim/settings/slurmSettings.py @@ -30,7 +30,14 @@ class SrunSettings(RunSettings): - def __init__(self, exe, exe_args=None, run_args=None, env_vars=None, alloc=None): + def __init__( + self, + exe, + exe_args=None, + run_args=None, + env_vars=None, + alloc=None, + ): """Initialize run parameters for a slurm job with ``srun`` ``SrunSettings`` should only be used on Slurm based systems. 
@@ -50,7 +57,11 @@ def __init__(self, exe, exe_args=None, run_args=None, env_vars=None, alloc=None) :type alloc: str, optional """ super().__init__( - exe, exe_args, run_command="srun", run_args=run_args, env_vars=env_vars + exe, + exe_args, + run_command="srun", + run_args=run_args, + env_vars=env_vars, ) self.alloc = alloc self.mpmd = False @@ -80,6 +91,21 @@ def set_hostlist(self, host_list): raise TypeError("host_list argument must be list of strings") self.run_args["nodelist"] = ",".join(host_list) + def set_excluded_hosts(self, host_list): + """Specify a list of hosts to exclude for launching this job + + :param host_list: hosts to exclude + :type host_list: list[str] + :raises TypeError: + """ + if isinstance(host_list, str): + host_list = [host_list.strip()] + if not isinstance(host_list, list): + raise TypeError("host_list argument must be a list of strings") + if not all([isinstance(host, str) for host in host_list]): + raise TypeError("host_list argument must be list of strings") + self.run_args["exclude"] = ",".join(host_list) + def set_cpus_per_task(self, num_cpus): """Set the number of cpus to use per task @@ -110,6 +136,17 @@ def set_tasks_per_node(self, num_tpn): """ self.run_args["ntasks-per-node"] = int(num_tpn) + def set_walltime(self, walltime): + """Set the walltime of the job + + format = "HH:MM:SS" + + :param walltime: wall time + :type walltime: str + """ + # TODO check for errors here + self.run_args["time"] = walltime + def format_run_args(self): """return a list of slurm formatted run arguments diff --git a/smartsim/utils/__init__.py b/smartsim/utils/__init__.py index 0cb620e8f..48151433b 100644 --- a/smartsim/utils/__init__.py +++ b/smartsim/utils/__init__.py @@ -1,2 +1,2 @@ -from .helpers import get_env +from .helpers import delete_elements, get_env from .log import get_logger, log_to_file diff --git a/smartsim/utils/helpers.py b/smartsim/utils/helpers.py index 265eb9178..2caa39932 100644 --- a/smartsim/utils/helpers.py +++ 
b/smartsim/utils/helpers.py @@ -129,3 +129,15 @@ def colorize(string, color, bold=False, highlight=False): if bold: attr.append("1") return "\x1b[%sm%s\x1b[0m" % (";".join(attr), string) + + +def delete_elements(dictionary, key_list): + """Delete elements from a dictionary. + :param dictionary: the dictionary from which the elements must be deleted. + :type dictionary: dict + :param key_list: the list of keys to delete from the dictionary. + :type key_list: list + """ + for key in key_list: + if key in dictionary: + del dictionary[key] diff --git a/tests/backends/run_sklearn_onnx.py b/tests/backends/run_sklearn_onnx.py index f2c898a54..157e5a815 100644 --- a/tests/backends/run_sklearn_onnx.py +++ b/tests/backends/run_sklearn_onnx.py @@ -5,6 +5,7 @@ from sklearn.ensemble import RandomForestRegressor from sklearn.linear_model import LinearRegression from sklearn.model_selection import train_test_split + from smartredis import Client diff --git a/tests/backends/run_tf.py b/tests/backends/run_tf.py index 82151ef96..1003ab778 100644 --- a/tests/backends/run_tf.py +++ b/tests/backends/run_tf.py @@ -1,9 +1,9 @@ import os import numpy as np -from smartredis import Client from tensorflow import keras +from smartredis import Client from smartsim.tf import freeze_model diff --git a/tests/backends/run_torch.py b/tests/backends/run_torch.py index 0d4bfa4cc..8e3ff585f 100644 --- a/tests/backends/run_torch.py +++ b/tests/backends/run_torch.py @@ -4,6 +4,7 @@ import torch import torch.nn as nn import torch.nn.functional as F + from smartredis import Client diff --git a/tests/full_wlm/with_ray/test_ray_pbs_batch.py b/tests/full_wlm/with_ray/test_ray_pbs_batch.py new file mode 100644 index 000000000..8c61a9d9e --- /dev/null +++ b/tests/full_wlm/with_ray/test_ray_pbs_batch.py @@ -0,0 +1,75 @@ +import logging +import sys +import time +from os import environ +from shutil import which + +import pytest + +from smartsim import Experiment +from smartsim.exp.ray import RayCluster + +"""Test Ray
cluster PBS batch launch and shutdown. +""" + +# retrieved from pytest fixtures +if pytest.test_launcher not in pytest.wlm_options: + pytestmark = pytest.mark.skip(reason="Not testing WLM integrations") + + +environ["OMP_NUM_THREADS"] = "1" +shouldrun = True +try: + import ray +except ImportError: + shouldrun = False + + +pytestmark = pytest.mark.skipif( + not shouldrun, + reason="requires Ray", +) + + +def test_ray_launch_and_shutdown_batch(fileutils, wlmutils, caplog): + launcher = wlmutils.get_test_launcher() + if launcher != "pbs": + pytest.skip("Test only runs on systems with PBS as WLM") + + caplog.set_level(logging.CRITICAL) + test_dir = fileutils.make_test_dir("test-ray-pbs-launch-and-shutdown-batch") + + exp = Experiment("ray-cluster", test_dir, launcher=launcher) + cluster = RayCluster( + name="ray-cluster", + run_args={}, + ray_args={"num-cpus": 4}, + launcher=launcher, + num_nodes=2, + alloc=None, + batch=True, + ray_port=6830, + time="00:05:00", + interface=wlmutils.get_test_interface(), + ) + + exp.generate(cluster) + exp.start(cluster, block=False, summary=True) + ctx = ray.client(cluster.get_head_address() + ":10001").connect() + + right_resources = False + trials = 10 + while not right_resources and trials > 0: + right_resources = (len(ray.nodes()), ray.cluster_resources()["CPU"]) == (2, 8) + trials -= 1 + time.sleep(1) + + if not right_resources: + ctx.disconnect() + ray.shutdown() + exp.stop(cluster) + assert False + + ctx.disconnect() + ray.shutdown() + exp.stop(cluster) diff --git a/tests/full_wlm/with_ray/test_ray_slurm_batch.py b/tests/full_wlm/with_ray/test_ray_slurm_batch.py new file mode 100644 index 000000000..8f750acbb --- /dev/null +++ b/tests/full_wlm/with_ray/test_ray_slurm_batch.py @@ -0,0 +1,72 @@ +import logging +import sys +import time +from os import environ + +import pytest + +from smartsim import Experiment +from smartsim.exp.ray import RayCluster +from smartsim.launcher import slurm + +"""Test Ray cluster Slurm launch and
shutdown. +""" + +# retrieved from pytest fixtures +if pytest.test_launcher not in pytest.wlm_options: + pytestmark = pytest.mark.skip(reason="Not testing WLM integrations") + +environ["OMP_NUM_THREADS"] = "1" +shouldrun = True +try: + import ray +except ImportError: + shouldrun = False + + +pytestmark = pytest.mark.skipif( + not shouldrun, + reason="requires Ray", +) + + +def test_ray_launch_and_shutdown_batch(fileutils, wlmutils, caplog): + launcher = wlmutils.get_test_launcher() + if launcher != "slurm": + pytest.skip("Test only runs on systems with Slurm as WLM") + + caplog.set_level(logging.CRITICAL) + test_dir = fileutils.make_test_dir("test-ray-slurm-launch-and-shutdown-batch") + + exp = Experiment("ray-cluster", test_dir, launcher=launcher) + cluster = RayCluster( + name="ray-cluster", + run_args={}, + ray_args={"num-cpus": 4}, + launcher=launcher, + num_nodes=2, + alloc=None, + batch=True, + interface=wlmutils.get_test_interface(), + ) + + exp.generate(cluster) + exp.start(cluster, block=False, summary=True) + ctx = ray.client(cluster.get_head_address() + ":10001").connect() + + right_resources = False + trials = 10 + while not right_resources and trials > 0: + right_resources = (len(ray.nodes()), ray.cluster_resources()["CPU"]) == (2, 8) + trials -= 1 + time.sleep(1) + + if not right_resources: + ctx.disconnect() + ray.shutdown() + exp.stop(cluster) + assert False + + ctx.disconnect() + ray.shutdown() + exp.stop(cluster) diff --git a/tests/on_wlm/test_launch_orc_cobalt.py b/tests/on_wlm/test_launch_orc_cobalt.py index 43872e89e..09de69144 100644 --- a/tests/on_wlm/test_launch_orc_cobalt.py +++ b/tests/on_wlm/test_launch_orc_cobalt.py @@ -57,9 +57,6 @@ def test_launch_cobalt_cluster_orc(fileutils, wlmutils): ) orc.set_path(test_dir) - orc.set_cpus(4) - assert all([db.run_settings.run_args["cpus-per-pe"] == 4 for db in orc.entities]) - exp.start(orc, block=True) status = exp.get_status(orc) diff --git a/tests/on_wlm/test_launch_orc_pbs.py 
b/tests/on_wlm/test_launch_orc_pbs.py index 506f4249a..f93f2f683 100644 --- a/tests/on_wlm/test_launch_orc_pbs.py +++ b/tests/on_wlm/test_launch_orc_pbs.py @@ -61,9 +61,6 @@ def test_launch_pbs_cluster_orc(fileutils, wlmutils): ) orc.set_path(test_dir) - orc.set_cpus(4) - assert all([db.run_settings.run_args["cpus-per-pe"] == 4 for db in orc.entities]) - exp.start(orc, block=True) status = exp.get_status(orc) diff --git a/tests/on_wlm/test_launch_orc_slurm.py b/tests/on_wlm/test_launch_orc_slurm.py index 7d1fec597..a501d4497 100644 --- a/tests/on_wlm/test_launch_orc_slurm.py +++ b/tests/on_wlm/test_launch_orc_slurm.py @@ -23,9 +23,6 @@ def test_launch_slurm_orc(fileutils, wlmutils): orc = SlurmOrchestrator(6780, batch=False, interface=network_interface) orc.set_path(test_dir) - orc.set_cpus(4) - assert all([db.run_settings.run_args["cpus-per-task"] == 4 for db in orc.entities]) - exp.start(orc, block=True) status = exp.get_status(orc) diff --git a/tests/test_configs/smartredis/consumer.py b/tests/test_configs/smartredis/consumer.py index d3b90d517..9cfdecc08 100644 --- a/tests/test_configs/smartredis/consumer.py +++ b/tests/test_configs/smartredis/consumer.py @@ -5,6 +5,7 @@ import numpy as np import torch import torch.nn as nn + from smartredis import Client if __name__ == "__main__": diff --git a/tests/test_configs/smartredis/producer.py b/tests/test_configs/smartredis/producer.py index 2f1c00690..baf8f16d6 100644 --- a/tests/test_configs/smartredis/producer.py +++ b/tests/test_configs/smartredis/producer.py @@ -5,6 +5,7 @@ import numpy as np import torch import torch.nn as nn + from smartredis import Client diff --git a/tests/test_manifest.py b/tests/test_manifest.py index b2b7ab982..d2e3052fd 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -1,3 +1,4 @@ +import sys from copy import deepcopy import pytest @@ -6,8 +7,15 @@ from smartsim.control import Manifest from smartsim.database import Orchestrator from smartsim.error import SmartSimError 
+from smartsim.exp.ray import RayCluster from smartsim.settings import RunSettings +ray_ok = True +try: + import ray +except ImportError: + ray_ok = False + # ---- create entities for testing -------- rs = RunSettings("python", "sleep.py") @@ -16,24 +24,34 @@ model = exp.create_model("model_1", run_settings=rs) model_2 = exp.create_model("model_1", run_settings=rs) ensemble = exp.create_ensemble("ensemble", run_settings=rs, replicas=1) + + orc = Orchestrator() orc_1 = deepcopy(orc) orc_1.name = "orc2" model_no_name = exp.create_model(name=None, run_settings=rs) +if ray_ok: + rc = RayCluster(name="ray-cluster", workers=0, launcher="slurm") def test_separate(): - manifest = Manifest(model, ensemble, orc) + if ray_ok: + manifest = Manifest(model, ensemble, orc, rc) + else: + manifest = Manifest(model, ensemble, orc) assert manifest.models[0] == model assert len(manifest.models) == 1 assert manifest.ensembles[0] == ensemble assert len(manifest.ensembles) == 1 assert manifest.db == orc + if ray_ok: + assert len(manifest.ray_clusters) == 1 + assert manifest.ray_clusters[0] == rc def test_no_name(): with pytest.raises(AttributeError): - manifest = Manifest(model_no_name) + _ = Manifest(model_no_name) def test_two_orc(): @@ -44,12 +62,12 @@ def test_two_orc(): def test_separate_type(): with pytest.raises(TypeError): - manifest = Manifest([1, 2, 3]) + _ = Manifest([1, 2, 3]) def test_name_collision(): with pytest.raises(SmartSimError): - manifest = Manifest(model, model_2) + _ = Manifest(model, model_2) def test_corner_case(): @@ -62,4 +80,4 @@ class Person: p = Person() with pytest.raises(TypeError): - manifest = Manifest(p) + _ = Manifest(p) diff --git a/tests/test_smartredis.py b/tests/test_smartredis.py index b30f8d5f0..550824ef8 100644 --- a/tests/test_smartredis.py +++ b/tests/test_smartredis.py @@ -20,17 +20,18 @@ REDIS_PORT = 6780 - +shouldrun = True try: - import smartredis import torch + + import smartredis except ImportError: - pass + shouldrun = False pytestmark 
= pytest.mark.skipif( - ("torch" not in sys.modules), - reason="requires PyTorch", + not shouldrun, + reason="requires PyTorch and SmartRedis", ) diff --git a/tests/with_ray/on_wlm/test_ray_pbs.py b/tests/with_ray/on_wlm/test_ray_pbs.py new file mode 100644 index 000000000..3858d0241 --- /dev/null +++ b/tests/with_ray/on_wlm/test_ray_pbs.py @@ -0,0 +1,75 @@ +import logging +import sys +import time +from os import environ +from shutil import which + +import pytest + +from smartsim import Experiment +from smartsim.exp.ray import RayCluster + +"""Test Ray cluster PBS launch and shutdown. +""" + +# retrieved from pytest fixtures +if pytest.test_launcher not in pytest.wlm_options: + pytestmark = pytest.mark.skip(reason="Not testing WLM integrations") + + +environ["OMP_NUM_THREADS"] = "1" +shouldrun = True +try: + import ray +except ImportError: + shouldrun = False + + +pytestmark = pytest.mark.skipif( + not shouldrun, + reason="requires Ray", +) + + +def test_ray_launch_and_shutdown(fileutils, wlmutils, caplog): + launcher = wlmutils.get_test_launcher() + if launcher != "pbs": + pytest.skip("Test only runs on systems with PBS as WLM") + + caplog.set_level(logging.CRITICAL) + test_dir = fileutils.make_test_dir("test-ray-pbs-launch-and-shutdown") + + exp = Experiment("ray-cluster", test_dir, launcher=launcher) + cluster = RayCluster( + name="ray-cluster", + run_args={}, + ray_args={"num-cpus": 4}, + launcher=launcher, + num_nodes=2, + alloc=None, + batch=False, + time="00:05:00", + ray_port=6830, + interface=wlmutils.get_test_interface(), + ) + + exp.generate(cluster) + exp.start(cluster, block=False, summary=False) + ctx = ray.client(cluster.get_head_address() + ":10001").connect() + + right_resources = False + trials = 10 + while not right_resources and trials > 0: + right_resources = (len(ray.nodes()), ray.cluster_resources()["CPU"]) == (2, 8) + trials -= 1 + time.sleep(1) + + if not right_resources: + ctx.disconnect() + ray.shutdown() + exp.stop(cluster) + assert
False + + ctx.disconnect() + ray.shutdown() + exp.stop(cluster) diff --git a/tests/with_ray/on_wlm/test_ray_slurm.py b/tests/with_ray/on_wlm/test_ray_slurm.py new file mode 100644 index 000000000..143d34b1f --- /dev/null +++ b/tests/with_ray/on_wlm/test_ray_slurm.py @@ -0,0 +1,122 @@ +import logging +import sys +import time +from os import environ + +import pytest + +from smartsim import Experiment +from smartsim.exp.ray import RayCluster +from smartsim.launcher import slurm + +"""Test Ray cluster Slurm launch and shutdown. +""" + +# retrieved from pytest fixtures +if pytest.test_launcher not in pytest.wlm_options: + pytestmark = pytest.mark.skip(reason="Not testing WLM integrations") + + +environ["OMP_NUM_THREADS"] = "1" +shouldrun = True +try: + import ray +except ImportError: + shouldrun = False + + +pytestmark = pytest.mark.skipif( + not shouldrun, + reason="requires Ray", +) + + +def test_ray_launch_and_shutdown(fileutils, wlmutils, caplog): + launcher = wlmutils.get_test_launcher() + if launcher != "slurm": + pytest.skip("Test only runs on systems with Slurm as WLM") + + caplog.set_level(logging.CRITICAL) + test_dir = fileutils.make_test_dir("test-ray-slurm-launch-and-shutdown") + + exp = Experiment("ray-cluster", test_dir, launcher=launcher) + cluster = RayCluster( + name="ray-cluster", + run_args={}, + ray_args={"num-cpus": 4}, + launcher=launcher, + num_nodes=2, + alloc=None, + batch=False, + time="00:05:00", + interface=wlmutils.get_test_interface(), + ) + + exp.generate(cluster) + exp.start(cluster, block=False, summary=False) + ctx = ray.client(cluster.get_head_address() + ":10001").connect() + + right_resources = False + trials = 10 + while not right_resources and trials > 0: + right_resources = (len(ray.nodes()), ray.cluster_resources()["CPU"]) == (2, 8) + trials -= 1 + time.sleep(1) + + if not right_resources: + ctx.disconnect() + ray.shutdown() + exp.stop(cluster) + assert False + + ctx.disconnect() + ray.shutdown() + exp.stop(cluster) + + +def 
test_ray_launch_and_shutdown_in_alloc(fileutils, wlmutils, caplog): + launcher = wlmutils.get_test_launcher() + if launcher != "slurm": + pytest.skip("Test only runs on systems with Slurm as WLM") + if "SLURM_JOBID" in environ: + pytest.skip("Test can not be run inside an allocation") + + caplog.set_level(logging.CRITICAL) + test_dir = fileutils.make_test_dir("test-ray-slurm-launch-and-shutdown-in-alloc") + + alloc = slurm.get_allocation(4, time="00:05:00") + + exp = Experiment("ray-cluster", test_dir, launcher=launcher) + cluster = RayCluster( + name="ray-cluster", + run_args={}, + ray_args={"num-cpus": 4}, + launcher=launcher, + workers=2, + alloc=alloc, + batch=False, + interface=wlmutils.get_test_interface(), + ) + + exp.generate(cluster) + exp.start(cluster, block=False, summary=False) + ctx = ray.client(cluster.get_head_address() + ":10001").connect() + + right_resources = False + trials = 10 + while not right_resources and trials > 0: + right_resources = (len(ray.nodes()), ray.cluster_resources()["CPU"]) == (3, 12) + trials -= 1 + time.sleep(1) + + if not right_resources: + ctx.disconnect() + ray.shutdown() + exp.stop(cluster) + slurm.release_allocation(alloc) + assert False + + ctx.disconnect() + ray.shutdown() + exp.stop(cluster) + slurm.release_allocation(alloc) diff --git a/tests/with_ray/test_ray.py b/tests/with_ray/test_ray.py new file mode 100644 index 000000000..d900ec2f8 --- /dev/null +++ b/tests/with_ray/test_ray.py @@ -0,0 +1,107 @@ +import logging +import sys +from os import environ + +import psutil +import pytest + +from smartsim import Experiment +from smartsim.error import SSUnsupportedError +from smartsim.exp.ray import RayCluster + +"""Test Ray cluster local launch and shutdown. 
+""" + +environ["OMP_NUM_THREADS"] = "1" +shouldrun = True +try: + import ray +except ImportError: + shouldrun = False + +pytestmark = pytest.mark.skipif( + not shouldrun, + reason="requires Ray", +) + + +@pytest.mark.skip(reason="Local launch is currently disabled for Ray") +def test_ray_local_launch_and_shutdown(fileutils, caplog): + """Start a local (single node) Ray cluster and + shut it down. + """ + # Avoid Ray output + caplog.set_level(logging.CRITICAL) + + test_dir = fileutils.make_test_dir("test-ray-local-launch-and-shutdown") + + exp = Experiment("ray-cluster", launcher="local", exp_path=test_dir) + cluster = RayCluster( + name="ray-cluster", + run_args={}, + launcher="local", + ray_port=6830, + num_nodes=1, + batch=True, + ray_args={"num-cpus": "4", "dashboard-port": "8266"}, + ) + exp.generate(cluster, overwrite=False) + exp.start(cluster, block=False, summary=False) + + ray.init("ray://" + cluster.get_head_address() + ":10001") + + right_size = len(ray.nodes()) == 1 + if not right_size: + ray.shutdown() + exp.stop(cluster) + assert False + + right_resources = ray.cluster_resources()["CPU"] == 4 + if not right_resources: + ray.shutdown() + exp.stop(cluster) + assert False + + # Even setting batch to True must result in cluster.batch==False on local + if cluster.batch: + ray.shutdown() + exp.stop(cluster) + assert False + + ray.shutdown() + exp.stop(cluster) + + raylet_active = False + for proc in psutil.process_iter(): + try: + if "raylet" in proc.name().lower(): + raylet_active = True + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + pass + assert not raylet_active + + assert cluster.get_dashboard_address() == cluster.get_head_address() + ":8266" + + +def test_ray_errors(fileutils): + """Try to start a local Ray cluster with incorrect settings.""" + + test_dir = fileutils.make_test_dir("test-ray-errors") + + with pytest.raises(SSUnsupportedError): + _ = RayCluster( + name="local-ray-cluster", + path=test_dir, + run_args={}, 
+ launcher="local", + num_nodes=1, + ) + + with pytest.raises(ValueError): + _ = RayCluster( + name="small-ray-cluster", + path=test_dir, + run_args={}, + launcher="slurm", + num_nodes=0, + ) diff --git a/tutorials/01_getting_started/01_getting_started.ipynb b/tutorials/01_getting_started/01_getting_started.ipynb index 2bb17da3c..9df06ed40 100644 --- a/tutorials/01_getting_started/01_getting_started.ipynb +++ b/tutorials/01_getting_started/01_getting_started.ipynb @@ -1368,7 +1368,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.7.7" + "version": "3.7.10" } }, "nbformat": 4, diff --git a/tutorials/05_starting_ray/05_starting_ray_builtin.ipynb b/tutorials/05_starting_ray/05_starting_ray_builtin.ipynb new file mode 100644 index 000000000..102455f55 --- /dev/null +++ b/tutorials/05_starting_ray/05_starting_ray_builtin.ipynb @@ -0,0 +1,279 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "4cba3240", + "metadata": {}, + "source": [ + "# Setting up a Ray cluster with SmartSim" + ] + }, + { + "cell_type": "markdown", + "id": "624cb31c", + "metadata": {}, + "source": [ + "## 1. Start the cluster\n", + "We set up a SmartSim experiment, which will handle the launch of the Ray cluster.\n", + "\n", + "First we import the relevant modules and set up variables. `NUM_NODES` is the number of Ray nodes we will deploy: the first one will be the head node, and we will run one node on each host." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bf6b043d", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import numpy as np\n", + "import os\n", + "import ray\n", + "from ray import tune\n", + "import ray.util\n", + "\n", + "from smartsim import Experiment\n", + "from smartsim.exp.ray import RayCluster\n", + "\n", + "NUM_NODES = 3\n", + "CPUS_PER_WORKER = 18\n", + "launcher='slurm'" + ] + }, + { + "cell_type": "markdown", + "id": "713f5f27", + "metadata": {}, + "source": [ + "Now we define a SmartSim experiment which will spin up the Ray cluster. The output files will be located in the `ray-cluster` directory (relative to the path from where we are executing this notebook). We limit the number of CPUs each Ray node can use to `CPUS_PER_WORKER`: to let each node use all available CPUs, simply do not pass `ray_args`.\n", + "Notice that the cluster will be password-protected (the password, generated internally, will be shared with worker nodes).\n", + "\n", + "If the hosts are attached to multiple interfaces (e.g. `ib`, `eth0`, ...) we can specify to which one the Ray nodes should bind: it is recommended to always choose the one offering the best performance. On a Cray XC, for example, this will be `ipogif0`. \n", + "\n", + "To connect to the cluster, we will use the Ray client. Note that this approach only works with `ray>=1.6`; for previous versions, one has to add `password=None` to the `RayCluster` constructor."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a8851bff", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "exp = Experiment(\"ray-cluster\", launcher=launcher)\n", + "cluster = RayCluster(name=\"ray-cluster\", run_args={}, ray_args={\"num-cpus\": CPUS_PER_WORKER},\n", + " launcher=launcher, num_nodes=NUM_NODES, batch=False, interface=\"ipogif0\")" + ] + }, + { + "cell_type": "markdown", + "id": "a28512f9", + "metadata": {}, + "source": [ + "We now generate the needed directories. If an experiment with the same name already exists, this call will fail, to avoid overwriting existing results. If we want to overwrite, we can simply pass `overwrite=True` to `exp.generate()`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "30c66187", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "exp.generate(cluster, overwrite=True)" + ] + }, + { + "cell_type": "markdown", + "id": "5ddd1af8", + "metadata": {}, + "source": [ + "Now we are ready to start the cluster!" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "088251d3", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "exp.start(cluster, block=False, summary=False)" + ] + }, + { + "cell_type": "markdown", + "id": "847a4a74", + "metadata": {}, + "source": [ + "## 2. Start the ray driver script\n", + "\n", + "Now we can just connect to our running server." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2a90ff89", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "ctx = ray.init(\"ray://\"+cluster.get_head_address()+\":10001\")" + ] + }, + { + "cell_type": "markdown", + "id": "c6401082", + "metadata": {}, + "source": [ + "We can check that all resources are set properly." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c17e5555", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "print('''This cluster consists of\n", + " {} nodes in total\n", + " {} CPU resources in total\n", + " and the head node is running at {}'''.format(len(ray.nodes()), ray.cluster_resources()['CPU'], cluster.get_head_address()))\n" + ] + }, + { + "cell_type": "markdown", + "id": "4f6663d4", + "metadata": {}, + "source": [ + "And we can run a Ray Tune example to see that everything is working." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1f08fc6a", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "tune.run(\n", + " \"PPO\",\n", + " stop={\"episode_reward_max\": 200},\n", + " config={\n", + " \"framework\": \"torch\",\n", + " \"env\": \"CartPole-v0\",\n", + " \"num_gpus\": 0,\n", + " \"lr\": tune.grid_search(np.linspace(0.001, 0.01, 50).tolist()),\n", + " \"log_level\": \"ERROR\",\n", + " },\n", + " local_dir=os.path.join(exp.exp_path, \"ray_log\"),\n", + " verbose=0,\n", + " fail_fast=True,\n", + " log_to_file=True,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "66e52249", + "metadata": {}, + "source": [ + "When the Ray job is running, we can connect to the Ray dashboard to monitor the evolution of the experiment. To do this, if Ray is running on a compute node of a remote system, we need to set up an SSH tunnel (we will see later how) to forward the port on which the dashboard is published to our local system. 
For example, if the head address (printed in the cell above) is `<head-address>`, and the system name is `<system-name>`, we can establish a tunnel to the dashboard by opening a terminal on the local system and entering\n", + "\n", + "```bash\n", + "ssh -L 8265:<head-address>:8265 <system-name>\n", + "```\n", + "\n", + "Then, from a browser on the local system, we can just go to the address `http://localhost:8265` to see the dashboard.\n", + "\n", + "There are two things to know if something does not work:\n", + "1. We are using `8265` as a port, which is the default dashboard port. If that port is not free, we can bind the dashboard to another port, e.g. `PORT_NUMBER` (by adding `\"dashboard-port\": str(PORT_NUMBER)` to `ray_args` when creating the cluster), and the command has to be changed accordingly.\n", + "\n", + "2. If the port forwarding fails, it is possible that the interface is not reachable. In that case, one can add `\"dashboard-address\": \"0.0.0.0\"` to `ray_args` when creating the cluster, to bind the dashboard to all interfaces, or select a visible address if one knows it. We can then just use the node name (or its public IP) to establish the tunnel, by entering (on the local terminal)\n", + " ```bash\n", + " ssh -L 8265:<node-address>:8265 <system-name>\n", + " ```\n", + "Please refer to your system guide to find out how to get the name and the address of a node." + ] + }, + { + "cell_type": "markdown", + "id": "6da5f0a5", + "metadata": {}, + "source": [ + "## 3. Stop cluster and release allocation\n", + "\n", + "We first shut down the Ray runtime, then disconnect the context." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4961f1d6", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "ray.shutdown()\n", + "ctx.disconnect()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now that everything is gracefully stopped, we can stop the job on the allocation."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f19f7b95", + "metadata": {}, + "outputs": [], + "source": [ + "exp.stop(cluster)" + ] + } + ], + "metadata": { + "interpreter": { + "hash": "b738ecfe013e3ceede67431676fa5746fa1d95dad4240bd1c29430e75f30557e" + }, + "kernelspec": { + "display_name": "smartsim", + "language": "python", + "name": "smartsim" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.8" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}
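For readers reviewing this patch, the two small helpers it introduces can be sketched in isolation. The `delete_elements` function below mirrors the one added to `smartsim/utils/helpers.py`; `walltime_to_seconds` is an illustrative standalone name for the `"HH:MM:SS"`-to-seconds conversion that, in the patch, lives inside `AprunSettings.set_walltime` and is not itself part of the SmartSim API.

```python
def delete_elements(dictionary, key_list):
    """Remove the given keys from a dictionary, silently ignoring
    keys that are not present (mirrors smartsim.utils.helpers)."""
    for key in key_list:
        if key in dictionary:
            del dictionary[key]


def walltime_to_seconds(walltime):
    """Convert an "HH:MM:SS" walltime string to total seconds,
    as AprunSettings.set_walltime does (illustrative name only)."""
    hours, minutes, seconds = (int(field) for field in walltime.split(":"))
    return hours * 3600 + minutes * 60 + seconds


# Example: prune run arguments and convert the test walltime used above
run_args = {"t": "300", "node-list": "nid00001"}
delete_elements(run_args, ["t", "missing-key"])
print(run_args)                          # {'node-list': 'nid00001'}
print(walltime_to_seconds("00:05:00"))   # 300
```

`delete_elements` deliberately skips missing keys, so callers can pass a superset of keys without pre-checking; the walltime conversion assumes a well-formed three-field string, matching the patch, which leaves error checking as a TODO.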