In [None]:
import yaml


def read_yaml(file_path):
    '''
        Reads data from config yaml file

        Parameters : 
            file_path : File path to yaml config file

        Returns : 
            Parsed content of the yaml file (a dict for a mapping-style config),
            or None when the file cannot be opened or parsed. In that case the
            error is printed instead of being raised, so callers should check
            for None before indexing into the result.

    '''
    try : 
        # Pin the encoding: without it open() falls back to the platform locale,
        # which can mis-decode non-ASCII characters in a UTF-8 config file.
        with open(file_path, 'r', encoding='utf-8') as file:
            return yaml.safe_load(file)

    except Exception as e : 
        print(f'Error occurred while loading data from yaml file with following exception {e}')
        # Best-effort contract kept from the original: report the problem and hand back None.
        return None


def read_csv_data(spark, file_path):
    '''
        Loads a csv file into a Spark dataframe

        Parameters : 
            spark : Spark instance used to create the reader
            file_path : Location of the csv data to load

        Returns : 
            Dataframe : Spark Dataframe built from the csv file, read with the
            'header' and 'inferSchema' options switched on.
            None is returned (after printing the error) when reading fails.
    '''
    try : 
        reader = spark.read
        reader = reader.option('header', 'True')
        reader = reader.option('inferSchema', 'true')
        frame = reader.csv(file_path)

    except Exception as e : 
        print(f'Error occurred while reading data from file with following exception {e}')

    else : 
        return frame



def write_csv_data(df, file_path):
    '''
        Write output dataframe to csv file

        Parameters : 
            df : Dataframe containing the results
            file_path : Path to file in which data needs to be stored

        Returns : 
            None. A failure while writing is printed, not raised.
    '''
    try : 
        # coalesce() is a DataFrame method, not a DataFrameWriter method, so it
        # has to be applied before .write. Calling it on the writer raised an
        # AttributeError that the except below swallowed, so nothing was written.
        df.coalesce(1).write.format('csv').mode('overwrite').option('header', 'true').save(file_path)
    
    except Exception as e : 
        print(f'Error occurred while writing df data to csv file with following exception {e}')




In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, row_number
from utilities import read_yaml , read_csv_data  , write_csv_data 


In [2]:
from pyspark.sql.functions import lit , col , dense_rank , sum , count


In [3]:
# Build the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('CarCrashAnalysis')
    .getOrCreate()
)

24/01/07 23:12:25 WARN Utils: Your hostname, Sigmoids-MacBook-Air-4.local resolves to a loopback address: 127.0.0.1; using 192.168.29.131 instead (on interface en0)
24/01/07 23:12:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/07 23:12:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Relative path of the yaml config that the following cells pass to read_yaml.
config_file_path = '../config.yaml'
# Disabled: spark.sparkContext.setLogLevel("ERROR")



In [12]:
# 'input_source' section of the config: the input type plus one csv path per dataset name.
# NOTE(review): read_yaml returns None on failure, so this subscript would raise TypeError in that case.
input_source_paths = read_yaml(config_file_path)['input_source']


In [13]:
input_source_paths

{'type': 'csv',
 'Charges': '../Data/Charges_use.csv',
 'Damages': '../Data/Damages_use.csv',
 'Endorse': '../Data/Endorse_use.csv',
 'Primary_Person': '../Data/Primary_Person_use.csv',
 'Restrict': '../Data/Restrict_use.csv',
 'Unit': '../Data/Units_use.csv'}

In [6]:
# Output destination configured for 'analysis1'; the config file is parsed a second time here.
output_file_paths = read_yaml(config_file_path)['output_destination']['analysis1']

In [7]:
def _load_input(dataset_key):
    """Read the csv registered under `dataset_key` in input_source_paths."""
    return read_csv_data(spark, input_source_paths[dataset_key])


# One dataframe per configured input dataset, loaded in the same order as before.
charges_df = _load_input('Charges')
damages_df = _load_input('Damages')
endorse_df = _load_input('Endorse')
primary_person_df = _load_input('Primary_Person')
units_df = _load_input('Unit')
restrict_df = _load_input('Restrict')

                                                                                

In [None]:
charges_df.count()

In [None]:
# Charges whose CHARGE text contains 'SPEED', reduced to the unit keys plus the charge itself.
charges_speed_filtered_df = (
    charges_df
    .where(col('CHARGE').like('%SPEED%'))
    .select('CRASH_ID', 'UNIT_NBR', 'CHARGE')
)

In [None]:
charges_speed_filtered_df

In [None]:
primary_person_licensed_df

In [None]:
primary_person_licensed_df = primary_person_df.filter(~col('DRVR_LIC_CLS_ID').like('UNLICENSED')).select('CRASH_ID','UNIT_NBR', 'DRVR_LIC_CLS_ID')

In [8]:
# Number of units per vehicle colour, skipping rows whose colour is the literal 'NA'.
units_top10_color_df = (
    units_df
    .where(col('VEH_COLOR_ID') != 'NA')
    .groupBy('VEH_COLOR_ID')
    .agg(count('*').alias('VEH_COLOR_COUNT'))
)

In [9]:
# Rank colours by descending count and keep every colour whose dense rank is 10 or lower.
spec_color = Window.orderBy(col('VEH_COLOR_COUNT').desc())
units_top10_color_df = (
    units_top10_color_df
    .withColumn('TOP10_VEH_COLOR', dense_rank().over(spec_color))
    .where(col('TOP10_VEH_COLOR') <= 10)
)

In [None]:
# Number of units per licensing state, skipping rows whose state is the literal 'NA'.
units_top25_state_df = (
    units_df
    .where(col('VEH_LIC_STATE_ID') != 'NA')
    .groupBy('VEH_LIC_STATE_ID')
    .agg(count('*').alias('VEH_STATE_COUNT'))
)

In [None]:
# Rank states by descending count and keep every state whose dense rank is 25 or lower.
spec_state = Window.orderBy(col('VEH_STATE_COUNT').desc())
units_top25_state_df = (
    units_top25_state_df
    .withColumn('TOP25_VEH_STATE', dense_rank().over(spec_state))
    .where(col('TOP25_VEH_STATE') <= 25)
)

In [None]:
# Pull the ranked colour and state ids (first column of each ranked frame) back to the driver.
top_color_ids = [row[0] for row in units_top10_color_df.collect()]
top_state_ids = [row[0] for row in units_top25_state_df.collect()]

# Units whose colour and licensing state both appear in those lists.
units_filtered_color_state_df = (
    units_df
    .filter(col('VEH_COLOR_ID').isin(top_color_ids))
    .filter(col('VEH_LIC_STATE_ID').isin(top_state_ids))
    .select('CRASH_ID', 'UNIT_NBR', 'VEH_MAKE_ID')
)

In [None]:
# Row counts of the three filtered frames, printed space-separated on one line.
speed_charge_rows = charges_speed_filtered_df.count()
licensed_person_rows = primary_person_licensed_df.count()
filtered_unit_rows = units_filtered_color_state_df.count()
print(speed_charge_rows, licensed_person_rows, filtered_unit_rows)

In [None]:
# Inner-join the filtered units with the speeding charges and the licensed persons on the unit keys.
unit_join_keys = ['CRASH_ID', 'UNIT_NBR']
final_units_charges_licensed_drivers_df = (
    units_filtered_color_state_df
    .join(charges_speed_filtered_df, on=unit_join_keys, how='inner')
    .join(primary_person_licensed_df, on=unit_join_keys, how='inner')
)

In [None]:
# Count the joined rows per vehicle make.
# NOTE(review): this rebinds the same name to the aggregated frame, so re-running
# this cell on its own would group the already-aggregated frame rather than the
# joined one. Consider a separate name for the aggregate.
final_units_charges_licensed_drivers_df = final_units_charges_licensed_drivers_df.groupBy('VEH_MAKE_ID').agg(count('*').alias('VEH_MAKE_ID_COUNT'))

In [None]:
# Window that orders vehicle makes from the highest count to the lowest.
make_count_desc = col('VEH_MAKE_ID_COUNT').desc()
spec_make_id = Window.orderBy(make_count_desc)


In [None]:
# Attach a dense rank over the make counts and keep ranks 1 to 5.
final_units_charges_licensed_drivers_df = (
    final_units_charges_licensed_drivers_df
    .withColumn('RANK', dense_rank().over(spec_make_id))
    .where(col('RANK') <= 5)
)

In [None]:
final_units_charges_licensed_drivers_df.show()

In [None]:
[i[0] for i in final_units_charges_licensed_drivers_df.collect()]

In [None]:
write_csv_data(final_units_charges_licensed_drivers_df, output_file_paths)


In [None]:
# Collects the frame to the driver and shows the first two fields of every row as tuples.
# NOTE(review): `top_ethinc_vehicle_body_wise` is not defined in any cell visible
# in this notebook. Unless it is created elsewhere, this cell raises NameError on
# a fresh kernel. Confirm, then define it above or remove the cell.
[(i[0] , i[1]) for i in top_ethinc_vehicle_body_wise.collect()]


In [None]:
class CarCrashAnalysis:
    """Loads the csv inputs listed in the config file and runs analyses on them."""

    def __init__(self, config_file_path, spark_session=None):
        """
        Reads every input csv listed under 'input_source' in the config
        :param config_file_path: path to the yaml config file
        :param spark_session: Spark session used to read the files; when omitted
            the module-level ``spark`` is used, which was the only option before
        """
        input_source_paths = read_yaml(config_file_path)['input_source']
        # Prefer the injected session; fall back to the global one so existing callers keep working.
        session = spark if spark_session is None else spark_session
        self.charges_df = read_csv_data(session, input_source_paths['Charges'])
        self.damages_df = read_csv_data(session, input_source_paths['Damages'])
        self.endorse_df = read_csv_data(session, input_source_paths['Endorse'])
        self.primary_person_df = read_csv_data(session, input_source_paths['Primary_Person'])
        self.units_df = read_csv_data(session, input_source_paths['Unit'])
        self.restrict_df = read_csv_data(session, input_source_paths['Restrict'])

    def count_male_accidents(self, output_destination_path):
        """
        Selects the primary-person rows whose PRSN_GNDR_ID is "MALE", writes them
        as csv and returns how many rows matched
        :param output_destination_path: path the filtered rows are written to
        :return: number of rows with PRSN_GNDR_ID == "MALE"

        NOTE(review): the stated goal is crashes in which the persons killed are
        male, but the filter only checks gender. It has no fatality condition and
        counts person rows, not distinct crashes. Confirm the intended logic.
        """
        df = self.primary_person_df.filter(self.primary_person_df.PRSN_GNDR_ID == "MALE")
        write_csv_data(df, output_destination_path)
        return df.count()

In [None]:
if __name__ == '__main__':
    # Start the Spark session for this run, or reuse one that already exists
    spark = (
        SparkSession.builder
        .appName('CarCrashAnalysis')
        .getOrCreate()
    )

    config_file_path = 'config.yaml'

    # Load all input frames, then look up where the first analysis writes its output
    usvaa = CarCrashAnalysis(config_file_path)
    output_destinations = read_yaml(config_file_path)['output_destination']
    output_file_paths = output_destinations['analysis1']

    # 1. Find the number of crashes (accidents) in which number of persons killed are male?
    analysis1_result = usvaa.count_male_accidents(output_file_paths)
    print("1. Result:", analysis1_result)

In [None]:
output_file_paths

In [None]:
input_source_paths = read_yaml(config_file_path)['input_source']

In [None]:
primary_person_df = read_csv_data(spark, input_source_paths['Primary_Person'])

In [None]:
# Bring the frame down to one partition, then write it as csv with a header row,
# replacing anything already stored at the output path.
primary_person_df = primary_person_df.repartition(1)
(
    primary_person_df.write
    .format('csv')
    .option('header', 'true')
    .mode('overwrite')
    .save(output_file_paths)
)