# FIT5202 - Data processing for Big data
## Semester 2, 2020
***

# Assignment 1 - Analyzing Road Crash Data
<br>

**Background**

The Department of Planning, Transport and Infrastructure (DPTI), South Australia collects data on road crashes for further analysis in an effort to improve road safety. The data grows over time, and the rising number of vehicles adds to its volume. Looking across multiple states, the combined dataset would be rather large. Here, we use Spark to run various operations on the dataset and answer different queries.<br><br>
**Required Datasets:**<br>
Two datasets: the Units and Crashes datasets for the years 2015-2019

## Student Information
*** 
Name: <br>
<span style="color: brown">Abhilash Anil Kale</span><br><br>
Student number: <br>
<span style="color: brown">30254140</span><br><br>
Tutorial number:<br>
<span style="color: brown">16-P2</span><br><br>
Tutors:<br>
<span style="color: brown">Prajwol Sangat & Neha Jain</span><br><br>
Environment:<br>
<span style="color: brown">Python *3.8.2* and Spark *3.0* </span>

# Part A: Working with RDDs and DataFrames

## Table of Contents

* [1. Working with RDD](#1)
    * [1.1 Data Preparation and Loading](#1.1)
    * [1.2 Data Partitioning in RDD](#1.2)
    * [1.3 Query/Analysis](#1.3)
* [2. Working with DataFrames](#2)
    * [2.1 Data Preparation and Loading](#2.1)
    * [2.2 Query/Analysis](#2.2)
    * [2.3 Severity Analysis](#2.3)
    * [Crash Severity Analysis - For Part B](#b)
    * [2.4 RDDs vs DataFrames vs Spark SQL](#2.4)

# 1. Working with RDD <a class="anchor" id="1"></a>
## 1.1 Data Preparation and Loading <a class="anchor" id="1.1"></a>

 <div class="alert alert-block alert-info">
1. Write the code to create a SparkContext object using SparkSession, which tells Spark
how to access a cluster. To create a SparkSession you first need to build a SparkConf
object that contains information about your application. Give an appropriate name for
your application and run Spark locally with as many working processors as logical
cores on your machine.

In [1]:
# Import SparkConf class into program
from pyspark import SparkConf

# Run Spark locally with as many working processors as logical cores on the machine
master = 'local[*]'
# Setup an appropriate application name
app_name = 'Analyzing Road Crash Data'
# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

# Import SparkSession classes 
from pyspark.sql import SparkSession # Spark SQL

# Initialize Spark Session and create a SparkContext Object
spark = SparkSession.builder.config(conf = spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

In [2]:
# Import the libraries required throughout the notebook
import csv
import pandas as pd
import numpy as np

from pyspark.rdd import RDD
import pyspark.sql.functions as f
from pyspark.sql.window import Window

 <div class="alert alert-block alert-info">
2. Import all the “Units” csv files from 2015-2019 into a single RDD.<br>

In [3]:
# Import all the Units csv files into a single RDD
rdd_units = sc.textFile('*Units.csv').mapPartitions(lambda x: csv.reader(x))

 <div class="alert alert-block alert-info">
3. Import all the “Crashes” csv files from 2015-2019 into a single RDD.

In [4]:
# Import all the Crashes csv files into a single RDD
rdd_crash = sc.textFile('*Crash.csv').mapPartitions(lambda x: csv.reader(x))
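`csv.reader` accepts any iterable of text lines, which is why it can be applied directly to the partition iterator inside `mapPartitions`. Unlike a plain `split(',')`, it also keeps quoted fields that contain commas intact. The following is a minimal sketch in plain Python; the two lines and the identifier `R-1` are made up for illustration and are not rows from the actual files.

```python
import csv

# Hypothetical lines standing in for one partition of a text-file RDD
partition_lines = iter([
    'REPORT_ID,Location,Postcode',
    'R-1,"EXAMPLE ST, CORNER",5000',
])

# csv.reader consumes the iterator lazily and yields one list of fields per line
parsed_rows = list(csv.reader(partition_lines))

# A naive split cuts the quoted field into two pieces
naive_row = 'R-1,"EXAMPLE ST, CORNER",5000'.split(',')

print(parsed_rows[1])
print(len(naive_row))
```

Because `textFile` breaks the input into individual lines before `csv.reader` sees them, this approach assumes that no field contains an embedded line break.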

 <div class="alert alert-block alert-info">
4. For each Units and Crashes RDDs, remove the header rows and display the total
count and first 10 records.

In [5]:
# Extract the header from the RDD
header_units = rdd_units.first()
# Filter out the header from the RDD
rdd_units = rdd_units.filter(lambda x : x != header_units)

# Print out the total count and show the first 10 rows of the RDD
print('Total Count of Units: {}'.format(rdd_units.count()), '\n')
rdd_units.take(10)

Total Count of Units: 153854 



[['2016-1-15/08/2019',
  '01',
  '0',
  'SA',
  'OMNIBUS',
  '2011',
  'North',
  'Male',
  '056',
  'SA',
  'HR',
  'Full',
  'Not Towing',
  'Straight Ahead',
  '010',
  '5121',
  '',
  ''],
 ['2016-1-15/08/2019',
  '02',
  '1',
  '',
  'Pedestrian on Road',
  '',
  'East',
  'Male',
  '072',
  '',
  '',
  '',
  '',
  'Walking on Road',
  '',
  '5084',
  '',
  ''],
 ['2016-2-15/08/2019',
  '01',
  '0',
  'SA',
  'Motor Cars - Sedan',
  '2004',
  'Unknown',
  'Female',
  '023',
  'SA',
  'C ',
  'Full',
  'Not Towing',
  'Straight Ahead',
  '001',
  '5087',
  '',
  ''],
 ['2016-2-15/08/2019',
  '02',
  '0',
  'SA',
  'Station Wagon',
  '2008',
  'Unknown',
  'Male',
  '040',
  'SA',
  'C ',
  'Full',
  'Not Towing',
  'Straight Ahead',
  '001',
  '5084',
  '',
  ''],
 ['2016-3-15/08/2019',
  '01',
  '0',
  'SA',
  'RIGID TRUCK LGE GE 4.5T',
  '1990',
  'South',
  'Unknown',
  'XXX',
  'SA',
  'MR',
  'Provisional 2',
  'Not Towing',
  'Straight Ahead',
  '001',
  '5115',
  '',
  ''],


In [6]:
# Extract the header from the RDD
header_crash = rdd_crash.first()
# Filter out the header from the RDD
rdd_crash = rdd_crash.filter(lambda x : x != header_crash)

# Print out the total count and show the first 10 rows of the RDD
print('Total Count of Crashes: {}'.format(rdd_crash.count()), '\n')
rdd_crash.take(10)

Total Count of Crashes: 72006 



[['2019-1-8/07/2020',
  '2 Metropolitan',
  'HAMPSTEAD GARDENS',
  '5086',
  'CITY OF PORT ADELAIDE ENFIELD',
  '2',
  '0',
  '0',
  '0',
  '0',
  '2019',
  'June',
  'Wednesday',
  '11:15 am',
  '060',
  'Cross Road',
  'Straight road',
  'Level',
  'Not Applicable',
  'Sealed',
  'Dry',
  'Not Raining',
  'Daylight',
  'Right Angle',
  '01',
  'Driver Rider',
  '1: PDO',
  'Give Way Sign',
  '',
  '',
  '1331810.03',
  '1676603.26',
  '13318101676603'],
 ['2019-2-8/07/2020',
  '2 Metropolitan',
  'DRY CREEK',
  '5094',
  'CITY OF SALISBURY',
  '2',
  '0',
  '0',
  '0',
  '0',
  '2019',
  'January',
  'Tuesday',
  '12:49 am',
  '090',
  'Divided Road',
  'Straight road',
  'Level',
  'Not Applicable',
  'Sealed',
  'Dry',
  'Not Raining',
  'Night',
  'Rear End',
  '02',
  'Driver Rider',
  '1: PDO',
  'No Control',
  '',
  '',
  '1328376.2',
  '1682942.63',
  '13283761682943'],
 ['2019-3-8/07/2020',
  '2 Metropolitan',
  'MILE END',
  '5031',
  'CITY OF WEST TORRENS',
  '2',
  '1',
 

## 1.2 Data Partitioning in RDD<a class="anchor" id="1.2"></a>

 <div class="alert alert-block alert-info">
1. How many partitions do the above RDDs have? How is the data in these RDDs
partitioned by default, when we do not explicitly specify any partitioning strategy?

In [7]:
# Print the default number of partitions for both the RDDs
print('Number of partitions for the Units RDD:', rdd_units.getNumPartitions())
print('Number of partitions for the Crash RDD:', rdd_crash.getNumPartitions())

Number of partitions for the Units RDD: 5
Number of partitions for the Crash RDD: 5


 <div class="alert alert-block alert-warning">
 <span style="color: green"><b>Answer</b></span><br>
When neither the number of partitions nor a partitioning strategy is specified, `sc.textFile` partitions the input by file and by size rather than by key: every file yields at least one partition, and a file is divided into several partitions only when it is larger than the input split size. Records therefore stay in the file and in the order in which they were read. Here, 5 CSV files were read for each of the 2 RDDs, and each file is small enough to fit into a single split. Hence, each file is stored in its own partition, giving a total of 5 partitions for both RDDs.
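To illustrate why small files each land in their own partition, the sketch below imitates the split rule of Hadoop's older `FileInputFormat`, which `sc.textFile` relies on. The function name `count_input_splits`, the file sizes, and the defaults (a minimum of 2 partitions and a 32 MB local block size) are assumptions for illustration; the actual values depend on the environment.

```python
def count_input_splits(file_sizes, min_partitions=2, block_size=32 * 1024 * 1024):
    """Estimate the number of input splits for a list of file sizes in bytes."""
    split_slop = 1.1  # a final chunk up to 10% over split_size is not split again
    total_size = sum(file_sizes)
    goal_size = total_size // max(min_partitions, 1)
    split_size = max(1, min(goal_size, block_size))

    splits = 0
    for size in file_sizes:
        remaining = size
        # Cut full-size splits while the remainder is clearly larger than one split
        while remaining / split_size > split_slop:
            splits += 1
            remaining -= split_size
        # Whatever is left of the file becomes one more split
        if remaining > 0:
            splits += 1
    return splits

MB = 1024 * 1024
five_small_files = [3 * MB] * 5  # hypothetical sizes
one_larger_file = [20 * MB]      # hypothetical size

print(count_input_splits(five_small_files))
print(count_input_splits(one_larger_file))
```

Under these assumed sizes, splits never span files, so five small files produce five partitions, while a single larger file is cut into more than one.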

 <div class="alert alert-block alert-info">
2. In the “Units” csv dataset, there is a column called Lic State which shows the state
where the vehicle is registered. Assume we want to keep all the data related to SA in
one partition and the rest of the data in another partition.

 <div class="alert alert-block alert-info">
a. Create a Key Value Pair RDD with Lic State as the key and rest of the other
columns as value.

In [8]:
# Create a Key Value Pair with Lic State as the key
rdd_units_kv = rdd_units.map(lambda x: (x[9], x[0:9] + x[10:]))

 <div class="alert alert-block alert-info">
b. Write the code to implement this partitioning in RDD using appropriate
partitioning functions.

In [9]:
# Define the number of partitions
no_of_partitions = 2

# Define the partitioning function: 'SA' records go to partition 0, all others to partition 1
def hash_function(key):
    if key == 'SA':
        return 0
    else:
        return 1

# Carry out the hash partitioning
hash_partitioned_rdd = rdd_units_kv.partitionBy(no_of_partitions, hash_function)
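PySpark's `partitionBy` places a record in partition `partitionFunc(key) % numPartitions`, so only the remainder of the returned value matters. The sketch below imitates that rule in plain Python; `sa_partitioner`, `assign_partition`, and the sample keys are hypothetical names and data introduced for illustration.

```python
from collections import Counter

def sa_partitioner(key):
    """Return 0 for South Australian licences and 1 for everything else."""
    return 0 if key == 'SA' else 1

def assign_partition(key, num_partitions, partition_func):
    # Same rule as partitionBy: the function's result modulo the partition count
    return partition_func(key) % num_partitions

# Hypothetical Lic State keys, including a blank value
sample_keys = ['SA', 'VIC', 'SA', '', 'NSW', 'SA']
partition_sizes = Counter(assign_partition(key, 2, sa_partitioner) for key in sample_keys)

print(dict(partition_sizes))
```

Any pair of return values with different remainders modulo 2 separates the two groups in the same way; returning the partition index directly simply makes the intent easier to read.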

 <div class="alert alert-block alert-info">
c. Write the code to print the number of records in each partition. What does it
tell about the data skewness?

In [10]:
# Define a function to print the number of partitions and the number of records in each partition
def print_partitions(data):
    if isinstance(data, RDD):
        numPartitions = data.getNumPartitions()
        partitions = data.glom().collect()
    else:
        numPartitions = data.rdd.getNumPartitions()
        partitions = data.rdd.glom().collect()
    
    print(f'Number of Partitions: {numPartitions}\n')
    print('Number of records in each partition:')
    for index, partition in enumerate(partitions):
        print(f'Partition {index}: {len(partition)} records')

# Print the number of partitions and the number of records in each partition
print_partitions(hash_partitioned_rdd)

Number of Partitions: 2

Number of records in each partition:
Partition 0: 109684 records
Partition 1: 44170 records


 <div class="alert alert-block alert-warning">
 <span style="color: green"><b>Answer</b></span><br>
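The record counts show that the data is highly skewed: partition 0 (SA) holds 109684 records, about 71% of the total and nearly 2.5 times the 44170 records in partition 1. This imbalance hurts processing performance through poor load balancing, because the task that handles partition 0 takes far longer than the task for partition 1.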
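The degree of skew can be quantified from the two record counts printed above. The sketch below computes each partition's share and a simple skew factor (the largest partition divided by the size of a perfectly even partition); this particular measure is an illustrative choice, not one prescribed by the assignment.

```python
partition_counts = [109684, 44170]  # record counts reported for partitions 0 and 1

total = sum(partition_counts)
shares = [count / total for count in partition_counts]
# Skew factor: largest partition relative to a perfectly even split
skew_factor = max(partition_counts) / (total / len(partition_counts))

for index, share in enumerate(shares):
    print(f'Partition {index}: {share:.2%} of records')
print(f'Skew factor: {skew_factor:.2f}')
```

A skew factor of 1.0 would mean evenly sized partitions; larger values mean the slowest task has proportionally more work than the average one.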

## 1.3 Query/Analysis<a class="anchor" id="1.3"></a>

 <div class="alert alert-block alert-info">
1. Find the average age of male and female drivers separately.

In [11]:
# Create a Key Value pair for sex and age, and clean the age data in RDD
rdd_units_driver = rdd_units.map(lambda x: (x[7], x[8]))\
                            .filter(lambda x: x[0] != 'Unknown')\
                            .filter(lambda x: x[1] != 'XXX')\
                            .filter(lambda x: x[0] != '')\
                            .filter(lambda x: x[1] != '')\
                            .map(lambda x: (x[0], int(x[1])))


# Filter the RDD only for the ages of male drivers
rdd_units_driver_male = rdd_units_driver.filter(lambda x : x[0] == 'Male')
# Count the male drivers in the RDD
male_count = rdd_units_driver_male.count()
# Sum the ages of the male drivers
male_avg_age = rdd_units_driver_male.reduceByKey(lambda x, y: x + y)
# Collect the summed ages of male drivers as a list of (sex, total age) pairs
avg_age_male = male_avg_age.collect()

# Print out the average age of male drivers
print('The average age of male drivers is', round(avg_age_male[0][1] / male_count, 2), 'years.')


# Filter the RDD only for the ages of female drivers
rdd_units_driver_female = rdd_units_driver.filter(lambda x : x[0] == 'Female')
# Count the female drivers in the RDD
female_count = rdd_units_driver_female.count()
# Sum the ages of the female drivers
female_avg_age = rdd_units_driver_female.reduceByKey(lambda x, y: x + y)
# Collect the summed ages of female drivers as a list of (sex, total age) pairs
avg_age_female = female_avg_age.collect()

# Print out the average age of female drivers
print('The average age of female drivers is', round(avg_age_female[0][1] / female_count, 2), 'years.')

The average age of male drivers is 40.98 years.
The average age of female drivers is 40.39 years.
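The averages above take a separate `count` and `reduceByKey` pass per sex. An alternative is to carry a `(sum, count)` pair per key so that both averages come out of a single pass. The sketch below shows the two combining functions in plain Python on made-up ages; `seq_op`, `comb_op`, and the sample partitions are hypothetical names and data.

```python
from functools import reduce

def seq_op(acc, age):
    """Fold one age into a (sum, count) accumulator."""
    return (acc[0] + age, acc[1] + 1)

def comb_op(left, right):
    """Merge two (sum, count) accumulators from different partitions."""
    return (left[0] + right[0], left[1] + right[1])

# Hypothetical ages for one key, spread over two partitions
partition_1 = [23, 40]
partition_2 = [56, 29, 72]

# Each partition is folded independently, then the partial results are merged
partial_1 = reduce(seq_op, partition_1, (0, 0))
partial_2 = reduce(seq_op, partition_2, (0, 0))
age_sum, age_count = comb_op(partial_1, partial_2)

average_age = age_sum / age_count
print(round(average_age, 2))
```

In Spark, the same two functions could be passed to `aggregateByKey((0, 0), seq_op, comb_op)` on the `(sex, age)` pair RDD, followed by `mapValues(lambda t: t[0] / t[1])`; this is a suggested variation, not the approach used in the cell above.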


 <div class="alert alert-block alert-info">
2. What is the oldest and the newest vehicle year involved in the accident? Display the
Registration State, Year and Unit type of the vehicle.

In [12]:
# Clean the year data and create a Key Value pair for the RDD
rdd_units_vehicle = rdd_units.map(lambda x: (x[5], (x[3], x[4])))\
                            .filter(lambda x: x[0] != 'XXXX')\
                            .filter(lambda x: x[0] != '')\
                            .map(lambda x: (int(x[0]), (x[1][0], x[1][1])))\
                            .filter(lambda x: x[0] != 0)

# Extract the minimum and the maximum values of the year data 
rdd_units_vehicle_min_max = rdd_units_vehicle.map(lambda x: x[0])
year_min = rdd_units_vehicle_min_max.min()
year_max = rdd_units_vehicle_min_max.max()

# Create the final RDD with the details of the unique oldest vehicles
rdd_oldest_vehicle = rdd_units_vehicle.filter(lambda x: x[0] == year_min)\
                                    .map(lambda x: (x[1][0], x[0], x[1][1])).distinct()

# Create the final RDD with the details of the unique newest vehicles
rdd_newest_vehicle = rdd_units_vehicle.filter(lambda x: x[0] == year_max)\
                                    .map(lambda x: (x[1][0], x[0], x[1][1])).distinct()

In [13]:
# Show the RDD with the oldest vehicle data
rdd_oldest_vehicle.collect()

[('SA', 1900, 'RIGID TRUCK LGE GE 4.5T'),
 ('SA', 1900, 'Motor Cycle'),
 ('VIC', 1900, 'Motor Cycle')]

In [14]:
# Show the RDD with the newest vehicle data
rdd_newest_vehicle.collect()

[('SA', 2019, 'Station Wagon'),
 ('VIC', 2019, 'Station Wagon'),
 ('SA', 2019, 'BDOUBLE - ROAD TRAIN'),
 ('SA', 2019, 'Motor Vehicle - Type Unknown'),
 ('SA', 2019, 'Light Truck LT 4.5T'),
 ('QLD', 2019, 'RIGID TRUCK LGE GE 4.5T'),
 ('SA', 2019, 'OMNIBUS'),
 ('SA', 2019, 'Utility'),
 ('VIC', 2019, 'Utility'),
 ('SA', 2019, 'SEMI TRAILER'),
 ('SA', 2019, 'Other Defined Special Vehicle'),
 ('VIC', 2019, 'Motor Cars - Sedan'),
 ('NT', 2019, 'Station Wagon'),
 ('SA', 2019, 'Scooter'),
 ('NSW', 2019, 'Motor Cars - Sedan'),
 ('VIC', 2019, 'BDOUBLE - ROAD TRAIN'),
 ('SA', 2019, 'Motor Cars - Sedan'),
 ('SA', 2019, 'Motor Cycle'),
 ('QLD', 2019, 'SEMI TRAILER'),
 ('SA', 2019, 'RIGID TRUCK LGE GE 4.5T'),
 ('SA', 2019, 'Panel Van')]

# 2. Working with DataFrames<a class="anchor" id="2"></a>
## 2.1 Data Preparation and Loading<a class="anchor" id="2.1"></a>

 <div class="alert alert-block alert-info">
1. Load all units and crash data into two separate dataframes.

In [15]:
# Import all the Units and Crashes csv files into single respective DataFrames
df_units = spark.read.csv('*Units.csv', inferSchema = True, header = True)
df_crash = spark.read.csv('*Crash.csv', inferSchema = True, header = True)

 <div class="alert alert-block alert-info">
2. Display the schema of the final two dataframes.

In [16]:
# Show the schema for the Units DF
df_units.printSchema()

root
 |-- REPORT_ID: string (nullable = true)
 |-- Unit No: integer (nullable = true)
 |-- No Of Cas: integer (nullable = true)
 |-- Veh Reg State: string (nullable = true)
 |-- Unit Type: string (nullable = true)
 |-- Veh Year: string (nullable = true)
 |-- Direction Of Travel: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Lic State: string (nullable = true)
 |-- Licence Class: string (nullable = true)
 |-- Licence Type: string (nullable = true)
 |-- Towing: string (nullable = true)
 |-- Unit Movement: string (nullable = true)
 |-- Number Occupants: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- Rollover: string (nullable = true)
 |-- Fire: string (nullable = true)



In [17]:
# Show the schema for the Crashes DF
df_crash.printSchema()

root
 |-- REPORT_ID: string (nullable = true)
 |-- Stats Area: string (nullable = true)
 |-- Suburb: string (nullable = true)
 |-- Postcode: integer (nullable = true)
 |-- LGA Name: string (nullable = true)
 |-- Total Units: integer (nullable = true)
 |-- Total Cas: integer (nullable = true)
 |-- Total Fats: integer (nullable = true)
 |-- Total SI: integer (nullable = true)
 |-- Total MI: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Area Speed: integer (nullable = true)
 |-- Position Type: string (nullable = true)
 |-- Horizontal Align: string (nullable = true)
 |-- Vertical Align: string (nullable = true)
 |-- Other Feat: string (nullable = true)
 |-- Road Surface: string (nullable = true)
 |-- Moisture Cond: string (nullable = true)
 |-- Weather Cond: string (nullable = true)
 |-- DayNight: string (nullable = true)
 |-- Crash Type: string (nullable = true

## 2.2 Query/Analysis<a class="anchor" id="2.2"></a>

 <div class="alert alert-block alert-info">
1. Find all the crash events in Adelaide where the total number of casualties in the event
is more than 3.

In [18]:
# Filter the data to Adelaide suburb and the total casualties being more than 3
crash_df = df_crash.filter((f.col('Suburb') == 'ADELAIDE') & (f.col('Total Cas') > 3))
# Convert the DF to a Pandas DF for better readability and display the results
crash_df.toPandas()

| | REPORT_ID | Stats Area | Suburb | Postcode | LGA Name | Total Units | Total Cas | Total Fats | Total SI | Total MI | ... | Crash Type | Unit Resp | Entity Code | CSEF Severity | Traffic Ctrls | DUI Involved | Drugs Involved | ACCLOC_X | ACCLOC_Y | UNIQUE_LOC |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018-601-17/01/2020 | 1 City | ADELAIDE | 5000 | CITY OF ADELAIDE | 8 | 4 | 0 | 2 | 2 | ... | Hit Pedestrian | 1 | Driver Rider | 3: SI | No Control | | | 1329806.36 | 1670224.76 | 13298061670225 |
| 1 | 2017-1613-15/08/2019 | 1 City | ADELAIDE | 5000 | CITY OF ADELAIDE | 2 | 4 | 0 | 0 | 4 | ... | Right Turn | 1 | Driver Rider | 2: MI | Traffic Signals | | | 1327951.24 | 1669556.92 | 13279511669557 |
| 2 | 2017-12182-15/08/2019 | 1 City | ADELAIDE | 5000 | CITY OF ADELAIDE | 6 | 5 | 0 | 1 | 4 | ... | Hit Pedestrian | 1 | Driver Rider | 3: SI | Traffic Signals | | | 1329016.2 | 1670995.07 | 13290161670995 |
| 3 | 2019-10404-8/07/2020 | 1 City | ADELAIDE | 5000 | CITY OF ADELAIDE | 4 | 6 | 0 | 0 | 6 | ... | Right Turn | 1 | Driver Rider | 2: MI | No Control | | | 1327088.72 | 1670880.07 | 13270891670880 |


 <div class="alert alert-block alert-info">
2. Display 10 crash events with highest casualties.

In [19]:
# Sort the DF by total casualties in descending order
crash_df = df_crash.sort('Total Cas', ascending = False)
# Convert the DF to a Pandas DF for better readability and display 10 crash events as results
crash_df.toPandas().head(10)

| | REPORT_ID | Stats Area | Suburb | Postcode | LGA Name | Total Units | Total Cas | Total Fats | Total SI | Total MI | ... | Crash Type | Unit Resp | Entity Code | CSEF Severity | Traffic Ctrls | DUI Involved | Drugs Involved | ACCLOC_X | ACCLOC_Y | UNIQUE_LOC |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2017-288-15/08/2019 | 2 Metropolitan | PARA HILLS | 5096 | CITY OF SALISBURY | 2 | 11 | 0 | 1 | 10 | ... | Right Angle | 1 | Driver Rider | 3: SI | Stop Sign | | | 1334428.9 | 1683032.96 | 13344290000000.0 |
| 1 | 2016-3035-15/08/2019 | 2 Metropolitan | HACKHAM | 5163 | CITY OF ONKAPARINGA | 3 | 9 | 3 | 5 | 1 | ... | Right Turn | 1 | Driver Rider | 4: Fatal | No Control | | | 1320361.49 | 1645195.63 | 13203610000000.0 |
| 2 | 2016-6630-15/08/2019 | 2 Metropolitan | KANGAROO FLAT | 5118 | LIGHT REGIONAL COUNCIL | 3 | 9 | 0 | 2 | 7 | ... | Head On | 1 | Driver Rider | 3: SI | No Control | | | 1339316.32 | 1710314.92 | 13393160000000.0 |
| 3 | 2019-11734-8/07/2020 | 2 Metropolitan | STURT | 5047 | CC MARION. | 2 | 9 | 0 | 1 | 8 | ... | Right Turn | 2 | Driver Rider | 3: SI | Traffic Signals | | | 1324428.84 | 1659884.95 | 13244290000000.0 |
| 4 | 2016-7073-15/08/2019 | 3 Country | MERRITON | 5523 | PT.PIRIE CITY & DIST. COUNCIL | 2 | 8 | 4 | 3 | 1 | ... | Head On | 1 | Driver Rider | 4: Fatal | No Control | | | 1293759.89 | 1840109.96 | 12937600000000.0 |
| 5 | 2016-14407-15/08/2019 | 3 Country | STOCKWELL | 5355 | THE BAROSSA COUNCIL. | 2 | 8 | 1 | 6 | 1 | ... | Head On | 1 | Driver Rider | 4: Fatal | No Control | | | 1373964.45 | 1723462.57 | 13739640000000.0 |
| 6 | 2015-2823-21/08/2019 | 3 Country | HAWKER | 5434 | THE FLINDERS RANGES COUNCIL. | 1 | 8 | 0 | 0 | 8 | ... | Roll Over | 1 | Driver Rider | 2: MI | No Control | | | 1315077.61 | 2022309.34 | 13150780000000.0 |
| 7 | 2016-8547-15/08/2019 | 3 Country | WINDSOR | 5501 | DC MALLALA. | 4 | 7 | 0 | 1 | 6 | ... | Rear End | 2 | Driver Rider | 3: SI | No Control | | Y | 1306853.11 | 1724952.66 | 13068530000000.0 |
| 8 | 2015-6965-21/08/2019 | 3 Country | BEAUFORT | 5550 | YORKE PENINSULA COUNCIL | 3 | 7 | 3 | 4 | 0 | ... | Head On | 9 | Other | 4: Fatal | No Control | | | 1287930.19 | 1761652.36 | 12879300000000.0 |
| 9 | 2015-12591-21/08/2019 | 3 Country | MALLALA | 5502 | DC MALLALA. | 2 | 7 | 0 | 2 | 5 | ... | Right Angle | 1 | Driver Rider | 3: SI | Give Way Sign | | | 1325122.01 | 1724860.95 | 13251220000000.0 |


 <div class="alert alert-block alert-info">
3. Find the total number of fatalities for each crash type.

In [20]:
# Group by crash type, aggregate the sum of fatalities accordingly, then sort them based on the total sum and show results
df_crash.groupBy('Crash Type').agg({'Total Fats': 'sum'})\
        .sort('sum(Total Fats)', ascending = False)\
        .withColumnRenamed('sum(Total Fats)', 'Total Fatalities').show()

+--------------------+----------------+
|          Crash Type|Total Fatalities|
+--------------------+----------------+
|    Hit Fixed Object|             152|
|             Head On|              86|
|      Hit Pedestrian|              70|
|           Roll Over|              57|
|         Right Angle|              45|
|          Side Swipe|              20|
|          Right Turn|              18|
|            Rear End|              16|
|  Hit Parked Vehicle|               9|
|          Hit Animal|               4|
|  Hit Object on Road|               2|
|               Other|               2|
|Left Road - Out o...|               1|
+--------------------+----------------+



 <div class="alert alert-block alert-info">
4. Find the total number of casualties for each suburb when the vehicle was driven by an
unlicensed driver. You are required to display the name of the suburb and the total
number of casualties.

In [21]:
# Filter the units DF to only the unlicenced drivers
df_units_unlicenced = df_units.filter(f.col('Licence Type') == 'Unlicenced')

# Join both the DFs on REPORT_ID
df_joined = df_units_unlicenced.join(df_crash,
                                     df_units_unlicenced.REPORT_ID == df_crash.REPORT_ID,
                                     how = 'inner')

# Group by suburb, aggregate the sum of casualties accordingly and sort them based on the total sum
df_joined.groupBy('Suburb').agg({'Total Cas': 'sum'}).sort('sum(Total Cas)', ascending = False)\
            .withColumnRenamed('sum(Total Cas)', 'Total Casualties').show() # Rename the column as required and show

+---------------+----------------+
|         Suburb|Total Casualties|
+---------------+----------------+
|       ADELAIDE|              19|
|      SALISBURY|              18|
|      DRY CREEK|              18|
| SALISBURY EAST|              16|
|       PROSPECT|              14|
| NORTH ADELAIDE|              13|
|        ENFIELD|              12|
|   ANDREWS FARM|              12|
|SALISBURY SOUTH|              11|
|SALISBURY DOWNS|              11|
|     INGLE FARM|              11|
|   BEDFORD PARK|              11|
| ELIZABETH PARK|              10|
|     MUNNO PARA|              10|
|  MORPHETT VALE|              10|
|         BURTON|              10|
|   MOUNT BARKER|              10|
|   MAWSON LAKES|              10|
|SALISBURY PLAIN|              10|
|ELIZABETH GROVE|               9|
+---------------+----------------+
only showing top 20 rows



## 2.3 Severity Analysis<a class="anchor" id="2.3"></a>

In [22]:
# Select only the required columns from the crash DF for the entire severity analysis to reduce the query load
df_crash_sa = df_crash.select(f.col('CSEF Severity'), f.col('Drugs Involved'), f.col('DUI Involved'))

 <div class="alert alert-block alert-info">
1. Find the total number of crash events for each severity level. Which severity level is the
most common?

In [23]:
# Group by the severity level and aggregate the count of the number of records
df_crash_sa.groupBy('CSEF Severity').count().withColumnRenamed('count', 'Count').show()

+-------------+-----+
|CSEF Severity|Count|
+-------------+-----+
|     4: Fatal|  451|
|        2: MI|21881|
|       1: PDO|46696|
|        3: SI| 2978|
+-------------+-----+



 <div class="alert alert-block alert-warning">
 <span style="color: green"><b>Answer</b></span><br>
PDO (Property Damage Only) is the most common of the four severity levels, with 46696 crash events, roughly 65% of the 72006 recorded crashes.

 <div class="alert alert-block alert-info">
2. Compute the total number of crash events for each severity level and the percentage
for the four different scenarios.

 <div class="alert alert-block alert-info">
a. When the driver is tested positive on drugs.

In [24]:
# Filter the DF based on the requirement, group by severity level, aggregate the count accordingly
# and then compute the percentage of the count
a = df_crash_sa.filter((f.col('Drugs Involved') == 'Y') & f.col('DUI Involved').isNull()).groupBy('CSEF Severity').count()\
            .withColumn('Percentage', f.round(f.col('count') / f.sum('count').over(Window.partitionBy()) * 100, 2))\
            .withColumnRenamed('count', 'Count')

# Display the results
a.show()

+-------------+-----+----------+
|CSEF Severity|Count|Percentage|
+-------------+-----+----------+
|     4: Fatal|   55|       5.1|
|        2: MI|  660|     61.17|
|       1: PDO|  152|     14.09|
|        3: SI|  212|     19.65|
+-------------+-----+----------+
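The `Percentage` column divides each count by `f.sum('count').over(Window.partitionBy())`. A window with no partitioning columns spans all rows, so every row is divided by the grand total. Spark typically warns that such a window moves all data to a single partition, which is harmless for a four-row result. The arithmetic can be checked in plain Python with the counts from the table above:

```python
# Counts per severity level for the "drugs only" scenario
severity_counts = {'4: Fatal': 55, '2: MI': 660, '1: PDO': 152, '3: SI': 212}

# The un-partitioned window sum is simply the total over all rows
grand_total = sum(severity_counts.values())

percentages = {
    level: round(count / grand_total * 100, 2)
    for level, count in severity_counts.items()
}

for level in sorted(percentages):
    print(f'{level}: {percentages[level]}%')
```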



 <div class="alert alert-block alert-info">
b. When the driver is tested positive for blood alcohol concentration.

In [25]:
# Filter the DF based on the requirement, group by severity level, aggregate the count accordingly
# and then compute the percentage of the count
b = df_crash_sa.filter((f.col('DUI Involved') == 'Y') & f.col('Drugs Involved').isNull()).groupBy('CSEF Severity').count()\
            .withColumn('Percentage', f.round(f.col('count') / f.sum('count').over(Window.partitionBy()) * 100, 2))\
            .withColumnRenamed('count', 'Count')

# Display the results
b.show()

+-------------+-----+----------+
|CSEF Severity|Count|Percentage|
+-------------+-----+----------+
|     4: Fatal|   52|      2.51|
|        2: MI|  648|     31.26|
|       1: PDO| 1149|     55.43|
|        3: SI|  224|     10.81|
+-------------+-----+----------+



 <div class="alert alert-block alert-info">
c. When the driver is tested positive for both drugs and blood alcohol

In [26]:
# Filter the DF based on the requirement, group by severity level, aggregate the count accordingly
# and then compute the percentage of the count
c = df_crash_sa.filter((f.col('Drugs Involved') == 'Y') & (f.col('DUI Involved') == 'Y'))\
            .groupBy('CSEF Severity').count()\
            .withColumn('Percentage', f.round(f.col('count') / f.sum('count').over(Window.partitionBy()) * 100, 2))\
            .withColumnRenamed('count', 'Count')

# Display the results
c.show()

+-------------+-----+----------+
|CSEF Severity|Count|Percentage|
+-------------+-----+----------+
|     4: Fatal|   27|     15.43|
|        2: MI|   89|     50.86|
|       1: PDO|   24|     13.71|
|        3: SI|   35|      20.0|
+-------------+-----+----------+



 <div class="alert alert-block alert-info">
d. When the driver is tested negative for both (no alcohol and no drugs).

In [27]:
# Filter the DF based on the requirement, group by severity level, aggregate the count accordingly
# and then compute the percentage of the count
d = df_crash_sa.filter((f.col('Drugs Involved').isNull()) & (f.col('DUI Involved').isNull()))\
            .groupBy('CSEF Severity').count()\
            .withColumn('Percentage', f.round(f.col('count') / f.sum('count').over(Window.partitionBy()) * 100, 2))\
            .withColumnRenamed('count', 'Count')

# Display the results
d.show()

+-------------+-----+----------+
|CSEF Severity|Count|Percentage|
+-------------+-----+----------+
|     4: Fatal|  317|      0.46|
|        2: MI|20484|     29.83|
|       1: PDO|45371|     66.06|
|        3: SI| 2507|      3.65|
+-------------+-----+----------+



## Crash Severity Analysis - For Part B<a class="anchor" id="b"></a>

In [28]:
# Join all the results from the above 4 scenarios and generate a single result table to compare the numbers
severity_joined = a.select(f.col('CSEF Severity'), f.col('Percentage').alias('On Drugs'))\
            .join(b.select(f.col('CSEF Severity'), f.col('Percentage').alias('On Alc')), ['CSEF Severity'])\
            .join(c.select(f.col('CSEF Severity'), f.col('Percentage').alias('On Both')), ['CSEF Severity'])\
            .join(d.select(f.col('CSEF Severity'), f.col('Percentage').alias('On None')), ['CSEF Severity'])
severity_joined = severity_joined.withColumnRenamed('CSEF Severity', 'Severity Level').sort('Severity Level')

# Show the joined table for comparison
severity_joined.show()

+--------------+--------+------+-------+-------+
|Severity Level|On Drugs|On Alc|On Both|On None|
+--------------+--------+------+-------+-------+
|        1: PDO|   14.09| 55.43|  13.71|  66.06|
|         2: MI|   61.17| 31.26|  50.86|  29.83|
|         3: SI|   19.65| 10.81|   20.0|   3.65|
|      4: Fatal|     5.1|  2.51|  15.43|   0.46|
+--------------+--------+------+-------+-------+



 <div class="alert alert-block alert-warning">
 <span style="color: green"><b>Answer</b></span><br>
Comparing the four scenarios, the share of fatal crashes is highest when the driver tests positive for both drugs and alcohol (15.43%); minor injuries (50.86%) and serious injuries (20.0%) are also common in this case. When the driver tests positive for drugs only, minor injuries dominate (61.17%), followed by serious injuries (19.65%), with smaller shares for property damage only (14.09%) and fatal crashes (5.1%). When the driver tests positive for alcohol only, property damage only has the largest share (55.43%), followed by minor injuries (31.26%), and fatal crashes are rare (2.51%). The share of property-damage-only crashes is highest when the driver tests negative for both drugs and alcohol (66.06%), and every other severity level is at its lowest in this scenario, with fatal crashes at just 0.46%.

## 2.4 RDDs vs DataFrames vs Spark SQL<a class="anchor" id="2.4"></a>

In [29]:
# Create SQL Views from both the Dataframes
df_units.createOrReplaceTempView('sql_units')
df_crash.createOrReplaceTempView('sql_crash')

 <div class="alert alert-block alert-info">
1. Find the Date and Time of Crash, Number of Casualties in each unit and the Gender,
Age, License Type of the unit driver for the suburb "Adelaide".

### RDD

In [30]:
%%time
# Log the time taken for the query

# Create a Key Value pair with REPORT_ID as the key and the required columns as values for both the RDDs
rdd_units_kv = rdd_units.map(lambda x: (x[0], (int(x[1]), int(x[2]), x[7], int(x[8]) if x[8].isdigit() else x[8], x[11])))
rdd_crash_kv = rdd_crash.map(lambda x: (x[0], (x[10], x[2], x[11], x[12], x[13])))

# Filter the crash RDD only for the Adelaide suburb 
rdd_crash_kv = rdd_crash_kv.filter(lambda x: x[1][1] == 'ADELAIDE')
# Join the 2 RDDs on REPORT_ID
rdd_joined = rdd_units_kv.join(rdd_crash_kv)

# Concatenate the date as required and select all the other required elements
rdd_joined = rdd_joined.map(lambda x: ((x[1][1][0] + '-' + x[1][1][2] + '-' + x[1][1][3]),
                                       x[1][1][4], x[1][0][0], x[1][0][1], x[1][0][2], x[1][0][3], x[1][0][4]))
# Sort the RDD by date and time for easy comparison with the other 2 methods
rdd_joined = rdd_joined.sortBy(lambda x: (x[0], x[1]), ascending = False)

# Print the total number of rows in the result
print('Total rows: ', rdd_joined.count())
# Show the result RDD
rdd_joined.collect()

Total rows:  6310
CPU times: user 53.7 ms, sys: 16.4 ms, total: 70.1 ms
Wall time: 4.8 s


[('2019-September-Wednesday', '11:40 am', 1, 0, 'Male', 56, 'Full'),
 ('2019-September-Wednesday', '11:40 am', 2, 0, 'Unknown', 'XXX', 'Unknown'),
 ('2019-September-Wednesday', '11:38 am', 1, 0, 'Male', 29, 'Full'),
 ('2019-September-Wednesday', '11:38 am', 2, 1, 'Male', 73, ''),
 ('2019-September-Wednesday', '07:55 pm', 1, 1, 'Female', 31, ''),
 ('2019-September-Wednesday', '07:55 pm', 2, 0, 'Unknown', 'XXX', ''),
 ('2019-September-Wednesday', '06:50 pm', 1, 0, 'Female', 75, 'Full'),
 ('2019-September-Wednesday', '06:50 pm', 2, 0, 'Male', 39, 'Full'),
 ('2019-September-Wednesday', '06:00 pm', 1, 1, 'Female', 38, ''),
 ('2019-September-Wednesday', '06:00 pm', 2, 0, 'Unknown', 'XXX', ''),
 ('2019-September-Wednesday', '04:36 pm', 1, 1, 'Male', 28, ''),
 ('2019-September-Wednesday', '04:36 pm', 2, 1, 'Female', 33, ''),
 ('2019-September-Wednesday', '04:10 pm', 1, 0, 'Female', 29, 'Full'),
 ('2019-September-Wednesday', '04:10 pm', 2, 0, 'Male', 50, 'Full'),
 ('2019-September-Wednesday', '
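
The nested indexing in the final `map` follows from the shape that `join` produces: every record is `(key, (left_value, right_value))`, so `x[1][0]` is the units tuple and `x[1][1]` is the crash tuple. The sketch below mimics that shape in plain Python without Spark. `inner_join` is a hypothetical helper written only for illustration, and the report IDs `R1` and `R2` are made up; the remaining values echo the first row of the output above.

```python
from collections import defaultdict

def inner_join(left, right):
    """Hypothetical stand-in for RDD.join: pair up values that share a key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Emit (key, (left_value, right_value)) for every matching pair
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]

# Illustrative records; 'R1' and 'R2' are made-up report IDs
units = [('R1', (1, 0, 'Male', 56, 'Full')), ('R2', (1, 1, 'Female', 31, ''))]
crash = [('R1', ('2019', 'ADELAIDE', 'September', 'Wednesday', '11:40 am'))]

joined = inner_join(units, crash)

# u is the units tuple (x[1][0]); c is the crash tuple (x[1][1])
rows = [('-'.join((c[0], c[2], c[3])), c[4], *u) for _, (u, c) in joined]
print(rows)
```

Only `R1` appears on both sides, so the unit recorded under `R2` is dropped, which is the inner-join behaviour the query relies on after filtering the crash records to Adelaide.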

### DataFrame

In [31]:
%%time
# Log the time taken for the query

# Filter the crash DF for the Adelaide suburb only 
df_crash_adelaide = df_crash.filter(f.col('Suburb') == 'ADELAIDE')
# Join the 2 DFs on REPORT_ID
df_joined = df_units.join(df_crash_adelaide,
                          df_units.REPORT_ID == df_crash_adelaide.REPORT_ID,
                          how = 'inner')

# Concatenate the date as required and select all the other required columns
df_joined = df_joined.select(f.concat(f.col('Year'), f.lit('-'), f.col('Month'), f.lit('-'), f.col('Day')).alias('Date'),
                             f.col('Time'), f.col('Unit No'), f.col('No of Cas'),
                             f.col('Sex'), f.col('Age'), f.col('Licence Type'))\
                            .sort('Date', 'Time', ascending = False) # Sort by date and time for easy comparison

# Print the total number of rows in the result
print('Total rows: ', df_joined.count())
# Show the result DF
df_joined.show(truncate = False)

Total rows:  6310
+------------------------+--------+-------+---------+-------+---+------------+
|Date                    |Time    |Unit No|No of Cas|Sex    |Age|Licence Type|
+------------------------+--------+-------+---------+-------+---+------------+
|2019-September-Wednesday|11:40 am|1      |0        |Male   |056|Full        |
|2019-September-Wednesday|11:40 am|2      |0        |Unknown|XXX|Unknown     |
|2019-September-Wednesday|11:38 am|1      |0        |Male   |029|Full        |
|2019-September-Wednesday|11:38 am|2      |1        |Male   |073|null        |
|2019-September-Wednesday|07:55 pm|2      |0        |Unknown|XXX|null        |
|2019-September-Wednesday|07:55 pm|1      |1        |Female |031|null        |
|2019-September-Wednesday|06:50 pm|2      |0        |Male   |039|Full        |
|2019-September-Wednesday|06:50 pm|1      |0        |Female |075|Full        |
|2019-September-Wednesday|06:00 pm|1      |1        |Female |038|null        |
|2019-September-Wednesday|06:00 pm
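
The RDD result shows ages as integers (`56`), while the DataFrame result keeps the zero-padded strings from the source (`056`). Only the RDD version converts digit-only values and leaves markers such as `XXX` untouched. A minimal sketch of that conversion, mirroring the conditional used in the RDD cell; `parse_age` is a hypothetical name introduced here for illustration.

```python
def parse_age(raw):
    """Return an int for digit-only strings, otherwise the original marker."""
    return int(raw) if raw.isdigit() else raw

ages = [parse_age(value) for value in ['056', '029', 'XXX']]
print(ages)
```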

### Spark SQL

In [32]:
%%time
# Log the time taken for the query

# Write the SQL query for retrieving the required result and store it
sql_query = spark.sql('''
  SELECT
      CONCAT(c.Year, '-', c.Month, '-', c.Day) AS Date,
      c.Time,
      u.`Unit No`,
      u.`No of Cas`,
      u.Sex,
      u.Age,
      u.`Licence Type`
  FROM sql_units u
  JOIN sql_crash c ON u.REPORT_ID = c.REPORT_ID
  WHERE c.Suburb = 'ADELAIDE'
  ORDER BY CONCAT(c.Year, '-', c.Month, '-', c.Day) DESC, c.Time DESC
''')

# Print the total number of rows in the result
print('Total rows: ', sql_query.count())
# Show the result of the SQL query
sql_query.show(truncate = False)

Total rows:  6310
+------------------------+--------+-------+---------+-------+---+------------+
|Date                    |Time    |Unit No|No of Cas|Sex    |Age|Licence Type|
+------------------------+--------+-------+---------+-------+---+------------+
|2019-September-Wednesday|11:40 am|1      |0        |Male   |056|Full        |
|2019-September-Wednesday|11:40 am|2      |0        |Unknown|XXX|Unknown     |
|2019-September-Wednesday|11:38 am|1      |0        |Male   |029|Full        |
|2019-September-Wednesday|11:38 am|2      |1        |Male   |073|null        |
|2019-September-Wednesday|07:55 pm|2      |0        |Unknown|XXX|null        |
|2019-September-Wednesday|07:55 pm|1      |1        |Female |031|null        |
|2019-September-Wednesday|06:50 pm|2      |0        |Male   |039|Full        |
|2019-September-Wednesday|06:50 pm|1      |0        |Female |075|Full        |
|2019-September-Wednesday|06:00 pm|1      |1        |Female |038|null        |
|2019-September-Wednesday|06:00 pm
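
All three versions sort `Date` and `Time` as strings, which is why `11:40 am` appears before `07:55 pm` in the descending output: the comparison is character by character, not chronological. That is sufficient for checking that the three methods return the same rows. If a true time ordering were needed, one option is to parse the time first, as in this plain-Python sketch. It assumes the `hh:mm am/pm` format seen in the output and an English or C locale for `%p`; `to_time` is a hypothetical helper.

```python
from datetime import datetime

times = ['11:40 am', '07:55 pm', '06:50 pm', '11:38 am']

# Plain string sort, as in the queries above: '11...' outranks '07...'
lexicographic = sorted(times, reverse=True)

def to_time(text):
    """Parse a 12-hour clock string into a datetime.time value."""
    return datetime.strptime(text, '%I:%M %p').time()

# Sort on the parsed value so that pm times rank after am times
chronological = sorted(times, key=to_time, reverse=True)

print(lexicographic)
print(chronological)
```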

 <div class="alert alert-block alert-info">
2. Find the total number of casualties for each suburb when the vehicle was driven by an
unlicensed driver. You are required to display the name of the suburb and the total
number of casualties.

### RDD

In [33]:
%%time
# Log the time taken for the query

# Create a Key Value pair with REPORT_ID as the key and the required columns as values for both the RDDs
rdd_units_kv = rdd_units.map(lambda x: (x[0], (x[1], x[11])))
rdd_crash_kv = rdd_crash.map(lambda x: (x[0], (x[2], int(x[6]))))

# Filter the units RDD only for the unlicenced drivers
rdd_units_kv = rdd_units_kv.filter(lambda x: x[1][1] == 'Unlicenced')

# Join the 2 RDDs on REPORT_ID
rdd_joined = rdd_units_kv.join(rdd_crash_kv)

# Keep only the suburb (key) and the total casualties of the crash (value)
rdd_joined = rdd_joined.map(lambda x: (x[1][1][0], x[1][1][1]))
# Group by suburb, aggregate the sum of casualties accordingly and sort them based on the total sum for easy comparison
rdd_joined = rdd_joined.groupByKey().mapValues(sum).sortBy(lambda x: (x[1], x[0]), ascending = False)

# Print the total number of rows in the result
print('Total rows: ', rdd_joined.count())
# Show the result RDD
rdd_joined.collect()

Total rows:  634
CPU times: user 67.1 ms, sys: 10.7 ms, total: 77.8 ms
Wall time: 3.5 s


[('ADELAIDE', 19),
 ('SALISBURY', 18),
 ('DRY CREEK', 18),
 ('SALISBURY EAST', 16),
 ('PROSPECT', 14),
 ('NORTH ADELAIDE', 13),
 ('ENFIELD', 12),
 ('ANDREWS FARM', 12),
 ('SALISBURY SOUTH', 11),
 ('SALISBURY DOWNS', 11),
 ('INGLE FARM', 11),
 ('BEDFORD PARK', 11),
 ('SALISBURY PLAIN', 10),
 ('MUNNO PARA', 10),
 ('MOUNT BARKER', 10),
 ('MORPHETT VALE', 10),
 ('MAWSON LAKES', 10),
 ('ELIZABETH PARK', 10),
 ('BURTON', 10),
 ('SALISBURY NORTH', 9),
 ('PLYMPTON', 9),
 ('PARA HILLS WEST', 9),
 ('HOLDEN HILL', 9),
 ('ELIZABETH GROVE', 9),
 ('ELIZABETH EAST', 9),
 ('SOUTH PLYMPTON', 8),
 ('SEATON', 8),
 ('MOUNT GAMBIER', 8),
 ('FLINDERS PARK', 8),
 ('ELIZABETH SOUTH', 8),
 ('DAVOREN PARK', 8),
 ('WHYALLA NORRIE', 7),
 ('PARAFIELD GARDENS', 7),
 ('ONKAPARINGA HILLS', 7),
 ('MURRAY BRIDGE', 7),
 ('MAGILL', 7),
 ('KANGARILLA', 7),
 ('GILLES PLAINS', 7),
 ('COOBER PEDY', 7),
 ('BURRA', 7),
 ('BROOKLYN PARK', 7),
 ('WYNN VALE', 6),
 ('WINGFIELD', 6),
 ('WATERLOO CORNER', 6),
 ('WARRADALE', 6),
 ('V
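
Summing casualties per suburb is a per-key aggregation. `groupByKey()` gathers every value for a key before `sum` is applied, whereas Spark's `reduceByKey` folds values pairwise and can combine them within each partition before the shuffle. The plain-Python sketch below shows the pairwise fold only. `reduce_by_key` is a hypothetical helper, not a Spark API, and the sample pairs are made up.

```python
from operator import add

def reduce_by_key(pairs, func):
    """Fold the values of each key pairwise, keeping one running result per key."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals

# Made-up (suburb, casualties) pairs for illustration
pairs = [('ADELAIDE', 2), ('PROSPECT', 1), ('ADELAIDE', 1)]
totals = reduce_by_key(pairs, add)
print(totals)
```

In the RDD cell the equivalent change would be to call `reduceByKey` with an addition function in place of `groupByKey().mapValues(sum)`; the resulting totals are the same.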

### DataFrame

In [34]:
%%time
# Log the time taken for the query

# Filter the units DF to only the unlicenced drivers
df_units_unlicenced = df_units.filter(f.col('Licence Type') == 'Unlicenced')

# Join both the DFs on REPORT_ID
df_joined = df_units_unlicenced.join(df_crash,
                                     df_units_unlicenced.REPORT_ID == df_crash.REPORT_ID,
                                     how = 'inner')

# Group by suburb, aggregate the sum of casualties accordingly and sort them based on the total sum for easy comparison
df_joined = df_joined.groupBy('Suburb').agg(f.sum('Total Cas').alias('Total Casualties'))\
                    .sort('Total Casualties', ascending = False) # Name the column as required and sort by the total

# Print the total number of rows in the result
print('Total rows: ', df_joined.count())
# Show the result DF
df_joined.show()

Total rows:  634
+---------------+----------------+
|         Suburb|Total Casualties|
+---------------+----------------+
|       ADELAIDE|              19|
|      SALISBURY|              18|
|      DRY CREEK|              18|
| SALISBURY EAST|              16|
|       PROSPECT|              14|
| NORTH ADELAIDE|              13|
|        ENFIELD|              12|
|   ANDREWS FARM|              12|
|SALISBURY SOUTH|              11|
|   BEDFORD PARK|              11|
|     INGLE FARM|              11|
|SALISBURY DOWNS|              11|
|   MOUNT BARKER|              10|
|     MUNNO PARA|              10|
|SALISBURY PLAIN|              10|
| ELIZABETH PARK|              10|
|         BURTON|              10|
|  MORPHETT VALE|              10|
|   MAWSON LAKES|              10|
| ELIZABETH EAST|               9|
+---------------+----------------+
only showing top 20 rows

CPU times: user 1.66 ms, sys: 4.23 ms, total: 5.89 ms
Wall time: 3.43 s


### Spark SQL

In [35]:
%%time
# Log the time taken for the query

# Write the SQL query for retrieving the required result and store it
sql_query = spark.sql('''
  SELECT
      c.Suburb,
      sum(c.`Total Cas`) AS `Total Casualties`
  FROM sql_units u
  JOIN sql_crash c ON u.REPORT_ID = c.REPORT_ID
  WHERE u.`Licence Type` = 'Unlicenced'
  GROUP BY c.Suburb
  ORDER BY sum(c.`Total Cas`) DESC, c.Suburb DESC
''')

# Print the total number of rows in the result
print('Total rows: ', sql_query.count())
# Show the result of the SQL query
sql_query.show()

Total rows:  634
+---------------+----------------+
|         Suburb|Total Casualties|
+---------------+----------------+
|       ADELAIDE|              19|
|      SALISBURY|              18|
|      DRY CREEK|              18|
| SALISBURY EAST|              16|
|       PROSPECT|              14|
| NORTH ADELAIDE|              13|
|        ENFIELD|              12|
|   ANDREWS FARM|              12|
|SALISBURY SOUTH|              11|
|SALISBURY DOWNS|              11|
|     INGLE FARM|              11|
|   BEDFORD PARK|              11|
|SALISBURY PLAIN|              10|
|     MUNNO PARA|              10|
|   MOUNT BARKER|              10|
|  MORPHETT VALE|              10|
|   MAWSON LAKES|              10|
| ELIZABETH PARK|              10|
|         BURTON|              10|
|SALISBURY NORTH|               9|
+---------------+----------------+
only showing top 20 rows

CPU times: user 3.83 ms, sys: 93 µs, total: 3.93 ms
Wall time: 3.41 s
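
The three results agree on the totals, but the DataFrame output lists tied suburbs in a different order (for example, the four suburbs with 11 casualties). It sorts on the total alone, while the RDD and Spark SQL versions also sort on the suburb name. A small sketch of the composite sort key, using values taken from the outputs above:

```python
rows = [('BEDFORD PARK', 11), ('SALISBURY SOUTH', 11), ('ADELAIDE', 19),
        ('INGLE FARM', 11), ('SALISBURY DOWNS', 11)]

# Sort by total first, then by suburb name, both descending,
# so that rows with equal totals always come out in the same order
ordered = sorted(rows, key=lambda r: (r[1], r[0]), reverse=True)
print(ordered)
```

The resulting order matches the RDD and Spark SQL outputs. Adding the suburb as a second sort column in the DataFrame version would be one way to make all three listings directly comparable.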


### End of the assignment