# Analyzing Road Crash Data


##### 1.1 Data Preparation and Loading

**1. Write the code to create a SparkContext object using SparkSession, which tells Spark how to access a cluster. To create a SparkSession you first need to build a SparkConf object that contains information about your application. Give an appropriate name for your application and run.**

In [1]:
# Code to create a SparkContext object using SparkSession

# Import SparkConf class into program
from pyspark import SparkConf

# local[*]: run Spark in local mode with as many working processors as logical cores on your machine
# If we want Spark to run locally with 'k' worker threads, we can specify as "local[k]".
master = "local[*]"
# The `appName` field is a name to be shown on the Spark cluster UI page
app_name = "Assignment 1"
# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

# Import SparkContext and SparkSession classes
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL

# Using SparkSession
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')


**2. Import all the “Units” csv files from 2015-2019 into a single RDD.**

In [2]:
# creating a Units csv rdd
units_rdd = sc.textFile('*_DATA_SA_Units.csv')

**3. Import all the “Crashes” csv files from 2015-2019 into a single RDD.**

In [3]:
# creating a Crashes csv rdd
crashes_rdd = sc.textFile('*_DATA_SA_Crash.csv')

**4. For each of the Units and Crashes RDDs, remove the header rows and display the total count and first 10 records.**

In [1]:
# split each line into a list of fields on commas
units_rdd = units_rdd.map(lambda line: line.split(','))

# filter out header
header_unit = units_rdd.first()
filt_units_rdd = units_rdd.filter(lambda row: row != header_unit)  

print("#### For Units ####")
print(f"Number of lines: {filt_units_rdd.count()}")  # to display the total count
filt_units_rdd.take(10) # to display first 10 records



In [2]:
# split each line into a list of fields on commas
crashes_rdd = crashes_rdd.map(lambda line: line.split(','))

# to remove the header
header_crash = crashes_rdd.first()
filt_crashes_rdd = crashes_rdd.filter(lambda row: row != header_crash) 

print("\n\n#### For Crashes ####")
print(f"Number of lines: {filt_crashes_rdd.count()}") # to display the total count
filt_crashes_rdd.take(10) # to display first 10 records
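
Splitting each line on a bare comma assumes that no field contains a comma inside its quotes. As a hedged sketch, Python's standard `csv` module parses quoted fields correctly and also strips the surrounding quotes. The sample line below is made up for illustration and is not taken from the dataset, and `parse_csv_line` is a hypothetical helper name.

```python
import csv

def parse_csv_line(line):
    """Parse one CSV line, respecting double-quoted fields."""
    return next(csv.reader([line]))

# Hypothetical sample line (not from the dataset): the third field contains a comma.
sample = '"2016-1-15/08/2019","01","Hit Fixed Object, Pole",0'

naive = sample.split(',')        # splits inside the quoted field as well
parsed = parse_csv_line(sample)  # keeps the quoted field in one piece

print(len(naive))    # number of pieces produced by the naive split
print(len(parsed))   # number of real fields
print(parsed[2])     # the quoted field, without its surrounding quotes
```

A function like this could be passed to `map` in place of `line.split(',')`. Because it removes the surrounding quotes, later comparisons against quoted literals such as `'"SA"'` would then need to use the unquoted value instead.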

##### 1.2 Data Partitioning in RDD

In [6]:
# function to print number of records in each partition

from pyspark.rdd import RDD
def print_partitions(data):
    if isinstance(data, RDD):
        numPartitions = data.getNumPartitions()
        partitions = data.glom().collect()
    else:
        numPartitions = data.rdd.getNumPartitions()
        partitions = data.rdd.glom().collect()
    
    print(f"####### NUMBER OF PARTITIONS: {numPartitions}")
    for index, partition in enumerate(partitions):
        # show partition if it is not empty
        if len(partition) > 0:
            print(f"Partition {index}: {len(partition)} records")

In [7]:
print("Number of partitions:{}".format(filt_units_rdd.getNumPartitions())) # to display number of partitions
print("Partitioner:{}".format(filt_units_rdd.partitioner))
print_partitions(filt_units_rdd)

Number of partitions:5
Partitioner:None
####### NUMBER OF PARTITIONS: 5
Partition 0: 35861 records
Partition 1: 28163 records
Partition 2: 33084 records
Partition 3: 27713 records
Partition 4: 29033 records


In [8]:
print("Number of partitions:{}".format(filt_crashes_rdd.getNumPartitions())) # to display number of partitions
print("Partitioner:{}".format(filt_crashes_rdd.partitioner))
print_partitions(filt_crashes_rdd)

Number of partitions:5
Partitioner:None
####### NUMBER OF PARTITIONS: 5
Partition 0: 12964 records
Partition 1: 16775 records
Partition 2: 13237 records
Partition 3: 13599 records
Partition 4: 15431 records


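The printed partitioner is `None`, so Spark has not distributed these records by key; `textFile` simply divides the input files into splits. The result resembles **random equal partitioning**: both RDDs have 5 partitions, and the record counts per partition vary only moderately (roughly 27,700 to 35,900 for Units and 13,000 to 16,800 for Crashes).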

**a. Create a Key Value Pair RDD with Lic State as the key and rest of the other columns as value.**

In [9]:
# to create Key Value Pair RDD from units rdd
result_pair = filt_units_rdd.map(lambda x: (x[9], (x[0], x[1], x[2], x[3], x[4], x[5], x[6],\
                                            x[7], x[8], x[10], x[11], x[12], x[13], x[14], x[15], x[16], x[17])))
                                            
result_pair.take(2)

[('"SA"',
  ('"2016-1-15/08/2019"',
   '"01"',
   '0',
   '"SA"',
   '"OMNIBUS"',
   '"2011"',
   '"North"',
   '"Male"',
   '"056"',
   '"HR"',
   '"Full"',
   '"Not Towing"',
   '"Straight Ahead"',
   '"010"',
   '"5121"',
   '',
   '')),
 ('',
  ('"2016-1-15/08/2019"',
   '"02"',
   '1',
   '',
   '"Pedestrian on Road"',
   '',
   '"East"',
   '"Male"',
   '"072"',
   '',
   '',
   '',
   '"Walking on Road"',
   '',
   '"5084"',
   '',
   ''))]

**b. Write the code to implement this partitioning in RDD using appropriate partitioning functions.**

In [10]:
# Custom partitioning function: keys equal to "SA" go to partition 0,
# every other licence state (including blank values) goes to partition 1
def hash_function(key):
    if key == '"SA"':
        return 0
    return 1

In [11]:
# performing hash partitioning with our function
num_partitions = 2
hash_partition_rdd = result_pair.partitionBy(num_partitions, hash_function)
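
PySpark's `partitionBy` takes the value returned by the partitioning function modulo the number of partitions to pick the target partition. The pure-Python sketch below mimics that step so the assignment can be checked without a cluster. The sample keys are illustrative only, and `state_partitioner` and `assign_partition` are hypothetical helpers, not Spark APIs.

```python
def state_partitioner(key):
    # Same split as the notebook's function: SA keys on one side, all other keys on the other.
    return 0 if key == '"SA"' else 1

def assign_partition(key, num_partitions, partition_func):
    # Mirrors the modulo step that partitionBy applies to the function's result.
    return partition_func(key) % num_partitions

sample_keys = ['"SA"', '"VIC"', '', '"SA"', '"NSW"']  # illustrative keys only

buckets = {}
for key in sample_keys:
    index = assign_partition(key, 2, state_partitioner)
    buckets.setdefault(index, []).append(key)

for index in sorted(buckets):
    print(f"Partition {index}: {buckets[index]}")
```

Because of the modulo, a function that returns values outside `0..num_partitions-1` still works, but the mapping is easier to read when the function returns the partition index directly.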

**c. Write the code to print the number of records in each partition. What does it tell about the data skewness?**

In [12]:
# using the above created print_partitions function to get records in each partition
print_partitions(hash_partition_rdd)

####### NUMBER OF PARTITIONS: 2
Partition 0: 109684 records
Partition 1: 44170 records


**Partition 0**, which holds the records with Lic State **SA**, contains 109,684 records, about 2.5 times as many as Partition 1 (44,170), which holds every other licence state and the blank values. The data is therefore heavily skewed towards SA.
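
One way to put a number on the skew is to compare each partition's share of the records with the share a perfectly even split would give. This is a small sketch using the two counts printed above; the `imbalance` ratio is an illustrative measure chosen here, not something the assignment prescribes.

```python
# Record counts per partition, copied from the output above.
partition_counts = {0: 109684, 1: 44170}

total = sum(partition_counts.values())
shares = {p: n / total for p, n in partition_counts.items()}

# Ratio of the largest partition to the size of a perfectly even partition.
even_size = total / len(partition_counts)
imbalance = max(partition_counts.values()) / even_size

for p, share in shares.items():
    print(f"Partition {p}: {share:.1%} of records")
print(f"Largest partition is {imbalance:.2f}x the even size")
```

A ratio of 1.0 would mean perfectly balanced partitions; the further above 1.0, the more work lands on a single partition.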

##### Average age of male and female drivers separately.

In [13]:
# filtering done to remove empty and non-essential records from the AGE column
clean_rdd = filt_units_rdd.filter(lambda x: '"XXX"' not in x[8]).filter(lambda x: x[8] != '')

# to get the desired columns of GENDER and AGE
male_female_rdd = clean_rdd.map(lambda x: (x[7].replace('"',''), int(x[8].replace('"',''))))

In [15]:
# to aggregate only the male records in gender column 
male_rdd = male_female_rdd.filter(lambda x: x[0] == 'Male')
grouped_male = male_rdd.groupByKey().map(lambda x: (x[0], sum(x[1])/len(x[1])))
grouped_male.collect()

[('Male', 40.975960299920004)]

In [16]:
# to aggregate only the female records in gender column 
female_rdd = male_female_rdd.filter(lambda x: x[0] == 'Female')
grouped_female = female_rdd.groupByKey().map(lambda x: (x[0], sum(x[1])/len(x[1])))
grouped_female.collect()

[('Female', 40.38729268862415)]
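
`groupByKey` gathers every age for a key before the average is taken. A common alternative keeps a running `(sum, count)` pair per key, which is the pattern `aggregateByKey` is designed for. The pure-Python sketch below shows the two functions involved; the sample pairs are made up, and `seq_op`, `comb_op` and `mean_by_key` are hypothetical names.

```python
# Illustrative (gender, age) pairs; not taken from the dataset.
pairs = [("Male", 56), ("Male", 72), ("Female", 27), ("Female", 32), ("Male", 22)]

def seq_op(acc, age):
    # Fold one age into a (sum, count) accumulator.
    return (acc[0] + age, acc[1] + 1)

def comb_op(a, b):
    # Merge two partial accumulators, as would happen across partitions.
    return (a[0] + b[0], a[1] + b[1])

def mean_by_key(records):
    acc = {}
    for key, age in records:
        acc[key] = seq_op(acc.get(key, (0, 0)), age)
    return {key: s / n for key, (s, n) in acc.items()}

means = mean_by_key(pairs)
print(means)

# Merging partial results from two "partitions" gives the same totals.
merged_male = comb_op((128, 2), (22, 1))
print(merged_male)
```

In PySpark the same two functions could be supplied as `rdd.aggregateByKey((0, 0), seq_op, comb_op)`, followed by a `mapValues` step that divides the sum by the count.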

**Oldest and the newest vehicle year involved in the accident? Display the Registration State, Year and Unit type of the vehicle.**

In [18]:
# to get the required columns from the rdd
vehicle_reqq = filt_units_rdd.filter(lambda x: (x[5]!='' and 'XXXX' not in x[5]))\
                            .map(lambda x: (x[3],x[4],int(x[5].replace('"',''))))

# to find the details of newest vehicle
vehicle_reqq.max(key=lambda x: x[2])

('"SA"', '"Station Wagon"', 2019)

In [19]:
# to find the details of oldest vehicle
vehicle_reqq.min(key=lambda x: x[2])

('"VIC"', '"Motor Cycle"', 1900)

##### 2.1 Data Preparation and Loading

**1. Load all units and crash data into two separate dataframes**

In [20]:
# to load data in two dataframes
units_df = spark.read.csv("*_DATA_SA_Units.csv",header=True)
crashes_df = spark.read.csv("*_DATA_SA_Crash.csv",header=True)

**2. Display the schema of the final two dataframes**

In [21]:
# to display the schema
units_df.printSchema()
crashes_df.printSchema()

root
 |-- REPORT_ID: string (nullable = true)
 |-- Unit No: string (nullable = true)
 |-- No Of Cas: string (nullable = true)
 |-- Veh Reg State: string (nullable = true)
 |-- Unit Type: string (nullable = true)
 |-- Veh Year: string (nullable = true)
 |-- Direction Of Travel: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Lic State: string (nullable = true)
 |-- Licence Class: string (nullable = true)
 |-- Licence Type: string (nullable = true)
 |-- Towing: string (nullable = true)
 |-- Unit Movement: string (nullable = true)
 |-- Number Occupants: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- Rollover: string (nullable = true)
 |-- Fire: string (nullable = true)

root
 |-- REPORT_ID: string (nullable = true)
 |-- Stats Area: string (nullable = true)
 |-- Suburb: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- LGA Name: string (nullable = true)
 |-- Total Units: string (nullable = true)


In [22]:
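# import only the column functions used below
# note: `sum` and `round` now refer to the Spark column functions, not the Python built-ins
from pyspark.sql.functions import col, concat, count, format_string, lit, round, sum
from pyspark.sql.types import IntegerType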

In [23]:
# to convert 'Total Cas' column to integer type
crashes_df = crashes_df.withColumn("Total Cas", crashes_df["Total Cas"].cast(IntegerType()))

# to show all crashes in Adelaide with more than 3 casualties
crashes_df.where((col("Suburb") == 'ADELAIDE') & (col("Total Cas") > 3)).show()

+--------------------+----------+--------+--------+----------------+-----------+---------+----------+--------+--------+----+--------+--------+--------+----------+-------------+----------------+--------------+--------------------+------------+-------------+------------+--------+--------------+---------+------------+-------------+---------------+------------+--------------+----------+----------+--------------+
|           REPORT_ID|Stats Area|  Suburb|Postcode|        LGA Name|Total Units|Total Cas|Total Fats|Total SI|Total MI|Year|   Month|     Day|    Time|Area Speed|Position Type|Horizontal Align|Vertical Align|          Other Feat|Road Surface|Moisture Cond|Weather Cond|DayNight|    Crash Type|Unit Resp| Entity Code|CSEF Severity|  Traffic Ctrls|DUI Involved|Drugs Involved|  ACCLOC_X|  ACCLOC_Y|    UNIQUE_LOC|
+--------------------+----------+--------+--------+----------------+-----------+---------+----------+--------+--------+----+--------+--------+--------+----------+-------------+

**2. Display 10 crash events with highest casualties.**

In [24]:
# to show top ten crashes
crashes_df.orderBy("Total Cas", ascending=False).show(10)

+--------------------+--------------+---------------+--------+--------------------+-----------+---------+----------+--------+--------+----+--------+---------+--------+----------+-------------+--------------------+--------------+--------------+------------+-------------+------------+--------+-----------+---------+------------+-------------+---------------+------------+--------------+----------+----------+--------------+
|           REPORT_ID|    Stats Area|         Suburb|Postcode|            LGA Name|Total Units|Total Cas|Total Fats|Total SI|Total MI|Year|   Month|      Day|    Time|Area Speed|Position Type|    Horizontal Align|Vertical Align|    Other Feat|Road Surface|Moisture Cond|Weather Cond|DayNight| Crash Type|Unit Resp| Entity Code|CSEF Severity|  Traffic Ctrls|DUI Involved|Drugs Involved|  ACCLOC_X|  ACCLOC_Y|    UNIQUE_LOC|
+--------------------+--------------+---------------+--------+--------------------+-----------+---------+----------+--------+--------+----+--------+------

**3. Find the total number of fatalities for each crash type**

In [25]:
# to convert 'Total Fats' column to integer type
crashes_df = crashes_df.withColumn("Total Fats", crashes_df["Total Fats"].cast(IntegerType()))

In [26]:
# to group the different crash types and display their total fatalities
crash_group_df = crashes_df.groupBy('Crash Type')
crash_group_df.agg(sum('Total Fats').alias('Number_of_Fatalities')).orderBy("Number_of_Fatalities", ascending=False).show()


+--------------------+--------------------+
|          Crash Type|Number_of_Fatalities|
+--------------------+--------------------+
|    Hit Fixed Object|                 152|
|             Head On|                  86|
|      Hit Pedestrian|                  70|
|           Roll Over|                  57|
|         Right Angle|                  45|
|          Side Swipe|                  20|
|          Right Turn|                  18|
|            Rear End|                  16|
|  Hit Parked Vehicle|                   9|
|          Hit Animal|                   4|
|  Hit Object on Road|                   2|
|               Other|                   2|
|Left Road - Out o...|                   1|
+--------------------+--------------------+



**4. Find the total number of casualties for each suburb when the vehicle was driven by an unlicensed driver. You are required to display the name of the suburb and the total number of casualties.**


In [27]:
# join the two dataframes
joined_df = units_df.join(crashes_df, units_df.REPORT_ID==crashes_df.REPORT_ID,how='inner')

# total casualties per suburb where the driver is unlicensed (the data spells it 'Unlicenced')
joined_df.select("Suburb", "Total Cas")\
        .where((col("Licence Type") == 'Unlicenced'))\
        .groupBy('Suburb').agg(sum('Total Cas').alias('Number_of_Casualities'))\
        .orderBy("Number_of_Casualities", ascending=False)\
        .show()

+---------------+---------------------+
|         Suburb|Number_of_Casualities|
+---------------+---------------------+
|       ADELAIDE|                   19|
|      SALISBURY|                   18|
|      DRY CREEK|                   18|
| SALISBURY EAST|                   16|
|       PROSPECT|                   14|
| NORTH ADELAIDE|                   13|
|        ENFIELD|                   12|
|   ANDREWS FARM|                   12|
|SALISBURY DOWNS|                   11|
|   BEDFORD PARK|                   11|
|SALISBURY SOUTH|                   11|
|     INGLE FARM|                   11|
|     MUNNO PARA|                   10|
|         BURTON|                   10|
|SALISBURY PLAIN|                   10|
|   MOUNT BARKER|                   10|
| ELIZABETH PARK|                   10|
|  MORPHETT VALE|                   10|
|   MAWSON LAKES|                   10|
| ELIZABETH EAST|                    9|
+---------------+---------------------+
only showing top 20 rows



**1. Find the total number of crash events for each severity level. Which severity level is the most common?**


In [28]:
# to find the total number of crash events for each severity level
crashes_df.groupBy("CSEF Severity").agg(count("*").alias('Number_of_events'))\
          .orderBy('Number_of_events',ascending=False).show()

+-------------+----------------+
|CSEF Severity|Number_of_events|
+-------------+----------------+
|       1: PDO|           46696|
|        2: MI|           21881|
|        3: SI|            2978|
|     4: Fatal|             451|
+-------------+----------------+



**2. Compute the total number of crash events for each severity level and the percentage for the four different scenarios.**

**a. When the driver is tested positive on drugs**

In [29]:
# count crashes per severity level where drugs were involved
crash_count_df = crashes_df.select('CSEF Severity')\
        .where(col("Drugs Involved") == 'Y')\
        .groupBy("CSEF Severity").agg(count("*").alias('Count'))

In [30]:
# used the below links to get strings in required format
#https://stackoverflow.com/a/43992110
#https://sparkbyexamples.com/spark/usage-of-spark-sql-string-functions/
total = crash_count_df.select("Count").agg({"Count": "sum"}).collect().pop()['sum(Count)']
crash_count_df.withColumn('Percentage', format_string("%2.3f%%", round((crash_count_df['Count']/total * 100),2))).show()

+-------------+-----+----------+
|CSEF Severity|Count|Percentage|
+-------------+-----+----------+
|     4: Fatal|   82|    6.540%|
|        2: MI|  749|   59.730%|
|       1: PDO|  176|   14.040%|
|        3: SI|  247|   19.700%|
+-------------+-----+----------+
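
The percentages above end in a zero because the value is rounded to two decimals and then printed with a three-decimal pattern. The sketch below reproduces the column with Python's `%` formatting, which follows the same printf-style conventions for this pattern, using the counts from the table above. The two-decimal variant is shown only as a possible alternative.

```python
# Counts copied from the table above.
counts = {"4: Fatal": 82, "2: MI": 749, "1: PDO": 176, "3: SI": 247}
total = sum(counts.values())

three_dp = {}
two_dp = {}
for severity, n in counts.items():
    pct = round(n / total * 100, 2)
    three_dp[severity] = "%2.3f%%" % pct  # pattern used in the notebook
    two_dp[severity] = "%.2f%%" % pct     # same value without the padded zero

for severity in counts:
    print(severity, three_dp[severity], two_dp[severity])
```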



**b. When the driver is tested positive for blood alcohol concentration.**

In [31]:
# count crashes per severity level where 'DUI Involved' is populated
# (null values fail the != '' comparison and are dropped by the filter)
crash_alcohol_df = crashes_df.select('CSEF Severity')\
        .where(col('DUI Involved') != '')\
        .groupBy("CSEF Severity").agg(count("*").alias('Count'))

In [32]:
#to get the output in required format
total = crash_alcohol_df.select("Count").agg({"Count": "sum"}).collect().pop()['sum(Count)']
crash_alcohol_df.withColumn('Percentage', format_string("%2.3f%%", round((crash_alcohol_df['Count']/total * 100),2))).show()

+-------------+-----+----------+
|CSEF Severity|Count|Percentage|
+-------------+-----+----------+
|     4: Fatal|   79|    3.510%|
|        2: MI|  737|   32.780%|
|       1: PDO| 1173|   52.180%|
|        3: SI|  259|   11.520%|
+-------------+-----+----------+



**c. When the driver is tested positive for both drugs and blood alcohol**

In [33]:
# count crashes per severity level where both 'DUI Involved' and 'Drugs Involved' are flagged
crash_both_df = crashes_df.select('CSEF Severity')\
        .where((col('DUI Involved') != '') & (col("Drugs Involved") == 'Y'))\
        .groupBy("CSEF Severity").agg(count("*").alias('Count'))

In [34]:
# to get the output in required format
total = crash_both_df.select("Count").agg({"Count": "sum"}).collect().pop()['sum(Count)']
crash_both_df.withColumn('Percentage', format_string("%2.3f%%", round((crash_both_df['Count']/total * 100),2))).show()

+-------------+-----+----------+
|CSEF Severity|Count|Percentage|
+-------------+-----+----------+
|     4: Fatal|   27|   15.430%|
|        2: MI|   89|   50.860%|
|       1: PDO|   24|   13.710%|
|        3: SI|   35|   20.000%|
+-------------+-----+----------+



**d. When the driver is tested negative for both (no alcohol and no drugs).**

In [35]:
# count crashes per severity level where neither 'DUI Involved' nor 'Drugs Involved' is recorded (both null)
negative_crash_df = crashes_df.select('CSEF Severity')\
        .where((col('DUI Involved').isNull()) & (col("Drugs Involved").isNull()))\
        .groupBy("CSEF Severity").agg(count("*").alias('Count'))

In [36]:
# to get the output in required format
total = negative_crash_df.select("Count").agg({"Count": "sum"}).collect().pop()['sum(Count)']
negative_crash_df.withColumn('Percentage', format_string("%2.3f%%", round((negative_crash_df['Count']/total * 100),2))).show()

+-------------+-----+----------+
|CSEF Severity|Count|Percentage|
+-------------+-----+----------+
|     4: Fatal|  317|    0.460%|
|        2: MI|20484|   29.830%|
|       1: PDO|45371|   66.060%|
|        3: SI| 2507|    3.650%|
+-------------+-----+----------+
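
The four scenarios can be cross-checked with inclusion-exclusion: crashes with drugs or alcohol equal drugs plus alcohol minus both, and the remainder should match the "negative for both" total. This is a small arithmetic sketch that uses only the counts printed in the tables above.

```python
# Severity counts copied from the tables above.
all_crashes = 46696 + 21881 + 2978 + 451   # every crash, by severity
drugs = 82 + 749 + 176 + 247               # scenario a
alcohol = 79 + 737 + 1173 + 259            # scenario b
both = 27 + 89 + 24 + 35                   # scenario c
neither = 317 + 20484 + 45371 + 2507       # scenario d

# Inclusion-exclusion: crashes flagged for drugs or alcohol (or both).
either = drugs + alcohol - both
expected_neither = all_crashes - either

print(f"either: {either}")
print(f"expected neither: {expected_neither}, reported neither: {neither}")
print("consistent" if expected_neither == neither else "mismatch")
```

The totals agree, which suggests that the `!= ''` filters and the `isNull()` filter select complementary sets of rows.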



**1. Find the Date and Time of Crash, Number of Casualties in each unit and the Gender, Age, License Type of the unit driver for the suburb "Adelaide".**

In [3]:
%%time


# Spark RDD

req_crashes_rdd = filt_crashes_rdd.map(lambda x: (x[0].replace('"',''),(x[2].replace('"',''),x[10].replace('"',''),\
                              x[11].replace('"',''),x[12].replace('"',''),x[13].replace('"',''),x[6].replace('"',''))))

req_unit_rdd = filt_units_rdd.map(lambda x: (x[0].replace('"',''),(x[7].replace('"',''), x[8].replace('"',''),\
                                            x[11].replace('"',''))))

# join the rdd on key
join_rdd = req_crashes_rdd.join(req_unit_rdd).filter(lambda x: x[1][0][0] == 'ADELAIDE')
join_rdd.map(lambda x: (x[1][0][1] +"-"+ x[1][0][2] +"-"+ x[1][0][3], x[1][0][4], x[1][0][5],\
                        x[1][1][0], x[1][1][1],x[1][1][2])).collect()


In [38]:
%%time

# Spark DataFrame

# join the two dataframes
joined_df = units_df.join(crashes_df, units_df.REPORT_ID==crashes_df.REPORT_ID,how='inner')
# used the below link to find out concat function
#https://www.edureka.co/community/2280/concatenate-columns-in-apache-spark-dataframe
joined_df.select(concat("Year", lit("-"), "Month", lit("-"),"Day").alias("Date"), "Time", "Total Cas", "Sex", "Age", "Licence Type")\
        .where(col("Suburb")=='ADELAIDE').show()

+--------------------+--------+---------+-------+----+------------+
|                Date|    Time|Total Cas|    Sex| Age|Licence Type|
+--------------------+--------+---------+-------+----+------------+
|2016-November-Wed...|01:45 pm|        1|   Male| 056|        Full|
|2016-November-Wed...|01:45 pm|        1|   Male| 072|        null|
|2016-November-Tue...|03:40 pm|        1|   Male| 056|        null|
|2016-November-Tue...|03:40 pm|        1| Female| 027|        null|
|2016-November-Tue...|05:00 pm|        0| Female| 032|        Full|
|2016-November-Tue...|05:00 pm|        0|Unknown| XXX|     Unknown|
|2016-November-Tue...|05:40 pm|        0|   Male| 022|     Unknown|
|2016-November-Tue...|05:40 pm|        0|   Male| 020|     Unknown|
|2016-November-Monday|11:26 pm|        0|Unknown| XXX|     Unknown|
|2016-November-Monday|11:26 pm|        0|   Male| 042|        Full|
|2016-November-Monday|11:26 pm|        0|   null|null|        null|
|2016-November-Monday|11:30 pm|        0|   Male

In [39]:
%%time

# Spark SQL
# Create Views from Dataframes
crashes_df.createOrReplaceTempView("sql_crashes")
units_df.createOrReplaceTempView("sql_units")

# perform the sql operations
spark.sql('''
  SELECT Year || '-' || Month||'-'|| Day AS Date, Time, `Total Cas`, Sex, Age, `Licence Type`
  FROM sql_crashes d JOIN sql_units w ON d.REPORT_ID=w.REPORT_ID
  WHERE d.Suburb == 'ADELAIDE'
''').show()

+--------------------+--------+---------+-------+----+------------+
|                Date|    Time|Total Cas|    Sex| Age|Licence Type|
+--------------------+--------+---------+-------+----+------------+
|2016-November-Wed...|01:45 pm|        1|   Male| 056|        Full|
|2016-November-Wed...|01:45 pm|        1|   Male| 072|        null|
|2016-November-Tue...|03:40 pm|        1|   Male| 056|        null|
|2016-November-Tue...|03:40 pm|        1| Female| 027|        null|
|2016-November-Tue...|05:00 pm|        0| Female| 032|        Full|
|2016-November-Tue...|05:00 pm|        0|Unknown| XXX|     Unknown|
|2016-November-Tue...|05:40 pm|        0|   Male| 022|     Unknown|
|2016-November-Tue...|05:40 pm|        0|   Male| 020|     Unknown|
|2016-November-Monday|11:26 pm|        0|Unknown| XXX|     Unknown|
|2016-November-Monday|11:26 pm|        0|   Male| 042|        Full|
|2016-November-Monday|11:26 pm|        0|   null|null|        null|
|2016-November-Monday|11:30 pm|        0|   Male

**2. Find the total number of casualties for each suburb when the vehicle was driven by an unlicensed driver. You are required to display the name of the suburb and the total number of casualties.**

In [41]:
%%time


# Spark DataFrame

# join the two dataframes
joined_df = units_df.join(crashes_df, units_df.REPORT_ID==crashes_df.REPORT_ID,how='inner')

joined_df.select("Suburb", "Total Cas")\
        .where((col("Licence Type") == 'Unlicenced'))\
        .groupBy('Suburb').agg(sum('Total Cas').alias('Number_of_Casualities'))\
        .orderBy("Number_of_Casualities", ascending=False)\
        .show()

+---------------+---------------------+
|         Suburb|Number_of_Casualities|
+---------------+---------------------+
|       ADELAIDE|                   19|
|      SALISBURY|                   18|
|      DRY CREEK|                   18|
| SALISBURY EAST|                   16|
|       PROSPECT|                   14|
| NORTH ADELAIDE|                   13|
|        ENFIELD|                   12|
|   ANDREWS FARM|                   12|
|     INGLE FARM|                   11|
|   BEDFORD PARK|                   11|
|SALISBURY SOUTH|                   11|
|SALISBURY DOWNS|                   11|
|SALISBURY PLAIN|                   10|
|   MOUNT BARKER|                   10|
|     MUNNO PARA|                   10|
| ELIZABETH PARK|                   10|
|         BURTON|                   10|
|  MORPHETT VALE|                   10|
|   MAWSON LAKES|                   10|
| ELIZABETH EAST|                    9|
+---------------+---------------------+
only showing top 20 rows

CPU times: use

In [42]:
%%time

# Spark SQL
# Create Views from Dataframes
crashes_df.createOrReplaceTempView("sql_crashes")
units_df.createOrReplaceTempView("sql_units")

spark.sql('''
  SELECT Suburb, sum(`Total Cas`) AS Number_of_Casualities
  FROM sql_crashes d JOIN sql_units w ON d.REPORT_ID=w.REPORT_ID
  WHERE w.`Licence Type` == 'Unlicenced'
  GROUP BY Suburb
  ORDER BY Number_of_Casualities desc
''').show()


+---------------+---------------------+
|         Suburb|Number_of_Casualities|
+---------------+---------------------+
|       ADELAIDE|                   19|
|      SALISBURY|                   18|
|      DRY CREEK|                   18|
| SALISBURY EAST|                   16|
|       PROSPECT|                   14|
| NORTH ADELAIDE|                   13|
|        ENFIELD|                   12|
|   ANDREWS FARM|                   12|
|     INGLE FARM|                   11|
|SALISBURY DOWNS|                   11|
|SALISBURY SOUTH|                   11|
|   BEDFORD PARK|                   11|
|SALISBURY PLAIN|                   10|
|     MUNNO PARA|                   10|
|   MOUNT BARKER|                   10|
| ELIZABETH PARK|                   10|
|         BURTON|                   10|
|  MORPHETT VALE|                   10|
|   MAWSON LAKES|                   10|
|ELIZABETH GROVE|                    9|
+---------------+---------------------+
only showing top 20 rows

CPU times: use