Skip to content
4 changes: 3 additions & 1 deletion awswrangler/s3/_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ def merge_datasets(
target_path = target_path[:-1] if target_path[-1] == "/" else target_path
session: boto3.Session = _utils.ensure_session(session=boto3_session)

paths: List[str] = list_objects(path=f"{source_path}/", ignore_empty=ignore_empty, boto3_session=session)
paths: List[str] = list_objects( # type: ignore
path=f"{source_path}/", ignore_empty=ignore_empty, boto3_session=session
)
_logger.debug("len(paths): %s", len(paths))
if len(paths) < 1:
return []
Expand Down
74 changes: 52 additions & 22 deletions awswrangler/s3/_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import datetime
import fnmatch
import logging
from typing import Any, Dict, List, Optional, Sequence, Union
from typing import Any, Dict, Iterator, List, Optional, Sequence, Union

import boto3
import botocore.exceptions
Expand All @@ -28,7 +28,7 @@ def _path2list(
_suffix: Optional[List[str]] = [suffix] if isinstance(suffix, str) else suffix
_ignore_suffix: Optional[List[str]] = [ignore_suffix] if isinstance(ignore_suffix, str) else ignore_suffix
if isinstance(path, str): # prefix
paths: List[str] = list_objects(
paths: List[str] = list_objects( # type: ignore
path=path,
suffix=_suffix,
ignore_suffix=_ignore_suffix,
Expand Down Expand Up @@ -79,22 +79,27 @@ def _list_objects( # pylint: disable=too-many-branches
last_modified_end: Optional[datetime.datetime] = None,
boto3_session: Optional[boto3.Session] = None,
ignore_empty: bool = False,
) -> List[str]:
) -> Iterator[List[str]]:
bucket: str
prefix_original: str
bucket, prefix_original = _utils.parse_path(path=path)
prefix: str = _prefix_cleanup(prefix=prefix_original)
_suffix: Union[List[str], None] = [suffix] if isinstance(suffix, str) else suffix
_ignore_suffix: Union[List[str], None] = [ignore_suffix] if isinstance(ignore_suffix, str) else ignore_suffix
client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session)
default_pagination: Dict[str, int] = {"PageSize": 1000}
extra_kwargs: Dict[str, Any] = {"PaginationConfig": default_pagination}
if s3_additional_kwargs:
extra_kwargs: Dict[str, Any] = _fs.get_botocore_valid_kwargs(
extra_kwargs = _fs.get_botocore_valid_kwargs(
function_name="list_objects_v2", s3_additional_kwargs=s3_additional_kwargs
)
else:
extra_kwargs = {}
extra_kwargs["PaginationConfig"] = (
s3_additional_kwargs["PaginationConfig"]
if "PaginationConfig" in s3_additional_kwargs
else default_pagination
)
paginator = client_s3.get_paginator("list_objects_v2")
args: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix, "PaginationConfig": {"PageSize": 1000}, **extra_kwargs}
args: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix, **extra_kwargs}
if delimiter is not None:
args["Delimiter"] = delimiter
_logger.debug("args: %s", args)
Expand Down Expand Up @@ -127,13 +132,15 @@ def _list_objects( # pylint: disable=too-many-branches
key = pfx["Prefix"]
paths.append(f"s3://{bucket}/{key}")

if prefix != prefix_original:
paths = fnmatch.filter(paths, path)
if prefix != prefix_original:
paths = fnmatch.filter(paths, path)

if _ignore_suffix is not None:
paths = [p for p in paths if p.endswith(tuple(_ignore_suffix)) is False]
if _ignore_suffix is not None:
paths = [p for p in paths if p.endswith(tuple(_ignore_suffix)) is False]

return paths
if paths:
yield paths
paths = []


def does_object_exist(
Expand Down Expand Up @@ -203,8 +210,11 @@ def does_object_exist(


def list_directories(
path: str, s3_additional_kwargs: Optional[Dict[str, Any]] = None, boto3_session: Optional[boto3.Session] = None
) -> List[str]:
path: str,
chunked: bool = False,
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
boto3_session: Optional[boto3.Session] = None,
) -> Union[List[str], Iterator[List[str]]]:
"""List Amazon S3 objects from a prefix.

This function accepts Unix shell-style wildcards in the path argument.
Expand All @@ -217,6 +227,8 @@ def list_directories(
----------
path : str
S3 path (e.g. s3://bucket/prefix).
chunked: bool
If True, returns an iterator that yields lists of paths; otherwise returns a single list. False by default.
s3_additional_kwargs : Optional[Dict[str, Any]]
Forwarded to botocore requests.
e.g. s3_additional_kwargs={'RequestPayer': 'requester'}
Expand All @@ -225,7 +237,7 @@ def list_directories(

Returns
-------
List[str]
Union[List[str], Iterator[List[str]]]
List of object paths, or an iterator of such lists if ``chunked=True``.

Examples
Expand All @@ -244,9 +256,15 @@ def list_directories(
['s3://bucket/prefix/dir0/', 's3://bucket/prefix/dir1/', 's3://bucket/prefix/dir2/']

"""
return _list_objects(
path=path, delimiter="/", boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
result_iterator = _list_objects(
path=path,
delimiter="/",
boto3_session=boto3_session,
s3_additional_kwargs=s3_additional_kwargs,
)
if chunked:
return result_iterator
return [path for paths in result_iterator for path in paths]


def list_objects(
Expand All @@ -256,9 +274,10 @@ def list_objects(
last_modified_begin: Optional[datetime.datetime] = None,
last_modified_end: Optional[datetime.datetime] = None,
ignore_empty: bool = False,
chunked: bool = False,
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
boto3_session: Optional[boto3.Session] = None,
) -> List[str]:
) -> Union[List[str], Iterator[List[str]]]:
"""List Amazon S3 objects from a prefix.

This function accepts Unix shell-style wildcards in the path argument.
Expand Down Expand Up @@ -287,6 +306,8 @@ def list_objects(
The filter is applied only after listing all S3 files.
ignore_empty: bool
Ignore files with 0 bytes.
chunked: bool
If True, returns an iterator that yields lists of paths; otherwise returns a single list. False by default.
s3_additional_kwargs : Optional[Dict[str, Any]]
Forwarded to botocore requests.
e.g. s3_additional_kwargs={'RequestPayer': 'requester'}
Expand All @@ -295,7 +316,7 @@ def list_objects(

Returns
-------
List[str]
Union[List[str], Iterator[List[str]]]
List of object paths, or an iterator of such lists if ``chunked=True``.

Examples
Expand All @@ -314,15 +335,24 @@ def list_objects(
['s3://bucket/prefix0', 's3://bucket/prefix1', 's3://bucket/prefix2']

"""
paths: List[str] = _list_objects(
# On top of user provided ignore_suffix input, add "/"
ignore_suffix_acc = set("/")
if isinstance(ignore_suffix, str):
ignore_suffix_acc.add(ignore_suffix)
elif isinstance(ignore_suffix, list):
ignore_suffix_acc.update(ignore_suffix)

result_iterator = _list_objects(
path=path,
delimiter=None,
suffix=suffix,
ignore_suffix=ignore_suffix,
ignore_suffix=list(ignore_suffix_acc),
boto3_session=boto3_session,
last_modified_begin=last_modified_begin,
last_modified_end=last_modified_end,
ignore_empty=ignore_empty,
s3_additional_kwargs=s3_additional_kwargs,
)
return [p for p in paths if not p.endswith("/")]
if chunked:
return result_iterator
return [path for paths in result_iterator for path in paths]
2 changes: 1 addition & 1 deletion awswrangler/timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def query(

Returns
-------
pd.DataFrame
Union[pd.DataFrame, Iterator[pd.DataFrame]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks :)

Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html

Examples
Expand Down
18 changes: 11 additions & 7 deletions tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,15 +323,19 @@ def test_prefix_cleanup():
assert wr.s3._list._prefix_cleanup(glob.escape("foo[]boo")) == glob.escape("foo")


def test_prefix_list(path):
@pytest.mark.parametrize(
"s3_additional_kwargs",
[None, {"FetchOwner": True}, {"PaginationConfig": {"PageSize": 100}}],
)
def test_prefix_list(path, s3_additional_kwargs):
df = pd.DataFrame({"c0": [0]})
prefixes = ["foo1boo", "foo2boo", "foo3boo", "foo10boo", "foo*boo", "abc1boo", "foo1abc"]
paths = [path + p for p in prefixes]
for p in paths:
wr.s3.to_parquet(df=df, path=p)
assert len(wr.s3.list_objects(path + "*")) == 7
assert len(wr.s3.list_objects(path + "foo*")) == 6
assert len(wr.s3.list_objects(path + "*boo")) == 6
assert len(wr.s3.list_objects(path + "foo?boo")) == 4
assert len(wr.s3.list_objects(path + "foo*boo")) == 5
assert len(wr.s3.list_objects(path + "foo[12]boo")) == 2
assert len(wr.s3.list_objects(path + "*", s3_additional_kwargs=s3_additional_kwargs)) == 7
assert len(wr.s3.list_objects(path + "foo*", s3_additional_kwargs=s3_additional_kwargs)) == 6
assert len(wr.s3.list_objects(path + "*boo", s3_additional_kwargs=s3_additional_kwargs)) == 6
assert len(wr.s3.list_objects(path + "foo?boo", s3_additional_kwargs=s3_additional_kwargs)) == 4
assert len(wr.s3.list_objects(path + "foo*boo", s3_additional_kwargs=s3_additional_kwargs)) == 5
assert len(wr.s3.list_objects(path + "foo[12]boo", s3_additional_kwargs=s3_additional_kwargs)) == 2