diff --git a/docs/source/api.rst b/docs/source/api.rst index 443ebcd4b..9847d6577 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -43,6 +43,10 @@ Base Classes fsspec.dircache.DirCache fsspec.registry.ReadOnlyRegistry fsspec.registry.register_implementation + fsspec.callbacks.Callback + fsspec.callbacks.callback + fsspec.callbacks.as_callback + fsspec.callbacks.branch .. autoclass:: fsspec.spec.AbstractFileSystem :members: @@ -77,6 +81,15 @@ Base Classes .. autofunction:: fsspec.registry.register_implementation +.. autoclass:: fsspec.callbacks.Callback + :members: + +.. autofunction:: fsspec.callbacks.callback + +.. autofunction:: fsspec.callbacks.as_callback + +.. autofunction:: fsspec.callbacks.branch + .. _implementations: Built-in Implementations diff --git a/fsspec/__init__.py b/fsspec/__init__.py index 30944cab0..6803073a3 100644 --- a/fsspec/__init__.py +++ b/fsspec/__init__.py @@ -9,6 +9,7 @@ from . import caching from ._version import get_versions +from .callbacks import callback from .core import get_fs_token_paths, open, open_files, open_local from .exceptions import FSTimeoutError from .mapping import FSMap, get_mapper @@ -38,6 +39,7 @@ "open_local", "registry", "caching", + "callback", ] if entry_points is not None: diff --git a/fsspec/asyn.py b/fsspec/asyn.py index d1b8b33a6..1413c822a 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -9,6 +9,7 @@ from contextlib import contextmanager from glob import has_magic +from .callbacks import as_callback, branch from .exceptions import FSTimeoutError from .spec import AbstractFileSystem from .utils import PY36, is_exception, other_paths @@ -184,7 +185,7 @@ def _get_batch_size(): return soft_limit // 8 -async def _throttled_gather(coros, batch_size=None, **gather_kwargs): +async def _run_coros_in_chunks(coros, batch_size=None, callback=None, timeout=None): """Run the given coroutines in smaller chunks to not crossing the file descriptor limit. 
@@ -192,18 +193,21 @@ async def _throttled_gather(coros, batch_size=None, **gather_kwargs): it is none, it will be inferred from the process resources (soft limit divided by 8) and fallback to 128 if the system doesn't support it.""" + callback = as_callback(callback) if batch_size is None: batch_size = _get_batch_size() if batch_size == -1: - return await asyncio.gather(*coros, **gather_kwargs) + batch_size = len(coros) assert batch_size > 0 results = [] for start in range(0, len(coros), batch_size): chunk = coros[start : start + batch_size] - results.extend(await asyncio.gather(*chunk, **gather_kwargs)) + for coro in asyncio.as_completed(chunk, timeout=timeout): + results.append(await coro) + callback.call("relative_update", 1) return results @@ -376,13 +380,17 @@ async def _put(self, lpath, rpath, recursive=False, **kwargs): await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs]) files = sorted(set(lpaths) - set(dirs)) rpaths = other_paths(files, rpath) + callback = as_callback(kwargs.pop("callback", None)) batch_size = kwargs.pop("batch_size", self.batch_size) - return await _throttled_gather( - [ - self._put_file(lpath, rpath, **kwargs) - for lpath, rpath in zip(files, rpaths) - ], - batch_size=batch_size, + + coros = [] + callback.call("set_size", len(files)) + for lpath, rpath in zip(files, rpaths): + branch(callback, lpath, rpath, kwargs) + coros.append(self._put_file(lpath, rpath, **kwargs)) + + return await _run_coros_in_chunks( + coros, batch_size=batch_size, callback=callback ) async def _get_file(self, rpath, lpath, **kwargs): @@ -410,13 +418,16 @@ async def _get(self, rpath, lpath, recursive=False, **kwargs): rpaths = await self._expand_path(rpath, recursive=recursive) lpaths = other_paths(rpaths, lpath) [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths] + callback = as_callback(kwargs.pop("callback", None)) batch_size = kwargs.pop("batch_size", self.batch_size) - return await _throttled_gather( - [ - 
self._get_file(rpath, lpath, **kwargs) - for lpath, rpath in zip(lpaths, rpaths) - ], - batch_size=batch_size, + + coros = [] + callback.lazy_call("set_size", len, lpaths) + for lpath, rpath in zip(lpaths, rpaths): + branch(callback, rpath, lpath, kwargs) + coros.append(self._get_file(rpath, lpath, **kwargs)) + return await _run_coros_in_chunks( + coros, batch_size=batch_size, callback=callback ) async def _isfile(self, path): diff --git a/fsspec/callbacks.py b/fsspec/callbacks.py new file mode 100644 index 000000000..d0d532c94 --- /dev/null +++ b/fsspec/callbacks.py @@ -0,0 +1,199 @@ +from .utils import stringify_path + + +class Callback: + __slots__ = ["properties", "hooks"] + + def __init__(self, properties=None, **hooks): + self.hooks = hooks + self.properties = properties or {} + + def call(self, hook, *args, **kwargs): + """Make a callback to a hook named ``hook``. If it can't + find the hook, then this function will return None. Otherwise + the return value of the hook will be used. + + Parameters + ---------- + hook: str + The name of the hook + *args: Any + All the positional arguments that will be passed + to the ``hook``, if found. + **kwargs: Any + All the keyword arguments that will be passed + tot the ``hook``, if found. + """ + callback = self.hooks.get(hook) + if callback is not None: + return callback(*args, **kwargs) + + def lazy_call(self, hook, func, *args, **kwargs): + """Make a callback to a hook named ``hook`` with a + single value that will be lazily obtained from a call + to the ``func`` with ``*args`` and ``**kwargs``. + + This method should be used for expensive operations, + e.g ``len(data)`` since if there is no hook for the + given ``hook`` parameter, that operation will be wasted. + With this method, it will only evaluate function, if there + is a hook attached to the given ``hook`` parameter. 
+
+        Parameters
+        ----------
+        hook: str
+            The name of the hook
+        func: Callable[..., Any]
+            Function that will be called and passed as the argument
+            to the hook, if found.
+        *args: Any
+            All the positional arguments that will be passed
+            to the ``func``, if ``hook`` is found.
+        **kwargs: Any
+            All the keyword arguments that will be passed
+            to the ``func``, if ``hook`` is found.
+        """
+        callback = self.hooks.get(hook)
+        if callback is not None:
+            return callback(func(*args, **kwargs))
+
+    def wrap(self, iterable):
+        """Wrap an iterable to send ``relative_update`` hook
+        on each iteration.
+
+        Parameters
+        ----------
+        iterable: Iterable
+            The iterable that is being wrapped
+        """
+        for item in iterable:
+            self.call("relative_update", 1)
+            yield item
+
+
+class NoOpCallback(Callback):
+    def call(self, hook, *args, **kwargs):
+        return None
+
+    def lazy_call(self, hook, *args, **kwargs):
+        return None
+
+
+_DEFAULT_CALLBACK = NoOpCallback()
+
+
+def callback(
+    *,
+    set_size=None,
+    relative_update=None,
+    absolute_update=None,
+    branch=None,
+    properties=None,
+    **hooks,
+):
+    """Create a new callback for filesystem APIs.
+
+    Parameters
+    ----------
+    set_size: Callable[[Optional[int]], None] (optional)
+        When transferring something quantifiable (e.g. bytes in a file, or
+        number of files), this hook will be called with the total number of
+        items. Might set something to None, in that case it should be ignored.
+
+    relative_update: Callable[[int], None] (optional)
+        Update the total transferred items relative to the previous position.
+        If the current cursor is at N, and a relative_update(Q) happens then
+        the current cursor should now point at the N+Q.
+
+    absolute_update: Callable[[int], None] (optional)
+        Update the total transferred items to an absolute position. If
+        the current cursor is at N, and an absolute_update(Q) happens then
+        the current cursor should now point at the Q. If another one happens
+        it will override the current value.
+
+    branch: Callable[[os.PathLike, os.PathLike], Optional[fsspec.callbacks.Callback]] (optional)
+        When some operations need branching (e.g. each ``put()``/``get()``
+        operation has its own callbacks, but they will also need to
+        branch out for ``put_file()``/``get_file()`` since those might
+        require additional child callbacks) the branch hook will be called
+        with the paths that are being transferred and it is expected to
+        either return a new fsspec.callbacks.Callback instance or None. If
+        the ``stringify_paths`` property is set, the paths will be cast to
+        string, and if the ``posixify_paths`` property is set both arguments
+        will be sanitized to the posix convention.
+
+    properties: Dict[str, Any] (optional)
+        A mapping of config options (callback related) to their values.
+
+    hooks: Callable[..., Any]
+        Optional hooks that are not generally available.
+
+    Returns
+    -------
+    fsspec.callbacks.Callback
+    """  # noqa: E501
+
+    return Callback(
+        properties=properties,
+        set_size=set_size,
+        relative_update=relative_update,
+        absolute_update=absolute_update,
+        branch=branch,
+        **hooks,
+    )
+
+
+def as_callback(maybe_callback):
+    """Return the no-op callback if the maybe_callback parameter is None.
+
+    Parameters
+    ----------
+    maybe_callback: fsspec.callbacks.Callback or None
+
+    Returns
+    -------
+    fsspec.callbacks.Callback
+    """
+    if maybe_callback is None:
+        return _DEFAULT_CALLBACK
+    else:
+        return maybe_callback
+
+
+def branch(callback, path_1, path_2, kwargs=None):
+    """Branch out from an existing callback.
+
+    Parameters
+    ----------
+    callback: fsspec.callbacks.Callback
+        Parent callback
+    path_1: os.PathLike
+        Left path
+    path_2: os.PathLike
+        Right path
+    kwargs: Dict[str, Any] (optional)
+        Update the ``callback`` key on the given ``kwargs``
+        if there is a brancher attached to the ``callback``.
+ + + Returns + ------- + fsspec.callback.Callback or None + """ + from .implementations.local import make_path_posix + + if callback.properties.get("stringify_paths"): + path_1 = stringify_path(path_1) + path_2 = stringify_path(path_2) + + if callback.properties.get("posixify_paths"): + path_1 = make_path_posix(path_1) + path_2 = make_path_posix(path_2) + + branched = callback.call("branch", path_1, path_2) + if branched is None or branched is _DEFAULT_CALLBACK: + return None + + if kwargs is not None: + kwargs["callback"] = branched + return branched diff --git a/fsspec/implementations/cached.py b/fsspec/implementations/cached.py index 4d38ee076..a450f272d 100644 --- a/fsspec/implementations/cached.py +++ b/fsspec/implementations/cached.py @@ -8,6 +8,7 @@ from shutil import move, rmtree from fsspec import AbstractFileSystem, filesystem +from fsspec.callbacks import as_callback from fsspec.compression import compr from fsspec.core import BaseCache, MMapCache from fsspec.spec import AbstractBufferedFile @@ -543,6 +544,7 @@ def _make_local_details(self, path): return fn def cat(self, path, recursive=False, on_error="raise", **kwargs): + callback = as_callback(kwargs.pop("callback", None)) paths = self.expand_path( path, recursive=recursive, maxdepth=kwargs.get("maxdepth", None) ) @@ -561,7 +563,12 @@ def cat(self, path, recursive=False, on_error="raise", **kwargs): if getpaths: self.fs.get(getpaths, storepaths) self.save_cache() - out = {path: open(fn, "rb").read() for path, fn in zip(paths, fns)} + + out = {} + callback.lazy_call("set_size", len, paths) + for path, fn in zip(paths, fns): + out[path] = open(fn, "rb").read() + callback.call("relative_update", 1) if isinstance(path, str) and len(paths) == 1 and recursive is False: out = out[paths[0]] return out diff --git a/fsspec/implementations/local.py b/fsspec/implementations/local.py index f352bf584..ce5c93370 100644 --- a/fsspec/implementations/local.py +++ b/fsspec/implementations/local.py @@ -112,10 +112,10 @@ 
def cp_file(self, path1, path2, **kwargs): else: raise FileNotFoundError - def get_file(self, path1, path2, **kwargs): + def get_file(self, path1, path2, callback=None, **kwargs): return self.cp_file(path1, path2, **kwargs) - def put_file(self, path1, path2, **kwargs): + def put_file(self, path1, path2, callback=None, **kwargs): return self.cp_file(path1, path2, **kwargs) def mv_file(self, path1, path2, **kwargs): diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index d17f8c418..ce556cb3b 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -10,6 +10,7 @@ import json from ..asyn import AsyncFileSystem, sync +from ..callbacks import as_callback from ..core import filesystem, open from ..mapping import get_mapper from ..spec import AbstractFileSystem @@ -199,9 +200,12 @@ async def _get_file(self, rpath, lpath, **kwargs): f.write(data) def get_file(self, rpath, lpath, **kwargs): + callback = as_callback(kwargs.pop("callback", None)) data = self.cat_file(rpath, **kwargs) + callback.lazy_call("set_size", len, data) with open(lpath, "wb") as f: f.write(data) + callback.lazy_call("absolute_update", len, data) def get(self, rpath, lpath, recursive=False, **kwargs): if self.fs.async_impl: diff --git a/fsspec/implementations/sftp.py b/fsspec/implementations/sftp.py index a47e73d6b..6ba5bfb06 100644 --- a/fsspec/implementations/sftp.py +++ b/fsspec/implementations/sftp.py @@ -119,11 +119,11 @@ def ls(self, path, detail=False): paths = [stat["name"] for stat in stats] return sorted(paths) - def put(self, lpath, rpath): + def put(self, lpath, rpath, callback=None, **kwargs): logger.debug("Put file %s into %s" % (lpath, rpath)) self.ftp.put(lpath, rpath) - def get(self, rpath, lpath): + def get(self, rpath, lpath, callback=None, **kwargs): logger.debug("Get file %s into %s" % (rpath, lpath)) self.ftp.get(rpath, lpath) diff --git a/fsspec/implementations/zip.py b/fsspec/implementations/zip.py index 
276760b03..393e61f84 100644 --- a/fsspec/implementations/zip.py +++ b/fsspec/implementations/zip.py @@ -81,7 +81,7 @@ def _get_dirs(self): ) self.dir_cache[f["name"]] = f - def cat(self, path): + def cat(self, path, callback=None, **kwargs): return self.zip.read(path) def _open( diff --git a/fsspec/spec.py b/fsspec/spec.py index 13a775799..3c4cdbcfd 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -9,6 +9,7 @@ from glob import has_magic from hashlib import sha256 +from .callbacks import as_callback, branch from .config import apply_config, conf from .dircache import DirCache from .transaction import Transaction @@ -730,13 +731,17 @@ def get_file(self, rpath, lpath, **kwargs): """Copy single remote file to local""" if self.isdir(rpath): os.makedirs(lpath, exist_ok=True) - else: - with self.open(rpath, "rb", **kwargs) as f1: - with open(lpath, "wb") as f2: - data = True - while data: - data = f1.read(self.blocksize) - f2.write(data) + return None + + callback = as_callback(kwargs.pop("callback", None)) + with self.open(rpath, "rb", **kwargs) as f1: + callback.call("set_size", getattr(f1, "size", None)) + with open(lpath, "wb") as f2: + data = True + while data: + data = f1.read(self.blocksize) + segment_len = f2.write(data) + callback.call("relative_update", segment_len) def get(self, rpath, lpath, recursive=False, **kwargs): """Copy file(s) to local. 
@@ -750,25 +755,35 @@ def get(self, rpath, lpath, recursive=False, **kwargs): """ from .implementations.local import make_path_posix + callback = as_callback(kwargs.pop("callback", None)) if isinstance(lpath, str): lpath = make_path_posix(lpath) rpaths = self.expand_path(rpath, recursive=recursive) lpaths = other_paths(rpaths, lpath) - for lpath, rpath in zip(lpaths, rpaths): + + callback.lazy_call("set_size", len, lpaths) + for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): + branch(callback, rpath, lpath, kwargs) self.get_file(rpath, lpath, **kwargs) def put_file(self, lpath, rpath, **kwargs): """Copy single file to remote""" if os.path.isdir(lpath): self.makedirs(rpath, exist_ok=True) - else: - with open(lpath, "rb") as f1: - self.mkdirs(os.path.dirname(rpath), exist_ok=True) - with self.open(rpath, "wb", **kwargs) as f2: - data = True - while data: - data = f1.read(self.blocksize) - f2.write(data) + return None + + callback = as_callback(kwargs.pop("callback", None)) + with open(lpath, "rb") as f1: + callback.call("set_size", f1.seek(0, 2)) + f1.seek(0) + + self.mkdirs(os.path.dirname(rpath), exist_ok=True) + with self.open(rpath, "wb", **kwargs) as f2: + data = True + while data: + data = f1.read(self.blocksize) + segment_len = f2.write(data) + callback.call("relative_update", segment_len) def put(self, lpath, rpath, recursive=False, **kwargs): """Copy file(s) from local. 
@@ -781,6 +796,7 @@ def put(self, lpath, rpath, recursive=False, **kwargs): """ from .implementations.local import LocalFileSystem, make_path_posix + callback = as_callback(kwargs.pop("callback", None)) rpath = ( self._strip_protocol(rpath) if isinstance(rpath, str) @@ -792,7 +808,9 @@ def put(self, lpath, rpath, recursive=False, **kwargs): lpaths = fs.expand_path(lpath, recursive=recursive) rpaths = other_paths(lpaths, rpath) - for lpath, rpath in zip(lpaths, rpaths): + callback.lazy_call("set_size", len, rpaths) + for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): + branch(callback, lpath, rpath, kwargs) self.put_file(lpath, rpath, **kwargs) def head(self, path, size=1024): diff --git a/fsspec/tests/test_async.py b/fsspec/tests/test_async.py index 3217137fa..08aba5f20 100644 --- a/fsspec/tests/test_async.py +++ b/fsspec/tests/test_async.py @@ -8,7 +8,7 @@ import fsspec import fsspec.asyn -from fsspec.asyn import _throttled_gather, get_running_loop +from fsspec.asyn import _run_coros_in_chunks, get_running_loop def test_sync_methods(): @@ -72,7 +72,7 @@ def test_sync_wrapper_treat_timeout_0_as_none(): @pytest.mark.skipif(sys.version_info < (3, 7), reason="no asyncio.run in <3.7") -def test_throttled_gather(monkeypatch): +def test_run_coros_in_chunks(monkeypatch): total_running = 0 async def runner(): @@ -90,7 +90,7 @@ async def main(**kwargs): total_running = 0 coros = [runner() for _ in range(32)] - results = await _throttled_gather(coros, **kwargs) + results = await _run_coros_in_chunks(coros, **kwargs) for result in results: if isinstance(result, Exception): raise result @@ -99,16 +99,16 @@ async def main(**kwargs): assert sum(asyncio.run(main(batch_size=4))) == 32 with pytest.raises(ValueError): - asyncio.run(main(batch_size=5, return_exceptions=True)) + asyncio.run(main(batch_size=5)) with pytest.raises(ValueError): - asyncio.run(main(batch_size=-1, return_exceptions=True)) + asyncio.run(main(batch_size=-1)) assert sum(asyncio.run(main(batch_size=4))) == 
32 monkeypatch.setitem(fsspec.config.conf, "gather_batch_size", 5) with pytest.raises(ValueError): - asyncio.run(main(return_exceptions=True)) + asyncio.run(main()) assert sum(asyncio.run(main(batch_size=4))) == 32 # override monkeypatch.setitem(fsspec.config.conf, "gather_batch_size", 4) diff --git a/fsspec/tests/test_callbacks.py b/fsspec/tests/test_callbacks.py new file mode 100644 index 000000000..97686e605 --- /dev/null +++ b/fsspec/tests/test_callbacks.py @@ -0,0 +1,52 @@ +from fsspec import callbacks + + +def test_callbacks(): + empty_callback = callbacks.callback() + assert empty_callback.call("something", "somearg") is None + + simple_callback = callbacks.callback(something=lambda arg: arg + 2) + assert simple_callback.call("something", 2) == 4 + + multi_arg_callback = callbacks.callback(something=lambda arg1, arg2: arg1 + arg2) + assert multi_arg_callback.call("something", 2, 2) == 4 + + +def test_callbacks_as_callback(): + empty_callback = callbacks.as_callback(None) + assert empty_callback.call("something", "somearg") is None + assert callbacks.as_callback(None) is callbacks.as_callback(None) + + real_callback = callbacks.as_callback( + callbacks.callback(something=lambda arg: arg + 2) + ) + assert real_callback.call("something", 2) == 4 + + +def test_callbacks_lazy_call(): + empty_callback = callbacks.as_callback(None) + simple_callback = callbacks.callback(something=lambda arg: arg + 2) + + total_called = 0 + + def expensive_func(n): + nonlocal total_called + total_called += 1 + return n + + assert empty_callback.lazy_call("something", expensive_func, 8) is None + assert simple_callback.lazy_call("nonexistent callback", expensive_func, 8) is None + assert total_called == 0 + + assert simple_callback.lazy_call("something", expensive_func, 8) == 10 + assert total_called == 1 + + +def test_callbacks_wrap(): + events = [] + callback = callbacks.callback(relative_update=events.append) + for _ in callback.wrap(range(10)): + ... 
+ + assert len(events) == 10 + assert sum(events) == 10 diff --git a/fsspec/tests/test_spec.py b/fsspec/tests/test_spec.py index 74dac81dd..5ac6f586b 100644 --- a/fsspec/tests/test_spec.py +++ b/fsspec/tests/test_spec.py @@ -1,5 +1,8 @@ import json +import os import pickle +from collections import defaultdict +from functools import partial import numpy as np import pytest @@ -431,3 +434,133 @@ def test_readinto_with_multibyte(ftp_writable, tmpdir, dt): fp.readinto(arr2) assert np.array_equal(arr, arr2) + + +class DummyOpenFS(DummyTestFS): + blocksize = 10 + + def _open(self, path, mode="rb", **kwargs): + stream = open(path, mode) + stream.size = os.stat(path).st_size + return stream + + +def get_basic_callback(): + events = [] + + def make_event(event_type, *event_args): + events.append((event_type, *event_args)) + + callback = fsspec.callback( + set_size=partial(make_event, "set_size"), + relative_update=partial(make_event, "relative_update"), + ) + return events, callback + + +def imitate_transfer(size, chunk, *, file=True): + events = [("set_size", size)] + events.extend(("relative_update", size // chunk) for _ in range(chunk)) + if file: + # The reason that there is a relative_update(0) at the + # end is that, we don't have an early exit on the + # impleementations of get_file/put_file so it needs to + # go through the callback to get catch by the while's + # condition and then it will stop the transfer. 
+ events.append(("relative_update", 0)) + + return events + + +def get_files(tmpdir, amount=10): + src, dest, base = [], [], [] + for index in range(amount): + src_path = tmpdir / f"src_{index}.txt" + src_path.write_text("x" * 50, "utf-8") + + src.append(str(src_path)) + dest.append(str(tmpdir / f"dst_{index}.txt")) + base.append(str(tmpdir / f"file_{index}.txt")) + return src, dest, base + + +def test_dummy_callbacks_file(tmpdir): + fs = DummyOpenFS() + events, callback = get_basic_callback() + + file = tmpdir / "file.txt" + source = tmpdir / "tmp.txt" + destination = tmpdir / "tmp2.txt" + + size = 100 + source.write_text("x" * 100, "utf-8") + + fs.put_file(source, file, callback=callback) + assert events == imitate_transfer(size, 10) + events.clear() + + fs.get_file(file, destination, callback=callback) + assert events == imitate_transfer(size, 10) + events.clear() + + assert destination.read_text("utf-8") == "x" * 100 + + +def test_dummy_callbacks_files(tmpdir): + fs = DummyOpenFS() + events, callback = get_basic_callback() + src, dest, base = get_files(tmpdir) + + fs.put(src, base, callback=callback) + assert events == imitate_transfer(10, 10, file=False) + events.clear() + + fs.get(base, dest, callback=callback) + assert events == imitate_transfer(10, 10, file=False) + events.clear() + + +def test_dummy_callbacks_files_branched(tmpdir): + fs = DummyOpenFS() + src, dest, base = get_files(tmpdir) + + events = defaultdict(list) + + def make_event(origin, event_type, *event_args): + events[origin].append((event_type, *event_args)) + + def make_callback(*args, **kwargs): + return fsspec.callback( + set_size=partial(make_event, args, "set_size"), + relative_update=partial(make_event, args, "relative_update"), + **kwargs, + ) + + callback = make_callback( + "top-level", + branch=make_callback, + properties={"stringify_paths": True, "posixify_paths": True}, + ) + + def check_events(lpaths, rpaths): + from fsspec.implementations.local import make_path_posix + + 
base_keys = list(zip(make_path_posix(lpaths), make_path_posix(rpaths)))
+        assert set(events.keys()) == {("top-level",), *base_keys}
+        assert (
+            events[
+                "top-level",
+            ]
+            == imitate_transfer(10, 10, file=False)
+        )
+
+        for key in base_keys:
+            assert events[key] == imitate_transfer(50, 5)
+
+    fs.put(src, base, callback=callback)
+    check_events(src, base)
+    events.clear()
+
+    fs.get(base, dest, callback=callback)
+    check_events(base, dest)
+    events.clear()