From 3fefc671cbdd69cbf48a6b6dd118ce7ac9cfc0d1 Mon Sep 17 00:00:00 2001 From: vincentsarago Date: Thu, 31 Oct 2024 10:48:06 +0100 Subject: [PATCH 1/6] use xarray.open_zarr and make aiohttp and s3fs optional --- .github/workflows/ci.yml | 2 +- src/titiler/xarray/pyproject.toml | 20 +++--- .../xarray/titiler/xarray/dependencies.py | 16 ----- src/titiler/xarray/titiler/xarray/io.py | 62 ++++++++----------- 4 files changed, 40 insertions(+), 60 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bcd4456e2..528e56c99 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -51,7 +51,7 @@ jobs: - name: Test titiler.xarray run: | - python -m pip install -e src/titiler/xarray["test"] + python -m pip install -e src/titiler/xarray["test,all"] python -m pytest src/titiler/xarray --cov=titiler.xarray --cov-report=xml --cov-append --cov-report=term-missing - name: Test titiler.mosaic diff --git a/src/titiler/xarray/pyproject.toml b/src/titiler/xarray/pyproject.toml index 78747a86d..123740604 100644 --- a/src/titiler/xarray/pyproject.toml +++ b/src/titiler/xarray/pyproject.toml @@ -30,19 +30,25 @@ classifiers = [ dynamic = ["version"] dependencies = [ "titiler.core==0.19.0.dev", - "cftime", - "h5netcdf", "xarray", "rioxarray", - "zarr", "fsspec", - "s3fs", - "aiohttp", - "pandas", - "httpx", + "zarr", + "h5netcdf", + "cftime", ] [project.optional-dependencies] +s3 = [ + "s3fs", +] +http = [ + "aiohttp", +] +all = [ + "s3fs", + "aiohttp", +] test = [ "pytest", "pytest-cov", diff --git a/src/titiler/xarray/titiler/xarray/dependencies.py b/src/titiler/xarray/titiler/xarray/dependencies.py index 5c13d9e7b..0b4b87cc0 100644 --- a/src/titiler/xarray/titiler/xarray/dependencies.py +++ b/src/titiler/xarray/titiler/xarray/dependencies.py @@ -22,14 +22,6 @@ class XarrayIOParams(DefaultDependency): ), ] = None - reference: Annotated[ - Optional[bool], - Query( - title="reference", - description="Whether the dataset is a kerchunk reference", - ), - ] = None - decode_times: Annotated[ Optional[bool], Query( @@ -38,14 +30,6 @@ class XarrayIOParams(DefaultDependency): ), ] = None - consolidated: Annotated[ - Optional[bool], - Query( - title="consolidated", - description="Whether to expect and open zarr store with consolidated metadata", - ), - ] = None - # cache_client diff --git a/src/titiler/xarray/titiler/xarray/io.py b/src/titiler/xarray/titiler/xarray/io.py index 2c575a7ce..7c4513d09 100644 --- a/src/titiler/xarray/titiler/xarray/io.py +++ b/src/titiler/xarray/titiler/xarray/io.py @@ -1,18 +1,27 @@ """titiler.xarray.io""" import pickle -import re from typing import Any, Callable, Dict, List, Optional, Protocol +from urllib.parse import urlparse import attr import fsspec import numpy -import s3fs import xarray from morecantile import TileMatrixSet from rio_tiler.constants import WEB_MERCATOR_TMS from rio_tiler.io.xarray import XarrayReader +try: + import s3fs +except ImportError: # pragma: nocover + s3fs = None # type: ignore + +try: + import aiohttp +except ImportError: # pragma: nocover + aiohttp = None # type: ignore + class CacheClient(Protocol): """CacheClient Protocol.""" @@ -26,18 +35,10 @@ def set(self, key: str, body: bytes) -> None: ... -def parse_protocol(src_path: str, reference: Optional[bool] = False) -> str: +def parse_protocol(src_path: str) -> str: """Parse protocol from path.""" - match = re.match(r"^(s3|https|http)", src_path) - protocol = "file" - if match: - protocol = match.group(0) - - # override protocol if reference - if reference: - protocol = "reference" - - return protocol + parsed = urlparse(src_path) + return parsed.scheme or "file" def xarray_engine(src_path: str) -> str: @@ -59,6 +60,8 @@ def get_filesystem( Get the filesystem for the given source path. """ if protocol == "s3": + assert s3fs is not None, "s3fs must be installed to support S3:// url" + s3_filesystem = s3fs.S3FileSystem() return ( s3_filesystem.open(src_path) @@ -66,11 +69,12 @@ def get_filesystem( else s3fs.S3Map(root=src_path, s3=s3_filesystem) ) - elif protocol == "reference": - reference_args = {"fo": src_path, "remote_options": {"anon": anon}} - return fsspec.filesystem("reference", **reference_args).get_mapper("") - elif protocol in ["https", "http", "file"]: + if protocol.startswith("http"): + assert ( + aiohttp is not None + ), "aiohttp must be installed to support HTTP:// url" + filesystem = fsspec.filesystem(protocol) # type: ignore return ( filesystem.open(src_path) @@ -85,9 +89,7 @@ def get_filesystem( def xarray_open_dataset( src_path: str, group: Optional[Any] = None, - reference: Optional[bool] = False, decode_times: Optional[bool] = True, - consolidated: Optional[bool] = True, cache_client: Optional[CacheClient] = None, ) -> xarray.Dataset: """Open dataset.""" @@ -98,7 +100,7 @@ def xarray_open_dataset( if data_bytes: return pickle.loads(data_bytes) - protocol = parse_protocol(src_path, reference=reference) + protocol = parse_protocol(src_path) xr_engine = xarray_engine(src_path) file_handler = get_filesystem(src_path, protocol, xr_engine) @@ -117,17 +119,11 @@ def xarray_open_dataset( if xr_engine == "h5netcdf": xr_open_args["engine"] = "h5netcdf" xr_open_args["lock"] = False - else: - # Zarr arguments - xr_open_args["engine"] = "zarr" - xr_open_args["consolidated"] = consolidated + ds = xarray.open_dataset(file_handler, **xr_open_args) - # Additional arguments when dealing with a reference file. - if reference: - xr_open_args["consolidated"] = False - xr_open_args["backend_kwargs"] = {"consolidated": False} - - ds = xarray.open_dataset(file_handler, **xr_open_args) + # Fallback to Zarr + else: + ds = xarray.open_zarr(file_handler, **xr_open_args) if cache_client: # Serialize the dataset to bytes using pickle @@ -245,9 +241,7 @@ class Reader(XarrayReader): opener: Callable[..., xarray.Dataset] = attr.ib(default=xarray_open_dataset) group: Optional[Any] = attr.ib(default=None) - reference: bool = attr.ib(default=False) decode_times: bool = attr.ib(default=False) - consolidated: Optional[bool] = attr.ib(default=True) cache_client: Optional[CacheClient] = attr.ib(default=None) # xarray.DataArray options @@ -266,9 +260,7 @@ def __attrs_post_init__(self): self.ds = self.opener( self.src_path, group=self.group, - reference=self.reference, decode_times=self.decode_times, - consolidated=self.consolidated, cache_client=self.cache_client, ) @@ -300,7 +292,5 @@ def list_variables( with xarray_open_dataset( src_path, group=group, - reference=reference, - consolidated=consolidated, ) as ds: return list(ds.data_vars) # type: ignore From 943f5ebc01701f89d399a7bf52be7506af82a972 Mon Sep 17 00:00:00 2001 From: vincentsarago Date: Mon, 4 Nov 2024 15:47:54 +0100 Subject: [PATCH 2/6] add support for references --- .../tests/fixtures/generate_fixtures.ipynb | 24 +++++++++++++++++ .../xarray/tests/fixtures/reference.json | 1 + src/titiler/xarray/tests/test_io_tools.py | 2 +- src/titiler/xarray/titiler/xarray/io.py | 26 ++++++++++++++----- 4 files changed, 46 insertions(+), 7 deletions(-) create mode 100644 src/titiler/xarray/tests/fixtures/reference.json diff --git a/src/titiler/xarray/tests/fixtures/generate_fixtures.ipynb b/src/titiler/xarray/tests/fixtures/generate_fixtures.ipynb index 52c997147..2b2b733b2 100644 --- a/src/titiler/xarray/tests/fixtures/generate_fixtures.ipynb +++ b/src/titiler/xarray/tests/fixtures/generate_fixtures.ipynb @@ -138,6 +138,30 @@ " ds.to_zarr(store=f\"pyramid.zarr\", mode=\"w\", group=ix)" ] }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import fsspec\n", + "from kerchunk.hdf import SingleHdf5ToZarr\n", + "\n", + "with fsspec.open(\"dataset_3d.nc\", mode=\"rb\", anon=True) as infile:\n", + " h5chunks = SingleHdf5ToZarr(infile, \"dataset_3d.nc\", inline_threshold=100)\n", + "\n", + " with open(\"reference.json\", 'w') as f:\n", + " f.write(json.dumps(h5chunks.translate()));\n" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/src/titiler/xarray/tests/fixtures/reference.json b/src/titiler/xarray/tests/fixtures/reference.json new file mode 100644 index 000000000..213579f38 --- /dev/null +++ b/src/titiler/xarray/tests/fixtures/reference.json @@ -0,0 +1 @@ +{"version": 1, "refs": {".zgroup": "{\"zarr_format\":2}", "dataset/.zarray": "{\"chunks\":[1,500,1000],\"compressor\":null,\"dtype\":\" str: # ".hdf", ".hdf5", ".h5" will be supported once we have tests + expand the type permitted for the group parameter if any(src_path.lower().endswith(ext) for ext in [".nc", ".nc4"]): return "h5netcdf" - else: - return "zarr" + return "zarr" def get_filesystem( @@ -68,8 +67,14 @@ def get_filesystem( if xr_engine == "h5netcdf" else s3fs.S3Map(root=src_path, s3=s3_filesystem) ) - - elif protocol in ["https", "http", "file"]: + elif protocol == "reference" or src_path.lower().endswith(".json"): + reference_args = { + "fo": src_path.replace("reference://", ""), + "remote_options": {"anon": anon}, + } + return fsspec.filesystem("reference", **reference_args).get_mapper("") + + elif protocol in ["https", "http", "file", "reference"]: if protocol.startswith("http"): assert ( aiohttp is not None @@ -121,6 +126,17 @@ def xarray_open_dataset( xr_open_args["lock"] = False ds = xarray.open_dataset(file_handler, **xr_open_args) + elif src_path.lower().endswith(".json"): + xr_open_args.update( + { + "engine": "zarr", + "consolidated": False, + "backend_kwargs": {"consolidated": False}, + } + ) + + ds = xarray.open_dataset(file_handler, **xr_open_args) + # Fallback to Zarr else: ds = xarray.open_zarr(file_handler, **xr_open_args) @@ -285,8 +301,6 @@ def list_variables( cls, src_path: str, group: Optional[Any] = None, - reference: Optional[bool] = False, - consolidated: Optional[bool] = True, ) -> List[str]: """List available variable in a dataset.""" with xarray_open_dataset( From 3d40b9986a07a0abb4cdc4a1d45f952e413b6c4f Mon Sep 17 00:00:00 2001 From: vincentsarago Date: Mon, 4 Nov 2024 16:01:22 +0100 Subject: [PATCH 3/6] tests prefixed protocol --- src/titiler/xarray/tests/test_io_tools.py | 17 +++++++++++++---- src/titiler/xarray/titiler/xarray/io.py | 2 +- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/titiler/xarray/tests/test_io_tools.py b/src/titiler/xarray/tests/test_io_tools.py index 6caf019ed..d4a885e6c 100644 --- a/src/titiler/xarray/tests/test_io_tools.py +++ b/src/titiler/xarray/tests/test_io_tools.py @@ -109,12 +109,21 @@ def test_get_variable(): @pytest.mark.parametrize( - "filename", - ["dataset_2d.nc", "dataset_3d.nc", "dataset_3d.zarr", "reference.json"], + "protocol,filename", + [ + ("file://", "dataset_2d.nc"), + ("file://", "dataset_3d.nc"), + ("file://", "dataset_3d.zarr"), + ("reference://", "reference.json"), + ("", "dataset_2d.nc"), + ("", "dataset_3d.nc"), + ("", "dataset_3d.zarr"), + ("", "reference.json"), + ], ) -def test_reader(filename): +def test_reader(protocol, filename): """test reader.""" - src_path = os.path.join(prefix, filename) + src_path = protocol + os.path.join(protocol, prefix, filename) assert Reader.list_variables(src_path) == ["dataset"] with Reader(src_path, variable="dataset") as src: diff --git a/src/titiler/xarray/titiler/xarray/io.py b/src/titiler/xarray/titiler/xarray/io.py index 2784dbf01..0029b5620 100644 --- a/src/titiler/xarray/titiler/xarray/io.py +++ b/src/titiler/xarray/titiler/xarray/io.py @@ -126,7 +126,7 @@ def xarray_open_dataset( xr_open_args["lock"] = False ds = xarray.open_dataset(file_handler, **xr_open_args) - elif src_path.lower().endswith(".json"): + elif protocol == "reference" or src_path.lower().endswith(".json"): xr_open_args.update( { "engine": "zarr", From 8dffefc68777fcc3f8dfc3e3be4b95e19e04bf4d Mon Sep 17 00:00:00 2001 From: vincentsarago Date: Mon, 4 Nov 2024 16:13:19 +0100 Subject: [PATCH 4/6] use tmp_dir for reference --- src/titiler/xarray/pyproject.toml | 1 + src/titiler/xarray/tests/test_io_tools.py | 29 +++++++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/titiler/xarray/pyproject.toml b/src/titiler/xarray/pyproject.toml index 123740604..0db2d628b 100644 --- a/src/titiler/xarray/pyproject.toml +++ b/src/titiler/xarray/pyproject.toml @@ -54,6 +54,7 @@ test = [ "pytest-cov", "pytest-asyncio", "httpx", + "kerchunk", ] [project.urls] diff --git a/src/titiler/xarray/tests/test_io_tools.py b/src/titiler/xarray/tests/test_io_tools.py index d4a885e6c..9183be38d 100644 --- a/src/titiler/xarray/tests/test_io_tools.py +++ b/src/titiler/xarray/tests/test_io_tools.py @@ -1,11 +1,14 @@ """test titiler.xarray.io utility functions.""" +import json import os from datetime import datetime +import fsspec import numpy import pytest import xarray +from kerchunk.hdf import SingleHdf5ToZarr from titiler.xarray.io import Reader, get_variable @@ -114,11 +117,9 @@ def test_get_variable(): ("file://", "dataset_2d.nc"), ("file://", "dataset_3d.nc"), ("file://", "dataset_3d.zarr"), - ("reference://", "reference.json"), ("", "dataset_2d.nc"), ("", "dataset_3d.nc"), ("", "dataset_3d.zarr"), - ("", "reference.json"), ], ) def test_reader(protocol, filename): @@ -143,3 +144,27 @@ def test_zarr_group(group): assert src.info() assert src.tile(0, 0, 0) assert src.point(0, 0).data[0] == group * 2 + + +def test_kerchunk_reference(tmp_path): + """test Kerchunk reference.""" + d = tmp_path / "ref" + d.mkdir() + + netcdf = os.path.join(prefix, "dataset_3d.nc") + reference = os.path.join( + str(d), + "reference.json", + ) + + with fsspec.open(netcdf, mode="rb", anon=True) as infile: + h5chunks = SingleHdf5ToZarr(infile, netcdf, inline_threshold=100) + with open(reference, "w") as f: + f.write(json.dumps(h5chunks.translate())) + + for protocol in ["", "reference://"]: + src_path = protocol + reference + assert Reader.list_variables(src_path) == ["dataset"] + with Reader(src_path, variable="dataset") as src: + assert src.info() + assert src.tile(0, 0, 0) From ea4709c41219e58a6d73d7f1035b25ddb4a04503 Mon Sep 17 00:00:00 2001 From: vincentsarago Date: Mon, 4 Nov 2024 17:42:54 +0100 Subject: [PATCH 5/6] add parquet support --- src/titiler/xarray/pyproject.toml | 1 + src/titiler/xarray/tests/test_io_tools.py | 29 +++++++++++++++----- src/titiler/xarray/titiler/xarray/io.py | 33 ++++++++++++++++------- 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/src/titiler/xarray/pyproject.toml b/src/titiler/xarray/pyproject.toml index 0db2d628b..704dccea8 100644 --- a/src/titiler/xarray/pyproject.toml +++ b/src/titiler/xarray/pyproject.toml @@ -55,6 +55,7 @@ test = [ "pytest-asyncio", "httpx", "kerchunk", + "fastparquet", ] [project.urls] diff --git a/src/titiler/xarray/tests/test_io_tools.py b/src/titiler/xarray/tests/test_io_tools.py index 9183be38d..7f8e402d8 100644 --- a/src/titiler/xarray/tests/test_io_tools.py +++ b/src/titiler/xarray/tests/test_io_tools.py @@ -8,6 +8,7 @@ import numpy import pytest import xarray +from kerchunk.df import refs_to_dataframe from kerchunk.hdf import SingleHdf5ToZarr from titiler.xarray.io import Reader, get_variable @@ -152,19 +153,33 @@ def test_kerchunk_reference(tmp_path): d.mkdir() netcdf = os.path.join(prefix, "dataset_3d.nc") + + # JSON kerchunk reference = os.path.join( str(d), "reference.json", ) - with fsspec.open(netcdf, mode="rb", anon=True) as infile: h5chunks = SingleHdf5ToZarr(infile, netcdf, inline_threshold=100) with open(reference, "w") as f: + out_dict = h5chunks.translate() f.write(json.dumps(h5chunks.translate())) - for protocol in ["", "reference://"]: - src_path = protocol + reference - assert Reader.list_variables(src_path) == ["dataset"] - with Reader(src_path, variable="dataset") as src: - assert src.info() - assert src.tile(0, 0, 0) + src_path = "reference://" + reference + assert Reader.list_variables(src_path) == ["dataset"] + with Reader(src_path, variable="dataset") as src: + assert src.info() + assert src.tile(0, 0, 0) + + # kerchunk Parquet impl + reference_parquet = os.path.join( + str(d), + "reference.parq", + ) + + refs_to_dataframe(out_dict, reference_parquet) + src_path = "reference://" + reference_parquet + assert Reader.list_variables(src_path) == ["dataset"] + with Reader(src_path, variable="dataset") as src: + assert src.info() + assert src.tile(0, 0, 0) diff --git a/src/titiler/xarray/titiler/xarray/io.py b/src/titiler/xarray/titiler/xarray/io.py index 0029b5620..516bbcc8c 100644 --- a/src/titiler/xarray/titiler/xarray/io.py +++ b/src/titiler/xarray/titiler/xarray/io.py @@ -46,6 +46,7 @@ def xarray_engine(src_path: str) -> str: # ".hdf", ".hdf5", ".h5" will be supported once we have tests + expand the type permitted for the group parameter if any(src_path.lower().endswith(ext) for ext in [".nc", ".nc4"]): return "h5netcdf" + return "zarr" @@ -67,14 +68,15 @@ def get_filesystem( if xr_engine == "h5netcdf" else s3fs.S3Map(root=src_path, s3=s3_filesystem) ) - elif protocol == "reference" or src_path.lower().endswith(".json"): + + elif protocol == "reference": reference_args = { "fo": src_path.replace("reference://", ""), "remote_options": {"anon": anon}, } return fsspec.filesystem("reference", **reference_args).get_mapper("") - elif protocol in ["https", "http", "file", "reference"]: + elif protocol in ["https", "http", "file"]: if protocol.startswith("http"): assert ( aiohttp is not None @@ -120,13 +122,7 @@ def xarray_open_dataset( if group is not None: xr_open_args["group"] = group - # NetCDF arguments - if xr_engine == "h5netcdf": - xr_open_args["engine"] = "h5netcdf" - xr_open_args["lock"] = False - ds = xarray.open_dataset(file_handler, **xr_open_args) - - elif protocol == "reference" or src_path.lower().endswith(".json"): + if protocol == "reference": xr_open_args.update( { "engine": "zarr", @@ -137,8 +133,27 @@ def xarray_open_dataset( ds = xarray.open_dataset(file_handler, **xr_open_args) + # NetCDF arguments + elif xr_engine == "h5netcdf": + xr_open_args.update( + { + "engine": "h5netcdf", + "lock": False, + } + ) + + ds = xarray.open_dataset(file_handler, **xr_open_args) + # Fallback to Zarr else: + if protocol == "reference": + xr_open_args.update( + { + "consolidated": False, + "backend_kwargs": {"consolidated": False}, + } + ) + ds = xarray.open_zarr(file_handler, **xr_open_args) if cache_client: From b9b7f30353c34163cd7ae8daf5149b598e49dd68 Mon Sep 17 00:00:00 2001 From: vincentsarago Date: Mon, 4 Nov 2024 21:16:10 +0100 Subject: [PATCH 6/6] remove kerchunk support --- src/titiler/xarray/pyproject.toml | 2 - .../xarray/tests/fixtures/reference.json | 1 - src/titiler/xarray/tests/test_io_tools.py | 42 ------------------- src/titiler/xarray/titiler/xarray/io.py | 20 +-------- 4 files changed, 1 insertion(+), 64 deletions(-) delete mode 100644 src/titiler/xarray/tests/fixtures/reference.json diff --git a/src/titiler/xarray/pyproject.toml b/src/titiler/xarray/pyproject.toml index 704dccea8..123740604 100644 --- a/src/titiler/xarray/pyproject.toml +++ b/src/titiler/xarray/pyproject.toml @@ -54,8 +54,6 @@ test = [ "pytest-cov", "pytest-asyncio", "httpx", - "kerchunk", - "fastparquet", ] [project.urls] diff --git a/src/titiler/xarray/tests/fixtures/reference.json b/src/titiler/xarray/tests/fixtures/reference.json deleted file mode 100644 index 213579f38..000000000 --- a/src/titiler/xarray/tests/fixtures/reference.json +++ /dev/null @@ -1 +0,0 @@ -{"version": 1, "refs": {".zgroup": "{\"zarr_format\":2}", "dataset/.zarray": "{\"chunks\":[1,500,1000],\"compressor\":null,\"dtype\":\"