Skip to content

Commit

Permalink
find/ls: use list_object_versions instead of list_objects_v2 for version aware filesystems (#654)
Browse files Browse the repository at this point in the history

* add test for version-aware info after ls

* ls: use list_object_versions for version_aware fs

* lint

* update version_aware docstring

Co-authored-by: Martin Durant <martin.durant@alumni.utoronto.ca>
  • Loading branch information
pmrowla and martindurant committed Oct 4, 2022
1 parent 1c1ab4e commit 962c8af
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 10 deletions.
43 changes: 33 additions & 10 deletions s3fs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,12 @@ class S3FileSystem(AsyncFileSystem):
version_aware : bool (False)
Whether to support bucket versioning. If enabled, this will require the
user to have the necessary IAM permissions for dealing with versioned
objects.
objects. Note that in the event that you only need to work with the
latest version of objects in a versioned bucket, and do not need the
VersionId for those objects, you should set ``version_aware`` to False
for performance reasons. When set to True, filesystem instances will
use the S3 ListObjectVersions API call to list directory contents,
which requires listing all historical object versions.
cache_regions : bool (False)
Whether to cache bucket regions or not. Whenever a new bucket is used,
it will first find out which region it belongs and then use the client
Expand Down Expand Up @@ -659,7 +664,13 @@ async def _lsdir(
logger.debug("Get directory listing page for %s" % path)
await self.set_session()
s3 = await self.get_s3(bucket)
pag = s3.get_paginator("list_objects_v2")
if self.version_aware:
method = "list_object_versions"
contents_key = "Versions"
else:
method = "list_objects_v2"
contents_key = "Contents"
pag = s3.get_paginator(method)
config = {}
if max_items is not None:
config.update(MaxItems=max_items, PageSize=2 * max_items)
Expand All @@ -674,10 +685,11 @@ async def _lsdir(
dircache = []
async for i in it:
dircache.extend(i.get("CommonPrefixes", []))
for c in i.get("Contents", []):
c["type"] = "file"
c["size"] = c["Size"]
files.append(c)
for c in i.get(contents_key, []):
if not self.version_aware or c.get("IsLatest"):
c["type"] = "file"
c["size"] = c["Size"]
files.append(c)
if dircache:
files.extend(
[
Expand Down Expand Up @@ -1161,10 +1173,21 @@ async def _info(self, path, bucket=None, key=None, refresh=False, version_id=Non
if not refresh:
out = self._ls_from_cache(path)
if out is not None:
out = [o for o in out if o["name"] == path]
if out:
return out[0]
return {"name": path, "size": 0, "type": "directory"}
if self.version_aware and version_id is not None:
# If cached info does not match requested version_id,
# fallback to calling head_object
out = [
o
for o in out
if o["name"] == path and version_id == o.get("VersionId")
]
if out:
return out[0]
else:
out = [o for o in out if o["name"] == path]
if out:
return out[0]
return {"name": path, "size": 0, "type": "directory"}
if key:
try:
out = await self._call_s3(
Expand Down
27 changes: 27 additions & 0 deletions s3fs/tests/test_s3fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,33 @@ def test_versions_unaware(s3):
fo.read()


def test_versions_dircached(s3):
    """Verify dircache interaction on a version-aware filesystem.

    After ``find()`` populates the directory cache, the cached entry must
    carry the VersionId of the *latest* object version, and ``info()`` must
    return the correct metadata both from the cache (no version requested)
    and for explicitly requested older/newer version ids.
    """
    path = versioned_bucket_name + "/dir/versioned_file"
    fs = S3FileSystem(
        anon=False, version_aware=True, client_kwargs={"endpoint_url": endpoint_uri}
    )

    # Write two successive versions of the same key and remember their ids.
    versions = []
    for payload in (b"1", b"2"):
        with fs.open(path, "wb") as stream:
            stream.write(payload)
        versions.append(stream.version_id)
    first_version, second_version = versions

    # find() walks the bucket and fills the dircache.
    fs.find(versioned_bucket_name)
    entry = fs.dircache[versioned_bucket_name + "/dir"][0]

    # The cached listing must reflect the most recent write.
    assert entry.get("VersionId") == second_version
    assert fs.info(path) == entry

    # Explicit version_id lookups must resolve to the requested version,
    # whether or not it matches what the cache holds.
    for vid in (first_version, second_version):
        assert fs.info(path, version_id=vid).get("VersionId") == vid


def test_text_io__stream_wrapper_works(s3):
"""Ensure using TextIOWrapper works."""
s3.mkdir("bucket")
Expand Down

0 comments on commit 962c8af

Please sign in to comment.