From 2de7c7be40a2cedfebed3becb73efc5170682f97 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 14 Aug 2024 14:04:42 -0400 Subject: [PATCH 1/4] For nans coming out of parquet references --- fsspec/implementations/reference.py | 5 ++++- fsspec/mapping.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index c14ee8302..cf88304ee 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -41,7 +41,7 @@ def _first(d): def _prot_in_references(path, references): ref = references.get(path) - if isinstance(ref, (list, tuple)): + if isinstance(ref, (list, tuple)) and isinstance(ref[0], str): return split_protocol(ref[0])[0] if ref[0] else ref[0] @@ -845,6 +845,9 @@ def cat(self, path, recursive=False, on_error="raise", **kwargs): # found and on_error is "raise" try: u, s, e = self._cat_common(p) + if not isinstance(u, (bytes, str)): + # nan/None from parquet + continue except FileNotFoundError as err: if on_error == "raise": raise diff --git a/fsspec/mapping.py b/fsspec/mapping.py index 8fb9b9efb..752eef352 100644 --- a/fsspec/mapping.py +++ b/fsspec/mapping.py @@ -112,7 +112,7 @@ def getitems(self, keys, on_error="raise"): for k, v in out.items() } return { - key: out[k2] + key: out[k2] if on_error == "raise" else out.get(k2, KeyError(k2)) for key, k2 in zip(keys, keys2) if on_error == "return" or not isinstance(out[k2], BaseException) } From dcf3cfcf8548f0be73a0b4d05f55d29c1e022e9b Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 4 Sep 2024 09:40:22 -0400 Subject: [PATCH 2/4] stop --- fsspec/implementations/reference.py | 7 +++++-- .../implementations/tests/test_reference.py | 21 +++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index cf88304ee..7c01881a5 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -158,8 +158,11 @@ def open_refs(field, record): """cached parquet file loader""" path = self.url.format(field=field, record=record) data = io.BytesIO(self.fs.cat_file(path)) - df = self.pd.read_parquet(data, engine="fastparquet") - refs = {c: df[c].values for c in df.columns} + try: + df = self.pd.read_parquet(data, engine="fastparquet") + refs = {c: df[c].values for c in df.columns} + except IOError: + refs = None return refs self.open_refs = open_refs diff --git a/fsspec/implementations/tests/test_reference.py b/fsspec/implementations/tests/test_reference.py index d980cd139..8c4aafa6a 100644 --- a/fsspec/implementations/tests/test_reference.py +++ b/fsspec/implementations/tests/test_reference.py @@ -783,3 +783,24 @@ def test_deep_parq(m): "instant/one/.zarray", "instant/one/0", ] + + +def test_parquet_no_data(m): + zarr = pytest.importorskip("zarr") + lz = fsspec.implementations.reference.LazyReferenceMapper.create( + "memory://out.parq", fs=m + ) + + g = zarr.open_group(lz, mode="w") + arr = g.create_dataset( + name="one", + dtype="int32", + shape=(10,), + chunks=(5,), + compression=None, + fill_value=1, + ) + lz.flush() + + breakpoint() + assert (arr[:] == 1).all() From fe5eb66f92f68705b9bccccf00197ca3f253a12f Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 23 Oct 2024 11:04:34 -0400 Subject: [PATCH 3/4] Test no-data case too --- fsspec/implementations/memory.py | 2 ++ .../implementations/tests/test_reference.py | 22 ++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/fsspec/implementations/memory.py b/fsspec/implementations/memory.py index 93860af6a..d06db4c38 100644 --- a/fsspec/implementations/memory.py +++ b/fsspec/implementations/memory.py @@ -220,6 +220,8 @@ def cp_file(self, path1, path2, **kwargs): raise FileNotFoundError(path1) def cat_file(self, path, start=None, end=None, **kwargs): + if "refs.0.parq" in path: + breakpoint() logger.debug("cat: %s", path) path = self._strip_protocol(path) try: diff --git a/fsspec/implementations/tests/test_reference.py b/fsspec/implementations/tests/test_reference.py index 8c4aafa6a..09c55d41c 100644 --- a/fsspec/implementations/tests/test_reference.py +++ b/fsspec/implementations/tests/test_reference.py @@ -802,5 +802,25 @@ def test_parquet_no_data(m): ) lz.flush() - breakpoint() assert (arr[:] == 1).all() + + +def test_parquet_no_references(m): + zarr = pytest.importorskip("zarr") + lz = fsspec.implementations.reference.LazyReferenceMapper.create( + "memory://out.parq", fs=m + ) + + g = zarr.open_group(lz, mode="w") + arr = g.create_dataset( + name="one", + dtype="int32", + shape=(), + chunks=(), + compression=None, + fill_value=1, + ) + lz.flush() + arr[...] + + assert arr[...].tolist() == 1 # scalar, equal to fill value From 58bae72550f0bac31217f171724f128d8dc50b98 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 23 Oct 2024 13:43:22 -0400 Subject: [PATCH 4/4] remove debug --- fsspec/implementations/memory.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/fsspec/implementations/memory.py b/fsspec/implementations/memory.py index b65939148..3b6f0d204 100644 --- a/fsspec/implementations/memory.py +++ b/fsspec/implementations/memory.py @@ -220,8 +220,6 @@ def cp_file(self, path1, path2, **kwargs): raise FileNotFoundError(path1) def cat_file(self, path, start=None, end=None, **kwargs): - if "refs.0.parq" in path: - breakpoint() logger.debug("cat: %s", path) path = self._strip_protocol(path) try: