In [1]:
# Setup steps for Google Colab
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.5.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"


0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [Waiting for headers] [1 InRelease 0 B/3,626 B 0%] [Connecting to ppa.launc0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcontent.net] [Waiting for                                                                                                    Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
                                                                                                    0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcontent.net]                                                                                       Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcontent.net]                                                            

In [2]:
# Make the Spark installation importable: findspark.init() with no arguments reads
# the SPARK_HOME environment variable (set in the setup cell) and adds PySpark to
# sys.path. The SparkSession itself is created in the next cell.
import findspark
findspark.init()

In [3]:
# Imports: standard library first, then PySpark (importable once findspark.init() has run)
import time

from pyspark.sql import SparkSession

# Build the SparkSession used by every query below; getOrCreate() reuses an
# already-running session instead of starting a second one
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [4]:
# Read the processed traffic-violation CSV into a DataFrame.
# The path is relative to the notebook's working directory.
TRAFFIC_CSV_PATH = "Traffic_Violations_Processed.csv"

# header=True takes column names from the first line; inferSchema=True makes Spark
# scan the file an extra time to choose column types (e.g. Latitude becomes double).
traffic_data_df = spark.read.csv(TRAFFIC_CSV_PATH, header=True, inferSchema=True)

# Show the first 5 rows
traffic_data_df.show(5)


+--------------------+--------------------+----------------+-----------------+--------+-----+---------------+---------------+-----+-------+---------+----------------+------------------+--------------+-------------+-----------+--------------------+----+-------+-----+--------------+-----+------+------------+-------------+---------------------+----------------------------+------------+----------+
|         Description|            Location|        Latitude|        Longitude|Accident|Belts|Personal Injury|Property Damage|Fatal|Alcohol|Work Zone|Search Conducted|Search Disposition|Search Outcome|Search Reason|Search Type|Search Arrest Reason|Year|   Make|Color|Violation Type| Race|Gender|Year of Stop|Month of Stop|Driver State Category|License Plate State Category|Hour of Stop|Color Type|
+--------------------+--------------------+----------------+-----------------+--------+-----+---------------+---------------+-----+-------+---------+----------------+------------------+--------------+------

In [5]:
# Print every column with the data type Spark inferred for it from the CSV
traffic_data_df.printSchema()

root
 |-- Description: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Accident: string (nullable = true)
 |-- Belts: string (nullable = true)
 |-- Personal Injury: string (nullable = true)
 |-- Property Damage: string (nullable = true)
 |-- Fatal: string (nullable = true)
 |-- Alcohol: string (nullable = true)
 |-- Work Zone: string (nullable = true)
 |-- Search Conducted: string (nullable = true)
 |-- Search Disposition: string (nullable = true)
 |-- Search Outcome: string (nullable = true)
 |-- Search Reason: string (nullable = true)
 |-- Search Type: string (nullable = true)
 |-- Search Arrest Reason: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Make: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Violation Type: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Year of Stop: integer (n

In [6]:
# Register the DataFrame as the temporary view "traffic_data" (replacing any existing
# view of that name) so the spark.sql(...) queries below can refer to it by name.
traffic_data_df.createOrReplaceTempView("traffic_data")

In [7]:
# Preview five rows through the SQL view, confirming "traffic_data" is queryable
preview_sql = "select * from traffic_data limit 5"
spark.sql(preview_sql).show()

+--------------------+--------------------+----------------+-----------------+--------+-----+---------------+---------------+-----+-------+---------+----------------+------------------+--------------+-------------+-----------+--------------------+----+-------+-----+--------------+-----+------+------------+-------------+---------------------+----------------------------+------------+----------+
|         Description|            Location|        Latitude|        Longitude|Accident|Belts|Personal Injury|Property Damage|Fatal|Alcohol|Work Zone|Search Conducted|Search Disposition|Search Outcome|Search Reason|Search Type|Search Arrest Reason|Year|   Make|Color|Violation Type| Race|Gender|Year of Stop|Month of Stop|Driver State Category|License Plate State Category|Hour of Stop|Color Type|
+--------------------+--------------------+----------------+-----------------+--------+-----+---------------+---------------+-----+-------+---------+----------------+------------------+--------------+------

In [8]:
# Total number of rows in the traffic_data view
row_count_sql = """select count(1) as Number_of_Rows from traffic_data
            """
spark.sql(row_count_sql).show()

+--------------+
|Number_of_Rows|
+--------------+
|       1665836|
+--------------+



In [9]:
# Number of rows per Violation Type, largest group first.
# count(*) counts every row in the group; count(`Violation Type`) would skip NULLs
# and so report 0 for a NULL Violation Type group. The ORDER BY makes row order stable.
spark.sql("""select `Violation Type`, count(*) as Count
             from traffic_data
             group by `Violation Type`
             order by `Count` desc""").show()

+--------------+------+
|Violation Type| Count|
+--------------+------+
|      Citation|756296|
+--------------+------+



In [10]:
# Range of years covered by the data: earliest and latest `Year of Stop`
year_range_sql = "select Min(`Year of Stop`) as Data_From, Max(`Year of Stop`) as Data_Till from traffic_data "
spark.sql(year_range_sql).show()

+---------+---------+
|Data_From|Data_Till|
+---------+---------+
|     2012|     2023|
+---------+---------+



In [12]:
# How many distinct Description values appear in the data
distinct_count_sql = """select count( Distinct Description) as Number_of_Descriptions from traffic_data
            """
spark.sql(distinct_count_sql).show()

+----------------------+
|Number_of_Descriptions|
+----------------------+
|                    25|
+----------------------+



In [13]:
# Every distinct Description in alphabetical order; up to 30 rows, without truncating text
descriptions_sql = """select distinct Description from traffic_data order by 1 """
spark.sql(descriptions_sql).show(n=30, truncate=False)

+------------------------------------------------------------------------------------------------+
|Description                                                                                     |
+------------------------------------------------------------------------------------------------+
|DRIVER ENTERING INTERSECTION AT FLASHING RED TRAFFIC SIGNAL WITHOUT STOPPING                    |
|DRIVER FAILURE TO OBEY PROPERLY PLACED TRAFFIC CONTROL DEVICE INSTRUCTIONS                      |
|DRIVING VEH. W/O ADEQUATE REAR REG. PLATE ILLUMINATION                                          |
|DUI                                                                                             |
|FAILURE TO DRIVE VEHICLE ON RIGHT HALF OF ROADWAY WHEN REQUIRED                                 |
|Failure to Yield                                                                                |
|Failure to stop at different circumstances                                                      |
|Improper 

In [50]:
# Determine the run time for this query on the CSV-backed view (compare with the
# Parquet run later). time.perf_counter() is a monotonic, high-resolution clock meant
# for measuring durations; time.time() can jump if the system clock is adjusted.
# The measured time includes running the query and printing up to 50 rows.
start_time = time.perf_counter()

# Number of incidents for each (Description, Violation Type) pair.
# count(2) counts every row in the group (the literal 2 is never NULL), like count(*).
spark.sql(""" select Description,`Violation Type`, count(2) as Number_of_Incidents
                from traffic_data
                group by Description, `Violation Type`
                order by Description asc, `Violation Type` asc,  Number_of_Incidents desc
          """).show(50, truncate=False)

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+------------------------------------------------------------------------------------------------+--------------+-------------------+
|Description                                                                                     |Violation Type|Number_of_Incidents|
+------------------------------------------------------------------------------------------------+--------------+-------------------+
|DRIVER ENTERING INTERSECTION AT FLASHING RED TRAFFIC SIGNAL WITHOUT STOPPING                    |Citation      |3537               |
|DRIVER FAILURE TO OBEY PROPERLY PLACED TRAFFIC CONTROL DEVICE INSTRUCTIONS                      |Citation      |38587              |
|DRIVING VEH. W/O ADEQUATE REAR REG. PLATE ILLUMINATION                                          |Citation      |1715               |
|DUI                                                                                             |Citation      |53009              |
|FAILURE TO DRIVE VEHICLE ON RIGHT HALF OF ROADWAY WHEN REQUIR

In [51]:
# Determine the run time for this query on the CSV-backed view (compare with the
# Parquet run later). time.perf_counter() is monotonic, so the measurement is not
# affected by system clock adjustments.
start_time = time.perf_counter()

# Speeding citations per Location for each (Year of Stop, Month of Stop).
# Rows are ordered newest month first and then by incident count, so the 25 rows
# shown come from the most recent month(s) in the data -- they are the busiest
# location-months recently, not the 25 locations with the most citations overall.
spark.sql(""" select Description, Location, count(2) as Number_of_Incidents, `Year of Stop`,`Month of Stop`
                from traffic_data
                where Description = 'Speeding' and `Violation Type` = 'Citation'
                group by Description, Location,`Year of Stop`,`Month of Stop`
                order by `Year of Stop` desc, `Month of Stop` desc, Number_of_Incidents desc, Location asc
          """).show(25, truncate=False)

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+----------------------------------+-------------------+------------+-------------+
|Description|Location                          |Number_of_Incidents|Year of Stop|Month of Stop|
+-----------+----------------------------------+-------------------+------------+-------------+
|Speeding   |S/B GERMANTOWN RD @ DAWSON FARM RD|13                 |2023        |11           |
|Speeding   |DARNESTOWN RD @ AMERICAN WAY      |9                  |2023        |11           |
|Speeding   |RANDOLPH RD W/B & KEMP MILL RD    |9                  |2023        |11           |
|Speeding   |CONN. AVE S/B & DEAN RD AREA      |6                  |2023        |11           |
|Speeding   |RIVER RD @ BRAEBURN PKWY          |6                  |2023        |11           |
|Speeding   |21500 BEALLSVILLE                 |5                  |2023        |11           |
|Speeding   |GREAT SENECA HWY/GREY EAGLE CT    |5                  |2023        |11           |
|Speeding   |NB SHADY GROVE RD AT MIDCOU

In [54]:
# Verifying the count of a single row of the above result: re-count Speeding
# citations for one location and month directly, without GROUP BY.
# The grouped query reported 13 for S/B GERMANTOWN RD @ DAWSON FARM RD in 2023-11.
spark.sql(""" select count(1) from traffic_data
              where Description = 'Speeding' and Location = 'S/B GERMANTOWN RD @ DAWSON FARM RD' and
                    `Violation Type` = 'Citation' and `Year of Stop` = 2023 and `Month of Stop` = 11
          """).show()

+--------+
|count(1)|
+--------+
|      13|
+--------+



In [29]:
# Write the CSV-sourced DataFrame out as Parquet with one sub-directory per
# Description value, replacing any previous contents of parquet_traffic_data/
(
    traffic_data_df.write
    .mode('overwrite')
    .partitionBy("Description")
    .parquet('parquet_traffic_data')
)

In [30]:
# Load the partitioned Parquet output back into a DataFrame; Spark rebuilds the
# Description column from the partition directory names
p_traffic_data_df = spark.read.format("parquet").load("parquet_traffic_data")

In [31]:
# Register the Parquet-backed DataFrame as the temporary view (not a table)
# "p_traffic_data", so the same SQL can be timed against it and against the CSV view.
p_traffic_data_df.createOrReplaceTempView("p_traffic_data")

In [52]:
# Determine the run time for this query on the Parquet-backed view (same SQL as
# the earlier CSV run). time.perf_counter() is a monotonic, high-resolution clock
# meant for measuring durations; time.time() can jump if the system clock changes.
# The measured time includes running the query and printing up to 50 rows.
start_time = time.perf_counter()

# Number of incidents for each (Description, Violation Type) pair.
# count(2) counts every row in the group (the literal 2 is never NULL), like count(*).
spark.sql(""" select Description,`Violation Type`, count(2) as Number_of_Incidents
                from p_traffic_data
                group by Description, `Violation Type`
                order by Description asc, `Violation Type` asc,  Number_of_Incidents desc
          """).show(50, truncate=False)

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+------------------------------------------------------------------------------------------------+--------------+-------------------+
|Description                                                                                     |Violation Type|Number_of_Incidents|
+------------------------------------------------------------------------------------------------+--------------+-------------------+
|DRIVER ENTERING INTERSECTION AT FLASHING RED TRAFFIC SIGNAL WITHOUT STOPPING                    |Citation      |3537               |
|DRIVER FAILURE TO OBEY PROPERLY PLACED TRAFFIC CONTROL DEVICE INSTRUCTIONS                      |Citation      |38587              |
|DRIVING VEH. W/O ADEQUATE REAR REG. PLATE ILLUMINATION                                          |Citation      |1715               |
|DUI                                                                                             |Citation      |53009              |
|FAILURE TO DRIVE VEHICLE ON RIGHT HALF OF ROADWAY WHEN REQUIR

In [53]:
# Determine the run time for this query on the Parquet-backed view (same SQL as
# the earlier CSV run). time.perf_counter() is monotonic, so the measurement is not
# affected by system clock adjustments.
start_time = time.perf_counter()

# Speeding citations per Location for each (Year of Stop, Month of Stop).
# Rows are ordered newest month first and then by incident count, so the 25 rows
# shown come from the most recent month(s) in the data -- they are the busiest
# location-months recently, not the 25 locations with the most citations overall.
spark.sql(""" select Description, Location, count(2) as Number_of_Incidents, `Year of Stop`,`Month of Stop`
                from p_traffic_data
                where Description = 'Speeding' and `Violation Type` = 'Citation'
                group by Description, Location,`Year of Stop`,`Month of Stop`
                order by `Year of Stop` desc, `Month of Stop` desc, Number_of_Incidents desc, Location asc
          """).show(25, truncate=False)

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+----------------------------------+-------------------+------------+-------------+
|Description|Location                          |Number_of_Incidents|Year of Stop|Month of Stop|
+-----------+----------------------------------+-------------------+------------+-------------+
|Speeding   |S/B GERMANTOWN RD @ DAWSON FARM RD|13                 |2023        |11           |
|Speeding   |DARNESTOWN RD @ AMERICAN WAY      |9                  |2023        |11           |
|Speeding   |RANDOLPH RD W/B & KEMP MILL RD    |9                  |2023        |11           |
|Speeding   |CONN. AVE S/B & DEAN RD AREA      |6                  |2023        |11           |
|Speeding   |RIVER RD @ BRAEBURN PKWY          |6                  |2023        |11           |
|Speeding   |21500 BEALLSVILLE                 |5                  |2023        |11           |
|Speeding   |GREAT SENECA HWY/GREY EAGLE CT    |5                  |2023        |11           |
|Speeding   |NB SHADY GROVE RD AT MIDCOU