In [1]:
# https://docs.aws.amazon.com/athena/latest/ug/geospatial-example-queries.html
import boto3

# Region configured for the default boto3 session; the bucket-creation cell branches on it.
region = boto3.session.Session().region_name
# S3 bucket that holds the raw data files and the Athena query output.
bucket='beyoung-geospatial-20200831'

In [None]:
# create s3 bucket

if region != 'us-east-1':
    !aws s3api create-bucket --bucket {bucket} --create-bucket-configuration LocationConstraint={region}
else:
    !aws s3api create-bucket --bucket {bucket}

In [None]:
## Prepare the raw data

# Sample data sets published in the Esri gis-tools-for-hadoop repository.
earthquakes_csv = (
    'https://raw.githubusercontent.com/Esri/gis-tools-for-hadoop/master'
    '/samples/data/earthquake-data/earthquakes.csv'
)
california_counties_json = (
    'https://raw.githubusercontent.com/Esri/gis-tools-for-hadoop/master'
    '/samples/data/counties-data/california-counties.json'
)

In [None]:
# Download both sample files; the upload cell later reads them by their bare file names.
!wget {earthquakes_csv}
!wget {california_counties_json}

In [21]:
# Destination object URIs: the CSV and the JSON go under separate prefixes
# so that each Athena table can point at its own folder.
s3_earthquakes = f's3://{bucket}/csv/earthquakes.csv'
s3_california = f's3://{bucket}/json/california-counties.json'

In [42]:
s3_earthquakes

's3://beyoung-geospatial-20200831/csv/earthquakes.csv'

In [43]:
s3_california

's3://beyoung-geospatial-20200831/json/california-counties.json'

In [None]:
# Upload the two downloaded files to their S3 destinations.
!aws s3 cp earthquakes.csv {s3_earthquakes}
!aws s3 cp california-counties.json {s3_california}

## Execution on Athena

In [47]:
# Prefix passed to Athena as the query output location.
s3_athena=f's3://{bucket}/athena'
# Folder-level locations used in the LOCATION clause of the two external tables.
s3_earthquakes_loc=f's3://{bucket}/csv/'
s3_california_loc=f's3://{bucket}/json/'
# Athena database that will hold both tables.
db_name='geospatial_20200831'

# Athena client plus the ResultConfiguration reused by every query submission below.
client = boto3.client('athena')
config = {'OutputLocation': s3_athena}

In [48]:
# Statement that creates the database if it is missing; shown for review before submission.
qs=f'CREATE DATABASE IF NOT EXISTS {db_name}'
qs

'CREATE DATABASE IF NOT EXISTS geospatial_20200831'

In [49]:
# Submit the CREATE DATABASE statement (no database context is passed here)
# and display the execution id.
response = client.start_query_execution(
    QueryString=qs,
    ResultConfiguration=config,
)
response['QueryExecutionId']

'5b9685b5-3bc7-4219-8ad7-4c09635b82c8'

In [50]:
# QueryExecutionContext passed with the later submissions so they run inside db_name.
context = {'Database': db_name}

In [51]:
# DDL for the earthquakes table: an external, comma-delimited text table
# over the files under s3_earthquakes_loc. Printed so it can be reviewed before submission.
qs = f'''
CREATE EXTERNAL TABLE IF NOT EXISTS earthquakes
(
 earthquake_date string,
 latitude double,
 longitude double,
 depth double,
 magnitude double,
 magtype string,
 mbstations string,
 gap string,
 distance string,
 rms string,
 source string,
 eventid string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE LOCATION '{s3_earthquakes_loc}';
'''
print(qs)


CREATE EXTERNAL TABLE IF NOT EXISTS earthquakes
(
 earthquake_date string,
 latitude double,
 longitude double,
 depth double,
 magnitude double,
 magtype string,
 mbstations string,
 gap string,
 distance string,
 rms string,
 source string,
 eventid string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE LOCATION 's3://beyoung-geospatial-20200831/csv/';



In [52]:
# Submit the earthquakes DDL inside the database context and keep the execution id.
response = client.start_query_execution(
    QueryString=qs,
    QueryExecutionContext=context,
    ResultConfiguration=config,
)
QueryExecutionId = response['QueryExecutionId']
QueryExecutionId

'7c2c7691-7c59-406e-8c3f-9bc0f001a27d'

In [53]:
## Create the table for counties
# External table over the JSON under s3_california_loc, read with the Esri
# JSON SerDe and enclosed-JSON input format; the boundary is stored as binary.
qs=f'''
CREATE external TABLE IF NOT EXISTS counties
(
  Name string,
  BoundaryShape binary
)
ROW FORMAT SERDE 'com.esri.hadoop.hive.serde.JsonSerde'
STORED AS INPUTFORMAT 'com.esri.json.hadoop.EnclosedJsonInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '{s3_california_loc}';
'''
print(qs)


CREATE external TABLE IF NOT EXISTS counties
(
  Name string,
  BoundaryShape binary
)
ROW FORMAT SERDE 'com.esri.hadoop.hive.serde.JsonSerde'
STORED AS INPUTFORMAT 'com.esri.json.hadoop.EnclosedJsonInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://beyoung-geospatial-20200831/json/';



In [54]:
# Submit the counties DDL inside the database context and keep the execution id.
response = client.start_query_execution(
    QueryString=qs,
    QueryExecutionContext=context,
    ResultConfiguration=config,
)
QueryExecutionId = response['QueryExecutionId']
QueryExecutionId

'63afd490-5fb9-475d-b9d4-464e8a9a937f'

In [55]:
# Fetch the execution record (state, timings, statistics) of the most recently submitted query.
response=client.get_query_execution(QueryExecutionId=QueryExecutionId)
response

{'QueryExecution': {'QueryExecutionId': '63afd490-5fb9-475d-b9d4-464e8a9a937f',
  'Query': "CREATE external TABLE IF NOT EXISTS counties\n(\n  Name string,\n  BoundaryShape binary\n)\nROW FORMAT SERDE 'com.esri.hadoop.hive.serde.JsonSerde'\nSTORED AS INPUTFORMAT 'com.esri.json.hadoop.EnclosedJsonInputFormat'\nOUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\nLOCATION 's3://beyoung-geospatial-20200831/json/'",
  'StatementType': 'DDL',
  'ResultConfiguration': {'OutputLocation': 's3://beyoung-geospatial-20200831/athena/63afd490-5fb9-475d-b9d4-464e8a9a937f.txt'},
  'QueryExecutionContext': {'Database': 'geospatial_20200831'},
  'Status': {'State': 'SUCCEEDED',
   'SubmissionDateTime': datetime.datetime(2020, 8, 31, 9, 28, 59, 617000, tzinfo=tzlocal()),
   'CompletionDateTime': datetime.datetime(2020, 8, 31, 9, 29, 1, 342000, tzinfo=tzlocal())},
  'Statistics': {'EngineExecutionTimeInMillis': 1190,
   'DataScannedInBytes': 0,
   'TotalExecutionTimeInMillis': 1725,

## Query

In [56]:
# Count earthquakes per county: every (county, earthquake) pair is produced and
# filtered down to those whose point lies inside the county boundary.
# Plain string literal: the statement has no placeholders to interpolate.
qs = '''
SELECT counties.name,
        COUNT(*) cnt
FROM counties
CROSS JOIN earthquakes
WHERE ST_CONTAINS (counties.boundaryshape, ST_POINT(earthquakes.longitude, earthquakes.latitude))
GROUP BY  counties.name
ORDER BY  cnt DESC
'''

In [57]:
# Submit the join query inside the database context and keep the execution id.
response = client.start_query_execution(
    QueryString=qs,
    QueryExecutionContext=context,
    ResultConfiguration=config,
)
QueryExecutionId = response['QueryExecutionId']
QueryExecutionId

'c2d95b5c-75bd-4471-9ee2-785f678202fc'

In [64]:
# Fetch the execution record of the join query; re-run this cell until the state is terminal.
response = client.get_query_execution(QueryExecutionId=QueryExecutionId)
response

{'QueryExecution': {'QueryExecutionId': 'c2d95b5c-75bd-4471-9ee2-785f678202fc',
  'Query': 'SELECT counties.name,\n        COUNT(*) cnt\nFROM counties\nCROSS JOIN earthquakes\nWHERE ST_CONTAINS (counties.boundaryshape, ST_POINT(earthquakes.longitude, earthquakes.latitude))\nGROUP BY  counties.name\nORDER BY  cnt DESC',
  'StatementType': 'DML',
  'ResultConfiguration': {'OutputLocation': 's3://beyoung-geospatial-20200831/athena/c2d95b5c-75bd-4471-9ee2-785f678202fc.csv'},
  'QueryExecutionContext': {'Database': 'geospatial_20200831'},
  'Status': {'State': 'SUCCEEDED',
   'SubmissionDateTime': datetime.datetime(2020, 8, 31, 9, 38, 54, 370000, tzinfo=tzlocal()),
   'CompletionDateTime': datetime.datetime(2020, 8, 31, 9, 39, 27, 307000, tzinfo=tzlocal())},
  'Statistics': {'EngineExecutionTimeInMillis': 32795,
   'DataScannedInBytes': 6994001,
   'TotalExecutionTimeInMillis': 32937,
   'QueryQueueTimeInMillis': 108,
   'QueryPlanningTimeInMillis': 405,
   'ServiceProcessingTimeInMillis': 

In [65]:
# Build a page iterator over the query results, requesting 1000 rows per page.
results_paginator = client.get_paginator('get_query_results')
results_iter = results_paginator.paginate(
    QueryExecutionId=QueryExecutionId,
    PaginationConfig={'PageSize': 1000},
)
results_iter

<botocore.paginate.PageIterator at 0x7f290f1f1fd0>

In [66]:
# Flatten every result page into one list of raw rows (each row is a list of cell dicts).
data_list = [
    row['Data']
    for results_page in results_iter
    for row in results_page['ResultSet']['Rows']
]
data_list

[[{'VarCharValue': 'name'}, {'VarCharValue': 'cnt'}],
 [{'VarCharValue': 'Kern'}, {'VarCharValue': '36'}],
 [{'VarCharValue': 'San Bernardino'}, {'VarCharValue': '35'}],
 [{'VarCharValue': 'Imperial'}, {'VarCharValue': '28'}],
 [{'VarCharValue': 'Inyo'}, {'VarCharValue': '20'}],
 [{'VarCharValue': 'Los Angeles'}, {'VarCharValue': '18'}],
 [{'VarCharValue': 'Monterey'}, {'VarCharValue': '14'}],
 [{'VarCharValue': 'Riverside'}, {'VarCharValue': '14'}],
 [{'VarCharValue': 'Santa Clara'}, {'VarCharValue': '12'}],
 [{'VarCharValue': 'San Benito'}, {'VarCharValue': '11'}],
 [{'VarCharValue': 'Fresno'}, {'VarCharValue': '11'}],
 [{'VarCharValue': 'San Diego'}, {'VarCharValue': '7'}],
 [{'VarCharValue': 'Santa Cruz'}, {'VarCharValue': '5'}],
 [{'VarCharValue': 'San Luis Obispo'}, {'VarCharValue': '3'}],
 [{'VarCharValue': 'Ventura'}, {'VarCharValue': '3'}],
 [{'VarCharValue': 'Orange'}, {'VarCharValue': '2'}],
 [{'VarCharValue': 'San Mateo'}, {'VarCharValue': '1'}]]

In [67]:
# Drop the first row (the column names) and unwrap each cell's 'VarCharValue'.
results = [
    [cell['VarCharValue'] for cell in datum]
    for datum in data_list[1:]
]
results

[['Kern', '36'],
 ['San Bernardino', '35'],
 ['Imperial', '28'],
 ['Inyo', '20'],
 ['Los Angeles', '18'],
 ['Monterey', '14'],
 ['Riverside', '14'],
 ['Santa Clara', '12'],
 ['San Benito', '11'],
 ['Fresno', '11'],
 ['San Diego', '7'],
 ['Santa Cruz', '5'],
 ['San Luis Obispo', '3'],
 ['Ventura', '3'],
 ['Orange', '2'],
 ['San Mateo', '1']]

In [68]:
# Show each row as a tuple instead of a list.
list(map(tuple, results))

[('Kern', '36'),
 ('San Bernardino', '35'),
 ('Imperial', '28'),
 ('Inyo', '20'),
 ('Los Angeles', '18'),
 ('Monterey', '14'),
 ('Riverside', '14'),
 ('Santa Clara', '12'),
 ('San Benito', '11'),
 ('Fresno', '11'),
 ('San Diego', '7'),
 ('Santa Cruz', '5'),
 ('San Luis Obispo', '3'),
 ('Ventura', '3'),
 ('Orange', '2'),
 ('San Mateo', '1')]

In [69]:
import time
def fetchall_athena(query_id, client, poll_interval=3):
    """Wait for an Athena query to finish and return all of its result rows.

    Polls ``get_query_execution`` until the query is no longer QUEUED or
    RUNNING, then pages through ``get_query_results``.

    Args:
        query_id: The ``QueryExecutionId`` returned by ``start_query_execution``.
        client: An Athena client (any object exposing ``get_query_execution``
            and ``get_paginator``).
        poll_interval: Seconds to sleep between status checks while the query
            is still queued or running. Defaults to 3.

    Returns:
        A list of tuples, one per result row, with the first row dropped
        (for the SELECT in this notebook that row holds the column names).
        A cell that carries no ``'VarCharValue'`` key is returned as ``None``
        (presumably how NULL values are reported -- confirm against the
        Athena documentation).

    Raises:
        RuntimeError: If the query ends in the FAILED or CANCELLED state. The
            message contains the SQL text and, when present, the state-change
            reason reported in the status.
    """
    while True:
        execution = client.get_query_execution(QueryExecutionId=query_id)['QueryExecution']
        status = execution['Status']
        query_status = status['State']
        if query_status in ('FAILED', 'CANCELLED'):
            # Take the SQL text from the response itself; fall back to the id
            # when the response does not carry it.
            query_string = execution.get('Query', query_id)
            message = 'Athena query with the string "{}" failed or was cancelled'.format(query_string)
            reason = status.get('StateChangeReason')
            if reason:
                message = '{}: {}'.format(message, reason)
            raise RuntimeError(message)
        if query_status not in ('QUEUED', 'RUNNING'):
            # Terminal state reached: stop polling without a trailing sleep.
            break
        time.sleep(poll_interval)

    results_paginator = client.get_paginator('get_query_results')
    results_iter = results_paginator.paginate(
        QueryExecutionId=query_id,
        PaginationConfig={
            'PageSize': 1000
        }
    )
    data_list = [
        row['Data']
        for results_page in results_iter
        for row in results_page['ResultSet']['Rows']
    ]
    # Only the very first row across all pages is skipped.
    return [
        tuple(cell.get('VarCharValue') for cell in datum)
        for datum in data_list[1:]
    ]

In [70]:
# Same rows as the step-by-step cells above, produced by the helper in one call.
results = fetchall_athena(QueryExecutionId, client)
results

[('Kern', '36'),
 ('San Bernardino', '35'),
 ('Imperial', '28'),
 ('Inyo', '20'),
 ('Los Angeles', '18'),
 ('Monterey', '14'),
 ('Riverside', '14'),
 ('Santa Clara', '12'),
 ('San Benito', '11'),
 ('Fresno', '11'),
 ('San Diego', '7'),
 ('Santa Cruz', '5'),
 ('San Luis Obispo', '3'),
 ('Ventura', '3'),
 ('Orange', '2'),
 ('San Mateo', '1')]

## Clean up

In [44]:
# Delete Table

# Drop the earthquakes table definition and keep the execution id.
qs = 'DROP TABLE IF EXISTS earthquakes'
response = client.start_query_execution(
    QueryString=qs,
    QueryExecutionContext=context,
    ResultConfiguration=config,
)
QueryExecutionId = response['QueryExecutionId']
QueryExecutionId

'8a88edce-71d4-4d78-ae57-6ee1de18037f'

In [45]:
# Drop the counties table definition and keep the execution id.
qs = 'DROP TABLE IF EXISTS counties'
response = client.start_query_execution(
    QueryString=qs,
    QueryExecutionContext=context,
    ResultConfiguration=config,
)
QueryExecutionId = response['QueryExecutionId']
QueryExecutionId

'e38dc05e-1e18-43e3-87b7-fe76f4f6476e'