In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import*
from pyspark.sql.functions import col, sum, countDistinct, desc, when, expr, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("Crash Analytics") \
    .getOrCreate()

units_df = spark.read.csv('Files/case1/Units_use.csv', header=True, inferSchema=True)
charges_df = spark.read.csv('Files/case1/Charges_use.csv', header=True, inferSchema=True)
endorse_df = spark.read.csv('Files/case1/Endorse_use.csv', header=True, inferSchema=True)
primary_person_df = spark.read.csv('Files/case1/Primary_Person_use.csv', header=True, inferSchema=True)
damage_df = spark.read.csv('Files/case1/Damages_use.csv', header=True, inferSchema=True)
restrict_df = spark.read.csv('Files/case1/Restrict_use.csv', header=True, inferSchema=True)


StatementMeta(, 786c03ce-838d-4484-9e86-67fcdead35b5, 15, Finished, Available, Finished)

In [12]:
# Analysis 1: Crashes where more than 2 males were killed
result1 = primary_person_df.filter((col('PRSN_GNDR_ID') == 'MALE') & (col('DEATH_CNT') > 0)) \
    .groupBy('CRASH_ID') \
    .agg(sum('DEATH_CNT').alias('MALE_DEATH_CNT')) \
    .filter(col('MALE_DEATH_CNT') > 2) \
    .count()

print(f"Analysis 1: {result1} crashes where more than 2 males were killed.")

StatementMeta(, 786c03ce-838d-4484-9e86-67fcdead35b5, 16, Finished, Available, Finished)

Analysis 1: 0 crashes where more than 2 males were killed.


In [13]:
# Analysis 2: Crashes involving two-wheelers
result2 = units_df.filter(col('VEH_BODY_STYL_ID').rlike('(?i)MOTORCYCLE|SCOOTER')) \
    .select('CRASH_ID') \
    .distinct() \
    .count()

print(f"Analysis 2: {result2} crashes involving two-wheelers.")

StatementMeta(, 786c03ce-838d-4484-9e86-67fcdead35b5, 17, Finished, Available, Finished)

Analysis 2: 757 crashes involving two-wheelers.


In [14]:
# Analysis 3: Top 5 Vehicle Makes where driver died and airbags did not deploy
result3 = units_df.join(
    primary_person_df.filter((col('DEATH_CNT') > 0) & (col('PRSN_TYPE_ID') == 'DRIVER')),
    ['CRASH_ID', 'UNIT_NBR']
).filter(col('PRSN_AIRBAG_ID').rlike('(?i)NOT DEPLOYED')) \
.groupBy('VEH_MAKE_ID') \
.count() \
.orderBy(desc('count')) \
.limit(5)

print("Analysis 3: Top 5 Vehicle Makes where driver died and airbags did not deploy")
result3.show()

StatementMeta(, 786c03ce-838d-4484-9e86-67fcdead35b5, 18, Finished, Available, Finished)

Analysis 3: Top 5 Vehicle Makes where driver died and airbags did not deploy
+-----------+-----+
|VEH_MAKE_ID|count|
+-----------+-----+
|  CHEVROLET|    8|
|       FORD|    6|
|     NISSAN|    5|
|      DODGE|    2|
|      HONDA|    2|
+-----------+-----+



In [15]:
# Analysis 4: Valid license drivers involved in hit-and-run crashes (HIT AND RUN anywhere in the CHARGE column)

result4 = primary_person_df.join(
    charges_df.filter(col('CHARGE').rlike('(?i).*HIT AND RUN*')),  # HIT AND RUN anywhere in the CHARGE
    on='CRASH_ID'
).filter(col('DRVR_LIC_TYPE_ID') == 'DRIVER LICENSE').select('CRASH_ID') \
 .distinct() \
 .count()

print(f"Analysis 4: {result4} valid license drivers involved in hit-and-run crashes.")


StatementMeta(, 786c03ce-838d-4484-9e86-67fcdead35b5, 19, Finished, Available, Finished)

Analysis 4: 39 valid license drivers involved in hit-and-run crashes.


In [16]:
# Analysis 5: State with highest crashes involving no females
result5 = primary_person_df.filter(col('PRSN_GNDR_ID') != 'FEMALE') \
    .groupBy('CRASH_ID', 'DRVR_LIC_STATE_ID') \
    .count() \
    .orderBy(desc('count')) \
    .limit(5)

print(f"Analysis 5: Top states without females")
result5.show()

StatementMeta(, 786c03ce-838d-4484-9e86-67fcdead35b5, 20, Finished, Available, Finished)

Analysis 5: Top states without females
+--------+-----------------+-----+
|CRASH_ID|DRVR_LIC_STATE_ID|count|
+--------+-----------------+-----+
|14918153|            Texas|    9|
|15146770|            Texas|    9|
|15460374|            Texas|    8|
|15085120|            Texas|    6|
|14904113|            Texas|    6|
+--------+-----------------+-----+



In [17]:
# Analysis 6: Top 3-5 Vehicles contributing to injuries
result6 = units_df.groupBy('VEH_MAKE_ID').agg(sum('TOT_INJRY_CNT').alias('InjuryCount')) \
    .orderBy(desc('InjuryCount')) \
    .limit(5)

print(f"Analysis 6: Top 3 to 5 vehicles contributing to injuries")
result6.show()

StatementMeta(, 786c03ce-838d-4484-9e86-67fcdead35b5, 21, Finished, Available, Finished)

Analysis 6: Top 3 to 5 vehicles contributing to injuries
+-----------+-----------+
|VEH_MAKE_ID|InjuryCount|
+-----------+-----------+
|  CHEVROLET|       6986|
|       FORD|       6949|
|     TOYOTA|       4208|
|      DODGE|       3134|
|     NISSAN|       3097|
+-----------+-----------+



In [18]:
# Analysis 7: Top Ethnic group for all body styles
result7 = units_df.join(primary_person_df, ['CRASH_ID', 'UNIT_NBR']) \
    .groupBy('VEH_BODY_STYL_ID', 'PRSN_ETHNICITY_ID') \
    .count() \
    .withColumn("row", row_number().over(Window.partitionBy("VEH_BODY_STYL_ID").orderBy(desc("count")))) \
    .filter("row == 1") \
    .drop("row") \
    .orderBy(desc("VEH_BODY_STYL_ID"))

print(f"Analysis 7: Top Ethnic group for all body styles")
result7.show()

StatementMeta(, 786c03ce-838d-4484-9e86-67fcdead35b5, 22, Finished, Available, Finished)

Analysis 7: Top Ethnic group for all body styles
+--------------------+-----------------+-----+
|    VEH_BODY_STYL_ID|PRSN_ETHNICITY_ID|count|
+--------------------+-----------------+-----+
|   YELLOW SCHOOL BUS|            BLACK|  106|
|                 VAN|            WHITE| 2212|
|             UNKNOWN|          UNKNOWN|  843|
|       TRUCK TRACTOR|            WHITE| 2560|
|               TRUCK|            WHITE| 1814|
|SPORT UTILITY VEH...|            WHITE|15497|
|   POLICE MOTORCYCLE|            WHITE|    2|
|    POLICE CAR/TRUCK|            WHITE|  182|
|              PICKUP|            WHITE|19117|
|PASSENGER CAR, 4-...|            WHITE|25351|
|PASSENGER CAR, 2-...|            WHITE| 4716|
|OTHER  (EXPLAIN I...|            WHITE|  216|
|        NOT REPORTED|            WHITE|    2|
|NEV-NEIGHBORHOOD ...|            WHITE|    5|
|                  NA|            WHITE|  294|
|          MOTORCYCLE|            WHITE|  499|
|          FIRE TRUCK|            WHITE|   64|
|      FARM

In [19]:
# Analysis 8: Top 5 Zip Codes with highest number of crashes involving alcohol (excluding nulls)

result8 = primary_person_df.filter(
    (col('PRSN_ALC_RSLT_ID').rlike('(?i)positive')) &  # Filter for positive alcohol results
    (col('PRSN_ALC_RSLT_ID').isNotNull()) &            # Exclude null alcohol results
    (col('DRVR_ZIP').isNotNull())                      # Exclude null ZIP codes
).groupBy('DRVR_ZIP') \
 .count() \
 .orderBy(desc('count')) \
 .limit(5)

print("Analysis 8: Top 5 ZIP Codes with Alcohol-Related Crashes (Excluding Nulls)")
result8.show()

StatementMeta(, 786c03ce-838d-4484-9e86-67fcdead35b5, 23, Finished, Available, Finished)

Analysis 8: Top 5 ZIP Codes with Alcohol-Related Crashes (Excluding Nulls)
+--------+-----+
|DRVR_ZIP|count|
+--------+-----+
|   78521|   62|
|   76010|   48|
|   79936|   42|
|   79938|   37|
|   79907|   34|
+--------+-----+



In [20]:
# Analysis 9: Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level>4


merged_df = units_df.join(damage_df.select("CRASH_ID", "DAMAGED_PROPERTY"), 
                          on="CRASH_ID", how="left")

no_damaged_property_df = merged_df.filter(col("DAMAGED_PROPERTY").isNull())

distinct_crash_ids_count = no_damaged_property_df.withColumn('DMAG1_RANGE',regexp_extract(col('VEH_DMAG_SCL_1_ID'), "\\d+", 0)) \
	                        .withColumn('DMAG2_RANGE',regexp_extract(col('VEH_DMAG_SCL_2_ID'), "\\d+", 0)) \
	                        .filter("DMAG1_RANGE > 4 or DMAG2_RANGE > 4") \
	                        .select('CRASH_ID').distinct().count()


print(f"Count of distinct CRASH_IDs: {distinct_crash_ids_count}")


StatementMeta(, 786c03ce-838d-4484-9e86-67fcdead35b5, 24, Finished, Available, Finished)

Count of distinct CRASH_IDs: 9663


In [21]:
# Analysis 10:Top vehicle makes involved in speeding accidents, filtered by color and state

top_colors = (
    units_df.groupBy("VEH_COLOR_ID")
    .count()
    .orderBy(desc("count"))
    .limit(10)
    .select("VEH_COLOR_ID")
    .rdd.flatMap(lambda x: x)
    .collect()
)

top_states = (
    units_df.groupBy("VEH_LIC_STATE_ID")
    .agg(count("CRASH_ID").alias("crash_count"))
    .orderBy(desc("crash_count"))
    .limit(25)
    .select("VEH_LIC_STATE_ID")
    .rdd.flatMap(lambda x: x)
    .collect()
)

merged_df = units_df.join(charges_df, on="CRASH_ID", how="left")

filtered_df = (
    merged_df.filter(
        (col("CHARGE").contains("SPEED")) &
        (col("VEH_COLOR_ID").isin(top_colors)) &
        (col("VEH_LIC_STATE_ID").isin(top_states))
    )
)
top_vehicle_makes = (
    filtered_df.groupBy("VEH_MAKE_ID")
    .count()
    .orderBy(desc("count"))
    .limit(5)
)

top_vehicle_makes.show()

StatementMeta(, 786c03ce-838d-4484-9e86-67fcdead35b5, 25, Finished, Available, Finished)

+-----------+-----+
|VEH_MAKE_ID|count|
+-----------+-----+
|       FORD| 9162|
|  CHEVROLET| 8051|
|     TOYOTA| 5078|
|      DODGE| 3902|
|     NISSAN| 3348|
+-----------+-----+

