In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, concat_ws, to_timestamp, date_format, size, split
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np

In [2]:
# Create (or reuse) the Spark session with the legacy time-parser policy enabled.
# NOTE(review): presumably required by the timestamp parsing done later in the
# notebook -- confirm before removing.
spark = (
    SparkSession.builder
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Load the raw collisions CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
df = spark.read.csv(
    "gs://6893proj/Motor_Vehicle_Collisions_-_Crashes.csv",
    header=True,
    inferSchema=True,
)

# Peek at one row to confirm the load worked.
df.show(1)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/01 22:09:47 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/12/01 22:09:47 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/12/01 22:09:47 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/12/01 22:09:47 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
22/12/01 22:10:19 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+----------+-------+--------+--------+---------+--------+--------------------+-----------------+---------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|CRASH DATE|CRASH TIME|BOROUGH|ZIP CODE|LATITUDE|LONGITUDE|LOCATION|      ON STREET NAME|CROSS STREET NAME|OFF STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST KILLED|NUMBER OF MOTORIST INJURED|NUMBER OF MOTORIST KILLED|CONTRIBUTING FACTOR VEHICLE 1|CONTRIBUTING FACTOR VEHICLE 2|CO

In [3]:
# Lower-case every column name in one step. DataFrame.toDF renames all columns
# positionally, so this replaces the per-column withColumnRenamed loop (which
# stacked one projection per column onto the query plan and leaked its loop
# variable into the notebook namespace).
df = df.toDF(*[name.lower() for name in df.columns])
df.printSchema()

root
 |-- crash date: string (nullable = true)
 |-- crash time: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- zip code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- location: string (nullable = true)
 |-- on street name: string (nullable = true)
 |-- cross street name: string (nullable = true)
 |-- off street name: string (nullable = true)
 |-- number of persons injured: string (nullable = true)
 |-- number of persons killed: integer (nullable = true)
 |-- number of pedestrians injured: integer (nullable = true)
 |-- number of pedestrians killed: integer (nullable = true)
 |-- number of cyclist injured: integer (nullable = true)
 |-- number of cyclist killed: string (nullable = true)
 |-- number of motorist injured: string (nullable = true)
 |-- number of motorist killed: integer (nullable = true)
 |-- contributing factor vehicle 1: string (nullable = true)
 |-- contributing factor vehicle 2: strin

In [4]:
# Remove rows with missing key fields, printing the row count before and after.
required_columns = [
    "crash date", "crash time", "borough", "zip code", "latitude", "longitude",
    "contributing factor vehicle 1", "vehicle type code 1",
]
street_columns = ["on street name", "cross street name", "off street name"]

print(df.count())
df = (
    df
    # Every required column must be non-null.
    .dropna(how='any', subset=required_columns)
    # Keep a row only if at least one street-name column is non-null.
    .dropna(thresh=1, subset=street_columns)
)
print(df.count())

                                                                                

1945053




1295010


                                                                                

In [5]:
# Handle the placeholder contributing factor 'Unspecified' by replacing it with null (currently disabled):
#df.withColumn('contributing factor vehicle 1', when(col('contributing factor vehicle 1') == 'Unspecified', None)\
#                  .otherwise(col('contributing factor vehicle 1'))).show(2)

In [6]:
# Build the keys used to join with the weather data:
#   date      - full timestamp parsed from "<crash date> <crash time>"
#   simp_date - the same instant formatted down to the hour ('yyyy-MM-dd HH')
# The two source columns are dropped once the timestamp exists.
crash_datetime_text = concat_ws(" ", col("crash date"), col("crash time"))
df = (
    df
    .withColumn("date", crash_datetime_text)
    .withColumn("date", to_timestamp("date", 'MM/dd/yyyy HH:mm'))
    .withColumn("simp_date", date_format("date", 'yyyy-MM-dd HH'))
    .drop("crash date", "crash time")
)
df.select("date", "simp_date").show(3)

[Stage 9:>                                                          (0 + 1) / 1]

+-------------------+-------------+
|               date|    simp_date|
+-------------------+-------------+
|2021-09-11 09:35:00|2021-09-11 09|
|2021-12-14 08:17:00|2021-12-14 08|
|2021-12-14 21:10:00|2021-12-14 21|
+-------------------+-------------+
only showing top 3 rows



                                                                                

In [7]:
# Load the weather CSV (header row, inferred types) and discard any row that
# contains a null value.
w = (
    spark.read
    .csv("gs://6893proj/NYC_Weather_2016_2022.csv", header=True, inferSchema=True)
    .dropna()
)
w.show(3)
w.printSchema()

+----------------+-------------------+------------------+---------+--------------+------------------+------------------+-------------------+--------------------+---------------------+
|            time|temperature_2m (°C)|precipitation (mm)|rain (mm)|cloudcover (%)|cloudcover_low (%)|cloudcover_mid (%)|cloudcover_high (%)|windspeed_10m (km/h)|winddirection_10m (°)|
+----------------+-------------------+------------------+---------+--------------+------------------+------------------+-------------------+--------------------+---------------------+
|2016-01-01T00:00|                7.6|               0.0|      0.0|          69.0|              53.0|               0.0|               72.0|                10.0|                296.0|
|2016-01-01T01:00|                7.5|               0.0|      0.0|          20.0|               4.0|               0.0|               56.0|                 9.8|                287.0|
|2016-01-01T02:00|                7.1|               0.0|      0.0|          32.

In [8]:
# Derive the hour-resolution join key from the weather "time" column:
# parse it to a timestamp, format it as 'yyyy-MM-dd HH', then discard the
# intermediate timestamp column.
w = (
    w
    .withColumn("date", to_timestamp("time"))
    .withColumn("simp_date", date_format("date", 'yyyy-MM-dd HH'))
    .drop("date")
)
w.select("simp_date").show(3)

[Stage 13:>                                                         (0 + 1) / 1]                                                                                

+-------------+
|    simp_date|
+-------------+
|2016-01-01 00|
|2016-01-01 01|
|2016-01-01 02|
+-------------+
only showing top 3 rows



In [9]:
# Attach the weather columns to each crash by matching on the shared hour key.
# An inner join keeps only crashes that have a matching weather row.
df_w = df.join(w, "simp_date", "inner")
df_w.show(3)

                                                                                

+-------------+--------+--------+---------+---------+--------------------+--------------+-----------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+----------------+-------------------+------------------+---------+--------------+------------------+------------------+-------------------+--------------------+---------------------+
|    simp_date| borough|zip code| latitude|longitude|            location|on street name|cross street name|     off street name|number of persons injured|number of persons killed|number of ped

In [11]:
# Write the joined crash/weather data as CSV with a header row.
output_directory = 'gs://6893proj/hadoop/tmp/pyspark_output/collision/collision_weather'
(
    df_w
    .coalesce(1)  # collapse to one partition so a single part file is written
    .write
    # The default save mode raises when the path already exists, which makes
    # the notebook fail on any re-run; overwrite keeps Restart & Run All working.
    .mode("overwrite")
    .option("header", "true")
    .csv(output_directory)
)

                                                                                

In [None]:
# Merge the columns of contributing factor vehicle and vehicle type code.
# Disabled step: the code below is wrapped in a triple-quoted string, so it is
# not executed. As written, it would join the five "contributing factor
# vehicle N" columns (and likewise the five "vehicle type code N" columns) with
# a two-space separator, split the result back on that separator into a single
# column, and drop the ten original columns.
'''df1 = df_w.withColumn("contributing factor", concat_ws("  ", col("contributing factor vehicle 1"), col("contributing factor vehicle 2")\
                                               , col("contributing factor vehicle 3"), col("contributing factor vehicle 4")\
                                               , col("contributing factor vehicle 5")))\
    .withColumn("contributing factor", split(col("contributing factor"), "  "))
df1 = df1.drop("contributing factor vehicle 1", "contributing factor vehicle 2", "contributing factor vehicle 3"\
             , "contributing factor vehicle 4", "contributing factor vehicle 5")

df1 = df1.withColumn("vehicle type code", concat_ws("  ", col("vehicle type code 1"), col("vehicle type code 2"), col("vehicle type code 3")\
                                                  , col("vehicle type code 4"), col("vehicle type code 5")))\
    .withColumn("vehicle type code", split(col("vehicle type code"), "  "))
df1 = df1.drop("vehicle type code 1", "vehicle type code 2", "vehicle type code 3", "vehicle type code 4", "vehicle type code 5")

df1.select("contributing factor", "vehicle type code").show(3)'''

In [None]:
# Disabled step: wrapped in a triple-quoted string, so nothing is written.
# NOTE(review): the target path ends in ".csv" although the disabled code calls
# .json(), and df1 is not defined by any active code visible in this notebook
# -- reconcile both before re-enabling.
'''output_directory = 'gs://6893proj/hadoop/tmp/pyspark_output/collision/collision_weather.csv'
df1.coalesce(1).write.option("header", "true").json(output_directory)'''