In [1]:
# dataset source: https://data.cityofnewyork.us/Public-Safety/Motor-Vehicle-Collisions-Crashes/h9gi-nx95
# questions
# which borough had most crashes by year?
# Group incidents at the top of the hour and find out top 3 times of the day that saw the most crashes?
# If a incident had a fatality, what was the average fatality?
# what were top 3 zip codes where most incidents happened by each of NYC borough by year?

In [2]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.window import *

In [3]:
# create spark session
spark = SparkSession.builder.appName("analysis").getOrCreate()

23/06/01 20:45:19 WARN Utils: Your hostname, Ravis-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.6.23 instead (on interface en0)
23/06/01 20:45:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/01 20:45:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# read dataset
raw_df = spark.read.csv("crashes.csv", inferSchema=True, header=True)

                                                                                

In [5]:
# count number of rows in the dataframe
raw_df.count()

                                                                                

1996742

In [6]:
raw_df.printSchema()

root
 |-- CRASH DATE: string (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- CROSS STREET NAME: string (nullable = true)
 |-- OFF STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: strin

In [7]:
# check for duplicates
# count of rows by collision_id should be equal to total count of rows
row_count_df = raw_df.select(countDistinct(col("COLLISION_ID")).alias("row_cnt"))

In [8]:
row_count_df.show()

[Stage 7:>                                                        (0 + 10) / 10]

+-------+
|row_cnt|
+-------+
|1996740|
+-------+



                                                                                

In [9]:
# questions
# output the borough that had most creashes for every year?
borough_year_raw = raw_df.withColumn("crash_year", split("CRASH DATE", '/')[2]).select("crash_year", "BOROUGH", "COLLISION_ID")
borough_year_raw.show(10)

+----------+---------+------------+
|crash_year|  BOROUGH|COLLISION_ID|
+----------+---------+------------+
|      2021|     null|     4455765|
|      2022|     null|     4513547|
|      2022|     null|     4541903|
|      2021| BROOKLYN|     4456314|
|      2021| BROOKLYN|     4486609|
|      2021|     null|     4407458|
|      2021|     null|     4486555|
|      2021|    BRONX|     4486660|
|      2021| BROOKLYN|     4487074|
|      2021|MANHATTAN|     4486519|
+----------+---------+------------+
only showing top 10 rows



In [10]:
# remove all rows of data where the borough is null
cleaned_borough_data = borough_year_raw.dropna(subset=["BOROUGH", "crash_year"])
# which borough had most crashes by year?
cleaned_borough_data.createOrReplaceTempView("table")
top_years_sql = spark.sql(
f"""
WITH CTE AS(
SELECT
    crash_year,
    BOROUGH,
    RANK() OVER(PARTITION BY crash_year ORDER BY COUNT(DISTINCT COLLISION_ID) DESC) AS rnk
    FROM table
    GROUP BY 1,2
    )
SELECT 
crash_year,
BOROUGH
FROM CTE
WHERE rnk = 1
ORDER BY 1
""")
top_years_sql.show()

23/06/01 20:45:39 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 14:>                                                       (0 + 10) / 10]

+----------+--------+
|crash_year| BOROUGH|
+----------+--------+
|      2012|BROOKLYN|
|      2013|BROOKLYN|
|      2014|BROOKLYN|
|      2015|BROOKLYN|
|      2016|BROOKLYN|
|      2017|BROOKLYN|
|      2018|BROOKLYN|
|      2019|BROOKLYN|
|      2020|BROOKLYN|
|      2021|BROOKLYN|
|      2022|BROOKLYN|
|      2023|BROOKLYN|
+----------+--------+



                                                                                

In [57]:
# in pyspark
cte = cleaned_borough_data.groupBy("crash_year","BOROUGH")\
.agg(countDistinct("COLLISION_ID").alias("collision_cnt"))
# cte.show()
window_spec = Window.partitionBy("crash_year").orderBy(col("collision_cnt").desc())
ranked_cte = cte.withColumn("rnk", rank().over(window_spec))
# select rows with rnk = 1
result = ranked_cte.filter(col("rnk") == 1).orderBy("crash_year")
result.show()
# .select("crash_year","BOROUGH", countDistinct("COLLISION_ID").alias("collision_cnt"))




[Stage 97:>                                                       (0 + 10) / 10]

+----------+--------+-------------+---+
|crash_year| BOROUGH|collision_cnt|rnk|
+----------+--------+-------------+---+
|      2012|BROOKLYN|        23305|  1|
|      2013|BROOKLYN|        47021|  1|
|      2014|BROOKLYN|        47758|  1|
|      2015|BROOKLYN|        50847|  1|
|      2016|BROOKLYN|        47464|  1|
|      2017|BROOKLYN|        44915|  1|
|      2018|BROOKLYN|        47313|  1|
|      2019|BROOKLYN|        44479|  1|
|      2020|BROOKLYN|        25472|  1|
|      2021|BROOKLYN|        25171|  1|
|      2022|BROOKLYN|        23357|  1|
|      2023|BROOKLYN|         8803|  1|
+----------+--------+-------------+---+



                                                                                

In [34]:
# What are the top 3 times of the day saw the most crashes?
# round the time to top of hour i.e. 2:39 -> 2 or 21:10 -> 21
df_hour = raw_df.withColumn("hour", expr("hour(to_timestamp(`CRASH TIME`, 'H:mm'))"))\
.select("COLLISION_ID", col("CRASH TIME"), "hour")
df_hour_summarized = df_hour.groupBy("hour")\
.agg(countDistinct("COLLISION_ID").alias("collision_cnt"))\
.orderBy("collision_cnt", ascending=False)\
.limit(3)

df_hour_summarized.show()
# df.select("hour").show()

[Stage 39:>                                                       (0 + 10) / 10]

+----+-------------+
|hour|collision_cnt|
+----+-------------+
|  16|       144542|
|  17|       141298|
|  14|       133834|
+----+-------------+



                                                                                

In [42]:
# If a incident had a fatality, what was the average fatality?
curated_fatal_df = raw_df.select(col("NUMBER OF PERSONS INJURED").alias("person_killed")\
,col("NUMBER OF CYCLIST KILLED").alias("cyclist_killed")
,col("NUMBER OF PEDESTRIANS KILLED").alias("pedestration_killed")
,col("NUMBER OF MOTORIST KILLED").alias("motorist_killed")
,"COLLISION_ID")
curated_fatal_df.createOrReplaceTempView("table")
fatal_query = spark.sql(
f"""
WITH CTE AS 
(
SELECT 
DISTINCT COLLISION_ID,
total_killed
FROM
(
SELECT 
COLLISION_ID,
(person_killed + cyclist_killed + pedestration_killed + motorist_killed) AS total_killed
FROM table
) temp
WHERE total_killed > 0
)
SELECT
ROUND(AVG(total_killed), 2) AS avg_killed
FROM CTE
"""
)
fatal_query.show()

[Stage 52:>                                                       (0 + 10) / 10]

+----------+
|avg_killed|
+----------+
|      1.36|
+----------+



                                                                                

In [64]:
inner_df = curated_fatal_df.withColumn("total_killed", \
            expr("person_killed + cyclist_killed + pedestration_killed + motorist_killed"))\
.distinct()\
.where(col("total_killed") > 0)

result = inner_df.select(expr("ROUND(AVG(total_killed), 2)").alias("avg_killed"))
result.show()
# avg_fatalities_df = inner_df.withColumn("avg_killed", round(avg(col("total_killed"))))



+----------+
|avg_killed|
+----------+
|      1.36|
+----------+



                                                                                

In [65]:
# what were top 3 zip codes where most incidents happened by each of boroughs by year?
borough_year_raw = raw_df.withColumn("crash_year", split("CRASH DATE", '/')[2])\
.select("crash_year", col("ZIP CODE").alias("zip_code"), "BOROUGH", "COLLISION_ID")
# drop Null
non_null_borough_year_raw = borough_year_raw.dropna(subset=['BOROUGH', 'crash_year'])
non_null_borough_year_raw.createOrReplaceTempView("table")
top_3 = spark.sql(
f"""
WITH CTE AS
(
SELECT
crash_year,
BOROUGH,
zip_code,
collision_cnt,
DENSE_RANK() OVER (PARTITION BY crash_year, BOROUGH ORDER BY collision_cnt DESC) AS rnk
FROM 
(
SELECT 
crash_year,
BOROUGH,
zip_code,
COUNT(DISTINCT COLLISION_ID) AS collision_cnt
FROM table
GROUP BY 1,2,3
) temp
)

SELECT 
crash_year,
BOROUGH,
zip_code,
collision_cnt
FROM CTE
WHERE rnk < 4
ORDER BY 1,2,3,4 DESC
"""
)

top_3.show(20)

[Stage 114:>                                                      (0 + 10) / 10]

+----------+-------------+--------+-------------+
|crash_year|      BOROUGH|zip_code|collision_cnt|
+----------+-------------+--------+-------------+
|      2012|        BRONX|   10458|          547|
|      2012|        BRONX|   10466|          566|
|      2012|        BRONX|   10467|          693|
|      2012|     BROOKLYN|   11201|         1034|
|      2012|     BROOKLYN|   11203|          946|
|      2012|     BROOKLYN|   11207|         1345|
|      2012|    MANHATTAN|   10019|         1125|
|      2012|    MANHATTAN|   10022|         1224|
|      2012|    MANHATTAN|   10036|         1122|
|      2012|       QUEENS|   11101|         1054|
|      2012|       QUEENS|   11385|          857|
|      2012|       QUEENS|   11434|          738|
|      2012|STATEN ISLAND|   10306|          706|
|      2012|STATEN ISLAND|   10312|          491|
|      2012|STATEN ISLAND|   10314|         1076|
|      2013|        BRONX|   10458|         1158|
|      2013|        BRONX|   10467|         1567|


                                                                                

In [78]:
non_null_borough_year_cte1 = non_null_borough_year_raw.groupBy("crash_year",\
"BOROUGH","zip_code")\
.agg(countDistinct("COLLISION_ID").alias("collision_cnt"))

window_spec = Window.partitionBy("crash_year", "BOROUGH").orderBy(col("collision_cnt").desc())
non_null_borough_year_cte2 = non_null_borough_year_cte1.withColumn("rnk", \
 dense_rank().over(window_spec))

result_df = non_null_borough_year_cte2.filter(col("rnk") < 4)\
.orderBy("crash_year", "BOROUGH", "zip_code", "collision_cnt")
result_df.show()

[Stage 124:>                                                      (0 + 10) / 10]

+----------+-------------+--------+-------------+---+
|crash_year|      BOROUGH|zip_code|collision_cnt|rnk|
+----------+-------------+--------+-------------+---+
|      2012|        BRONX|   10458|          547|  3|
|      2012|        BRONX|   10466|          566|  2|
|      2012|        BRONX|   10467|          693|  1|
|      2012|     BROOKLYN|   11201|         1034|  2|
|      2012|     BROOKLYN|   11203|          946|  3|
|      2012|     BROOKLYN|   11207|         1345|  1|
|      2012|    MANHATTAN|   10019|         1125|  2|
|      2012|    MANHATTAN|   10022|         1224|  1|
|      2012|    MANHATTAN|   10036|         1122|  3|
|      2012|       QUEENS|   11101|         1054|  1|
|      2012|       QUEENS|   11385|          857|  2|
|      2012|       QUEENS|   11434|          738|  3|
|      2012|STATEN ISLAND|   10306|          706|  2|
|      2012|STATEN ISLAND|   10312|          491|  3|
|      2012|STATEN ISLAND|   10314|         1076|  1|
|      2013|        BRONX|  

                                                                                