From f62e6c11bee29cf30b0e7c82e0ba46aec1f44451 Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Fri, 30 May 2025 08:31:20 -0400 Subject: [PATCH 1/9] feat: support partials --- py_hamt/hamt.py | 5 +- py_hamt/store.py | 50 ++++-- py_hamt/zarr_hamt_store.py | 48 +++++- tests/test_zarr_ipfs_partial.py | 262 ++++++++++++++++++++++++++++++++ 4 files changed, 348 insertions(+), 17 deletions(-) create mode 100644 tests/test_zarr_ipfs_partial.py diff --git a/py_hamt/hamt.py b/py_hamt/hamt.py index ad867c2..98f3dc6 100644 --- a/py_hamt/hamt.py +++ b/py_hamt/hamt.py @@ -3,6 +3,7 @@ import uuid import asyncio from copy import deepcopy +from typing import Optional import dag_cbor from dag_cbor.ipld import IPLDKind @@ -571,10 +572,10 @@ async def delete(self, key: str): # If we didn't make a change, then this key must not exist within the HAMT raise KeyError - async def get(self, key: str) -> IPLDKind: + async def get(self, key: str, offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> IPLDKind: """Get a value.""" pointer = await self.get_pointer(key) - data = await self.cas.load(pointer) + data = await self.cas.load(pointer, offset=offset, length=length, suffix=suffix) if self.values_are_bytes: return data else: diff --git a/py_hamt/store.py b/py_hamt/store.py index 0fbe62e..2071383 100644 --- a/py_hamt/store.py +++ b/py_hamt/store.py @@ -1,6 +1,6 @@ import asyncio import aiohttp -from typing import Literal +from typing import Literal, Optional from abc import ABC, abstractmethod from dag_cbor.ipld import IPLDKind from multiformats import multihash @@ -30,7 +30,7 @@ async def save(self, data: bytes, codec: CodecInput) -> IPLDKind: """ @abstractmethod - async def load(self, id: IPLDKind) -> bytes: + async def load(self, id: IPLDKind, offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> bytes: """Retrieve data.""" @@ -49,11 +49,23 @@ async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> by self.store[hash] = data return hash - async def load(self, id: bytes) -> bytes: # type: ignore since bytes is a subset of the IPLDKind type - if id in self.store: - return self.store[id] - - raise KeyError + async def load(self, id: bytes, offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> bytes: # type: ignore since bytes is a subset of the IPLDKind type + if id not in self.store: + raise KeyError + + data = self.store[id] + + if offset is not None: + start = offset + if length is not None: + end = start + length + return data[start:end] + else: + return data[start:] + elif suffix is not None: # If only length is given, assume start from 0 + return data[-suffix:] + else: # Full load + return data class KuboCAS(ContentAddressedStore): @@ -166,11 +178,31 @@ async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CI return cid async def load( # type: ignore CID is definitely in the IPLDKind type - self, id: CID + self, + id: CID, + offset: Optional[int] = None, + length: Optional[int] = None, + suffix: Optional[int] = None ) -> bytes: """@private""" url = self.gateway_base_url + str(id) + headers = {} + + # Construct the Range header if required + if offset is not None: + start = offset + if length is not None: + # Standard HTTP Range: bytes=start-end (inclusive) + end = start + length - 1 + headers["Range"] = f"bytes={start}-{end}" + else: + # Standard HTTP Range: bytes=start- (from start to end) + headers["Range"] = f"bytes={start}-" + elif suffix is not None: + # Standard HTTP Range: bytes=-N (last N bytes) + headers["Range"] = f"bytes=-{suffix}" + async with self._sem: # throttle gateway - async with self._loop_session().get(url) as resp: + async with self._loop_session().get(url, headers=headers or None) as resp: resp.raise_for_status() return await resp.read() diff --git a/py_hamt/zarr_hamt_store.py b/py_hamt/zarr_hamt_store.py index f5c5dbb..8f2c3a3 100644 --- a/py_hamt/zarr_hamt_store.py +++ b/py_hamt/zarr_hamt_store.py @@ -2,10 +2,11 @@ import zarr.abc.store import zarr.core.buffer from zarr.core.common import BytesLike +from typing import Optional +import asyncio from py_hamt.hamt import HAMT - class ZarrHAMTStore(zarr.abc.store.Store): """ Write and read Zarr v3s with a HAMT. @@ -61,6 +62,27 @@ def __init__(self, hamt: HAMT, read_only: bool = False) -> None: self.metadata_read_cache: dict[str, bytes] = dict() """@private""" + def _map_byte_request(self, byte_range: Optional[zarr.abc.store.ByteRequest]) -> tuple[Optional[int], Optional[int], Optional[int]]: + """Helper to map Zarr ByteRequest to offset, length, suffix.""" + offset: Optional[int] = None + length: Optional[int] = None + suffix: Optional[int] = None + + if byte_range: + if isinstance(byte_range, zarr.abc.store.RangeByteRequest): + offset = byte_range.start + length = byte_range.end - byte_range.start + if length < 0: + raise ValueError("End must be >= start for RangeByteRequest") + elif isinstance(byte_range, zarr.abc.store.OffsetByteRequest): + offset = byte_range.offset + elif isinstance(byte_range, zarr.abc.store.SuffixByteRequest): + suffix = byte_range.suffix + else: + raise TypeError(f"Unsupported ByteRequest type: {type(byte_range)}") + + return offset, length, suffix + @property def read_only(self) -> bool: """@private""" @@ -86,25 +108,39 @@ async def get( len(key) >= 9 and key[-9:] == "zarr.json" ) # if path ends with zarr.json - if is_metadata and key in self.metadata_read_cache: + if is_metadata and byte_range is None and key in self.metadata_read_cache: val = self.metadata_read_cache[key] else: - val = await self.hamt.get(key) # type: ignore We know values received will always be bytes since we only store bytes in the HAMT - if is_metadata: + offset, length, suffix = self._map_byte_request(byte_range) + + val = await self.hamt.get(key, offset=offset, length=length, suffix=suffix) # type: ignore We know values received will always be bytes since we only store bytes in the HAMT + # Update cache only on full metadata reads + if is_metadata and byte_range is None: self.metadata_read_cache[key] = val return prototype.buffer.from_bytes(val) except KeyError: # Sometimes zarr queries keys that don't exist anymore, just return nothing on those cases return + except Exception as e: + print(f"Error getting key '{key}' with range {byte_range}: {e}") + raise + async def get_partial_values( self, prototype: zarr.core.buffer.BufferPrototype, key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]], ) -> list[zarr.core.buffer.Buffer | None]: - """@private""" - raise NotImplementedError + """ + Retrieves multiple keys or byte ranges concurrently using asyncio.gather. + """ + tasks = [ + self.get(key, prototype, byte_range) + for key, byte_range in key_ranges + ] + results = await asyncio.gather(*tasks, return_exceptions=False) # Set return_exceptions=True for debugging + return results async def exists(self, key: str) -> bool: """@private""" diff --git a/tests/test_zarr_ipfs_partial.py b/tests/test_zarr_ipfs_partial.py new file mode 100644 index 0000000..702441f --- /dev/null +++ b/tests/test_zarr_ipfs_partial.py @@ -0,0 +1,262 @@ +import time + +import numpy as np +import pandas as pd +import xarray as xr +import pytest +import zarr +import zarr.core.buffer +# Make sure to import the ByteRequest types +from zarr.abc.store import RangeByteRequest, OffsetByteRequest, SuffixByteRequest +import aiohttp +from typing import Optional + + + +from py_hamt import HAMT, KuboCAS + +from py_hamt.zarr_hamt_store import ZarrHAMTStore + + +@pytest.fixture(scope="module") +def random_zarr_dataset(): + """Creates a random xarray Dataset. + + Returns: + tuple: (dataset_path, expected_data) + - dataset_path: Path to the zarr store + - expected_data: The original xarray Dataset for comparison + """ + # Coordinates of the random data + times = pd.date_range("2024-01-01", periods=100) + lats = np.linspace(-90, 90, 18) + lons = np.linspace(-180, 180, 36) + + # Create random variables with different shapes + temp = np.random.randn(len(times), len(lats), len(lons)) + precip = np.random.gamma(2, 0.5, size=(len(times), len(lats), len(lons))) + + # Create the dataset + ds = xr.Dataset( + { + "temp": ( + ["time", "lat", "lon"], + temp, + {"units": "celsius", "long_name": "Surface Temperature"}, + ), + "precip": ( + ["time", "lat", "lon"], + precip, + {"units": "mm/day", "long_name": "Daily Precipitation"}, + ), + }, + coords={ + "time": times, + "lat": ("lat", lats, {"units": "degrees_north"}), + "lon": ("lon", lons, {"units": "degrees_east"}), + }, + attrs={"description": "Test dataset with random weather data"}, + ) + + yield ds + +# This test also collects miscellaneous statistics about performance, run with pytest -s to see these statistics being printed out +@pytest.mark.asyncio(loop_scope="session") # ← match the loop of the fixture +async def test_write_read( + create_ipfs, + random_zarr_dataset: xr.Dataset, +): # noqa for fixture which is imported above but then "redefined" + rpc_base_url, gateway_base_url = create_ipfs + test_ds = random_zarr_dataset + print("=== Writing this xarray Dataset to a Zarr v3 on IPFS ===") + print(test_ds) + + + async with KuboCAS( # <-- own and auto-close session + rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url + ) as kubo_cas: + hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True) + zhs = ZarrHAMTStore(hamt) + assert zhs.supports_writes + start = time.perf_counter() + # Do an initial write along with an append which is a common xarray/zarr operation + # Ensure chunks are not too small for partial value tests + test_ds.to_zarr(store=zhs, chunk_store={'time': 50, 'lat': 18, 'lon': 36}) + test_ds.to_zarr(store=zhs, mode="a", append_dim="time", zarr_format=3) + end = time.perf_counter() + elapsed = end - start + print("=== Write Stats") + print(f"Total time in seconds: {elapsed:.2f}") + print("=== Root CID") + await hamt.make_read_only() + cid = hamt.root_node_id + + print(f"=== Verifying Gateway Suffix Support (CID: {cid}) ===") + # Get the gateway URL without the /ipfs/ part + gateway_only_url = gateway_base_url + + # You can add an assertion here if you expect it to work + # If you know the gateway *might* be buggy, just printing is okay too. + assert is_correct, "IPFS Gateway did not return the correct suffix data." + + print("=== Reading data back in and checking if identical") + hamt_read = await HAMT.build( # Renamed to avoid confusion + cas=kubo_cas, root_node_id=cid, values_are_bytes=True, read_only=True + ) + start = time.perf_counter() + zhs_read = ZarrHAMTStore(hamt_read, read_only=True) # Use the read-only hamt + ipfs_ds = xr.open_zarr(store=zhs_read) + print(ipfs_ds) + + # Check both halves, since each are an identical copy + ds1 = ipfs_ds.isel(time=slice(0, len(ipfs_ds.time) // 2)) + ds2 = ipfs_ds.isel(time=slice(len(ipfs_ds.time) // 2, len(ipfs_ds.time))) + xr.testing.assert_identical(ds1, ds2) + xr.testing.assert_identical(test_ds, ds1) + xr.testing.assert_identical(test_ds, ds2) + + end = time.perf_counter() + elapsed = end - start + print("=== Read Stats") + print(f"Total time in seconds: {elapsed:.2f}") + + # --- Start: New Partial Values Tests --- + + print("=== Testing get_partial_values ===") + proto = zarr.core.buffer.default_buffer_prototype() + + # Find a chunk key to test with (e.g., the first chunk of 'temp') + chunk_key = None + async for k in zhs_read.list(): + if k.startswith("temp/") and k != "temp/.zarray": + chunk_key = k + break + + assert chunk_key is not None, "Could not find a chunk key to test." + print(f"Testing with chunk key: {chunk_key}") + + # Get the full chunk data for comparison + full_chunk_buffer = await zhs_read.get(chunk_key, proto) + assert full_chunk_buffer is not None + full_chunk_data = full_chunk_buffer.to_bytes() + chunk_len = len(full_chunk_data) + print(f"Full chunk size: {chunk_len} bytes") + + # Ensure the chunk is large enough for meaningful tests + assert chunk_len > 100, "Chunk size too small for partial value tests" + + # Define some byte requests + range_req = RangeByteRequest(start=10, end=50) # Request 40 bytes + offset_req = OffsetByteRequest(offset=chunk_len - 30) # Request last 30 bytes + suffix_req = SuffixByteRequest(suffix=20) # Request last 20 bytes + + key_ranges_to_test = [ + (chunk_key, range_req), + (chunk_key, offset_req), + (chunk_key, suffix_req), + (chunk_key, None), # Full read + ] + + # Call get_partial_values + results = await zhs_read.get_partial_values(proto, key_ranges_to_test) + + # Assertions + assert len(results) == 4 + assert results[0] is not None + assert results[1] is not None + assert results[2] is not None + assert results[3] is not None + + # Check RangeByteRequest result + expected_range = full_chunk_data[10:50] + assert results[0].to_bytes() == expected_range, "RangeByteRequest failed" + print(f"RangeByteRequest: OK (Got {len(results[0].to_bytes())} bytes)") + + # Check OffsetByteRequest result + expected_offset = full_chunk_data[chunk_len - 30:] + assert results[1].to_bytes() == expected_offset, "OffsetByteRequest failed" + print(f"OffsetByteRequest: OK (Got {len(results[1].to_bytes())} bytes)") + + # Check SuffixByteRequest result + expected_suffix = full_chunk_data[-20:] + # Broken until Kubo 0.36.0 + assert results[2].to_bytes() == expected_suffix, "SuffixByteRequest failed" + print(f"SuffixByteRequest: OK (Got {len(results[2].to_bytes())} bytes)") + + # Check full read result + assert results[3].to_bytes() == full_chunk_data, "Full read via get_partial_values failed" + print(f"Full Read: OK (Got {len(results[3].to_bytes())} bytes)") + + + # --- End: New Partial Values Tests --- + + + # Tests for code coverage's sake + assert await zhs_read.exists("zarr.json") + # __eq__ + assert zhs_read == zhs_read + assert zhs_read != hamt_read + assert not zhs_read.supports_writes + assert not zhs_read.supports_partial_writes + assert zhs_read.supports_deletes # Should be true in read-only mode for HAMT? Usually False + + hamt_keys = set() + async for k in zhs_read.hamt.keys(): + hamt_keys.add(k) + + zhs_keys: set[str] = set() + async for k in zhs_read.list(): + zhs_keys.add(k) + assert hamt_keys == zhs_keys + + zhs_keys: set[str] = set() + async for k in zhs_read.list_prefix(""): + zhs_keys.add(k) + assert hamt_keys == zhs_keys + + with pytest.raises(NotImplementedError): + await zhs_read.set_partial_values([]) + + # REMOVED: The old NotImplementedError check for get_partial_values + # with pytest.raises(NotImplementedError): + # await zhs_read.get_partial_values( + # zarr.core.buffer.default_buffer_prototype(), [] + # ) + + previous_zarr_json = await zhs_read.get( + "zarr.json", zarr.core.buffer.default_buffer_prototype() + ) + assert previous_zarr_json is not None + + # --- Test set_if_not_exists (needs a writable store) --- + await hamt_read.enable_write() + zhs_write = ZarrHAMTStore(hamt_read, read_only=False) + + # Setting a metadata file that should always exist should not change anything + await zhs_write.set_if_not_exists( + "zarr.json", + zarr.core.buffer.Buffer.from_bytes(b"should_not_change"), + ) + zarr_json_now = await zhs_write.get( + "zarr.json", zarr.core.buffer.default_buffer_prototype() + ) + assert zarr_json_now is not None + assert previous_zarr_json.to_bytes() == zarr_json_now.to_bytes() + + # now remove that metadata file and then add it back + await zhs_write.delete("zarr.json") + # doing a duplicate delete should not result in an error + await zhs_write.delete("zarr.json") + + zhs_keys_after_delete: set[str] = set() + async for k in zhs_write.list(): + zhs_keys_after_delete.add(k) + assert hamt_keys != zhs_keys_after_delete + assert "zarr.json" not in zhs_keys_after_delete + + await zhs_write.set_if_not_exists("zarr.json", previous_zarr_json) + zarr_json_now = await zhs_write.get( + "zarr.json", zarr.core.buffer.default_buffer_prototype() + ) + assert zarr_json_now is not None + assert previous_zarr_json.to_bytes() == zarr_json_now.to_bytes() \ No newline at end of file From 226967fcd81cc633f8da1f375f21672efb96e69a Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Fri, 30 May 2025 10:55:22 -0400 Subject: [PATCH 2/9] fix: re-add accidentally deleted _map_byte_request --- py_hamt/zarr_hamt_store.py | 1 + 1 file changed, 1 insertion(+) diff --git a/py_hamt/zarr_hamt_store.py b/py_hamt/zarr_hamt_store.py index 3b7294a..e378e16 100644 --- a/py_hamt/zarr_hamt_store.py +++ b/py_hamt/zarr_hamt_store.py @@ -112,6 +112,7 @@ async def get( if is_metadata and byte_range is None and key in self.metadata_read_cache: val = self.metadata_read_cache[key] else: + offset, length, suffix = self._map_byte_request(byte_range) val = cast( bytes, await self.hamt.get(key, offset=offset, length=length, suffix=suffix) ) # We know values received will always be bytes since we only store bytes in the HAMT From 89e8124af2e353f1e66d3d3b8e5a16453c8f7cbb Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Mon, 2 Jun 2025 09:18:13 -0400 Subject: [PATCH 3/9] fix: tidying up --- py_hamt/hamt.py | 12 ++++- py_hamt/store.py | 28 ++++++++--- py_hamt/zarr_hamt_store.py | 21 +++++---- tests/test_zarr_ipfs.py | 5 -- tests/test_zarr_ipfs_partial.py | 82 +++++++-------------------------- 5 files changed, 60 insertions(+), 88 deletions(-) diff --git a/py_hamt/hamt.py b/py_hamt/hamt.py index c94959e..aab1151 100644 --- a/py_hamt/hamt.py +++ b/py_hamt/hamt.py @@ -590,10 +590,18 @@ async def delete(self, key: str) -> None: # If we didn't make a change, then this key must not exist within the HAMT raise KeyError - async def get(self, key: str, offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> IPLDKind: + async def get( + self, + key: str, + offset: Optional[int] = None, + length: Optional[int] = None, + suffix: Optional[int] = None, + ) -> IPLDKind: """Get a value.""" pointer: IPLDKind = await self.get_pointer(key) - data: bytes = await self.cas.load(pointer, offset=offset, length=length, suffix=suffix) + data: bytes = await self.cas.load( + pointer, offset=offset, length=length, suffix=suffix + ) if self.values_are_bytes: return data else: diff --git a/py_hamt/store.py b/py_hamt/store.py index 06b4a04..00b581d 100644 --- a/py_hamt/store.py +++ b/py_hamt/store.py @@ -30,7 +30,13 @@ async def save(self, data: bytes, codec: CodecInput) -> IPLDKind: """ @abstractmethod - async def load(self, id: IPLDKind, offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> bytes: + async def load( + self, + id: IPLDKind, + offset: Optional[int] = None, + length: Optional[int] = None, + suffix: Optional[int] = None, + ) -> bytes: """Retrieve data.""" @@ -49,7 +55,13 @@ async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> by self.store[hash] = data return hash - async def load(self, id: IPLDKind, offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> bytes: # type: ignore since bytes is a subset of the IPLDKind type + async def load( + self, + id: IPLDKind, + offset: Optional[int] = None, + length: Optional[int] = None, + suffix: Optional[int] = None, + ) -> bytes: # type: ignore since bytes is a subset of the IPLDKind type """ `ContentAddressedStore` allows any IPLD scalar key. For the in-memory backend we *require* a `bytes` hash; anything else is rejected at run @@ -60,12 +72,16 @@ async def load(self, id: IPLDKind, offset: Optional[int] = None, length: Optiona h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch """ key = cast(bytes, id) + if not isinstance(key, (bytes, bytearray)): # defensive guard + raise TypeError( + f"InMemoryCAS only supports byte‐hash keys; got {type(id).__name__}" + ) data: bytes try: data = self.store[key] except KeyError as exc: raise KeyError("Object not found in in-memory store") from exc - + if offset is not None: start = offset if length is not None: @@ -189,15 +205,15 @@ async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CI return cid async def load( # type: ignore CID is definitely in the IPLDKind type - self, + self, id: IPLDKind, offset: Optional[int] = None, length: Optional[int] = None, - suffix: Optional[int] = None + suffix: Optional[int] = None, ) -> bytes: """@private""" cid = cast(CID, id) - url: str = self.gateway_base_url + str(id) + url: str = self.gateway_base_url + str(cid) headers: dict[str, str] = {} # Construct the Range header if required diff --git a/py_hamt/zarr_hamt_store.py b/py_hamt/zarr_hamt_store.py index e378e16..7fe8069 100644 --- a/py_hamt/zarr_hamt_store.py +++ b/py_hamt/zarr_hamt_store.py @@ -63,7 +63,9 @@ def __init__(self, hamt: HAMT, read_only: bool = False) -> None: self.metadata_read_cache: dict[str, bytes] = {} """@private""" - def _map_byte_request(self, byte_range: Optional[zarr.abc.store.ByteRequest]) -> tuple[Optional[int], Optional[int], Optional[int]]: + def _map_byte_request( + self, byte_range: Optional[zarr.abc.store.ByteRequest] + ) -> tuple[Optional[int], Optional[int], Optional[int]]: """Helper to map Zarr ByteRequest to offset, length, suffix.""" offset: Optional[int] = None length: Optional[int] = None @@ -81,7 +83,7 @@ def _map_byte_request(self, byte_range: Optional[zarr.abc.store.ByteRequest]) -> suffix = byte_range.suffix else: raise TypeError(f"Unsupported ByteRequest type: {type(byte_range)}") - + return offset, length, suffix @property @@ -114,7 +116,10 @@ async def get( else: offset, length, suffix = self._map_byte_request(byte_range) val = cast( - bytes, await self.hamt.get(key, offset=offset, length=length, suffix=suffix) + bytes, + await self.hamt.get( + key, offset=offset, length=length, suffix=suffix + ), ) # We know values received will always be bytes since we only store bytes in the HAMT if is_metadata and byte_range is None: self.metadata_read_cache[key] = val @@ -127,7 +132,6 @@ async def get( print(f"Error getting key '{key}' with range {byte_range}: {e}") raise - async def get_partial_values( self, prototype: zarr.core.buffer.BufferPrototype, @@ -136,11 +140,10 @@ async def get_partial_values( """ Retrieves multiple keys or byte ranges concurrently using asyncio.gather. """ - tasks = [ - self.get(key, prototype, byte_range) - for key, byte_range in key_ranges - ] - results = await asyncio.gather(*tasks, return_exceptions=False) # Set return_exceptions=True for debugging + tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges] + results = await asyncio.gather( + *tasks, return_exceptions=False + ) # Set return_exceptions=True for debugging return results async def exists(self, key: str) -> bool: diff --git a/tests/test_zarr_ipfs.py b/tests/test_zarr_ipfs.py index 11ffcb8..851936e 100644 --- a/tests/test_zarr_ipfs.py +++ b/tests/test_zarr_ipfs.py @@ -139,11 +139,6 @@ async def test_write_read( with pytest.raises(NotImplementedError): await zhs.set_partial_values([]) - with pytest.raises(NotImplementedError): - await zhs.get_partial_values( - zarr.core.buffer.default_buffer_prototype(), [] - ) - previous_zarr_json: zarr.core.buffer.Buffer | None = await zhs.get( "zarr.json", zarr.core.buffer.default_buffer_prototype() ) diff --git a/tests/test_zarr_ipfs_partial.py b/tests/test_zarr_ipfs_partial.py index 702441f..a39d045 100644 --- a/tests/test_zarr_ipfs_partial.py +++ b/tests/test_zarr_ipfs_partial.py @@ -8,9 +8,6 @@ import zarr.core.buffer # Make sure to import the ByteRequest types from zarr.abc.store import RangeByteRequest, OffsetByteRequest, SuffixByteRequest -import aiohttp -from typing import Optional - from py_hamt import HAMT, KuboCAS @@ -81,7 +78,7 @@ async def test_write_read( start = time.perf_counter() # Do an initial write along with an append which is a common xarray/zarr operation # Ensure chunks are not too small for partial value tests - test_ds.to_zarr(store=zhs, chunk_store={'time': 50, 'lat': 18, 'lon': 36}) + test_ds.to_zarr(store=zhs, chunk_store={"time": 50, "lat": 18, "lon": 36}) test_ds.to_zarr(store=zhs, mode="a", append_dim="time", zarr_format=3) end = time.perf_counter() elapsed = end - start @@ -93,18 +90,13 @@ async def test_write_read( print(f"=== Verifying Gateway Suffix Support (CID: {cid}) ===") # Get the gateway URL without the /ipfs/ part - gateway_only_url = gateway_base_url - - # You can add an assertion here if you expect it to work - # If you know the gateway *might* be buggy, just printing is okay too. - assert is_correct, "IPFS Gateway did not return the correct suffix data." print("=== Reading data back in and checking if identical") - hamt_read = await HAMT.build( # Renamed to avoid confusion + hamt_read = await HAMT.build( # Renamed to avoid confusion cas=kubo_cas, root_node_id=cid, values_are_bytes=True, read_only=True ) start = time.perf_counter() - zhs_read = ZarrHAMTStore(hamt_read, read_only=True) # Use the read-only hamt + zhs_read = ZarrHAMTStore(hamt_read, read_only=True) # Use the read-only hamt ipfs_ds = xr.open_zarr(store=zhs_read) print(ipfs_ds) @@ -131,7 +123,7 @@ async def test_write_read( if k.startswith("temp/") and k != "temp/.zarray": chunk_key = k break - + assert chunk_key is not None, "Could not find a chunk key to test." print(f"Testing with chunk key: {chunk_key}") @@ -141,20 +133,20 @@ async def test_write_read( full_chunk_data = full_chunk_buffer.to_bytes() chunk_len = len(full_chunk_data) print(f"Full chunk size: {chunk_len} bytes") - + # Ensure the chunk is large enough for meaningful tests assert chunk_len > 100, "Chunk size too small for partial value tests" # Define some byte requests - range_req = RangeByteRequest(start=10, end=50) # Request 40 bytes - offset_req = OffsetByteRequest(offset=chunk_len - 30) # Request last 30 bytes - suffix_req = SuffixByteRequest(suffix=20) # Request last 20 bytes + range_req = RangeByteRequest(start=10, end=50) # Request 40 bytes + offset_req = OffsetByteRequest(offset=chunk_len - 30) # Request last 30 bytes + suffix_req = SuffixByteRequest(suffix=20) # Request last 20 bytes key_ranges_to_test = [ (chunk_key, range_req), (chunk_key, offset_req), (chunk_key, suffix_req), - (chunk_key, None), # Full read + (chunk_key, None), # Full read ] # Call get_partial_values @@ -173,7 +165,7 @@ async def test_write_read( print(f"RangeByteRequest: OK (Got {len(results[0].to_bytes())} bytes)") # Check OffsetByteRequest result - expected_offset = full_chunk_data[chunk_len - 30:] + expected_offset = full_chunk_data[chunk_len - 30 :] assert results[1].to_bytes() == expected_offset, "OffsetByteRequest failed" print(f"OffsetByteRequest: OK (Got {len(results[1].to_bytes())} bytes)") @@ -184,13 +176,13 @@ async def test_write_read( print(f"SuffixByteRequest: OK (Got {len(results[2].to_bytes())} bytes)") # Check full read result - assert results[3].to_bytes() == full_chunk_data, "Full read via get_partial_values failed" + assert results[3].to_bytes() == full_chunk_data, ( + "Full read via get_partial_values failed" + ) print(f"Full Read: OK (Got {len(results[3].to_bytes())} bytes)") - # --- End: New Partial Values Tests --- - # Tests for code coverage's sake assert await zhs_read.exists("zarr.json") # __eq__ @@ -198,7 +190,9 @@ async def test_write_read( assert zhs_read != hamt_read assert not zhs_read.supports_writes assert not zhs_read.supports_partial_writes - assert zhs_read.supports_deletes # Should be true in read-only mode for HAMT? Usually False + assert not ( + zhs_read.supports_deletes + ) # Should be true in read-only mode for HAMT? Usually False hamt_keys = set() async for k in zhs_read.hamt.keys(): @@ -216,47 +210,3 @@ async def test_write_read( with pytest.raises(NotImplementedError): await zhs_read.set_partial_values([]) - - # REMOVED: The old NotImplementedError check for get_partial_values - # with pytest.raises(NotImplementedError): - # await zhs_read.get_partial_values( - # zarr.core.buffer.default_buffer_prototype(), [] - # ) - - previous_zarr_json = await zhs_read.get( - "zarr.json", zarr.core.buffer.default_buffer_prototype() - ) - assert previous_zarr_json is not None - - # --- Test set_if_not_exists (needs a writable store) --- - await hamt_read.enable_write() - zhs_write = ZarrHAMTStore(hamt_read, read_only=False) - - # Setting a metadata file that should always exist should not change anything - await zhs_write.set_if_not_exists( - "zarr.json", - zarr.core.buffer.Buffer.from_bytes(b"should_not_change"), - ) - zarr_json_now = await zhs_write.get( - "zarr.json", zarr.core.buffer.default_buffer_prototype() - ) - assert zarr_json_now is not None - assert previous_zarr_json.to_bytes() == zarr_json_now.to_bytes() - - # now remove that metadata file and then add it back - await zhs_write.delete("zarr.json") - # doing a duplicate delete should not result in an error - await zhs_write.delete("zarr.json") - - zhs_keys_after_delete: set[str] = set() - async for k in zhs_write.list(): - zhs_keys_after_delete.add(k) - assert hamt_keys != zhs_keys_after_delete - assert "zarr.json" not in zhs_keys_after_delete - - await zhs_write.set_if_not_exists("zarr.json", previous_zarr_json) - zarr_json_now = await zhs_write.get( - "zarr.json", zarr.core.buffer.default_buffer_prototype() - ) - assert zarr_json_now is not None - assert previous_zarr_json.to_bytes() == zarr_json_now.to_bytes() \ No newline at end of file From 6e5603199b6a3810e24164a68e6a4c003ef8eaf6 Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Mon, 2 Jun 2025 09:30:17 -0400 Subject: [PATCH 4/9] fix: full coverage --- py_hamt/__init__.py | 3 +- py_hamt/zarr_hamt_store.py | 1 + tests/test_zarr_ipfs_partial.py | 62 +++++++++++++++++++++++++++++++-- 3 files changed, 63 insertions(+), 3 deletions(-) diff --git a/py_hamt/__init__.py b/py_hamt/__init__.py index 2d420b1..c6da3b9 100644 --- a/py_hamt/__init__.py +++ b/py_hamt/__init__.py @@ -1,5 +1,5 @@ from .hamt import HAMT, blake3_hashfn -from .store import ContentAddressedStore, KuboCAS +from .store import ContentAddressedStore, KuboCAS, InMemoryCAS from .zarr_hamt_store import ZarrHAMTStore __all__ = [ @@ -8,4 +8,5 @@ "ContentAddressedStore", "KuboCAS", "ZarrHAMTStore", + "InMemoryCAS", ] diff --git a/py_hamt/zarr_hamt_store.py b/py_hamt/zarr_hamt_store.py index 7fe8069..5e23403 100644 --- a/py_hamt/zarr_hamt_store.py +++ b/py_hamt/zarr_hamt_store.py @@ -8,6 +8,7 @@ from py_hamt.hamt import HAMT + class ZarrHAMTStore(zarr.abc.store.Store): """ Write and read Zarr v3s with a HAMT. diff --git a/tests/test_zarr_ipfs_partial.py b/tests/test_zarr_ipfs_partial.py index a39d045..24ff2cc 100644 --- a/tests/test_zarr_ipfs_partial.py +++ b/tests/test_zarr_ipfs_partial.py @@ -6,11 +6,12 @@ import pytest import zarr import zarr.core.buffer + # Make sure to import the ByteRequest types from zarr.abc.store import RangeByteRequest, OffsetByteRequest, SuffixByteRequest -from py_hamt import HAMT, KuboCAS +from py_hamt import HAMT, KuboCAS, InMemoryCAS from py_hamt.zarr_hamt_store import ZarrHAMTStore @@ -57,6 +58,7 @@ def random_zarr_dataset(): yield ds + # This test also collects miscellaneous statistics about performance, run with pytest -s to see these statistics being printed out @pytest.mark.asyncio(loop_scope="session") # ← match the loop of the fixture async def test_write_read( @@ -68,7 +70,6 @@ async def test_write_read( print("=== Writing this xarray Dataset to a Zarr v3 on IPFS ===") print(test_ds) - async with KuboCAS( # <-- own and auto-close session rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url ) as kubo_cas: @@ -210,3 +211,60 @@ async def test_write_read( with pytest.raises(NotImplementedError): await zhs_read.set_partial_values([]) + + +@pytest.mark.asyncio +async def test_zarr_hamt_store_byte_request_errors(): + """Tests error handling for unsupported or invalid ByteRequest types.""" + cas = InMemoryCAS() + hamt = await HAMT.build(cas=cas, values_are_bytes=True) + zhs = ZarrHAMTStore(hamt) + proto = zarr.core.buffer.default_buffer_prototype() + await zhs.set("some_key", proto.buffer.from_bytes(b"0123456789")) + + # Test for ValueError with an invalid range (end < start) + invalid_range_req = RangeByteRequest(start=10, end=5) + with pytest.raises(ValueError, match="End must be >= start for RangeByteRequest"): + await zhs.get("some_key", proto, byte_range=invalid_range_req) + + # Test for TypeError with a custom, unsupported request type + class DummyUnsupportedRequest: + pass + + unsupported_req = DummyUnsupportedRequest() + with pytest.raises(TypeError, match="Unsupported ByteRequest type"): + await zhs.get("some_key", proto, byte_range=unsupported_req) + + +@pytest.mark.asyncio +async def test_in_memory_cas_partial_reads(): + """ + Tests the partial read logic (offset, length, suffix) in the InMemoryCAS. + """ + cas = InMemoryCAS() + test_data = b"0123456789abcdefghijklmnopqrstuvwxyz" # 36 bytes long + data_id = await cas.save(test_data, "raw") + + # Test case 1: offset and length + result = await cas.load(data_id, offset=10, length=5) + assert result == b"abcde" + + # Test case 2: offset only + result = await cas.load(data_id, offset=30) + assert result == b"uvwxyz" + + # Test case 3: suffix only + result = await cas.load(data_id, suffix=6) + assert result == b"uvwxyz" + + # Test case 4: Full read (for completeness) + result = await cas.load(data_id) + assert result == test_data + + # Test case 5: Key not found (covers `try...except KeyError`) + with pytest.raises(KeyError, match="Object not found in in-memory store"): + await cas.load(b"\x00\x01\x02\x03\x04") # Some random, non-existent key + + # Test case 6: Invalid key type (covers `isinstance` check) + with pytest.raises(TypeError, match="InMemoryCAS only supports byte‐hash keys"): + await cas.load(12345) # Pass an integer instead of bytes From 46b2f9972c870daeef7eb7dc9df05b8cfd5727b5 Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Tue, 3 Jun 2025 08:00:39 -0400 Subject: [PATCH 5/9] fix: re-order --- py_hamt/__init__.py | 2 +- py_hamt/hamt.py | 2 +- py_hamt/store.py | 3 +-- py_hamt/zarr_hamt_store.py | 5 ++--- tests/test_zarr_ipfs_encrypted.py | 5 ----- tests/test_zarr_ipfs_partial.py | 8 +++----- 6 files changed, 8 insertions(+), 17 deletions(-) diff --git a/py_hamt/__init__.py b/py_hamt/__init__.py index 03c5986..0a63761 100644 --- a/py_hamt/__init__.py +++ b/py_hamt/__init__.py @@ -1,6 +1,6 @@ from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore from .hamt import HAMT, blake3_hashfn -from .store import ContentAddressedStore, KuboCAS, InMemoryCAS +from .store import ContentAddressedStore, InMemoryCAS, KuboCAS from .zarr_hamt_store import ZarrHAMTStore __all__ = [ diff --git a/py_hamt/hamt.py b/py_hamt/hamt.py index a232da9..bf1716d 100644 --- a/py_hamt/hamt.py +++ b/py_hamt/hamt.py @@ -8,8 +8,8 @@ Callable, Dict, Iterator, - cast, Optional, + cast, ) import dag_cbor diff --git a/py_hamt/store.py b/py_hamt/store.py index 33b9897..784d539 100644 --- a/py_hamt/store.py +++ b/py_hamt/store.py @@ -1,7 +1,6 @@ import asyncio -import aiohttp from abc import ABC, abstractmethod -from typing import Any, Literal, cast, Optional +from typing import Any, Literal, Optional, cast import aiohttp from dag_cbor.ipld import IPLDKind diff --git a/py_hamt/zarr_hamt_store.py b/py_hamt/zarr_hamt_store.py index 79fab77..e265656 100644 --- a/py_hamt/zarr_hamt_store.py +++ b/py_hamt/zarr_hamt_store.py @@ -1,11 +1,10 @@ +import asyncio from collections.abc import AsyncIterator, Iterable -from typing import cast +from typing import Optional, cast import zarr.abc.store import zarr.core.buffer from zarr.core.common import BytesLike -from typing import Optional -import asyncio from py_hamt.hamt import HAMT diff --git a/tests/test_zarr_ipfs_encrypted.py b/tests/test_zarr_ipfs_encrypted.py index 8d65d1c..32c7a2e 100644 --- a/tests/test_zarr_ipfs_encrypted.py +++ b/tests/test_zarr_ipfs_encrypted.py @@ -195,11 +195,6 @@ async def test_encrypted_write_read( with pytest.raises(NotImplementedError): await ezhs_read_ok.set_partial_values([]) - with pytest.raises(NotImplementedError): - await ezhs_read_ok.get_partial_values( - zarr.core.buffer.default_buffer_prototype(), [] - ) - with pytest.raises(Exception): await ezhs_read_ok.set("new_key", np.array([b"a"], dtype=np.bytes_)) diff --git a/tests/test_zarr_ipfs_partial.py b/tests/test_zarr_ipfs_partial.py index 24ff2cc..0cd076a 100644 --- a/tests/test_zarr_ipfs_partial.py +++ b/tests/test_zarr_ipfs_partial.py @@ -2,17 +2,15 @@ import numpy as np import pandas as pd -import xarray as xr import pytest +import xarray as xr import zarr import zarr.core.buffer # Make sure to import the ByteRequest types -from zarr.abc.store import RangeByteRequest, OffsetByteRequest, SuffixByteRequest - - -from py_hamt import HAMT, KuboCAS, InMemoryCAS +from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest +from py_hamt import HAMT, InMemoryCAS, KuboCAS from py_hamt.zarr_hamt_store import ZarrHAMTStore From 0fb9be9d85d4e9bc485dc601f704f6406079c08e Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Tue, 3 Jun 2025 08:21:18 -0400 Subject: [PATCH 6/9] fix: update ruff and mypy --- py_hamt/store.py | 4 ++-- tests/test_zarr_ipfs_encrypted.py | 11 ++++------- tests/test_zarr_ipfs_partial.py | 12 ++++++------ 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/py_hamt/store.py b/py_hamt/store.py index 784d539..e82fad6 100644 --- a/py_hamt/store.py +++ b/py_hamt/store.py @@ -61,7 +61,7 @@ async def load( offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None, - ) -> bytes: # type: ignore since bytes is a subset of the IPLDKind type + ) -> bytes: """ `ContentAddressedStore` allows any IPLD scalar key. For the in-memory backend we *require* a `bytes` hash; anything else is rejected at run @@ -283,7 +283,7 @@ async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CI cid = cid.set(codec=codec) return cid - async def load( # type: ignore CID is definitely in the IPLDKind type + async def load( self, id: IPLDKind, offset: Optional[int] = None, diff --git a/tests/test_zarr_ipfs_encrypted.py b/tests/test_zarr_ipfs_encrypted.py index 32c7a2e..ccbf114 100644 --- a/tests/test_zarr_ipfs_encrypted.py +++ b/tests/test_zarr_ipfs_encrypted.py @@ -4,9 +4,8 @@ import pandas as pd import pytest import xarray as xr -import zarr -import zarr.core.buffer from Crypto.Random import get_random_bytes +from dag_cbor.ipld import IPLDKind from py_hamt import HAMT, KuboCAS, SimpleEncryptedZarrHAMTStore from py_hamt.zarr_hamt_store import ZarrHAMTStore @@ -91,9 +90,7 @@ async def test_encrypted_write_read( correct_key = get_random_bytes(32) wrong_key = get_random_bytes(32) header = b"test-encryption-header" - - root_cid = None - + root_cid: IPLDKind = None # --- Write Phase --- async with KuboCAS( rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url @@ -109,7 +106,7 @@ async def test_encrypted_write_read( assert ezhs_write != hamt_write assert ezhs_write.supports_writes - test_ds.to_zarr(store=ezhs_write, mode="w", zarr_format=3) # Use mode='w' + test_ds.to_zarr(store=ezhs_write, mode="w", zarr_format=3) # type: ignore await hamt_write.make_read_only() root_cid = hamt_write.root_node_id @@ -196,7 +193,7 @@ async def test_encrypted_write_read( await ezhs_read_ok.set_partial_values([]) with pytest.raises(Exception): - await ezhs_read_ok.set("new_key", np.array([b"a"], dtype=np.bytes_)) + await ezhs_read_ok.set("new_key", np.array([b"a"], dtype=np.bytes_)) # type: ignore with pytest.raises(Exception): await ezhs_read_ok.delete("zarr.json") diff --git a/tests/test_zarr_ipfs_partial.py b/tests/test_zarr_ipfs_partial.py index 0cd076a..da3ba6c 100644 --- a/tests/test_zarr_ipfs_partial.py +++ b/tests/test_zarr_ipfs_partial.py @@ -77,8 +77,8 @@ async def test_write_read( start = time.perf_counter() # Do an initial write along with an append which is a common xarray/zarr operation # Ensure chunks are not too small for partial value tests - test_ds.to_zarr(store=zhs, chunk_store={"time": 50, "lat": 18, "lon": 36}) - test_ds.to_zarr(store=zhs, mode="a", append_dim="time", zarr_format=3) + test_ds.to_zarr(store=zhs, chunk_store={"time": 50, "lat": 18, "lon": 36}) # type: ignore + test_ds.to_zarr(store=zhs, mode="a", append_dim="time", zarr_format=3) # type: ignore end = time.perf_counter() elapsed = end - start print("=== Write Stats") @@ -149,7 +149,7 @@ async def test_write_read( ] # Call get_partial_values - results = await zhs_read.get_partial_values(proto, key_ranges_to_test) + results = await zhs_read.get_partial_values(proto, key_ranges_to_test) # type: ignore # Assertions assert len(results) == 4 @@ -202,10 +202,10 @@ async def test_write_read( zhs_keys.add(k) assert hamt_keys == zhs_keys - zhs_keys: set[str] = set() + zhs_keys_prefix: set[str] = set() async for k in zhs_read.list_prefix(""): - zhs_keys.add(k) - assert hamt_keys == zhs_keys + zhs_keys_prefix.add(k) + assert hamt_keys == zhs_keys_prefix with pytest.raises(NotImplementedError): await zhs_read.set_partial_values([]) From f7f169dea16e9b96b5deb72ec38035e58f0efefe Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Tue, 3 Jun 2025 08:25:15 -0400 Subject: [PATCH 7/9] fix: pre-commit --- py_hamt/zarr_hamt_store.py | 2 +- tests/test_zarr_ipfs_encrypted.py | 2 +- tests/test_zarr_ipfs_partial.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/py_hamt/zarr_hamt_store.py b/py_hamt/zarr_hamt_store.py index e265656..9ef5042 100644 --- a/py_hamt/zarr_hamt_store.py +++ b/py_hamt/zarr_hamt_store.py @@ -91,7 +91,7 @@ def _map_byte_request( if isinstance(byte_range, zarr.abc.store.RangeByteRequest): offset = byte_range.start length = byte_range.end - byte_range.start - if length < 0: + if length is not None and length < 0: raise ValueError("End must be >= start for RangeByteRequest") elif isinstance(byte_range, zarr.abc.store.OffsetByteRequest): offset = byte_range.offset diff --git a/tests/test_zarr_ipfs_encrypted.py b/tests/test_zarr_ipfs_encrypted.py index ccbf114..93aa74b 100644 --- a/tests/test_zarr_ipfs_encrypted.py +++ b/tests/test_zarr_ipfs_encrypted.py @@ -193,7 +193,7 @@ async def test_encrypted_write_read( await ezhs_read_ok.set_partial_values([]) with pytest.raises(Exception): - await ezhs_read_ok.set("new_key", np.array([b"a"], dtype=np.bytes_)) # type: ignore + await ezhs_read_ok.set("new_key", np.array([b"a"], dtype=np.bytes_)) # type: ignore with pytest.raises(Exception): await ezhs_read_ok.delete("zarr.json") diff --git a/tests/test_zarr_ipfs_partial.py b/tests/test_zarr_ipfs_partial.py index da3ba6c..e7a7a05 100644 --- a/tests/test_zarr_ipfs_partial.py +++ b/tests/test_zarr_ipfs_partial.py @@ -77,8 +77,8 @@ async def test_write_read( start = time.perf_counter() # Do an initial write along with an append which is a common xarray/zarr operation # Ensure chunks are not too small for partial value tests - test_ds.to_zarr(store=zhs, chunk_store={"time": 50, "lat": 18, "lon": 36}) # type: ignore - test_ds.to_zarr(store=zhs, mode="a", append_dim="time", zarr_format=3) # type: ignore + test_ds.to_zarr(store=zhs, chunk_store={"time": 50, "lat": 18, "lon": 36}) # type: ignore + test_ds.to_zarr(store=zhs, mode="a", append_dim="time", zarr_format=3) # type: ignore end = time.perf_counter() elapsed = end - start print("=== Write Stats") @@ -149,7 +149,7 @@ async def test_write_read( ] # Call get_partial_values - results = await zhs_read.get_partial_values(proto, key_ranges_to_test) # type: ignore + results = await zhs_read.get_partial_values(proto, key_ranges_to_test) # type: ignore # Assertions assert len(results) == 4 From e17003af4219dd653cae2aa61bb81dfc81a6fea5 Mon Sep 17 00:00:00 2001 From: Faolain Date: Wed, 4 Jun 2025 19:03:01 -0400 Subject: [PATCH 8/9] deps: update kubo to latest in tests --- CLAUDE.md | 87 ++++++++++++++++++++++++++++++++++++++++++ tests/testing_utils.py | 2 +- 2 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..49c6cf4 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,87 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Common Development Commands + +Setup environment: +```bash +uv sync +source .venv/bin/activate +pre-commit install +``` + +Run all checks (tests, linting, formatting, type checking): +```bash +bash run-checks.sh +``` + +Run tests: +```bash +# All tests (requires IPFS daemon or Docker) +pytest --ipfs --cov=py_hamt tests/ + +# Quick tests without IPFS integration +pytest --cov=py_hamt tests/ + +# Single test file +pytest tests/test_hamt.py + +# Coverage report +uv run coverage report --fail-under=100 --show-missing +``` + +Linting and formatting: +```bash +# Run all pre-commit hooks +uv run pre-commit run --all-files --show-diff-on-failure + +# Fix auto-fixable ruff issues +uv run ruff check --fix +``` + +Type checking and other tools: +```bash +# Type checking is handled by pre-commit hooks (mypy) +# Documentation preview +uv run pdoc py_hamt +``` + +## Architecture Overview + +py-hamt implements a Hash Array Mapped Trie (HAMT) for IPFS/IPLD content-addressed storage. The core architecture follows this pattern: + +1. **ContentAddressedStore (CAS)** - Abstract storage layer (store.py) + - `KuboCAS` - IPFS/Kubo implementation for production + - `InMemoryCAS` - In-memory implementation for testing + +2. **HAMT** - Core data structure (hamt.py) + - Uses blake3 hashing by default + - Implements content-addressed trie for efficient key-value storage + - Supports async operations for large datasets + +3. **ZarrHAMTStore** - Zarr integration (zarr_hamt_store.py) + - Implements zarr.abc.store.Store interface + - Enables storing large Zarr arrays on IPFS via HAMT + - Keys stored verbatim, values as raw bytes + +4. **Encryption Layer** - Optional encryption (encryption_hamt_store.py) + - `SimpleEncryptedZarrHAMTStore` for fully encrypted storage + +## Key Design Patterns + +- All storage operations are async to handle IPFS network calls +- Content addressing means identical data gets same hash/CID +- HAMT provides O(log n) access time for large key sets +- Store abstractions allow swapping storage backends +- Type hints required throughout (mypy enforced) +- 100% test coverage required with hypothesis property-based testing + +## IPFS Integration Requirements + +Tests require either: +- Local IPFS daemon running (`ipfs daemon`) +- Docker available for containerized IPFS +- Neither (unit tests only, integration tests skip) + +The `--ipfs` pytest flag controls IPFS test execution. diff --git a/tests/testing_utils.py b/tests/testing_utils.py index 154f4cf..d109d43 100644 --- a/tests/testing_utils.py +++ b/tests/testing_utils.py @@ -169,7 +169,7 @@ def create_ipfs(): if client is None: pytest.skip("Neither IPFS daemon nor Docker available – skipping IPFS tests") - image = "ipfs/kubo:v0.35.0" + image = "ipfs/kubo:master-latest" rpc_p = _free_port() gw_p = _free_port() From 72fdcfabfe427e48ddc0c0d3c60612e5419073b8 Mon Sep 17 00:00:00 2001 From: Faolain Date: Wed, 4 Jun 2025 19:15:58 -0400 Subject: [PATCH 9/9] test: add test for ipfs gateway partials --- tests/test_zarr_ipfs_partial.py | 193 ++++++++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) diff --git a/tests/test_zarr_ipfs_partial.py b/tests/test_zarr_ipfs_partial.py index e7a7a05..84de087 100644 --- a/tests/test_zarr_ipfs_partial.py +++ b/tests/test_zarr_ipfs_partial.py @@ -234,6 +234,199 @@ class DummyUnsupportedRequest: await zhs.get("some_key", proto, byte_range=unsupported_req) +@pytest.mark.asyncio +async def test_ipfs_gateway_compression_behavior(create_ipfs): + """ + Test to verify whether IPFS gateways decompress data before applying + byte range requests, which would negate compression benefits for partial reads. + + This test creates highly compressible data, stores it via IPFS, and then + compares the bytes returned by partial vs full reads to determine if + the gateway is operating on compressed or decompressed data. + """ + rpc_base_url, gateway_base_url = create_ipfs + + print("\n=== IPFS Gateway Compression Behavior Test ===") + + # Create highly compressible test data + print("Creating highly compressible test data...") + data = np.zeros((50, 50, 50), dtype=np.float32) # 500KB of zeros + # Add small amount of variation + data[0:5, 0:5, 0:5] = np.random.randn(5, 5, 5) + + ds = xr.Dataset({"compressible_var": (["x", "y", "z"], data)}) + + print(f"Original data shape: {data.shape}") + print(f"Original data size: {data.nbytes:,} bytes") + + # Custom CAS to track actual network transfers + class NetworkTrackingKuboCAS(KuboCAS): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.load_sizes = {} + self.save_sizes = {} + + async def save(self, data, codec=None): + cid = await super().save(data, codec) + self.save_sizes[str(cid)] = len(data) + print(f"Saved to IPFS: {str(cid)[:12]}... ({len(data):,} bytes)") + return cid + + async def load(self, cid, offset=None, length=None, suffix=None): + result = await super().load(cid, offset, length, suffix) + + range_desc = "full" + if offset is not None or length is not None or suffix is not None: + range_desc = f"offset={offset}, length={length}, suffix={suffix}" + + key = f"{str(cid)[:12]}... ({range_desc})" + self.load_sizes[key] = len(result) + print(f"Loaded from IPFS: {key} -> {len(result):,} bytes") + return result + + async with NetworkTrackingKuboCAS( + rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url + ) as tracking_cas: + # Write dataset with compression + print("\n=== Writing to IPFS with Blosc compression ===") + hamt = await HAMT.build(cas=tracking_cas, values_are_bytes=True) + store = ZarrHAMTStore(hamt) + + # Use smaller chunks to ensure meaningful compression + ds.chunk({"x": 25, "y": 25, "z": 25}).to_zarr( + store=store, mode="w", zarr_format=3 + ) + + await hamt.make_read_only() + root_cid = hamt.root_node_id + print(f"Root CID: {root_cid}") + + # Read back and test compression behavior + print("\n=== Testing Compression vs Partial Reads ===") + hamt_read = await HAMT.build( + cas=tracking_cas, + root_node_id=root_cid, + values_are_bytes=True, + read_only=True, + ) + store_read = ZarrHAMTStore(hamt_read, read_only=True) + + # Find the largest data chunk (likely the actual array data) + chunk_key = None + chunk_size = 0 + async for key in store_read.list(): + if ( + "compressible_var" in key + and not key.endswith(".zarray") + and not key.endswith("zarr.json") + ): + # Get size to find the largest chunk + proto = zarr.core.buffer.default_buffer_prototype() + chunk_buffer = await store_read.get(key, proto) + if chunk_buffer and len(chunk_buffer.to_bytes()) > chunk_size: + chunk_key = key + chunk_size = len(chunk_buffer.to_bytes()) + + assert chunk_key is not None, "No data chunk found" + print(f"Testing with largest chunk: {chunk_key}") + + # Get full chunk for baseline + proto = zarr.core.buffer.default_buffer_prototype() + full_chunk = await store_read.get(chunk_key, proto) + full_compressed_size = len(full_chunk.to_bytes()) + print(f"Full chunk compressed size: {full_compressed_size:,} bytes") + + # Calculate expected uncompressed size + # 25x25x25 float32 = 62,500 bytes uncompressed + expected_uncompressed_size = 25 * 25 * 25 * 4 + compression_ratio = expected_uncompressed_size / full_compressed_size + print(f"Estimated compression ratio: {compression_ratio:.1f}:1") + + # Test different partial read sizes + test_ranges = [ + (0, full_compressed_size // 4, "25% of compressed"), + (0, full_compressed_size // 2, "50% of compressed"), + (full_compressed_size // 4, full_compressed_size // 2, "25%-50% range"), + ] + + print("\n=== Partial Read Analysis ===") + print("If gateway operates on compressed data:") + print(" - Partial reads should return exactly the requested byte ranges") + print(" - Network transfer should be proportional to compressed size") + print("If gateway decompresses before range requests:") + print(" - Partial reads may return more data than expected") + print(" - Network transfer loses compression benefits") + print() + + compression_preserved = True + + for start, end, description in test_ranges: + length = end - start + byte_req = RangeByteRequest(start=start, end=end) + + # Clear the load tracking for this specific test + original_load_count = len(tracking_cas.load_sizes) + + partial_chunk = await store_read.get(chunk_key, proto, byte_range=byte_req) + partial_size = len(partial_chunk.to_bytes()) + + # Find the new load entry + new_loads = list(tracking_cas.load_sizes.items())[original_load_count:] + network_bytes = new_loads[0][1] if new_loads else partial_size + + expected_percentage = length / full_compressed_size + actual_percentage = partial_size / full_compressed_size + network_efficiency = network_bytes / full_compressed_size + + print(f"Range request: {description}") + print( + f" Requested: {length:,} bytes ({expected_percentage:.1%} of compressed)" + ) + print( + f" Received: {partial_size:,} bytes ({actual_percentage:.1%} of compressed)" + ) + print( + f" Network transfer: {network_bytes:,} bytes ({network_efficiency:.1%} of compressed)" + ) + + # Key test: if we get significantly more data than requested, + # the gateway is likely decompressing before applying ranges + if partial_size > length * 1.1: # 10% tolerance for overhead + compression_preserved = False + print( + f" ⚠️ Received {partial_size / length:.1f}x more data than requested!" + ) + print(" ⚠️ Gateway appears to decompress before applying ranges") + else: + print(" ✅ Range applied efficiently to compressed data") + + # Verify the partial data makes sense + full_data = full_chunk.to_bytes() + expected_partial = full_data[start:end] + assert partial_chunk.to_bytes() == expected_partial, ( + "Partial data doesn't match expected range" + ) + print(" ✅ Partial data content verified") + print() + + # Summary analysis + print("=== Final Analysis ===") + if compression_preserved: + print("✅ IPFS gateway preserves compression benefits for partial reads") + print(" - Byte ranges are applied to compressed data") + print(" - Network transfers are efficient") + else: + print("⚠️ IPFS gateway appears to decompress before applying ranges") + print(" - Partial reads may not provide expected bandwidth savings") + print(" - Consider alternative storage strategies (sharding, etc.)") + + print("\nCompression statistics:") + print(f" - Uncompressed chunk size: {expected_uncompressed_size:,} bytes") + print(f" - Compressed chunk size: {full_compressed_size:,} bytes") + print(f" - Compression ratio: {compression_ratio:.1f}:1") + print(f" - Compression preserved in partial reads: {compression_preserved}") + + @pytest.mark.asyncio async def test_in_memory_cas_partial_reads(): """