feat: Sharding #68
Changes from all commits
@@ -0,0 +1,87 @@ (new file: CLAUDE.md)

# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Common Development Commands

Setup environment:

```bash
uv sync
source .venv/bin/activate
pre-commit install
```
Run all checks (tests, linting, formatting, type checking):

```bash
bash run-checks.sh
```

Run tests:

```bash
# All tests (requires IPFS daemon or Docker)
pytest --ipfs --cov=py_hamt tests/

# Quick tests without IPFS integration
pytest --cov=py_hamt tests/

# Single test file
pytest tests/test_hamt.py

# Coverage report
uv run coverage report --fail-under=100 --show-missing
```
Linting and formatting:

```bash
# Run all pre-commit hooks
uv run pre-commit run --all-files --show-diff-on-failure

# Fix auto-fixable ruff issues
uv run ruff check --fix
```

Type checking and other tools:

```bash
# Type checking is handled by pre-commit hooks (mypy)
# Documentation preview
uv run pdoc py_hamt
```
## Architecture Overview

py-hamt implements a Hash Array Mapped Trie (HAMT) for IPFS/IPLD content-addressed storage. The core architecture follows this pattern (a minimal usage sketch follows the list):

1. **ContentAddressedStore (CAS)** - Abstract storage layer (store.py)
   - `KuboCAS` - IPFS/Kubo implementation for production
   - `InMemoryCAS` - In-memory implementation for testing
2. **HAMT** - Core data structure (hamt.py)
   - Uses blake3 hashing by default
   - Implements a content-addressed trie for efficient key-value storage
   - Supports async operations for large datasets
3. **ZarrHAMTStore** - Zarr integration (zarr_hamt_store.py)
   - Implements the zarr.abc.store.Store interface
   - Enables storing large Zarr arrays on IPFS via HAMT
   - Keys stored verbatim, values as raw bytes
4. **Encryption Layer** - Optional encryption (encryption_hamt_store.py)
   - `SimpleEncryptedZarrHAMTStore` for fully encrypted storage
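Here is a minimal read-path sketch of how these layers compose, using only calls exercised by the converter script later in this PR; the top-level import path is an assumption, since that script uses package-relative imports:

```python
import asyncio

import xarray as xr

# Assumed public import path; the PR's converter imports these
# package-relatively (from .hamt import HAMT, etc.).
from py_hamt import HAMT, KuboCAS, ZarrHAMTStore


async def open_hamt_dataset(root_cid: str) -> None:
    # KuboCAS is the production CAS, backed by a local Kubo node.
    async with KuboCAS(
        rpc_base_url="http://127.0.0.1:5001",
        gateway_base_url="http://127.0.0.1:8080",
    ) as cas:
        # Load the HAMT read-only from its root CID.
        hamt = await HAMT.build(
            cas=cas, root_node_id=root_cid, values_are_bytes=True, read_only=True
        )
        # ZarrHAMTStore adapts the HAMT to the zarr.abc.store.Store interface.
        store = ZarrHAMTStore(hamt, read_only=True)
        ds = xr.open_zarr(store=store, consolidated=True)
        print(ds)


asyncio.run(open_hamt_dataset("<HAMT_ROOT_CID>"))  # placeholder root CID
```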
## Key Design Patterns

- All storage operations are async to handle IPFS network calls
- Content addressing means identical data gets the same hash/CID (see the check below)
- HAMT provides O(log n) access time for large key sets
- Store abstractions allow swapping storage backends
- Type hints required throughout (mypy enforced)
- 100% test coverage required, with hypothesis property-based testing
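The content-addressing point is easy to verify in isolation with the `blake3` package, the HAMT's default hash:

```python
from blake3 import blake3

# Identical bytes always yield the identical digest, so a content-addressed
# store derives the same address (and ultimately the same CID) for duplicates.
a = blake3(b"same payload").hexdigest()
b = blake3(b"same payload").hexdigest()
assert a == b
```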
## IPFS Integration Requirements

Tests run against one of the following:
- a local IPFS daemon (`ipfs daemon`)
- Docker, for a containerized IPFS node
- neither, in which case only unit tests run and integration tests are skipped

The `--ipfs` pytest flag controls IPFS test execution.
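Putting those pieces together, one possible local integration-test workflow (assuming Kubo is installed on the PATH) looks like:

```bash
# Start a local Kubo node in the background, then run the full suite against it.
ipfs daemon &
pytest --ipfs --cov=py_hamt tests/
```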
@@ -0,0 +1,130 @@ (new file: the HAMT-to-sharded converter)

```python
import argparse
import asyncio
import time

import xarray as xr
from multiformats import CID

from .hamt import HAMT
from .sharded_zarr_store import ShardedZarrStore
from .store_httpx import KuboCAS
from .zarr_hamt_store import ZarrHAMTStore


async def convert_hamt_to_sharded(
    cas: KuboCAS, hamt_root_cid: str, chunks_per_shard: int
) -> str:
    """
    Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.

    Args:
        cas: An initialized ContentAddressedStore instance (KuboCAS).
        hamt_root_cid: The root CID of the source ZarrHAMTStore.
        chunks_per_shard: The number of chunks to group into a single shard
            in the new store.

    Returns:
        The root CID of the newly created ShardedZarrStore.
    """
    print(f"--- Starting Conversion from HAMT Root {hamt_root_cid} ---")
    start_time = time.perf_counter()

    # 1. Open the source HAMT store for reading
    print("Opening source HAMT store...")
    hamt_ro = await HAMT.build(
        cas=cas, root_node_id=hamt_root_cid, values_are_bytes=True, read_only=True
    )
    source_store = ZarrHAMTStore(hamt_ro, read_only=True)
    source_dataset = xr.open_zarr(store=source_store, consolidated=True)

    # 2. Introspect the source array to get its configuration
    print("Reading metadata from source store...")

    # Read the store's metadata to get the array shape and chunk shape
    data_var_name = next(iter(source_dataset.data_vars))
    ordered_dims = list(source_dataset[data_var_name].dims)
    array_shape = tuple(source_dataset.sizes[dim] for dim in ordered_dims)
    chunk_shape = tuple(source_dataset.chunks[dim][0] for dim in ordered_dims)

    # 3. Create the destination ShardedZarrStore for writing
    print(
        f"Initializing new ShardedZarrStore with {chunks_per_shard} chunks per shard..."
    )
    dest_store = await ShardedZarrStore.open(
        cas=cas,
        read_only=False,
        array_shape=array_shape,
        chunk_shape=chunk_shape,
        chunks_per_shard=chunks_per_shard,
    )
    print("Destination store initialized.")

    # 4. Iterate and copy all data from source to destination
    print("Starting data migration...")
    count = 0
    async for key in hamt_ro.keys():
        count += 1
        # Read the pointer (the CID of the metadata or chunk) from the source;
        # only the pointer is copied, never the block contents themselves.
        cid: CID = await hamt_ro.get_pointer(key)
        cid_base32_str = str(cid.encode("base32"))

        # Write the exact same key-value pair to the destination.
        await dest_store.set_pointer(key, cid_base32_str)
        if count % 200 == 0:  # pragma: no cover
            print(f"Migrated {count} keys...")  # pragma: no cover

    print(f"Migration of {count} total keys complete.")

    # 5. Finalize the new store by flushing it to the CAS
    print("Flushing new store to get final root CID...")
    new_root_cid = await dest_store.flush()
    end_time = time.perf_counter()

    print("\n--- Conversion Complete! ---")
    print(f"Total time: {end_time - start_time:.2f} seconds")
    print(f"New ShardedZarrStore Root CID: {new_root_cid}")
    return new_root_cid
```
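The `chunks_per_shard` knob fixes how many chunk CIDs each shard holds. Assuming the grouping is a simple fixed-size split over linearized chunk indices (an assumption about `ShardedZarrStore` internals, which this diff does not show), the shard for any chunk falls out of a single divmod:

```python
# Sketch only: assumes ShardedZarrStore groups linear chunk indices into
# fixed-size shards; the real layout lives in sharded_zarr_store.py.
def shard_location(chunk_index: int, chunks_per_shard: int = 6250) -> tuple[int, int]:
    """Map a linear chunk index to (shard number, offset within that shard)."""
    return divmod(chunk_index, chunks_per_shard)


print(shard_location(12_499))  # (1, 6249): last slot of the second shard
print(shard_location(12_500))  # (2, 0): first slot of the third shard
```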
The CLI wrapper and entry point:

```python
async def sharded_converter_cli():
    parser = argparse.ArgumentParser(
        description="Convert a Zarr HAMT store to a Sharded Zarr store."
    )
    parser.add_argument(
        "hamt_cid", type=str, help="The root CID of the source Zarr HAMT store."
    )
    parser.add_argument(
        "--chunks-per-shard",
        type=int,
        default=6250,
        help="Number of chunk CIDs to store per shard in the new store.",
    )
    parser.add_argument(
        "--rpc-url",
        type=str,
        default="http://127.0.0.1:5001",
        help="The URL of the IPFS Kubo RPC API.",
    )
    parser.add_argument(
        "--gateway-url",
        type=str,
        default="http://127.0.0.1:8080",
        help="The URL of the IPFS Gateway.",
    )
    args = parser.parse_args()

    # Initialize the KuboCAS client with the provided RPC and Gateway URLs
    async with KuboCAS(
        rpc_base_url=args.rpc_url, gateway_base_url=args.gateway_url
    ) as cas_client:
        try:
            await convert_hamt_to_sharded(
                cas=cas_client,
                hamt_root_cid=args.hamt_cid,
                chunks_per_shard=args.chunks_per_shard,
            )
        except Exception as e:
            print(f"\nAn error occurred: {e}")


if __name__ == "__main__":
    asyncio.run(sharded_converter_cli())  # pragma: no cover
```

Comment on lines +125 to +126

Contributor:

🛠️ Refactor suggestion: avoid catching a blind exception. The broad `except Exception` can mask unexpected failures (Ruff BLE001). Catch the specific exceptions expected during conversion:

```diff
-        except Exception as e:
-            print(f"\nAn error occurred: {e}")
+        except (ValueError, KeyError, RuntimeError) as e:
+            print(f"\nAn error occurred: {e}")
+            import sys
+            sys.exit(1)
```

🪛 Ruff (0.12.2): 125-125 BLE001 Do not catch blind exception.

Contributor:

maybe something to consider here @TheGreatAlgo ?
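For reference, a hypothetical invocation of the converter CLI; the module path is not shown in this diff, so `py_hamt.convert` below is a placeholder:

```bash
# Placeholder module path; substitute the real one from the package.
python -m py_hamt.convert <HAMT_ROOT_CID> --chunks-per-shard 6250 \
    --rpc-url http://127.0.0.1:5001 --gateway-url http://127.0.0.1:8080
```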