Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
run_daemon: true

- name: Run pytest with coverage
run: uv run pytest --ipfs --cov=py_hamt tests/ --cov-report=xml
run: uv run pytest --ipfs --cov=py_hamt tests/ --cov-report=xml -s

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@18283e04ce6e62d37312384ff67231eb8fd56d24 # v5
Expand Down
53 changes: 53 additions & 0 deletions fsgs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import asyncio
import os
import sys

import httpx
import xarray as xr
from xarray import Dataset

from py_hamt import HAMT, KuboCAS, ZarrHAMTStore

# Report interpreter and dependency details up front for debugging.
_python_version = sys.version
_python_dir = os.path.dirname(sys.executable)
print(f"Python version: {_python_version}")
print(f"Python path: {_python_dir}")


print(f"httpx version: {httpx.__version__}")


async def main():
    """Open a Zarr dataset stored in IPFS via a local Kubo daemon and print a summary."""
    root_cid = "bafyr4iecw3faqyvj75psutabk2jxpddpjdokdy5b26jdnjjzpkzbgb5xoq"

    # KuboCAS is an async context manager; it connects to a local kubo node.
    async with KuboCAS() as cas:
        hamt = await HAMT.build(
            cas=cas,
            root_node_id=root_cid,
            values_are_bytes=True,
            read_only=True,
        )
        # Wrap the HAMT in a read-only Zarr store adapter.
        store = ZarrHAMTStore(hamt, read_only=True)

        # Open the dataset with xarray.
        ds: Dataset = xr.open_zarr(store=store)

        # Summarise the dataset contents.
        print("All variables:", list(ds.variables))
        print("Data variables:", list(ds.data_vars))
        print("Coordinates:", list(ds.coords))

        # Select the precipitation data for the first calendar year present.
        first_timestamp = ds["time"].values[0]
        first_year = str(first_timestamp.astype("datetime64[Y]"))
        year_start = f"{first_year}-01-01"
        year_end = f"{first_year}-12-31"
        first_year_precip = ds["precip"].sel(time=slice(year_start, year_end))

        print(f"Precip data for {first_year}:")
        print(first_year_precip)


if __name__ == "__main__":
    asyncio.run(main())
5 changes: 4 additions & 1 deletion py_hamt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""Public API of the py_hamt package.

Importing this module must be side-effect free: it only re-exports the
package's public names.
"""

from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore
from .hamt import HAMT, blake3_hashfn
from .store_httpx import ContentAddressedStore, InMemoryCAS, KuboCAS
from .zarr_hamt_store import ZarrHAMTStore

# Explicit public interface of the package.
__all__ = [
    "blake3_hashfn",
    "HAMT",
    "ContentAddressedStore",
    "InMemoryCAS",
    "KuboCAS",
    "ZarrHAMTStore",
    "SimpleEncryptedZarrHAMTStore",
]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Remove the print statement from module initialization.

Print statements in module initialization will execute every time the module is imported, which is inappropriate for a library and will pollute user applications' stdout.

Apply this diff to remove the print statement:

-
-print("Running py-hamt from source!")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
print("Running py-hamt from source!")
🤖 Prompt for AI Agents
In py_hamt/__init__.py at line 15, remove the print statement "print('Running
py-hamt from source!')" to prevent unwanted output during module import,
ensuring the library does not produce side effects like printing to stdout when
imported.

2 changes: 1 addition & 1 deletion py_hamt/hamt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from dag_cbor.ipld import IPLDKind
from multiformats import multihash

from .store import ContentAddressedStore
from .store_httpx import ContentAddressedStore


def extract_bits(hash_bytes: bytes, depth: int, nbits: int) -> int:
Expand Down
175 changes: 96 additions & 79 deletions py_hamt/store.py → py_hamt/store_httpx.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Literal, cast
from typing import Any, Dict, Literal, Tuple, cast

import aiohttp
import httpx
from dag_cbor.ipld import IPLDKind
from multiformats import CID, multihash
from multiformats.multihash import Multihash
Expand Down Expand Up @@ -55,7 +55,7 @@ async def load(self, id: IPLDKind) -> bytes:
backend we *require* a `bytes` hash; anything else is rejected at run
time. In OO type-checking, a subclass may widen (make more general) argument types,
but it must never narrow them; otherwise callers that expect the base-class contract can break.
Mypy enforces this contra-variance rule and emits the violates Liskov substitution principle error.
Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
This is why we use `cast` here, to tell mypy that we know what we are doing.
h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
"""
Expand Down Expand Up @@ -85,39 +85,39 @@ class KuboCAS(ContentAddressedStore):
### Authentication / custom headers
You have two options:

1. **Bring your own `aiohttp.ClientSession`**
Pass it via `session=...` — any default headers or `BasicAuth`
configured on that session are reused for **every** request.
2. **Let `KuboCAS` build the session** but pass
1. **Bring your own `httpx.AsyncClient`**
Pass it via `client=...` — any default headers or auth
configured on that client are reused for **every** request.
2. **Let `KuboCAS` build the client** but pass
`headers=` *and*/or `auth=` kwargs; they are forwarded to the
internally–created `ClientSession`.
internally–created `AsyncClient`.

```python
import aiohttp
import httpx
from py_hamt import KuboCAS

# Option 1: user-supplied session
sess = aiohttp.ClientSession(
# Option 1: user-supplied client
client = httpx.AsyncClient(
headers={"Authorization": "Bearer <token>"},
auth=aiohttp.BasicAuth("user", "pass"),
auth=("user", "pass"),
)
cas = KuboCAS(session=sess)
cas = KuboCAS(client=client)

# Option 2: let KuboCAS create the session
# Option 2: let KuboCAS create the client
cas = KuboCAS(
headers={"X-My-Header": "yes"},
auth=aiohttp.BasicAuth("user", "pass"),
auth=("user", "pass"),
)
```

### Parameters
- **hasher** (str): multihash name (defaults to *blake3*).
- **session** (`aiohttp.ClientSession | None`): reuse an existing
session; if *None* KuboCAS will create one lazily.
- **client** (`httpx.AsyncClient | None`): reuse an existing
client; if *None* KuboCAS will create one lazily.
- **headers** (dict[str, str] | None): default headers for the
internally-created session.
- **auth** (`aiohttp.BasicAuth | None`): authentication object for
the internally-created session.
internally-created client.
- **auth** (`tuple[str, str] | None`): authentication tuple (username, password)
for the internally-created client.
- **rpc_base_url / gateway_base_url** (str | None): override daemon
endpoints (defaults match the local daemon ports).

Expand All @@ -130,31 +130,31 @@ class KuboCAS(ContentAddressedStore):
DAG_PB_MARKER: int = 0x70
"""@private"""

# Take in a aiohttp session that can be reused across POSTs and GETs to a specific ipfs daemon, also allow for not doing that and creating our own single request lifetime session instead
# Take in a httpx client that can be reused across POSTs and GETs to a specific IPFS daemon
def __init__(
self,
hasher: str = "blake3",
session: aiohttp.ClientSession | None = None,
rpc_base_url: str | None = KUBO_DEFAULT_LOCAL_RPC_BASE_URL,
gateway_base_url: str | None = KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL,
client: httpx.AsyncClient | None = None,
rpc_base_url: str | None = None,
gateway_base_url: str | None = None,
concurrency: int = 32,
*,
headers: dict[str, str] | None = None,
auth: aiohttp.BasicAuth | None = None,
auth: Tuple[str, str] | None = None,
):
"""
If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.

### `aiohttp.ClientSession` Management
If `session` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()`
### `httpx.AsyncClient` Management
If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()`
as a class instance cannot know when it will no longer be in use, unless explicitly told to do so.

If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the session when the block is exited which is what we suggest below:
If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited which is what we suggest below:
```python
async with aiohttp.ClientSession() as session, KuboCAS(
async with httpx.AsyncClient() as client, KuboCAS(
rpc_base_url=rpc_base_url,
gateway_base_url=gateway_base_url,
session=session,
client=client,
) as kubo_cas:
hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
zhs = ZarrHAMTStore(hamt)
Expand All @@ -170,8 +170,8 @@ def __init__(
```

### Authenticated RPC/Gateway Access
Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `aiohttp.ClientSession` and then passing that in.
Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `aiohttp.ClientSession` if one is not provided.
Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
If you do not need authentication, you can leave these parameters as `None`.

### RPC and HTTP Gateway Base URLs
Expand All @@ -182,7 +182,7 @@ def __init__(
"""The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""

if rpc_base_url is None:
rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL
rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma: no cover
if gateway_base_url is None:
gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL

Expand All @@ -191,89 +191,104 @@ def __init__(
self.gateway_base_url: str = f"{gateway_base_url}/ipfs/"
"""@private"""

self._session_per_loop: dict[
asyncio.AbstractEventLoop, aiohttp.ClientSession
] = {}
self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}

if session is not None:
if client is not None:
# user supplied → bind it to *their* current loop
self._session_per_loop[asyncio.get_running_loop()] = session
self._owns_session: bool = False
self._client_per_loop[asyncio.get_running_loop()] = client
self._owns_client: bool = False
else:
self._owns_session = True # well create sessions lazily
self._owns_client = True # we'll create clients lazily
Comment on lines +196 to +201
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

get_running_loop() may raise when called outside an event loop.

If a user supplies an AsyncClient in synchronous code, asyncio.get_running_loop() raises RuntimeError, breaking __init__.

-if client is not None:
-    self._client_per_loop[asyncio.get_running_loop()] = client
+if client is not None:
+    try:
+        loop = asyncio.get_running_loop()
+    except RuntimeError:
+        # Defer binding until first use
+        loop = None
+    if loop is not None:
+        self._client_per_loop[loop] = client
+    self._user_client = client

Then have _loop_client() return self._user_client when present.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In py_hamt/store_httpx.py around lines 196 to 201, calling
asyncio.get_running_loop() when a user supplies an AsyncClient in synchronous
code can raise RuntimeError. To fix this, wrap the get_running_loop() call in a
try-except block to catch RuntimeError and handle it gracefully. Also, modify
the _loop_client() method to return self._user_client directly if it is present,
avoiding reliance on the event loop in that case.


# store for later use by _loop_session()
# store for later use by _loop_client()
self._default_headers = headers
self._default_auth = auth

self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
self._closed: bool = False

# --------------------------------------------------------------------- #
# helper: get or create the session bound to the current running loop #
# helper: get or create the client bound to the current running loop #
# --------------------------------------------------------------------- #
def _loop_session(self) -> aiohttp.ClientSession:
"""Get or create a session for the current event loop with improved cleanup."""
def _loop_client(self) -> httpx.AsyncClient:
"""Get or create a client for the current event loop."""
loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
try:
return self._session_per_loop[loop]
return self._client_per_loop[loop]
except KeyError:
# Create a session with a connector that closes more quickly
connector = aiohttp.TCPConnector(
limit=64,
limit_per_host=32,
force_close=True, # Force close connections
enable_cleanup_closed=True, # Enable cleanup of closed connections
)

sess = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=60),
connector=connector,
# Create a new client
client = httpx.AsyncClient(
timeout=60.0,
headers=self._default_headers,
auth=self._default_auth,
limits=httpx.Limits(max_connections=64, max_keepalive_connections=32),
# Uncomment when they finally support robust HTTP/2 GOAWAY responses
# http2=True,
)
self._session_per_loop[loop] = sess
return sess
self._client_per_loop[loop] = client
return client

# --------------------------------------------------------------------- #
# graceful shutdown: close **all** sessions we own #
# graceful shutdown: close **all** clients we own #
# --------------------------------------------------------------------- #
async def aclose(self) -> None:
"""Close all internally-created sessions."""
if not self._owns_session:
# User supplied the session; they are responsible for closing it.
"""Close all internally-created clients."""
if not self._owns_client:
# User supplied the client; they are responsible for closing it.
return

for sess in list(self._session_per_loop.values()):
if not sess.closed:
for client in list(self._client_per_loop.values()):
if not client.is_closed:
try:
await sess.close()
await client.aclose()
except Exception:
# Best-effort cleanup; ignore errors during shutdown
pass

self._session_per_loop.clear()
self._client_per_loop.clear()
self._closed = True

# At this point, _session_per_loop should be empty or only contain
# sessions from loops we haven't seen (which shouldn't happen in practice)
# At this point, _client_per_loop should be empty or only contain
# clients from loops we haven't seen (which shouldn't happen in practice)
async def __aenter__(self) -> "KuboCAS":
    # No setup needed on entry: HTTP clients are created lazily per event loop.
    return self

async def __aexit__(self, *exc: Any) -> None:
    # Close all internally-created clients; a user-supplied client is left
    # open (aclose() returns early when the user owns the client).
    await self.aclose()

def __del__(self) -> None:
    """Best-effort close for internally-created clients."""
    # Nothing to do if the user supplied the client (they own its lifetime)
    # or aclose() has already been called explicitly.
    if not self._owns_client or self._closed:
        return

    # Attempt proper cleanup if possible
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No event loop running in this thread (plain synchronous GC).
        loop = None

    try:
        if loop is None or not loop.is_running():
            # Safe to spin up a temporary event loop just to run the async close.
            asyncio.run(self.aclose())
        else:
            # A loop is running: schedule the close on it. NOTE(review): this
            # is fire-and-forget — the task may not finish before the object
            # is collected, which is acceptable for best-effort cleanup.
            loop.create_task(self.aclose())
    except Exception:
        # Suppress all errors during interpreter shutdown or loop teardown
        pass

# --------------------------------------------------------------------- #
# save() – now uses the per-loop session #
# save() – now uses the per-loop client #
# --------------------------------------------------------------------- #
async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
async with self._sem: # throttle RPC
form = aiohttp.FormData()
form.add_field(
"file", data, filename="blob", content_type="application/octet-stream"
)
# Create multipart form data
files = {"file": data}

async with self._loop_session().post(self.rpc_url, data=form) as resp:
resp.raise_for_status()
cid_str: str = (await resp.json())["Hash"]
# Send the POST request
client = self._loop_client()
response = await client.post(self.rpc_url, files=files)
response.raise_for_status()
cid_str: str = response.json()["Hash"]

cid: CID = CID.decode(cid_str)
if cid.codec.code != self.DAG_PB_MARKER:
Expand All @@ -284,7 +299,9 @@ async def load(self, id: IPLDKind) -> bytes:
"""@private"""
cid = cast(CID, id) # CID is definitely in the IPLDKind type
url: str = self.gateway_base_url + str(cid)

async with self._sem: # throttle gateway
async with self._loop_session().get(url) as resp:
resp.raise_for_status()
return await resp.read()
client = self._loop_client()
response = await client.get(url)
response.raise_for_status()
return response.content
Loading
Loading