In [0]:
from pyspark.sql import SparkSession
# Create the Spark session for this notebook, or reuse one that already exists
# (getOrCreate returns the active session when there is one).
spark = (
    SparkSession.builder
    .appName('analytics')
    .getOrCreate()
)

In [0]:
from pyspark.sql.functions import col,when,sum,count,row_number,dense_rank,countDistinct
from pyspark.sql.window import Window

In [0]:
class Analytics:
    """Run ten crash analyses over pre-loaded Spark DataFrames and persist each result.

    Every ``analytics_N`` method builds its result, writes it through
    :meth:`save` under the name ``analysis_N`` and returns the result DataFrame.

    NOTE: ``sum`` and ``count`` used inside this class are the
    ``pyspark.sql.functions`` aggregates imported at file level (they shadow
    the Python builtins), not the builtins.
    """

    def __init__(self, spark, config, data_frames):
        """Store the session, the configuration and the input DataFrames.

        Args:
            spark: Spark session used to build the single-value result frames.
            config: Mapping; ``config['output']['result_path']`` is read by
                :meth:`save`.
            data_frames: Mapping that must contain the keys
                ``Primary_Person_use``, ``Charges_use``, ``Units_use``,
                ``Restrict_use``, ``Endorse_use`` and ``Damages_use``.

        Raises:
            KeyError: If one of the expected keys is missing from
                ``data_frames``.
        """
        self.spark = spark
        self.config = config
        self.data_frames = data_frames
        self.primary_people_df = data_frames['Primary_Person_use']
        self.charges_df = data_frames['Charges_use']
        self.units_df = data_frames['Units_use']
        self.restrict_df = data_frames['Restrict_use']
        # The misspelled attribute name is kept as an alias so that existing
        # callers relying on it keep working.
        self.retrict_df = self.restrict_df
        self.endorse_df = data_frames['Endorse_use']
        self.damage_df = data_frames['Damages_use']

    @staticmethod
    def _contributing_factor_like(keyword):
        """Return a condition matching ``keyword`` in any of the three contributing-factor columns."""
        pattern = f'%{keyword}%'
        return (
            col('CONTRIB_FACTR_1_ID').like(pattern)
            | col('CONTRIB_FACTR_2_ID').like(pattern)
            | col('CONTRIB_FACTR_P1_ID').like(pattern)
        )

    def _save_single_value(self, value, column_name, file_name):
        """Wrap a scalar in a one-row, one-column DataFrame, save it and return it."""
        result = self.spark.createDataFrame([(value,)], [column_name])
        self.save(result, file_name)
        return result

    def analytics_1(self):
        """Find the crashes in which more than two males were killed.

        Sums ``DEATH_CNT`` per ``CRASH_ID`` over rows whose ``PRSN_GNDR_ID``
        is ``'MALE'`` and keeps the crashes whose total exceeds 2.

        Returns:
            DataFrame with ``CRASH_ID`` and ``total_death``, one row per
            qualifying crash (the number of crashes is its row count).
        """
        male_deaths = sum(
            when(col('PRSN_GNDR_ID') == 'MALE', col('DEATH_CNT')).otherwise(0)
        )
        analytics_1_df = (
            self.primary_people_df
            .groupBy('CRASH_ID')
            .agg(male_deaths.alias('total_death'))
            .filter(col('total_death') > 2)
        )
        self.save(analytics_1_df, 'analysis_1')
        return analytics_1_df

    def analytics_2(self):
        """Count the two wheelers booked for crashes.

        Counts the distinct ``VIN`` values among units whose
        ``VEH_BODY_STYL_ID`` equals ``'MOTORCYCLE'``.

        Returns:
            One-row DataFrame with the column ``crashed_2_wheeler``.
        """
        two_wheeler_count = (
            self.units_df
            .filter(col('VEH_BODY_STYL_ID') == 'MOTORCYCLE')
            .select('VIN')
            .distinct()
            .count()
        )
        return self._save_single_value(
            two_wheeler_count, 'crashed_2_wheeler', 'analysis_2'
        )

    def analytics_3(self):
        """Determine the top 5 vehicle makes of cars in crashes where the driver died and the airbag did not deploy.

        Returns:
            DataFrame with ``VEH_MAKE_ID`` and ``total_death``, at most 5 rows,
            ordered by ``total_death`` descending.
        """
        driver_airbag = (
            self.primary_people_df
            .filter(
                (col('PRSN_TYPE_ID') == 'DRIVER')
                & (col('PRSN_AIRBAG_ID') == 'NOT DEPLOYED')
                & (col('PRSN_INJRY_SEV_ID') == 'KILLED')
            )
            .select('CRASH_ID', 'PRSN_TYPE_ID', 'PRSN_INJRY_SEV_ID',
                    'PRSN_AIRBAG_ID', 'DEATH_CNT')
        )
        body_style = (
            self.units_df
            .select('CRASH_ID', 'UNIT_DESC_ID', 'VEH_BODY_STYL_ID',
                    'VEH_MAKE_ID', 'VEH_MOD_ID')
            .filter(col('VEH_BODY_STYL_ID').like('%CAR%'))
        )
        # NOTE(review): the join is on CRASH_ID only, so a killed driver is
        # paired with every car unit of the same crash, not only the driver's
        # own vehicle - confirm whether a unit-level key should be used.
        cars_joined = driver_airbag.join(body_style, on='CRASH_ID', how='inner')

        analytics_3_df = (
            cars_joined
            .groupBy('VEH_MAKE_ID')
            .agg(sum('DEATH_CNT').alias('total_death'))
            .orderBy(col('total_death'), ascending=False)
            .limit(5)
        )
        self.save(analytics_3_df, 'analysis_3')
        return analytics_3_df

    def analytics_4(self):
        """Count vehicles involved in hit and run whose crash has a driver with a valid licence.

        A licence is treated as valid when ``DRVR_LIC_TYPE_ID`` contains
        ``'DRIVER LIC'``; hit-and-run units are those with
        ``VEH_HNR_FL == 'Y'``.

        Returns:
            One-row DataFrame with the column ``vechicle_HNR_valid_licence``
            holding the number of distinct ``VIN`` values.
        """
        # Filter before projecting so the predicate never depends on a column
        # (PRSN_TYPE_ID) that has already been dropped by the select.
        valid_license = (
            self.primary_people_df
            .filter(
                (col('PRSN_TYPE_ID') == 'DRIVER')
                & (col('DRVR_LIC_TYPE_ID').like('%DRIVER LIC%'))
            )
            .select('CRASH_ID', 'DRVR_LIC_TYPE_ID')
            .distinct()
        )
        hit_run = (
            self.units_df
            .filter(col('VEH_HNR_FL') == 'Y')
            .select('CRASH_ID', 'VIN', 'VEH_HNR_FL')
        )
        vehicle_count = (
            hit_run
            .join(valid_license, on='CRASH_ID', how='inner')
            .select('VIN')
            .distinct()
            .count()
        )
        # The column name (including its spelling) is part of the saved output
        # and is kept unchanged.
        return self._save_single_value(
            vehicle_count, 'vechicle_HNR_valid_licence', 'analysis_4'
        )

    def analytics_5(self):
        """Find the state with the highest number of accidents in which no female is involved.

        The person table can hold several rows per crash, so every crash that
        has at least one ``'FEMALE'`` row is removed entirely before counting.
        The state is taken from ``DRVR_LIC_STATE_ID``.

        Returns:
            DataFrame with a single ``DRVR_LIC_STATE_ID`` row (empty when no
            crash qualifies).
        """
        female_crashes = (
            self.primary_people_df
            .filter(col('PRSN_GNDR_ID') == 'FEMALE')
            .select('CRASH_ID')
            .distinct()
        )
        # left_anti keeps only the rows whose CRASH_ID has no female match.
        without_females = self.primary_people_df.join(
            female_crashes, on='CRASH_ID', how='left_anti'
        )
        analytics_5_df = (
            without_females
            .groupBy('DRVR_LIC_STATE_ID')
            .agg(countDistinct('CRASH_ID').alias('total_accidents'))
            .orderBy('total_accidents', ascending=False)
            .limit(1)
            .select('DRVR_LIC_STATE_ID')
        )
        self.save(analytics_5_df, 'analysis_5')
        return analytics_5_df

    def analytics_6(self):
        """Find the 3rd to 5th ``VEH_MAKE_ID`` by total injuries including deaths.

        Per unit the total is ``TOT_INJRY_CNT + DEATH_CNT``; totals are summed
        per make and ranked with ``row_number`` in descending order.

        Returns:
            DataFrame with ``VEH_MAKE_ID``, ``crash_sum`` and ``rnk`` for the
            ranks 3, 4 and 5.
        """
        # No partitionBy: the ranking is global across all makes.
        ranking = Window.orderBy(col('crash_sum').desc())
        per_unit_total = self.units_df.withColumn(
            'total_count', col('TOT_INJRY_CNT') + col('DEATH_CNT')
        )
        per_make_total = (
            per_unit_total
            .groupBy('VEH_MAKE_ID')
            .agg(sum('total_count').alias('crash_sum'))
        )
        ranked = per_make_total.withColumn('rnk', row_number().over(ranking))
        analytics_6_df = ranked.filter((col('rnk') >= 3) & (col('rnk') <= 5))
        self.save(analytics_6_df, 'analysis_6')
        return analytics_6_df

    def analytics_7(self):
        """Report the top ethnic user group for each body style involved in crashes.

        Units and persons are joined on ``CRASH_ID``; the ethnicity with the
        highest row count per ``VEH_BODY_STYL_ID`` is kept.

        Returns:
            DataFrame with ``VEH_BODY_STYL_ID`` and ``PRSN_ETHNICITY_ID``, one
            row per body style.
        """
        body_styles = self.units_df.select('CRASH_ID', 'VEH_BODY_STYL_ID')
        ethnicities = self.primary_people_df.select('CRASH_ID', 'PRSN_ETHNICITY_ID')
        joined = body_styles.join(ethnicities, on='CRASH_ID', how='inner')

        counts = (
            joined
            .groupBy('VEH_BODY_STYL_ID', 'PRSN_ETHNICITY_ID')
            .agg(count('*').alias('total_count'))
        )
        per_style = (
            Window.partitionBy('VEH_BODY_STYL_ID')
            .orderBy(col('total_count').desc())
        )
        # row_number gives exactly one winner per body style, even on ties.
        ranked = counts.withColumn('rank', row_number().over(per_style))
        analytics_7_df = (
            ranked
            .filter(col('rank') == 1)
            .select('VEH_BODY_STYL_ID', 'PRSN_ETHNICITY_ID')
        )
        self.save(analytics_7_df, 'analysis_7')
        return analytics_7_df

    def analytics_8(self):
        """Find the top 5 driver zip codes by number of car crashes with alcohol as a contributing factor.

        A unit qualifies when any contributing-factor column contains
        ``'ALCOHOL'`` and ``VEH_BODY_STYL_ID`` contains ``'CAR'``. Rows without
        a ``DRVR_ZIP`` are ignored.

        Returns:
            DataFrame with ``DRVR_ZIP`` and ``crash_count``, at most 5 rows,
            ordered by ``crash_count`` descending.
        """
        alcohol_crashes = (
            self.units_df
            .filter(
                self._contributing_factor_like('ALCOHOL')
                & (col('VEH_BODY_STYL_ID').like('%CAR%'))
            )
            .select('CRASH_ID')
            .distinct()
        )
        driver_zip = self.primary_people_df.select('CRASH_ID', 'DRVR_ZIP').distinct()
        analytics_8_df = (
            alcohol_crashes
            .join(driver_zip, on='CRASH_ID', how='inner')
            # Drop missing zip codes before ranking so they cannot take a slot.
            .filter(col('DRVR_ZIP').isNotNull())
            .groupBy('DRVR_ZIP')
            .agg(countDistinct('CRASH_ID').alias('crash_count'))
            .orderBy(col('crash_count').desc())
            .limit(5)
        )
        self.save(analytics_8_df, 'analysis_8')
        return analytics_8_df

    def analytics_9(self):
        """Count distinct crash IDs with no damaged property, damage level above 4 and insurance.

        Damage rows qualify when ``DAMAGED_PROPERTY`` contains
        ``'NO DAMAGE'``; units qualify when ``VEH_DMAG_SCL_1_ID`` is
        ``'DAMAGED 5'`` or ``'DAMAGED 6'`` and ``FIN_RESP_TYPE_ID`` contains
        ``'INSURANCE'``.

        Returns:
            One-row DataFrame with the column ``crashID_count``.
        """
        no_damage = (
            self.damage_df
            .filter(col('DAMAGED_PROPERTY').like('%NO DAMAGE%'))
            .select('CRASH_ID')
        )
        # NOTE(review): only the two listed damage levels are matched - confirm
        # that the data holds no other level above 4.
        insured_damaged_units = (
            self.units_df
            .select('CRASH_ID', 'FIN_RESP_TYPE_ID', 'VEH_DMAG_SCL_1_ID')
            .filter(
                col('VEH_DMAG_SCL_1_ID').isin('DAMAGED 6', 'DAMAGED 5')
                & (col('FIN_RESP_TYPE_ID').like('%INSURANCE%'))
            )
        )
        crash_count = (
            insured_damaged_units
            .join(no_damage, on='CRASH_ID', how='left_semi')
            .select('CRASH_ID')
            .distinct()
            .count()
        )
        return self._save_single_value(crash_count, 'crashID_count', 'analysis_9')

    def analytics_10(self):
        """Determine the top 5 vehicle makes for speeding crashes with licensed drivers, top colours and top states.

        Steps:
            1. Top 25 ``DRVR_LIC_STATE_ID`` values by distinct crash count,
               ignoring the placeholder values ``NA``, ``Unknown``, ``Other``.
            2. Drivers whose licence type contains ``'DRIVER LIC'`` and whose
               licence state is in that list.
            3. Top 10 ``VEH_COLOR_ID`` values by unit count.
            4. Units with a ``'SPEED'`` contributing factor and a top colour,
               joined to the drivers on ``CRASH_ID`` and counted per make.

        Returns:
            DataFrame with a single ``VEH_MAKE_ID`` column, at most 5 rows.
        """
        # Exclude the placeholder states BEFORE ranking; filtering after
        # limit(25) would leave fewer than 25 real states.
        top_state_df = (
            self.primary_people_df
            .filter(~col('DRVR_LIC_STATE_ID').isin('NA', 'Unknown', 'Other'))
            .groupBy('DRVR_LIC_STATE_ID')
            .agg(countDistinct('CRASH_ID').alias('total_accidents'))
            .orderBy('total_accidents', ascending=False)
            .limit(25)
            .select('DRVR_LIC_STATE_ID')
        )
        top_states = [row['DRVR_LIC_STATE_ID'] for row in top_state_df.collect()]

        # Filter before projecting so PRSN_TYPE_ID is still available.
        licensed_drivers = (
            self.primary_people_df
            .filter(
                (col('PRSN_TYPE_ID') == 'DRIVER')
                & (col('DRVR_LIC_TYPE_ID').like('%DRIVER LIC%'))
                & (col('DRVR_LIC_STATE_ID').isin(top_states))
            )
            .select('CRASH_ID', 'DRVR_LIC_TYPE_ID', 'DRVR_LIC_STATE_ID')
            .distinct()
        )

        top_color_df = (
            self.units_df
            .groupBy('VEH_COLOR_ID')
            .agg(count('*').alias('color_count'))
            .orderBy(col('color_count').desc())
            .limit(10)
            .select('VEH_COLOR_ID')
        )
        top_colors = [row['VEH_COLOR_ID'] for row in top_color_df.collect()]

        speeding_units = (
            self.units_df
            .filter(
                self._contributing_factor_like('SPEED')
                & (col('VEH_COLOR_ID').isin(top_colors))
            )
            .select('CRASH_ID', 'CONTRIB_FACTR_1_ID', 'VEH_COLOR_ID', 'VEH_MAKE_ID')
        )

        analytics_10_df = (
            speeding_units
            .join(licensed_drivers, on='CRASH_ID')
            .groupBy('VEH_MAKE_ID')
            .agg(count('*').alias('cnt'))
            .orderBy(col('cnt').desc())
            .limit(5)
            .select('VEH_MAKE_ID')
        )
        self.save(analytics_10_df, 'analysis_10')
        return analytics_10_df

    def save(self, result, file_name):
        """Write ``result`` as CSV with a header, overwriting any previous output.

        Args:
            result: DataFrame to persist.
            file_name: Base name; the target is
                ``<config['output']['result_path']>/<file_name>.txt``.
        """
        # Imported locally because the file's import cells do not import os.
        import os

        result_path = os.path.join(
            self.config['output']['result_path'], f"{file_name}.txt"
        )
        result.write.mode('overwrite').option('header', True).csv(result_path)

    def run_all_analytics(self):
        """Run analyses 1 to 10 in order; each one saves its own result."""
        self.analytics_1()
        self.analytics_2()
        self.analytics_3()
        self.analytics_4()
        self.analytics_5()
        self.analytics_6()
        self.analytics_7()
        self.analytics_8()
        self.analytics_9()
        self.analytics_10()

