In [2]:
# !pip install pyspark
# !pip install dill

In [3]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
from pyspark.sql.functions import to_date, date_format
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from joblib import dump
# from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# import pandas as pd
# from sklearn.ensemble import RandomForestClassifier
# from sklearn.model_selection import train_test_split
# from sklearn.metrics import accuracy_score
# from sklearn.preprocessing import OneHotEncoder
import dill

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# Create a SparkSession
spark = SparkSession.builder.appName("TrafficCrashPrediction").getOrCreate()

# importing dataset
file_path = "/content/drive/MyDrive/dataset.csv"

# Read the dataset into a DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)


In [6]:
# Show the first few rows of the DataFrame to verify that the data is loaded correctly
df.show(5, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------+----------------+----------------------+------------------+----------------------+--------------------+-----------------+------------------+--------------------+-------------------------------+--------+------------------+--------------------+-----------+-----------+--------------------------------+----------------------+------------------+-------------+-------------+----------------------+--------------------------------------+--------------------------------------------------------------------------------+---------+----------------+-----------+------------------+--------------+------------------+---------+-----------+--------------+-----------------+---------+------------------------+--------------+--------------+-----------------------+---------------------------+-----------------------------+----------------------+----------------+----------+-------------

In [7]:
# Get all column names of the DataFrame
column_names = df.columns

# Print the column names
print("Column Names:")
for column_name in column_names:
    print(column_name)


Column Names:
CRASH_RECORD_ID
CRASH_DATE_EST_I
CRASH_DATE
POSTED_SPEED_LIMIT
TRAFFIC_CONTROL_DEVICE
DEVICE_CONDITION
WEATHER_CONDITION
LIGHTING_CONDITION
FIRST_CRASH_TYPE
TRAFFICWAY_TYPE
LANE_CNT
ALIGNMENT
ROADWAY_SURFACE_COND
ROAD_DEFECT
REPORT_TYPE
CRASH_TYPE
INTERSECTION_RELATED_I
NOT_RIGHT_OF_WAY_I
HIT_AND_RUN_I
DAMAGE
DATE_POLICE_NOTIFIED
PRIM_CONTRIBUTORY_CAUSE
SEC_CONTRIBUTORY_CAUSE
STREET_NO
STREET_DIRECTION
STREET_NAME
BEAT_OF_OCCURRENCE
PHOTOS_TAKEN_I
STATEMENTS_TAKEN_I
DOORING_I
WORK_ZONE_I
WORK_ZONE_TYPE
WORKERS_PRESENT_I
NUM_UNITS
MOST_SEVERE_INJURY
INJURIES_TOTAL
INJURIES_FATAL
INJURIES_INCAPACITATING
INJURIES_NON_INCAPACITATING
INJURIES_REPORTED_NOT_EVIDENT
INJURIES_NO_INDICATION
INJURIES_UNKNOWN
CRASH_HOUR
CRASH_DAY_OF_WEEK
CRASH_MONTH
LATITUDE
LONGITUDE
LOCATION


In [8]:
# Select the required columns from the original DataFrame
selected_columns_df = df.select("CRASH_DATE", "WEATHER_CONDITION", "LIGHTING_CONDITION",
                                "ROADWAY_SURFACE_COND", "STREET_NAME",
                                "CRASH_DAY_OF_WEEK", "CRASH_MONTH")

# Show the selected columns DataFrame

selected_columns_df.show()
selected_columns_df.count()

+--------------------+-----------------+--------------------+--------------------+----------------+-----------------+-----------+
|          CRASH_DATE|WEATHER_CONDITION|  LIGHTING_CONDITION|ROADWAY_SURFACE_COND|     STREET_NAME|CRASH_DAY_OF_WEEK|CRASH_MONTH|
+--------------------+-----------------+--------------------+--------------------+----------------+-----------------+-----------+
|08/18/2023 12:50:...|            CLEAR|            DAYLIGHT|                 DRY|        OHARE ST|                6|          8|
|07/29/2023 02:45:...|            CLEAR|            DAYLIGHT|                 DRY|     ASHLAND AVE|                7|          7|
|08/18/2023 05:58:...|            CLEAR|            DAYLIGHT|                 DRY|        LONG AVE|                6|          8|
|11/26/2019 08:38:...|            CLEAR|            DAYLIGHT|                 DRY|     TERMINAL ST|                3|         11|
|08/18/2023 10:45:...|            CLEAR|            DAYLIGHT|                 DRY|     TER

820587

In [9]:
# Get all distinct values
distinct_lighting_conditions = selected_columns_df.select('LIGHTING_CONDITION').distinct()

# Show the distinct values
distinct_lighting_conditions.show(truncate=False)
print(distinct_lighting_conditions.count())

+----------------------+
|LIGHTING_CONDITION    |
+----------------------+
|DAWN                  |
|DAYLIGHT              |
|DUSK                  |
|UNKNOWN               |
|DARKNESS, LIGHTED ROAD|
|DARKNESS              |
+----------------------+

6


In [10]:
# getting column data types
selected_columns_df.printSchema()

root
 |-- CRASH_DATE: string (nullable = true)
 |-- WEATHER_CONDITION: string (nullable = true)
 |-- LIGHTING_CONDITION: string (nullable = true)
 |-- ROADWAY_SURFACE_COND: string (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- CRASH_DAY_OF_WEEK: integer (nullable = true)
 |-- CRASH_MONTH: integer (nullable = true)



In [11]:
# Define a function to parse and reformat dates
def parse_and_reformat(date_str):
    try:
        # Attempt to parse date with first format
        return datetime.strptime(date_str, "%m/%d/%Y %I:%M:%S %p").strftime("%m/%d/%Y")
    except ValueError:
        try:
            # Attempt to parse date with second format
            return datetime.strptime(date_str, "%m-%d-%Y %I:%M:%S %p").strftime("%m/%d/%Y")
        except ValueError:
            # Return None for invalid formats
            return None

# Register the function as a UDF
parse_and_reformat_udf = udf(parse_and_reformat, StringType())

In [12]:
# Apply the UDF to create a new column with standardized date format
cleaned_df = selected_columns_df.withColumn("CRASH_DATE_STANDARDIZED", parse_and_reformat_udf("CRASH_DATE"))

# Drop the original 'CRASH_DATE' column
cleaned_df = cleaned_df.drop("CRASH_DATE")

# Drop rows with null values in the new column
cleaned_df = cleaned_df.dropna(subset=["CRASH_DATE_STANDARDIZED"])

# Show the updated DataFrame
cleaned_df.show()


+-----------------+--------------------+--------------------+----------------+-----------------+-----------+-----------------------+
|WEATHER_CONDITION|  LIGHTING_CONDITION|ROADWAY_SURFACE_COND|     STREET_NAME|CRASH_DAY_OF_WEEK|CRASH_MONTH|CRASH_DATE_STANDARDIZED|
+-----------------+--------------------+--------------------+----------------+-----------------+-----------+-----------------------+
|            CLEAR|            DAYLIGHT|                 DRY|        OHARE ST|                6|          8|             08/18/2023|
|            CLEAR|            DAYLIGHT|                 DRY|     ASHLAND AVE|                7|          7|             07/29/2023|
|            CLEAR|            DAYLIGHT|                 DRY|        LONG AVE|                6|          8|             08/18/2023|
|            CLEAR|            DAYLIGHT|                 DRY|     TERMINAL ST|                3|         11|             11/26/2019|
|            CLEAR|            DAYLIGHT|                 DRY|     TER

In [13]:
# Convert string column to date format
cleaned_df = cleaned_df.withColumn("CRASH_DATE_STANDARDIZED", to_date(cleaned_df["CRASH_DATE_STANDARDIZED"], "MM/dd/yyyy"))

# Show the updated DataFrame
cleaned_df.show()

cleaned_df.printSchema()

+-----------------+--------------------+--------------------+----------------+-----------------+-----------+-----------------------+
|WEATHER_CONDITION|  LIGHTING_CONDITION|ROADWAY_SURFACE_COND|     STREET_NAME|CRASH_DAY_OF_WEEK|CRASH_MONTH|CRASH_DATE_STANDARDIZED|
+-----------------+--------------------+--------------------+----------------+-----------------+-----------+-----------------------+
|            CLEAR|            DAYLIGHT|                 DRY|        OHARE ST|                6|          8|             2023-08-18|
|            CLEAR|            DAYLIGHT|                 DRY|     ASHLAND AVE|                7|          7|             2023-07-29|
|            CLEAR|            DAYLIGHT|                 DRY|        LONG AVE|                6|          8|             2023-08-18|
|            CLEAR|            DAYLIGHT|                 DRY|     TERMINAL ST|                3|         11|             2019-11-26|
|            CLEAR|            DAYLIGHT|                 DRY|     TER

In [14]:
# Number of rows in the DataFrame
total_rows = cleaned_df.count()
print("Number of rows in the DataFrame:", total_rows)

# Number of non-null values in the column 'CRASH_DATE_STANDARDIZED'
non_null_values = cleaned_df.filter(cleaned_df.CRASH_DATE_STANDARDIZED.isNotNull()).count()
print("Number of non-null values in 'CRASH_DATE_STANDARDIZED' column:", non_null_values)

# Number of distinct values in the column 'CRASH_DATE_STANDARDIZED'
distinct_values_count = cleaned_df.select("CRASH_DATE_STANDARDIZED").distinct().count()
print("Number of distinct values in 'CRASH_DATE_STANDARDIZED' column:", distinct_values_count)


Number of rows in the DataFrame: 820587
Number of non-null values in 'CRASH_DATE_STANDARDIZED' column: 820587
Number of distinct values in 'CRASH_DATE_STANDARDIZED' column: 3203


In [15]:
# creating a new data frame with 3 columns: "CRASH_DATE_STANDARDIZED", "STREET_NAME", and a new column "NUMBER_OF_ACCIDENTS"

# Group by 'CRASH_DATE_STANDARDIZED' and 'STREET_NAME', and count the number of occurrences
aggregated_df = cleaned_df.groupBy("CRASH_DATE_STANDARDIZED", "STREET_NAME").agg(F.count("*").alias("NUMBER_OF_ACCIDENTS"))

# Show the aggregated DataFrame
aggregated_df.show()


+-----------------------+--------------------+-------------------+
|CRASH_DATE_STANDARDIZED|         STREET_NAME|NUMBER_OF_ACCIDENTS|
+-----------------------+--------------------+-------------------+
|             2023-09-01|         TERMINAL ST|                  5|
|             2023-09-29|            OHARE ST|                  3|
|             2021-08-25|       FULLERTON AVE|                  2|
|             2018-06-18|             67TH ST|                  1|
|             2020-08-12|            OHARE ST|                  2|
|             2021-11-02|    STONY ISLAND AVE|                  6|
|             2022-09-08|             35TH ST|                  4|
|             2019-01-24|         WESTERN AVE|                 12|
|             2022-04-17|           DAMEN AVE|                  2|
|             2020-10-28|          ONTARIO ST|                  1|
|             2018-10-07|      SACRAMENTO AVE|                  2|
|             2017-12-28|         DIVISION ST|                

In [16]:
# Order the DataFrame by 'CRASH_DATE_STANDARDIZED' in Ascending order
sorted_df = aggregated_df.orderBy('CRASH_DATE_STANDARDIZED')

# Show all records in ascending order of date
sorted_df.show()

+-----------------------+--------------------+-------------------+
|CRASH_DATE_STANDARDIZED|         STREET_NAME|NUMBER_OF_ACCIDENTS|
+-----------------------+--------------------+-------------------+
|             2013-03-03|          FOSTER AVE|                  1|
|             2013-06-01|          PRATT BLVD|                  1|
|             2014-01-18|             LAKE ST|                  1|
|             2014-01-21|        EXCHANGE AVE|                  1|
|             2014-02-24|     SACRAMENTO BLVD|                  1|
|             2014-06-25|          PULASKI RD|                  1|
|             2014-08-20|             79TH ST|                  1|
|             2014-11-11|           DEVON AVE|                  1|
|             2015-01-10|            AVENUE L|                  1|
|             2015-01-17|          CLINTON ST|                  1|
|             2015-02-13|          ONTARIO ST|                  1|
|             2015-02-23|       FULLERTON AVE|                

In [17]:
# Filter records with number of accidents more than 15
filtered_df = sorted_df.filter(col('NUMBER_OF_ACCIDENTS') > 15)

# Show filtered records
filtered_df.show(1000)

+-----------------------+--------------+-------------------+
|CRASH_DATE_STANDARDIZED|   STREET_NAME|NUMBER_OF_ACCIDENTS|
+-----------------------+--------------+-------------------+
|             2017-09-15|    CICERO AVE|                 20|
|             2017-10-06|    HALSTED ST|                 16|
|             2017-10-20|   WESTERN AVE|                 17|
|             2017-11-02|    HALSTED ST|                 16|
|             2017-12-21|   WESTERN AVE|                 16|
|             2017-12-28|   WESTERN AVE|                 17|
|             2018-01-02|IRVING PARK RD|                 16|
|             2018-01-05|   WESTERN AVE|                 20|
|             2018-01-19|   WESTERN AVE|                 17|
|             2018-02-05|    CICERO AVE|                 17|
|             2018-03-23|   WESTERN AVE|                 16|
|             2018-03-23|    PULASKI RD|                 16|
|             2018-05-10|   ASHLAND AVE|                 20|
|             2018-05-23

In [18]:
# making third dataframe by merging the 2 dfs: 'cleaned_df' and 'aggregated_df'

# Perform a join operation between cleaned_df and aggregated_df
final_df = cleaned_df.join(aggregated_df, ['CRASH_DATE_STANDARDIZED', 'STREET_NAME'], 'left_outer')

# Show the final DataFrame
final_df.show(1000)

+-----------------------+--------------------+-----------------+--------------------+--------------------+-----------------+-----------+-------------------+
|CRASH_DATE_STANDARDIZED|         STREET_NAME|WEATHER_CONDITION|  LIGHTING_CONDITION|ROADWAY_SURFACE_COND|CRASH_DAY_OF_WEEK|CRASH_MONTH|NUMBER_OF_ACCIDENTS|
+-----------------------+--------------------+-----------------+--------------------+--------------------+-----------------+-----------+-------------------+
|             2015-08-14|       LAKE PARK AVE|             RAIN|DARKNESS, LIGHTED...|                 WET|                6|          8|                  1|
|             2015-08-21|             LAKE ST|            CLEAR|            DAYLIGHT|                 DRY|                6|          8|                  1|
|             2015-09-03|          MADISON ST|            CLEAR|            DAYLIGHT|                 DRY|                5|          9|                  1|
|             2015-09-08|       WENTWORTH AVE|            

In [19]:
# dropping null records
final_df = final_df.dropna()

# Convert 'NUMBER_OF_ACCIDENTS' column from 'long' to 'integer'
final_df = final_df.withColumn("NUMBER_OF_ACCIDENTS", final_df["NUMBER_OF_ACCIDENTS"].cast(IntegerType()))

final_df.printSchema()

root
 |-- CRASH_DATE_STANDARDIZED: date (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- WEATHER_CONDITION: string (nullable = true)
 |-- LIGHTING_CONDITION: string (nullable = true)
 |-- ROADWAY_SURFACE_COND: string (nullable = true)
 |-- CRASH_DAY_OF_WEEK: integer (nullable = true)
 |-- CRASH_MONTH: integer (nullable = true)
 |-- NUMBER_OF_ACCIDENTS: integer (nullable = true)



In [20]:
# Group the final_df by 'NUMBER_OF_ACCIDENTS' and count the number of records for each value
count_by_accidents = final_df.groupBy('NUMBER_OF_ACCIDENTS').count()


# Order the count_by_accidents DataFrame by the 'NUMBER_OF_ACCIDENTS' column in ascending order
count_by_accidents_ordered = count_by_accidents.orderBy('NUMBER_OF_ACCIDENTS')

# Show the count for each value of 'NUMBER_OF_ACCIDENTS' in ascending order
count_by_accidents_ordered.show(1000)

+-------------------+------+
|NUMBER_OF_ACCIDENTS| count|
+-------------------+------+
|                  1|310021|
|                  2|164010|
|                  3|104724|
|                  4| 71068|
|                  5| 48550|
|                  6| 34224|
|                  7| 24199|
|                  8| 18832|
|                  9| 13221|
|                 10| 10630|
|                 11|  7029|
|                 12|  4860|
|                 13|  3341|
|                 14|  2478|
|                 15|  1485|
|                 16|   960|
|                 17|   442|
|                 18|   252|
|                 19|   114|
|                 20|    60|
|                 21|    42|
|                 22|    44|
+-------------------+------+



In [21]:
# Add a new column 'SAFE' based on the value of 'NUMBER_OF_ACCIDENTS'
final_df = final_df.withColumn("SAFE", when(final_df["NUMBER_OF_ACCIDENTS"] >= 3, 0).otherwise(1))

final_df.printSchema()

root
 |-- CRASH_DATE_STANDARDIZED: date (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- WEATHER_CONDITION: string (nullable = true)
 |-- LIGHTING_CONDITION: string (nullable = true)
 |-- ROADWAY_SURFACE_COND: string (nullable = true)
 |-- CRASH_DAY_OF_WEEK: integer (nullable = true)
 |-- CRASH_MONTH: integer (nullable = true)
 |-- NUMBER_OF_ACCIDENTS: integer (nullable = true)
 |-- SAFE: integer (nullable = false)



In [22]:
# Show the updated DataFrame
final_df.show(1000)

+-----------------------+--------------------+-----------------+--------------------+--------------------+-----------------+-----------+-------------------+----+
|CRASH_DATE_STANDARDIZED|         STREET_NAME|WEATHER_CONDITION|  LIGHTING_CONDITION|ROADWAY_SURFACE_COND|CRASH_DAY_OF_WEEK|CRASH_MONTH|NUMBER_OF_ACCIDENTS|SAFE|
+-----------------------+--------------------+-----------------+--------------------+--------------------+-----------------+-----------+-------------------+----+
|             2014-01-18|             LAKE ST|            CLEAR|                DUSK|                 DRY|                7|          1|                  1|   1|
|             2014-11-11|           DEVON AVE|          UNKNOWN|             UNKNOWN|             UNKNOWN|                3|         11|                  1|   1|
|             2015-04-06|         ASHLAND AVE|            CLEAR|            DARKNESS|                 DRY|                2|          4|                  1|   1|
|             2015-04-15|   

In [23]:
# ################################################################################################################################################################
# ################################################################################################################################################################
# ################################################################################################################################################################
# ################################################################################################################################################################
# ################################################################################################################################################################

# making the model

# Encoding categorical features
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").fit(final_df) for col in ['STREET_NAME', 'WEATHER_CONDITION', 'LIGHTING_CONDITION', 'ROADWAY_SURFACE_COND']]
pipeline = Pipeline(stages=indexers)
encoded_df = pipeline.fit(final_df).transform(final_df)

In [24]:
# Assemble features into a vector
assembler = VectorAssembler(inputCols=['STREET_NAME_index', 'WEATHER_CONDITION_index', 'LIGHTING_CONDITION_index', 'ROADWAY_SURFACE_COND_index', 'CRASH_DAY_OF_WEEK', 'CRASH_MONTH'], outputCol="features")
assembled_df = assembler.transform(encoded_df)

# Split the data into training and testing sets
train_data, test_data = assembled_df.randomSplit([0.8, 0.2], seed=42)

# Train RandomForestClassifier model
rf = RandomForestClassifier(labelCol="SAFE", featuresCol="features", numTrees=10, maxBins=2048)
model = rf.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="SAFE")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# ################################################################################################################################################################
# ################################################################################################################################################################
# ################################################################################################################################################################
# ################################################################################################################################################################
# ################################################################################################################################################################

Accuracy: 0.7964517857575121


In [25]:
# Fit the pipeline on the training data, to use it as many times for testing, without it taking much time
pipeline_model = pipeline.fit(final_df)

In [26]:
# testing the model

# Define the input data
sample_data = {
    'STREET_NAME': 'HALSTED ST',
    'WEATHER_CONDITION': 'CLEAR',
    'LIGHTING_CONDITION': 'DAYLIGHT',
    'ROADWAY_SURFACE_COND': 'DRY',
    'CRASH_DAY_OF_WEEK': 5,
    'CRASH_MONTH': 8
}

# Create a DataFrame with the sample data
sample_df = spark.createDataFrame([sample_data])

# Index the categorical columns
# indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").fit(final_df) for col in ['STREET_NAME', 'WEATHER_CONDITION', 'LIGHTING_CONDITION', 'ROADWAY_SURFACE_COND']]
# pipeline = Pipeline(stages=indexers)
# indexed_sample_df = pipeline.fit(final_df).transform(sample_df)

# Transform the sample data using the fitted pipeline model
indexed_sample_df = pipeline_model.transform(sample_df)



# Assemble features into a vector
# assembler = VectorAssembler(inputCols=['STREET_NAME_index', 'WEATHER_CONDITION_index', 'LIGHTING_CONDITION_index', 'ROADWAY_SURFACE_COND_index', 'CRASH_DAY_OF_WEEK', 'CRASH_MONTH'], outputCol="features")
assembled_sample_df = assembler.transform(indexed_sample_df)

# Make predictions
predictions = model.transform(assembled_sample_df)

# Display the prediction
prediction_value = predictions.select("prediction").collect()[0][0]
print("Predicted SAFE value:", int(prediction_value))

Predicted SAFE value: 1


In [28]:
model.save('/content/drive/MyDrive/AccidentModel')

# dump(model, '/content/drive/MyDrive/collisionDetectionModel.joblib')

# Save the model using dill
# with open('/content/drive/MyDrive/collisionDetectionModel.pkl', 'wb') as file:
#     dill.dump(model, file)