[![AWS Data Wrangler](_static/logo.png "AWS Data Wrangler")](https://github.com/awslabs/aws-data-wrangler)

# 6 - Amazon Athena

[Wrangler](https://github.com/awslabs/aws-data-wrangler) has two ways to run queries on Athena and fetch the result as a DataFrame:

- **ctas_approach=True** (Default)

    Wraps the query in a CTAS (CREATE TABLE AS SELECT) statement and then reads the table data as Parquet directly from S3.
    
    `PROS`: Faster and can handle some level of nested types.
    
    `CONS`: Requires create/delete table permissions on Glue (a temporary table is created and then deleted immediately) and does not support timestamp with time zone.

- **ctas_approach=False**

    Runs a regular query on Athena and parses the regular CSV result on S3.
    
    `PROS`: Does not require create/delete table permissions on Glue and supports timestamp with time zone.
    
    `CONS`: Slower (but still faster than other libraries that use the regular Athena API) and does not handle nested types at all.
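
The trade-offs listed above can be folded into a small decision helper. This is only a sketch: `pick_ctas_approach` and its two flags are hypothetical names, not part of Wrangler. The only real parameter involved is `ctas_approach`, which would receive the function's result.

```python
def pick_ctas_approach(
    can_manage_glue_tables: bool,
    needs_timestamp_with_tz: bool,
) -> bool:
    """Return the value to pass as ctas_approach (hypothetical helper)."""
    # CTAS needs create/delete table permissions on Glue and
    # does not support timestamp with time zone.
    if not can_manage_glue_tables or needs_timestamp_with_tz:
        return False
    # Otherwise prefer the faster default.
    return True


print(pick_ctas_approach(can_manage_glue_tables=True, needs_timestamp_with_tz=False))  # True
print(pick_ctas_approach(can_manage_glue_tables=True, needs_timestamp_with_tz=True))   # False
```

The sketch ignores nested types. A query that returns them would favor `ctas_approach=True`, since the regular CSV path does not handle them at all.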

In [1]:
import awswrangler as wr

## Enter your bucket name:

In [2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
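
The cell above expects a bare bucket name. If a full `s3://` URI might be pasted at the prompt instead, a small normalizer could guard against a malformed `path`. The helper `dataset_path` below is a hypothetical name introduced for illustration, not a Wrangler function.

```python
def dataset_path(bucket: str, prefix: str = "data") -> str:
    """Build an s3:// URI ending in '/' from a bucket name (hypothetical helper)."""
    # Tolerate a pasted "s3://bucket/" by stripping the scheme and slashes.
    name = bucket.strip()
    if name.startswith("s3://"):
        name = name[len("s3://"):]
    name = name.strip("/")
    # Anything still containing "/" is a key or prefix, not a bucket name.
    if not name or "/" in name:
        raise ValueError(f"expected a bare bucket name, got {bucket!r}")
    return f"s3://{name}/{prefix.strip('/')}/"


print(dataset_path("my-bucket"))        # s3://my-bucket/data/
print(dataset_path("s3://my-bucket/"))  # s3://my-bucket/data/
```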

········


## Checking/Creating Glue Catalog Databases

In [4]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")

            Database                                   Description
0  aws_data_wrangler  AWS Data Wrangler Test Arena - Glue Database
1   awswrangler_test                                              
2            default                         Default Hive database


### Creating a Parquet Table from NOAA's CSV files

[Reference](https://registry.opendata.aws/noaa-ghcn/)

In [5]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  # Read 10 files from the 1890 decade (~1GB)

df

| | id | dt | element | value | m_flag | q_flag | s_flag | obs_time |
|---|---|---|---|---|---|---|---|---|
| 0 | ASN00070200 | 1890-01-01 | PRCP | 0 | | | a | |
| 1 | SF000782720 | 1890-01-01 | PRCP | 0 | | | I | |
| 2 | CA005022790 | 1890-01-01 | TMAX | -222 | | | C | |
| 3 | CA005022790 | 1890-01-01 | TMIN | -261 | | | C | |
| 4 | CA005022790 | 1890-01-01 | PRCP | 0 | | | C | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1276241 | CA001167635 | 1890-12-31 | SNOW | 0 | | | C | |
| 1276242 | ASN00019053 | 1890-12-31 | PRCP | 0 | | | a | |
| 1276243 | ASN00024501 | 1890-12-31 | PRCP | 0 | | | a | |
| 1276244 | SF001035700 | 1890-12-31 | PRCP | 0 | | | I | |
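
The `path` passed to `wr.s3.read_csv` above is a prefix rather than a single object, which is how one string selects ten files. The sketch below illustrates prefix matching with plain strings. The key layout `csv/<year>.csv` is an assumption made for illustration, inferred from the "10 files from the 1890 decade" comment.

```python
# Hypothetical key listing: one CSV per year, named csv/<year>.csv.
keys = [f"csv/{year}.csv" for year in range(1880, 1910)]

# A prefix matches every key that starts with it.
prefix = "csv/189"
matched = [key for key in keys if key.startswith(prefix)]

print(len(matched))             # 10
print(matched[0], matched[-1])  # csv/1890.csv csv/1899.csv
```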


In [6]:
wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
);

In [7]:
wr.catalog.table(database="awswrangler_test", table="noaa")

| | Column Name | Type | Partition | Comment |
|---|---|---|---|---|
| 0 | id | string | False | |
| 1 | dt | timestamp | False | |
| 2 | element | string | False | |
| 3 | value | bigint | False | |
| 4 | m_flag | string | False | |
| 5 | q_flag | string | False | |
| 6 | s_flag | string | False | |
| 7 | obs_time | string | False | |


## Default reading (ctas_approach=True), about 8x faster in this run

In [8]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")

CPU times: user 1.57 s, sys: 454 ms, total: 2.02 s
Wall time: 46.6 s


| | id | dt | element | value | m_flag | q_flag | s_flag | obs_time |
|---|---|---|---|---|---|---|---|---|
| 0 | ASN00061069 | 1890-01-01 | PRCP | 0 | | | a | |
| 1 | USC00212904 | 1890-01-01 | PRCP | 0 | | | 6 | |
| 2 | USC00212904 | 1890-01-01 | SNWD | 305 | | | 6 | |
| 3 | ASN00019052 | 1890-01-01 | PRCP | 0 | | | a | |
| 4 | RSM00022112 | 1890-01-01 | PRCP | 0 | | | I | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1276241 | ASN00075035 | 1890-10-28 | PRCP | 0 | | | a | |
| 1276242 | SF001988360 | 1890-10-28 | PRCP | 51 | | | I | |
| 1276243 | ASN00048021 | 1890-10-28 | PRCP | 0 | | | a | |
| 1276244 | USC00412758 | 1890-10-28 | PRCP | 0 | | | 6 | |


## Reading with ctas_approach=False

In [9]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)

CPU times: user 21.2 s, sys: 1.8 s, total: 23 s
Wall time: 6min 22s


| | id | dt | element | value | m_flag | q_flag | s_flag | obs_time |
|---|---|---|---|---|---|---|---|---|
| 0 | ASN00070200 | 1890-01-01 | PRCP | 0 | | | a | |
| 1 | SF000782720 | 1890-01-01 | PRCP | 0 | | | I | |
| 2 | CA005022790 | 1890-01-01 | TMAX | -222 | | | C | |
| 3 | CA005022790 | 1890-01-01 | TMIN | -261 | | | C | |
| 4 | CA005022790 | 1890-01-01 | PRCP | 0 | | | C | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1276241 | CA006131910 | 1890-12-31 | SNOW | 0 | | | C | |
| 1276242 | USC00174230 | 1890-12-31 | TMAX | -106 | | | 6 | |
| 1276243 | USC00174230 | 1890-12-31 | TMIN | -244 | | | 6 | |
| 1276244 | USC00174230 | 1890-12-31 | PRCP | 0 | P | | 6 | |
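
To compare the two runs, the wall times printed by the `%%time` cells can be turned into a ratio. This is plain arithmetic on the two recorded figures and reflects only this single run. The variable names are introduced here for illustration.

```python
ctas_wall_s = 46.6            # "Wall time: 46.6 s" with ctas_approach=True
regular_wall_s = 6 * 60 + 22  # "Wall time: 6min 22s" with ctas_approach=False

speedup = regular_wall_s / ctas_wall_s
print(f"{speedup:.1f}x")      # 8.2x
```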


## Using categories to speed up and save memory!

In [10]:
%%time

wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
)

CPU times: user 748 ms, sys: 279 ms, total: 1.03 s
Wall time: 48.8 s


| | id | dt | element | value | m_flag | q_flag | s_flag | obs_time |
|---|---|---|---|---|---|---|---|---|
| 0 | ASN00061069 | 1890-01-01 | PRCP | 0 | | | a | |
| 1 | USC00212904 | 1890-01-01 | PRCP | 0 | | | 6 | |
| 2 | USC00212904 | 1890-01-01 | SNWD | 305 | | | 6 | |
| 3 | ASN00019052 | 1890-01-01 | PRCP | 0 | | | a | |
| 4 | RSM00022112 | 1890-01-01 | PRCP | 0 | | | I | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1276241 | SF004323870 | 1890-01-03 | PRCP | 0 | | | I | |
| 1276242 | SF001018040 | 1890-01-03 | PRCP | 0 | | | I | |
| 1276243 | LG000026314 | 1890-01-03 | PRCP | 0 | | | I | |
| 1276244 | CA004016320 | 1890-01-03 | TMAX | -278 | | | C | |
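
Categorical columns tend to save memory because each distinct value is stored once and every row keeps only a small integer code. The pure-Python sketch below illustrates that idea on a made-up, low-cardinality column. It is not how Wrangler implements `categories` internally, and the sample values are chosen for illustration only.

```python
from array import array

# A low-cardinality column like "element": few distinct values, many rows.
column = ["PRCP", "TMAX", "TMIN", "PRCP", "SNOW", "PRCP"] * 1000

# Dictionary encoding: store each distinct value once...
categories = sorted(set(column))
index = {value: code for code, value in enumerate(categories)}
# ...and keep one small integer code per row (signed 1-byte ints here).
codes = array("b", (index[value] for value in column))

# Decoding restores the original column exactly.
assert [categories[code] for code in codes] == column

print(categories)                  # ['PRCP', 'SNOW', 'TMAX', 'TMIN']
print(len(codes), codes.itemsize)  # 6000 1
```

The saving shrinks as the number of distinct values approaches the number of rows, so columns such as `element` or the flag columns are the likeliest to benefit.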


## Batching (Good for restricted memory environments)

In [11]:
dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    ctas_approach=True,
    chunksize=True  # Chunksize calculated automatically for ctas_approach.
)

for df in dfs:  # Batching
    print(len(df.index))

110592
150870
1024
1024
1012736


In [12]:
dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    ctas_approach=False,
    chunksize=500_000
)

for df in dfs:  # Batching
    print(len(df.index))

1276246
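
Batching pays off when each chunk is reduced to a small result before the next one is read. A minimal sketch of that pattern follows, with `range` objects standing in for DataFrames so it runs without AWS access. `total_rows` is a hypothetical helper, and the sizes are the ones printed by the `ctas_approach=True` run above.

```python
from typing import Iterable, Sized


def total_rows(batches: Iterable[Sized]) -> int:
    """Sum len() over batches while holding only one batch at a time."""
    total = 0
    for batch in batches:
        total += len(batch)  # for a DataFrame, len(df) == len(df.index)
    return total


# Stand-ins with the batch sizes printed by the ctas_approach=True run.
sizes = [110592, 150870, 1024, 1024, 1012736]
print(total_rows(range(n) for n in sizes))  # 1276246
```

The same loop shape would apply to the iterator returned by `wr.athena.read_sql_query(..., chunksize=...)`, with any per-batch aggregation in place of `len`.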


## Cleaning Up S3

In [13]:
wr.s3.delete_objects(path)

## Delete Table

In [14]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa")

## Delete Database

In [15]:
wr.catalog.delete_database('awswrangler_test')