In [None]:
import boto3

# One boto3 client per service. No region or credentials are passed here, so
# boto3's default configuration is used for both.
athena, glue = boto3.client('athena'), boto3.client('glue')

In [None]:
# Data catalogs available to this Athena client.
data_catalogs = athena.list_data_catalogs()
data_catalogs

In [None]:
# Databases in the 'AWSDataCatalog' catalog before this notebook creates its own.
databases_before = athena.list_databases(CatalogName='AWSDataCatalog')
databases_before

In [None]:
# Create the Glue database that will hold the table. The name is lowercase to
# match how the DDL and clean-up cells refer to it (Glue folds database names
# to lowercase when storing them). An existing database is reused, so the
# notebook can be re-run after an aborted run that never reached clean-up.
try:
    create_database_response = glue.create_database(
        DatabaseInput={
            'Name': 'cloudyvents',
        }
    )
except glue.exceptions.AlreadyExistsException:
    create_database_response = "Glue database 'cloudyvents' already exists; reusing it."
create_database_response

In [None]:
# Same listing again: the database created above should now be present.
databases_after = athena.list_databases(CatalogName='AWSDataCatalog')
databases_after

In [None]:
# DDL for the Parquet-backed table partitioned by year/month/day strings.
# IF NOT EXISTS keeps this step re-runnable, e.g. after an aborted run that
# never reached the clean-up cells at the end of the notebook.
table_ddl = r'''
CREATE EXTERNAL TABLE IF NOT EXISTS `cloudyvents.indexed`(
  `eventid` string, 
  `eventtime` string, 
  `eventtype` string, 
  `eventsubject` string, 
  `eventpayload` string)
PARTITIONED BY ( 
  `year` string, 
  `month` string, 
  `day` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://kfs3-destination-dev/indexed/'
'''

In [None]:
# From https://gist.github.com/stephenconnolly1/12f9e945f266c4f64dce8aa200cb34fd
import time
import json
import pprint
import sys

def run_athena_query(query, queryparams, output_location=None, poll_interval=0.2):
    """Start an Athena query and block until it reaches a terminal state.

    Args:
        query: SQL/DDL text to execute.
        queryparams: dict used as an out-parameter. Its ``'execution_id'`` key is
            set to the new ``QueryExecutionId`` as soon as the query has been
            submitted, so the caller can fetch results afterwards.
        output_location: S3 URI where Athena writes result files. When omitted,
            the notebook-level ``query_output`` setting is used. It is looked up
            at call time, so it may be defined in a later cell.
        poll_interval: seconds to sleep between status checks (default 0.2).

    Returns:
        The terminal state string: 'SUCCEEDED', 'FAILED' or 'CANCELLED'. For a
        state other than 'SUCCEEDED', Athena's ``StateChangeReason`` (when
        present) is printed, because the bare state does not say why.
    """
    if output_location is None:
        output_location = query_output
    print("Executing query:\n{0}".format(query))
    response = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={'OutputLocation': output_location},
    )
    execution_id = response['QueryExecutionId']
    queryparams['execution_id'] = execution_id
    while True:
        status = athena.get_query_execution(QueryExecutionId=execution_id)['QueryExecution']['Status']
        state = status['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            if state != 'SUCCEEDED':
                reason = status.get('StateChangeReason', 'no reason reported')
                print("Query {0} ended in state {1}: {2}".format(execution_id, state, reason))
            return state
        time.sleep(poll_interval)

In [None]:
def print_results(execution_id):
    """Pretty-print the raw ``get_query_results`` response for one query.

    Args:
        execution_id: Athena ``QueryExecutionId`` of a previously started query.

    The entire response dict is printed as JSON with 4-space indentation and
    sorted keys. Only a single API call is made, so if results are paginated
    only the first page is shown.
    """
    response = athena.get_query_results(QueryExecutionId=execution_id)
    pretty = json.dumps(response, indent=4, sort_keys=True)
    print(pretty)

In [None]:
# Notebook configuration: the S3 prefix Athena writes result files to, and the
# dict that run_athena_query fills with the latest 'execution_id'.
query_output = 's3://athenaout-97068/'
queryparams = dict()

In [None]:
# Reset the shared id first, so a failed submission cannot leave a previous
# query's id behind for the next print_results call.
queryparams.update(execution_id='')
run_athena_query(table_ddl, queryparams)

In [None]:
# Dump the raw Athena response for the query run in the previous cell.
print_results(execution_id=queryparams['execution_id'])

In [None]:
# Register the partitions that already exist under the table's S3 location so
# that queries can see the data. Per earlier experience, this also had to be
# re-run whenever a new partition appeared in the bucket.
# TODO: consider automating partition discovery (e.g. a scheduled Glue crawler,
# or Athena partition projection) instead of repairing by hand.
query = "MSCK REPAIR TABLE cloudyvents.indexed"

In [None]:
# Run the partition repair. Fail loudly here rather than letting the later
# queries run against a table whose partitions were never loaded, because no
# later cell fetches this query's results.
queryparams['execution_id'] = ''
repair_status = run_athena_query(query, queryparams)
assert repair_status == 'SUCCEEDED', 'MSCK REPAIR TABLE ended in state {0}'.format(repair_status)
repair_status

In [None]:
# EXPLAIN the per-subject count over the whole table (no partition filter).
query = (
    "\n"
    "    EXPLAIN\n"
    "    SELECT\n"
    "        eventsubject,\n"
    "        COUNT (*) as cnt\n"
    "    FROM cloudyvents.indexed\n"
    "    GROUP BY\n"
    "        eventsubject\n"
    "    "
)

In [None]:
# Reset the shared id first, so a failed submission cannot leave a previous
# query's id behind for the next print_results call.
queryparams.update(execution_id='')
run_athena_query(query, queryparams)

In [None]:
# Dump the raw Athena response for the query run in the previous cell.
print_results(execution_id=queryparams['execution_id'])

In [None]:
# EXPLAIN the same count restricted to one year/month/day partition, for
# comparison with the unfiltered plan above.
query = (
    "\n"
    "    EXPLAIN\n"
    "    SELECT\n"
    "        eventsubject,\n"
    "        COUNT (*) as cnt\n"
    "    FROM cloudyvents.indexed\n"
    "    WHERE year='2021' and month='08' and day='08'\n"
    "    GROUP BY\n"
    "        eventsubject\n"
    "    "
)

In [None]:
# Reset the shared id first, so a failed submission cannot leave a previous
# query's id behind for the next print_results call.
queryparams.update(execution_id='')
run_athena_query(query, queryparams)

In [None]:
# Dump the raw Athena response for the query run in the previous cell.
print_results(execution_id=queryparams['execution_id'])

In [None]:
# Actually run the per-subject count for a single day's partition.
query = (
    "\n"
    "    SELECT\n"
    "        eventsubject,\n"
    "        COUNT (*) as cnt\n"
    "    FROM cloudyvents.indexed\n"
    "    WHERE year='2021' and month='08' and day='09'\n"
    "    GROUP BY\n"
    "        eventsubject\n"
    "    "
)

In [None]:
# Reset the shared id first, so a failed submission cannot leave a previous
# query's id behind for the next print_results call.
queryparams.update(execution_id='')
run_athena_query(query, queryparams)

In [None]:
# Dump the raw Athena response for the query run in the previous cell.
print_results(execution_id=queryparams['execution_id'])

In [None]:
# Clean-up: remove the table definition from the Glue catalog.
# NOTE(review): this is an EXTERNAL table, so presumably the Parquet files under
# its S3 location are left in place — confirm.
glue.delete_table(DatabaseName='cloudyvents', Name='indexed')

In [None]:
# Clean-up: remove the Glue database created at the start of the notebook.
glue.delete_database(Name='cloudyvents')