48 changes: 26 additions & 22 deletions awswrangler/athena/_read.py
@@ -1357,7 +1357,7 @@ def read_sql_table(
- `Global Configurations <https://aws-sdk-pandas.readthedocs.io/en/3.0.0rc2/
tutorials/021%20-%20Global%20Configurations.html>`_

**There are two approaches to be defined through ctas_approach parameter:**
**There are three approaches available through ctas_approach and unload_approach parameters:**

**1** - ctas_approach=True (Default):

@@ -1375,8 +1375,26 @@ def read_sql_table(
- Does not support columns with repeated names.
- Does not support columns with undefined data types.
- A temporary table will be created and then deleted immediately.
- Does not support custom data_source/catalog_id.

**2** - unload_approach=True and ctas_approach=False:

Does an UNLOAD query on Athena and parses the Parquet result on S3.

PROS:

- Faster for mid and big result sizes.
- Can handle some level of nested types.
- Does not modify Glue Data Catalog.

CONS:

- Output S3 path must be empty.
- Does not support timestamp with time zone.
- Does not support columns with repeated names.
- Does not support columns with undefined data types.

**2** - ctas_approach=False:

**3** - ctas_approach=False:

Does a regular query on Athena and parses the regular CSV result on S3.

@@ -1385,6 +1403,7 @@ def read_sql_table(
- Faster for small result sizes (less latency).
- Does not require create/delete table permissions on Glue.
- Supports timestamp with time zone.
- Supports custom data_source/catalog_id.

CONS:

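To make these modes concrete, a hedged usage sketch follows; the database, table, and S3 output prefix are hypothetical, and the UNLOAD prefix must be empty as noted above:

```python
import awswrangler as wr

# 1 - default CTAS approach: the result is materialized as a temporary
# Parquet table via CREATE TABLE AS and read back from S3.
df_ctas = wr.athena.read_sql_table(table="my_table", database="my_db")

# 2 - UNLOAD approach: the result is unloaded as Parquet to an empty S3 prefix.
df_unload = wr.athena.read_sql_table(
    table="my_table",
    database="my_db",
    ctas_approach=False,
    unload_approach=True,
    s3_output="s3://my-bucket/athena-unload/",  # hypothetical prefix, must be empty
)

# 3 - plain query: the regular CSV result is parsed from S3.
df_plain = wr.athena.read_sql_table(table="my_table", database="my_db", ctas_approach=False)
```
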
@@ -1423,16 +1442,15 @@ def read_sql_table(

There are two batching strategies:

- If **chunksize=True**, depending on the size of the data, one or more data frames will be
returned per each file in the query result.
Unlike **chunksize=INTEGER**, rows from different files will not be mixed in the resulting data frames.
- If **chunksize=True**, depending on the size of the data, one or more data frames are returned per file in the query result.
Unlike **chunksize=INTEGER**, rows from different files are not mixed in the resulting data frames.

- If **chunksize=INTEGER**, awswrangler will iterate on the data by number of rows egual the received INTEGER.
- If **chunksize=INTEGER**, awswrangler iterates on the data by number of rows equal to the received INTEGER.

`P.S.` `chunksize=True` is faster and uses less memory while `chunksize=INTEGER` is more precise
in number of rows for each Dataframe.
in number of rows for each data frame.

`P.P.S.` If `ctas_approach=False` and `chunksize=True`, you will always receive an interador with a
`P.P.S.` If `ctas_approach=False` and `chunksize=True`, you will always receive an iterator with a
single DataFrame because regular Athena queries only produce a single output file.
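
A short sketch of the two batching strategies described above, again with hypothetical table and database names:

```python
import awswrangler as wr

# chunksize=True: one or more DataFrames per result file (lowest memory use).
for chunk in wr.athena.read_sql_table(table="my_table", database="my_db", chunksize=True):
    print(len(chunk))

# chunksize=INTEGER: fixed-size row batches, here 100,000 rows per DataFrame.
for chunk in wr.athena.read_sql_table(table="my_table", database="my_db", chunksize=100_000):
    print(len(chunk))
```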

Note
@@ -1473,20 +1491,6 @@ def read_sql_table(
For SSE-KMS, this is the KMS key ARN or ID.
keep_files : bool
Should awswrangler delete or keep the staging files produced by Athena?
ctas_database : str, optional
The name of the alternative database where the CTAS temporary table is stored.
If None, the default `database` is used.
ctas_temp_table_name : str, optional
The name of the temporary table and also the directory name on S3 where the CTAS result is stored.
If None, it will use the following random pattern: `f"temp_table_{uuid.uuid4().hex}"`.
On S3 this directory will be under the pattern: `f"{s3_output}/{ctas_temp_table_name}/"`.
ctas_bucketing_info: Tuple[List[str], int], optional
Tuple consisting of the column names used for bucketing as the first element and the number of buckets as the
second element.
Only `str`, `int` and `bool` are supported as column data types for bucketing.
ctas_write_compression: str, optional
Write compression for the temporary table where the CTAS result is stored.
Corresponds to the `write_compression` parameters for CREATE TABLE AS statement in Athena.
use_threads : bool, int
True to enable concurrent requests, False to disable multiple threads.
If enabled os.cpu_count() will be used as the max number of threads.
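A hedged sketch combining the staging, encryption, and threading parameters documented above (the table, database, and KMS key ARN are hypothetical):

```python
import awswrangler as wr

df = wr.athena.read_sql_table(
    table="my_table",
    database="my_db",
    keep_files=False,   # delete the Athena staging files after reading
    encryption="SSE_KMS",
    kms_key="arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab",  # hypothetical
    use_threads=True,   # use os.cpu_count() threads for concurrent requests
)
```
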
2 changes: 1 addition & 1 deletion awswrangler/distributed/ray/modin/s3/_write_text.py
@@ -101,7 +101,7 @@ def _to_text_distributed( # pylint: disable=unused-argument
# Create Ray Dataset
ds = _ray_dataset_from_df(df)

# Repartition into a single block if or writing into a single key or if bucketing is enabled
# Repartition into a single block if writing into a single key or if bucketing is enabled
if ds.count() > 0 and path:
ds = ds.repartition(1)
_logger.warning(
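For context, a minimal sketch of the repartition-into-one-block behavior referenced in the comment above, using the public `ray.data` API with an illustrative DataFrame rather than the library's internal helper:

```python
import pandas as pd
import ray

ray.init(ignore_reinit_error=True)

# Illustrative frame standing in for the DataFrame being written.
ds = ray.data.from_pandas(pd.DataFrame({"c0": range(10)}))

# Writing to a single key (or with bucketing enabled) needs a single output
# block, so the dataset is collapsed into one partition before the write.
if ds.count() > 0:
    ds = ds.repartition(1)
```
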
26 changes: 9 additions & 17 deletions tests/load/test_s3.py
@@ -20,7 +20,6 @@ def _modin_repartition(df: pd.DataFrame, num_blocks: int) -> pd.DataFrame:
return dataset.to_modin()


@pytest.mark.repeat(1)
@pytest.mark.parametrize("benchmark_time", [150])
def test_s3_select(benchmark_time: float, request: pytest.FixtureRequest) -> None:
paths = [f"s3://ursa-labs-taxi-data/2018/{i}/data.parquet" for i in range(10, 13)]
@@ -96,7 +95,7 @@ def test_s3_read_parquet_many_files(
assert timer.elapsed_time < benchmark_time


@pytest.mark.parametrize("benchmark_time", [30])
@pytest.mark.parametrize("benchmark_time", [40])
def test_s3_read_parquet_partition_filter(benchmark_time: float, request: pytest.FixtureRequest) -> None:
path = "s3://amazon-reviews-pds/parquet/"
with ExecutionTimer(request, data_paths=path) as timer:
@@ -167,18 +166,15 @@ def test_s3_write_parquet_blocks(

@pytest.mark.parametrize("benchmark_time", [5])
def test_s3_delete_objects(path: str, path2: str, benchmark_time: float, request: pytest.FixtureRequest) -> None:
df = pd.DataFrame({"id": [1, 2, 3]})
objects_per_bucket = 505
paths1 = [f"{path}delete-test{i}.json" for i in range(objects_per_bucket)]
paths2 = [f"{path2}delete-test{i}.json" for i in range(objects_per_bucket)]
df = pd.DataFrame({"id": range(0, 505)})
paths1 = wr.s3.to_parquet(df=df, path=path, max_rows_by_file=1)["paths"]
paths2 = wr.s3.to_parquet(df=df, path=path2, max_rows_by_file=1)["paths"]
paths = paths1 + paths2
for path in paths:
wr.s3.to_csv(df, path)
with ExecutionTimer(request) as timer:
wr.s3.delete_objects(path=paths)
assert timer.elapsed_time < benchmark_time
assert len(wr.s3.list_objects(f"{path}delete-test*")) == 0
assert len(wr.s3.list_objects(f"{path2}delete-test*")) == 0
assert len(wr.s3.list_objects(path)) == 0
assert len(wr.s3.list_objects(path2)) == 0


@pytest.mark.parametrize("benchmark_time", [20])
@@ -226,16 +222,12 @@ def test_s3_write_json(
@pytest.mark.timeout(300)
@pytest.mark.parametrize("benchmark_time", [15])
def test_wait_object_exists(path: str, benchmark_time: int, request: pytest.FixtureRequest) -> None:
df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]})

num_objects = 200
file_paths = [f"{path}{i}.txt" for i in range(num_objects)]
df = pd.DataFrame({"c0": range(0, 200)})

for file_path in file_paths:
wr.s3.to_csv(df, file_path, index=True)
paths = wr.s3.to_parquet(df=df, path=path, max_rows_by_file=1)["paths"]

with ExecutionTimer(request) as timer:
wr.s3.wait_objects_exist(file_paths, parallelism=16)
wr.s3.wait_objects_exist(paths, parallelism=16)

assert timer.elapsed_time < benchmark_time

10 changes: 1 addition & 9 deletions tests/unit/test_athena.py
@@ -224,7 +224,6 @@ def test_athena_create_ctas(path, glue_table, glue_table2, glue_database, glue_c

@pytest.mark.xfail(is_ray_modin, raises=AssertionError, reason="Index equality regression")
def test_athena(path, glue_database, glue_table, kms_key, workgroup0, workgroup1):
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
wr.s3.to_parquet(
df=get_df(),
path=path,
@@ -301,7 +300,6 @@ def test_athena(path, glue_database, glue_table, kms_key, workgroup0, workgroup1


def test_read_sql_query_parameter_formatting_respects_prefixes(path, glue_database, glue_table, workgroup0):
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
wr.s3.to_parquet(
df=get_df(),
path=path,
@@ -329,7 +327,6 @@ def test_read_sql_query_parameter_formatting_respects_prefixes(path, glue_databa
[("string", "Seattle"), ("date", datetime.date(2020, 1, 1)), ("bool", True), ("category", 1.0)],
)
def test_read_sql_query_parameter_formatting(path, glue_database, glue_table, workgroup0, col_name, col_value):
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
wr.s3.to_parquet(
df=get_df(),
path=path,
@@ -354,7 +351,6 @@ def test_read_sql_query_parameter_formatting(path, glue_database, glue_table, wo

@pytest.mark.parametrize("col_name", [("string"), ("date"), ("bool"), ("category")])
def test_read_sql_query_parameter_formatting_null(path, glue_database, glue_table, workgroup0, col_name):
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
wr.s3.to_parquet(
df=get_df(),
path=path,
@@ -377,6 +373,7 @@ def test_read_sql_query_parameter_formatting_null(path, glue_database, glue_tabl
assert len(df.index) == 1


@pytest.mark.xfail(raises=botocore.exceptions.ClientError, reason="QueryId not found.")
def test_athena_query_cancelled(glue_database):
query_execution_id = wr.athena.start_query_execution(
sql="SELECT " + "rand(), " * 10000 + "rand()", database=glue_database
@@ -551,7 +548,6 @@ def test_category(path, glue_table, glue_database):
)
for df2 in dfs:
ensure_data_types_category(df2)
assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True


@pytest.mark.parametrize("workgroup", [None, 0, 1, 2, 3])
@@ -784,7 +780,6 @@ def test_bucketing_catalog_parquet_table(path, glue_database, glue_table):
table = next(wr.catalog.get_tables(name_contains=glue_table))
assert table["StorageDescriptor"]["NumberOfBuckets"] == nb_of_buckets
assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols
assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)


@pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
@@ -873,7 +868,6 @@ def test_bucketing_catalog_csv_table(path, glue_database, glue_table):
table = next(wr.catalog.get_tables(name_contains=glue_table))
assert table["StorageDescriptor"]["NumberOfBuckets"] == nb_of_buckets
assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols
assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)


@pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
@@ -1196,7 +1190,6 @@ def test_bucketing_combined_csv_saving(path, glue_database, glue_table):


def test_start_query_execution_wait(path, glue_database, glue_table):
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
wr.s3.to_parquet(
df=get_df(),
path=path,
@@ -1358,7 +1351,6 @@ def test_get_query_execution(workgroup0, workgroup1):

@pytest.mark.parametrize("compression", [None, "snappy", "gzip"])
def test_read_sql_query_ctas_write_compression(path, glue_database, glue_table, compression):
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
wr.s3.to_parquet(
df=get_df(),
path=path,
64 changes: 20 additions & 44 deletions tests/unit/test_athena_csv.py
@@ -1,6 +1,5 @@
import csv
import logging
from sys import version_info

import boto3
import pyarrow as pa
@@ -290,7 +289,6 @@ def test_csv_catalog(path, glue_table, glue_database, use_threads, concurrent_pa
assert len(df2.columns) == 11
assert df2["id"].sum() == 6
ensure_data_types_csv(df2)
assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True


@pytest.mark.parametrize("use_threads", [True, False])
@@ -377,7 +375,6 @@ def test_athena_csv_types(path, glue_database, glue_table):
assert len(df2.columns) == 10
assert df2["id"].sum() == 6
ensure_data_types_csv(df2)
assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True


@pytest.mark.parametrize("use_threads", [True, False])
@@ -457,47 +454,26 @@ def test_failing_catalog(path, glue_table, use_threads):
@pytest.mark.parametrize("concurrent_partitioning", [True, False])
@pytest.mark.parametrize("compression", ["gzip", "bz2", None])
def test_csv_compressed(path, glue_table, glue_database, use_threads, concurrent_partitioning, compression):
df = get_df_csv()
if version_info < (3, 7) and compression:
with pytest.raises(wr.exceptions.InvalidArgument):
wr.s3.to_csv(
df=df,
path=path,
sep="\t",
index=True,
use_threads=use_threads,
boto3_session=None,
s3_additional_kwargs=None,
dataset=True,
partition_cols=["par0", "par1"],
mode="overwrite",
table=glue_table,
database=glue_database,
concurrent_partitioning=concurrent_partitioning,
compression=compression,
)
else:
wr.s3.to_csv(
df=df,
path=path,
sep="\t",
index=True,
use_threads=use_threads,
boto3_session=None,
s3_additional_kwargs=None,
dataset=True,
partition_cols=["par0", "par1"],
mode="overwrite",
table=glue_table,
database=glue_database,
concurrent_partitioning=concurrent_partitioning,
compression=compression,
)
df2 = wr.athena.read_sql_table(glue_table, glue_database)
assert df2.shape == (3, 11)
assert df2["id"].sum() == 6
ensure_data_types_csv(df2)
assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True
wr.s3.to_csv(
df=get_df_csv(),
path=path,
sep="\t",
index=True,
use_threads=use_threads,
boto3_session=None,
s3_additional_kwargs=None,
dataset=True,
partition_cols=["par0", "par1"],
mode="overwrite",
table=glue_table,
database=glue_database,
concurrent_partitioning=concurrent_partitioning,
compression=compression,
)
df2 = wr.athena.read_sql_table(glue_table, glue_database)
assert df2.shape == (3, 11)
assert df2["id"].sum() == 6
ensure_data_types_csv(df2)


@pytest.mark.parametrize("use_threads", [True, False])
15 changes: 1 addition & 14 deletions tests/unit/test_athena_parquet.py
@@ -74,8 +74,6 @@ def test_parquet_catalog(path, path2, glue_table, glue_table2, glue_database):
assert len(columns_types) == 18
assert len(partitions_types) == 2
assert len(partitions_values) == 2
assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True
assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table2) is True


@pytest.mark.parametrize("use_threads", [True, False])
@@ -163,8 +161,6 @@ def test_parquet_catalog_casting(path, glue_database, glue_table):
df = wr.athena.read_sql_table(table=glue_table, database=glue_database, ctas_approach=False)
assert df.shape == (3, 16)
ensure_data_types(df=df, has_list=False)
wr.s3.delete_objects(path=path)
assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True


def test_parquet_catalog_casting_to_string_with_null(path, glue_table, glue_database):
Expand Down Expand Up @@ -208,8 +204,6 @@ def test_parquet_compress(path, glue_table, glue_database, compression):
df2 = wr.athena.read_sql_table(glue_table, glue_database)
ensure_data_types(df2)
df2 = wr.s3.read_parquet(path=path)
wr.s3.delete_objects(path=path)
assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True
ensure_data_types(df2)


@@ -242,7 +236,6 @@ def test_parquet_char_length(path, glue_database, glue_table):
@pytest.mark.parametrize("col2", [[1, 1, 1, 1, 1], [1, 2, 3, 4, 5], [1, 1, 1, 1, 2], [1, 2, 2, 2, 2]])
@pytest.mark.parametrize("chunked", [True, 1, 2, 100])
def test_parquet_chunked(path, glue_database, glue_table, col2, chunked):
wr.s3.delete_objects(path=path)
values = list(range(5))
df = pd.DataFrame({"col1": values, "col2": col2})
wr.s3.to_parquet(
@@ -274,12 +267,9 @@ def test_parquet_chunked(path, glue_database, glue_table, col2, chunked):
assert chunked == len(df2)
assert chunked >= len(dfs[-1])

assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True


@pytest.mark.xfail(is_ray_modin, raises=AssertionError, reason="Issue since upgrading to Ray 2.3")
@pytest.mark.xfail(is_ray_modin, raises=AssertionError, reason="Issue since upgrading to PyArrow 11.0")
def test_unsigned_parquet(path, glue_database, glue_table):
wr.s3.delete_objects(path=path)
df = pd.DataFrame({"c0": [0, 0, (2**8) - 1], "c1": [0, 0, (2**16) - 1], "c2": [0, 0, (2**32) - 1]})
df["c0"] = df.c0.astype("uint8")
df["c1"] = df.c1.astype("uint16")
@@ -317,9 +307,6 @@ def test_unsigned_parquet(path, glue_database, glue_table):
mode="overwrite",
)

wr.s3.delete_objects(path=path)
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)


def test_parquet_overwrite_partition_cols(path, glue_database, glue_table):
df = pd.DataFrame({"c0": [1, 2, 1, 2], "c1": [1, 2, 1, 2], "c2": [2, 1, 2, 1]})