In [4]:
# Initialise the app
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import *
spark = SparkSession.builder.appName('iteration4').getOrCreate()


In [5]:
# Loading dataset
collision = spark.read.csv('./collisions 15-18.csv', header='true')
victims = spark.read.csv('./victims 15-18.csv', header='true')
parties = spark.read.csv('./parties 15-18.csv', header='true')
collision.show()

+-------+-------------+-----------------+--------+---------+------------+---------+---------+-------------+-----------------+---------------+------------------+--------------+---------------+-----------+------------------------+------------------+----------------------+---------------+-----------------+---------------------------+--------------------+------------+----------------+----------------+--------------------+--------------------+-----------------+--------------------+---------------+--------------------+----------------+-------------------+--------------------------+------------------------------+-----------------------+------------------------+----------------------+-----------------------+-------------------------+--------------------------+--------------+--------------+------------+
|case_id| primary_road|   secondary_road|distance|direction|intersection|weather_1|weather_2|location_type|ramp_intersection|side_of_highway|collision_severity|killed_victims|injured_victims|par

# 2. Data Understanding
## 2.1 Collect Initial Data

In [6]:
#Understanding our data
#Showing specific columns
collision_part1 = collision.select('case_id', 'primary_road', 'secondary_road', 'distance', 'direction',
       'intersection', 'weather_1', 'weather_2', 'location_type', 'ramp_intersection')
collision_part1.show()

+-------+-------------+-----------------+--------+---------+------------+---------+---------+-------------+-----------------+
|case_id| primary_road|   secondary_road|distance|direction|intersection|weather_1|weather_2|location_type|ramp_intersection|
+-------+-------------+-----------------+--------+---------+------------+---------+---------+-------------+-----------------+
|6292121|        RT 49|   HOLLY VISTA WY|       0|     null|           1|   cloudy|     null| intersection|                5|
|6292127|  RICHLAND ST|          ROSS ST|       0|     null|           1|    clear|     null|         null|             null|
|6292128|       RT 118|         HITCH BL|    2549|     east|           0|    clear|     null|      highway|             null|
|6292133|        RT 91|    WEST GRAND BL|     463|     east|           0|   cloudy|     null|      highway|             null|
|6292134|       RT 299|          BAIR RD|     970|     west|           0|    clear|     null|      highway|           

In [7]:
collision_part2 = collision.select('side_of_highway',
       'collision_severity', 'killed_victims', 'injured_victims',
       'alcohol_involved','pcf_violation_category')
collision_part2.show()

+---------------+------------------+--------------+---------------+----------------+----------------------+
|side_of_highway|collision_severity|killed_victims|injured_victims|alcohol_involved|pcf_violation_category|
+---------------+------------------+--------------+---------------+----------------+----------------------+
|     northbound|             fatal|             1|              0|            null|  automobile right ...|
|           null|             fatal|             1|              0|               1|              speeding|
|      eastbound|             fatal|             1|              0|            null|    wrong side of road|
|      eastbound|             fatal|             1|              0|               1|      improper turning|
|      westbound|             fatal|             1|              1|            null|    wrong side of road|
|      westbound|             fatal|             1|              0|            null|    wrong side of road|
|      westbound|           

In [8]:
victims.show()

+-------+-------+------------+-----------+----------+----------+-----------------------+-----------------------+--------------+
|     id|case_id|party_number|victim_role|victim_sex|victim_age|victim_degree_of_injury|victim_seating_position|victim_ejected|
+-------+-------+------------+-----------+----------+----------+-----------------------+-----------------------+--------------+
|2609463|6292121|           1|          2|    female|        10|              no injury|                      3|             0|
|2609464|6292121|           1|          2|      male|         8|              no injury|                      6|             0|
|2609465|6292121|           2|          1|      male|        57|                 killed|                      1|             1|
|2609471|6292127|           1|          1|      male|        30|                 killed|                      1|             1|
|2609472|6292128|           1|          1|      male|        58|                 killed|                

In [9]:
parties_part1 = parties.select('id', 'case_id', 'party_number', 'party_type', 'at_fault', 'party_sex',
       'party_age', 'party_sobriety','cellphone_use')
parties_part1.show()
parties_part2 = parties.select('party_drug_physical', 'party_safety_equipment_1',
       'party_safety_equipment_2', 'financial_responsibility',
       'hazardous_materials')
parties_part2.show()

+-------+-------+------------+--------------+--------+---------+---------+--------------+-------------+
|     id|case_id|party_number|    party_type|at_fault|party_sex|party_age|party_sobriety|cellphone_use|
+-------+-------+------------+--------------+--------+---------+---------+--------------+-------------+
|4734135|6292121|           1|        driver|       1|     male|       39|             A|            3|
|4734136|6292121|           2|        driver|       0|     male|       57|          null|            3|
|4734144|6292127|           1|        driver|       1|     male|       30|             C|            3|
|4734145|6292127|           2|parked vehicle|       0|     null|     null|             H|            3|
|4734146|6292128|           1|        driver|       1|     male|       58|             G|            3|
|4734147|6292128|           2|        driver|       0|     male|       48|             A|            3|
|4734152|6292133|           1|        driver|       1|     male|

## 2.2 Describe the data

In [10]:
#Describing data
collisionRow = collision.count()
collisionCol = len(collision.columns)
partiesRow = parties.count()
partiesCol = len(parties.columns)
victimsRow = victims.count()
victimsCol = len(victims.columns)
print('Collisions table has '+ str(collisionRow) + ' no. of rows and ' + str(collisionCol) + ' no. of fields')
print('Parties table has '+ str(partiesRow) + ' no. of rows and ' + str(partiesCol) + ' no. of fields')
print('Victims table has '+ str(victimsRow) + ' no. of rows and ' + str(victimsCol) + ' no. of fields')

Collisions table has 69160 no. of rows and 44 no. of fields
Parties table has 129666 no. of rows and 28 no. of fields
Victims table has 79293 no. of rows and 9 no. of fields


In [11]:
collision.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- primary_road: string (nullable = true)
 |-- secondary_road: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- intersection: string (nullable = true)
 |-- weather_1: string (nullable = true)
 |-- weather_2: string (nullable = true)
 |-- location_type: string (nullable = true)
 |-- ramp_intersection: string (nullable = true)
 |-- side_of_highway: string (nullable = true)
 |-- collision_severity: string (nullable = true)
 |-- killed_victims: string (nullable = true)
 |-- injured_victims: string (nullable = true)
 |-- party_count: string (nullable = true)
 |-- primary_collision_factor: string (nullable = true)
 |-- pcf_violation_code: string (nullable = true)
 |-- pcf_violation_category: string (nullable = true)
 |-- hit_and_run: string (nullable = true)
 |-- type_of_collision: string (nullable = true)
 |-- motor_vehicle_involved_with: string (nullable = true)
 |-- pedestrian_actio

In [12]:
parties.printSchema()

root
 |-- id: string (nullable = true)
 |-- case_id: string (nullable = true)
 |-- party_number: string (nullable = true)
 |-- party_type: string (nullable = true)
 |-- at_fault: string (nullable = true)
 |-- party_sex: string (nullable = true)
 |-- party_age: string (nullable = true)
 |-- party_sobriety: string (nullable = true)
 |-- party_drug_physical: string (nullable = true)
 |-- direction_of_travel: string (nullable = true)
 |-- party_safety_equipment_1: string (nullable = true)
 |-- party_safety_equipment_2: string (nullable = true)
 |-- financial_responsibility: string (nullable = true)
 |-- hazardous_materials: string (nullable = true)
 |-- cellphone_use: string (nullable = true)
 |-- oaf_violation_code: string (nullable = true)
 |-- oaf_violation_category: string (nullable = true)
 |-- oaf_violation_section: string (nullable = true)
 |-- oaf_violation_suffix: string (nullable = true)
 |-- other_associate_factor_1: string (nullable = true)
 |-- other_associate_factor_2: string

In [13]:
victims.printSchema()

root
 |-- id: string (nullable = true)
 |-- case_id: string (nullable = true)
 |-- party_number: string (nullable = true)
 |-- victim_role: string (nullable = true)
 |-- victim_sex: string (nullable = true)
 |-- victim_age: string (nullable = true)
 |-- victim_degree_of_injury: string (nullable = true)
 |-- victim_seating_position: string (nullable = true)
 |-- victim_ejected: string (nullable = true)



In [14]:
victims.select("victim_degree_of_injury").distinct().show()

+-----------------------+
|victim_degree_of_injury|
+-----------------------+
|                      7|
|   other visible injury|
|                      5|
|                      6|
|      complaint of pain|
|              no injury|
|          severe injury|
|                 killed|
+-----------------------+



## 2.3 Explore the data (This was done on Tableau)

## 2.4 Verify the data quality
### 2.4.1. Missing Values


In [15]:
### Count of NaN and Null values in pyspark
from pyspark.sql.functions import isnan, when, count, col
collision_part1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in collision_part1.columns]).show()
collision_part2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in collision_part2.columns]).show()

+-------+------------+--------------+--------+---------+------------+---------+---------+-------------+-----------------+
|case_id|primary_road|secondary_road|distance|direction|intersection|weather_1|weather_2|location_type|ramp_intersection|
+-------+------------+--------------+--------+---------+------------+---------+---------+-------------+-----------------+
|      0|           0|             0|       0|    15457|         228|      151|    68471|        62476|            67302|
+-------+------------+--------------+--------+---------+------------+---------+---------+-------------+-----------------+

+---------------+------------------+--------------+---------------+----------------+----------------------+
|side_of_highway|collision_severity|killed_victims|injured_victims|alcohol_involved|pcf_violation_category|
+---------------+------------------+--------------+---------------+----------------+----------------------+
|          62476|                 0|             0|              

In [16]:
parties_part1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in parties_part1.columns]).show()
parties_part2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in parties_part2.columns]).show()

+---+-------+------------+----------+--------+---------+---------+--------------+-------------+
| id|case_id|party_number|party_type|at_fault|party_sex|party_age|party_sobriety|cellphone_use|
+---+-------+------------+----------+--------+---------+---------+--------------+-------------+
|  0|      0|           0|        35|       0|    10432|    12326|          2090|        13013|
+---+-------+------------+----------+--------+---------+---------+--------------+-------------+

+-------------------+------------------------+------------------------+------------------------+-------------------+
|party_drug_physical|party_safety_equipment_1|party_safety_equipment_2|financial_responsibility|hazardous_materials|
+-------------------+------------------------+------------------------+------------------------+-------------------+
|             113724|                   41742|                   14313|                    9597|             129579|
+-------------------+------------------------+-----

In [17]:
victims.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in victims.columns]).show()

+---+-------+------------+-----------+----------+----------+-----------------------+-----------------------+--------------+
| id|case_id|party_number|victim_role|victim_sex|victim_age|victim_degree_of_injury|victim_seating_position|victim_ejected|
+---+-------+------------+-----------+----------+----------+-----------------------+-----------------------+--------------+
|  0|      0|           0|          0|      1463|      1128|                      0|                      0|            18|
+---+-------+------------+-----------+----------+----------+-----------------------+-----------------------+--------------+



### 2.4.2 Measurement Errors

In [18]:
#Showing specific columns summary
collision.select("alcohol_involved").describe().show()

+-------+----------------+
|summary|alcohol_involved|
+-------+----------------+
|  count|            5099|
|   mean|             1.0|
| stddev|             0.0|
|    min|               1|
|    max|               1|
+-------+----------------+



### 2.4.4 Coding inconsistencies

In [19]:
parties.groupBy('cellphone_use').count().orderBy('count', ascending=False).show()

+-------------+------+
|cellphone_use| count|
+-------------+------+
|            3|114339|
|         null| 13013|
|            2|   666|
|            D|   665|
|            C|   648|
|            1|   324|
|            B|    11|
+-------------+------+



In [20]:
victims.groupBy('victim_degree_of_injury').count().orderBy(col('count').desc()).show()

+-----------------------+-----+
|victim_degree_of_injury|count|
+-----------------------+-----+
|   other visible injury|23314|
|      complaint of pain|19204|
|              no injury|14779|
|          severe injury| 9049|
|                      6| 5061|
|                      7| 2886|
|                      5| 2785|
|                 killed| 2215|
+-----------------------+-----+



# Data Preparation
## 3.1 Select the data
### 3.1.3 Excluding the data

In [21]:
#Exclude data from the Collisions table
collision_to_drop = ['primary_road', 'secondary_road', 'distance', 'direction',
       'intersection', 'weather_2', 'location_type',
       'ramp_intersection', 'side_of_highway', 'injured_victims', 'party_count',
       'primary_collision_factor', 'pcf_violation_code',
       'pcf_violation_category', 'hit_and_run', 'type_of_collision',
       'motor_vehicle_involved_with', 'pedestrian_action', 'road_surface',
       'road_condition_2','pedestrian_collision', 'bicycle_collision', 'motorcycle_collision',
       'truck_collision', 'not_private_property',
       'severe_injury_count', 'other_visible_injury_count',
       'complaint_of_pain_injury_count', 'pedestrian_killed_count',
       'pedestrian_injured_count', 'bicyclist_killed_count',
       'bicyclist_injured_count', 'motorcyclist_killed_count',
       'motorcyclist_injured_count', 'collision_time',
       'process_date']
collision_clean = collision.drop(*collision_to_drop)
collision_clean.show()

+-------+---------+------------------+--------------+----------------+--------------------+----------------+--------------+
|case_id|weather_1|collision_severity|killed_victims|road_condition_1|            lighting|alcohol_involved|collision_date|
+-------+---------+------------------+--------------+----------------+--------------------+----------------+--------------+
|6292121|   cloudy|             fatal|             1|          normal|dark with street ...|            null|    18/01/2015|
|6292127|    clear|             fatal|             1|          normal|            daylight|               1|    18/02/2015|
|6292128|    clear|             fatal|             1|          normal|            daylight|            null|    19/02/2015|
|6292133|   cloudy|             fatal|             1|    construction|dark with street ...|               1|    20/02/2015|
|6292134|    clear|             fatal|             1|          normal|dark with no stre...|            null|    20/02/2015|
|6292140

In [22]:
collision_clean.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- weather_1: string (nullable = true)
 |-- collision_severity: string (nullable = true)
 |-- killed_victims: string (nullable = true)
 |-- road_condition_1: string (nullable = true)
 |-- lighting: string (nullable = true)
 |-- alcohol_involved: string (nullable = true)
 |-- collision_date: string (nullable = true)



In [23]:
#Exclude data from the Parties table
parties_to_drop = ['party_number', 'party_type', 'at_fault', 'party_sex',
       'party_age', 'party_drug_physical',
       'direction_of_travel', 'party_safety_equipment_1',
       'party_safety_equipment_2', 'financial_responsibility',
       'hazardous_materials', 'oaf_violation_code',
       'oaf_violation_category', 'oaf_violation_section',
       'oaf_violation_suffix', 'other_associate_factor_1',
       'other_associate_factor_2', 'party_number_killed',
       'party_number_injured', 'vehicle_year',
       'vehicle_make', 'statewide_vehicle_type', 'party_race']
parties_clean = parties.drop(*parties_to_drop)
parties_clean.show()

+-------+-------+--------------+-------------+----------------------------+
|     id|case_id|party_sobriety|cellphone_use|movement_preceding_collision|
+-------+-------+--------------+-------------+----------------------------+
|4734135|6292121|             A|            3|            making left turn|
|4734136|6292121|          null|            3|         proceeding straight|
|4734144|6292127|             C|            3|         proceeding straight|
|4734145|6292127|             H|            3|                      parked|
|4734146|6292128|             G|            3|        crossed into oppo...|
|4734147|6292128|             A|            3|         proceeding straight|
|4734152|6292133|             B|            3|                       other|
|4734153|6292133|             A|            3|         proceeding straight|
|4734154|6292134|             A|            3|        crossed into oppo...|
|4734155|6292134|          null|            3|         proceeding straight|
|4734165|629

## 3.2 Clean the data
### 3.2.1 Measurement Errors - ALCOHOL_INVOLVED


In [24]:
collision_clean = collision_clean.na.fill(value="0", subset=['alcohol_involved'])

In [25]:
#Showing specific columns summary
collision_clean.select("alcohol_involved").describe().show()
collision_clean.groupBy('alcohol_involved').count().orderBy('count', ascending=False).show()

+-------+-------------------+
|summary|   alcohol_involved|
+-------+-------------------+
|  count|              69160|
|   mean|0.07372758820127241|
| stddev| 0.2613289467328907|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

+----------------+-----+
|alcohol_involved|count|
+----------------+-----+
|               0|64061|
|               1| 5099|
+----------------+-----+



In [26]:
print(str(collision_clean.count()))
print(str(len(collision_clean.columns)))

69160
8


### 3.2.2 Missing Values

In [27]:
#Filter out null values in the collision table
collision_clean = collision_clean.na.drop(subset=["lighting", "weather_1", "road_condition_1"])
collision_clean.count()
print(str(collision_clean.count()))
print(str(len(collision_clean.columns)))

68588
8


In [28]:
collision_clean.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in collision_clean.columns]).show()

+-------+---------+------------------+--------------+----------------+--------+----------------+--------------+
|case_id|weather_1|collision_severity|killed_victims|road_condition_1|lighting|alcohol_involved|collision_date|
+-------+---------+------------------+--------------+----------------+--------+----------------+--------------+
|      0|        0|                 0|             0|               0|       0|               0|             0|
+-------+---------+------------------+--------------+----------------+--------+----------------+--------------+



In [29]:
#Filter out null values in the parties table
parties_clean = parties_clean.na.drop(subset=["party_sobriety", "movement_preceding_collision"])
parties_clean.count()
print(str(parties_clean.count()))
print(str(len(parties_clean.columns)))

127076
5


In [30]:
parties_clean.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in parties_clean.columns]).show()

+---+-------+--------------+-------------+----------------------------+
| id|case_id|party_sobriety|cellphone_use|movement_preceding_collision|
+---+-------+--------------+-------------+----------------------------+
|  0|      0|             0|        11598|                           0|
+---+-------+--------------+-------------+----------------------------+



### 3.2.3 Code inconsistencies – CELLPHONE_USE

In [31]:
#Relabelling cellphone_use
parties_clean = parties_clean.withColumn("cellphone_use", when(col("cellphone_use") == '1',"Cell Phone in Use") \
      .when(col("cellphone_use") == '2',"Cell Phone in Use") \
      .when(col("cellphone_use") == 'B',"Cell Phone in Use") \
      .when(col("cellphone_use") == '3',"Cell Phone Not in Use") \
      .when(col("cellphone_use") == 'C',"Cell Phone Not in Use") \
      .otherwise("No Cellphone/Unknown"))

In [32]:
parties_clean.groupBy('cellphone_use').count().orderBy('count', ascending=False).show()

+--------------------+------+
|       cellphone_use| count|
+--------------------+------+
|Cell Phone Not in...|113832|
|No Cellphone/Unknown| 12256|
|   Cell Phone in Use|   988|
+--------------------+------+



## 3.3 Construct the data/ Deriving a New Feature

In [33]:
from pyspark.sql.functions import unix_timestamp, from_unixtime
date_test = collision_clean.withColumn('formatted_date',
            from_unixtime(unix_timestamp('collision_date', 'dd/MM/yyyy')))

In [34]:
from pyspark.sql.functions import month

month_test = date_test.withColumn('month',month(date_test.formatted_date))
month_test.select("collision_date","formatted_date","month").show()

+--------------+-------------------+-----+
|collision_date|     formatted_date|month|
+--------------+-------------------+-----+
|    18/01/2015|2015-01-18 00:00:00|    1|
|    18/02/2015|2015-02-18 00:00:00|    2|
|    19/02/2015|2015-02-19 00:00:00|    2|
|    20/02/2015|2015-02-20 00:00:00|    2|
|    20/02/2015|2015-02-20 00:00:00|    2|
|    23/02/2015|2015-02-23 00:00:00|    2|
|    24/02/2015|2015-02-24 00:00:00|    2|
|    28/02/2015|2015-02-28 00:00:00|    2|
|     1/02/2015|2015-02-01 00:00:00|    2|
|    15/02/2015|2015-02-15 00:00:00|    2|
|    20/02/2015|2015-02-20 00:00:00|    2|
|    20/02/2015|2015-02-20 00:00:00|    2|
|    18/03/2015|2015-03-18 00:00:00|    3|
|    12/03/2015|2015-03-12 00:00:00|    3|
|    15/03/2015|2015-03-15 00:00:00|    3|
|    10/01/2015|2015-01-10 00:00:00|    1|
|    15/03/2015|2015-03-15 00:00:00|    3|
|    17/03/2015|2015-03-17 00:00:00|    3|
|    11/04/2015|2015-04-11 00:00:00|    4|
|    20/04/2015|2015-04-20 00:00:00|    4|
+----------

In [35]:
month_test = month_test.withColumn("season", when((col("month") == 12) | (col("month") < 3),"winter") \
      .when((col("month") >= 3) & (col("month") < 6) ,"spring") \
      .when((col("month") >= 6) & (col("month") < 9),"summer") \
      .otherwise("autumn"))
month_test.select("collision_date","formatted_date","month","season").show()

+--------------+-------------------+-----+------+
|collision_date|     formatted_date|month|season|
+--------------+-------------------+-----+------+
|    18/01/2015|2015-01-18 00:00:00|    1|winter|
|    18/02/2015|2015-02-18 00:00:00|    2|winter|
|    19/02/2015|2015-02-19 00:00:00|    2|winter|
|    20/02/2015|2015-02-20 00:00:00|    2|winter|
|    20/02/2015|2015-02-20 00:00:00|    2|winter|
|    23/02/2015|2015-02-23 00:00:00|    2|winter|
|    24/02/2015|2015-02-24 00:00:00|    2|winter|
|    28/02/2015|2015-02-28 00:00:00|    2|winter|
|     1/02/2015|2015-02-01 00:00:00|    2|winter|
|    15/02/2015|2015-02-15 00:00:00|    2|winter|
|    20/02/2015|2015-02-20 00:00:00|    2|winter|
|    20/02/2015|2015-02-20 00:00:00|    2|winter|
|    18/03/2015|2015-03-18 00:00:00|    3|spring|
|    12/03/2015|2015-03-12 00:00:00|    3|spring|
|    15/03/2015|2015-03-15 00:00:00|    3|spring|
|    10/01/2015|2015-01-10 00:00:00|    1|winter|
|    15/03/2015|2015-03-15 00:00:00|    3|spring|


In [None]:
month_test.groupBy('season').count().orderBy('count', ascending=False).show()
month_test.printSchema()

In [None]:
collision_clean = month_test

In [None]:
#save to csv files
#collision_clean.write.csv('collision_clean.csv')
#parties_clean.write.csv('parties_clean.csv')

In [None]:
print(str(collision_clean.count()))
print(str(len(collision_clean.columns)))
print(str(parties_clean.count()))
print(str(len(parties_clean.columns)))

## 3.4 Integrate various data sources

In [None]:
join_table = collision_clean.join(parties_clean, "case_id")
print(str(join_table.count()))
print(str(len(join_table.columns)))

In [None]:
join_table.show()
join_table.write.csv('join_table.csv',header=True)