[AIR] Remove head node syncing as the default storage option (ray-project#37142)

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: harborn <gangsheng.wu@intel.com>
justinvyu authored and harborn committed Aug 17, 2023
1 parent 97931c1 commit 21542a6
Showing 26 changed files with 358 additions and 40 deletions.
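At a glance: instead of defaulting to syncing trial outputs to the head node, AIR now asks users to pass an explicit `storage_path`. A minimal sketch of the two recommended configurations, assuming placeholder locations (the bucket URI and NFS mount below are illustrative, not values from this commit):

```python
from ray.air import RunConfig

# Option 1: persist checkpoints and artifacts to cloud storage.
run_config = RunConfig(storage_path="s3://your-bucket-here", name="my_experiment")

# Option 2: persist to a network filesystem mounted on every node.
run_config = RunConfig(storage_path="/mnt/path/to/nfs", name="my_experiment")
```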
48 changes: 42 additions & 6 deletions doc/source/ray-air/examples/dolly_lightning_fsdp_finetuning.ipynb
@@ -139,6 +139,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -321,11 +322,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Fine-tune with LightningTrainer\n",
"\n",
"```{note}\n",
"Here we save the checkpoints to the local file system. You can also upload the checkpoints to cloud storage by setting S3 bucket URI to {class}`air.RunConfig(storage_path=S3_BUCKET_URI) <ray.air.RunConfig>`. See {ref}`train-run-config` for an example.\n",
"```"
"## Fine-tune with LightningTrainer"
]
},
{
@@ -387,6 +384,43 @@
"lightning_config.trainer(callbacks=[DollyV2ProgressBar(num_iters_per_epoch)])"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"```{note}\n",
"Since this example runs with multiple nodes, we need to persist checkpoints\n",
"and other outputs to some external storage for access after training has completed.\n",
"**You should set up cloud storage or NFS, then replace `storage_path` with your own cloud bucket URI or NFS path.**\n",
"\n",
"See the [storage guide](tune-storage-options) for more details.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"storage_path=\"s3://your-bucket-here\" # TODO: Set up cloud storage\n",
"# storage_path=\"/mnt/path/to/nfs\" # TODO: Alternatively, set up NFS"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"remove-cell"
]
},
"outputs": [],
"source": [
"storage_path = \"/mnt/cluster_storage\""
]
},
{
"cell_type": "code",
"execution_count": 9,
@@ -921,9 +955,10 @@
"from ray.tune.syncer import SyncConfig\n",
"# Save AIR checkpoints according to the performance on validation set\n",
"run_config = RunConfig(\n",
" storage_path=storage_path,\n",
" name=\"finetune_dolly-v2-7b\",\n",
" checkpoint_config=CheckpointConfig(),\n",
" sync_config=SyncConfig(sync_artifacts=False)\n",
" sync_config=SyncConfig(sync_artifacts=False),\n",
")\n",
"\n",
"# Scale the DDP training workload across 16 GPUs\n",
@@ -954,6 +989,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
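For context, a condensed sketch of how the notebook's new `storage_path` cell feeds the `RunConfig` in the hunk above; the bucket URI is a placeholder, and the other arguments simply mirror the diff:

```python
from ray.air import CheckpointConfig, RunConfig
from ray.tune.syncer import SyncConfig

storage_path = "s3://your-bucket-here"  # placeholder: your bucket URI or NFS mount

run_config = RunConfig(
    storage_path=storage_path,
    name="finetune_dolly-v2-7b",
    checkpoint_config=CheckpointConfig(),
    sync_config=SyncConfig(sync_artifacts=False),
)
```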
32 changes: 30 additions & 2 deletions doc/source/ray-air/examples/gptj_deepspeed_fine_tuning.ipynb
@@ -582,18 +582,45 @@
"We pass the preprocessors we have defined earlier as an argument, wrapped in a {class}`~ray.data.preprocessors.chain.Chain`. The preprocessor will be included with the returned {class}`~ray.air.checkpoint.Checkpoint`, meaning it will also be applied during inference.\n",
"\n",
"```{note}\n",
"If you want to upload checkpoints to cloud storage (eg. S3), set {class}`air.RunConfig(storage_path) <ray.air.RunConfig>`. See {ref}`train-run-config` for an example. Using cloud storage is highly recommended, especially for production.\n",
"Since this example runs with multiple nodes, we need to persist checkpoints\n",
"and other outputs to some external storage for access after training has completed.\n",
"**You should set up cloud storage or NFS, then replace `storage_path` with your own cloud bucket URI or NFS path.**\n",
"\n",
"See the [storage guide](tune-storage-options) for more details.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"storage_path=\"s3://your-bucket-here\" # TODO: Set up cloud storage\n",
"# storage_path=\"/mnt/path/to/nfs\" # TODO: Alternatively, set up NFS"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"remove-cell"
]
},
"outputs": [],
"source": [
"storage_path = \"/mnt/cluster_storage\""
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"from ray.train.huggingface import TransformersTrainer\n",
"from ray.air.config import ScalingConfig\n",
"from ray.air import RunConfig, ScalingConfig\n",
"from ray.data.preprocessors import Chain\n",
"\n",
"\n",
@@ -610,6 +637,7 @@
" ),\n",
" datasets={\"train\": ray_datasets[\"train\"], \"evaluation\": ray_datasets[\"validation\"]},\n",
" preprocessor=Chain(splitter, tokenizer),\n",
" run_config=RunConfig(storage_path=storage_path),\n",
")"
]
},
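Because outputs now land on external storage, they remain readable after the training cluster is gone. A hedged sketch, assuming the AIR `Checkpoint.from_uri` API from this Ray release and a placeholder URI under your `storage_path`:

```python
from ray.air.checkpoint import Checkpoint

# Placeholder URI: point this at a checkpoint directory written under your storage_path.
checkpoint = Checkpoint.from_uri(
    "s3://your-bucket-here/experiment_name/trial_name/checkpoint_000000"
)
```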
9 changes: 7 additions & 2 deletions doc/source/templates/02_many_model_training/start.ipynb
@@ -75,7 +75,7 @@
"from statsforecast.models import AutoARIMA, AutoETS, MSTL\n",
"\n",
"from ray import tune\n",
"from ray.air import session\n"
"from ray.air import session, RunConfig\n"
]
},
{
@@ -265,7 +265,12 @@
"metadata": {},
"outputs": [],
"source": [
"tuner = tune.Tuner(trainable, param_space=param_space)\n",
"tuner = tune.Tuner(\n",
" trainable,\n",
" param_space=param_space,\n",
" # Experiment results are saved to a shared filesystem available to all nodes.\n",
" run_config=RunConfig(storage_path=\"/mnt/cluster_storage\"),\n",
")\n",
"result_grid = tuner.fit()\n"
]
},
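The same pattern in a self-contained toy run (not the template's forecasting workload), to show that the `storage_path` is just a `RunConfig` field; the local temporary directory stands in for shared storage on a single machine:

```python
import tempfile

from ray import tune
from ray.air import RunConfig, session


def trainable(config):
    # Report a dummy metric; the template reports forecasting metrics instead.
    session.report({"score": config["x"] ** 2})


tuner = tune.Tuner(
    trainable,
    param_space={"x": tune.grid_search([1, 2, 3])},
    # On a real multi-node cluster, point this at NFS or a cloud bucket instead.
    run_config=RunConfig(storage_path=tempfile.mkdtemp(), name="toy_experiment"),
)
result_grid = tuner.fit()
```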
6 changes: 6 additions & 0 deletions python/ray/air/constants.py
@@ -93,10 +93,16 @@
# as Trainable)
DISABLE_LAZY_CHECKPOINTING_ENV = "TRAIN_DISABLE_LAZY_CHECKPOINTING"

# TODO(ml-team): [Deprecation - head node syncing]
# Whether or not the sync-to-head behavior is enabled by default.
# If unset, running AIR on a multi-node cluster with checkpointing will raise
# an error telling the user to switch to cloud/NFS.
REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE = "RAY_AIR_REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE"

# NOTE: When adding a new environment variable, please track it in this list.
# TODO(ml-team): Most env var constants should get moved here.
AIR_ENV_VARS = {
REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE,
COPY_DIRECTORY_CHECKPOINTS_INSTEAD_OF_MOVING_ENV,
DISABLE_LAZY_CHECKPOINTING_ENV,
"RAY_AIR_FULL_TRACEBACKS",
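The new constant is consumed as a boolean gate; a small sketch mirroring how `syncer.py` below reads it (the variable must be set to `1` to re-enable the old behavior):

```python
import os

from ray.air.constants import REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE

head_node_sync_reenabled = bool(
    int(os.environ.get(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "0"))
)
# False unless RAY_AIR_REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE=1 is set.
print(head_node_sync_reenabled)
```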
5 changes: 3 additions & 2 deletions python/ray/train/examples/pytorch/torch_linear_example.py
@@ -6,7 +6,7 @@
import torch.nn as nn
import ray.train as train
from ray.train.torch import TorchTrainer, TorchCheckpoint
from ray.air.config import ScalingConfig
from ray.air.config import RunConfig, ScalingConfig


class LinearDataset(torch.utils.data.Dataset):
@@ -85,12 +85,13 @@ def train_func(config):
return results


def train_linear(num_workers=2, use_gpu=False, epochs=3):
def train_linear(num_workers=2, use_gpu=False, epochs=3, storage_path=None):
config = {"lr": 1e-2, "hidden_size": 1, "batch_size": 4, "epochs": epochs}
trainer = TorchTrainer(
train_loop_per_worker=train_func,
train_loop_config=config,
scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
run_config=RunConfig(storage_path=storage_path),
)
result = trainer.fit()

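A hedged usage sketch for the updated example, assuming the example module ships with your Ray installation; on a single machine a local path works, while multi-node runs should use NFS or a bucket URI:

```python
from ray.train.examples.pytorch.torch_linear_example import train_linear

# The new storage_path argument is threaded straight into RunConfig.
results = train_linear(
    num_workers=2, use_gpu=False, epochs=3, storage_path="/tmp/ray_results"
)
```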
8 changes: 6 additions & 2 deletions python/ray/train/examples/tf/tensorflow_mnist_example.py
@@ -12,7 +12,7 @@

from ray.train.tensorflow import TensorflowTrainer
from ray.air.integrations.keras import ReportCheckpointCallback
from ray.air.config import ScalingConfig
from ray.air.config import RunConfig, ScalingConfig


def mnist_dataset(batch_size: int) -> tf.data.Dataset:
@@ -79,13 +79,17 @@ def train_func(config: dict):


def train_tensorflow_mnist(
num_workers: int = 2, use_gpu: bool = False, epochs: int = 4
num_workers: int = 2,
use_gpu: bool = False,
epochs: int = 4,
storage_path: str = None,
) -> Result:
config = {"lr": 1e-3, "batch_size": 64, "epochs": epochs}
trainer = TensorflowTrainer(
train_loop_per_worker=train_func,
train_loop_config=config,
scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
run_config=RunConfig(storage_path=storage_path),
)
results = trainer.fit()
return results
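Likewise for the TensorFlow example, a hedged usage sketch with a placeholder storage location:

```python
from ray.train.examples.tf.tensorflow_mnist_example import train_tensorflow_mnist

result = train_tensorflow_mnist(
    num_workers=2,
    use_gpu=False,
    epochs=4,
    storage_path="s3://your-bucket-here",  # placeholder: bucket URI or NFS path
)
```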
83 changes: 68 additions & 15 deletions python/ray/tune/syncer.py
@@ -40,7 +40,11 @@
delete_at_uri,
is_non_local_path_uri,
)
from ray.air.constants import LAZY_CHECKPOINT_MARKER_FILE, TRAINING_ITERATION
from ray.air.constants import (
LAZY_CHECKPOINT_MARKER_FILE,
REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE,
TRAINING_ITERATION,
)
from ray.exceptions import RayActorError
from ray.tune import TuneError
from ray.tune.callback import Callback
@@ -75,6 +79,30 @@
f"./{LAZY_CHECKPOINT_MARKER_FILE}",
]

_SYNC_TO_HEAD_DEPRECATION_MESSAGE = (
"Ray AIR no longer supports the synchronization of checkpoints and other "
"artifacts from worker nodes to the head node. This means that the "
"checkpoints and artifacts saved by trials scheduled on worker nodes will not be "
"accessible during the run (e.g., resuming from a checkpoint "
"after a failure) or after the run "
"(e.g., loading the checkpoint of a trial that ran on an already "
"terminated worker node).\n\n"
"To fix this issue, configure AIR to use either:\n"
"(1) Cloud storage: `RunConfig(storage_path='s3://your/bucket')`\n"
"(2) A network filesystem mounted on all nodes: "
"`RunConfig(storage_path='/mnt/path/to/nfs_storage')`\n"
"See this Github issue for more details on transitioning to cloud storage/NFS "
"as well as an explanation on why this functionality is "
"being removed: https://github.com/ray-project/ray/issues/37177\n\n"
"Other temporary workarounds:\n"
"- If you want to avoid errors/warnings and continue running with "
"syncing explicitly turned off, set `RunConfig(SyncConfig(syncer=None))`\n"
"- Or, to re-enable the head node syncing behavior, set the "
f"environment variable {REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE}=1\n"
" - **Note that this functionality will tentatively be hard-deprecated in "
"Ray 2.7.** See the linked issue for the latest information."
)


@PublicAPI
@dataclass
@@ -790,14 +818,6 @@ def _sync_trial_dir(
if not self._enabled or trial.uses_cloud_checkpointing:
return False

sync_process = self._get_trial_sync_process(trial)

# Always run if force=True
# Otherwise, only run if we should sync (considering sync period)
# and if there is no sync currently still running.
if not force and (not self._should_sync(trial) or sync_process.is_running):
return False

source_ip = self._trial_ips.get(trial.trial_id, None)

if not source_ip:
@@ -815,6 +835,22 @@

self._trial_ips[trial.trial_id] = source_ip

if not bool(int(os.environ.get(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "0"))):
# Only log a warning for remote trials, since
# this only affects artifacts that are saved on worker nodes.
if source_ip != ray.util.get_node_ip_address():
if log_once("deprecated_head_node_sync"):
logger.warning(_SYNC_TO_HEAD_DEPRECATION_MESSAGE)
return False

sync_process = self._get_trial_sync_process(trial)

# Always run if force=True
# Otherwise, only run if we should sync (considering sync period)
# and if there is no sync currently still running.
if not force and (not self._should_sync(trial) or sync_process.is_running):
return False

try:
sync_process.wait()
except TuneError as e:
@@ -887,16 +923,33 @@ def on_checkpoint(
checkpoint: _TrackedCheckpoint,
**info,
):
if not self._enabled or trial.uses_cloud_checkpointing:
return

if checkpoint.storage_mode == CheckpointStorage.MEMORY:
return

if self._sync_trial_dir(
trial, force=trial.sync_on_checkpoint, wait=True
) and not os.path.exists(checkpoint.dir_or_data):
raise TuneError(
f"Trial {trial}: Checkpoint path {checkpoint.dir_or_data} not "
"found after successful sync down."
if not bool(int(os.environ.get(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "0"))):
# If we have saved a checkpoint, but it's not accessible on the driver,
# that means that it lives on some other node and would be synced to head
# prior to Ray 2.6.
if not os.path.exists(checkpoint.dir_or_data):
raise DeprecationWarning(_SYNC_TO_HEAD_DEPRECATION_MESSAGE)
# else:
# No need to raise an error about syncing, since the driver can find
# the checkpoint, because either:
# - the checkpoint lives on the head node
# - a shared filesystem is used
else:
# Old head node syncing codepath
synced = self._sync_trial_dir(
trial, force=trial.sync_on_checkpoint, wait=True
)
if synced and not os.path.exists(checkpoint.dir_or_data):
raise TuneError(
f"Trial {trial}: Checkpoint path {checkpoint.dir_or_data} not "
"found after successful sync down."
)

def wait_for_all(self):
# Remove any sync processes as needed, and only wait on the remaining ones.
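The options listed in `_SYNC_TO_HEAD_DEPRECATION_MESSAGE` translate into configuration roughly as follows; the paths are placeholders, and `sync_config` is the keyword the message's shorthand `RunConfig(SyncConfig(...))` refers to:

```python
import os

from ray.air import RunConfig
from ray.tune.syncer import SyncConfig

# (1) Cloud storage.
run_config = RunConfig(storage_path="s3://your/bucket")

# (2) A network filesystem mounted on all nodes.
run_config = RunConfig(storage_path="/mnt/path/to/nfs_storage")

# Temporary workaround: keep running with syncing explicitly turned off.
run_config = RunConfig(sync_config=SyncConfig(syncer=None))

# Temporary workaround: re-enable deprecated head node syncing on the driver.
os.environ["RAY_AIR_REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE"] = "1"
```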
7 changes: 6 additions & 1 deletion python/ray/tune/tests/test_multinode_sync.py
@@ -8,6 +8,7 @@
import ray
from ray import tune
from ray.air.config import CheckpointConfig
from ray.air.constants import REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE
from ray.air.util.node import _force_on_node
from ray.autoscaler._private.fake_multi_node.node_provider import FAKE_HEAD_NODE_ID
from ray.autoscaler._private.fake_multi_node.test_utils import DockerCluster
@@ -206,7 +207,11 @@ def testCheckpointSync(self):
)
# Connect via Ray client and wait until all nodes are there
self.cluster.start()
self.cluster.connect(client=True, timeout=120)
self.cluster.connect(
client=True,
timeout=120,
runtime_env={"env_vars": {REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE: "1"}},
)
self.cluster.wait_for_resources({"CPU": 12})

# This train function trains for 10 iterations per run
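Outside the test harness, the same opt-out can be propagated to a cluster through the runtime environment, as sketched below (a temporary escape hatch, per the deprecation message above):

```python
import ray
from ray.air.constants import REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE

ray.init(
    runtime_env={"env_vars": {REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE: "1"}},
)
```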