## Installing Required Libraries

In [1]:

# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [2]:
!pip install pyspark



In [3]:
import findspark
findspark.init()

## Starting Spark Session

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("CarCrashAnalysis")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [5]:
from zipfile import ZipFile

In [6]:
# loading the temp.zip and creating a zip object
with ZipFile("Data.zip", 'r') as zObject:

	# Extracting all the members of the zip
	# into a specific location.
	zObject.extractall(path="/Unzipped/")

## Loading data into Data Frames

In [7]:
df_charges_use = spark.read.option("header",True).csv("/Unzipped/Data/Charges_use.csv")
df_damages_use = spark.read.option("header",True).csv("/Unzipped/Data/Damages_use.csv")
df_endorse_use = spark.read.option("header",True).csv("/Unzipped/Data/Endorse_use.csv")
df_primary_person_use = spark.read.option("header",True).csv("/Unzipped/Data/Primary_Person_use.csv")
df_restrict_use = spark.read.option("header",True).csv("/Unzipped/Data/Restrict_use.csv")
df_units_use = spark.read.option("header",True).csv("/Unzipped/Data/Units_use.csv")

## Defining Reusable Functions

In [8]:

def deduplicate_df(dup_df):
  """
  De-duplicates a given dataframe
  :param dup_df: The DataFrame to be de-duplicated
  :return: De-duplicated dataframe
  """
  df = dup_df.dropDuplicates()
  return df

def load_csv_data_to_df(spark, file_path):
    """
    Read CSV data
    :param spark: spark instance
    :param file_path: path to the csv file
    :return: dataframe
    """
    return spark.read.option("inferSchema", "true").csv(file_path, header=True)

def extract_zipped_content(zipped_file_path, unzipped_files_path):
  """
  Extracts all the files present within a zipped folder to another location
  :param zipped_file_path: Location to the Zipped file
  :param unzipped_files_path: Location to the Extracted files
  """
  # loading the temp.zip and creating a zip object
  with ZipFile(f"{zipped_file_path}", 'r') as zobject:

	# Extracting all the members of the zip
	# into a specific location.
	    zobject.extractall(path=f"{unzipped_files_path}")
  print("Unzipping files successfully to : ",unzipped_files_path)



In [9]:
extract_zipped_content("/content/Data.zip","/content/Unzipped/")

Unzipping files successfully to :  /content/Unzipped/


In [10]:
df_charges_use = df_charges_use.dropDuplicates()
df_damages_use = df_damages_use.dropDuplicates()
df_endorse_use = df_endorse_use.dropDuplicates()
df_primary_person_use = df_primary_person_use.dropDuplicates()
df_restrict_use = df_restrict_use.dropDuplicates()
df_units_use = df_units_use.dropDuplicates()

## Seeing a glimpse of the data

In [11]:
df_charges_use.show(5,False)

+--------+--------+--------+-------------------------------------+------------+
|CRASH_ID|UNIT_NBR|PRSN_NBR|CHARGE                               |CITATION_NBR|
+--------+--------+--------+-------------------------------------+------------+
|14839048|1       |1       |EVADING IN  A VEHICLE                |2016-000008 |
|14842539|1       |1       |FAILURE TO CONTROL SPEED (ACC)       |138474673   |
|14845037|1       |1       |DRIVERS LICENSE RESTRICTION VIOLATION|M105252B    |
|14845764|1       |1       |FAIL TO DRIVE IN SINGLE LANE         |008388006   |
|14845973|1       |1       |NO DRIVER'S LICENSE WHEN UNLICENSED  |TX4IEN0FTMG3|
+--------+--------+--------+-------------------------------------+------------+
only showing top 5 rows



In [12]:
df_damages_use.show(5,False)

+--------+--------------------+
|CRASH_ID|DAMAGED_PROPERTY    |
+--------+--------------------+
|14852553|CABLE BARRIER       |
|14853703|LANDSCAPING         |
|14862814|75 FT. OF GUARD RAIL|
|14867689|CABLE LINES         |
|14873013|LIGHT POLE          |
+--------+--------------------+
only showing top 5 rows



In [13]:
df_endorse_use.show(5,False)

+--------+--------+------------------+
|CRASH_ID|UNIT_NBR|DRVR_LIC_ENDORS_ID|
+--------+--------+------------------+
|14842529|2       |NONE              |
|14843083|1       |UNLICENSED        |
|14844479|1       |UNLICENSED        |
|14845535|1       |NONE              |
|14846619|2       |NONE              |
+--------+--------+------------------+
only showing top 5 rows



In [14]:
df_primary_person_use.show(5,False)

+--------+--------+--------+------------+------------------+---------------------+--------+-----------------+------------+------------+-------------------+---------------+--------------+-----------+---------------------+----------------+------------------+---------------------+----------------+-----------------+---------------+---------------+------------------+--------------+-------------+--------------+-------------+---------+----------------------+-----------------+---------------+--------+
|CRASH_ID|UNIT_NBR|PRSN_NBR|PRSN_TYPE_ID|PRSN_OCCPNT_POS_ID|PRSN_INJRY_SEV_ID    |PRSN_AGE|PRSN_ETHNICITY_ID|PRSN_GNDR_ID|PRSN_EJCT_ID|PRSN_REST_ID       |PRSN_AIRBAG_ID |PRSN_HELMET_ID|PRSN_SOL_FL|PRSN_ALC_SPEC_TYPE_ID|PRSN_ALC_RSLT_ID|PRSN_BAC_TEST_RSLT|PRSN_DRG_SPEC_TYPE_ID|PRSN_DRG_RSLT_ID|DRVR_DRG_CAT_1_ID|PRSN_DEATH_TIME|INCAP_INJRY_CNT|NONINCAP_INJRY_CNT|POSS_INJRY_CNT|NON_INJRY_CNT|UNKN_INJRY_CNT|TOT_INJRY_CNT|DEATH_CNT|DRVR_LIC_TYPE_ID      |DRVR_LIC_STATE_ID|DRVR_LIC_CLS_ID|DRVR_ZIP|
+-

In [15]:
df_restrict_use.show(5,False)

+--------+--------+----------------------------------+
|CRASH_ID|UNIT_NBR|DRVR_LIC_RESTRIC_ID               |
+--------+--------+----------------------------------+
|14839097|3       |WITH CORRECTIVE LENSES            |
|14840787|1       |TRC 545.424 APPLIES UNTIL MM/DD/YY|
|14842529|2       |NONE                              |
|14843083|1       |UNLICENSED                        |
|14844479|1       |UNLICENSED                        |
+--------+--------+----------------------------------+
only showing top 5 rows



In [16]:
df_units_use.show(5,False)

+--------+--------+-------------+-------------+----------+----------------+-----------------+------------+------------+-----------+----------+---------------------+---------------+--------+-----------------+----------------------------+-----------------------------------------+-----------------+--------------+----------------------------------------+-----------------+--------------+------------------+-----------------+--------------------------+----------------------------+----------------------+-------------------+---------------+---------------------+---------------+------------------+--------------+-------------+--------------+-------------+---------+
|CRASH_ID|UNIT_NBR|UNIT_DESC_ID |VEH_PARKED_FL|VEH_HNR_FL|VEH_LIC_STATE_ID|VIN              |VEH_MOD_YEAR|VEH_COLOR_ID|VEH_MAKE_ID|VEH_MOD_ID|VEH_BODY_STYL_ID     |EMER_RESPNDR_FL|OWNR_ZIP|FIN_RESP_PROOF_ID|FIN_RESP_TYPE_ID            |VEH_DMAG_AREA_1_ID                       |VEH_DMAG_SCL_1_ID|FORCE_DIR_1_ID|VEH_DMAG_AREA_2_ID        

In [17]:
from pyspark.sql.functions import *

## Analytics 1: Find the number of crashes (accidents) in which number of males killed are greater than 2?

In [18]:
male_deaths_greater_than_2_df = df_primary_person_use.filter('PRSN_GNDR_ID == "MALE" and DEATH_CNT==1').select(["CRASH_ID"]).groupBy("CRASH_ID").agg(count("*").alias("count")).orderBy(desc("count")).filter("count > 2").count()
print("Number of crashes (accidents) in which number of males killed are greater than 2:",male_deaths_greater_than_2_df)

Number of crashes (accidents) in which number of males killed are greater than 2: 0


## Analysis 2: How many two wheelers are booked for crashes?

In [19]:
two_wheelers_df = df_units_use.filter(df_units_use.VEH_BODY_STYL_ID.contains("MOTORCYCLE")).dropDuplicates().count()

In [20]:
print("Count of 2-Wheelers Booked for Crash:",two_wheelers_df )

Count of 2-Wheelers Booked for Crash: 773


## Analysis 3: Determine the Top 5 Vehicle Makes of the cars present in the crashes in which driver died and Airbags did not deploy.

In [37]:
driver_death_airbag_not_deployed_df = df_primary_person_use.filter((col('DEATH_CNT') == 1) &
            (col('PRSN_AIRBAG_ID').isin('NOT DEPLOYED')) &
                             col('PRSN_TYPE_ID').isin('DRIVER')) \
    .select('CRASH_ID','PRSN_AIRBAG_ID','PRSN_TYPE_ID') \
    .distinct()
vehicle_make_df = df_units_use.select('CRASH_ID','VEH_MAKE_ID') \
    .distinct()

top_5_vehicle_makes_with_death_and_no_airbag_df = driver_death_airbag_not_deployed_df.join(vehicle_make_df, on=['CRASH_ID'], how='inner').groupBy('VEH_MAKE_ID').count().orderBy(desc('count')).select(['VEH_MAKE_ID']).limit(5)
top_5_vehicle_makes_with_death_and_no_airbag_df.show()

+-----------+
|VEH_MAKE_ID|
+-----------+
|  CHEVROLET|
|       FORD|
|         NA|
|      DODGE|
|     NISSAN|
+-----------+



## Analysis 4: Determine number of Vehicles with driver having valid licences involved in hit and run?

In [22]:
driver_details_df = df_primary_person_use.select(['CRASH_ID', 'DRVR_LIC_TYPE_ID']).filter((~col('DRVR_LIC_TYPE_ID').isin('NA','UNKNOWN','UNLICENSED')) & (col('PRSN_TYPE_ID').isin('DRIVER'))).distinct()
charges_details_df = df_charges_use.select(['CRASH_ID', 'CHARGE']).filter(upper(col('CHARGE')).contains('HIT AND RUN')).distinct()

print('Number of Vehicles with driver having valid licences involved in hit and run:', driver_details_df.join(charges_details_df, on=['CRASH_ID'], how='inner').count())

Number of Vehicles with driver having valid licences involved in hit and run: 49


 ## Analysis 5: Which state has highest number of accidents in which females are not involved

In [38]:
print('State has highest number of accidents in which females are not involved:', df_primary_person_use.select('DRVR_LIC_STATE_ID','PRSN_GNDR_ID').filter(~col('PRSN_GNDR_ID').isin('FEMALE')).groupBy('DRVR_LIC_STATE_ID').count().orderBy(desc('count')).select('DRVR_LIC_STATE_ID').limit(1).collect()[0][0])

State has highest number of accidents in which females are not involved: Texas


## Analysis 6: Which are the Top 3rd to 5th VEH_MAKE_IDs that contribute to a largest number of injuries including death

In [40]:
casualties_df = df_units_use.filter(df_units_use.VEH_MAKE_ID != "NA"). \
            withColumn('TOT_CASUALTIES_CNT', df_units_use[35] + df_units_use[36]). \
            groupby("VEH_MAKE_ID").sum("TOT_CASUALTIES_CNT"). \
            withColumnRenamed("sum(TOT_CASUALTIES_CNT)", "TOT_CASUALTIES"). \
            orderBy(col("TOT_CASUALTIES").desc())
df_top_casualties = casualties_df.limit(5).subtract(casualties_df.limit(2))
df_top_casualties.show()

+-----------+--------------+
|VEH_MAKE_ID|TOT_CASUALTIES|
+-----------+--------------+
|     TOYOTA|        4227.0|
|      DODGE|        3138.0|
|     NISSAN|        3114.0|
+-----------+--------------+



##  Analysis 7: For all the body styles involved in crashes, mention the top ethnic user group of each unique body style

In [25]:
from pyspark.sql.window import Window
joined_eth_style = df_primary_person_use.join(df_units_use,df_primary_person_use.CRASH_ID ==  df_units_use.CRASH_ID,"inner")
joined_eth_style. \
filter((~joined_eth_style.VEH_BODY_STYL_ID.isin(["NA", "UNKNOWN", "NOT REPORTED","OTHER  (EXPLAIN IN NARRATIVE)"])) & (~joined_eth_style.PRSN_ETHNICITY_ID.isin(["NA","UNKNOWN"]))). \
groupBy("VEH_BODY_STYL_ID","PRSN_ETHNICITY_ID").count(). \
withColumn("row_number", row_number().over(Window.partitionBy("VEH_BODY_STYL_ID"). \
                                             orderBy(col("VEH_BODY_STYL_ID").desc(),col("count").desc()))). \
                                             filter(col("row_number")==1). \
                                             orderBy(col("count").desc()). \
                                             drop("row_number", "count")
joined_eth_style.filter(joined_eth_style.VEH_BODY_STYL_ID == "POLICE MOTORCYCLE").groupBy("PRSN_ETHNICITY_ID").count().show()

+-----------------+-----+
|PRSN_ETHNICITY_ID|count|
+-----------------+-----+
|            WHITE|    2|
|            BLACK|    1|
|         HISPANIC|    3|
+-----------------+-----+



## Analysis 8: Among the crashed cars, what are the Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash (Use Driver Zip Code)

In [26]:
df_units_use.join(df_primary_person_use, on=['CRASH_ID'], how='inner'). \
            dropna(subset=["DRVR_ZIP"]). \
            filter(col("CONTRIB_FACTR_1_ID").contains("ALCOHOL") | col("CONTRIB_FACTR_2_ID").contains("ALCOHOL")). \
            groupby("DRVR_ZIP").count().orderBy(col("count").desc()).show(5)

+--------+-----+
|DRVR_ZIP|count|
+--------+-----+
|   76010|   75|
|   78521|   61|
|   75067|   54|
|   78574|   47|
|   75052|   43|
+--------+-----+
only showing top 5 rows



## Analysis 9: Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance


In [27]:
insured_no_dmg_property = df_units_use.join(df_damages_use, on=['CRASH_ID'], how='inner'). \
filter((col("DAMAGED_PROPERTY").rlike("NO DAMAGE")) | (col("DAMAGED_PROPERTY").rlike("NONE"))). \
filter((col("VEH_DMAG_SCL_1_ID").rlike("[5-9]")) | (col("VEH_DMAG_SCL_2_ID").rlike("[5-9]"))). \
filter((col("FIN_RESP_TYPE_ID") == "PROOF OF LIABILITY INSURANCE") | (col("FIN_RESP_TYPE_ID") == "INSURANCE BINDER") | (col("FIN_RESP_TYPE_ID") == "LIABILITY INSURANCE POLICY"))

In [28]:
dmg_df = df_units_use.join(df_damages_use, on=['CRASH_ID'], how='inner'). \
filter((col("DAMAGED_PROPERTY").rlike("NO DAMAGE")) | (col("DAMAGED_PROPERTY").rlike("NONE")))

NameError: name 'F' is not defined

## Analysis 10: Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, used top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences (to be deduced from the data)

In [None]:
top_25_states = [row[0] for row in df_units_use.filter(col("VEH_LIC_STATE_ID").cast("int").isNull()).
            groupby("VEH_LIC_STATE_ID").count().orderBy(col("count").desc()).limit(25).collect()]
top_10_used_vcolors = [row[0] for row in df_units_use.filter(df_units_use.VEH_COLOR_ID != "NA").
            groupby("VEH_COLOR_ID").count().orderBy(col("count").desc()).limit(10).collect()]
top_5_veh_makes = df_charges_use.join(df_primary_person_use, on=['CRASH_ID'], how='inner'). \
            join(df_units_use, on=['CRASH_ID'], how='inner'). \
            filter(df_charges_use.CHARGE.contains("SPEED")). \
            filter(df_primary_person_use.DRVR_LIC_TYPE_ID.isin(["DRIVER LICENSE", "COMMERCIAL DRIVER LIC."])). \
            filter(df_units_use.VEH_COLOR_ID.isin(top_10_used_vcolors)). \
            filter(df_units_use.VEH_LIC_STATE_ID.isin(top_25_states)). \
            groupby("VEH_MAKE_ID").count(). \
            orderBy(col("count").desc()).limit(5)
list_veh = ""
for i in  top_5_veh_makes.select('VEH_MAKE_ID').rdd.flatMap(lambda x: x).collect():
  list_veh += i + ", "
list_veh.strip(", ")
print("Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, uses top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences  are:",list_veh.strip(", "))