# NYC Taxi Fare Prediction (Using the Difference Between Pickup and Dropoff Points)

The training dataset contains the pickup and dropoff locations of taxi rides, and the goal is to predict the fare amount of each ride.

In [1]:
%%init_spark
launcher.master="yarn"
launcher.num_executors=6
launcher.executor_cores=2
launcher.executor_memory='5000m'

# Data Exploration

In [2]:
val training_data=spark.read.option("header","true").option("inferSchema", "true").csv("/hadoop-user/data/train.csv")


Intitializing Scala interpreter ...

Spark Web UI available at http://C570BD-HM-Master:8088/proxy/application_1544119661487_0002
SparkContext available as 'sc' (version = 2.3.2, master = yarn, app id = application_1544119661487_0002)
SparkSession available as 'spark'


2018-12-06 12:43:27 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-12-06 12:43:30 WARN  Client:66 - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


training_data: org.apache.spark.sql.DataFrame = [key: timestamp, fare_amount: double ... 6 more fields]


# Feature Engineering

We first inspect the attributes in the dataset and their data types, then derive new columns from the existing ones. Because the dataset is large (5.7 GB, about 55 million records), we then downsample it.

In [3]:
training_data.show(5)

+-------------------+-----------+--------------------+----------------+---------------+-----------------+----------------+---------------+
|                key|fare_amount|     pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+-------------------+-----------+--------------------+----------------+---------------+-----------------+----------------+---------------+
|2009-06-15 17:26:21|        4.5|2009-06-15 17:26:...|      -73.844311|      40.721319|        -73.84161|       40.712278|              1|
|2010-01-05 16:52:16|       16.9|2010-01-05 16:52:...|      -74.016048|      40.711303|       -73.979268|       40.782004|              1|
|2011-08-18 00:35:00|        5.7|2011-08-18 00:35:...|      -73.982738|       40.76127|       -73.991242|       40.750562|              2|
|2012-04-21 04:30:42|        7.7|2012-04-21 04:30:...|       -73.98713|      40.733143|       -73.991567|       40.758092|              1|
|2010-03-09 07:51:00|      

In [4]:
// Print the data type of each attribute
training_data.printSchema()

root
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



Now we calculate the difference between the pickup and dropoff longitude, and between the pickup and dropoff latitude, and add the results as two new columns.

In [5]:
val train1=training_data.withColumn("diff_long",expr("dropoff_longitude - pickup_longitude")).
withColumn("diff_lat",expr("dropoff_latitude - pickup_latitude"))

train1: org.apache.spark.sql.DataFrame = [key: timestamp, fare_amount: double ... 8 more fields]


In [6]:
// Take the absolute value of each difference
val train2=train1.withColumn("diff_long",abs(col("diff_long"))).
withColumn("diff_lat",abs(col("diff_lat")))

train2: org.apache.spark.sql.DataFrame = [key: timestamp, fare_amount: double ... 8 more fields]
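
As a quick sanity check on the two derived columns, the same arithmetic can be reproduced without Spark. The sketch below uses the coordinates of the first row printed by `training_data.show(5)`; the variable names are chosen for this illustration only. Because the absolute value is taken, the direction of travel is discarded and only the size of the displacement along each axis remains.

```scala
// Coordinates of the first row shown by training_data.show(5)
val pickupLongitude = -73.844311
val pickupLatitude = 40.721319
val dropoffLongitude = -73.84161
val dropoffLatitude = 40.712278

// Same arithmetic as the diff_long and diff_lat columns:
// absolute difference between dropoff and pickup along each axis
val diffLong = math.abs(dropoffLongitude - pickupLongitude)
val diffLat = math.abs(dropoffLatitude - pickupLatitude)

println(s"diff_long = $diffLong, diff_lat = $diffLat")
```

Up to floating-point noise in the last digits, these values should agree with the `diff_long` and `diff_lat` entries for that row.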


In [7]:
train2.printSchema()
train2.count()

root
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- diff_long: double (nullable = true)
 |-- diff_lat: double (nullable = true)



res2: Long = 55423856


In [8]:
train2.show(5)

+-------------------+-----------+--------------------+----------------+---------------+-----------------+----------------+---------------+--------------------+--------------------+
|                key|fare_amount|     pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|           diff_long|            diff_lat|
+-------------------+-----------+--------------------+----------------+---------------+-----------------+----------------+---------------+--------------------+--------------------+
|2009-06-15 17:26:21|        4.5|2009-06-15 17:26:...|      -73.844311|      40.721319|        -73.84161|       40.712278|              1|0.002701000000001841|0.009041000000003407|
|2010-01-05 16:52:16|       16.9|2010-01-05 16:52:...|      -74.016048|      40.711303|       -73.979268|       40.782004|              1| 0.03677999999999315| 0.07070099999999968|
|2011-08-18 00:35:00|        5.7|2011-08-18 00:35:...|      -73.982738|       40.76127|       -

In [9]:
// Drop rows that contain null values
val train3=train2.na.drop()
train3.count()

train3: org.apache.spark.sql.DataFrame = [key: timestamp, fare_amount: double ... 8 more fields]
res4: Long = 55423480


In [10]:
// Drop rows whose coordinate difference is 5 degrees or more (treated as outliers) and rows with a non-positive fare
val tr4=train3.filter($"diff_long" < 5).filter($"diff_lat" < 5).filter($"fare_amount" > 0).toDF()
val tr5=tr4.drop(col("pickup_datetime")).drop(col("key")).toDF()

tr4: org.apache.spark.sql.DataFrame = [key: timestamp, fare_amount: double ... 8 more fields]
tr5: org.apache.spark.sql.DataFrame = [fare_amount: double, pickup_longitude: double ... 6 more fields]
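
To get a feel for why a difference of 5 degrees is treated as an outlier, it helps to convert the threshold to an approximate ground distance. The sketch below is plain Scala and rests on two assumptions that are not in the notebook: one degree of latitude is taken as roughly 111 km, and New York City is placed near 40.7 degrees north.

```scala
// Assumption: one degree of latitude spans roughly 111 km
val kmPerDegreeLat = 111.0
// Assumption: New York City lies near 40.7 degrees north
val nycLatitudeDeg = 40.7
// A degree of longitude shrinks with the cosine of the latitude
val kmPerDegreeLong = kmPerDegreeLat * math.cos(math.toRadians(nycLatitudeDeg))

// Threshold used in the filter above
val thresholdDeg = 5.0
val northSouthKm = thresholdDeg * kmPerDegreeLat
val eastWestKm = thresholdDeg * kmPerDegreeLong

println(f"5 degrees of latitude is about $northSouthKm%.0f km")
println(f"5 degrees of longitude is about $eastWestKm%.0f km near NYC")
```

Under these assumptions the threshold corresponds to several hundred kilometres in either direction, far beyond a typical city taxi ride.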


In [11]:
// Downsample the dataset to about 25% of its rows.
// The first argument (true) samples with replacement, so a row can appear more than once.
val factor=0.25
val downSampledData=tr5.sample(true,factor)

factor: Double = 0.25
downSampledData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [fare_amount: double, pickup_longitude: double ... 6 more fields]


In [12]:
downSampledData.count()

res5: Long = 13827233
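
The call `sample(true, factor)` samples with replacement, which means the downsampled data may contain repeated rows. The sketch below contrasts the two modes in plain Scala on a hypothetical collection of 10,000 row ids. It only imitates the idea and is not how Spark implements sampling; the seed and the collection are made up for the illustration.

```scala
import scala.util.Random

val rng = new Random(111L)          // fixed seed so the sketch is repeatable
val rows = (1 to 10000).toVector    // hypothetical stand-in for the rows of tr5
val fraction = 0.25

// Without replacement: keep each row independently with probability `fraction`
val withoutReplacement = rows.filter(_ => rng.nextDouble() < fraction)

// With replacement: draw positions uniformly, so a row may be picked several times
val draws = (rows.size * fraction).toInt
val withReplacement = Vector.fill(draws)(rows(rng.nextInt(rows.size)))

println(s"without replacement: ${withoutReplacement.size} rows, ${withoutReplacement.distinct.size} distinct")
println(s"with replacement: ${withReplacement.size} rows, ${withReplacement.distinct.size} distinct")
```

If duplicates are not wanted in the training data, passing `false` as the first argument to `sample` is the usual choice. The counts reported in this notebook were produced with `true`.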


In [13]:
downSampledData.show(5)

+-----------+----------------+---------------+-----------------+----------------+---------------+--------------------+--------------------+
|fare_amount|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|           diff_long|            diff_lat|
+-----------+----------------+---------------+-----------------+----------------+---------------+--------------------+--------------------+
|       16.9|      -74.016048|      40.711303|       -73.979268|       40.782004|              1| 0.03677999999999315| 0.07070099999999968|
|       16.5|        -73.9513|      40.774138|       -73.990095|       40.751048|              1| 0.03879499999999325|0.023090000000003386|
|        7.0|       -74.00536|      40.728867|       -74.008913|       40.710907|              1|0.003553000000010...|0.017960000000002196|
|       11.5|      -73.957954|      40.779252|        -73.96125|       40.758787|              1|0.003296000000005961| 0.02046500000000151|
|        4.5|      -

Now we separate the target variable from the features and assemble the features into a single vector column that the models can consume.

In [14]:
import org.apache.spark.ml.feature._

//Get all the numeric features except the target variable
val numeric_features=downSampledData.columns.filter(c =>  !c.equals("fare_amount") )

//Use VectorAssembler to assemble the numeric features into a single vector column
val vectorizer_numeric=new VectorAssembler().setInputCols(numeric_features).setOutputCol("features")

//Optional estimator to standardize the numeric features (left disabled, so the models use the raw features)
//val standardizer=new StandardScaler().setWithMean(true).setInputCol("numeric_features").setOutputCol("features")


import org.apache.spark.ml.feature._
numeric_features: Array[String] = Array(pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count, diff_long, diff_lat)
vectorizer_numeric: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_6d0bd41afafa


# Using Linear Regression for fare prediction

In [15]:
import org.apache.spark.ml._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression._
//Create the LinearRegression estimator; it is fitted later as part of the pipeline
val LR= new LinearRegression().setLabelCol("fare_amount").setFeaturesCol("features").
setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.7)

import org.apache.spark.ml._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression._
LR: org.apache.spark.ml.regression.LinearRegression = linReg_38b43dbfc71e
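
For reference, the elastic-net objective described in the Spark ML documentation for linear regression can be summarized as below. Here $\lambda$ corresponds to `regParam` (0.3 in this notebook) and $\alpha$ to `elasticNetParam` (0.7). The symbols $n$, $w$, $b$, $x_i$ and $y_i$ (number of rows, weights, intercept, feature vector and fare of row $i$) are notation introduced for this note. Treat it as a summary rather than a specification, since details such as internal feature standardization affect how the penalty acts.

```latex
\min_{w,\,b}\;
\frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^{2}
+ \lambda\left(\alpha\,\lVert w\rVert_{1}
+ \frac{1-\alpha}{2}\,\lVert w\rVert_{2}^{2}\right)
```

With $\alpha = 0.7$ the penalty is weighted mostly toward the L1 term, which tends to push some coefficients to exactly zero.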


In [16]:
//Create a Pipeline that chains the feature assembler and the linear regression estimator
val pipeline = new Pipeline().setStages(Array(vectorizer_numeric, LR))

pipeline: org.apache.spark.ml.Pipeline = pipeline_b5669d24d1d1


In [17]:
//Split the data randomly into 80% training and 20% testing (seed 111). The training data is used to build the model and the testing data to evaluate it
val Array(training,testing)=downSampledData.randomSplit(Array(0.8,0.2),111)

training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [fare_amount: double, pickup_longitude: double ... 6 more fields]
testing: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [fare_amount: double, pickup_longitude: double ... 6 more fields]


In [18]:
import org.apache.spark.ml.evaluation._

//Fit the pipeline (feature assembly followed by linear regression) to the training data
val pipeline_model= pipeline.fit(training)


2018-12-06 12:55:05 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2018-12-06 12:55:05 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


import org.apache.spark.ml.evaluation._
pipeline_model: org.apache.spark.ml.PipelineModel = pipeline_b5669d24d1d1


In [19]:
import org.apache.spark.ml.evaluation._

//Apply the fitted model to the test data to make predictions
val predictions = pipeline_model.transform(testing)

// Select example rows to display.
predictions.select("prediction","fare_amount", "features").show(5)



+-----------------+-----------+--------------------+
|       prediction|fare_amount|            features|
+-----------------+-----------+--------------------+
|6.580124890951362|        2.5|[-74.115267,40.66...|
|6.590436438975953|        2.5|[-74.037935,40.71...|
|6.581130692465249|        2.5|[-74.036437,40.75...|
|6.582617250543411|        2.5|[-74.034508,40.72...|
|12.45539479985031|        2.5|[-74.01578,40.705...|
+-----------------+-----------+--------------------+
only showing top 5 rows



import org.apache.spark.ml.evaluation._
predictions: org.apache.spark.sql.DataFrame = [fare_amount: double, pickup_longitude: double ... 8 more fields]


In [None]:
// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("fare_amount")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
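
The metric requested from `RegressionEvaluator` here is the root mean squared error: the square root of the average squared difference between prediction and true fare. The sketch below spells that formula out in plain Scala; the helper `rmse` and the two example rows are hypothetical and only illustrate the definition.

```scala
// Root mean squared error: sqrt(mean((prediction - label)^2))
def rmse(predictions: Seq[Double], labels: Seq[Double]): Double = {
  require(predictions.nonEmpty && predictions.size == labels.size,
    "predictions and labels must be non-empty and of equal length")
  val squaredErrors = predictions.zip(labels).map { case (p, y) => (p - y) * (p - y) }
  math.sqrt(squaredErrors.sum / squaredErrors.size)
}

// Hypothetical values chosen for illustration: both errors have magnitude 3
val examplePredictions = Seq(5.5, -0.5)
val exampleLabels = Seq(2.5, 2.5)
println(rmse(examplePredictions, exampleLabels))
```

Because the errors are squared before averaging, a few badly mispredicted rides can dominate the metric, and the result is expressed in the same unit as the fare.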

# Using GBT Regression

In [20]:
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}

// Create a GBT model.
val gbt = new GBTRegressor()
  .setLabelCol("fare_amount")
  .setFeaturesCol("features")


import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
gbt: org.apache.spark.ml.regression.GBTRegressor = gbtr_3a892327b813


In [21]:
val pipeline_gbt = new Pipeline().setStages(Array(vectorizer_numeric, gbt))

pipeline_gbt: org.apache.spark.ml.Pipeline = pipeline_7025db886c8b


In [22]:
//Split the data randomly into 80% training and 20% testing (seed 111). The training data is used to build the model and the testing data to evaluate it
val Array(training_gbt,testing_gbt)=downSampledData.randomSplit(Array(0.8,0.2),111)

training_gbt: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [fare_amount: double, pickup_longitude: double ... 6 more fields]
testing_gbt: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [fare_amount: double, pickup_longitude: double ... 6 more fields]


In [38]:
val pipeline_model_gbt= pipeline_gbt.fit(training_gbt)

2018-12-06 13:58:14 WARN  BlockManagerMasterEndpoint:66 - No more replicas available for rdd_936_0 !
2018-12-06 13:58:14 WARN  BlockManagerMasterEndpoint:66 - No more replicas available for rdd_963_23 !
2018-12-06 13:58:14 WARN  BlockManagerMasterEndpoint:66 - No more replicas available for rdd_963_7 !
2018-12-06 13:58:14 WARN  BlockManagerMasterEndpoint:66 - No more replicas available for rdd_963_39 !
2018-12-06 13:58:14 WARN  BlockManagerMasterEndpoint:66 - No more replicas available for rdd_722_39 !
2018-12-06 13:58:14 WARN  BlockManagerMasterEndpoint:66 - No more replicas available for rdd_936_32 !
2018-12-06 13:58:14 WARN  BlockManagerMasterEndpoint:66 - No more replicas available for rdd_722_8 !
2018-12-06 13:58:14 WARN  BlockManagerMasterEndpoint:66 - No more replicas available for rdd_722_12 !
2018-12-06 13:58:14 WARN  BlockManagerMasterEndpoint:66 - No more replicas available for rdd_722_36 !
2018-12-06 13:58:14 WARN  BlockManagerMasterEndpoint:66 - No more replicas available 

pipeline_model_gbt: org.apache.spark.ml.PipelineModel = pipeline_7025db886c8b


In [39]:
//Apply the fitted model to the test data to make predictions
val predictions_gbt = pipeline_model_gbt.transform(testing_gbt)

// Select example rows to display.
predictions_gbt.select("prediction", "fare_amount","features").show(5)

+------------------+-----------+--------------------+
|        prediction|fare_amount|            features|
+------------------+-----------+--------------------+
| 48.22619519179591|        2.5|[-74.115267,40.66...|
|25.369613964560102|        2.5|[-74.037935,40.71...|
| 36.73248646834585|        2.5|[-74.036437,40.75...|
| 34.95496567378947|        2.5|[-74.034508,40.72...|
|13.838395376308867|        2.5|[-74.01578,40.705...|
+------------------+-----------+--------------------+
only showing top 5 rows



predictions_gbt: org.apache.spark.sql.DataFrame = [fare_amount: double, pickup_longitude: double ... 8 more fields]


In [40]:
// Select (prediction, true label) and compute test error.
val evaluator_gbt = new RegressionEvaluator()
  .setLabelCol("fare_amount")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse_gbt = evaluator_gbt.evaluate(predictions_gbt)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse_gbt")

Root Mean Squared Error (RMSE) on test data = 4.795093561075967


evaluator_gbt: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_c610a4ae0347
rmse_gbt: Double = 4.795093561075967


# Using Random Forest Regressor

In [26]:
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

// Create a RF Regression model.
val RF = new RandomForestRegressor()
  .setLabelCol("fare_amount")
  .setFeaturesCol("features")

import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
RF: org.apache.spark.ml.regression.RandomForestRegressor = rfr_72ccd2af2ddb


In [27]:
val pipeline_rf = new Pipeline().setStages(Array(vectorizer_numeric, RF))

pipeline_rf: org.apache.spark.ml.Pipeline = pipeline_fa08ed7b80d9


In [28]:
//Split the data randomly into 80% training and 20% testing (seed 111). The training data is used to build the model and the testing data to evaluate it
val Array(training_rf,testing_rf)=downSampledData.randomSplit(Array(0.8,0.2),111)

training_rf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [fare_amount: double, pickup_longitude: double ... 6 more fields]
testing_rf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [fare_amount: double, pickup_longitude: double ... 6 more fields]


In [29]:
val pipeline_model_rf= pipeline_rf.fit(training_rf)

pipeline_model_rf: org.apache.spark.ml.PipelineModel = pipeline_fa08ed7b80d9


In [30]:
//Apply the fitted model to the test data to make predictions
val predictions_rf = pipeline_model_rf.transform(testing_rf)

// Select example rows to display.
predictions_rf.select("prediction", "fare_amount","features").show(5)

+------------------+-----------+--------------------+
|        prediction|fare_amount|            features|
+------------------+-----------+--------------------+
|10.148876263017446|        2.5|[-74.115267,40.66...|
|  7.97000768220021|        2.5|[-74.037935,40.71...|
| 10.33006375060903|        2.5|[-74.036437,40.75...|
| 8.337633665873257|        2.5|[-74.034508,40.72...|
|12.746673314664807|        2.5|[-74.01578,40.705...|
+------------------+-----------+--------------------+
only showing top 5 rows



predictions_rf: org.apache.spark.sql.DataFrame = [fare_amount: double, pickup_longitude: double ... 8 more fields]


In [31]:
// Select (prediction, true label) and compute test error.
val evaluator_rf = new RegressionEvaluator()
  .setLabelCol("fare_amount")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse_rf = evaluator_rf.evaluate(predictions_rf)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse_rf")

Root Mean Squared Error (RMSE) on test data = 5.139040224281306


evaluator_rf: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_57ecea0e0c04
rmse_rf: Double = 5.139040224281306


# Using Decision Tree Regression

In [32]:
import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor}

// Create a Decision Tree Regression model.
val DT = new DecisionTreeRegressor()
  .setLabelCol("fare_amount")
  .setFeaturesCol("features")

import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor}
DT: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_9928a8f912df


In [33]:
val pipeline_dt = new Pipeline().setStages(Array(vectorizer_numeric, DT))

pipeline_dt: org.apache.spark.ml.Pipeline = pipeline_b8ae04e1fbd3


In [34]:
//Split the data randomly into 80% training and 20% testing (seed 111). The training data is used to build the model and the testing data to evaluate it
val Array(training_dt,testing_dt)=downSampledData.randomSplit(Array(0.8,0.2),111)

training_dt: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [fare_amount: double, pickup_longitude: double ... 6 more fields]
testing_dt: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [fare_amount: double, pickup_longitude: double ... 6 more fields]


In [35]:
val pipeline_model_dt= pipeline_dt.fit(training_dt)

pipeline_model_dt: org.apache.spark.ml.PipelineModel = pipeline_b8ae04e1fbd3


In [36]:
//Apply the fitted model to the test data to make predictions
val predictions_dt = pipeline_model_dt.transform(testing_dt)

// Select example rows to display.
predictions_dt.select("prediction","fare_amount", "features").show(5)

+------------------+-----------+--------------------+
|        prediction|fare_amount|            features|
+------------------+-----------+--------------------+
|11.961113633244354|        2.5|[-74.115267,40.66...|
|11.961113633244354|        2.5|[-74.037935,40.71...|
|11.961113633244354|        2.5|[-74.036437,40.75...|
|11.961113633244354|        2.5|[-74.034508,40.72...|
|13.151258649816727|        2.5|[-74.01578,40.705...|
+------------------+-----------+--------------------+
only showing top 5 rows



predictions_dt: org.apache.spark.sql.DataFrame = [fare_amount: double, pickup_longitude: double ... 8 more fields]


In [37]:
// Select (prediction, true label) and compute test error.
val evaluator_dt = new RegressionEvaluator()
  .setLabelCol("fare_amount")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse_dt = evaluator_dt.evaluate(predictions_dt)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse_dt")

Root Mean Squared Error (RMSE) on test data = 5.125889477501701


evaluator_dt: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_e1ddc52ffb2c
rmse_dt: Double = 5.125889477501701


# Conclusion

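Comparing the RMSE of the three models that were evaluated on the test data, GBT has the lowest value (about 4.80), followed by the decision tree (about 5.13) and the random forest (about 5.14), so GBT is the most accurate of these models on this split. The RMSE cell for linear regression was not executed (In [None]), so no value is available for that model.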
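
The comparison can also be made explicit with a few lines of plain Scala. The RMSE values are copied from the cell outputs of this notebook; the map and variable names are introduced here for illustration.

```scala
// RMSE values copied from the cell outputs in this notebook
val rmseByModel = Map(
  "GBT" -> 4.795093561075967,
  "Random forest" -> 5.139040224281306,
  "Decision tree" -> 5.125889477501701
)

// Lower RMSE is better, so pick the model with the minimum value
val (bestModel, bestRmse) = rmseByModel.minBy(_._2)

// Relative gap between the best model and the runner-up
val runnerUpRmse = rmseByModel.values.toSeq.sorted.apply(1)
val relativeGap = (runnerUpRmse - bestRmse) / runnerUpRmse

println(s"best model: $bestModel (RMSE $bestRmse)")
println(f"gap to runner-up: ${relativeGap * 100}%.1f%%")
```

The gap between GBT and the runner-up is only a few percent, and all models were run with default hyperparameters on a single split, so the ranking should be read as indicative rather than definitive.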