In [3]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import pyspark.sql.functions as F
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import filepath

In [4]:
def prepareDF(fileformat,fpath):
    df=spark.read.format(fileformat)\
            .option("header", True)\
            .option("sep",",") \
            .load(fpath)
    return df

In [5]:
chargesdf=prepareDF(filepath.fformat,filepath.chargespath)
damagesdf=prepareDF(filepath.fformat,filepath.damagespath)
endorsedf=prepareDF(filepath.fformat,filepath.endorsepath)
primarypersondf=prepareDF(filepath.fformat,filepath.primarypersonpath)
restrictdf=prepareDF(filepath.fformat,filepath.restrictpath)
unitsdf=prepareDF(filepath.fformat,filepath.unitspath)

AttributeError: 'module' object has no attribute 'fformat'

In [4]:
##Analytics 1: Find the number of crashes (accidents) in which number of persons killed are male?

a1=primarypersondf.filter((primarypersondf.PRSN_GNDR_ID =='MALE') 
                          & (primarypersondf.PRSN_INJRY_SEV_ID =='KILLED')).select('CRASH_ID').distinct()
print('Number of Crashes in which persons killed are male is ',a1.count())

('Number of Crashes in which persons killed are male is ', 180)


In [5]:
##Analysis 2: How many two wheelers are booked for crashes?

interdf=unitsdf.filter((unitsdf.VEH_BODY_STYL_ID =='MOTORCYCLE') | (unitsdf.VEH_BODY_STYL_ID == 'POLICE MOTORCYCLE'))
a2=chargesdf.join(interdf,['CRASH_ID'],'inner').select('CRASH_ID').distinct()
print('Number of Two Wheelers booked for crashes are ',a2.count())

('Number of Two Wheelers booked for crashes are ', 757)


In [6]:
##Analysis 3: Which state has highest number of accidents in which females are involved? 

femaledf=primarypersondf.filter((primarypersondf.PRSN_GNDR_ID =='FEMALE'))
a3=femaledf.groupBy('DRVR_LIC_STATE_ID').count().sort(col("count").desc()).select('DRVR_LIC_STATE_ID').limit(1)
print('The state which has highest number of accidents in which females involved is ')
a3.show()

The state which has highest number of accidents in which females involved is 
+-----------------+
|DRVR_LIC_STATE_ID|
+-----------------+
|            Texas|
+-----------------+



In [7]:
##Analysis 4: Which are the Top 5th to 15th VEH_MAKE_IDs that contribute to a 
##largest number of injuries including death

crashdf=primarypersondf.filter(col('PRSN_INJRY_SEV_ID')
                               .isin(['NA','UNKNOWN','NOT INJURED']) == False).select('CRASH_ID').distinct()
joineddf=crashdf.join(unitsdf,['CRASH_ID'],'left').drop_duplicates(subset = ['CRASH_ID'])

df=joineddf.groupBy('VEH_MAKE_ID').count().sort(col("count").desc())
def getrows(df, rownums=None):
    return df.rdd.zipWithIndex().filter(lambda x: x[1] in rownums).map(lambda x: x[0])

a=getrows(df, rownums=[i for i in range(4,15)])
schema = StructType([StructField("VEH_MAKE_ID", StringType()),
                     StructField("count", IntegerType())])
df2 = sqlContext.createDataFrame(a, schema) 
a4=df2.selectExpr("VEH_MAKE_ID as Top_5_to_15Th")
print('The top 5th to 15th VEH_MAKE_IDs that contribute to a largest number of injuries including death are')
a4.show(15)

The top 5th to 15th VEH_MAKE_IDs that contribute to a largest number of injuries including death are
+-------------+
|Top_5_to_15Th|
+-------------+
|        HONDA|
|       NISSAN|
|          GMC|
|         JEEP|
|      HYUNDAI|
|     CHRYSLER|
|          KIA|
|        MAZDA|
|      PONTIAC|
|        LEXUS|
|        BUICK|
+-------------+



In [11]:
##Analysis 5: For all the body styles involved in crashes, mention the top ethnic user group of each unique body styleÂ 

joindf=unitsdf.join(primarypersondf,['CRASH_ID'],'left')
testdf=joindf.filter((col('PRSN_ETHNICITY_ID')
                               .isin(['NA','UNKNOWN']) == False) & (col('VEH_BODY_STYL_ID')
                               .isin(['NOT REPORTED','NA']) == False))\
                               .groupBy('VEH_BODY_STYL_ID','PRSN_ETHNICITY_ID').count()

ranked =testdf.withColumn("rank", dense_rank().over(Window.partitionBy("VEH_BODY_STYL_ID").orderBy(desc("count"))))
a5=ranked.filter(col('rank').isin([1]) == True).select('VEH_BODY_STYL_ID','PRSN_ETHNICITY_ID')
print('The top ethnic user group of each unique body style are:')
a5.show(truncate=False)

The top ethnic user group of each unique body style are:
+---------------------------------+-----------------+
|VEH_BODY_STYL_ID                 |PRSN_ETHNICITY_ID|
+---------------------------------+-----------------+
|BUS                              |HISPANIC         |
|VAN                              |WHITE            |
|PICKUP                           |WHITE            |
|SPORT UTILITY VEHICLE            |WHITE            |
|PASSENGER CAR, 4-DOOR            |WHITE            |
|FIRE TRUCK                       |WHITE            |
|TRUCK                            |WHITE            |
|UNKNOWN                          |WHITE            |
|AMBULANCE                        |WHITE            |
|POLICE CAR/TRUCK                 |WHITE            |
|MOTORCYCLE                       |WHITE            |
|YELLOW SCHOOL BUS                |WHITE            |
|POLICE MOTORCYCLE                |HISPANIC         |
|PASSENGER CAR, 2-DOOR            |WHITE            |
|TRUCK TRACTOR           

In [12]:
#Analysis 6: Among the crashed cars, what are the Top 5 Zip Codes 
#with highest number crashes with alcohols as the contributing factor to a crash (Use Driver Zip Code)

a6=primarypersondf.filter((col('PRSN_ALC_RSLT_ID').isin(['Positive']) == True) & (col('DRVR_ZIP')
                               .isin(['null']) == False)).groupBy('DRVR_ZIP').count().sort(col("count").desc())\
                               .select('DRVR_ZIP')
print('Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash are')
a6.show(5)

Top 5 Zip Codes with highest number crashes with alcohols as the contributing factor to a crash are
+--------+
|DRVR_ZIP|
+--------+
|   78521|
|   76010|
|   79936|
|   79938|
|   79907|
+--------+
only showing top 5 rows



In [13]:
#Analysis 7: Count of Distinct Crash IDs where No Damaged Property was observed and Damage Level 
#(VEH_DMAG_SCL~) is above 4 and car avails Insurance

filteredunitdf=unitsdf.filter((col('VEH_DMAG_SCL_1_ID').isin(['DAMAGED 5','DAMAGED 6','DAMAGED 7 HIGHEST']) == True) & 
              (col('FIN_RESP_TYPE_ID').isin(['NA']) == False))
a7=filteredunitdf.join(damagesdf, ["CRASH_ID"], "leftanti").select('CRASH_ID').distinct()
print('Count of Distinct Crash IDs where No Damaged Property was observed and Damage\
      Level is above 4 and car avails Insurance is',a7.count())

('Count of Distinct Crash IDs where No Damaged Property was observed and Damage       Level is above 4 and car avails Insurance is', 8415)


In [16]:
#Analysis 8: Determine the Top 5 Vehicle Makes where drivers are charged with speeding related offences, 
#has licensed Drivers, uses top 10 used vehicle colours and has car licensed with the Top 25 states with 
#highest number of offences (to be deduced from the data)

df1=chargesdf.filter(chargesdf["CHARGE"].contains('SPEED'))
df2=primarypersondf.filter((col('DRVR_LIC_TYPE_ID').isin(['COMMERCIAL DRIVER LIC.','ID CARD','OCCUPATIONAL',
                                                          'DRIVER LICENSE'])) == True)
df3=unitsdf.groupBy('VEH_COLOR_ID').count().sort(col("count").desc())\
            .filter(col('VEH_COLOR_ID').isin(['NA']) ==False).select('VEH_COLOR_ID').limit(10)
df4=primarypersondf.groupBy('DRVR_LIC_STATE_ID').count().sort(col("count")\
                    .desc()).filter(col('DRVR_LIC_STATE_ID').isin(['NA','Other','Unknown']) ==False).limit(25)

joindata=df1.join(unitsdf,['CRASH_ID'],'left')
joindata=joindata.join(df2,['CRASH_ID'],'left')

joindata=joindata.na.drop(subset = ['DRVR_LIC_TYPE_ID','CHARGE'])

newdf=df3.join(joindata,['VEH_COLOR_ID'],'left')
newdf=df4.join(newdf,['DRVR_LIC_STATE_ID'],'left')
a8=newdf.groupBy('VEH_MAKE_ID').count().sort(col("count").desc()).select('VEH_MAKE_ID')
print('Top 5 Vehicle Makes where drivers are charged with speeding related offences,has licensed Drivers,\
      uses top 10 used vehicle colours and has car licensed with the Top 25 states with highest number\
      of offences are')
a8.show(5)

Top 5 Vehicle Makes where drivers are charged with speeding related offences,has licensed Drivers,      uses top 10 used vehicle colours and has car licensed with the Top 25 states with highest number      of offences are
+-----------+
|VEH_MAKE_ID|
+-----------+
|       FORD|
|  CHEVROLET|
|     TOYOTA|
|      DODGE|
|     NISSAN|
+-----------+
only showing top 5 rows

