In [None]:
# 9 - Amazon Athena Cache
# awswrangler has a caching strategy for Amazon Athena that is disabled by default. Enable it by setting max_cache_seconds to a value greater than 0 in the athena_cache_settings parameter. This cache can help you decrease query times and costs.

# When calling read_sql_query with the cache enabled, instead of just running the query, awswrangler first checks whether the same query has been run before. If it has, and that last run finished within max_cache_seconds (a key of read_sql_query's athena_cache_settings parameter), the results of that run are returned, provided they are still available in S3. We have seen this increase performance more than 100x, and the potential gain is effectively unbounded, since a cache hit avoids re-running the query at all.

# The detailed approach is:
# - When read_sql_query is called with max_cache_seconds > 0 (it defaults to 0), we check the most recent queries run in the same workgroup.
# - By default it checks the last 50 queries (the most that can be fetched without pagination), but you can customize this through the max_cache_query_inspections setting.
# - We then sort those queries by CompletionDateTime, in descending order.
# - For each of those queries, we check whether its CompletionDateTime is still within the max_cache_seconds window. If so, we check whether its query string is the same as the current one (with some heuristics to guarantee coverage for both values of ctas_approach). If they match, we check whether that run's results are still on S3 and, if so, return them instead of re-running the query.
# - If anything goes wrong during the cache resolution phase, the logic falls back to the usual read_sql_query path.

In [None]:
import awswrangler as wr

In [None]:
# Enter your bucket name

In [None]:
import getpass

# getpass does not echo what is typed, which keeps the bucket name out of the
# notebook's saved output. Give it an explicit prompt: the default one is
# "Password: ", which misleads the reader about what to enter.
bucket = getpass.getpass("Bucket name: ").strip()
if not bucket:
    # An empty name would yield the malformed prefix "s3:///data/", which the
    # write and cleanup cells further down would then operate on.
    raise ValueError("A bucket name is required.")
path = f"s3://{bucket}/data/"

In [None]:
# Checking/Creating Glue Catalog Databases

In [None]:
# Create the tutorial database only if it is not already registered in Glue.
# Look only at the "Database" column: `.values` on the whole frame would also
# search the Description column and could report a false match.
# NOTE(review): databases() may cap how many databases it lists; in accounts with
# many databases, consider create_database(..., exist_ok=True) instead — confirm
# against the installed awswrangler version.
if "awswrangler_test" not in wr.catalog.databases()["Database"].values:
    wr.catalog.create_database("awswrangler_test")

In [None]:
# Creating a Parquet Table from NOAA’s CSV files

In [None]:
# Column names are passed explicitly through `names=` (presumably the by-year
# CSVs carry no header row); the two date-like columns are parsed as datetimes.
cols = [
    "id", "dt", "element", "value",
    "m_flag", "q_flag", "s_flag", "obs_time",
]
df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/by_year/1865.csv",
    names=cols,
    parse_dates=["dt", "obs_time"],
)
df

In [None]:
# Write the frame as a Parquet dataset under `path` (mode="overwrite" replaces
# what is already stored there) and register it as the Glue table
# awswrangler_test.noaa.
wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa",
)

In [None]:
# Show the Glue catalog's metadata for the freshly registered table.
wr.catalog.table(table="noaa", database="awswrangler_test")

In [None]:
# The test query

In [None]:
# Self-join of noaa on id, counting the joined rows per element.
# The exact same query text is reused by all the timed executions below; the
# cache lookup compares query strings, so this text must not change between runs.
query = """
SELECT
    n1.element,
    count(1) as cnt
FROM
    noaa n1
JOIN
    noaa n2
ON
    n1.id = n2.id
GROUP BY
    n1.element
"""

In [None]:
# First execution (cache disabled, the default)

In [None]:
%%time
wr.athena.read_sql_query(query, database="awswrangler_test")

In [None]:
# Second execution with the cache enabled

In [None]:
%%time

wr.athena.read_sql_query(query, database="awswrangler_test", athena_cache_settings={"max_cache_seconds": 900})

In [None]:
# Allowing awswrangler to inspect up to 500 historical queries to find the same result to reuse

In [None]:
%%time

wr.athena.read_sql_query(
    query,
    database="awswrangler_test",
    athena_cache_settings={"max_cache_seconds": 900, "max_cache_query_inspections": 500},
)

In [None]:
# Cleaning Up S3

In [None]:
# Deletes the objects under the `path` prefix (s3://<bucket>/data/), not just the files this notebook wrote.
wr.s3.delete_objects(path=path)

In [None]:
# Delete table

In [None]:
# The *_if_exists variant tolerates a table that is already gone, so this cell can be re-run.
wr.catalog.delete_table_if_exists(table="noaa", database="awswrangler_test")

In [None]:
# Delete Database


In [None]:
# NOTE(review): this drops the database unconditionally, even if it already existed before
# this notebook ran (the creation cell earlier skips creation in that case) — confirm intended.
wr.catalog.delete_database(name="awswrangler_test")