diff --git a/README.md b/README.md index 3e302c3..048eafa 100644 --- a/README.md +++ b/README.md @@ -14,13 +14,13 @@ Install the two sub-packages: - fsspec-proxy, a fastAPI-based server which reads/writes to configured storage locations - pyscript-fsspec-client, a filesystem implementation that connects to the proxy, - allowing even pyscript to access bytes in remote stores + allowing even pyscript to access bytes in remote stores. Now run: ```bash $ fsspec-proxy dev ``` -to start the (unsecured) proxy server, with port 8000. Further arguments +This starts the (unsecured) proxy server, on port 8000. Further arguments will be passed to fastAPI to configure, for example, the port and address to listen on. @@ -33,6 +33,15 @@ server can be reconfigured via an API call. or auth. It can be regarded as a prototype to base production-level implementations on. +Options +------- + +- `run` (default) runs the server in production mode +- `dev` runs the server in development mode +- `private` adds the `Access-Control-Allow-Private-Network` header to allow some + requests in some CORS situations. If you are seeing CORS issues, adding + this might help. + Demo ---- @@ -43,27 +52,21 @@ The server will show incoming byte range requests, and you can also track them in the browser's debug console. The end result should be a table view of the contents of the target parquet file. -Installation with Optional Dependencies (fsspec-proxy) ------------------------------------------------------ - -The following steps apply only to the `fsspec-proxy` package. The package has -several optional dependency groups: - -- `s3`: Required for S3 access (needed for the "Conda Stats" example) -- `anaconda`: Required for Anaconda Cloud access -- `all`: All optional dependencies +By default, the server attempts to instantiate S3 and anaconda filesystems, +but will skip these with a message if the dependencies are not available. The +demo uses the S3 backend, so you will need S3 support (below). 
S3 Support -~~~~~~~~~~ +---------- To use S3 functionality (including the "Conda Stats" example): ```bash -pip install .[s3] +pip install "./fsspec-proxy[s3]" ``` Anaconda Cloud Support -~~~~~~~~~~~~~~~~~~~~~~ +---------------------- To use Anaconda Cloud functionality, you'll need to install dependencies from the Anaconda Cloud index. You can do this in two ways: @@ -86,20 +89,3 @@ the Anaconda Cloud index. You can do this in two ways: ```bash pip install .[anaconda] --extra-index-url https://pypi.anaconda.org/anaconda-cloud/simple ``` - -All Optional Dependencies -~~~~~~~~~~~~~~~~~~~~~~~~ - -To install all optional dependencies: - -```bash -# With pip config -pip install .[all] - -# Or directly with extra index -pip install .[all] --extra-index-url https://pypi.anaconda.org/anaconda-cloud/simple -``` - -This will ensure that all required packages for `fsspec-proxy`, including those -only available on Anaconda Cloud, are installed. ->>>>>>> 4408c85bdd76295a43f0d7a20041a646e46b3f25 diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..267fdfb --- /dev/null +++ b/config.yaml @@ -0,0 +1,14 @@ +sources: + - name: inmemory + path: memory://mytests + - name: local + path: file:///Users + readonly: true + - name: "Conda Stats" + path: "s3://anaconda-package-data/conda/hourly/" + kwargs: + anon: True + - name: "MyAnaconda" + path: "anaconda://my/" +allow_reload: true + diff --git a/fsspec-proxy/fsspec_proxy/__main__.py b/fsspec-proxy/fsspec_proxy/__main__.py index 11e34c6..7b79f92 100644 --- a/fsspec-proxy/fsspec_proxy/__main__.py +++ b/fsspec-proxy/fsspec_proxy/__main__.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +import os import re import sys @@ -11,7 +12,9 @@ def run_main(): mode = "dev" if "dev" in sys.argv else "run" - argv = [_ for _ in sys.argv if _ not in {"dev", "run"}] + # TODO: this should be unified with the config + os.environ["FS_PROXY_PRIVATE"] = str("private" in sys.argv) + argv = [_ for _ in sys.argv if _ not in {"dev", "run", 
"private"}] sys.argv = [ re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0]), mode, diff --git a/fsspec-proxy/fsspec_proxy/bytes_server.py b/fsspec-proxy/fsspec_proxy/bytes_server.py index d948c3c..78b47e4 100644 --- a/fsspec-proxy/fsspec_proxy/bytes_server.py +++ b/fsspec-proxy/fsspec_proxy/bytes_server.py @@ -1,7 +1,7 @@ from contextlib import asynccontextmanager import io import fastapi -from fastapi.middleware.cors import CORSMiddleware +from fsspec_proxy.cors import CORSMiddleware from starlette.responses import StreamingResponse from fsspec_proxy import file_manager @@ -62,7 +62,7 @@ async def delete_file(key, path, response: fastapi.Response): if fs_info.get("readonly"): raise fastapi.HTTPException(status_code=403, detail="Not Allowed") try: - out = await fs_info["instance"]._rm_file(path) + await fs_info["instance"]._rm_file(path) except FileNotFoundError: raise fastapi.HTTPException(status_code=404, detail="Item not found") except PermissionError: @@ -93,7 +93,6 @@ async def put_bytes(key, path, request: fastapi.Request, response: fastapi.Respo raise fastapi.HTTPException(status_code=403, detail="Not Allowed") path = f"{fs_info['path'].rstrip('/')}/{path.lstrip('/')}" data = await request.body() - print("####", data) try: await fs_info["instance"]._pipe_file(path, data) except FileNotFoundError: diff --git a/fsspec-proxy/fsspec_proxy/cors.py b/fsspec-proxy/fsspec_proxy/cors.py new file mode 100644 index 0000000..c9e355d --- /dev/null +++ b/fsspec-proxy/fsspec_proxy/cors.py @@ -0,0 +1,178 @@ +"""Copy of fastapi.middleware.cors, with an optional Access-Control-Allow-Private-Network header on preflight responses (enabled when the FS_PROXY_PRIVATE environment variable is "True").""" + +from __future__ import annotations + +import functools +import os +import re +import typing + +from starlette.datastructures import Headers, MutableHeaders +from starlette.responses import PlainTextResponse, Response +from starlette.types import ASGIApp, Message, Receive, Scope, Send + +PRIVATE = os.getenv("FS_PROXY_PRIVATE", False) == "True" +ALL_METHODS = ("DELETE", "GET", "HEAD", "OPTIONS", "PATCH", "POST", "PUT") 
+SAFELISTED_HEADERS = {"Accept", "Accept-Language", "Content-Language", "Content-Type"} + + +class CORSMiddleware: + def __init__( + self, + app: ASGIApp, + allow_origins: typing.Sequence[str] = (), + allow_methods: typing.Sequence[str] = ("GET",), + allow_headers: typing.Sequence[str] = (), + allow_credentials: bool = False, + allow_origin_regex: str | None = None, + expose_headers: typing.Sequence[str] = (), + max_age: int = 600, + ) -> None: + if "*" in allow_methods: + allow_methods = ALL_METHODS + + compiled_allow_origin_regex = None + if allow_origin_regex is not None: + compiled_allow_origin_regex = re.compile(allow_origin_regex) + + allow_all_origins = "*" in allow_origins + allow_all_headers = "*" in allow_headers + preflight_explicit_allow_origin = not allow_all_origins or allow_credentials + + simple_headers = {} + if allow_all_origins: + simple_headers["Access-Control-Allow-Origin"] = "*" + if allow_credentials: + simple_headers["Access-Control-Allow-Credentials"] = "true" + if expose_headers: + simple_headers["Access-Control-Expose-Headers"] = ", ".join(expose_headers) + + preflight_headers = {} + if preflight_explicit_allow_origin: + # The origin value will be set in preflight_response() if it is allowed. 
+ preflight_headers["Vary"] = "Origin" + else: + preflight_headers["Access-Control-Allow-Origin"] = "*" + preflight_headers.update( + { + "Access-Control-Allow-Methods": ", ".join(allow_methods), + "Access-Control-Max-Age": str(max_age), + } + ) + allow_headers = sorted(SAFELISTED_HEADERS | set(allow_headers)) + if allow_headers and not allow_all_headers: + preflight_headers["Access-Control-Allow-Headers"] = ", ".join(allow_headers) + if allow_credentials: + preflight_headers["Access-Control-Allow-Credentials"] = "true" + + self.app = app + self.allow_origins = allow_origins + self.allow_methods = allow_methods + self.allow_headers = [h.lower() for h in allow_headers] + self.allow_all_origins = allow_all_origins + self.allow_all_headers = allow_all_headers + self.preflight_explicit_allow_origin = preflight_explicit_allow_origin + self.allow_origin_regex = compiled_allow_origin_regex + self.simple_headers = simple_headers + self.preflight_headers = preflight_headers + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope["type"] != "http": # pragma: no cover + await self.app(scope, receive, send) + return + + method = scope["method"] + headers = Headers(scope=scope) + origin = headers.get("origin") + + if origin is None: + await self.app(scope, receive, send) + return + + if method == "OPTIONS" and "access-control-request-method" in headers: + response = self.preflight_response(request_headers=headers) + await response(scope, receive, send) + return + + await self.simple_response(scope, receive, send, request_headers=headers) + + def is_allowed_origin(self, origin: str) -> bool: + if self.allow_all_origins: + return True + + if self.allow_origin_regex is not None and self.allow_origin_regex.fullmatch(origin): + return True + + return origin in self.allow_origins + + def preflight_response(self, request_headers: Headers) -> Response: + requested_origin = request_headers["origin"] + requested_method = 
request_headers["access-control-request-method"] + requested_headers = request_headers.get("access-control-request-headers") + + headers = dict(self.preflight_headers) + failures = [] + + if self.is_allowed_origin(origin=requested_origin): + if self.preflight_explicit_allow_origin: + # The "else" case is already accounted for in self.preflight_headers + # and the value would be "*". + headers["Access-Control-Allow-Origin"] = requested_origin + else: + failures.append("origin") + + if requested_method not in self.allow_methods: + failures.append("method") + + # If we allow all headers, then we have to mirror back any requested + # headers in the response. + if self.allow_all_headers and requested_headers is not None: + headers["Access-Control-Allow-Headers"] = requested_headers + elif requested_headers is not None: + for header in [h.lower() for h in requested_headers.split(",")]: + if header.strip() not in self.allow_headers: + failures.append("headers") + break + + # We don't strictly need to use 400 responses here, since its up to + # the browser to enforce the CORS policy, but its more informative + # if we do. 
+ if failures: + failure_text = "Disallowed CORS " + ", ".join(failures) + return PlainTextResponse(failure_text, status_code=400, headers=headers) + + if PRIVATE: + headers["Access-Control-Allow-Private-Network"] = "true" + return PlainTextResponse("OK", status_code=200, headers=headers) + + async def simple_response(self, scope: Scope, receive: Receive, send: Send, request_headers: Headers) -> None: + send = functools.partial(self.send, send=send, request_headers=request_headers) + await self.app(scope, receive, send) + + async def send(self, message: Message, send: Send, request_headers: Headers) -> None: + if message["type"] != "http.response.start": + await send(message) + return + + message.setdefault("headers", []) + headers = MutableHeaders(scope=message) + headers.update(self.simple_headers) + origin = request_headers["Origin"] + has_cookie = "cookie" in request_headers + + # If request includes any cookie headers, then we must respond + # with the specific origin instead of '*'. + if self.allow_all_origins and has_cookie: + self.allow_explicit_origin(headers, origin) + + # If we only allow specific origins, then we have to mirror back + # the Origin header in the response. 
+ elif not self.allow_all_origins and self.is_allowed_origin(origin=origin): + self.allow_explicit_origin(headers, origin) + + await send(message) + + @staticmethod + def allow_explicit_origin(headers: MutableHeaders, origin: str) -> None: + headers["Access-Control-Allow-Origin"] = origin + headers.add_vary_header("Origin") diff --git a/fsspec-proxy/fsspec_proxy/file_manager.py b/fsspec-proxy/fsspec_proxy/file_manager.py index b1971f9..2348429 100644 --- a/fsspec-proxy/fsspec_proxy/file_manager.py +++ b/fsspec-proxy/fsspec_proxy/file_manager.py @@ -1,11 +1,14 @@ -from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper -import fsspec.utils import io +import logging import os import yaml -import logging + +from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper +import fsspec.utils logger = logging.getLogger("fsspec_proxy") +# TODO: this config is copied as config.yaml; de-dup and move other options +# into the config default_config = b"""sources: - name: inmemory path: memory://mytests @@ -62,7 +65,7 @@ def initialize_filesystems(self): fs, url2 = fsspec.url_to_fs(fs_path, **kwargs) except Exception: # or we could still list show their names but not the contents - logger.error("Instantiating filesystem failed") + logger.exception("Instantiating filesystem %s failed, skipping", key) continue if not fs.async_impl: fs = AsyncFileSystemWrapper(fs) diff --git a/fsspec-proxy/pyproject.toml b/fsspec-proxy/pyproject.toml index 23dae2d..d852a58 100644 --- a/fsspec-proxy/pyproject.toml +++ b/fsspec-proxy/pyproject.toml @@ -29,7 +29,7 @@ dependencies = [ ] dynamic = ["version", "urls", "keywords"] -[dependency-groups] +[project.optional-dependencies] s3 = [ "s3fs" ] @@ -37,8 +37,8 @@ anaconda = [ "anaconda-cloud-storage" ] all = [ - {include-group = "s3"}, - {include-group = "anaconda"} + "s3fs", + "anaconda-cloud-storage" ] [tool.hatch.build.hooks.version]