From 07e7c4a9aa51af85388cc86f4b3eb2017ece587c Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Fri, 18 Jun 2021 12:52:00 +0300 Subject: [PATCH] implement chunking to the async file system --- fsspec/asyn.py | 40 ++++++++++++++++++++++++-------------- fsspec/tests/test_async.py | 12 ++++++------ 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/fsspec/asyn.py b/fsspec/asyn.py index a4940716d..bc7f5db6b 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -9,6 +9,7 @@ from contextlib import contextmanager from glob import has_magic +from .callbacks import as_callback, branch from .exceptions import FSTimeoutError from .spec import AbstractFileSystem from .utils import PY36, is_exception, other_paths @@ -154,7 +155,7 @@ def _get_batch_size(): return soft_limit // 8 -async def _throttled_gather(coros, batch_size=None, **gather_kwargs): +async def _run_coros_in_chunks(coros, batch_size=None, callback=None, timeout=None): """Run the given coroutines in smaller chunks to not crossing the file descriptor limit. @@ -162,18 +163,21 @@ async def _throttled_gather(coros, batch_size=None, **gather_kwargs): it is none, it will be inferred from the process resources (soft limit divided by 8) and fallback to 128 if the system doesn't support it.""" + callback = as_callback(callback) if batch_size is None: batch_size = _get_batch_size() if batch_size == -1: - return await asyncio.gather(*coros, **gather_kwargs) + batch_size = len(coros) assert batch_size > 0 results = [] for start in range(0, len(coros), batch_size): chunk = coros[start : start + batch_size] - results.extend(await asyncio.gather(*chunk, **gather_kwargs)) + for coro in asyncio.as_completed(chunk, timeout=timeout): + results.append(await coro) + callback.call("relative_update", 1) return results @@ -340,13 +344,16 @@ async def _put(self, lpath, rpath, recursive=False, **kwargs): fs = LocalFileSystem() lpaths = fs.expand_path(lpath, recursive=recursive) rpaths = other_paths(lpaths, rpath) + callback = as_callback(kwargs.pop("callback", None)) batch_size = kwargs.pop("batch_size", self.batch_size) - return await _throttled_gather( - [ - self._put_file(lpath, rpath, **kwargs) - for lpath, rpath in zip(lpaths, rpaths) - ], - batch_size=batch_size, + + coros = [] + callback.lazy_call("set_size", len, lpaths) + for lpath, rpath in zip(lpaths, rpaths): + branch(callback, lpath, rpath, kwargs) + coros.append(self._get_file(lpath, rpath, **kwargs)) + return await _run_coros_in_chunks( + coros, batch_size=batch_size, callback=callback ) async def _get_file(self, rpath, lpath, **kwargs): @@ -374,13 +381,16 @@ async def _get(self, rpath, lpath, recursive=False, **kwargs): rpaths = await self._expand_path(rpath, recursive=recursive) lpaths = other_paths(rpaths, lpath) [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths] + callback = as_callback(kwargs.pop("callback", None)) batch_size = kwargs.pop("batch_size", self.batch_size) - return await _throttled_gather( - [ - self._get_file(rpath, lpath, **kwargs) - for lpath, rpath in zip(lpaths, rpaths) - ], - batch_size=batch_size, + + coros = [] + callback.lazy_call("set_size", len, lpaths) + for lpath, rpath in zip(lpaths, rpaths): + branch(callback, rpath, lpath, kwargs) + coros.append(self._get_file(rpath, lpath, **kwargs)) + return await _run_coros_in_chunks( + coros, batch_size=batch_size, callback=callback ) async def _isfile(self, path): diff --git a/fsspec/tests/test_async.py b/fsspec/tests/test_async.py index b783cd7f4..4f9955665 100644 --- a/fsspec/tests/test_async.py +++ b/fsspec/tests/test_async.py @@ -8,7 +8,7 @@ import fsspec import fsspec.asyn -from fsspec.asyn import _throttled_gather +from fsspec.asyn import _run_coros_in_chunks def test_sync_methods(): @@ -72,7 +72,7 @@ def test_sync_wrapper_treat_timeout_0_as_none(): @pytest.mark.skipif(sys.version_info < (3, 7), reason="no asyncio.run in <3.7") -def test_throttled_gather(monkeypatch): +def test_run_coros_in_chunks(monkeypatch): total_running = 0 async def runner(): @@ -90,7 +90,7 @@ async def main(**kwargs): total_running = 0 coros = [runner() for _ in range(32)] - results = await _throttled_gather(coros, **kwargs) + results = await _run_coros_in_chunks(coros, **kwargs) for result in results: if isinstance(result, Exception): raise result @@ -99,16 +99,16 @@ async def main(**kwargs): assert sum(asyncio.run(main(batch_size=4))) == 32 with pytest.raises(ValueError): - asyncio.run(main(batch_size=5, return_exceptions=True)) + asyncio.run(main(batch_size=5)) with pytest.raises(ValueError): - asyncio.run(main(batch_size=-1, return_exceptions=True)) + asyncio.run(main(batch_size=-1)) assert sum(asyncio.run(main(batch_size=4))) == 32 monkeypatch.setitem(fsspec.config.conf, "gather_batch_size", 5) with pytest.raises(ValueError): - asyncio.run(main(return_exceptions=True)) + asyncio.run(main()) assert sum(asyncio.run(main(batch_size=4))) == 32 # override monkeypatch.setitem(fsspec.config.conf, "gather_batch_size", 4)