# 1. Working with RDD

##    1.1 Data Preparation and Loading

##### 1. Write the code to create a SparkContext object using SparkSession, which tells Spark how to access a cluster. To create a SparkSession you first need to build a SparkConf object that contains information about your application. Give an appropriate name for your application and run Spark locally with as many working processors as logical cores on your machine.

In [1]:
# Import SparkConf class into program
from pyspark import SparkConf

# local[*]: run Spark in local mode with as many working processors as logical cores on our machine
master = "local[*]"
# The `appName` field is a name to be shown on the Spark cluster UI page
app_name = "Assignment 1"
# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

# Import SparkContext and SparkSession classes
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL

# Using SparkSession
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

##### 2. Import all the “Units” csv files from 2015-2019 into a single RDD.

In [2]:
#Importing all the units csv files into a single rdd named units_rdd
units_rdd = sc.textFile('2015_DATA_SA_Units.csv,2016_DATA_SA_Units.csv,2017_DATA_SA_Units.csv,2018_DATA_SA_Units.csv,2019_DATA_SA_Units.csv')

##### 3. Import all the “Crashes” csv files from 2015-2019 into a single RDD.

In [3]:
#Importing all the crashes csv files into a single rdd named crashes_rdd
crashes_rdd = sc.textFile('2015_DATA_SA_Crash.csv,2016_DATA_SA_Crash.csv,2017_DATA_SA_Crash.csv,2018_DATA_SA_Crash.csv,2019_DATA_SA_Crash.csv')

##### 4. For each of the Units and Crashes RDDs, remove the header rows and display the total count and first 10 records.

In [4]:
# Strip the double quotes that wrap each field in units_rdd, so that values read as 052 rather than "052"
units_rdd = units_rdd.map(lambda x: x.replace('"',''))

In [5]:
# Every file starts with the same header line; take it from the first record
header = units_rdd.first()

# Keep every line that differs from the header, which removes the header row of each of the five files
units_rdd_1 = units_rdd.filter(lambda x: x != header)

#Display the count of the rdd
print("Total count of the units_rdd:", units_rdd_1.count())

Total count of the units_rdd: 153854
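Because `sc.textFile` concatenates all five files, the RDD contains one header line per file, not just one. Filtering out every line equal to the first header removes all of them, provided the five files share an identical header. A minimal plain-Python sketch of that logic, using made-up lines in place of the real files (`drop_headers` is a hypothetical helper):

```python
def drop_headers(lines):
    """Return the lines with every copy of the first line (the header) removed."""
    header = lines[0]
    return [line for line in lines if line != header]


# Two hypothetical files concatenated, each starting with the same header
combined = [
    "REPORT_ID,Unit No,Sex",
    "2015-1,01,Male",
    "REPORT_ID,Unit No,Sex",
    "2016-1,01,Female",
]
records = drop_headers(combined)
print(records)  # ['2015-1,01,Male', '2016-1,01,Female']
```

Removing only the first line of the RDD would leave the other four headers in the data.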


In [6]:
# show 10 records with the Spark *action* take
units_rdd_1.take(10)

['2015-1-21/08/2019,01,0,SA,RIGID TRUCK LGE GE 4.5T,1999,North East,Male,052,SA,HRR ,Full,Not Towing,Swerving,001,5109,,',
 '2015-1-21/08/2019,02,1,SA,Motor Cars - Sedan,2009,North East,Female,057,SA,C ,Full,Not Towing,Straight Ahead,002,5125,,',
 '2015-2-21/08/2019,01,0,SA,Motor Cars - Sedan,2009,South East,Male,020,SA,MR,Provisional 1 ,Not Towing,Straight Ahead,001,5110,,',
 '2015-2-21/08/2019,02,1,SA,Motor Cars - Sedan,1994,South East,Female,021,SA,C ,Full,Not Towing,Stopped on Carriageway,001,5096,,',
 '2015-3-21/08/2019,01,0,SA,Motor Cars - Sedan,2008,North East,Male,023,SA,C ,Full,Not Towing,Straight Ahead,001,5034,,',
 '2015-3-21/08/2019,02,1,SA,Motor Cars - Sedan,2007,North East,Female,025,SA,C ,Full,Not Towing,Stopped on Carriageway,001,5015,,',
 '2015-4-21/08/2019,01,0,SA,Station Wagon,1992,West,Male,059,SA,C R ,Full,Not Towing,Straight Ahead,001,5043,,',
 '2015-4-21/08/2019,02,0,SA,Motor Cars - Sedan,2009,West,Male,040,SA,C ,Full,Not Towing,Stopped on Carriageway,003,5044,,'

In [7]:
# Strip the double quotes that wrap each field in crashes_rdd, so that values read as 060 rather than "060"
crashes_rdd_1 = crashes_rdd.map(lambda x: x.replace('"',''))

In [8]:
# Every file starts with the same header line; take it from the first record
crashes_header = crashes_rdd_1.first()
# filter() applies a boolean function to each element and keeps the elements for which it returns True,
# so every header row is removed
crashes_rdd_2 = crashes_rdd_1.filter(lambda x: x != crashes_header)
# Split each line into its columns on ","
crashes_rdd_3 = crashes_rdd_2.map(lambda x : x.split(','))
# Display the count of the RDD
print("Total count of the crashes_rdd:", crashes_rdd_2.count())

Total count of the crashes_rdd: 72006


In [9]:
# show the first 10 split records of crashes_rdd_3 with the Spark *action* take
crashes_rdd_3.take(10)

[['2015-1-21/08/2019',
  '2 Metropolitan',
  'ELIZABETH VALE',
  '5112',
  'CITY OF PLAYFORD.',
  '2',
  '1',
  '0',
  '0',
  '1',
  '2015',
  'January',
  'Wednesday',
  '01:00 pm',
  '060',
  'T-Junction',
  'Straight road',
  'Level',
  'Not Applicable',
  'Sealed',
  'Dry',
  'Not Raining',
  'Daylight',
  'Side Swipe',
  '01',
  'Driver Rider',
  '2: MI',
  'No Control',
  '',
  '',
  '1335254.54',
  '1690056.88',
  '13352551690057'],
 ['2015-2-21/08/2019',
  '2 Metropolitan',
  'SALISBURY',
  '5108',
  'CITY OF SALISBURY',
  '2',
  '1',
  '0',
  '0',
  '1',
  '2015',
  'February',
  'Tuesday',
  '03:38 pm',
  '060',
  'Cross Road',
  'Straight road',
  'Level',
  'Not Applicable',
  'Sealed',
  'Dry',
  'Not Raining',
  'Daylight',
  'Rear End',
  '01',
  'Driver Rider',
  '2: MI',
  'Traffic Signals',
  '',
  '',
  '1333389.6',
  '1688248.34',
  '13333901688248'],
 ['2015-3-21/08/2019',
  '2 Metropolitan',
  'ST MARYS',
  '5042',
  'CC MITCHAM.                   ',
  '2',
  '1',

## 1.2 Data Partitioning in RDD

##### 1. How many partitions do the above RDDs have? How is the data in these RDDs partitioned by default, when we do not explicitly specify any partitioning strategy?

In [10]:
print('Default partitions of the units_rdd: ',units_rdd_1.getNumPartitions())

Default partitions of the units_rdd:  5


In [11]:
print('Default partitions of the crashes_rdd: ',crashes_rdd_1.getNumPartitions())

Default partitions of the crashes_rdd:  5


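By default, when no partitioning strategy is specified, Spark does not partition the records by key: these RDDs have no partitioner, and records are distributed over the partitions without regard to their values (<strong>random equal partitioning</strong>), unless a transformation such as `partitionBy` or `reduceByKey` introduces a different scheme. For RDDs created with `textFile`, the partitions come from the input splits; here each of the five CSV files appears to form one partition, which is why both RDDs report 5 partitions.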
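To make the idea of key-agnostic, near-equal partitioning concrete, the sketch below splits a list into contiguous slices purely by position. It is a plain-Python illustration under an assumption: the position formula mirrors how Spark slices a local collection passed to `sc.parallelize`, whereas `textFile` RDDs such as the two above are split by input file instead. `slice_evenly` is a hypothetical helper, not a Spark function.

```python
def slice_evenly(items, num_slices):
    """Split items into num_slices contiguous chunks of near-equal size.

    Slice i covers positions [i*n//k, (i+1)*n//k); record values play no role.
    """
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


print(slice_evenly(list(range(10)), 3))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```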

##### 2. In the “Units” csv dataset, there is a column called Lic State which shows the state where the vehicle is registered. Assume we want to keep all the data related to SA in one partition and the rest of the data in another partition.
###### a. Create a Key Value Pair RDD with Lic State as the key and rest of the other columns as value.

In [12]:
# Turn one comma-separated line into a (key, value) pair
def parseRecord(line):
    # Split line separated by comma
    array_line = line.split(',')
    # Key: Lic State (column 9). Value: all other columns, i.e. columns 0-8 (including Age) and 10 onwards
    return (array_line[9], array_line[0:9] + array_line[10:])

#The key value pair RDD with Lic State as the key is stored in a new rdd named as units_rdd_2
units_rdd_2 = units_rdd_1.map(parseRecord)

###### b. Write the code to implement this partitioning in RDD using appropriate partitioning functions.

In [13]:
from pyspark.rdd import RDD

# Print the number of records in each non-empty partition of an RDD or a DataFrame
def print_partitions(data):
    # DataFrames expose their underlying RDD through .rdd
    rdd = data if isinstance(data, RDD) else data.rdd
    # glom() turns each partition into a list so the partitions can be inspected
    partitions = rdd.glom().collect()

    for index, partition in enumerate(partitions):
        # show partition if it is not empty
        if len(partition) > 0:
            print(f"Partition {index}: {len(partition)} records")

In [14]:
# Custom partitioning function for partitionBy: Spark places each record in partition
# hash_function(key) % numPartitions, so it returns 2 for SA keys and 3 for all other keys
def hash_function(key):
    if key == 'SA':
        return 2
    return 3

In [15]:
#Define the number of partitions
no_of_partitions = 2

In [16]:
# Partition the key-value RDD with partitionBy, which puts each record in partition hash_function(key) % no_of_partitions:
# SA keys -> 2 % 2 = 0 (partition 0); all other keys -> 3 % 2 = 1 (partition 1)
hash_partitioned_rdd = units_rdd_2.partitionBy(no_of_partitions, hash_function)
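`partitionBy` places each key-value pair in partition `partitionFunc(key) % numPartitions`. The plain-Python sketch below mimics that rule on a handful of made-up `(Lic State, Unit Type)` pairs, so the mapping SA to partition 0 and everything else to partition 1 can be checked without a cluster; `simulate_partition_by` is a hypothetical helper, not a Spark API.

```python
def hash_function(key):
    # Same rule as the notebook: SA -> 2, any other key -> 3
    return 2 if key == 'SA' else 3


def simulate_partition_by(pairs, num_partitions, partition_func):
    """Group (key, value) pairs into one list per partition."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_func(key) % num_partitions].append((key, value))
    return partitions


# Hypothetical sample records: (Lic State, Unit Type)
sample = [('SA', 'Station Wagon'), ('VIC', 'Utility'),
          ('SA', 'Motor Cycle'), ('', 'Motor Cars - Sedan')]
parts = simulate_partition_by(sample, 2, hash_function)
print([len(p) for p in parts])  # [2, 2]
```

Any key other than `'SA'`, including a blank one, lands in partition 1.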

###### c. Write the code to print the number of records in each partition. What does it tell about the data skewness?

In [17]:
print("Number of partitions: {}".format(hash_partitioned_rdd.getNumPartitions()))
print_partitions(hash_partitioned_rdd)

Number of partitions: 2
Partition 0: 109684 records
Partition 1: 44170 records


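The data is skewed after partitioning: partition 0 (SA) holds 109,684 records, about 71% of the 153,854 units, while partition 1 (every other key) holds only 44,170, about 29%. The partitioning function is not at fault; the imbalance comes from the distribution of the key values. Most units in these South Australian crashes carry an SA licence state, so a scheme that assigns one partition per key group inherits that imbalance, and the task processing partition 0 has roughly 2.5 times as much work as the one processing partition 1.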
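One common way to put a number on this skew is the ratio of the largest partition to the mean partition size, where 1.0 means perfectly balanced. Using the two counts reported above:

```python
counts = [109684, 44170]  # records in partition 0 (SA) and partition 1 (others)

total = sum(counts)                    # 153854
mean_size = total / len(counts)        # 76927.0
skew_factor = max(counts) / mean_size  # about 1.43
sa_share = counts[0] / total           # about 0.713

print(total, round(skew_factor, 2), round(sa_share * 100, 1))  # 153854 1.43 71.3
```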

## 1.3 Query/Analysis

###### 1. Find the average age of male and female drivers separately.

In [18]:
# Split each comma-separated line into a list of fields (columns follow the Units header order)
def parseRecord(line):
    return line.split(',')

units_rdd_1 = units_rdd_1.map(parseRecord)

In [19]:
#A filter method is applied to keep only those records of people with sex as male.
male_units_rdd = units_rdd_1.filter(lambda x: x[7]=='Male')

In [20]:
#A filter method is applied to keep only those records of people with sex as female.
female_units_rdd = units_rdd_1.filter(lambda x: x[7]=='Female')

In [21]:
male_avg_age = male_units_rdd.map(lambda x : x[8]).filter(lambda x : x.isdigit()).map(lambda x: int(x)).mean()
print("Male drivers' average age: "+str(round(male_avg_age,2)))

Male drivers' average age: 40.98


In [22]:
female_avg_age = female_units_rdd.map(lambda x : x[8]).filter(lambda x : x.isdigit()).map(lambda x: int(x)).mean()
print("Female drivers' average age: "+str(round(female_avg_age,2)))

Female drivers' average age: 40.39
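The two cells above scan the data once per sex. A single pass is also possible by emitting `(sex, (age, 1))` pairs and summing both components per key, which is what `reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))` would do on the RDD. The sketch below runs the same logic in plain Python on a few made-up rows, skipping a non-numeric `XXX` age as the notebook does; `average_age_by_sex` is a hypothetical helper.

```python
from collections import defaultdict


def average_age_by_sex(rows):
    """rows are split unit records; index 7 is Sex and index 8 is Age."""
    totals = defaultdict(lambda: [0, 0])  # sex -> [sum of ages, count]
    for row in rows:
        sex, age = row[7], row[8]
        if sex in ('Male', 'Female') and age.isdigit():
            totals[sex][0] += int(age)
            totals[sex][1] += 1
    return {sex: s / c for sex, (s, c) in totals.items()}


# Hypothetical rows padded to the Units column layout (only indices 7 and 8 matter)
rows = [
    [''] * 7 + ['Male', '052'],
    [''] * 7 + ['Male', '020'],
    [''] * 7 + ['Female', '057'],
    [''] * 7 + ['Female', 'XXX'],
]
print(average_age_by_sex(rows))  # {'Male': 36.0, 'Female': 57.0}
```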


###### 2. What is the oldest and the newest vehicle year involved in the accident? Display the Registration State, Year and Unit type of the vehicle.

In [23]:
#Newest vehicle year is the year with the maximum in the vehicle year column
newest_veh_year = units_rdd_1.map(lambda x : x[5]).filter(lambda x : x.isdigit()).map(lambda x: int(x)).max()
print("Newest vehicle year: " +str(newest_veh_year))
#Display the details of the vehicle with the newest vehicle year
newest_year_details = units_rdd_1.filter(lambda x: x[5]==str(newest_veh_year)).map(lambda field: (field[3],field[4],field[5]))
newest_year_details.take(10)

Newest vehicle year: 2019


[('SA', 'Station Wagon', '2019'),
 ('SA', 'OMNIBUS', '2019'),
 ('SA', 'Motor Cars - Sedan', '2019'),
 ('SA', 'Station Wagon', '2019'),
 ('SA', 'SEMI TRAILER', '2019'),
 ('SA', 'Motor Cars - Sedan', '2019'),
 ('SA', 'Motor Cars - Sedan', '2019'),
 ('VIC', 'Station Wagon', '2019'),
 ('SA', 'Station Wagon', '2019'),
 ('SA', 'Utility', '2019')]

In [24]:
#Oldest vehicle year is the year with the minimum in the vehicle year column
oldest_veh_year = units_rdd_1.map(lambda x : x[5]).filter(lambda x : x.isdigit()).map(lambda x: int(x)).min()
print("Oldest vehicle year: " +str(oldest_veh_year))
#Display the details of the vehicle with the oldest vehicle year
oldest_year_details = units_rdd_1.filter(lambda x: x[5]==str(oldest_veh_year)).map(lambda field: (field[3],field[4],field[5]))
oldest_year_details.take(10)

Oldest vehicle year: 1900


[('VIC', 'Motor Cycle', '1900'),
 ('SA', 'Motor Cycle', '1900'),
 ('SA', 'Motor Cycle', '1900'),
 ('SA', 'Motor Cycle', '1900'),
 ('SA', 'Motor Cycle', '1900'),
 ('SA', 'Motor Cycle', '1900'),
 ('SA', 'Motor Cycle', '1900'),
 ('SA', 'RIGID TRUCK LGE GE 4.5T', '1900'),
 ('SA', 'Motor Cycle', '1900'),
 ('SA', 'Motor Cycle', '1900')]
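Both extremes can also be found in one pass with a single `reduce` over `(year, year)` pairs, instead of separate `max()` and `min()` jobs; on the RDD this would be a `map(lambda y: (y, y))` followed by `reduce`. A plain-Python sketch of that reduction with `functools.reduce`, on made-up year strings (non-numeric values are skipped, as in the cells above; `year_range` is a hypothetical helper):

```python
from functools import reduce


def year_range(years):
    """Return (oldest, newest) from an iterable of year strings.

    Raises TypeError if no value is numeric (reduce of an empty sequence).
    """
    numeric = [(int(y), int(y)) for y in years if y.isdigit()]
    return reduce(lambda a, b: (min(a[0], b[0]), max(a[1], b[1])), numeric)


print(year_range(['2009', '1900', 'XXXX', '2019', '']))  # (1900, 2019)
```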

# 2. Working with DataFrames

## 2.1 Data Preparation and Loading

###### 1. Load all units and crash data into two separate dataframes.

In [25]:
#Load all the units csv files in a units_df dataframe
units_df = spark.read.csv("*_DATA_SA_Units.csv", inferSchema = True, header = True)

In [26]:
#Load all the crashes csv files in a crashes_df dataframe
crashes_df = spark.read.csv("*_DATA_SA_Crash.csv", inferSchema = True, header = True)

###### 2. Display the schema of the final two dataframes.

In [27]:
#Schema for units_df
units_df.printSchema()

root
 |-- REPORT_ID: string (nullable = true)
 |-- Unit No: integer (nullable = true)
 |-- No Of Cas: integer (nullable = true)
 |-- Veh Reg State: string (nullable = true)
 |-- Unit Type: string (nullable = true)
 |-- Veh Year: string (nullable = true)
 |-- Direction Of Travel: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Lic State: string (nullable = true)
 |-- Licence Class: string (nullable = true)
 |-- Licence Type: string (nullable = true)
 |-- Towing: string (nullable = true)
 |-- Unit Movement: string (nullable = true)
 |-- Number Occupants: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- Rollover: string (nullable = true)
 |-- Fire: string (nullable = true)



In [28]:
#Schema for crashes_df
crashes_df.printSchema()

root
 |-- REPORT_ID: string (nullable = true)
 |-- Stats Area: string (nullable = true)
 |-- Suburb: string (nullable = true)
 |-- Postcode: integer (nullable = true)
 |-- LGA Name: string (nullable = true)
 |-- Total Units: integer (nullable = true)
 |-- Total Cas: integer (nullable = true)
 |-- Total Fats: integer (nullable = true)
 |-- Total SI: integer (nullable = true)
 |-- Total MI: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Area Speed: integer (nullable = true)
 |-- Position Type: string (nullable = true)
 |-- Horizontal Align: string (nullable = true)
 |-- Vertical Align: string (nullable = true)
 |-- Other Feat: string (nullable = true)
 |-- Road Surface: string (nullable = true)
 |-- Moisture Cond: string (nullable = true)
 |-- Weather Cond: string (nullable = true)
 |-- DayNight: string (nullable = true)
 |-- Crash Type: string (nullable = true

## 2.2 Query/Analysis

###### 1. Find all the crash events in Adelaide where the total number of casualties in the event is more than 3.

In [29]:
from pyspark.sql.functions import col

In [30]:
# Keep only the crashes in the suburb ADELAIDE with more than 3 total casualties
crashes_df.filter(col("Suburb") == 'ADELAIDE')\
                .filter(col("Total Cas")>3).show()

+--------------------+----------+--------+--------+----------------+-----------+---------+----------+--------+--------+----+--------+--------+--------+----------+-------------+----------------+--------------+--------------------+------------+-------------+------------+--------+--------------+---------+------------+-------------+---------------+------------+--------------+----------+----------+--------------+
|           REPORT_ID|Stats Area|  Suburb|Postcode|        LGA Name|Total Units|Total Cas|Total Fats|Total SI|Total MI|Year|   Month|     Day|    Time|Area Speed|Position Type|Horizontal Align|Vertical Align|          Other Feat|Road Surface|Moisture Cond|Weather Cond|DayNight|    Crash Type|Unit Resp| Entity Code|CSEF Severity|  Traffic Ctrls|DUI Involved|Drugs Involved|  ACCLOC_X|  ACCLOC_Y|    UNIQUE_LOC|
+--------------------+----------+--------+--------+----------------+-----------+---------+----------+--------+--------+----+--------+--------+--------+----------+-------------+

###### 2. Display 10 crash events with highest casualties.

In [31]:
# Sort by total casualties in descending order and show the 10 crash events with the highest casualties
crashes_df_desc = crashes_df.sort('Total Cas', ascending=False)
crashes_df_desc.show(10)

+--------------------+--------------+---------------+--------+--------------------+-----------+---------+----------+--------+--------+----+--------+---------+--------+----------+-------------+--------------------+--------------+--------------+------------+-------------+------------+--------+-----------+---------+------------+-------------+---------------+------------+--------------+----------+----------+--------------+
|           REPORT_ID|    Stats Area|         Suburb|Postcode|            LGA Name|Total Units|Total Cas|Total Fats|Total SI|Total MI|Year|   Month|      Day|    Time|Area Speed|Position Type|    Horizontal Align|Vertical Align|    Other Feat|Road Surface|Moisture Cond|Weather Cond|DayNight| Crash Type|Unit Resp| Entity Code|CSEF Severity|  Traffic Ctrls|DUI Involved|Drugs Involved|  ACCLOC_X|  ACCLOC_Y|    UNIQUE_LOC|
+--------------------+--------------+---------------+--------+--------------------+-----------+---------+----------+--------+--------+----+--------+------

###### 3. Find the total number of fatalities for each crash type.

In [32]:
import pyspark.sql.functions as F
# Group crashes_df by crash type, sum the fatalities for each type and sort in descending order
agg_attribute = 'Crash Type'
sum_attribute = 'Total Fats'
crashes_fats_df = crashes_df.groupby(agg_attribute).agg(F.sum(sum_attribute).alias('Total Fatalities')).sort('Total Fatalities', ascending = False)

In [33]:
crashes_fats_df.show()

+--------------------+----------------+
|          Crash Type|Total Fatalities|
+--------------------+----------------+
|    Hit Fixed Object|             152|
|             Head On|              86|
|      Hit Pedestrian|              70|
|           Roll Over|              57|
|         Right Angle|              45|
|          Side Swipe|              20|
|          Right Turn|              18|
|            Rear End|              16|
|  Hit Parked Vehicle|               9|
|          Hit Animal|               4|
|  Hit Object on Road|               2|
|               Other|               2|
|Left Road - Out o...|               1|
+--------------------+----------------+



###### 4. Find the total number of casualties for each suburb when the vehicle was driven by an unlicensed driver. You are required to display the name of the suburb and the total number of casualties.

In [34]:
#the crashes and the units dataframes are joined on the Report_ID column 
crashes_units_df = units_df.join(crashes_df,units_df['Report_ID']==crashes_df['Report_ID'])

In [35]:
import pyspark.sql.functions as F
# Keep the units with licence type 'Unlicenced', group by suburb, sum the casualties per suburb and sort in descending order
agg_attribute = 'Suburb'
sum_attribute = 'Total Cas'
crashes_units_cas_df = crashes_units_df.filter(col("Licence Type") == 'Unlicenced').groupby(agg_attribute).agg(F.sum(sum_attribute).alias('Total Casualities')).sort('Total Casualities', ascending = False)

In [36]:
crashes_units_cas_df.show(15)

+---------------+-----------------+
|         Suburb|Total Casualities|
+---------------+-----------------+
|       ADELAIDE|               19|
|      DRY CREEK|               18|
|      SALISBURY|               18|
| SALISBURY EAST|               16|
|       PROSPECT|               14|
| NORTH ADELAIDE|               13|
|   ANDREWS FARM|               12|
|        ENFIELD|               12|
|SALISBURY SOUTH|               11|
|SALISBURY DOWNS|               11|
|   BEDFORD PARK|               11|
|     INGLE FARM|               11|
|     MUNNO PARA|               10|
|         BURTON|               10|
|   MOUNT BARKER|               10|
+---------------+-----------------+
only showing top 15 rows



## 2.3 Severity Analysis

###### 1. Find the total number of crash events for each severity level. Which severity level is the most common?

In [37]:
import pyspark.sql.functions as F
# Group the crashes by severity level and count the crash events in each level
agg_attribute = 'CSEF Severity'
crashes_severity_df = crashes_df.groupby(agg_attribute).agg(F.count(agg_attribute).alias('Total Crash Events')).sort('Total Crash Events', ascending = False)

In [38]:
crashes_severity_df.show(4)

+-------------+------------------+
|CSEF Severity|Total Crash Events|
+-------------+------------------+
|       1: PDO|             46696|
|        2: MI|             21881|
|        3: SI|              2978|
|     4: Fatal|               451|
+-------------+------------------+



1: PDO (Property Damage Only) is the most common severity level, with 46,696 of the 72,006 crash events (about 65%).

###### 2. Compute the total number of crash events for each severity level and the percentage for the four different scenarios.

###### a. When the driver is tested positive on drugs.

In [39]:
from pyspark.sql.functions import when, col, count

In [40]:
import pyspark.sql.functions as F
# Group by severity level and count the crash events in which drugs were involved
agg_attribute = 'CSEF Severity'
crashes_drugs_df = crashes_df.groupby(agg_attribute).agg(count(when(col("Drugs Involved") == 'Y', 1)).alias('Count'))
# Total number of drug-related crash events (the denominator for the percentages)
total = crashes_drugs_df.select(F.sum('Count')).collect()[0][0]
# Add each severity level's percentage of the drug-related crash events
crashes_drugs_df = crashes_drugs_df.withColumn('Percentage', col('Count')*100/total)

In [41]:
crashes_drugs_df.show(5)

+-------------+-----+------------------+
|CSEF Severity|Count|        Percentage|
+-------------+-----+------------------+
|     4: Fatal|   82| 6.539074960127592|
|        2: MI|  749|59.728867623604465|
|       1: PDO|  176|14.035087719298245|
|        3: SI|  247|19.696969696969695|
+-------------+-----+------------------+
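The pattern used in these cells, `count(when(condition, 1))` per severity followed by dividing by the column total, can be checked in plain Python. The denominator is the number of crashes matching the condition, so each percentage is a share within that scenario, not a share of all 72,006 crashes. The rows below are made up, and `severity_shares` is a hypothetical helper:

```python
from collections import Counter


def severity_shares(crashes, condition):
    """Count matching crashes per severity and each count's percentage of all matches.

    Unlike Spark's groupby with count(when(...)), severities with no matching
    crash are left out here rather than reported with a count of 0.
    """
    counts = Counter(c['CSEF Severity'] for c in crashes if condition(c))
    total = sum(counts.values())
    return {sev: (n, n * 100 / total) for sev, n in counts.items()}


crashes = [
    {'CSEF Severity': '2: MI', 'Drugs Involved': 'Y'},
    {'CSEF Severity': '2: MI', 'Drugs Involved': 'Y'},
    {'CSEF Severity': '4: Fatal', 'Drugs Involved': 'Y'},
    {'CSEF Severity': '1: PDO', 'Drugs Involved': None},
]
shares = severity_shares(crashes, lambda c: c['Drugs Involved'] == 'Y')
print(shares)
```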



###### b. When the driver is tested positive for blood alcohol concentration.

In [42]:
import pyspark.sql.functions as F
# Group by severity level and count the crash events in which the driver tested positive for alcohol
agg_attribute = 'CSEF Severity'
crashes_dui_df = crashes_df.groupby(agg_attribute).agg(count(when(col("DUI Involved") == 'Y', 1)).alias('Count'))
# Total number of alcohol-related crash events (the denominator for the percentages)
total = crashes_dui_df.select(F.sum('Count')).collect()[0][0]
# Add each severity level's percentage of the alcohol-related crash events
crashes_dui_df = crashes_dui_df.withColumn('Percentage', col('Count')*100/total)

In [43]:
crashes_dui_df.show(5)

+-------------+-----+------------------+
|CSEF Severity|Count|        Percentage|
+-------------+-----+------------------+
|     4: Fatal|   79|  3.51423487544484|
|        2: MI|  737|  32.7846975088968|
|       1: PDO| 1173|52.179715302491104|
|        3: SI|  259| 11.52135231316726|
+-------------+-----+------------------+



###### c. When the driver is tested positive for both drugs and blood alcohol

In [44]:
import pyspark.sql.functions as F

agg_attribute = 'CSEF Severity'
# Group by severity level and count the crash events in which both drugs and alcohol were involved
crashes_drugs_dui_df = crashes_df.groupby(agg_attribute).agg(count(when((col("Drugs Involved") == "Y") & (col("DUI Involved") == "Y"), 1)).alias('Count'))
# Total number of crash events involving both (the denominator for the percentages)
total = crashes_drugs_dui_df.select(F.sum('Count')).collect()[0][0]
# Add each severity level's percentage of the crash events involving both drugs and alcohol
crashes_drugs_dui_df = crashes_drugs_dui_df.withColumn('Percentage', col('Count')*100/total)

In [45]:
crashes_drugs_dui_df.show(5)

+-------------+-----+------------------+
|CSEF Severity|Count|        Percentage|
+-------------+-----+------------------+
|     4: Fatal|   27|15.428571428571429|
|        2: MI|   89|50.857142857142854|
|       1: PDO|   24|13.714285714285714|
|        3: SI|   35|              20.0|
+-------------+-----+------------------+



###### d. When the driver is tested negative for both (no alcohol and no drugs).

In [46]:
import pyspark.sql.functions as F
# Group by severity level and count the crash events in which neither drugs nor alcohol were recorded (both fields null)
agg_attribute = 'CSEF Severity'
crashes_drugs_dui_neg_df = crashes_df.groupby(agg_attribute).agg(count(when((col("Drugs Involved").isNull()) & (col("DUI Involved").isNull()), 1)).alias('Count'))
# Total number of crash events with neither recorded (the denominator for the percentages)
total = crashes_drugs_dui_neg_df.select(F.sum('Count')).collect()[0][0]
# Add each severity level's percentage of these crash events
crashes_drugs_dui_neg_df = crashes_drugs_dui_neg_df.withColumn('Percentage', col('Count')*100/total)

In [47]:
crashes_drugs_dui_neg_df.show(5)

+-------------+-----+------------------+
|CSEF Severity|Count|        Percentage|
+-------------+-----+------------------+
|     4: Fatal|  317|0.4615675825215859|
|        2: MI|20484|29.825710916000524|
|       1: PDO|45371| 66.06240626683557|
|        3: SI| 2507|3.6503152346423215|
+-------------+-----+------------------+



###### Compare the results in these 4 scenarios. Briefly explain the observation from this analysis.

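1: PDO (Property Damage Only) is the most common severity level when the driver tested positive for alcohol (52.2%) and when neither drugs nor alcohol were involved (66.1%). 2: MI (Minor Injury) is the most common level when drugs were involved (59.7%) or when both drugs and alcohol were involved (50.9%). The more severe outcomes make up a much larger share of crashes involving drugs: 3: SI (Serious Injury) accounts for 19.7% of drug-related crashes and 20.0% of crashes involving both drugs and alcohol, against 11.5% for alcohol alone and 3.7% when neither was involved. The share of fatal crashes (4: Fatal) is highest when both drugs and alcohol were involved (15.4%), about 33 times the 0.46% share among crashes where neither was detected. These are shares within each scenario rather than absolute counts (in absolute terms, drug-related crashes include more fatal crashes, 82, than crashes involving both substances, 27), and they show an association rather than a causal effect.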

## 2.4 RDDs vs DataFrame vs Spark SQL

###### 1. Find the Date and Time of Crash, Number of Casualties in each unit and the Gender, Age, License Type of the unit driver for the suburb "Adelaide".

###### RDD Approach:

In [48]:
%%time
# (REPORT_ID, [No Of Cas, Sex, Age, Licence Type]) pairs from the split units RDD
units_details = units_rdd_1.map(lambda x: (x[0], [x[2], x[7], x[8], x[11]]))
# Keep the Adelaide crashes and map them to (REPORT_ID, [Year-Month-Day, Time]) pairs
crashes_adelaide_details = crashes_rdd_3.filter(lambda x: x[2] == 'ADELAIDE').map(lambda x: (x[0], [x[10] + '-' + x[11] + '-' + x[12], x[13]]))
# Join the two RDDs on REPORT_ID
crashes_units_details_rdd = units_details.join(crashes_adelaide_details)
# Flatten each (REPORT_ID, (unit fields, crash fields)) result into one row
crashes_units_details_rdd = crashes_units_details_rdd.map(lambda x: [x[1][1][0], x[1][1][1], x[1][0][0], x[1][0][1], x[1][0][2], x[1][0][3]])
# Show the first 10 rows
crashes_units_details_rdd.take(10)

CPU times: user 58 ms, sys: 9.16 ms, total: 67.1 ms
Wall time: 4.22 s


[['2015-March-Saturday', '08:00 pm', '0', 'Male', '042', 'Full'],
 ['2015-October-Tuesday', '10:00 am', '0', 'Male', '024', 'Full'],
 ['2015-October-Tuesday', '10:00 am', '0', '', '', ''],
 ['2015-October-Tuesday', '10:00 am', '0', '', '', ''],
 ['2015-October-Thursday', '01:10 pm', '0', 'Female', '040', ''],
 ['2015-October-Thursday', '01:10 pm', '0', 'Female', 'XXX', 'Unknown'],
 ['2015-December-Friday', '05:50 pm', '0', 'Male', '065', 'Full'],
 ['2015-December-Friday', '05:50 pm', '0', 'Female', '035', 'Full'],
 ['2015-February-Saturday', '04:00 pm', '1', 'Female', '030', 'Full'],
 ['2015-February-Saturday', '04:00 pm', '0', 'Male', '049', 'Full']]
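`RDD.join` performs an inner join on keys and yields `(key, (left_value, right_value))`, which is why the RDD cell above reads the crash fields from `x[1][1]` and the unit fields from `x[1][0]`. A plain-Python sketch of the same inner-join shape on made-up pairs (`inner_join` is a hypothetical helper, not a Spark function):

```python
def inner_join(left, right):
    """Mimic RDD.join: one (key, (left_value, right_value)) per matching pair."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    return [(key, (lv, rv))
            for key, lv in left
            for rv in right_by_key.get(key, [])]


# Hypothetical (REPORT_ID, unit fields) and (REPORT_ID, crash fields) pairs
units = [('2015-1', ['0', 'Male', '052', 'Full']),
         ('2015-1', ['1', 'Female', '057', 'Full']),
         ('2015-9', ['0', 'Male', '030', 'Full'])]
crashes = [('2015-1', ['2015-January-Wednesday', '01:00 pm'])]

joined = inner_join(units, crashes)
rows = [[x[1][1][0], x[1][1][1], *x[1][0]] for x in joined]
print(rows)
```

The unit with key `'2015-9'` has no matching crash, so the inner join drops it.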

###### Dataframe approach:

In [49]:
from pyspark.sql.functions import col, concat, lit

In [50]:
%%time
# Join the crashes and units DataFrames on the REPORT_ID column
crashes_units_df = crashes_df.join(units_df,units_df['Report_ID'] == crashes_df['Report_ID'])
# Keep the Adelaide crashes and select the date, time, number of casualties, gender, age and licence type
crashes_by_unit_df =  crashes_units_df.filter(col("Suburb") == "ADELAIDE")
crashes_by_unit_df = crashes_by_unit_df.select(concat(col("Year"), lit("-"), col("Month"),lit("-"),col("Day")).alias('Date'),'Time','No of Cas',col('Sex').alias("Gender"),'Age','Licence Type')
# Order by the concatenated Date string (alphabetical order, not chronological)
crashes_by_unit_df = crashes_by_unit_df.orderBy(crashes_by_unit_df.Date.asc())
crashes_by_unit_df.show(truncate = False)

+-------------------+--------+---------+-------+----+--------------+
|Date               |Time    |No of Cas|Gender |Age |Licence Type  |
+-------------------+--------+---------+-------+----+--------------+
|2015-April-Friday  |03:44 pm|0        |null   |null|null          |
|2015-April-Friday  |03:44 pm|1        |Male   |016 |null          |
|2015-April-Friday  |03:44 pm|0        |null   |null|null          |
|2015-April-Monday  |02:22 pm|0        |Male   |040 |Unlicenced    |
|2015-April-Monday  |12:03 pm|0        |Male   |049 |Full          |
|2015-April-Monday  |09:05 am|1        |Male   |067 |Full          |
|2015-April-Monday  |09:05 am|0        |Male   |036 |Full          |
|2015-April-Monday  |02:22 pm|0        |Female |045 |Provisional 1 |
|2015-April-Monday  |12:03 pm|0        |Male   |040 |Full          |
|2015-April-Monday  |03:15 pm|0        |Male   |027 |Full          |
|2015-April-Monday  |03:15 pm|0        |Unknown|XXX |Unknown       |
|2015-April-Monday  |03:15 pm|1   
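Because `Date` is built as a `Year-Month-Weekday` string, `orderBy` sorts it alphabetically, so `2015-April-...` comes before `2015-January-...`. A chronological order needs a numeric month key. A plain-Python sketch of such a key (`MONTHS` and `chronological_key` are hypothetical names); note the data holds the weekday, not the day of the month, so ordering within a month cannot be recovered from `Date` alone:

```python
MONTHS = ['January', 'February', 'March', 'April', 'May', 'June', 'July',
          'August', 'September', 'October', 'November', 'December']


def chronological_key(date_str):
    """Turn 'Year-Month-Weekday' into a (year, month number) sort key."""
    year, month, _weekday = date_str.split('-')
    return (int(year), MONTHS.index(month) + 1)


dates = ['2015-April-Friday', '2015-January-Monday', '2016-February-Tuesday']
print(sorted(dates))                         # alphabetical
print(sorted(dates, key=chronological_key))  # by year, then month
```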

###### SQL Approach:

In [51]:
#a sql view is created for both crashes_df as well as units_df
crashes_df.createOrReplaceTempView("sql_crashes")
units_df.createOrReplaceTempView("sql_units")

In [52]:
%%time
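#Join sql_crashes and sql_units on REPORT_ID, keep the Adelaide crashes and select the date, time, casualties, gender, age and licence type, ordered by date.
#Unlike the RDD and DataFrame approaches, this query reports the crash-level `Total Cas` rather than the unit-level `No Of Cas`, and the GROUP BY removes duplicate rows.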
sql_crashes_units = spark.sql('''select concat(c.Year,"-",c.Month,"-",c.Day) as Date, c.Time, c.`Total Cas` as Total_Casualities, u.`Unit No`, u.Sex as Gender, u.Age, u.`Licence Type` 
                    from sql_crashes c JOIN sql_units u
                    on c.Report_ID = u.Report_ID
                    where c.Suburb = "ADELAIDE"
                    Group By c.Year, c.Month, c.Day, c.Time, u.`Unit No`,u.Sex, u.Age, u.`Licence Type`, Total_Casualities
                    order by Date desc, Time desc''')
sql_crashes_units.show(truncate = False)

+------------------------+--------+-----------------+-------+-------+---+------------+
|Date                    |Time    |Total_Casualities|Unit No|Gender |Age|Licence Type|
+------------------------+--------+-----------------+-------+-------+---+------------+
|2019-September-Wednesday|11:40 am|0                |2      |Unknown|XXX|Unknown     |
|2019-September-Wednesday|11:40 am|0                |1      |Male   |056|Full        |
|2019-September-Wednesday|11:38 am|1                |2      |Male   |073|null        |
|2019-September-Wednesday|11:38 am|1                |1      |Male   |029|Full        |
|2019-September-Wednesday|07:55 pm|1                |1      |Female |031|null        |
|2019-September-Wednesday|07:55 pm|1                |2      |Unknown|XXX|null        |
|2019-September-Wednesday|06:50 pm|0                |1      |Female |075|Full        |
|2019-September-Wednesday|06:50 pm|0                |2      |Male   |039|Full        |
|2019-September-Wednesday|06:00 pm|1       

###### 2. Find the total number of casualties for each suburb when the vehicle was driven by an unlicensed driver. You are required to display the name of the suburb and the total number of casualties.

###### RDD Approach:

In [53]:
from tssplit import tssplit
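`tssplit` is used here, presumably, because a plain `split(',')` breaks any quoted field that itself contains a comma, shifting every later column. Python's built-in `csv` module also handles quoted fields and needs no extra package; on an RDD, `crashes_rdd.mapPartitions(csv.reader)` would apply it line by line. The sketch below compares the two on a made-up line (not from the dataset):

```python
import csv

line = '"2015-1","CITY OF EXAMPLE, NORTH","5112","2"'

# Naive approach: strip quotes, then split on every comma
naive = line.replace('"', '').split(',')
# csv.reader keeps the quoted comma inside its field
parsed = next(csv.reader([line]))

print(len(naive), len(parsed))  # 5 4
print(parsed[1])                # CITY OF EXAMPLE, NORTH
```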

In [54]:
%%time
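# Parse the raw crash lines with tssplit, which respects quoted fields, and drop the header rows
crash_rdd2 = crashes_rdd.map(lambda x: tssplit(x, quote='"', delimiter=',')).\
   filter(lambda x: x[0] != 'REPORT_ID')
# (REPORT_ID, Licence Type) pairs for units with an unlicensed driver
units_cas = units_rdd_1.filter(lambda x: x[11]=='Unlicenced').map(lambda x: (x[0], x[11]))
# (REPORT_ID, [Suburb, Total Cas]) pairs for every crash
crashes_cas = crash_rdd2.map(lambda x: (x[0], [x[2],x[6]]))
# The inner join keeps only crashes involving an unlicensed driver
suburb_casualities = crashes_cas.join(units_cas)

# Map to (Suburb, Total Cas) pairs, then sum the casualties per suburb
suburb_casualities = suburb_casualities.map(lambda x: (x[1][0][0],int(x[1][0][1]))).groupByKey().mapValues(list).map(lambda x: (x[0], sum([int(i) for i in x[1]])))
# Sort by total casualties, highest first, and show the top 10
suburb_casualities.sortBy(lambda x:-x[1]).take(10)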

CPU times: user 149 ms, sys: 21.4 ms, total: 170 ms
Wall time: 10.3 s


[('ADELAIDE', 19),
 ('DRY CREEK', 18),
 ('SALISBURY', 18),
 ('SALISBURY EAST', 16),
 ('PROSPECT', 14),
 ('NORTH ADELAIDE', 13),
 ('ANDREWS FARM', 12),
 ('ENFIELD', 12),
 ('INGLE FARM', 11),
 ('SALISBURY SOUTH', 11)]

###### Dataframe Approach:

In [55]:
%%time
#the crashes_df and units_df are joined on the Report_ID column
crashes_units_df = crashes_df.join(units_df,units_df['Report_ID']==crashes_df['Report_ID'])
import pyspark.sql.functions as F
# Keep the units with licence type 'Unlicenced', group by suburb, sum the casualties per suburb
# and sort in descending order.
agg_attribute = 'Suburb'
sum_attribute = 'Total Cas'
crashes_units_cas_df = crashes_units_df.filter(col("Licence Type") == 'Unlicenced').groupby(agg_attribute)\
                        .agg(F.sum(sum_attribute).alias('Total_Casualities')).sort('Total_Casualities', ascending = False)

CPU times: user 14.5 ms, sys: 2.11 ms, total: 16.6 ms
Wall time: 288 ms


In [56]:
crashes_units_cas_df.show(10)

+---------------+-----------------+
|         Suburb|Total_Casualities|
+---------------+-----------------+
|       ADELAIDE|               19|
|      DRY CREEK|               18|
|      SALISBURY|               18|
| SALISBURY EAST|               16|
|       PROSPECT|               14|
| NORTH ADELAIDE|               13|
|   ANDREWS FARM|               12|
|        ENFIELD|               12|
|SALISBURY DOWNS|               11|
|   BEDFORD PARK|               11|
+---------------+-----------------+
only showing top 10 rows



###### SQL Approach:

In [57]:
#a sql view is created for both crashes_df as well as units_df
crashes_df.createOrReplaceTempView("sql_crashes")
units_df.createOrReplaceTempView("sql_units")

In [58]:
%%time
# Join sql_crashes and sql_units on REPORT_ID, keep the units with licence type 'Unlicenced', group by
# suburb, sum the casualties per suburb and sort in descending order.

sql_casualities = spark.sql('''
  SELECT c.Suburb,sum(c.`Total Cas`) as Total_Casualities
  FROM sql_crashes c JOIN sql_units u
  ON c.REPORT_ID = u.REPORT_ID 
  where u.`Licence Type`= 'Unlicenced' 
  group by c.Suburb
  order by Total_Casualities desc
''')

CPU times: user 148 µs, sys: 4.31 ms, total: 4.46 ms
Wall time: 115 ms


In [59]:
#print only top 10 rows
sql_casualities.show(10)

+---------------+-----------------+
|         Suburb|Total_Casualities|
+---------------+-----------------+
|       ADELAIDE|               19|
|      SALISBURY|               18|
|      DRY CREEK|               18|
| SALISBURY EAST|               16|
|       PROSPECT|               14|
| NORTH ADELAIDE|               13|
|   ANDREWS FARM|               12|
|        ENFIELD|               12|
|SALISBURY DOWNS|               11|
|     INGLE FARM|               11|
+---------------+-----------------+
only showing top 10 rows



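For 2.4.1, the RDD approach took 91.2 ms, the DataFrame approach 21 ms and the Spark SQL approach 10.6 ms; for 2.4.2, the RDD approach took 145 ms, the DataFrame approach 12.7 ms and the Spark SQL approach 3.07 ms. The `%%time` outputs preserved above do not match these figures exactly (for 2.4.2 they show 170 ms, 16.6 ms and 4.46 ms of CPU time), presumably because they come from a different run, but they show the same ordering. On these figures Spark SQL is the fastest approach and RDDs the slowest. Two caveats apply. First, `%%time` CPU time measures only the Python driver process, not the work done by Spark's executors, so the wall times (10.3 s, 288 ms and 115 ms for 2.4.2) are the more telling figures. Second, the DataFrame and Spark SQL cells in 2.4.2 contain no action, so their timings cover only building the query plan, whereas the RDD cell runs a full job through `take(10)`; a fair comparison would time an action such as `.show()` or `.collect()` in every approach. DataFrames and Spark SQL do tend to outperform RDDs written in Python, because both are planned by the Catalyst optimizer and executed in the JVM, while RDD lambdas run in separate Python worker processes and every record must be serialized between the two. Since the DataFrame and SQL versions compile to the same kind of plan, the gap between those two is likely measurement noise rather than a real difference.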
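The timing caveat comes from lazy evaluation: transformations only describe the work, and nothing is computed until an action runs. Python generators behave analogously, so the sketch below (an analogy in plain Python, not Spark code) counts how many records are actually processed before and after the pipeline is consumed:

```python
calls = [0]  # how many lines have been parsed so far


def parse(line):
    calls[0] += 1
    return line.split(',')


lines = ['a,1', 'b,2', 'c,3']
pipeline = (parse(line) for line in lines)  # like a transformation: nothing is parsed yet
print(calls[0])  # 0
result = list(pipeline)  # like an action: the parsing happens here
print(calls[0])  # 3
```

Timing only the line that builds `pipeline` would report almost no work, which is the same trap as timing a DataFrame or SQL query without an action.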