Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 7 additions & 23 deletions fsspec-proxy/fsspec_proxy/bytes_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from fsspec_proxy import file_manager


URL_SCHEMA = "{prefix}/{key}/{op}/{path}"
# where op = bytes | list

@asynccontextmanager
async def lifespan(app: fastapi.FastAPI):
# start instances in async context
Expand All @@ -24,18 +27,7 @@ async def lifespan(app: fastapi.FastAPI):
)


@app.get("/api/list")
async def list_root():
keys = list(app.manager.filesystems)
return {
"status": "ok",
"contents": [
{"name": k, "size": 0, "type": "directory"} for k in keys
]
}


@app.get("/api/list/{key}/{path:path}")
@app.get("/{key}/list/{path:path}")
async def list_dir(key, path):
fs_info = app.manager.get_filesystem(key)
if fs_info is None:
Expand All @@ -53,7 +45,7 @@ async def list_dir(key, path):
return {"status": "ok", "contents": out}


@app.delete("/api/delete/{key}/{path:path}")
@app.delete("/{key}/delete/{path:path}")
async def delete_file(key, path, response: fastapi.Response):
fs_info = app.manager.get_filesystem(key)
path = f"{fs_info['path'].rstrip('/')}/{path.lstrip('/')}"
Expand All @@ -70,7 +62,7 @@ async def delete_file(key, path, response: fastapi.Response):
response.status_code = 204


@app.get("/api/bytes/{key}/{path:path}")
@app.get("/{key}/bytes/{path:path}")
async def get_bytes(key, path, request: fastapi.Request):
start, end = _process_range(request.headers.get("Range"))
fs_info = app.manager.get_filesystem(key)
Expand All @@ -84,7 +76,7 @@ async def get_bytes(key, path, request: fastapi.Request):
return StreamingResponse(io.BytesIO(out), media_type="application/octet-stream")


@app.post("/api/bytes/{key}/{path:path}")
@app.post("/{key}/bytes/{path:path}")
async def put_bytes(key, path, request: fastapi.Request, response: fastapi.Response):
fs_info = app.manager.get_filesystem(key)
if fs_info is None:
Expand All @@ -101,14 +93,6 @@ async def put_bytes(key, path, request: fastapi.Request, response: fastapi.Respo
return {"contents": []}


@app.post("/api/config")
async def setup(request: fastapi.Request):
if not app.manager.config.get("allow_reload", False):
raise fastapi.HTTPException(status_code=403, detail="Not Allowed")
app.manager.config = await request.json()
app.manager.initialize_filesystems()


def _process_range(range):
if range and range.startswith("bytes=") and range.count("-") == 1:
sstart, sstop = range.split("=")[1].split("-")
Expand Down
20 changes: 6 additions & 14 deletions pyscript-fsspec-client/pyscript_fsspec_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

logger = logging.getLogger("pyscript_fsspec_client")
fsspec.utils.setup_logging(logger=logger)
default_endpoint = os.getenv("FSSPEC_PROXY_URL", "http://127.0.0.1:8000/api")
default_endpoint = os.getenv("FSSPEC_PROXY_URL", "http://127.0.0.1:8000")


class PyscriptFileSystem(AbstractFileSystem):
Expand Down Expand Up @@ -57,9 +57,9 @@ def ls(self, path, detail=True, **kwargs):
key, *path = path.split("/", 1)
if key:
part = path[0] if path else ""
out = self._call(f"list/{key}/{part}")
out = self._call(f"{key}/list/{part}")
else:
out = self._call(f"list")
raise ValueError

if detail:
return out
Expand All @@ -68,7 +68,7 @@ def ls(self, path, detail=True, **kwargs):
def rm_file(self, path):
path = self._strip_protocol(path)
key, path = path.split("/", 1)
self._call(f"delete/{key}/{path}", method="DELETE", binary=True)
self._call(f"{key}/delete/{path}", method="DELETE", binary=True)

def _open(
self,
Expand All @@ -87,19 +87,11 @@ def cat_file(self, path, start=None, end=None, **kw):
range = (start, end + 1)
else:
range = None
return self._call(f"bytes/{key}/{relpath}", binary=True, range=range)
return self._call(f"{key}/bytes/{relpath}", binary=True, range=range)

def pipe_file(self, path, value, mode="overwrite", **kwargs):
key, relpath = self._split_path(path)
self._call(f"bytes/{key}/{relpath}", method="POST", data=value)

def reconfigure(self, config):
# only privileged identities can do this
if not "sources" in config:
raise ValueError("Bad config")
if not ["name" in _ and "path" in _ for _ in config["sources"]]:
raise ValueError("Bad config")
self._call(f"config", method="POST", json=config, binary=True)
self._call(f"{key}/bytes/{relpath}", method="POST", data=value)


class JFile(AbstractBufferedFile):
Expand Down
10 changes: 1 addition & 9 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def server():
raise
count -= 1
time.sleep(0.1)
yield f"{s}/api"
yield f"{s}"
P.terminate()
P.wait()

Expand All @@ -42,11 +42,3 @@ def test_file(fs):
assert f.read() == b"hello"
fs.rm("inmemory/afile")
assert not fs.exists("inmemory/afile")


def test_config(fs):
out = fs.ls("", detail=False)
assert "inmemory" in out and "local" in out # other spaces might fail
fs.reconfigure({"sources": [{"name": "mem", "path": "memory://"}]})
out = fs.ls("", detail=False)
assert out == ["mem"]