Create async iterator version of _lsdir() (#670)
* Extract _do_lsdir() from _lsdir()

* Refactor _do_lsdir() to do most of the work inside the loop

* Replace _do_lsdir() with async generator _iterdir()

* Add docstring for _iterdir()

* Move 'versions' consistency check from _lsdir() to _iterdir()
rlamy committed Dec 12, 2022
1 parent f26f379 commit 5917684
Showing 1 changed file with 69 additions and 51 deletions.
s3fs/core.py
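Before reading the diff below: the commit-message bullets describe extracting an inline pagination loop into an async generator. A minimal standalone sketch of that pattern (toy names, not s3fs code):

```python
# Toy sketch of the refactoring pattern: an inline pagination loop becomes an
# async generator that yields one info dict per entry, and the caller consumes
# it lazily with `async for`. All names here are illustrative only.
import asyncio

async def fake_pages():
    # Stand-in for a paginator: each page holds several raw entries.
    for page in (["a", "b"], ["c"]):
        yield page

async def iterdir():
    # The extracted generator: most of the per-entry work lives in the loop.
    async for page in fake_pages():
        for entry in page:
            yield {"name": entry, "type": "file"}

async def lsdir():
    # The caller only collects and classifies what the generator yields.
    return [info async for info in iterdir()]

print(asyncio.run(lsdir()))
```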
@@ -661,11 +661,6 @@ async def _lsdir(
         prefix="",
         versions=False,
     ):
-        if versions and not self.version_aware:
-            raise ValueError(
-                "versions cannot be specified if the filesystem is not version aware"
-            )
-
         bucket, key, _ = self.split_path(path)
         if not prefix:
             prefix = ""
@@ -674,53 +669,20 @@
         if path not in self.dircache or refresh or not delimiter or versions:
             try:
                 logger.debug("Get directory listing page for %s" % path)
-                await self.set_session()
-                s3 = await self.get_s3(bucket)
-                if self.version_aware:
-                    method = "list_object_versions"
-                    contents_key = "Versions"
-                else:
-                    method = "list_objects_v2"
-                    contents_key = "Contents"
-                pag = s3.get_paginator(method)
-                config = {}
-                if max_items is not None:
-                    config.update(MaxItems=max_items, PageSize=2 * max_items)
-                it = pag.paginate(
-                    Bucket=bucket,
-                    Prefix=prefix,
-                    Delimiter=delimiter,
-                    PaginationConfig=config,
-                    **self.req_kw,
-                )
                 dirs = []
                 files = []
-                dircache = []
-                async for i in it:
-                    dircache.extend(i.get("CommonPrefixes", []))
-                    for c in i.get(contents_key, []):
-                        if not self.version_aware or c.get("IsLatest") or versions:
-                            c["type"] = "file"
-                            c["size"] = c["Size"]
-                            files.append(c)
-                if dircache:
-                    files.extend(
-                        [
-                            {
-                                "Key": l["Prefix"][:-1],
-                                "Size": 0,
-                                "StorageClass": "DIRECTORY",
-                                "type": "directory",
-                                "size": 0,
-                            }
-                            for l in dircache
-                        ]
-                    )
-                for f in files:
-                    f["Key"] = "/".join([bucket, f["Key"]])
-                    f["name"] = f["Key"]
-                    version_id = f.get("VersionId")
-                    if versions and version_id and version_id != "null":
-                        f["name"] += f"?versionId={version_id}"
+                async for c in self._iterdir(
+                    bucket,
+                    max_items=max_items,
+                    delimiter=delimiter,
+                    prefix=prefix,
+                    versions=versions,
+                ):
+                    if c["type"] == "directory":
+                        dirs.append(c)
+                    else:
+                        files.append(c)
+                files += dirs
             except ClientError as e:
                 raise translate_boto_error(e)
+
@@ -729,6 +691,62 @@ async def _lsdir(
             return files
         return self.dircache[path]
 
+    async def _iterdir(
+        self, bucket, max_items=None, delimiter="/", prefix="", versions=False
+    ):
+        """Iterate asynchronously over files and directories under `prefix`.
+
+        The contents are yielded in arbitrary order as info dicts.
+        """
+        if versions and not self.version_aware:
+            raise ValueError(
+                "versions cannot be specified if the filesystem is not version aware"
+            )
+        await self.set_session()
+        s3 = await self.get_s3(bucket)
+        if self.version_aware:
+            method = "list_object_versions"
+            contents_key = "Versions"
+        else:
+            method = "list_objects_v2"
+            contents_key = "Contents"
+        pag = s3.get_paginator(method)
+        config = {}
+        if max_items is not None:
+            config.update(MaxItems=max_items, PageSize=2 * max_items)
+        it = pag.paginate(
+            Bucket=bucket,
+            Prefix=prefix,
+            Delimiter=delimiter,
+            PaginationConfig=config,
+            **self.req_kw,
+        )
+        async for i in it:
+            for l in i.get("CommonPrefixes", []):
+                c = {
+                    "Key": l["Prefix"][:-1],
+                    "Size": 0,
+                    "StorageClass": "DIRECTORY",
+                    "type": "directory",
+                }
+                self._fill_info(c, bucket, versions=False)
+                yield c
+            for c in i.get(contents_key, []):
+                if not self.version_aware or c.get("IsLatest") or versions:
+                    c["type"] = "file"
+                    c["size"] = c["Size"]
+                    self._fill_info(c, bucket, versions=versions)
+                    yield c
+
+    @staticmethod
+    def _fill_info(f, bucket, versions=False):
+        f["size"] = f["Size"]
+        f["Key"] = "/".join([bucket, f["Key"]])
+        f["name"] = f["Key"]
+        version_id = f.get("VersionId")
+        if versions and version_id and version_id != "null":
+            f["name"] += f"?versionId={version_id}"
+
     async def _glob(self, path, **kwargs):
         if path.startswith("*"):
             raise ValueError("Cannot traverse all of S3")
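To make the new `_fill_info()` helper concrete, here is a standalone replica applied to a made-up listing entry; the logic is copied from the diff above, the dict values are hypothetical:

```python
# Standalone replica of the _fill_info() staticmethod added above, applied to
# a made-up S3 listing entry to show how versioned names are composed.
def fill_info(f, bucket, versions=False):
    f["size"] = f["Size"]
    f["Key"] = "/".join([bucket, f["Key"]])
    f["name"] = f["Key"]
    version_id = f.get("VersionId")
    if versions and version_id and version_id != "null":
        f["name"] += f"?versionId={version_id}"

entry = {"Key": "data.csv", "Size": 42, "VersionId": "abc123"}  # hypothetical
fill_info(entry, "mybucket", versions=True)
print(entry["name"])  # -> mybucket/data.csv?versionId=abc123
print(entry["size"])  # -> 42
```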
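The last commit-message bullet moves the `versions` consistency check into `_iterdir()`, so it now fires lazily, when the generator is first iterated, rather than on entry to `_lsdir()`. A hedged usage sketch (`_iterdir()` is internal s3fs API and the bucket name is a placeholder, so this is illustrative rather than supported usage):

```python
# Illustrative only: _iterdir() is an internal method and "example-bucket"
# is a placeholder. Assumes s3fs is installed.
import asyncio
import s3fs

async def main():
    fs = s3fs.S3FileSystem(anon=True, asynchronous=True, version_aware=False)
    await fs.set_session()  # required before real S3 calls in async mode
    try:
        # The filesystem is not version aware, so asking for versions raises.
        # The check fires on the first iteration, when the generator starts.
        async for info in fs._iterdir("example-bucket", versions=True):
            print(info["name"])
    except ValueError as e:
        print(e)  # versions cannot be specified if the filesystem is not version aware

asyncio.run(main())
```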
