# Analyzing Car Accidents in Canada from 1999 to 2014 Using a Distributed Environment in PySpark

In [28]:
import pyspark as py
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

In [29]:
# Start (or reuse) the Spark session that backs every query in this notebook.
spark = (
    SparkSession.builder
    .appName('analysis')
    .getOrCreate()
)

# Load the CSV using its first line as the header. No schema is supplied or
# inferred here, so every column is read as a string.
df = spark.read.csv(path='NCDB_1999_to_2014.csv', header=True)

# Preview the first 10 rows without truncating cell contents.
df.show(n=10, truncate=False)

+------+------+------+------+-----+------+------+------+------+------+------+------+----+------+------+----+-----+-----+-----+------+------+------+
|C_YEAR|C_MNTH|C_WDAY|C_HOUR|C_SEV|C_VEHS|C_CONF|C_RCFG|C_WTHR|C_RSUR|C_RALN|C_TRAF|V_ID|V_TYPE|V_YEAR|P_ID|P_SEX|P_AGE|P_PSN|P_ISEV|P_SAFE|P_USER|
+------+------+------+------+-----+------+------+------+------+------+------+------+----+------+------+----+-----+-----+-----+------+------+------+
|1999  |01    |1     |20    |2    |02    |34    |UU    |1     |5     |3     |03    |01  |06    |1990  |01  |M    |41   |11   |1     |UU    |1     |
|1999  |01    |1     |20    |2    |02    |34    |UU    |1     |5     |3     |03    |02  |01    |1987  |01  |M    |19   |11   |1     |UU    |1     |
|1999  |01    |1     |20    |2    |02    |34    |UU    |1     |5     |3     |03    |02  |01    |1987  |02  |F    |20   |13   |2     |02    |2     |
|1999  |01    |1     |08    |2    |01    |01    |UU    |5     |3     |6     |18    |01  |01    |1986  |01  |M   

In [30]:
# Row count of the loaded DataFrame (triggers a full Spark job).
print("Total Rows in Dataset:", df.count())

Total Rows in Dataset: 5860405


In [31]:
# List the column names so later cells can be read against them.
print("Name of Columns - Printing for reference")
df.columns

Name of Columns - Printing for reference


['C_YEAR',
 'C_MNTH',
 'C_WDAY',
 'C_HOUR',
 'C_SEV',
 'C_VEHS',
 'C_CONF',
 'C_RCFG',
 'C_WTHR',
 'C_RSUR',
 'C_RALN',
 'C_TRAF',
 'V_ID',
 'V_TYPE',
 'V_YEAR',
 'P_ID',
 'P_SEX',
 'P_AGE',
 'P_PSN',
 'P_ISEV',
 'P_SAFE',
 'P_USER']

In [35]:
# ---------------------------------------------------------------
# Cleaning data
# ---------------------------------------------------------------
# Keep only the rows whose C_MNTH value casts to a non-null integer;
# rows where the cast yields null are dropped.
df = df.where(col("C_MNTH").cast("int").isNotNull())

# Collision Severity Counts Grouped by Year-Month

In [36]:
# Count rows per (year-month, severity) pair, sorted by ascending count.
#   * "Date" is the string C_YEAR + "-" + C_MNTH (e.g. "2013-04").
#   * "count" is the number of rows in `df` for that Date / C_SEV pair.
# The result is cached because the cell runs two actions on it (count + show);
# without the cache Spark would re-run the whole aggregation for each action.
severity = (
    df.select(
        concat(col("C_YEAR"), lit("-"), col("C_MNTH")).alias("Date"),
        "C_SEV",
    )
    .groupBy("Date", "C_SEV")
    .count()
    .sort("count")
    .cache()
)

# Show every aggregated row (not just the default 20) without truncation.
severity.show(severity.count(), False)

+-------+-----+-----+
|Date   |C_SEV|count|
+-------+-----+-----+
|2013-04|1    |230  |
|2013-03|1    |237  |
|2014-04|1    |246  |
|2014-03|1    |260  |
|2013-02|1    |280  |
|2009-03|1    |294  |
|2011-04|1    |299  |
|2012-04|1    |302  |
|2010-04|1    |313  |
|2012-02|1    |317  |
|2009-04|1    |325  |
|2014-02|1    |326  |
|2010-02|1    |327  |
|2011-03|1    |332  |
|2011-05|1    |333  |
|2010-03|1    |334  |
|2009-02|1    |335  |
|2011-02|1    |339  |
|2003-02|1    |347  |
|2014-11|1    |347  |
|2013-06|1    |349  |
|2013-12|1    |353  |
|2008-02|1    |353  |
|2014-01|1    |360  |
|2008-04|1    |362  |
|2008-03|1    |371  |
|2014-06|1    |373  |
|2014-12|1    |381  |
|2013-05|1    |381  |
|2012-06|1    |382  |
|2012-05|1    |386  |
|2014-05|1    |388  |
|2009-05|1    |391  |
|2012-11|1    |394  |
|2004-02|1    |399  |
|2004-03|1    |400  |
|2005-02|1    |403  |
|2014-09|1    |406  |
|2012-12|1    |407  |
|2012-09|1    |407  |
|2012-10|1    |408  |
|2001-01|1    |408  |
|2009-01|1