Missing zarr chunks written directly to bucket in parallel #327

Open
rafa-guedes opened this issue Jan 13, 2021 · 1 comment
@rafa-guedes

We have recently started writing large zarr archives in parallel, directly to Google buckets, using fsspec/gcsfs and the new `region` capability in the latest xarray release. We have found some intermittent issues that appear to be related to gcsfs (they are not seen when writing locally to disk).

The most serious one is that data in some chunks are not written, yet the process does not fail. Frequently data are missing from the last chunk along a certain dimension of the dataset. The problem is intermittent and only a small portion of the data fails to write (normally 3% or less), so it is difficult to reproduce.

I wonder if this could perhaps have to do with the rate limit of 1,000 object writes per second in Google buckets being exceeded, or if it is related to fsspec/filesystem_spec#255.

Other symptoms we have noted: sometimes we cannot read the data in the zarr archive, getting a RuntimeError: error during blosc decompression: 0. In other cases the process does fail, with errors such as the example tracebacks below, which are similar to pydata/xarray#4704 and #316, but it tends to succeed when we rerun it.

Example traceback 1
    self.chunk_store.setitems({k: v for k, v in zip(ckeys, cdatas)})
  File "/usr/local/lib/python3.8/dist-packages/fsspec/mapping.py", line 110, in setitems
    self.fs.pipe(values)
  File "/usr/local/lib/python3.8/dist-packages/fsspec/asyn.py", line 121, in wrapper
    return maybe_sync(func, self, *args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/fsspec/asyn.py", line 100, in maybe_sync
    return sync(loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/fsspec/asyn.py", line 71, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/dist-packages/fsspec/asyn.py", line 55, in f
    result[0] = await future
  File "/usr/local/lib/python3.8/dist-packages/fsspec/asyn.py", line 222, in _pipe
    await asyncio.gather(
  File "/usr/local/lib/python3.8/dist-packages/gcsfs/core.py", line 1007, in _pipe_file
    return await simple_upload(
  File "/usr/local/lib/python3.8/dist-packages/gcsfs/core.py", line 1523, in simple_upload
    j = await fs._call(
  File "/usr/local/lib/python3.8/dist-packages/gcsfs/core.py", line 525, in _call
    raise e
  File "/usr/local/lib/python3.8/dist-packages/gcsfs/core.py", line 507, in _call
    self.validate_response(status, contents, json, path, headers)
  File "/usr/local/lib/python3.8/dist-packages/gcsfs/core.py", line 1228, in validate_response
    raise HttpError(error)
gcsfs.utils.HttpError: Required
Example traceback 2
    for key in sorted(listdir(self._store, self._path)):
  File "/usr/local/lib/python3.8/dist-packages/zarr/storage.py", line 169, in listdir
    return _listdir_from_keys(store, path)
  File "/usr/local/lib/python3.8/dist-packages/zarr/storage.py", line 151, in _listdir_from_keys
    for key in list(store.keys()):
  File "/usr/lib/python3.8/_collections_abc.py", line 702, in __len__
    return len(self._mapping)
  File "/usr/local/lib/python3.8/dist-packages/fsspec/mapping.py", line 157, in __len__
    return len(self.fs.find(self.root))
  File "/usr/local/lib/python3.8/dist-packages/gcsfs/core.py", line 1085, in find
    out, _ = sync(self.loop, self._do_list_objects, path, delimiter=None)
  File "/usr/local/lib/python3.8/dist-packages/fsspec/asyn.py", line 71, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/dist-packages/fsspec/asyn.py", line 55, in f
    result[0] = await future
  File "/usr/local/lib/python3.8/dist-packages/gcsfs/core.py", line 641, in _do_list_objects
    page = await self._call(
  File "/usr/local/lib/python3.8/dist-packages/gcsfs/core.py", line 525, in _call
    raise e
  File "/usr/local/lib/python3.8/dist-packages/gcsfs/core.py", line 507, in _call
    self.validate_response(status, contents, json, path, headers)
  File "/usr/local/lib/python3.8/dist-packages/gcsfs/core.py", line 1230, in validate_response
    raise HttpError({"code": status})
gcsfs.utils.HttpError

Environment:

  • Dask version: dask-2020.12.0
  • Python version: 3.8
  • Operating System: Ubuntu-20.04
  • Install method (conda, pip, source): pip
@martindurant
Member

to do with the rate limit of 1000 object write / sec

Maybe there's a reason to throttle the amount of work being done, then, or submit the requests in batches...

cc fsspec/filesystem_spec#488 , maybe throttling would help there too.
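A minimal illustration of that idea (assumptions: `pipe_in_batches`, the dict-backed store, and the key names are all hypothetical; neither gcsfs nor fsspec currently exposes such a helper):

```python
import time

def pipe_in_batches(store, items, batch_size=400, max_writes_per_sec=1000):
    """Split one bulk write into batches, pausing between them so the
    sustained request rate stays under the bucket's writes-per-second budget."""
    pairs = list(items.items())
    for i in range(0, len(pairs), batch_size):
        batch = dict(pairs[i:i + batch_size])
        store.update(batch)  # stand-in for the concurrent fs.pipe(batch) call
        time.sleep(batch_size / max_writes_per_sec)

# A plain dict stands in for the object store; keys mimic zarr chunk names.
store = {}
pipe_in_batches(store, {f"var/0.0.{i}": b"\x00" for i in range(1200)})
```

Batching alone also bounds the number of in-flight requests, which may matter independently of the rate limit if the failures come from too many concurrent uploads.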
