Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/run-checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Install IPFS
uses: ibnesayeed/setup-ipfs@master
with:
ipfs_version: "0.32.1"
ipfs_version: "0.34.1"
run_daemon: true
id: ipfs_setup

Expand Down Expand Up @@ -77,7 +77,7 @@ jobs:

# Test Nginx config
sudo nginx -t

- name: Start Nginx and restart IPFS daemon
run: |
# Start Nginx
Expand Down
112 changes: 92 additions & 20 deletions tests/test_zarr_ipfs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import json
import os
import shutil
import socket
import subprocess
import tempfile
import time
from pathlib import Path

import numpy as np
import pandas as pd
import requests
import xarray as xr
import pytest
import zarr.core.buffer
Expand All @@ -14,17 +19,13 @@

@pytest.fixture(scope="module")
def random_zarr_dataset():
"""Creates a random xarray Dataset and saves it to a temporary zarr store.
"""Creates a random xarray Dataset.

Returns:
tuple: (dataset_path, expected_data)
- dataset_path: Path to the zarr store
- expected_data: The original xarray Dataset for comparison
"""
# Create temporary directory for zarr store
temp_dir = tempfile.mkdtemp()
zarr_path = os.path.join(temp_dir, "test.zarr")

# Coordinates of the random data
times = pd.date_range("2024-01-01", periods=100)
lats = np.linspace(-90, 90, 18)
Expand Down Expand Up @@ -56,22 +57,84 @@ def random_zarr_dataset():
attrs={"description": "Test dataset with random weather data"},
)

ds.to_zarr(zarr_path, mode="w")
yield ds


def find_free_port() -> int:
with socket.socket() as s:
s.bind(("", 0)) # Bind to a free port provided by the host.
return int(s.getsockname()[1]) # Return the port number assigned.

yield zarr_path, ds

# Cleanup
@pytest.fixture
def create_ipfs():
# Create temporary directory, set it as the IPFS Path
temp_dir = Path(tempfile.mkdtemp())
custom_env = os.environ.copy()
custom_env["IPFS_PATH"] = str(temp_dir)

# IPFS init
subprocess.run(
["ipfs", "init", "--profile", "pebbleds"], check=True, env=custom_env
)

# Edit the config file so that it serves on randomly selected and available ports to not conflict with any currently running ipfs daemons
swarm_port = find_free_port()
rpc_port = find_free_port()
gateway_port = find_free_port()

config_path = temp_dir / "config"
config: dict
with open(config_path, "r") as f:
config = json.load(f)

swarm_addrs: list[str] = config["Addresses"]["Swarm"]
new_port_swarm_addrs = [s.replace("4001", str(swarm_port)) for s in swarm_addrs]
config["Addresses"]["Swarm"] = new_port_swarm_addrs

rpc_multiaddr = config["Addresses"]["API"]
gateway_multiaddr = config["Addresses"]["Gateway"]

config["Addresses"]["API"] = rpc_multiaddr.replace("5001", str(rpc_port))
config["Addresses"]["Gateway"] = gateway_multiaddr.replace(
"8080", str(gateway_port)
)

with open(config_path, "w") as f:
json.dump(config, f, indent=2)

# Start the daemon
rpc_uri_stem = f"http://127.0.0.1:{rpc_port}"
gateway_uri_stem = f"http://127.0.0.1:{gateway_port}"

ipfs_process = subprocess.Popen(["ipfs", "daemon"], env=custom_env)
while True:
try:
requests.post(rpc_uri_stem + "/api/v0/id", timeout=1)
break
except requests.exceptions.ConnectionError:
time.sleep(1)

yield rpc_uri_stem, gateway_uri_stem

# Close the daemon
ipfs_process.kill()

# Delete the temporary directory
shutil.rmtree(temp_dir)


# This test also collects miscellaneous statistics about performance, run with pytest -s to see these statistics being printed out
@pytest.mark.asyncio
async def test_write_read(random_zarr_dataset: tuple[str, xr.Dataset]):
_, test_ds = random_zarr_dataset
async def test_write_read(create_ipfs, random_zarr_dataset: xr.Dataset):
rpc_uri_stem, gateway_uri_stem = create_ipfs
test_ds = random_zarr_dataset
print("=== Writing this xarray Dataset to a Zarr v3 on IPFS ===")
print(test_ds)

ipfsstore = IPFSStore(debug=True)
ipfsstore = IPFSStore(
debug=True, rpc_uri_stem=rpc_uri_stem, gateway_uri_stem=gateway_uri_stem
)
hamt = HAMT(store=ipfsstore)
ipfszarr3 = IPFSZarr3(hamt)
assert ipfszarr3.supports_writes
Expand All @@ -90,7 +153,9 @@ async def test_write_read(random_zarr_dataset: tuple[str, xr.Dataset]):
print(cid)

print("=== Reading data back in and checking if identical")
ipfsstore = IPFSStore(debug=True)
ipfsstore = IPFSStore(
debug=True, rpc_uri_stem=rpc_uri_stem, gateway_uri_stem=gateway_uri_stem
)
hamt = HAMT(store=ipfsstore, root_node_id=cid)
start = time.perf_counter()
ipfs_ds: xr.Dataset
Expand Down Expand Up @@ -176,8 +241,9 @@ async def test_write_read(random_zarr_dataset: tuple[str, xr.Dataset]):
assert previous_zarr_json.to_bytes() == zarr_json_now.to_bytes()


def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]):
_, test_ds = random_zarr_dataset
def test_encryption(create_ipfs, random_zarr_dataset: xr.Dataset):
rpc_uri_stem, gateway_uri_stem = create_ipfs
test_ds = random_zarr_dataset

with pytest.raises(ValueError, match="Encryption key is not 32 bytes"):
create_zarr_encryption_transformers(bytes(), bytes())
Expand All @@ -191,7 +257,9 @@ def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]):
detect_exclude=test_ds,
)
hamt = HAMT(
store=IPFSStore(), transformer_encode=encrypt, transformer_decode=decrypt
store=IPFSStore(rpc_uri_stem=rpc_uri_stem, gateway_uri_stem=gateway_uri_stem),
transformer_encode=encrypt,
transformer_decode=decrypt,
)
ipfszarr3 = IPFSZarr3(hamt)
test_ds.to_zarr(store=ipfszarr3) # type: ignore
Expand All @@ -212,7 +280,9 @@ def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]):
ds = xr.open_zarr(
store=IPFSZarr3(
HAMT(
store=IPFSStore(),
store=IPFSStore(
rpc_uri_stem=rpc_uri_stem, gateway_uri_stem=gateway_uri_stem
),
root_node_id=ipfszarr3.hamt.root_node_id,
transformer_encode=bad_encrypt,
transformer_decode=auto_detecting_decrypt,
Expand All @@ -221,7 +291,7 @@ def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]):
)
)
print(ds)
assert ds.temp.sum() == test_ds.temp.sum()
assert ds.temp.sum() == test_ds.temp.sum() # type: ignore
# We should be unable to read precipitation values which are still encrypted
with pytest.raises(Exception):
ds.precip.sum()
Expand All @@ -235,7 +305,9 @@ def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]):
ds = xr.open_zarr(
store=IPFSZarr3(
HAMT(
store=IPFSStore(),
store=IPFSStore(
rpc_uri_stem=rpc_uri_stem, gateway_uri_stem=gateway_uri_stem
),
root_node_id=ipfszarr3.hamt.root_node_id,
transformer_encode=bad_encrypt,
transformer_decode=bad_decrypt,
Expand All @@ -247,8 +319,8 @@ def test_encryption(random_zarr_dataset: tuple[str, xr.Dataset]):


# This test assumes the other zarr ipfs tests are working fine, so if other things are breaking check those first
def test_authenticated_gateway(random_zarr_dataset: tuple[str, xr.Dataset]):
_, test_ds = random_zarr_dataset
def test_authenticated_gateway(random_zarr_dataset: xr.Dataset):
test_ds = random_zarr_dataset

def write_and_check(store: IPFSStore) -> bool:
try:
Expand Down
Loading