In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:

# List the JDKs installed on this machine; one of these directories is used
# as JAVA_HOME further down.
!ls /usr/lib/jvm/

default-java  java-1.11.0-openjdk-amd64  java-11-openjdk-amd64


In [None]:

# Install PySparkling (H2O Sparkling Water); it provides the `pysparkling`
# package imported later in this notebook.
# NOTE(review): the version is not pinned, so a re-run may pull a newer
# release than the one these outputs were produced with - consider pinning.
!pip install h2o-pysparkling-3.0

In [None]:
import os

# Environment for the Spark setup that follows: the JDK to run on and the
# directory the Spark tarball was unpacked into. These must be set before
# findspark.init() is called.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop2.7",
    }
)

In [None]:
import findspark
# findspark.init() is called before pyspark is imported below.
findspark.init()
from pyspark import SparkContext

# Spark properties registered as JVM system properties ahead of creating the
# session.
# NOTE(review): these go through setSystemProperty rather than the session
# builder; confirm (e.g. in the Spark UI) that the memory values really take
# effect for a local[*] driver.
for prop_key, prop_value in (
    ("spark.executor.memory", "2g"),
    ("spark.driver.memory", "2g"),
    ("spark.memory.fraction", "0.9"),
):
    SparkContext.setSystemProperty(prop_key, prop_value)

In [None]:
from pyspark.sql import SparkSession

# Local-mode session ("local[*]" master). getOrCreate() hands back the
# existing session if this cell is executed again.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
import sys,tempfile, urllib

In [None]:
# List the working directory to confirm the accident CSV read in the next
# cell is present.
!ls

DfTRoadSafety_Accidents_2014.csv  spark-3.0.0-bin-hadoop2.7.tgz
sample_data			  spark-3.0.0-bin-hadoop2.7.tgz.1
spark-3.0.0-bin-hadoop2.7	  spark-3.0.0-bin-hadoop2.7.tgz.2


In [None]:
# Load the 2014 road-accident CSV: the header row supplies the column names
# and Spark infers each column's type from the data.
uk_acc_2014_df = spark.read.csv(
    "DfTRoadSafety_Accidents_2014.csv",
    header=True,
    inferSchema=True,
)

In [None]:

# Preview the loaded DataFrame; show() prints the first rows as a text table.
uk_acc_2014_df.show()

+--------------+---------------------+----------------------+---------+---------+------------+-----------------+------------------+--------------------+----------+-----------+-----+--------------------------+-------------------------+--------------+---------------+---------+-----------+---------------+----------------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+
|Accident_Index|Location_Easting_OSGR|Location_Northing_OSGR|Longitude| Latitude|Police_Force|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|      Date|Day_of_Week| Time|Local_Authority_(District)|Local_Authority_(Highway)|1st_Road_Class|1st_Road_Number|Road_Type|Speed_limit|Junction_Detail|Junction_Control|2nd_Road_Class|2nd_Road_Number|Pedestrian_Crossing-Human_

In [None]:
# Row count per value of the target column, to check the class balance.
(
    uk_acc_2014_df
    .groupBy("Did_Police_Officer_Attend_Scene_of_Accident")
    .count()
    .show()
)

+-------------------------------------------+------+
|Did_Police_Officer_Attend_Scene_of_Accident| count|
+-------------------------------------------+------+
|                                          1|119607|
|                                          2| 26715|
+-------------------------------------------+------+



In [None]:
from pyspark.sql.functions import isnan, when, count, col

# One aggregate per column: count the rows where the value is NULL or NaN.
# when() yields a non-null value only for matching rows, and count() ignores
# the nulls produced for all other rows.
missing_counts = [
    count(when(col(name).isNull() | isnan(name), name)).alias(name)
    for name in uk_acc_2014_df.columns
]
uk_acc_2014_df.select(missing_counts).show()

+--------------+---------------------+----------------------+---------+--------+------------+-----------------+------------------+--------------------+----+-----------+----+--------------------------+-------------------------+--------------+---------------+---------+-----------+---------------+----------------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+
|Accident_Index|Location_Easting_OSGR|Location_Northing_OSGR|Longitude|Latitude|Police_Force|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|Date|Day_of_Week|Time|Local_Authority_(District)|Local_Authority_(Highway)|1st_Road_Class|1st_Road_Number|Road_Type|Speed_limit|Junction_Detail|Junction_Control|2nd_Road_Class|2nd_Road_Number|Pedestrian_Crossing-Human_Control|Pedestri

In [None]:
# Print the column names together with the types Spark inferred for them.
uk_acc_2014_df.printSchema()

root
 |-- Accident_Index: string (nullable = true)
 |-- Location_Easting_OSGR: integer (nullable = true)
 |-- Location_Northing_OSGR: integer (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Police_Force: integer (nullable = true)
 |-- Accident_Severity: integer (nullable = true)
 |-- Number_of_Vehicles: integer (nullable = true)
 |-- Number_of_Casualties: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Day_of_Week: integer (nullable = true)
 |-- Time: string (nullable = true)
 |-- Local_Authority_(District): integer (nullable = true)
 |-- Local_Authority_(Highway): string (nullable = true)
 |-- 1st_Road_Class: integer (nullable = true)
 |-- 1st_Road_Number: integer (nullable = true)
 |-- Road_Type: integer (nullable = true)
 |-- Speed_limit: integer (nullable = true)
 |-- Junction_Detail: integer (nullable = true)
 |-- Junction_Control: integer (nullable = true)
 |-- 2nd_Road_Class: integer (nullable = true)
 |-

In [None]:
# Summary-statistics table: one `summary` row per statistic, one column per field.
uk_acc_2014_df.describe().show()

+-------+--------------+---------------------+----------------------+-------------------+------------------+------------------+------------------+------------------+--------------------+----------+------------------+------+--------------------------+-------------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------------------------------+---------------------------------------+------------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+
|summary|Accident_Index|Location_Easting_OSGR|Location_Northing_OSGR|          Longitude|          Latitude|      Police_Force| Accident_Severity|Number_of_Vehicles|Number_of_Casualties|      Date|       Day_of_Week|  Time|Local_Authority_(District)|Local_Authority_(Highway)|   1st_Road_Class|   1s

In [98]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import  StringIndexer, VectorAssembler
from pysparkling.ml import H2OAutoML

In [118]:
# Re-type the label column from integer to string. Presumably this makes
# AutoML treat the task as classification rather than regression - the
# leaderboard further down reports classification metrics (auc, logloss).
uk_acc_2014_df = uk_acc_2014_df.withColumn(
    "Did_Police_Officer_Attend_Scene_of_Accident",
    uk_acc_2014_df["Did_Police_Officer_Attend_Scene_of_Accident"].cast("string"),
)

In [121]:
## Columns handed to the model.
# Identifier-like or overly specific columns (IDs, grid references, dates,
# administrative codes) are excluded to limit the chance of overfitting.
# Names in this list that are absent from the DataFrame are simply ignored.
remove_list = ['Accident_Index','Location_Easting_OSGR','Location_Northing_OSGR','LSOA_of_Accident_Location','Time',
 'Local_Authority_(District)','Local_Authority_(Highway)','Police_Force','Date']

# The target 'Did_Police_Officer_Attend_Scene_of_Accident' is deliberately
# kept in this list: the frame selected from it is what AutoML is fitted on,
# and the label column has to be part of that frame.
predictors = [
    column_name
    for column_name in uk_acc_2014_df.columns
    if column_name not in remove_list
]
predictors

['Longitude',
 'Latitude',
 'Accident_Severity',
 'Number_of_Vehicles',
 'Number_of_Casualties',
 'Day_of_Week',
 '1st_Road_Class',
 '1st_Road_Number',
 'Road_Type',
 'Speed_limit',
 'Junction_Detail',
 'Junction_Control',
 '2nd_Road_Class',
 '2nd_Road_Number',
 'Pedestrian_Crossing-Human_Control',
 'Pedestrian_Crossing-Physical_Facilities',
 'Light_Conditions',
 'Weather_Conditions',
 'Road_Surface_Conditions',
 'Special_Conditions_at_Site',
 'Carriageway_Hazards',
 'Urban_or_Rural_Area',
 'Did_Police_Officer_Attend_Scene_of_Accident']

In [122]:
# Keep only the modelling columns, then split roughly 80/20 into training and
# evaluation sets using a fixed seed (24).
predictors_df = uk_acc_2014_df.select(predictors)
train_data, test_data = predictors_df.randomSplit([0.8, 0.2], seed=24)

print(f"Records for training: {train_data.count()}")
print(f"Records for evaluation: {test_data.count()}")

Records for training: 117148
Records for evaluation: 29174


In [123]:
# Mark both splits for caching - presumably so the fit on train_data and the
# scoring of test_data below do not recompute the split.
train_data.persist()
# Last expression of the cell: the DataFrame returned by cache() is echoed as
# the cell output.
test_data.cache()

DataFrame[Longitude: double, Latitude: double, Accident_Severity: int, Number_of_Vehicles: int, Number_of_Casualties: int, Day_of_Week: int, 1st_Road_Class: int, 1st_Road_Number: int, Road_Type: int, Speed_limit: int, Junction_Detail: int, Junction_Control: int, 2nd_Road_Class: int, 2nd_Road_Number: int, Pedestrian_Crossing-Human_Control: int, Pedestrian_Crossing-Physical_Facilities: int, Light_Conditions: int, Weather_Conditions: int, Road_Surface_Conditions: int, Special_Conditions_at_Site: int, Carriageway_Hazards: int, Urban_or_Rural_Area: int, Did_Police_Officer_Attend_Scene_of_Accident: string]

In [126]:
# Import PySparkling's public names explicitly instead of `import *`, so it is
# clear where H2OContext comes from. H2OConf stays in scope because the star
# import previously exposed it as well.
from pysparkling import H2OConf, H2OContext

# Start (or attach to) the H2O context on top of the running Spark session.
# Passing the SparkSession as an argument is deprecated and scheduled for
# removal in release 3.32 (see the warning this cell used to emit), so the
# no-argument form is used.
hc = H2OContext.getOrCreate()

# AutoML estimator: the search is capped at 60 seconds and the label is the
# police-attendance column.
# NOTE(review): splitRatio=0.9 presumably holds out 10% of the input for
# validation - confirm against the H2OAutoML parameter docs.
aml = H2OAutoML(maxRuntimeSecs=60, splitRatio=0.9, labelCol="Did_Police_Officer_Attend_Scene_of_Accident")

Method getOrCreate with spark argument is deprecated. Please use either just getOrCreate() or if you need to pass extra H2OConf, use getOrCreate(conf). The spark argument will be removed in release 3.32.


In [130]:

# Run AutoML on the training split; the fitted model is used for scoring below.
h2Omodel = aml.fit(train_data)

In [131]:
# Show the leaderboard of models AutoML trained, with their metrics
# (auc, logloss, aucpr, mean_per_class_error, rmse, mse).
aml.getLeaderboard().show()

+---+--------------------+------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|   |            model_id|               auc|            logloss|              aucpr|mean_per_class_error|               rmse|                mse|
+---+--------------------+------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|  0|StackedEnsemble_A...|0.7082639426294752|0.43317261438959154| 0.3490564401870879|  0.3528876852327917| 0.3693365085035248|0.13640945651357425|
|  1|StackedEnsemble_B...|0.7010855125260173|0.43850270852901746| 0.3329809860528052|   0.357620106291268| 0.3716497253685129|0.13812351836649106|
|  2|XGBoost_2_AutoML_...|0.6943691098102526| 0.5776361319792741| 0.3252376496607406| 0.35822920003390535|0.43956621389175804| 0.1932184563951348|
|  3|XGBoost_1_AutoML_...|0.6738142512835954| 0.5795769965525763| 0.3112945312873111|  0.3736389621067056|0.4405701601

In [132]:
# Score the evaluation split; the result carries the input columns plus a
# `prediction` column.
predicted = h2Omodel.transform(test_data)

In [133]:
# Inspect the first five scored rows (actual label alongside `prediction`).
predicted.take(5)

[Row(Longitude=-3.593268, Latitude=54.54811, Accident_Severity=3, Number_of_Vehicles=1, Number_of_Casualties=3, Day_of_Week=1, 1st_Road_Class=6, 1st_Road_Number=0, Road_Type=9, Speed_limit=20, Junction_Detail=0, Junction_Control=-1, 2nd_Road_Class=-1, 2nd_Road_Number=0, Pedestrian_Crossing-Human_Control=0, Pedestrian_Crossing-Physical_Facilities=0, Light_Conditions=1, Weather_Conditions=1, Road_Surface_Conditions=1, Special_Conditions_at_Site=0, Carriageway_Hazards=2, Urban_or_Rural_Area=1, Did_Police_Officer_Attend_Scene_of_Accident='2', prediction='1'),
 Row(Longitude=-3.587683, Latitude=54.547645, Accident_Severity=3, Number_of_Vehicles=1, Number_of_Casualties=1, Day_of_Week=6, 1st_Road_Class=6, 1st_Road_Number=0, Road_Type=2, Speed_limit=20, Junction_Detail=6, Junction_Control=4, 2nd_Road_Class=6, 2nd_Road_Number=0, Pedestrian_Crossing-Human_Control=0, Pedestrian_Crossing-Physical_Facilities=0, Light_Conditions=1, Weather_Conditions=1, Road_Surface_Conditions=1, Special_Conditions_

In [139]:
# Dump the fitted model's details; the value comes back as a JSON-formatted string.
h2Omodel.getModelDetails()

'{\n  "names": [\n    "Longitude",\n    "Latitude",\n    "Accident_Severity",\n    "Number_of_Vehicles",\n    "Number_of_Casualties",\n    "Day_of_Week",\n    "1st_Road_Class",\n    "1st_Road_Number",\n    "Road_Type",\n    "Speed_limit",\n    "Junction_Detail",\n    "Junction_Control",\n    "2nd_Road_Class",\n    "2nd_Road_Number",\n    "Pedestrian_Crossing-Human_Control",\n    "Pedestrian_Crossing-Physical_Facilities",\n    "Light_Conditions",\n    "Weather_Conditions",\n    "Road_Surface_Conditions",\n    "Special_Conditions_at_Site",\n    "Carriageway_Hazards",\n    "Urban_or_Rural_Area",\n    "Did_Police_Officer_Attend_Scene_of_Accident"\n  ],\n  "column_types": [\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n    "Numeric",\n 