### Part A : Working with RDDs and DataFrames 

### 1.1 Data Preparation and Loading

In [2]:
# Spark entry points: SparkConf for configuration, SparkContext for the RDD API,
# SparkSession for the DataFrame / Spark SQL API
from pyspark import SparkConf
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL

# Run Spark in local mode
master = "local[*]"
# The `appName` field is the name shown on the Spark cluster UI page
app_name = "Assignment 1"

# Build the configuration one setter at a time
spark_conf = SparkConf()
spark_conf.setMaster(master)
spark_conf.setAppName(app_name)

# Create (or reuse) the Spark session and take its SparkContext for RDD work
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .getOrCreate()
)
sc = spark.sparkContext
# Only log errors so the notebook output stays readable
sc.setLogLevel('ERROR')

######  Import all the csv files from 2015-2019 into a single RDD.

In [3]:
# Read every "*Units.csv" and every "*Crash.csv" file into one RDD of text lines each;
# the "*" wildcard makes textFile pick up all matching files
units_rdd, crash_rdd = (
    sc.textFile(pattern) for pattern in ('*Units.csv', '*Crash.csv')
)

###### Use csv.reader to parse each row as a list in the RDD, and remove the header rows and display the total count and first 10 records.

In [16]:
# For Units RDD

# Split each line separated by comma into a list.
# NOTE(review): a plain split(',') keeps the surrounding double quotes and would mis-split
# any quoted field that contains a comma. csv.reader would avoid both, but later cells
# compare against quoted values such as '"SA"', so the parsing is left unchanged here.
unitsrdd = units_rdd.map(lambda line: line.split(','))
# Remove the header.
# RDD transformations are lazy, so a lambda that reads a shared global `header` is only
# evaluated when an action runs - by then another cell may have rebound `header` to a
# different dataset's header row. Store the header under a dataset-specific name and bind
# it into the lambda as a default argument so the filter always uses the Units header.
units_header = unitsrdd.first()
units_final = unitsrdd.filter(lambda row, hdr=units_header: row != hdr)   # filter out header
# Print how many final records
print(f"\n Total count of units : {units_final.count()}")
units_final.take(10)


 Total count of units : 153854


[['"2016-1-15/08/2019"',
  '"01"',
  '0',
  '"SA"',
  '"OMNIBUS"',
  '"2011"',
  '"North"',
  '"Male"',
  '"056"',
  '"SA"',
  '"HR"',
  '"Full"',
  '"Not Towing"',
  '"Straight Ahead"',
  '"010"',
  '"5121"',
  '',
  ''],
 ['"2016-1-15/08/2019"',
  '"02"',
  '1',
  '',
  '"Pedestrian on Road"',
  '',
  '"East"',
  '"Male"',
  '"072"',
  '',
  '',
  '',
  '',
  '"Walking on Road"',
  '',
  '"5084"',
  '',
  ''],
 ['"2016-2-15/08/2019"',
  '"01"',
  '0',
  '"SA"',
  '"Motor Cars - Sedan"',
  '"2004"',
  '"Unknown"',
  '"Female"',
  '"023"',
  '"SA"',
  '"C "',
  '"Full"',
  '"Not Towing"',
  '"Straight Ahead"',
  '"001"',
  '"5087"',
  '',
  ''],
 ['"2016-2-15/08/2019"',
  '"02"',
  '0',
  '"SA"',
  '"Station Wagon"',
  '"2008"',
  '"Unknown"',
  '"Male"',
  '"040"',
  '"SA"',
  '"C "',
  '"Full"',
  '"Not Towing"',
  '"Straight Ahead"',
  '"001"',
  '"5084"',
  '',
  ''],
 ['"2016-3-15/08/2019"',
  '"01"',
  '0',
  '"SA"',
  '"RIGID TRUCK LGE GE 4.5T"',
  '"1990"',
  '"South"',
  '"Unk

In [13]:
# For Crash RDD

# Split each line separated by comma into a list.
# NOTE(review): a plain split(',') keeps the surrounding double quotes and would mis-split
# any quoted field that contains a comma (e.g. a suburb or LGA name) - confirm the data
# has none, or switch to csv.reader together with the cells that strip quotes later.
crashrdd = crash_rdd.map(lambda line: line.split(','))
# Remove the header.
# RDD transformations are lazy, so a lambda that reads a shared global `header` is only
# evaluated when an action runs - by then another cell may have rebound `header` to a
# different dataset's header row. Store the header under a dataset-specific name and bind
# it into the lambda as a default argument so the filter always uses the Crash header.
crash_header = crashrdd.first()
crash_final = crashrdd.filter(lambda row, hdr=crash_header: row != hdr)   # filter out header
# Print how many final records
print(f"\n Total count of Crashes : {crash_final.count()}")
crash_final.take(10)


 Total count of Crashes : 72006


[['"2019-1-8/07/2020"',
  '"2 Metropolitan"',
  '"HAMPSTEAD GARDENS"',
  '"5086"',
  '"CITY OF PORT ADELAIDE ENFIELD"',
  '2',
  '0',
  '0',
  '0',
  '0',
  '2019',
  '"June"',
  '"Wednesday"',
  '"11:15 am"',
  '"060"',
  '"Cross Road"',
  '"Straight road"',
  '"Level"',
  '"Not Applicable"',
  '"Sealed"',
  '"Dry"',
  '"Not Raining"',
  '"Daylight"',
  '"Right Angle"',
  '"01"',
  '"Driver Rider"',
  '"1: PDO"',
  '"Give Way Sign"',
  '""',
  '""',
  '1331810.03',
  '1676603.26',
  '"13318101676603"'],
 ['"2019-2-8/07/2020"',
  '"2 Metropolitan"',
  '"DRY CREEK"',
  '"5094"',
  '"CITY OF SALISBURY"',
  '2',
  '0',
  '0',
  '0',
  '0',
  '2019',
  '"January"',
  '"Tuesday"',
  '"12:49 am"',
  '"090"',
  '"Divided Road"',
  '"Straight road"',
  '"Level"',
  '"Not Applicable"',
  '"Sealed"',
  '"Dry"',
  '"Not Raining"',
  '"Night"',
  '"Rear End"',
  '"02"',
  '"Driver Rider"',
  '"1: PDO"',
  '"No Control"',
  '""',
  '""',
  '1328376.2',
  '1682942.63',
  '"13283761682943"'],
 ['"201

### 1.2 Data Partitioning in RDD

###### Check the number of partitions in the above RDDs. 

In [17]:
from pyspark.rdd import RDD
# Calculating and printing number of partitions and partitioner of units RDD
numPartitions_units = units_final.getNumPartitions()
print(f"NUMBER OF PARTITIONS IN UNITS: {numPartitions_units}")
# The RDD's `partitioner` attribute is printed as-is (None in the recorded output)
print("Partitioner:{}".format(units_final.partitioner))

# Building a function to get number of records per partition
def print_partitions(data):
    """Print the record count of every non-empty partition of an RDD.

    Args:
        data: an RDD (any object exposing ``glom()`` whose result supports
            ``map(...)`` and ``collect()``).

    Only the per-partition sizes are brought back to the driver; the records
    themselves are counted in place instead of being collected.
    """
    # glom() turns each partition into one list; map(len) reduces it to its size
    partition_sizes = data.glom().map(len).collect()
    for index, size in enumerate(partition_sizes):
        # show partition if it is not empty
        if size > 0:
            print(f"Partition {index}: {size} records")

# Printing records per partition of the units RDD using the above function
print_partitions(units_final)            

# Printing number of partitions and records per partition for the crash RDD
numPartitions_crash = crash_final.getNumPartitions()
print(f"NUMBER OF PARTITIONS IN CRASH: {numPartitions_crash}")
print_partitions(crash_final) 

NUMBER OF PARTITIONS IN UNITS: 5
Partitioner:None
Partition 0: 35861 records
Partition 1: 28163 records
Partition 2: 33084 records
Partition 3: 27713 records
Partition 4: 29033 records
NUMBER OF PARTITIONS IN CRASH: 5
Partition 0: 12965 records
Partition 1: 16776 records
Partition 2: 13238 records
Partition 3: 13600 records
Partition 4: 15432 records


Looking at the record counts, the rows are spread fairly evenly across the 5 partitions of both the Units and Crash RDDs. `Partitioner:None` shows that no key-based partitioner (hash or range) is in use: `sc.textFile` simply splits the input into chunks, so the 5 partitions most likely correspond to the 5 yearly input files (2015-2019) rather than to any deliberate balancing of the records.

###### In the “Units” csv dataset, there is a column called Lic State which shows the state where the vehicle is registered. Assume we want to keep all the data related to SA in one partition and the rest of the data in another partition.

###### a. Create a Key Value Pair RDD with Lic State as the key and rest of the other columns as value.

In [1]:
# Build a pair RDD keyed by Lic State (column 9); the value keeps the other
# 17 columns (0-8 and 10-17) in their original order
units_pairs = units_final.map(
    lambda row: (row[9], [row[i] for i in range(18) if i != 9])
)

# units_pairs.collect() # Displaying the RDD

###### Implement this partitioning in RDD using appropriate partitioning functions.

In [19]:
# Custom partition function: keeps "SA" in one partition and all other states in the other
# It accepts the key of the pair RDD as input
def hash_function(key):
    """Return 2 for the key '"SA"' (quotes included) and 3 for any other key.

    With two partitions this puts SA in partition 2 % 2 = 0 and every other
    key in partition 3 % 2 = 1.
    """
    return 2 if key == '"SA"' else 3

# Repartition the pair RDD into 2 partitions, with hash_function choosing the target
hash_units_rdd = units_pairs.partitionBy(numPartitions=2, partitionFunc=hash_function)


###### Print the number of records in each partition.

In [20]:
# Materialise each partition of the repartitioned RDD as a list, then report its size
partitions = hash_units_rdd.glom().collect()
for index in range(len(partitions)):
    partition = partitions[index]
    print(f"Partition {index}: {len(partition)} records")
    

Partition 0: 109684 records
Partition 1: 44170 records


We can see that partition 0 holds the 109684 records whose Lic State is "SA", while partition 1 holds the 44170 records with any other (or an empty) Lic State. The imbalance simply reflects that most units in the dataset are licensed in SA.

### 1.3 Query/Analysis 

###### Calculate average age of male and female drivers separately.

In [21]:
# Selecting Sex and Age column from units RDD and removing extra double quotes from the values
units_sex_pairs = units_final.map(lambda x: (x[7].replace('"',''), x[8].replace('"','')))

# Removing null values of the 2 columns and filtering out "XXX" values in Age column
units_sex_pairs = units_sex_pairs.filter(lambda x: x[0] != '' and x[1] != '' and x[1] != 'XXX')
units_sex_pairs = units_sex_pairs.map(lambda x: (x[0], int(x[1]))) # convert age to int

# Average age per Sex.
# Each age becomes an (age_sum, count) pair that reduceByKey adds up per key, so only
# partial sums are shuffled instead of every individual age (as groupByKey would do).
# This also avoids calling the builtin sum() inside a lambda, which breaks if a later
# cell imports `sum` from pyspark.sql.functions and this cell is re-run.
units_male_avg = (
    units_sex_pairs
    .mapValues(lambda age: (age, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda total: total[0] / total[1])
)
units_male_avg.collect()


[('Unknown', 32.55813953488372),
 ('Male', 40.975960299920004),
 ('Female', 40.38729268862415)]

From the result displayed, the average age of male drivers is about 41 years (40.98) and that of female drivers is about 40 years (40.39); units whose Sex is recorded as "Unknown" average about 33 years.

###### Calculate the oldest and the newest vehicle year involved in the accident and find their Registration State, Year and Unit type of the vehicle.

In [24]:
# Project (Veh Reg State, Veh Year, Unit Type) with the double quotes stripped
units_veh_pairs = units_final.map(
    lambda row: tuple(row[i].replace('"', '') for i in (3, 5, 4))
)
# Drop records whose year is empty or the 'XXXX' placeholder
units_veh_pairs = units_veh_pairs.filter(lambda rec: rec[1] not in ('', 'XXXX'))
# Make the year an integer so min/max compare numerically
units_veh_pairs = units_veh_pairs.map(lambda rec: (rec[0], int(rec[1]), rec[2]))

#units_veh_pairs.collect()
# Oldest vehicle: the record with the smallest year
oldest_veh = units_veh_pairs.min(key=lambda rec: rec[1])
print(oldest_veh)

# Newest vehicle: the record with the largest year
newest_veh = units_veh_pairs.max(key=lambda rec: rec[1])
print(newest_veh)

('VIC', 1900, 'Motor Cycle')
('SA', 2019, 'Station Wagon')


We can conclude that the oldest vehicle year is 1900, while the newest vehicle year involved is 2019.

In [25]:
# All vehicle records whose year is 1900
old = units_veh_pairs.filter(lambda veh: veh[1] == 1900)
old.collect()

[('VIC', 1900, 'Motor Cycle'),
 ('SA', 1900, 'Motor Cycle'),
 ('SA', 1900, 'Motor Cycle'),
 ('SA', 1900, 'Motor Cycle'),
 ('SA', 1900, 'Motor Cycle'),
 ('SA', 1900, 'Motor Cycle'),
 ('SA', 1900, 'Motor Cycle'),
 ('SA', 1900, 'RIGID TRUCK LGE GE 4.5T'),
 ('SA', 1900, 'Motor Cycle'),
 ('SA', 1900, 'Motor Cycle'),
 ('SA', 1900, 'Motor Cycle'),
 ('SA', 1900, 'Motor Cycle'),
 ('SA', 1900, 'Motor Cycle'),
 ('SA', 1900, 'Motor Cycle')]

We can see that among the vehicles with year 1900, the most common unit type is Motor Cycle and the most common registration state is SA.

In [2]:
# All vehicle records whose year is 2019
new = units_veh_pairs.filter(lambda veh: veh[1] == 2019)
# new.collect()

In 2019, we can see a wide variety in the vehicle types, but the most common licence state is still SA. 

### 2.1 Data Preparation and Loading 

###### Load all units and crash data into two separate dataframes.

In [27]:
# Load every "*Units.csv" / "*Crash.csv" file into one DataFrame each,
# taking the column names from the header row
units_df = spark.read.option("header", True).csv("*Units.csv")
crash_df = spark.read.option("header", True).csv("*Crash.csv")

###### Display the schema of the final two dataframes.

In [28]:
# Displaying schema of both dataframes (units first, then crash);
# every column is a string in the recorded output
units_df.printSchema()
crash_df.printSchema()

root
 |-- REPORT_ID: string (nullable = true)
 |-- Unit No: string (nullable = true)
 |-- No Of Cas: string (nullable = true)
 |-- Veh Reg State: string (nullable = true)
 |-- Unit Type: string (nullable = true)
 |-- Veh Year: string (nullable = true)
 |-- Direction Of Travel: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Lic State: string (nullable = true)
 |-- Licence Class: string (nullable = true)
 |-- Licence Type: string (nullable = true)
 |-- Towing: string (nullable = true)
 |-- Unit Movement: string (nullable = true)
 |-- Number Occupants: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- Rollover: string (nullable = true)
 |-- Fire: string (nullable = true)

root
 |-- REPORT_ID: string (nullable = true)
 |-- Stats Area: string (nullable = true)
 |-- Suburb: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- LGA Name: string (nullable = true)
 |-- Total Units: string (nullable = true)


### 2.2 Query/Analysis 

###### Find all the crash events in Adelaide where the total number of casualties in the event is more than 3.

In [29]:
# Importing functions from sql and Integer Type
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Total Cas is loaded as a string; cast it to integer so it compares numerically
crash_df = crash_df.withColumn('Total Cas', crash_df['Total Cas'].cast(IntegerType()))

# Keep crashes in the suburb ADELAIDE that have more than 3 casualties
crash_cas_adel = crash_df.filter(
    (F.col("Suburb") == 'ADELAIDE') & (F.col("Total Cas") > 3)
)
crash_cas_adel.show()

+--------------------+----------+--------+--------+----------------+-----------+---------+----------+--------+--------+----+--------+--------+--------+----------+-------------+----------------+--------------+--------------------+------------+-------------+------------+--------+--------------+---------+------------+-------------+---------------+------------+--------------+----------+----------+--------------+
|           REPORT_ID|Stats Area|  Suburb|Postcode|        LGA Name|Total Units|Total Cas|Total Fats|Total SI|Total MI|Year|   Month|     Day|    Time|Area Speed|Position Type|Horizontal Align|Vertical Align|          Other Feat|Road Surface|Moisture Cond|Weather Cond|DayNight|    Crash Type|Unit Resp| Entity Code|CSEF Severity|  Traffic Ctrls|DUI Involved|Drugs Involved|  ACCLOC_X|  ACCLOC_Y|    UNIQUE_LOC|
+--------------------+----------+--------+--------+----------------+-----------+---------+----------+--------+--------+----+--------+--------+--------+----------+-------------+

###### Display 10 crash events with highest casualties.

In [30]:
# Order crashes by Total Cas, largest first, and show the 10 with the most casualties
high_cas = crash_df.orderBy(F.col("Total Cas").desc())
high_cas.show(n=10)

+--------------------+--------------+---------------+--------+--------------------+-----------+---------+----------+--------+--------+----+--------+---------+--------+----------+-------------+--------------------+--------------+--------------+------------+-------------+------------+--------+-----------+---------+------------+-------------+---------------+------------+--------------+----------+----------+--------------+
|           REPORT_ID|    Stats Area|         Suburb|Postcode|            LGA Name|Total Units|Total Cas|Total Fats|Total SI|Total MI|Year|   Month|      Day|    Time|Area Speed|Position Type|    Horizontal Align|Vertical Align|    Other Feat|Road Surface|Moisture Cond|Weather Cond|DayNight| Crash Type|Unit Resp| Entity Code|CSEF Severity|  Traffic Ctrls|DUI Involved|Drugs Involved|  ACCLOC_X|  ACCLOC_Y|    UNIQUE_LOC|
+--------------------+--------------+---------------+--------+--------------------+-----------+---------+----------+--------+--------+----+--------+------

###### Find the total number of fatalities for each crash type.

In [31]:
# Total Fats is loaded as a string; cast it to integer so it can be summed
crash_df = crash_df.withColumn('Total Fats', crash_df['Total Fats'].cast(IntegerType()))

# Sum the fatalities per crash type and list the deadliest crash types first
typewise_fats = (
    crash_df
    .groupBy("Crash Type")
    .agg(F.sum("Total Fats").alias('Total number of Fatalities'))
    .orderBy(F.col("Total number of Fatalities").desc())
)
typewise_fats.show()

+--------------------+--------------------------+
|          Crash Type|Total number of Fatalities|
+--------------------+--------------------------+
|    Hit Fixed Object|                       152|
|             Head On|                        86|
|      Hit Pedestrian|                        70|
|           Roll Over|                        57|
|         Right Angle|                        45|
|          Side Swipe|                        20|
|          Right Turn|                        18|
|            Rear End|                        16|
|  Hit Parked Vehicle|                         9|
|          Hit Animal|                         4|
|               Other|                         2|
|  Hit Object on Road|                         2|
|Left Road - Out o...|                         1|
+--------------------+--------------------------+



We can observe that most fatalities come from the crash types Hit Fixed Object, Head On and Hit Pedestrian.

###### Find the total number of casualties for each suburb when the vehicle was driven by an unlicensed driver. 

In [32]:
# Inner-join every unit with its crash event on REPORT_ID
units_crash_joined = units_df.join(
    crash_df, on=units_df.REPORT_ID == crash_df.REPORT_ID, how='inner'
)

# Keep units with an unlicenced driver, then add up Total Cas per suburb
unlicenced_units = units_crash_joined.filter(units_crash_joined["Licence Type"] == "Unlicenced")
sub_cas = (
    unlicenced_units
    .select("Suburb", "Total Cas")
    .groupBy("Suburb")
    .agg(F.sum("Total Cas").alias('Total number of Casualities'))
)
sub_cas.orderBy(sub_cas["Total number of Casualities"].desc()).show()

+---------------+---------------------------+
|         Suburb|Total number of Casualities|
+---------------+---------------------------+
|       ADELAIDE|                         19|
|      SALISBURY|                         18|
|      DRY CREEK|                         18|
| SALISBURY EAST|                         16|
|       PROSPECT|                         14|
| NORTH ADELAIDE|                         13|
|        ENFIELD|                         12|
|   ANDREWS FARM|                         12|
|SALISBURY DOWNS|                         11|
|   BEDFORD PARK|                         11|
|SALISBURY SOUTH|                         11|
|     INGLE FARM|                         11|
|     MUNNO PARA|                         10|
|         BURTON|                         10|
|SALISBURY PLAIN|                         10|
|   MOUNT BARKER|                         10|
| ELIZABETH PARK|                         10|
|  MORPHETT VALE|                         10|
|   MAWSON LAKES|                 

The total number of casualties involving an unlicenced driver is highest in Adelaide with 19 casualties, followed by Salisbury and Dry Creek with 18 each.

### 2.3 Severity Analysis 

###### Find the total number of crash events for each severity level. 

In [33]:
# Count the crash events in each severity level, most frequent level first
crash_severity = (
    crash_df
    .groupBy("CSEF Severity")
    .agg(F.count("*").alias('Total number of Crash events'))
)
crash_severity.orderBy(F.col("Total number of Crash events").desc()).show()

+-------------+----------------------------+
|CSEF Severity|Total number of Crash events|
+-------------+----------------------------+
|       1: PDO|                       46696|
|        2: MI|                       21881|
|        3: SI|                        2978|
|     4: Fatal|                         451|
+-------------+----------------------------+



From the above result, we can notice that 1:PDO crash severity, i.e., property damage only is the most common severity level. This is followed by 2:MI, i.e., minor injury, followed by 3:SI, i.e., serious injury and 4:FATAL. 

###### Compute the total number of crash events for each severity level and the percentage for the four different scenarios.
###### a. When the driver is tested positive on drugs.

In [34]:
# Importing functions.
# `sum` is deliberately NOT imported from pyspark.sql.functions: it would shadow Python's
# builtin sum(), which the RDD cells call inside their lambdas. Use F.sum instead.
from pyspark.sql.functions import rank, col
# Count the crash events per severity level where drugs were involved
drug_positive = (
    crash_df.filter(crash_df["Drugs Involved"] == "Y")
    .groupby("CSEF Severity")
    .agg(F.count("*").alias('Count on drugs'))
)
# Grand total of those events, used as the percentage denominator
Total = drug_positive.agg(F.sum('Count on drugs').alias('Total')).collect()[0][0]
# Percentage column; the format has no trailing newline so show() prints one line per row
drug_positive = drug_positive.withColumn('Percentage on drugs', F.format_string("%2.2f%%", col('Count on drugs')/Total * 100))
drug_positive.sort(drug_positive["Count on drugs"].desc()).show()

+-------------+--------------+-------------------+
|CSEF Severity|Count on drugs|Percentage on drugs|
+-------------+--------------+-------------------+
|        2: MI|           749|            59.73%
|
|        3: SI|           247|            19.70%
|
|       1: PDO|           176|            14.04%
|
|     4: Fatal|            82|             6.54%
|
+-------------+--------------+-------------------+



###### b. When the driver is tested positive for blood alcohol concentration.

In [35]:
# Count the crash events per severity level where alcohol (DUI) was involved
alco_positive = (
    crash_df.filter(crash_df["DUI Involved"] == "Y")
    .groupby("CSEF Severity")
    .agg(F.count("*").alias('Count on alcohol'))
)
# Grand total of those events, used as the percentage denominator
Total = alco_positive.agg(F.sum('Count on alcohol').alias('Total')).collect()[0][0]
# Percentage column; the format has no trailing newline so show() prints one line per row
alco_positive = alco_positive.withColumn('Percentage on alcohol', F.format_string("%2.2f%%", col('Count on alcohol')/Total * 100))
alco_positive.sort(alco_positive["Count on alcohol"].desc()).show()

+-------------+----------------+---------------------+
|CSEF Severity|Count on alcohol|Percentage on alcohol|
+-------------+----------------+---------------------+
|       1: PDO|            1173|              52.18%
|
|        2: MI|             737|              32.78%
|
|        3: SI|             259|              11.52%
|
|     4: Fatal|              79|               3.51%
|
+-------------+----------------+---------------------+



###### c. When the driver is tested positive for both drugs and blood alcohol.

In [36]:
# Count the crash events per severity level where both alcohol (DUI) and drugs were involved
drug_alco_pos = (
    crash_df.filter((crash_df["DUI Involved"] == "Y") & (crash_df["Drugs Involved"] == "Y"))
    .groupby("CSEF Severity")
    .agg(F.count("*").alias('Count on both'))
)
# Grand total of those events, used as the percentage denominator
Total = drug_alco_pos.agg(F.sum('Count on both').alias('Total')).collect()[0][0]
# Percentage column; the format has no trailing newline so show() prints one line per row
drug_alco_pos = drug_alco_pos.withColumn('Percentage on both', F.format_string("%2.2f%%", col('Count on both')/Total * 100))
drug_alco_pos.sort(drug_alco_pos["Count on both"].desc()).show()

+-------------+-------------+------------------+
|CSEF Severity|Count on both|Percentage on both|
+-------------+-------------+------------------+
|        2: MI|           89|           50.86%
|
|        3: SI|           35|           20.00%
|
|     4: Fatal|           27|           15.43%
|
|       1: PDO|           24|           13.71%
|
+-------------+-------------+------------------+



###### d. When the driver is tested negative for both (no alcohol and no drugs).

In [37]:
# List the distinct values of both flag columns: the recorded output shows only "Y" and
# null, so a negative test has to be detected as null rather than as an explicit "N"
crash_df.select("DUI Involved").distinct().show()
crash_df.select("Drugs Involved").distinct().show()

+------------+
|DUI Involved|
+------------+
|        null|
|           Y|
+------------+

+--------------+
|Drugs Involved|
+--------------+
|          null|
|             Y|
+--------------+



In [38]:
# Count the crash events per severity level where neither flag is set
# (both "DUI Involved" and "Drugs Involved" are null)
drug_alco_neg = (
    crash_df.select("CSEF Severity", "DUI Involved", "Drugs Involved")
    .filter((F.isnull("DUI Involved")) & (F.isnull("Drugs Involved")))
    .groupby("CSEF Severity")
    .agg(F.count("*").alias('Count on negative'))
)
# Grand total of those events, used as the percentage denominator
Total = drug_alco_neg.agg(F.sum('Count on negative').alias('Total')).collect()[0][0]
# Percentage column; the format has no trailing newline so show() prints one line per row
drug_alco_neg = drug_alco_neg.withColumn('Percentage on negative', F.format_string("%2.2f%%", col('Count on negative')/Total * 100))
drug_alco_neg.sort(drug_alco_neg["Count on negative"].desc()).show()


+-------------+-----------------+----------------------+
|CSEF Severity|Count on negative|Percentage on negative|
+-------------+-----------------+----------------------+
|       1: PDO|            45371|               66.06%
|
|        2: MI|            20484|               29.83%
|
|        3: SI|             2507|                3.65%
|
|     4: Fatal|              317|                0.46%
|
+-------------+-----------------+----------------------+



###### Comparing the results in these 4 scenarios. 

From the above results, almost 60% of the crashes in which the driver tested positive for drugs resulted in minor injury, 52% of the crashes with a positive alcohol test resulted in property damage only, 51% of the crashes with both tests positive resulted in minor injury, and 66% of the crashes with both tests negative resulted in property damage only. Among the 4 scenarios, the highest fatality rate (15.43%) occurs when the driver tested positive for both drugs and alcohol, followed by 6.54% for drugs. The highest rate of serious injury (20.00%) is also found when both tests are positive, closely followed by 19.70% for drugs. From these percentages, we can say that crashes involving both drugs and alcohol, or drugs alone, carry a high risk of being fatal or serious. Crashes involving alcohol also carry an elevated risk (3.51% fatal, 11.52% serious injury), whereas crashes involving neither drugs nor alcohol have a low chance of being fatal (0.46%).

### 2.4 RDDs vs DataFrame vs Spark SQL 

###### Find the Date1 and Time of Crash, Number of Casualties in each unit and the Gender, Age, License Type of the unit driver for the suburb "Adelaide".

##### (A) Using RDDs

In [3]:
%%time 

# Getting the required columns from the units RDD and formatting them
adel_units_rdd = units_final.map(lambda field: (field[0],field[7],field[8],field[11]))
adel_units_rdd = adel_units_rdd.map(lambda x: (x[0].replace('"',''), (x[1].replace('"','') \
                                               , x[2].replace('"',''), x[3].replace('"',''))))
# Getting the required columns from the crash RDD and formatting them
adel_crash_rdd = crash_final.map(lambda field: (field[0],field[2],field[6]\
                                              ,field[10],field[11],field[12],field[13]))
adel_crash_rdd = adel_crash_rdd.map(lambda x: (x[0].replace('"',''),(x[1].replace('"','')\
                                               ,x[2].replace('"',''),x[3].replace('"','')\
                                               ,x[4].replace('"',''),x[5].replace('"','')\
                                               ,x[6].replace('"',''))))
# Filtering the suburb
adel_crash_rdd = adel_crash_rdd.filter(lambda x: x[1][0]=='ADELAIDE')
# joining the two RDDs
join_rdd = adel_units_rdd.join(adel_crash_rdd)

# Formatting the columns and printing them
# join_rdd.map(lambda x: [x[1][0][0],x[1][0][1],x[1][0][2], x[1][1][1],\
#                         x[1][1][2]+"-"+x[1][1][3]+"-"+x[1][1][4],\
#                         x[1][1][5]]).collect()


###### (B) Using Dataframe

In [39]:
%%time
from pyspark.sql.functions import concat, col, lit # Imprting required functions

# Joining units_df and crash_df, and filter the suburb
join_df = units_df.join(crash_df, units_df.REPORT_ID==crash_df.REPORT_ID,how='inner')
join_df = join_df.filter(crash_df["Suburb"] == "ADELAIDE")
# Selecting and formatting the columns
join_df.select(F.concat(col("Year"), lit("-"), col("Month"),lit("-"), col("Day")).alias("Date")\
              , "Time", "Total Cas", "Sex", "Age", "Licence Type").show()

+--------------------+--------+---------+-------+----+------------+
|                Date|    Time|Total Cas|    Sex| Age|Licence Type|
+--------------------+--------+---------+-------+----+------------+
|2016-November-Wed...|01:45 pm|        1|   Male| 056|        Full|
|2016-November-Wed...|01:45 pm|        1|   Male| 072|        null|
|2016-November-Tue...|03:40 pm|        1|   Male| 056|        null|
|2016-November-Tue...|03:40 pm|        1| Female| 027|        null|
|2016-November-Tue...|05:00 pm|        0| Female| 032|        Full|
|2016-November-Tue...|05:00 pm|        0|Unknown| XXX|     Unknown|
|2016-November-Tue...|05:40 pm|        0|   Male| 022|     Unknown|
|2016-November-Tue...|05:40 pm|        0|   Male| 020|     Unknown|
|2016-November-Monday|11:26 pm|        0|Unknown| XXX|     Unknown|
|2016-November-Monday|11:26 pm|        0|   Male| 042|        Full|
|2016-November-Monday|11:26 pm|        0|   null|null|        null|
|2016-November-Monday|11:30 pm|        0|   Male

###### (C) Using SQL

In [40]:
%%time

# Register both DataFrames as temporary views so they can be queried with Spark SQL
units_df.createOrReplaceTempView("units_sql") # Registering original df as a temp view
crash_df.createOrReplaceTempView("crash_sql")
# Query: join units and crashes on REPORT_ID, keep the suburb ADELAIDE, and build the
# Date column by concatenating Year, Month and Day with '-'
filter_sql = spark.sql('''
  SELECT crash_sql.Year||'-'||crash_sql.Month||'-'||crash_sql.Day as Date,
  crash_sql.Time, crash_sql.`Total Cas`, units_sql.Sex,units_sql.Age, units_sql.`Licence Type`
  FROM units_sql join crash_sql on units_sql.REPORT_ID == crash_sql.REPORT_ID
  WHERE Suburb = "ADELAIDE" 
''')
filter_sql.show()

+--------------------+--------+---------+-------+----+------------+
|                Date|    Time|Total Cas|    Sex| Age|Licence Type|
+--------------------+--------+---------+-------+----+------------+
|2016-November-Wed...|01:45 pm|        1|   Male| 056|        Full|
|2016-November-Wed...|01:45 pm|        1|   Male| 072|        null|
|2016-November-Tue...|03:40 pm|        1|   Male| 056|        null|
|2016-November-Tue...|03:40 pm|        1| Female| 027|        null|
|2016-November-Tue...|05:00 pm|        0| Female| 032|        Full|
|2016-November-Tue...|05:00 pm|        0|Unknown| XXX|     Unknown|
|2016-November-Tue...|05:40 pm|        0|   Male| 022|     Unknown|
|2016-November-Tue...|05:40 pm|        0|   Male| 020|     Unknown|
|2016-November-Monday|11:26 pm|        0|Unknown| XXX|     Unknown|
|2016-November-Monday|11:26 pm|        0|   Male| 042|        Full|
|2016-November-Monday|11:26 pm|        0|   null|null|        null|
|2016-November-Monday|11:30 pm|        0|   Male

Using the `%%time` cell magic, we can see that the lowest wall time is observed with the DataFrame version, followed by the SQL query, while the RDD version has the highest wall time. Similarly, the lowest total CPU time is observed with the SQL query, followed by the DataFrame version, and the RDD version again has the highest. From these observations, we can say that RDDs are less efficient than Spark SQL and DataFrames for this query.

###### Find the total number of casualties for each suburb when the vehicle was driven by an unlicensed driver. 

##### (A) Using RDDs

In [4]:
%%time

# Getting the required columns from the units RDD and formatting them
sub_units_rdd = units_final.map(lambda field: (field[0].replace('"',''),(field[11].replace('"',''))))
# Getting the required columns from the crash RDD and formatting them
sub_crash_rdd = crash_final.map(lambda field: (field[0].replace('"',''),\
                                              (field[2].replace('"',''),field[6].replace('"',''))))

# Joining the 2 RDDs 
join_rdd = sub_units_rdd.join(sub_crash_rdd)

# Filtering the RDDs and formatting the columns
filter_rdd = join_rdd.filter(lambda x: x[1][0]=='Unlicenced')
filter_rdd = filter_rdd.map(lambda x: (x[1][1][0], (x[1][1][1])))
filter_rdd = filter_rdd.filter(lambda x: x[0] != '' and x[1]!= '')
filter_rdd = filter_rdd.map(lambda x: (x[0], int(x[1])))
# Using groupByKey to get total values of each suburb
result_rdd = filter_rdd.groupByKey().map(lambda x: (x[0], sum(x[1])))
# result_rdd.collect()

##### (B) Using Dataframes

In [41]:
%%time

# Importing the functions necessary
from pyspark.sql.functions import concat, col, lit
from pyspark.sql import functions as F

# Joining the dataframes and filtering the licence type
join_df = units_df.join(crash_df, units_df.REPORT_ID==crash_df.REPORT_ID,how='inner')
join_df = join_df.filter(units_df["Licence Type"] == "Unlicenced")
# Selecting, grouping, and aggregating required columns
join_df = join_df.select("Suburb","Total Cas").groupby("Suburb").agg(F.sum("Total Cas").alias('Number of casualities'))
join_df = join_df.sort(col("Number of casualities").desc())
join_df.show()

+---------------+---------------------+
|         Suburb|Number of casualities|
+---------------+---------------------+
|       ADELAIDE|                   19|
|      SALISBURY|                   18|
|      DRY CREEK|                   18|
| SALISBURY EAST|                   16|
|       PROSPECT|                   14|
| NORTH ADELAIDE|                   13|
|        ENFIELD|                   12|
|   ANDREWS FARM|                   12|
|SALISBURY DOWNS|                   11|
|     INGLE FARM|                   11|
|SALISBURY SOUTH|                   11|
|   BEDFORD PARK|                   11|
|   MOUNT BARKER|                   10|
|     MUNNO PARA|                   10|
|SALISBURY PLAIN|                   10|
|         BURTON|                   10|
|  MORPHETT VALE|                   10|
| ELIZABETH PARK|                   10|
|   MAWSON LAKES|                   10|
|PARA HILLS WEST|                    9|
+---------------+---------------------+
only showing top 20 rows

CPU times: use

##### (C) Using SQL

In [42]:
%%time
# Register both DataFrames as temporary views so they can be queried with Spark SQL
units_df.createOrReplaceTempView("units_sql") # Registering original df as a temp view
crash_df.createOrReplaceTempView("crash_sql")
# Query: join units and crashes on REPORT_ID, keep unlicenced drivers, sum Total Cas
# per suburb and order the suburbs by that sum, largest first
filter_sql = spark.sql('''
  SELECT crash_sql.Suburb,sum(crash_sql.`Total Cas`) as `Number of casualities`
  FROM units_sql join crash_sql on units_sql.REPORT_ID == crash_sql.REPORT_ID
  WHERE `Licence Type` = "Unlicenced" 
  GROUP BY Suburb
  ORDER BY `Number of casualities` desc
''')
filter_sql.show()

+---------------+---------------------+
|         Suburb|Number of casualities|
+---------------+---------------------+
|       ADELAIDE|                   19|
|      SALISBURY|                   18|
|      DRY CREEK|                   18|
| SALISBURY EAST|                   16|
|       PROSPECT|                   14|
| NORTH ADELAIDE|                   13|
|        ENFIELD|                   12|
|   ANDREWS FARM|                   12|
|   BEDFORD PARK|                   11|
|SALISBURY DOWNS|                   11|
|     INGLE FARM|                   11|
|SALISBURY SOUTH|                   11|
|   MOUNT BARKER|                   10|
|     MUNNO PARA|                   10|
|  MORPHETT VALE|                   10|
|SALISBURY PLAIN|                   10|
| ELIZABETH PARK|                   10|
|         BURTON|                   10|
|   MAWSON LAKES|                   10|
|ELIZABETH GROVE|                    9|
+---------------+---------------------+
only showing top 20 rows

CPU times: use

Using the `%%time` cell magic, we can see that the lowest wall time is observed with the DataFrame version, followed by the SQL query, while the RDD version has the highest wall time. Similarly, the lowest total CPU time is observed with the SQL query, followed by the DataFrame version, and the RDD version again has the highest. From these observations, we can say that RDDs are less efficient than Spark SQL and DataFrames for this query.