From efb8d177a1aa70b700e048636d617f43085c7589 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 15 Sep 2021 16:58:56 +0200 Subject: [PATCH 01/32] WIP: first async draft --- ipfsspec/__init__.py | 4 +- ipfsspec/async_ipfs.py | 187 +++++++++++++++++++++++++++++++++++++++++ test/test_async.py | 66 +++++++++++++++ 3 files changed, 256 insertions(+), 1 deletion(-) create mode 100644 ipfsspec/async_ipfs.py create mode 100644 test/test_async.py diff --git a/ipfsspec/__init__.py b/ipfsspec/__init__.py index 8ee98f0..7ee15ca 100644 --- a/ipfsspec/__init__.py +++ b/ipfsspec/__init__.py @@ -1,4 +1,5 @@ from .core import IPFSFileSystem +from .async_ipfs import AsyncIPFSFileSystem from fsspec import register_implementation from ._version import get_versions @@ -6,5 +7,6 @@ del get_versions register_implementation(IPFSFileSystem.protocol, IPFSFileSystem) +register_implementation(AsyncIPFSFileSystem.protocol, AsyncIPFSFileSystem) -__all__ = ["__version__", "IPFSFileSystem"] +__all__ = ["__version__", "IPFSFileSystem", "AsyncIPFSFileSystem"] diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py new file mode 100644 index 0000000..947e0ac --- /dev/null +++ b/ipfsspec/async_ipfs.py @@ -0,0 +1,187 @@ +import asyncio +import weakref + +import aiohttp + +from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper + +class AsyncIPFSGateway: + # stats + # roundtrip time + # troughput + + resolution = "path" + + def __init__(self, url): + self.url = url + + async def api_get(self, endpoint, session, **kwargs): + return await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + + async def api_post(self, endpoint, session, **kwargs): + return await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + + async def version(self, session): + res = await self.api_get("version", session) + res.raise_for_status() + return await res.json() + + async def stat(self, path, session): + res = await self.api_get("files/stat", session, arg=path) + res.raise_for_status() + return await res.json() + + async def file_info(self, path, session): + info = {"name": path} + + headers = {"Accept-Encoding": "identity"} # this ensures correct file size + if self.resolution == "path": + res = await session.head(self.url + "/ipfs/" + path, trace_request_ctx={'gateway': self.url}, headers=headers) + #print(res) + elif self.resolution == "subdomain": + raise NotImplementedError("subdomain resolution is not yet implemented") + else: + raise NotImplementedError(f"'{self.resolution}' resolution is not known") + + async with res: + self._raise_not_found_for_status(res, path) + if "Content-Length" in res.headers: + info["size"] = int(res.headers["Content-Length"]) + elif "Content-Range" in res.headers: + info["size"] = int(res.headers["Content-Range"].split("/")[1]) + + if "ETag" in res.headers: + etag = res.headers["ETag"].strip("\"") + info["ETag"] = etag + if etag.startswith("DirIndex"): + info["type"] = "directory" + info["CID"] = etag.split("-")[-1] + else: + info["type"] = "file" + info["CID"] = etag + + return info + + async def cat(self, path, session): + info = {"name": path} + + if self.resolution == "path": + res = await session.get(self.url + "/ipfs/" + path, trace_request_ctx={'gateway': self.url}) + elif self.resolution == "subdomain": + raise NotImplementedError("subdomain resolution is not yet implemented") + else: + raise NotImplementedError(f"'{self.resolution}' resolution is not known") + + async with res: + self._raise_not_found_for_status(res, path) + return await res.read() + + async def ls(self, path, session): + res = await self.api_get("ls", session, arg=path) + self._raise_not_found_for_status(res, path) + resdata = await res.json() + types = {1: "directory", 2: "file"} + return [{ + "name": path + "/" + link["Name"], + "CID": link["Hash"], + "type": types[link["Type"]], + "size": link["Size"], + } + for link in resdata["Objects"][0]["Links"]] + + def _raise_not_found_for_status(self, response, url): + """ + Raises FileNotFoundError for 404s, otherwise uses raise_for_status. + """ + if response.status == 404: + raise FileNotFoundError(url) + response.raise_for_status() + +# required functions +# cat +# ls +# info + +async def get_client(**kwargs): + return aiohttp.ClientSession(**kwargs) + +class AsyncIPFSFileSystem(AsyncFileSystem): + sep = "/" + protocol = "aipfs" + + def __init__(self, asynchronous=False, loop=None, client_kwargs=None, **storage_options): + super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options) + self._session = None + + self.client_kwargs = client_kwargs or {} + self.get_client = get_client + + self.gateway = AsyncIPFSGateway("http://127.0.0.1:8080") + + if not asynchronous: + sync(self.loop, self.set_session) + + @staticmethod + def close_session(loop, session): + if loop is not None and loop.is_running(): + try: + sync(loop, session.close, timeout=0.1) + return + except (TimeoutError, FSTimeoutError): + pass + if session._connector is not None: + # close after loop is dead + session._connector._close() + + async def set_session(self): + if self._session is None: + self._session = await self.get_client(loop=self.loop, **self.client_kwargs) + if not self.asynchronous: + weakref.finalize(self, self.close_session, self.loop, self._session) + return self._session + + async def _ls(self, path, detail=True, **kwargs): + path = self._strip_protocol(path) + session = await self.set_session() + res = await self.gateway.ls(path, session) + if detail: + return res + else: + return [r["name"] for r in res] + + ls = sync_wrapper(_ls) + + async def _cat_file(self, path, start=None, end=None, **kwargs): + path = self._strip_protocol(path) + session = await self.set_session() + return (await self.gateway.cat(path, session))[start:end] + + async def _info(self, path, **kwargs): + path = self._strip_protocol(path) + session = await self.set_session() + return await self.gateway.file_info(path, session) + + +class GatewayTracer: + def __init__(self): + from collections import defaultdict + self.samples = defaultdict(list) + + def make_trace_config(self): + trace_config = aiohttp.TraceConfig() + trace_config.on_request_start.append(self.on_request_start) + trace_config.on_request_end.append(self.on_request_end) + return trace_config + + async def on_request_start(self, session, trace_config_ctx, params): + trace_config_ctx.start = asyncio.get_event_loop().time() + #print("Starting request") + + async def on_request_end(self, session, trace_config_ctx, params): + #print(trace_config_ctx) + trace_config_ctx.end = asyncio.get_event_loop().time() + elapsed = trace_config_ctx.end - trace_config_ctx.start + status = params.response.status + gateway = trace_config_ctx.trace_request_ctx.get("gateway", None) + self.samples[gateway].append({"url": params.url, "method": params.method, "elapsed": elapsed, "status": status}) + #print("Ending request") diff --git a/test/test_async.py b/test/test_async.py new file mode 100644 index 0000000..7cc5c6f --- /dev/null +++ b/test/test_async.py @@ -0,0 +1,66 @@ +import pytest +from ipfsspec.async_ipfs import AsyncIPFSGateway, AsyncIPFSFileSystem +import aiohttp + +TEST_ROOT = "QmW3CrGFuFyF3VH1wvrap4Jend5NRTgtESDjuQ7QhHD5dd" +REF_CONTENT = b'ipfsspec test data' +TEST_FILENAMES = ["default", "multi", "raw", "raw_multi", "write"] + + +@pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) +@pytest.mark.parametrize("filename", TEST_FILENAMES) +@pytest.mark.asyncio +async def test_different_file_representations(filename, gw_host): + gw = AsyncIPFSGateway(gw_host) + async with aiohttp.ClientSession() as session: + path = TEST_ROOT + "/" + filename + info = await gw.file_info(path, session) + assert info["size"] == len(REF_CONTENT) + assert info["type"] == "file" + content = await gw.cat(path, session) + assert content == REF_CONTENT + +@pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) +@pytest.mark.asyncio +async def test_get_cid_of_folder(gw_host): + gw = AsyncIPFSGateway(gw_host) + async with aiohttp.ClientSession() as session: + info = await gw.file_info(TEST_ROOT, session) + assert info["CID"] == TEST_ROOT + +@pytest.mark.asyncio +async def test_ls(event_loop): + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) + res = await fs._ls(TEST_ROOT, detail=False) + assert res == [TEST_ROOT + fs.sep + fn for fn in TEST_FILENAMES] + res = await fs._ls(TEST_ROOT, detail=True) + assert [r["name"] for r in res] == [TEST_ROOT + fs.sep + fn for fn in TEST_FILENAMES] + assert all([r["size"] == len(REF_CONTENT) for r in res]) + +@pytest.mark.asyncio +async def test_cat_file(event_loop): + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) + res = await fs._cat_file(TEST_ROOT + "/default") + assert res == REF_CONTENT + res = await fs._cat_file(TEST_ROOT + "/default", start=3, end=7) + assert res == REF_CONTENT[3:7] + +@pytest.mark.asyncio +async def test_exists(event_loop): + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) + res = await fs._exists(TEST_ROOT + "/default") + assert res == True + res = await fs._exists(TEST_ROOT + "/missing") + assert res == False + +@pytest.mark.asyncio +async def test_isfile(event_loop): + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) + res = await fs._isfile(TEST_ROOT + "/default") + assert res == True + res = await fs._isfile(TEST_ROOT) + assert res == False From 20b44ce56de2b7a240591f2aedac9e050b6a0675 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 15 Sep 2021 17:18:11 +0200 Subject: [PATCH 02/32] adding async dependencies --- .github/workflows/local_gateway.yml | 2 +- setup.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/local_gateway.yml b/.github/workflows/local_gateway.yml index c5d52fd..3a9e2de 100644 --- a/.github/workflows/local_gateway.yml +++ b/.github/workflows/local_gateway.yml @@ -34,5 +34,5 @@ jobs: ipfs daemon > ipfs.log & - name: Test with pytest run: | - pip install pytest + pip install pytest pytest-asyncio pytest diff --git a/setup.py b/setup.py index d7bc3b6..9ee11c3 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,7 @@ "requests", "protobuf>=3.15.8", "py-cid", + "aiohttp", ], entry_points={ 'fsspec.specs': [ From a558efa1480f1fed905d4798f0410ed731c1782a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 15 Sep 2021 17:46:57 +0200 Subject: [PATCH 03/32] upgrade local ipfs version --- .github/workflows/local_gateway.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/local_gateway.yml b/.github/workflows/local_gateway.yml index 3a9e2de..83c9ae3 100644 --- a/.github/workflows/local_gateway.yml +++ b/.github/workflows/local_gateway.yml @@ -10,7 +10,7 @@ jobs: max-parallel: 4 matrix: python-version: [3.9] - ipfs-version: ["0.8.0"] + ipfs-version: ["0.9.1"] env: IPFSSPEC_GATEWAYS: "http://127.0.0.1:8080" # use only localhost as gateway steps: From 7955c5b337d81f67168ccb83be642a084f0bd86c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 15 Sep 2021 17:53:50 +0200 Subject: [PATCH 04/32] increase udp read buffer size See https://github.com/lucas-clemente/quic-go/wiki/UDP-Receive-Buffer-Size for details. Also show IPFS readme to verify it's working. --- .github/workflows/local_gateway.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/local_gateway.yml b/.github/workflows/local_gateway.yml index 83c9ae3..3c298e6 100644 --- a/.github/workflows/local_gateway.yml +++ b/.github/workflows/local_gateway.yml @@ -29,9 +29,11 @@ jobs: tar -xvzf go-ipfs_v${{ matrix.ipfs-version }}_linux-amd64.tar.gz cd go-ipfs sudo bash install.sh + sysctl -w net.core.rmem_max=2500000 ipfs --version ipfs init --profile server ipfs daemon > ipfs.log & + ipfs cat /ipfs/QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc/readme - name: Test with pytest run: | pip install pytest pytest-asyncio From fba2cb64c2f6fd24d502082f5e811abfe407b753 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 15 Sep 2021 18:02:22 +0200 Subject: [PATCH 05/32] CI: wait for IPFS daemon to start --- .github/workflows/local_gateway.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/local_gateway.yml b/.github/workflows/local_gateway.yml index 3c298e6..177bc84 100644 --- a/.github/workflows/local_gateway.yml +++ b/.github/workflows/local_gateway.yml @@ -32,7 +32,7 @@ jobs: sysctl -w net.core.rmem_max=2500000 ipfs --version ipfs init --profile server - ipfs daemon > ipfs.log & + ipfs daemon 2>ipfs.log | grep -i -o -m1 'Daemon is ready' & tail -f --pid=$! ipfs.log ipfs cat /ipfs/QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc/readme - name: Test with pytest run: | From 0dc8012ce7282331bf8a9e42ef9b63193d5532f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 15 Sep 2021 18:02:58 +0200 Subject: [PATCH 06/32] disabled workflows using remote gateways --- .github/{workflows => disabled_workflows}/pythonpackage.yml | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .github/{workflows => disabled_workflows}/pythonpackage.yml (100%) diff --git a/.github/workflows/pythonpackage.yml b/.github/disabled_workflows/pythonpackage.yml similarity index 100% rename from .github/workflows/pythonpackage.yml rename to .github/disabled_workflows/pythonpackage.yml From c293512953b1a1c22bcb9a5fb2667b35d77c4a5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 15 Sep 2021 18:04:54 +0200 Subject: [PATCH 07/32] use sudo to increase UDP memory --- .github/workflows/local_gateway.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/local_gateway.yml b/.github/workflows/local_gateway.yml index 177bc84..f358fd0 100644 --- a/.github/workflows/local_gateway.yml +++ b/.github/workflows/local_gateway.yml @@ -29,7 +29,7 @@ jobs: tar -xvzf go-ipfs_v${{ matrix.ipfs-version }}_linux-amd64.tar.gz cd go-ipfs sudo bash install.sh - sysctl -w net.core.rmem_max=2500000 + sudo sysctl -w net.core.rmem_max=2500000 ipfs --version ipfs init --profile server ipfs daemon 2>ipfs.log | grep -i -o -m1 'Daemon is ready' & tail -f --pid=$! ipfs.log From d2a857d5a19809f36b2322a2088ccedbcb18d2ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Tue, 21 Sep 2021 17:21:28 +0200 Subject: [PATCH 08/32] return FileNotFound also for paths without CID --- ipfsspec/async_ipfs.py | 8 ++++++++ test/test_async.py | 2 ++ 2 files changed, 10 insertions(+) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 947e0ac..e84c93c 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -45,6 +45,9 @@ async def file_info(self, path, session): async with res: self._raise_not_found_for_status(res, path) + if res.status != 200: + # TODO: maybe handle 301 here + raise FileNotFoundError(path) if "Content-Length" in res.headers: info["size"] = int(res.headers["Content-Length"]) elif "Content-Range" in res.headers: @@ -74,6 +77,9 @@ async def cat(self, path, session): async with res: self._raise_not_found_for_status(res, path) + if res.status != 200: + # TODO: maybe handle 301 here + raise FileNotFoundError(path) return await res.read() async def ls(self, path, session): @@ -95,6 +101,8 @@ def _raise_not_found_for_status(self, response, url): """ if response.status == 404: raise FileNotFoundError(url) + elif response.status == 400: + raise FileNotFoundError(url) response.raise_for_status() # required functions diff --git a/test/test_async.py b/test/test_async.py index 7cc5c6f..16dadd7 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -55,6 +55,8 @@ async def test_exists(event_loop): assert res == True res = await fs._exists(TEST_ROOT + "/missing") assert res == False + res = await fs._exists("/missing") + assert res == False @pytest.mark.asyncio async def test_isfile(event_loop): From 341b1197f60cf46539c3cc976e995e78d9ea87d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Tue, 21 Sep 2021 17:22:08 +0200 Subject: [PATCH 09/32] added open() for async filesystem --- ipfsspec/async_ipfs.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index e84c93c..5a0880c 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -1,3 +1,4 @@ +import io import asyncio import weakref @@ -169,6 +170,12 @@ async def _info(self, path, **kwargs): session = await self.set_session() return await self.gateway.file_info(path, session) + def open(self, path, mode="rb", block_size=None, cache_options=None, **kwargs): + if mode != "rb": + raise NotImplementedError + data = self.cat_file(path) # load whole chunk into memory + return io.BytesIO(data) + class GatewayTracer: def __init__(self): From 6ec7f945e115d54d1d44530e1fe81bb0c611094a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Tue, 21 Sep 2021 17:22:22 +0200 Subject: [PATCH 10/32] return CID as ukey --- ipfsspec/async_ipfs.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 5a0880c..83c7d49 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -176,6 +176,11 @@ def open(self, path, mode="rb", block_size=None, cache_options=None, **kwargs): data = self.cat_file(path) # load whole chunk into memory return io.BytesIO(data) + def ukey(self, path): + """returns the CID, which is by definition an unchanging identitifer""" + return self.info(path)["CID"] + + class GatewayTracer: def __init__(self): From c91dd1aae554bd8ef022c0d8a7a5b6a7b0a88222 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Tue, 21 Sep 2021 17:25:22 +0200 Subject: [PATCH 11/32] cleanup --- ipfsspec/async_ipfs.py | 34 ---------------------------------- ipfsspec/tracing.py | 20 ++++++++++++++++++++ 2 files changed, 20 insertions(+), 34 deletions(-) create mode 100644 ipfsspec/tracing.py diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 83c7d49..2f386c3 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -7,10 +7,6 @@ from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper class AsyncIPFSGateway: - # stats - # roundtrip time - # troughput - resolution = "path" def __init__(self, url): @@ -106,10 +102,6 @@ def _raise_not_found_for_status(self, response, url): raise FileNotFoundError(url) response.raise_for_status() -# required functions -# cat -# ls -# info async def get_client(**kwargs): return aiohttp.ClientSession(**kwargs) @@ -179,29 +171,3 @@ def open(self, path, mode="rb", block_size=None, cache_options=None, **kwargs): def ukey(self, path): """returns the CID, which is by definition an unchanging identitifer""" return self.info(path)["CID"] - - - -class GatewayTracer: - def __init__(self): - from collections import defaultdict - self.samples = defaultdict(list) - - def make_trace_config(self): - trace_config = aiohttp.TraceConfig() - trace_config.on_request_start.append(self.on_request_start) - trace_config.on_request_end.append(self.on_request_end) - return trace_config - - async def on_request_start(self, session, trace_config_ctx, params): - trace_config_ctx.start = asyncio.get_event_loop().time() - #print("Starting request") - - async def on_request_end(self, session, trace_config_ctx, params): - #print(trace_config_ctx) - trace_config_ctx.end = asyncio.get_event_loop().time() - elapsed = trace_config_ctx.end - trace_config_ctx.start - status = params.response.status - gateway = trace_config_ctx.trace_request_ctx.get("gateway", None) - self.samples[gateway].append({"url": params.url, "method": params.method, "elapsed": elapsed, "status": status}) - #print("Ending request") diff --git a/ipfsspec/tracing.py b/ipfsspec/tracing.py new file mode 100644 index 0000000..27f6aa2 --- /dev/null +++ b/ipfsspec/tracing.py @@ -0,0 +1,20 @@ +class GatewayTracer: + def __init__(self): + from collections import defaultdict + self.samples = defaultdict(list) + + def make_trace_config(self): + trace_config = aiohttp.TraceConfig() + trace_config.on_request_start.append(self.on_request_start) + trace_config.on_request_end.append(self.on_request_end) + return trace_config + + async def on_request_start(self, session, trace_config_ctx, params): + trace_config_ctx.start = asyncio.get_event_loop().time() + + async def on_request_end(self, session, trace_config_ctx, params): + trace_config_ctx.end = asyncio.get_event_loop().time() + elapsed = trace_config_ctx.end - trace_config_ctx.start + status = params.response.status + gateway = trace_config_ctx.trace_request_ctx.get("gateway", None) + self.samples[gateway].append({"url": params.url, "method": params.method, "elapsed": elapsed, "status": status}) From 36406a7a92eacd1606d332ba6be32ff688a9120a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Tue, 21 Sep 2021 18:26:26 +0200 Subject: [PATCH 12/32] flake8 --- ipfsspec/async_ipfs.py | 7 +++---- ipfsspec/tracing.py | 6 +++++- test/test_async.py | 23 ++++++++++++++--------- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 2f386c3..15fadc3 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -1,10 +1,11 @@ import io -import asyncio import weakref import aiohttp from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper +from fsspec.exceptions import FSTimeoutError + class AsyncIPFSGateway: resolution = "path" @@ -34,7 +35,6 @@ async def file_info(self, path, session): headers = {"Accept-Encoding": "identity"} # this ensures correct file size if self.resolution == "path": res = await session.head(self.url + "/ipfs/" + path, trace_request_ctx={'gateway': self.url}, headers=headers) - #print(res) elif self.resolution == "subdomain": raise NotImplementedError("subdomain resolution is not yet implemented") else: @@ -63,8 +63,6 @@ async def file_info(self, path, session): return info async def cat(self, path, session): - info = {"name": path} - if self.resolution == "path": res = await session.get(self.url + "/ipfs/" + path, trace_request_ctx={'gateway': self.url}) elif self.resolution == "subdomain": @@ -106,6 +104,7 @@ def _raise_not_found_for_status(self, response, url): async def get_client(**kwargs): return aiohttp.ClientSession(**kwargs) + class AsyncIPFSFileSystem(AsyncFileSystem): sep = "/" protocol = "aipfs" diff --git a/ipfsspec/tracing.py b/ipfsspec/tracing.py index 27f6aa2..37adb79 100644 --- a/ipfsspec/tracing.py +++ b/ipfsspec/tracing.py @@ -1,3 +1,7 @@ +import asyncio +import aiohttp + + class GatewayTracer: def __init__(self): from collections import defaultdict @@ -8,7 +12,7 @@ def make_trace_config(self): trace_config.on_request_start.append(self.on_request_start) trace_config.on_request_end.append(self.on_request_end) return trace_config - + async def on_request_start(self, session, trace_config_ctx, params): trace_config_ctx.start = asyncio.get_event_loop().time() diff --git a/test/test_async.py b/test/test_async.py index 16dadd7..3ff21b1 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -20,6 +20,7 @@ async def test_different_file_representations(filename, gw_host): content = await gw.cat(path, session) assert content == REF_CONTENT + @pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) @pytest.mark.asyncio async def test_get_cid_of_folder(gw_host): @@ -28,9 +29,10 @@ async def test_get_cid_of_folder(gw_host): info = await gw.file_info(TEST_ROOT, session) assert info["CID"] == TEST_ROOT + @pytest.mark.asyncio async def test_ls(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) res = await fs._ls(TEST_ROOT, detail=False) assert res == [TEST_ROOT + fs.sep + fn for fn in TEST_FILENAMES] @@ -38,31 +40,34 @@ async def test_ls(event_loop): assert [r["name"] for r in res] == [TEST_ROOT + fs.sep + fn for fn in TEST_FILENAMES] assert all([r["size"] == len(REF_CONTENT) for r in res]) + @pytest.mark.asyncio async def test_cat_file(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) res = await fs._cat_file(TEST_ROOT + "/default") assert res == REF_CONTENT res = await fs._cat_file(TEST_ROOT + "/default", start=3, end=7) assert res == REF_CONTENT[3:7] + @pytest.mark.asyncio async def test_exists(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) res = await fs._exists(TEST_ROOT + "/default") - assert res == True + assert res is True res = await fs._exists(TEST_ROOT + "/missing") - assert res == False + assert res is False res = await fs._exists("/missing") - assert res == False + assert res is False + @pytest.mark.asyncio async def test_isfile(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) res = await fs._isfile(TEST_ROOT + "/default") - assert res == True + assert res is True res = await fs._isfile(TEST_ROOT) - assert res == False + assert res is False From a152990fba32246c7f62a8b62492d39bec086a6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Tue, 21 Sep 2021 19:59:11 +0200 Subject: [PATCH 13/32] ship IPFS testdata as car --- .github/workflows/local_gateway.yml | 1 + test/testdata.car | Bin 0 -> 2193 bytes 2 files changed, 1 insertion(+) create mode 100644 test/testdata.car diff --git a/.github/workflows/local_gateway.yml b/.github/workflows/local_gateway.yml index f358fd0..bc7cf4d 100644 --- a/.github/workflows/local_gateway.yml +++ b/.github/workflows/local_gateway.yml @@ -34,6 +34,7 @@ jobs: ipfs init --profile server ipfs daemon 2>ipfs.log | grep -i -o -m1 'Daemon is ready' & tail -f --pid=$! ipfs.log ipfs cat /ipfs/QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc/readme + ipfs dag import test/testdata.car - name: Test with pytest run: | pip install pytest pytest-asyncio diff --git a/test/testdata.car b/test/testdata.car new file mode 100644 index 0000000000000000000000000000000000000000..6e1a0e9cf8a2a098bc8637a809f5048ce876e888 GIT binary patch literal 2193 zcmcCmlvf5hqwo7Q9oMHX#sPvvWoBMK3Zn@A` zlU|luRGgWg$2gOTuwo&7E+rv_)tT28USLqR+9R*Pnj|p&yv%y_RT7>z#2ijqKgj&K z{-zLnN@`kSX-K<{-LbyaCp?oe#Q4M7ypf1efPfcCrd#g z)?A?KOo=zlLb_ZkjG;mb3QO5Jy$%X3pK*HO=1p@uv#mMrt7cxi^H1+s$_n-CQCEbR zixSHvgoKPh#=m`ij8E)3ANQUo>#eMQ{0coaZQ^$IxNZA)y*~ZfZth7*Ax@y;c!(9- zKvn?tM3k-Eepjp1mgfAzmM9_C@}kU=R0%gOCJsg$ z!hYtG;9wFG$}C7LE-pw-Rwzj=E>TEHEJ>6Q+QLjk0BC^Rv~`ie)9Kd|%oTU@Oq-(D zT-@_u(~g_vGLf%mtvi^-HubF#g9MilSmpivZ_LpO=h7x8tI~+1c``KAr`AJn@*JgZ}rtHF)Id4BgCD$3F z&5K%P_i}Jphb(i8{PyOr!n*0}8il3R+I+ZPsQpH$?3B9tWpZHorMsF|7go$z_98!T zTg&boCgIg}39D@r`il`Nl|+ATH#{7%-_%6R)TMT_>AWw_VFLA0-w$uysn)iAiZnW%kw88}}@p zYBBHppU2-w@koP@dQ(-!A3g4z=#{hL$E*%1USzd*dow@9 z&0&}?*LUFOzkLf|<;MM&3R-S+No>P$-^_OxDv;G4*dxdBv&33A;OfVOQs(`BEwes6 z*Zi|$T4406sj&r%?U2=W{r(<&?W;(>Wb@uV8(qBqWF%)>a94yzul7-1XLU1{2U+c^ zh)vRr*LJL&BNY)gO?TJfoYJn!J6z?59tfY)v3zr<3t4Shr1sormzyk>YMBMLRTMrw zR`AS-tM5xz%hpK#PUFvDH{i;XYM|&R6hWB Date: Tue, 21 Sep 2021 20:02:40 +0200 Subject: [PATCH 14/32] CI: change back directory --- .github/workflows/local_gateway.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/local_gateway.yml b/.github/workflows/local_gateway.yml index bc7cf4d..b5a9e37 100644 --- a/.github/workflows/local_gateway.yml +++ b/.github/workflows/local_gateway.yml @@ -27,9 +27,10 @@ jobs: run: | wget https://dist.ipfs.io/go-ipfs/v${{ matrix.ipfs-version }}/go-ipfs_v${{ matrix.ipfs-version }}_linux-amd64.tar.gz tar -xvzf go-ipfs_v${{ matrix.ipfs-version }}_linux-amd64.tar.gz - cd go-ipfs + pushd go-ipfs sudo bash install.sh sudo sysctl -w net.core.rmem_max=2500000 + popd ipfs --version ipfs init --profile server ipfs daemon 2>ipfs.log | grep -i -o -m1 'Daemon is ready' & tail -f --pid=$! ipfs.log From bdf0bfd24e0c508ef2bf27fbebc0c10e5a573941 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 17 Nov 2021 11:12:25 +0100 Subject: [PATCH 15/32] async: added gateway switching facility --- ipfsspec/async_ipfs.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 15fadc3..511cb05 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -105,6 +105,20 @@ async def get_client(**kwargs): return aiohttp.ClientSession(**kwargs) +DEFAULT_GATEWAY = None + +def get_gateway(): + global DEFAULT_GATEWAY + if DEFAULT_GATEWAY is None: + use_gateway("http://127.0.0.1:8080") + return DEFAULT_GATEWAY + + +def use_gateway(url): + global DEFAULT_GATEWAY + DEFAULT_GATEWAY = AsyncIPFSGateway(url) + + class AsyncIPFSFileSystem(AsyncFileSystem): sep = "/" protocol = "aipfs" @@ -116,11 +130,13 @@ def __init__(self, asynchronous=False, loop=None, client_kwargs=None, **storage_ self.client_kwargs = client_kwargs or {} self.get_client = get_client - self.gateway = AsyncIPFSGateway("http://127.0.0.1:8080") - if not asynchronous: sync(self.loop, self.set_session) + @property + def gateway(self): + return get_gateway() + @staticmethod def close_session(loop, session): if loop is not None and loop.is_running(): From 2be1e7cfb940f103b5e99d5ea142aee9deac9644 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 9 Mar 2022 16:41:01 +0100 Subject: [PATCH 16/32] refactor: separated AsyncIPFSGatewayBase --- ipfsspec/async_ipfs.py | 69 +++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 511cb05..d27de72 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -7,23 +7,7 @@ from fsspec.exceptions import FSTimeoutError -class AsyncIPFSGateway: - resolution = "path" - - def __init__(self, url): - self.url = url - - async def api_get(self, endpoint, session, **kwargs): - return await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) - - async def api_post(self, endpoint, session, **kwargs): - return await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) - - async def version(self, session): - res = await self.api_get("version", session) - res.raise_for_status() - return await res.json() - +class AsyncIPFSGatewayBase: async def stat(self, path, session): res = await self.api_get("files/stat", session, arg=path) res.raise_for_status() @@ -33,12 +17,7 @@ async def file_info(self, path, session): info = {"name": path} headers = {"Accept-Encoding": "identity"} # this ensures correct file size - if self.resolution == "path": - res = await session.head(self.url + "/ipfs/" + path, trace_request_ctx={'gateway': self.url}, headers=headers) - elif self.resolution == "subdomain": - raise NotImplementedError("subdomain resolution is not yet implemented") - else: - raise NotImplementedError(f"'{self.resolution}' resolution is not known") + res = await self.cid_head(path, session, headers=headers) async with res: self._raise_not_found_for_status(res, path) @@ -63,17 +42,10 @@ async def file_info(self, path, session): return info async def cat(self, path, session): - if self.resolution == "path": - res = await session.get(self.url + "/ipfs/" + path, trace_request_ctx={'gateway': self.url}) - elif self.resolution == "subdomain": - raise NotImplementedError("subdomain resolution is not yet implemented") - else: - raise NotImplementedError(f"'{self.resolution}' resolution is not known") - + res = await self.cid_get(path, session) async with res: self._raise_not_found_for_status(res, path) if res.status != 200: - # TODO: maybe handle 301 here raise FileNotFoundError(path) return await res.read() @@ -101,6 +73,41 @@ def _raise_not_found_for_status(self, response, url): response.raise_for_status() +class AsyncIPFSGateway(AsyncIPFSGatewayBase): + resolution = "path" + + def __init__(self, url): + self.url = url + + async def api_get(self, endpoint, session, **kwargs): + return await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + + async def api_post(self, endpoint, session, **kwargs): + return await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + + async def _cid_req(self, method, path, headers=None, **kwargs): + headers = headers or {} + if self.resolution == "path": + res = await method(self.url + "/ipfs/" + path, trace_request_ctx={'gateway': self.url}, headers=headers) + elif self.resolution == "subdomain": + raise NotImplementedError("subdomain resolution is not yet implemented") + else: + raise NotImplementedError(f"'{self.resolution}' resolution is not known") + # TODO: maybe handle 301 here + return res + + async def cid_head(self, path, session, headers=None, **kwargs): + return await self._cid_req(session.head, path, headers=headers, **kwargs) + + async def cid_get(self, path, session, headers=None, **kwargs): + return await self._cid_req(session.get, path, headers=headers, **kwargs) + + async def version(self, session): + res = await self.api_get("version", session) + res.raise_for_status() + return await res.json() + + async def get_client(**kwargs): return aiohttp.ClientSession(**kwargs) From 5f92fc129f036a76fe77b950982b2dfa7dc76e76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Thu, 10 Mar 2022 20:42:25 +0100 Subject: [PATCH 17/32] async: added MultiGateway --- ipfsspec/async_ipfs.py | 38 +++++++++++++++++++++++++++++++++++--- test/test_async.py | 23 ++++++++++++++++++++++- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index d27de72..5797a98 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -6,6 +6,8 @@ from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper from fsspec.exceptions import FSTimeoutError +from .core import get_default_gateways + class AsyncIPFSGatewayBase: async def stat(self, path, session): @@ -108,6 +110,33 @@ async def version(self, session): return await res.json() +class MultiGateway(AsyncIPFSGatewayBase): + def __init__(self, gws): + self.gws = gws + + async def _gw_op(self, op): + exception = None + for gw in self.gws: + try: + return await op(gw) + except IOError as e: + exception = e + continue + raise exception + + async def api_get(self, endpoint, session, **kwargs): + return await self._gw_op(lambda gw: gw.api_get(endpoint, session, **kwargs)) + + async def api_post(self, endpoint, session, **kwargs): + return await self._gw_op(lambda gw: gw.api_post(endpoint, session, **kwargs)) + + async def cid_head(self, path, session, headers=None, **kwargs): + return await self._gw_op(lambda gw: gw.cid_head(path, session, headers=headers, **kwargs)) + + async def cid_get(self, path, session, headers=None, **kwargs): + return await self._gw_op(lambda gw: gw.cid_get(path, session, headers=headers, **kwargs)) + + async def get_client(**kwargs): return aiohttp.ClientSession(**kwargs) @@ -117,13 +146,16 @@ async def get_client(**kwargs): def get_gateway(): global DEFAULT_GATEWAY if DEFAULT_GATEWAY is None: - use_gateway("http://127.0.0.1:8080") + use_gateway(*get_default_gateways()) return DEFAULT_GATEWAY -def use_gateway(url): +def use_gateway(*urls): global DEFAULT_GATEWAY - DEFAULT_GATEWAY = AsyncIPFSGateway(url) + if len(urls) == 1: + DEFAULT_GATEWAY = AsyncIPFSGateway(urls[0]) + else: + DEFAULT_GATEWAY = MultiGateway([AsyncIPFSGateway(url) for url in urls]) class AsyncIPFSFileSystem(AsyncFileSystem): diff --git a/test/test_async.py b/test/test_async.py index 3ff21b1..b452c89 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -1,11 +1,16 @@ import pytest -from ipfsspec.async_ipfs import AsyncIPFSGateway, AsyncIPFSFileSystem +from ipfsspec.async_ipfs import AsyncIPFSGateway, MultiGateway, AsyncIPFSFileSystem import aiohttp TEST_ROOT = "QmW3CrGFuFyF3VH1wvrap4Jend5NRTgtESDjuQ7QhHD5dd" REF_CONTENT = b'ipfsspec test data' TEST_FILENAMES = ["default", "multi", "raw", "raw_multi", "write"] +@pytest.fixture +async def session(): + async with aiohttp.ClientSession() as session: + yield session + @pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) @pytest.mark.parametrize("filename", TEST_FILENAMES) @@ -30,6 +35,22 @@ async def test_get_cid_of_folder(gw_host): assert info["CID"] == TEST_ROOT +@pytest.mark.parametrize("gw_hosts", [ + ["http://127.0.0.1:8080"], + ["http://127.0.0.1:9999", "http://127.0.0.1:8080"], + ["http://127.0.0.1:8080", "http://127.0.0.1:9999"], + ["https://ipfs.io", "http://127.0.0.1:8080"], + ["http://127.0.0.1:8080", "https://ipfs.io"], +]) +@pytest.mark.asyncio +async def test_multi_gw_cat(gw_hosts, session): + gws = [AsyncIPFSGateway(gw_host) for gw_host in gw_hosts] + gw = MultiGateway(gws) + + res = await gw.cat(TEST_ROOT + "/default", session) + assert res == REF_CONTENT + + @pytest.mark.asyncio async def test_ls(event_loop): AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop From cd277115774456a75dbb2778b19ba768b993ef85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Thu, 10 Mar 2022 20:50:07 +0100 Subject: [PATCH 18/32] refactor async tests: use session fixture --- test/test_async.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/test/test_async.py b/test/test_async.py index b452c89..083e28c 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -15,24 +15,24 @@ async def session(): @pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) @pytest.mark.parametrize("filename", TEST_FILENAMES) @pytest.mark.asyncio -async def test_different_file_representations(filename, gw_host): +async def test_different_file_representations(filename, gw_host, session): gw = AsyncIPFSGateway(gw_host) - async with aiohttp.ClientSession() as session: - path = TEST_ROOT + "/" + filename - info = await gw.file_info(path, session) - assert info["size"] == len(REF_CONTENT) - assert info["type"] == "file" - content = await gw.cat(path, session) - assert content == REF_CONTENT + + path = TEST_ROOT + "/" + filename + info = await gw.file_info(path, session) + assert info["size"] == len(REF_CONTENT) + assert info["type"] == "file" + content = await gw.cat(path, session) + assert content == REF_CONTENT @pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) @pytest.mark.asyncio -async def test_get_cid_of_folder(gw_host): +async def test_get_cid_of_folder(gw_host, session): gw = AsyncIPFSGateway(gw_host) - async with aiohttp.ClientSession() as session: - info = await gw.file_info(TEST_ROOT, session) - assert info["CID"] == TEST_ROOT + + info = await gw.file_info(TEST_ROOT, session) + assert info["CID"] == TEST_ROOT @pytest.mark.parametrize("gw_hosts", [ From 2a8da0cf359a10b986238d86b6741f50b5075dd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 20:43:51 +0100 Subject: [PATCH 19/32] async: enable backoff --- ipfsspec/async_ipfs.py | 81 ++++++++++++++++++++++++++++++++----- test/test_async_fallback.py | 71 ++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 10 deletions(-) create mode 100644 test/test_async_fallback.py diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 5797a98..be64d66 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -1,6 +1,8 @@ import io +import time import weakref +import asyncio import aiohttp from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper @@ -9,10 +11,15 @@ from .core import get_default_gateways +class RequestsTooQuick(OSError): + def __init__(self, retry_after=None): + self.retry_after = retry_after + + class AsyncIPFSGatewayBase: async def stat(self, path, session): res = await self.api_get("files/stat", session, arg=path) - res.raise_for_status() + self._raise_not_found_for_status(res, path) return await res.json() async def file_info(self, path, session): @@ -72,6 +79,12 @@ def _raise_not_found_for_status(self, response, url): raise FileNotFoundError(url) elif response.status == 400: raise FileNotFoundError(url) + elif response.status == 429: + if "retry-after" in response.headers: + retry_after = int(response.headers["retry-after"]) + else: + retry_after = None + raise RequestsTooQuick(retry_after) response.raise_for_status() @@ -109,20 +122,62 @@ async def version(self, session): res.raise_for_status() return await res.json() + def __str__(self): + return f"GW({self.url})" + + +class GatewayState: + def __init__(self): + self.next_request_time = 0 + self.backoff_time = 0 + self.start_backoff = 1e-5 + self.max_backoff = 5 + + def schedule_next(self): + self.next_request_time = time.monotonic() + self.backoff_time + + def backoff(self): + if self.backoff_time < self.start_backoff: + self.backoff_time = self.start_backoff + else: + self.backoff_time *= 2 + self.schedule_next() + + def speedup(self): + self.backoff_time = max(0, self.backoff_time * 0.9) + self.schedule_next() + + def broken(self): + self.backoff_time = self.max_backoff + class MultiGateway(AsyncIPFSGatewayBase): - def __init__(self, gws): - self.gws = gws + def __init__(self, gws, max_backoff_rounds=50): + self.gws = [(GatewayState(), gw) for gw in gws] + self.max_backoff_rounds = max_backoff_rounds async def _gw_op(self, op): exception = None - for gw in self.gws: - try: - return await op(gw) - except IOError as e: - exception = e - continue - raise exception + for _ in range(self.max_backoff_rounds): + now = time.monotonic() + for state, gw in sorted(self.gws, key=lambda x: max(now, x[0].next_request_time)): + now = time.monotonic() + if state.next_request_time > now: + await asyncio.sleep(state.next_request_time - now) + try: + res = await op(gw) + state.speedup() + return res + except RequestsTooQuick as e: + state.backoff() + break + except IOError as e: + exception = e + state.broken() + continue + else: + raise exception + raise RequestsTooQuick() async def api_get(self, endpoint, session, **kwargs): return await self._gw_op(lambda gw: gw.api_get(endpoint, session, **kwargs)) @@ -136,6 +191,12 @@ async def cid_head(self, path, session, headers=None, **kwargs): async def cid_get(self, path, session, headers=None, **kwargs): return await self._gw_op(lambda gw: gw.cid_get(path, session, headers=headers, **kwargs)) + def state_report(self): + return "\n".join(f"{s.next_request_time}, {gw}" for s, gw in self.gws) + + def __str__(self): + return "Multi-GW(" + ", ".join(str(gw) for _, gw in self.gws) + ")" + async def get_client(**kwargs): return aiohttp.ClientSession(**kwargs) diff --git a/test/test_async_fallback.py b/test/test_async_fallback.py new file mode 100644 index 0000000..31c4835 --- /dev/null +++ b/test/test_async_fallback.py @@ -0,0 +1,71 @@ +import pytest +import time + +from ipfsspec.async_ipfs import MultiGateway, AsyncIPFSGatewayBase, RequestsTooQuick + +class MockGateway(AsyncIPFSGatewayBase): + def __init__(self, objects): + self.objects = objects + + async def cid_get(self, path, session, headers=None, **kwargs): + try: + return self.objects[path] + except KeyError: + raise FileNotFoundError(path) + +class RateLimitedMockGateway(AsyncIPFSGatewayBase): + def __init__(self, max_rate, base, report_time=True): + self.request_count = 0 + self.next_allowed_request = time.monotonic() + + self.max_rate = max_rate + self.base = base + self.report_time = report_time + + def _rate_limit(self): + self.request_count += 1 + now = time.monotonic() + if now <= self.next_allowed_request: + raise RequestsTooQuick(self.next_allowed_request - now if self.report_time else None) + else: + self.next_allowed_request = now + self.max_rate + + async def cid_get(self, path, session, headers=None, **kwargs): + self._rate_limit() + return await self.base.cid_get(path, session, headers=headers, **kwargs) + + +@pytest.fixture +def session(): + return None + + +@pytest.mark.asyncio +async def test_backoff(session): + base = MockGateway({ + "QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM": "bar", + }) + gws = [RateLimitedMockGateway(0.01, base)] + gw = MultiGateway(gws) + + for _ in range(50): + obj = await gw.cid_get("QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM", session) + assert obj == "bar" + assert 50 <= gws[0].request_count < 240 + + +@pytest.mark.asyncio +async def test_backoff_use_faster_server(session): + base = MockGateway({ + "QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM": "zapp", + }) + gws = [ + RateLimitedMockGateway(0.1, base), + RateLimitedMockGateway(0.01, base) + ] + gw = MultiGateway(gws) + + for _ in range(100): + obj = await gw.cid_get("QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM", session) + assert obj == "zapp" + assert gws[0].request_count < gws[1].request_count From b6b7d418e1ad45be467a9a4d70b131b0ba219fac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 20:52:06 +0100 Subject: [PATCH 20/32] make the async implementation the default --- ipfsspec/__init__.py | 2 +- ipfsspec/async_ipfs.py | 2 +- setup.py | 2 +- test/test_gateway.py | 7 ++++--- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/ipfsspec/__init__.py b/ipfsspec/__init__.py index 7ee15ca..81b482a 100644 --- a/ipfsspec/__init__.py +++ b/ipfsspec/__init__.py @@ -6,7 +6,7 @@ __version__ = get_versions()['version'] del get_versions -register_implementation(IPFSFileSystem.protocol, IPFSFileSystem) +#register_implementation(IPFSFileSystem.protocol, IPFSFileSystem) register_implementation(AsyncIPFSFileSystem.protocol, AsyncIPFSFileSystem) __all__ = ["__version__", "IPFSFileSystem", "AsyncIPFSFileSystem"] diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index be64d66..f1fc6df 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -221,7 +221,7 @@ def use_gateway(*urls): class AsyncIPFSFileSystem(AsyncFileSystem): sep = "/" - protocol = "aipfs" + protocol = "ipfs" def __init__(self, asynchronous=False, loop=None, client_kwargs=None, **storage_options): super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options) diff --git a/setup.py b/setup.py index 4dfcef8..1c9fdaf 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ ], entry_points={ 'fsspec.specs': [ - 'ipfs=ipfsspec.IPFSFileSystem', + 'ipfs=ipfsspec.AsyncIPFSFileSystem', ], }, ) diff --git a/test/test_gateway.py b/test/test_gateway.py index 6d5d4ed..5cad767 100644 --- a/test/test_gateway.py +++ b/test/test_gateway.py @@ -4,6 +4,7 @@ from mockserver import mock_servers import fsspec +from ipfsspec.core import IPFSFileSystem from ipfsspec.unixfs_pb2 import Data as UnixFSData from http.server import BaseHTTPRequestHandler @@ -117,7 +118,7 @@ def test_backoff(): {"QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM": "bar"}), ] with mock_servers(handlers) as gateways: - fs = fsspec.filesystem("ipfs", gateways=gateways, timeout=1) + fs = IPFSFileSystem(gateways=gateways, timeout=1) for _ in range(50): with fs.open("QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM") as f: assert f.read().decode("utf-8") == "bar" @@ -134,7 +135,7 @@ def test_backoff_use_faster_server(): {"QmaBoSxocSYTx1WLw55NJ2rEkvt5WTJxxuUSCWtabHurqE": "zapp"}), ] with mock_servers(handlers) as gateways: - fs = fsspec.filesystem("ipfs", gateways=gateways, timeout=1) + fs = IPFSFileSystem(gateways=gateways, timeout=1) for _ in range(100): with fs.open("QmaBoSxocSYTx1WLw55NJ2rEkvt5WTJxxuUSCWtabHurqE") as f: assert f.read().decode("utf-8") == "zapp" @@ -149,6 +150,6 @@ def test_fallback_if_incomplete(): {"QmWLdkp93sNxGRjnFHPaYg8tCQ35NBY3XPn6KiETd3Z4WR": "baz"}), ] with mock_servers(handlers) as gateways: - fs = fsspec.filesystem("ipfs", gateways=gateways, timeout=1) + fs = IPFSFileSystem(gateways=gateways, timeout=1) with fs.open("QmWLdkp93sNxGRjnFHPaYg8tCQ35NBY3XPn6KiETd3Z4WR") as f: assert f.read().decode("utf-8") == "baz" From 9fb6159ac810e2103dbe63a36f7b61cab9538e94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 21:27:10 +0100 Subject: [PATCH 21/32] async: improved retry algorithm --- ipfsspec/async_ipfs.py | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index f1fc6df..fa659e1 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -10,6 +10,10 @@ from .core import get_default_gateways +import logging + +logger = logging.getLogger("ipfsspec") + class RequestsTooQuick(OSError): def __init__(self, retry_after=None): @@ -128,6 +132,7 @@ def __str__(self): class GatewayState: def __init__(self): + self.reachable = True self.next_request_time = 0 self.backoff_time = 0 self.start_backoff = 1e-5 @@ -141,14 +146,25 @@ def backoff(self): self.backoff_time = self.start_backoff else: self.backoff_time *= 2 + self.reachable = True self.schedule_next() - def speedup(self): - self.backoff_time = max(0, self.backoff_time * 0.9) + def speedup(self, not_below=0): + did_speed_up = False + if self.backoff_time > not_below: + self.backoff_time *= 0.9 + did_speed_up = True + self.reachable = True self.schedule_next() + return did_speed_up def broken(self): self.backoff_time = self.max_backoff + self.reachable = False + self.schedule_next() + + def trying_to_reach(self): + self.next_request_time = time.monotonic() + 1 class MultiGateway(AsyncIPFSGatewayBase): @@ -161,19 +177,25 @@ async def _gw_op(self, op): for _ in range(self.max_backoff_rounds): now = time.monotonic() for state, gw in sorted(self.gws, key=lambda x: max(now, x[0].next_request_time)): + if not state.reachable: + state.trying_to_reach() now = time.monotonic() if state.next_request_time > now: await asyncio.sleep(state.next_request_time - now) + logger.debug("tring %s", gw) try: res = await op(gw) - state.speedup() + if state.speedup(time.monotonic() - now): + logger.debug("%s speedup", gw) return res except RequestsTooQuick as e: state.backoff() + logger.debug("%s backoff", gw) break except IOError as e: exception = e state.broken() + logger.debug("%s broken", gw) continue else: raise exception @@ -199,6 +221,8 @@ def __str__(self): async def get_client(**kwargs): + timeout = aiohttp.ClientTimeout(sock_connect=1, sock_read=5) + kwags = {"timeout": timeout, **kwargs} return aiohttp.ClientSession(**kwargs) From 42598a0ade9428ca1f9a21e2a1c9ef6aaf85f7f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 21:38:54 +0100 Subject: [PATCH 22/32] CI: run tests with only default gateways --- ipfsspec/__init__.py | 2 +- ipfsspec/async_ipfs.py | 5 +++-- test/test_async.py | 4 ++++ test/test_async_fallback.py | 2 ++ test/test_gateway.py | 1 - 5 files changed, 10 insertions(+), 4 deletions(-) diff --git a/ipfsspec/__init__.py b/ipfsspec/__init__.py index 81b482a..f77cc28 100644 --- a/ipfsspec/__init__.py +++ b/ipfsspec/__init__.py @@ -6,7 +6,7 @@ __version__ = get_versions()['version'] del get_versions -#register_implementation(IPFSFileSystem.protocol, IPFSFileSystem) +# register_implementation(IPFSFileSystem.protocol, IPFSFileSystem) register_implementation(AsyncIPFSFileSystem.protocol, AsyncIPFSFileSystem) __all__ = ["__version__", "IPFSFileSystem", "AsyncIPFSFileSystem"] diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index fa659e1..b5b52f3 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -188,7 +188,7 @@ async def _gw_op(self, op): if state.speedup(time.monotonic() - now): logger.debug("%s speedup", gw) return res - except RequestsTooQuick as e: + except RequestsTooQuick: state.backoff() logger.debug("%s backoff", gw) break @@ -222,12 +222,13 @@ def __str__(self): async def get_client(**kwargs): timeout = aiohttp.ClientTimeout(sock_connect=1, sock_read=5) - kwags = {"timeout": timeout, **kwargs} + kwargs = {"timeout": timeout, **kwargs} return aiohttp.ClientSession(**kwargs) DEFAULT_GATEWAY = None + def get_gateway(): global DEFAULT_GATEWAY if DEFAULT_GATEWAY is None: diff --git a/test/test_async.py b/test/test_async.py index 083e28c..c5711df 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -6,12 +6,14 @@ REF_CONTENT = b'ipfsspec test data' TEST_FILENAMES = ["default", "multi", "raw", "raw_multi", "write"] + @pytest.fixture async def session(): async with aiohttp.ClientSession() as session: yield session +@pytest.mark.local_gw @pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) @pytest.mark.parametrize("filename", TEST_FILENAMES) @pytest.mark.asyncio @@ -26,6 +28,7 @@ async def test_different_file_representations(filename, gw_host, session): assert content == REF_CONTENT +@pytest.mark.local_gw @pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) @pytest.mark.asyncio async def test_get_cid_of_folder(gw_host, session): @@ -35,6 +38,7 @@ async def test_get_cid_of_folder(gw_host, session): assert info["CID"] == TEST_ROOT +@pytest.mark.local_gw @pytest.mark.parametrize("gw_hosts", [ ["http://127.0.0.1:8080"], ["http://127.0.0.1:9999", "http://127.0.0.1:8080"], diff --git a/test/test_async_fallback.py b/test/test_async_fallback.py index 31c4835..b2f9a67 100644 --- a/test/test_async_fallback.py +++ b/test/test_async_fallback.py @@ -3,6 +3,7 @@ from ipfsspec.async_ipfs import MultiGateway, AsyncIPFSGatewayBase, RequestsTooQuick + class MockGateway(AsyncIPFSGatewayBase): def __init__(self, objects): self.objects = objects @@ -13,6 +14,7 @@ async def cid_get(self, path, session, headers=None, **kwargs): except KeyError: raise FileNotFoundError(path) + class RateLimitedMockGateway(AsyncIPFSGatewayBase): def __init__(self, max_rate, base, report_time=True): self.request_count = 0 diff --git a/test/test_gateway.py b/test/test_gateway.py index 5cad767..757aa72 100644 --- a/test/test_gateway.py +++ b/test/test_gateway.py @@ -3,7 +3,6 @@ import base64 from mockserver import mock_servers -import fsspec from ipfsspec.core import IPFSFileSystem from ipfsspec.unixfs_pb2 import Data as UnixFSData From 65fd970f320e3f311dc474fe712762e8eece8ad6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 21:43:29 +0100 Subject: [PATCH 23/32] CI: enable more tests --- .github/workflows/local_gateway.yml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/local_gateway.yml b/.github/workflows/local_gateway.yml index b5a9e37..e0e2c69 100644 --- a/.github/workflows/local_gateway.yml +++ b/.github/workflows/local_gateway.yml @@ -9,8 +9,11 @@ jobs: strategy: max-parallel: 4 matrix: - python-version: [3.9] - ipfs-version: ["0.9.1"] + python-version: ["3.8", "3.9", "3.10"] + ipfs-version: ["0.12.0"] + include: + - python-version: "3.10" + ipfs-version: "0.9.1" env: IPFSSPEC_GATEWAYS: "http://127.0.0.1:8080" # use only localhost as gateway steps: From c6854bc369e1aaa320bd3782b90d640a3dc9c9e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 21:47:11 +0100 Subject: [PATCH 24/32] CI: enable tests without local gateway --- .github/workflows/default_gateways.yml | 34 ++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 .github/workflows/default_gateways.yml diff --git a/.github/workflows/default_gateways.yml b/.github/workflows/default_gateways.yml new file mode 100644 index 0000000..03c0bfe --- /dev/null +++ b/.github/workflows/default_gateways.yml @@ -0,0 +1,34 @@ +name: Test without local gateway + +on: [push] + +jobs: + test: + + runs-on: ubuntu-latest + strategy: + max-parallel: 4 + matrix: + python-version: [3.9] + + steps: + - uses: actions/checkout@v1 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e . + - name: Lint with flake8 + run: | + pip install flake8 + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # The GitHub editor is 127 chars wide + flake8 . --count --max-complexity=10 --max-line-length=127 --statistics + - name: Test with pytest + run: | + pip install pytest + pytest -m "not local_gw" From 61860447e0ff997e7861278fe68d715d1f935fcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 21:49:20 +0100 Subject: [PATCH 25/32] CI: require pytest-asyncio for all tests --- .github/workflows/default_gateways.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/default_gateways.yml b/.github/workflows/default_gateways.yml index 03c0bfe..d94a6c0 100644 --- a/.github/workflows/default_gateways.yml +++ b/.github/workflows/default_gateways.yml @@ -30,5 +30,5 @@ jobs: flake8 . --count --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest run: | - pip install pytest + pip install pytest pytest-asyncio pytest -m "not local_gw" From 7aa60969f52b9134ed7856d8ad47b05b06135d7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 21:57:13 +0100 Subject: [PATCH 26/32] handle ClientResponseErrors as backoff reason --- ipfsspec/async_ipfs.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index b5b52f3..b668548 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -192,6 +192,12 @@ async def _gw_op(self, op): state.backoff() logger.debug("%s backoff", gw) break + except aiohttp.ClientResponseError as e: + # for now, handle client response errors also as backoff + # maybe someone will come up with a better solution some times + state.backoff() + logger.debug("%s backoff due to response %d", gw, e.status) + break except IOError as e: exception = e state.broken() From 6548cad5a942704c68dce8a3346d1f7fdaafefc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 21:57:29 +0100 Subject: [PATCH 27/32] pytest: register local_gw marker --- pytest.ini | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 pytest.ini diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..0e50de1 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +markers = + local_gw: marks tests requiring a local gateway (deselect with '-m "not local_gw"') From 2d4ea4839e3c4418e56e30f2d7c67451d6348927 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 22:02:29 +0100 Subject: [PATCH 28/32] async: handle TimeoutError --- ipfsspec/async_ipfs.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index b668548..817ccbe 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -192,6 +192,10 @@ async def _gw_op(self, op): state.backoff() logger.debug("%s backoff", gw) break + except asyncio.TimeoutError: + state.backoff() + logger.debug("%s backoff due to timeout", gw) + break except aiohttp.ClientResponseError as e: # for now, handle client response errors also as backoff # maybe someone will come up with a better solution some times From 3af9cb3c810cdb35867dc7bdc8f879cfcc6a853f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 22:11:31 +0100 Subject: [PATCH 29/32] reduced complexity of _gw_op --- ipfsspec/async_ipfs.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 817ccbe..12e02b9 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -172,11 +172,14 @@ def __init__(self, gws, max_backoff_rounds=50): self.gws = [(GatewayState(), gw) for gw in gws] self.max_backoff_rounds = max_backoff_rounds + @property + def _gws_in_priority_order(self): + now = time.monotonic() + return sorted(self.gws, key=lambda x: max(now, x[0].next_request_time)) + async def _gw_op(self, op): - exception = None for _ in range(self.max_backoff_rounds): - now = time.monotonic() - for state, gw in sorted(self.gws, key=lambda x: max(now, x[0].next_request_time)): + for state, gw in self._gws_in_priority_order: if not state.reachable: state.trying_to_reach() now = time.monotonic() @@ -188,19 +191,9 @@ async def _gw_op(self, op): if state.speedup(time.monotonic() - now): logger.debug("%s speedup", gw) return res - except RequestsTooQuick: - state.backoff() - logger.debug("%s backoff", gw) - break - except asyncio.TimeoutError: - state.backoff() - logger.debug("%s backoff due to timeout", gw) - break - except aiohttp.ClientResponseError as e: - # for now, handle client response errors also as backoff - # maybe someone will come up with a better solution some times + except (RequestsTooQuick, aiohttp.ClientResponseError, asyncio.TimeoutError) as e: state.backoff() - logger.debug("%s backoff due to response %d", gw, e.status) + logger.debug("%s backoff %s", gw, e) break except IOError as e: exception = e From 84739609baead6028e83a71d340e99aaa878e6f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 22:42:09 +0100 Subject: [PATCH 30/32] async: raise RequestsTooQuick within AsyncIPFSGateway --- ipfsspec/async_ipfs.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 12e02b9..38b4f2e 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -83,12 +83,6 @@ def _raise_not_found_for_status(self, response, url): raise FileNotFoundError(url) elif response.status == 400: raise FileNotFoundError(url) - elif response.status == 429: - if "retry-after" in response.headers: - retry_after = int(response.headers["retry-after"]) - else: - retry_after = None - raise RequestsTooQuick(retry_after) response.raise_for_status() @@ -99,10 +93,14 @@ def __init__(self, url): self.url = url async def api_get(self, endpoint, session, **kwargs): - return await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + res = await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + self._raise_requests_too_quick(res) + return res async def api_post(self, endpoint, session, **kwargs): - return await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + res = await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + self._raise_requests_too_quick(res) + return res async def _cid_req(self, method, path, headers=None, **kwargs): headers = headers or {} @@ -113,6 +111,7 @@ async def _cid_req(self, method, path, headers=None, **kwargs): else: raise NotImplementedError(f"'{self.resolution}' resolution is not known") # TODO: maybe handle 301 here + self._raise_requests_too_quick(res) return res async def cid_head(self, path, session, headers=None, **kwargs): @@ -126,6 +125,15 @@ async def version(self, session): res.raise_for_status() return await res.json() + @staticmethod + def _raise_requests_too_quick(response): + if response.status == 429: + if "retry-after" in response.headers: + retry_after = int(response.headers["retry-after"]) + else: + retry_after = None + raise RequestsTooQuick(retry_after) + def __str__(self): return f"GW({self.url})" From 5286e26cc346c9abef57eed7c3f1c447938377eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 22:46:53 +0100 Subject: [PATCH 31/32] async: schedule next request earlier to actually delay async requests --- ipfsspec/async_ipfs.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 38b4f2e..ef1cbe6 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -188,11 +188,14 @@ def _gws_in_priority_order(self): async def _gw_op(self, op): for _ in range(self.max_backoff_rounds): for state, gw in self._gws_in_priority_order: + not_before = state.next_request_time if not state.reachable: state.trying_to_reach() + else: + state.schedule_next() now = time.monotonic() - if state.next_request_time > now: - await asyncio.sleep(state.next_request_time - now) + if not_before > now: + await asyncio.sleep(not_before - now) logger.debug("tring %s", gw) try: res = await op(gw) From 5c9bca41ee33ab69ff15bbd258b8e641e93583cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Fri, 11 Mar 2022 23:10:11 +0100 Subject: [PATCH 32/32] async: enable retry logic also for response bodies --- ipfsspec/async_ipfs.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index ef1cbe6..aad28c7 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -227,6 +227,12 @@ async def cid_head(self, path, session, headers=None, **kwargs): async def cid_get(self, path, session, headers=None, **kwargs): return await self._gw_op(lambda gw: gw.cid_get(path, session, headers=headers, **kwargs)) + async def cat(self, path, session): + return await self._gw_op(lambda gw: gw.cat(path, session)) + + async def ls(self, path, session): + return await self._gw_op(lambda gw: gw.ls(path, session)) + def state_report(self): return "\n".join(f"{s.next_request_time}, {gw}" for s, gw in self.gws)