# NYC TLC Public Dataset Glue Example #

This example analyzes [New York City Taxi and Limousine Commission Trip Record Data](https://registry.opendata.aws/nyc-tlc-trip-records-pds/) and prepares it for an example machine learning exercise. This example assumes that the NYC taxi dataset has already been crawled using a [Glue Crawler](https://docs.aws.amazon.com/glue/latest/dg/add-crawler.html) and the inferred schema is already stored in the Glue Catalog database.

To get started, we first import the Python packages needed to create a Glue context.

In [None]:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions

### Create Glue Context ###
Next we create a Glue context.

In [None]:
# Create a Glue context
glueContext = GlueContext(SparkContext.getOrCreate())

### Create Glue Dynamic Frame ###
In this example, we first create a Glue Dynamic Frame and then convert it to a PySpark DataFrame. The [Glue Dynamic Frame API](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html) could do the same work, but we use the PySpark DataFrame API because you may already be familiar with it.

In [None]:
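# Create a DynamicFrame from the Uber table in the Glue Catalog
uber_dyf = glueContext.create_dynamic_frame.from_catalog(database="nyc-tlc-misc", table_name="uber_nyc_data_csv")
# Convert the DynamicFrame to a PySpark DataFrame
uber_df=uber_dyf.toDF()
# printSchema() prints the schema itself and returns None, so no print is needed
uber_df.printSchema()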
uber_df.show(10)

### Clean data ###
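Next, we clean the data. We drop any rows that contain NULL or NaN values, and any rows where the origin, destination, or trip duration holds the literal string 'NULL'.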

In [None]:
# clean up data 
# remove id column as we don't need it
uber_df1=uber_df.drop(uber_df.id)

# drop all rows with any null value (applied to uber_df1 so the id column stays dropped)
uber_df1=uber_df1.dropna(how='any')

# keep only rows where destination, origin and trip duration are not the string 'NULL'
uber_df1=uber_df1.filter((uber_df1.destination_taz != 'NULL')  & 
    (uber_df1.origin_taz != 'NULL')  & 
    (uber_df1.trip_duration != 'NULL') )

# show 10 rows
uber_df1.show(10)

### Define PySpark user-defined functions ###
Below, we import the Python classes needed to define PySpark user-defined functions.

In [None]:
from pyspark.sql.functions import udf, to_timestamp
from pyspark.sql.types import IntegerType
from datetime import datetime

Below we define a PySpark user-defined function that extracts the day of the week (0 for Monday through 6 for Sunday) from the pickup timestamp.

In [None]:
# define UDF for extracting pickup day of the week from datetime

def weekday(x):
    pickup=datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
    return int(pickup.date().weekday())
    
pickup_day_udf = udf(weekday, IntegerType())

Below we define a PySpark user-defined function for extracting month from the pickup date timestamp.

In [None]:
# define month udf for extracting pickup month from datetime
def month(x):
    pickup=datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
    return int(pickup.date().month)
    
pickup_month_udf = udf(month, IntegerType())

Below we define a PySpark user-defined function for extracting hour of the day from the pickup date timestamp.

In [None]:
# define hour udf for extracting pickup hour from datetime

def pickup_time(x):
    ptime = datetime.strptime(x, '%Y-%m-%d %H:%M:%S').time()
    return int(ptime.hour)
    
pickup_time_udf = udf(pickup_time, IntegerType())
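
The three functions above parse the same timestamp format, so their behavior can be checked outside Spark with plain Python. The sketch below is only an illustration: the helper name `pickup_features` and the sample timestamps are assumptions, not values taken from the dataset.

```python
from datetime import datetime

def pickup_features(ts):
    """Return (month, weekday, hour) for a 'YYYY-MM-DD HH:MM:SS' string."""
    pickup = datetime.strptime(ts, '%Y-%m-%d %H:%M:%S')
    # weekday() counts from Monday = 0 to Sunday = 6
    return (pickup.month, pickup.weekday(), pickup.hour)

# 1 September 2014 was a Monday, so the weekday component is 0
sample = pickup_features('2014-09-01 09:00:00')
print(sample)
```

Checking the plain functions this way before wrapping them with `udf` makes it easier to catch format mismatches early.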

The SageMaker built-in XGBoost algorithm expects numeric input, so we need to encode the pickup origin and destination zones as numbers. We define a PySpark user-defined function that parses each zone identifier as a hexadecimal string and returns the corresponding integer.

In [None]:
def encode_taz(x):
   return int(x, 16)

taz_udf=udf(encode_taz, IntegerType())
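
As a quick illustration of the hexadecimal parsing, the sketch below shows what `int(x, 16)` returns for a sample value and how a defensive variant could handle values that are not valid hexadecimal. The name `encode_taz_safe` and the sample inputs are hypothetical. Returning `None` from a Python UDF produces a null in Spark, which a later `dropna` would remove.

```python
def encode_taz_safe(x):
    """Parse a hexadecimal string to an int, or return None if it cannot be parsed."""
    try:
        return int(x, 16)
    except (ValueError, TypeError):
        # ValueError: not valid hexadecimal (for example the string 'NULL')
        # TypeError: the input is not a string (for example None)
        return None

print(encode_taz_safe('2A'))    # 2 * 16 + 10
print(encode_taz_safe('NULL'))  # not hexadecimal
```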

Below we define a PySpark user-defined function that computes duration of the trip in minutes.

In [None]:
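# define duration udf for extracting duration in minutes
def duration(x):
    # split the duration string into hours, minutes and seconds
    time=x.split(':')
    # convert hours to int before multiplying, then add minutes (seconds are ignored)
    duration = int(time[0])*60 + int(time[1])
    return duration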

duration_udf = udf(duration, IntegerType())
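
To make the arithmetic concrete, here is a small plain-Python sketch of the same conversion. It assumes the duration strings look like `H:MM:SS`, which is an assumption about the data rather than something confirmed here; the name `duration_minutes` and the sample values are illustrative. The conversion to `int` has to happen before the multiplication by 60, because multiplying a Python string repeats it rather than scaling it.

```python
def duration_minutes(x):
    """Convert an 'H:MM:SS' string to whole minutes, ignoring the seconds."""
    parts = x.split(':')
    hours = int(parts[0])
    minutes = int(parts[1])
    return hours * 60 + minutes

print(duration_minutes('1:23:45'))  # 1 hour and 23 minutes
print(duration_minutes('0:07:30'))  # 7 minutes
```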

### Prepare data for SageMaker XGBoost algorithm ###
The SageMaker XGBoost algorithm expects the label to be the first column. Because we want to train the model to predict trip duration, we transform the PySpark DataFrame so that 'duration' is the first column. The other columns are produced by applying the PySpark user-defined functions defined above.

We also drop any rows with Null or NaN values as a result of transformations, and also drop any row with duration of 60 minutes or more.

In [None]:
# create a new data frame
# we want trip duration (minutes) in the first column as label for the row
# our feature vector includes origin, destination, and pickup month, day, and hour
# we will discard other columns
uber_df2 = uber_df1.select(duration_udf(uber_df1.trip_duration).alias('duration'),
    taz_udf(uber_df1.origin_taz).alias('origin'), 
    taz_udf(uber_df1.destination_taz).alias('destination'), 
    pickup_month_udf(uber_df1.pickup_datetime).alias('month'), 
    pickup_day_udf(uber_df1.pickup_datetime).alias('day'), 
    pickup_time_udf(uber_df1.pickup_datetime).alias('pickup_time'))

uber_df3 = uber_df2.dropna(how='any')
uber_df4 = uber_df3.filter(uber_df3.duration < 60)

In [None]:
# show 
uber_df4.show(10)
uber_df4.count()

### Save prepared data in S3 bucket ###
Finally, we save the transformed PySpark DataFrame in an S3 bucket.

In [None]:
# save the prepared data frame as CSV without a header row
uber_df4.write.save("s3://aws-ajayvohra-nyc-tlc-misc/glue/output/uber_nyc", format='csv', header=False)
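
To illustrate the layout of the saved data (label in the first column, no header row, trips of 60 minutes or more removed), the following plain-Python sketch writes two made-up rows to an in-memory CSV. The row values are invented for illustration and the code does not touch S3 or Spark.

```python
import csv
import io

# Column order matches the select above: label first, then features
columns = ['duration', 'origin', 'destination', 'month', 'day', 'pickup_time']

# Made-up example rows; the second one is filtered out (duration >= 60)
rows = [
    {'duration': 7, 'origin': 42, 'destination': 26, 'month': 9, 'day': 0, 'pickup_time': 9},
    {'duration': 75, 'origin': 10, 'destination': 11, 'month': 9, 'day': 1, 'pickup_time': 18},
]

buffer = io.StringIO()
writer = csv.writer(buffer, lineterminator='\n')
for row in rows:
    if row['duration'] < 60:
        writer.writerow([row[c] for c in columns])

csv_text = buffer.getvalue()
print(csv_text)
```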