From 21542a64b7b365b20308ed70b80bbb33af2b0d61 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Thu, 13 Jul 2023 09:20:58 -0700 Subject: [PATCH] [AIR] Remove head node syncing as the default storage option (#37142) Signed-off-by: Justin Yu Signed-off-by: harborn --- .../dolly_lightning_fsdp_finetuning.ipynb | 48 +++++++-- .../examples/gptj_deepspeed_fine_tuning.ipynb | 32 +++++- .../02_many_model_training/start.ipynb | 9 +- python/ray/air/constants.py | 6 ++ .../examples/pytorch/torch_linear_example.py | 5 +- .../examples/tf/tensorflow_mnist_example.py | 8 +- python/ray/tune/syncer.py | 83 +++++++++++++--- python/ray/tune/tests/test_multinode_sync.py | 7 +- python/ray/tune/tests/test_syncer_callback.py | 97 ++++++++++++++++++- .../workloads/xgboost_benchmark.py | 5 +- release/air_tests/frequent_pausing/script.py | 3 +- .../horovod/workloads/horovod_tune_test.py | 1 + .../workloads/torch_tune_serve_test.py | 1 + .../lightning_tests/workloads/test_trainer.py | 3 +- .../lightning_tests/workloads/test_tuner.py | 1 + release/long_running_tests/workloads/apex.py | 1 + .../long_running_tests/workloads/impala.py | 1 + .../train/train_tensorflow_mnist_test.py | 4 +- .../train/train_torch_linear_test.py | 4 +- .../tune_rllib/run_connect_tests.py | 2 +- ..._test_basic_multi_node_training_learner.py | 4 + .../cloud_tests/workloads/_tune_script.py | 5 + .../cloud_tests/workloads/run_cloud_test.py | 58 +++++++++++ .../test_long_running_large_checkpoints.py | 1 + rllib/examples/tune/framework.py | 3 +- rllib/utils/test_utils.py | 6 ++ 26 files changed, 358 insertions(+), 40 deletions(-) diff --git a/doc/source/ray-air/examples/dolly_lightning_fsdp_finetuning.ipynb b/doc/source/ray-air/examples/dolly_lightning_fsdp_finetuning.ipynb index a57b9e34dfe494..e6eec57f01ba98 100644 --- a/doc/source/ray-air/examples/dolly_lightning_fsdp_finetuning.ipynb +++ b/doc/source/ray-air/examples/dolly_lightning_fsdp_finetuning.ipynb @@ -139,6 +139,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -321,11 +322,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Fine-tune with LightningTrainer\n", - "\n", - "```{note}\n", - "Here we save the checkpoints to the local file system. You can also upload the checkpoints to cloud storage by setting S3 bucket URI to {class}`air.RunConfig(storage_path=S3_BUCKET_URI) `. See {ref}`train-run-config` for an example.\n", - "```" + "## Fine-tune with LightningTrainer" ] }, { @@ -387,6 +384,43 @@ "lightning_config.trainer(callbacks=[DollyV2ProgressBar(num_iters_per_epoch)])" ] }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```{note}\n", + "Since this example runs with multiple nodes, we need to persist checkpoints\n", + "and other outputs to some external storage for access after training has completed.\n", + "**You should set up cloud storage or NFS, then replace `storage_path` with your own cloud bucket URI or NFS path.**\n", + "\n", + "See the [storage guide](tune-storage-options) for more details.\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "storage_path=\"s3://your-bucket-here\" # TODO: Set up cloud storage\n", + "# storage_path=\"/mnt/path/to/nfs\" # TODO: Alternatively, set up NFS" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "remove-cell" + ] + }, + "outputs": [], + "source": [ + "storage_path = \"/mnt/cluster_storage\"" + ] + }, { "cell_type": "code", "execution_count": 9, @@ -921,9 +955,10 @@ "from ray.tune.syncer import SyncConfig\n", "# Save AIR checkpoints according to the performance on validation set\n", "run_config = RunConfig(\n", + " storage_path=storage_path,\n", " name=\"finetune_dolly-v2-7b\",\n", " checkpoint_config=CheckpointConfig(),\n", - " sync_config=SyncConfig(sync_artifacts=False)\n", + " sync_config=SyncConfig(sync_artifacts=False),\n", ")\n", "\n", "# Scale the DDP training workload across 16 GPUs\n", @@ -954,6 +989,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ diff --git a/doc/source/ray-air/examples/gptj_deepspeed_fine_tuning.ipynb b/doc/source/ray-air/examples/gptj_deepspeed_fine_tuning.ipynb index 7cb444ddb7251d..2ca0d0f32f4b26 100644 --- a/doc/source/ray-air/examples/gptj_deepspeed_fine_tuning.ipynb +++ b/doc/source/ray-air/examples/gptj_deepspeed_fine_tuning.ipynb @@ -582,10 +582,37 @@ "We pass the preprocessors we have defined earlier as an argument, wrapped in a {class}`~ray.data.preprocessors.chain.Chain`. The preprocessor will be included with the returned {class}`~ray.air.checkpoint.Checkpoint`, meaning it will also be applied during inference.\n", "\n", "```{note}\n", - "If you want to upload checkpoints to cloud storage (eg. S3), set {class}`air.RunConfig(storage_path) `. See {ref}`train-run-config` for an example. Using cloud storage is highly recommended, especially for production.\n", + "Since this example runs with multiple nodes, we need to persist checkpoints\n", + "and other outputs to some external storage for access after training has completed.\n", + "**You should set up cloud storage or NFS, then replace `storage_path` with your own cloud bucket URI or NFS path.**\n", + "\n", + "See the [storage guide](tune-storage-options) for more details.\n", "```" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "storage_path=\"s3://your-bucket-here\" # TODO: Set up cloud storage\n", + "# storage_path=\"/mnt/path/to/nfs\" # TODO: Alternatively, set up NFS" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "remove-cell" + ] + }, + "outputs": [], + "source": [ + "storage_path = \"/mnt/cluster_storage\"" + ] + }, { "cell_type": "code", "execution_count": 12, @@ -593,7 +620,7 @@ "outputs": [], "source": [ "from ray.train.huggingface import TransformersTrainer\n", - "from ray.air.config import ScalingConfig\n", + "from ray.air import RunConfig, ScalingConfig\n", "from ray.data.preprocessors import Chain\n", "\n", "\n", @@ -610,6 +637,7 @@ " ),\n", " datasets={\"train\": ray_datasets[\"train\"], \"evaluation\": ray_datasets[\"validation\"]},\n", " preprocessor=Chain(splitter, tokenizer),\n", + " run_config=RunConfig(storage_path=storage_path),\n", ")" ] }, diff --git a/doc/source/templates/02_many_model_training/start.ipynb b/doc/source/templates/02_many_model_training/start.ipynb index e1a76f0e9ba764..89cdd345002e0a 100644 --- a/doc/source/templates/02_many_model_training/start.ipynb +++ b/doc/source/templates/02_many_model_training/start.ipynb @@ -75,7 +75,7 @@ "from statsforecast.models import AutoARIMA, AutoETS, MSTL\n", "\n", "from ray import tune\n", - "from ray.air import session\n" + "from ray.air import session, RunConfig\n" ] }, { @@ -265,7 +265,12 @@ "metadata": {}, "outputs": [], "source": [ - "tuner = tune.Tuner(trainable, param_space=param_space)\n", + "tuner = tune.Tuner(\n", + " trainable,\n", + " param_space=param_space,\n", + " # Experiment results are saved to a shared filesystem available to all nodes.\n", + " run_config=RunConfig(storage_path=\"/mnt/cluster_storage\"),\n", + ")\n", "result_grid = tuner.fit()\n" ] }, diff --git a/python/ray/air/constants.py b/python/ray/air/constants.py index d58234a01a58cc..6dc8a2bd77a7a2 100644 --- a/python/ray/air/constants.py +++ b/python/ray/air/constants.py @@ -93,10 +93,16 @@ # as Trainable) DISABLE_LAZY_CHECKPOINTING_ENV = "TRAIN_DISABLE_LAZY_CHECKPOINTING" +# TODO(ml-team): [Deprecation - head node syncing] +# Whether or not the sync-to-head behavior is enabled by default. +# If unset, running AIR on a multi-node cluster with checkpointing will raise +# an error telling the user to switch to cloud/NFS. +REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE = "RAY_AIR_REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE" # NOTE: When adding a new environment variable, please track it in this list. # TODO(ml-team): Most env var constants should get moved here. AIR_ENV_VARS = { + REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, COPY_DIRECTORY_CHECKPOINTS_INSTEAD_OF_MOVING_ENV, DISABLE_LAZY_CHECKPOINTING_ENV, "RAY_AIR_FULL_TRACEBACKS", diff --git a/python/ray/train/examples/pytorch/torch_linear_example.py b/python/ray/train/examples/pytorch/torch_linear_example.py index 647b51a0db6b2b..4fc050605b2767 100644 --- a/python/ray/train/examples/pytorch/torch_linear_example.py +++ b/python/ray/train/examples/pytorch/torch_linear_example.py @@ -6,7 +6,7 @@ import torch.nn as nn import ray.train as train from ray.train.torch import TorchTrainer, TorchCheckpoint -from ray.air.config import ScalingConfig +from ray.air.config import RunConfig, ScalingConfig class LinearDataset(torch.utils.data.Dataset): @@ -85,12 +85,13 @@ def train_func(config): return results -def train_linear(num_workers=2, use_gpu=False, epochs=3): +def train_linear(num_workers=2, use_gpu=False, epochs=3, storage_path=None): config = {"lr": 1e-2, "hidden_size": 1, "batch_size": 4, "epochs": epochs} trainer = TorchTrainer( train_loop_per_worker=train_func, train_loop_config=config, scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu), + run_config=RunConfig(storage_path=storage_path), ) result = trainer.fit() diff --git a/python/ray/train/examples/tf/tensorflow_mnist_example.py b/python/ray/train/examples/tf/tensorflow_mnist_example.py index 730279a983eff9..46fd187ab5dadf 100644 --- a/python/ray/train/examples/tf/tensorflow_mnist_example.py +++ b/python/ray/train/examples/tf/tensorflow_mnist_example.py @@ -12,7 +12,7 @@ from ray.train.tensorflow import TensorflowTrainer from ray.air.integrations.keras import ReportCheckpointCallback -from ray.air.config import ScalingConfig +from ray.air.config import RunConfig, ScalingConfig def mnist_dataset(batch_size: int) -> tf.data.Dataset: @@ -79,13 +79,17 @@ def train_func(config: dict): def train_tensorflow_mnist( - num_workers: int = 2, use_gpu: bool = False, epochs: int = 4 + num_workers: int = 2, + use_gpu: bool = False, + epochs: int = 4, + storage_path: str = None, ) -> Result: config = {"lr": 1e-3, "batch_size": 64, "epochs": epochs} trainer = TensorflowTrainer( train_loop_per_worker=train_func, train_loop_config=config, scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu), + run_config=RunConfig(storage_path=storage_path), ) results = trainer.fit() return results diff --git a/python/ray/tune/syncer.py b/python/ray/tune/syncer.py index 5694eab731fc92..100b145a72da25 100644 --- a/python/ray/tune/syncer.py +++ b/python/ray/tune/syncer.py @@ -40,7 +40,11 @@ delete_at_uri, is_non_local_path_uri, ) -from ray.air.constants import LAZY_CHECKPOINT_MARKER_FILE, TRAINING_ITERATION +from ray.air.constants import ( + LAZY_CHECKPOINT_MARKER_FILE, + REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, + TRAINING_ITERATION, +) from ray.exceptions import RayActorError from ray.tune import TuneError from ray.tune.callback import Callback @@ -75,6 +79,30 @@ f"./{LAZY_CHECKPOINT_MARKER_FILE}", ] +_SYNC_TO_HEAD_DEPRECATION_MESSAGE = ( + "Ray AIR no longer supports the synchronization of checkpoints and other " + "artifacts from worker nodes to the head node. This means that the " + "checkpoints and artifacts saved by trials scheduled on worker nodes will not be " + "accessible during the run (e.g., resuming from a checkpoint " + "after a failure) or after the run " + "(e.g., loading the checkpoint of a trial that ran on an already " + "terminated worker node).\n\n" + "To fix this issue, configure AIR to use either:\n" + "(1) Cloud storage: `RunConfig(storage_path='s3://your/bucket')`\n" + "(2) A network filesystem mounted on all nodes: " + "`RunConfig(storage_path='/mnt/path/to/nfs_storage')`\n" + "See this Github issue for more details on transitioning to cloud storage/NFS " + "as well as an explanation on why this functionality is " + "being removed: https://github.com/ray-project/ray/issues/37177\n\n" + "Other temporary workarounds:\n" + "- If you want to avoid errors/warnings and continue running with " + "syncing explicitly turned off, set `RunConfig(SyncConfig(syncer=None))`\n" + "- Or, to re-enable the head node syncing behavior, set the " + f"environment variable {REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE}=1\n" + " - **Note that this functionality will tentatively be hard-deprecated in " + "Ray 2.7.** See the linked issue for the latest information." +) + @PublicAPI @dataclass @@ -790,14 +818,6 @@ def _sync_trial_dir( if not self._enabled or trial.uses_cloud_checkpointing: return False - sync_process = self._get_trial_sync_process(trial) - - # Always run if force=True - # Otherwise, only run if we should sync (considering sync period) - # and if there is no sync currently still running. - if not force and (not self._should_sync(trial) or sync_process.is_running): - return False - source_ip = self._trial_ips.get(trial.trial_id, None) if not source_ip: @@ -815,6 +835,22 @@ def _sync_trial_dir( self._trial_ips[trial.trial_id] = source_ip + if not bool(int(os.environ.get(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "0"))): + # Only log a warning for remote trials, since + # this only affects artifacts that are saved on worker nodes. + if source_ip != ray.util.get_node_ip_address(): + if log_once("deprecated_head_node_sync"): + logger.warning(_SYNC_TO_HEAD_DEPRECATION_MESSAGE) + return False + + sync_process = self._get_trial_sync_process(trial) + + # Always run if force=True + # Otherwise, only run if we should sync (considering sync period) + # and if there is no sync currently still running. + if not force and (not self._should_sync(trial) or sync_process.is_running): + return False + try: sync_process.wait() except TuneError as e: @@ -887,16 +923,33 @@ def on_checkpoint( checkpoint: _TrackedCheckpoint, **info, ): + if not self._enabled or trial.uses_cloud_checkpointing: + return + if checkpoint.storage_mode == CheckpointStorage.MEMORY: return - if self._sync_trial_dir( - trial, force=trial.sync_on_checkpoint, wait=True - ) and not os.path.exists(checkpoint.dir_or_data): - raise TuneError( - f"Trial {trial}: Checkpoint path {checkpoint.dir_or_data} not " - "found after successful sync down." + if not bool(int(os.environ.get(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "0"))): + # If we have saved a checkpoint, but it's not accessible on the driver, + # that means that it lives on some other node and would be synced to head + # prior to Ray 2.6. + if not os.path.exists(checkpoint.dir_or_data): + raise DeprecationWarning(_SYNC_TO_HEAD_DEPRECATION_MESSAGE) + # else: + # No need to raise an error about syncing, since the driver can find + # the checkpoint, because either: + # - the checkpoint lives on the head node + # - a shared filesystem is used + else: + # Old head node syncing codepath + synced = self._sync_trial_dir( + trial, force=trial.sync_on_checkpoint, wait=True ) + if synced and not os.path.exists(checkpoint.dir_or_data): + raise TuneError( + f"Trial {trial}: Checkpoint path {checkpoint.dir_or_data} not " + "found after successful sync down." + ) def wait_for_all(self): # Remove any sync processes as needed, and only wait on the remaining ones. diff --git a/python/ray/tune/tests/test_multinode_sync.py b/python/ray/tune/tests/test_multinode_sync.py index cd9a9730b9eeba..3d3e591eba013a 100644 --- a/python/ray/tune/tests/test_multinode_sync.py +++ b/python/ray/tune/tests/test_multinode_sync.py @@ -8,6 +8,7 @@ import ray from ray import tune from ray.air.config import CheckpointConfig +from ray.air.constants import REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE from ray.air.util.node import _force_on_node from ray.autoscaler._private.fake_multi_node.node_provider import FAKE_HEAD_NODE_ID from ray.autoscaler._private.fake_multi_node.test_utils import DockerCluster @@ -206,7 +207,11 @@ def testCheckpointSync(self): ) # Connect via Ray client and wait until all nodes are there self.cluster.start() - self.cluster.connect(client=True, timeout=120) + self.cluster.connect( + client=True, + timeout=120, + runtime_env={"env_vars": {REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE: "1"}}, + ) self.cluster.wait_for_resources({"CPU": 12}) # This train function trains for 10 iterations per run diff --git a/python/ray/tune/tests/test_syncer_callback.py b/python/ray/tune/tests/test_syncer_callback.py index eb49e00acd8d44..9d9ab0c960e64c 100644 --- a/python/ray/tune/tests/test_syncer_callback.py +++ b/python/ray/tune/tests/test_syncer_callback.py @@ -10,12 +10,13 @@ import ray.util from ray.air._internal.checkpoint_manager import CheckpointStorage, _TrackedCheckpoint -from ray.air.constants import TRAINING_ITERATION +from ray.air.constants import TRAINING_ITERATION, REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE from ray.exceptions import RayActorError from ray.tune import TuneError from ray.tune.logger import NoopLogger from ray.tune.result import TIME_TOTAL_S from ray.tune.syncer import ( + _SYNC_TO_HEAD_DEPRECATION_MESSAGE, DEFAULT_SYNC_PERIOD, SyncConfig, SyncerCallback, @@ -46,6 +47,11 @@ def ray_start_2_cpus(): ray.shutdown() +@pytest.fixture(autouse=True) +def enable_legacy_head_node_syncing(monkeypatch): + monkeypatch.setenv(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "1") + + @pytest.fixture def temp_data_dirs(): tmp_source = os.path.realpath(tempfile.mkdtemp()) @@ -119,14 +125,20 @@ def assert_file(exists: bool, root: str, path: str = ""): class MockTrial: - def __init__(self, trial_id: str, logdir: str, on_dead_node: bool = False): + def __init__( + self, + trial_id: str, + logdir: str, + on_dead_node: bool = False, + runner_ip: str = None, + ): self.trial_id = trial_id self.uses_cloud_checkpointing = False self.sync_on_checkpoint = True self.logdir = logdir self.local_path = logdir - self._local_ip = ray.util.get_node_ip_address() + self._local_ip = runner_ip or ray.util.get_node_ip_address() self._on_dead_node = on_dead_node def get_runner_ip(self): @@ -566,6 +578,85 @@ def train_fn(config): assert_file(False, tmp_target, "save_to_object1234") +# TODO(ml-team): [Deprecation - head node syncing] +def test_head_node_syncing_disabled_error(monkeypatch, tmp_path): + syncer_callback = SyncerCallback(sync_period=0) + trial = MockTrial(trial_id="a", logdir=None) + + # Raise a deprecation error if checkpointing in a multi-node cluster + monkeypatch.setenv(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "0") + with pytest.raises(DeprecationWarning): + syncer_callback.on_checkpoint( + iteration=1, + trials=[], + trial=trial, + checkpoint=_TrackedCheckpoint( + dir_or_data="/does/not/exist", storage_mode=CheckpointStorage.PERSISTENT + ), + ) + + # Setting the env var raises the original TuneError instead of a deprecation + monkeypatch.setenv(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "1") + with pytest.raises(TuneError): + syncer_callback.on_checkpoint( + iteration=1, + trials=[], + trial=trial, + checkpoint=_TrackedCheckpoint( + dir_or_data="/does/not/exist", storage_mode=CheckpointStorage.PERSISTENT + ), + ) + + # Make sure we don't raise an error if running on a single node or using NFS, + # where the checkpoint can be accessed from the driver. + monkeypatch.setenv(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "0") + path_that_exists = tmp_path / "exists" + path_that_exists.mkdir() + syncer_callback.on_checkpoint( + iteration=1, + trials=[], + trial=trial, + checkpoint=_TrackedCheckpoint( + dir_or_data=str(path_that_exists), storage_mode=CheckpointStorage.PERSISTENT + ), + ) + + +# TODO(ml-team): [Deprecation - head node syncing] +def test_head_node_syncing_disabled_warning(propagate_logs, caplog, monkeypatch): + monkeypatch.setenv(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "0") + syncer_callback = SyncerCallback(sync_period=0) + remote_trial_a = MockTrial(trial_id="a", logdir=None, runner_ip="remote") + remote_trial_b = MockTrial(trial_id="b", logdir=None, runner_ip="remote") + local_trial_c = MockTrial(trial_id="c", logdir=None) + + with caplog.at_level(logging.WARNING): + # The log should only be displayed once for the first remote trial. + syncer_callback._sync_trial_dir(local_trial_c) + assert caplog.text.count(_SYNC_TO_HEAD_DEPRECATION_MESSAGE) == 0 + + # Any attempts to sync from remote trials should no-op. + # Instead, print a warning message to the user explaining that + # no checkpoints or artifacts are pulled to the head node. + syncer_callback._sync_trial_dir(remote_trial_a) + assert caplog.text.count(_SYNC_TO_HEAD_DEPRECATION_MESSAGE) == 1 + + # More sync attempts shouldn't add any extra warnings. + syncer_callback._sync_trial_dir(remote_trial_b) + syncer_callback._sync_trial_dir(remote_trial_a) + syncer_callback._sync_trial_dir(local_trial_c) + + assert caplog.text.count(_SYNC_TO_HEAD_DEPRECATION_MESSAGE) == 1 + + disabled_syncer_callback = SyncerCallback(enabled=False) + remote_trial_d = MockTrial(trial_id="d", logdir=None, runner_ip="remote") + caplog.clear() + with caplog.at_level(logging.WARNING): + # No warning if syncing is explicitly disabled + disabled_syncer_callback._sync_trial_dir(remote_trial_d) + assert caplog.text.count(_SYNC_TO_HEAD_DEPRECATION_MESSAGE) == 0 + + if __name__ == "__main__": import sys diff --git a/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py b/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py index 8de20406143c8d..6bc912c3b59478 100644 --- a/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py +++ b/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py @@ -15,7 +15,7 @@ XGBoostPredictor, ) from ray.train.batch_predictor import BatchPredictor -from ray.air.config import ScalingConfig +from ray.air.config import RunConfig, ScalingConfig _XGB_MODEL_PATH = "model.json" _TRAINING_TIME_THRESHOLD = 1000 @@ -97,6 +97,9 @@ def run_xgboost_training(data_path: str, num_workers: int, cpus_per_worker: int) label_column="labels", params=params, datasets={"train": ds}, + run_config=RunConfig( + storage_path="/mnt/cluster_storage", name="xgboost_benchmark" + ), ) result = trainer.fit() checkpoint = XGBoostCheckpoint.from_checkpoint(result.checkpoint) diff --git a/release/air_tests/frequent_pausing/script.py b/release/air_tests/frequent_pausing/script.py index 8d1468bb513a71..0cbdf8f1bb6691 100644 --- a/release/air_tests/frequent_pausing/script.py +++ b/release/air_tests/frequent_pausing/script.py @@ -16,7 +16,7 @@ import numpy as np -from ray.air import session +from ray.air import session, RunConfig from ray.air.checkpoint import Checkpoint from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler from ray.tune.tune_config import TuneConfig @@ -49,6 +49,7 @@ def on_trial_result(self, trial_runner, trial, result): tuner = Tuner( func, tune_config=TuneConfig(num_samples=2, scheduler=FrequentPausesScheduler()), + run_config=RunConfig(storage_path="/mnt/cluster_storage", name="frequent_pausing"), ) tuner.fit() diff --git a/release/air_tests/horovod/workloads/horovod_tune_test.py b/release/air_tests/horovod/workloads/horovod_tune_test.py index 58e32e8e225343..314c73c6cd1b4b 100755 --- a/release/air_tests/horovod/workloads/horovod_tune_test.py +++ b/release/air_tests/horovod/workloads/horovod_tune_test.py @@ -183,6 +183,7 @@ def train_loop_per_worker(config): failure_config=FailureConfig(fail_fast=False), checkpoint_config=CheckpointConfig(num_to_keep=1), callbacks=[ProgressCallback()], + storage_path="/mnt/cluster_storage", ), ) diff --git a/release/golden_notebook_tests/workloads/torch_tune_serve_test.py b/release/golden_notebook_tests/workloads/torch_tune_serve_test.py index c843953e756846..2c02d436ba5ee9 100644 --- a/release/golden_notebook_tests/workloads/torch_tune_serve_test.py +++ b/release/golden_notebook_tests/workloads/torch_tune_serve_test.py @@ -127,6 +127,7 @@ def train_mnist(test_mode=False, num_workers=1, use_gpu=False): ), run_config=RunConfig( verbose=1, + storage_path="/mnt/cluster_storage", ), ) diff --git a/release/lightning_tests/workloads/test_trainer.py b/release/lightning_tests/workloads/test_trainer.py index 117f6ee85ab9b2..da3ee4f5406f31 100644 --- a/release/lightning_tests/workloads/test_trainer.py +++ b/release/lightning_tests/workloads/test_trainer.py @@ -4,7 +4,7 @@ from pytorch_lightning.loggers.csv_logs import CSVLogger import ray -from ray.air.config import ScalingConfig +from ray.air.config import RunConfig, ScalingConfig from ray.train.lightning import LightningTrainer, LightningConfigBuilder from lightning_test_utils import MNISTClassifier, MNISTDataModule @@ -34,6 +34,7 @@ trainer = LightningTrainer( lightning_config=lightning_config, scaling_config=scaling_config, + run_config=RunConfig(storage_path="/mnt/cluster_storage"), ) result = trainer.fit() diff --git a/release/lightning_tests/workloads/test_tuner.py b/release/lightning_tests/workloads/test_tuner.py index 37c880cc7c67fa..eb71293048f893 100644 --- a/release/lightning_tests/workloads/test_tuner.py +++ b/release/lightning_tests/workloads/test_tuner.py @@ -53,6 +53,7 @@ lightning_trainer, param_space={"lightning_config": lightning_config}, run_config=ray.air.RunConfig( + storage_path="/mnt/cluster_storage", name="release-tuner-test", verbose=2, checkpoint_config=CheckpointConfig( diff --git a/release/long_running_tests/workloads/apex.py b/release/long_running_tests/workloads/apex.py index 3b2cc5e0a217c7..4aee3c40db3f27 100644 --- a/release/long_running_tests/workloads/apex.py +++ b/release/long_running_tests/workloads/apex.py @@ -52,6 +52,7 @@ "min_time_s_per_iteration": 10, "min_sample_timesteps_per_iteration": 10, }, + "storage_path": "/mnt/cluster_storage", } }, callbacks=[ProgressCallback()], diff --git a/release/long_running_tests/workloads/impala.py b/release/long_running_tests/workloads/impala.py index 9660dd1ee214fc..d727d1ec7341a4 100644 --- a/release/long_running_tests/workloads/impala.py +++ b/release/long_running_tests/workloads/impala.py @@ -56,6 +56,7 @@ "rollout_fragment_length": 50, "train_batch_size": 100, }, + "storage_path": "/mnt/cluster_storage", }, }, callbacks=[ProgressCallback()], diff --git a/release/ml_user_tests/train/train_tensorflow_mnist_test.py b/release/ml_user_tests/train/train_tensorflow_mnist_test.py index e158c7506dc521..627c112d8a2d91 100644 --- a/release/ml_user_tests/train/train_tensorflow_mnist_test.py +++ b/release/ml_user_tests/train/train_tensorflow_mnist_test.py @@ -16,7 +16,9 @@ else: ray.init(address="auto") - train_tensorflow_mnist(num_workers=6, use_gpu=True, epochs=20) + train_tensorflow_mnist( + num_workers=6, use_gpu=True, epochs=20, storage_path="/mnt/cluster_storage" + ) taken = time.time() - start result = { diff --git a/release/ml_user_tests/train/train_torch_linear_test.py b/release/ml_user_tests/train/train_torch_linear_test.py index d76a8433ee2eb3..48610020ea6683 100644 --- a/release/ml_user_tests/train/train_torch_linear_test.py +++ b/release/ml_user_tests/train/train_torch_linear_test.py @@ -17,7 +17,9 @@ else: ray.init(address="auto") - results = train_linear(num_workers=6, use_gpu=True, epochs=20) + results = train_linear( + num_workers=6, use_gpu=True, epochs=20, storage_path="/mnt/cluster_storage" + ) taken = time.time() - start result = {"time_taken": taken} diff --git a/release/ml_user_tests/tune_rllib/run_connect_tests.py b/release/ml_user_tests/tune_rllib/run_connect_tests.py index 5da1888a942942..8decd32f273328 100644 --- a/release/ml_user_tests/tune_rllib/run_connect_tests.py +++ b/release/ml_user_tests/tune_rllib/run_connect_tests.py @@ -19,7 +19,7 @@ ray.init(address="auto") start_time = time.time() - results = run() + results = run(storage_path="/mnt/cluster_storage") exp_analysis = results._experiment_analysis end_time = time.time() diff --git a/release/rllib_tests/smoke_tests/smoke_test_basic_multi_node_training_learner.py b/release/rllib_tests/smoke_tests/smoke_test_basic_multi_node_training_learner.py index f332d7b6b808b9..c54bcfb8bde777 100644 --- a/release/rllib_tests/smoke_tests/smoke_test_basic_multi_node_training_learner.py +++ b/release/rllib_tests/smoke_tests/smoke_test_basic_multi_node_training_learner.py @@ -10,6 +10,7 @@ def run_with_tuner_n_rollout_worker_2_gpu(config): "PPO", param_space=config, run_config=air.RunConfig( + storage_path="/mnt/cluster_storage", stop={"timesteps_total": 128}, failure_config=air.FailureConfig(fail_fast=True), ), @@ -24,6 +25,7 @@ def run_with_tuner_0_rollout_worker_2_gpu(config): "PPO", param_space=config, run_config=air.RunConfig( + storage_path="/mnt/cluster_storage", stop={"timesteps_total": 128}, failure_config=air.FailureConfig(fail_fast=True), ), @@ -43,6 +45,7 @@ def run_tuner_n_rollout_workers_0_gpu(config): "PPO", param_space=config, run_config=air.RunConfig( + storage_path="/mnt/cluster_storage", stop={"timesteps_total": 128}, failure_config=air.FailureConfig(fail_fast=True), ), @@ -62,6 +65,7 @@ def run_tuner_n_rollout_workers_1_gpu_local(config): "PPO", param_space=config, run_config=air.RunConfig( + storage_path="/mnt/cluster_storage", stop={"timesteps_total": 128}, failure_config=air.FailureConfig(fail_fast=True), ), diff --git a/release/tune_tests/cloud_tests/workloads/_tune_script.py b/release/tune_tests/cloud_tests/workloads/_tune_script.py index 4b4ba903a6c055..06a0dc1e431a49 100644 --- a/release/tune_tests/cloud_tests/workloads/_tune_script.py +++ b/release/tune_tests/cloud_tests/workloads/_tune_script.py @@ -7,6 +7,7 @@ import ray from ray import tune from ray.air import Checkpoint, session +from ray.air.constants import REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE from ray.rllib.algorithms.callbacks import DefaultCallbacks from ray.rllib.algorithms.ppo import PPO @@ -100,6 +101,10 @@ def run_tune( else: raise RuntimeError(f"Unknown trainable: {trainable}") + if not no_syncer and storage_path is None: + # syncer="auto" + storage_path=None -> legacy head node syncing path + os.environ[REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE] = "1" + tune.run( train, name=experiment_name, diff --git a/release/tune_tests/cloud_tests/workloads/run_cloud_test.py b/release/tune_tests/cloud_tests/workloads/run_cloud_test.py index 6093007e4cfb40..8783f843bbfaa1 100644 --- a/release/tune_tests/cloud_tests/workloads/run_cloud_test.py +++ b/release/tune_tests/cloud_tests/workloads/run_cloud_test.py @@ -35,6 +35,7 @@ import json import os import platform +import pytest import re import shutil import signal @@ -45,6 +46,8 @@ import ray import ray.cloudpickle as pickle +from ray import air, tune +from ray.air import Checkpoint, session from ray.tune.execution.trial_runner import _find_newest_experiment_checkpoint from ray.tune.utils.serialization import TuneFunctionDecoder @@ -1022,6 +1025,58 @@ def after_experiments(): ) +# TODO(ml-team): [Deprecation - head node syncing] +def test_head_node_syncing_disabled_error(): + """Tests that head node syncing is disabled properly in a multi-node setting. + Runs a 4 trial Tune run, where each trial uses 2 CPUs. + The cluster config = 4 nodes, each with 2 CPUs, so head node syncing + would have been required to synchronize checkpoints. + """ + + # Raise an error for checkpointing + no storage path + def train_fn(config): + session.report({"score": 1}, checkpoint=Checkpoint.from_dict({"dummy": 1})) + + tuner = tune.Tuner( + tune.with_resources(train_fn, {"CPU": 2.0}), + run_config=air.RunConfig( + storage_path=None, failure_config=air.FailureConfig(fail_fast="raise") + ), + tune_config=tune.TuneConfig(num_samples=4), + ) + with pytest.raises(DeprecationWarning): + tuner.fit() + print("Success: checkpointing without a storage path raises an error") + + # Workaround: continue running, with syncing explicitly disabled + tuner = tune.Tuner( + tune.with_resources(train_fn, {"CPU": 2.0}), + run_config=air.RunConfig( + storage_path=None, + failure_config=air.FailureConfig(fail_fast="raise"), + sync_config=tune.SyncConfig(syncer=None), + ), + tune_config=tune.TuneConfig(num_samples=4), + ) + tuner.fit() + print("Success: explicitly disabling syncing is a sufficient workaround") + + # Not hard failing for multi-node with no checkpointing + def train_fn_no_checkpoint(config): + session.report({"score": 1}) + + tuner = tune.Tuner( + tune.with_resources(train_fn_no_checkpoint, {"CPU": 2.0}), + run_config=air.RunConfig( + storage_path=None, failure_config=air.FailureConfig(fail_fast="raise") + ), + tune_config=tune.TuneConfig(num_samples=4), + ) + tuner.fit() + print("Success: a multi-node experiment without checkpoint still runs") + + +# TODO(ml-team): [Deprecation - head node syncing] def test_ssh_sync(): """ SSH syncing, so: @@ -1049,6 +1104,9 @@ def test_ssh_sync(): - All trials progressed with training """ + # Some preliminary checks that head node syncing is deprecated correctly. + test_head_node_syncing_disabled_error() + experiment_name = "cloud_ssh_sync" indicator_file = f"/tmp/{experiment_name}_indicator" diff --git a/release/tune_tests/scalability_tests/workloads/test_long_running_large_checkpoints.py b/release/tune_tests/scalability_tests/workloads/test_long_running_large_checkpoints.py index 9298cd457ce55b..3202fcdab36d45 100644 --- a/release/tune_tests/scalability_tests/workloads/test_long_running_large_checkpoints.py +++ b/release/tune_tests/scalability_tests/workloads/test_long_running_large_checkpoints.py @@ -42,6 +42,7 @@ def main(smoke_test: bool = False): resources_per_trial={"cpu": 1}, sync_config=tune.SyncConfig(syncer="auto"), callbacks=[callback], + storage_path="/mnt/cluster_storage", ) diff --git a/rllib/examples/tune/framework.py b/rllib/examples/tune/framework.py index 304b549708e93a..fb2042090f9276 100644 --- a/rllib/examples/tune/framework.py +++ b/rllib/examples/tune/framework.py @@ -14,7 +14,7 @@ logger = logging.getLogger("tune_framework") -def run(smoke_test=False): +def run(smoke_test=False, storage_path: str = None): stop = {"training_iteration": 1 if smoke_test else 50} num_workers = 1 if smoke_test else 20 num_gpus = 0 if smoke_test else 1 @@ -62,6 +62,7 @@ def run(smoke_test=False): sort_by_metric=True, max_report_frequency=30, ), + storage_path=storage_path, ), tune_config=tune.TuneConfig( num_samples=1, diff --git a/rllib/utils/test_utils.py b/rllib/utils/test_utils.py index 557eaf2e552b13..a69e21a4928261 100644 --- a/rllib/utils/test_utils.py +++ b/rllib/utils/test_utils.py @@ -822,6 +822,12 @@ def should_check_eval(experiment): # If an experiment passes, we'll remove it from this dict. experiments_to_run = experiments.copy() + # When running as a release test, use `/mnt/cluster_storage` as the storage path. + release_test_storage_path = "/mnt/cluster_storage" + if os.path.exists(release_test_storage_path): + for k, e in experiments_to_run.items(): + e["storage_path"] = release_test_storage_path + try: ray.init(address="auto") except ConnectionError: