Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ def console_print(x):
print(x)

pw.sync.session = io.request
pw.sync.batch = io.batch
pw.sync.console_print = console_print
20 changes: 14 additions & 6 deletions example/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@
import pandas as pd
import pyscript_fsspec_client.client

fs = fsspec.filesystem("pyscript")
sync.console_print(str(fs.ls("local")))
fs = fsspec.filesystem("pyscript", base_url="http://localhost:8000/local")
sync.console_print(str(fs.ls("")))

out = fs.cat("local/mdurant/code/fsspec-proxy/pyproject.toml")
out = fs.cat("mdurant/code/fsspec-proxy/pyproject.toml")
sync.console_print(str(("binary:", type(out), out)))

out = fs.cat("local/mdurant/code/fsspec-proxy/pyproject.toml", start=0, end=10)
out = fs.cat("mdurant/code/fsspec-proxy/pyproject.toml", start=0, end=10)
sync.console_print(str(("binary:", type(out), out)))

fs.pipe_file("local/mdurant/code/fsspec-proxy/OUTPUT", b"hello world")
out = fs.cat_ranges(
paths=["mdurant/code/fsspec-proxy/pyproject.toml"] * 3,
starts=[0, 0, 20],
ends=[1, 10, 30]
)
sync.console_print(str(("binary:", type(out), out)))

fs.pipe_file("mdurant/code/fsspec-proxy/OUTPUT", b"hello world")


def make_output(table):
Expand All @@ -24,5 +31,6 @@ def make_output(table):
new_div.innerHTML = table
page.append(new_div)

my_data = pd.read_parquet("pyscript://Conda Stats/2017/01/2017-01-07.parquet")
my_data = pd.read_parquet("pyscript://2017/01/2017-01-07.parquet",
storage_options={"base_url": "http://localhost:8000/Conda Stats"})
make_output(my_data[:100].to_html())
2 changes: 1 addition & 1 deletion fsspec-proxy/fsspec_proxy/bytes_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def list_dir(key, path):
except FileNotFoundError:
raise fastapi.HTTPException(status_code=404, detail="Item not found")
out = [
{"name": f"{key}/{o['name'].replace(fs_info['path'], '', 1).lstrip('/')}",
{"name": f"{o['name'].replace(fs_info['path'], '', 1).lstrip('/')}",
"size": o["size"], "type": o["type"]}
for o in out
]
Expand Down
19 changes: 17 additions & 2 deletions pyscript-fsspec-client/pyscript_fsspec_client/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""An fsspec filesystem that proxies via pyscriptapps.com."""

from json import dumps, loads
from json import loads
import logging

from pyscript import sync, ffi
Expand All @@ -16,7 +16,7 @@ class PyscriptFileSystem(AbstractFileSystem):

protocol = "pyscript"

def __init__(self, base_url):
def __init__(self, base_url="http://0.0.0.0:8000/local"):
super().__init__()
self.base_url = base_url

Expand Down Expand Up @@ -44,6 +44,21 @@ def rm_file(self, path):
path = self._strip_protocol(path)
self._call(f"delete/{path}", method="DELETE", binary=True)

def cat_ranges(
self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
):
logger.debug("cat_ranges: %s paths", len(paths))
out = sync.batch(
[{
"args": ("GET", f"{self.base_url}/bytes/{path}"),
"kwargs": {"headers": ffi.to_js({"Range": f"bytes={s}-{e + 1}"}), "outmode": "bytes"}
}
for path, s, e in zip(paths, starts, ends)],
)
return [(OSError(0, o) if isinstance(o, str) and o == "ISawAnError"
else bytes(o.to_py()))
for o in out]

def _open(
self,
path,
Expand Down
57 changes: 39 additions & 18 deletions pyscript-fsspec-client/pyscript_fsspec_client/io.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,46 @@
import json
import pyscript
import asyncio
import js
from pyodide import ffi, console
from pyscript import window

LOGGING = False


async def request(method, path, data=None, headers=None,
outmode="text", **kwargs):
if data:
resp = await js.fetch(path, method=method, body=data.buffer, headers=headers or {},
**kwargs)
else:
resp = await js.fetch(path, method=method, headers=headers or {},
**kwargs)
if not resp.ok:
return "ISawAnError"
if resp.status >= 400:
if LOGGING:
print(method, path, outmode, kwargs, headers)
try:
if data:
resp = await js.fetch(path, method=method, body=data.buffer, headers=headers or {},
**kwargs)
else:
resp = await js.fetch(path, method=method, headers=ffi.to_js(headers) or {},
**kwargs)
except Exception as e:
window.console.log(str(e))
return "ISawAnError"
if outmode == "text":
return await resp.text()
if outmode == "bytes":
return await resp.arrayBuffer()
if outmode is None:
return
return "ISawAnError"
if not resp.ok:
out = "ISawAnError"
elif resp.status >= 400:
out = "ISawAnError"
elif outmode == "text":
out = await resp.text()
elif outmode == "bytes":
out = await resp.arrayBuffer()
elif outmode is None:
out = None
else:
out = "ISawAnError"
if LOGGING:
print(out)
return out


async def batch(requests):
requests = [r.to_py() for r in requests]
out = asyncio.gather(
*[request(*r["args"], **r["kwargs"]) for r in requests],
return_exceptions=True
)
return out