diff --git a/.github/workflows/pythonpackage.yml b/.github/disabled_workflows/pythonpackage.yml similarity index 100% rename from .github/workflows/pythonpackage.yml rename to .github/disabled_workflows/pythonpackage.yml diff --git a/.github/workflows/default_gateways.yml b/.github/workflows/default_gateways.yml new file mode 100644 index 0000000..d94a6c0 --- /dev/null +++ b/.github/workflows/default_gateways.yml @@ -0,0 +1,34 @@ +name: Test without local gateway + +on: [push] + +jobs: + test: + + runs-on: ubuntu-latest + strategy: + max-parallel: 4 + matrix: + python-version: [3.9] + + steps: + - uses: actions/checkout@v1 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e . + - name: Lint with flake8 + run: | + pip install flake8 + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # The GitHub editor is 127 chars wide + flake8 . --count --max-complexity=10 --max-line-length=127 --statistics + - name: Test with pytest + run: | + pip install pytest pytest-asyncio + pytest -m "not local_gw" diff --git a/.github/workflows/local_gateway.yml b/.github/workflows/local_gateway.yml index c5d52fd..e0e2c69 100644 --- a/.github/workflows/local_gateway.yml +++ b/.github/workflows/local_gateway.yml @@ -9,8 +9,11 @@ jobs: strategy: max-parallel: 4 matrix: - python-version: [3.9] - ipfs-version: ["0.8.0"] + python-version: ["3.8", "3.9", "3.10"] + ipfs-version: ["0.12.0"] + include: + - python-version: "3.10" + ipfs-version: "0.9.1" env: IPFSSPEC_GATEWAYS: "http://127.0.0.1:8080" # use only localhost as gateway steps: @@ -27,12 +30,16 @@ jobs: run: | wget https://dist.ipfs.io/go-ipfs/v${{ matrix.ipfs-version }}/go-ipfs_v${{ matrix.ipfs-version }}_linux-amd64.tar.gz tar -xvzf go-ipfs_v${{ matrix.ipfs-version }}_linux-amd64.tar.gz - cd go-ipfs + pushd go-ipfs sudo bash install.sh + sudo sysctl -w net.core.rmem_max=2500000 + popd ipfs --version ipfs init --profile server - ipfs daemon > ipfs.log & + ipfs daemon 2>ipfs.log | grep -i -o -m1 'Daemon is ready' & tail -f --pid=$! ipfs.log + ipfs cat /ipfs/QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc/readme + ipfs dag import test/testdata.car - name: Test with pytest run: | - pip install pytest + pip install pytest pytest-asyncio pytest diff --git a/ipfsspec/__init__.py b/ipfsspec/__init__.py index 8ee98f0..f77cc28 100644 --- a/ipfsspec/__init__.py +++ b/ipfsspec/__init__.py @@ -1,10 +1,12 @@ from .core import IPFSFileSystem +from .async_ipfs import AsyncIPFSFileSystem from fsspec import register_implementation from ._version import get_versions __version__ = get_versions()['version'] del get_versions -register_implementation(IPFSFileSystem.protocol, IPFSFileSystem) +# register_implementation(IPFSFileSystem.protocol, IPFSFileSystem) +register_implementation(AsyncIPFSFileSystem.protocol, AsyncIPFSFileSystem) -__all__ = ["__version__", "IPFSFileSystem"] +__all__ = ["__version__", "IPFSFileSystem", "AsyncIPFSFileSystem"] diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py new file mode 100644 index 0000000..aad28c7 --- /dev/null +++ b/ipfsspec/async_ipfs.py @@ -0,0 +1,333 @@ +import io +import time +import weakref + +import asyncio +import aiohttp + +from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper +from fsspec.exceptions import FSTimeoutError + +from .core import get_default_gateways + +import logging + +logger = logging.getLogger("ipfsspec") + + +class RequestsTooQuick(OSError): + def __init__(self, retry_after=None): + self.retry_after = retry_after + + +class AsyncIPFSGatewayBase: + async def stat(self, path, session): + res = await self.api_get("files/stat", session, arg=path) + self._raise_not_found_for_status(res, path) + return await res.json() + + async def file_info(self, path, session): + info = {"name": path} + + headers = {"Accept-Encoding": "identity"} # this ensures correct file size + res = await self.cid_head(path, session, headers=headers) + + async with res: + self._raise_not_found_for_status(res, path) + if res.status != 200: + # TODO: maybe handle 301 here + raise FileNotFoundError(path) + if "Content-Length" in res.headers: + info["size"] = int(res.headers["Content-Length"]) + elif "Content-Range" in res.headers: + info["size"] = int(res.headers["Content-Range"].split("/")[1]) + + if "ETag" in res.headers: + etag = res.headers["ETag"].strip("\"") + info["ETag"] = etag + if etag.startswith("DirIndex"): + info["type"] = "directory" + info["CID"] = etag.split("-")[-1] + else: + info["type"] = "file" + info["CID"] = etag + + return info + + async def cat(self, path, session): + res = await self.cid_get(path, session) + async with res: + self._raise_not_found_for_status(res, path) + if res.status != 200: + raise FileNotFoundError(path) + return await res.read() + + async def ls(self, path, session): + res = await self.api_get("ls", session, arg=path) + self._raise_not_found_for_status(res, path) + resdata = await res.json() + types = {1: "directory", 2: "file"} + return [{ + "name": path + "/" + link["Name"], + "CID": link["Hash"], + "type": types[link["Type"]], + "size": link["Size"], + } + for link in resdata["Objects"][0]["Links"]] + + def _raise_not_found_for_status(self, response, url): + """ + Raises FileNotFoundError for 404s, otherwise uses raise_for_status. + """ + if response.status == 404: + raise FileNotFoundError(url) + elif response.status == 400: + raise FileNotFoundError(url) + response.raise_for_status() + + +class AsyncIPFSGateway(AsyncIPFSGatewayBase): + resolution = "path" + + def __init__(self, url): + self.url = url + + async def api_get(self, endpoint, session, **kwargs): + res = await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + self._raise_requests_too_quick(res) + return res + + async def api_post(self, endpoint, session, **kwargs): + res = await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + self._raise_requests_too_quick(res) + return res + + async def _cid_req(self, method, path, headers=None, **kwargs): + headers = headers or {} + if self.resolution == "path": + res = await method(self.url + "/ipfs/" + path, trace_request_ctx={'gateway': self.url}, headers=headers) + elif self.resolution == "subdomain": + raise NotImplementedError("subdomain resolution is not yet implemented") + else: + raise NotImplementedError(f"'{self.resolution}' resolution is not known") + # TODO: maybe handle 301 here + self._raise_requests_too_quick(res) + return res + + async def cid_head(self, path, session, headers=None, **kwargs): + return await self._cid_req(session.head, path, headers=headers, **kwargs) + + async def cid_get(self, path, session, headers=None, **kwargs): + return await self._cid_req(session.get, path, headers=headers, **kwargs) + + async def version(self, session): + res = await self.api_get("version", session) + res.raise_for_status() + return await res.json() + + @staticmethod + def _raise_requests_too_quick(response): + if response.status == 429: + if "retry-after" in response.headers: + retry_after = int(response.headers["retry-after"]) + else: + retry_after = None + raise RequestsTooQuick(retry_after) + + def __str__(self): + return f"GW({self.url})" + + +class GatewayState: + def __init__(self): + self.reachable = True + self.next_request_time = 0 + self.backoff_time = 0 + self.start_backoff = 1e-5 + self.max_backoff = 5 + + def schedule_next(self): + self.next_request_time = time.monotonic() + self.backoff_time + + def backoff(self): + if self.backoff_time < self.start_backoff: + self.backoff_time = self.start_backoff + else: + self.backoff_time *= 2 + self.reachable = True + self.schedule_next() + + def speedup(self, not_below=0): + did_speed_up = False + if self.backoff_time > not_below: + self.backoff_time *= 0.9 + did_speed_up = True + self.reachable = True + self.schedule_next() + return did_speed_up + + def broken(self): + self.backoff_time = self.max_backoff + self.reachable = False + self.schedule_next() + + def trying_to_reach(self): + self.next_request_time = time.monotonic() + 1 + + +class MultiGateway(AsyncIPFSGatewayBase): + def __init__(self, gws, max_backoff_rounds=50): + self.gws = [(GatewayState(), gw) for gw in gws] + self.max_backoff_rounds = max_backoff_rounds + + @property + def _gws_in_priority_order(self): + now = time.monotonic() + return sorted(self.gws, key=lambda x: max(now, x[0].next_request_time)) + + async def _gw_op(self, op): + for _ in range(self.max_backoff_rounds): + for state, gw in self._gws_in_priority_order: + not_before = state.next_request_time + if not state.reachable: + state.trying_to_reach() + else: + state.schedule_next() + now = time.monotonic() + if not_before > now: + await asyncio.sleep(not_before - now) + logger.debug("tring %s", gw) + try: + res = await op(gw) + if state.speedup(time.monotonic() - now): + logger.debug("%s speedup", gw) + return res + except (RequestsTooQuick, aiohttp.ClientResponseError, asyncio.TimeoutError) as e: + state.backoff() + logger.debug("%s backoff %s", gw, e) + break + except IOError as e: + exception = e + state.broken() + logger.debug("%s broken", gw) + continue + else: + raise exception + raise RequestsTooQuick() + + async def api_get(self, endpoint, session, **kwargs): + return await self._gw_op(lambda gw: gw.api_get(endpoint, session, **kwargs)) + + async def api_post(self, endpoint, session, **kwargs): + return await self._gw_op(lambda gw: gw.api_post(endpoint, session, **kwargs)) + + async def cid_head(self, path, session, headers=None, **kwargs): + return await self._gw_op(lambda gw: gw.cid_head(path, session, headers=headers, **kwargs)) + + async def cid_get(self, path, session, headers=None, **kwargs): + return await self._gw_op(lambda gw: gw.cid_get(path, session, headers=headers, **kwargs)) + + async def cat(self, path, session): + return await self._gw_op(lambda gw: gw.cat(path, session)) + + async def ls(self, path, session): + return await self._gw_op(lambda gw: gw.ls(path, session)) + + def state_report(self): + return "\n".join(f"{s.next_request_time}, {gw}" for s, gw in self.gws) + + def __str__(self): + return "Multi-GW(" + ", ".join(str(gw) for _, gw in self.gws) + ")" + + +async def get_client(**kwargs): + timeout = aiohttp.ClientTimeout(sock_connect=1, sock_read=5) + kwargs = {"timeout": timeout, **kwargs} + return aiohttp.ClientSession(**kwargs) + + +DEFAULT_GATEWAY = None + + +def get_gateway(): + global DEFAULT_GATEWAY + if DEFAULT_GATEWAY is None: + use_gateway(*get_default_gateways()) + return DEFAULT_GATEWAY + + +def use_gateway(*urls): + global DEFAULT_GATEWAY + if len(urls) == 1: + DEFAULT_GATEWAY = AsyncIPFSGateway(urls[0]) + else: + DEFAULT_GATEWAY = MultiGateway([AsyncIPFSGateway(url) for url in urls]) + + +class AsyncIPFSFileSystem(AsyncFileSystem): + sep = "/" + protocol = "ipfs" + + def __init__(self, asynchronous=False, loop=None, client_kwargs=None, **storage_options): + super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options) + self._session = None + + self.client_kwargs = client_kwargs or {} + self.get_client = get_client + + if not asynchronous: + sync(self.loop, self.set_session) + + @property + def gateway(self): + return get_gateway() + + @staticmethod + def close_session(loop, session): + if loop is not None and loop.is_running(): + try: + sync(loop, session.close, timeout=0.1) + return + except (TimeoutError, FSTimeoutError): + pass + if session._connector is not None: + # close after loop is dead + session._connector._close() + + async def set_session(self): + if self._session is None: + self._session = await self.get_client(loop=self.loop, **self.client_kwargs) + if not self.asynchronous: + weakref.finalize(self, self.close_session, self.loop, self._session) + return self._session + + async def _ls(self, path, detail=True, **kwargs): + path = self._strip_protocol(path) + session = await self.set_session() + res = await self.gateway.ls(path, session) + if detail: + return res + else: + return [r["name"] for r in res] + + ls = sync_wrapper(_ls) + + async def _cat_file(self, path, start=None, end=None, **kwargs): + path = self._strip_protocol(path) + session = await self.set_session() + return (await self.gateway.cat(path, session))[start:end] + + async def _info(self, path, **kwargs): + path = self._strip_protocol(path) + session = await self.set_session() + return await self.gateway.file_info(path, session) + + def open(self, path, mode="rb", block_size=None, cache_options=None, **kwargs): + if mode != "rb": + raise NotImplementedError + data = self.cat_file(path) # load whole chunk into memory + return io.BytesIO(data) + + def ukey(self, path): + """returns the CID, which is by definition an unchanging identitifer""" + return self.info(path)["CID"] diff --git a/ipfsspec/tracing.py b/ipfsspec/tracing.py new file mode 100644 index 0000000..37adb79 --- /dev/null +++ b/ipfsspec/tracing.py @@ -0,0 +1,24 @@ +import asyncio +import aiohttp + + +class GatewayTracer: + def __init__(self): + from collections import defaultdict + self.samples = defaultdict(list) + + def make_trace_config(self): + trace_config = aiohttp.TraceConfig() + trace_config.on_request_start.append(self.on_request_start) + trace_config.on_request_end.append(self.on_request_end) + return trace_config + + async def on_request_start(self, session, trace_config_ctx, params): + trace_config_ctx.start = asyncio.get_event_loop().time() + + async def on_request_end(self, session, trace_config_ctx, params): + trace_config_ctx.end = asyncio.get_event_loop().time() + elapsed = trace_config_ctx.end - trace_config_ctx.start + status = params.response.status + gateway = trace_config_ctx.trace_request_ctx.get("gateway", None) + self.samples[gateway].append({"url": params.url, "method": params.method, "elapsed": elapsed, "status": status}) diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..0e50de1 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +markers = + local_gw: marks tests requiring a local gateway (deselect with '-m "not local_gw"') diff --git a/setup.py b/setup.py index 3a9ad33..1c9fdaf 100644 --- a/setup.py +++ b/setup.py @@ -27,10 +27,11 @@ "requests", "protobuf>=3.15.8", "py-cid", + "aiohttp", ], entry_points={ 'fsspec.specs': [ - 'ipfs=ipfsspec.IPFSFileSystem', + 'ipfs=ipfsspec.AsyncIPFSFileSystem', ], }, ) diff --git a/test/test_async.py b/test/test_async.py new file mode 100644 index 0000000..c5711df --- /dev/null +++ b/test/test_async.py @@ -0,0 +1,98 @@ +import pytest +from ipfsspec.async_ipfs import AsyncIPFSGateway, MultiGateway, AsyncIPFSFileSystem +import aiohttp + +TEST_ROOT = "QmW3CrGFuFyF3VH1wvrap4Jend5NRTgtESDjuQ7QhHD5dd" +REF_CONTENT = b'ipfsspec test data' +TEST_FILENAMES = ["default", "multi", "raw", "raw_multi", "write"] + + +@pytest.fixture +async def session(): + async with aiohttp.ClientSession() as session: + yield session + + +@pytest.mark.local_gw +@pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) +@pytest.mark.parametrize("filename", TEST_FILENAMES) +@pytest.mark.asyncio +async def test_different_file_representations(filename, gw_host, session): + gw = AsyncIPFSGateway(gw_host) + + path = TEST_ROOT + "/" + filename + info = await gw.file_info(path, session) + assert info["size"] == len(REF_CONTENT) + assert info["type"] == "file" + content = await gw.cat(path, session) + assert content == REF_CONTENT + + +@pytest.mark.local_gw +@pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) +@pytest.mark.asyncio +async def test_get_cid_of_folder(gw_host, session): + gw = AsyncIPFSGateway(gw_host) + + info = await gw.file_info(TEST_ROOT, session) + assert info["CID"] == TEST_ROOT + + +@pytest.mark.local_gw +@pytest.mark.parametrize("gw_hosts", [ + ["http://127.0.0.1:8080"], + ["http://127.0.0.1:9999", "http://127.0.0.1:8080"], + ["http://127.0.0.1:8080", "http://127.0.0.1:9999"], + ["https://ipfs.io", "http://127.0.0.1:8080"], + ["http://127.0.0.1:8080", "https://ipfs.io"], +]) +@pytest.mark.asyncio +async def test_multi_gw_cat(gw_hosts, session): + gws = [AsyncIPFSGateway(gw_host) for gw_host in gw_hosts] + gw = MultiGateway(gws) + + res = await gw.cat(TEST_ROOT + "/default", session) + assert res == REF_CONTENT + + +@pytest.mark.asyncio +async def test_ls(event_loop): + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) + res = await fs._ls(TEST_ROOT, detail=False) + assert res == [TEST_ROOT + fs.sep + fn for fn in TEST_FILENAMES] + res = await fs._ls(TEST_ROOT, detail=True) + assert [r["name"] for r in res] == [TEST_ROOT + fs.sep + fn for fn in TEST_FILENAMES] + assert all([r["size"] == len(REF_CONTENT) for r in res]) + + +@pytest.mark.asyncio +async def test_cat_file(event_loop): + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) + res = await fs._cat_file(TEST_ROOT + "/default") + assert res == REF_CONTENT + res = await fs._cat_file(TEST_ROOT + "/default", start=3, end=7) + assert res == REF_CONTENT[3:7] + + +@pytest.mark.asyncio +async def test_exists(event_loop): + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) + res = await fs._exists(TEST_ROOT + "/default") + assert res is True + res = await fs._exists(TEST_ROOT + "/missing") + assert res is False + res = await fs._exists("/missing") + assert res is False + + +@pytest.mark.asyncio +async def test_isfile(event_loop): + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) + res = await fs._isfile(TEST_ROOT + "/default") + assert res is True + res = await fs._isfile(TEST_ROOT) + assert res is False diff --git a/test/test_async_fallback.py b/test/test_async_fallback.py new file mode 100644 index 0000000..b2f9a67 --- /dev/null +++ b/test/test_async_fallback.py @@ -0,0 +1,73 @@ +import pytest +import time + +from ipfsspec.async_ipfs import MultiGateway, AsyncIPFSGatewayBase, RequestsTooQuick + + +class MockGateway(AsyncIPFSGatewayBase): + def __init__(self, objects): + self.objects = objects + + async def cid_get(self, path, session, headers=None, **kwargs): + try: + return self.objects[path] + except KeyError: + raise FileNotFoundError(path) + + +class RateLimitedMockGateway(AsyncIPFSGatewayBase): + def __init__(self, max_rate, base, report_time=True): + self.request_count = 0 + self.next_allowed_request = time.monotonic() + + self.max_rate = max_rate + self.base = base + self.report_time = report_time + + def _rate_limit(self): + self.request_count += 1 + now = time.monotonic() + if now <= self.next_allowed_request: + raise RequestsTooQuick(self.next_allowed_request - now if self.report_time else None) + else: + self.next_allowed_request = now + self.max_rate + + async def cid_get(self, path, session, headers=None, **kwargs): + self._rate_limit() + return await self.base.cid_get(path, session, headers=headers, **kwargs) + + +@pytest.fixture +def session(): + return None + + +@pytest.mark.asyncio +async def test_backoff(session): + base = MockGateway({ + "QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM": "bar", + }) + gws = [RateLimitedMockGateway(0.01, base)] + gw = MultiGateway(gws) + + for _ in range(50): + obj = await gw.cid_get("QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM", session) + assert obj == "bar" + assert 50 <= gws[0].request_count < 240 + + +@pytest.mark.asyncio +async def test_backoff_use_faster_server(session): + base = MockGateway({ + "QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM": "zapp", + }) + gws = [ + RateLimitedMockGateway(0.1, base), + RateLimitedMockGateway(0.01, base) + ] + gw = MultiGateway(gws) + + for _ in range(100): + obj = await gw.cid_get("QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM", session) + assert obj == "zapp" + assert gws[0].request_count < gws[1].request_count diff --git a/test/test_gateway.py b/test/test_gateway.py index 6d5d4ed..757aa72 100644 --- a/test/test_gateway.py +++ b/test/test_gateway.py @@ -3,7 +3,7 @@ import base64 from mockserver import mock_servers -import fsspec +from ipfsspec.core import IPFSFileSystem from ipfsspec.unixfs_pb2 import Data as UnixFSData from http.server import BaseHTTPRequestHandler @@ -117,7 +117,7 @@ def test_backoff(): {"QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM": "bar"}), ] with mock_servers(handlers) as gateways: - fs = fsspec.filesystem("ipfs", gateways=gateways, timeout=1) + fs = IPFSFileSystem(gateways=gateways, timeout=1) for _ in range(50): with fs.open("QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM") as f: assert f.read().decode("utf-8") == "bar" @@ -134,7 +134,7 @@ def test_backoff_use_faster_server(): {"QmaBoSxocSYTx1WLw55NJ2rEkvt5WTJxxuUSCWtabHurqE": "zapp"}), ] with mock_servers(handlers) as gateways: - fs = fsspec.filesystem("ipfs", gateways=gateways, timeout=1) + fs = IPFSFileSystem(gateways=gateways, timeout=1) for _ in range(100): with fs.open("QmaBoSxocSYTx1WLw55NJ2rEkvt5WTJxxuUSCWtabHurqE") as f: assert f.read().decode("utf-8") == "zapp" @@ -149,6 +149,6 @@ def test_fallback_if_incomplete(): {"QmWLdkp93sNxGRjnFHPaYg8tCQ35NBY3XPn6KiETd3Z4WR": "baz"}), ] with mock_servers(handlers) as gateways: - fs = fsspec.filesystem("ipfs", gateways=gateways, timeout=1) + fs = IPFSFileSystem(gateways=gateways, timeout=1) with fs.open("QmWLdkp93sNxGRjnFHPaYg8tCQ35NBY3XPn6KiETd3Z4WR") as f: assert f.read().decode("utf-8") == "baz" diff --git a/test/testdata.car b/test/testdata.car new file mode 100644 index 0000000..6e1a0e9 Binary files /dev/null and b/test/testdata.car differ