# Exploration and Modeling of Sampled 2013 NYC Taxi Trip and Fare Dataset in Spark 2.0

## Introduction

This notebook shows the basic data science steps for Spark. It demos features of Spark's MLlib toolkit using the NYC taxi trip and fare data-set from 2013. We take a 0.1% sample of this data-set (about 170K rows, 35 Mb) to to show MLlib's modeling features for binary classification.

Many thanks to Debraj GuhaThakurta who developed significant portions of this content. A longer version of this notebook is available on the Linux DSVM. Log in to JupyterHub, then navigate to SparkML -> pySpark -> pySpark modeling.ipynb.

A similar tutorial is available for HDInsight on the [Cortana Intelligence and Machine Learning blog](https://blogs.technet.microsoft.com/machinelearning/2017/03/22/end-to-end-data-science-walkthrough-with-spark-2-0-on-azure-hdinsight-hadoop-clusters/).

## Overview

This notebook shows data ingestion, exploration and plotting, data preparation (featurizing/transformation), modeling, prediction, model persistance and model evaluation on an independent validation data-set. Machine learning tasks are performed using Spark's MLlib functions. For plotting purposes, the Spark dataframes are converted to pandas dataframes so matplotlib functions can be used. This is because there are no good Spark libraries for creating plots from Spark dataframes.

We address a single regression problem: predicting the amount of tip paid for taxi trips.

In [None]:
from IPython.display import Image
Image("https://dsvmassets.blob.core.windows.net/images/spark_process.png")

In [None]:
# Location of training data. This could also be in Azure blob storage or ADLS
taxi_train_file_loc = "../Data/JoinedTaxiTripFare.Point1Pct.Train.csv"
taxi_valid_file_loc = "../Data/JoinedTaxiTripFare.Point1Pct.Valid.csv"

# you could also use data from blob storage
# taxi_train_file_loc = "wasb://nyctaxi@dsvmdemo.blob.core.windows.net/JoinedTaxiTripFare.Point1Pct.Train.csv"
# taxi_valid_file_loc = "wasb://nyctaxi@dsvmdemo.blob.core.windows.net/JoinedTaxiTripFare.Point1Pct.Valid.csv"

# 2. Set model storage directory path. This is where models will be saved.
modelDir = "../Outputs/" # The last backslash is needed

In [None]:
# import necessary libraries
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer, RFormula
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import matplotlib.pyplot as plt
import numpy as np
import datetime
import atexit
from sklearn.metrics import roc_curve,auc

## Data ingestion

Read in joined 0.1% taxi trip and fare file (as csv), format and clean data, and create data frame.

In [None]:
## Read in CSV data as a Spark dataframe
taxi_train_df = spark.read.csv(path=taxi_train_file_loc, header=True, inferSchema=True)

## Drop some unnecessary columns
taxi_df_train_cleaned = taxi_train_df.drop('medallion').drop('hack_license')
    
## filter out undesirable values and outliers
taxi_df_train_cleaned = taxi_df_train_cleaned.filter("passenger_count > 0 and passenger_count < 8")

## Register the dataframe as a temp table in the SQL context
taxi_df_train_cleaned.createOrReplaceTempView("taxi_train")

taxi_df_train_cleaned.printSchema()

## Data exploration & visualization

#### Plot histogram of tip amount, relationship between tip amount vs. other features

For plotting, the dataframe will first have to be converted to a pandas dataframe so matplotlib can use it for generating plots. Here, if the Spark dataframe is large, it can be down-sampled (using the "sample" function). In the example below, 50% of data was sampled.

In [None]:
sqlStatement = "SELECT fare_amount, passenger_count, tip_amount, tipped FROM taxi_train"
sqlResultsPD = spark.sql(sqlStatement).sample(False, 0.5, seed=1234).toPandas()

%matplotlib inline

## tip by payment type and passenger count
ax1 = sqlResultsPD[['tip_amount']].plot(kind='hist', bins=25, facecolor='lightblue')
ax1.set_title('Tip amount distribution')
ax1.set_xlabel('Tip Amount ($)'); ax1.set_ylabel('Counts')
plt.figure(figsize=(4,4)); plt.suptitle(''); plt.show()

## tip amount by fare amount. Points are scaled by passenger count
ax = sqlResultsPD.plot(kind='scatter', x= 'fare_amount', y = 'tip_amount', c='blue', alpha = 0.10, s=2.5*(sqlResultsPD.passenger_count))
ax.set_title('Tip amount by Fare amount')
ax.set_xlabel('Fare Amount ($)'); ax.set_ylabel('Tip Amount ($)')
plt.axis([-2, 80, -2, 20])
plt.figure(figsize=(4,4)); plt.suptitle(''); plt.show()

We leave it up to you to figure out what the vertical line of dots is near the $50 fare amount. Spark SQL makes this straightforward.

## Feature engineering, transformation and data prep for modeling

#### Create a new feature by binning hours into traffic time buckets using Spark SQL

Spark SQL can be a very convenient way to perform pre-modeling steps, including data transformation, clean-up etc.

In [None]:
## create four buckets for pickup times
sqlStatement = """ SELECT *, CASE
     WHEN (pickup_hour <= 6 OR pickup_hour >= 20) THEN "Night" 
     WHEN (pickup_hour >= 7 AND pickup_hour <= 10) THEN "AMRush" 
     WHEN (pickup_hour >= 11 AND pickup_hour <= 15) THEN "Afternoon"
     WHEN (pickup_hour >= 16 AND pickup_hour <= 19) THEN "PMRush"
    END as TrafficTimeBins
    FROM taxi_train 
"""
taxi_df_train_with_newFeatures = spark.sql(sqlStatement)

This next cell is necessary to render tables correctly.

In [None]:
%%html
<style>
  table {margin-left: 0 !important;}
</style>

#### Indexing and one-hot encoding of categorical features

Here we only transform a few variables to an example of how to transform strings to one-hot encoding. Other variables, such as weekday, which are represented by numerical values, can also be indexed as categorical variables.

For indexing, we used StringIndexer, and for one-hot encoding, we used OneHotEncoder functions from MLlib. StringIndexer converts text values to index values. For example, assume our data is

| id | string-value  
|----|---------------
|  0 | a             
|  1 | b             
|  2 | b             
|  3 | b             
|  4 | a             

StringIndexer converts this to

| id | string-value  | index-value |
|----|---------------|-------------|
|  0 | a             | 1           |
|  1 | b             | 0           |
|  2 | b             | 1           |
|  3 | b             | 1           |
|  4 | a             | 0           |

In [None]:
## define the transformations that need to be applied to some of the features
sI1 = StringIndexer(inputCol="vendor_id", outputCol="vendorIndex"); en1 = OneHotEncoder(dropLast=False, inputCol="vendorIndex", outputCol="vendorVec");
sI2 = StringIndexer(inputCol="rate_code", outputCol="rateIndex"); en2 = OneHotEncoder(dropLast=False, inputCol="rateIndex", outputCol="rateVec");
sI3 = StringIndexer(inputCol="payment_type", outputCol="paymentIndex"); en3 = OneHotEncoder(dropLast=False, inputCol="paymentIndex", outputCol="paymentVec");
sI4 = StringIndexer(inputCol="TrafficTimeBins", outputCol="TrafficTimeBinsIndex"); en4 = OneHotEncoder(dropLast=False, inputCol="TrafficTimeBinsIndex", outputCol="TrafficTimeBinsVec");

## apply the transformations
encodedFinal = Pipeline(stages=[sI1, en1, sI2, en2, sI3, en3, sI4, en4]).fit(taxi_df_train_with_newFeatures).transform(taxi_df_train_with_newFeatures)

#### Create a random sampling of the data, as needed (50% is used here). This can save time while training models. Then, split into train/test.

In [None]:
trainingFraction = 0.5
seed = 1234;
encodedFinalSampled = encodedFinal.sample(False, 0.5, seed=seed)

## split sampled dataframe into train and test
trainData, testData = encodedFinalSampled.randomSplit([trainingFraction, 1.0 - trainingFraction], seed=seed);

## cache the dataframes in memory
trainData.cache(); trainData.count();
testData.cache(); testData.count();

## Regression model training: Predicting amount of tip paid for taxi trips

For modeling, the features and class labels are specified using the convenient RFormula function

In [None]:
## Define regression formula
regFormula = RFormula(formula="tip_amount ~ paymentIndex + vendorIndex + rateIndex + TrafficTimeBinsIndex + pickup_hour + weekday + passenger_count + trip_time_in_secs + trip_distance + fare_amount")

## Define indexer for categorical variables
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=32)

## Random forest estimator
randForest = RandomForestRegressor(featuresCol = 'indexedFeatures', labelCol = 'label', numTrees=20, 
                                   featureSubsetStrategy="auto",impurity='variance', maxDepth=6, maxBins=100)

## Fit model, with formula and other transformations
model = Pipeline(stages=[regFormula, featureIndexer, randForest]).fit(trainData)

## PREDICT ON TEST DATA AND EVALUATE
predictions = model.transform(testData)
predictionAndLabels = predictions.select("label","prediction").rdd
testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

## PLOC ACTUALS VS. PREDICTIONS
predictionsPD = predictions.select("label","prediction").toPandas()

ax = predictionsPD.plot(kind='scatter', figsize = (5,5), x='label', y='prediction', color='blue', alpha = 0.15, label='Actual vs. predicted');
fit = np.polyfit(predictionsPD['label'], predictionsPD['prediction'], deg=1)
ax.set_title('Actual vs. Predicted Tip Amounts ($)')
ax.set_xlabel("Actual"); ax.set_ylabel("Predicted");
ax.plot(predictionsPD['label'], fit[0] * predictionsPD['label'] + fit[1], color='magenta')
plt.axis([-1, 15, -1, 15])
plt.show(ax)

## Save the model, then load it and evaluate test data

In [None]:
## SAVE MODEL
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
fileName = "RandomForestRegressionModel_" + datestamp;
randForestDirfilename = modelDir + fileName;
model.save(randForestDirfilename)

# Load the model to predict on the test data. You probably wouldn't 
savedModel = PipelineModel.load(randForestDirfilename)

## PREDICT ON TEST DATA AND EVALUATE
predictions = savedModel.transform(testData)
predictionAndLabels = predictions.select("label","prediction").rdd
testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

## Hyper-parameter tuning: Train a random forest model using hyper-parameter tuning and cross-validation

Notice that as expected, the parameter tuning and cross-validation improves the model performance (R-sqr) significantly on test data.

In [None]:
## DEFINE RANDOM FOREST MODELS
randForest = RandomForestRegressor(featuresCol = 'indexedFeatures', labelCol = 'label', 
                                   featureSubsetStrategy="auto",impurity='variance', maxBins=100)

## DEFINE MODELING PIPELINE, INCLUDING FORMULA, FEATURE TRANSFORMATIONS, AND ESTIMATOR
pipeline = Pipeline(stages=[regFormula, featureIndexer, randForest])

## DEFINE PARAMETER GRID FOR RANDOM FOREST
paramGrid = ParamGridBuilder() \
    .addGrid(randForest.numTrees, [10, 25, 50]) \
    .addGrid(randForest.maxDepth, [3, 5, 7]) \
    .build()

## DEFINE CROSS VALIDATION
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName="rmse"),
                          numFolds=3)

## TRAIN MODEL USING CV
cvModel = crossval.fit(trainData)

## PREDICT AND EVALUATE TEST DATA SET
predictions = cvModel.transform(testData)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R-squared on test data = %g" % r2)

## Load independent validation data-set and evaluate a model

The validation data-set needs to be transformed in the same way as the training data in order to score it correctly.

In [None]:
## READ IN DATA FRAME FROM CSV
taxi_valid_df = spark.read.csv(path=taxi_valid_file_loc, header=True, inferSchema=True)

## CREATE A CLEANED DATA-FRAME BY DROPPING SOME UN-NECESSARY COLUMNS & FILTERING FOR UNDESIRED VALUES OR OUTLIERS
taxi_df_valid_cleaned = taxi_valid_df.drop('medallion').drop('hack_license').drop('store_and_fwd_flag').drop('pickup_datetime')\
    .drop('dropoff_datetime').drop('pickup_longitude').drop('pickup_latitude').drop('dropoff_latitude')\
    .drop('dropoff_longitude').drop('tip_class').drop('total_amount').drop('tolls_amount').drop('mta_tax')\
    .drop('direct_distance').drop('surcharge')\
    .filter("passenger_count > 0 and passenger_count < 8 AND payment_type in ('CSH', 'CRD') \
        AND tip_amount >= 0 AND tip_amount < 30 AND fare_amount >= 1 AND fare_amount < 200 \
        AND trip_distance > 0 AND trip_distance < 100 AND trip_time_in_secs > 30 AND trip_time_in_secs < 7200")

## REGISTER DATA-FRAME AS A TEMP-TABLE IN SQL-CONTEXT
taxi_df_valid_cleaned.createOrReplaceTempView("taxi_valid")

### CREATE FOUR BUCKETS FOR TRAFFIC TIMES
sqlStatement = """ SELECT *, CASE
     WHEN (pickup_hour <= 6 OR pickup_hour >= 20) THEN "Night" 
     WHEN (pickup_hour >= 7 AND pickup_hour <= 10) THEN "AMRush" 
     WHEN (pickup_hour >= 11 AND pickup_hour <= 15) THEN "Afternoon"
     WHEN (pickup_hour >= 16 AND pickup_hour <= 19) THEN "PMRush"
    END as TrafficTimeBins
    FROM taxi_valid
"""
taxi_df_valid_with_newFeatures = spark.sql(sqlStatement)

## APPLY THE SAME TRANSFORATION ON THIS DATA AS ORIGINAL TRAINING DATA
encodedFinalValid = Pipeline(stages=[sI1, en1, sI2, en2, sI3, en3, sI4, en4]).fit(taxi_df_train_with_newFeatures).transform(taxi_df_valid_with_newFeatures)

# predict using the cross-validation model
predictions = cvModel.bestModel.transform(encodedFinalValid)
r2 = evaluator.evaluate(predictions)
print("R-squared on test data = %g" % r2)