Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

callbacks: Implement fsspec.callbacks #675

Merged
merged 21 commits into from
Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ Base Classes
fsspec.dircache.DirCache
fsspec.registry.ReadOnlyRegistry
fsspec.registry.register_implementation
fsspec.callbacks.Callback
fsspec.callbacks.callback
fsspec.callbacks.as_callback
fsspec.callbacks.branch

.. autoclass:: fsspec.spec.AbstractFileSystem
:members:
Expand Down Expand Up @@ -77,6 +81,15 @@ Base Classes

.. autofunction:: fsspec.registry.register_implementation

.. autoclass:: fsspec.callbacks.Callback
:members:

.. autofunction:: fsspec.callbacks.callback

.. autofunction:: fsspec.callbacks.as_callback

.. autofunction:: fsspec.callbacks.branch

.. _implementations:

Built-in Implementations
Expand Down
2 changes: 2 additions & 0 deletions fsspec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from . import caching
from ._version import get_versions
from .callbacks import callback
from .core import get_fs_token_paths, open, open_files, open_local
from .exceptions import FSTimeoutError
from .mapping import FSMap, get_mapper
Expand Down Expand Up @@ -38,6 +39,7 @@
"open_local",
"registry",
"caching",
"callback",
]

if entry_points is not None:
Expand Down
40 changes: 25 additions & 15 deletions fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from contextlib import contextmanager
from glob import has_magic

from .callbacks import as_callback, branch
from .exceptions import FSTimeoutError
from .spec import AbstractFileSystem
from .utils import PY36, is_exception, other_paths
Expand Down Expand Up @@ -184,26 +185,29 @@ def _get_batch_size():
return soft_limit // 8


async def _throttled_gather(coros, batch_size=None, **gather_kwargs):
async def _run_coros_in_chunks(coros, batch_size=None, callback=None, timeout=None):
"""Run the given coroutines in smaller chunks to
not crossing the file descriptor limit.

If batch_size parameter is -1, then it will not be any throttling. If
it is none, it will be inferred from the process resources (soft limit divided
by 8) and fallback to 128 if the system doesn't support it."""

callback = as_callback(callback)
if batch_size is None:
batch_size = _get_batch_size()

if batch_size == -1:
return await asyncio.gather(*coros, **gather_kwargs)
batch_size = len(coros)

assert batch_size > 0

results = []
for start in range(0, len(coros), batch_size):
chunk = coros[start : start + batch_size]
results.extend(await asyncio.gather(*chunk, **gather_kwargs))
for coro in asyncio.as_completed(chunk, timeout=timeout):
results.append(await coro)
callback.call("relative_update", 1)
return results


Expand Down Expand Up @@ -370,13 +374,16 @@ async def _put(self, lpath, rpath, recursive=False, **kwargs):
fs = LocalFileSystem()
lpaths = fs.expand_path(lpath, recursive=recursive)
rpaths = other_paths(lpaths, rpath)
callback = as_callback(kwargs.pop("callback", None))
batch_size = kwargs.pop("batch_size", self.batch_size)
return await _throttled_gather(
[
self._put_file(lpath, rpath, **kwargs)
for lpath, rpath in zip(lpaths, rpaths)
],
batch_size=batch_size,

coros = []
callback.lazy_call("set_size", len, lpaths)
for lpath, rpath in zip(lpaths, rpaths):
branch(callback, lpath, rpath, kwargs)
coros.append(self._put_file(lpath, rpath, **kwargs))
return await _run_coros_in_chunks(
coros, batch_size=batch_size, callback=callback
)

async def _get_file(self, rpath, lpath, **kwargs):
Expand Down Expand Up @@ -404,13 +411,16 @@ async def _get(self, rpath, lpath, recursive=False, **kwargs):
rpaths = await self._expand_path(rpath, recursive=recursive)
lpaths = other_paths(rpaths, lpath)
[os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
callback = as_callback(kwargs.pop("callback", None))
batch_size = kwargs.pop("batch_size", self.batch_size)
return await _throttled_gather(
[
self._get_file(rpath, lpath, **kwargs)
for lpath, rpath in zip(lpaths, rpaths)
],
batch_size=batch_size,

coros = []
callback.lazy_call("set_size", len, lpaths)
for lpath, rpath in zip(lpaths, rpaths):
branch(callback, rpath, lpath, kwargs)
coros.append(self._get_file(rpath, lpath, **kwargs))
return await _run_coros_in_chunks(
coros, batch_size=batch_size, callback=callback
)

async def _isfile(self, path):
Expand Down
199 changes: 199 additions & 0 deletions fsspec/callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
from .utils import stringify_path


class Callback:
__slots__ = ["properties", "hooks"]

def __init__(self, properties=None, **hooks):
self.hooks = hooks
self.properties = properties or {}

def call(self, hook, *args, **kwargs):
"""Make a callback to a hook named ``hook``. If it can't
find the hook, then this function will return None. Otherwise
the return value of the hook will be used.

Parameters
----------
hook: str
The name of the hook
*args: Any
All the positional arguments that will be passed
to the ``hook``, if found.
**kwargs: Any
All the keyword arguments that will be passed
tot the ``hook``, if found.
"""
callback = self.hooks.get(hook)
if callback is not None:
return callback(*args, **kwargs)

def lazy_call(self, hook, func, *args, **kwargs):
"""Make a callback to a hook named ``hook`` with a
single value that will be lazily obtained from a call
to the ``func`` with ``*args`` and ``**kwargs``.

This method should be used for expensive operations,
e.g ``len(data)`` since if there is no hook for the
given ``hook`` parameter, that operation will be wasted.
With this method, it will only evaluate function, if there
is a hook attached to the given ``hook`` parameter.

Parameters
----------
hook: str
The name of the hook
func: Callable[..., Any]
Function that will be called and passed as the argument
to the hook, if found.
*args: Any
All the positional arguments that will be passed
to the ``func``, if ``hook`` is found.
**kwargs: Any
All the keyword arguments that will be passed
tot the ``func``, if ``hook` is found.
"""
callback = self.hooks.get(hook)
if callback is not None:
return callback(func(*args, **kwargs))

def wrap(self, iterable):
"""Wrap an iterable to send ``relative_update`` hook
on each iterations.

Parameters
----------
iterable: Iterable
The iterable that is being wrapped
"""
for item in iterable:
self.call("relative_update", 1)
yield item


class NoOpCallback(Callback):
def call(self, hook, *args, **kwargs):
return None

def lazy_call(self, hook, *args, **kwargs):
return None


_DEFAULT_CALLBACK = NoOpCallback()


def callback(
*,
set_size=None,
relative_update=None,
absolute_update=None,
branch=None,
properties=None,
**hooks,
):
"""Create a new callback for filesystem APIs.

Parameters
----------
set_size: Callable[[Optional[int]], None] (optional)
When transferring something quantifiable (e.g bytes in a file, or
number of files), this hook will be called with the total number of
items. Might set something to None, in that case it should be ignored.

relative_update: Callable[[int], None] (optional)
Update the total transferred items relative to the previous position.
If the current cursor is at N, and a relative_update(Q) happens then
the current cursor should now point at the N+Q.

absolute_update: Callable[[int], None] (optional)
Update the total transferred items to an absolute position. If
the current cursor is at N, and a absolute_update(Q) happens then
the current cursor should now point at the Q. If another one happens
it will override the current value.

branch: Callable[[os.PathLike, os.PathLike], Optional[fsspec.callbacks.Callback]] (optional)
When some operations need branching (e.g each ``put()``/``get()`
operation have their own callbacks, but they will also need to
branch out for ``put_file()``/``get_file()`` since those might
require additional child callbacks) the branch hook will be called
with the paths that are being transffered and it is expected to
either return a new fsspec.callbacks.Callback instance or None. If
``stringify_paths`` property is set, the paths will be casted to
string, and if ``posixify_paths`` property is set both arguments
will be sanitized to the posix convention.

properties: Dict[str, Any] (optional)
A mapping of config option (callback related) to their values.

hooks: Callable[..., Any]
Optional hooks that are not generally available.

Returns
-------
fsspec.callback.Callback
""" # noqa: E501

return Callback(
properties=properties,
set_size=set_size,
relative_update=relative_update,
absolute_update=absolute_update,
branch=branch,
**hooks,
)


def as_callback(maybe_callback):
"""Return the no-op callback if the maybe_callback parameter is None

Parameters
----------
maybe_callback: fsspec.callback.Callback or None

Returns
-------
fsspec.callback.Callback
"""
if maybe_callback is None:
return _DEFAULT_CALLBACK
else:
return maybe_callback


def branch(callback, path_1, path_2, kwargs=None):
"""Branch out from an existing callback.

Parameters
----------
callback: fsspec.callback.Callback
Parent callback
path_1: os.PathLike
Left path
path_2: os.PathLike
Right path
kwargs: Dict[str, Any] (optional)
Update the ``callback`` key on the given ``kwargs``
if there is a brancher attached to the ``callback``.


Returns
-------
fsspec.callback.Callback or None
"""
from .implementations.local import make_path_posix

if callback.properties.get("stringify_paths"):
path_1 = stringify_path(path_1)
path_2 = stringify_path(path_2)

if callback.properties.get("posixify_paths"):
path_1 = make_path_posix(path_1)
path_2 = make_path_posix(path_2)

branched = callback.call("branch", path_1, path_2)
if branched is None or branched is _DEFAULT_CALLBACK:
return None

if kwargs is not None:
kwargs["callback"] = branched
return branched
9 changes: 8 additions & 1 deletion fsspec/implementations/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from shutil import move, rmtree

from fsspec import AbstractFileSystem, filesystem
from fsspec.callbacks import as_callback
from fsspec.compression import compr
from fsspec.core import BaseCache, MMapCache
from fsspec.spec import AbstractBufferedFile
Expand Down Expand Up @@ -543,6 +544,7 @@ def _make_local_details(self, path):
return fn

def cat(self, path, recursive=False, on_error="raise", **kwargs):
callback = as_callback(kwargs.pop("callback", None))
paths = self.expand_path(
path, recursive=recursive, maxdepth=kwargs.get("maxdepth", None)
)
Expand All @@ -561,7 +563,12 @@ def cat(self, path, recursive=False, on_error="raise", **kwargs):
if getpaths:
self.fs.get(getpaths, storepaths)
self.save_cache()
out = {path: open(fn, "rb").read() for path, fn in zip(paths, fns)}

out = {}
callback.lazy_call("set_size", len, paths)
for path, fn in zip(paths, fns):
out[path] = open(fn, "rb").read()
callback.call("relative_update", 1)
if isinstance(path, str) and len(paths) == 1 and recursive is False:
out = out[paths[0]]
return out
Expand Down
3 changes: 3 additions & 0 deletions fsspec/implementations/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tempfile

from fsspec import AbstractFileSystem
from fsspec.callbacks import as_callback
from fsspec.compression import compr
from fsspec.core import get_compression
from fsspec.utils import stringify_path
Expand Down Expand Up @@ -111,9 +112,11 @@ def cp_file(self, path1, path2, **kwargs):
raise FileNotFoundError

def get_file(self, path1, path2, **kwargs):
callback = as_callback(kwargs.pop("callback", None)) # noqa: F841
isidentical marked this conversation as resolved.
Show resolved Hide resolved
return self.cp_file(path1, path2, **kwargs)

def put_file(self, path1, path2, **kwargs):
callback = as_callback(kwargs.pop("callback", None)) # noqa: F841
return self.cp_file(path1, path2, **kwargs)

def mv_file(self, path1, path2, **kwargs):
Expand Down
Loading