Reference improvements (#1063)
* Always populate reference fss keys

* consolidate reference requests

* Implemented

* Allow start and stop in reference cat_file

* Surface params and add docs
martindurant committed Oct 19, 2022
1 parent b5428c2 commit 3a2221a
Showing 5 changed files with 203 additions and 73 deletions.
13 changes: 11 additions & 2 deletions fsspec/asyn.py
@@ -439,7 +439,14 @@ async def _cat(
         return out[0]
 
     async def _cat_ranges(
-        self, paths, starts, ends, max_gap=None, batch_size=None, **kwargs
+        self,
+        paths,
+        starts,
+        ends,
+        max_gap=None,
+        batch_size=None,
+        on_error="return",
+        **kwargs,
     ):
         # TODO: on_error
         if max_gap is not None:
@@ -458,7 +465,9 @@ async def _cat_ranges(
             for p, s, e in zip(paths, starts, ends)
         ]
         batch_size = batch_size or self.batch_size
-        return await _run_coros_in_chunks(coros, batch_size=batch_size, nofiles=True)
+        return await _run_coros_in_chunks(
+            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
+        )
 
     async def _put_file(self, lpath, rpath, **kwargs):
         raise NotImplementedError
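The new on_error parameter (default "return") pairs with return_exceptions=True above: a failed range no longer aborts the whole batch, but comes back as an Exception object in the result list, aligned with its request. A minimal sketch of that gather-with-exceptions pattern in plain asyncio (illustrative only, no fsspec internals):

import asyncio

async def fetch_range(i):
    # stand-in for a single _cat_file(p, start=s, end=e) call
    if i == 2:
        raise OSError("range 2 failed")
    return i * 10

async def main():
    coros = [fetch_range(i) for i in range(4)]
    results = await asyncio.gather(*coros, return_exceptions=True)
    # failures are returned in place, keeping results aligned with requests
    assert results[0] == 0 and results[1] == 10
    assert isinstance(results[2], OSError)
    assert results[3] == 30

asyncio.run(main())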
178 changes: 111 additions & 67 deletions fsspec/implementations/reference.py
@@ -16,7 +16,7 @@
 from ..callbacks import _DEFAULT_CALLBACK
 from ..core import filesystem, open, split_protocol
 from ..spec import AbstractFileSystem
-from ..utils import isfilelike
+from ..utils import isfilelike, merge_offset_ranges
 
 logger = logging.getLogger("fsspec.reference")
 
@@ -76,6 +76,8 @@ def __init__(
         fs=None,
         template_overrides=None,
         simple_templates=True,
+        max_gap=64_000,
+        max_block=256_000_000,
         loop=None,
         **kwargs,
     ):
@@ -85,18 +87,19 @@
         ----------
         fo : dict or str
             The set of references to use for this instance, with a structure as above.
-            If str, will use fsspec.open, in conjunction with ref_storage_args to
-            open and parse JSON at this location.
+            If str, will use fsspec.open, in conjunction with target_options
+            and target_protocol to open and parse JSON at this location.
         target : str
             For any references having target_url as None, this is the default file
             target to use
         ref_storage_args : dict
-            If references is a str, use these kwargs for loading the JSON file
+            If references is a str, use these kwargs for loading the JSON file.
+            Deprecated: use target_options instead.
         target_protocol : str
             Used for loading the reference file, if it is a path. If None, protocol
             will be derived from the given path
         target_options : dict
-            Extra FS options for loading the reference file, if given as a path
+            Extra FS options for loading the reference file ``fo``, if given as a path
         remote_protocol : str
             The protocol of the filesystem on which the references will be evaluated
             (unless fs is provided). If not given, will be derived from the first
@@ -119,6 +122,14 @@ def __init__(
             Whether templates can be processed with simple replace (True) or if
             jinja is needed (False, much slower). All reference sets produced by
             ``kerchunk`` are simple in this sense, but the spec allows for complex.
+        max_gap, max_block: int
+            For merging multiple concurrent requests to the same remote file.
+            Neighboring byte ranges will only be merged when their
+            inter-range gap is <= `max_gap`. Default is 64KB. Set to 0
+            to only merge when it requires no extra bytes. Pass a negative
+            number to disable merging, appropriate for local target files.
+            Neighboring byte ranges will only be merged when the size of
+            the aggregated range is <= `max_block`. Default is 256MB.
         kwargs : passed to parent class
         """
         super().__init__(loop=loop, **kwargs)
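For illustration (not part of the commit): the merging described in the new docstring is performed by fsspec.utils.merge_offset_ranges, imported at the top of this file. A small sketch of the two thresholds, with an s3-like URL assumed purely as an example:

from fsspec.utils import merge_offset_ranges

paths = ["s3://bucket/data"] * 3
starts = [0, 150, 500_000]
ends = [100, 300, 500_100]

# The gap between the first two ranges is 50 bytes (<= max_gap), so they
# merge into one request for bytes 0-300; the third range is ~500KB away
# and stays separate. max_block caps the size of any merged request.
new_paths, new_starts, new_ends = merge_offset_ranges(
    paths, starts, ends, max_gap=64_000, max_block=256_000_000
)
assert new_starts == [0, 500_000]
assert new_ends == [300, 500_100]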
@@ -128,6 +139,8 @@ def __init__(
         self.simple_templates = simple_templates
         self.templates = {}
         self.fss = {}
+        self.max_gap = max_gap
+        self.max_block = max_block
         if hasattr(fo, "read"):
             text = fo.read()
         elif isinstance(fo, str):
@@ -156,45 +169,48 @@ def __init__(
                 )
                 for k, opts in fs.items()
             }
+            if None not in self.fss:
+                self.fss[None] = filesystem("file")
             return
         if fs is not None:
             # single remote FS
             remote_protocol = (
                 fs.protocol[0] if isinstance(fs.protocol, tuple) else fs.protocol
             )
             self.fss[remote_protocol] = fs
+
         if remote_protocol is None:
             # get single protocol from any templates
             for ref in self.templates.values():
                 if callable(ref):
                     ref = ref()
                 protocol, _ = fsspec.core.split_protocol(ref)
-                if protocol:
-                    remote_protocol = protocol
-                    break
+                if protocol and protocol not in self.fss:
+                    fs = filesystem(protocol, loop=loop, **(remote_options or {}))
+                    self.fss[protocol] = fs
         if remote_protocol is None:
             # get single protocol from references
             for ref in self.references.values():
                 if callable(ref):
                     ref = ref()
                 if isinstance(ref, list) and ref[0]:
                     protocol, _ = fsspec.core.split_protocol(ref[0])
-                    if protocol:
-                        remote_protocol = protocol
-                        break
-        if remote_protocol is None:
-            remote_protocol = target_protocol
+                    if protocol and protocol not in self.fss:
+                        fs = filesystem(protocol, loop=loop, **(remote_options or {}))
+                        self.fss[protocol] = fs
+
+        if remote_protocol and remote_protocol not in self.fss:
+            fs = filesystem(remote_protocol, loop=loop, **(remote_options or {}))
+            self.fss[remote_protocol] = fs
 
-        fs = fs or filesystem(remote_protocol, loop=loop, **(remote_options or {}))
-        self.fss[remote_protocol] = fs
-        self.fss[None] = fs  # default one
+        self.fss[None] = fs or filesystem("file")  # default one
 
     @property
     def loop(self):
         inloop = [fs.loop for fs in self.fss.values() if fs.async_impl]
         return inloop[0] if inloop else self._loop
 
-    def _cat_common(self, path):
+    def _cat_common(self, path, start=None, end=None):
         path = self._strip_protocol(path)
         logger.debug(f"cat: {path}")
         part = self.references[path]
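The effect of the fss-population change, sketched with the memory filesystem (this mirrors test_fss_has_defaults below): every protocol named in the references now gets an entry in fs.fss, and fss[None] supplies the default target filesystem:

import fsspec

rfs = fsspec.filesystem("reference", fo={"key": ["memory://a"]})
assert "memory" in rfs.fss                 # registered from the reference URL
assert rfs.fss[None] is rfs.fss["memory"]  # and used as the default

rfs = fsspec.filesystem("reference", fo={})
assert None in rfs.fss                     # a default entry is always present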
@@ -209,35 +225,44 @@ def _cat_common(self, path):
         if len(part) == 1:
             logger.debug(f"Reference: {path}, whole file")
             url = part[0]
-            start = None
-            end = None
+            start1, end1 = start, end
         else:
-            url, start, size = part
-            logger.debug(f"Reference: {path}, offset {start}, size {size}")
-            end = start + size
+            url, start0, size = part
+            logger.debug(f"Reference: {path} => {url}, offset {start0}, size {size}")
+            end0 = start0 + size
+
+            if start is not None:
+                if start >= 0:
+                    start1 = start0 + start
+                else:
+                    start1 = end0 + start
+            else:
+                start1 = start0
+            if end is not None:
+                if end >= 0:
+                    end1 = start0 + end
+                else:
+                    end1 = end0 + end
+            else:
+                end1 = end0
         if url is None:
             url = self.target
-        return url, start, end
+        return url, start1, end1
 
     async def _cat_file(self, path, start=None, end=None, **kwargs):
-        part_or_url, start0, end0 = self._cat_common(path)
+        part_or_url, start0, end0 = self._cat_common(path, start=start, end=end)
         if isinstance(part_or_url, bytes):
             return part_or_url[start:end]
         protocol, _ = split_protocol(part_or_url)
-        # TODO: start and end should be passed to cat_file, not sliced
-        return (
-            await self.fss[protocol]._cat_file(part_or_url, start=start0, end=end0)
-        )[start:end]
+        return await self.fss[protocol]._cat_file(part_or_url, start=start, end=end)
 
     def cat_file(self, path, start=None, end=None, **kwargs):
-        part_or_url, start0, end0 = self._cat_common(path)
+        part_or_url, start0, end0 = self._cat_common(path, start=start, end=end)
         if isinstance(part_or_url, bytes):
             return part_or_url[start:end]
         protocol, _ = split_protocol(part_or_url)
-        # TODO: start and end should be passed to cat_file, not sliced
-        return self.fss[protocol].cat_file(part_or_url, start=start0, end=end0)[
-            start:end
-        ]
+        return self.fss[protocol].cat_file(part_or_url, start=start0, end=end0)
 
     def pipe_file(self, path, value, **_):
         """Temporarily add binary data or reference as a file"""
@@ -277,45 +302,64 @@ def get(self, rpath, lpath, recursive=False, **kwargs):
         )
 
     def cat(self, path, recursive=False, on_error="raise", **kwargs):
+        if isinstance(path, str) and recursive:
+            raise NotImplementedError
+        if isinstance(path, list) and (recursive or any("*" in p for p in path)):
+            raise NotImplementedError
         proto_dict = _protocol_groups(path, self.references)
         out = {}
         for proto, paths in proto_dict.items():
-            if proto is None:
-                # binary/string
-                for p in paths:
-                    try:
-                        out[p] = AbstractFileSystem.cat_file(self, p, **kwargs)
-                    except Exception as e:
-                        if on_error == "raise":
-                            raise
-                        if on_error == "return":
-                            out[p] = e
-
-            elif self.fss[proto].async_impl:
-                # TODO: asyncio.gather on multiple async FSs
-                out.update(
-                    sync(
-                        self.loop,
-                        self._cat,
-                        paths,
-                        recursive,
-                        on_error=on_error,
-                        **kwargs,
-                    )
-                )
-            elif isinstance(paths, list):
-                if recursive or any("*" in p for p in paths):
-                    raise NotImplementedError
-                for p in paths:
-                    try:
-                        out[p] = AbstractFileSystem.cat_file(self, p, **kwargs)
-                    except Exception as e:
-                        if on_error == "raise":
-                            raise
-                        if on_error == "return":
-                            out[p] = e
+            fs = self.fss[proto]
+            urls, starts, ends = zip(*[self._cat_common(p) for p in paths])
+            urls2 = []
+            starts2 = []
+            ends2 = []
+            paths2 = []
+            whole_files = set()
+            for u, s, e, p in zip(urls, starts, ends, paths):
+                if isinstance(u, bytes):
+                    # data
+                    out[p] = u
+                elif s is None:
+                    # whole file - limits are None, None, but no further
+                    # entries take for this file
+                    whole_files.add(u)
+                    urls2.append(u)
+                    starts2.append(s)
+                    ends2.append(e)
+                    paths2.append(p)
+            for u, s, e, p in zip(urls, starts, ends, paths):
+                if s is not None and u not in whole_files:
+                    urls2.append(u)
+                    starts2.append(s)
+                    ends2.append(e)
+                    paths2.append(p)
+            new_paths, new_starts, new_ends = merge_offset_ranges(
+                list(urls2),
+                list(starts2),
+                list(ends2),
+                sort=False,
+                max_gap=self.max_gap,
+                max_block=self.max_block,
+            )
+            bytes_out = fs.cat_ranges(new_paths, new_starts, new_ends)
+            if len(urls2) == len(bytes_out):
+                # we didn't do any merging
+                for p, b in zip(paths2, bytes_out):
+                    out[p] = b
             else:
-                out.update(AbstractFileSystem.cat_file(self, paths))
+                # unbundle from merged bytes - simple approach
+                for u, s, e, p in zip(urls, starts, ends, paths):
+                    if p in out:
+                        continue  # was bytes, already handled
+                    for np, ns, ne, b in zip(
+                        new_paths, new_starts, new_ends, bytes_out
+                    ):
+                        if np == u and (ns is None or ne is None):
+                            out[p] = b[s:e]
+                        elif np == u and s >= ns and e <= ne:
+                            out[p] = b[s - ns : (e - ne) or None]
 
         if len(out) == 1 and isinstance(path, str) and "*" not in path:
             return _first(out)
         return out
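A usage sketch for the new cat() path (names and data are assumptions; this mirrors test_merging below): many small reference reads against one target are grouped per protocol, merged under max_gap/max_block, fetched with cat_ranges, then sliced back out per key:

import fsspec

mem = fsspec.filesystem("memory")
mem.pipe("/data.bin", bytes(range(256)))

refs = {
    "x": ["memory://data.bin", 0, 16],
    "y": ["memory://data.bin", 20, 16],   # 4-byte gap from "x": merged
    "z": ["memory://data.bin", 200, 16],  # still within max_gap: merged too
}
rfs = fsspec.filesystem("reference", fo=refs)
out = rfs.cat(["x", "y", "z"])  # a single merged range request underneath
assert out["y"] == bytes(range(20, 36))
assert out["z"] == bytes(range(200, 216))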
69 changes: 68 additions & 1 deletion fsspec/implementations/tests/test_reference.py
@@ -78,7 +78,11 @@ def test_mutable(server, m):
 def test_defaults(server):  # noqa: F811
     refs = {"a": b"data", "b": (None, 0, 5)}
     fs = fsspec.filesystem(
-        "reference", fo=refs, target_protocol="http", target=realfile
+        "reference",
+        fo=refs,
+        target_protocol="http",
+        target=realfile,
+        remote_protocol="http",
     )
 
     assert fs.cat("a") == b"data"
@@ -337,3 +341,66 @@ def test_missing_nonasync(m):
 
     a = zarr.open_array(m)
     assert str(a[0]) == "nan"
+
+
+def test_fss_has_defaults(m):
+    fs = fsspec.filesystem("reference", fo={})
+    assert None in fs.fss
+
+    fs = fsspec.filesystem("reference", fo={}, remote_protocol="memory")
+    assert fs.fss[None].protocol == "memory"
+    assert fs.fss["memory"].protocol == "memory"
+
+    fs = fsspec.filesystem("reference", fs=m, fo={})
+    assert fs.fss[None] is m
+
+    fs = fsspec.filesystem("reference", fs={"memory": m}, fo={})
+    assert fs.fss["memory"] is m
+    assert fs.fss[None].protocol == "file"
+
+    fs = fsspec.filesystem("reference", fs={None: m}, fo={})
+    assert fs.fss[None] is m
+
+    fs = fsspec.filesystem("reference", fo={"key": ["memory://a"]})
+    assert fs.fss[None] is fs.fss["memory"]
+
+    fs = fsspec.filesystem("reference", fo={"key": ["memory://a"], "blah": ["path"]})
+    assert fs.fss[None] is fs.fss["memory"]
+
+
+def test_merging(m):
+    m.pipe("/a", b"test data")
+    other = b"other test data"
+    m.pipe("/b", other)
+    fs = fsspec.filesystem(
+        "reference",
+        fo={
+            "a": ["memory://a", 1, 1],
+            "b": ["memory://a", 2, 1],
+            "c": ["memory://b"],
+            "d": ["memory://b", 4, 6],
+        },
+    )
+    out = fs.cat(["a", "b", "c", "d"])
+    assert out == {"a": b"e", "b": b"s", "c": other, "d": other[4:10]}
+
+
+def test_cat_file_ranges(m):
+    other = b"other test data"
+    m.pipe("/b", other)
+    fs = fsspec.filesystem(
+        "reference",
+        fo={
+            "c": ["memory://b"],
+            "d": ["memory://b", 4, 6],
+        },
+    )
+    assert fs.cat_file("c") == other
+    assert fs.cat_file("c", start=1) == other[1:]
+    assert fs.cat_file("c", start=-5) == other[-5:]
+    assert fs.cat_file("c", 1, -5) == other[1:-5]
+
+    assert fs.cat_file("d") == other[4:10]
+    assert fs.cat_file("d", start=1) == other[4:10][1:]
+    assert fs.cat_file("d", start=-5) == other[4:10][-5:]
+    assert fs.cat_file("d", 1, -3) == other[4:10][1:-3]
