In [1]:
#!/usr/bin/env python
# vim: set fileencoding=utf8 :
#```

#!pip install -U boto3 retrying
#!export AWS_DEFAULT_PROFILE=test

import pandas as pd
import random

## Use Athena to extract features from the full dataset

<p>The dataset we are working with contains 55M records, making its handling too heavy for a single machine.</p>
<p>Using a distributed computing engine like&nbsp;<a href="https://aws.amazon.com/athena/">AWS Athena</a>&nbsp;will enable you to extract features and save data efficiently.&nbsp;</p>
<p>To work on the data, we upload it to S3 and then partition it using AWS Glue. Partitioning is critical for Athena to run efficiently. For examples of how to use Glue, see&nbsp;<a href="https://github.com/doitintl/aws-glue-workshop">this AWS Glue workshop</a>.</p>

### Extract features 

<p>With the data partitioned (for example, by year and month), run the Athena queries below to register the raw data and extract time, cyclical, distance and airport-distance features.</p>
<p>After extracting features, partition the query results using Glue (again)</p>

CREATE DATABASE IF NOT EXISTS taxinyc;

CREATE EXTERNAL TABLE IF NOT EXISTS taxinyc.raw_data (
               key VARCHAR(255),
               fare_amount FLOAT,
               pickup_datetime VARCHAR(255),
               pickup_longitude FLOAT,
               pickup_latitude FLOAT,
               dropoff_longitude FLOAT,
               dropoff_latitude FLOAT,
               passenger_count INT
               )
               ROW FORMAT DELIMITED
               FIELDS TERMINATED BY ","
               LINES TERMINATED BY "\n"
               LOCATION 's3://aws-worskhop-data/taxi-nyc/'
               TBLPROPERTIES (
               'skip.header.line.count' = '1'
               );

-- Sanity check: rows should come back parsed, with the CSV header line skipped
-- (skip.header.line.count = 1). The table LOCATION ends in '/' because Athena
-- expects a folder path there, not a bare key prefix.
SELECT * FROM "taxinyc"."raw_data" limit 10;

In [2]:
# Feature-extraction query (Athena / Presto SQL) over the partitioned `train_v3` table.
# Output columns: the fare_amount target, New York local-time features, cyclical
# day/week encodings, pickup->dropoff distance features, distances to three New York
# airports (looked up by name in `usa_airports`), and the raw coordinates.
# ST_POINT builds planar points, so ST_Distance is in degrees of lon/lat, not metres.
#
# Fix: the cyclical features used to be derived from
#   to_unixtime(CAST(pickup_datetime AS TIMESTAMP WITH TIME ZONE) AT TIME ZONE 'America/New_York')
# but to_unixtime() returns the same instant whatever zone the timestamp is expressed
# in. sin_day/cos_day were therefore phased on UTC midnight, and sin_week/cos_week on
# the epoch's UTC Thursday, while day/hour/minute are New York local time. The offset
# between the two also changed with daylight saving time. The cyclical features are
# now computed from the local clock fields of `est`.
#
# NOTE(review): this string is not executed by later cells. The query actually run is
# athena_taxi_raw_2014.sql (via athena.py), which reads `raw_data`; keep them in sync.
SQL = '''
WITH 
    dataset AS 
    (SELECT CAST (pickup_datetime AS TIMESTAMP WITH time zone) AT TIME ZONE 'America/New_York' AS est, 
                  ST_POINT(pickup_longitude,pickup_latitude) pickup_point,
                  ST_POINT(dropoff_longitude,dropoff_latitude) dropoff_point,
                  24*60*60 as seconds_in_day,
                  *
      FROM train_v3),

    -- position of the pickup on the New York local clock, in seconds
    local_clock AS
    (SELECT hour(est)*3600 + minute(est)*60 + second(est) AS local_seconds_of_day,
            (day_of_week(est) - 1)*seconds_in_day
                + hour(est)*3600 + minute(est)*60 + second(est) AS local_seconds_of_week,
            *
      FROM dataset),
    
    airports AS (SELECT 
                  kv['LaGuardia'] AS LaGuardia,
                  kv['Downtown Manhattan/Wall St. Heliport'] AS Manhattan,
                  kv['John F Kennedy Intl'] AS JFK
    FROM (SELECT map_agg(name, point_location) kv
        FROM 
            (SELECT name,
         ST_POINT(longitude,
         latitude) point_location
            FROM usa_airports
            WHERE city = 'New York' )
            ))
        SELECT 
        
        -- Target
         fare_amount,
         
         -- time features (New York local time)
         day(est) day,
         day_of_week(est) dayofweek ,
         year(est) year ,
         month(est) month ,
         day_of_month(est) dayofmonth ,
         hour(est) hour ,
         minute(est) minute ,
         
         -- cyclical variables: phase within the local day and the local ISO week (Monday 00:00 = 0)
         sin(2*pi()*local_seconds_of_day/seconds_in_day) sin_day,
         cos(2*pi()*local_seconds_of_day/seconds_in_day) cos_day,
         sin(2*pi()*local_seconds_of_week/(seconds_in_day*7)) sin_week,
         cos(2*pi()*local_seconds_of_week/(seconds_in_day*7)) cos_week,
         
         
         -- Distance features
         pickup_longitude - dropoff_longitude diff_longitude,
         pickup_latitude - dropoff_latitude diff_latitude,
         ST_Distance(pickup_point, dropoff_point) dist,
         
         -- Airports features
         ST_DISTANCE(airports.LaGuardia, dropoff_point) dropoff_laguardia,
         ST_DISTANCE(airports.LaGuardia, pickup_point ) pickup_laguardia,
         ST_DISTANCE(airports.JFK, dropoff_point) dropoff_JFK,
         ST_DISTANCE(airports.JFK, pickup_point) pickup_JFK,
         ST_DISTANCE(airports.Manhattan, dropoff_point) dropoff_manhattan,
         ST_DISTANCE(airports.Manhattan, pickup_point) pickup_manhattan,
         
         -- Raw features
         pickup_longitude,
         pickup_latitude,
         dropoff_longitude,
         dropoff_latitude,
         passenger_count
         
    FROM local_clock, airports
'''

In [3]:
# Print the feature-extraction query file that athena.py is run on in the next cell.
!cat athena_taxi_raw_2014.sql


WITH 
    dataset AS 
    (SELECT CAST (pickup_datetime AS TIMESTAMP WITH time zone) AT TIME ZONE 'America/New_York' AS est, 
                  ST_POINT(pickup_longitude,pickup_latitude) pickup_point,
                  ST_POINT(dropoff_longitude,dropoff_latitude) dropoff_point,
                  to_unixtime( CAST (pickup_datetime AS TIMESTAMP WITH time zone) AT TIME ZONE 'America/New_York') AS                     epoch,
                  24*60*60 as seconds_in_day,
                  *
     FROM raw_data)
    
     SELECT
     
        -- Target
        fare_amount,
        
        -- time features
        day(est) day,
        day_of_week(est) dayofweek ,
        year(est) year ,
        month(est) month ,
        day_of_month(est) dayofmonth ,
        hour(est) hour ,
        minute(est) minute ,
         
        -- cyclclical variables
        sin(2*pi()*epoch/seconds_in_day) sin_day,
        cos(2*pi()*epoch/seconds_in_day) cos_day,
        sin(2*pi()*ep

In [4]:
# Run the query file through the local athena.py helper script.
# NOTE(review): judging by the directory listing further down (foo.sql -> foo.sql.csv),
# the result presumably lands in athena_taxi_raw_2014.sql.csv, which the next cells
# read; confirm against athena.py.
!python athena.py athena_taxi_raw_2014.sql

athena_taxi_raw_2014.sql


In [5]:
# Athena query output for athena_taxi_raw_2014.sql, following the
# "<query file>.csv" naming seen for foo.sql -> foo.sql.csv in the listing below.
path_train = "athena_taxi_raw_2014.sql.csv"

In [6]:
# Load a random sample (about SAMPLE_FRACTION of the rows) of the Athena output.
# Row 0 is the header and is always kept. Every other row is skipped when its draw
# exceeds SAMPLE_FRACTION, i.e. kept with probability SAMPLE_FRACTION.
# A dedicated RNG seeded with SAMPLE_SEED makes the sample reproducible across kernel
# restarts. The original used the unseeded global `random.random()`.
SAMPLE_FRACTION = 0.3
SAMPLE_SEED = 42
_sample_rng = random.Random(SAMPLE_SEED)
df_train = pd.read_csv(
    path_train,
    skiprows=lambda i: i > 0 and _sample_rng.random() > SAMPLE_FRACTION,
    header=0,
)

In [26]:
# Persist the sampled frame with its header row but without the pandas index,
# so reading it back does not add an extra unnamed column.
# NOTE(review): despite the "half" in the name, this holds the ~30% sample drawn above.
df_train.to_csv(path_or_buf='train_half_2014.csv', index=False, header=True)

In [27]:
# Zip the full Athena output.
# NOTE(review): nothing later uses this archive (the split works on train_half_2014.csv),
# and in the recorded run the source CSV was already gone, so zip reported "Nothing to do!".
!zip taxinyc_train_2014.csv.zip athena_taxi_raw_2014.sql.csv


zip error: Nothing to do! (taxinyc_train_2014.csv.zip)


In [28]:
#!split taxinyc_train_2014.csv.zip -b 300M ZIPCHUNKS
!split train_half_2014.csv -b 300M ZIPCHUNKS

In [29]:
!rm athena_taxi_raw_2014.sql.csv

rm: cannot remove ‘athena_taxi_raw_2014.sql.csv’: No such file or directory


In [30]:
!rm train_half_2014.sql.csv

rm: cannot remove ‘train_half_2014.sql.csv’: No such file or directory


In [31]:
!rm taxinyc_train_2014.csv.zip

rm: cannot remove ‘taxinyc_train_2014.csv.zip’: No such file or directory


In [32]:
# Capture the working-directory listing as a Python list (one entry per file);
# it is used below to pick out the ZIPCHUNKS* files to upload.
ls_sagemaker = !ls -1

In [33]:
# Show the captured directory listing.
ls_sagemaker

['athena.log',
 'athena.py',
 'athena_taxi_raw_2014.sql',
 'athena_taxi_raw_2014.sql.log',
 'athena_taxi_raw_2015.sql',
 'athena_taxi_raw_2015.sql.log',
 'athena_taxi_raw.sql',
 'athena_taxi_raw.sql.log',
 'carparts49',
 'foo.sql',
 'foo.sql.csv',
 'foo.sql.log',
 'lost+found',
 'run_athena_query.ipynb',
 'run_athena_query_train.ipynb',
 'run_athena_query_validation.ipynb',
 'taxi_fare_prediction_dataframe.ipynb',
 'taxi_fare_prediction_dataframe_monotonic.ipynb',
 'taxi_fare_prediction_debug.ipynb',
 'taxi_fare_prediction.ipynb',
 'taxi_fare_prediction_original.ipynb',
 'train_half_2014.csv',
 'train_small_2014.csv',
 'validate_small_2015.csv',
 'ZIPCHUNKSaa',
 'ZIPCHUNKSab',
 'ZIPCHUNKSac',
 'ZIPCHUNKSad',
 'ZIPCHUNKSae',
 'ZIPCHUNKSaf',
 'ZIPCHUNKSag',
 'ZIPCHUNKSah',
 'ZIPCHUNKSai',
 'ZIPCHUNKSaj',
 'ZIPCHUNKSak']

athena.log  # program log
athena.py   # main program
foo.sql     # query to execute (input to athena.py)
foo.sql.csv # sql output

In [34]:
from s3fs.core import S3FileSystem
import os

# s3fs filesystem using the default AWS credentials (anon=False).
# NOTE(review): `s3` is not used by any later cell here (the uploads go through the
# AWS CLI), so this could probably be dropped.
s3 = S3FileSystem(anon=False)
bucket = 'aws-worskhop-data'

# Select the chunk files written by `split ... ZIPCHUNKS` (ZIPCHUNKSaa, ZIPCHUNKSab, ...)
# from the directory listing captured in `ls_sagemaker`.
file_list = ls_sagemaker
subs = 'ZIPCHUNKS'
# Match on the prefix, not any substring. Otherwise an unrelated file that merely contains
# "ZIPCHUNKS" in its name would be uploaded and then deleted by the upload loop.
ZIPCHUNKS_list = [name for name in file_list if name.startswith(subs)]

In [35]:
# Chunk files selected for upload.
ZIPCHUNKS_list

['ZIPCHUNKSaa',
 'ZIPCHUNKSab',
 'ZIPCHUNKSac',
 'ZIPCHUNKSad',
 'ZIPCHUNKSae',
 'ZIPCHUNKSaf',
 'ZIPCHUNKSag',
 'ZIPCHUNKSah',
 'ZIPCHUNKSai',
 'ZIPCHUNKSaj',
 'ZIPCHUNKSak']

In [36]:
# S3 destination prefix for the uploaded chunks (trailing '/' = "copy into this folder").
bucket = 'aws-worskhop-data'
file_path = 'train_chunks/' 
# Build the URI with plain string formatting. os.path.join is meant for local filesystem
# paths: it uses '\' on Windows and drops the bucket if file_path starts with '/'.
s3_path = 's3://{}/{}'.format(bucket.strip('/'), file_path.lstrip('/'))

In [37]:
# Destination prefix for the upload.
s3_path

's3://aws-worskhop-data/train_chunks/'

In [39]:
import os
import subprocess

# Upload each chunk to the S3 prefix, then delete the local copy.
# The local file is removed only after `aws s3 cp` exits successfully. The previous
# `!rm` ran unconditionally, so a failed upload silently lost that chunk.
# Passing an argument list (no shell) also avoids quoting problems in file names.
for file in ZIPCHUNKS_list:
    local_path = os.path.join('./', file)
    print(local_path)
    upload = subprocess.run(
        ['aws', 's3', 'cp', local_path, s3_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    # Echo the CLI's "upload: ..." line into the cell output.
    print(upload.stdout, end='')
    if upload.returncode != 0:
        raise RuntimeError('upload of {} to {} failed (exit {}): {}'.format(
            local_path, s3_path, upload.returncode, upload.stderr.strip()))
    os.remove(local_path)

./ZIPCHUNKSaa
upload: ./ZIPCHUNKSaa to s3://aws-worskhop-data/train_chunks/ZIPCHUNKSaa
./ZIPCHUNKSab
upload: ./ZIPCHUNKSab to s3://aws-worskhop-data/train_chunks/ZIPCHUNKSab
./ZIPCHUNKSac
upload: ./ZIPCHUNKSac to s3://aws-worskhop-data/train_chunks/ZIPCHUNKSac
./ZIPCHUNKSad
upload: ./ZIPCHUNKSad to s3://aws-worskhop-data/train_chunks/ZIPCHUNKSad
./ZIPCHUNKSae
upload: ./ZIPCHUNKSae to s3://aws-worskhop-data/train_chunks/ZIPCHUNKSae
./ZIPCHUNKSaf
upload: ./ZIPCHUNKSaf to s3://aws-worskhop-data/train_chunks/ZIPCHUNKSaf
./ZIPCHUNKSag
upload: ./ZIPCHUNKSag to s3://aws-worskhop-data/train_chunks/ZIPCHUNKSag
./ZIPCHUNKSah
upload: ./ZIPCHUNKSah to s3://aws-worskhop-data/train_chunks/ZIPCHUNKSah
./ZIPCHUNKSai
upload: ./ZIPCHUNKSai to s3://aws-worskhop-data/train_chunks/ZIPCHUNKSai
./ZIPCHUNKSaj
upload: ./ZIPCHUNKSaj to s3://aws-worskhop-data/train_chunks/ZIPCHUNKSaj
./ZIPCHUNKSak
upload: ./ZIPCHUNKSak to s3://aws-worskhop-data/train_chunks/ZIPCHUNKSak


In [40]:
# Columns of the sampled Athena output loaded into df_train.
df_train.columns

Index(['fare_amount', 'day', 'dayofweek', 'year', 'month', 'dayofmonth',
       'hour', 'minute', 'sin_day', 'cos_day', 'sin_week', 'cos_week',
       'diff_longitude', 'diff_latitude', 'dist', 'pickup_datetime',
       'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
       'dropoff_latitude', 'passenger_count'],
      dtype='object')

In [41]:
# Read the saved sample back from disk and display it.
# NOTE(review): the recorded output shows an "Unnamed: 0" column, i.e. the file on disk
# was written with its index. The to_csv(..., index=False) cell above would not produce
# that column, so the output is presumably stale; re-run to refresh.
df_train_part = pd.read_csv(
    filepath_or_buffer='train_half_2014.csv',
    header=0,
)
df_train_part

Unnamed: 0,fare_amount,day,dayofweek,year,month,dayofmonth,hour,minute,sin_day,cos_day,...,cos_week,diff_longitude,diff_latitude,dist,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,10.00,19,7,2014,1,19,19,4,0.019561,0.999809,...,-0.899753,-0.004860,0.021500,0.022042,2014-01-20 00:04:29 UTC,-73.990730,40.750916,-73.985870,40.729416,1
1,6.50,17,7,2014,8,17,4,26,0.802123,-0.597159,...,-0.991172,0.013344,0.012108,0.018018,2014-08-17 08:26:40 UTC,-73.967730,40.792984,-73.981070,40.780876,1
2,9.70,17,1,2011,10,17,23,4,0.720248,0.693716,...,-0.109301,-0.023087,-0.043823,0.049532,2011-10-18 03:04:18 UTC,-73.993190,40.752663,-73.970100,40.796486,1
3,8.00,17,1,2014,11,17,8,43,-0.437979,-0.898985,...,-0.571583,0.010902,0.003632,0.011491,2014-11-17 13:43:54 UTC,-73.962300,40.767963,-73.973206,40.764330,1
4,4.50,4,3,2013,12,4,6,28,0.135499,-0.990777,...,0.892375,-0.004852,-0.006657,0.008237,2013-12-04 11:28:51 UTC,-73.985990,40.740680,-73.981140,40.747337,1
5,15.50,24,4,2013,1,24,3,36,0.777146,-0.629320,...,0.948718,0.042549,0.017666,0.046071,2013-01-24 08:36:00 UTC,-73.957770,40.779250,-74.000320,40.761585,3
6,38.10,8,3,2010,9,8,0,10,0.888317,0.459231,...,0.737551,-0.027031,-0.023380,0.035739,2010-09-08 04:10:39 UTC,-74.002010,40.740856,-73.974976,40.764236,1
7,32.50,7,2,2014,1,7,3,49,0.737572,-0.675268,...,0.105706,-0.056427,-0.079720,0.097669,2014-01-07 08:49:54 UTC,-74.003845,40.725643,-73.947420,40.805363,1
8,10.90,12,2,2010,1,12,13,38,-0.985801,0.167916,...,0.455471,-0.005615,-0.030598,0.031109,2010-01-12 18:38:40 UTC,-73.987850,40.741276,-73.982230,40.771873,1
9,8.10,1,4,2011,12,1,3,44,0.753946,-0.656937,...,0.947077,0.015770,-0.012543,0.020150,2011-12-01 08:44:16 UTC,-73.968260,40.759174,-73.984030,40.771717,1
