In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, avg, to_timestamp, hour, date_format, month, year, when, mean
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline


In [2]:
from dotenv import load_dotenv
import os
load_dotenv()
key_filepath = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")


In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("US Accidents") \
    .config("spark.executor.memory", "10g") \
    .config("spark.driver.memory", "10g") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile",key_filepath  ) \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/05/07 11:32:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [5]:

# Read Parquet files from GCS
parquet_files = [
                "gs://us-accidents-bucket/us_accidents_data/Start_Year=2016/933c14c388864f19a17c514e311a69b1-0.parquet",
                "gs://us-accidents-bucket/us_accidents_data/Start_Year=2017/933c14c388864f19a17c514e311a69b1-0.parquet",
                "gs://us-accidents-bucket/us_accidents_data/Start_Year=2018/933c14c388864f19a17c514e311a69b1-0.parquet",
                "gs://us-accidents-bucket/us_accidents_data/Start_Year=2019/933c14c388864f19a17c514e311a69b1-0.parquet",
                "gs://us-accidents-bucket/us_accidents_data/Start_Year=2020/933c14c388864f19a17c514e311a69b1-0.parquet",
                "gs://us-accidents-bucket/us_accidents_data/Start_Year=2021/933c14c388864f19a17c514e311a69b1-0.parquet",
                "gs://us-accidents-bucket/us_accidents_data/Start_Year=2022/933c14c388864f19a17c514e311a69b1-0.parquet",
                "gs://us-accidents-bucket/us_accidents_data/Start_Year=2023/933c14c388864f19a17c514e311a69b1-0.parquet"
                ]

df_list = [spark.read.option("header", "true").option("inferSchema", "true").parquet(file) for file in parquet_files]

merged_df = df_list[0]
for df in df_list[1:]:
    merged_df = merged_df.unionAll(df)



                                                                                

In [5]:
merged_df.show()

24/04/30 23:00:18 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 8:>                                                          (0 + 1) / 1]

+----+-------+--------+-------------------+-------------------+-----------------+------------------+------------+--------------------+--------------------+------------+----------+-----+-------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+--------+-------+-------+---------------+--------------+--------------+
|  ID| Source|Severity|         Start_Time|           End_Time|        Start_Lat|         Start_Lng|Distance(mi)|         Description|              Street|        City|    County|State|Country|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Crossing|Railway|Station|Traffic_Calming|Traffic_Signal|Sunrise_Sunset|
+----+-------+--------+-------------------+-------------------+-----------------+------------------+------------+--------------------+--------------------+------------+----------+-----+-------+--------------+-------------+-----------+

                                                                                

# Cleaning the data

### Steps:
> Removing unimportnant columns

> Renaming the columns

> Convert the values of the columns into easy handled values

In [6]:
from pyspark.sql.functions import to_timestamp

# Convert "Start_Time" and "End_Time" columns to datetime format
merged_df = merged_df.withColumn("Start_Time", to_timestamp(merged_df["Start_Time"], "yyyy-MM-dd HH:mm:ss"))
merged_df = merged_df.withColumn("End_Time", to_timestamp(merged_df["End_Time"], "yyyy-MM-dd HH:mm:ss"))


In [7]:

# Convert "Severity" column from string to integer
merged_df = merged_df.withColumn("Severity", merged_df["Severity"].cast(IntegerType()))


In [8]:
# TODO: If the rows that have at least one null value remove them, and if they are large number 
# replace the null with the average of its column
merged_df = merged_df.dropna()

In [39]:
merged_df.count()

                                                                                

5656839

## We have dropped about 2 millions records because of nulls :)

In [9]:
# Create a new column called Duration = End_Time - Start_Time
# To indicate the duration of the accident
from pyspark.sql.functions import col, unix_timestamp

# Convert "Start_Time" and "End_Time" columns to Unix timestamp (seconds since the epoch)
merged_df = merged_df.withColumn("Start_Time_unix", unix_timestamp("Start_Time"))
merged_df = merged_df.withColumn("End_Time_unix", unix_timestamp("End_Time"))

# Calculate the duration (in seconds)
merged_df = merged_df.withColumn("Duration", col("End_Time_unix") - col("Start_Time_unix"))


In [10]:
# Add columns for start_time
merged_df = merged_df.withColumn('Hour_Of_Acc', hour(merged_df['Start_Time']))
merged_df = merged_df.withColumn('Day_Of_Acc', date_format(merged_df['Start_Time'], 'EEEE'))
merged_df = merged_df.withColumn('Month_Of_Acc', month(merged_df['Start_Time']))

In [11]:
# Use 'when' function to assign temperature categories based on the bins
temperature_category_column = when(merged_df['Temperature(F)'] <= 50, 'Cold') \
    .when((merged_df['Temperature(F)'] > 50) & (merged_df['Temperature(F)'] <= 75), 'Moderate') \
    .otherwise('Warm')

# Add the temperature category column to the DataFrame
merged_df = merged_df.withColumn('Temperature_Category', temperature_category_column)

In [12]:
# Convert "Distance(mi)" column from miles to meters
merged_df = merged_df.withColumn("Distance(m)", col("Distance(mi)") * 1609.34)

In [13]:
# Drop unnecessary or redundant columns

# List of column names to drop
columns_to_drop = ['Country', 'Start_Time', 'End_Time', 'Distance(mi)', 'Start_Time_unix', 'End_Time_unix']

# Drop the unimportant columns
merged_df = merged_df.drop(*columns_to_drop)



In [37]:
merged_df.show()

[Stage 19:>                                                         (0 + 1) / 1]

+----+-------+--------+---------+------------------+--------------------+--------------------+------------+----------+-----+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+--------+-------+-------+---------------+--------------+--------------+--------+-----------+----------+------------+--------------------+-----------+
|  ID| Source|Severity|Start_Lat|         Start_Lng|         Description|              Street|        City|    County|State|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Crossing|Railway|Station|Traffic_Calming|Traffic_Signal|Sunrise_Sunset|Duration|Hour_Of_Acc|Day_Of_Acc|Month_Of_Acc|Temperature_Category|Distance(m)|
+----+-------+--------+---------+------------------+--------------------+--------------------+------------+----------+-----+--------------+-------------+-----------+------------+--------------+--------------+--------

                                                                                

In [42]:
merged_df.columns

['ID',
 'Source',
 'Severity',
 'Start_Time',
 'End_Time',
 'Start_Lat',
 'Start_Lng',
 'Description',
 'Street',
 'City',
 'County',
 'State',
 'Temperature(F)',
 'Wind_Chill(F)',
 'Humidity(%)',
 'Pressure(in)',
 'Visibility(mi)',
 'Wind_Direction',
 'Wind_Speed(mph)',
 'Weather_Condition',
 'Crossing',
 'Railway',
 'Station',
 'Traffic_Calming',
 'Traffic_Signal',
 'Sunrise_Sunset',
 'Distance(m)',
 'Start_Time_unix',
 'End_Time_unix',
 'Duration']

# Predictive Analytics

1. [Regression]➔ Predicting accident Duration as indicator of impact on
traffic flow.

2. [classification]➔ Predicting the severity of an accident based on the
factors involved.

### Based on the EDA phase we will use the following features in our model:
- Start_Lat
- Start_Lng
- Source
- Duration
- Hour_Of_Acc
- Day_Of_Acc
- Month_Of_Acc
- Temperature
- Distance
- State
- Crossing
- Railway
- Traffic_Calming
- Traffic_Signal

### Note: 

We execluded most of weather conditions as it does not affect on the severity as follows:  

Shows the severity of the accident, a number between 1 and 4, where 1 indicates the least impact on traffic (i.e., short delay as a result of the accident) and 4 indicates a significant impact on traffic (i.e., long delay).

## 1. Regression

In [17]:
from linear_regression_mr import linear_regression, predict


In [48]:

columns_to_keep = [
    "Start_Lat", "Start_Lng",  'Source', "Duration", "Hour_Of_Acc", "Day_Of_Acc",
    "Month_Of_Acc", "Temperature(F)", "Distance(m)", "State", "City", "Crossing",
    "Railway", "Traffic_Calming", "Traffic_Signal", "Severity"
]

# Selecting only the desired columns
lr_data_df = merged_df.select(columns_to_keep)

# Showing the resulting DataFrame
lr_data_df.show()

[Stage 25:>                                                         (0 + 1) / 1]

+---------+------------------+-------+--------+-----------+----------+------------+--------------+-----------+-----+------------+--------+-------+---------------+--------------+--------+
|Start_Lat|         Start_Lng| Source|Duration|Hour_Of_Acc|Day_Of_Acc|Month_Of_Acc|Temperature(F)|Distance(m)|State|        City|Crossing|Railway|Traffic_Calming|Traffic_Signal|Severity|
+---------+------------------+-------+--------+-----------+----------+------------+--------------+-----------+-----+------------+--------+-------+---------------+--------------+--------+
|39.063148|        -84.032608|Source2|    1800|          6|    Monday|           2|          36.0|    16.0934|   OH|Williamsburg|   false|  false|          false|          true|       2|
|39.747753|-84.20558199999998|Source2|    1800|          7|    Monday|           2|          35.1|    16.0934|   OH|      Dayton|   false|  false|          false|         false|       3|
|39.627781|        -84.188354|Source2|    1800|          7|    Mo

                                                                                

In [52]:
# Convert boolean columns to numerical values (0 or 1)
boolean_cols = [ 'Crossing', 'Railway', 'Traffic_Calming', 'Traffic_Signal']  
new_df = lr_data_df
for col_name in boolean_cols:
    new_df = new_df.withColumn(col_name, when(col(col_name), 1).otherwise(0))

# Convert string columns to numerical values using StringIndexer
string_cols = ['Source', 'City', 'State', 'Day_Of_Week']

indexers = [StringIndexer(inputCol=col_name, outputCol=col_name+"_index", handleInvalid="keep") for col_name in string_cols]
pipeline = Pipeline(stages=indexers)
new_df = pipeline.fit(new_df).transform(new_df)

# Convert indexed string columns to one-hot encoded vectors
encoder = OneHotEncoder(dropLast=False, inputCols=[col_name+"_index" for col_name in string_cols], outputCols=[col_name+"_onehot" for col_name in string_cols])
model = encoder.fit(new_df)
new_df = model.transform(new_df)

# Drop original string columns and indexed columns
new_df = new_df.drop(*[col_name+"_index" for col_name in string_cols])
new_df = new_df.drop(*string_cols)

# Show the DataFrame with converted numerical values
new_df.show()


                                                                                

Py4JJavaError: An error occurred while calling o575.fit.
: org.apache.spark.SparkException: Input column Day_Of_Week does not exist.
	at org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema(StringIndexer.scala:123)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema$(StringIndexer.scala:115)
	at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:145)
	at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:252)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:237)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [None]:
learning_rate = 0.0001
num_iterations = 100
weights, cost_history = linear_regression(merged_df)