# Inspection of data
We are going to load the road safety data from https://data.gov.uk/dataset/cb7ae6f0-4be6-4935-9277-47e5ce24a11f/road-safety-data, which covers all collisions between 2005 and 2014.

There are three files: the accidents, the casualties, and the vehicles.

We will look at each file individually in the following way:

 - Split the columns into three types of data: categorical, ordinal, and numerical.

 - Look at the prevalence of missing data in each column and decide whether to keep the column, filter out the affected rows, or drop the column entirely.

First we are going to load the accident data.

The Docker setup has already placed the data at its required location.

In [ ]:
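import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
implicit val spark: SparkSession = SparkSession.builder().getOrCreate()
import spark.implicits._ // enables the $"column" syntax
val accidentsDF = spark.read.option("header", "true").csv("/tmp/data/Accidents0514.csv")
  .withColumn("Date", to_date($"Date", "dd/MM/yyyy")) // parse UK-style dates
//   .sample(false, 0.01) // uncomment to work on a 1% sample
  .cache()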

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@255d90ae
accidentsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Accident_Index: string, Location_Easting_OSGR: string ... 30 more fields]


Let's have a quick look at the accidents data first.

In [ ]:
val totalAccidents = accidentsDF.count()

totalAccidents: Long = 1640597


In [ ]:
accidentsDF.show()

+--------------+---------------------+----------------------+---------+---------+------------+-----------------+------------------+--------------------+----------+-----------+-----+--------------------------+-------------------------+--------------+---------------+---------+-----------+---------------+----------------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+
|Accident_Index|Location_Easting_OSGR|Location_Northing_OSGR|Longitude| Latitude|Police_Force|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|      Date|Day_of_Week| Time|Local_Authority_(District)|Local_Authority_(Highway)|1st_Road_Class|1st_Road_Number|Road_Type|Speed_limit|Junction_Detail|Junction_Control|2nd_Road_Class|2nd_Road_Number|Pedestrian_Crossing-Human_

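So there appear to be 32 columns (the schema printed earlier lists two fields plus "30 more"). Let's have a look at the summary of each column. Note that `describe()` only returns a new DataFrame; calling `.show()` on it prints the statistics.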

In [ ]:
accidentsDF.describe()


res5: org.apache.spark.sql.DataFrame = [summary: string, Accident_Index: string ... 30 more fields]


Now let's split up the data into categorical, ordinal, and numerical columns.

In [ ]:
val geoColumns = List(
  "Latitude",
  "Longitude")
val unimportantCategoricalColumns = List(
  "Location_Easting_OSGR",
  "Location_Northing_OSGR",
  "LSOA_of_Accident_Location")
val categoricalColumns = List(
  "Local_Authority_(District)",
  "Local_Authority_(Highway)",
  "1st_Road_Class",
  "1st_Road_Number",
  "Road_Type",
  "Junction_Detail",
  "Junction_Control",
  "2nd_Road_Class",
  "2nd_Road_Number",
  "Pedestrian_Crossing-Human_Control",
  "Pedestrian_Crossing-Physical_Facilities",
  "Light_Conditions",
  "Weather_Conditions",
  "Road_Surface_Conditions",
  "Special_Conditions_at_Site",
  "Carriageway_Hazards",
  "Urban_or_Rural_Area",
  "Did_Police_Officer_Attend_Scene_of_Accident")
val ordinalColumns = List(
  "Police_Force",
  "Accident_Severity",
  "day_of_year",
  "day_of_week",
  "month_of_year",
  "hour_of_day",
  "minute_of_hour")
  
val numericalColumns = List(
  "Number_of_Vehicles",
  "Number_of_Casualties",
  "Speed_limit")

nonLabelColumns: List[String] = List(Date, Day_of_Week)
unimportantCategoricalColumns: List[String] = List(Location_Easting_OSGR, Location_Northing_OSGR, LSOA_of_Accident_Location)
geoColumns: List[String] = List(Latitude, Longitude)
categoricalColumns: List[String] = List(Local_Authority_(District), Local_Authority_(Highway), 1st_Road_Class, 1st_Road_Number, Road_Type, Junction_Detail, Junction_Control, 2nd_Road_Class, 2nd_Road_Number, Pedestrian_Crossing-Human_Control, Pedestrian_Crossing-Physical_Facilities, Light_Conditions, Weather_Conditions, Road_Surface_Conditions, Special_Conditions_at_Site, Carriageway_Hazards, Urban_or_Rural_Area, Did_Police_Officer_Attend_Scene_of_Accident)
ordinalColumns: List[String] = List(Police_Force, Accident_Severity, day_of_year, day_of_week, month_o...

In [ ]:
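def addDateFeatures(df: DataFrame): DataFrame = {
  df
    .withColumn("year", year($"Date"))
    .withColumn("day_of_year", dayofyear($"Date"))
    // Column names are case-insensitive by default, so this replaces the original Day_of_Week.
    .withColumn("day_of_week", dayofweek($"Date")) // 1 = Sunday ... 7 = Saturday
    .withColumn("month_of_year", month($"Date"))
    // Time is an "HH:mm" string, so parse it before extracting the parts.
    .withColumn("hour_of_day", hour(to_timestamp($"Time", "HH:mm")))
    .withColumn("minute_of_hour", minute(to_timestamp($"Time", "HH:mm")))
    .drop("Date", "Time")
}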
val dateFeaturesDF = addDateFeatures(accidentsDF)

addDateFeatures: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
dateFeaturesDF: org.apache.spark.sql.DataFrame = [Accident_Index: string, Location_Easting_OSGR: string ... 33 more fields]


In [ ]:
def castTypes(df: DataFrame, columnName: String, typeToCast: String): DataFrame = {
  df.withColumn(columnName, df(columnName).cast(typeToCast))
}
val castGeoAccidentsDF = geoColumns.foldLeft(dateFeaturesDF){ (df, columnName) =>
  castTypes(df, columnName, "Double")
}
val castAccidentsDF = (numericalColumns ++ ordinalColumns).foldLeft(castGeoAccidentsDF) { (df, columnName) =>
  castTypes(df, columnName, "Int")
}

castTypes: (df: org.apache.spark.sql.DataFrame, columnName: String, typeToCast: String)org.apache.spark.sql.DataFrame
castGeoAccidentsDF: org.apache.spark.sql.DataFrame = [Accident_Index: string, Location_Easting_OSGR: string ... 33 more fields]
castAccidentsDF: org.apache.spark.sql.DataFrame = [Accident_Index: string, Location_Easting_OSGR: string ... 33 more fields]


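Now to figure out which columns to filter. A value counts as missing if it is null or -1 (the dataset's code for missing data). Columns missing fewer than 5% of their values have the affected rows filtered out; columns missing 5% or more are dropped entirely.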

In [ ]:
import scala.collection.mutable.ListBuffer

// Columns with few missing values (rows get filtered) and with many (column gets dropped).
val columnsToFilter = ListBuffer[String]()
val columnsToDrop = ListBuffer[String]()

// LSOA_of_Accident_Location is excluded from the check.
castAccidentsDF.columns.filter(_ != "LSOA_of_Accident_Location").foreach { x =>
  // A value is treated as missing if it is null or the sentinel -1.
  val missingCount = castAccidentsDF
    .filter(castAccidentsDF(x) === -1 || castAccidentsDF(x).isNull)
    .count
  val missingPercentage = (missingCount.toFloat / totalAccidents) * 100
  if (missingPercentage == 0) {
    // Nothing is missing, so the column is kept untouched.
  } else if (missingPercentage < 5) {
    columnsToFilter += x
    println(x)
    println(s"Missing ${missingPercentage}%. Should filter these.")
  } else {
    columnsToDrop += x
    println(x)
    println(s"Missing ${missingPercentage}%. Should drop this column.")
  }
}

Location_Easting_OSGR
Missing 0.00676583%. Should filter these.
Location_Northing_OSGR
Missing 0.00676583%. Should filter these.
Longitude
Missing 0.006826783%. Should filter these.
Latitude
Missing 0.00676583%. Should filter these.
1st_Road_Number
Missing 1.21906836E-4%. Should filter these.
Junction_Detail
Missing 0.0010971616%. Should filter these.
Junction_Control
Missing 35.67738%. Should drop this column.
2nd_Road_Class
Missing 41.220848%. Should drop this column.
2nd_Road_Number
Missing 0.98244727%. Should filter these.
Pedestrian_Crossing-Human_Control
Missing 0.0012800219%. Should filter these.
Pedestrian_Crossing-Physical_Facilities
Missing 0.0022552765%. Should filter these.
Weather_Conditions
Missing 0.009813501%. Should filter these.
Road_Surface_Conditions
Missing 0.13135462%. Should filter these.
Special_Conditions_at_Site
Missing 0.0010971616%. Should filter these.
Carriageway_Hazards
Missing 0.0019505094%. Should filter these.
Did_Police_Officer_Attend_Scene_of_Acciden
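
The keep/filter/drop rule used in the cell above can be isolated as a small pure function, which makes the thresholds easier to test without Spark. This is only a sketch: the names `Decision`, `missingPercentage`, and `decide` are hypothetical and do not appear in the notebook, and the 5% cut-off is simply the one used above.

```scala
// Possible outcomes for a column, mirroring the three branches in the cell above.
sealed trait Decision
case object Keep extends Decision
case object FilterRows extends Decision
case object DropColumn extends Decision

// Share of rows with a missing value, as a percentage.
def missingPercentage(missingCount: Long, totalRows: Long): Double =
  missingCount.toDouble / totalRows * 100

// 0 missing -> keep; under the threshold -> filter the rows; otherwise drop the column.
def decide(missingCount: Long, totalRows: Long, dropThreshold: Double = 5.0): Decision =
  if (missingCount == 0) Keep
  else if (missingPercentage(missingCount, totalRows) < dropThreshold) FilterRows
  else DropColumn
```

Using `Double` rather than `Float` here avoids output such as `1.21906836E-4`, which is a side effect of single-precision arithmetic rather than something meaningful about the data.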

In [ ]:
// drop accepts varargs, so the whole list can be passed at once with `: _*`.

val accidentsDroppedDF = castAccidentsDF.drop(columnsToDrop.toList: _*)
println(accidentsDroppedDF.count)
// Remove rows where any of the lightly affected columns is missing (-1 or null).
val accidentsFilteredDF = columnsToFilter.foldLeft(accidentsDroppedDF) { (df, x) =>
  df.filter(df(x) =!= -1 && df(x).isNotNull)
}

1640597
accidentsDroppedDF: org.apache.spark.sql.DataFrame = [Accident_Index: string, Location_Easting_OSGR: string ... 31 more fields]
accidentsFilteredDF: org.apache.spark.sql.DataFrame = [Accident_Index: string, Location_Easting_OSGR: string ... 31 more fields]


In [ ]:
val finalCount = accidentsFilteredDF.count
val removedCount = totalAccidents - finalCount
val finalPercentage = finalCount.toFloat / totalAccidents * 100
println(s"So we have dropped a total of ${removedCount}, which means we are left with ${finalPercentage}% of our original data")

So we have dropped a total of 18953, which means we are left with 98.84475% of our original data
finalCount: Long = 1621644
removedCount: Long = 18953
finalPercentage: Float = 98.84475


In [ ]:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.Normalizer

val PValue = 1

val numericalVecColumnName = "numerical_columns_vec"

val scaledAssembler = new VectorAssembler()
  .setInputCols(numericalColumns.toArray)
  .setOutputCol(numericalVecColumnName)
val scaledVecDF = scaledAssembler.transform(accidentsFilteredDF)

val scaler = new Normalizer()
  .setInputCol(numericalVecColumnName)
  .setOutputCol(numericalVecColumnName + "_scaled")
  .setP(PValue)

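// Normalizer is a plain Transformer, so nothing is fitted here: it rescales each
// row's vector to unit L1 norm (p = 1) rather than standardising each column.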
val scaledDF = scaler.transform(scaledVecDF)


import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.Normalizer
PValue: Int = 1
numericalVecColumnName: String = numerical_columns_vec
scaledAssembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_3978632bfee6
scaledVecDF: org.apache.spark.sql.DataFrame = [Accident_Index: string, Location_Easting_OSGR: string ... 32 more fields]
scaler: org.apache.spark.ml.feature.Normalizer = normalizer_018741300878
scaledDF: org.apache.spark.sql.DataFrame = [Accident_Index: string, Location_Easting_OSGR: string ... 33 more fields]
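
To make clear what the `Normalizer` with `p = 1` does to each row, here is a minimal plain-Scala sketch of L1 normalisation. The helper `l1Normalize` is hypothetical, the example row is made up for illustration, and returning an all-zero row unchanged is a choice made for this sketch rather than a statement about Spark's behaviour.

```scala
// Divide every component by the sum of absolute values (the L1 norm),
// so that the absolute values of the result add up to 1.
def l1Normalize(row: Array[Double]): Array[Double] = {
  val norm = row.map(x => math.abs(x)).sum
  if (norm == 0.0) row.clone() else row.map(_ / norm)
}

// Hypothetical row: 2 vehicles, 1 casualty, speed limit 30.
val example = l1Normalize(Array(2.0, 1.0, 30.0))
// The components become 2/33, 1/33 and 30/33.
```

Because the speed limit is much larger than the two counts, it dominates every normalised row. If the aim is to put the three columns on a comparable scale, a per-column scaler may be a better fit than a per-row normaliser.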


It looks like there is a steep drop-off in the number of collisions and casualties over the break between Christmas and the 3rd of January. This could be explained by a smaller share of the UK population being in the country, or perhaps by fewer people being on the roads because everyone stays in one location for that period.

In [ ]:
widgets.display(kmeansInputDF.groupBy("day_of_year").agg(sum("Number_of_Casualties")).sort("day_of_year"))

res60: notebook.front.Widget = <Tabs widget>
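
When reading the `day_of_year` axis, it helps to know which values the Christmas period maps to. The sketch below uses `java.time` instead of Spark, on the assumption that both number the days of the year from 1; the value names are hypothetical.

```scala
import java.time.LocalDate

// Christmas Day falls on a different day_of_year in leap years.
val christmas2013 = LocalDate.of(2013, 12, 25).getDayOfYear // 359 (non-leap year)
val christmas2012 = LocalDate.of(2012, 12, 25).getDayOfYear // 360 (leap year)
val thirdOfJanuary = LocalDate.of(2013, 1, 3).getDayOfYear  // 3
```

The dip therefore sits at both ends of the axis, and day 366 only exists in leap years, so its total is expected to be much lower than its neighbours regardless of road behaviour.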


Understandably, the two times of day with the most collisions on UK roads are the morning and evening peak hours, as there are more cars on the road.

In [ ]:
widgets.display(kmeansInputDF.groupBy("hour_of_day").count().sort("hour_of_day"))

res68: notebook.front.Widget = <Tabs widget>


The speed limit aggregation seems to indicate that the most casualties occur on roads with a speed limit of 30. As with all the statistics above, it would be interesting to index these figures against the number of cars in each speed limit area, or the miles of road with each speed limit, to normalise the data.

In [ ]:
widgets.display(kmeansInputDF.groupBy("Speed_limit").agg(sum("Number_of_Casualties")).sort("Speed_limit") )

res73: notebook.front.Widget = <Tabs widget>
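
The normalisation suggested above could be sketched as follows. The function `casualtyRate` and the `exposure` map are hypothetical: the exposure figures (for example miles of road, or vehicle-miles, per speed limit) are not part of this dataset and would have to come from another source.

```scala
// Casualties per unit of exposure for each speed limit.
// Speed limits with no exposure figure, or a non-positive one, are left out.
def casualtyRate(casualties: Map[Int, Long], exposure: Map[Int, Double]): Map[Int, Double] =
  casualties.collect {
    case (limit, count) if exposure.get(limit).exists(_ > 0) =>
      limit -> count / exposure(limit)
  }
```

Skipping limits without an exposure figure keeps the result honest: a missing denominator yields no rate rather than a misleading zero or infinity.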


What's interesting about this graph is how much the value 1 stands out. Note that `Police_Force` is a code identifying the police force that recorded the collision (in the dataset's lookup tables, 1 is the Metropolitan Police), not the number of officers attending. The graph therefore shows which force areas record the most casualties, not how many officers were called out.

In [ ]:
widgets.display(kmeansInputDF.groupBy("Police_Force").agg(sum("Number_of_Casualties")).sort("Police_Force") )

res77: notebook.front.Widget = <Tabs widget>


In [ ]:
widgets.display(kmeansInputDF.groupBy("Number_of_Casualties").count().sort("Number_of_Casualties"))

res85: notebook.front.Widget = <Tabs widget>


It looks like the fewest collisions happen at the weekend, as 1 indicates Sunday and 7 indicates Saturday.

In [ ]:
widgets.display(kmeansInputDF.groupBy("day_of_week").count().sort("day_of_week")) 

res87: notebook.front.Widget = <Tabs widget>
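
The Sunday-first numbering is easy to get wrong, because `java.time` numbers the days from Monday = 1 to Sunday = 7. The sketch below converts a date in the dataset's `dd/MM/yyyy` format to the Sunday = 1 convention used in the graph above. The names `ukDate`, `sundayFirstDayOfWeek`, and `isWeekend` are hypothetical.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// The dataset stores dates as day/month/year.
val ukDate = DateTimeFormatter.ofPattern("dd/MM/yyyy")

// java.time: Monday = 1 ... Sunday = 7; shift to Sunday = 1 ... Saturday = 7.
def sundayFirstDayOfWeek(date: String): Int =
  LocalDate.parse(date, ukDate).getDayOfWeek.getValue % 7 + 1

// Under the Sunday-first convention the weekend is days 1 and 7.
def isWeekend(dayOfWeek: Int): Boolean = dayOfWeek == 1 || dayOfWeek == 7
```

For example, 1 January 2005 was a Saturday, so `sundayFirstDayOfWeek("01/01/2005")` gives 7, and the following day gives 1.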


### Plan from here

- Continue to inspect the data manually for interesting insights, especially across the categorical values.
- One-hot encode the categorical features for clustering and correlation analysis.
- Build the correlation matrix, investigate the correlations, and plot any correlated axes.
- For all features other than Latitude, Longitude, and the high-cardinality features, apply PCA to reduce the dimensionality so that k-means becomes feasible. Select the number of principal components (p) using elbow analysis, i.e. stop at the point where adding another component explains very little additional variance.
- Join the remaining features to the new PCA vectors as features, and then run the k-means algorithm iteratively, again using elbow analysis.
- Sort the clusters by number of casualties, then list the principal components of each cluster and project each cluster's PCA values back to the original high-dimensional space to get the average feature values that describe each cluster. Also look at the latitude and longitude to see whether a significant intersection is being highlighted.

- Now load the other two data sets and join them onto the original accidents DataFrame, one-hot encode the variables, and repeat the above analysis and filtering for the new attributes, including the PCA and k-means clustering, to see whether the clustering changes. In this case the clustering will be by casualty rather than by accident.
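
The elbow analysis mentioned in the plan could be sketched as below. This is one simple heuristic among several, not a definitive rule: the function `elbow` and the `minGain` threshold are hypothetical, and the input is assumed to be the fraction of variance explained by each principal component, in decreasing order.

```scala
// Keep components until the next one would explain less than `minGain`
// of the variance; always keep at least one component when any exist.
def elbow(explainedVarianceRatios: Seq[Double], minGain: Double = 0.05): Int = {
  val firstSmall = explainedVarianceRatios.indexWhere(_ < minGain)
  if (firstSmall == -1) explainedVarianceRatios.length
  else math.max(firstSmall, 1)
}
```

With ratios of 0.5, 0.3, 0.1, 0.04, ... the fourth component is the first to fall under the default threshold, so three components are kept. The same idea carries over to k-means by feeding in the relative drop in clustering cost for each additional cluster.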