In [48]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *

In [49]:
### INITIALIZING SPARKSESSION

# Build (or reuse, via getOrCreate) a Spark session named "CrashAnalysis"
# that runs against the "local" master.
spark = (
    SparkSession.builder
    .master("local")
    .appName("CrashAnalysis")
    .getOrCreate()
)

# Keep a handle on the session's SparkContext; the bare expression on the
# last line makes the notebook display it as the cell result.
sc = spark.sparkContext
sc

In [50]:
### READING CSV FILES INTO DATAFRAMES

def _read_csv(path):
    """Load the CSV at ``path`` with the header option on and schema inference enabled."""
    return spark.read.csv(path, header=True, inferSchema=True)


# One DataFrame per input file; all paths are relative to the notebook's working directory.
df_charges = _read_csv("Data/Charges_use.csv")
df_damages = _read_csv("Data/Damages_use.csv")
df_endorse = _read_csv("Data/Endorse_use.csv")
df_primary_person = _read_csv("Data/Primary_Person_use.csv")
df_units = _read_csv("Data/Units_use.csv")
df_restrict = _read_csv("Data/Restrict_use.csv")

In [51]:
# ANALYSIS 1 - Find the number of crashes (accidents) in which a male person was killed.

# Drop exact duplicate rows, then keep only records of males whose injury
# severity is "KILLED" (both comparisons are made on upper-cased values).
males_killed = (
    df_primary_person.distinct()
    .where(upper(col("PRSN_GNDR_ID")) == "MALE")
    .where(upper(col("PRSN_INJRY_SEV_ID")) == "KILLED")
)

# Count each crash once, however many matching person records it has.
df = males_killed.agg(
    countDistinct(col("CRASH_ID")).alias("CRASHES_WITH_MALES_KILLED")
)

df.show(truncate=False)

                                                                                                                                                    

+-------------------------+
|CRASHES_WITH_MALES_KILLED|
+-------------------------+
|180                      |
+-------------------------+



In [52]:
# ANALYSIS 2 - How many two-wheelers are booked for crashes?

# A unit is treated as a two-wheeler when its upper-cased body style
# contains the text "MOTORCYCLE".
motorcycle_units = df_units.distinct().where(
    upper(col("VEH_BODY_STYL_ID")).contains("MOTORCYCLE")
)

# Each VIN is counted once, however many unit rows it appears in.
df = motorcycle_units.agg(
    countDistinct(col("VIN")).alias("TWO_WHEELERS_BOOKED_FOR_CRASHES")
)

df.show(truncate=False)

+-------------------------------+
|TWO_WHEELERS_BOOKED_FOR_CRASHES|
+-------------------------------+
|766                            |
+-------------------------------+



In [53]:
# ANALYSIS 3 - Which state has the highest number of accidents in which females are involved?

# "State" here is the driver-licence state column; the placeholder values
# "NA" and "UNKNOWN" are excluded.
is_female = upper(col("PRSN_GNDR_ID")) == "FEMALE"
has_known_state = ~upper(col("DRVR_LIC_STATE_ID")).isin("NA", "UNKNOWN")

# Distinct crashes per licence state among the matching person records.
crashes_per_state = (
    df_primary_person.distinct()
    .filter(is_female & has_known_state)
    .groupBy(col("DRVR_LIC_STATE_ID"))
    .agg(countDistinct(col("CRASH_ID")).alias("TOTAL_CRASH"))
)

# Sort by crash count, highest first, and keep only the leading state.
df = (
    crashes_per_state
    .orderBy(col("TOTAL_CRASH").desc())
    .select(col("DRVR_LIC_STATE_ID").alias("STATE_HAVING_HIGHEST_ACCIDENTS_WITH_FEMALE_INVOLVED"))
    .limit(1)
)

df.show(truncate=False)

+---------------------------------------------------+
|STATE_HAVING_HIGHEST_ACCIDENTS_WITH_FEMALE_INVOLVED|
+---------------------------------------------------+
|Texas                                              |
+---------------------------------------------------+



In [54]:
# ANALYSIS 4 - Which VEH_MAKE_IDs rank 5th to 15th by total number of injuries including deaths?

# NOTE(review): the two addends are looked up by position (columns 35 and 36 of
# df_units). Presumably these are the total-injury and death counts -- confirm
# against the Units_use.csv header and prefer referencing them by name.
injuries_plus_deaths = df_units[35] + df_units[36]

# Per-make total of the combined injury/death figure, skipping the "NA" make.
injuries_by_make = (
    df_units.distinct()
    .filter(upper(col("VEH_MAKE_ID")) != "NA")
    .withColumn("TOTAL_INJURIES_DEATH_CNT", injuries_plus_deaths)
    .groupBy("VEH_MAKE_ID")
    .agg({"TOTAL_INJURIES_DEATH_CNT": "sum"})
    .withColumnRenamed("sum(TOTAL_INJURIES_DEATH_CNT)", "TOTAL_INJURIES_DEATH_CNT_AGG")
    .orderBy(col("TOTAL_INJURIES_DEATH_CNT_AGG").desc())
)

# Rank makes by that total (highest first) and keep ranks 5 through 15 inclusive.
window_spec_veh_make = Window.orderBy(col("TOTAL_INJURIES_DEATH_CNT_AGG").desc())
in_fifth_to_fifteenth = (col("rank") >= 5) & (col("rank") <= 15)
df = (
    injuries_by_make
    .withColumn("rank", rank().over(window_spec_veh_make))
    .select(col("VEH_MAKE_ID"), col("RANK"))
    .filter(in_fifth_to_fifteenth)
)

df.show(truncate=False)

+-----------+----+
|VEH_MAKE_ID|RANK|
+-----------+----+
|NISSAN     |5   |
|HONDA      |6   |
|GMC        |7   |
|HYUNDAI    |8   |
|KIA        |9   |
|JEEP       |10  |
|CHRYSLER   |11  |
|MAZDA      |12  |
|VOLKSWAGEN |13  |
|PONTIAC    |14  |
|LEXUS      |15  |
+-----------+----+



In [55]:
# ANALYSIS 5 - For every vehicle body style involved in crashes, report its top ethnic user group.

df_primary_person_distinct = df_primary_person.distinct()
df_units_distinct = df_units.distinct()

# Placeholder values that are excluded from the ranking.
unusable_body_styles = ["NA", "UNKNOWN", "NOT REPORTED", "OTHER  (EXPLAIN IN NARRATIVE)"]
unusable_ethnicities = ["NA", "UNKNOWN"]

# NOTE(review): units and persons are matched on CRASH_ID only, so every person
# record of a crash is paired with every unit of that crash. If both tables carry
# a per-unit key, joining on it as well may be more accurate -- confirm.
ethnicity_counts = (
    df_units_distinct.join(df_primary_person_distinct, "CRASH_ID", how="inner")
    .filter(~upper(col("VEH_BODY_STYL_ID")).isin(unusable_body_styles))
    .filter(~upper(col("PRSN_ETHNICITY_ID")).isin(unusable_ethnicities))
    .groupBy(col("VEH_BODY_STYL_ID"), col("PRSN_ETHNICITY_ID"))
    .count()
)

# Number the ethnicities inside each body style by descending count and keep
# only the first; the helper columns are dropped from the result.
w = Window.partitionBy(col("VEH_BODY_STYL_ID")).orderBy(col("count").desc())
df = (
    ethnicity_counts
    .withColumn("row", row_number().over(w))
    .filter(col("row") == 1)
    .drop("row", "count")
)

df.show(truncate=False)

+---------------------------------+-----------------+
|VEH_BODY_STYL_ID                 |PRSN_ETHNICITY_ID|
+---------------------------------+-----------------+
|AMBULANCE                        |WHITE            |
|BUS                              |HISPANIC         |
|FARM EQUIPMENT                   |WHITE            |
|FIRE TRUCK                       |WHITE            |
|MOTORCYCLE                       |WHITE            |
|NEV-NEIGHBORHOOD ELECTRIC VEHICLE|WHITE            |
|PASSENGER CAR, 2-DOOR            |WHITE            |
|PASSENGER CAR, 4-DOOR            |WHITE            |
|PICKUP                           |WHITE            |
|POLICE CAR/TRUCK                 |WHITE            |
|POLICE MOTORCYCLE                |HISPANIC         |
|SPORT UTILITY VEHICLE            |WHITE            |
|TRUCK                            |WHITE            |
|TRUCK TRACTOR                    |WHITE            |
|VAN                              |WHITE            |
|YELLOW SCHOOL BUS          

In [56]:
# ANALYSIS 6 - Among the crashed cars, what are the Top 5 Zip Codes with the highest number of crashes
# with alcohol as the contributing factor to a crash (use Driver Zip Code)?

# A unit counts as a car when its upper-cased body style contains "CAR".
car_crash_check = instr(upper(col("VEH_BODY_STYL_ID")), "CAR") > 0

# Distinct crash ids that involve at least one such unit.
df_units_crash_include_car = (
    df_units
    .filter(car_crash_check)
    .select(col("CRASH_ID"))
    .distinct()
)

alcohol_positive = upper(col("PRSN_ALC_RSLT_ID")) == "POSITIVE"
has_driver_zip = col("DRVR_ZIP").isNotNull()

# Distinct car crashes per driver zip code, restricted to alcohol-positive persons.
crashes_per_zip = (
    df_primary_person.join(df_units_crash_include_car, "CRASH_ID", "inner")
    .where(alcohol_positive & has_driver_zip)
    .groupBy(col("DRVR_ZIP"))
    .agg(countDistinct("CRASH_ID").alias("NUM_TOT_CRASHES"))
)

# rank() gives tied zip codes the same rank, so more than five rows can come back.
win_spec = Window.orderBy(col("NUM_TOT_CRASHES").desc())
df = (
    crashes_per_zip
    .withColumn("rank", rank().over(win_spec))
    .where(col("rank") <= 5)
    .select(col("DRVR_ZIP"), col("RANK"))
)

df.show(truncate=False)


+--------+----+
|DRVR_ZIP|RANK|
+--------+----+
|78521   |1   |
|76010   |2   |
|79936   |3   |
|79938   |4   |
|78550   |5   |
|78240   |5   |
+--------+----+



In [57]:
# ANALYSIS 7 - Count of distinct Crash IDs where no damaged property was observed, the damage level
# (VEH_DMAG_SCL~) is above 4 and the car avails insurance.

# NOTE(review): unlike most other text checks in this notebook, this match is
# case-sensitive (no upper() is applied) -- confirm that is intended.
insurance_check = instr(col("FIN_RESP_TYPE_ID"), "INSURANCE") >= 1


def _damage_level(scale_column):
    """Extract the first run of digits from ``scale_column`` and cast it to bigint."""
    return regexp_extract(col(scale_column), r'(\d+)', 1).cast('bigint')


# Numeric damage levels plus the insurance flag, trimmed to the columns used below.
units_with_flags = (
    df_units.distinct()
    .withColumn("VEH_DMG_SCL_1", _damage_level('VEH_DMAG_SCL_1_id'))
    .withColumn("VEH_DMG_SCL_2", _damage_level('VEH_DMAG_SCL_2_id'))
    .withColumn("INSURANCE_FLG", insurance_check)
    .select(
        col("CRASH_ID"),
        col('VEH_DMG_SCL_1'),
        col("VEH_DMG_SCL_2"),
        col("INSURANCE_FLG"),
        col("FIN_RESP_PROOF_ID"),
    )
)

# Keep units whose proof id is neither "NA" nor "NR", where either damage level
# exceeds 4, and where the insurance flag is set.
has_proof = ~col("FIN_RESP_PROOF_ID").isin("NA", "NR")
severe_damage = (col("VEH_DMG_SCL_1") > 4) | (col("VEH_DMG_SCL_2") > 4)
df_units_ins_dmg = units_with_flags.filter(has_proof & severe_damage & col("INSURANCE_FLG"))

# Damage records whose DAMAGED_PROPERTY is something other than NONE / NONE1.
df_damage_filter = df_damages.distinct().filter(
    ~upper(col("DAMAGED_PROPERTY")).isin("NONE", "NONE1")
)

# Anti-join: drop every unit whose crash has such a damage record, then count
# the distinct crashes that remain.
units_without_property_damage = df_units_ins_dmg.join(df_damage_filter, "CRASH_ID", "leftanti")
df = units_without_property_damage.agg(
    countDistinct(col('CRASH_ID')).alias('DISTINCT_CRASH_IDS_WITH_NODAMAGEPROPERTY')
)

df.show(truncate=False)

+----------------------------------------+
|DISTINCT_CRASH_IDS_WITH_NODAMAGEPROPERTY|
+----------------------------------------+
|8857                                    |
+----------------------------------------+



In [58]:
# ANALYSIS 8 - Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences, have licensed drivers,
# use one of the top 10 vehicle colours and have the car licensed in one of the Top 25 states with the highest number of offences
# (to be deduced from the data).

# Licence-state ids that cannot be cast to int (NULL ids pass this test too),
# ordered by distinct crash count; only the 25 largest are kept.
top_25_state = [
    row[0]
    for row in (
        df_units.distinct()
        .filter(col("VEH_LIC_STATE_ID").cast("int").isNull())
        .groupBy("VEH_LIC_STATE_ID")
        .agg(countDistinct(col("CRASH_ID")).alias("Distinct_CNT"))
        .orderBy(col("Distinct_CNT").desc())
        .limit(25)
        .collect()
    )
]

# Vehicle colours (excluding "NA") ordered by distinct crash count; only the 10
# largest are kept. The limit is required: without it the list holds every
# colour and the colour filter below removes nothing.
top_10_used_colors = [
    row[0]
    for row in (
        df_units.distinct()
        .filter(df_units.VEH_COLOR_ID != "NA")
        .groupBy("VEH_COLOR_ID")
        .agg(countDistinct(col("CRASH_ID")).alias("Distinct_CNT"))
        .orderBy(col("Distinct_CNT").desc())
        .limit(10)
        .collect()
    )
]

df_charges_filter = df_charges.distinct()
df_primary_person_filter = df_primary_person.distinct()
df_units_filter = df_units.distinct()

# NOTE(review): the three tables are joined on CRASH_ID only, so a charge is
# paired with every person and every unit of the same crash. If the tables share
# per-unit / per-person keys, joining on them as well may be more accurate -- confirm.
df = (
    df_charges_filter
    .join(df_primary_person_filter, on=["CRASH_ID"], how='inner')
    .join(df_units_filter, on=['CRASH_ID'], how='inner')
    .filter(col("CHARGE").contains("SPEED"))
    .filter(col("DRVR_LIC_TYPE_ID").isin(["DRIVER LICENSE", "COMMERCIAL DRIVER LIC."]))
    .filter(col("VEH_COLOR_ID").isin(top_10_used_colors))
    .filter(col("VEH_LIC_STATE_ID").isin(top_25_state))
    .groupBy("VEH_MAKE_ID")
    .count()
    .orderBy(col("count").desc())
    .limit(5)
    .select(col("VEH_MAKE_ID"))
)

df.show(truncate=False)

+-----------+
|VEH_MAKE_ID|
+-----------+
|FORD       |
|CHEVROLET  |
|TOYOTA     |
|DODGE      |
|NISSAN     |
+-----------+

