Skip to content

Commit

Permalink
[air] pyarrow.fs persistence (6/n): Fix Trial + Experiment path…
Browse files Browse the repository at this point in the history
…s to use the `StorageContext` (ray-project#38057)

Signed-off-by: harborn <gangsheng.wu@intel.com>
  • Loading branch information
justinvyu authored and harborn committed Aug 17, 2023
1 parent 61b4548 commit be989b2
Show file tree
Hide file tree
Showing 15 changed files with 420 additions and 274 deletions.
27 changes: 22 additions & 5 deletions python/ray/air/_internal/uri_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ class URI:
Example Usage:
>>> s3_uri = URI("s3://bucket/a?scheme=http&endpoint_override=localhost%3A900")
>>> s3_uri = URI("s3://bucket/a?scheme=http&param=1")
>>> s3_uri
URI<s3://bucket/a?scheme=http&endpoint_override=localhost%3A900>
URI<s3://bucket/a?scheme=http&param=1>
>>> str(s3_uri / "b" / "c")
's3://bucket/a/b/c?scheme=http&endpoint_override=localhost%3A900'
's3://bucket/a/b/c?scheme=http&param=1'
>>> str(s3_uri.parent)
's3://bucket?scheme=http&endpoint_override=localhost%3A900'
's3://bucket?scheme=http&param=1'
>>> str(s3_uri)
's3://bucket/a?scheme=http&endpoint_override=localhost%3A900'
's3://bucket/a?scheme=http&param=1'
>>> s3_uri.parent.name, s3_uri.name
('bucket', 'a')
>>> local_path = URI("/tmp/local")
Expand All @@ -42,6 +42,23 @@ def __init__(self, uri: str):
else:
self._path = Path(os.path.normpath(self._parsed.netloc + self._parsed.path))

def rstrip_subpath(self, subpath: Path) -> "URI":
    """Return a new URI with `subpath` stripped from the end of this URI's path.

    Only the trailing occurrence of `subpath` is removed. If the same text
    also appears earlier in the path, that earlier occurrence is kept.

    Example:

        >>> uri = URI("s3://bucket/a/b/c/?param=1")
        >>> str(uri.rstrip_subpath(Path("b/c")))
        's3://bucket/a?param=1'

        >>> uri = URI("/tmp/a/b/c/")
        >>> str(uri.rstrip_subpath(Path("/b/c/.//")))
        '/tmp/a'

    Args:
        subpath: The trailing path to strip. It is compared against this
            URI's path as a string (`str(subpath)` vs. `str(self._path)`).

    Returns:
        A new URI constructed from this URI's parsed components and the
        stripped path.

    Raises:
        AssertionError: If this URI's path does not end with `subpath`.
    """
    path_str = str(self._path)
    suffix = str(subpath)
    assert path_str.endswith(suffix), (self._path, subpath)
    # Slice the suffix off the end instead of calling `str.replace`, which
    # would also delete every earlier occurrence of `suffix` in the path
    # (e.g. "b/c/a/b/c" stripped of "b/c" must give "b/c/a/", not "/a/").
    stripped_path = path_str[: len(path_str) - len(suffix)]
    return URI(self._get_str_representation(self._parsed, stripped_path))

@property
def name(self) -> str:
    """The final component of this URI's path (e.g. 'a' for "s3://bucket/a")."""
    final_component = self._path.name
    return final_component
Expand Down
61 changes: 43 additions & 18 deletions python/ray/train/_internal/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
from pathlib import Path
import shutil
from typing import Callable, Dict, List, Optional, Tuple, TYPE_CHECKING
from typing import Callable, Dict, List, Optional, Tuple, Type, Union, TYPE_CHECKING

try:
import fsspec
Expand Down Expand Up @@ -320,20 +320,25 @@ class StorageContext:
For example, on the driver, the storage context is initialized, only knowing
the experiment path. On the Trainable actor, the trial_dir_name is accessible.
Example with storage_path="mock:///bucket/path":
There are 2 types of paths:
1. *_fs_path: A path on the `storage_filesystem`. This is a regular path
which has been prefix-stripped by pyarrow.fs.FileSystem.from_uri and
can be joined with `os.path.join`.
2. *_local_path: The path on the local filesystem where results are saved to
before persisting to storage.
Example with storage_path="mock:///bucket/path?param=1":
>>> from ray.train._internal.storage import StorageContext
>>> import os
>>> os.environ["RAY_AIR_LOCAL_CACHE_DIR"] = "/tmp/ray_results"
>>> storage = StorageContext(
... storage_path="mock:///bucket/path",
... storage_path="mock://netloc/bucket/path?param=1",
... sync_config=SyncConfig(),
... experiment_dir_name="exp_name",
... )
>>> storage.storage_filesystem # Auto-resolved # doctest: +ELLIPSIS
<pyarrow._fs._MockFileSystem object...
>>> storage.experiment_path
'mock:///bucket/path/exp_name'
>>> storage.experiment_fs_path
'bucket/path/exp_name'
>>> storage.experiment_local_path
Expand All @@ -346,6 +351,10 @@ class StorageContext:
>>> storage.current_checkpoint_index = 1
>>> storage.checkpoint_fs_path
'bucket/path/exp_name/trial_dir/checkpoint_000001'
>>> storage.storage_prefix
URI<mock://netloc?param=1>
>>> str(storage.storage_prefix / storage.experiment_fs_path)
'mock://netloc/bucket/path/exp_name?param=1'
Example with storage_path=None:
Expand All @@ -361,8 +370,6 @@ class StorageContext:
'/tmp/ray_results'
>>> storage.storage_local_path
'/tmp/ray_results'
>>> storage.experiment_path
'/tmp/ray_results/exp_name'
>>> storage.experiment_local_path
'/tmp/ray_results/exp_name'
>>> storage.experiment_fs_path
Expand All @@ -371,7 +378,10 @@ class StorageContext:
True
>>> storage.storage_filesystem # Auto-resolved # doctest: +ELLIPSIS
<pyarrow._fs.LocalFileSystem object...
>>> storage.storage_prefix
URI<.>
>>> str(storage.storage_prefix / storage.experiment_fs_path)
'/tmp/ray_results/exp_name'
Internal Usage Examples:
- To copy files to the trial directory on the storage filesystem:
Expand Down Expand Up @@ -428,6 +438,18 @@ def __init__(
self.storage_fs_path,
) = pyarrow.fs.FileSystem.from_uri(self.storage_path)

# The storage prefix is the URI that remains after stripping the
# URI prefix away from the user-provided `storage_path` (using `from_uri`).
# Ex: `storage_path="s3://bucket/path?param=1"`
# -> `storage_prefix=URI<s3://.?param=1>`
# See the doctests for more examples.
# This is used to construct URI's of the same format as `storage_path`.
# However, we don't track these URI's internally, because pyarrow only
# needs to interact with the prefix-stripped fs_path.
self.storage_prefix: URI = URI(self.storage_path).rstrip_subpath(
Path(self.storage_fs_path)
)

# Only initialize a syncer if a `storage_path` was provided.
self.syncer: Optional[Syncer] = (
_FilesystemSyncer(
Expand Down Expand Up @@ -524,16 +546,6 @@ def persist_current_checkpoint(self, checkpoint: "Checkpoint") -> "Checkpoint":
logger.debug(f"Checkpoint successfully created at: {uploaded_checkpoint}")
return uploaded_checkpoint

@property
def experiment_path(self) -> str:
    """The path to the experiment directory, in the same format as the
    original `storage_path` specified by the user.

    Ex: If the user passed in storage_path="s3://bucket/path?param=1", then
    this property returns "s3://bucket/path/exp_name?param=1".
    """
    storage_uri = URI(self.storage_path)
    experiment_uri = storage_uri / self.experiment_dir_name
    return str(experiment_uri)

@property
def experiment_fs_path(self) -> str:
"""The path on the `storage_filesystem` to the experiment directory.
Expand Down Expand Up @@ -595,6 +607,19 @@ def checkpoint_fs_path(self) -> str:
)
return os.path.join(self.trial_fs_path, checkpoint_dir_name)

@staticmethod
def get_experiment_dir_name(run_obj: Union[str, Callable, Type]) -> str:
    """Build the experiment directory name for the given trainable.

    The base name is whatever `Experiment.get_trainable_name(run_obj)`
    returns. Unless the `TUNE_DISABLE_DATED_SUBDIR` environment variable is
    set to a non-zero integer, the output of `date_str()` is appended,
    separated by an underscore.

    Args:
        run_obj: The trainable, passed through to
            `Experiment.get_trainable_name`.

    Returns:
        The experiment directory name.
    """
    # Imported at call time rather than module import time.
    # NOTE(review): presumably to avoid a circular import -- confirm.
    from ray.tune.experiment import Experiment
    from ray.tune.utils import date_str

    trainable_name = Experiment.get_trainable_name(run_obj)

    dated_subdir_disabled = int(os.environ.get("TUNE_DISABLE_DATED_SUBDIR", 0)) != 0
    if dated_subdir_disabled:
        return trainable_name
    return f"{trainable_name}_{date_str()}"


_storage_context: Optional[StorageContext] = None

Expand Down
12 changes: 6 additions & 6 deletions python/ray/train/base_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,16 +598,16 @@ def fit(self) -> Result:
_entrypoint=AirEntrypoint.TRAINER,
)

experiment_path = Path(
TunerInternal.setup_create_experiment_checkpoint_dir(
trainable, self.run_config
)
experiment_local_path, _ = TunerInternal.setup_create_experiment_checkpoint_dir(
trainable, self.run_config
)
self._save(experiment_path)

experiment_local_path = Path(experiment_local_path)
self._save(experiment_local_path)

restore_msg = TrainingFailedError._RESTORE_MSG.format(
trainer_cls_name=self.__class__.__name__,
path=str(experiment_path),
path=str(experiment_local_path),
)

try:
Expand Down
126 changes: 84 additions & 42 deletions python/ray/train/tests/test_new_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,53 @@ def _resolve_storage_type(
yield storage_path, storage_filesystem


def _get_local_inspect_dir(
    root_local_path: Path,
    storage_path: Optional[str],
    storage_local_path: Path,
    storage_filesystem: Optional[pyarrow.fs.FileSystem],
) -> Tuple[Path, str]:
    """Downloads the storage path -> local dir for inspecting contents.

    If no `storage_path` is set, nothing is downloaded and
    `storage_local_path` itself is used as the inspect directory.

    Args:
        root_local_path: Local directory under which the "inspect"
            download directory is placed.
        storage_path: The storage path to download from. Falsy means the
            results are only available at `storage_local_path`.
        storage_local_path: Local storage directory, used when
            `storage_path` is not set.
        storage_filesystem: Optional custom filesystem. If given,
            `storage_path` is used as-is as the path on that filesystem;
            otherwise the filesystem and path are resolved from
            `storage_path` with `pyarrow.fs.FileSystem.from_uri`.

    Returns:
        Tuple: (local_inspect_dir, storage_fs_path), where storage_fs_path
            is the path to the storage path on the filesystem (e.g., prefix
            stripped). This is used to check the correctness of paths
            returned from `Result`'s, since URIs are hard to do comparisons
            with.
    """
    if not storage_path:
        # Nothing to download: inspect the local storage directory in place.
        return storage_local_path, str(storage_local_path)

    if storage_filesystem:
        fs, storage_fs_path = storage_filesystem, storage_path
    else:
        fs, storage_fs_path = pyarrow.fs.FileSystem.from_uri(storage_path)

    local_inspect_dir = root_local_path / "inspect"
    _download_from_fs_path(
        fs=fs, fs_path=storage_fs_path, local_path=str(local_inspect_dir)
    )
    return local_inspect_dir, storage_fs_path


def _convert_path_to_fs_path(
    path: str, storage_filesystem: Optional[pyarrow.fs.FileSystem]
) -> str:
    """Return the (prefix-stripped) filesystem path for `path`.

    Ex: "s3://bucket/path/to/file" -> "bucket/path/to/file"
    Ex: "/mnt/nfs/path/to/file" -> "/mnt/nfs/path/to/file"

    If a custom `storage_filesystem` is given, `path` is returned unchanged,
    since it is already a path on that filesystem.
    """
    if storage_filesystem:
        # Custom filesystem: the provided path is already the fs path.
        return path

    # No custom filesystem: let pyarrow resolve the path and strip its prefix.
    _resolved_fs, fs_path = pyarrow.fs.FileSystem.from_uri(path)
    return fs_path


def train_fn(config):
in_trainer = config.get("in_trainer", False)
if in_trainer:
Expand Down Expand Up @@ -165,7 +212,7 @@ def test_tuner(monkeypatch, storage_path_type, tmp_path):
run_config=train.RunConfig(
storage_path=storage_path,
storage_filesystem=storage_filesystem,
name="simple_persistence_test",
name=exp_name,
verbose=0,
failure_config=train.FailureConfig(max_failures=1),
),
Expand All @@ -174,20 +221,30 @@ def test_tuner(monkeypatch, storage_path_type, tmp_path):
num_samples=NUM_TRIALS, max_concurrent_trials=1
),
)
tuner.fit()

local_inspect_dir = tmp_path / "inspect"
if storage_path:
if storage_filesystem:
fs, fs_path = storage_filesystem, storage_path
else:
fs, fs_path = pyarrow.fs.FileSystem.from_uri(storage_path)
_download_from_fs_path(
fs=fs, fs_path=fs_path, local_path=str(local_inspect_dir)
)
else:
local_inspect_dir = LOCAL_CACHE_DIR
result_grid = tuner.fit()

local_inspect_dir, storage_fs_path = _get_local_inspect_dir(
root_local_path=tmp_path,
storage_path=storage_path,
storage_local_path=LOCAL_CACHE_DIR,
storage_filesystem=storage_filesystem,
)

# First, check that the ResultGrid returns the correct paths.
experiment_fs_path = _convert_path_to_fs_path(
result_grid.experiment_path, storage_filesystem
)
assert experiment_fs_path == os.path.join(storage_fs_path, exp_name)
assert len(result_grid) == NUM_TRIALS
for result in result_grid:
trial_fs_path = _convert_path_to_fs_path(result.path, storage_filesystem)
assert trial_fs_path.startswith(experiment_fs_path)
# TODO(justinvyu): Trainable syncing of artifacts and checkpoints
# is not yet implemented for the new persistence path.
# for checkpoint, _ in result.best_checkpoints:
# assert checkpoint.path.startswith(trial_fs_path)

# Next, inspect the storage path contents.
assert len(list(local_inspect_dir.glob("*"))) == 1 # Only expect 1 experiment dir
exp_dir = local_inspect_dir / exp_name

Expand Down Expand Up @@ -230,7 +287,7 @@ def test_trainer(
├── progress.csv
├── result.json
├── checkpoint_000000
│ ├── checkpoint_shard-rank=0.pkl <- Worker checkpoint shards
│ ├── checkpoint_shard-rank=0.pkl <- Worker checkpoint shards
│ └── checkpoint_shard-rank=1.pkl
├── ...
├── artifact-rank=0-iter=0.txt <- Worker artifacts
Expand Down Expand Up @@ -264,29 +321,18 @@ def test_trainer(
)
result = trainer.fit()

local_inspect_dir = tmp_path / "inspect"
if storage_path:
if storage_filesystem:
fs, storage_fs_path = storage_filesystem, storage_path
else:
fs, storage_fs_path = pyarrow.fs.FileSystem.from_uri(storage_path)
_download_from_fs_path(
fs=fs, fs_path=storage_fs_path, local_path=str(local_inspect_dir)
)
else:
fs, storage_fs_path = pyarrow.fs.LocalFileSystem(), str(LOCAL_CACHE_DIR)
local_inspect_dir = LOCAL_CACHE_DIR
local_inspect_dir, storage_fs_path = _get_local_inspect_dir(
root_local_path=tmp_path,
storage_path=storage_path,
storage_local_path=LOCAL_CACHE_DIR,
storage_filesystem=storage_filesystem,
)

# First, inspect that the result object returns the correct paths.
# TODO(justinvyu): [custom_fs_path_expansion]
# This doesn't work for the `custom_fs` case right now
# because Result.path <- Trial.remote_path/local_path <- Experiment.path,
# which expands the storage path to an absolute path.
# We shouldn't expand the storage path to an absolute path if a custom fs is passed.
if not storage_filesystem:
_, trial_fs_path = pyarrow.fs.FileSystem.from_uri(result.path)
assert trial_fs_path.startswith(storage_fs_path)
assert result.checkpoint.path.startswith(trial_fs_path)
trial_fs_path = _convert_path_to_fs_path(result.path, storage_filesystem)
assert trial_fs_path.startswith(storage_fs_path)
for checkpoint, _ in result.best_checkpoints:
assert checkpoint.path.startswith(trial_fs_path)

# Second, inspect the contents of the storage path
assert len(list(local_inspect_dir.glob("*"))) == 1 # Only expect 1 experiment dir
Expand All @@ -313,12 +359,8 @@ def test_trainer(

# NOTE: These next 2 are technically synced by the driver.
# TODO(justinvyu): In a follow-up PR, artifacts will be synced by the workers.
# TODO(justinvyu): [custom_fs_path_expansion] Same issue as above.
if not storage_filesystem:
assert (
len(list(trial_dir.glob("artifact-*"))) == NUM_ITERATIONS * NUM_WORKERS
)
assert len(list(trial_dir.glob(EXPR_RESULT_FILE))) == 1
assert len(list(trial_dir.glob("artifact-*"))) == NUM_ITERATIONS * NUM_WORKERS
assert len(list(trial_dir.glob(EXPR_RESULT_FILE))) == 1


if __name__ == "__main__":
Expand Down
3 changes: 2 additions & 1 deletion python/ray/tune/execution/tune_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ def experiment_state_path(self) -> str:
@property
def experiment_path(self) -> str:
    """The path to the experiment directory.

    When the storage context is in use, this is the storage prefix joined
    with the experiment's path on the storage filesystem, rendered as a
    string. Otherwise, this is the legacy remote experiment path if it is
    set (truthy), falling back to the legacy local experiment path.
    """
    if _use_storage_context():
        # Build the path from `storage_prefix` and `experiment_fs_path`;
        # a second, unreachable `return` used to follow a stale one here.
        return str(self._storage.storage_prefix / self._storage.experiment_fs_path)

    return self._legacy_remote_experiment_path or self._legacy_local_experiment_path

def _create_checkpoint_manager(self):
Expand Down
Loading

0 comments on commit be989b2

Please sign in to comment.