Import required spark packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, row_number
from pyspark.sql.window import Window

Create Spark Session

In [130]:
spark = SparkSession\
.builder\
.appName("BCG_Analysis")\
.getOrCreate()

Read all files in dataframes

In [131]:
df_charges = spark.read.csv("../data/Charges_use.csv", header = True)
df_damages = spark.read.csv("../data/Damages_use.csv", header = True)
df_endorse = spark.read.csv("../data/Endorse_use.csv", header = True)
df_primary_person = spark.read.csv("../data/Primary_Person_use.csv", header = True)
df_restrict = spark.read.csv("../data/Restrict_use.csv", header = True)
df_units = spark.read.csv("../data/Units_use.csv", header = True)

In [132]:
df_charges.printSchema()

root
 |-- CRASH_ID: string (nullable = true)
 |-- UNIT_NBR: string (nullable = true)
 |-- PRSN_NBR: string (nullable = true)
 |-- CHARGE: string (nullable = true)
 |-- CITATION_NBR: string (nullable = true)



In [133]:
df_damages.printSchema()

root
 |-- CRASH_ID: string (nullable = true)
 |-- DAMAGED_PROPERTY: string (nullable = true)



In [134]:
df_endorse.printSchema()

root
 |-- CRASH_ID: string (nullable = true)
 |-- UNIT_NBR: string (nullable = true)
 |-- DRVR_LIC_ENDORS_ID: string (nullable = true)



In [135]:
df_primary_person.printSchema()

root
 |-- CRASH_ID: string (nullable = true)
 |-- UNIT_NBR: string (nullable = true)
 |-- PRSN_NBR: string (nullable = true)
 |-- PRSN_TYPE_ID: string (nullable = true)
 |-- PRSN_OCCPNT_POS_ID: string (nullable = true)
 |-- PRSN_INJRY_SEV_ID: string (nullable = true)
 |-- PRSN_AGE: string (nullable = true)
 |-- PRSN_ETHNICITY_ID: string (nullable = true)
 |-- PRSN_GNDR_ID: string (nullable = true)
 |-- PRSN_EJCT_ID: string (nullable = true)
 |-- PRSN_REST_ID: string (nullable = true)
 |-- PRSN_AIRBAG_ID: string (nullable = true)
 |-- PRSN_HELMET_ID: string (nullable = true)
 |-- PRSN_SOL_FL: string (nullable = true)
 |-- PRSN_ALC_SPEC_TYPE_ID: string (nullable = true)
 |-- PRSN_ALC_RSLT_ID: string (nullable = true)
 |-- PRSN_BAC_TEST_RSLT: string (nullable = true)
 |-- PRSN_DRG_SPEC_TYPE_ID: string (nullable = true)
 |-- PRSN_DRG_RSLT_ID: string (nullable = true)
 |-- DRVR_DRG_CAT_1_ID: string (nullable = true)
 |-- PRSN_DEATH_TIME: string (nullable = true)
 |-- INCAP_INJRY_CNT: string

In [136]:
df_restrict.printSchema()

root
 |-- CRASH_ID: string (nullable = true)
 |-- UNIT_NBR: string (nullable = true)
 |-- DRVR_LIC_RESTRIC_ID: string (nullable = true)



In [137]:
df_units.printSchema()

root
 |-- CRASH_ID: string (nullable = true)
 |-- UNIT_NBR: string (nullable = true)
 |-- UNIT_DESC_ID: string (nullable = true)
 |-- VEH_PARKED_FL: string (nullable = true)
 |-- VEH_HNR_FL: string (nullable = true)
 |-- VEH_LIC_STATE_ID: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- VEH_MOD_YEAR: string (nullable = true)
 |-- VEH_COLOR_ID: string (nullable = true)
 |-- VEH_MAKE_ID: string (nullable = true)
 |-- VEH_MOD_ID: string (nullable = true)
 |-- VEH_BODY_STYL_ID: string (nullable = true)
 |-- EMER_RESPNDR_FL: string (nullable = true)
 |-- OWNR_ZIP: string (nullable = true)
 |-- FIN_RESP_PROOF_ID: string (nullable = true)
 |-- FIN_RESP_TYPE_ID: string (nullable = true)
 |-- VEH_DMAG_AREA_1_ID: string (nullable = true)
 |-- VEH_DMAG_SCL_1_ID: string (nullable = true)
 |-- FORCE_DIR_1_ID: string (nullable = true)
 |-- VEH_DMAG_AREA_2_ID: string (nullable = true)
 |-- VEH_DMAG_SCL_2_ID: string (nullable = true)
 |-- FORCE_DIR_2_ID: string (nullable = true)
 |-- V

1.	Analytics 1: Find the number of crashes (accidents) in which number of males killed are greater than 2?

In [138]:
df_primary_person.select('CRASH_ID').dropDuplicates().count()

83805

In [139]:
df_units.select('CRASH_ID').dropDuplicates().count()

83805

In [140]:
df_1 = df_primary_person.select('CRASH_ID','DEATH_CNT')\
        .withColumn('DEATH_CNT',col('DEATH_CNT').cast('int'))\
        .filter(
        df_primary_person.PRSN_GNDR_ID == 'MALE'
        ).groupBy('CRASH_ID')\
.agg(sum('DEATH_CNT').alias('COUNT_DEATH_CNT'))

In [141]:
df_1.filter(col('COUNT_DEATH_CNT')>2).show()

+--------+---------------+
|CRASH_ID|COUNT_DEATH_CNT|
+--------+---------------+
+--------+---------------+



2.	Analysis 2: How many two wheelers are booked for crashes? 

In [142]:
df_units.select('VEH_BODY_STYL_ID').distinct().show()

+--------------------+
|    VEH_BODY_STYL_ID|
+--------------------+
|                 BUS|
|                  NA|
|                 VAN|
|              PICKUP|
|SPORT UTILITY VEH...|
|PASSENGER CAR, 4-...|
|          FIRE TRUCK|
|               TRUCK|
|             UNKNOWN|
|           AMBULANCE|
|    POLICE CAR/TRUCK|
|          MOTORCYCLE|
|   YELLOW SCHOOL BUS|
|   POLICE MOTORCYCLE|
|PASSENGER CAR, 2-...|
|       TRUCK TRACTOR|
|      FARM EQUIPMENT|
|NEV-NEIGHBORHOOD ...|
|OTHER  (EXPLAIN I...|
|        NOT REPORTED|
+--------------------+



In [143]:
df_2 = df_units.select('CRASH_ID','VEH_BODY_STYL_ID')\
    .filter(col('VEH_BODY_STYL_ID').isin('MOTORCYCLE','POLICE MOTORCYCLE'))
df_2.count()

784

3.	Analysis 3: Determine the Top 5 Vehicle Makes of the cars present in the crashes in which driver died and Airbags did not deploy.

In [144]:
df_units.select('VEH_MAKE_ID').distinct().show()

+--------------------+
|         VEH_MAKE_ID|
+--------------------+
|UNITED EXPRESS LI...|
|         UTILIMASTER|
| AMERICAN IRON HORSE|
|               ACURA|
|                NOVA|
|            FRUEHAUF|
|               WHITE|
| MOTOR COACH MND INC|
|             MCLAREN|
|              PIERCE|
|             PORSCHE|
|            WHITEGMC|
|        FREIGHTLINER|
|               BUELL|
|            STERLING|
|             HYUNDAI|
|       INTERNATIONAL|
| PETER PIRSCH & SONS|
|                FIAT|
|              GILLIG|
+--------------------+
only showing top 20 rows



In [145]:
df_units_person = df_units.join(df_primary_person,on = ['CRASH_ID','UNIT_NBR'],how = 'inner')\
.select(df_units.CRASH_ID,df_units.UNIT_NBR,df_units.VEH_MAKE_ID,
        df_primary_person.PRSN_TYPE_ID,df_primary_person.PRSN_INJRY_SEV_ID,df_primary_person.PRSN_AIRBAG_ID)

In [146]:
df_units_person.count()

162285

In [147]:
df_3 = df_units_person\
.filter((col('PRSN_TYPE_ID').isin('DRIVER','DRIVER OF MOTORCYCLE')) 
        & (col('PRSN_INJRY_SEV_ID')=='KILLED') 
        & (col('PRSN_AIRBAG_ID')=='NOT DEPLOYED'))\
.groupBy('VEH_MAKE_ID').count().orderBy(col('count').desc()).limit(5)

In [148]:
df_3.show()

[Stage 259:>                                                        (0 + 1) / 1]

+-------------+-----+
|  VEH_MAKE_ID|count|
+-------------+-----+
|    CHEVROLET|    8|
|         FORD|    6|
|       NISSAN|    5|
|INTERNATIONAL|    2|
|        DODGE|    2|
+-------------+-----+



                                                                                

4.	Analysis 4: Determine number of Vehicles with driver having valid licences involved in hit and run? 

In [149]:
df_primary_person.select('DRVR_LIC_TYPE_ID').distinct().show()

+--------------------+
|    DRVR_LIC_TYPE_ID|
+--------------------+
|                  NA|
|COMMERCIAL DRIVER...|
|             ID CARD|
|             UNKNOWN|
|        OCCUPATIONAL|
|          UNLICENSED|
|               OTHER|
|      DRIVER LICENSE|
+--------------------+



In [150]:
df_pp = df_primary_person.select('CRASH_ID','UNIT_NBR','DRVR_LIC_TYPE_ID')\
    .filter(col('DRVR_LIC_TYPE_ID').isin('DRIVER LICENSE', 'COMMERCIAL DRIVER LIC.'))

In [151]:
df_pp.count()

132430

In [152]:
df_u = df_units.select('CRASH_ID','UNIT_NBR','VEH_HNR_FL').filter(col('VEH_HNR_FL')=='Y')

In [153]:
df_u.count()

4971

In [154]:
df_4 = df_pp.join(df_u, on= ['CRASH_ID','UNIT_NBR']).count()
print(df_4)

2609


5.	Analysis 5: Which state has highest number of accidents in which females are not involved? 

In [155]:
df_primary_person.select('DRVR_LIC_STATE_ID').distinct().show()

[Stage 275:>                                                        (0 + 1) / 1]

+--------------------+
|   DRVR_LIC_STATE_ID|
+--------------------+
|                Utah|
|              Hawaii|
| U.S. Virgin Islands|
|           Minnesota|
|                Ohio|
|Northern Mariana ...|
|            Arkansas|
|              Oregon|
|                  NA|
|               Texas|
|        North Dakota|
|        Pennsylvania|
|         Connecticut|
|            Nebraska|
|             Vermont|
|              Nevada|
|         Puerto Rico|
|          Washington|
|            Illinois|
|            Oklahoma|
+--------------------+
only showing top 20 rows



                                                                                

In [156]:
df_primary_person.select('PRSN_GNDR_ID').distinct().show()

+------------+
|PRSN_GNDR_ID|
+------------+
|          NA|
|     UNKNOWN|
|        MALE|
|      FEMALE|
+------------+



In [157]:
df_filtered = df_primary_person.select('CRASH_ID','PRSN_GNDR_ID','DRVR_LIC_STATE_ID')\
.filter(col('PRSN_GNDR_ID')!='FEMALE').groupBy('DRVR_LIC_STATE_ID','CRASH_ID').count()

In [158]:
df_5 = df_filtered.groupBy('DRVR_LIC_STATE_ID').count()

In [159]:
df_5.orderBy(col('count').desc()).limit(1).show()

[Stage 281:>                                                        (0 + 1) / 1]

+-----------------+-----+
|DRVR_LIC_STATE_ID|count|
+-----------------+-----+
|            Texas|61589|
+-----------------+-----+



                                                                                

6.	Analysis 6: Which are the Top 3rd to 5th VEH_MAKE_IDs that contribute to a largest number of injuries including death

In [160]:
df_units.select('VEH_MAKE_ID').distinct().show()

+--------------------+
|         VEH_MAKE_ID|
+--------------------+
|UNITED EXPRESS LI...|
|         UTILIMASTER|
| AMERICAN IRON HORSE|
|               ACURA|
|                NOVA|
|            FRUEHAUF|
|               WHITE|
| MOTOR COACH MND INC|
|             MCLAREN|
|              PIERCE|
|             PORSCHE|
|            WHITEGMC|
|        FREIGHTLINER|
|               BUELL|
|            STERLING|
|             HYUNDAI|
|       INTERNATIONAL|
| PETER PIRSCH & SONS|
|                FIAT|
|              GILLIG|
+--------------------+
only showing top 20 rows



In [161]:
df_6 = df_units.select('VEH_MAKE_ID','TOT_INJRY_CNT','DEATH_CNT')\
.withColumn('INJURIES_CNT', col('TOT_INJRY_CNT').cast('int')+col('DEATH_CNT').cast('int'))\
.groupBy('VEH_MAKE_ID').agg(sum('INJURIES_CNT').alias('INJURIES_CNT'))\
.orderBy(col('INJURIES_CNT').desc())\
.limit(5)


In [162]:
df_6.subtract(df_6.limit(2)).show()

+-----------+------------+
|VEH_MAKE_ID|INJURIES_CNT|
+-----------+------------+
|     TOYOTA|        4228|
|      DODGE|        3146|
|     NISSAN|        3118|
+-----------+------------+



7.	Analysis 7: For all the body styles involved in crashes, mention the top ethnic user group of each unique body style  

In [163]:
df_u = df_units\
        .select('CRASH_ID','UNIT_NBR','VEH_BODY_STYL_ID')\
        .filter(~df_units.VEH_BODY_STYL_ID.isin(['NA', 'UNKNOWN', 'NOT REPORTED', 'OTHER  (EXPLAIN IN NARRATIVE)']))
df_pp = df_primary_person\
        .select('CRASH_ID','UNIT_NBR','PRSN_ETHNICITY_ID')\
        .filter(~df_primary_person.PRSN_ETHNICITY_ID.isin(['NA', 'UNKNOWN']))

In [164]:
windoSpec = Window.partitionBy('VEH_BODY_STYL_ID').orderBy(col('count').desc())
df_7 = df_u.join(df_pp, on=['CRASH_ID','UNIT_NBR'], how='inner')\
            .groupby('VEH_BODY_STYL_ID', 'PRSN_ETHNICITY_ID')\
            .count()\
            .withColumn('ROW_NO', row_number().over(windoSpec))\
            .filter(col('ROW_NO') == 1)\
            .drop('ROW_NO', 'count')

In [165]:
df_7.show()

[Stage 296:>                                                        (0 + 1) / 1]

+--------------------+-----------------+
|    VEH_BODY_STYL_ID|PRSN_ETHNICITY_ID|
+--------------------+-----------------+
|           AMBULANCE|            WHITE|
|                 BUS|         HISPANIC|
|      FARM EQUIPMENT|            WHITE|
|          FIRE TRUCK|            WHITE|
|          MOTORCYCLE|            WHITE|
|NEV-NEIGHBORHOOD ...|            WHITE|
|PASSENGER CAR, 2-...|            WHITE|
|PASSENGER CAR, 4-...|            WHITE|
|              PICKUP|            WHITE|
|    POLICE CAR/TRUCK|            WHITE|
|   POLICE MOTORCYCLE|            WHITE|
|SPORT UTILITY VEH...|            WHITE|
|               TRUCK|            WHITE|
|       TRUCK TRACTOR|            WHITE|
|                 VAN|            WHITE|
|   YELLOW SCHOOL BUS|            BLACK|
+--------------------+-----------------+



                                                                                

8.	Analysis 8: Among the crashed cars, what are the Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash (Use Driver Zip Code)

In [166]:
df_8 = df_units.join(df_primary_person, on=['CRASH_ID','UNIT_NBR'], how='inner')\
            .dropna(subset=['DRVR_ZIP'])\
            .filter(
                col('CONTRIB_FACTR_1_ID').contains('ALCOHOL')
                | col('CONTRIB_FACTR_2_ID').contains('ALCOHOL')
            )\
            .groupby('DRVR_ZIP')\
            .count()\
            .orderBy(col('count').desc())

In [167]:
df_8.limit(5).show()

+--------+-----+
|DRVR_ZIP|count|
+--------+-----+
|   76010|   52|
|   78521|   40|
|   75067|   37|
|   78130|   32|
|   78542|   31|
+--------+-----+



9.	Analysis 9: Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level (VEH_DMAG_SCL~) is above 4 and car avails Insurance

In [168]:
df_u = df_units.select('CRASH_ID','FIN_RESP_TYPE_ID','VEH_DMAG_SCL_1_ID').filter(
                ((df_units.VEH_DMAG_SCL_1_ID > 'DAMAGED 4')
                & (~df_units.VEH_DMAG_SCL_1_ID.isin(['NA', 'NO DAMAGE', 'INVALID VALUE']))
                & (df_units.FIN_RESP_TYPE_ID == 'PROOF OF LIABILITY INSURANCE')
                ))
df_d = df_damages.select('CRASH_ID','DAMAGED_PROPERTY').filter(df_damages.DAMAGED_PROPERTY == 'NONE')

In [169]:
df_9 = df_d.join(df_u, on=['CRASH_ID'], how='inner')
df_9.count()

7

10.	Analysis 10: Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences, has licensed Drivers, used top 10 used vehicle colours and has car licensed with the Top 25 states with highest number of offences (to be deduced from the data)

In [170]:
df_state_top_25 = df_units.filter(col('VEH_LIC_STATE_ID').cast('int').isNull())\
            .groupby('VEH_LIC_STATE_ID')\
            .count()\
            .orderBy(col('count').desc())\
            .limit(25)\
            .select('VEH_LIC_STATE_ID')

df_colors_top_10 = df_units.filter(df_units.VEH_COLOR_ID != "NA")\
            .groupby('VEH_COLOR_ID')\
            .count()\
            .orderBy(col('count').desc())\
            .limit(10)\
            .select('VEH_COLOR_ID')

In [171]:
df_state_top_25.show()

+----------------+
|VEH_LIC_STATE_ID|
+----------------+
|              TX|
|              NA|
|              UN|
|              OK|
|              LA|
|              NM|
|              IN|
|              MX|
|              CA|
|              FL|
|              IL|
|              AR|
|              TN|
|              MS|
|              AZ|
|              KS|
|              MO|
|              GA|
|              CO|
|              NC|
+----------------+
only showing top 20 rows



In [172]:
df_colors_top_10.show()

+------------+
|VEH_COLOR_ID|
+------------+
|         WHI|
|         BLK|
|         SIL|
|         GRY|
|         BLU|
|         RED|
|         GRN|
|         MAR|
|         TAN|
|         GLD|
+------------+



In [173]:
df_10 = df_charges.join(df_primary_person, on=['CRASH_ID'], how='inner')\
            .join(df_units, on=['CRASH_ID'], how='inner')\
            .filter(df_charges.CHARGE.contains('SPEED'))\
            .filter(
                df_primary_person.DRVR_LIC_TYPE_ID.isin(
                    ['DRIVER LICENSE', 'COMMERCIAL DRIVER LIC.']
                )
            )\
            .filter(df_units.VEH_COLOR_ID.isin(df_colors_top_10[0]))\
            .filter(df_units.VEH_LIC_STATE_ID.isin(df_state_top_25[0]))\
            .groupby('VEH_MAKE_ID')\
            .count()\
            .orderBy(col('count').desc())\
            .limit(5)

In [None]:
df_10.show()

[Stage 318:>                                                        (0 + 1) / 1]

+-----------+-----+
|VEH_MAKE_ID|count|
+-----------+-----+
|       FORD|20396|
|  CHEVROLET|18228|
|     TOYOTA|12524|
|      DODGE| 8474|
|     NISSAN| 7720|
+-----------+-----+



                                                                                

24/12/01 01:54:03 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 945500 ms exceeds timeout 120000 ms
24/12/01 01:54:04 WARN SparkContext: Killing executors is not supported by current scheduler.
24/12/01 01:54:08 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$