From 0376aefa25cc6ad9422e9a226a0d635bad64a63f Mon Sep 17 00:00:00 2001
From: igorborgest
Date: Tue, 21 Apr 2020 23:26:58 -0300
Subject: [PATCH 1/2] Add chunked=INTEGER option to ensure batch number of rows #192

---
 awswrangler/athena.py                      |  61 +++++++---
 awswrangler/db.py                          |  29 ++++-
 awswrangler/s3.py                          | 124 +++++++++++++++++----
 testing/test_awswrangler/test_data_lake.py |  36 ++++++
 4 files changed, 209 insertions(+), 41 deletions(-)

diff --git a/awswrangler/athena.py b/awswrangler/athena.py
index 1933606ad..a899d405b 100644
--- a/awswrangler/athena.py
+++ b/awswrangler/athena.py
@@ -329,7 +329,7 @@ def read_sql_query(  # pylint: disable=too-many-branches,too-many-locals
     database: str,
     ctas_approach: bool = True,
     categories: List[str] = None,
-    chunksize: Optional[int] = None,
+    chunksize: Optional[Union[int, bool]] = None,
     s3_output: Optional[str] = None,
     workgroup: Optional[str] = None,
     encryption: Optional[str] = None,
@@ -353,10 +353,6 @@ def read_sql_query(  # pylint: disable=too-many-branches,too-many-locals
     CONS: Slower (But stills faster than other libraries that uses the regular Athena API)
     and does not handle nested types at all.
 
-    Note
-    ----
-    If `chunksize` is passed, then a Generator of DataFrames is returned.
-
     Note
     ----
     If `ctas_approach` is True, `chunksize` will return non deterministic chunks sizes,
@@ -367,6 +363,21 @@ def read_sql_query(  # pylint: disable=too-many-branches,too-many-locals
     Create the default Athena bucket if it doesn't exist and s3_output is None.
     (E.g. s3://aws-athena-query-results-ACCOUNT-REGION/)
 
+    Note
+    ----
+    ``Batching`` (`chunksize` argument) (Memory Friendly):
+
+    Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.
+
+    There are two batching strategies on Wrangler:
+
+    - If **chunksize=True**, a new DataFrame will be returned for each file in the query result.
+
+    - If **chunksize=INTEGER**, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
+
+    `P.S.` `chunksize=True` is faster and uses less memory while `chunksize=INTEGER` is more precise
+    in number of rows for each DataFrame.
+
     Note
     ----
     In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count().
@@ -383,8 +394,10 @@ def read_sql_query(  # pylint: disable=too-many-branches,too-many-locals
     categories: List[str], optional
         List of columns names that should be returned as pandas.Categorical.
         Recommended for memory restricted environments.
-    chunksize: int, optional
-        If specified, return an generator where chunksize is the number of rows to include in each chunk.
+    chunksize : Union[int, bool], optional
+        If passed, will split the data into an Iterable of DataFrames (memory friendly).
+        If `True`, Wrangler will iterate on the data by files in the most efficient way, without guarantee of chunksize.
+        If an `INTEGER` is passed, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
     s3_output : str, optional
         AWS S3 path.
     workgroup : str, optional
@@ -454,7 +467,7 @@ def read_sql_query(  # pylint: disable=too-many-branches,too-many-locals
         catalog.delete_table_if_exists(database=database, table=name, boto3_session=session)
         manifest_path: str = f"{_s3_output}/tables/{query_id}-manifest.csv"
         paths: List[str] = _extract_ctas_manifest_paths(path=manifest_path, boto3_session=session)
-        chunked: bool = chunksize is not None
+        chunked: Union[bool, int] = False if chunksize is None else chunksize
         _logger.debug(f"chunked: {chunked}")
         if not paths:
             if chunked is False:
@@ -473,6 +486,8 @@ def read_sql_query(  # pylint: disable=too-many-branches,too-many-locals
     path = f"{_s3_output}/{query_id}.csv"
     s3.wait_objects_exist(paths=[path], use_threads=False, boto3_session=session)
     _logger.debug(f"Start CSV reading from {path}")
+    _chunksize: Optional[int] = chunksize if isinstance(chunksize, int) else None
+    _logger.debug(f"_chunksize: {_chunksize}")
     ret = s3.read_csv(
         path=[path],
         dtype=dtype,
@@ -481,7 +496,7 @@ def read_sql_query(  # pylint: disable=too-many-branches,too-many-locals
         quoting=csv.QUOTE_ALL,
         keep_default_na=False,
         na_values=[""],
-        chunksize=chunksize,
+        chunksize=_chunksize,
         skip_blank_lines=False,
         use_threads=False,
         boto3_session=session,
@@ -565,7 +580,7 @@ def read_sql_table(
     database: str,
     ctas_approach: bool = True,
     categories: List[str] = None,
-    chunksize: Optional[int] = None,
+    chunksize: Optional[Union[int, bool]] = None,
     s3_output: Optional[str] = None,
     workgroup: Optional[str] = None,
     encryption: Optional[str] = None,
@@ -589,10 +604,6 @@ def read_sql_table(
     CONS: Slower (But stills faster than other libraries that uses the regular Athena API)
     and does not handle nested types at all
 
-    Note
-    ----
-    If `chunksize` is passed, then a Generator of DataFrames is returned.
-
     Note
     ----
     If `ctas_approach` is True, `chunksize` will return non deterministic chunks sizes,
@@ -603,6 +614,21 @@ def read_sql_table(
     Create the default Athena bucket if it doesn't exist and s3_output is None.
     (E.g. s3://aws-athena-query-results-ACCOUNT-REGION/)
 
+    Note
+    ----
+    ``Batching`` (`chunksize` argument) (Memory Friendly):
+
+    Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.
+
+    There are two batching strategies on Wrangler:
+
+    - If **chunksize=True**, a new DataFrame will be returned for each file in the query result.
+
+    - If **chunksize=INTEGER**, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
+
+    `P.S.` `chunksize=True` is faster and uses less memory while `chunksize=INTEGER` is more precise
+    in number of rows for each DataFrame.
+
     Note
     ----
     In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count().
@@ -619,8 +645,10 @@ def read_sql_table(
     categories: List[str], optional
         List of columns names that should be returned as pandas.Categorical.
         Recommended for memory restricted environments.
-    chunksize: int, optional
-        If specified, return an generator where chunksize is the number of rows to include in each chunk.
+    chunksize : Union[int, bool], optional
+        If passed, will split the data into an Iterable of DataFrames (memory friendly).
+        If `True`, Wrangler will iterate on the data by files in the most efficient way, without guarantee of chunksize.
+        If an `INTEGER` is passed, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
     s3_output : str, optional
         AWS S3 path.
     workgroup : str, optional
@@ -646,6 +674,7 @@ def read_sql_table(
     >>> df = wr.athena.read_sql_table(table='...', database='...')
 
     """
+    table = catalog.sanitize_table_name(table=table)
     return read_sql_query(
         sql=f'SELECT * FROM "{table}"',
         database=database,
diff --git a/awswrangler/db.py b/awswrangler/db.py
index 491fe7784..2c8ac2799 100644
--- a/awswrangler/db.py
+++ b/awswrangler/db.py
@@ -888,7 +888,7 @@ def unload_redshift(
     con: sqlalchemy.engine.Engine,
     iam_role: str,
     categories: List[str] = None,
-    chunked: bool = False,
+    chunked: Union[bool, int] = False,
     keep_files: bool = False,
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
@@ -906,6 +906,22 @@ def unload_redshift(
 
     https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html
 
+    Note
+    ----
+    ``Batching`` (`chunked` argument) (Memory Friendly):
+
+    Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.
+
+    There are two batching strategies on Wrangler:
+
+    - If **chunked=True**, a new DataFrame will be returned for each file in your path/dataset.
+
+    - If **chunked=INTEGER**, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
+
+    `P.S.` `chunked=True` is faster and uses less memory while `chunked=INTEGER` is more precise
+    in number of rows for each DataFrame.
+
+
     Note
     ----
     In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count().
@@ -926,9 +942,10 @@ def unload_redshift(
         Recommended for memory restricted environments.
     keep_files : bool
         Should keep the stage files?
-    chunked : bool
-        If True will break the data in smaller DataFrames (Non deterministic number of lines).
-        Otherwise return a single DataFrame with the whole data.
+    chunked : Union[int, bool]
+        If passed, will split the data into an Iterable of DataFrames (memory friendly).
+        If `True`, Wrangler will iterate on the data by files in the most efficient way, without guarantee of chunksize.
+        If an `INTEGER` is passed, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
     use_threads : bool
         True to enable concurrent requests, False to disable multiple threads.
         If enabled os.cpu_count() will be used as the max number of threads.
@@ -979,6 +996,7 @@ def unload_redshift(
     return _read_parquet_iterator(
         paths=paths,
         categories=categories,
+        chunked=chunked,
         use_threads=use_threads,
         boto3_session=session,
         s3_additional_kwargs=s3_additional_kwargs,
@@ -991,13 +1009,14 @@ def _read_parquet_iterator(
     keep_files: bool,
     use_threads: bool,
     categories: List[str] = None,
+    chunked: Union[bool, int] = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
 ) -> Iterator[pd.DataFrame]:
     dfs: Iterator[pd.DataFrame] = s3.read_parquet(
         path=paths,
         categories=categories,
-        chunked=True,
+        chunked=chunked,
         dataset=False,
         use_threads=use_threads,
         boto3_session=boto3_session,
diff --git a/awswrangler/s3.py b/awswrangler/s3.py
index f728937db..0127f8897 100644
--- a/awswrangler/s3.py
+++ b/awswrangler/s3.py
@@ -1501,6 +1501,7 @@ def _read_parquet_init(
         filters=filters,
         read_dictionary=categories,
         validate_schema=validate_schema,
+        split_row_groups=False,
     )
     return data
 
@@ -1510,7 +1511,7 @@ def read_parquet(
     filters: Optional[Union[List[Tuple], List[List[Tuple]]]] = None,
     columns: Optional[List[str]] = None,
     validate_schema: bool = True,
-    chunked: bool = False,
+    chunked: Union[bool, int] = False,
     dataset: bool = False,
     categories: List[str] = None,
     use_threads: bool = True,
@@ -1522,6 +1523,22 @@ def read_parquet(
     The concept of Dataset goes beyond the simple idea of files and enable more complex features like partitioning
     and catalog integration (AWS Glue Catalog).
 
+    Note
+    ----
+    ``Batching`` (`chunked` argument) (Memory Friendly):
+
+    Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.
+
+    There are two batching strategies on Wrangler:
+
+    - If **chunked=True**, a new DataFrame will be returned for each file in your path/dataset.
+
+    - If **chunked=INTEGER**, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
+
+    `P.S.` `chunked=True` is faster and uses less memory while `chunked=INTEGER` is more precise
+    in number of rows for each DataFrame.
+
+
     Note
     ----
     In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count().
@@ -1538,11 +1555,12 @@ def read_parquet(
         Check that individual file schemas are all the same / compatible.
         Schemas within a folder prefix should all be the same.
         Disable if you have schemas that are different and want to disable this check.
-    chunked : bool
-        If True will break the data in smaller DataFrames (Non deterministic number of lines).
-        Otherwise return a single DataFrame with the whole data.
+    chunked : Union[int, bool]
+        If passed, will split the data into an Iterable of DataFrames (memory friendly).
+        If `True`, Wrangler will iterate on the data by files in the most efficient way, without guarantee of chunksize.
+        If an `INTEGER` is passed, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
     dataset: bool
-        If True read a parquet dataset instead of simple file(s) loading all the related partitions as columns.
+        If `True` read a parquet dataset instead of simple file(s) loading all the related partitions as columns.
     categories: List[str], optional
         List of columns names that should be returned as pandas.Categorical.
         Recommended for memory restricted environments.
@@ -1583,29 +1601,43 @@ def read_parquet(
     >>> import awswrangler as wr
     >>> df = wr.s3.read_parquet(path=['s3://bucket/filename0.parquet', 's3://bucket/filename1.parquet'])
 
-    Reading in chunks
+    Reading in chunks (Chunk by file)
 
     >>> import awswrangler as wr
     >>> dfs = wr.s3.read_parquet(path=['s3://bucket/filename0.csv', 's3://bucket/filename1.csv'], chunked=True)
     >>> for df in dfs:
     >>>     print(df)  # Smaller Pandas DataFrame
 
+    Reading in chunks (Chunk by 1MM rows)
+
+    >>> import awswrangler as wr
+    >>> dfs = wr.s3.read_parquet(path=['s3://bucket/filename0.csv', 's3://bucket/filename1.csv'], chunked=1_000_000)
+    >>> for df in dfs:
+    >>>     print(df)  # 1MM Pandas DataFrame
+
     """
     data: pyarrow.parquet.ParquetDataset = _read_parquet_init(
         path=path,
         filters=filters,
         dataset=dataset,
         categories=categories,
+        validate_schema=validate_schema,
         use_threads=use_threads,
         boto3_session=boto3_session,
         s3_additional_kwargs=s3_additional_kwargs,
-        validate_schema=validate_schema,
     )
     if chunked is False:
         return _read_parquet(
             data=data, columns=columns, categories=categories, use_threads=use_threads, validate_schema=validate_schema
         )
-    return _read_parquet_chunked(data=data, columns=columns, categories=categories, use_threads=use_threads)
+    return _read_parquet_chunked(
+        data=data,
+        columns=columns,
+        categories=categories,
+        chunked=chunked,
+        use_threads=use_threads,
+        validate_schema=validate_schema,
+    )
 
 
 def _read_parquet(
@@ -1639,22 +1671,50 @@ def _read_parquet_chunked(
     data: pyarrow.parquet.ParquetDataset,
     columns: Optional[List[str]] = None,
     categories: List[str] = None,
+    validate_schema: bool = True,
+    chunked: Union[bool, int] = True,
     use_threads: bool = True,
 ) -> Iterator[pd.DataFrame]:
+    promote: bool = not validate_schema
+    next_slice: Optional[pa.Table] = None
     for piece in data.pieces:
         table: pa.Table = piece.read(
             columns=columns, use_threads=use_threads, partitions=data.partitions, use_pandas_metadata=False
         )
-        yield table.to_pandas(
-            use_threads=use_threads,
-            split_blocks=True,
-            self_destruct=True,
-            integer_object_nulls=False,
-            date_as_object=True,
-            ignore_metadata=True,
-            categories=categories,
-            types_mapper=_data_types.pyarrow2pandas_extension,
-        )
+        if chunked is True:
+            yield _table2df(table=table, categories=categories, use_threads=use_threads)
+        else:
+            if next_slice is not None:
+                table = pa.lib.concat_tables([next_slice, table], promote=promote)
+            length: int = len(table)
+            while True:
+                if length == chunked:
+                    yield _table2df(table=table, categories=categories, use_threads=use_threads)
+                    next_slice = None
+                    break
+                if length < chunked:
+                    next_slice = table
+                    break
+                yield _table2df(
+                    table=table.slice(offset=0, length=chunked), categories=categories, use_threads=use_threads
+                )
+                table = table.slice(offset=chunked, length=None)
+                length = len(table)
+    if next_slice is not None:
+        yield _table2df(table=next_slice, categories=categories, use_threads=use_threads)
+
+
+def _table2df(table: pa.Table, categories: List[str] = None, use_threads: bool = True) -> pd.DataFrame:
+    return table.to_pandas(
+        use_threads=use_threads,
+        split_blocks=True,
+        self_destruct=True,
+        integer_object_nulls=False,
+        date_as_object=True,
+        ignore_metadata=True,
+        categories=categories,
+        types_mapper=_data_types.pyarrow2pandas_extension,
+    )
 
 
 def read_parquet_metadata(
@@ -1972,13 +2032,30 @@ def read_parquet_table(
     filters: Optional[Union[List[Tuple], List[List[Tuple]]]] = None,
     columns: Optional[List[str]] = None,
     categories: List[str] = None,
-    chunked: bool = False,
+    chunked: Union[bool, int] = False,
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
 ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
     """Read Apache Parquet table registered on AWS Glue Catalog.
 
+    Note
+    ----
+    ``Batching`` (`chunked` argument) (Memory Friendly):
+
+    Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.
+
+    There are two batching strategies on Wrangler:
+
+    - If **chunked=True**, a new DataFrame will be returned for each file in your path/dataset.
+
+    - If **chunked=INTEGER**, Wrangler will paginate through files slicing and concatenating
+      to return DataFrames with the number of rows equal to the received INTEGER.
+
+    `P.S.` `chunked=True` is faster and uses less memory while `chunked=INTEGER` is more precise
+    in number of rows for each DataFrame.
+
+
     Note
     ----
     In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count().
@@ -2032,13 +2109,20 @@ def read_parquet_table(
     ...     }
     ... )
 
-    Reading Parquet Table in chunks
+    Reading Parquet Table in chunks (Chunk by file)
 
     >>> import awswrangler as wr
     >>> dfs = wr.s3.read_parquet_table(database='...', table='...', chunked=True)
     >>> for df in dfs:
     >>>     print(df)  # Smaller Pandas DataFrame
 
+    Reading Parquet Table in chunks (Chunk by 1MM rows)
+
+    >>> import awswrangler as wr
+    >>> dfs = wr.s3.read_parquet_table(database='...', table='...', chunked=1_000_000)
+    >>> for df in dfs:
+    >>>     print(df)  # 1MM Pandas DataFrame
+
     """
     path: str = catalog.get_table_location(database=database, table=table, boto3_session=boto3_session)
     return read_parquet(
diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py
index afa2a8307..15369ee2c 100644
--- a/testing/test_awswrangler/test_data_lake.py
+++ b/testing/test_awswrangler/test_data_lake.py
@@ -3,6 +3,7 @@
 import gzip
 import logging
 import lzma
+import math
 from io import BytesIO, TextIOWrapper
 
 import boto3
@@ -1084,3 +1085,38 @@ def test_copy(bucket):
 
     wr.s3.delete_objects(path=path)
     wr.s3.delete_objects(path=path2)
+
+
+@pytest.mark.parametrize("col2", [[1, 1, 1, 1, 1], [1, 2, 3, 4, 5], [1, 1, 1, 1, 2], [1, 2, 2, 2, 2]])
+@pytest.mark.parametrize("chunked", [True, 1, 2, 100])
+def test_parquet_chunked(bucket, database, col2, chunked):
+    table = f"test_parquet_chunked_{chunked}_{''.join([str(x) for x in col2])}"
+    path = f"s3://{bucket}/{table}/"
+    wr.s3.delete_objects(path=path)
+    values = list(range(5))
+    df = pd.DataFrame({"col1": values, "col2": col2})
+    paths = wr.s3.to_parquet(
+        df, path, index=False, dataset=True, database=database, table=table, partition_cols=["col2"], mode="overwrite"
+    )["paths"]
+    wr.s3.wait_objects_exist(paths=paths)
+
+    dfs = list(wr.s3.read_parquet(path=path, dataset=True, chunked=chunked))
+    assert sum(values) == pd.concat(dfs, ignore_index=True).col1.sum()
+    if chunked is not True:
+        assert len(dfs) == int(math.ceil(len(df) / chunked))
+        for df2 in dfs[:-1]:
+            assert chunked == len(df2)
+        assert chunked >= len(dfs[-1])
+    else:
+        assert len(dfs) == len(set(col2))
+
+    dfs = list(wr.athena.read_sql_table(database=database, table=table, chunksize=chunked))
+    assert sum(values) == pd.concat(dfs, ignore_index=True).col1.sum()
+    if chunked is not True:
+        assert len(dfs) == int(math.ceil(len(df) / chunked))
+        for df2 in dfs[:-1]:
+            assert chunked == len(df2)
+        assert chunked >= len(dfs[-1])
+
+    wr.s3.delete_objects(path=paths)
+    assert wr.catalog.delete_table_if_exists(database=database, table=table) is True
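Before the second patch below, it may help to see the row-batching idea on its own. The following is a minimal, standalone sketch (not part of either patch) of the carry-over slicing loop that the second patch settles on: leftover rows are concatenated onto the next table, full chunks are sliced off, and the remainder is carried forward. The `batch_rows` helper and the sample tables are illustrative only and do not exist in awswrangler.

from typing import Iterator, List, Optional

import pyarrow as pa


def batch_rows(pieces: List[pa.Table], chunked: int) -> Iterator[pa.Table]:
    """Yield fixed-size row chunks from a sequence of tables, carrying remainders forward."""
    next_slice: Optional[pa.Table] = None
    for table in pieces:
        if next_slice:  # mirrors the patch's truthiness check: an empty table is falsy
            table = pa.concat_tables([next_slice, table])  # prepend leftover rows
        while len(table) >= chunked:
            yield table.slice(offset=0, length=chunked)  # emit a full chunk
            table = table.slice(offset=chunked, length=None)  # keep the rest
        next_slice = table  # carry the remainder to the next piece
    if next_slice:
        yield next_slice  # final partial chunk, if any


pieces = [pa.table({"col1": list(range(5))}), pa.table({"col1": list(range(5, 12))})]
print([len(chunk) for chunk in batch_rows(pieces, chunked=4)])  # -> [4, 4, 4]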
From aeb8792c939db9138660be33854027434ddf4677 Mon Sep 17 00:00:00 2001
From: igorborgest
Date: Mon, 27 Apr 2020 13:01:43 -0300
Subject: [PATCH 2/2] Improving the chunksize parsing/slicing algorithm

---
 awswrangler/s3.py | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/awswrangler/s3.py b/awswrangler/s3.py
index 0127f8897..7090c2c0f 100644
--- a/awswrangler/s3.py
+++ b/awswrangler/s3.py
@@ -1684,23 +1684,15 @@ def _read_parquet_chunked(
         if chunked is True:
             yield _table2df(table=table, categories=categories, use_threads=use_threads)
         else:
-            if next_slice is not None:
+            if next_slice:
                 table = pa.lib.concat_tables([next_slice, table], promote=promote)
-            length: int = len(table)
-            while True:
-                if length == chunked:
-                    yield _table2df(table=table, categories=categories, use_threads=use_threads)
-                    next_slice = None
-                    break
-                if length < chunked:
-                    next_slice = table
-                    break
+            while len(table) >= chunked:
                 yield _table2df(
                     table=table.slice(offset=0, length=chunked), categories=categories, use_threads=use_threads
                 )
                 table = table.slice(offset=chunked, length=None)
-                length = len(table)
-    if next_slice is not None:
+            next_slice = table
+    if next_slice:
         yield _table2df(table=next_slice, categories=categories, use_threads=use_threads)
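Once both patches are applied, the new option is intended to be used as in the hedged sketch below, which mirrors the docstring examples and the new test. The bucket, database, and table names are placeholders; the row count is arbitrary.

import awswrangler as wr

# S3 / Parquet: chunked=True yields one DataFrame per file, while chunked=INTEGER
# aims to yield DataFrames with exactly that many rows (except, possibly, the last one).
for df in wr.s3.read_parquet(path="s3://my-bucket/my-dataset/", dataset=True, chunked=1_000_000):
    print(len(df))

# Athena: the same behavior is exposed through the `chunksize` argument.
for df in wr.athena.read_sql_table(table="my_table", database="my_database", chunksize=1_000_000):
    print(len(df))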