61 changes: 45 additions & 16 deletions awswrangler/athena.py
@@ -329,7 +329,7 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
database: str,
ctas_approach: bool = True,
categories: List[str] = None,
chunksize: Optional[int] = None,
chunksize: Optional[Union[int, bool]] = None,
s3_output: Optional[str] = None,
workgroup: Optional[str] = None,
encryption: Optional[str] = None,
@@ -353,10 +353,6 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
CONS: Slower (but still faster than other libraries that use the regular Athena API)
and does not handle nested types at all.

Note
----
If `chunksize` is passed, then a Generator of DataFrames is returned.

Note
----
If `ctas_approach` is True, `chunksize` will return non-deterministic chunk sizes,
@@ -367,6 +363,21 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
Create the default Athena bucket if it doesn't exist and s3_output is None.
(E.g. s3://aws-athena-query-results-ACCOUNT-REGION/)

Note
----
``Batching`` (`chunksize` argument) (Memory Friendly):

Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.

There are two batching strategies in Wrangler:

- If **chunksize=True**, a new DataFrame will be returned for each file in the query result.

- If **chunksize=INTEGER**, Wrangler will iterate on the data by number of rows equal to the received INTEGER.

`P.S.` `chunksize=True` is faster and uses less memory, while `chunksize=INTEGER` is more precise
in the number of rows for each DataFrame (see the sketch below).

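For illustration, a minimal sketch of both strategies in doctest style (the table and database names below are placeholders):

>>> import awswrangler as wr
>>> # chunksize=True: one DataFrame per file in the query result (faster, chunk sizes not guaranteed)
>>> dfs_by_file = wr.athena.read_sql_query(sql='SELECT * FROM "my_table"', database="my_db", chunksize=True)
>>> # chunksize=1_000_000: DataFrames with up to 1,000,000 rows each
>>> dfs_by_rows = wr.athena.read_sql_query(sql='SELECT * FROM "my_table"', database="my_db", chunksize=1_000_000)
>>> for df in dfs_by_rows:
...     print(len(df.index))
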
Note
----
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
@@ -383,8 +394,10 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
categories: List[str], optional
List of column names that should be returned as pandas.Categorical.
Recommended for memory-restricted environments.
chunksize: int, optional
If specified, return a generator where chunksize is the number of rows to include in each chunk.
chunksize : Union[int, bool], optional
If passed, will split the data in an Iterable of DataFrames (memory friendly).
If `True`, Wrangler will iterate on the data by files in the most efficient way, without guarantee of chunk size.
If an `INTEGER` is passed, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
s3_output : str, optional
AWS S3 path.
workgroup : str, optional
@@ -454,7 +467,7 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
catalog.delete_table_if_exists(database=database, table=name, boto3_session=session)
manifest_path: str = f"{_s3_output}/tables/{query_id}-manifest.csv"
paths: List[str] = _extract_ctas_manifest_paths(path=manifest_path, boto3_session=session)
chunked: bool = chunksize is not None
chunked: Union[bool, int] = False if chunksize is None else chunksize
_logger.debug(f"chunked: {chunked}")
if not paths:
if chunked is False:
@@ -473,6 +486,8 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
path = f"{_s3_output}/{query_id}.csv"
s3.wait_objects_exist(paths=[path], use_threads=False, boto3_session=session)
_logger.debug(f"Start CSV reading from {path}")
_chunksize: Optional[int] = chunksize if isinstance(chunksize, int) else None
_logger.debug(f"_chunksize: {_chunksize}")
ret = s3.read_csv(
path=[path],
dtype=dtype,
Expand All @@ -481,7 +496,7 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
quoting=csv.QUOTE_ALL,
keep_default_na=False,
na_values=[""],
chunksize=chunksize,
chunksize=_chunksize,
skip_blank_lines=False,
use_threads=False,
boto3_session=session,
@@ -565,7 +580,7 @@ def read_sql_table(
database: str,
ctas_approach: bool = True,
categories: List[str] = None,
chunksize: Optional[int] = None,
chunksize: Optional[Union[int, bool]] = None,
s3_output: Optional[str] = None,
workgroup: Optional[str] = None,
encryption: Optional[str] = None,
@@ -589,10 +604,6 @@ def read_sql_table(
CONS: Slower (but still faster than other libraries that use the regular Athena API)
and does not handle nested types at all

Note
----
If `chunksize` is passed, then a Generator of DataFrames is returned.

Note
----
If `ctas_approach` is True, `chunksize` will return non-deterministic chunk sizes,
@@ -603,6 +614,21 @@ def read_sql_table(
Create the default Athena bucket if it doesn't exist and s3_output is None.
(E.g. s3://aws-athena-query-results-ACCOUNT-REGION/)

Note
----
``Batching`` (`chunksize` argument) (Memory Friendly):

Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.

There are two batching strategies in Wrangler:

- If **chunksize=True**, a new DataFrame will be returned for each file in the query result.

- If **chunksize=INTEGER**, Wrangler will iterate on the data by number of rows equal to the received INTEGER.

`P.S.` `chunksize=True` is faster and uses less memory, while `chunksize=INTEGER` is more precise
in the number of rows for each DataFrame (see the sketch below).

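For illustration, a minimal sketch of both strategies in doctest style (the table and database names below are placeholders):

>>> import awswrangler as wr
>>> # chunksize=True: one DataFrame per file in the query result (faster, chunk sizes not guaranteed)
>>> dfs_by_file = wr.athena.read_sql_table(table="my_table", database="my_db", chunksize=True)
>>> # chunksize=1_000_000: DataFrames with up to 1,000,000 rows each
>>> dfs_by_rows = wr.athena.read_sql_table(table="my_table", database="my_db", chunksize=1_000_000)
>>> for df in dfs_by_rows:
...     print(len(df.index))
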
Note
----
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
@@ -619,8 +645,10 @@ def read_sql_table(
categories: List[str], optional
List of column names that should be returned as pandas.Categorical.
Recommended for memory-restricted environments.
chunksize: int, optional
If specified, return a generator where chunksize is the number of rows to include in each chunk.
chunksize : Union[int, bool], optional
If passed, will split the data in an Iterable of DataFrames (memory friendly).
If `True`, Wrangler will iterate on the data by files in the most efficient way, without guarantee of chunk size.
If an `INTEGER` is passed, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
s3_output : str, optional
AWS S3 path.
workgroup : str, optional
@@ -646,6 +674,7 @@ def read_sql_table(
>>> df = wr.athena.read_sql_table(table='...', database='...')

"""
table = catalog.sanitize_table_name(table=table)
return read_sql_query(
sql=f'SELECT * FROM "{table}"',
database=database,
29 changes: 24 additions & 5 deletions awswrangler/db.py
@@ -888,7 +888,7 @@ def unload_redshift(
con: sqlalchemy.engine.Engine,
iam_role: str,
categories: List[str] = None,
chunked: bool = False,
chunked: Union[bool, int] = False,
keep_files: bool = False,
use_threads: bool = True,
boto3_session: Optional[boto3.Session] = None,
@@ -906,6 +906,22 @@ def unload_redshift(

https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html

Note
----
``Batching`` (`chunked` argument) (Memory Friendly):

Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.

There are two batching strategies in Wrangler:

- If **chunked=True**, a new DataFrame will be returned for each file in your path/dataset.

- If **chunked=INTEGER**, Wrangler will iterate on the data by number of rows equal to the received INTEGER.

`P.S.` `chunked=True` is faster and uses less memory, while `chunked=INTEGER` is more precise
in the number of rows for each DataFrame (see the sketch below).


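For illustration, a minimal sketch of how `chunked` might be used, assuming the function also accepts `sql` and `path`
arguments (not shown in this hunk) and that a SQLAlchemy engine is available, e.g. via `wr.catalog.get_engine`;
the SQL, S3 path, Glue connection name and IAM role below are placeholders:

>>> import awswrangler as wr
>>> dfs = wr.db.unload_redshift(
...     sql="SELECT * FROM public.my_table",
...     path="s3://my-bucket/stage/",
...     con=wr.catalog.get_engine(connection="my-redshift-connection"),
...     iam_role="arn:aws:iam::123456789012:role/MyRedshiftRole",
...     chunked=True,
... )
>>> for df in dfs:
...     print(len(df.index))
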
Note
----
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
@@ -926,9 +942,10 @@ def unload_redshift(
Recommended for memory-restricted environments.
keep_files : bool
Should keep the stage files?
chunked : bool
If True will break the data in smaller DataFrames (Non deterministic number of lines).
Otherwise return a single DataFrame with the whole data.
chunked : Union[int, bool]
If passed, will split the data in an Iterable of DataFrames (memory friendly).
If `True`, Wrangler will iterate on the data by files in the most efficient way, without guarantee of chunk size.
If an `INTEGER` is passed, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
use_threads : bool
True to enable concurrent requests, False to disable multiple threads.
If enabled os.cpu_count() will be used as the max number of threads.
@@ -979,6 +996,7 @@ def unload_redshift(
return _read_parquet_iterator(
paths=paths,
categories=categories,
chunked=chunked,
use_threads=use_threads,
boto3_session=session,
s3_additional_kwargs=s3_additional_kwargs,
@@ -991,13 +1009,14 @@ def _read_parquet_iterator(
keep_files: bool,
use_threads: bool,
categories: List[str] = None,
chunked: Union[bool, int] = True,
boto3_session: Optional[boto3.Session] = None,
s3_additional_kwargs: Optional[Dict[str, str]] = None,
) -> Iterator[pd.DataFrame]:
dfs: Iterator[pd.DataFrame] = s3.read_parquet(
path=paths,
categories=categories,
chunked=True,
chunked=chunked,
dataset=False,
use_threads=use_threads,
boto3_session=boto3_session,