# Setup

## Imports

In [2]:
%reload_ext autoreload
%autoreload 2

import sys
from pathlib import Path 
current_path = Path().resolve()
abs_path = str(current_path.parent)
sys.path.append(abs_path)

RAW_PATH = current_path.parent / 'data' / 'raw'
OUTPUT_PATH = current_path.parent / 'data' / 'output'

import warnings
warnings.filterwarnings('ignore')

import pandas as pd
pd.set_option('display.max_columns', None)

import plotly.graph_objs as go
import plotly.plotly as py
import plotly.offline as offline
offline.init_notebook_mode(connected=True)

from h3 import h3
import folium
from pyathena import connect
import geopandas as gpd

EXTERNAL_LOCATION = 's3://athena-fgv/'

In [3]:
!pip3.7 install h3

Collecting h3
  Using cached https://files.pythonhosted.org/packages/f3/ee/9fc225d259538f73f08d21b52aa9c64d6e911d2a358866d5c1c860c2f7c8/h3-3.4.2.tar.gz
Building wheels for collected packages: h3
  Building wheel for h3 (setup.py) ... [?25l\^C
[?25canceled


## Load cities datasets

In [4]:
# Read each city's raw pollution measurements into a DataFrame keyed by city name.
cities = {
    city: pd.read_csv(RAW_PATH / f'raw_pollution_{city}.csv')
    for city in ('city1', 'city2')
}

In [5]:
cities['city1'].head()

Unnamed: 0,Lon,Lat,NO_Med,NO2_Med,BC_Med,NO_Mean,NO2_Mean,BC_Mean,Speed_Med,Unique_Days,Unique_1Hz,AdjFac_NO,AdjFac_NO2,AdjFac_BC,Road_Type,Index_Hwy,Index_Domain,NO_SE,NO2_SE,BC_SE
0,-122.305836,37.808121,47.011244,43.184173,2.33241,72.232684,46.539966,3.375462,13.033449,82,198,1.064103,1.121613,1.117647,2,0,1,2.724004,2.080245,0.147212
1,-122.305702,37.808196,61.50476,45.305978,2.7029,82.330182,48.39179,4.476297,15.451338,60,76,1.023256,1.125,1.005076,3,0,1,6.447487,3.198008,0.423799
2,-122.305675,37.807926,65.461539,55.908574,2.5938,95.61227,59.104009,2.649551,10.947377,20,37,1.2,1.244681,1.055556,2,0,1,24.511033,10.8332,0.370535
3,-122.305652,37.807967,50.142858,41.317134,1.633319,68.687605,42.583438,3.036578,4.426794,58,256,1.092554,1.22449,1.109083,2,0,1,2.702267,1.307927,0.0988
4,-122.305496,37.808105,50.474726,39.777,2.313093,79.554567,44.614032,3.993717,10.438077,81,293,1.092929,1.065217,1.117647,2,0,1,4.02063,2.18625,0.250747


## Get H3 hexagons

In [9]:
# Tag every measurement with the id of the H3 hexagon containing it, at
# resolutions 7 through 11 (one `h3id_<res>` column per resolution).
# Only the two raw city frames are processed: later cells also store aggregated
# frames and bounding-box dicts in `cities`, and looping over every key would
# make this cell fail when it is re-run after them.
for key in ('city1', 'city2'):
    city_df = cities[key]
    for res in range(7, 12):
        city_df[f'h3id_{res}'] = city_df[['Lon', 'Lat']].apply(
            lambda point: h3.geo_to_h3(point['Lat'], point['Lon'], res), axis=1)

In [10]:
# Persist the H3-tagged raw frames next to the raw data.
# Restricted to the raw city frames so that re-running the cell after `cities`
# has gained aggregated frames / bounding-box dicts neither crashes nor writes
# unintended files.
for key in ('city1', 'city2'):
    cities[key].to_csv(RAW_PATH / f'raw_pollution_{key}_h3.csv')

In [11]:
# Number of distinct resolution-9 hexagons that contain city1 measurements.
len(set(cities['city1']['h3id_9']))

137

### Check H3 hexagon sizes

In [61]:
# Draw the resolution-9 hexagon of one sample measurement on a map, to get a
# feel for how large a hexagon is on the ground.
latitude, longitude = cities['city1']['Lat'][1], cities['city1']['Lon'][1]

m = folium.Map(
    location=[latitude, longitude],
    zoom_start=13
)

# The boundary comes back as the hexagon's vertices only (the first vertex is
# not repeated at the end), so the ring is closed explicitly; otherwise the
# polyline would leave one edge of the hexagon undrawn.
boundary = list(h3.h3_to_geo_boundary(cities['city1']['h3id_9'][1]))
folium.PolyLine(locations=boundary + boundary[:1]).add_to(m)

m

## Aggregate pollution data by hexagon

In [6]:
def _boundary_to_wkt(boundary):
    """Render hexagon vertices as a ``POLYGON((...))`` string.

    Each vertex is written as its two coordinates separated by a space, in the
    order h3 returns them, and vertices are joined with ", ". Going through the
    vertices explicitly (instead of editing ``str(boundary)``) gives the same
    text for a list of lists and keeps working if the boundary is a tuple of
    tuples.
    """
    return 'POLYGON((' + ', '.join(f'{first} {second}' for first, second in boundary) + '))'


# Pollutant columns whose per-hexagon median is kept.
POLLUTANT_COLUMNS = ['NO_Med', 'NO2_Med', 'BC_Med']

for i in range(1, 3):
    # Select the pollutant columns *before* taking the median: the frame also
    # holds string columns (the other h3id_* resolutions), which a median over
    # every column either silently drops or rejects, depending on the pandas
    # version.
    agg = (
        cities[f'city{i}']
        .groupby('h3id_9')[POLLUTANT_COLUMNS]
        .median()
        .assign(city=i)
        .reset_index()
    )
    agg['polygon'] = agg['h3id_9'].apply(h3.h3_to_geo_boundary).apply(_boundary_to_wkt)
    cities[f'city{i}_agg_9'] = agg
    agg.to_parquet(RAW_PATH / f'city{i}_agg_9.parquet')

KeyError: 'h3id_9'

## Adding aggregated data to S3 and Athena

In [153]:
import boto3

# Push each city's aggregated parquet file to the production bucket.
s3 = boto3.client('s3')
for city in ('city1', 'city2'):
    parquet_name = f'{city}_agg_9.parquet'
    s3.upload_file(
        str(RAW_PATH) + '/' + parquet_name,
        'data-producao',
        'us/ca/san-francisco/pollution/' + parquet_name,
    )

# Divide OSM into squares given the raw pollution boundaries

## Connect With Athena

In [62]:
from pyathena import connect

# Athena connection shared by the query cells below; results are staged under
# the given S3 prefix.
conn = connect(
    region_name='us-east-2',
    s3_staging_dir='s3://athena-robusta/teste',
)

In [63]:
def zip_columns(res):
    """Pair each result column name with the matching value of the first row.

    Args:
        res: an executed cursor-like object exposing ``description`` (a
            sequence whose items start with the column name) and ``fetchall()``.

    Returns:
        A list of ``(column_name, value)`` tuples built from the first fetched row.

    Raises:
        IndexError: if ``fetchall()`` returns no rows.
    """
    column_names = [column[0] for column in res.description]
    first_row = res.fetchall()[0]
    return list(zip(column_names, first_row))

def execute(query, conn, get_data=True):
    """Run ``query`` on ``conn`` and optionally return its first row.

    Args:
        query: SQL text to execute.
        conn: connection object providing ``cursor()``.
        get_data: when true, return the first result row as produced by
            ``zip_columns``; when false, only run the statement.

    Returns:
        A list of ``(column_name, value)`` tuples when ``get_data`` is true,
        otherwise ``None``.

    The cursor is closed in every case, including when the query or the
    result extraction raises; the exception itself still propagates.
    """
    cursor = conn.cursor()
    try:
        res = cursor.execute(query)
        if get_data:
            return zip_columns(res)
        return None
    finally:
        cursor.close()

## Create OSM table with relevant data

In [12]:
def hexagons_to_boundaries(df):
    """Compute the bounding box that encloses every hexagon in ``df['h3id_9']``.

    Args:
        df: DataFrame with an ``h3id_9`` column of H3 hexagon ids. Its index
            may be anything; the ids are read by value, not by row label.

    Returns:
        dict with keys ``max_lon``, ``max_lat``, ``min_lon`` and ``min_lat``,
        taken over the boundary vertices of all hexagons (first coordinate of
        each vertex is treated as latitude, second as longitude).

    Raises:
        ValueError: if no hexagon boundary could be obtained (empty frame or
            only invalid ids).

    Ids whose boundary lookup fails are printed and skipped.
    """
    lats = []
    lons = []
    # Rows sharing a hexagon contribute identical vertices, so each distinct id
    # is expanded only once (dict.fromkeys keeps first-seen order).
    for h3_id in dict.fromkeys(df['h3id_9']):
        try:
            boundary = h3.h3_to_geo_boundary(h3_id)
        except Exception:
            # Report the offending id and carry on with the remaining hexagons.
            print(h3_id)
            continue

        for vertex in boundary:
            lats.append(vertex[0])
            lons.append(vertex[1])

    if not lats:
        raise ValueError("no valid 'h3id_9' hexagons to compute boundaries from")

    return {
        'max_lon': max(lons),
        'max_lat': max(lats),
        'min_lon': min(lons),
        'min_lat': min(lats),
    }


In [13]:
# Store each city's bounding box under `boundaries<N>` next to its data.
for i in (1, 2):
    city_frame = cities[f'city{i}']
    cities[f'boundaries{i}'] = hexagons_to_boundaries(city_frame)

In [23]:
# Bounding box enclosing both cities: the widest extreme of each coordinate.
city_bounds = (cities['boundaries1'], cities['boundaries2'])
{
    'max_lat': max(bounds['max_lat'] for bounds in city_bounds),
    'max_lon': max(bounds['max_lon'] for bounds in city_bounds),
    'min_lat': min(bounds['min_lat'] for bounds in city_bounds),
    'min_lon': min(bounds['min_lon'] for bounds in city_bounds),
}

{'max_lat': 37.83415225556136,
 'max_lon': -122.14436018993949,
 'min_lat': 37.71854003174384,
 'min_lon': -122.30693060560719}

In [20]:
# Display city1's bounding box on its own.
cities['boundaries1']

{'max_lat': 37.83415225556136,
 'max_lon': -122.24983597738388,
 'min_lat': 37.78934418537051,
 'min_lon': -122.30693060560719}

In [99]:
# Keep city1's bounding box in `row`; the table-creation loop below reassigns
# `row` for each city.
row = cities['boundaries1']

In [160]:
# For each city, materialise the slice of osm.planet that lies strictly inside
# the city's bounding box as an ORC table (pollution.osm_city<N>).
# The statement is not idempotent: it fails if the table already exists, so
# errors are printed and the loop moves on to the next city.
for i in range(1,3):
    row = cities[f'boundaries{i}']
    query = f"""
    CREATE TABLE pollution.osm_city{i} WITH (
        external_location = 's3://athena-robusta/pollution/osm_city{i}/1/',
        format = 'ORC'
    ) AS
    SELECT *
    FROM osm.planet
    WHERE lat < {row['max_lat']}
      AND lat > {row['min_lat']}
      AND lon < {row['max_lon']}
      AND lon > {row['min_lon']}
      """
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute(query)
    except Exception as e:
        print(e)
    finally:
        # Release the cursor even when the statement fails.
        if cursor is not None:
            cursor.close()

SYNTAX_ERROR: line 1:1: Destination table 'awsdatacatalog.pollution.osm_city1' already exists. You may need to manually clean the data at location 's3://athena-robusta/teste/tables/a48f814a-af0f-4ae1-9f8d-ae2d4efc35e7' before retrying. Athena will not delete data in your account.
SYNTAX_ERROR: line 1:1: Destination table 'awsdatacatalog.pollution.osm_city2' already exists. You may need to manually clean the data at location 's3://athena-robusta/teste/tables/fad28da6-2f8e-4db8-b94d-84d025489cff' before retrying. Athena will not delete data in your account.


In [162]:
# Show the last statement built by the loop above (the one for city2).
print(query)


    CREATE TABLE pollution.osm_city2 WITH (
        external_location = 's3://athena-robusta/pollution/osm_city2/1/',
        format = 'ORC'
    ) AS
    SELECT *
    FROM osm.planet
    WHERE lat < 37.778227313337425
      AND lat > 37.71854003174384
      AND lon < -122.14436018993949
      AND lon > -122.2037841933418
      


## Associate Hexagon with OSM

In [157]:
# CTAS statement joining the rows of `osm_city1` and `osm_city2` to the rows of
# `pollution` whose `polygon` contains the point, stored as ORC.
# The statement is only assigned to `query`; this cell does not execute it.
# NOTE(review): ST_POINT is given (lat, lon); presumably this mirrors the
# coordinate order of the `polygon` strings built in the aggregation cell — confirm.
query = """CREATE TABLE pollution.osm_hexagons_flatten WITH (
  external_location = 's3://athena-robusta/pollution/osm_hexagons_flatten/2/',
  format = 'ORC'
  ) AS
SELECT *
FROM pollution t1
JOIN (SELECT * FROM  osm_city2) t2
ON ST_WITHIN(ST_POINT(t2.lat, t2.lon), ST_POLYGON(t1.polygon))
UNION ALL
SELECT *
FROM pollution t1
JOIN (SELECT * FROM  osm_city1) t2
ON ST_WITHIN(ST_POINT(t2.lat, t2.lon), ST_POLYGON(t1.polygon))"""

## Open OSM tags

In [159]:
"""CREATE OR REPLACE VIEW osm_hexagons_open AS 
WITH
  d AS (
   SELECT
     "id"
   , "h3id_9"
   , "type"
   , "tags"
   , "map_keys"("tags") "map_keys"
   , "map_values"("tags") "map_tags"
   FROM
     pollution.osm_hexagons_flatten
) 
SELECT
  "id" "osm_id"
, "h3id_9"
, "type"
, "tags"
, "info"
, 'keys' "info_type"
FROM
  (d
CROSS JOIN UNNEST("d"."map_keys") t (info))
UNION ALL SELECT
  "id" "osm_id"
, "h3id_9"
, "type"
, "tags"
, "info"
, 'values' "info_type"
FROM
  (d
CROSS JOIN UNNEST("d"."map_tags") s (info))
"""

'CREATE OR REPLACE VIEW osm_hexagons_open AS \nWITH\n  d AS (\n   SELECT\n     "id"\n   , "h3id_9"\n   , "type"\n   , "tags"\n   , "map_keys"("tags") "map_keys"\n   , "map_values"("tags") "map_tags"\n   FROM\n     pollution.osm_hexagons_flatten\n) \nSELECT\n  "id" "osm_id"\n, "h3id_9"\n, "type"\n, "tags"\n, "info"\n, \'keys\' "info_type"\nFROM\n  (d\nCROSS JOIN UNNEST("d"."map_keys") t (info))\nUNION ALL SELECT\n  "id" "osm_id"\n, "h3id_9"\n, "type"\n, "tags"\n, "info"\n, \'values\' "info_type"\nFROM\n  (d\nCROSS JOIN UNNEST("d"."map_tags") s (info))\n'

# Add Waze data

In [None]:
"""
CREATE TABLE pollution.raw_waze_jams WITH (
  external_location = 's3://athena-robusta/pollution/raw_waze_jams/1/',
  format = 'ORC'
) AS  
    SELECT *
    FROM waze.jams
    WHERE polygon_slug = '124-usa'
    AND month = '03'
    
"""

## Reduce data to the hexagons' limits

In [None]:
"""CREATE TABLE pollution.raw_waze_jams_reduced WITH (
        external_location = 's3://athena-robusta/pollution/raw_waze_jams_reduced/1/',
        format = 'ORC'
    ) AS
SELECT * FROM raw_waze_jams 
    WHERE line[1].y < 37.83415225556136
      AND line[1].y > 37.71854003174384
      AND line[1].x < -122.14436018993949
      AND line[1].x > -122.30693060560719"""

In [None]:
"""CREATE TABLE pollution.raw_waze_alerts_reduced WITH (
        external_location = 's3://athena-robusta/pollution/raw_waze_alerts_reduced/2/',
        format = 'ORC'
    ) AS
SELECT * FROM raw_waze_alerts
    WHERE latitude < 37.83415225556136
      AND latitude > 37.71854003174384
      AND longitude < -122.14436018993949
      AND longitude > -122.30693060560719"""

## Associate data with hexagons

In [4]:
"""CREATE TABLE pollution.waze_jams_hexagons WITH (
  external_location = 's3://athena-robusta/pollution/waze_jams_hexagons/2/',
  format = 'ORC'
  ) AS
SELECT t1.h3id_9, t1.no2_med, t1.no_med, t1.bc_med, t1.polygon, t2.*
FROM pollution t1
JOIN (SELECT *, line[1].x as lon, line[1].y as lat FROM raw_waze_jams_reduced ) t2
ON ST_WITHIN(ST_POINT(t2.lat, t2.lon), ST_POLYGON(t1.polygon))"""

day = '1' OR
day = '2' OR
day = '3' OR
day = '4' OR
day = '5' OR
day = '6' OR
day = '7' OR
day = '8' OR
day = '9' OR
day = '10' OR
day = '11' OR
day = '12' OR
day = '13' OR
day = '14' OR
day = '15' OR
day = '16' OR
day = '17' OR
day = '18' OR
day = '19' OR
day = '20' OR
day = '21' OR
day = '22' OR
day = '23' OR
day = '24' OR
day = '25' OR
day = '26' OR
day = '27' OR
day = '28' OR
day = '29' OR


In [None]:
"""
CREATE TABLE pollution.waze_alerts_hexagons WITH (
  external_location = 's3://athena-robusta/pollution/waze_alerts_hexagons/2/',
  format = 'ORC'
  ) AS
SELECT t1.h3id_9, t1.no2_med, t1.no_med, t1.bc_med, t1.polygon, t2.*
FROM pollution t1
JOIN (SELECT * FROM raw_waze_alerts_reduced ) t2
ON ST_WITHIN(ST_POINT(t2.latitude, t2.longitude), ST_POLYGON(t1.polygon))
"""