Skip to content

Commit

Permalink
add UPath tests (failing)
Browse files (browse the repository at this point in the history)
  • Loading branch information
d-v-b committed Jun 12, 2024
1 parent 68f9d2c commit 8fa06e1
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 21 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ extra-dependencies = [
"flask-cors",
"flask",
"requests",
"mypy"
"mypy",
"universal_pathlib"
]
features = ["extra"]

Expand Down
6 changes: 4 additions & 2 deletions src/zarr/store/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def __init__(
"""

super().__init__(mode=mode)
self._url = url
if isinstance(url, str):
self._url = url
self._fs, self.path = fsspec.url_to_fs(url, **storage_options)
elif hasattr(url, "protocol") and hasattr(url, "fs"):
# is UPath-like - but without importing
Expand All @@ -62,8 +62,10 @@ def __init__(
"If constructed with a UPath object, no additional "
"storage_options are allowed"
)

self._url = str(url)
self.path = url.path
self._fs = url._fs
self._fs = url.fs
else:
raise ValueError("URL not understood, %s", url)
self.allowed_exceptions = allowed_exceptions
Expand Down
69 changes: 51 additions & 18 deletions tests/v3/test_store/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import fsspec
import pytest
from upath import UPath

from zarr.buffer import Buffer, default_buffer_prototype
from zarr.store import RemoteStore
Expand All @@ -16,7 +17,7 @@
test_bucket_name = "test"
secure_bucket_name = "test-secure"
port = 5555
endpoint_uri = f"http://127.0.0.1:{port}/"
endpoint_url = f"http://127.0.0.1:{port}/"


@pytest.fixture(scope="module")
Expand All @@ -40,18 +41,31 @@ def get_boto3_client():

# NB: we use the sync botocore client for setup
session = Session()
return session.create_client("s3", endpoint_url=endpoint_uri)
return session.create_client("s3", endpoint_url=endpoint_url)


@pytest.fixture(autouse=True, scope="function")
def s3(s3_base):
"""
Provide a fresh S3FileSystem and a newly created test bucket to every test.

Setup: creates the bucket named by ``test_bucket_name`` with a public-read ACL,
clears the ``s3fs.S3FileSystem`` instance cache, builds a new filesystem
instance and invalidates its cache before yielding it.
Teardown: POSTs to the ``moto-api/reset`` endpoint and closes the boto client.

Quoting Martin Durant:
pytest-asyncio creates a new event loop for each async test.
When an async-mode s3fs instance is made from async, it will be assigned to the loop from
which it is made. That means that if you use s3fs again from a subsequent test,
you will have the same identical instance, but be running on a different loop - which fails.
For the rest: it's very convenient to clean up the state of the store between tests,
make sure we start off blank each time.
https://github.com/zarr-developers/zarr-python/pull/1785#discussion_r1634856207
"""
client = get_boto3_client()
client.create_bucket(Bucket=test_bucket_name, ACL="public-read")
# Drop cached filesystem instances so the one built below is new for this test.
s3fs.S3FileSystem.clear_instance_cache()
# NOTE(review): the endpoint_uri / endpoint_url line pairs below look like the
# removed and added sides of a diff hunk shown together - confirm which is live.
s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri})
s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_url})
s3.invalidate_cache()
yield s3
# Teardown runs after the test body returns.
requests.post(f"{endpoint_uri}/moto-api/reset")
requests.post(f"{endpoint_url}/moto-api/reset")
client.close()


# ### end from s3fs ### #
Expand All @@ -65,7 +79,7 @@ async def alist(it):


async def test_basic():
store = RemoteStore(f"s3://{test_bucket_name}", mode="w", endpoint_url=endpoint_uri, anon=False)
store = RemoteStore(f"s3://{test_bucket_name}", mode="w", endpoint_url=endpoint_url, anon=False)
assert not await alist(store.list())
assert not await store.exists("foo")
data = b"hello"
Expand All @@ -81,29 +95,48 @@ async def test_basic():
class TestRemoteStoreS3(StoreTests[RemoteStore]):
store_cls = RemoteStore

@pytest.fixture(scope="function")
def store_kwargs(self) -> dict[str, str | bool]:
return {
"mode": "w",
"endpoint_url": endpoint_uri,
"anon": False,
"url": f"s3://{test_bucket_name}",
}
@pytest.fixture(scope="function", params=(False, True))
def store_kwargs(self, request) -> dict[str, str | bool]:
url = f"s3://{test_bucket_name}"
anon = False
mode = "w"
if request.param:
return {"mode": mode, "url": UPath(url, endpoint_url=endpoint_url, anon=anon)}
return {"url": url, "mode": mode, "anon": anon, "endpoint_url": endpoint_url}

@pytest.fixture(scope="function")
def store(self, store_kwargs: dict[str, str | bool]) -> RemoteStore:
self._fs, _ = fsspec.url_to_fs(asynchronous=False, **store_kwargs)
out = self.store_cls(asynchronous=True, **store_kwargs)
url = store_kwargs["url"]
mode = store_kwargs["mode"]
if isinstance(url, UPath):
out = self.store_cls(url=url, mode=mode)
else:
endpoint_url = store_kwargs["endpoint_url"]
out = self.store_cls(url=url, asynchronous=True, mode=mode, endpoint_url=endpoint_url)
return out

def get(self, store: RemoteStore, key: str) -> Buffer:
return Buffer.from_bytes(self._fs.cat(f"{store.path}/{key}"))
# make a new, synchronous instance of the filesystem because this test is run in sync code
fs, _ = fsspec.url_to_fs(
url=store._url,
asynchronous=False,
anon=store._fs.anon,
endpoint_url=store._fs.endpoint_url,
)
return Buffer.from_bytes(fs.cat(f"{store.path}/{key}"))

def set(self, store: RemoteStore, key: str, value: Buffer) -> None:
self._fs.write_bytes(f"{store.path}/{key}", value.to_bytes())
# make a new, synchronous instance of the filesystem because this test is run in sync code
fs, _ = fsspec.url_to_fs(
url=store._url,
asynchronous=False,
anon=store._fs.anon,
endpoint_url=store._fs.endpoint_url,
)
fs.write_bytes(f"{store.path}/{key}", value.to_bytes())

def test_store_repr(self, store: RemoteStore) -> None:
assert str(store) == f"{store._url}"
assert str(store) == f"s3://{test_bucket_name}"

def test_store_supports_writes(self, store: RemoteStore) -> None:
assert True
Expand Down

0 comments on commit 8fa06e1

Please sign in to comment.