In [1]:
import pyspark
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

Initialize the Spark session.

In [2]:
# Executor settings must be supplied on the builder, before the session (and its
# SparkContext) starts: Spark does not support modifying configuration at runtime,
# so setting it afterwards through the private sparkContext._conf is unreliable.
# NOTE: if a session already exists in this kernel, getOrCreate() returns it and
# this static setting will not be applied.
spark = (pyspark.sql.SparkSession.builder
         .appName('2018_Yellow_Taxi_Trip_Data')
         .config('spark.executor.cores', '4')
         .getOrCreate())

Read the 2018 Yellow Taxi trip data (a CSV file with a header row) from HDFS.

In [3]:
# Load the trip records from HDFS. The first row holds the column names, and Spark
# infers each column's type from the data (an extra pass over the file).
csv_path = 'hdfs://cluster-ebb0-m/hadoop/2018_Yellow_Taxi_Trip_Data.csv'
df = (spark.read
      .option('header', True)
      .option('inferSchema', True)
      .csv(csv_path))

In [4]:
# Column names of the loaded DataFrame, in the order they appear in the CSV header.
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount']

View the data schema.

In [5]:
# (column name, Spark SQL type) pairs as inferred on load; note the pickup/dropoff
# timestamps come back as plain strings.
df.dtypes

[('VendorID', 'int'),
 ('tpep_pickup_datetime', 'string'),
 ('tpep_dropoff_datetime', 'string'),
 ('passenger_count', 'int'),
 ('trip_distance', 'double'),
 ('RatecodeID', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('PULocationID', 'int'),
 ('DOLocationID', 'int'),
 ('payment_type', 'int'),
 ('fare_amount', 'double'),
 ('extra', 'double'),
 ('mta_tax', 'double'),
 ('tip_amount', 'double'),
 ('tolls_amount', 'double'),
 ('improvement_surcharge', 'double'),
 ('total_amount', 'double')]

View the first 5 observations, transposed so that each column is shown as a row.

In [5]:
# Pull 5 rows to the driver and display them transposed, one row per column.
first_rows = df.head(5)
pd.DataFrame(first_rows, columns=df.columns).T

Unnamed: 0,0,1,2,3,4
VendorID,1,2,2,1,1
tpep_pickup_datetime,09/04/2018 08:52:14 PM,09/04/2018 08:33:03 PM,09/04/2018 08:54:45 PM,09/04/2018 08:57:24 PM,09/04/2018 08:30:56 PM
tpep_dropoff_datetime,09/04/2018 08:59:39 PM,09/04/2018 08:40:55 PM,09/04/2018 09:04:58 PM,09/04/2018 09:06:24 PM,09/04/2018 08:41:18 PM
passenger_count,1,1,1,1,1
trip_distance,1.2,1.06,1.91,1.3,1.2
RatecodeID,1,1,1,1,1
store_and_fwd_flag,N,N,N,N,N
PULocationID,142,113,211,161,239
DOLocationID,48,90,234,234,238
payment_type,1,1,2,1,1


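Compute summary statistics (count, mean, standard deviation, min, max) for each feature. For string columns such as the pickup/dropoff timestamps, min and max are compared lexicographically, so they are not the chronologically earliest/latest trips.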

In [None]:
# count / mean / stddev / min / max for every column. For string columns min and max
# are lexicographic, so the timestamp extremes are string orderings, not dates.
summary_stats = df.describe()
summary_stats.toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
VendorID,112234626,1.5897626281571964,0.5179106148171566,1,4
tpep_pickup_datetime,112234626,,,01/01/2001 10:55:43 PM,12/31/2018 12:59:59 PM
tpep_dropoff_datetime,112234626,,,01/01/2001 04:53:11 PM,12/31/2018 12:59:59 PM
passenger_count,112234626,1.5955153091524534,1.241768530226261,0,192
trip_distance,112234626,2.9294395068416135,18.337394520895376,0.0,189483.84
RatecodeID,112234626,1.0494463892096901,0.7380673063734534,1,99
store_and_fwd_flag,112234626,,,N,Y
PULocationID,112234626,163.17901635810682,66.5401807546528,1,265
DOLocationID,112234626,161.40208369385041,70.42425630974124,1,265


Count the number of distinct values of each feature.

In [6]:
# One Spark job per column: each distinct().count() scans the full dataset.
distinct_counts = []
for column in df.columns:
    distinct_counts.append((column, df.select(column).distinct().count()))
distinct_counts

[('VendorID', 3),
 ('tpep_pickup_datetime', 265019),
 ('tpep_dropoff_datetime', 265987),
 ('passenger_count', 10),
 ('trip_distance', 3156),
 ('RatecodeID', 7),
 ('store_and_fwd_flag', 2),
 ('PULocationID', 250),
 ('DOLocationID', 259),
 ('payment_type', 4),
 ('fare_amount', 1566),
 ('extra', 18),
 ('mta_tax', 9),
 ('tip_amount', 2094),
 ('tolls_amount', 330),
 ('improvement_surcharge', 3),
 ('total_amount', 6329)]

Define the target variable and compute the Pearson correlation between it and every other numerical feature.

In [7]:
# Label column that both models learn to predict.
target = "total_amount"

In [8]:
# Pearson correlation between the target and every other non-string column.
# The message is built from `target` (not a hard-coded name) so it stays correct if
# the target changes. Each df.stat.corr call is a separate Spark job over the data.
for feature, feature_type in df.dtypes:
    if feature != target and feature_type != 'string':
        print('Correlation between ' + target + ' and ' + feature + ': ' + str(df.stat.corr(target, feature)))

Correlation between total_amount and VendorID: 0.0197917706212
Correlation between total_amount and passenger_count: 0.00924588181119
Correlation between total_amount and trip_distance: 0.88137909097
Correlation between total_amount and RatecodeID: 0.307053895976
Correlation between total_amount and PULocationID: -0.0755780016015
Correlation between total_amount and DOLocationID: -0.0823406136942
Correlation between total_amount and payment_type: -0.127486456994
Correlation between total_amount and fare_amount: 0.979547614633
Correlation between total_amount and extra: 0.118150936323
Correlation between total_amount and mta_tax: -0.0820888468893
Correlation between total_amount and tip_amount: 0.663182857453
Correlation between total_amount and tolls_amount: 0.613979162677
Correlation between total_amount and improvement_surcharge: 0.0428946700627


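Define the list of features for the models. Note: several of these columns (fare, extra, MTA tax, tip, tolls, improvement surcharge) appear to be components of `total_amount`. That would explain the near-perfect linear-regression scores below, so treat those scores with caution.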

In [9]:
# Inputs shared by both models, in the order the VectorAssembler packs them
# (coefficients and importances are reported in this same order).
# NOTE(review): fare_amount, extra, mta_tax, tip_amount, tolls_amount and
# improvement_surcharge look like components of the target total_amount; if so,
# the near-perfect scores reflect target leakage rather than predictive skill.
model_features = [
    'VendorID',
    'passenger_count',
    'trip_distance',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'fare_amount',
    'RatecodeID',
    'payment_type',
]

Assemble the model features into a single vector column and apply the assembler to the data.

In [10]:
# Pack the model inputs into one vector column, then keep only that vector and the
# label, which is all the estimators need.
assembler = VectorAssembler(outputCol='features', inputCols=model_features)
assembled = assembler.transform(df)
df_assembler = assembled.select('features', target)

Split the data into train (80%) and test (20%) sets using a fixed seed.

In [11]:
# 80/20 random split; the fixed seed makes it repeatable for the same input data
# and partitioning.
train, test = df_assembler.randomSplit(weights=[0.8, 0.2], seed=1)

Define the evaluation metrics: R² and RMSE.

In [12]:
# Both evaluators compare each model's 'prediction' column with the label column.
evaluator_args = dict(predictionCol='prediction', labelCol=target)
r2 = RegressionEvaluator(metricName='r2', **evaluator_args)
rmse = RegressionEvaluator(metricName='rmse', **evaluator_args)

#### Linear Regression

Train a linear regression model on the training data.

In [13]:
# Elastic-net regularized linear regression (regParam=0.3, elasticNetParam=0.8).
# The unfitted estimator and the fitted model get distinct names.
lr = LinearRegression(featuresCol='features', labelCol=target,
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)
model_lr = lr.fit(train)

Print the model's coefficient for each feature, followed by the intercept.

In [14]:
# Coefficients come back in the assembler's input order, so pair them with
# model_features (the previous name `features` is undefined on a fresh kernel).
for feature, coefficient in zip(model_features, model_lr.coefficients):
    print('Coefficient for '+feature+' :'+str(coefficient))
print('Intercept: '+str(model_lr.intercept))

Coefficient for VendorID :0.0
Coefficient for passenger_count :0.0
Coefficient for trip_distance :0.07191029728547152
Coefficient for extra :0.574443964363591
Coefficient for mta_tax :0.0
Coefficient for tip_amount :0.9212248613586805
Coefficient for tolls_amount :0.8898254289302157
Coefficient for improvement_surcharge :0.0
Coefficient for fare_amount :0.9731481648205144
Coefficient for RatecodeID :0.0
Coefficient for payment_type :0.0
Intercept: 1.30884065482


Compute the evaluation metrics on the train and test data.

In [15]:
# Score the linear model on both splits; report R2 first, then RMSE, train before test.
train_prediction_lr = model_lr.transform(train)
test_prediction_lr = model_lr.transform(test)
for metric_name, evaluator in (('R2', r2), ('RMSE', rmse)):
    for split_name, predictions in (('train', train_prediction_lr), ('test', test_prediction_lr)):
        print(metric_name + ' on ' + split_name + ' data: ' + str(evaluator.evaluate(predictions)))

R2 on train data: 0.998986136841
R2 on test data: 0.998960124184
RMSE on train data: 0.462498364762
RMSE on test data: 0.459752030492


Save the model to HDFS.

In [16]:
# overwrite() lets the notebook be re-run end to end; a plain save() fails when the
# target path already exists.
model_lr.write().overwrite().save('hdfs://cluster-ebb0-m/hadoop/model_lr.model')

#### GBT Regressor

Train a gradient-boosted trees (GBT) regressor on the training data.

In [17]:
# Gradient-boosted trees on the same assembled features, capped at 20 iterations.
# The unfitted estimator and the fitted model get distinct names.
gbt = GBTRegressor(featuresCol='features', labelCol=target, maxIter=20)
model_gbt = gbt.fit(train)

Print the model's importance for each feature.

In [18]:
# Importances follow the assembler's input order, so pair them with model_features
# (the previous name `features` is undefined on a fresh kernel).
for feature, importance in zip(model_features, model_gbt.featureImportances):
    print('Importance for '+feature+' :'+str(importance))

Importance for VendorID :0.0160317309196147
Importance for passenger_count :0.14219269723412462
Importance for trip_distance :0.08401662463745625
Importance for extra :0.025521871273252866
Importance for mta_tax :0.03581274772313998
Importance for tip_amount :0.09563840355267972
Importance for tolls_amount :0.1165711884478701
Importance for improvement_surcharge :0.030422229486347974
Importance for fare_amount :0.2918222044596368
Importance for RatecodeID :0.12675943941113266
Importance for payment_type :0.03521086285474421


Compute the evaluation metrics on the train and test data.

In [19]:
# Score the GBT model on both splits; report R2 first, then RMSE, train before test.
train_prediction_gbt = model_gbt.transform(train)
test_prediction_gbt = model_gbt.transform(test)
for metric_name, evaluator in (('R2', r2), ('RMSE', rmse)):
    for split_name, predictions in (('train', train_prediction_gbt), ('test', test_prediction_gbt)):
        print(metric_name + ' on ' + split_name + ' data: ' + str(evaluator.evaluate(predictions)))

R2 on train data: 0.940314848766
R2 on test data: 0.938992851618
RMSE on train data: 3.54857247698
RMSE on test data: 3.5214642172


Save the model to HDFS.

In [20]:
# overwrite() lets the notebook be re-run end to end; a plain save() fails when the
# target path already exists.
model_gbt.write().overwrite().save('hdfs://cluster-ebb0-m/hadoop/model_gbt.model')

Stop the Spark session.

In [21]:
# Shut down this notebook's Spark session.
spark.stop()