# Predict NYC Taxi Tips using Spark ML and Azure Data Lake

The notebook ingests, visualizes, prepares and then trains a model based on an Open Dataset that tracks NYC Yellow Taxi trips and various attributes around them.
The goal is to predict whether a given trip will result in a tip.


In [31]:
import matplotlib.pyplot as plt
import pandas as pd

from pyspark.sql.functions import unix_timestamp

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from datetime import datetime

## Ingest Data

Load the NYC Yellow Taxi data and take a small sample of it, so that different preparation approaches can be evaluated quickly and cheaply before the modelling phase later in the notebook.

In [2]:
# Primary storage info
account_name = ''
container_name = ''
relative_path = ''

adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path)
print('Primary storage account path: ' + adls_path)
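
The three storage variables are left empty in the cell above and must be filled in before the path is usable. As a minimal sketch, assuming a hypothetical helper named `build_abfss_path` (not part of the notebook) and made-up container, account, and file names, the same path format can be built with a check that fails early when a required value is missing:

```python
def build_abfss_path(container_name, account_name, relative_path=""):
    """Build an abfss:// URI in the same format as the notebook cell.

    Hypothetical helper for illustration only; it raises instead of
    silently producing a malformed path when a required value is empty.
    """
    if not container_name or not account_name:
        raise ValueError("container_name and account_name must both be set")
    return "abfss://%s@%s.dfs.core.windows.net/%s" % (
        container_name,
        account_name,
        relative_path.lstrip("/"),  # avoid a double slash after the host
    )


# Made-up values, used only to show the resulting shape of the path.
example_path = build_abfss_path("taxi", "examplestorage", "/nyctlc/yellow.csv")
print(example_path)
```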

In [5]:
#Define schema of csv file
schema = StructType() \
      .add("VendorID",IntegerType(),True) \
      .add("tpep_pickup_datetime",StringType(),True) \
      .add("tpep_dropoff_datetime",StringType(),True) \
      .add("passenger_count",IntegerType(),True) \
      .add("trip_distance",DoubleType(),True) \
      .add("RatecodeID",StringType(),True) \
      .add("store_and_fwd_flag",StringType(),True) \
      .add("PULocationID",IntegerType(),True) \
      .add("DOLocationID",IntegerType(),True) \
      .add("payment_type",IntegerType(),True) \
      .add("fare_amount",DoubleType(),True) \
      .add("extra",DoubleType(),True) \
      .add("mta_tax",DoubleType(),True) \
      .add("tip_amount",DoubleType(),True) \
      .add("tolls_amount",DoubleType(),True) \
      .add("improvement_surcharge",DoubleType(),True) \
      .add("total_amount",DoubleType(),True)

# Read data from path into dataframe    
nyc_tlc_df = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load(adls_path)

In [6]:
# Display 5 rows
nyc_tlc_df.show(5, truncate = False)

In [8]:
# To make development easier, faster, and less expensive, downsample for now
# (roughly 0.1% of the rows, sampled with replacement)
sampled_taxi_df = nyc_tlc_df.sample(withReplacement=True, fraction=0.001, seed=1234)

In [9]:
# The charting package needs a pandas DataFrame or NumPy array, so convert the sample
sampled_taxi_pd_df = sampled_taxi_df.toPandas()

In [16]:
#Converting string to datetime
sampled_taxi_pd_df['tpep_pickup_datetime'] = pd.to_datetime(sampled_taxi_pd_df['tpep_pickup_datetime'], format='%Y-%m-%d %H:%M:%S')
sampled_taxi_pd_df['tpep_dropoff_datetime'] = pd.to_datetime(sampled_taxi_pd_df['tpep_dropoff_datetime'], format='%Y-%m-%d %H:%M:%S')

In [34]:
#Print information about dataframe schema
print(sampled_taxi_pd_df.info())

## Exploratory Data Analysis

Look at the data and evaluate its suitability for use in a model. A few basic charts focused on tip values and their relationships to other variables are enough for this.

In [19]:
# Histogram of trip counts by tip amount
ax1 = sampled_taxi_pd_df['tip_amount'].plot(kind='hist', bins=25, facecolor='lightblue')
ax1.set_title('Tip amount distribution')
ax1.set_xlabel('Tip Amount ($)')
ax1.set_ylabel('Counts')
plt.suptitle('')
plt.show()

# Distribution of tip amounts by passenger count
ax2 = sampled_taxi_pd_df.boxplot(column=['tip_amount'], by=['passenger_count'])
ax2.set_title('Tip amount by Passenger count')
ax2.set_xlabel('Passenger count') 
ax2.set_ylabel('Tip Amount ($)')
plt.suptitle('')
plt.show()

# Look at the relationship between fare and tip amounts
ax = sampled_taxi_pd_df.plot(kind='scatter', x= 'fare_amount', y = 'tip_amount', c='blue', alpha = 0.10, s=2.5*(sampled_taxi_pd_df['passenger_count']))
ax.set_title('Tip amount by Fare amount')
ax.set_xlabel('Fare Amount ($)')
ax.set_ylabel('Tip Amount ($)')
plt.axis([-2, 80, -2, 20])
plt.suptitle('')
plt.show()

## Data Prep and Featurization

It's clear from the visualizations above that the data contains a number of outliers. These need to be filtered out. In addition, some variables will not be useful in the final model and can be dropped.

Finally, some new (derived) variables are needed that will work better with the model.


In [22]:
taxi_df = sampled_taxi_df.select('total_amount', 'fare_amount', 'tip_amount', 'payment_type', 'RatecodeID', 'passenger_count'\
                                , 'trip_distance', 'tpep_pickup_datetime', 'tpep_dropoff_datetime'\
                                , date_format('tpep_pickup_datetime', 'HH').alias('pickupHour')\
                                , date_format('tpep_pickup_datetime', 'EEEE').alias('weekdayString')\
                                , (unix_timestamp(col('tpep_dropoff_datetime')) - unix_timestamp(col('tpep_pickup_datetime'))).alias('tripTimeSecs')\
                                , (when(col('tip_amount') > 0, 1).otherwise(0)).alias('tipped')
                                )\
                        .filter((sampled_taxi_df.passenger_count > 0) & (sampled_taxi_df.passenger_count < 8)\
                                & (sampled_taxi_df.tip_amount >= 0) & (sampled_taxi_df.tip_amount <= 25)\
                                & (sampled_taxi_df.fare_amount >= 1) & (sampled_taxi_df.fare_amount <= 250)\
                                & (sampled_taxi_df.tip_amount < sampled_taxi_df.fare_amount)\
                                & (sampled_taxi_df.trip_distance > 0) & (sampled_taxi_df.trip_distance <= 100)\
                                & (sampled_taxi_df.RatecodeID <= 5)
                                & (sampled_taxi_df.payment_type.isin(1, 2))
                                )
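
To make the derived columns and range checks easier to follow, here is a minimal sketch of the same logic in plain Python, applied to dictionaries instead of a Spark dataframe. The function names `derive_features` and `is_plausible` and the three sample rows are made up for illustration; the thresholds are copied from the filter above, and the timestamp layout is assumed to match the one used in the pandas conversion earlier.

```python
from datetime import datetime

# Assumed timestamp layout, matching the pandas conversion earlier in the notebook.
TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def derive_features(trip):
    """Return a copy of `trip` with tripTimeSecs and tipped added."""
    pickup = datetime.strptime(trip["tpep_pickup_datetime"], TS_FORMAT)
    dropoff = datetime.strptime(trip["tpep_dropoff_datetime"], TS_FORMAT)
    derived = dict(trip)
    derived["tripTimeSecs"] = int((dropoff - pickup).total_seconds())
    derived["tipped"] = 1 if trip["tip_amount"] > 0 else 0  # binary label
    return derived


def is_plausible(trip):
    """Apply the same range checks as the Spark filter."""
    return (
        0 < trip["passenger_count"] < 8
        and 0 <= trip["tip_amount"] <= 25
        and 1 <= trip["fare_amount"] <= 250
        and trip["tip_amount"] < trip["fare_amount"]
        and 0 < trip["trip_distance"] <= 100
        and int(trip["RatecodeID"]) <= 5
        and trip["payment_type"] in (1, 2)
    )


# Made-up rows: a tipped trip, an untipped trip, and a fare outlier.
trips = [
    {"tpep_pickup_datetime": "2018-05-01 22:15:00", "tpep_dropoff_datetime": "2018-05-01 22:40:30",
     "passenger_count": 2, "trip_distance": 5.1, "RatecodeID": "1", "payment_type": 1,
     "fare_amount": 19.5, "tip_amount": 4.0},
    {"tpep_pickup_datetime": "2018-05-02 08:05:00", "tpep_dropoff_datetime": "2018-05-02 08:17:00",
     "passenger_count": 1, "trip_distance": 1.8, "RatecodeID": "1", "payment_type": 2,
     "fare_amount": 9.0, "tip_amount": 0.0},
    {"tpep_pickup_datetime": "2018-05-02 09:00:00", "tpep_dropoff_datetime": "2018-05-02 09:10:00",
     "passenger_count": 1, "trip_distance": 2.0, "RatecodeID": "1", "payment_type": 1,
     "fare_amount": 900.0, "tip_amount": 0.0},
]

kept = [derive_features(t) for t in trips if is_plausible(t)]
for row in kept:
    print(row["tripTimeSecs"], row["tipped"])
```

The third row is dropped because its fare exceeds the upper bound of 250.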

## Data Prep and Featurization Part 2

Having created the new variables, it's now possible to drop the columns they were derived from, so that the dataframe that goes into the model contains only the variables it needs.

Also create one more feature, a time-of-day traffic bin, from the pickup hour derived in the first round.


In [24]:
taxi_featurised_df = taxi_df.select('total_amount', 'fare_amount', 'tip_amount', 'payment_type', 'passenger_count'\
                                                , 'trip_distance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
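
The binning rule in the `when` chain can be read as a small function of the pickup hour. The following is a sketch only, assuming a hypothetical function name `traffic_time_bin` and an hour given as an integer on a 24-hour clock (0 to 23), which is what the `>= 20` threshold implies:

```python
def traffic_time_bin(pickup_hour):
    """Map an hour of day (0-23) to the traffic bins used in the notebook."""
    if not 0 <= pickup_hour <= 23:
        raise ValueError("pickup_hour must be between 0 and 23")
    if pickup_hour <= 6 or pickup_hour >= 20:
        return "Night"
    if pickup_hour <= 10:
        return "AMRush"      # 7 to 10
    if pickup_hour <= 15:
        return "Afternoon"   # 11 to 15
    return "PMRush"          # 16 to 19


# Count how many hours of the day fall into each bin.
hours_per_bin = {}
for hour in range(24):
    label = traffic_time_bin(hour)
    hours_per_bin[label] = hours_per_bin.get(label, 0) + 1
print(hours_per_bin)
```

Because the four conditions cover every hour from 0 to 23, the fallback branch of the Spark expression is never reached for valid hours.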

## Encoding

Different ML algorithms support different types of input. This example uses Logistic Regression for binary classification, which means that any categorical (string) variables must be converted to numbers.

A simple "map"-style function that assigns each category a number is not enough, because the numeric order it implies between categories can bias the resulting model. The approach used here is to index the variable and then encode it using a standard technique called One Hot Encoding.

This approach requires the encoder to "learn" (fit) a model over the data in the Spark instance and then transform the data based on what was learnt.


In [26]:
# The algorithm used in this sample only works with numeric features, so convert the categorical columns first
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new dataframe that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
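
To illustrate the fit-then-transform idea behind indexing and one-hot encoding, here is a minimal plain-Python sketch. It is not Spark's implementation: the function names are hypothetical, the input list is made up, and the ordering (most frequent label first, ties broken alphabetically) is an assumption modelled on the indexer's default frequency-based ordering. The vectors keep one slot per category, as with `dropLast=False`.

```python
from collections import Counter


def fit_string_indexer(values):
    """Learn a label -> index mapping, most frequent label first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def one_hot(index, size):
    """Return a dense one-hot vector with a 1.0 at position `index`."""
    vector = [0.0] * size
    vector[index] = 1.0
    return vector


# Made-up column of traffic bins.
bins = ["Night", "AMRush", "Night", "PMRush", "Night", "AMRush"]

index_of = fit_string_indexer(bins)  # "fit": learn the mapping from the data
encoded = [one_hot(index_of[b], len(index_of)) for b in bins]  # "transform"

print(index_of)
print(encoded[0], encoded[1], encoded[3])
```

Each category gets its own position in the vector, so no category is numerically "larger" than another.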

## Generation of Testing and Training Data Sets
A simple split: 70% of the data for training and 30% for testing the model. Changing this ratio may result in a different model.


In [28]:
# Decide on the split between training and testing data from the dataframe 
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the dataframe into test and training dataframes
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
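
A seeded random split can be sketched in plain Python as follows. This is an illustration under simplifying assumptions rather than Spark's algorithm: the hypothetical `random_split` function assigns each row independently, so the resulting sizes are only approximately 70/30, and the fixed seed makes the split reproducible.

```python
import random


def random_split(rows, training_fraction, seed):
    """Assign each row to train or test with a per-row random draw."""
    rng = random.Random(seed)  # private generator, so the split is reproducible
    train, test = [], []
    for row in rows:
        if rng.random() < training_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test


rows = list(range(1000))  # stand-in for dataframe rows
train_rows, test_rows = random_split(rows, 0.7, seed=1234)
print(len(train_rows), len(test_rows))
```

Running the function again with the same seed returns the same two lists, which is what makes experiments with different ratios comparable.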

## Train the Model

Train the Logistic Regression model and then evaluate it using Area under ROC as the metric.

In [32]:
## Create a new LR object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passenger_count + tripTimeSecs + trip_distance + fare_amount + payment_type+ trafficTimeBinsVec")

## Undertake training and create an LR model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Saving the model is optional, but it's another form of inter-session cache
## (the timestamp uses only portable strftime directives)
datestamp = datetime.now().strftime('%m-%d-%Y-%H%M%S')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)

## Predict tip 1/0 (yes/no) on the test dataset, evaluation using AUROC
predictions = lrModel.transform(test_data_df)
## BinaryClassificationMetrics expects (score, label) pairs, in that order
predictionAndLabels = predictions.select("prediction", "label").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
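
The cell above scores the model with hard 0/1 predictions. As a sketch of what Area under ROC measures, the function below uses the pairwise-ranking definition: the fraction of (positive, negative) pairs in which the positive example receives the higher score, with ties counted as half. The function name and the six example values are made up, and this is not how Spark computes the metric internally.

```python
def area_under_roc(scores, labels):
    """AUROC as the probability that a positive outranks a negative."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # a tie counts as half a win
    return wins / (len(positives) * len(negatives))


# Made-up labels and predicted probabilities of a tip.
labels = [1, 1, 1, 0, 0, 0]
probabilities = [0.9, 0.8, 0.4, 0.6, 0.3, 0.2]
hard_predictions = [1.0 if p >= 0.5 else 0.0 for p in probabilities]

auc_from_probabilities = area_under_roc(probabilities, labels)
auc_from_hard_predictions = area_under_roc(hard_predictions, labels)
print(auc_from_probabilities, auc_from_hard_predictions)
```

In this example, thresholding the probabilities into 0/1 predictions discards ranking information and lowers the score, which is one reason AUROC is usually computed from probabilities or raw scores.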

## Evaluate and Visualize

Plot the ROC curve itself to develop a better understanding of the model.


In [33]:
## Plot the ROC curve; no need for pandas, as this uses the model's training summary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
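
For readers who want to see where the FPR and TPR values on such a curve come from, here is a minimal plain-Python sketch. The function name `roc_points` and the example scores are made up; each distinct score is used as a threshold, and rows scoring at or above it are treated as predicted positives.

```python
def roc_points(scores, labels):
    """Return (FPR, TPR) points, starting at (0, 0), one per distinct threshold."""
    total_pos = sum(1 for y in labels if y == 1)
    total_neg = len(labels) - total_pos
    if total_pos == 0 or total_neg == 0:
        raise ValueError("need at least one positive and one negative label")
    points = [(0.0, 0.0)]
    for threshold in sorted(set(scores), reverse=True):
        true_pos = sum(1 for s, y in zip(scores, labels) if s >= threshold and y == 1)
        false_pos = sum(1 for s, y in zip(scores, labels) if s >= threshold and y == 0)
        points.append((false_pos / total_neg, true_pos / total_pos))
    return points


# Made-up labels and predicted probabilities of a tip.
labels = [1, 1, 1, 0, 0, 0]
probabilities = [0.9, 0.8, 0.4, 0.6, 0.3, 0.2]

curve = roc_points(probabilities, labels)
for fpr, tpr in curve:
    print(round(fpr, 3), round(tpr, 3))
```

Lowering the threshold moves the curve from (0, 0) towards (1, 1); a model that ranks tipped trips well rises quickly towards the top-left corner.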