In [1]:
# Import libraries

import os
import pyspark
import re
from operator import add
from pyspark.sql.types import *
from pyspark.sql import functions 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, concat, isnull, mode, isnan, count, lower, upper, sum, first, regexp_replace, concat_ws, hour, minute, month, to_date, to_timestamp
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
from numpy import mean
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn import preprocessing
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
import plotly.express as px
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dropout
import warnings
warnings.filterwarnings('ignore')

2024-04-10 12:52:01.835545: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-10 12:52:02.137469: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-10 12:52:03.496689: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Load the raw collisions CSV, using the header row for column names. Without a schema
# (or inferSchema) every column is read as a string; types are assigned during cleaning.
# `spark` is not created in any visible cell (presumably the kernel injects it).
# getOrCreate() returns that session when it exists and builds one otherwise, so
# Restart & Run All also works on a plain Python kernel.
spark = SparkSession.builder.getOrCreate()

CRASHES_CSV_PATH = '/user1/Motor_Vehicle_Collisions_-_Crashes.csv'
df = spark.read.format("csv").option("header", "true").load(CRASHES_CSV_PATH)

                                                                                

In [3]:
# Preview the first 20 raw rows with full (untruncated) cell contents
df.show(n=20, truncate=False)

24/04/10 12:53:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+----------+----------+---------+--------+---------+----------+-----------------------+----------------------------+------------------------------+---------------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+------------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+-----------------------------------+-----------------------------------+-------------------+-------------------+-------------------+
|CRASH DATE|CRASH TIME|BOROUGH  |ZIP CODE|LATITUDE |LONGITUDE |LOCATION               |ON STREET NAME              |CROSS STREET NAME             |OFF STREET NAME            |NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUM

## Preprocessing

#### Data Cleaning

In [4]:
# Drop columns that are not used in this analysis
df = df.drop('CROSS STREET NAME', 'OFF STREET NAME', 'NUMBER OF PEDESTRIANS INJURED', 'NUMBER OF CYCLIST INJURED',
             'NUMBER OF CYCLIST KILLED', 'NUMBER OF MOTORIST INJURED', 'NUMBER OF MOTORIST KILLED',
             'CONTRIBUTING FACTOR VEHICLE 2', 'CONTRIBUTING FACTOR VEHICLE 3', 'CONTRIBUTING FACTOR VEHICLE 4',
             'CONTRIBUTING FACTOR VEHICLE 5', 'COLLISION_ID', 'VEHICLE TYPE CODE 2', 'VEHICLE TYPE CODE 3',
             'VEHICLE TYPE CODE 4', 'VEHICLE TYPE CODE 5', 'NUMBER OF PEDESTRIANS KILLED')

# Keep only rows that have a value in every remaining column
df = df.na.drop(how='any')

# Normalise CRASH DATE separators: MM/dd/yyyy -> MM-dd-yyyy
df = df.withColumn('CRASH DATE', regexp_replace(df['CRASH DATE'], "\\/", "-"))

# Build a single DATE timestamp from CRASH DATE + CRASH TIME (e.g. "12-01-2012 0:05")
df = df.withColumn('DATE', concat_ws(' ', df['CRASH DATE'], df['CRASH TIME']))
df = df.withColumn('DATE', to_timestamp(df['DATE'], 'MM-dd-yyyy H:mm'))

# CRASH DATE becomes a proper date column
df = df.withColumn('CRASH DATE', to_date(df['CRASH DATE'], 'MM-dd-yyyy'))

# Move DATE column to position 0
df = df.select('DATE', *[column for column in df.columns if column != 'DATE'])

# Sort chronologically
df = df.orderBy('DATE', 'CRASH TIME')

# Drop the 2012 records dated before December: of 2012, only December is kept
# (presumably so the data starts with a complete WINTER season, Dec-Feb, as defined
# in time_of_year). Rows whose DATE failed to parse (null) are dropped here as well.
df = df.filter(~((F.year('DATE') == 2012) & (F.month('DATE') != 12)))

# Numeric columns are still strings from the CSV read; cast them to integers
df = df.withColumn(
    'ZIP CODE', df['ZIP CODE'].cast('integer')).withColumn(
    'NUMBER OF PERSONS INJURED',
    df['NUMBER OF PERSONS INJURED'].cast('integer')).withColumn(
    'NUMBER OF PERSONS KILLED', df['NUMBER OF PERSONS KILLED'].cast('integer'))

# Standardize categorical columns to upper case
df = df.withColumn(
    'CONTRIBUTING FACTOR VEHICLE 1',
    upper(col('CONTRIBUTING FACTOR VEHICLE 1'))).withColumn(
    'BOROUGH',
    upper(col('BOROUGH'))).withColumn(
    'ON STREET NAME', upper(col('ON STREET NAME'))).withColumn(
    'VEHICLE TYPE CODE 1', upper(col('VEHICLE TYPE CODE 1')))

# Create functions for time of day and season
def time_of_day(hour, minute):
    """Bucket an hour-of-day Column into a coarse time-of-day label.

    Inclusive hour ranges: 0-6 'SMALL HOURS', 7-12 'MORNING', 13-18 'AFTERNOON',
    19-23 'EVENING'. A null hour (e.g. an unparseable time) yields null instead of
    being silently counted as 'EVENING'.

    Args:
        hour: Column with the hour of the crash (0-23).
        minute: Column with the minute. Not needed for the bucketing; kept so
            existing two-argument calls keep working.

    Returns:
        A string Column expression (null when `hour` is null).
    """
    # No .otherwise(): anything that matches no range (i.e. null) stays null.
    return (when(hour.between(0, 6), 'SMALL HOURS')
            .when(hour.between(7, 12), 'MORNING')
            .when(hour.between(13, 18), 'AFTERNOON')
            .when(hour.between(19, 23), 'EVENING'))

def time_of_year(month):
    """Map a month-number Column (1-12) to a season label.

    Mar-May 'SPRING', Jun-Aug 'SUMMER', Sep-Nov 'AUTUMN', Dec-Feb 'WINTER'.
    A null month yields null instead of being silently labelled 'WINTER'.

    Args:
        month: Column with the month number of the crash.

    Returns:
        A string Column expression (null when `month` is null).
    """
    return (when(month.between(3, 5), 'SPRING')
            .when(month.between(6, 8), 'SUMMER')
            .when(month.between(9, 11), 'AUTUMN')
            .when(month.isin(12, 1, 2), 'WINTER'))

# Apply the bucketing functions to create new columns.
# Hour and minute are read from the parsed DATE timestamp (built with the explicit
# 'MM-dd-yyyy H:mm' pattern) instead of the raw "H:mm" CRASH TIME string, which
# hour()/minute() would otherwise have to cast implicitly as a time-only string.
df = df.withColumn(
    'TIME OF DAY', time_of_day(hour(df['DATE']), minute(df['DATE']))
).withColumn('SEASON', time_of_year(month(df['CRASH DATE'])))

# Organize columns
df = df.select('DATE', 'CRASH DATE', 'CRASH TIME', 'TIME OF DAY', 'SEASON',
               'BOROUGH', 'ZIP CODE', 'LATITUDE', 'LONGITUDE', 'ON STREET NAME',
               'NUMBER OF PERSONS INJURED', 'NUMBER OF PERSONS KILLED',
               'CONTRIBUTING FACTOR VEHICLE 1', 'VEHICLE TYPE CODE 1')

print('Count of rows: {0}'.format(df.count()), '\n')

df.show(5)

                                                                                

Count of rows: 1012954 



                                                                                

+-------------------+----------+----------+-----------+------+---------+--------+----------+-----------+--------------------+-------------------------+------------------------+-----------------------------+--------------------+
|               DATE|CRASH DATE|CRASH TIME|TIME OF DAY|SEASON|  BOROUGH|ZIP CODE|  LATITUDE|  LONGITUDE|      ON STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|CONTRIBUTING FACTOR VEHICLE 1| VEHICLE TYPE CODE 1|
+-------------------+----------+----------+-----------+------+---------+--------+----------+-----------+--------------------+-------------------------+------------------------+-----------------------------+--------------------+
|2012-12-01 00:00:00|2012-12-01|      0:00|SMALL HOURS|WINTER|MANHATTAN|   10013|40.7233076|  -74.00298|BROOME STREET    ...|                        0|                       0|         AGGRESSIVE DRIVIN...|               OTHER|
|2012-12-01 00:05:00|2012-12-01|      0:05|SMALL HOURS|WINTER|MANHATTAN|   10036|40.7572

### DataFrame reduction by Time of Day

In [5]:
# Second name for the cleaned crash-level DataFrame, used by the time-of-day aggregation.
# This is an alias, not a copy. Later `df = df.<transform>(...)` lines rebind `df` to a
# new DataFrame object and leave this reference pointing at the current one.
df_time_of_day = df

In [6]:
# Group the data by CRASH DATE and TIME OF DAY
grouped_data = df_time_of_day.groupBy("CRASH DATE", "TIME OF DAY")

# Aggregate the grouped data.
# The peak location is the most frequent (LATITUDE, LONGITUDE) *pair*. Taking mode() of
# each coordinate separately can combine a latitude and a longitude that never occurred
# together. The preprocessing keeps both columns as strings, so joining them with ","
# and splitting afterwards is lossless.
counts_time_of_day = grouped_data.agg(
    count("CRASH TIME").alias("NUMBER OF CRASHES"),
    sum("NUMBER OF PERSONS INJURED").alias("PERSONS INJURED"),
    sum("NUMBER OF PERSONS KILLED").alias("PERSONS KILLED"),
    mode(concat_ws(",", "LATITUDE", "LONGITUDE")).alias("PEAK COORDINATES"),
    mode("BOROUGH").alias("BOROUGH"),
    mode("ON STREET NAME").alias("ON STREET NAME"),
    mode("CONTRIBUTING FACTOR VEHICLE 1").alias("CONTRIBUTING FACTOR VEHICLE 1"),
    mode("VEHICLE TYPE CODE 1").alias("VEHICLE TYPE CODE 1"),
    mode("SEASON").alias("SEASON")
)

# Split the coordinate pair back out, rename to the reporting names, keep column order
peak_coordinates = F.split(col("PEAK COORDINATES"), ",")
counts_time_of_day = counts_time_of_day.select(
    col("CRASH DATE").alias("DATE"),
    "TIME OF DAY",
    "NUMBER OF CRASHES",
    "PERSONS INJURED",
    "PERSONS KILLED",
    peak_coordinates.getItem(0).alias("LATITUDE"),
    peak_coordinates.getItem(1).alias("LONGITUDE"),
    col("BOROUGH").alias("PEAK BOROUGH"),
    col("ON STREET NAME").alias("PEAK STREET"),
    col("CONTRIBUTING FACTOR VEHICLE 1").alias("MAIN FACTOR"),
    col("VEHICLE TYPE CODE 1").alias("MAIN VEHICLE TYPE"),
    "SEASON",
)

# Any TIME OF DAY outside the known buckets (including null) becomes 'OTHER'
time_of_day_org = ['SMALL HOURS', 'MORNING', 'AFTERNOON', 'EVENING']
counts_time_of_day = counts_time_of_day.withColumn(
    "TIME OF DAY",
    F.when(col("TIME OF DAY").isin(time_of_day_org), col("TIME OF DAY")).otherwise("OTHER"),
)

# Sort by DATE, then by chronological bucket order ('OTHER' last)
counts_time_of_day = counts_time_of_day.orderBy("DATE", F.when(col("TIME OF DAY") == "SMALL HOURS", 1)
                                                               .when(col("TIME OF DAY") == "MORNING", 2)
                                                               .when(col("TIME OF DAY") == "AFTERNOON", 3)
                                                               .when(col("TIME OF DAY") == "EVENING", 4)
                                                               .otherwise(5))

counts_time_of_day.show(5)



+----------+-----------+-----------------+---------------+--------------+----------+-----------+------------+--------------------+-----------+-----------------+------+
|      DATE|TIME OF DAY|NUMBER OF CRASHES|PERSONS INJURED|PERSONS KILLED|  LATITUDE|  LONGITUDE|PEAK BOROUGH|         PEAK STREET|MAIN FACTOR|MAIN VEHICLE TYPE|SEASON|
+----------+-----------+-----------------+---------------+--------------+----------+-----------+------------+--------------------+-----------+-----------------+------+
|2012-12-01|SMALL HOURS|               74|             22|             0|40.7569869|-73.7375192|   MANHATTAN|NORTHERN BOULEVAR...|UNSPECIFIED|PASSENGER VEHICLE|WINTER|
|2012-12-01|    MORNING|              100|             20|             0|40.8261757|-73.9508609|    BROOKLYN|FLATBUSH AVENUE  ...|UNSPECIFIED|PASSENGER VEHICLE|WINTER|
|2012-12-01|  AFTERNOON|              139|             32|             0|40.6673324|-73.8631562|    BROOKLYN|FOREST AVENUE    ...|UNSPECIFIED|PASSENGER VEHICLE|

                                                                                

In [None]:
# Change column names to the reporting names used by the aggregation cell above.
# Plain withColumnRenamed() is used because in a selectExpr() string, names containing
# spaces need backticks, and single-quoted names such as 'TIME OF DAY' are string
# literals, not columns. withColumnRenamed() is a no-op for absent columns, so this
# cell also runs on a frame whose columns were already renamed.
column_renames = {
    "CRASH DATE": "DATE",
    "BOROUGH": "PEAK BOROUGH",
    "ON STREET NAME": "PEAK STREET",
    "CONTRIBUTING FACTOR VEHICLE 1": "MAIN FACTOR",
    "VEHICLE TYPE CODE 1": "MAIN VEHICLE TYPE",
}
for old_name, new_name in column_renames.items():
    counts_time_of_day = counts_time_of_day.withColumnRenamed(old_name, new_name)

# Any TIME OF DAY outside the known buckets (including null) becomes 'OTHER'
time_of_day_org = ['SMALL HOURS', 'MORNING', 'AFTERNOON', 'EVENING']
counts_time_of_day = counts_time_of_day.withColumn(
    "TIME OF DAY",
    F.when(col("TIME OF DAY").isin(time_of_day_org), col("TIME OF DAY")).otherwise("OTHER"),
)

# Sort by DATE, then by chronological bucket order ('OTHER' last)
counts_time_of_day = counts_time_of_day.orderBy("DATE", F.when(col("TIME OF DAY") == "SMALL HOURS", 1)
                                                               .when(col("TIME OF DAY") == "MORNING", 2)
                                                               .when(col("TIME OF DAY") == "AFTERNOON", 3)
                                                               .when(col("TIME OF DAY") == "EVENING", 4)
                                                               .otherwise(5))


In [None]:
# Group the data by CRASH DATE and TIME OF DAY
grouped_data = df_time_of_day.groupBy("CRASH DATE", "TIME OF DAY")

# Aggregate the grouped data.
# The PEAK / MAIN columns use mode() (most frequent value). first() without an ordering
# returns an arbitrary row's value, which does not match those labels. The coordinates
# are aggregated as one "LATITUDE,LONGITUDE" string so the peak pair is a location that
# actually occurred.
counts_time_of_day = grouped_data.agg(
    count("CRASH TIME").alias("NUMBER OF CRASHES"),
    sum("NUMBER OF PERSONS INJURED").alias("PERSONS INJURED"),
    sum("NUMBER OF PERSONS KILLED").alias("PERSONS KILLED"),
    mode(concat_ws(",", "LATITUDE", "LONGITUDE")).alias("PEAK COORDINATES"),
    mode("BOROUGH").alias("BOROUGH"),
    mode("ON STREET NAME").alias("ON STREET NAME"),
    mode("CONTRIBUTING FACTOR VEHICLE 1").alias("CONTRIBUTING FACTOR VEHICLE 1"),
    mode("VEHICLE TYPE CODE 1").alias("VEHICLE TYPE CODE 1"),
    mode("SEASON").alias("SEASON")
)

# Fill nulls in numeric columns with zeros
counts_time_of_day = counts_time_of_day.fillna(0)

# Change column names (Column API: avoids SQL-quoting names that contain spaces).
# Names match the executed aggregation cell above.
peak_coordinates = F.split(col("PEAK COORDINATES"), ",")
counts_time_of_day = counts_time_of_day.select(
    col("CRASH DATE").alias("DATE"),
    "TIME OF DAY",
    "NUMBER OF CRASHES",
    "PERSONS INJURED",
    "PERSONS KILLED",
    peak_coordinates.getItem(0).alias("LATITUDE"),
    peak_coordinates.getItem(1).alias("LONGITUDE"),
    col("BOROUGH").alias("PEAK BOROUGH"),
    col("ON STREET NAME").alias("PEAK STREET"),
    col("CONTRIBUTING FACTOR VEHICLE 1").alias("MAIN FACTOR"),
    col("VEHICLE TYPE CODE 1").alias("MAIN VEHICLE TYPE"),
    "SEASON",
)

# Any TIME OF DAY outside the known buckets (including null) becomes 'OTHER'
time_of_day_org = ['SMALL HOURS', 'MORNING', 'AFTERNOON', 'EVENING']
counts_time_of_day = counts_time_of_day.withColumn(
    "TIME OF DAY",
    F.when(col("TIME OF DAY").isin(time_of_day_org), col("TIME OF DAY")).otherwise("OTHER"),
)

# Sort by DATE, then by chronological bucket order ('OTHER' last)
counts_time_of_day = counts_time_of_day.orderBy("DATE", F.when(col("TIME OF DAY") == "SMALL HOURS", 1)
                                                               .when(col("TIME OF DAY") == "MORNING", 2)
                                                               .when(col("TIME OF DAY") == "AFTERNOON", 3)
                                                               .when(col("TIME OF DAY") == "EVENING", 4)
                                                               .otherwise(5))

# Convert DATE column to string format
counts_time_of_day = counts_time_of_day.withColumn("DATE", F.date_format("DATE", "yyyy-MM-dd"))

# Show the DataFrame
counts_time_of_day.show()

In [None]:
# Basic clean-up pass: drop the unused columns, remove exact duplicate rows, fill nulls.
# DataFrame.drop() ignores names that are not in the schema, so re-dropping columns
# that an earlier cell already removed is harmless.
# NOTE(review): a string fill value only applies to string-typed columns, and '0' is then
# also written into categorical columns such as BOROUGH or ON STREET NAME, which creates
# a fake "0" category. Consider a dedicated placeholder per column.
df = (
    df.drop(*['CROSS STREET NAME', 'OFF STREET NAME', 'NUMBER OF PEDESTRIANS INJURED', 'NUMBER OF CYCLIST INJURED',
              'NUMBER OF CYCLIST KILLED', 'NUMBER OF MOTORIST INJURED', 'NUMBER OF MOTORIST KILLED',
              'CONTRIBUTING FACTOR VEHICLE 2', 'CONTRIBUTING FACTOR VEHICLE 3', 'CONTRIBUTING FACTOR VEHICLE 4',
              'CONTRIBUTING FACTOR VEHICLE 5', 'COLLISION_ID', 'VEHICLE TYPE CODE 2', 'VEHICLE TYPE CODE 3',
              'VEHICLE TYPE CODE 4', 'VEHICLE TYPE CODE 5', 'NUMBER OF PEDESTRIANS KILLED'])
    .dropDuplicates()
    .na.fill(value='0')
)

df.show()

In [None]:
# DATE = "<CRASH DATE> <CRASH TIME>" when both parts are present, otherwise null.
# concat_ws() on its own would skip a null part and return a partial string.
df = df.withColumn(
    'DATE',
    when(col('CRASH DATE').isNotNull() & col('CRASH TIME').isNotNull(),
         concat_ws(' ', col('CRASH DATE'), col('CRASH TIME'))),
)

# Put DATE in front of all other columns
df = df.select('DATE', *(name for name in df.columns if name != 'DATE'))

df.show()

In [None]:
# Preview DATE parsed as a timestamp; df itself is not reassigned
df.withColumn('DATE', F.to_timestamp('DATE')).show()

In [None]:
# Move DATE column to the beginning
df = df.select('DATE', *[column for column in df.columns if column != 'DATE'])

# Make date & time columns timestamps
df = df.withColumn('CRASH DATE', df['CRASH DATE'].cast('timestamp'))
# Normalise CRASH TIME ("H:mm", e.g. "0:05") to "HH:mm:ss" with an explicit pattern.
# Casting the time-only string straight to timestamp lets Spark fill in the current
# date, so `.cast('timestamp').cast('string')` wrote the day the notebook ran into
# every row.
df = df.withColumn('CRASH TIME', F.date_format(to_timestamp(df['CRASH TIME'], 'H:mm'), 'HH:mm:ss'))
df = df.withColumn('DATE', df['DATE'].cast('timestamp'))

df.show()

In [None]:
# Create new date column with date & time; null when either part is missing
# (concat_ws() alone would skip the null part and return a partial string)
df = df.withColumn('DATE', when(
    df['CRASH DATE'].isNull() | df['CRASH TIME'].isNull(),
    None
).otherwise(concat_ws(' ', df['CRASH DATE'], df['CRASH TIME'])))

# Move DATE column to the beginning
df = df.select('DATE', *[column for column in df.columns if column != 'DATE'])

# Make date & time columns timestamps
df = df.withColumn('CRASH DATE', df['CRASH DATE'].cast('timestamp'))
# Normalise CRASH TIME ("H:mm") to "HH:mm:ss" with an explicit pattern. A plain cast of
# the time-only string to timestamp lets Spark fill in the current date, which made the
# old cast-to-timestamp-then-string result depend on the day the notebook ran.
df = df.withColumn('CRASH TIME', F.date_format(to_timestamp(df['CRASH TIME'], 'H:mm'), 'HH:mm:ss'))
df = df.withColumn('DATE', df['DATE'].cast('timestamp'))

df.show()

In [None]:
# Drop the 2012 records dated before December, so only December 2012 remains from that
# year. This uses the same condition as the main preprocessing cell (whose shown output
# starts on 2012-12-01).
df = df.filter(~((F.year('DATE') == 2012) & (F.month('DATE') != 12)))
df.show()

In [None]:
# Drop the 2012 records dated before December (same condition as the main
# preprocessing cell: December 2012 is kept)
df = df.filter(~((F.year('DATE') == 2012) & (F.month('DATE') != 12)))

# Convert ZIP CODE to integer; rows whose ZIP CODE is missing or non-numeric become null and are dropped
df = df.withColumn('ZIP CODE', df['ZIP CODE'].cast('integer'))
df = df.dropna(subset=['ZIP CODE'])

# Convert NUMBER OF PERSONS INJURED and NUMBER OF PERSONS KILLED to integer
df = df.withColumn('NUMBER OF PERSONS INJURED', df['NUMBER OF PERSONS INJURED'].cast('integer'))
df = df.withColumn('NUMBER OF PERSONS KILLED', df['NUMBER OF PERSONS KILLED'].cast('integer'))

df.show()

In [None]:
# Create new date column with date & time. It is null when either part is missing;
# otherwise concat_ws() would skip a null CRASH TIME and the date alone would be cast
# to a midnight timestamp.
df = df.withColumn('DATE', when(
    df['CRASH DATE'].isNull() | df['CRASH TIME'].isNull(),
    None
).otherwise(concat_ws(' ', df['CRASH DATE'], df['CRASH TIME'])))

# Move DATE column to the beginning
df = df.select('DATE', *[column for column in df.columns if column != 'DATE'])

# Make date & time columns timestamps
df = df.withColumn('CRASH DATE', df['CRASH DATE'].cast('timestamp'))
# Normalise CRASH TIME ("H:mm") to "HH:mm:ss" with an explicit pattern. A plain cast of
# the time-only string lets Spark fill in the current date, which made the result
# depend on the day the notebook ran.
df = df.withColumn('CRASH TIME', F.date_format(to_timestamp(df['CRASH TIME'], 'H:mm'), 'HH:mm:ss'))
df = df.withColumn('DATE', df['DATE'].cast('timestamp'))

# Drop the 2012 records dated before December (same condition as the main
# preprocessing cell: December 2012 is kept)
df = df.filter(~((F.year('DATE') == 2012) & (F.month('DATE') != 12)))

# Convert ZIP CODE to integer; rows whose ZIP CODE is missing or non-numeric become null and are dropped
df = df.withColumn('ZIP CODE', df['ZIP CODE'].cast('integer'))
df = df.dropna(subset=['ZIP CODE'])

# Convert NUMBER OF PERSONS INJURED and NUMBER OF PERSONS KILLED to integer
df = df.withColumn('NUMBER OF PERSONS INJURED', df['NUMBER OF PERSONS INJURED'].cast('integer'))
df = df.withColumn('NUMBER OF PERSONS KILLED', df['NUMBER OF PERSONS KILLED'].cast('integer'))

# Standardize categorical columns
df = df.withColumn('CONTRIBUTING FACTOR VEHICLE 1', upper(col('CONTRIBUTING FACTOR VEHICLE 1')))
df = df.withColumn('BOROUGH', upper(col('BOROUGH')))
df = df.withColumn('ON STREET NAME', upper(col('ON STREET NAME')))
df = df.withColumn('VEHICLE TYPE CODE 1', upper(col('VEHICLE TYPE CODE 1')))

# Sort values by date
df = df.orderBy('DATE')

# Create functions for time of day and season
def time_of_day(hour, minute):
    """Bucket an hour-of-day Column into a coarse time-of-day label.

    Inclusive hour ranges: 0-6 'SMALL HOURS', 7-12 'MORNING', 13-18 'AFTERNOON',
    19-23 'EVENING'. Crashes at exactly 0:00 now count as 'SMALL HOURS'; the previous
    `minute >= 1` test sent them to 'EVENING'. A null hour yields null instead of
    'EVENING'.

    Args:
        hour: Column with the hour of the crash (0-23).
        minute: Column with the minute. Not needed for the bucketing; kept so
            existing two-argument calls keep working.

    Returns:
        A string Column expression (null when `hour` is null).
    """
    # No .otherwise(): anything that matches no range (i.e. null) stays null.
    return (when(hour.between(0, 6), 'SMALL HOURS')
            .when(hour.between(7, 12), 'MORNING')
            .when(hour.between(13, 18), 'AFTERNOON')
            .when(hour.between(19, 23), 'EVENING'))

def time_of_year(month):
    """Map a month-number Column (1-12) to a season label.

    Mar-May 'SPRING', Jun-Aug 'SUMMER', Sep-Nov 'AUTUMN', Dec-Feb 'WINTER'.
    A null month yields null instead of being silently labelled 'WINTER'.

    Args:
        month: Column with the month number of the crash.

    Returns:
        A string Column expression (null when `month` is null).
    """
    return (when(month.between(3, 5), 'SPRING')
            .when(month.between(6, 8), 'SUMMER')
            .when(month.between(9, 11), 'AUTUMN')
            .when(month.isin(12, 1, 2), 'WINTER'))

# Apply functions to create new columns.
# Hour and minute come from the DATE timestamp (cast above) rather than from the
# CRASH TIME string, which hour()/minute() would have to cast implicitly.
df = df.withColumn('TIME OF DAY', time_of_day(hour(df['DATE']), minute(df['DATE'])))
df = df.withColumn('SEASON', time_of_year(month(df['CRASH DATE'])))

# Organize columns
df = df.select('DATE', 'CRASH DATE', 'CRASH TIME', 'TIME OF DAY', 'SEASON',
               'BOROUGH', 'ZIP CODE', 'LATITUDE', 'LONGITUDE', 'ON STREET NAME',
               'NUMBER OF PERSONS INJURED', 'NUMBER OF PERSONS KILLED',
               'CONTRIBUTING FACTOR VEHICLE 1', 'VEHICLE TYPE CODE 1')

# Show DataFrame
df.show()

In [None]:
# Create a new DataFrame handle over the same data
df_time_of_day = df.select('*')

# Group the data by CRASH DATE and TIME OF DAY
grouped_data = df_time_of_day.groupBy("CRASH DATE", "TIME OF DAY")

# Aggregate the grouped data.
# The PEAK / MAIN columns use mode() (most frequent value). first() without an ordering
# returns an arbitrary row's value, which does not match those labels. The coordinates
# are aggregated as one "LATITUDE,LONGITUDE" string so the peak pair is a location that
# actually occurred.
counts_time_of_day = grouped_data.agg(
    count("CRASH TIME").alias("NUMBER OF CRASHES"),
    sum("NUMBER OF PERSONS INJURED").alias("PERSONS INJURED"),
    sum("NUMBER OF PERSONS KILLED").alias("PERSONS KILLED"),
    mode(concat_ws(",", "LATITUDE", "LONGITUDE")).alias("PEAK COORDINATES"),
    mode("BOROUGH").alias("BOROUGH"),
    mode("ON STREET NAME").alias("ON STREET NAME"),
    mode("CONTRIBUTING FACTOR VEHICLE 1").alias("CONTRIBUTING FACTOR VEHICLE 1"),
    mode("VEHICLE TYPE CODE 1").alias("VEHICLE TYPE CODE 1"),
    mode("SEASON").alias("SEASON")
)

# Fill nulls in numeric columns with zeros
counts_time_of_day = counts_time_of_day.fillna(0)

# Change column names (Column API: avoids SQL-quoting names that contain spaces).
# Names match the executed aggregation cell earlier in the notebook.
peak_coordinates = F.split(col("PEAK COORDINATES"), ",")
counts_time_of_day = counts_time_of_day.select(
    col("CRASH DATE").alias("DATE"),
    "TIME OF DAY",
    "NUMBER OF CRASHES",
    "PERSONS INJURED",
    "PERSONS KILLED",
    peak_coordinates.getItem(0).alias("LATITUDE"),
    peak_coordinates.getItem(1).alias("LONGITUDE"),
    col("BOROUGH").alias("PEAK BOROUGH"),
    col("ON STREET NAME").alias("PEAK STREET"),
    col("CONTRIBUTING FACTOR VEHICLE 1").alias("MAIN FACTOR"),
    col("VEHICLE TYPE CODE 1").alias("MAIN VEHICLE TYPE"),
    "SEASON",
)

# Any TIME OF DAY outside the known buckets (including null) becomes 'OTHER'
time_of_day_org = ['SMALL HOURS', 'MORNING', 'AFTERNOON', 'EVENING']
counts_time_of_day = counts_time_of_day.withColumn(
    "TIME OF DAY",
    F.when(col("TIME OF DAY").isin(time_of_day_org), col("TIME OF DAY")).otherwise("OTHER"),
)

# Sort by DATE, then by chronological bucket order ('OTHER' last)
counts_time_of_day = counts_time_of_day.orderBy("DATE", F.when(col("TIME OF DAY") == "SMALL HOURS", 1)
                                                               .when(col("TIME OF DAY") == "MORNING", 2)
                                                               .when(col("TIME OF DAY") == "AFTERNOON", 3)
                                                               .when(col("TIME OF DAY") == "EVENING", 4)
                                                               .otherwise(5))

# Convert DATE column to string format
counts_time_of_day = counts_time_of_day.withColumn("DATE", F.date_format("DATE", "yyyy-MM-dd"))

# Show the DataFrame
counts_time_of_day.show()

# Stop SparkSession
#spark.stop()
