From 4d74e7b7449293467ff684568035c43eba3a33e0 Mon Sep 17 00:00:00 2001
From: Chongxiao Cao
Date: Thu, 17 Jun 2021 14:37:22 -0500
Subject: [PATCH] Revert "Refactored FilesystemStore to use fsspec to support additional remote filesystems (#2927)"

This reverts commit 7a697111eef7d88899551c176e31cde5ab61545c.
---
 docs/mocks.py                     |   3 -
 horovod/spark/common/store.py     | 183 ++++++++++++++----------------
 horovod/spark/keras/remote.py     |   2 -
 horovod/spark/lightning/remote.py |   1 -
 horovod/spark/torch/remote.py     |   2 -
 setup.py                          |   2 +-
 test/integration/test_spark.py    |   8 +-
 7 files changed, 91 insertions(+), 110 deletions(-)

diff --git a/docs/mocks.py b/docs/mocks.py
index 793e0fd9cc..32c2e8512e 100644
--- a/docs/mocks.py
+++ b/docs/mocks.py
@@ -37,9 +37,6 @@ def _dummy():
     'h5py',
     'psutil',

-    'fsspec',
-    'fsspec.core',
-
     'pyarrow',
     'pyarrow.parquet',

diff --git a/horovod/spark/common/store.py b/horovod/spark/common/store.py
index cb93301b08..e5b893722c 100644
--- a/horovod/spark/common/store.py
+++ b/horovod/spark/common/store.py
@@ -16,7 +16,6 @@
 import contextlib
 import errno
 import os
-import pathlib
 import re
 import shutil
 import tempfile
@@ -27,9 +26,6 @@
 import pyarrow as pa
 import pyarrow.parquet as pq

-import fsspec
-from fsspec.core import split_protocol
-
 from horovod.spark.common.util import is_databricks


@@ -159,28 +155,26 @@ def create(prefix_path, *args, **kwargs):
         elif is_databricks() and DBFSLocalStore.matches_dbfs(prefix_path):
             return DBFSLocalStore(prefix_path, *args, **kwargs)
         else:
-            return FilesystemStore(prefix_path, *args, **kwargs)
+            return LocalStore(prefix_path, *args, **kwargs)


-class AbstractFilesystemStore(Store):
+class FilesystemStore(Store):
     """Abstract class for stores that use a filesystem for underlying storage."""

-    def __init__(self, prefix_path, train_path=None, val_path=None, test_path=None,
-                 runs_path=None, save_runs=True, storage_options=None, **kwargs):
+    def __init__(self, prefix_path, train_path=None, val_path=None, test_path=None, runs_path=None, save_runs=True):
         self.prefix_path = self.get_full_path(prefix_path)
         self._train_path = self._get_full_path_or_default(train_path, 'intermediate_train_data')
         self._val_path = self._get_full_path_or_default(val_path, 'intermediate_val_data')
         self._test_path = self._get_full_path_or_default(test_path, 'intermediate_test_data')
         self._runs_path = self._get_full_path_or_default(runs_path, 'runs')
         self._save_runs = save_runs
-        self.storage_options = storage_options
-        super().__init__()
+        super(FilesystemStore, self).__init__()

     def exists(self, path):
-        return self.fs.exists(self.get_localized_path(path)) or self.fs.isdir(path)
+        return self.get_filesystem().exists(self.get_localized_path(path))

     def read(self, path):
-        with self.fs.open(self.get_localized_path(path), 'rb') as f:
+        with self.get_filesystem().open(self.get_localized_path(path), 'rb') as f:
             return f.read()

     def read_serialized_keras_model(self, ckpt_path, model, custom_objects):
@@ -199,7 +193,7 @@ def read_serialized_keras_model(self, ckpt_path, model, custom_objects):
             return codec.dumps_base64(model_bytes)

     def write_text(self, path, text):
-        with self.fs.open(self.get_localized_path(path), 'w') as f:
+        with self.get_filesystem().open(self.get_localized_path(path), 'w') as f:
             f.write(text)

     def is_parquet_dataset(self, path):
@@ -210,7 +204,7 @@ def is_parquet_dataset(self, path):
             return False

     def get_parquet_dataset(self, path):
-        return pq.ParquetDataset(self.get_localized_path(path), filesystem=self.fs)
+        return pq.ParquetDataset(self.get_localized_path(path), filesystem=self.get_filesystem())

     def get_train_data_path(self, idx=None):
         return '{}.{}'.format(self._train_path, idx) if idx is not None else self._train_path
@@ -243,7 +237,7 @@ def get_checkpoint_path(self, run_id):

     def get_checkpoints(self, run_id, suffix='.ckpt'):
         checkpoint_dir = self.get_localized_path(self.get_checkpoint_path(run_id))
-        filenames = self.fs.ls(checkpoint_dir)
+        filenames = self.get_filesystem().ls(checkpoint_dir)
         return sorted([name for name in filenames if name.endswith(suffix)])

     def get_logs_path(self, run_id):
@@ -256,6 +250,23 @@ def get_checkpoint_filename(self):
     def get_logs_subdir(self):
         return 'logs'

+    def get_full_path(self, path):
+        if not self.matches(path):
+            return self.path_prefix() + path
+        return path
+
+    def get_localized_path(self, path):
+        if self.matches(path):
+            return path[len(self.path_prefix()):]
+        return path
+
+    def get_full_path_fn(self):
+        prefix = self.path_prefix()
+
+        def get_path(path):
+            return prefix + path
+        return get_path
+
     def _get_full_path_or_default(self, path, default_key):
         if path is not None:
             return self.get_full_path(path)
@@ -264,90 +275,66 @@ def _get_full_path_or_default(self, path, default_key):
     def _get_path(self, key):
         return os.path.join(self.prefix_path, key)

-    def get_local_output_dir_fn(self, run_id):
-        @contextlib.contextmanager
-        def local_run_path():
-            with tempfile.TemporaryDirectory() as tmpdir:
-                yield tmpdir
-        return local_run_path
-
-    def get_localized_path(self, path):
+    def path_prefix(self):
         raise NotImplementedError()

-    def get_full_path(self, path):
+    def get_filesystem(self):
         raise NotImplementedError()

-    def get_full_path_fn(self):
-        raise NotImplementedError()
+    @classmethod
+    def matches(cls, path):
+        return path.startswith(cls.filesystem_prefix())

-    @property
-    def fs(self):
+    @classmethod
+    def filesystem_prefix(cls):
         raise NotImplementedError()


-class FilesystemStore(AbstractFilesystemStore):
-    """Concrete filesystems store that delegates to `fsspec`."""
-
-    def __init__(self, prefix_path, *args, **kwargs):
-        self.storage_options = kwargs['storage_options'] if 'storage_options' in kwargs else {}
-        self.prefix_path = prefix_path
-        self._fs, self.protocol = self._get_fs_and_protocol()
-        std_params = ['train_path', 'val_path', 'test_path', 'runs_path', 'save_runs', 'storage_options']
-        params = dict((k, kwargs[k]) for k in std_params if k in kwargs)
-        super().__init__(prefix_path, *args, **params)
+class LocalStore(FilesystemStore):
+    """Uses the local filesystem as a store of intermediate data and training artifacts."""

-    def sync_fn(self, run_id):
-        run_path = self.get_run_path(run_id)
+    FS_PREFIX = 'file://'

-        def fn(local_run_path):
-            self.fs.put(local_run_path, run_path, recursive=True, overwrite=True)
+    def __init__(self, prefix_path, *args, **kwargs):
+        self._fs = pa.LocalFileSystem()
+        super(LocalStore, self).__init__(prefix_path, *args, **kwargs)

-        return fn
+    def path_prefix(self):
+        return self.FS_PREFIX

     def get_filesystem(self):
-        return self.fs
-
-    def get_localized_path(self, path):
-        _, lpath = split_protocol(path)
-        return lpath
-
-    def get_full_path(self, path):
-        return self.get_full_path_fn()(path)
-
-    def get_full_path_fn(self):
-        def get_path(path):
-            protocol, _ = split_protocol(path)
-            if protocol is not None:
-                return path
-            return pathlib.Path(os.path.abspath(path)).as_uri()
-        return get_path
-
-    @property
-    def fs(self):
         return self._fs

-    #@staticmethod
-    def _get_fs_and_protocol(self):
-        protocol, path = split_protocol(self.prefix_path)
-        fs = fsspec.filesystem(protocol, **self.storage_options)
-        return fs, protocol
+    def get_local_output_dir_fn(self, run_id):
+        run_path = self.get_localized_path(self.get_run_path(run_id))

-    @classmethod
-    def matches(cls, path):
-        return True
+        @contextlib.contextmanager
+        def local_run_path():
+            if not os.path.exists(run_path):
+                try:
+                    os.makedirs(run_path, mode=0o755)
+                except OSError as e:
+                    # Race condition from workers on the same host: ignore
+                    if e.errno != errno.EEXIST:
+                        raise
+            yield run_path
+        return local_run_path

-class LocalStore(FilesystemStore):
-    """Uses the local filesystem as a store of intermediate data and training artifacts.
+    def sync_fn(self, run_id):
+        run_path = self.get_localized_path(self.get_run_path(run_id))

-    This class is deprecated and now just resolves to FilesystemStore.
-    """
+        def fn(local_run_path):
+            # No-op for LocalStore since the `local_run_path` will be the same as the run path
+            assert run_path == local_run_path
+        return fn

-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
+    @classmethod
+    def filesystem_prefix(cls):
+        return cls.FS_PREFIX


-class HDFSStore(AbstractFilesystemStore):
+class HDFSStore(FilesystemStore):
     """Uses HDFS as a store of intermediate data and training artifacts.

     Initialized from a `prefix_path` that can take one of the following forms:
@@ -371,7 +358,9 @@
     def __init__(self, prefix_path,
                  host=None, port=None, user=None, kerb_ticket=None,
-                 driver='libhdfs', extra_conf=None, *args, **kwargs):
+                 driver='libhdfs', extra_conf=None, temp_dir=None, *args, **kwargs):
+        self._temp_dir = temp_dir
+
         prefix, url_host, url_port, path, path_offset = self.parse_url(prefix_path)
         self._check_url(prefix_path, prefix, path)
         self._url_prefix = prefix_path[:path_offset] if prefix else self.FS_PREFIX
@@ -402,21 +391,24 @@ def parse_url(self, url):
             path_offset = match.start(4)
         return prefix, host, port, path, path_offset

-    def get_full_path(self, path):
-        if not self.matches(path):
-            return self._url_prefix + path
-        return path
+    def path_prefix(self):
+        return self._url_prefix

-    def get_full_path_fn(self):
-        prefix = self._url_prefix
+    def get_filesystem(self):
+        return self._hdfs

-        def get_path(path):
-            return prefix + path
-        return get_path
+    def get_local_output_dir_fn(self, run_id):
+        temp_dir = self._temp_dir

-    @property
-    def fs(self):
-        return self._hdfs
+        @contextlib.contextmanager
+        def local_run_path():
+            dirpath = tempfile.mkdtemp(dir=temp_dir)
+            try:
+                yield dirpath
+            finally:
+                shutil.rmtree(dirpath)
+
+        return local_run_path

     def sync_fn(self, run_id):
         class SyncState(object):
@@ -472,18 +464,13 @@ def _check_url(self, url, prefix, path):
         if not path:
             raise ValueError('Failed to parse path from URL: {}'.format(url))
-
-    def get_localized_path(self, path):
-        if self.matches(path):
-            return path[len(self._url_prefix):]
-        return path

     @classmethod
-    def matches(cls, path):
-        return path.startswith(cls.FS_PREFIX)
+    def filesystem_prefix(cls):
+        return cls.FS_PREFIX


-class DBFSLocalStore(FilesystemStore):
+class DBFSLocalStore(LocalStore):
     """Uses Databricks File System (DBFS) local file APIs as a store of intermediate data and training artifacts.
diff --git a/horovod/spark/keras/remote.py b/horovod/spark/keras/remote.py
index 569600a42b..d92cbba5fe 100644
--- a/horovod/spark/keras/remote.py
+++ b/horovod/spark/keras/remote.py
@@ -218,7 +218,6 @@ def train(serialized_model, train_rows, val_rows, avg_row_size):
                                 hdfs_driver=PETASTORM_HDFS_DRIVER,
                                 schema_fields=schema_fields,
                                 transform_spec=transform_spec,
-                                storage_options=store.storage_options,
                                 **reader_factory_kwargs) as train_reader:
                 with reader_factory(remote_store.val_data_path,
                                     num_epochs=1,
@@ -229,7 +228,6 @@ def train(serialized_model, train_rows, val_rows, avg_row_size):
                                     hdfs_driver=PETASTORM_HDFS_DRIVER,
                                     schema_fields=schema_fields,
                                     transform_spec=transform_spec,
-                                    storage_options=store.storage_options,
                                     **reader_factory_kwargs) \
                         if should_validate else empty_batch_reader() as val_reader:

diff --git a/horovod/spark/lightning/remote.py b/horovod/spark/lightning/remote.py
index 7d0368473c..133784f5dd 100644
--- a/horovod/spark/lightning/remote.py
+++ b/horovod/spark/lightning/remote.py
@@ -264,7 +264,6 @@ def set_data_loader(model, data_path, dataloader_attr, reader_worker_count, read
                             hdfs_driver=PETASTORM_HDFS_DRIVER,
                             schema_fields=schema_fields,
                             transform_spec=transform_spec,
-                            storage_options=store.storage_options,
                             **reader_factory_kwargs) as reader:
             def dataloader_fn():
                 return data_loader_cls(reader=reader, batch_size=batch_size,
diff --git a/horovod/spark/torch/remote.py b/horovod/spark/torch/remote.py
index d9b2944789..46d8ca928c 100644
--- a/horovod/spark/torch/remote.py
+++ b/horovod/spark/torch/remote.py
@@ -236,7 +236,6 @@ def save_checkpoint():
                                 hdfs_driver=PETASTORM_HDFS_DRIVER,
                                 schema_fields=schema_fields,
                                 transform_spec=transform_spec,
-                                storage_options=store.storage_options,
                                 **reader_factory_kwargs) as train_reader:
                 with reader_factory(remote_store.val_data_path,
                                     num_epochs=None,
@@ -247,7 +246,6 @@ def save_checkpoint():
                                     hdfs_driver=PETASTORM_HDFS_DRIVER,
                                     schema_fields=schema_fields,
                                     transform_spec=transform_spec,
-                                    storage_options=store.storage_options,
                                     **reader_factory_kwargs) \
                         if should_validate else empty_batch_reader() as val_reader:

diff --git a/setup.py b/setup.py
index a9280dbda4..bc877ffbd6 100644
--- a/setup.py
+++ b/setup.py
@@ -112,7 +112,7 @@ def build_extensions(self):
 pyspark_require_list = ['pyspark>=2.3.2;python_version<"3.8"',
                         'pyspark>=3.0.0;python_version>="3.8"']
 # Pin h5py: https://github.com/h5py/h5py/issues/1732
-spark_require_list = ['h5py<3', 'numpy', 'petastorm>=0.11.0', 'pyarrow>=0.15.0', 'fsspec']
+spark_require_list = ['h5py<3', 'numpy', 'petastorm>=0.11.0', 'pyarrow>=0.15.0']
 ray_require_list = ['ray']
 pytorch_spark_require_list = pytorch_require_list + \
                              spark_require_list + \
diff --git a/test/integration/test_spark.py b/test/integration/test_spark.py
index bbca54e491..90dd1c9932 100644
--- a/test/integration/test_spark.py
+++ b/test/integration/test_spark.py
@@ -15,7 +15,6 @@

 import contextlib
 import copy
-from horovod.spark.common.store import FilesystemStore
 import io
 import itertools
 import logging
@@ -1564,7 +1563,7 @@ def test_hdfs_store_parse_url(self, mock_get_filesystem_fn):
         # Case 1: full path
         hdfs_root = 'hdfs://namenode01:8020/user/test/output'
         store = HDFSStore(hdfs_root)
-        assert store.get_full_path("/output") == 'hdfs://namenode01:8020/output', hdfs_root
+        assert store.path_prefix() == 'hdfs://namenode01:8020', hdfs_root
         assert store.get_full_path('/user/test/output') == 'hdfs://namenode01:8020/user/test/output', hdfs_root
         assert store.get_localized_path('hdfs://namenode01:8020/user/test/output') == '/user/test/output', hdfs_root
         assert store._hdfs_kwargs['host'] == 'namenode01', hdfs_root
@@ -1573,6 +1572,7 @@ def test_hdfs_store_parse_url(self, mock_get_filesystem_fn):
         # Case 2: no host and port
         hdfs_root = 'hdfs:///user/test/output'
         store = HDFSStore(hdfs_root)
+        assert store.path_prefix() == 'hdfs://', hdfs_root
         assert store.get_full_path('/user/test/output') == 'hdfs:///user/test/output', hdfs_root
         assert store.get_localized_path('hdfs:///user/test/output') == '/user/test/output', hdfs_root
         assert store._hdfs_kwargs['host'] == 'default', hdfs_root
@@ -1581,6 +1581,7 @@ def test_hdfs_store_parse_url(self, mock_get_filesystem_fn):
         # Case 3: no prefix
         hdfs_root = '/user/test/output'
         store = HDFSStore(hdfs_root)
+        assert store.path_prefix() == 'hdfs://', hdfs_root
         assert store.get_full_path('/user/test/output') == 'hdfs:///user/test/output', hdfs_root
         assert store.get_localized_path('hdfs:///user/test/output') == '/user/test/output', hdfs_root
         assert store._hdfs_kwargs['host'] == 'default', hdfs_root
@@ -1589,6 +1590,7 @@ def test_hdfs_store_parse_url(self, mock_get_filesystem_fn):
         # Case 4: no namespace
         hdfs_root = 'hdfs://namenode01:8020/user/test/output'
         store = HDFSStore(hdfs_root)
+        assert store.path_prefix() == 'hdfs://namenode01:8020', hdfs_root
         assert store.get_full_path('/user/test/output') == 'hdfs://namenode01:8020/user/test/output', hdfs_root
         assert store.get_localized_path('hdfs://namenode01:8020/user/test/output') == '/user/test/output', hdfs_root
         assert store._hdfs_kwargs['host'] == 'namenode01', hdfs_root
@@ -1726,7 +1728,7 @@ def test_dbfs_local_store(self):

         # test Store.create will not create DBFSLocalStore on non-databricks environment
         local_store = Store.create("/dbfs/tmp/test_local_dir")
-        assert isinstance(local_store, FilesystemStore)
+        assert isinstance(local_store, LocalStore)

         # test Store.create will create DBFSLocalStore on databricks environment
         try: