diff --git a/.github/workflows/run-checks.yaml b/.github/workflows/run-checks.yaml index 3477381..1782271 100644 --- a/.github/workflows/run-checks.yaml +++ b/.github/workflows/run-checks.yaml @@ -21,7 +21,7 @@ jobs: - name: Install IPFS uses: ibnesayeed/setup-ipfs@master with: - ipfs_version: "0.32.1" + ipfs_version: "0.34.1" run_daemon: true id: ipfs_setup @@ -77,7 +77,7 @@ jobs: # Test Nginx config sudo nginx -t - + - name: Start Nginx and restart IPFS daemon run: | # Start Nginx diff --git a/tests/test_zarr_ipfs.py b/tests/test_zarr_ipfs.py index 3344181..f23b5aa 100644 --- a/tests/test_zarr_ipfs.py +++ b/tests/test_zarr_ipfs.py @@ -1,10 +1,15 @@ +import json import os import shutil +import socket +import subprocess import tempfile import time +from pathlib import Path import numpy as np import pandas as pd +import requests import xarray as xr import pytest import zarr.core.buffer @@ -14,17 +19,13 @@ @pytest.fixture(scope="module") def random_zarr_dataset(): - """Creates a random xarray Dataset and saves it to a temporary zarr store. + """Creates a random xarray Dataset. Returns: tuple: (dataset_path, expected_data) - dataset_path: Path to the zarr store - expected_data: The original xarray Dataset for comparison """ - # Create temporary directory for zarr store - temp_dir = tempfile.mkdtemp() - zarr_path = os.path.join(temp_dir, "test.zarr") - # Coordinates of the random data times = pd.date_range("2024-01-01", periods=100) lats = np.linspace(-90, 90, 18) @@ -56,22 +57,84 @@ def random_zarr_dataset(): attrs={"description": "Test dataset with random weather data"}, ) - ds.to_zarr(zarr_path, mode="w") + yield ds + + +def find_free_port() -> int: + with socket.socket() as s: + s.bind(("", 0)) # Bind to a free port provided by the host. + return int(s.getsockname()[1]) # Return the port number assigned. - yield zarr_path, ds - # Cleanup +@pytest.fixture +def create_ipfs(): + # Create temporary directory, set it as the IPFS Path + temp_dir = Path(tempfile.mkdtemp()) + custom_env = os.environ.copy() + custom_env["IPFS_PATH"] = str(temp_dir) + + # IPFS init + subprocess.run( + ["ipfs", "init", "--profile", "pebbleds"], check=True, env=custom_env + ) + + # Edit the config file so that it serves on randomly selected and available ports to not conflict with any currently running ipfs daemons + swarm_port = find_free_port() + rpc_port = find_free_port() + gateway_port = find_free_port() + + config_path = temp_dir / "config" + config: dict + with open(config_path, "r") as f: + config = json.load(f) + + swarm_addrs: list[str] = config["Addresses"]["Swarm"] + new_port_swarm_addrs = [s.replace("4001", str(swarm_port)) for s in swarm_addrs] + config["Addresses"]["Swarm"] = new_port_swarm_addrs + + rpc_multiaddr = config["Addresses"]["API"] + gateway_multiaddr = config["Addresses"]["Gateway"] + + config["Addresses"]["API"] = rpc_multiaddr.replace("5001", str(rpc_port)) + config["Addresses"]["Gateway"] = gateway_multiaddr.replace( + "8080", str(gateway_port) + ) + + with open(config_path, "w") as f: + json.dump(config, f, indent=2) + + # Start the daemon + rpc_uri_stem = f"http://127.0.0.1:{rpc_port}" + gateway_uri_stem = f"http://127.0.0.1:{gateway_port}" + + ipfs_process = subprocess.Popen(["ipfs", "daemon"], env=custom_env) + while True: + try: + requests.post(rpc_uri_stem + "/api/v0/id", timeout=1) + break + except requests.exceptions.ConnectionError: + time.sleep(1) + + yield rpc_uri_stem, gateway_uri_stem + + # Close the daemon + ipfs_process.kill() + + # Delete the temporary directory shutil.rmtree(temp_dir) # This test also collects miscellaneous statistics about performance, run with pytest -s to see these statistics being printed out @pytest.mark.asyncio -async def test_write_read(random_zarr_dataset: tuple[str, xr.Dataset]): - _, test_ds = random_zarr_dataset +async def test_write_read(create_ipfs, random_zarr_dataset: xr.Dataset): + rpc_uri_stem, gateway_uri_stem = create_ipfs + test_ds = random_zarr_dataset print("=== Writing this xarray Dataset to a Zarr v3 on IPFS ===") print(test_ds) - ipfsstore = IPFSStore(debug=True) + ipfsstore = IPFSStore( + debug=True, rpc_uri_stem=rpc_uri_stem, gateway_uri_stem=gateway_uri_stem + ) hamt = HAMT(store=ipfsstore) ipfszarr3 = IPFSZarr3(hamt) assert ipfszarr3.supports_writes @@ -90,7 +153,9 @@ async def test_write_read(random_zarr_dataset: tuple[str, xr.Dataset]): print(cid) print("=== Reading data back in and checking if identical") - ipfsstore = IPFSStore(debug=True) + ipfsstore = IPFSStore( + debug=True, rpc_uri_stem=rpc_uri_stem, gateway_uri_stem=gateway_uri_stem + ) hamt = HAMT(store=ipfsstore, root_node_id=cid) start = time.perf_counter() ipfs_ds: xr.Dataset @@ -176,8 +241,9 @@ async def test_write_read(random_zarr_dataset: tuple[str, xr.Dataset]): assert previous_zarr_json.to_bytes() == zarr_json_now.to_bytes() -def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]): - _, test_ds = random_zarr_dataset +def test_encryption(create_ipfs, random_zarr_dataset: xr.Dataset): + rpc_uri_stem, gateway_uri_stem = create_ipfs + test_ds = random_zarr_dataset with pytest.raises(ValueError, match="Encryption key is not 32 bytes"): create_zarr_encryption_transformers(bytes(), bytes()) @@ -191,7 +257,9 @@ def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]): detect_exclude=test_ds, ) hamt = HAMT( - store=IPFSStore(), transformer_encode=encrypt, transformer_decode=decrypt + store=IPFSStore(rpc_uri_stem=rpc_uri_stem, gateway_uri_stem=gateway_uri_stem), + transformer_encode=encrypt, + transformer_decode=decrypt, ) ipfszarr3 = IPFSZarr3(hamt) test_ds.to_zarr(store=ipfszarr3) # type: ignore @@ -212,7 +280,9 @@ def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]): ds = xr.open_zarr( store=IPFSZarr3( HAMT( - store=IPFSStore(), + store=IPFSStore( + rpc_uri_stem=rpc_uri_stem, gateway_uri_stem=gateway_uri_stem + ), root_node_id=ipfszarr3.hamt.root_node_id, transformer_encode=bad_encrypt, transformer_decode=auto_detecting_decrypt, @@ -221,7 +291,7 @@ def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]): ) ) print(ds) - assert ds.temp.sum() == test_ds.temp.sum() + assert ds.temp.sum() == test_ds.temp.sum() # type: ignore # We should be unable to read precipitation values which are still encrypted with pytest.raises(Exception): ds.precip.sum() @@ -235,7 +305,9 @@ def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]): ds = xr.open_zarr( store=IPFSZarr3( HAMT( - store=IPFSStore(), + store=IPFSStore( + rpc_uri_stem=rpc_uri_stem, gateway_uri_stem=gateway_uri_stem + ), root_node_id=ipfszarr3.hamt.root_node_id, transformer_encode=bad_encrypt, transformer_decode=bad_decrypt, @@ -247,8 +319,8 @@ def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]): # This test assumes the other zarr ipfs tests are working fine, so if other things are breaking check those first -def test_authenticated_gateway(random_zarr_dataset: tuple[str, xr.Dataset]): - _, test_ds = random_zarr_dataset +def test_authenticated_gateway(random_zarr_dataset: xr.Dataset): + test_ds = random_zarr_dataset def write_and_check(store: IPFSStore) -> bool: try: