# UK Road Safety: Traffic Accidents and Vehicles - ETL
## Detailed dataset of road accidents and involved vehicles in the UK (2005-2016)
Source:
<U>https://www.kaggle.com/tsiaras/uk-road-safety-accidents-and-vehicles#Accident_Information.csv</U>


# Extract
* Load file into a Spark session from Object Store
* One large CSV file <br>
* Allow for loading a sample for initial exploration

In [6]:
# The code was removed by Watson Studio for sharing.

Row(Accident_Index='200501BS00001', 1st_Road_Class='A', 1st_Road_Number='3218', 2nd_Road_Class='NA', 2nd_Road_Number='0', Accident_Severity='Serious', Carriageway_Hazards='None', Date='2005-01-04', Day_of_Week='Tuesday', Did_Police_Officer_Attend_Scene_of_Accident='1', Junction_Control='Data missing or out of range', Junction_Detail='Not at junction or within 20 metres', Latitude='51.489096', Light_Conditions='Daylight', Local_Authority_(District)='Kensington and Chelsea', Local_Authority_(Highway)='Kensington and Chelsea', Location_Easting_OSGR='525680', Location_Northing_OSGR='178240', Longitude='-0.19117', LSOA_of_Accident_Location='E01002849', Number_of_Casualties='1', Number_of_Vehicles='1', Pedestrian_Crossing-Human_Control='0', Pedestrian_Crossing-Physical_Facilities='1', Police_Force='Metropolitan Police', Road_Surface_Conditions='Wet or damp', Road_Type='Single carriageway', Special_Conditions_at_Site='None', Speed_limit='30', Time='17:42', Urban_or_Rural_Area='Urban', Weather

# Transform
* Rename columns whose names contain special characters (parentheses, hyphens) to plain underscore-separated names
* Cast some columns to appropriate types (from string to int or double)



In [8]:
# Transform step for the accidents data held in `rdd_accidents`
# (created by the Extract cell; it is used here through the DataFrame API).
#
# 1. Rename columns whose names contain parentheses or hyphens.
# 2. Cast numeric columns from string to double / int.
# 3. Register the result as the temp view "accidents" and report schema + row count.

# Rename columns with special characters: (old name, new name).
COLUMN_RENAMES = [
    ("Local_Authority_(District)", "Local_Authority_District"),
    ("Local_Authority_(Highway)", "Local_Authority_Highway"),
    ("Pedestrian_Crossing-Human_Control", "Pedestrian_Crossing_Human_Control"),
    ("Pedestrian_Crossing-Physical_Facilities", "Pedestrian_Crossing_Physical_Facilities"),
]
for old_name, new_name in COLUMN_RENAMES:
    rdd_accidents = rdd_accidents.withColumnRenamed(old_name, new_name)

# Type conversions: (column name, target Spark type).
# NOTE(review): values that cannot be parsed as the target type presumably
# become null after the cast -- confirm this is acceptable for downstream use.
COLUMN_CASTS = [
    ("Latitude", "double"),
    ("Longitude", "double"),
    ("Location_Northing_OSGR", "int"),
    ("Location_Easting_OSGR", "int"),
    ("Number_of_Casualties", "int"),
    ("Number_of_Vehicles", "int"),
    ("Speed_limit", "int"),
    ("Year", "int"),
]
for column_name, spark_type in COLUMN_CASTS:
    # Replacing a column under its own name keeps the column in place in the schema.
    rdd_accidents = rdd_accidents.withColumn(
        column_name, rdd_accidents[column_name].cast(spark_type)
    )

# The former mid-cell `rdd_accidents.take(1)` was dropped: its result was
# discarded (not the last expression of the cell), so it only cost a Spark job.

# Expose the transformed data to Spark SQL under the name "accidents".
rdd_accidents.createOrReplaceTempView("accidents")
rdd_accidents.printSchema()
# count() is an action and scans the data to produce the record total.
print("number of records: {:d}".format(rdd_accidents.count()))



root
 |-- Accident_Index: string (nullable = true)
 |-- 1st_Road_Class: string (nullable = true)
 |-- 1st_Road_Number: string (nullable = true)
 |-- 2nd_Road_Class: string (nullable = true)
 |-- 2nd_Road_Number: string (nullable = true)
 |-- Accident_Severity: string (nullable = true)
 |-- Carriageway_Hazards: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Day_of_Week: string (nullable = true)
 |-- Did_Police_Officer_Attend_Scene_of_Accident: string (nullable = true)
 |-- Junction_Control: string (nullable = true)
 |-- Junction_Detail: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Light_Conditions: string (nullable = true)
 |-- Local_Authority_District: string (nullable = true)
 |-- Local_Authority_Highway: string (nullable = true)
 |-- Location_Easting_OSGR: integer (nullable = true)
 |-- Location_Northing_OSGR: integer (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- LSOA_of_Accident_Location: string (nullable = true)
 |-- Num

# Load
* Save Enterprise data as Parquet files distributed over the Spark cluster

In [9]:
# Load step: persist the transformed accidents data as Parquet.
# "overwrite" mode replaces any output already present at the target path.
accidents_writer = rdd_accidents.write.format("parquet").mode("overwrite")
accidents_writer.save("accidents.parquet")