Skip to content

Commit

Permalink
Merge pull request #675 from isidentical/progress
Browse files Browse the repository at this point in the history
callbacks: Implement fsspec.callbacks
  • Loading branch information
martindurant committed Jul 2, 2021
2 parents f4671e8 + 565ff92 commit 47f72bc
Show file tree
Hide file tree
Showing 13 changed files with 483 additions and 44 deletions.
13 changes: 13 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ Base Classes
fsspec.dircache.DirCache
fsspec.registry.ReadOnlyRegistry
fsspec.registry.register_implementation
fsspec.callbacks.Callback
fsspec.callbacks.callback
fsspec.callbacks.as_callback
fsspec.callbacks.branch

.. autoclass:: fsspec.spec.AbstractFileSystem
:members:
Expand Down Expand Up @@ -77,6 +81,15 @@ Base Classes

.. autofunction:: fsspec.registry.register_implementation

.. autoclass:: fsspec.callbacks.Callback
:members:

.. autofunction:: fsspec.callbacks.callback

.. autofunction:: fsspec.callbacks.as_callback

.. autofunction:: fsspec.callbacks.branch

.. _implementations:

Built-in Implementations
Expand Down
2 changes: 2 additions & 0 deletions fsspec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from . import caching
from ._version import get_versions
from .callbacks import callback
from .core import get_fs_token_paths, open, open_files, open_local
from .exceptions import FSTimeoutError
from .mapping import FSMap, get_mapper
Expand Down Expand Up @@ -38,6 +39,7 @@
"open_local",
"registry",
"caching",
"callback",
]

if entry_points is not None:
Expand Down
41 changes: 26 additions & 15 deletions fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from contextlib import contextmanager
from glob import has_magic

from .callbacks import as_callback, branch
from .exceptions import FSTimeoutError
from .spec import AbstractFileSystem
from .utils import PY36, is_exception, other_paths
Expand Down Expand Up @@ -184,26 +185,29 @@ def _get_batch_size():
return soft_limit // 8


async def _throttled_gather(coros, batch_size=None, **gather_kwargs):
async def _run_coros_in_chunks(coros, batch_size=None, callback=None, timeout=None):
"""Run the given coroutines in smaller chunks to
not crossing the file descriptor limit.
If batch_size parameter is -1, then it will not be any throttling. If
it is none, it will be inferred from the process resources (soft limit divided
by 8) and fallback to 128 if the system doesn't support it."""

callback = as_callback(callback)
if batch_size is None:
batch_size = _get_batch_size()

if batch_size == -1:
return await asyncio.gather(*coros, **gather_kwargs)
batch_size = len(coros)

assert batch_size > 0

results = []
for start in range(0, len(coros), batch_size):
chunk = coros[start : start + batch_size]
results.extend(await asyncio.gather(*chunk, **gather_kwargs))
for coro in asyncio.as_completed(chunk, timeout=timeout):
results.append(await coro)
callback.call("relative_update", 1)
return results


Expand Down Expand Up @@ -376,13 +380,17 @@ async def _put(self, lpath, rpath, recursive=False, **kwargs):
await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
files = sorted(set(lpaths) - set(dirs))
rpaths = other_paths(files, rpath)
callback = as_callback(kwargs.pop("callback", None))
batch_size = kwargs.pop("batch_size", self.batch_size)
return await _throttled_gather(
[
self._put_file(lpath, rpath, **kwargs)
for lpath, rpath in zip(files, rpaths)
],
batch_size=batch_size,

coros = []
callback.call("set_size", len(files))
for lpath, rpath in zip(files, rpaths):
branch(callback, lpath, rpath, kwargs)
coros.append(self._put_file(lpath, rpath, **kwargs))

return await _run_coros_in_chunks(
coros, batch_size=batch_size, callback=callback
)

async def _get_file(self, rpath, lpath, **kwargs):
Expand Down Expand Up @@ -410,13 +418,16 @@ async def _get(self, rpath, lpath, recursive=False, **kwargs):
rpaths = await self._expand_path(rpath, recursive=recursive)
lpaths = other_paths(rpaths, lpath)
[os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
callback = as_callback(kwargs.pop("callback", None))
batch_size = kwargs.pop("batch_size", self.batch_size)
return await _throttled_gather(
[
self._get_file(rpath, lpath, **kwargs)
for lpath, rpath in zip(lpaths, rpaths)
],
batch_size=batch_size,

coros = []
callback.lazy_call("set_size", len, lpaths)
for lpath, rpath in zip(lpaths, rpaths):
branch(callback, rpath, lpath, kwargs)
coros.append(self._get_file(rpath, lpath, **kwargs))
return await _run_coros_in_chunks(
coros, batch_size=batch_size, callback=callback
)

async def _isfile(self, path):
Expand Down
199 changes: 199 additions & 0 deletions fsspec/callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
from .utils import stringify_path


class Callback:
__slots__ = ["properties", "hooks"]

def __init__(self, properties=None, **hooks):
self.hooks = hooks
self.properties = properties or {}

def call(self, hook, *args, **kwargs):
"""Make a callback to a hook named ``hook``. If it can't
find the hook, then this function will return None. Otherwise
the return value of the hook will be used.
Parameters
----------
hook: str
The name of the hook
*args: Any
All the positional arguments that will be passed
to the ``hook``, if found.
**kwargs: Any
All the keyword arguments that will be passed
tot the ``hook``, if found.
"""
callback = self.hooks.get(hook)
if callback is not None:
return callback(*args, **kwargs)

def lazy_call(self, hook, func, *args, **kwargs):
"""Make a callback to a hook named ``hook`` with a
single value that will be lazily obtained from a call
to the ``func`` with ``*args`` and ``**kwargs``.
This method should be used for expensive operations,
e.g ``len(data)`` since if there is no hook for the
given ``hook`` parameter, that operation will be wasted.
With this method, it will only evaluate function, if there
is a hook attached to the given ``hook`` parameter.
Parameters
----------
hook: str
The name of the hook
func: Callable[..., Any]
Function that will be called and passed as the argument
to the hook, if found.
*args: Any
All the positional arguments that will be passed
to the ``func``, if ``hook`` is found.
**kwargs: Any
All the keyword arguments that will be passed
tot the ``func``, if ``hook` is found.
"""
callback = self.hooks.get(hook)
if callback is not None:
return callback(func(*args, **kwargs))

def wrap(self, iterable):
"""Wrap an iterable to send ``relative_update`` hook
on each iterations.
Parameters
----------
iterable: Iterable
The iterable that is being wrapped
"""
for item in iterable:
self.call("relative_update", 1)
yield item


class NoOpCallback(Callback):
def call(self, hook, *args, **kwargs):
return None

def lazy_call(self, hook, *args, **kwargs):
return None


_DEFAULT_CALLBACK = NoOpCallback()


def callback(
*,
set_size=None,
relative_update=None,
absolute_update=None,
branch=None,
properties=None,
**hooks,
):
"""Create a new callback for filesystem APIs.
Parameters
----------
set_size: Callable[[Optional[int]], None] (optional)
When transferring something quantifiable (e.g bytes in a file, or
number of files), this hook will be called with the total number of
items. Might set something to None, in that case it should be ignored.
relative_update: Callable[[int], None] (optional)
Update the total transferred items relative to the previous position.
If the current cursor is at N, and a relative_update(Q) happens then
the current cursor should now point at the N+Q.
absolute_update: Callable[[int], None] (optional)
Update the total transferred items to an absolute position. If
the current cursor is at N, and a absolute_update(Q) happens then
the current cursor should now point at the Q. If another one happens
it will override the current value.
branch: Callable[[os.PathLike, os.PathLike], Optional[fsspec.callbacks.Callback]] (optional)
When some operations need branching (e.g each ``put()``/``get()`
operation have their own callbacks, but they will also need to
branch out for ``put_file()``/``get_file()`` since those might
require additional child callbacks) the branch hook will be called
with the paths that are being transffered and it is expected to
either return a new fsspec.callbacks.Callback instance or None. If
``stringify_paths`` property is set, the paths will be casted to
string, and if ``posixify_paths`` property is set both arguments
will be sanitized to the posix convention.
properties: Dict[str, Any] (optional)
A mapping of config option (callback related) to their values.
hooks: Callable[..., Any]
Optional hooks that are not generally available.
Returns
-------
fsspec.callback.Callback
""" # noqa: E501

return Callback(
properties=properties,
set_size=set_size,
relative_update=relative_update,
absolute_update=absolute_update,
branch=branch,
**hooks,
)


def as_callback(maybe_callback):
"""Return the no-op callback if the maybe_callback parameter is None
Parameters
----------
maybe_callback: fsspec.callback.Callback or None
Returns
-------
fsspec.callback.Callback
"""
if maybe_callback is None:
return _DEFAULT_CALLBACK
else:
return maybe_callback


def branch(callback, path_1, path_2, kwargs=None):
"""Branch out from an existing callback.
Parameters
----------
callback: fsspec.callback.Callback
Parent callback
path_1: os.PathLike
Left path
path_2: os.PathLike
Right path
kwargs: Dict[str, Any] (optional)
Update the ``callback`` key on the given ``kwargs``
if there is a brancher attached to the ``callback``.
Returns
-------
fsspec.callback.Callback or None
"""
from .implementations.local import make_path_posix

if callback.properties.get("stringify_paths"):
path_1 = stringify_path(path_1)
path_2 = stringify_path(path_2)

if callback.properties.get("posixify_paths"):
path_1 = make_path_posix(path_1)
path_2 = make_path_posix(path_2)

branched = callback.call("branch", path_1, path_2)
if branched is None or branched is _DEFAULT_CALLBACK:
return None

if kwargs is not None:
kwargs["callback"] = branched
return branched
9 changes: 8 additions & 1 deletion fsspec/implementations/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from shutil import move, rmtree

from fsspec import AbstractFileSystem, filesystem
from fsspec.callbacks import as_callback
from fsspec.compression import compr
from fsspec.core import BaseCache, MMapCache
from fsspec.spec import AbstractBufferedFile
Expand Down Expand Up @@ -543,6 +544,7 @@ def _make_local_details(self, path):
return fn

def cat(self, path, recursive=False, on_error="raise", **kwargs):
callback = as_callback(kwargs.pop("callback", None))
paths = self.expand_path(
path, recursive=recursive, maxdepth=kwargs.get("maxdepth", None)
)
Expand All @@ -561,7 +563,12 @@ def cat(self, path, recursive=False, on_error="raise", **kwargs):
if getpaths:
self.fs.get(getpaths, storepaths)
self.save_cache()
out = {path: open(fn, "rb").read() for path, fn in zip(paths, fns)}

out = {}
callback.lazy_call("set_size", len, paths)
for path, fn in zip(paths, fns):
out[path] = open(fn, "rb").read()
callback.call("relative_update", 1)
if isinstance(path, str) and len(paths) == 1 and recursive is False:
out = out[paths[0]]
return out
Expand Down
4 changes: 2 additions & 2 deletions fsspec/implementations/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ def cp_file(self, path1, path2, **kwargs):
else:
raise FileNotFoundError

def get_file(self, path1, path2, **kwargs):
def get_file(self, path1, path2, callback=None, **kwargs):
return self.cp_file(path1, path2, **kwargs)

def put_file(self, path1, path2, **kwargs):
def put_file(self, path1, path2, callback=None, **kwargs):
return self.cp_file(path1, path2, **kwargs)

def mv_file(self, path1, path2, **kwargs):
Expand Down
4 changes: 4 additions & 0 deletions fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import json

from ..asyn import AsyncFileSystem, sync
from ..callbacks import as_callback
from ..core import filesystem, open
from ..mapping import get_mapper
from ..spec import AbstractFileSystem
Expand Down Expand Up @@ -199,9 +200,12 @@ async def _get_file(self, rpath, lpath, **kwargs):
f.write(data)

def get_file(self, rpath, lpath, **kwargs):
callback = as_callback(kwargs.pop("callback", None))
data = self.cat_file(rpath, **kwargs)
callback.lazy_call("set_size", len, data)
with open(lpath, "wb") as f:
f.write(data)
callback.lazy_call("absolute_update", len, data)

def get(self, rpath, lpath, recursive=False, **kwargs):
if self.fs.async_impl:
Expand Down
Loading

0 comments on commit 47f72bc

Please sign in to comment.