# Apache Spark - Taxi Fare

## Set Up Libraries

We import the libraries needed to build the models.

In [1]:
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Native Spark

We use Spark's APIs for machine learning (`pyspark.ml`) and for data handling (DataFrames). By "Native Spark" we mean using these libraries with the default session settings, without explicitly configuring where Spark runs. This distinction matters because later we solve the same problem on a local cluster.

## Set a Spark Session 

This creates a SparkSession (or reuses one that already exists), the entry point that connects the Jupyter notebook to Spark.

In [2]:
spark = SparkSession.builder.appName('taxis_fare').getOrCreate()

## Load Dataset

We load the dataset with Spark's CSV reader. Because no schema is given, every column is read as a string.

In [3]:
# Load training data
df = spark.read.format("csv").option("header", "true").load("data/train.csv")

## Preparing Data

We instantiate the LinearRegression estimator with its default parameters.

In [4]:
lr = LinearRegression()

We check the column types to see whether any categorical attributes need processing; in this case none do.

In [5]:
df.dtypes

[('key', 'string'),
 ('fare_amount', 'string'),
 ('pickup_datetime', 'string'),
 ('pickup_longitude', 'string'),
 ('pickup_latitude', 'string'),
 ('dropoff_longitude', 'string'),
 ('dropoff_latitude', 'string'),
 ('passenger_count', 'string')]

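All columns were loaded as strings, so we cast the ones we need to float: Spark ML's `VectorAssembler` only accepts numeric, boolean, or vector columns, not strings. The `key` and `pickup_datetime` columns are dropped because they are not used as features.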

In [6]:
df = df.select(df['fare_amount'].cast("float").alias('fare_amount'),
               df['pickup_longitude'].cast("float").alias('pickup_longitude'),
               df['pickup_latitude'].cast("float").alias('pickup_latitude'),
               df['dropoff_longitude'].cast("float").alias('dropoff_longitude'),
               df['dropoff_latitude'].cast("float").alias('dropoff_latitude'),
               df['passenger_count'].cast("float").alias('passenger_count'))
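When a string cannot be parsed, `cast("float")` does not fail: with ANSI mode off (the default in Spark 3.x) it returns null, so these casts can introduce nulls that have to be handled later. A pure-Python sketch of that per-value behaviour, using a hypothetical helper `cast_to_float` (not a Spark API):

```python
from typing import Optional


def cast_to_float(value: Optional[str]) -> Optional[float]:
    """Mimic a non-ANSI string-to-float cast: unparseable text becomes None (null)."""
    if value is None:
        return None
    try:
        return float(value)
    except ValueError:
        return None  # Spark (ANSI off) would yield null instead of raising


raw_fares = ["4.5", "16.9", "", "n/a", None]
print([cast_to_float(v) for v in raw_fares])  # [4.5, 16.9, None, None, None]
```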


We rename `fare_amount` to `label`, the default label column name of Spark ML regressors, and keep the five features.

In [7]:
df = df.selectExpr("fare_amount as label",'pickup_longitude','pickup_latitude',
                  'dropoff_longitude','dropoff_latitude','passenger_count')

We take a quick look at the data to check that everything looks right.

In [8]:
df.show()

+-----+----------------+---------------+-----------------+----------------+---------------+
|label|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+-----+----------------+---------------+-----------------+----------------+---------------+
|  4.5|      -73.844315|      40.721317|        -73.84161|       40.712276|            1.0|
| 16.9|      -74.016045|      40.711304|        -73.97927|       40.782005|            1.0|
|  5.7|      -73.982735|       40.76127|        -73.99124|        40.75056|            2.0|
|  7.7|       -73.98713|      40.733143|        -73.99157|        40.75809|            1.0|
|  5.3|      -73.968094|       40.76801|        -73.95666|       40.783764|            1.0|
| 12.1|       -74.00096|       40.73163|        -73.97289|       40.758232|            1.0|
|  7.5|          -73.98|      40.751663|         -73.9738|       40.764843|            1.0|
| 16.5|        -73.9513|       40.77414|         -73.9901|        40.75105|     

Spark ML estimators expect all features in a single vector column (named `features` by default), so we gather them into one with `VectorAssembler`.

In [9]:
# Assemble the feature columns into a single vector column
vecAssembler = VectorAssembler(inputCols=["pickup_longitude", "pickup_latitude",
                                          "dropoff_longitude", "dropoff_latitude",
                                          "passenger_count"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.count()

55423856
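Conceptually, `VectorAssembler` concatenates the input columns, in order, into one vector per row, and Spark may store that vector in sparse form when that is more compact. The `(5,[4],[5.0])` value in the decision tree output is such a sparse vector: size 5, with only index 4 (`passenger_count`) non-zero, i.e. a row whose four coordinates are all 0. A pure-Python sketch of both ideas, with hypothetical helpers `assemble` and `to_sparse` (not Spark's implementation):

```python
from typing import Dict, List, Sequence, Tuple

FEATURE_COLS = ["pickup_longitude", "pickup_latitude",
                "dropoff_longitude", "dropoff_latitude", "passenger_count"]


def assemble(row: Dict[str, float], input_cols: Sequence[str]) -> List[float]:
    """Concatenate the input columns of one row, in order, into a dense vector."""
    return [float(row[col]) for col in input_cols]


def to_sparse(dense: Sequence[float]) -> Tuple[int, List[int], List[float]]:
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    return len(dense), indices, [dense[i] for i in indices]


# Hypothetical row with all coordinates equal to 0 and five passengers
row = {"pickup_longitude": 0.0, "pickup_latitude": 0.0,
       "dropoff_longitude": 0.0, "dropoff_latitude": 0.0, "passenger_count": 5.0}
dense = assemble(row, FEATURE_COLS)
print(dense)             # [0.0, 0.0, 0.0, 0.0, 5.0]
print(to_sparse(dense))  # (5, [4], [5.0])
```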

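We also need to drop rows with null values: by default (`handleInvalid="error"`) `VectorAssembler` raises an error when it meets a null, so we set `handleInvalid` to `"skip"` to filter those rows out.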

In [10]:
# Drop rows with null (or NaN) feature values
new_df = vecAssembler.setHandleInvalid("skip").transform(df)
new_df.show()

+-----+----------------+---------------+-----------------+----------------+---------------+--------------------+
|label|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|            features|
+-----+----------------+---------------+-----------------+----------------+---------------+--------------------+
|  4.5|      -73.844315|      40.721317|        -73.84161|       40.712276|            1.0|[-73.844314575195...|
| 16.9|      -74.016045|      40.711304|        -73.97927|       40.782005|            1.0|[-74.016044616699...|
|  5.7|      -73.982735|       40.76127|        -73.99124|        40.75056|            2.0|[-73.982734680175...|
|  7.7|       -73.98713|      40.733143|        -73.99157|        40.75809|            1.0|[-73.987129211425...|
|  5.3|      -73.968094|       40.76801|        -73.95666|       40.783764|            1.0|[-73.968093872070...|
| 12.1|       -74.00096|       40.73163|        -73.97289|       40.758232|            1.0|[-74.
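With `handleInvalid="skip"`, a row is dropped when any of the assembler's input columns is null or NaN. Only those input columns are checked, so a null in `label` is not filtered by this step. A pure-Python sketch of the filtering, using hypothetical rows (dicts, with `None` standing for null) and a hypothetical helper `assemble_skip_invalid`:

```python
import math
from typing import Dict, List, Optional, Sequence

FEATURE_COLS = ["pickup_longitude", "pickup_latitude",
                "dropoff_longitude", "dropoff_latitude", "passenger_count"]


def is_invalid(value: Optional[float]) -> bool:
    """Null (None) and NaN both count as invalid input values."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def assemble_skip_invalid(rows: Sequence[Dict[str, Optional[float]]],
                          input_cols: Sequence[str]) -> List[List[float]]:
    """Assemble feature vectors, dropping rows with an invalid input value."""
    vectors = []
    for row in rows:
        values = [row.get(col) for col in input_cols]
        if any(is_invalid(v) for v in values):
            continue  # skip the whole row, like handleInvalid="skip"
        vectors.append(values)
    return vectors


rows = [
    {"label": 4.5, "pickup_longitude": -73.844315, "pickup_latitude": 40.721317,
     "dropoff_longitude": -73.84161, "dropoff_latitude": 40.712276, "passenger_count": 1.0},
    {"label": 16.9, "pickup_longitude": -74.016045, "pickup_latitude": 40.711304,
     "dropoff_longitude": None, "dropoff_latitude": None, "passenger_count": 1.0},
    {"label": 5.7, "pickup_longitude": -73.982735, "pickup_latitude": 40.76127,
     "dropoff_longitude": -73.99124, "dropoff_latitude": 40.75056, "passenger_count": float("nan")},
    # The label is null, but the features are valid, so this row is kept
    {"label": None, "pickup_longitude": -73.98713, "pickup_latitude": 40.733143,
     "dropoff_longitude": -73.99157, "dropoff_latitude": 40.75809, "passenger_count": 1.0},
]
vectors = assemble_skip_invalid(rows, FEATURE_COLS)
print(len(vectors))  # 2
```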

## Train Model

We train the model with linear regression and measure how long training takes, since comparing training times is one of the main purposes of this project.

In [11]:
# Fit the model
start_time = datetime.now()

lrModel = lr.fit(new_df.select('label','features'))

time_elapsed = datetime.now() - start_time 
print('TIME OF LINEAR REGRESSION TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

TIME OF LINEAR REGRESSION TRAINING (hh:mm:ss.ms) 0:08:17.698225
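Because Spark DataFrames are evaluated lazily, the time measured around `lr.fit` also includes reading and casting the CSV. Also, `datetime.now()` reads the wall clock, which can jump if the system clock is adjusted; Python's `time.perf_counter()` is a monotonic alternative for durations. A minimal sketch of a reusable timer, assuming a hypothetical `timed` context manager that is not part of the original notebook:

```python
import time
from contextlib import contextmanager
from datetime import timedelta
from typing import Dict, Iterator


@contextmanager
def timed(label: str, results: Dict[str, timedelta]) -> Iterator[None]:
    """Measure the duration of the enclosed block with a monotonic clock."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = timedelta(seconds=time.perf_counter() - start)
        results[label] = elapsed
        print("TIME OF {} (hh:mm:ss.ms) {}".format(label, elapsed))


timings: Dict[str, timedelta] = {}
with timed("EXAMPLE WORKLOAD", timings):
    total = sum(i * i for i in range(100_000))  # stand-in for lr.fit(...)
```

In the notebook it would wrap the fit call, e.g. `with timed("LINEAR REGRESSION TRAINING", timings): lrModel = lr.fit(new_df.select('label', 'features'))`.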


## Measures

### Root Mean Squared Error

Here we report the root mean squared error (RMSE) computed on the training set.

In [12]:
#Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)

RMSE: 20.710237
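RMSE is the square root of the mean squared difference between labels and predictions. Because errors are squared, a few extreme fares can dominate it. The value above comes from `lrModel.summary`, i.e. it is measured on the training data, whereas `RegressionEvaluator(metricName="rmse")` in the next sections computes the same quantity on held-out test predictions. A pure-Python sketch with a hypothetical `rmse` helper:

```python
import math
from typing import Sequence


def rmse(labels: Sequence[float], predictions: Sequence[float]) -> float:
    """Root mean squared error: sqrt(mean((label - prediction) ** 2))."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical predictions for the first three fares shown earlier
print(rmse([4.5, 16.9, 5.7], [5.0, 15.9, 5.7]))
```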


In [13]:
spark.stop()

# Decision tree regression

The same procedure as before, but with a decision tree regression model. This time 30% of the data is held out as a test set, and the RMSE is measured on it.

In [14]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName('taxis_fare').getOrCreate()

# Load training data
df = spark.read.format("csv").option("header", "true").load("data/train.csv")

df = df.select(df['fare_amount'].cast("float").alias('fare_amount'),
               df['pickup_longitude'].cast("float").alias('pickup_longitude'),
               df['pickup_latitude'].cast("float").alias('pickup_latitude'),
               df['dropoff_longitude'].cast("float").alias('dropoff_longitude'),
               df['dropoff_latitude'].cast("float").alias('dropoff_latitude'),
               df['passenger_count'].cast("float").alias('passenger_count'))

df = df.selectExpr("fare_amount as label",'pickup_longitude','pickup_latitude',
                  'dropoff_longitude','dropoff_latitude','passenger_count')

new_df = vecAssembler.setHandleInvalid("skip").transform(df)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = new_df.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeRegressor()

start_time = datetime.now()

# Train model.
model = dt.fit(trainingData)

time_elapsed = datetime.now() - start_time 
print('TIME OF DECISION TREE REGRESSION TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

TIME OF DECISION TREE REGRESSION TRAINING (hh:mm:ss.ms) 0:20:05.413627
+------------------+------+--------------------+
|        prediction| label|            features|
+------------------+------+--------------------+
| 34.02801889571103|-29.87|[-73.863159179687...|
|11.850275013996376| -20.0|       (5,[4],[5.0])|
| 8.896463517049114|  -6.5|[-73.984352111816...|
| 8.896463517049114|  -6.0|[-73.987518310546...|
| 8.896463517049114|  -3.0|[-73.995063781738...|
+------------------+------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 23.9768
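`randomSplit([0.7, 0.3])` assigns each row to a split at random, so the split sizes are only approximately 70/30, and without a `seed` argument each run draws a different split. That is one reason the test RMSE of the tree models can change between runs. A pure-Python sketch of weight-based splitting with a fixed seed; the helper `random_split` is hypothetical and only illustrates the idea, it is not Spark's implementation:

```python
import random
from typing import Iterable, List, Optional, Sequence, TypeVar

T = TypeVar("T")


def random_split(rows: Iterable[T], weights: Sequence[float],
                 seed: Optional[int] = None) -> List[List[T]]:
    """Assign each row independently to one split with probability proportional to its weight."""
    total = float(sum(weights))
    upper_bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        upper_bounds.append(acc)  # cumulative probability boundaries
    rng = random.Random(seed)  # fixed seed -> reproducible assignment
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(upper_bounds):
            if x < upper:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against rounding at the last boundary
    return splits


train, test = random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train), len(test))  # roughly 7000 and 3000
```

In the notebook, passing a seed (any fixed value, e.g. `new_df.randomSplit([0.7, 0.3], seed=42)`) would make the split reproducible across runs.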


# Random forest regression

The same procedure as before, but with a random forest regression model.

In [15]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName('taxis_fare').getOrCreate()

# Load training data
df = spark.read.format("csv").option("header", "true").load("data/train.csv")

df = df.select(df['fare_amount'].cast("float").alias('fare_amount'),
               df['pickup_longitude'].cast("float").alias('pickup_longitude'),
               df['pickup_latitude'].cast("float").alias('pickup_latitude'),
               df['dropoff_longitude'].cast("float").alias('dropoff_longitude'),
               df['dropoff_latitude'].cast("float").alias('dropoff_latitude'),
               df['passenger_count'].cast("float").alias('passenger_count'))

df = df.selectExpr("fare_amount as label",'pickup_longitude','pickup_latitude',
                  'dropoff_longitude','dropoff_latitude','passenger_count')

new_df = vecAssembler.setHandleInvalid("skip").transform(df)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = new_df.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor()

# Train model.  
start_time = datetime.now()

model = rf.fit(trainingData)

time_elapsed = datetime.now() - start_time 
print('TIME OF RANDOM FOREST TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

TIME OF RANDOM FOREST TRAINING (hh:mm:ss.ms) 0:35:53.671012
+------------------+------+--------------------+
|        prediction| label|            features|
+------------------+------+--------------------+
|18.195694139170584| -44.9|[-73.871116638183...|
|30.771499981119245|-29.87|[-73.863159179687...|
|13.290360749502483| -18.1|[-73.958274841308...|
| 8.933319739165913| -10.1|[-73.983306884765...|
| 8.933319739165913|  -6.5|[-73.984352111816...|
+------------------+------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 24.8237


# With Local Cluster

Spark implicitly splits its data structures into partitions so that they can be processed in parallel by the workers of a cluster. Here we point the session at a local cluster, which runs as many worker threads as our computer has logical cores.

The local[*] string is a special string denoting that you’re using a local cluster,
which is another way of saying you’re running in single-machine mode. 
The * tells Spark to create as many worker threads as logical cores on your machine.
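As a rough pure-Python analogy (not how Spark schedules tasks internally), the sketch below splits a dataset into one partition per logical core, as reported by `os.cpu_count()`, processes the partitions with a pool of that many workers, and combines the partial results. The helpers `partition` and `partial_sum_of_squares` are hypothetical; note that CPython threads do not speed up CPU-bound work because of the GIL, so the point here is the partition-then-combine structure, not the speed-up:

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List, Sequence


def partition(data: Sequence[int], num_partitions: int) -> List[Sequence[int]]:
    """Split data into num_partitions contiguous chunks of nearly equal size."""
    size, remainder = divmod(len(data), num_partitions)
    parts, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < remainder else 0)
        parts.append(data[start:end])
        start = end
    return parts


def partial_sum_of_squares(part: Sequence[int]) -> int:
    """Work done independently on one partition."""
    return sum(x * x for x in part)


data = list(range(1_000))
num_workers = os.cpu_count() or 1  # like local[*]: one worker per logical core
parts = partition(data, num_workers)
with ThreadPoolExecutor(max_workers=num_workers) as pool:
    partials = list(pool.map(partial_sum_of_squares, parts))
total = sum(partials)  # combine the per-partition results
print(num_workers, total)
```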

In [16]:
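# Stop the session from the previous sections first: otherwise getOrCreate()
# returns the running session and master('local[*]') does not take effect.
spark.stop()
spark = SparkSession.builder.master('local[*]').appName('taxi_fare_local_cluster').config('spark.driver.memory', '8g').getOrCreate()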

### Preparing data as before

We prepare the dataset as before. This time the CSV is read with `inferSchema=True`, which requires an extra pass over the data.

In [17]:
# Load training data
df = spark.read.csv('data/train.csv', header=True,
                    inferSchema=True, nullValue=' ')

lr = LinearRegression()

df = df.select(df['fare_amount'].cast("float").alias('fare_amount'),
               df['pickup_longitude'].cast("float").alias('pickup_longitude'),
               df['pickup_latitude'].cast("float").alias('pickup_latitude'),
               df['dropoff_longitude'].cast("float").alias('dropoff_longitude'),
               df['dropoff_latitude'].cast("float").alias('dropoff_latitude'),
               df['passenger_count'].cast("float").alias('passenger_count'))


df = df.selectExpr("fare_amount as label",'pickup_longitude','pickup_latitude',
                  'dropoff_longitude','dropoff_latitude','passenger_count')

# Assemble the feature columns into a single vector column
vecAssembler = VectorAssembler(inputCols=["pickup_longitude", "pickup_latitude",
                                          "dropoff_longitude", "dropoff_latitude",
                                          "passenger_count"], outputCol="features")

# Drop rows with null (or NaN) feature values
df_local_clust = vecAssembler.setHandleInvalid("skip").transform(df)

## Train Local Cluster Model

We train the model the same way as before.

In [18]:
# Fit the model
start_time = datetime.now()

lrModel = lr.fit(df_local_clust.select('label','features'))

time_elapsed = datetime.now() - start_time 
print('TIME OF LINEAR REGRESSION TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

TIME OF LINEAR REGRESSION TRAINING (hh:mm:ss.ms) 0:07:36.215578


## Measures

### Root Mean Squared Error

We report the training RMSE for this model again.

In [19]:
#Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)

RMSE: 20.710237


# Decision tree regression local cluster

The same procedure as before, but with a decision tree regression model.

In [20]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.master('local[*]').appName('taxi_fare_local_cluster').config('spark.driver.memory', '8g').getOrCreate()

# Load training data
df = spark.read.csv('data/train.csv', header=True,
                    inferSchema=True, nullValue=' ')

df = df.select(df['fare_amount'].cast("float").alias('fare_amount'),
               df['pickup_longitude'].cast("float").alias('pickup_longitude'),
               df['pickup_latitude'].cast("float").alias('pickup_latitude'),
               df['dropoff_longitude'].cast("float").alias('dropoff_longitude'),
               df['dropoff_latitude'].cast("float").alias('dropoff_latitude'),
               df['passenger_count'].cast("float").alias('passenger_count'))

df = df.selectExpr("fare_amount as label",'pickup_longitude','pickup_latitude',
                  'dropoff_longitude','dropoff_latitude','passenger_count')

# Assemble the feature columns into a single vector column
vecAssembler = VectorAssembler(inputCols=["pickup_longitude", "pickup_latitude",
                                          "dropoff_longitude", "dropoff_latitude",
                                          "passenger_count"], outputCol="features")

# Drop rows with null (or NaN) feature values
df_local_clust = vecAssembler.setHandleInvalid("skip").transform(df)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = df_local_clust.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeRegressor()

start_time = datetime.now()

# Train model.
model = dt.fit(trainingData.select('label','features'))

time_elapsed = datetime.now() - start_time 
print('TIME OF DECISION TREE ON LOCAL CLUSTER TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

TIME OF DECISION TREE ON LOCAL CLUSTER TRAINING (hh:mm:ss.ms) 0:13:05.248696
+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|  8.93128770001487|-45.0|[-73.980125427246...|
|13.792550010457832|-18.1|[-73.958274841308...|
|  8.93128770001487|-10.1|[-73.983306884765...|
|  8.93128770001487| -6.5|[-73.984352111816...|
|  8.93128770001487| -5.3|[-73.984802246093...|
+------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 24.0373


# Random forest regression local cluster

The same procedure as before, but with a random forest regression model.

In [21]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.master('local[*]').appName('taxi_fare_local_cluster').config('spark.driver.memory', '8g').getOrCreate()

# Load training data
df = spark.read.csv('data/train.csv', header=True,
                    inferSchema=True, nullValue=' ')

df = df.select(df['fare_amount'].cast("float").alias('fare_amount'),
               df['pickup_longitude'].cast("float").alias('pickup_longitude'),
               df['pickup_latitude'].cast("float").alias('pickup_latitude'),
               df['dropoff_longitude'].cast("float").alias('dropoff_longitude'),
               df['dropoff_latitude'].cast("float").alias('dropoff_latitude'),
               df['passenger_count'].cast("float").alias('passenger_count'))


df = df.selectExpr("fare_amount as label",'pickup_longitude','pickup_latitude',
                  'dropoff_longitude','dropoff_latitude','passenger_count')

# Assemble the feature columns into a single vector column
vecAssembler = VectorAssembler(inputCols=["pickup_longitude", "pickup_latitude",
                                          "dropoff_longitude", "dropoff_latitude",
                                          "passenger_count"], outputCol="features")

# Drop rows with null (or NaN) feature values
df_local_clust = vecAssembler.setHandleInvalid("skip").transform(df)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = df_local_clust.randomSplit([0.7, 0.3])

# Train a RandomForest model with the default parameters
# (numTrees=20, featureSubsetStrategy="auto").
#  Note: use a larger numTrees in practice.
rf = RandomForestRegressor()

start_time = datetime.now()

# Train model.
model = rf.fit(trainingData.select('label','features'))

time_elapsed = datetime.now() - start_time 
print('TIME OF RANDOM FOREST ON LOCAL CLUSTER TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

TIME OF RANDOM FOREST ON LOCAL CLUSTER TRAINING (hh:mm:ss.ms) 0:26:28.543485
+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
| 8.887903540270049|-45.0|[-73.980125427246...|
|13.359459177697568|-18.1|[-73.958274841308...|
| 8.887903540270049|-10.1|[-73.983306884765...|
|11.238500888096038| -4.5|[-74.006141662597...|
| 8.934682006302216| -3.0|[-74.004997253417...|
+------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 6.8643


## Conclusion

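As we can see, for linear regression the training RMSE is identical in both runs (20.710237), as expected, because the same algorithm is fit on the same data. The tree-based models are evaluated on a test set drawn with an unseeded `randomSplit`, so their test RMSEs come from different splits and are not directly comparable (decision tree: 23.9768 vs 24.0373; random forest: 24.8237 vs 6.8643). Training was faster on the local cluster for every model: based on the times reported above, it took about 8% less time for linear regression, 35% less for the decision tree, and 26% less for the random forest.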
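The relative reduction in training time can be recomputed directly from the times printed in the notebook. A small sketch that parses the `h:mm:ss.ffffff` strings with a hypothetical `parse_elapsed` helper and reports, for each model, how much less time training took on the local cluster:

```python
from datetime import timedelta

# Training times copied from the outputs above: (native, local cluster)
TIMES = {
    "Linear regression": ("0:08:17.698225", "0:07:36.215578"),
    "Decision tree": ("0:20:05.413627", "0:13:05.248696"),
    "Random forest": ("0:35:53.671012", "0:26:28.543485"),
}


def parse_elapsed(text: str) -> timedelta:
    """Parse the 'h:mm:ss.ffffff' format that str(timedelta) produces."""
    hours, minutes, seconds = text.split(":")
    return timedelta(hours=int(hours), minutes=int(minutes), seconds=float(seconds))


def reduction_pct(before_s: float, after_s: float) -> float:
    """Percentage of the original time saved."""
    return 100.0 * (before_s - after_s) / before_s


for model, (native, local) in TIMES.items():
    pct = reduction_pct(parse_elapsed(native).total_seconds(),
                        parse_elapsed(local).total_seconds())
    print(f"{model}: {pct:.1f}% less training time on the local cluster")
```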