In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *

In [2]:
# Build (or fetch the already-running) Spark session in local mode with all cores.
spark = (
    SparkSession.builder
    .appName("CarCrashCaseStudy")
    .master("local[*]")
    .getOrCreate()
)

In [3]:
def _load_csv(path):
    """Read a CSV that has a header row, letting Spark infer the column types."""
    return spark.read.options(header=True, inferSchema=True).csv(path)


# One DataFrame per source file of the case study.
df_charge = _load_csv("Data/Charges_use.csv")
df_damage = _load_csv("Data/Damages_use.csv")
df_Endorse = _load_csv("Data/Endorse_use.csv")
df_person = _load_csv("Data/Primary_Person_use.csv")
df_restrict = _load_csv("Data/Restrict_use.csv")
df_units = _load_csv("Data/Units_use.csv")

In [4]:
# Answer 1 ------------------------
# Count the crashes in which more than two males are recorded as killed.

# Drop exact duplicates of the selected columns first, then keep only the
# rows whose gender is MALE and whose injury severity is KILLED.
df_person_male_killed = (
    df_person
    .select('crash_id', 'unit_nbr', 'prsn_nbr', 'PRSN_GNDR_ID', 'PRSN_INJRY_SEV_ID')
    .distinct()
    .filter(upper(col('PRSN_GNDR_ID')) == 'MALE')
    .filter(upper(col('PRSN_INJRY_SEV_ID')) == 'KILLED')
    .orderBy('crash_id')
)

# Tally the matching rows per crash and keep crashes with a tally above 2.
df_male_killed_more_than_2 = (
    df_person_male_killed
    .groupBy('crash_id')
    .agg(count('*').alias('cnt'))
    .filter(col('cnt') > 2)
)

df_male_killed_more_than_2.count()

0

In [5]:
# Answer 2 -----------------------
# Count the distinct (crash, unit, body style) rows whose body style
# contains the word MOTORCYCLE (case-insensitive).

df_units_2_wheeler = (
    df_units
    .select('crash_id', 'unit_nbr', 'VEH_BODY_STYL_ID')
    .distinct()
    .filter(upper(col('VEH_BODY_STYL_ID')).like('%MOTORCYCLE%'))
)

df_units_2_wheeler.count()

773

In [6]:
# Answer 3 -----------
# Top 5 vehicle makes among units with a CAR body style whose driver was
# killed and whose airbag is recorded as not deployed.

# Units whose body style contains CAR (case-insensitive).
df_units_filter = (
    df_units
    .select('crash_id', 'unit_nbr', 'VEH_BODY_STYL_ID', 'VEH_MAKE_ID')
    .distinct()
    .filter(upper(col('VEH_BODY_STYL_ID')).like('%CAR%'))
)

# Persons that are drivers, were killed, and had no airbag deployment.
df_person_filter = (
    df_person
    .select('crash_id', 'unit_nbr', 'prsn_nbr', 'PRSN_TYPE_ID', 'PRSN_AIRBAG_ID', 'DEATH_CNT', 'PRSN_INJRY_SEV_ID')
    .distinct()
    .filter(upper(col('PRSN_TYPE_ID')) == 'DRIVER')
    .filter(upper(col('PRSN_AIRBAG_ID')) == 'NOT DEPLOYED')
    .filter(upper(col('PRSN_INJRY_SEV_ID')) == 'KILLED')
)

# Match persons to their unit, count rows per make, keep the 5 largest.
df_person_unit_merge = (
    df_units_filter
    .join(df_person_filter, on=['crash_id', 'unit_nbr'], how='inner')
    .groupBy('VEH_MAKE_ID')
    .agg(count('*').alias('brand_cnt'))
    .orderBy(col('brand_cnt').desc())
    .limit(5)
    .select('VEH_MAKE_ID')
)

df_person_unit_merge.show()

+-----------+
|VEH_MAKE_ID|
+-----------+
|     NISSAN|
|  CHEVROLET|
|       FORD|
|      HONDA|
|    PONTIAC|
+-----------+



In [7]:
# Answer 4 ------------
# Count driver rows holding one of the listed licence types whose unit is
# flagged as hit-and-run.

# Units with the hit-and-run flag set to 'Y'.
df_units_filter = (
    df_units
    .select('crash_id', 'unit_nbr', 'VEH_HNR_FL')
    .distinct()
    .filter(col('VEH_HNR_FL') == 'Y')
)

# Drivers (including motorcycle drivers) with one of the accepted licence types.
df_person_filter = (
    df_person
    .select('crash_id', 'unit_nbr', 'prsn_nbr', 'PRSN_TYPE_ID', 'DRVR_LIC_TYPE_ID')
    .distinct()
    .filter(upper(col('PRSN_TYPE_ID')).isin('DRIVER', 'DRIVER OF MOTORCYCLE TYPE VEHICLE'))
    .filter(upper(col('DRVR_LIC_TYPE_ID')).isin('COMMERCIAL DRIVER LIC.', 'OCCUPATIONAL', 'DRIVER LICENSE'))
)

df_person_unit_merge = df_units_filter.join(
    df_person_filter,
    on=['crash_id', 'unit_nbr'],
    how='inner',
)

df_person_unit_merge.count()

2575

In [8]:
# Answer 5 ------------
#
# The question can be interpreted in two ways:
#   1. Consider only the gender of the drivers.
#   2. Consider both the gender of the drivers and passengers.
#
# NOTE(review): the original note chose option 1, but no PRSN_TYPE_ID filter
# is applied below, so a crash is excluded as soon as *any* recorded person
# in it is female. Confirm which interpretation is intended.

df_person_filter = (
    df_person
    .select('crash_id', 'unit_nbr', 'prsn_nbr', 'PRSN_TYPE_ID', 'DRVR_LIC_STATE_ID', 'PRSN_GNDR_ID')
    .distinct()
)

# Read twice below (female subset and anti-join input), so cache it once.
df_person_filter.persist()

# Ids of every crash that has at least one person recorded as FEMALE.
df_person_female_involved = (
    df_person_filter
    .where(upper(col('PRSN_GNDR_ID')) == 'FEMALE')
    .select('crash_id')
    .distinct()
)

# Exclude those crashes with a left-anti join instead of collecting the ids
# to the driver and passing them back through isin(): the exclusion stays
# distributed and does not build an arbitrarily large literal list.
df_person_states = (
    df_person_filter
    .join(df_person_female_involved, 'crash_id', 'left_anti')
    .groupBy('DRVR_LIC_STATE_ID')
    .agg(countDistinct('crash_id').alias('no_of_accidents'))
    .orderBy(col('no_of_accidents').desc())
    .select('DRVR_LIC_STATE_ID')
    .limit(1)
)

df_person_states.show()

# Release the cached frame now that the answer has been materialised
# (trailing ';' keeps the returned DataFrame repr out of the cell output).
df_person_filter.unpersist();

+-----------------+
|DRVR_LIC_STATE_ID|
+-----------------+
|            Texas|
+-----------------+



In [9]:
# Answer 6 ---------
# Vehicle makes ranked 3rd to 5th by summed injury count plus death count.

df_units_filter = (
    df_units
    .select('crash_id', 'unit_nbr', 'VEH_MAKE_ID', 'TOT_INJRY_CNT', 'DEATH_CNT')
    .distinct()
)

# Per-make totals of the two count columns.
df_injury_counts = (
    df_units_filter
    .groupBy("VEH_MAKE_ID")
    .agg(
        sum(col('TOT_INJRY_CNT')).alias('INJURY_COUNT'),
        sum(col('DEATH_COUNT' if False else 'DEATH_CNT')).alias('DEATH_COUNT'),
    )
)

# Combine both totals into the single figure used for ranking.
df_total_injury_counts = df_injury_counts.select(
    'VEH_MAKE_ID',
    (col("INJURY_COUNT") + col("DEATH_COUNT")).alias("TOTAL_INJURIES_COUNT"),
)

# Global ranking (no partitioning) from the highest combined total downwards.
window_spec = Window.orderBy(col('TOTAL_INJURIES_COUNT').desc())

df_top_3_to_5_brands = (
    df_total_injury_counts
    .withColumn('rn', row_number().over(window_spec))
    .filter((col('rn') >= 3) & (col('rn') <= 5))
    .drop('rn')
)

df_top_3_to_5_brands.show()

+-----------+--------------------+
|VEH_MAKE_ID|TOTAL_INJURIES_COUNT|
+-----------+--------------------+
|     TOYOTA|                4227|
|      DODGE|                3136|
|     NISSAN|                3113|
+-----------+--------------------+



In [10]:
# Answer 7 ----------
# For every vehicle body style, find the ethnic group with the most persons.

# Units with a usable body style (placeholder values removed).
df_units_filter = (
    df_units
    .select('crash_id', 'unit_nbr', 'VEH_BODY_STYL_ID')
    .distinct()
    .where(~col('VEH_BODY_STYL_ID').isin('NA', 'UNKNOWN', 'OTHER  (EXPLAIN IN NARRATIVE)', 'NOT REPORTED'))
)

# Persons with a usable ethnicity (placeholder values removed).
df_person_filter = (
    df_person
    .select('crash_id', 'unit_nbr', 'prsn_nbr', 'PRSN_ETHNICITY_ID')
    .distinct()
    .where(~col('PRSN_ETHNICITY_ID').isin('NA', 'UNKNOWN', 'OTHER'))
)

# Person count per (body style, ethnicity) pair.
df_person_unit_merge = (
    df_units_filter
    .join(df_person_filter, ['crash_id', 'unit_nbr'], 'inner')
    .groupBy(['VEH_BODY_STYL_ID', 'PRSN_ETHNICITY_ID'])
    .agg(count('*').alias('person_count'))
)

# Rank ethnicities inside each body style by their person count. Ordering by
# the partition column itself (as before) makes every row tie, so row_number()
# would pick an arbitrary ethnicity. PRSN_ETHNICITY_ID is a deterministic
# tie-breaker for equal counts.
window_spec = (
    Window
    .partitionBy('VEH_BODY_STYL_ID')
    .orderBy(col('person_count').desc(), col('PRSN_ETHNICITY_ID'))
)

df_top_ethnic = (
    df_person_unit_merge
    .withColumn('rn', row_number().over(window_spec))
    .where(col('rn') == 1)
    .select('VEH_BODY_STYL_ID', 'PRSN_ETHNICITY_ID')
    .orderBy('VEH_BODY_STYL_ID')  # stable, readable display order
)

df_top_ethnic.show(truncate=False)

+---------------------------------+-----------------+
|VEH_BODY_STYL_ID                 |PRSN_ETHNICITY_ID|
+---------------------------------+-----------------+
|AMBULANCE                        |WHITE            |
|BUS                              |BLACK            |
|FARM EQUIPMENT                   |WHITE            |
|FIRE TRUCK                       |WHITE            |
|MOTORCYCLE                       |WHITE            |
|NEV-NEIGHBORHOOD ELECTRIC VEHICLE|WHITE            |
|PASSENGER CAR, 2-DOOR            |WHITE            |
|PASSENGER CAR, 4-DOOR            |WHITE            |
|PICKUP                           |WHITE            |
|POLICE CAR/TRUCK                 |WHITE            |
|POLICE MOTORCYCLE                |WHITE            |
|SPORT UTILITY VEHICLE            |WHITE            |
|TRUCK                            |WHITE            |
|TRUCK TRACTOR                    |WHITE            |
|VAN                              |WHITE            |
|YELLOW SCHOOL BUS          

In [11]:
# Answer 8 -----------
# Top 5 driver zip codes by number of distinct crashes in which a unit has
# ALCOHOL in either of its first two contributing factors.

df_units_filter = (
    df_units
    .select('crash_id', 'unit_nbr', 'CONTRIB_FACTR_1_ID', 'CONTRIB_FACTR_2_ID')
    .distinct()
    .filter(
        upper(col('CONTRIB_FACTR_1_ID')).like('%ALCOHOL%')
        | upper(col('CONTRIB_FACTR_2_ID')).like('%ALCOHOL%')
    )
)

# Only persons that actually carry a zip code can contribute to the ranking.
df_person_filter = (
    df_person
    .select('crash_id', 'unit_nbr', 'prsn_nbr', 'DRVR_ZIP')
    .distinct()
    .filter(col('DRVR_ZIP').isNotNull())
)

df_person_unit_merge = (
    df_units_filter
    .join(df_person_filter, on=['crash_id', 'unit_nbr'], how='inner')
    .groupBy('DRVR_ZIP')
    .agg(countDistinct('crash_id').alias('crash_count'))
    .orderBy(col('crash_count').desc())
    .limit(5)
)

df_person_unit_merge.show()

+--------+-----------+
|DRVR_ZIP|crash_count|
+--------+-----------+
|   76010|         52|
|   78521|         40|
|   75067|         37|
|   78130|         32|
|   78542|         31|
+--------+-----------+



In [12]:
# Answer 9 ----------
# Count distinct crashes whose damaged-property text contains NONE and that
# involve a unit with a damage scale above 4 and an INSURANCE-type
# financial responsibility.

df_damage_filter = (
    df_damage
    .select('crash_id', 'DAMAGED_PROPERTY')
    .distinct()
    .filter(col('DAMAGED_PROPERTY').like('%NONE%'))
)

# The numeric damage level is pulled out of the text that follows "DAMAGED";
# values that do not match the pattern become null after the int cast.
df_units_filter = (
    df_units
    .select('crash_id', 'unit_nbr', 'VEH_DMAG_SCL_1_ID', 'VEH_DMAG_SCL_2_ID', 'FIN_RESP_TYPE_ID')
    .distinct()
    .withColumn("VEH_DMAG_SCL_1_value", regexp_extract("VEH_DMAG_SCL_1_ID", r"DAMAGED\s*(\d+)", 1).cast('int'))
    .withColumn("VEH_DMAG_SCL_2_value", regexp_extract("VEH_DMAG_SCL_2_ID", r"DAMAGED\s*(\d+)", 1).cast('int'))
    # Either damage scale must be present and greater than 4 ...
    .filter(
        (col('VEH_DMAG_SCL_1_value').isNotNull() & (col('VEH_DMAG_SCL_1_value') > 4))
        | (col('VEH_DMAG_SCL_2_value').isNotNull() & (col('VEH_DMAG_SCL_2_value') > 4))
    )
    # ... and the unit must carry an insurance-type financial responsibility.
    .filter(upper(col('FIN_RESP_TYPE_ID')).like('%INSURANCE%'))
    .select('crash_id', 'unit_nbr', 'VEH_DMAG_SCL_1_value', 'VEH_DMAG_SCL_2_value')
    .distinct()
)

df_damage_unit_merge = (
    df_units_filter
    .join(df_damage_filter, on=['crash_id'], how='inner')
    .select('crash_id')
    .distinct()
)

df_damage_unit_merge.count()

8

In [13]:
# Answer 10
# Top 5 vehicle makes by number of speeding-related charges, restricted to
# licensed drivers, the 10 most frequent vehicle colours and the 25 most
# frequent vehicle licence states.

df_units_filter = (
    df_units
    .select('crash_id', 'unit_nbr', 'VEH_COLOR_ID', 'VEH_LIC_STATE_ID', 'VEH_MAKE_ID')
    .distinct()
)

# Charges whose text mentions SPEED (case-insensitive).
df_charge_filter = (
    df_charge
    .select('crash_id', 'unit_nbr', 'prsn_nbr', 'charge')
    .distinct()
    .where(upper(col('charge')).like('%SPEED%'))
)

# Persons holding a regular or commercial driver licence.
df_person_filter = (
    df_person
    .select('crash_id', 'unit_nbr', 'prsn_nbr', 'DRVR_LIC_TYPE_ID')
    .distinct()
    .where(upper(col('DRVR_LIC_TYPE_ID')).isin("DRIVER LICENSE", "COMMERCIAL DRIVER LIC."))
)

# 10 most frequent colours among units, ignoring the 'NA' / '99' placeholders.
df_top_10_vehicle_colors = (
    df_units_filter
    .where(~col('VEH_COLOR_ID').isin('NA', '99'))
    .groupBy('VEH_COLOR_ID')
    .agg(count('*').alias('color_count'))
    .orderBy(col('color_count').desc())
    .limit(10)
)

# At most 10 values, so collecting them to the driver is cheap.
top_10_vehicle_colors = [row["VEH_COLOR_ID"] for row in df_top_10_vehicle_colors.collect()]

# 25 most frequent licence states among units, ignoring '98' / 'NA'.
df_top_25_states = (
    df_units_filter
    .where(~col('VEH_LIC_STATE_ID').isin('98', 'NA'))
    .groupBy('VEH_LIC_STATE_ID')
    .agg(count('*').alias('state_count'))
    .orderBy(col('state_count').desc())
    .limit(25)
)

# At most 25 values, so collecting them to the driver is cheap.
top_25_states = [row["VEH_LIC_STATE_ID"] for row in df_top_25_states.collect()]

# Charge -> person -> unit, restricted to the top colours and states.
# The chain is parenthesised so it no longer ends in a line-continuation
# backslash running into a commented-out line, and the former global sort by
# crash_id is dropped: the aggregation below does not depend on row order.
df_charge_person_merge = (
    df_charge_filter
    .join(df_person_filter, ['crash_id', 'unit_nbr', 'prsn_nbr'], 'inner')
    .join(df_units_filter, ['crash_id', 'unit_nbr'], 'inner')
    .distinct()
    .where(col('VEH_COLOR_ID').isin(top_10_vehicle_colors) & col('VEH_LIC_STATE_ID').isin(top_25_states))
)

df_vehicle_agg = (
    df_charge_person_merge
    .groupBy('VEH_MAKE_ID')
    .agg(count('*').alias('offence_count'))
    .orderBy(col('offence_count').desc())
    .limit(5)
)

df_vehicle_agg.show()

+-----------+-------------+
|VEH_MAKE_ID|offence_count|
+-----------+-------------+
|       FORD|         4235|
|  CHEVROLET|         3685|
|     TOYOTA|         2259|
|      DODGE|         1910|
|      HONDA|         1382|
+-----------+-------------+

