# Apache Spark - Taxi Fare

## Set Up Libraries

In [1]:
from pyspark import SparkContext
import pyspark as pyspark
from pyspark.sql import SparkSession 

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import FloatType
import pandas as pd

from datetime import datetime 

# Native Spark

We use Spark's APIs for machine learning (Spark ML) and for structured data (Spark SQL DataFrames). By "native Spark" we mean using only those libraries, with a default session configuration and no other Spark components, for this example. This is worth keeping in mind because later we solve the same problem again on an explicitly configured local Spark cluster. "Cluster" here means cluster computing, not a clustering algorithm.

## Set a Spark Session 

A `SparkSession` is the entry point that connects this notebook to Spark. `getOrCreate()` returns the session that is already running if there is one; otherwise it starts a new one.

In [2]:
# Entry point for the DataFrame and ML APIs; returns the running session if one exists.
spark = (
    SparkSession.builder
    .appName('taxis_fare')
    .getOrCreate()
)

## Load Dataset

In [3]:
# Load training data (no schema inference: every column is read as a string).
df = spark.read.csv("data/train.csv", header=True)

## Preparing Data

In [4]:
# Elastic-net regularised linear regression: regParam is the overall penalty strength,
# elasticNetParam=0.8 mixes 80% L1 with 20% L2 penalty.
lr = (LinearRegression()
      .setMaxIter(10)
      .setRegParam(0.3)
      .setElasticNetParam(0.8))

In [5]:
# Every column comes back as a string because the CSV was loaded without a schema,
# hence the explicit casts in the next cell.
df.dtypes

[('key', 'string'),
 ('fare_amount', 'string'),
 ('pickup_datetime', 'string'),
 ('pickup_longitude', 'string'),
 ('pickup_latitude', 'string'),
 ('dropoff_longitude', 'string'),
 ('dropoff_latitude', 'string'),
 ('passenger_count', 'string')]

In [6]:
# The CSV columns arrive as strings; keep only the ones we use, cast to float under the same name.
numeric_columns = ['fare_amount', 'pickup_longitude', 'pickup_latitude',
                   'dropoff_longitude', 'dropoff_latitude', 'passenger_count']
df = df.select([df[name].cast("float").alias(name) for name in numeric_columns])


We select the features we are going to work with, and rename the target `fare_amount` to `label`, the default label column name used by Spark ML estimators.

In [7]:
# Rename the target to "label", the default label column of Spark ML estimators.
df = df.select(df['fare_amount'].alias('label'),
               'pickup_longitude', 'pickup_latitude',
               'dropoff_longitude', 'dropoff_latitude',
               'passenger_count')

In [8]:
# Preview the first rows (show() prints 20 rows by default).
df.show()

+-----+----------------+---------------+-----------------+----------------+---------------+
|label|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+-----+----------------+---------------+-----------------+----------------+---------------+
|  4.5|      -73.844315|      40.721317|        -73.84161|       40.712276|            1.0|
| 16.9|      -74.016045|      40.711304|        -73.97927|       40.782005|            1.0|
|  5.7|      -73.982735|       40.76127|        -73.99124|        40.75056|            2.0|
|  7.7|       -73.98713|      40.733143|        -73.99157|        40.75809|            1.0|
|  5.3|      -73.968094|       40.76801|        -73.95666|       40.783764|            1.0|
| 12.1|       -74.00096|       40.73163|        -73.97289|       40.758232|            1.0|
|  7.5|          -73.98|      40.751663|         -73.9738|       40.764843|            1.0|
| 16.5|        -73.9513|       40.77414|         -73.9901|        40.75105|     

In [9]:
# Combine the five numeric inputs into the single "features" vector column Spark ML expects.
feature_cols = ["pickup_longitude", "pickup_latitude",
                "dropoff_longitude", "dropoff_latitude",
                "passenger_count"]
vecAssembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
new_df = vecAssembler.transform(df)
# Total number of rows, before rows with null features are filtered out in the next cell.
new_df.count()

55423856

In [10]:
# Delete null rows: "skip" drops rows whose feature columns contain null/NaN instead of failing.
# setHandleInvalid mutates vecAssembler in place, so later cells that reuse it inherit this.
vecAssembler.setHandleInvalid("skip")
new_df = vecAssembler.transform(df)
new_df.show()

+-----+----------------+---------------+-----------------+----------------+---------------+--------------------+
|label|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|            features|
+-----+----------------+---------------+-----------------+----------------+---------------+--------------------+
|  4.5|      -73.844315|      40.721317|        -73.84161|       40.712276|            1.0|[-73.844314575195...|
| 16.9|      -74.016045|      40.711304|        -73.97927|       40.782005|            1.0|[-74.016044616699...|
|  5.7|      -73.982735|       40.76127|        -73.99124|        40.75056|            2.0|[-73.982734680175...|
|  7.7|       -73.98713|      40.733143|        -73.99157|        40.75809|            1.0|[-73.987129211425...|
|  5.3|      -73.968094|       40.76801|        -73.95666|       40.783764|            1.0|[-73.968093872070...|
| 12.1|       -74.00096|       40.73163|        -73.97289|       40.758232|            1.0|[-74.

## Train Model

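We train a linear regression model. Note that with `regParam=0.3` and `elasticNetParam=0.8` (mostly L1 regularization), all the fitted coefficients shown below are 0.0. The model therefore predicts the same value (the intercept) for every trip.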

In [11]:
# Fit the model on the (label, features) columns and report the wall-clock training time.
fit_started = datetime.now()
training_input = new_df.select('label', 'features')
lrModel = lr.fit(training_input)
time_elapsed = datetime.now() - fit_started
print('TIME OF LINEAR REGRESSION TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

TIME OF LINEAR REGRESSION TRAINING (hh:mm:ss.ms) 0:11:17.159047


In [12]:
# Learned weights (one per feature, in assembler order) and the bias term.
coefficients, intercept = lrModel.coefficients, lrModel.intercept
print("Coefficients: %s" % str(coefficients))
print("Intercept: %s" % str(intercept))

Coefficients: [0.0,0.0,0.0,0.0,0.0]
Intercept: 11.345004953120542


## Measures

### Root Mean Squared Error

In [13]:
# Metrics computed on the training set itself (not on held-out data).
trainingSummary = lrModel.summary
iterations = trainingSummary.totalIterations
print("numIterations: %d" % iterations)
train_rmse = trainingSummary.rootMeanSquaredError
print("RMSE: %f" % train_rmse)

numIterations: 1
RMSE: 20.710867


In [14]:
# Stop the session; the next section creates a fresh one with getOrCreate().
spark.stop()

# Decision tree regression

In [15]:
# Pipeline and VectorIndexer are imported but not used in this notebook.
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Re-create a session (the previous one was stopped at the end of the linear-regression section).
spark = SparkSession.builder.appName('taxis_fare').getOrCreate()

# Load training data (all columns are read as strings).
df = spark.read.format("csv").option("header", "true").load("data/train.csv")

# Cast the columns we use to float.
df = df.select(df['fare_amount'].cast("float").alias('fare_amount'),
               df['pickup_longitude'].cast("float").alias('pickup_longitude'),
               df['pickup_latitude'].cast("float").alias('pickup_latitude'),
               df['dropoff_longitude'].cast("float").alias('dropoff_longitude'),
               df['dropoff_latitude'].cast("float").alias('dropoff_latitude'),
               df['passenger_count'].cast("float").alias('passenger_count'))

# Rename the target to "label", the default label column of Spark ML estimators.
df = df.selectExpr("fare_amount as label",'pickup_longitude','pickup_latitude',
                  'dropoff_longitude','dropoff_latitude','passenger_count')

# Build the assembler here instead of reusing the one from the linear-regression section,
# so this cell does not depend on earlier kernel state. "skip" drops rows with null/NaN features.
vecAssembler = VectorAssembler(inputCols=["pickup_longitude", "pickup_latitude",
                                          "dropoff_longitude", "dropoff_latitude",
                                          "passenger_count"],
                               outputCol="features").setHandleInvalid("skip")
new_df = vecAssembler.transform(df)

# Split the data into training and test sets (30% held out for testing).
# A fixed seed makes the split reproducible, so the test RMSE can be compared between runs
# and sections (as long as the input is partitioned the same way).
(trainingData, testData) = new_df.randomSplit([0.7, 0.3], seed=42)

# Decision tree with Spark's default hyper-parameters.
dt = DecisionTreeRegressor()

start_time = datetime.now()

# Train model.
model = dt.fit(trainingData)

time_elapsed = datetime.now() - start_time 
print('TIME OF DECISION TREE REGRESSION TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

TIME OF DECISION TREE REGRESSION TRAINING (hh:mm:ss.ms) 0:20:22.153943
+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
| 8.923282596071983|-62.0|[-74.002838134765...|
| 33.94856839028241|-44.9|[-73.871116638183...|
|13.820852104281952|-18.1|[-73.958274841308...|
| 8.923282596071983|-10.1|[-73.983306884765...|
| 8.923282596071983| -6.5|[-73.984352111816...|
+------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 28.3057


# Random forest regression

In [16]:
# VectorIndexer is imported but not used in this notebook.
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName('taxis_fare').getOrCreate()

# Load training data (all columns are read as strings).
df = spark.read.format("csv").option("header", "true").load("data/train.csv")

# Cast the columns we use to float.
df = df.select(df['fare_amount'].cast("float").alias('fare_amount'),
               df['pickup_longitude'].cast("float").alias('pickup_longitude'),
               df['pickup_latitude'].cast("float").alias('pickup_latitude'),
               df['dropoff_longitude'].cast("float").alias('dropoff_longitude'),
               df['dropoff_latitude'].cast("float").alias('dropoff_latitude'),
               df['passenger_count'].cast("float").alias('passenger_count'))

# Rename the target to "label", the default label column of Spark ML estimators.
df = df.selectExpr("fare_amount as label",'pickup_longitude','pickup_latitude',
                  'dropoff_longitude','dropoff_latitude','passenger_count')

# Build the assembler here instead of reusing the one from an earlier cell, so this cell
# does not depend on earlier kernel state. "skip" drops rows with null/NaN features.
vecAssembler = VectorAssembler(inputCols=["pickup_longitude", "pickup_latitude",
                                          "dropoff_longitude", "dropoff_latitude",
                                          "passenger_count"],
                               outputCol="features").setHandleInvalid("skip")
new_df = vecAssembler.transform(df)

# Split the data into training and test sets (30% held out for testing).
# A fixed seed makes the split reproducible, so the test RMSE can be compared between runs
# and sections (as long as the input is partitioned the same way).
(trainingData, testData) = new_df.randomSplit([0.7, 0.3], seed=42)

# Random forest with Spark's default hyper-parameters.
rf = RandomForestRegressor()

# Train model.
start_time = datetime.now()

model = rf.fit(trainingData)

time_elapsed = datetime.now() - start_time 
print('TIME OF RANDOM FOREST TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

TIME OF RANDOM FOREST TRAINING (hh:mm:ss.ms) 0:35:37.276246
+------------------+------+--------------------+
|        prediction| label|            features|
+------------------+------+--------------------+
|28.582299205178288|-29.87|[-73.863159179687...|
|14.533684751999676| -18.1|[-73.958274841308...|
|  8.88273460890856|  -6.5|[-73.993659973144...|
|  8.88273460890856|  -6.5|[-73.989494323730...|
|  8.88273460890856|  -5.3|[-73.984802246093...|
+------------------+------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 23.9302


# With Local Cluster

Spark implicitly splits its data structures into partitions so they can be processed in parallel. Here we set the master URL explicitly, so that Spark runs as many parallel workers as our computer has logical cores.

The local[*] string is a special string denoting that you’re using a local cluster,
which is another way of saying you’re running in single-machine mode. 
The * tells Spark to create as many worker threads as logical cores on your machine.

In [17]:
# NOTE: KMeans is Spark ML's clustering algorithm (grouping data points). It is unrelated
# to running Spark on a compute cluster, and it is not used anywhere in this notebook.
from pyspark.ml.clustering import KMeans

In [18]:
# Stop the session left over from the previous sections first. getOrCreate() returns an
# already running session unchanged, and the master URL is fixed when the SparkContext
# starts, so .master('local[*]') would otherwise be silently ignored.
# NOTE(review): spark.driver.memory only takes effect if set before the driver JVM is
# launched (e.g. in a fresh kernel); confirm the effective settings in the Spark UI.
if 'spark' in globals():
    spark.stop()
spark = (SparkSession.builder
         .master('local[*]')
         .appName('taxi_fare_local_cluster')
         .config('spark.driver.memory', '8g')
         .getOrCreate())

### Preparing data as before

Same preprocessing as before, to prepare the dataset for training.

In [23]:
# Load training data. No inferSchema: every column used below is explicitly cast to float,
# so schema inference would only cost an extra full pass over the CSV. It would also make
# this load differ from the one used in the "Native Spark" section.
df = spark.read.csv('data/train.csv', header=True, nullValue=' ')

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Cast the columns we use to float.
df = df.select(df['fare_amount'].cast("float").alias('fare_amount'),
               df['pickup_longitude'].cast("float").alias('pickup_longitude'),
               df['pickup_latitude'].cast("float").alias('pickup_latitude'),
               df['dropoff_longitude'].cast("float").alias('dropoff_longitude'),
               df['dropoff_latitude'].cast("float").alias('dropoff_latitude'),
               df['passenger_count'].cast("float").alias('passenger_count'))


# Rename the target to "label", the default label column of Spark ML estimators.
df = df.selectExpr("fare_amount as label",'pickup_longitude','pickup_latitude',
                  'dropoff_longitude','dropoff_latitude','passenger_count')

# Column features
vecAssembler = VectorAssembler(inputCols=["pickup_longitude", "pickup_latitude",
                                          "dropoff_longitude", "dropoff_latitude",
                                          "passenger_count"], outputCol="features")

# Delete null rows: "skip" drops rows whose feature columns contain null/NaN.
df_local_clust = vecAssembler.setHandleInvalid("skip").transform(df)

In [24]:
# Check that the assembler appended the "features" column to the six input columns.
df_local_clust.columns

['label',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'passenger_count',
 'features']

## Train Local Cluster Model

In [25]:
# Fit the model on the (label, features) columns and report the wall-clock training time.
fit_started = datetime.now()
training_input = df_local_clust.select('label', 'features')
lrModel = lr.fit(training_input)
time_elapsed = datetime.now() - fit_started
print('TIME OF LINEAR REGRESSION TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

TIME OF LINEAR REGRESSION TRAINING (hh:mm:ss.ms) 0:07:40.082209


In [26]:
# Learned weights (one per feature, in assembler order) and the bias term.
coefficients, intercept = lrModel.coefficients, lrModel.intercept
print("Coefficients: %s" % str(coefficients))
print("Intercept: %s" % str(intercept))

Coefficients: [0.0,0.0,0.0,0.0,0.0]
Intercept: 11.345004953120545


## Measures

### Root Mean Squared Error

In [27]:
# Metrics computed on the training set itself (not on held-out data).
trainingSummary = lrModel.summary
iterations = trainingSummary.totalIterations
print("numIterations: %d" % iterations)
train_rmse = trainingSummary.rootMeanSquaredError
print("RMSE: %f" % train_rmse)

numIterations: 1
RMSE: 20.710867


# Decision tree regression local cluster

In [21]:
# NOTE: the RDD-based mllib names below are not used in this cell.
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
# The DataFrame-based estimator and evaluator actually used here; importing them in this
# cell keeps it runnable without the earlier "Decision tree regression" cell.
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.master('local[*]').appName('taxi_fare_local_cluster').config('spark.driver.memory', '8g').getOrCreate()

# Load training data. No inferSchema: every column used below is explicitly cast to float,
# so inference would only cost an extra full pass over the CSV.
df = spark.read.csv('data/train.csv', header=True, nullValue=' ')

# Cast the columns we use to float.
df = df.select(df['fare_amount'].cast("float").alias('fare_amount'),
               df['pickup_longitude'].cast("float").alias('pickup_longitude'),
               df['pickup_latitude'].cast("float").alias('pickup_latitude'),
               df['dropoff_longitude'].cast("float").alias('dropoff_longitude'),
               df['dropoff_latitude'].cast("float").alias('dropoff_latitude'),
               df['passenger_count'].cast("float").alias('passenger_count'))

# Rename the target to "label", the default label column of Spark ML estimators.
df = df.selectExpr("fare_amount as label",'pickup_longitude','pickup_latitude',
                  'dropoff_longitude','dropoff_latitude','passenger_count')

# Column features
vecAssembler = VectorAssembler(inputCols=["pickup_longitude", "pickup_latitude",
                                          "dropoff_longitude", "dropoff_latitude",
                                          "passenger_count"], outputCol="features")

# Delete null rows: "skip" drops rows whose feature columns contain null/NaN.
df_local_clust = vecAssembler.setHandleInvalid("skip").transform(df)

# Split the data into training and test sets (30% held out for testing).
# A fixed seed makes the split reproducible, so the test RMSE can be compared between runs
# and sections (as long as the input is partitioned the same way).
(trainingData, testData) = df_local_clust.randomSplit([0.7, 0.3], seed=42)

# Decision tree with Spark's default hyper-parameters.
dt = DecisionTreeRegressor()

start_time = datetime.now()

# Train model.
model = dt.fit(trainingData.select('label','features'))

time_elapsed = datetime.now() - start_time 
print('TIME OF DECISION TREE ON LOCAL CLUSTER TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

TIME OF DECISION TREE ON LOCAL CLUSTER TRAINING (hh:mm:ss.ms) 0:11:35.108962
+-----------------+-----+--------------------+
|       prediction|label|            features|
+-----------------+-----+--------------------+
|8.742626065536477|-62.0|[-74.002838134765...|
|8.742626065536477|-10.1|[-73.983306884765...|
|8.742626065536477| -6.5|[-73.981338500976...|
|8.742626065536477| -6.0|[-73.987518310546...|
|8.742626065536477| -5.0|[-73.990974426269...|
+-----------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 6.96171


# Random forest regression local cluster

In [22]:
# NOTE: the RDD-based mllib names below are not used in this cell.
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
# The DataFrame-based estimator and evaluator actually used here; importing them in this
# cell keeps it runnable without the earlier "Random forest regression" cell.
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.master('local[*]').appName('taxi_fare_local_cluster').config('spark.driver.memory', '8g').getOrCreate()

# Load training data. No inferSchema: every column used below is explicitly cast to float,
# so inference would only cost an extra full pass over the CSV.
df = spark.read.csv('data/train.csv', header=True, nullValue=' ')

# Cast the columns we use to float.
df = df.select(df['fare_amount'].cast("float").alias('fare_amount'),
               df['pickup_longitude'].cast("float").alias('pickup_longitude'),
               df['pickup_latitude'].cast("float").alias('pickup_latitude'),
               df['dropoff_longitude'].cast("float").alias('dropoff_longitude'),
               df['dropoff_latitude'].cast("float").alias('dropoff_latitude'),
               df['passenger_count'].cast("float").alias('passenger_count'))


# Rename the target to "label", the default label column of Spark ML estimators.
df = df.selectExpr("fare_amount as label",'pickup_longitude','pickup_latitude',
                  'dropoff_longitude','dropoff_latitude','passenger_count')

# Column features
vecAssembler = VectorAssembler(inputCols=["pickup_longitude", "pickup_latitude",
                                          "dropoff_longitude", "dropoff_latitude",
                                          "passenger_count"], outputCol="features")

# Delete null rows: "skip" drops rows whose feature columns contain null/NaN.
df_local_clust = vecAssembler.setHandleInvalid("skip").transform(df)

# Split the data into training and test sets (30% held out for testing).
# A fixed seed makes the split reproducible, so the test RMSE can be compared between runs
# and sections (as long as the input is partitioned the same way).
(trainingData, testData) = df_local_clust.randomSplit([0.7, 0.3], seed=42)

# Random forest with Spark's default hyper-parameters (no numTrees or
# featureSubsetStrategy is set here); raise numTrees for a stronger model.
rf = RandomForestRegressor()

start_time = datetime.now()

# Train model.
model = rf.fit(trainingData.select('label','features'))

time_elapsed = datetime.now() - start_time 
print('TIME OF RANDOM FOREST ON LOCAL CLUSTER TRAINING (hh:mm:ss.ms) {}'.format(time_elapsed))

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

TIME OF RANDOM FOREST ON LOCAL CLUSTER TRAINING (hh:mm:ss.ms) 0:18:30.459010
+-----------------+------+--------------------+
|       prediction| label|            features|
+-----------------+------+--------------------+
| 8.96474941906633| -62.0|[-74.002838134765...|
|31.47212434781009|-29.87|[-73.863159179687...|
|12.33413473421554| -18.1|[-73.958274841308...|
| 8.96474941906633| -10.1|[-73.983306884765...|
| 8.96474941906633|  -5.0|[-73.990974426269...|
+-----------------+------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 23.9699


## Conclusion

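#### The linear regression RMSE is the same in both setups (20.710867) because the same algorithm runs on the same data. Training times in this run are lower with the explicit local cluster, but the comparison needs some caveats:

- **Timings:** training time went from 0:11:17 to 0:07:40 for linear regression (about 32% less). For the decision tree it went from 0:20:22 to 0:11:35 (about 43% less), and for the random forest from 0:35:37 to 0:18:30 (about 48% less).
- **Tree RMSEs differ:** the decision tree scored 28.31 vs 6.96 and the random forest 23.93 vs 23.97. `randomSplit` was called without a seed, so each run most likely trained and evaluated on different rows.
- **Metrics are not comparable:** the linear regression RMSE is measured on the training set, while the tree RMSEs are measured on a held-out test set.
- **Speedup attribution:** the local-cluster cells were executed out of order (In [21] and In [22] ran before In [23]). The session from the previous section was not stopped before the local-cluster session was requested, so `getOrCreate()` may have reused it. Single timings like these are also noisy. For all these reasons, the speedup should not be attributed to the cluster setting alone.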