# Import data stored in hdfs distributed file system

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import matplotlib.pyplot as plt
import pandas as pd

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
sc = spark.sparkContext

data = spark.read.csv("hdfs://namenode/Crashes_Last_Five_Years.csv", header = True, inferSchema = True)

print("success")

success


# Clean relevent null data

In [2]:
# clean null data
data = data[data.DAY_OF_WEEK != 'null']
data = data[data.SEVERITY != 'null']
data = data[data.ACCIDENT_DATE != 'null']
data = data[data.NODE_TYPE != 'Unknown']
data = data[data.ROAD_GEOMETRY != 'Unknown']
data = data[data.SPEED_ZONE != 'Not known']

print("success")

success


In [3]:
# generate view for following queries
data.createOrReplaceTempView("data")
print("success")

success


# Clean abnormal data and outliers

In [4]:
data.groupby('UNLICENCSED').count().toPandas()

Unnamed: 0,UNLICENCSED,count
0,1,2338
1,2,12
2,0,66702


In [5]:
# Because of unlicensed valid value is only 0 (no) and 1 (yes)
# Delete 12 pieces of abnormal data with unlicensed = 2
data = spark.sql("select * from data where UNLICENCSED between 0 and 1")
data.groupby('UNLICENCSED').count().toPandas()

Unnamed: 0,UNLICENCSED,count
0,1,2338
1,0,66702


In [6]:
# Regarding those without a driver’s license as outliers
# there is no guarantee that they are well capable of driving
data = spark.sql("select * from data where UNLICENCSED = 0")
data.groupby('UNLICENCSED').count().toPandas()

Unnamed: 0,UNLICENCSED,count
0,0,66702


# Show features of processed data view

In [7]:
# show the first 5 lines of dataset to see that if data is successfully loaded
pd.DataFrame(data.take(5), columns = data.columns).transpose()

Unnamed: 0,0,1,2,3,4
OBJECTID,3401744,3401745,3401746,3401747,3401748
ACCIDENT_NO,T20130013732,T20130013736,T20130013737,T20130013738,T20130013739
ABS_CODE,ABS to receive accident,ABS to receive accident,ABS to receive accident,ABS to receive accident,ABS to receive accident
ACCIDENT_STATUS,Finished,Finished,Finished,Finished,Finished
ACCIDENT_DATE,1/7/2013,2/7/2013,2/7/2013,2/7/2013,2/7/2013
...,...,...,...,...,...
RMA,Local Road,Arterial Other,Local Road,Freeway,Local Road
RMA_ALL,Local Road,"Arterial Other,Local Road",Local Road,Freeway,Local Road
DIVIDED,Undivided,Divided,Undivided,Divided,Undivided
DIVIDED_ALL,Undiv,"Div,Undiv",Undiv,Div,Undiv


In [8]:
# compute count, mean, standard dev, min and max values of all numeric attributes
numeric_features = [t[0] for t in data.dtypes if t[1] == 'int']
data.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
OBJECTID,66702,3438996.8031993043,21684.155329872086,3401744,3476651
NODE_ID,66702,202288.44069143353,125959.62425479155,4,343479
TOTAL_PERSONS,66702,2.414035561152589,1.5665646569261693,1,97
INJ_OR_FATAL,66702,1.2725255614524302,0.7066139780333256,0,23
FATALITY,66702,0.019654583070972384,0.15101902909609613,0,4
SERIOUSINJURY,66702,0.33946508350574195,0.594907652857355,0,17
OTHERINJURY,66702,0.9134058948757159,0.764674017252065,0,23
NONINJURED,66702,1.141509999700159,1.3790104954893991,0,87
MALES,66702,1.3236784504212766,1.0193976459053773,0,46


# Quantitative analysis of different accident types and severity level

In [None]:
data.groupby('ACCIDENT_TYPE').count().toPandas()

Unnamed: 0,ACCIDENT_TYPE,count
0,Collision with vehicle,42579
1,collision with some other object,535
2,Struck animal,678
3,Vehicle overturned (no collision),3124
4,Collision with a fixed object,10507
5,Struck Pedestrian,6022
6,Other accident,51
7,Fall from or in moving vehicle,309
8,No collision and no object struck,2897


  The most common accident type is "Collision with vehicle"

In [None]:
data.groupby('SEVERITY').count().toPandas()

# The trend of accidents in recent years

In [None]:
# Orders by year
# ACCIDENT_DATE original format (1/7/2013)
year_results = spark.sql(" \
    select \
        substr(ACCIDENT_DATE, length(ACCIDENT_DATE) - (LOCATE('/', REVERSE(ACCIDENT_DATE)) - 1) + 1, 4) as year, \
        count(OBJECTID) as total \
        from data \
    group by year \
    order by year desc")

df_pandas = year_results.toPandas()
df_pandas.plot(kind='barh',x='year',y='total')

# the relationship between accidents frequency and weekly time session

In [None]:
# Orders by Day of Week
week_results = spark.sql("select count(OBJECTID) as total_accidents, DAY_OF_WEEK as day_of_week \
    from data \
    group by day_of_week \
    order by total_accidents desc")

df_pandas = week_results.toPandas()
df_pandas.plot(kind='barh',x='day_of_week',y='total_accidents')

  There is no significant difference in the incidence of car accidents in a weekly cycle. And the frequency of car accidents on weekends can be relatively smaller compared to the other days.

# the relationship between accidents frequency and daily time session

In [None]:
# time session divided in 24 hours, listed from 0 to 23
# ACCIDENT_TIME format (18.30.00)
# Orders by accidents frequency
hour_results = spark.sql(" \
    select \
        substr(ACCIDENT_TIME, 0, (LOCATE('.', ACCIDENT_TIME) - 1)) as time_session, \
        count(OBJECTID) as total \
        from data \
    group by time_session \
    order by count(OBJECTID)")

df_pandas = hour_results.toPandas()
df_pandas.plot(kind='barh',x='time_session',y='total')


The three hours during 15:00 and 18:00 is the most likely time period for a car accident.

# the relationship between accidents frequency and light condition

In [None]:
# Orders by accidents frequency
speed_results = spark.sql(" \
    select \
        SPEED_ZONE, \
        count(OBJECTID) as total \
        from data \
    group by SPEED_ZONE \
    order by total")

df_pandas = speed_results.toPandas()
df_pandas.plot(kind='barh',x='SPEED_ZONE',y='total')

  The most common area for car accidents is not the highway section, but the section where the speed limit is 60km/h.

# the relationship between accidents frequency and light condition

In [None]:
# Orders by accidents frequency
speed_results = spark.sql(" \
    select \
        LIGHT_CONDITION, \
        count(OBJECTID) as total \
        from data \
    group by LIGHT_CONDITION \
    order by total")

df_pandas = speed_results.toPandas()
df_pandas.plot(kind='barh',x='LIGHT_CONDITION',y='total')

In [None]:
  Most car accidents happen during the day.

# the relationship between accidents frequency and road geometry

In [None]:
data.groupby('ROAD_GEOMETRY').count().toPandas()

In [None]:
geometry_results = spark.sql("select count(OBJECTID) as count, ROAD_GEOMETRY \
    from data\
    group by ROAD_GEOMETRY \
    order by count desc")

df_pandas = geometry_results.toPandas()
df_pandas.plot(kind='barh',x='ROAD_GEOMETRY',y='count')

  Whether it is an intersection or an ordinary road without an intersection, the frequencies of car accidents are both very high.

# accidents frequency distribution and intersection type

In [None]:
# the relationship between accidents frequency and interaction type
week_results = spark.sql("select count(OBJECTID) as count, ROAD_GEOMETRY \
    from (select * from data where NODE_TYPE = 'Intersection')\
    group by ROAD_GEOMETRY \
    order by count desc")

df_pandas = week_results.toPandas()
df_pandas.plot(kind='pie', y='count', labels=df_pandas['ROAD_GEOMETRY'], legend = False)

  Cross intersection and T intersection are the two types of intersections where a crashing accident mostly likely to happen.

# The relationship between whether alcohol involved and accident severity

In [None]:
data.groupby('ALCOHOL_RELATED').count().toPandas()

In [None]:
# Severity disribution in alcohol-related accidents
severity_light = spark.sql(" select count(*) as num, SEVERITY\
    from (select * from data where ALCOHOL_RELATED = 'Yes')\
    group by SEVERITY")

df_pandas = severity_light.toPandas()
df_pandas.plot(kind='pie', y='num', labels=df_pandas['SEVERITY'], legend = False)


In [None]:
# Severity disribution in not alcohol-related accidents
severity_light = spark.sql(" select count(*) as num, SEVERITY\
    from (select * from data where ALCOHOL_RELATED = 'No')\
    group by SEVERITY")

df_pandas = severity_light.toPandas()
df_pandas.plot(kind='pie', y='num', labels=df_pandas['SEVERITY'], legend = False)

  Car accidents involving alcohol are more likely to cause more serious casualties than those without alcohol.