# Spark ML Model for Cab Analysis

## 1. PySpark environment setup

In [1]:
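import findspark
findspark.init()  # locate the local Spark installation and make pyspark importable

from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise start a new one
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext  # underlying SparkContext, kept for lower-level RDD work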

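If this cell fails with `ModuleNotFoundError: No module named 'findspark'`, install the package first (for example with `pip install findspark`). `findspark` only locates an existing Spark installation and adds PySpark to `sys.path`. If PySpark was installed with `pip install pyspark`, the first two lines can be skipped.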

## 2. Data source and Spark data abstraction setup (Data Collection)


In [85]:
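# Read the CSV file, taking column names from the header row
# and letting Spark infer each column's data type
CabsRawDF = spark.read \
    .format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("cabs.csv")
CabsRawDF.show(5)
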
+---+--------+--------+-------------+----------------+-----+----------------+--------------------+------------+------------+---+-----+----+----+---------------+
|_c0|distance|cab_type|  destination|          source|price|surge_multiplier|                  id|  product_id|        name|day|month|year|hour|           time|
+---+--------+--------+-------------+----------------+-----+----------------+--------------------+------------+------------+---+-----+----+----+---------------+
|  0|    0.44|    Lyft|North Station|Haymarket Square|  5.0|             1.0|424553bb-7174-41e...|   lyft_line|      Shared| 16|   12|2018|   9|09:30:07.890000|
|  1|    0.44|    Lyft|North Station|Haymarket Square| 11.0|             1.0|4bd23055-6827-41c...|lyft_premier|         Lux| 27|   11|2018|   2|02:00:23.677000|
|  2|    0.44|    Lyft|North Station|Haymarket Square|  7.0|             1.0|981a3613-77af-462...|        lyft|        Lyft| 28|   11|2018|   1|01:00:22.198000|
|  3|    0.44|    Lyft|North Stati

## 3. Data set understanding (Data Cleansing)
Let's do some *Exploratory Data Analysis* to understand the data better, and clean it before moving it forward in the workflow:

In [86]:
rowsNumber = CabsRawDF.count()
print("Total number of rows: ", rowsNumber)
print("List of columns and data types:")
CabsRawDF.dtypes

Total number of rows:  693071
List of columns and data types:

[('_c0', 'int'),
 ('distance', 'double'),
 ('cab_type', 'string'),
 ('destination', 'string'),
 ('source', 'string'),
 ('price', 'double'),
 ('surge_multiplier', 'double'),
 ('id', 'string'),
 ('product_id', 'string'),
 ('name', 'string'),
 ('day', 'int'),
 ('month', 'int'),
 ('year', 'int'),
 ('hour', 'int'),
 ('time', 'string')]

Let's **reduce the number of columns** we're going to work with, and **cast `distance` to a single-precision `float`**:

In [87]:
from pyspark.sql.functions import col

# Keep only the columns used below; cast 'distance' from double to 32-bit float
CabsRawDF = CabsRawDF.select(col('distance').cast('float'),
                             col('source'),
                             col('destination'),
                             col('name'),
                             col('price'))
CabsRawDF.show()

+--------+----------------+--------------------+------------+-----+
|distance|          source|         destination|        name|price|
+--------+----------------+--------------------+------------+-----+
|    0.44|Haymarket Square|       North Station|      Shared|  5.0|
|    0.44|Haymarket Square|       North Station|         Lux| 11.0|
|    0.44|Haymarket Square|       North Station|        Lyft|  7.0|
|    0.44|Haymarket Square|       North Station|Lux Black XL| 26.0|
|    0.44|Haymarket Square|       North Station|     Lyft XL|  9.0|
|    0.44|Haymarket Square|       North Station|   Lux Black| 16.5|
|    1.08|        Back Bay|Northeastern Univ...|     Lyft XL| 10.5|
|    1.08|        Back Bay|Northeastern Univ...|   Lux Black| 16.5|
|    1.08|        Back Bay|Northeastern Univ...|      Shared|  3.0|
|    1.08|        Back Bay|Northeastern Univ...|Lux Black XL| 27.5|
|    1.08|        Back Bay|Northeastern Univ...|         Lux| 13.5|
|    1.08|        Back Bay|Northeastern Univ...|

Let's **check if there are null values** that we need to remove before moving forward:

In [88]:
from pyspark.sql.functions import isnull, when, count, col

# For every column, count the rows where its value is null
CabsRawDF.select([count(when(isnull(c), c)).alias(c) for c in CabsRawDF.columns]) \
    .show()

+--------+------+-----------+----+-----+
|distance|source|destination|name|price|
+--------+------+-----------+----+-----+
|       0|     0|          0|   0|55095|
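+--------+------+-----------+----+-----+

Only the `price` column contains nulls (55,095 rows). Since `price` is used as a feature later on, let's drop those rows:

In [89]: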
CabsDF = CabsRawDF.dropna(how='any')
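The null check and `dropna(how='any')` boil down to simple per-column and per-row logic. The following is a minimal pure-Python sketch of that logic, not Spark code: the rows are hypothetical (not taken from `cabs.csv`) and `None` stands in for a Spark null.

```python
# Hypothetical sample rows; None plays the role of a Spark null
rows = [
    {"distance": 0.44, "source": "Haymarket Square", "price": 5.0},
    {"distance": 1.08, "source": "Back Bay", "price": None},
    {"distance": 1.11, "source": "Back Bay", "price": 12.0},
]
columns = ["distance", "source", "price"]

# Same idea as count(when(isnull(c), c)) for every column c
null_counts = {c: sum(1 for r in rows if r[c] is None) for c in columns}

# Same idea as dropna(how='any'): keep only rows with no null in any column
clean_rows = [r for r in rows if all(r[c] is not None for c in columns)]

print(null_counts)      # {'distance': 0, 'source': 0, 'price': 1}
print(len(clean_rows))  # 2
```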

## 4. Feature Engineering
It's time to **convert data into a suitable format** for machine learning algorithms.

Spark ML algorithms expect numeric inputs, so we are going to *encode* the values of the three string columns (`name`, `source` and `destination`) as numeric indices using [StringIndexer](https://spark.apache.org/docs/latest/ml-features#stringindexer):

In [90]:
from pyspark.ml.feature import StringIndexer

# Encode each string column as a numeric index; by default the most frequent
# value gets 0.0. handleInvalid='keep' maps values unseen during fit() to an
# extra index instead of raising an error.
CabsNameDF = StringIndexer(inputCol='name',
                           outputCol='Cab Type',
                           handleInvalid='keep').fit(CabsDF).transform(CabsDF)
CabsNameSourceDF = StringIndexer(inputCol='source',
                                 outputCol='Origin',
                                 handleInvalid='keep').fit(CabsNameDF).transform(CabsNameDF)
CabsNameRideDF = StringIndexer(inputCol='destination',
                               outputCol='Final Stop',
                               handleInvalid='keep').fit(CabsNameSourceDF).transform(CabsNameSourceDF)

CabsNameRideDF.show()

+--------+----------------+--------------------+------------+-----+--------+------+----------+
|distance|          source|         destination|        name|price|Cab Type|Origin|Final Stop|
+--------+----------------+--------------------+------------+-----+--------+------+----------+
|    0.44|Haymarket Square|       North Station|      Shared|  5.0|    11.0|   8.0|      11.0|
|    0.44|Haymarket Square|       North Station|         Lux| 11.0|     6.0|   8.0|      11.0|
|    0.44|Haymarket Square|       North Station|        Lyft|  7.0|     9.0|   8.0|      11.0|
|    0.44|Haymarket Square|       North Station|Lux Black XL| 26.0|     8.0|   8.0|      11.0|
|    0.44|Haymarket Square|       North Station|     Lyft XL|  9.0|    10.0|   8.0|      11.0|
|    0.44|Haymarket Square|       North Station|   Lux Black| 16.5|     7.0|   8.0|      11.0|
|    1.08|        Back Bay|Northeastern Univ...|     Lyft XL| 10.5|    10.0|   1.0|       6.0|
|    1.08|        Back Bay|Northeastern Univ...|  
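By default, `StringIndexer` orders labels by descending frequency (`stringOrderType='frequencyDesc'`), so the most frequent value gets index `0.0`; according to the Spark documentation, recent versions break frequency ties alphabetically. With `handleInvalid='keep'`, values not seen during fitting go to one extra index equal to the number of labels. The pure-Python sketch below mimics that behavior on a hypothetical list of cab names; `fit_string_indexer` and `encode` are illustrative helpers, not Spark's implementation.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct label to an index: most frequent first, ties alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def encode(labels, mapping):
    """Encode labels; unseen ones go to len(mapping), like handleInvalid='keep'."""
    unseen_index = float(len(mapping))
    return [mapping.get(label, unseen_index) for label in labels]

# Hypothetical training values for the 'name' column
training_names = ["Lyft", "Shared", "Lyft", "Lux", "Shared", "Lyft"]
mapping = fit_string_indexer(training_names)
print(mapping)                          # {'Lyft': 0.0, 'Shared': 1.0, 'Lux': 2.0}
print(encode(["Lux", "UberX"], mapping))  # [2.0, 3.0]
```

Because the indices follow frequency rather than any natural order, they are labels, not quantities: index `11.0` is not "bigger" than index `6.0` in any meaningful sense.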

Let's check that the **three new columns** have numeric data types, and then drop the original string columns:

In [91]:
CabsNameRideDF.dtypes

[('distance', 'float'),
 ('source', 'string'),
 ('destination', 'string'),
 ('name', 'string'),
 ('price', 'double'),
 ('Cab Type', 'double'),
 ('Origin', 'double'),
 ('Final Stop', 'double')]

In [92]:
CabsNameRideDF = CabsNameRideDF.drop('name','destination','source')
CabsNameRideDF.show()

+--------+-----+--------+------+----------+
|distance|price|Cab Type|Origin|Final Stop|
+--------+-----+--------+------+----------+
|    0.44|  5.0|    11.0|   8.0|      11.0|
|    0.44| 11.0|     6.0|   8.0|      11.0|
|    0.44|  7.0|     9.0|   8.0|      11.0|
|    0.44| 26.0|     8.0|   8.0|      11.0|
|    0.44|  9.0|    10.0|   8.0|      11.0|
|    0.44| 16.5|     7.0|   8.0|      11.0|
|    1.08| 10.5|    10.0|   1.0|       6.0|
|    1.08| 16.5|     7.0|   1.0|       6.0|
|    1.08|  3.0|    11.0|   1.0|       6.0|
|    1.08| 27.5|     8.0|   1.0|       6.0|
|    1.08| 13.5|     6.0|   1.0|       6.0|
|    1.08|  7.0|     9.0|   1.0|       6.0|
|    1.11| 12.0|     1.0|   4.0|       9.0|
|    1.11| 16.0|     3.0|   4.0|       9.0|
|    1.11|  7.5|     4.0|   4.0|       9.0|
|    1.11|  7.5|     2.0|   4.0|       9.0|
|    1.11| 26.0|     0.0|   4.0|       9.0|
|    1.11|  5.5|     5.0|   4.0|       9.0|
|    0.72| 11.0|    10.0|  11.0|       4.0|
|    0.72| 16.5|     7.0|  11.0|


As **"Cab Type"** is the variable we want to predict, **all the other columns** are used to build the **list of required features**:

In [93]:
from pyspark.ml.feature import VectorAssembler

required_features = ['distance','price','Origin','Final Stop']
assembler = VectorAssembler(inputCols=required_features,\
                            outputCol='features')
CabsFeatureddDF = assembler.transform(CabsNameRideDF)

CabsFeatureddDF.show()

+--------+-----+--------+------+----------+--------------------+
|distance|price|Cab Type|Origin|Final Stop|            features|
+--------+-----+--------+------+----------+--------------------+
|    0.44|  5.0|    11.0|   8.0|      11.0|[0.43999999761581...|
|    0.44| 11.0|     6.0|   8.0|      11.0|[0.43999999761581...|
|    0.44|  7.0|     9.0|   8.0|      11.0|[0.43999999761581...|
|    0.44| 26.0|     8.0|   8.0|      11.0|[0.43999999761581...|
|    0.44|  9.0|    10.0|   8.0|      11.0|[0.43999999761581...|
|    0.44| 16.5|     7.0|   8.0|      11.0|[0.43999999761581...|
|    1.08| 10.5|    10.0|   1.0|       6.0|[1.08000004291534...|
|    1.08| 16.5|     7.0|   1.0|       6.0|[1.08000004291534...|
|    1.08|  3.0|    11.0|   1.0|       6.0|[1.08000004291534...|
|    1.08| 27.5|     8.0|   1.0|       6.0|[1.08000004291534...|
|    1.08| 13.5|     6.0|   1.0|       6.0|[1.08000004291534...|
|    1.08|  7.0|     9.0|   1.0|       6.0|[1.08000004291534...|
|    1.11| 12.0|     1.0|
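`VectorAssembler` concatenates the listed input columns, in order, into one vector per row. The long decimals in the `features` column come from the earlier `cast('float')`: `distance` is stored as a 32-bit float and widened back to a double inside the vector. A pure-Python sketch of both effects, using the values of the first row above; `to_float32` and `assemble` are hypothetical helpers, not Spark functions.

```python
import struct

def to_float32(x):
    """Round a 64-bit Python float to the nearest 32-bit float, like cast('float')."""
    return struct.unpack("f", struct.pack("f", x))[0]

def assemble(row, input_cols):
    """Concatenate the selected columns, in order, into one feature list."""
    return [float(row[c]) for c in input_cols]

required_features = ["distance", "price", "Origin", "Final Stop"]
row = {"distance": to_float32(0.44), "price": 5.0, "Cab Type": 11.0,
       "Origin": 8.0, "Final Stop": 11.0}

features = assemble(row, required_features)
print(features)  # [0.4399999976158142, 5.0, 8.0, 11.0]
```

Note that the label column `Cab Type` is deliberately left out of the feature list.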

## 5. Model training


In [94]:
from pyspark.ml.classification import RandomForestClassifier

# 1. Split the data set into training (~80%) and test (~20%) sets.
#    No seed is passed, so the split changes between runs.
(trainingDF, testDF) = CabsFeatureddDF.randomSplit([0.8, 0.2])

# 2. Configure a random forest that predicts 'Cab Type' from the 'features' vector.
rfcAlgorithm = RandomForestClassifier(labelCol='Cab Type',
                                      featuresCol='features',
                                      maxDepth=5)

# 3. Train the model on the training set and apply it to the test set.
model = rfcAlgorithm.fit(trainingDF)
predictions = model.transform(testDF)
predictions.dtypes

[('distance', 'float'),
 ('price', 'double'),
 ('Cab Type', 'double'),
 ('Origin', 'double'),
 ('Final Stop', 'double'),
 ('features', 'vector'),
 ('rawPrediction', 'vector'),
 ('probability', 'vector'),
 ('prediction', 'double')]
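`randomSplit([0.8, 0.2])` does not cut the data at an exact 80% mark: conceptually, each row is assigned to a split by a uniform random draw, so the sizes are only approximately 80/20. Spark's `randomSplit` also accepts a `seed` argument to make the split reproducible. The pure-Python sketch below illustrates the idea; `random_split` is a hypothetical helper, and Spark's actual implementation samples per partition.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to a split by comparing one uniform draw to cumulative weights."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

training, test = random_split(list(range(10_000)), [0.8, 0.2], seed=42)
print(len(training), len(test))  # close to 8000 and 2000; exact sizes depend on the seed
```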

## 6. Model evaluation

In [95]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='Cab Type',\
                                              predictionCol='prediction',\
                                              metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('The accuracy of our model is ', accuracy)



The accuracy of our model is  0.4894776886211576
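The `accuracy` metric is the fraction of test rows whose `prediction` equals the `Cab Type` label. The indices visible in the `Cab Type` column above run from 0.0 to 11.0, i.e. 12 cab types, so a uniform random guess would be right about 1/12 ≈ 0.083 of the time: 0.49 is well above chance, although roughly half of the test rides are still misclassified. A minimal pure-Python equivalent of the metric, with hypothetical label/prediction pairs:

```python
def accuracy(labels, predictions):
    """Fraction of rows where the predicted index equals the true label index."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Hypothetical 'Cab Type' labels and model predictions for four test rows
labels = [11.0, 6.0, 9.0, 8.0]
predictions = [11.0, 7.0, 9.0, 6.0]
print(accuracy(labels, predictions))  # 0.5
```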