From 4315343c5002e8217d34f3307ec7607c41f597b0 Mon Sep 17 00:00:00 2001 From: Faolain Date: Thu, 12 Jun 2025 02:08:22 -0400 Subject: [PATCH 1/7] fix: add public gateway support to httpx branch --- public_gateway_example.py | 87 ++++++++++++++++ py_hamt/store_httpx.py | 33 +++++- tests/test_public_gateway.py | 193 +++++++++++++++++++++++++++++++++++ 3 files changed, 311 insertions(+), 2 deletions(-) create mode 100644 public_gateway_example.py create mode 100644 tests/test_public_gateway.py diff --git a/public_gateway_example.py b/public_gateway_example.py new file mode 100644 index 0000000..186aa4d --- /dev/null +++ b/public_gateway_example.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 + +import asyncio + +import xarray as xr +from xarray import Dataset + +from py_hamt import HAMT, KuboCAS, ZarrHAMTStore + + +async def fetch_zarr_from_gateway(cid: str, gateway: str = "https://ipfs.io"): + """ + Fetch and open a Zarr dataset from a public gateway using py-hamt. + + Parameters: + ----------- + cid : str + The IPFS CID of the Zarr dataset root + gateway : str + The IPFS gateway URL to use (default: ipfs.io) + + Returns: + -------- + Dataset + The xarray Dataset from the Zarr store + """ + print(f"Fetching CID {cid} from gateway {gateway}") + + # Use KuboCAS with the public gateway + # Setting RPC to None since we're only reading, not writing + async with KuboCAS(rpc_base_url=None, gateway_base_url=gateway) as kubo_cas: + # Initialize the HAMT with the root CID + hamt = await HAMT.build( + cas=kubo_cas, root_node_id=cid, values_are_bytes=True, read_only=True + ) + + # Initialize the Zarr store + zhs = ZarrHAMTStore(hamt, read_only=True) + + # Open the dataset with xarray + print("Opening Zarr dataset...") + zarr_ds: Dataset = xr.open_zarr(store=zhs) + + # Print info about the dataset + print("\nDataset summary:") + print(f"Dimensions: {dict(zarr_ds.dims)}") + print(f"Data variables: {list(zarr_ds.data_vars)}") + print(f"Coordinates: {list(zarr_ds.coords)}") + + # Return the dataset + return zarr_ds + + +async def main(): + # Example CID - this points to a weather dataset stored on IPFS + cid = "bafyr4iecw3faqyvj75psutabk2jxpddpjdokdy5b26jdnjjzpkzbgb5xoq" + + # Try different public gateways + gateways = [ + "https://ipfs.io", # IPFS.io gateway + "https://dweb.link", # Protocol Labs gateway + "https://cloudflare-ipfs.com", # Cloudflare gateway + ] + + # Try each gateway + for gateway in gateways: + print(f"\n===== Testing gateway: {gateway} =====") + try: + ds = await fetch_zarr_from_gateway(cid, gateway) + print("Success! Dataset loaded correctly.") + + # Demonstrate accessing data + if "precip" in ds and "time" in ds.coords: + first_timestamp = ds["time"].values[0] + print(f"First timestamp: {first_timestamp}") + + # Get a slice of the data + first_slice = ds["precip"].isel(time=0) + print(f"First precipitation slice shape: {first_slice.shape}") + + break # Stop after first successful gateway + except Exception as e: + print(f"Error with gateway {gateway}: {type(e).__name__}: {e}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/py_hamt/store_httpx.py b/py_hamt/store_httpx.py index 63f6efc..3af1b76 100644 --- a/py_hamt/store_httpx.py +++ b/py_hamt/store_httpx.py @@ -223,6 +223,7 @@ def _loop_client(self) -> httpx.AsyncClient: auth=self._default_auth, limits=httpx.Limits(max_connections=64, max_keepalive_connections=32), http2=True, + follow_redirects=True, ) self._client_per_loop[loop] = client return client @@ -297,10 +298,38 @@ async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CI async def load(self, id: IPLDKind) -> bytes: """@private""" cid = cast(CID, id) # CID is definitely in the IPLDKind type - url: str = self.gateway_base_url + str(cid) + + # Public gateways need special handling to return raw IPLD content + is_public_gateway = False + for domain in ["ipfs.io", "dweb.link", "cloudflare-ipfs.com"]: + if domain in self.gateway_base_url: + is_public_gateway = True + break + + # Remove /ipfs/ from the gateway URL if present + base_url = self.gateway_base_url + if "/ipfs/" in base_url: + base_url = base_url.split("/ipfs/")[0] + + # Standard gateway URL construction with proper path handling + if base_url.endswith("/"): + url = f"{base_url}ipfs/{cid}" + else: + url = f"{base_url}/ipfs/{cid}" + + # Set appropriate headers based on gateway type + # For public gateways, we need specific Accept headers to get the raw content + headers = {} + if is_public_gateway: + # These headers tell the gateway to return the raw content instead of HTML + headers["Accept"] = ( + "application/vnd.ipld.raw, application/vnd.ipld.dag-cbor, application/octet-stream" + ) + # Also add the dag-cbor format in the path for reliable content addressing + url = f"{url}?format=dag-cbor" async with self._sem: # throttle gateway client = self._loop_client() - response = await client.get(url) + response = await client.get(url, headers=headers) response.raise_for_status() return response.content diff --git a/tests/test_public_gateway.py b/tests/test_public_gateway.py new file mode 100644 index 0000000..d648664 --- /dev/null +++ b/tests/test_public_gateway.py @@ -0,0 +1,193 @@ +import asyncio + +import httpx +import pytest +from multiformats import CID + +from py_hamt import KuboCAS + +TEST_CID = "bafyr4iecw3faqyvj75psutabk2jxpddpjdokdy5b26jdnjjzpkzbgb5xoq" + + +async def verify_response_content(url: str, client=None): + """Fetch and verify the response from a given URL""" + should_close = False + if client is None: + client = httpx.AsyncClient(follow_redirects=True) + should_close = True + + try: + # Print request info + print(f"Testing URL: {url}") + + # Fetch content + response = await client.get(url) + response.raise_for_status() + + # Check content type + content_type = response.headers.get("content-type", "") + print(f"Content-Type: {content_type}") + + # First few bytes for debug + content = response.content + print(f"First 20 bytes: {content[:20].hex()}") + print(f"Content length: {len(content)}") + + # A valid DAG-CBOR object typically starts with 0xa* for arrays or 0x* for other types + # This is a simple heuristic check + first_byte = content[0] if content else 0 + return { + "url": url, + "status_code": response.status_code, + "content_type": content_type, + "content_length": len(content), + "first_byte": hex(first_byte), + "looks_like_dag_cbor": first_byte & 0xE0 in (0x80, 0xA0), # Arrays or maps + "content": content, + } + finally: + if should_close: + await client.aclose() + + +@pytest.mark.asyncio +async def test_compare_gateways(): + """Compare response content from different IPFS gateways""" + + # Test URLs + cid = CID.decode(TEST_CID) + gateways = [ + f"http://127.0.0.1:8080/ipfs/{cid}", # Local gateway + f"https://ipfs.io/ipfs/{cid}?format=dag-cbor", # Public gateway with format parameter + f"https://dweb.link/ipfs/{cid}?format=dag-cbor", # Protocol Labs' gateway with format parameter + f"https://cloudflare-ipfs.com/ipfs/{cid}?format=dag-cbor", # Cloudflare's gateway with format parameter + ] + + # Create a single client for all requests + async with httpx.AsyncClient(follow_redirects=True) as client: + # Test each gateway + results = [] + for url in gateways: + try: + result = await verify_response_content(url, client) + results.append(result) + except Exception as e: + print(f"Error testing {url}: {e}") + results.append({"url": url, "error": str(e)}) + + # Print comparison + for result in results: + print(f"\nURL: {result.get('url')}") + if "error" in result: + print(f" Error: {result['error']}") + continue + + print(f" Status: {result.get('status_code')}") + print(f" Content-Type: {result.get('content_type')}") + print(f" Content Length: {result.get('content_length')}") + print(f" First Byte: {result.get('first_byte')}") + print(f" Looks like DAG-CBOR: {result.get('looks_like_dag_cbor')}") + + # Verify at least the local gateway worked + local_result = next((r for r in results if "127.0.0.1" in r.get("url", "")), None) + if local_result and "error" not in local_result: + assert local_result.get("looks_like_dag_cbor", False), ( + "Local gateway response doesn't look like DAG-CBOR" + ) + + +@pytest.mark.asyncio +async def test_kubocas_public_gateway(): + """Test KuboCAS with a public gateway""" + + # Use a public gateway + cas = KuboCAS( + rpc_base_url="http://127.0.0.1:5001", # Keep local RPC for saves + gateway_base_url="https://ipfs.io", # Use public gateway for loads + ) + + try: + # Try to load the CID + cid = CID.decode(TEST_CID) + data = await cas.load(cid) + + # Print info for debugging + print(f"Loaded {len(data)} bytes from public gateway") + print(f"First 20 bytes: {data[:20].hex()}") + + # Check if it looks like DAG-CBOR + first_byte = data[0] if data else 0 + is_dag_cbor = first_byte & 0xE0 in (0x80, 0xA0) # Simple check for arrays/maps + print(f"First byte: {hex(first_byte)}, Looks like DAG-CBOR: {is_dag_cbor}") + + assert is_dag_cbor, "Data from public gateway doesn't look like DAG-CBOR" + + finally: + await cas.aclose() + + +@pytest.mark.asyncio +async def test_fix_kubocas_load(): + """Test a proposed fix for KuboCAS when loading from public gateways""" + + class FixedKuboCAS(KuboCAS): + """Extended KuboCAS with improved public gateway support""" + + async def load(self, id): + """Modified load that ensures we get the raw IPLD content""" + cid = CID.decode(str(id)) if isinstance(id, str) else id + + # Clean the base URL to prevent path issues + base_url = self.gateway_base_url + if "/ipfs/" in base_url: + base_url = base_url.split("/ipfs/")[0] + + # Construction of URL that works with public gateways + if base_url.endswith("/"): + url = f"{base_url}ipfs/{cid}?format=dag-cbor" + else: + url = f"{base_url}/ipfs/{cid}?format=dag-cbor" + + print(f"Requesting URL: {url}") + + async with self._sem: + client = self._loop_client() + + # For public gateways, add appropriate Accept header to get raw content + headers = { + "Accept": "application/vnd.ipld.raw, application/vnd.ipld.dag-cbor, application/octet-stream" + } + + response = await client.get(url, headers=headers) + response.raise_for_status() + return response.content + + # Use the fixed implementation with a public gateway + cas = FixedKuboCAS( + rpc_base_url="http://127.0.0.1:5001", gateway_base_url="https://ipfs.io" + ) + + try: + # Try to load the CID + cid = CID.decode(TEST_CID) + data = await cas.load(cid) + + # Print info for debugging + print(f"Loaded {len(data)} bytes from public gateway with fix") + print(f"First 20 bytes: {data[:20].hex()}") + + # Check if it looks like DAG-CBOR + first_byte = data[0] if data else 0 + is_dag_cbor = first_byte & 0xE0 in (0x80, 0xA0) + print(f"First byte: {hex(first_byte)}, Looks like DAG-CBOR: {is_dag_cbor}") + + assert is_dag_cbor, ( + "Data from public gateway with fix doesn't look like DAG-CBOR" + ) + + finally: + await cas.aclose() + + +if __name__ == "__main__": + asyncio.run(test_compare_gateways()) From 7d212ac0405f1ca7dca49c9a03f066288812cdaf Mon Sep 17 00:00:00 2001 From: Faolain Date: Thu, 12 Jun 2025 02:18:23 -0400 Subject: [PATCH 2/7] test: add missing test coverage line --- tests/test_public_gateway.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/test_public_gateway.py b/tests/test_public_gateway.py index d648664..eb16b7b 100644 --- a/tests/test_public_gateway.py +++ b/tests/test_public_gateway.py @@ -126,6 +126,38 @@ async def test_kubocas_public_gateway(): await cas.aclose() +@pytest.mark.asyncio +async def test_trailing_slash_gateway(): + """Test KuboCAS with a gateway URL that has a trailing slash""" + + # Use a gateway URL with a trailing slash + cas = KuboCAS( + rpc_base_url="http://127.0.0.1:5001", + gateway_base_url="http://127.0.0.1:8080/", # Note the trailing slash + ) + + try: + # Try to load the CID + cid = CID.decode(TEST_CID) + data = await cas.load(cid) + + # Print info for debugging + print(f"Loaded {len(data)} bytes from gateway with trailing slash") + print(f"First 20 bytes: {data[:20].hex()}") + + # Check if it looks like DAG-CBOR + first_byte = data[0] if data else 0 + is_dag_cbor = first_byte & 0xE0 in (0x80, 0xA0) # Simple check for arrays/maps + print(f"First byte: {hex(first_byte)}, Looks like DAG-CBOR: {is_dag_cbor}") + + assert is_dag_cbor, ( + "Data from gateway with trailing slash doesn't look like DAG-CBOR" + ) + + finally: + await cas.aclose() + + @pytest.mark.asyncio async def test_fix_kubocas_load(): """Test a proposed fix for KuboCAS when loading from public gateways""" From 4df38729dfd13538d72d5c4efcdc3dd4f80d53b5 Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Thu, 12 Jun 2025 16:08:16 -0400 Subject: [PATCH 3/7] fix: refactor headers and public gateways --- py_hamt/store_httpx.py | 42 ++++++++++++++---------------------------- 1 file changed, 14 insertions(+), 28 deletions(-) diff --git a/py_hamt/store_httpx.py b/py_hamt/store_httpx.py index 3af1b76..a73459a 100644 --- a/py_hamt/store_httpx.py +++ b/py_hamt/store_httpx.py @@ -186,9 +186,18 @@ def __init__( if gateway_base_url is None: gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL + if "/ipfs/" in gateway_base_url: + gateway_base_url = gateway_base_url.split("/ipfs/")[0] + + # Standard gateway URL construction with proper path handling + if gateway_base_url.endswith("/"): + gateway_base_url = f"{gateway_base_url}ipfs/" + else: + gateway_base_url = f"{gateway_base_url}/ipfs/" + self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&pin=false" """@private""" - self.gateway_base_url: str = f"{gateway_base_url}/ipfs/" + self.gateway_base_url: str = gateway_base_url """@private""" self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {} @@ -299,34 +308,11 @@ async def load(self, id: IPLDKind) -> bytes: """@private""" cid = cast(CID, id) # CID is definitely in the IPLDKind type - # Public gateways need special handling to return raw IPLD content - is_public_gateway = False - for domain in ["ipfs.io", "dweb.link", "cloudflare-ipfs.com"]: - if domain in self.gateway_base_url: - is_public_gateway = True - break - - # Remove /ipfs/ from the gateway URL if present - base_url = self.gateway_base_url - if "/ipfs/" in base_url: - base_url = base_url.split("/ipfs/")[0] - - # Standard gateway URL construction with proper path handling - if base_url.endswith("/"): - url = f"{base_url}ipfs/{cid}" - else: - url = f"{base_url}/ipfs/{cid}" - - # Set appropriate headers based on gateway type - # For public gateways, we need specific Accept headers to get the raw content headers = {} - if is_public_gateway: - # These headers tell the gateway to return the raw content instead of HTML - headers["Accept"] = ( - "application/vnd.ipld.raw, application/vnd.ipld.dag-cbor, application/octet-stream" - ) - # Also add the dag-cbor format in the path for reliable content addressing - url = f"{url}?format=dag-cbor" + headers["Accept"] = ( + "application/vnd.ipld.raw, application/vnd.ipld.dag-cbor, application/octet-stream" + ) + url = f"{self.gateway_base_url + str(cid)}?format=dag-cbor" async with self._sem: # throttle gateway client = self._loop_client() From 62b0e842ef0bd819fc070ad7d0e8e93cb6be7722 Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Thu, 12 Jun 2025 16:20:40 -0400 Subject: [PATCH 4/7] fix: coverage --- py_hamt/store_httpx.py | 1 + tests/test_public_gateway.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/py_hamt/store_httpx.py b/py_hamt/store_httpx.py index a73459a..1484415 100644 --- a/py_hamt/store_httpx.py +++ b/py_hamt/store_httpx.py @@ -309,6 +309,7 @@ async def load(self, id: IPLDKind) -> bytes: cid = cast(CID, id) # CID is definitely in the IPLDKind type headers = {} + # Necessary as ipfs public gateways return html headers["Accept"] = ( "application/vnd.ipld.raw, application/vnd.ipld.dag-cbor, application/octet-stream" ) diff --git a/tests/test_public_gateway.py b/tests/test_public_gateway.py index eb16b7b..c76c414 100644 --- a/tests/test_public_gateway.py +++ b/tests/test_public_gateway.py @@ -196,7 +196,7 @@ async def load(self, id): # Use the fixed implementation with a public gateway cas = FixedKuboCAS( - rpc_base_url="http://127.0.0.1:5001", gateway_base_url="https://ipfs.io" + rpc_base_url="http://127.0.0.1:5001", gateway_base_url="https://ipfs.io/ipfs/" ) try: From dbf2575ee15048bc66cb86a68bbc968467acd40c Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Fri, 13 Jun 2025 12:54:41 -0400 Subject: [PATCH 5/7] fix: optional headers --- py_hamt/store_httpx.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/py_hamt/store_httpx.py b/py_hamt/store_httpx.py index 9b05b28..56d4c31 100644 --- a/py_hamt/store_httpx.py +++ b/py_hamt/store_httpx.py @@ -307,13 +307,14 @@ async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CI async def load(self, id: IPLDKind) -> bytes: """@private""" cid = cast(CID, id) # CID is definitely in the IPLDKind type - + url: str = f"{self.gateway_base_url + str(cid)}" headers = {} # Necessary as ipfs public gateways return html - headers["Accept"] = ( - "application/vnd.ipld.raw, application/vnd.ipld.dag-cbor, application/octet-stream" - ) - url = f"{self.gateway_base_url + str(cid)}?format=dag-cbor" + if "dclimate" not in self.gateway_base_url: + headers["Accept"] = ( + "application/vnd.ipld.raw, application/vnd.ipld.dag-cbor, application/octet-stream" + ) + url = f"{url}?format=dag-cbor" async with self._sem: # throttle gateway client = self._loop_client() From 190111fa707fda15e0c8778b7d6d046aff7a9a31 Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Fri, 13 Jun 2025 13:23:48 -0400 Subject: [PATCH 6/7] fix: exclude local --- py_hamt/store_httpx.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/py_hamt/store_httpx.py b/py_hamt/store_httpx.py index 56d4c31..a2fb463 100644 --- a/py_hamt/store_httpx.py +++ b/py_hamt/store_httpx.py @@ -310,7 +310,10 @@ async def load(self, id: IPLDKind) -> bytes: url: str = f"{self.gateway_base_url + str(cid)}" headers = {} # Necessary as ipfs public gateways return html - if "dclimate" not in self.gateway_base_url: + if ( + "dclimate" not in self.gateway_base_url + and "127.0.0" not in self.gateway_base_url + ): headers["Accept"] = ( "application/vnd.ipld.raw, application/vnd.ipld.dag-cbor, application/octet-stream" ) From 93d709ee05c2f3f57b6b8db5f38d863e32aabc9c Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Fri, 13 Jun 2025 14:24:31 -0400 Subject: [PATCH 7/7] fix: remove headers --- py_hamt/store_httpx.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/py_hamt/store_httpx.py b/py_hamt/store_httpx.py index a2fb463..2869b94 100644 --- a/py_hamt/store_httpx.py +++ b/py_hamt/store_httpx.py @@ -308,19 +308,9 @@ async def load(self, id: IPLDKind) -> bytes: """@private""" cid = cast(CID, id) # CID is definitely in the IPLDKind type url: str = f"{self.gateway_base_url + str(cid)}" - headers = {} - # Necessary as ipfs public gateways return html - if ( - "dclimate" not in self.gateway_base_url - and "127.0.0" not in self.gateway_base_url - ): - headers["Accept"] = ( - "application/vnd.ipld.raw, application/vnd.ipld.dag-cbor, application/octet-stream" - ) - url = f"{url}?format=dag-cbor" async with self._sem: # throttle gateway client = self._loop_client() - response = await client.get(url, headers=headers) + response = await client.get(url) response.raise_for_status() return response.content