In [0]:
#importing required modules

from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# creating Spark Session
spark = SparkSession \
        .builder \
        .master("local")\
        .appName("CarCrash_Spark_Application") \
        .getOrCreate()

In [0]:
# creating dataframes with data in the source path

df_damages = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("Data/Damages_use.csv")
df_endorse = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("Data/Endorse_use.csv")
df_charges = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("Data/Charges_use.csv")
df_restrict = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("Data/Restrict_use.csv")
df_primperson = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("Datam/Primary_Person_use.csv")
df_units = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("Data/Units_use.csv")

Analytics 1: Find the number of crashes (accidents) in which number of males killed are greater than 2?

In [0]:
# Different persons(PRSN_GNDR_ID) that are involved in the crashes
df_primperson.select('PRSN_GNDR_ID').distinct().show()

# finding CRASH_ID in which number of males killed is greater than 2
df = df_primperson.filter((col('PRSN_GNDR_ID')=='MALE') & (col('DEATH_CNT')==1)). \
                    groupBy('CRASH_ID').agg(sum(col("DEATH_CNT")).alias("NO_OF_MALES_KILLED")). \
                    filter(col("NO_OF_MALES_KILLED")>2)
df.show()


+------------+
|PRSN_GNDR_ID|
+------------+
|          NA|
|     UNKNOWN|
|        MALE|
|      FEMALE|
+------------+

+--------+------------------+
|CRASH_ID|NO_OF_MALES_KILLED|
+--------+------------------+
+--------+------------------+



Analysis 2: How many two wheelers are booked for crashes? 

In [0]:
# finding No of two wheelers that are booked for crashes

df_two_wheelers = df_units.filter(col('VEH_BODY_STYL_ID').contains("MOTORCYCLE")). \
                            select(countDistinct('VIN').alias('NO_OF_DISTINCT_TWO_WHEELERS'))
df_two_wheelers.show()


+---------------------------+
|NO_OF_DISTINCT_TWO_WHEELERS|
+---------------------------+
|                        766|
+---------------------------+



In [0]:
 df_units.select('VEH_BODY_STYL_ID').distinct().show()

+--------------------+
|    VEH_BODY_STYL_ID|
+--------------------+
|                 BUS|
|                  NA|
|                 VAN|
|              PICKUP|
|SPORT UTILITY VEH...|
|PASSENGER CAR, 4-...|
|          FIRE TRUCK|
|               TRUCK|
|             UNKNOWN|
|           AMBULANCE|
|    POLICE CAR/TRUCK|
|          MOTORCYCLE|
|   YELLOW SCHOOL BUS|
|PASSENGER CAR, 2-...|
|       TRUCK TRACTOR|
|      FARM EQUIPMENT|
|NEV-NEIGHBORHOOD ...|
|OTHER  (EXPLAIN I...|
|   POLICE MOTORCYCLE|
|        NOT REPORTED|
+--------------------+



Analysis 3: Determine the Top 5 Vehicle Makes of the cars present in the crashes in which driver died and Airbags did not deploy.

In [0]:
# Type of vehicles based on the different vehicle body styles
df_units.select("VEH_BODY_STYL_ID").distinct().show(truncate=False)

# finding value types of PRSN_AIRBAG_ID in case airbags are not deployed
df_primperson.select('PRSN_AIRBAG_ID').distinct().show()

+---------------------------------+
|VEH_BODY_STYL_ID                 |
+---------------------------------+
|BUS                              |
|NA                               |
|VAN                              |
|PICKUP                           |
|SPORT UTILITY VEHICLE            |
|PASSENGER CAR, 4-DOOR            |
|FIRE TRUCK                       |
|TRUCK                            |
|UNKNOWN                          |
|AMBULANCE                        |
|POLICE CAR/TRUCK                 |
|MOTORCYCLE                       |
|YELLOW SCHOOL BUS                |
|PASSENGER CAR, 2-DOOR            |
|TRUCK TRACTOR                    |
|FARM EQUIPMENT                   |
|NEV-NEIGHBORHOOD ELECTRIC VEHICLE|
|OTHER  (EXPLAIN IN NARRATIVE)    |
|POLICE MOTORCYCLE                |
|NOT REPORTED                     |
+---------------------------------+

+-----------------+
|   PRSN_AIRBAG_ID|
+-----------------+
|               NA|
|     NOT DEPLOYED|
|   DEPLOYED, SIDE|
|          UNKN

In [0]:
# finding the Top 5 Vehicle Makes of the CARS present in the crashes in which DRIVEDR DIED and Airbags DID NOT DEPLOY.

df = df_primperson.alias("pp").join(df_units,on=['CRASH_ID','UNIT_NBR'],how="inner"). \
                    filter((col('PRSN_TYPE_ID').contains('DRIVER')) & ((col('PRSN_AIRBAG_ID')=='NOT DEPLOYED')) & ((col('pp.DEATH_CNT')==1))). \
                    filter(col('VEH_BODY_STYL_ID').contains('CAR')). \
                    groupBy('VEH_MAKE_ID').agg(countDistinct("VIN").alias('COUNT_OF_VEHICLES_PER_VEH_MAKE')). \
                    orderBy(col('COUNT_OF_VEHICLES_PER_VEH_MAKE').desc(),col('VEH_MAKE_ID').asc()).limit(5)

df.show()


+-----------+------------------------------+
|VEH_MAKE_ID|COUNT_OF_VEHICLES_PER_VEH_MAKE|
+-----------+------------------------------+
|     NISSAN|                             4|
|  CHEVROLET|                             3|
|       FORD|                             2|
|      HONDA|                             2|
|      BUICK|                             1|
+-----------+------------------------------+



Analysis 4: Determine number of Vehicles with driver having valid licences involved in hit and run? 

In [0]:
# Distinct type of persons involved in Crashes
df_primperson.select("PRSN_TYPE_ID").distinct().show(truncate=False)

# Finding what can be the valid Driver license types and the Driver license class ID
df_primperson.select('DRVR_LIC_TYPE_ID','DRVR_LIC_CLS_ID').distinct().orderBy('DRVR_LIC_CLS_ID').show(50,truncate=False)


+---------------------------------+
|PRSN_TYPE_ID                     |
+---------------------------------+
|PEDESTRIAN                       |
|DRIVER OF MOTORCYCLE TYPE VEHICLE|
|PASSENGER/OCCUPANT               |
|DRIVER                           |
|UNKNOWN                          |
|OTHER (EXPLAIN IN NARRATIVE)     |
|PEDALCYCLIST                     |
+---------------------------------+

+----------------------+------------------+
|DRVR_LIC_TYPE_ID      |DRVR_LIC_CLS_ID   |
+----------------------+------------------+
|DRIVER LICENSE        |CLASS A           |
|COMMERCIAL DRIVER LIC.|CLASS A           |
|OCCUPATIONAL          |CLASS A           |
|OTHER                 |CLASS A           |
|DRIVER LICENSE        |CLASS A AND M     |
|COMMERCIAL DRIVER LIC.|CLASS A AND M     |
|DRIVER LICENSE        |CLASS B           |
|COMMERCIAL DRIVER LIC.|CLASS B           |
|OTHER                 |CLASS B           |
|DRIVER LICENSE        |CLASS B AND M     |
|COMMERCIAL DRIVER LIC.|CLASS B

In [0]:
# Considering VALID DRIVER LICENSES as the ones with class ID in the list->
# [CLASS A, CLASS B, CLASS C, CLASS M, CLASS A AND M, CLASS B AND M, CLASS C AND M]

# Finding Number of distinct Vehicles where PRSN_TYPE_ID is any DRIVER having a VALID LICENSE and involved in HIT and RUN 

df_no_of_vehicles = df_primperson.join(df_units,on=['CRASH_ID','UNIT_NBR'],how="inner"). \
                                filter((col('PRSN_TYPE_ID').contains('DRIVER'))&(col('DRVR_LIC_CLS_ID').contains('CLASS'))). \
                                filter(col('VEH_HNR_FL')=='Y'). \
                                select(countDistinct("VIN").alias("NO_OF_VEHICLES"))

df_no_of_vehicles.show()



+--------------+
|NO_OF_VEHICLES|
+--------------+
|          2365|
+--------------+



Analysis 5: Which state has highest number of accidents in which females are not involved? 

In [0]:
# Finding Crash IDs where No Female is involved
df_crash_with_no_female = df_primperson.withColumn('NO_OF_FEMALES',when(col('PRSN_GNDR_ID')=='FEMALE',1).otherwise(0)). \
                                        groupBy(col('CRASH_ID')).agg(sum(col('NO_OF_FEMALES')).alias('NO_OF_FEMALES_PER_CRASH')). \
                                        filter(col('NO_OF_FEMALES_PER_CRASH')==0).select('CRASH_ID')

# State that have highest number of accidents in which No Females are involved
df_state_most_accidents = df_crash_with_no_female.join(df_primperson,on='CRASH_ID',how='inner'). \
                                                            groupBy('DRVR_LIC_STATE_ID').count(). \
                                                            filter(~col("DRVR_LIC_STATE_ID").isin("Other","Unknown","NA")). \
                                                            orderBy(col("count").desc()).limit(1)
df_state_most_accidents.show()


+-----------------+-----+
|DRVR_LIC_STATE_ID|count|
+-----------------+-----+
|            Texas|51957|
+-----------------+-----+



Analysis 6: Which are the Top 3rd to 5th VEH_MAKE_IDs that contribute to a largest number of injuries including death

In [0]:
# creating Window object for row_number() to be used next 
w = Window.orderBy(col("TOTAL_INJURIES_VEH_MAKEID").desc())


# Finding Top 3rd to 5th VEH_MAKE_IDs that contribute to the largest number of injuries including death    
                                             
df_3to5_veh_make_id = df_units.withColumn('TOTAL_INJURIES', col('TOT_INJRY_CNT')+col('DEATH_CNT')). \
                            groupby("VEH_MAKE_ID").agg(sum(col("TOTAL_INJURIES")).alias('TOTAL_INJURIES_VEH_MAKEID')). \
                            withColumn("row", row_number().over(w)).filter(col("row").isin([3,4,5]))

df_3to5_veh_make_id.show(truncate=False)
       

+-----------+-------------------------+---+
|VEH_MAKE_ID|TOTAL_INJURIES_VEH_MAKEID|row|
+-----------+-------------------------+---+
|TOYOTA     |4228                     |3  |
|DODGE      |3146                     |4  |
|NISSAN     |3118                     |5  |
+-----------+-------------------------+---+



Analysis 7: For all the body styles involved in crashes, mention the top ethnic user group of each unique body style

In [0]:
# Unique vehicle body styles involved in crashes
df_units.select('VEH_BODY_STYL_ID').distinct().show(truncate=False)

# Distinct ethnic groups of people involved in Crashes
df_primperson.select('PRSN_ETHNICITY_ID').distinct().orderBy("PRSN_ETHNICITY_ID").show(truncate=False)

+---------------------------------+
|VEH_BODY_STYL_ID                 |
+---------------------------------+
|BUS                              |
|NA                               |
|VAN                              |
|PICKUP                           |
|SPORT UTILITY VEHICLE            |
|PASSENGER CAR, 4-DOOR            |
|FIRE TRUCK                       |
|TRUCK                            |
|UNKNOWN                          |
|AMBULANCE                        |
|POLICE CAR/TRUCK                 |
|MOTORCYCLE                       |
|YELLOW SCHOOL BUS                |
|PASSENGER CAR, 2-DOOR            |
|TRUCK TRACTOR                    |
|FARM EQUIPMENT                   |
|NEV-NEIGHBORHOOD ELECTRIC VEHICLE|
|OTHER  (EXPLAIN IN NARRATIVE)    |
|POLICE MOTORCYCLE                |
|NOT REPORTED                     |
+---------------------------------+

+---------------------------+
|PRSN_ETHNICITY_ID          |
+---------------------------+
|AMER. INDIAN/ALASKAN NATIVE|
|ASIAN         

In [0]:
# creating Window object for row_number() to be used next 
w = Window.partitionBy("VEH_BODY_STYL_ID").orderBy(col("count").desc())

# Finding top ethnic user group for each unique vehicle body style involved in Crashes

df=df_primperson.alias('pp').join(df_units.alias('u'),on=['CRASH_ID','UNIT_NBR'],how='inner'). \
                filter(~col('u.VEH_BODY_STYL_ID').isin(["NA", "UNKNOWN", "NOT REPORTED", "OTHER  (EXPLAIN IN NARRATIVE)"])). \
                filter(~col('pp.PRSN_ETHNICITY_ID').isin(["NA", "UNKNOWN"])). \
                groupby("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID").count(). \
                withColumn("row", row_number().over(w)).filter(col("row") == 1).drop("row", "count")

df.show(truncate=False)

+---------------------------------+-----------------+
|VEH_BODY_STYL_ID                 |PRSN_ETHNICITY_ID|
+---------------------------------+-----------------+
|AMBULANCE                        |WHITE            |
|BUS                              |HISPANIC         |
|FARM EQUIPMENT                   |WHITE            |
|FIRE TRUCK                       |WHITE            |
|MOTORCYCLE                       |WHITE            |
|NEV-NEIGHBORHOOD ELECTRIC VEHICLE|WHITE            |
|PASSENGER CAR, 2-DOOR            |WHITE            |
|PASSENGER CAR, 4-DOOR            |WHITE            |
|PICKUP                           |WHITE            |
|POLICE CAR/TRUCK                 |WHITE            |
|POLICE MOTORCYCLE                |WHITE            |
|SPORT UTILITY VEHICLE            |WHITE            |
|TRUCK                            |WHITE            |
|TRUCK TRACTOR                    |WHITE            |
|VAN                              |WHITE            |
|YELLOW SCHOOL BUS          

Analysis 8: Among the crashed cars, what are the Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash (Use Driver Zip Code)

In [0]:
# Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash

df = df_units.join(df_primperson, on=['CRASH_ID'], how='inner'). \
            dropna(subset=["DRVR_ZIP"]). \
            filter(col("CONTRIB_FACTR_1_ID").contains("ALCOHOL") | col("CONTRIB_FACTR_2_ID").contains("ALCOHOL")). \
            groupby("DRVR_ZIP").count().orderBy(col("count").desc()).limit(5).drop("count")
df.show()            

+--------+
|DRVR_ZIP|
+--------+
|   76010|
|   78521|
|   75067|
|   78574|
|   75052|
+--------+



Analysis 9: Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance

In [0]:
# Different kinds of Insurance that the vehicle can possess

df_units.select('FIN_RESP_TYPE_ID').distinct().show(truncate=False)

# Different types of damages observed during the crashes

df_units.select('VEH_DMAG_SCL_1_ID').distinct().orderBy('VEH_DMAG_SCL_1_ID').show()

+----------------------------------------+
|FIN_RESP_TYPE_ID                        |
+----------------------------------------+
|INSURANCE BINDER                        |
|LIABILITY INSURANCE POLICY              |
|NA                                      |
|CERTIFICATE OF SELF-INSURANCE           |
|CERTIFICATE OF DEPOSIT WITH COMPTROLLER |
|SURETY BOND                             |
|PROOF OF LIABILITY INSURANCE            |
|CERTIFICATE OF DEPOSIT WITH COUNTY JUDGE|
+----------------------------------------+

+-----------------+
|VEH_DMAG_SCL_1_ID|
+-----------------+
|DAMAGED 1 MINIMUM|
|        DAMAGED 2|
|        DAMAGED 3|
|        DAMAGED 4|
|        DAMAGED 5|
|        DAMAGED 6|
|DAMAGED 7 HIGHEST|
|    INVALID VALUE|
|               NA|
|        NO DAMAGE|
+-----------------+



In [0]:
# Considering all types of car insurances as valid except where "FIN_RESP_TYPE_ID"= "NA"

# Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level is above 4 and car avails Insurance
df = df_damages.alias('d').join(df_units.alias('u'), on=["CRASH_ID"], how='inner'). \
            filter(
            ((col('u.VEH_DMAG_SCL_1_ID') > "DAMAGED 4") & (~col('u.VEH_DMAG_SCL_1_ID').isin(["NA", "NO DAMAGE", "INVALID VALUE"]))
            ) | 
            ((col('u.VEH_DMAG_SCL_2_ID') > "DAMAGED 4") & (~col('u.VEH_DMAG_SCL_2_ID').isin(["NA", "NO DAMAGE", "INVALID VALUE"]))
            )). \
            filter(col('d.DAMAGED_PROPERTY') == "NONE"). \
            filter(col('u.FIN_RESP_TYPE_ID') != "NA"). \
            select(countDistinct('u.CRASH_ID').alias("COUNT_OF_DISTINCT_CRASH_IDS"))

df.show()

+---------------------------+
|COUNT_OF_DISTINCT_CRASH_IDS|
+---------------------------+
|                          8|
+---------------------------+



Analysis 10: Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, used top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences (to be deduced from the data)

In [0]:
# Creating list of the Top 25 states with highest number of offences
top25_offences_state = [row[0] for row in df_units.filter(col("VEH_LIC_STATE_ID").cast("int").isNull()). \
                            groupby("VEH_LIC_STATE_ID").count().orderBy(col("count").desc()).limit(25).collect()]
print("top25_offences_state: ",top25_offences_state)


# Creating list of top 10 used vehicle colours
top10_vehicle_colors = [row[0] for row in df_units.filter(col('VEH_COLOR_ID') != "NA"). \
                        groupby("VEH_COLOR_ID").count().orderBy(col("count").desc()).limit(10).collect()]
print("top10_vehicle_colors: ",top10_vehicle_colors)


# Top 5 Vehicle Makes where drivers are licensed Drivers and charged with speeding related offences, used top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences

df = df_charges.join(df_primperson, on=['CRASH_ID'], how='inner'). \
                join(df_units, on=['CRASH_ID'], how='inner'). \
                filter(df_charges.CHARGE.contains("SPEED")). \
                filter(df_primperson.DRVR_LIC_TYPE_ID.isin(["DRIVER LICENSE", "COMMERCIAL DRIVER LIC."])). \
                filter(df_units.VEH_COLOR_ID.isin(top10_vehicle_colors)). \
                filter(df_units.VEH_LIC_STATE_ID.isin(top25_offences_state)). \
                groupby("VEH_MAKE_ID").count(). \
                orderBy(col("count").desc()).limit(5).drop("count")
df.show()                

top25_offences_state:  ['TX', 'NA', 'UN', 'OK', 'LA', 'NM', 'IN', 'MX', 'CA', 'FL', 'IL', 'AR', 'TN', 'MS', 'AZ', 'KS', 'MO', 'GA', 'CO', 'NC', 'AL', 'OH', 'MI', 'MN', 'WI']
top10_vehicle_colors:  ['WHI', 'BLK', 'SIL', 'GRY', 'BLU', 'RED', 'GRN', 'MAR', 'TAN', 'GLD']
+-----------+
|VEH_MAKE_ID|
+-----------+
|       FORD|
|  CHEVROLET|
|     TOYOTA|
|      DODGE|
|     NISSAN|
+-----------+

