# 2013 NYC Taxi Tipping Prediction 

The dataset being used is the 2013 NYC Taxi Dataset [https://archive.org/details/nycTaxiTripData2013](https://archive.org/details/nycTaxiTripData2013). 

In [1]:
sc

In [2]:
spark

## Import necessary libraries

In [3]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.feature import RFormula
import matplotlib.pyplot as plt
import numpy as np
import datetime

## Part 1 - Load the data, convert data types and create new features (5 points)

### Load the data

In [4]:
nyctaxi = spark.read.parquet("s3://bigdatateaching/nyctaxi-2013/merged-parquet/")

In [5]:
nyctaxi.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- surcharge: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)



In [6]:
nyctaxi.count()

173185091

In [7]:
nyctaxi.show(10)

+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+-----+---------+------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|    pickup_datetime|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total|rate_code|store_and_fwd_flag|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+-----+---------+------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|00005007A9F30E289...|132A7AC13C8471488...|      CMT|2013-07-30 

### Convert data types

In [8]:
nyctaxi_converted = nyctaxi.withColumn("trip_time_in_secs",nyctaxi["trip_time_in_secs"].cast("int"))
nyctaxi_converted = nyctaxi_converted.withColumn("trip_distance",nyctaxi["trip_distance"].cast("float"))
nyctaxi_converted = nyctaxi_converted.withColumn("pickup_latitude",nyctaxi["pickup_latitude"].cast("float"))
nyctaxi_converted = nyctaxi_converted.withColumn("pickup_longitude",nyctaxi["pickup_longitude"].cast("float"))
nyctaxi_converted = nyctaxi_converted.withColumn("dropoff_latitude",nyctaxi["dropoff_latitude"].cast("float"))
nyctaxi_converted = nyctaxi_converted.withColumn("dropoff_longitude",nyctaxi["dropoff_longitude"].cast("float"))
nyctaxi_converted = nyctaxi_converted.withColumn("fare_amount",nyctaxi["fare_amount"].cast("float"))
nyctaxi_converted = nyctaxi_converted.withColumn("surcharge",nyctaxi["surcharge"].cast("float"))
nyctaxi_converted = nyctaxi_converted.withColumn("mta_tax",nyctaxi["mta_tax"].cast("float"))
nyctaxi_converted = nyctaxi_converted.withColumn("tip_amount",nyctaxi["tip_amount"].cast("float"))
nyctaxi_converted = nyctaxi_converted.withColumn("tolls_amount",nyctaxi["tolls_amount"].cast("float"))
nyctaxi_converted = nyctaxi_converted.withColumn("total",nyctaxi["total"].cast("float"))
nyctaxi_converted = nyctaxi_converted.withColumn("pickup_datetime",nyctaxi["pickup_datetime"].cast("timestamp"))
nyctaxi_converted = nyctaxi_converted.withColumn("dropoff_datetime",nyctaxi["dropoff_datetime"].cast("timestamp"))
nyctaxi_converted = nyctaxi_converted.withColumn("passenger_count",nyctaxi["passenger_count"].cast("int"))

### Add new features

Add a few new fields with data derived from current fields as new features.
* A column called `pickup_hour` with the hour from `pickup_datetime`. This provides an integer from 0 to 23.
* A column called `pickup_week` with the week of the year from `pickup_datetime`. This provides an integer from 1 to 53.
* A column called `weekday` with the name of the day of the week ffrom `pickup_datetime`, in long format.
* A column called `tipped` which is an indicator of wether or not there was a tip. If the tip is 0, then it's 0, otherwise 1.

In [9]:
nyctaxi_converted=nyctaxi_converted.withColumn("pickup_hour", hour(col("pickup_datetime")))
nyctaxi_converted=nyctaxi_converted.withColumn("pickup_week", weekofyear(col("pickup_datetime")))
nyctaxi_converted=nyctaxi_converted.withColumn("weekday", date_format("pickup_datetime",'EEEE'))
nyctaxi_converted=nyctaxi_converted.withColumn("tipped", when(nyctaxi_converted["tip_amount"]>0.0, 1.0).otherwise(0.0))
nyctaxi_converted=nyctaxi_converted.withColumn("tipped", nyctaxi_converted["tipped"].cast("int"))

In [10]:
nyctaxi_converted.createOrReplaceTempView("nyctaxi_converted_tbl")

Add a new column called `time_bins` that takes the value of `pickup_hour` and buckets it according to the following rules. 
* If the value of the pickup hour is at-or-before 6am, or at-or-after 8pm, then the value is "night"
* If the value of the pickup hour is between 7am and 10am (inclusive), then the value is "am_rush"
* If the value of the pickup hour is between 11am and 3pm (inclusive), then the value is "afternoon"
* If the value of the pickup hour is between 4pm and 7pm (inclusive), then the value is "pm_rush"


In [11]:
nyctaxi_converted = sqlContext.sql("""
SELECT *,
CASE WHEN pickup_hour <= 6 or pickup_hour >= 20 THEN 'night'
     WHEN pickup_hour >= 7 and pickup_hour <= 10 THEN 'am_rush'
     WHEN pickup_hour >= 11 and pickup_hour <= 15 THEN 'afternoon'
     ELSE 'pm_rush' END AS time_bins
FROM nyctaxi_converted_tbl
""")

In [12]:
nyctaxi_converted.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total: float (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_week: i

In [13]:
nyctaxi_converted.show(10)

+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+-----+---------+------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+-----------+-----------+---------+------+---------+
|           medallion|        hack_license|vendor_id|    pickup_datetime|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total|rate_code|store_and_fwd_flag|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|pickup_hour|pickup_week|  weekday|tipped|time_bins|
+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+-----+---------+------------------+-------------------+---------------+-----------------+-------------+----------------+--------------

## Part 2 - Drop unused variables and filter data for clean trips

* Keep only these fields: time_bins, tipped, weekday, pickup_week, pickup_hour, trip_distance, trip_time_in_secs, passenger_count, rate_code, total, tip_amount, fare_amount, payment_type, vendor_id
* Filter records on the following criteria:
  * Passenger count is greater than 0 and less than 8
  * Payment type is cash (CSH) or credit card (CRD)
  * Tip amount is 0 or more, but less than 30
  * Fare amount is between \\$1 or more, and less than \\$150
  * Trip distance is less than 100 miles, but more than 0
  * The time of the trip is 30 seconds or more, and less than 2 hours

In [14]:
nyctaxi_converted.createOrReplaceTempView("nyctaxi_converted_tbl2")
valid_trips = sqlContext.sql("""
SELECT time_bins, tipped, weekday, pickup_week, pickup_hour, trip_distance, trip_time_in_secs, passenger_count, rate_code, total, tip_amount, fare_amount, payment_type, vendor_id
FROM nyctaxi_converted_tbl2
WHERE (passenger_count > 0 and passenger_count < 8) and
      (payment_type = 'CSH' or payment_type = 'CRD') and
      (tip_amount >= 0 and tip_amount < 30) and
      (fare_amount >= 1 and fare_amount < 150) and
      (trip_distance > 0 and trip_distance < 100) and
      (trip_time_in_secs >= 30 and trip_time_in_secs < 7200)
""")

In [15]:
valid_trips.printSchema()

root
 |-- time_bins: string (nullable = false)
 |-- tipped: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- pickup_week: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- total: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- vendor_id: string (nullable = true)



The number of records for `valid_trips`.

In [16]:
valid_trips.count()

171182914

The number of records that were dropped with the filter applied.

In [17]:
nyctaxi_converted.count()-valid_trips.count()

2002177

## Part 3 - Feature transformation and data preparation for modeling

In [18]:
si_1 = StringIndexer(inputCol="vendor_id", outputCol="vendor_si")

In [19]:
si_2 = StringIndexer(inputCol="rate_code", outputCol="rate_si")

In [20]:
si_3 = StringIndexer(inputCol="payment_type", outputCol="payment_si")

In [21]:
si_4 = StringIndexer(inputCol="time_bins", outputCol="time_bins_si")

In [22]:
en_1 = OneHotEncoder(inputCol="vendor_si", outputCol="vendor_vec", dropLast=False)

In [23]:
en_2 = OneHotEncoder(inputCol="rate_si", outputCol="rate_vec", dropLast=False)

In [24]:
en_3 = OneHotEncoder(inputCol="payment_si", outputCol="payment_vec", dropLast=False)

In [25]:
en_4 = OneHotEncoder(inputCol="time_bins_si", outputCol="time_bins_vec", dropLast=False)

In [26]:
encoded_final = Pipeline(stages=[si_1, en_1, si_2, en_2, si_3, en_3, si_4, en_4]).fit(valid_trips).transform(valid_trips)

Show the first 10 rows of `encoded_final`.

In [27]:
encoded_final.show(10)

+---------+------+---------+-----------+-----------+-------------+-----------------+---------------+---------+-----+----------+-----------+------------+---------+---------+-------------+-------+--------------+----------+-------------+------------+-------------+
|time_bins|tipped|  weekday|pickup_week|pickup_hour|trip_distance|trip_time_in_secs|passenger_count|rate_code|total|tip_amount|fare_amount|payment_type|vendor_id|vendor_si|   vendor_vec|rate_si|      rate_vec|payment_si|  payment_vec|time_bins_si|time_bins_vec|
+---------+------+---------+-----------+-----------+-------------+-----------------+---------------+---------+-----+----------+-----------+------------+---------+---------+-------------+-------+--------------+----------+-------------+------------+-------------+
|    night|     1|  Tuesday|         31|         22|          2.5|              679|              1|        1| 14.5|       3.0|       10.5|         CRD|      CMT|      1.0|(2,[1],[1.0])|    0.0|(20,[0],[1.0])|     

Split `encoded_final` into `train` and `test` using 90% train, 10% test and a seed of 12345. 

In [28]:
sample_data = encoded_final.sample(0.1, seed=0)
splitted_data = sample_data.randomSplit([0.9, 0.1], 12345)
train = splitted_data[0]
test = splitted_data[1]

Cache the `train` DataFrame.

In [29]:
train.cache()

DataFrame[time_bins: string, tipped: int, weekday: string, pickup_week: int, pickup_hour: int, trip_distance: float, trip_time_in_secs: int, passenger_count: int, rate_code: string, total: float, tip_amount: float, fare_amount: float, payment_type: string, vendor_id: string, vendor_si: double, vendor_vec: vector, rate_si: double, rate_vec: vector, payment_si: double, payment_vec: vector, time_bins_si: double, time_bins_vec: vector]

Cache the `test` DataFrame.

In [30]:
test.cache()

DataFrame[time_bins: string, tipped: int, weekday: string, pickup_week: int, pickup_hour: int, trip_distance: float, trip_time_in_secs: int, passenger_count: int, rate_code: string, total: float, tip_amount: float, fare_amount: float, payment_type: string, vendor_id: string, vendor_si: double, vendor_vec: vector, rate_si: double, rate_vec: vector, payment_si: double, payment_vec: vector, time_bins_si: double, time_bins_vec: vector]

Count the number of records in the `train` DataFrame.

In [31]:
train.count()

15409797

## Part 4 - Build a Logistic Regression Model to predict tipping

In this section, you will train a Logistic Regression model to predict wether or not there was a tip for each ride using the training data created in the previous section. You will build pipelines using both transformers and estimators. 

Create a `LogisticRegression` estimator 

In [32]:
log_reg = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

The Logistic Regression model we will be fitting is predicting `tipped` as a function of:
* Pickup hour
* Passenger count
* Trip time
* Trip distance
* Fare amount
* Vendor id
* Payment type
* Rate code
* Time bins



In [33]:
class_formula = RFormula(formula="tipped ~ pickup_hour + passenger_count + trip_time_in_secs + trip_distance + fare_amount + vendor_si + rate_si + payment_si + time_bins_si")

Create a PipelineModel object called model, with the stages of `class_formula` and `log_reg`.

In [34]:
model = Pipeline(stages=[class_formula, log_reg]).fit(train)

Create a `predictions` DataFrame

In [35]:
predictions = model.transform(test)

In [36]:
predictions_and_labels = predictions['label', 'prediction'].rdd

In [37]:
metrics = BinaryClassificationMetrics(predictions_and_labels) 

In [38]:
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 0.985067685211


The AUC is nearly 1 which is very high, so it means our model has a great performance in predicting the target variable.