From 65d720b85b6136728ba0c982fa06b3858d3c9de0 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 10 Nov 2025 21:00:43 +0000 Subject: [PATCH 1/7] Fix compression and serializer encodings --- src/mdio/builder/xarray_builder.py | 29 +++++++++------- tests/unit/v1/test_dataset_serializer.py | 42 ++++++++++++++---------- 2 files changed, 41 insertions(+), 30 deletions(-) diff --git a/src/mdio/builder/xarray_builder.py b/src/mdio/builder/xarray_builder.py index 18ff31ef1..18e52c742 100644 --- a/src/mdio/builder/xarray_builder.py +++ b/src/mdio/builder/xarray_builder.py @@ -14,9 +14,11 @@ try: # zfpy is an optional dependency for ZFP compression # It is not installed by default, so we check for its presence and import it only if available. - from zfpy import ZFPY as zfpy_ZFPY # noqa: N811 + from numcodecs import ZFPY as zfpy_ZFPY # noqa: N811 + from zarr.codecs.numcodecs import ZFPY as zarr_ZFPY # noqa: N811 except ImportError: zfpy_ZFPY = None # noqa: N816 + zarr_ZFPY = None # noqa: N816 from mdio.builder.schemas.compressors import ZFP as mdio_ZFP # noqa: N811 from mdio.builder.schemas.compressors import Blosc as mdio_Blosc @@ -121,9 +123,9 @@ def _get_zarr_chunks(var: Variable, all_named_dims: dict[str, NamedDimension]) - return _get_zarr_shape(var, all_named_dims=all_named_dims) -def _convert_compressor( +def _compressor_to_encoding( compressor: mdio_Blosc | mdio_ZFP | None, -) -> BloscCodec | Blosc | zfpy_ZFPY | None: +) -> dict[str, BloscCodec | Blosc | zfpy_ZFPY | zarr_ZFPY] | None: """Convert a compressor to a numcodecs compatible format.""" if compressor is None: return None @@ -132,19 +134,18 @@ def _convert_compressor( blosc_kwargs = compressor.model_dump(exclude={"name"}, mode="json") if zarr.config.get("default_zarr_format") == ZarrFormat.V2: blosc_kwargs["shuffle"] = -1 if blosc_kwargs["shuffle"] is None else blosc_kwargs["shuffle"] - return Blosc(**blosc_kwargs) - return BloscCodec(**blosc_kwargs) + return {"compressors": Blosc(**blosc_kwargs)} + return {"compressors": BloscCodec(**blosc_kwargs)} if isinstance(compressor, mdio_ZFP): if zfpy_ZFPY is None: msg = "zfpy and numcodecs are required to use ZFP compression" raise ImportError(msg) - return zfpy_ZFPY( - mode=compressor.mode.value, - tolerance=compressor.tolerance, - rate=compressor.rate, - precision=compressor.precision, - ) + zfp_kwargs = compressor.model_dump(exclude={"name"}, mode="json") + if zarr.config.get("default_zarr_format") == ZarrFormat.V2: + # I'm not sure if this is the appropriate way for v2. + return {"compressors": zfpy_ZFPY(**zfp_kwargs)} + return {"serializer": zarr_ZFPY(**zfp_kwargs), "compressors": None} msg = f"Unsupported compressor model: {type(compressor)}" raise TypeError(msg) @@ -222,10 +223,14 @@ def to_xarray_dataset(mdio_ds: Dataset) -> xr_Dataset: # noqa: PLR0912 encoding = { "chunks": original_chunks, - "compressors": _convert_compressor(v.compressor), fill_value_key: fill_value, } + compressor_encodings = _compressor_to_encoding(v.compressor) + + if compressor_encodings is not None: + encoding.update(compressor_encodings) + if zarr_format == ZarrFormat.V2: encoding["chunk_key_encoding"] = {"name": "v2", "configuration": {"separator": "/"}} diff --git a/tests/unit/v1/test_dataset_serializer.py b/tests/unit/v1/test_dataset_serializer.py index 45c19665f..81cc5e834 100644 --- a/tests/unit/v1/test_dataset_serializer.py +++ b/tests/unit/v1/test_dataset_serializer.py @@ -19,7 +19,7 @@ from mdio.builder.schemas.v1.variable import Coordinate from mdio.builder.schemas.v1.variable import Variable from mdio.builder.schemas.v1.variable import VariableMetadata -from mdio.builder.xarray_builder import _convert_compressor +from mdio.builder.xarray_builder import _compressor_to_encoding from mdio.builder.xarray_builder import _get_all_named_dimensions from mdio.builder.xarray_builder import _get_coord_names from mdio.builder.xarray_builder import _get_dimension_names @@ -226,43 +226,49 @@ def test_get_fill_value() -> None: assert result_none_input is None -def test_convert_compressor() -> None: - """Simple test for _convert_compressor function covering basic scenarios.""" +def test_compressor_to_encoding() -> None: + """Simple test for _compressor_to_encoding function covering basic scenarios.""" # Test 1: None input - should return None - result_none = _convert_compressor(None) + result_none = _compressor_to_encoding(None) assert result_none is None # Test 2: mdio_Blosc compressor - should return nc_Blosc mdio_compressor = mdio_Blosc(cname=BloscCname.lz4, clevel=5, shuffle=BloscShuffle.bitshuffle, blocksize=1024) - result_blosc = _convert_compressor(mdio_compressor) + result_blosc = _compressor_to_encoding(mdio_compressor) - assert isinstance(result_blosc, BloscCodec) - assert result_blosc.cname == BloscCname.lz4 - assert result_blosc.clevel == 5 - assert result_blosc.shuffle == BloscShuffle.bitshuffle - assert result_blosc.blocksize == 1024 + assert isinstance(result_blosc, dict) + assert "compressors" in result_blosc + assert isinstance(result_blosc["compressors"], BloscCodec) + assert result_blosc["compressors"].cname == BloscCname.lz4 + assert result_blosc["compressors"].clevel == 5 + assert result_blosc["compressors"].shuffle == BloscShuffle.bitshuffle + assert result_blosc["compressors"].blocksize == 1024 # Test 3: mdio_ZFP compressor - should return zfpy_ZFPY if available zfp_compressor = MDIO_ZFP(mode=mdio_ZFPMode.FIXED_RATE, tolerance=0.01, rate=8.0, precision=16) + # TODO(BrianMichell): Update to also test zfp compression. + # https://github.com/TGSAI/mdio-python/issues/747 if HAS_ZFPY: # pragma: no cover - result_zfp = _convert_compressor(zfp_compressor) - assert isinstance(result_zfp, ZFPY) - assert result_zfp.mode == 1 # ZFPMode.FIXED_RATE.value = "fixed_rate" - assert result_zfp.tolerance == 0.01 - assert result_zfp.rate == 8.0 - assert result_zfp.precision == 16 + result_zfp = _compressor_to_encoding(zfp_compressor) + assert isinstance(result_zfp, dict) + assert "compressors" not in result_zfp + assert isinstance(result_zfp["serializer"], ZFPY) + assert result_zfp["serializer"].mode == 1 # ZFPMode.FIXED_RATE.value = "fixed_rate" + assert result_zfp["serializer"].tolerance == 0.01 + assert result_zfp["serializer"].rate == 8.0 + assert result_zfp["serializer"].precision == 16 else: # Test 5: mdio_ZFP without zfpy installed - should raise ImportError with pytest.raises(ImportError) as exc_info: - _convert_compressor(zfp_compressor) + _compressor_to_encoding(zfp_compressor) error_message = str(exc_info.value) assert "zfpy and numcodecs are required to use ZFP compression" in error_message # Test 6: Unsupported compressor type - should raise TypeError unsupported_compressor = "invalid_compressor" with pytest.raises(TypeError) as exc_info: - _convert_compressor(unsupported_compressor) + _compressor_to_encoding(unsupported_compressor) error_message = str(exc_info.value) assert "Unsupported compressor model" in error_message assert "" in error_message From 2a20324336ff818504555fb53547876266f1a113 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 10 Nov 2025 21:23:02 +0000 Subject: [PATCH 2/7] Fix zarr v2 support --- src/mdio/builder/xarray_builder.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/mdio/builder/xarray_builder.py b/src/mdio/builder/xarray_builder.py index 18e52c742..c2d3d9e53 100644 --- a/src/mdio/builder/xarray_builder.py +++ b/src/mdio/builder/xarray_builder.py @@ -143,7 +143,9 @@ def _compressor_to_encoding( raise ImportError(msg) zfp_kwargs = compressor.model_dump(exclude={"name"}, mode="json") if zarr.config.get("default_zarr_format") == ZarrFormat.V2: - # I'm not sure if this is the appropriate way for v2. + # This may not be an exhaustive list of keys that are incompatible with v2. + if zfp_kwargs.get("writeHeader") is not None: + zfp_kwargs.pop("writeHeader") return {"compressors": zfpy_ZFPY(**zfp_kwargs)} return {"serializer": zarr_ZFPY(**zfp_kwargs), "compressors": None} From 4aedeb5de01a9bc3d44d48c7abf6c8142d8a888b Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 10 Nov 2025 22:29:44 +0000 Subject: [PATCH 3/7] Use the proper encoding for zfp mode --- src/mdio/builder/xarray_builder.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mdio/builder/xarray_builder.py b/src/mdio/builder/xarray_builder.py index c2d3d9e53..b76f31699 100644 --- a/src/mdio/builder/xarray_builder.py +++ b/src/mdio/builder/xarray_builder.py @@ -142,6 +142,8 @@ def _compressor_to_encoding( msg = "zfpy and numcodecs are required to use ZFP compression" raise ImportError(msg) zfp_kwargs = compressor.model_dump(exclude={"name"}, mode="json") + # Convert string mode to integer for numcodecs compatibility + zfp_kwargs["mode"] = compressor.mode.int_code if zarr.config.get("default_zarr_format") == ZarrFormat.V2: # This may not be an exhaustive list of keys that are incompatible with v2. if zfp_kwargs.get("writeHeader") is not None: From c2647e7cddb0d623954752c2f6b6cf979d93fac5 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Mon, 10 Nov 2025 22:30:14 +0000 Subject: [PATCH 4/7] Remove configuration for write_header as it is not used by Zarr or Numcodecs --- src/mdio/builder/schemas/compressors.py | 5 ----- src/mdio/builder/xarray_builder.py | 3 --- 2 files changed, 8 deletions(-) diff --git a/src/mdio/builder/schemas/compressors.py b/src/mdio/builder/schemas/compressors.py index 7794277a2..f661712bb 100644 --- a/src/mdio/builder/schemas/compressors.py +++ b/src/mdio/builder/schemas/compressors.py @@ -71,11 +71,6 @@ class ZFP(CamelCaseStrictModel): description="Fixed precision in terms of number of uncompressed bits per value.", ) - write_header: bool = Field( - default=True, - description="Encode array shape, scalar type, and compression parameters.", - ) - @model_validator(mode="after") def check_requirements(self) -> ZFP: """Check if ZFP parameters make sense.""" diff --git a/src/mdio/builder/xarray_builder.py b/src/mdio/builder/xarray_builder.py index b76f31699..fe7868aa2 100644 --- a/src/mdio/builder/xarray_builder.py +++ b/src/mdio/builder/xarray_builder.py @@ -145,9 +145,6 @@ def _compressor_to_encoding( # Convert string mode to integer for numcodecs compatibility zfp_kwargs["mode"] = compressor.mode.int_code if zarr.config.get("default_zarr_format") == ZarrFormat.V2: - # This may not be an exhaustive list of keys that are incompatible with v2. - if zfp_kwargs.get("writeHeader") is not None: - zfp_kwargs.pop("writeHeader") return {"compressors": zfpy_ZFPY(**zfp_kwargs)} return {"serializer": zarr_ZFPY(**zfp_kwargs), "compressors": None} From b72be98eae6e58ae2cd955aac87a669157268b68 Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Tue, 11 Nov 2025 08:42:03 -0600 Subject: [PATCH 5/7] reduce repetition and simplify logic --- src/mdio/builder/xarray_builder.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/mdio/builder/xarray_builder.py b/src/mdio/builder/xarray_builder.py index fe7868aa2..d9ff4cefc 100644 --- a/src/mdio/builder/xarray_builder.py +++ b/src/mdio/builder/xarray_builder.py @@ -130,23 +130,23 @@ def _compressor_to_encoding( if compressor is None: return None + is_v2 = zarr.config.get("default_zarr_format") == ZarrFormat.V2 + kwargs = compressor.model_dump(exclude={"name"}, mode="json") + if isinstance(compressor, mdio_Blosc): - blosc_kwargs = compressor.model_dump(exclude={"name"}, mode="json") - if zarr.config.get("default_zarr_format") == ZarrFormat.V2: - blosc_kwargs["shuffle"] = -1 if blosc_kwargs["shuffle"] is None else blosc_kwargs["shuffle"] - return {"compressors": Blosc(**blosc_kwargs)} - return {"compressors": BloscCodec(**blosc_kwargs)} + if is_v2 and kwargs["shuffle"] is None: + kwargs["shuffle"] = -1 + codec_cls = Blosc if is_v2 else BloscCodec + return {"compressors": codec_cls(**kwargs)} if isinstance(compressor, mdio_ZFP): if zfpy_ZFPY is None: msg = "zfpy and numcodecs are required to use ZFP compression" raise ImportError(msg) - zfp_kwargs = compressor.model_dump(exclude={"name"}, mode="json") - # Convert string mode to integer for numcodecs compatibility - zfp_kwargs["mode"] = compressor.mode.int_code - if zarr.config.get("default_zarr_format") == ZarrFormat.V2: - return {"compressors": zfpy_ZFPY(**zfp_kwargs)} - return {"serializer": zarr_ZFPY(**zfp_kwargs), "compressors": None} + kwargs["mode"] = compressor.mode.int_code + if is_v2: + return {"compressors": zfpy_ZFPY(**kwargs)} + return {"serializer": zarr_ZFPY(**kwargs), "compressors": None} msg = f"Unsupported compressor model: {type(compressor)}" raise TypeError(msg) From 3f46d9e0d0bee0b0177e2ebb49e5539653fd6c55 Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Tue, 11 Nov 2025 08:50:33 -0600 Subject: [PATCH 6/7] move compressor check to beginning --- src/mdio/builder/xarray_builder.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/mdio/builder/xarray_builder.py b/src/mdio/builder/xarray_builder.py index d9ff4cefc..b9f8bcabf 100644 --- a/src/mdio/builder/xarray_builder.py +++ b/src/mdio/builder/xarray_builder.py @@ -127,8 +127,9 @@ def _compressor_to_encoding( compressor: mdio_Blosc | mdio_ZFP | None, ) -> dict[str, BloscCodec | Blosc | zfpy_ZFPY | zarr_ZFPY] | None: """Convert a compressor to a numcodecs compatible format.""" - if compressor is None: - return None + if compressor is not None and not isinstance(compressor, mdio_Blosc | mdio_ZFP): + msg = f"Unsupported compressor model: {type(compressor)}" + raise TypeError(msg) is_v2 = zarr.config.get("default_zarr_format") == ZarrFormat.V2 kwargs = compressor.model_dump(exclude={"name"}, mode="json") @@ -148,8 +149,7 @@ def _compressor_to_encoding( return {"compressors": zfpy_ZFPY(**kwargs)} return {"serializer": zarr_ZFPY(**kwargs), "compressors": None} - msg = f"Unsupported compressor model: {type(compressor)}" - raise TypeError(msg) + return None def _get_fill_value(data_type: ScalarType | StructuredType | str) -> any: From b512c36b460714a171fd5b568b3383d4b3b426de Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Tue, 11 Nov 2025 08:57:54 -0600 Subject: [PATCH 7/7] handle linter complaints and handle None --- src/mdio/builder/xarray_builder.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/mdio/builder/xarray_builder.py b/src/mdio/builder/xarray_builder.py index b9f8bcabf..58501cba9 100644 --- a/src/mdio/builder/xarray_builder.py +++ b/src/mdio/builder/xarray_builder.py @@ -127,7 +127,10 @@ def _compressor_to_encoding( compressor: mdio_Blosc | mdio_ZFP | None, ) -> dict[str, BloscCodec | Blosc | zfpy_ZFPY | zarr_ZFPY] | None: """Convert a compressor to a numcodecs compatible format.""" - if compressor is not None and not isinstance(compressor, mdio_Blosc | mdio_ZFP): + if compressor is None: + return None + + if not isinstance(compressor, (mdio_Blosc, mdio_ZFP)): msg = f"Unsupported compressor model: {type(compressor)}" raise TypeError(msg) @@ -140,16 +143,14 @@ def _compressor_to_encoding( codec_cls = Blosc if is_v2 else BloscCodec return {"compressors": codec_cls(**kwargs)} - if isinstance(compressor, mdio_ZFP): - if zfpy_ZFPY is None: - msg = "zfpy and numcodecs are required to use ZFP compression" - raise ImportError(msg) - kwargs["mode"] = compressor.mode.int_code - if is_v2: - return {"compressors": zfpy_ZFPY(**kwargs)} - return {"serializer": zarr_ZFPY(**kwargs), "compressors": None} - - return None + # must be ZFP beyond here + if zfpy_ZFPY is None: + msg = "zfpy and numcodecs are required to use ZFP compression" + raise ImportError(msg) + kwargs["mode"] = compressor.mode.int_code + if is_v2: + return {"compressors": zfpy_ZFPY(**kwargs)} + return {"serializer": zarr_ZFPY(**kwargs), "compressors": None} def _get_fill_value(data_type: ScalarType | StructuredType | str) -> any: