Adding reader_options kwargs to open_virtual_dataset. (#67)
* adding reader_options kwargs to open_virtual_dataset

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix typing

* modifies _automatically_determine_filetype to open the file with fsspec, to allow reading from cloud storage

* using UPath to get file protocol and open with fsspec

* tests passing locally. Reading over s3/local with & without indexes & guessing filetypes

* add s3fs to test

* typing school 101

* anon

* typing

* test_anon update

* anon failing

* double down on storage_options

* fsspec nit

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* setting s3 defaults as empty to try to appease the cruel boto3 gods

* added fpath to SingleHDF5ToZarr

* hardcode in empty storage opts for s3

* hardcode default + unpack test

* changed reader_options defaults

* updated docs install

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* changed docstring type in utils to numpy style

* added TYPE_CHECKING for fsspec and s3fs mypy type hints

* fixed TYPE_CHECKING import

* pinned xarray to latest commit on github

* re-add upath

* merged w/ main

* adds section to usage

* Minor formatting nit of code example in docs

---------

Co-authored-by: Tom Nicholas <tom@cworthy.org>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
3 people committed May 14, 2024
1 parent b15c07b commit 8923b8c
Showing 8 changed files with 145 additions and 15 deletions.
2 changes: 1 addition & 1 deletion ci/doc.yml
@@ -13,4 +13,4 @@ dependencies:
- "sphinx_design"
- "sphinx_togglebutton"
- "sphinx-autodoc-typehints"
- -e ..
- -e "..[test]"
10 changes: 10 additions & 0 deletions docs/usage.md
@@ -27,6 +27,7 @@ vds = open_virtual_dataset('air.nc')

(Notice we did not have to explicitly indicate the file format, as {py:func}`open_virtual_dataset <virtualizarr.xarray.open_virtual_dataset>` will attempt to automatically infer it.)


```{note}
In future we would like for it to be possible to just use `xr.open_dataset`, e.g.
@@ -61,6 +62,15 @@ Attributes:

These {py:class}`ManifestArray <virtualizarr.manifests.ManifestArray>` objects are each a virtual reference to some data in the `air.nc` netCDF file, with the references stored in the form of "Chunk Manifests".

### Opening remote files

To open remote files as virtual datasets, pass the `reader_options` argument, e.g.

```python
aws_credentials = {"key": ..., "secret": ...}
vds = open_virtual_dataset("s3://some-bucket/file.nc", reader_options={'storage_options': aws_credentials})
```
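
If no credentials are supplied, the defaults introduced in this change fall back to anonymous S3 access. As a rough sketch (the bucket path below is a placeholder, and this assumes `s3fs` is installed and the bucket is publicly readable), an anonymous read could look like:

```python
# Hypothetical example: anonymous read of a public bucket.
# Omitting reader_options entirely uses the default
# {'storage_options': {'key': '', 'secret': '', 'anon': True}}.
vds = open_virtual_dataset(
    "s3://some-public-bucket/file.nc",
    reader_options={"storage_options": {"anon": True}},
)
```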

## Chunk Manifests

In the Zarr model N-dimensional arrays are stored as a series of compressed chunks, each labelled by a chunk key which indicates its position in the array. Whilst conventionally each of these Zarr chunks are a separate compressed binary file stored within a Zarr Store, there is no reason why these chunks could not actually already exist as part of another file (e.g. a netCDF file), and be loaded by reading a specific byte range from this pre-existing file.
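
To make this concrete, a chunk manifest is simply a mapping from chunk keys to byte ranges within pre-existing files. A minimal sketch (the offsets and lengths below are invented purely for illustration) might look like:

```python
# Illustrative only: each chunk key points at a byte range inside an existing file.
manifest = {
    "0.0": {"path": "air.nc", "offset": 6144, "length": 48},
    "0.1": {"path": "air.nc", "offset": 6192, "length": 48},
}
```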
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -28,6 +28,8 @@ dependencies = [
"numpy",
"ujson",
"packaging",
"universal-pathlib"

]

[project.optional-dependencies]
@@ -40,6 +42,7 @@ test = [
"scipy",
"pooch",
"ruff",
"s3fs"

]

39 changes: 28 additions & 11 deletions virtualizarr/kerchunk.py
@@ -1,9 +1,10 @@
from pathlib import Path
from typing import NewType, cast
from typing import NewType, Optional, cast

import ujson # type: ignore
import xarray as xr

from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.zarr import ZArray, ZAttrs

# Distinguishing these via type hints makes it a lot easier to mentally keep track of what the opaque kerchunk "reference dicts" actually mean
@@ -38,7 +39,11 @@ class FileType(AutoName):


def read_kerchunk_references_from_file(
filepath: str, filetype: FileType | None
filepath: str,
filetype: FileType | None,
reader_options: Optional[dict] = {
"storage_options": {"key": "", "secret": "", "anon": True}
},
) -> KerchunkStoreRefs:
"""
Read a single legacy file and return kerchunk references to its contents.
@@ -50,56 +55,67 @@ def read_kerchunk_references_from_file(
filetype : FileType, default: None
Type of file to be opened. Used to determine which kerchunk file format backend to use.
If not provided will attempt to automatically infer the correct filetype from the the filepath's extension.
reader_options: dict, default {'storage_options':{'key':'', 'secret':'', 'anon':True}}
Dict passed into Kerchunk file readers. Note: Each Kerchunk file reader has distinct arguments,
so ensure reader_options match selected Kerchunk reader arguments.
"""

if filetype is None:
filetype = _automatically_determine_filetype(filepath)
filetype = _automatically_determine_filetype(
filepath=filepath, reader_options=reader_options
)

# if filetype is user defined, convert to FileType
filetype = FileType(filetype)

if filetype.name.lower() == "netcdf3":
from kerchunk.netCDF3 import NetCDF3ToZarr

refs = NetCDF3ToZarr(filepath, inline_threshold=0).translate()
refs = NetCDF3ToZarr(filepath, inline_threshold=0, **reader_options).translate()

elif filetype.name.lower() == "netcdf4":
from kerchunk.hdf import SingleHdf5ToZarr

refs = SingleHdf5ToZarr(filepath, inline_threshold=0).translate()
refs = SingleHdf5ToZarr(
filepath, inline_threshold=0, **reader_options
).translate()
elif filetype.name.lower() == "grib":
# TODO Grib files should be handled as a DataTree object
# see https://github.com/TomNicholas/VirtualiZarr/issues/11
raise NotImplementedError(f"Unsupported file type: {filetype}")
elif filetype.name.lower() == "tiff":
from kerchunk.tiff import tiff_to_zarr

refs = tiff_to_zarr(filepath, inline_threshold=0)
refs = tiff_to_zarr(filepath, inline_threshold=0, **reader_options)
elif filetype.name.lower() == "fits":
from kerchunk.fits import process_file

refs = process_file(filepath, inline_threshold=0)
refs = process_file(filepath, inline_threshold=0, **reader_options)
else:
raise NotImplementedError(f"Unsupported file type: {filetype.name}")

# TODO validate the references that were read before returning?
return refs


def _automatically_determine_filetype(filepath: str) -> FileType:
def _automatically_determine_filetype(
*, filepath: str, reader_options: Optional[dict] = {}
) -> FileType:
file_extension = Path(filepath).suffix
fpath = _fsspec_openfile_from_filepath(
filepath=filepath, reader_options=reader_options
)

if file_extension == ".nc":
# based off of: https://github.com/TomNicholas/VirtualiZarr/pull/43#discussion_r1543415167
with open(filepath, "rb") as f:
magic = f.read()
magic = fpath.read()

if magic[0:3] == b"CDF":
filetype = FileType.netcdf3
elif magic[1:4] == b"HDF":
filetype = FileType.netcdf4
else:
raise ValueError(".nc file does not appear to be NETCDF3 OR NETCDF4")

elif file_extension == ".zarr":
# TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one...
raise NotImplementedError()
@@ -112,6 +128,7 @@ def _automatically_determine_filetype(filepath: str) -> FileType:
else:
raise NotImplementedError(f"Unrecognised file extension: {file_extension}")

fpath.close()
return filetype


8 changes: 6 additions & 2 deletions virtualizarr/tests/test_kerchunk.py
@@ -170,8 +170,12 @@ def test_automatically_determine_filetype_netcdf3_netcdf4():
ds.to_netcdf(netcdf3_file_path, engine="scipy", format="NETCDF3_CLASSIC")
ds.to_netcdf(netcdf4_file_path, engine="h5netcdf")

assert FileType("netcdf3") == _automatically_determine_filetype(netcdf3_file_path)
assert FileType("netcdf4") == _automatically_determine_filetype(netcdf4_file_path)
assert FileType("netcdf3") == _automatically_determine_filetype(
filepath=netcdf3_file_path
)
assert FileType("netcdf4") == _automatically_determine_filetype(
filepath=netcdf4_file_path
)


def test_FileType():
20 changes: 20 additions & 0 deletions virtualizarr/tests/test_xarray.py
@@ -1,6 +1,7 @@
from collections.abc import Mapping

import numpy as np
import pytest
import xarray as xr
import xarray.testing as xrt
from xarray.core.indexes import Index
@@ -268,6 +269,24 @@ def test_combine_by_coords(self, netcdf4_files):
assert combined_vds.xindexes["time"].to_pandas_index().is_monotonic_increasing


pytest.importorskip("s3fs")


@pytest.mark.parametrize(
"filetype", ["netcdf4", None], ids=["netcdf4 filetype", "None filetype"]
)
@pytest.mark.parametrize("indexes", [None, {}], ids=["None index", "empty dict index"])
def test_anon_read_s3(filetype, indexes):
"""Parameterized tests for empty vs supplied indexes and filetypes."""
# TODO: Switch away from this s3 url after minIO is implemented.
fpath = "s3://carbonplan-share/virtualizarr/local.nc"
vds = open_virtual_dataset(fpath, filetype=filetype, indexes=indexes)

assert vds.dims == {"time": 2920, "lat": 25, "lon": 53}
for var in vds.variables:
assert isinstance(vds[var].data, ManifestArray), var


class TestLoadVirtualDataset:
def test_loadable_variables(self, netcdf4_file):
vars_to_load = ["air", "time"]
@@ -280,6 +299,7 @@ def test_loadable_variables(self, netcdf4_file):
assert isinstance(vds[name].data, ManifestArray), name

full_ds = xr.open_dataset(netcdf4_file)

for name in full_ds.variables:
if name in vars_to_load:
xrt.assert_identical(vds.variables[name], full_ds.variables[name])
64 changes: 64 additions & 0 deletions virtualizarr/utils.py
@@ -0,0 +1,64 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from fsspec.implementations.local import LocalFileOpener
from s3fs.core import S3File


def _fsspec_openfile_from_filepath(
*,
filepath: str,
reader_options: Optional[dict] = {
"storage_options": {"key": "", "secret": "", "anon": True}
},
) -> S3File | LocalFileOpener:
"""Converts input filepath to fsspec openfile object.
Parameters
----------
filepath : str
Input filepath
reader_options : _type_, optional
Dict containing kwargs to pass to file opener, by default {'storage_options':{'key':'', 'secret':'', 'anon':True}}
Returns
-------
S3File | LocalFileOpener
Either S3File or LocalFileOpener, depending on which protocol was supplied.
Raises
------
NotImplementedError
Raises a Not Implemented Error if filepath protocol is not supported.
"""

import fsspec
from upath import UPath

universal_filepath = UPath(filepath)
protocol = universal_filepath.protocol

if protocol == "":
fpath = fsspec.open(filepath, "rb").open()

elif protocol in ["s3"]:
s3_anon_defaults = {"key": "", "secret": "", "anon": True}
if not bool(reader_options):
storage_options = s3_anon_defaults

else:
storage_options = reader_options.get("storage_options") # type: ignore

# using dict merge operator to add in defaults if keys are not specified
storage_options = s3_anon_defaults | storage_options

fpath = fsspec.filesystem(protocol, **storage_options).open(filepath)

else:
raise NotImplementedError(
"Only local and s3 file protocols are currently supported"
)

return fpath
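
The helper above returns an already-open fsspec file-like object, so callers are responsible for closing it. A minimal usage sketch (not part of the diff; it assumes `s3fs` is installed, a local `air.nc` exists, and the bucket permits anonymous reads) might look like:

```python
from virtualizarr.utils import _fsspec_openfile_from_filepath

# Local paths have an empty protocol and fall through to fsspec.open.
local_file = _fsspec_openfile_from_filepath(filepath="air.nc", reader_options={})
magic = local_file.read(4)  # e.g. b"\x89HDF" for a netCDF4/HDF5 file
local_file.close()

# s3:// paths merge any user-supplied storage_options with the anonymous-access defaults.
s3_file = _fsspec_openfile_from_filepath(
    filepath="s3://carbonplan-share/virtualizarr/local.nc",
    reader_options={"storage_options": {"anon": True}},
)
s3_file.close()
```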
14 changes: 13 additions & 1 deletion virtualizarr/xarray.py
@@ -2,6 +2,7 @@
from pathlib import Path
from typing import (
Literal,
Optional,
overload,
)

@@ -15,6 +16,7 @@
import virtualizarr.kerchunk as kerchunk
from virtualizarr.kerchunk import FileType, KerchunkStoreRefs
from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.zarr import (
attrs_from_zarr_group_json,
dataset_to_zarr,
@@ -35,6 +37,9 @@ def open_virtual_dataset(
loadable_variables: Iterable[str] | None = None,
indexes: Mapping[str, Index] | None = None,
virtual_array_class=ManifestArray,
reader_options: Optional[dict] = {
"storage_options": {"key": "", "secret": "", "anon": True}
},
) -> xr.Dataset:
"""
Open a file or store as an xarray Dataset wrapping virtualized zarr arrays.
@@ -63,6 +68,9 @@ def open_virtual_dataset(
virtual_array_class
Virtual array class to use to represent the references to the chunks in each on-disk array.
Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that.
reader_options: dict, default {'storage_options':{'key':'', 'secret':'', 'anon':True}}
Dict passed into Kerchunk file readers. Note: Each Kerchunk file reader has distinct arguments,
so ensure reader_options match selected Kerchunk reader arguments.
Returns
-------
@@ -112,7 +120,11 @@ def open_virtual_dataset(
# TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
# TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
# TODO really we probably want a dedicated xarray backend that iterates over all variables only once
ds = xr.open_dataset(filepath, drop_variables=drop_variables)
fpath = _fsspec_openfile_from_filepath(
filepath=filepath, reader_options=reader_options
)

ds = xr.open_dataset(fpath, drop_variables=drop_variables)

if indexes is None:
# add default indexes by reading data from file
