In [None]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

^C


In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import findspark
findspark.init()


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("CarCrashAnalysis")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [None]:
from zipfile import ZipFile

# loading the temp.zip and creating a zip object
with ZipFile("Data.zip", 'r') as zObject:

	# Extracting all the members of the zip
	# into a specific location.
	zObject.extractall(path="/Unzipped/")


In [None]:
df_charges_use = spark.read.option("header",True).csv("/Unzipped/Data/Charges_use.csv")
df_damages_use = spark.read.option("header",True).csv("/Unzipped/Data/Damages_use.csv")
df_endorse_use = spark.read.option("header",True).csv("/Unzipped/Data/Endorse_use.csv")
df_primary_person_use = spark.read.option("header",True).csv("/Unzipped/Data/Primary_Person_use.csv")
df_restrict_use = spark.read.option("header",True).csv("/Unzipped/Data/Restrict_use.csv")
df_units_use = spark.read.option("header",True).csv("/Unzipped/Data/Units_use.csv")

In [None]:
def deduplicate_df(dup_df):
  """
  De-duplicates a given dataframe
  :param dup_df: The DataFrame to be de-duplicated
  :return: De-duplicated dataframe
  """
  df = dup_df.dropDuplicates()
  return df

In [None]:
def load_csv_data_to_df(spark, file_path):
    """
    Read CSV data
    :param spark: spark instance
    :param file_path: path to the csv file
    :return: dataframe
    """
    return spark.read.option("inferSchema", "true").csv(file_path, header=True)

In [None]:
def extract_zipped_content(zipped_file_path, unzipped_files_path):
  """
  Extracts all the files present within a zipped folder to another location
  :param zipped_file_path: Location to the Zipped file
  :param unzipped_files_path: Location to the Extracted files
  """
  # loading the temp.zip and creating a zip object
  with ZipFile(f"{zipped_file_path}", 'r') as zobject:

	# Extracting all the members of the zip
	# into a specific location.
	    zobject.extractall(path=f"{unzipped_files_path}")
  print("Unzipping files successfully to : ",unzipped_files_path)


In [None]:

extract_zipped_content("/content/Data.zip","/content/Unzipped/")

Unzipping files successfully to :  /content/Unzipped/


In [None]:
df_charges_use = df_charges_use.dropDuplicates()
df_damages_use = df_damages_use.dropDuplicates()
df_endorse_use = df_endorse_use.dropDuplicates()
df_primary_person_use = df_primary_person_use.dropDuplicates()
df_restrict_use = df_restrict_use.dropDuplicates()
df_units_use = df_units_use.dropDuplicates()

In [None]:
#if df.count() > df.dropDuplicates([listOfColumns]).count():
 #   raise ValueError('Data has duplicates')

In [None]:
df_charges_use \
    .groupby(df_charges_use.columns) \
    .count() \
    .where('count > 1') \
    .sort('count', ascending=False) \
    .show()

+--------+--------+--------+------+------------+-----+
|CRASH_ID|UNIT_NBR|PRSN_NBR|CHARGE|CITATION_NBR|count|
+--------+--------+--------+------+------------+-----+
+--------+--------+--------+------+------------+-----+



In [None]:
df_charges_use.printSchema()

root
 |-- CRASH_ID: string (nullable = true)
 |-- UNIT_NBR: string (nullable = true)
 |-- PRSN_NBR: string (nullable = true)
 |-- CHARGE: string (nullable = true)
 |-- CITATION_NBR: string (nullable = true)



In [None]:
df_charges_use.show(5,truncate=True)

+--------+--------+--------+--------------------+------------+
|CRASH_ID|UNIT_NBR|PRSN_NBR|              CHARGE|CITATION_NBR|
+--------+--------+--------+--------------------+------------+
|14839048|       1|       1|EVADING IN  A VEH...| 2016-000008|
|14842539|       1|       1|FAILURE TO CONTRO...|   138474673|
|14845037|       1|       1|DRIVERS LICENSE R...|    M105252B|
|14845764|       1|       1|FAIL TO DRIVE IN ...|   008388006|
|14845973|       1|       1|NO DRIVER'S LICEN...|TX4IEN0FTMG3|
+--------+--------+--------+--------------------+------------+
only showing top 5 rows



In [None]:
df_charges_use.count()

115909

In [None]:
df_charges_use = df_charges_use.dropDuplicates()

In [None]:
df_charges_use.count()

115909

In [None]:
df_damages_use.show(5,truncate=True)

+--------+--------------------+
|CRASH_ID|    DAMAGED_PROPERTY|
+--------+--------------------+
|14852553|       CABLE BARRIER|
|14853703|         LANDSCAPING|
|14862814|75 FT. OF GUARD RAIL|
|14867689|         CABLE LINES|
|14873013|          LIGHT POLE|
+--------+--------------------+
only showing top 5 rows



In [None]:
df_endorse_use.show(5,truncate=True)

+--------+--------+------------------+
|CRASH_ID|UNIT_NBR|DRVR_LIC_ENDORS_ID|
+--------+--------+------------------+
|14842529|       2|              NONE|
|14843083|       1|        UNLICENSED|
|14844479|       1|        UNLICENSED|
|14845535|       1|              NONE|
|14846619|       2|              NONE|
+--------+--------+------------------+
only showing top 5 rows



In [None]:
df_primary_person_use.show(5,truncate=True) #DRVR_LIC_STATE_ID

+--------+--------+--------+------------+------------------+--------------------+--------+-----------------+------------+------------+-------------------+---------------+--------------+-----------+---------------------+----------------+------------------+---------------------+----------------+-----------------+---------------+---------------+------------------+--------------+-------------+--------------+-------------+---------+--------------------+-----------------+---------------+--------+
|CRASH_ID|UNIT_NBR|PRSN_NBR|PRSN_TYPE_ID|PRSN_OCCPNT_POS_ID|   PRSN_INJRY_SEV_ID|PRSN_AGE|PRSN_ETHNICITY_ID|PRSN_GNDR_ID|PRSN_EJCT_ID|       PRSN_REST_ID| PRSN_AIRBAG_ID|PRSN_HELMET_ID|PRSN_SOL_FL|PRSN_ALC_SPEC_TYPE_ID|PRSN_ALC_RSLT_ID|PRSN_BAC_TEST_RSLT|PRSN_DRG_SPEC_TYPE_ID|PRSN_DRG_RSLT_ID|DRVR_DRG_CAT_1_ID|PRSN_DEATH_TIME|INCAP_INJRY_CNT|NONINCAP_INJRY_CNT|POSS_INJRY_CNT|NON_INJRY_CNT|UNKN_INJRY_CNT|TOT_INJRY_CNT|DEATH_CNT|    DRVR_LIC_TYPE_ID|DRVR_LIC_STATE_ID|DRVR_LIC_CLS_ID|DRVR_ZIP|
+-------

In [None]:
df_restrict_use.show(5,truncate=True)

+--------+--------+--------------------+
|CRASH_ID|UNIT_NBR| DRVR_LIC_RESTRIC_ID|
+--------+--------+--------------------+
|14839097|       3|WITH CORRECTIVE L...|
|14840787|       1|TRC 545.424 APPLI...|
|14842529|       2|                NONE|
|14843083|       1|          UNLICENSED|
|14844479|       1|          UNLICENSED|
+--------+--------+--------------------+
only showing top 5 rows



In [None]:
df_units_use.show(5,truncate=True) #VEH_LIC_STATE_ID #

+--------+--------+-------------+-------------+----------+----------------+-----------------+------------+------------+-----------+----------+--------------------+---------------+--------+-----------------+--------------------+--------------------+-----------------+--------------+--------------------+-----------------+--------------+------------------+-----------------+--------------------+--------------------+--------------------+-------------------+---------------+---------------------+---------------+------------------+--------------+-------------+--------------+-------------+---------+
|CRASH_ID|UNIT_NBR| UNIT_DESC_ID|VEH_PARKED_FL|VEH_HNR_FL|VEH_LIC_STATE_ID|              VIN|VEH_MOD_YEAR|VEH_COLOR_ID|VEH_MAKE_ID|VEH_MOD_ID|    VEH_BODY_STYL_ID|EMER_RESPNDR_FL|OWNR_ZIP|FIN_RESP_PROOF_ID|    FIN_RESP_TYPE_ID|  VEH_DMAG_AREA_1_ID|VEH_DMAG_SCL_1_ID|FORCE_DIR_1_ID|  VEH_DMAG_AREA_2_ID|VEH_DMAG_SCL_2_ID|FORCE_DIR_2_ID|VEH_INVENTORIED_FL|  VEH_TRANSP_NAME|     VEH_TRANSP_DEST|  CONTRIB_

## Analytics 1

In [None]:
#Analytics 1: Find the number of crashes (accidents) in which number of persons killed are male?
df_primary_person_use.select('PRSN_GNDR_ID').distinct().show()


+------------+
|PRSN_GNDR_ID|
+------------+
|          NA|
|     UNKNOWN|
|        MALE|
|      FEMALE|
+------------+



In [None]:
df_primary_person_use.select('PRSN_NBR').distinct().show()

+--------+
|PRSN_NBR|
+--------+
|       1|
+--------+



In [None]:
male_accidents_df = df_primary_person_use.filter((df_primary_person_use.PRSN_GNDR_ID == "MALE") & (df_primary_person_use.DEATH_CNT == 1))

In [None]:
male_accidents_df.show(5,truncate=True)

+--------+--------+--------+------------+--------------------+-----------------+--------+-----------------+------------+--------------+-------------------+--------------+--------------+-----------+---------------------+----------------+------------------+---------------------+----------------+-----------------+---------------+---------------+------------------+--------------+-------------+--------------+-------------+---------+--------------------+-----------------+---------------+--------+
|CRASH_ID|UNIT_NBR|PRSN_NBR|PRSN_TYPE_ID|  PRSN_OCCPNT_POS_ID|PRSN_INJRY_SEV_ID|PRSN_AGE|PRSN_ETHNICITY_ID|PRSN_GNDR_ID|  PRSN_EJCT_ID|       PRSN_REST_ID|PRSN_AIRBAG_ID|PRSN_HELMET_ID|PRSN_SOL_FL|PRSN_ALC_SPEC_TYPE_ID|PRSN_ALC_RSLT_ID|PRSN_BAC_TEST_RSLT|PRSN_DRG_SPEC_TYPE_ID|PRSN_DRG_RSLT_ID|DRVR_DRG_CAT_1_ID|PRSN_DEATH_TIME|INCAP_INJRY_CNT|NONINCAP_INJRY_CNT|POSS_INJRY_CNT|NON_INJRY_CNT|UNKN_INJRY_CNT|TOT_INJRY_CNT|DEATH_CNT|    DRVR_LIC_TYPE_ID|DRVR_LIC_STATE_ID|DRVR_LIC_CLS_ID|DRVR_ZIP|
+-------

In [None]:
male_accidents_df.count()

182

In [None]:
print("Number of Males who died in Car Crash are",male_accidents_df.count())


Number of Males who died in Car Crash are 182


## Analytics 2

In [None]:
#Analytics 2: How many two wheelers are booked for crashes
df_units_use.select('VEH_BODY_STYL_ID').distinct().show()

+--------------------+
|    VEH_BODY_STYL_ID|
+--------------------+
|                 BUS|
|                  NA|
|                 VAN|
|              PICKUP|
|SPORT UTILITY VEH...|
|PASSENGER CAR, 4-...|
|          FIRE TRUCK|
|               TRUCK|
|             UNKNOWN|
|           AMBULANCE|
|    POLICE CAR/TRUCK|
|          MOTORCYCLE|
|   YELLOW SCHOOL BUS|
|   POLICE MOTORCYCLE|
|PASSENGER CAR, 2-...|
|       TRUCK TRACTOR|
|      FARM EQUIPMENT|
|NEV-NEIGHBORHOOD ...|
|OTHER  (EXPLAIN I...|
|        NOT REPORTED|
+--------------------+



In [None]:
two_wheelers_df = df_units_use.filter(df_units_use.VEH_BODY_STYL_ID.contains("MOTORCYCLE"))

In [None]:
two_wheelers_df = two_wheelers_df.dropDuplicates()

In [None]:
print("Count of 2-Wheelers Booked for Crash:",two_wheelers_df.count() )

Count of 2-Wheelers Booked for Crash: 773


## Analytics 3

In [None]:
# Which state has highest number of accidents in which females are involved?

In [None]:
from pyspark.sql import functions as F
female_state_accidents_df = df_primary_person_use.filter(df_primary_person_use.PRSN_GNDR_ID == "FEMALE"). \
            groupby("DRVR_LIC_STATE_ID").count(). \
            orderBy(F.col("count").desc())

In [None]:
female_state_accidents_df.show(1)
female_state_accidents = female_state_accidents_df.select("DRVR_LIC_STATE_ID")

+-----------------+-----+
|DRVR_LIC_STATE_ID|count|
+-----------------+-----+
|            Texas|53319|
+-----------------+-----+
only showing top 1 row



In [None]:
print("State with highest number of accidents in which Females are involved is:", female_state_accidents.collect()[0][0])
#female_state_accidents.first().DRVR_LIC_STATE_ID

State with highest number of accidents in which Females are involved is: Texas


## Analytics 4

In [None]:
# Which are the Top 5th to 15th VEH_MAKE_IDs that contribute to a largest number of injuries including death

In [None]:
df_units_use.select("VEH_MAKE_ID").distinct().show(df_units_use.count(),False)

+---------------------------------+
|VEH_MAKE_ID                      |
+---------------------------------+
|UNITED EXPRESS LINE INC          |
|UTILIMASTER                      |
|AMERICAN IRON HORSE              |
|ACURA                            |
|NOVA                             |
|FRUEHAUF                         |
|WHITE                            |
|MOTOR COACH MND INC              |
|MCLAREN                          |
|PIERCE                           |
|PORSCHE                          |
|WHITEGMC                         |
|FREIGHTLINER                     |
|BUELL                            |
|STERLING                         |
|HYUNDAI                          |
|INTERNATIONAL                    |
|PETER PIRSCH & SONS              |
|FIAT                             |
|GILLIG                           |
|NA                               |
|MIDLAND MANUFACTURING LIMITED    |
|DATSUN                           |
|CAN-AM                           |
|HYUNDAI STEEL INDUSTRIES   

In [None]:
casualties_df = df_units_use.filter(df_units_use.VEH_MAKE_ID != "NA"). \
            withColumn('TOT_CASUALTIES_CNT', df_units_use[35] + df_units_use[36]). \
            groupby("VEH_MAKE_ID").sum("TOT_CASUALTIES_CNT"). \
            withColumnRenamed("sum(TOT_CASUALTIES_CNT)", "TOT_CASUALTIES"). \
            orderBy(F.col("TOT_CASUALTIES").desc())

In [None]:
casualties_df.show()

+-------------+--------------+
|  VEH_MAKE_ID|TOT_CASUALTIES|
+-------------+--------------+
|    CHEVROLET|        7000.0|
|         FORD|        6944.0|
|       TOYOTA|        4227.0|
|        DODGE|        3138.0|
|       NISSAN|        3114.0|
|        HONDA|        2892.0|
|          GMC|        1246.0|
|      HYUNDAI|        1103.0|
|          KIA|        1049.0|
|         JEEP|         988.0|
|     CHRYSLER|         955.0|
|        MAZDA|         711.0|
|   VOLKSWAGEN|         582.0|
|      PONTIAC|         564.0|
|        LEXUS|         523.0|
|        BUICK|         521.0|
|   MITSUBISHI|         510.0|
|     CADILLAC|         498.0|
|      MERCURY|         397.0|
|MERCEDES-BENZ|         393.0|
+-------------+--------------+
only showing top 20 rows



In [None]:
df_top_casualties = casualties_df.limit(14).subtract(casualties_df.limit(4))

In [None]:
df_top_casualties.show()

+-----------+--------------+
|VEH_MAKE_ID|TOT_CASUALTIES|
+-----------+--------------+
|     NISSAN|        3114.0|
|      HONDA|        2892.0|
|        GMC|        1246.0|
|    HYUNDAI|        1103.0|
|        KIA|        1049.0|
|       JEEP|         988.0|
|   CHRYSLER|         955.0|
|      MAZDA|         711.0|
| VOLKSWAGEN|         582.0|
|    PONTIAC|         564.0|
+-----------+--------------+



In [None]:
from pyspark.sql.window import Window
"""w = Window.orderBy(F.col("TOT_CASUALTIES").desc())
casualties_df = casualties_df.withColumn("index", F.row_number().over(w))
casualties_df.show()"""

'w = Window.orderBy(F.col("TOT_CASUALTIES").desc())\ncasualties_df = casualties_df.withColumn("index", F.row_number().over(w))\ncasualties_df.show()'

In [None]:
list_veh = ""
for i in  df_top_casualties.select('VEH_MAKE_ID').rdd.flatMap(lambda x: x).collect():
  list_veh += i + ", "
list_veh.strip(", ")

'NISSAN, HONDA, GMC, HYUNDAI, KIA, JEEP, CHRYSLER, MAZDA, VOLKSWAGEN, PONTIAC'

In [None]:
print("Top 5th to 15th VEH_MAKE_IDs that contribute to a largest number of injuries including death are: ", list_veh.strip(", "))

Top 5th to 15th VEH_MAKE_IDs that contribute to a largest number of injuries including death are:  NISSAN, HONDA, GMC, HYUNDAI, KIA, JEEP, CHRYSLER, MAZDA, VOLKSWAGEN, PONTIAC


## Analytics 5

In [None]:
#For all the body styles involved in crashes, mention the top ethnic user group of each unique body style
joined_eth_style = df_primary_person_use.join(df_units_use,df_primary_person_use.CRASH_ID ==  df_units_use.CRASH_ID,"inner") 

In [None]:
joined_eth_style. \
filter((~joined_eth_style.VEH_BODY_STYL_ID.isin(["NA", "UNKNOWN", "NOT REPORTED","OTHER  (EXPLAIN IN NARRATIVE)"])) & (~joined_eth_style.PRSN_ETHNICITY_ID.isin(["NA","UNKNOWN"]))). \
groupBy("VEH_BODY_STYL_ID","PRSN_ETHNICITY_ID").count(). \
withColumn("row_number", F.row_number().over(Window.partitionBy("VEH_BODY_STYL_ID"). \
                                             orderBy(F.col("VEH_BODY_STYL_ID").desc(),F.col("count").desc()))). \
                                             filter(F.col("row_number")==1). \
                                             orderBy(F.col("count").desc()). \
                                             drop("row_number", "count"). \
                                             show()

+--------------------+-----------------+
|    VEH_BODY_STYL_ID|PRSN_ETHNICITY_ID|
+--------------------+-----------------+
|PASSENGER CAR, 4-...|            WHITE|
|              PICKUP|            WHITE|
|SPORT UTILITY VEH...|            WHITE|
|PASSENGER CAR, 2-...|            WHITE|
|                 VAN|            WHITE|
|               TRUCK|            WHITE|
|       TRUCK TRACTOR|            WHITE|
|          MOTORCYCLE|            WHITE|
|    POLICE CAR/TRUCK|            WHITE|
|                 BUS|         HISPANIC|
|   YELLOW SCHOOL BUS|            WHITE|
|           AMBULANCE|            WHITE|
|          FIRE TRUCK|            WHITE|
|      FARM EQUIPMENT|            WHITE|
|NEV-NEIGHBORHOOD ...|            WHITE|
|   POLICE MOTORCYCLE|         HISPANIC|
+--------------------+-----------------+



In [None]:
joined_eth_style. \
filter((~joined_eth_style.VEH_BODY_STYL_ID.isin(["NA", "UNKNOWN", "NOT REPORTED","OTHER  (EXPLAIN IN NARRATIVE)"])) & (~joined_eth_style.PRSN_ETHNICITY_ID.isin(["NA","UNKNOWN"]))). \
groupBy("VEH_BODY_STYL_ID","PRSN_ETHNICITY_ID").count(). \
withColumn("row_number", F.row_number().over(Window.partitionBy("VEH_BODY_STYL_ID"). \
                                             orderBy(F.col("VEH_BODY_STYL_ID").desc(),F.col("count").desc()))). \
                                             filter(F.col("row_number")==1). \
                                             orderBy(F.col("count").desc()). \
                                             show()

+--------------------+-----------------+-----+----------+
|    VEH_BODY_STYL_ID|PRSN_ETHNICITY_ID|count|row_number|
+--------------------+-----------------+-----+----------+
|PASSENGER CAR, 4-...|            WHITE|58283|         1|
|              PICKUP|            WHITE|38295|         1|
|SPORT UTILITY VEH...|            WHITE|33884|         1|
|PASSENGER CAR, 2-...|            WHITE| 9865|         1|
|                 VAN|            WHITE| 5206|         1|
|               TRUCK|            WHITE| 3347|         1|
|       TRUCK TRACTOR|            WHITE| 2951|         1|
|          MOTORCYCLE|            WHITE|  840|         1|
|    POLICE CAR/TRUCK|            WHITE|  366|         1|
|                 BUS|         HISPANIC|  208|         1|
|   YELLOW SCHOOL BUS|            WHITE|  131|         1|
|           AMBULANCE|            WHITE|   90|         1|
|          FIRE TRUCK|            WHITE|   69|         1|
|      FARM EQUIPMENT|            WHITE|   54|         1|
|NEV-NEIGHBORH

In [None]:
joined_eth_style.filter(joined_eth_style.VEH_BODY_STYL_ID == "POLICE MOTORCYCLE").groupBy("PRSN_ETHNICITY_ID").count().show()

+-----------------+-----+
|PRSN_ETHNICITY_ID|count|
+-----------------+-----+
|            WHITE|    2|
|            BLACK|    1|
|         HISPANIC|    3|
+-----------------+-----+



## Analytics 6

In [None]:
#Among the crashed cars, what are the Top 5 Zip Codes with highest number crashes with alcohols as the 
#contributing factor to a crash (Use Driver Zip Code)

In [None]:
test = df_units_use.join(df_primary_person_use, on=['CRASH_ID'], how='inner'). \
            dropna(subset=["DRVR_ZIP"]). \
            filter(F.col("CONTRIB_FACTR_1_ID").contains("ALCOHOL") | F.col("CONTRIB_FACTR_2_ID").contains("ALCOHOL")). \
            groupby("DRVR_ZIP").count().orderBy(F.col("count").desc())

In [None]:
test = df_units_use.join(df_primary_person_use, on=['CRASH_ID'], how='inner'). \
            dropna(subset=["DRVR_ZIP"]). \
            filter(F.col("CONTRIB_FACTR_1_ID").contains("ALCOHOL") | F.col("CONTRIB_FACTR_2_ID").contains("ALCOHOL"))

In [None]:
test.select("CONTRIB_FACTR_1_ID").filter(F.col("CONTRIB_FACTR_1_ID").contains("ALCOHOL")).distinct().show(truncate=False)

+-------------------------+
|CONTRIB_FACTR_1_ID       |
+-------------------------+
|UNDER INFLUENCE - ALCOHOL|
+-------------------------+



In [None]:
test.select("CONTRIB_FACTR_2_ID").filter(F.col("CONTRIB_FACTR_2_ID").contains("ALCOHOL")).distinct().show(truncate=False)

+-------------------------+
|CONTRIB_FACTR_2_ID       |
+-------------------------+
|UNDER INFLUENCE - ALCOHOL|
+-------------------------+



In [None]:
df_units_use.join(df_primary_person_use, on=['CRASH_ID'], how='inner'). \
            dropna(subset=["DRVR_ZIP"]). \
            filter(F.col("CONTRIB_FACTR_1_ID").contains("ALCOHOL") | F.col("CONTRIB_FACTR_2_ID").contains("ALCOHOL")). \
            groupby("DRVR_ZIP").count().orderBy(F.col("count").desc()).show(5)

+--------+-----+
|DRVR_ZIP|count|
+--------+-----+
|   76010|   75|
|   78521|   61|
|   75067|   54|
|   78574|   47|
|   75052|   43|
+--------+-----+
only showing top 5 rows



## Analytics 7

In [None]:
#Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance

In [None]:
insured_no_dmg_property = df_units_use.join(df_damages_use, on=['CRASH_ID'], how='inner'). \
filter((F.col("DAMAGED_PROPERTY").rlike("NO DAMAGE") | (F.col("DAMAGED_PROPERTY").rlike("NONE"))) 
& ((F.col("VEH_DMAG_SCL_1_ID").rlike("[5-9]")) | ((F.col("VEH_DMAG_SCL_2_ID").rlike("[5-9]"))))). \
filter((F.col("FIN_RESP_TYPE_ID") == "PROOF OF LIABILITY INSURANCE") | (F.col("FIN_RESP_TYPE_ID") == "INSURANCE BINDER") | (F.col("FIN_RESP_TYPE_ID") == "LIABILITY INSURANCE POLICY"))

In [None]:
insured_no_dmg_property = df_units_use.join(df_damages_use, on=['CRASH_ID'], how='inner'). \
filter((F.col("DAMAGED_PROPERTY").rlike("NO DAMAGE")) | (F.col("DAMAGED_PROPERTY").rlike("NONE"))). \
filter((F.col("VEH_DMAG_SCL_1_ID").rlike("[5-9]")) | (F.col("VEH_DMAG_SCL_2_ID").rlike("[5-9]"))). \
filter((F.col("FIN_RESP_TYPE_ID") == "PROOF OF LIABILITY INSURANCE") | (F.col("FIN_RESP_TYPE_ID") == "INSURANCE BINDER") | (F.col("FIN_RESP_TYPE_ID") == "LIABILITY INSURANCE POLICY"))

In [None]:
insured_no_dmg_property.show(truncate=False)

+--------+--------+-------------+-------------+----------+----------------+-----------------+------------+------------+-----------+----------+---------------------+---------------+--------+-----------------+----------------------------+---------------------------------------------+-----------------+--------------+--------------------------------------+-----------------+--------------+------------------+---------------------------------+---------------------------------------+------------------------------+------------------+----------------------------+---------------+---------------------+---------------+------------------+--------------+-------------+--------------+-------------+---------+---------------------------------------+
|CRASH_ID|UNIT_NBR|UNIT_DESC_ID |VEH_PARKED_FL|VEH_HNR_FL|VEH_LIC_STATE_ID|VIN              |VEH_MOD_YEAR|VEH_COLOR_ID|VEH_MAKE_ID|VEH_MOD_ID|VEH_BODY_STYL_ID     |EMER_RESPNDR_FL|OWNR_ZIP|FIN_RESP_PROOF_ID|FIN_RESP_TYPE_ID            |VEH_DMAG_AREA_1_ID     

In [None]:
df_units_use.select("VEH_DMAG_SCL_2_ID").distinct().show(truncate=False)

+-----------------+
|VEH_DMAG_SCL_2_ID|
+-----------------+
|DAMAGED 4        |
|INVALID VALUE    |
|NA               |
|DAMAGED 5        |
|DAMAGED 1 MINIMUM|
|DAMAGED 3        |
|NO DAMAGE        |
|DAMAGED 7 HIGHEST|
|DAMAGED 2        |
|DAMAGED 6        |
+-----------------+



In [None]:
dmg_df = df_units_use.join(df_damages_use, on=['CRASH_ID'], how='inner'). \
filter((F.col("DAMAGED_PROPERTY").rlike("NO DAMAGE")) | (F.col("DAMAGED_PROPERTY").rlike("NONE")))

In [None]:
dmg_df.select("VEH_DMAG_SCL_2_ID").distinct().show(truncate=False)

+-----------------+
|VEH_DMAG_SCL_2_ID|
+-----------------+
|NA               |
|DAMAGED 1 MINIMUM|
|DAMAGED 3        |
|NO DAMAGE        |
|DAMAGED 2        |
+-----------------+



In [None]:
print("Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance",insured_no_dmg_property.select("CRASH_ID").distinct().count())

Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance 10


## Analytics 8

In [None]:
#Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences,
#has licensed Drivers, uses top 10 used vehicle colours and 
#has car licensed with the Top 25 states with highest number of offences (to be deduced from the data)

In [None]:
top_25_states = [row[0] for row in df_units_use.filter(F.col("VEH_LIC_STATE_ID").cast("int").isNull()).
            groupby("VEH_LIC_STATE_ID").count().orderBy(F.col("count").desc()).limit(25).collect()]
top_10_used_vcolors = [row[0] for row in df_units_use.filter(df_units_use.VEH_COLOR_ID != "NA").
            groupby("VEH_COLOR_ID").count().orderBy(F.col("count").desc()).limit(10).collect()]

In [None]:
top_5_veh_makes = df_charges_use.join(df_primary_person_use, on=['CRASH_ID'], how='inner'). \
            join(df_units_use, on=['CRASH_ID'], how='inner'). \
            filter(df_charges_use.CHARGE.contains("SPEED")). \
            filter(df_primary_person_use.DRVR_LIC_TYPE_ID.isin(["DRIVER LICENSE", "COMMERCIAL DRIVER LIC."])). \
            filter(df_units_use.VEH_COLOR_ID.isin(top_10_used_vcolors)). \
            filter(df_units_use.VEH_LIC_STATE_ID.isin(top_25_states)). \
            groupby("VEH_MAKE_ID").count(). \
            orderBy(F.col("count").desc()).limit(5)

In [None]:
list_veh = ""
for i in  top_5_veh_makes.select('VEH_MAKE_ID').rdd.flatMap(lambda x: x).collect():
  list_veh += i + ", "
list_veh.strip(", ")

'FORD, CHEVROLET, TOYOTA, DODGE, NISSAN'

In [None]:
print("Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, uses top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences  are:",list_veh.strip(", "))

Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, uses top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences  are: FORD, CHEVROLET, TOYOTA, DODGE, NISSAN
