diff --git a/awswrangler/s3/_copy.py b/awswrangler/s3/_copy.py index fca198865..6c9b5249d 100644 --- a/awswrangler/s3/_copy.py +++ b/awswrangler/s3/_copy.py @@ -125,7 +125,9 @@ def merge_datasets( target_path = target_path[:-1] if target_path[-1] == "/" else target_path session: boto3.Session = _utils.ensure_session(session=boto3_session) - paths: List[str] = list_objects(path=f"{source_path}/", ignore_empty=ignore_empty, boto3_session=session) + paths: List[str] = list_objects( # type: ignore + path=f"{source_path}/", ignore_empty=ignore_empty, boto3_session=session + ) _logger.debug("len(paths): %s", len(paths)) if len(paths) < 1: return [] diff --git a/awswrangler/s3/_list.py b/awswrangler/s3/_list.py index 1232c67ef..8efa8c44e 100644 --- a/awswrangler/s3/_list.py +++ b/awswrangler/s3/_list.py @@ -3,7 +3,7 @@ import datetime import fnmatch import logging -from typing import Any, Dict, List, Optional, Sequence, Union +from typing import Any, Dict, Iterator, List, Optional, Sequence, Union import boto3 import botocore.exceptions @@ -28,7 +28,7 @@ def _path2list( _suffix: Optional[List[str]] = [suffix] if isinstance(suffix, str) else suffix _ignore_suffix: Optional[List[str]] = [ignore_suffix] if isinstance(ignore_suffix, str) else ignore_suffix if isinstance(path, str): # prefix - paths: List[str] = list_objects( + paths: List[str] = list_objects( # type: ignore path=path, suffix=_suffix, ignore_suffix=_ignore_suffix, @@ -79,7 +79,7 @@ def _list_objects( # pylint: disable=too-many-branches last_modified_end: Optional[datetime.datetime] = None, boto3_session: Optional[boto3.Session] = None, ignore_empty: bool = False, -) -> List[str]: +) -> Iterator[List[str]]: bucket: str prefix_original: str bucket, prefix_original = _utils.parse_path(path=path) @@ -87,14 +87,19 @@ def _list_objects( # pylint: disable=too-many-branches _suffix: Union[List[str], None] = [suffix] if isinstance(suffix, str) else suffix _ignore_suffix: Union[List[str], None] = [ignore_suffix] if isinstance(ignore_suffix, str) else ignore_suffix client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session) + default_pagination: Dict[str, int] = {"PageSize": 1000} + extra_kwargs: Dict[str, Any] = {"PaginationConfig": default_pagination} if s3_additional_kwargs: - extra_kwargs: Dict[str, Any] = _fs.get_botocore_valid_kwargs( + extra_kwargs = _fs.get_botocore_valid_kwargs( function_name="list_objects_v2", s3_additional_kwargs=s3_additional_kwargs ) - else: - extra_kwargs = {} + extra_kwargs["PaginationConfig"] = ( + s3_additional_kwargs["PaginationConfig"] + if "PaginationConfig" in s3_additional_kwargs + else default_pagination + ) paginator = client_s3.get_paginator("list_objects_v2") - args: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix, "PaginationConfig": {"PageSize": 1000}, **extra_kwargs} + args: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix, **extra_kwargs} if delimiter is not None: args["Delimiter"] = delimiter _logger.debug("args: %s", args) @@ -127,13 +132,15 @@ def _list_objects( # pylint: disable=too-many-branches key = pfx["Prefix"] paths.append(f"s3://{bucket}/{key}") - if prefix != prefix_original: - paths = fnmatch.filter(paths, path) + if prefix != prefix_original: + paths = fnmatch.filter(paths, path) - if _ignore_suffix is not None: - paths = [p for p in paths if p.endswith(tuple(_ignore_suffix)) is False] + if _ignore_suffix is not None: + paths = [p for p in paths if p.endswith(tuple(_ignore_suffix)) is False] - return paths + if paths: + yield paths + paths = [] def does_object_exist( @@ -203,8 +210,11 @@ def does_object_exist( def list_directories( - path: str, s3_additional_kwargs: Optional[Dict[str, Any]] = None, boto3_session: Optional[boto3.Session] = None -) -> List[str]: + path: str, + chunked: bool = False, + s3_additional_kwargs: Optional[Dict[str, Any]] = None, + boto3_session: Optional[boto3.Session] = None, +) -> Union[List[str], Iterator[List[str]]]: """List Amazon S3 objects from a prefix. This function accepts Unix shell-style wildcards in the path argument. @@ -217,6 +227,8 @@ def list_directories( ---------- path : str S3 path (e.g. s3://bucket/prefix). + chunked: bool + If True returns iterator, and a single list otherwise. False by default. s3_additional_kwargs : Optional[Dict[str, Any]] Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} @@ -225,7 +237,7 @@ def list_directories( Returns ------- - List[str] + Union[List[str], Iterator[List[str]]] List of objects paths. Examples @@ -244,9 +256,15 @@ def list_directories( ['s3://bucket/prefix/dir0/', 's3://bucket/prefix/dir1/', 's3://bucket/prefix/dir2/'] """ - return _list_objects( - path=path, delimiter="/", boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs + result_iterator = _list_objects( + path=path, + delimiter="/", + boto3_session=boto3_session, + s3_additional_kwargs=s3_additional_kwargs, ) + if chunked: + return result_iterator + return [path for paths in result_iterator for path in paths] def list_objects( @@ -256,9 +274,10 @@ def list_objects( last_modified_begin: Optional[datetime.datetime] = None, last_modified_end: Optional[datetime.datetime] = None, ignore_empty: bool = False, + chunked: bool = False, s3_additional_kwargs: Optional[Dict[str, Any]] = None, boto3_session: Optional[boto3.Session] = None, -) -> List[str]: +) -> Union[List[str], Iterator[List[str]]]: """List Amazon S3 objects from a prefix. This function accepts Unix shell-style wildcards in the path argument. @@ -287,6 +306,8 @@ def list_objects( The filter is applied only after list all s3 files. ignore_empty: bool Ignore files with 0 bytes. + chunked: bool + If True returns iterator, and a single list otherwise. False by default. s3_additional_kwargs : Optional[Dict[str, Any]] Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} @@ -295,7 +316,7 @@ def list_objects( Returns ------- - List[str] + Union[List[str], Iterator[List[str]]] List of objects paths. Examples @@ -314,15 +335,24 @@ def list_objects( ['s3://bucket/prefix0', 's3://bucket/prefix1', 's3://bucket/prefix2'] """ - paths: List[str] = _list_objects( + # On top of user provided ignore_suffix input, add "/" + ignore_suffix_acc = set("/") + if isinstance(ignore_suffix, str): + ignore_suffix_acc.add(ignore_suffix) + elif isinstance(ignore_suffix, list): + ignore_suffix_acc.update(ignore_suffix) + + result_iterator = _list_objects( path=path, delimiter=None, suffix=suffix, - ignore_suffix=ignore_suffix, + ignore_suffix=list(ignore_suffix_acc), boto3_session=boto3_session, last_modified_begin=last_modified_begin, last_modified_end=last_modified_end, ignore_empty=ignore_empty, s3_additional_kwargs=s3_additional_kwargs, ) - return [p for p in paths if not p.endswith("/")] + if chunked: + return result_iterator + return [path for paths in result_iterator for path in paths] diff --git a/awswrangler/timestream.py b/awswrangler/timestream.py index 630332f52..660b7edb5 100644 --- a/awswrangler/timestream.py +++ b/awswrangler/timestream.py @@ -251,7 +251,7 @@ def query( Returns ------- - pd.DataFrame + Union[pd.DataFrame, Iterator[pd.DataFrame]] Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html Examples diff --git a/tests/test_s3.py b/tests/test_s3.py index 4db1acf2a..e5f82f190 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -323,15 +323,19 @@ def test_prefix_cleanup(): assert wr.s3._list._prefix_cleanup(glob.escape("foo[]boo")) == glob.escape("foo") -def test_prefix_list(path): +@pytest.mark.parametrize( + "s3_additional_kwargs", + [None, {"FetchOwner": True}, {"PaginationConfig": {"PageSize": 100}}], +) +def test_prefix_list(path, s3_additional_kwargs): df = pd.DataFrame({"c0": [0]}) prefixes = ["foo1boo", "foo2boo", "foo3boo", "foo10boo", "foo*boo", "abc1boo", "foo1abc"] paths = [path + p for p in prefixes] for p in paths: wr.s3.to_parquet(df=df, path=p) - assert len(wr.s3.list_objects(path + "*")) == 7 - assert len(wr.s3.list_objects(path + "foo*")) == 6 - assert len(wr.s3.list_objects(path + "*boo")) == 6 - assert len(wr.s3.list_objects(path + "foo?boo")) == 4 - assert len(wr.s3.list_objects(path + "foo*boo")) == 5 - assert len(wr.s3.list_objects(path + "foo[12]boo")) == 2 + assert len(wr.s3.list_objects(path + "*", s3_additional_kwargs=s3_additional_kwargs)) == 7 + assert len(wr.s3.list_objects(path + "foo*", s3_additional_kwargs=s3_additional_kwargs)) == 6 + assert len(wr.s3.list_objects(path + "*boo", s3_additional_kwargs=s3_additional_kwargs)) == 6 + assert len(wr.s3.list_objects(path + "foo?boo", s3_additional_kwargs=s3_additional_kwargs)) == 4 + assert len(wr.s3.list_objects(path + "foo*boo", s3_additional_kwargs=s3_additional_kwargs)) == 5 + assert len(wr.s3.list_objects(path + "foo[12]boo", s3_additional_kwargs=s3_additional_kwargs)) == 2