In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from src.main.data_loading import DataLoadingClass

spark = SparkSession.builder.appName("Case Study Analysis").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/28 19:42:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [47]:
data_loader = DataLoadingClass(spark, "local")
charges_df = data_loader.get_charges()
primary_person_df = data_loader.get_primary_person()
units_df = data_loader.get_units()
damages_df = data_loader.get_damages()
restrict_df = data_loader.get_restrict()
endorsements_df = data_loader.get_endorse()

[32m2024-10-28 20:43:06.467[0m | [1mINFO    [0m | [36msrc.utils.config_parser[0m:[36mread_config_file[0m:[36m19[0m - [1mReading config...[0m
[32m2024-10-28 20:43:06.472[0m | [1mINFO    [0m | [36msrc.main.data_loading[0m:[36m_load_data[0m:[36m48[0m - [1mTrying to read Charges_use.csv from ./data/input_data[0m
[32m2024-10-28 20:43:06.472[0m | [1mINFO    [0m | [36msrc.main.data_loading[0m:[36m_load_data[0m:[36m48[0m - [1mTrying to read Charges_use.csv from ./data/input_data[0m
[32m2024-10-28 20:43:06.801[0m | [1mINFO    [0m | [36msrc.main.data_loading[0m:[36m_load_data[0m:[36m48[0m - [1mTrying to read Primary_Person_use.csv from ./data/input_data[0m
[32m2024-10-28 20:43:07.010[0m | [1mINFO    [0m | [36msrc.main.data_loading[0m:[36m_load_data[0m:[36m48[0m - [1mTrying to read Units_use.csv from ./data/input_data[0m
[32m2024-10-28 20:43:07.274[0m | [1mINFO    [0m | [36msrc.main.data_loading[0m:[36m_load_data[0m:[36m48[0m -

In [3]:
# 1. Find the number of crashes (accidents) where the number of males killed is greater than 2.
male_fatalities_df = primary_person_df.where(f.expr("PRSN_INJRY_SEV_ID == 'KILLED' and PRSN_GNDR_ID == 'MALE'"))
male_fatality_count_df = male_fatalities_df.groupBy("CRASH_ID").agg(f.sum("DEATH_CNT").alias("total_male_deaths"))
crashes_with_male_fatalities = male_fatality_count_df.filter(f.col("total_male_deaths") > 2)
crashes_with_male_fatalities.show()

+--------+-----------------+
|CRASH_ID|total_male_deaths|
+--------+-----------------+
+--------+-----------------+



In [4]:
# 2. Count how many two-wheelers are booked for crashes.
motorcyle_crashes = units_df.filter(f.expr("VEH_BODY_STYL_ID == 'POLICE MOTORCYCLE' or VEH_BODY_STYL_ID == 'MOTORCYCLE' "))
motorcyle_crashes.groupBy("CRASH_ID").agg(f.count("VEH_BODY_STYL_ID").alias("vehicle_count")).filter(f.col("vehicle_count") > 1).show(5)



+--------+-------------+
|CRASH_ID|vehicle_count|
+--------+-------------+
|14898984|            2|
|14926725|            2|
|14916528|            2|
|14932523|            3|
|14947649|            2|
+--------+-------------+
only showing top 5 rows



                                                                                

In [5]:
# 3. Determine the Top 5 Vehicle Makes of cars involved in crashes where the driver died and airbags did not deploy
fatalities_df = primary_person_df.where(f.expr("PRSN_INJRY_SEV_ID == 'KILLED' and PRSN_AIRBAG_ID == 'NOT DEPLOYED' and PRSN_TYPE_ID == 'DRIVER'"))
filtered_units_df = units_df.filter(f.col("VEH_BODY_STYL_ID").isin(["PICKUP", "SPORT UTILITY VEHICLE", "PASSENGER CAR, 2-DOOR", "PASSENGER CAR, 4-DOOR", "TRUCK", "POLICE CAR/TRUCK"]))
fatalities_unit_df = fatalities_df.join(filtered_units_df, on=["CRASH_ID", "UNIT_NBR"], how='inner')
fatalities_unit_df.groupBy("VEH_MAKE_ID").agg(f.count("VEH_MAKE_ID").alias("VEH_MAKE_COUNT")).orderBy(f.desc(f.col("VEH_MAKE_COUNT"))).show(5)

+-----------+--------------+
|VEH_MAKE_ID|VEH_MAKE_COUNT|
+-----------+--------------+
|  CHEVROLET|             8|
|       FORD|             6|
|     NISSAN|             5|
|      DODGE|             2|
|      HONDA|             2|
+-----------+--------------+
only showing top 5 rows



In [6]:
# 4: Determine number of Vehicles with driver having valid licences involved in hit and run?
filtered_primary_person_df = primary_person_df.filter(f.expr("DRVR_LIC_TYPE_ID == 'DRIVER LICENSE' and PRSN_TYPE_ID == 'DRIVER' "))
filtered_units_df = units_df.filter(f.expr("VEH_HNR_FL == 'Y' "))
joined_df= filtered_primary_person_df.join(filtered_units_df, on=["CRASH_ID", "UNIT_NBR"])
joined_df.count()

2489

In [7]:
# 5: Which state has highest number of accidents in which females are not involved
female_crashes_df = primary_person_df.where(f.expr("PRSN_GNDR_ID != 'FEMALE'"))
female_crashes_units_df = female_crashes_df.join(units_df, on=["CRASH_ID", "UNIT_NBR"])
female_crashes_units_df.groupBy("VEH_LIC_STATE_ID").agg(f.count("VEH_LIC_STATE_ID").alias("VEH_LIC_STATE_COUNT")).orderBy(f.desc("VEH_LIC_STATE_COUNT")).show(1)


+----------------+-------------------+
|VEH_LIC_STATE_ID|VEH_LIC_STATE_COUNT|
+----------------+-------------------+
|              TX|              94472|
+----------------+-------------------+
only showing top 1 row



In [8]:
# 6: Which are the Top 3rd to 5th VEH_MAKE_IDs that contribute to a largest number of injuries including death
injuries_df = units_df.groupBy("VEH_MAKE_ID").agg(f.sum(f.col("TOT_INJRY_CNT") + f.col("DEATH_CNT")).alias("total_injuries"))
injuries_df = injuries_df.withColumn("dummy_col_for_partition", f.lit(1)) # To avoid - WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
window_fn = Window.orderBy(f.col("total_injuries").desc()).partitionBy("dummy_col_for_partition")

injuries_df.withColumn("row_num", f.row_number().over(window_fn)).filter(f.col("row_num").between(3, 5)).show(5)

+-----------+--------------+-----------------------+-------+
|VEH_MAKE_ID|total_injuries|dummy_col_for_partition|row_num|
+-----------+--------------+-----------------------+-------+
|     TOYOTA|          4228|                      1|      3|
|      DODGE|          3146|                      1|      4|
|     NISSAN|          3118|                      1|      5|
+-----------+--------------+-----------------------+-------+



In [9]:
# 7. Analysis 7: For all the body styles involved in crashes, mention the top ethnic user group of each unique body style
units_persons_df = units_df.join(primary_person_df, on=["CRASH_ID", "UNIT_NBR"])
units_persons_df = units_persons_df.select("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID").filter(~f.col("VEH_BODY_STYL_ID").isin(["NA", "UNKNOWN"]))
ethnicity_counts = units_persons_df.groupBy("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID").agg(f.count("*").alias("ethnicity_count"))

window_fn = Window.partitionBy("VEH_BODY_STYL_ID").orderBy(f.desc("ethnicity_count"))
top_ethnic_group = ethnicity_counts.withColumn("rank", f.row_number().over(window_fn)) \
                                   .filter(f.col("rank") == 1) \
                                   .select("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID", "ethnicity_count").orderBy(f.col("VEH_BODY_STYL_ID"), f.col("PRSN_ETHNICITY_ID"), f.desc(f.col("ethnicity_count")))
top_ethnic_group.show()

+--------------------+-----------------+---------------+
|    VEH_BODY_STYL_ID|PRSN_ETHNICITY_ID|ethnicity_count|
+--------------------+-----------------+---------------+
|           AMBULANCE|            WHITE|             55|
|                 BUS|         HISPANIC|            158|
|      FARM EQUIPMENT|            WHITE|             27|
|          FIRE TRUCK|            WHITE|             64|
|          MOTORCYCLE|            WHITE|            499|
|NEV-NEIGHBORHOOD ...|            WHITE|              5|
|        NOT REPORTED|            WHITE|              2|
|OTHER  (EXPLAIN I...|            WHITE|            216|
|PASSENGER CAR, 2-...|            WHITE|           4716|
|PASSENGER CAR, 4-...|            WHITE|          25351|
|              PICKUP|            WHITE|          19117|
|    POLICE CAR/TRUCK|            WHITE|            182|
|   POLICE MOTORCYCLE|            WHITE|              2|
|SPORT UTILITY VEH...|            WHITE|          15497|
|               TRUCK|         

In [20]:
# 8: Among the crashed cars, what are the Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash (Use Driver Zip Code)

alcohol_crash_cars_df = units_df.filter(
    (f.col("VEH_BODY_STYL_ID").isin(["PICKUP", "SPORT UTILITY VEHICLE", "PASSENGER CAR, 2-DOOR", "PASSENGER CAR, 4-DOOR", "TRUCK", "POLICE CAR/TRUCK"])) &
    (f.col("CONTRIB_FACTR_1_ID").isin(["HAD BEEN DRINKING", "UNDER INFLUENCE - ALCOHOL"]) | f.col("CONTRIB_FACTR_2_ID").isin(["HAD BEEN DRINKING", "UNDER INFLUENCE - ALCOHOL"]))
)
alcohol_crash_cars_df.count()

car_alcohol_crashes_with_zip_df = alcohol_crash_cars_df.join(
    primary_person_df, 
    on=["CRASH_ID", "UNIT_NBR"],
    how="inner"
).select("DRVR_ZIP").filter(f.col("DRVR_ZIP").isNotNull())

top_zip_codes_df = car_alcohol_crashes_with_zip_df.groupBy("DRVR_ZIP").count().orderBy(f.desc("count")).limit(5).show()



+--------+-----+
|DRVR_ZIP|count|
+--------+-----+
|   78521|   51|
|   76010|   50|
|   75067|   44|
|   78741|   42|
|   78550|   39|
+--------+-----+



                                                                                

In [33]:
# 9. Analysis 9: Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance 
unit_with_damage_df = units_df.join(damages_df, on="CRASH_ID", how="left")

filtered_crashes_df = unit_with_damage_df.filter(
    # No other damaged property
    (f.col("DAMAGED_PROPERTY").isNull()) &
    
    # Damage level above 4
    ((f.col("VEH_DMAG_SCL_1_ID").isin(["DAMAGED 5", "DAMAGED 6", "DAMAGED 7 HIGHEST"])) | (f.col("VEH_DMAG_SCL_2_ID").isin(["DAMAGED 5", "DAMAGED 6", "DAMAGED 7 HIGHEST"]))) &
    
    # Car avails insurance
    (f.col("FIN_RESP_TYPE_ID") != 'NA')
)
print(filtered_crashes_df.count())
filtered_crashes_df.select("CRASH_ID").distinct().count()

11430


                                                                                

8852

In [48]:
# 10: Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, used top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences (to be deduced from the data)
speeding_charges_df = charges_df.filter(f.expr("CHARGE LIKE '%SPEED%'"))

licensed_drivers_df = speeding_charges_df.join(endorsements_df, ["CRASH_ID", "UNIT_NBR"], "inner") \
                                 .join(restrict_df, ["CRASH_ID", "UNIT_NBR"], "inner")

top_colors_df = units_df.groupBy("VEH_COLOR_ID").count() \
                        .orderBy(f.col("count").desc()).limit(10) \
                        .select("VEH_COLOR_ID").collect()
top_colors = [row["VEH_COLOR_ID"] for row in top_colors_df]

top_states_df = speeding_charges_df.join(units_df, ["CRASH_ID", "UNIT_NBR"], "inner") \
                           .groupBy("VEH_LIC_STATE_ID").count() \
                           .orderBy(f.col("count").desc()).limit(25) \
                           .select("VEH_LIC_STATE_ID").collect()  

top_states = [row["VEH_LIC_STATE_ID"] for row in top_states_df]  # List of top 25 states

filtered_units_df = units_df.filter(
    (f.col("VEH_COLOR_ID").isin(top_colors)) & 
    (f.col("VEH_LIC_STATE_ID").isin(top_states))
)
filtered_units_df.count()

151340

In [49]:
result_df = licensed_drivers_df.join(filtered_units_df, ["CRASH_ID", "UNIT_NBR"], "inner") \
                               .groupBy("VEH_MAKE_ID") \
                               .count() \
                               .withColumn("rank", f.rank().over(Window.orderBy(f.col("count").desc()))) \
                               .filter(f.col("rank") <= 5)

result_df.select("VEH_MAKE_ID", "count").show()

24/10/28 20:43:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/28 20:43:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/28 20:43:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/28 20:43:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/28 20:43:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/28 20:43:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/28 2

+-----------+-----+
|VEH_MAKE_ID|count|
+-----------+-----+
|       FORD| 4944|
|  CHEVROLET| 4302|
|     TOYOTA| 2394|
|      DODGE| 2216|
|      HONDA| 1573|
+-----------+-----+



24/10/28 20:43:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/28 20:43:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/28 20:43:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/28 20:43:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
