# Big Data Project : Spark MLlib vs Apache Mahout 
### Apache Spark MLLIB Nootbook

In [1]:
!pip install pyspark

Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.0/199.0 kB[0m [31m172.3 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.3


# Importing PySpark

In [None]:
import pyspark

# Creating Spark Session

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('Project').getOrCreate()

In [5]:
spark

# Loading Dataset

In [93]:
data = spark.read.csv('MasterFile.csv')

In [94]:
data = spark.read.option('header','true').csv('MasterFile.csv', inferSchema = True)

In [95]:
type(data)

pyspark.sql.dataframe.DataFrame

# Data Pre - Processing

In [96]:
data.head(3)

[Row(Accident_Index='200501BS00001', Location_Easting_OSGR=525680.0, Location_Northing_OSGR=178240.0, Longitude=-0.19117, Latitude=51.489096, Police_Force=1, Accident_Severity=2, Number_of_Vehicles=1, Number_of_Casualties=1, Date='04/01/2005', Day_of_Week=3, Time='17:42', Local_Authority_(District)=12, Local_Authority_(Highway)='E09000020', 1st_Road_Class=3, 1st_Road_Number=3218, Road_Type='Single carriageway', Speed_limit=30, Junction_Detail=None, Junction_Control=None, 2nd_Road_Class=-1, 2nd_Road_Number=0, Pedestrian_Crossing-Human_Control='None within 50 metres', Pedestrian_Crossing-Physical_Facilities='Zebra crossing', Light_Conditions='Daylight: Street light present', Weather_Conditions='Raining without high winds', Road_Surface_Conditions='Wet/Damp', Special_Conditions_at_Site='None', Carriageway_Hazards='None', Urban_or_Rural_Area=1, Did_Police_Officer_Attend_Scene_of_Accident='Yes', LSOA_of_Accident_Location='E01002849', Year=2005),
 Row(Accident_Index='200501BS00002', Location

In [97]:
data.printSchema()

root
 |-- Accident_Index: string (nullable = true)
 |-- Location_Easting_OSGR: double (nullable = true)
 |-- Location_Northing_OSGR: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Police_Force: integer (nullable = true)
 |-- Accident_Severity: integer (nullable = true)
 |-- Number_of_Vehicles: integer (nullable = true)
 |-- Number_of_Casualties: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Day_of_Week: integer (nullable = true)
 |-- Time: string (nullable = true)
 |-- Local_Authority_(District): integer (nullable = true)
 |-- Local_Authority_(Highway): string (nullable = true)
 |-- 1st_Road_Class: integer (nullable = true)
 |-- 1st_Road_Number: integer (nullable = true)
 |-- Road_Type: string (nullable = true)
 |-- Speed_limit: integer (nullable = true)
 |-- Junction_Detail: string (nullable = true)
 |-- Junction_Control: string (nullable = true)
 |-- 2nd_Road_Class: integer (nullable = true)
 |-- 2nd

In [98]:
data.columns

['Accident_Index',
 'Location_Easting_OSGR',
 'Location_Northing_OSGR',
 'Longitude',
 'Latitude',
 'Police_Force',
 'Accident_Severity',
 'Number_of_Vehicles',
 'Number_of_Casualties',
 'Date',
 'Day_of_Week',
 'Time',
 'Local_Authority_(District)',
 'Local_Authority_(Highway)',
 '1st_Road_Class',
 '1st_Road_Number',
 'Road_Type',
 'Speed_limit',
 'Junction_Detail',
 'Junction_Control',
 '2nd_Road_Class',
 '2nd_Road_Number',
 'Pedestrian_Crossing-Human_Control',
 'Pedestrian_Crossing-Physical_Facilities',
 'Light_Conditions',
 'Weather_Conditions',
 'Road_Surface_Conditions',
 'Special_Conditions_at_Site',
 'Carriageway_Hazards',
 'Urban_or_Rural_Area',
 'Did_Police_Officer_Attend_Scene_of_Accident',
 'LSOA_of_Accident_Location',
 'Year']

In [99]:
data.select('Light_Conditions')

DataFrame[Light_Conditions: string]

In [100]:
data.select('Light_Conditions').show()

+--------------------+
|    Light_Conditions|
+--------------------+
|Daylight: Street ...|
|Darkness: Street ...|
|Darkness: Street ...|
|Daylight: Street ...|
|Darkness: Street ...|
|Daylight: Street ...|
|Darkness: Street ...|
|Daylight: Street ...|
|Darkness: Street ...|
|Daylight: Street ...|
|Darkness: Street ...|
|Darkness: Street ...|
|Daylight: Street ...|
|Darkness: Street ...|
|Daylight: Street ...|
|Daylight: Street ...|
|Darkness: Street ...|
|Daylight: Street ...|
|Darkness: Street ...|
|Darkness: Street ...|
+--------------------+
only showing top 20 rows



## Dropping Unique Features

In [101]:
filtered_data = data.drop('Time')

In [102]:
filtered_data = filtered_data.drop('Date')

In [103]:
filtered_data = filtered_data.drop('Accident_Index')

In [104]:
filtered_data = filtered_data.drop('LSOA_of_Accident_Location')

In [105]:
filtered_data = filtered_data.drop('Local_Authority_(Highway)')

In [106]:
filtered_data.columns

['Location_Easting_OSGR',
 'Location_Northing_OSGR',
 'Longitude',
 'Latitude',
 'Police_Force',
 'Accident_Severity',
 'Number_of_Vehicles',
 'Number_of_Casualties',
 'Day_of_Week',
 'Local_Authority_(District)',
 '1st_Road_Class',
 '1st_Road_Number',
 'Road_Type',
 'Speed_limit',
 'Junction_Detail',
 'Junction_Control',
 '2nd_Road_Class',
 '2nd_Road_Number',
 'Pedestrian_Crossing-Human_Control',
 'Pedestrian_Crossing-Physical_Facilities',
 'Light_Conditions',
 'Weather_Conditions',
 'Road_Surface_Conditions',
 'Special_Conditions_at_Site',
 'Carriageway_Hazards',
 'Urban_or_Rural_Area',
 'Did_Police_Officer_Attend_Scene_of_Accident',
 'Year']

In [107]:
filtered_data = filtered_data.drop('Junction_Detail')

In [108]:
filtered_data = filtered_data.drop('Junction_Control')

In [111]:
filtered_data.columns

['Location_Easting_OSGR',
 'Location_Northing_OSGR',
 'Longitude',
 'Latitude',
 'Police_Force',
 'Accident_Severity',
 'Number_of_Vehicles',
 'Number_of_Casualties',
 'Day_of_Week',
 'Local_Authority_(District)',
 '1st_Road_Class',
 '1st_Road_Number',
 'Road_Type',
 'Speed_limit',
 '2nd_Road_Class',
 '2nd_Road_Number',
 'Pedestrian_Crossing-Human_Control',
 'Pedestrian_Crossing-Physical_Facilities',
 'Light_Conditions',
 'Weather_Conditions',
 'Road_Surface_Conditions',
 'Special_Conditions_at_Site',
 'Carriageway_Hazards',
 'Urban_or_Rural_Area',
 'Did_Police_Officer_Attend_Scene_of_Accident',
 'Year']

In [112]:
filtered_data.count()

1504150

## Checking For Null Values

In [120]:
import pyspark.sql.functions as F

df_agg = filtered_data.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in filtered_data.columns])

In [121]:
df_agg.show()

+---------------------+----------------------+---------+--------+------------+-----------------+------------------+--------------------+-----------+--------------------------+--------------+---------------+---------+-----------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+----+
|Location_Easting_OSGR|Location_Northing_OSGR|Longitude|Latitude|Police_Force|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|Day_of_Week|Local_Authority_(District)|1st_Road_Class|1st_Road_Number|Road_Type|Speed_limit|2nd_Road_Class|2nd_Road_Number|Pedestrian_Crossing-Human_Control|Pedestrian_Crossing-Physical_Facilities|Light_Conditions|Weather_Conditions|Road_Surface_Conditions|Special_Conditions_at_Site|Carriageway_Hazards|Urban_or_Rural_Area|Did_Police_Officer_Attend_Scen

### Aggregating Null Values

In [122]:
from functools import reduce
df_agg_col = reduce(
    lambda a, b: a.union(b),
    (
        df_agg.select(F.lit(c).alias("Column_Name"), F.col(c).alias("NULL_Count")) 
        for c in df_agg.columns
    )
)
df_agg_col.show()

+--------------------+----------+
|         Column_Name|NULL_Count|
+--------------------+----------+
|Location_Easting_...|       101|
|Location_Northing...|       101|
|           Longitude|       101|
|            Latitude|       101|
|        Police_Force|         0|
|   Accident_Severity|         0|
|  Number_of_Vehicles|         0|
|Number_of_Casualties|         0|
|         Day_of_Week|         0|
|Local_Authority_(...|         0|
|      1st_Road_Class|         0|
|     1st_Road_Number|         0|
|           Road_Type|         0|
|         Speed_limit|         0|
|      2nd_Road_Class|         0|
|     2nd_Road_Number|         0|
|Pedestrian_Crossi...|        17|
|Pedestrian_Crossi...|        34|
|    Light_Conditions|         0|
|  Weather_Conditions|       126|
+--------------------+----------+
only showing top 20 rows



### Dropping Null Values

In [132]:
filter_data = filtered_data.dropna(how = 'any')

In [133]:
filter_data.count()

1499613

In [134]:
filtered_data.count()

1504150

In [135]:
len(filter_data.columns)

26

# One - Hot Encoding

## String Indexer

In [141]:
from pyspark.ml.feature import StringIndexer

In [138]:
  catCols = [x for (x,dataType) in filter_data.dtypes if dataType == "string"]

In [139]:
print(catCols)

['Road_Type', 'Pedestrian_Crossing-Human_Control', 'Pedestrian_Crossing-Physical_Facilities', 'Light_Conditions', 'Weather_Conditions', 'Road_Surface_Conditions', 'Special_Conditions_at_Site', 'Carriageway_Hazards', 'Did_Police_Officer_Attend_Scene_of_Accident']


In [142]:
string_indexer = StringIndexer(inputCols = ['Road_Type', 'Pedestrian_Crossing-Human_Control', 'Pedestrian_Crossing-Physical_Facilities', 'Light_Conditions', 'Weather_Conditions', 'Road_Surface_Conditions', 'Special_Conditions_at_Site', 'Carriageway_Hazards', 'Did_Police_Officer_Attend_Scene_of_Accident'],
                               outputCols = ['Road_Type_StringIndexer', 'Pedestrian_Crossing-Human_Control_StringIndexer', 'Pedestrian_Crossing-Physical_Facilities_StringIndexer', 'Light_Conditions_StringIndexer', 'Weather_Conditions_StringIndexer', 'Road_Surface_Conditions_StringIndexer', 'Special_Conditions_at_Site_StringIndexer', 'Carriageway_Hazards_StringIndexer', 'Did_Police_Officer_Attend_Scene_of_Accident_StringIndexer'], handleInvalid = "skip")

In [143]:
indexed = string_indexer.fit(filter_data).transform(filter_data)

In [144]:
indexed.printSchema()

root
 |-- Location_Easting_OSGR: double (nullable = true)
 |-- Location_Northing_OSGR: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Police_Force: integer (nullable = true)
 |-- Accident_Severity: integer (nullable = true)
 |-- Number_of_Vehicles: integer (nullable = true)
 |-- Number_of_Casualties: integer (nullable = true)
 |-- Day_of_Week: integer (nullable = true)
 |-- Local_Authority_(District): integer (nullable = true)
 |-- 1st_Road_Class: integer (nullable = true)
 |-- 1st_Road_Number: integer (nullable = true)
 |-- Road_Type: string (nullable = true)
 |-- Speed_limit: integer (nullable = true)
 |-- 2nd_Road_Class: integer (nullable = true)
 |-- 2nd_Road_Number: integer (nullable = true)
 |-- Pedestrian_Crossing-Human_Control: string (nullable = true)
 |-- Pedestrian_Crossing-Physical_Facilities: string (nullable = true)
 |-- Light_Conditions: string (nullable = true)
 |-- Weather_Conditions: string (nullable = tr

In [146]:
from pyspark.ml.feature import OneHotEncoder

In [148]:
one_hot_encoder = OneHotEncoder(inputCols = [f"{x}_StringIndexer" for x in catCols], outputCols = [f"{x}_OneHotEncoder" for x in catCols])

In [149]:
model = one_hot_encoder.fit(indexed).transform(indexed)

In [150]:
model.printSchema()

root
 |-- Location_Easting_OSGR: double (nullable = true)
 |-- Location_Northing_OSGR: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Police_Force: integer (nullable = true)
 |-- Accident_Severity: integer (nullable = true)
 |-- Number_of_Vehicles: integer (nullable = true)
 |-- Number_of_Casualties: integer (nullable = true)
 |-- Day_of_Week: integer (nullable = true)
 |-- Local_Authority_(District): integer (nullable = true)
 |-- 1st_Road_Class: integer (nullable = true)
 |-- 1st_Road_Number: integer (nullable = true)
 |-- Road_Type: string (nullable = true)
 |-- Speed_limit: integer (nullable = true)
 |-- 2nd_Road_Class: integer (nullable = true)
 |-- 2nd_Road_Number: integer (nullable = true)
 |-- Pedestrian_Crossing-Human_Control: string (nullable = true)
 |-- Pedestrian_Crossing-Physical_Facilities: string (nullable = true)
 |-- Light_Conditions: string (nullable = true)
 |-- Weather_Conditions: string (nullable = tr

In [151]:
from pyspark.ml.feature import VectorAssembler

In [152]:
vector_assembler = VectorAssembler(
    inputCols = ["Location_Easting_OSGR","Location_Northing_OSGR","Longitude","Latitude","Police_Force","Number_of_Vehicles","Number_of_Casualties","Day_of_Week","Local_Authority_(District)","1st_Road_Class","1st_Road_Number","Speed_limit","2nd_Road_Class","2nd_Road_Number","Urban_or_Rural_Area","Year","Road_Type_OneHotEncoder","Pedestrian_Crossing-Human_Control_OneHotEncoder","Pedestrian_Crossing-Physical_Facilities_OneHotEncoder","Light_Conditions_OneHotEncoder","Weather_Conditions_OneHotEncoder","Road_Surface_Conditions_OneHotEncoder","Special_Conditions_at_Site_OneHotEncoder","Carriageway_Hazards_OneHotEncoder","Did_Police_Officer_Attend_Scene_of_Accident_OneHotEncoder"], outputCol = "features"
)

In [153]:
output = vector_assembler.transform(model)

In [154]:
model_filter_data = output.select("features","Accident_Severity")

# Machine Learning Models

In [159]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler

### Train - Test Split

In [166]:
train , test = model_filter_data.randomSplit([0.7,0.3])
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 1051002
Test Dataset Count: 448611


## Decision Tree Classifier

In [160]:
%%time
decision_tree = DecisionTreeClassifier(featuresCol = 'features',labelCol = 'Accident_Severity')

CPU times: user 9.91 ms, sys: 0 ns, total: 9.91 ms
Wall time: 1.23 s


In [165]:
#import time
#start_time = time.time()
start = time.perf_counter()
decision_tree.fit(train)
end = time.perf_counter()
duration = format((end-start),'.4f')
print("Time in Seconds :", duration)

Time in Seconds : 176.3336


In [210]:
model = decision_tree.fit(train)

In [211]:
prediction_test = model.transform(test)

In [212]:
predictionAndLabels = prediction_test.select("Accident_Severity","prediction").rdd

In [213]:
evaluator = MulticlassClassificationEvaluator(labelCol = "Accident_Severity", predictionCol = "prediction", metricName = "accuracy")
Accuracy_DT = evaluator.evaluate(prediction_test)
print("Accuracy Score of Decision Tree:" , Accuracy_DT)

Accuracy Score of Decision Tree: 0.8515105514577217


## Random Forest Classifier

In [167]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Accident_Severity')

In [168]:
start = time.perf_counter()
rf.fit(train)
end = time.perf_counter()
duration = format((end-start),'.4f')
print("Random Forest Classifier Time in Seconds :", duration)

Random Forest Classifier Time in Seconds : 347.4570


In [205]:
model = rf.fit(train)

In [206]:
prediction_test = model.transform(test)

In [208]:
predictionAndLabels = prediction_test.select("Accident_Severity","prediction").rdd

In [214]:
evaluator = MulticlassClassificationEvaluator(labelCol = "Accident_Severity", predictionCol = "prediction", metricName = "accuracy")
Accuracy_RF = evaluator.evaluate(prediction_test)
print("Accuracy Score of Random Forest:" , Accuracy_RF)

Accuracy Score of Random Forest: 0.8515105514577217


## Logistic Regression

In [171]:
from pyspark.ml.classification import LogisticRegression

log_reg = LogisticRegression(featuresCol='features',labelCol='Accident_Severity')


In [181]:
start = time.perf_counter()
log_reg.fit(train)
end = time.perf_counter()
duration = format((end-start),'.4f')
print("Logistic Regression Time in Seconds :", duration)

Logistic Regression Time in Seconds : 107.3300


In [189]:
log_reg2 = LogisticRegression(featuresCol='features',labelCol='Accident_Severity', maxIter = 20)
model = log_reg2.fit(train)

In [191]:
prediction_test = model.transform(test)

In [192]:
prediction_test.show()

+--------------------+-----------------+--------------------+--------------------+----------+
|            features|Accident_Severity|       rawPrediction|         probability|prediction|
+--------------------+-----------------+--------------------+--------------------+----------+
|(57,[0,1,2,3,4,5,...|                3|[-8.7757203119566...|[3.38519414336258...|       3.0|
|(57,[0,1,2,3,4,5,...|                3|[-8.7757869155237...|[3.09446539477171...|       3.0|
|(57,[0,1,2,3,4,5,...|                3|[-8.7759778220738...|[5.12729244314534...|       3.0|
|(57,[0,1,2,3,4,5,...|                3|[-8.7759765829342...|[5.21232819862944...|       3.0|
|(57,[0,1,2,3,4,5,...|                3|[-8.7759338932484...|[3.85303528423077...|       3.0|
|(57,[0,1,2,3,4,5,...|                3|[-8.7758947983594...|[3.81508451356216...|       3.0|
|(57,[0,1,2,3,4,5,...|                3|[-8.7758880185192...|[3.77716040099032...|       3.0|
|(57,[0,1,2,3,4,5,...|                3|[-8.7758832711596...

In [193]:
predictionAndLabels = prediction_test.select("Accident_Severity","prediction").rdd

In [200]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [199]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [204]:
evaluator = MulticlassClassificationEvaluator(labelCol = "Accident_Severity", predictionCol = "prediction", metricName = "accuracy")
Accuracy_LR = evaluator.evaluate(prediction_test)
print("Accuracy Score of Logistic Regression:" , Accuracy_LR)

Accuracy Score of Logistic Regression: 0.851497176841406
