Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored FilesystemStore to use fsspec to support additional remote filesystems #2927

Merged
merged 5 commits into from Jun 2, 2021

Conversation

tgaddair
Copy link
Collaborator

@tgaddair tgaddair commented May 21, 2021

This adds support for various cloud storage systems like S3, GCS, ADLS, etc. in the Spark Estimators.

Fixes #2905.
Fixes #1732.

Signed-off-by: Travis Addair <tgaddair@gmail.com>
@github-actions
Copy link

github-actions bot commented May 21, 2021

Unit Test Results

     792 files  ±0       792 suites  ±0   6h 8m 13s ⏱️ ±0s
     600 tests ±0       564 ✔️ ±0       35 💤 ±0  0 ❌ ±0  1 🔥 ±0 
16 473 runs  ±0  12 439 ✔️ ±0  4 033 💤 ±0  0 ❌ ±0  1 🔥 ±0 

For more details on these errors, see this check.

Results for commit 7a69711. ± Comparison against base commit 7a69711.

♻️ This comment has been updated with latest results.

@staticmethod
def _get_fs_and_protocol(url):
protocol, path = split_protocol(url)
fs = fsspec.filesystem(protocol)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can storage_options be provided as second argument [fsspec.filesystem(protocol, **storage_options)] ? This will help provide authentication options to fsspec.


def __init__(self, prefix_path, *args, **kwargs):
self._fs = pa.LocalFileSystem()
super(LocalStore, self).__init__(prefix_path, *args, **kwargs)
self._fs, self.protocol = FilesystemStore._get_fs_and_protocol(prefix_path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be added ?:
self.storage_options = kwargs['storage_options'] if 'storage_options' in kwargs else dict()
self._fs, self.protocol = FilesystemStore._get_fs_and_protocol(prefix_path, self.storage_options)

@umashankark
Copy link
Contributor

Also, adding storage_options dict param to HorovodEstimator (EstimatorParams ?), should help RemoteTrainer()s to pass them to make_reader & make_batch_reader Petastorm calls.

@umashankark
Copy link
Contributor

I am testing this change with ADLS; I will update with results.

@tgaddair
Copy link
Collaborator Author

Thanks @umashankark, I hope to be able to follow-up on your suggestions soon (maybe this weekend). Just got a few other things I need to take care of first. Thanks for your patience!

@umashankark
Copy link
Contributor

umashankark commented May 29, 2021

  1. I generated xor dataset using spark_common.create_xor_data() and copied to ADLS.
  2. Took "xor" model estimator (from testcases).
  3. Used FsspecStore with minor changes (added_storage_options, and used same storage_options for make_reader,make_batch_reader access in remote.py) .
    Code:
    store = Store.create('abfs://container/', storage_options=storage_options,
    train_path='abfs://container/xor-data', save_runs=True)
    torch_estimator = hvd_spark.TorchEstimator(
    num_proc=1,
    store=store,
    model=model,
    optimizer=optimizer,
    loss=loss,
    input_shapes=[[1],[1]],
    feature_cols=['x1','x2'],
    label_cols=['y'],
    batch_size=1,
    epochs=10,
    verbose=1,)
    torch_model = torch_estimator.fit_on_parquet()

Getting this error only when Estimator(save_runs=True):
[0]: File "/home/lll/tests/final-hvd/venv/lib/python3.8/site-packages/horovod-0.22.0-py3.8-linux-x86_64.egg/horovod/spark/torch/remote.py", line 401, in train
[0]: remote_store.sync(run_output_dir)
[0]: File "/home/lll/tests/final-hvd/venv/lib/python3.8/site-packages/horovod-0.22.0-py3.8-linux-x86_64.egg/horovod/spark/common/store.py", line 307, in fn
[0]: self.fs.put(local_run_path, run_path, recursive=True)
[0]: File "/home/lll/tests/final-hvd/venv/lib/python3.8/site-packages/fsspec/asyn.py", line 72, in wrapper
[0]: return sync(self.loop, func, *args, **kwargs)
[0]: File "/home/lll/tests/final-hvd/venv/lib/python3.8/site-packages/fsspec/asyn.py", line 53, in sync
[0]: raise result[0]
[0]: File "/home/lll/tests/final-hvd/venv/lib/python3.8/site-packages/fsspec/asyn.py", line 20, in _runner
[0]: result[0] = await coro
[0]: File "/home/lll/tests/final-hvd/venv/lib/python3.8/site-packages/fsspec/asyn.py", line 283, in _put
[0]: return await _throttled_gather(
[0]: File "/home/lll/tests/final-hvd/venv/lib/python3.8/site-packages/fsspec/asyn.py", line 144, in _throttled_gather
[0]: results.extend(await asyncio.gather(*chunk, **gather_kwargs))
[0]: File "/home/lll/tests/final-hvd/venv/lib/python3.8/site-packages/adlfs/spec.py", line 1432, in _put_file
[0]: raise FileExistsError("File already exists!")
[0]:FileExistsError: File already exists!

This happens in second epoch. Looks like Store.sync_fn:: fs.put() couldn't overwrite the folder content. Checking whether fs.put() expected to overwrite.

@umashankark
Copy link
Contributor

umashankark commented May 29, 2021

FilesystemStore:

Store.sync_fn():
     """ snip
        self.fs.put(local_run_path, run_path, recursive=True, overwrite=True)

overwrite=True takes care of the error. Will share the minor changes done.

@umashankark
Copy link
Contributor

Tested and pushed changes to #2947. This is PR against fsspec branch. @tgaddair

umashankark and others added 3 commits June 1, 2021 17:20
@tgaddair
Copy link
Collaborator Author

tgaddair commented Jun 2, 2021

@chongxiaoc @irasit please take a look. Should hopefully not break anything on the Uber side.

Copy link
Collaborator

@chongxiaoc chongxiaoc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@tgaddair tgaddair merged commit 7a69711 into master Jun 2, 2021
@tgaddair tgaddair deleted the fsspec branch June 2, 2021 22:22
chongxiaoc added a commit to chongxiaoc/horovod that referenced this pull request Jun 17, 2021
chongxiaoc added a commit to chongxiaoc/horovod that referenced this pull request Jun 17, 2021
…l remote filesystems (horovod#2927)"

This reverts commit 7a69711.

Signed-off-by: Chongxiao Cao <chongxiaoc@uber.com>
chongxiaoc added a commit to chongxiaoc/horovod that referenced this pull request Jun 17, 2021
…l remote filesystems (horovod#2927)"

This reverts commit 7a69711.

Signed-off-by: Chongxiao Cao <chongxiaoc@uber.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

Supporting ADLS storage S3Store on hovorod.spark.common.store
3 participants