In [6]:
import os
import time
import subprocess
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# List of IPFS chunk CIDs (Content Identifiers) for the Zarr v3 dataset
# (These would be discovered from the Zarr index/HAMT structure; example placeholders used here)
# Every benchmark cell below iterates this list in order and requests each
# entry once, so a CID that is listed twice is also fetched twice.
chunk_cids = [
    "bafkr4iceixa2nv3itpozuno26amplvxugf6bn5xhswfj35thfzjle4zk4u",
    "bafkr4iaithj6biui7bqpb2pxkd3pz7p53gfnkkev44oebjeapfzq3sppvu",
    "bafyb4ibnntnlje64tf3e22pgiwzytc55h2mzo56yxiq2minophnqpd5nhm",
    # NOTE(review): same CID as the first entry -- confirm the duplicate is intentional.
    "bafkr4iceixa2nv3itpozuno26amplvxugf6bn5xhswfj35thfzjle4zk4u",
    "bafyb4ienc2cqtrk7s7dcffyrpfrb5f2wdiu447dltyudhhijlfvfkgfeq4",
    "bafkr4icknftq4yxm3333uhj5cdcckdj7y4at2c6mskmgscuqd5lbswpco4",
    "bafyb4ihjldd2gz6cg37i5zj53npls37zq7r3yhqdlickq5o34dhi7n2s4e",
    "bafkr4igariesbp4vud5v47vcdlmuvc4zhempphwbdji3rnkvz4bj4tdgzy",
    "bafkr4idtuvp3yvkbr5c7pepkwi6yqy7e3ozmejsh3chlsw7euw7rua7vpq",
    "bafkr4ie6yrr5pa25klzqip3fyoq5vdesdvtfps7foh3pzdms5thmhcyz2m",
    "bafyb4ifpfmiiup43czhoyatrmlo3jzu2oc4e3juhiszwdhp5uri4b3qn64",
    "bafyb4idl6qsptuzcpw3ytedxp3q3edudqnmp6ou6p42ganckhfpdg5natm",
    "bafkr4ihiqpk5tfc4wl3bgssqp2cj5nu5fsmo6y5lksrpo64aqemlffhtta",
    "bafyb4ibd3msajqpikjfhsap64eprlok7g55mxrol3kloh5bzyj4lre6qiu",
    "bafyb4icclyf6s5mj2y5twk5jpunujyulamvr2sm26ok7rgw4mj5s55f5va",
    "bafkr4igzyp5ezzx6scop4jskj5376u2aotquawgjmmlzbtxhyejm6inmqy",
]
# Base URL of the HTTP gateway; each request URL is built as f"{gateway}/{cid}".
gateway = "https://ipfs-gateway.dclimate.net/ipfs"

In [7]:
# Set up HTTP session with retry logic for resilience:
# up to 3 retries with exponential backoff on the listed 5xx gateway statuses.
session = Session()
retries = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
# HTTPAdapter keeps at most 10 pooled connections per host by default. This
# session is also used by the parallel cell with up to 16 worker threads, so
# size the pool to match; otherwise the surplus connections are discarded after
# use and urllib3 logs "Connection pool is full" warnings.
adapter = HTTPAdapter(max_retries=retries, pool_maxsize=16)
session.mount("https://", adapter)
session.mount("http://", adapter)

# -------------------------------
# 1. Fetch chunks via HTTP (requests)
# -------------------------------
# Chunks are fetched one at a time, in list order. The timer wraps the whole
# loop, so the reported throughput is wall-clock and includes logging and any
# time spent on requests that ultimately fail.
print("=== Fetching chunks via requests ===")
start_req = time.perf_counter()
total_bytes_req = 0
failed_req = 0
for cid in chunk_cids:
    url = f"{gateway}/{cid}"
    print(f"Fetching (requests): {url}")  # Log chunk URL
    try:
        response = session.get(url, timeout=10)
        response.raise_for_status()  # treat HTTP 4xx/5xx as a failed fetch
        data = response.content
        size = len(data)
        total_bytes_req += size
        print(f"  -> Retrieved {size} bytes")
    except Exception as e:
        # Best effort: report the failure and keep benchmarking the remaining chunks.
        failed_req += 1
        print(f"  Request failed for {url}: {e}")
end_req = time.perf_counter()
duration_req = end_req - start_req
throughput_req = (total_bytes_req / 1e6) / duration_req if duration_req > 0 else 0.0
print(f"\n[requests] Total downloaded: {total_bytes_req} bytes in {duration_req:.2f} s -> {throughput_req:.2f} MB/s\n")
if failed_req:
    # Make it explicit that the figure above is skewed by failed requests.
    print(f"[requests] {failed_req} of {len(chunk_cids)} requests failed; their time is included above but their bytes are not.\n")


=== Fetching chunks via requests ===
Fetching (requests): https://ipfs-gateway.dclimate.net/ipfs/bafkr4iceixa2nv3itpozuno26amplvxugf6bn5xhswfj35thfzjle4zk4u
  -> Retrieved 232 bytes
Fetching (requests): https://ipfs-gateway.dclimate.net/ipfs/bafkr4iaithj6biui7bqpb2pxkd3pz7p53gfnkkev44oebjeapfzq3sppvu
  -> Retrieved 190 bytes
Fetching (requests): https://ipfs-gateway.dclimate.net/ipfs/bafyb4ibnntnlje64tf3e22pgiwzytc55h2mzo56yxiq2minophnqpd5nhm
  -> Retrieved 708244 bytes
Fetching (requests): https://ipfs-gateway.dclimate.net/ipfs/bafkr4iceixa2nv3itpozuno26amplvxugf6bn5xhswfj35thfzjle4zk4u
  -> Retrieved 232 bytes
Fetching (requests): https://ipfs-gateway.dclimate.net/ipfs/bafyb4ienc2cqtrk7s7dcffyrpfrb5f2wdiu447dltyudhhijlfvfkgfeq4
  -> Retrieved 879007 bytes
Fetching (requests): https://ipfs-gateway.dclimate.net/ipfs/bafkr4icknftq4yxm3333uhj5cdcckdj7y4at2c6mskmgscuqd5lbswpco4
  -> Retrieved 164 bytes
Fetching (requests): https://ipfs-gateway.dclimate.net/ipfs/bafyb4ihjldd2gz6cg37i5zj53n

In [9]:
from concurrent.futures import ThreadPoolExecutor, as_completed

def _short_cid(cid: str, head: int = 8, tail: int = 6) -> str:
    """Abbreviate a CID for logging as ``<head>...<tail>`` (short CIDs are returned whole)."""
    if len(cid) <= head + tail:
        return cid
    return f"{cid[:head]}...{cid[-tail:]}"


def fetch_chunk(cid: str, session: Session, gateway: str, timeout: float = 10) -> int:
    """Fetch a single chunk and return its byte size.

    Args:
        cid: Content identifier, appended to ``gateway`` to form the URL.
        session: Object exposing ``get(url, timeout=...)``; the returned
            response must provide ``raise_for_status()`` and ``content``.
        gateway: Base gateway URL without a trailing slash.
        timeout: Timeout in seconds forwarded to ``session.get``. Defaults to
            10, the value that was previously hard-coded.

    Returns:
        The number of bytes in the response body, or 0 if the request failed
        for any reason. Failures are printed rather than raised, so one bad
        chunk does not abort a batch.
    """
    # The CIDs used here share long common prefixes, so a prefix-only label
    # cannot tell chunks apart; include the tail as well.
    label = _short_cid(cid)
    url = f"{gateway}/{cid}"
    try:
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
        size = len(response.content)
        print(f"✓ {label} -> {size} bytes")
        return size
    except Exception as e:
        # Best effort by design: log and report zero bytes for this chunk.
        print(f"✗ {label} failed: {e}")
        return 0

print("=== Fetching chunks via requests in parallel ===")

# Limit concurrency to avoid overwhelming the gateway.
# ThreadPoolExecutor raises ValueError for max_workers <= 0, so keep at least
# one worker even if the CID list is empty.
max_workers = max(1, min(16, len(chunk_cids)))

# Wall-clock timing around the whole batch (submission, downloads, logging).
start_parallel = time.perf_counter()
total_bytes_parallel = 0

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(fetch_chunk, cid, session, gateway) for cid in chunk_cids]
    # Results are consumed in completion order, not list order; fetch_chunk
    # returns 0 for a failed chunk, so failures simply add nothing to the total.
    for future in as_completed(futures):
        total_bytes_parallel += future.result()

end_parallel = time.perf_counter()
duration_parallel = end_parallel - start_parallel
throughput_parallel = (total_bytes_parallel / 1e6) / duration_parallel if duration_parallel > 0 else 0.0

print(f"\n[parallel requests] Total downloaded: {total_bytes_parallel} bytes in {duration_parallel:.2f} s -> {throughput_parallel:.2f} MB/s")


=== Fetching chunks via requests in parallel ===
✓ bafkr4ig... -> 3214 bytes
✓ bafkr4id... -> 1727 bytes
✓ bafkr4ia... -> 190 bytes
✓ bafkr4ic... -> 232 bytes
✓ bafkr4ih... -> 12596 bytes
✓ bafkr4ic... -> 232 bytes
✓ bafkr4ic... -> 164 bytes
✓ bafkr4ig... -> 25423 bytes
✓ bafkr4ie... -> 221036 bytes
✓ bafyb4ih... -> 301428 bytes
✓ bafyb4ic... -> 511424 bytes
✓ bafyb4ib... -> 708244 bytes
✓ bafyb4ie... -> 879007 bytes
✓ bafyb4ib... -> 841346 bytes
✓ bafyb4if... -> 900680 bytes
✓ bafyb4id... -> 810919 bytes

[parallel requests] Total downloaded: 5217862 bytes in 2.08 s -> 2.51 MB/s


In [8]:
# -------------------------------
# 2. Fetch chunks via curl (subprocess)
# -------------------------------
# One curl process is spawned per chunk. The reported time is the sum of curl's
# own per-transfer `time_total`, so process start-up and Python overhead are
# NOT included (unlike the wall-clock timing used for the requests cells).
print("=== Fetching chunks via curl ===")
total_bytes_curl = 0
total_time_curl = 0.0
# -L: follow redirects, -sS: silent mode (show errors), -o: output to null, -w: format output
# --fail: exit non-zero on HTTP >= 400 so error pages are not counted as downloaded data
#         (the requests cells do the same through raise_for_status()).
# --connect-timeout / --max-time: bound each transfer so a stalled gateway cannot hang the cell.
curl_base_cmd = [
    "curl", "-L", "-sS", "--fail",
    "--connect-timeout", "10",
    "--max-time", "60",
    "-o", os.devnull,
    "-w", "%{size_download} %{time_total}",
]
for cid in chunk_cids:
    url = f"{gateway}/{cid}"
    print(f"Curl fetching: {url}")  # Log chunk URL
    try:
        # Use curl to fetch and discard output, capturing download size and time
        result = subprocess.run(
            curl_base_cmd + [url],
            check=True, capture_output=True, text=True
        )
        out = result.stdout.strip().split()
        if len(out) == 2:
            size_downloaded = int(out[0])
            time_taken = float(out[1])
            total_bytes_curl += size_downloaded
            total_time_curl += time_taken
            print(f"  -> Retrieved {size_downloaded} bytes in {time_taken:.2f} s")
        else:
            print(f"  Unexpected curl output: {result.stdout}")
    except subprocess.CalledProcessError as e:
        # stderr is captured, so surface curl's own diagnostic instead of only the exit status.
        detail = (e.stderr or "").strip() or str(e)
        print(f"  Curl failed for {url}: {detail}")
    except Exception as e:
        # e.g. curl not installed, or unparsable -w output; keep going with the next chunk.
        print(f"  Curl failed for {url}: {e}")
if total_time_curl > 0:
    throughput_curl = (total_bytes_curl / 1e6) / total_time_curl
    print(f"\n[curl] Total downloaded: {total_bytes_curl} bytes in {total_time_curl:.2f} s -> {throughput_curl:.2f} MB/s\n")
else:
    print("\n[curl] No data retrieved.\n")

=== Fetching chunks via curl ===
Curl fetching: https://ipfs-gateway.dclimate.net/ipfs/bafkr4iceixa2nv3itpozuno26amplvxugf6bn5xhswfj35thfzjle4zk4u
  -> Retrieved 232 bytes in 0.57 s
Curl fetching: https://ipfs-gateway.dclimate.net/ipfs/bafkr4iaithj6biui7bqpb2pxkd3pz7p53gfnkkev44oebjeapfzq3sppvu
  -> Retrieved 190 bytes in 0.50 s
Curl fetching: https://ipfs-gateway.dclimate.net/ipfs/bafyb4ibnntnlje64tf3e22pgiwzytc55h2mzo56yxiq2minophnqpd5nhm
  -> Retrieved 708244 bytes in 1.68 s
Curl fetching: https://ipfs-gateway.dclimate.net/ipfs/bafkr4iceixa2nv3itpozuno26amplvxugf6bn5xhswfj35thfzjle4zk4u
  -> Retrieved 232 bytes in 0.65 s
Curl fetching: https://ipfs-gateway.dclimate.net/ipfs/bafyb4ienc2cqtrk7s7dcffyrpfrb5f2wdiu447dltyudhhijlfvfkgfeq4
  -> Retrieved 879007 bytes in 2.24 s
Curl fetching: https://ipfs-gateway.dclimate.net/ipfs/bafkr4icknftq4yxm3333uhj5cdcckdj7y4at2c6mskmgscuqd5lbswpco4
  -> Retrieved 164 bytes in 0.67 s
Curl fetching: https://ipfs-gateway.dclimate.net/ipfs/bafyb4ihjldd2