Skip to content
Closed
87 changes: 87 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Common Development Commands

Setup environment:
```bash
uv sync
source .venv/bin/activate
pre-commit install
```

Run all checks (tests, linting, formatting, type checking):
```bash
bash run-checks.sh
```

Run tests:
```bash
# All tests (requires IPFS daemon or Docker)
pytest --ipfs --cov=py_hamt tests/

# Quick tests without IPFS integration
pytest --cov=py_hamt tests/

# Single test file
pytest tests/test_hamt.py

# Coverage report
uv run coverage report --fail-under=100 --show-missing
```

Linting and formatting:
```bash
# Run all pre-commit hooks
uv run pre-commit run --all-files --show-diff-on-failure

# Fix auto-fixable ruff issues
uv run ruff check --fix
```

Type checking and other tools:
```bash
# Type checking (mypy) runs as part of the pre-commit hooks above

# Preview the documentation
uv run pdoc py_hamt
```

## Architecture Overview

py-hamt implements a Hash Array Mapped Trie (HAMT) for IPFS/IPLD content-addressed storage. The core architecture follows this pattern:

1. **ContentAddressedStore (CAS)** - Abstract storage layer (`store.py`)
   - `KuboCAS` - IPFS/Kubo implementation for production
   - `InMemoryCAS` - In-memory implementation for testing

2. **HAMT** - Core data structure (`hamt.py`)
   - Uses blake3 hashing by default
   - Implements a content-addressed trie for efficient key-value storage
   - Supports async operations for large datasets

3. **ZarrHAMTStore** - Zarr integration (`zarr_hamt_store.py`)
   - Implements the `zarr.abc.store.Store` interface
   - Enables storing large Zarr arrays on IPFS via HAMT
   - Keys are stored verbatim and values as raw bytes

4. **Encryption Layer** - Optional encryption (`encryption_hamt_store.py`)
   - `SimpleEncryptedZarrHAMTStore` for fully encrypted storage

## Key Design Patterns

- All storage operations are async to handle IPFS network calls
- Content addressing means identical data gets same hash/CID
- HAMT provides O(log n) access time for large key sets
- Store abstractions allow swapping storage backends
- Type hints required throughout (mypy enforced)
- 100% test coverage is required; tests include hypothesis property-based testing

## IPFS Integration Requirements

Tests can run in one of three environments:
- A local IPFS daemon is running (`ipfs daemon`)
- Docker is available for running a containerized IPFS node
- Neither is available (unit tests only; the integration tests are skipped)

Pass the `--ipfs` flag to pytest to run the IPFS integration tests; without it, they are skipped.
3 changes: 2 additions & 1 deletion py_hamt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore
from .hamt import HAMT, blake3_hashfn
from .store import ContentAddressedStore, KuboCAS
from .store import ContentAddressedStore, InMemoryCAS, KuboCAS
from .zarr_hamt_store import ZarrHAMTStore

__all__ = [
Expand All @@ -9,5 +9,6 @@
"ContentAddressedStore",
"KuboCAS",
"ZarrHAMTStore",
"InMemoryCAS",
"SimpleEncryptedZarrHAMTStore",
]
13 changes: 11 additions & 2 deletions py_hamt/hamt.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
Callable,
Dict,
Iterator,
Optional,
cast,
)

Expand Down Expand Up @@ -589,10 +590,18 @@ async def delete(self, key: str) -> None:
# If we didn't make a change, then this key must not exist within the HAMT
raise KeyError

async def get(self, key: str) -> IPLDKind:
async def get(
self,
key: str,
offset: Optional[int] = None,
length: Optional[int] = None,
suffix: Optional[int] = None,
) -> IPLDKind:
"""Get a value."""
pointer: IPLDKind = await self.get_pointer(key)
data: bytes = await self.cas.load(pointer)
data: bytes = await self.cas.load(
pointer, offset=offset, length=length, suffix=suffix
)
if self.values_are_bytes:
return data
else:
Expand Down
62 changes: 54 additions & 8 deletions py_hamt/store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Literal, cast
from typing import Any, Literal, Optional, cast

import aiohttp
from dag_cbor.ipld import IPLDKind
Expand Down Expand Up @@ -30,7 +30,13 @@ async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
"""

@abstractmethod
async def load(
    self,
    id: IPLDKind,
    offset: Optional[int] = None,
    length: Optional[int] = None,
    suffix: Optional[int] = None,
) -> bytes:
    """Retrieve data, optionally restricted to a byte range.

    With no range arguments the whole object is returned. The range
    arguments are interpreted as follows:

    - `offset` alone: bytes from `offset` to the end of the object.
    - `offset` and `length`: the `length` bytes starting at `offset`.
    - `suffix` (only consulted when `offset` is `None`): the last
      `suffix` bytes of the object.

    `length` has no effect unless `offset` is also given.
    """


Expand All @@ -49,7 +55,13 @@ async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> by
self.store[hash] = data
return hash

async def load(self, id: IPLDKind) -> bytes:
async def load(
self,
id: IPLDKind,
offset: Optional[int] = None,
length: Optional[int] = None,
suffix: Optional[int] = None,
) -> bytes:
"""
`ContentAddressedStore` allows any IPLD scalar key. For the in-memory
backend we *require* a `bytes` hash; anything else is rejected at run
Expand All @@ -64,12 +76,24 @@ async def load(self, id: IPLDKind) -> bytes:
raise TypeError(
f"InMemoryCAS only supports byte‐hash keys; got {type(id).__name__}"
)

data: bytes
try:
return self.store[key]
data = self.store[key]
except KeyError as exc:
raise KeyError("Object not found in in-memory store") from exc

if offset is not None:
start = offset
if length is not None:
end = start + length
return data[start:end]
else:
return data[start:]
elif suffix is not None: # If only length is given, assume start from 0
return data[-suffix:]
else: # Full load
return data


class KuboCAS(ContentAddressedStore):
"""
Expand Down Expand Up @@ -259,11 +283,33 @@ async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CI
cid = cid.set(codec=codec)
return cid

async def load(
    self,
    id: IPLDKind,
    offset: Optional[int] = None,
    length: Optional[int] = None,
    suffix: Optional[int] = None,
) -> bytes:
    """@private

    Fetch the bytes stored under `id` from the gateway, optionally limited
    to a byte range that is requested through an HTTP `Range` header.

    Args:
        id: Identifier of the object to fetch; it is treated as a `CID`.
        offset: Position of the first byte to return. Takes precedence
            over `suffix`.
        length: Number of bytes to return starting at `offset`. Ignored
            unless `offset` is given; `None` reads through to the end.
        suffix: Return only the last `suffix` bytes. Used only when
            `offset` is `None`.

    Returns:
        The response body sent by the gateway. An empty range (`length`
        or `suffix` of 0) yields `b""` without contacting the gateway.

    Raises:
        ValueError: If a range argument that is in use is negative.
        aiohttp.ClientResponseError: If the gateway answers with an
            error status.
    """
    headers: dict[str, str] = {}

    # Construct the Range header if required
    if offset is not None:
        if offset < 0:
            raise ValueError(f"offset must be non-negative; got {offset}")
        if length is None:
            # Standard HTTP Range: bytes=start- (from start to end)
            headers["Range"] = f"bytes={offset}-"
        else:
            if length < 0:
                raise ValueError(f"length must be non-negative; got {length}")
            if length == 0:
                # An empty range cannot be written as bytes=start-end: the
                # inclusive end would precede the start, which is an invalid
                # range that a server may ignore and answer with the full body.
                return b""
            # Standard HTTP Range: bytes=start-end (inclusive)
            headers["Range"] = f"bytes={offset}-{offset + length - 1}"
    elif suffix is not None:
        if suffix < 0:
            raise ValueError(f"suffix must be non-negative; got {suffix}")
        if suffix == 0:
            # bytes=-0 is unsatisfiable; the last zero bytes are simply empty.
            return b""
        # Standard HTTP Range: bytes=-N (last N bytes)
        headers["Range"] = f"bytes=-{suffix}"

    cid = cast(CID, id)
    url: str = self.gateway_base_url + str(cid)

    async with self._sem:  # throttle gateway
        async with self._loop_session().get(url, headers=headers or None) as resp:
            resp.raise_for_status()
            return await resp.read()
49 changes: 43 additions & 6 deletions py_hamt/zarr_hamt_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from collections.abc import AsyncIterator, Iterable
from typing import cast
from typing import Optional, cast

import zarr.abc.store
import zarr.core.buffer
Expand Down Expand Up @@ -78,6 +79,29 @@ def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
self.metadata_read_cache: dict[str, bytes] = {}
"""@private"""

def _map_byte_request(
    self, byte_range: Optional[zarr.abc.store.ByteRequest]
) -> tuple[Optional[int], Optional[int], Optional[int]]:
    """Translate a Zarr byte request into an `(offset, length, suffix)` triple.

    Args:
        byte_range: The Zarr request to translate, or `None` for a full read.

    Returns:
        A tuple `(offset, length, suffix)`:

        - `None` input: `(None, None, None)`.
        - `RangeByteRequest`: `(start, end - start, None)`.
        - `OffsetByteRequest`: `(offset, None, None)`.
        - `SuffixByteRequest`: `(None, None, suffix)`.

    Raises:
        ValueError: If a `RangeByteRequest` has `end < start`.
        TypeError: If `byte_range` is some other kind of object.
    """
    # Compare against None explicitly so the "no range" case never depends
    # on the truthiness of the request object.
    if byte_range is None:
        return None, None, None

    if isinstance(byte_range, zarr.abc.store.RangeByteRequest):
        length = byte_range.end - byte_range.start
        if length < 0:
            raise ValueError("End must be >= start for RangeByteRequest")
        return byte_range.start, length, None

    if isinstance(byte_range, zarr.abc.store.OffsetByteRequest):
        return byte_range.offset, None, None

    if isinstance(byte_range, zarr.abc.store.SuffixByteRequest):
        return None, None, byte_range.suffix

    raise TypeError(f"Unsupported ByteRequest type: {type(byte_range)}")

@property
def read_only(self) -> bool:
"""@private"""
Expand All @@ -103,27 +127,40 @@ async def get(
len(key) >= 9 and key[-9:] == "zarr.json"
) # if path ends with zarr.json

if is_metadata and key in self.metadata_read_cache:
if is_metadata and byte_range is None and key in self.metadata_read_cache:
val = self.metadata_read_cache[key]
else:
offset, length, suffix = self._map_byte_request(byte_range)
val = cast(
bytes, await self.hamt.get(key)
bytes,
await self.hamt.get(
key, offset=offset, length=length, suffix=suffix
),
) # We know values received will always be bytes since we only store bytes in the HAMT
if is_metadata:
if is_metadata and byte_range is None:
self.metadata_read_cache[key] = val

return prototype.buffer.from_bytes(val)
except KeyError:
# Sometimes zarr queries keys that don't exist anymore, just return nothing on those cases
return None
except Exception as e:
print(f"Error getting key '{key}' with range {byte_range}: {e}")
raise

async def get_partial_values(
    self,
    prototype: zarr.core.buffer.BufferPrototype,
    key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
) -> list[zarr.core.buffer.Buffer | None]:
    """
    Retrieves multiple keys or byte ranges concurrently using asyncio.gather.

    Each `(key, byte_range)` pair is passed to `self.get` together with
    `prototype`. Results are returned in the same order as `key_ranges`.
    The first exception raised by any lookup propagates to the caller.
    """
    lookups = [
        self.get(key, prototype, byte_range) for key, byte_range in key_ranges
    ]
    # gather() keeps results in input order and, with the default
    # return_exceptions=False, re-raises the first failure.
    return list(await asyncio.gather(*lookups))

async def exists(self, key: str) -> bool:
"""@private"""
Expand Down
5 changes: 0 additions & 5 deletions tests/test_zarr_ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,6 @@ async def test_write_read(
with pytest.raises(NotImplementedError):
await zhs.set_partial_values([])

with pytest.raises(NotImplementedError):
await zhs.get_partial_values(
zarr.core.buffer.default_buffer_prototype(), []
)

previous_zarr_json: zarr.core.buffer.Buffer | None = await zhs.get(
"zarr.json", zarr.core.buffer.default_buffer_prototype()
)
Expand Down
16 changes: 4 additions & 12 deletions tests/test_zarr_ipfs_encrypted.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
import pandas as pd
import pytest
import xarray as xr
import zarr
import zarr.core.buffer
from Crypto.Random import get_random_bytes
from dag_cbor.ipld import IPLDKind

from py_hamt import HAMT, KuboCAS, SimpleEncryptedZarrHAMTStore
from py_hamt.zarr_hamt_store import ZarrHAMTStore
Expand Down Expand Up @@ -91,9 +90,7 @@ async def test_encrypted_write_read(
correct_key = get_random_bytes(32)
wrong_key = get_random_bytes(32)
header = b"test-encryption-header"

root_cid = None

root_cid: IPLDKind = None
# --- Write Phase ---
async with KuboCAS(
rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url
Expand All @@ -109,7 +106,7 @@ async def test_encrypted_write_read(
assert ezhs_write != hamt_write

assert ezhs_write.supports_writes
test_ds.to_zarr(store=ezhs_write, mode="w", zarr_format=3) # Use mode='w'
test_ds.to_zarr(store=ezhs_write, mode="w", zarr_format=3) # type: ignore

await hamt_write.make_read_only()
root_cid = hamt_write.root_node_id
Expand Down Expand Up @@ -195,13 +192,8 @@ async def test_encrypted_write_read(
with pytest.raises(NotImplementedError):
await ezhs_read_ok.set_partial_values([])

with pytest.raises(NotImplementedError):
await ezhs_read_ok.get_partial_values(
zarr.core.buffer.default_buffer_prototype(), []
)

with pytest.raises(Exception):
await ezhs_read_ok.set("new_key", np.array([b"a"], dtype=np.bytes_))
await ezhs_read_ok.set("new_key", np.array([b"a"], dtype=np.bytes_)) # type: ignore

with pytest.raises(Exception):
await ezhs_read_ok.delete("zarr.json")
Loading
Loading