Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
run_daemon: true

- name: Run pytest with coverage
run: uv run pytest --cov=py_hamt tests/ --cov-report=xml
run: uv run pytest --ipfs --cov=py_hamt tests/ --cov-report=xml

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@18283e04ce6e62d37312384ff67231eb8fd56d24 # v5
Expand Down
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ First, make sure you have the ipfs kubo daemon installed and running with the de
```sh
bash run-checks.sh
```
This will run tests with code coverage, and check formatting and linting. Under the hood it will be using the `pre-commit` command to run through all the checks within .pre-commit-config.yaml.
This will run tests with code coverage, and check formatting and linting. Under the hood it will be using the `pre-commit` command to run through all the checks within .pre-commit-config.yaml. If a local ipfs daemon is not running, not every test can run; if Docker is installed, the test suite will instead spawn a Docker IPFS container and run as many integration tests as possible.

We use `pytest` with 100% code coverage, and with test inputs that are both handwritten as well as generated by `hypothesis`. This allows us to try out millions of randomized inputs to create a more robust library.

Expand All @@ -55,6 +55,33 @@ We use `pytest` with 100% code coverage, and with test inputs that are both hand
> [!NOTE]
> Due to the restricted performance on GitHub actions runners, you may also sometimes see hypothesis tests running with errors because they exceeded test deadlines. Rerun the action if this happens.

### Tests

Running the full set of integration tests requires a local [IPFS](https://github.com/ipfs/kubo) daemon. The GitHub Actions workflow in `.github/workflows/run-checks.yaml` uses the `setup-ipfs` step, which ensures that a local ipfs daemon is available. To run the full integration tests locally, make sure a local ipfs daemon is running (by running `ipfs daemon` once installed). If it is not, pytest will spawn a local Docker container to run the ipfs tests. If [Docker](https://www.docker.com/) is not installed either, only the unit tests will run.

**To summarize:**

*In GitHub Actions:*
```bash
# IPFS daemon is running on default ports
uv run pytest --ipfs # All tests run, including test_kubo_default_urls
```

*Locally with Docker (no local daemon):*
```bash
pytest --ipfs # test_kubo_default_urls auto-skips, other tests use Docker
```

*Locally with IPFS daemon:*
```bash
pytest --ipfs # All tests run
```

*Quick local testing (no IPFS):*
```bash
pytest # All IPFS tests skip
```


## CPU and Memory Profiling
We use python's native `cProfile` for running CPU profiles and snakeviz for visualizing the profile. We use `memray` for the memory profiling. We will walk through using the profiling tools on the test suite.

Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dev = [
"xarray[complete]>=2025.3.0",
"mypy>=1.15.0",
"pandas-stubs>=2.2.3.250527",
"docker>=7.1.0",
"types-docker>=7.1.0.20250523",
"pre-commit>=4.2.0",
]

Expand Down
2 changes: 1 addition & 1 deletion run-checks.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

# Run pytest with coverage
uv run pytest --cov=py_hamt tests/
uv run pytest --ipfs --cov=py_hamt tests/

# Check coverage
uv run coverage report --fail-under=100 --show-missing
Expand Down
24 changes: 23 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,26 @@ async def global_client_session():
"""One aiohttp.ClientSession shared by the whole test run."""
async with aiohttp.ClientSession() as session:
yield session
# aiohttp’s async context manager awaits session.close() for us
# aiohttp's async context manager awaits session.close() for us


def pytest_addoption(parser):
    """Register the ``--ipfs`` command-line flag.

    The flag is a boolean switch that defaults to off; the collection hook in
    this file reads it to decide whether ``ipfs``-marked tests are skipped.
    """
    ipfs_flag = {
        "action": "store_true",
        "default": False,
        "help": "run tests that require a Kubo daemon",
    }
    parser.addoption("--ipfs", **ipfs_flag)


def pytest_configure(config):
    """Declare the ``ipfs`` marker by adding it to the ``markers`` ini setting."""
    marker_spec = "ipfs: tests that need a live IPFS node"
    config.addinivalue_line("markers", marker_spec)


def pytest_collection_modifyitems(config, items):
    """Skip every ``ipfs``-keyworded test unless ``--ipfs`` was passed.

    When the flag is set the collected items are left untouched. Otherwise a
    single shared skip marker is attached to each item whose keywords contain
    ``"ipfs"``.
    """
    if not config.getoption("--ipfs"):
        skip_marker = pytest.mark.skip(reason="needs --ipfs to run")
        for test_item in items:
            if "ipfs" in test_item.keywords:
                test_item.add_marker(skip_marker)
197 changes: 138 additions & 59 deletions tests/test_kubo_cas.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
from dag_cbor import IPLDKind
from hypothesis import given, settings
from testing_utils import create_ipfs, ipld_strategy # noqa
from testing_utils import ipld_strategy # noqa

from py_hamt import KuboCAS
from py_hamt.store import InMemoryCAS
Expand Down Expand Up @@ -43,94 +43,173 @@ async def test_memory_store_invalid_key_type():
await s.load(invalid_key)


# @pytest.mark.ipfs
# @pytest.mark.asyncio(loop_scope="session")
# @given(data=ipld_strategy())
# @settings(
# deadline=1000, print_blob=True
# ) # Increased deadline, print_blob for debugging
# async def test_kubo_default_urls(
# global_client_session, data: IPLDKind
# ): # Inject the session fixture
# """
# Tests KuboCAS using its default URLs and when None is passed for URLs,
# leveraging a globally managed aiohttp.ClientSession.
# """
# # Test Case 1: KuboCAS instantiated without explicit URLs (should use its defaults)
# # We pass the managed global_client_session to it.
# # KuboCAS itself is responsible for having default URLs if none are provided.
# async with KuboCAS(session=global_client_session) as kubo_cas_default:
# # print(f"Testing with default URLs: RPC={kubo_cas_default.rpc_base_url}, Gateway={kubo_cas_default.gateway_base_url}")
# encoded_data = dag_cbor.encode(data)

# for codec in ["raw", "dag-cbor"]:
# # The codec is a string, but we use Literal to ensure type safety
# # where codec_raw = "raw" and codec_dag_cbor = "dag-cbor"
# # necessary because when you iterate over a list of strings,
# # even if they are literal strings, mypy widens the type to just str
# codec_typed = cast(Literal["raw", "dag-cbor"], codec)
# # print(f"Saving with codec: {codec}, data: {data}")
# try:
# cid = await kubo_cas_default.save(encoded_data, codec=codec_typed)
# # print(f"Saved. CID: {cid}")
# loaded_encoded_data = await kubo_cas_default.load(cid)
# # print(f"Loaded encoded data length: {len(loaded_encoded_data)}")
# result = dag_cbor.decode(loaded_encoded_data)
# # print(f"Decoded result: {result}")
# assert (
# data == result
# ), f"Data mismatch for codec {codec} with default URLs"
# except Exception as e:
# pytest.fail(
# f"Error during KuboCAS default URL test (codec: {codec}): {e}"
# )

# # Test Case 2: KuboCAS instantiated with None for URLs (should also use its defaults)
# # We pass the managed global_client_session to it.
# async with KuboCAS(
# rpc_base_url=None, gateway_base_url=None, session=global_client_session
# ) as kubo_cas_none_urls:
# # print(f"Testing with None URLs: RPC={kubo_cas_none_urls.rpc_base_url}, Gateway={kubo_cas_none_urls.gateway_base_url}")
# encoded_data = dag_cbor.encode(
# data
# ) # Re-encode just in case, though it's the same data
# for codec in ["raw", "dag-cbor"]:
# # print(f"Saving with codec: {codec}, data: {data}")
# codec_typed = cast(Literal["raw", "dag-cbor"], codec)
# try:
# cid = await kubo_cas_none_urls.save(encoded_data, codec=codec_typed)
# # print(f"Saved. CID: {cid}")
# loaded_encoded_data = await kubo_cas_none_urls.load(cid)
# # print(f"Loaded encoded data length: {len(loaded_encoded_data)}")
# result = dag_cbor.decode(loaded_encoded_data)
# # print(f"Decoded result: {result}")
# assert data == result, f"Data mismatch for codec {codec} with None URLs"
# except Exception as e:
# pytest.fail(f"Error during KuboCAS None URL test (codec: {codec}): {e}")


# @given(data=ipld_strategy())
# @settings(deadline=1000)
# @pytest.mark.asyncio
# async def test_kubo_default_urls(data: IPLDKind):
# try:
# async with KuboCAS() as kubo_cas:
# for codec in ("raw", "dag-cbor"):
# cid = await kubo_cas.save(dag_cbor.encode(data), codec=codec)
# result = dag_cbor.decode(await kubo_cas.load(cid))
# assert data == result

# async with KuboCAS(gateway_base_url=None, rpc_base_url=None) as kubo_cas:
# for codec in ("raw", "dag-cbor"):
# cid = await kubo_cas.save(dag_cbor.encode(data), codec=codec)
# result = dag_cbor.decode(await kubo_cas.load(cid))
# assert data == result
# finally:
# # if Hypothesis cancels early, make sure every open CAS is closed
# for obj in list(globals().values()):
# if isinstance(obj, KuboCAS):
# await obj.aclose()


# Test that always works with Docker or local daemon
@pytest.mark.ipfs
@pytest.mark.asyncio(loop_scope="session")
@given(data=ipld_strategy())
@settings(deadline=1000, print_blob=True)
async def test_kubo_urls_explicit(create_ipfs, global_client_session, data: IPLDKind):
    """
    Tests KuboCAS functionality with explicitly provided URLs.
    Works with both Docker containers and local IPFS daemons.

    ``create_ipfs`` supplies the ``(rpc_url, gateway_url)`` pair to connect to,
    and ``global_client_session`` is the shared aiohttp session. Each
    generated ``data`` value is DAG-CBOR encoded, saved under both supported
    codecs, loaded back by CID and compared with the original.
    """
    rpc_url, gateway_url = create_ipfs

    async with KuboCAS(
        rpc_base_url=rpc_url,
        gateway_base_url=gateway_url,
        session=global_client_session,
    ) as kubo_cas:
        encoded_data = dag_cbor.encode(data)
        for codec in ["raw", "dag-cbor"]:
            # Iterating a list widens the element type to plain ``str`` for
            # mypy, so narrow it back to the literal codec type.
            codec_typed = cast(Literal["raw", "dag-cbor"], codec)
            cid = await kubo_cas.save(encoded_data, codec=codec_typed)
            loaded_encoded_data = await kubo_cas.load(cid)
            result = dag_cbor.decode(loaded_encoded_data)
            assert data == result, f"Data mismatch for codec {codec} with explicit URLs"


def _skip_unless_default_kubo_daemon() -> None:
    """Skip the calling test unless 127.0.0.1:5001 answers ``/api/v0/version`` with 200."""
    import http.client

    reason = "No IPFS daemon running on default ports (127.0.0.1:5001)"
    conn = http.client.HTTPConnection("127.0.0.1", 5001, timeout=1)
    try:
        conn.request("POST", "/api/v0/version")
        status = conn.getresponse().status
    except (OSError, http.client.HTTPException):
        # Connection refused, timeout or a malformed reply: nothing usable there.
        pytest.skip(reason)
    finally:
        # The probe runs for every Hypothesis example, so always release the socket.
        conn.close()
    if status != 200:
        pytest.skip(reason)


async def _assert_round_trip(kubo_cas: KuboCAS, data: IPLDKind, label: str) -> None:
    """Save and reload ``data`` through ``kubo_cas`` with each codec, failing on any error or mismatch."""
    encoded_data = dag_cbor.encode(data)
    for codec in ["raw", "dag-cbor"]:
        # Iterating a list widens the element type to plain ``str`` for mypy,
        # so narrow it back to the literal codec type.
        codec_typed = cast(Literal["raw", "dag-cbor"], codec)
        try:
            cid = await kubo_cas.save(encoded_data, codec=codec_typed)
            loaded_encoded_data = await kubo_cas.load(cid)
            result = dag_cbor.decode(loaded_encoded_data)
            assert data == result, f"Data mismatch for codec {codec} with {label} URLs"
        except Exception as e:
            pytest.fail(f"Error during KuboCAS {label} URL test (codec: {codec}): {e}")


@pytest.mark.ipfs
@pytest.mark.asyncio(loop_scope="session")
@given(data=ipld_strategy())
@settings(deadline=1000, print_blob=True)
async def test_kubo_default_urls(global_client_session, data: IPLDKind):
    """
    Tests KuboCAS using its default URLs and when None is passed for URLs.
    Requires a local IPFS daemon on default ports.

    The test is skipped when nothing answers on 127.0.0.1:5001. Otherwise each
    generated ``data`` value is round-tripped twice: once through a KuboCAS
    built without URL arguments and once through a KuboCAS built with both
    URLs explicitly set to ``None``.
    """
    _skip_unless_default_kubo_daemon()

    # Case 1: no URL arguments at all.
    async with KuboCAS(session=global_client_session) as kubo_cas_default:
        await _assert_round_trip(kubo_cas_default, data, "default")

    # Case 2: URLs explicitly passed as None.
    async with KuboCAS(
        rpc_base_url=None, gateway_base_url=None, session=global_client_session
    ) as kubo_cas_none_urls:
        await _assert_round_trip(kubo_cas_none_urls, data, "None")


# @given(data=ipld_strategy())
# @settings(deadline=1000)
# @pytest.mark.asyncio
# async def test_kubo_default_urls(data: IPLDKind):
# try:
# async with KuboCAS() as kubo_cas:
# for codec in ("raw", "dag-cbor"):
# cid = await kubo_cas.save(dag_cbor.encode(data), codec=codec)
# result = dag_cbor.decode(await kubo_cas.load(cid))
# assert data == result

# async with KuboCAS(gateway_base_url=None, rpc_base_url=None) as kubo_cas:
# for codec in ("raw", "dag-cbor"):
# cid = await kubo_cas.save(dag_cbor.encode(data), codec=codec)
# result = dag_cbor.decode(await kubo_cas.load(cid))
# assert data == result
# finally:
# # if Hypothesis cancels early, make sure every open CAS is closed
# for obj in list(globals().values()):
# if isinstance(obj, KuboCAS):
# await obj.aclose()


@pytest.mark.asyncio
@given(data=ipld_strategy())
@settings(
Expand Down
Loading
Loading