In [165]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, to_timestamp, row_number, weekofyear, ceil, dayofmonth
from pyspark.sql.window import Window
import os
import shutil

In [136]:
# Start (or reuse, via getOrCreate) the Spark session. spark.jars.packages asks Spark to
# resolve the PostgreSQL JDBC driver as a dependency of the session.
# NOTE(review): the cells in this notebook only write CSV; the app name and JDBC package
# look like leftovers from a Postgres export — confirm before removing.
spark = (
    SparkSession.builder
    .appName("WriteToPostgres")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .getOrCreate()
)

In [137]:
# Load the raw crash export. The first row supplies column names, and inferSchema makes
# Spark derive column types from the data instead of reading every column as a string.
path_data = "Datasets/traffic_accidents.csv"
raw_data = spark.read.options(header=True, inferSchema=True).csv(path_data)

In [138]:
# Convert crash_date from its text form (12-hour clock with AM/PM marker) into a Spark timestamp.
# NOTE(review): strings that do not match the pattern may become NULL instead of raising,
# depending on Spark's parser settings — consider checking for nulls after this cell.
crash_date_pattern = "MM/dd/yyyy hh:mm:ss a"
raw_data = raw_data.withColumn("crash_date", to_timestamp(col("crash_date"), crash_date_pattern))

In [139]:
# Date ordering: crash timestamp, then crash hour. There is no partitionBy, so Spark
# evaluates any window using this spec over the whole dataset in a single partition.
window = Window.orderBy(col("crash_date"), col("crash_hour"))

In [140]:
# Project the date-dimension attributes from every crash row.
# The id assigned here is provisional: dropDuplicates in the next cell keeps an arbitrary
# row per key, and Final_Date_Data re-selects the columns without it and renumbers.
# The ordering is built inline rather than read from the shared `window` variable,
# which other cells rebind to the roadway columns (re-running this cell after one of
# those would otherwise fail to resolve the ordering columns).
Date_Data = (
    raw_data
    .select(
        col("crash_date"),
        year("crash_date").alias("crash_year"),
        month("crash_date").alias("crash_month"),
        col("crash_day_of_week").alias("crash_day"),
        col("crash_hour"),
    )
    .withColumn("id", row_number().over(Window.orderBy("crash_date", "crash_hour")))
    # Week of the month: days 1-7 -> 1, 8-14 -> 2, ... (not the ISO week of the year).
    .withColumn("crash_week", ceil(dayofmonth("crash_date") / 7.0))
)

In [141]:
# Keep one row per distinct date-dimension combination. dropDuplicates keeps an arbitrary
# row for each key, so the provisional "id" column carried along is not meaningful here.
Clean_Date_Data = Date_Data.drop_duplicates(
    subset=["crash_date", "crash_year", "crash_month", "crash_day", "crash_hour", "crash_week"]
)

In [142]:
# Roadway ordering: all four roadway attributes, lexicographically. This rebinds `window`,
# replacing the date ordering defined earlier.
window = Window.orderBy(
    col("trafficway_type"), col("alignment"), col("roadway_surface_condition"), col("road_defect")
)

In [143]:
# Project the roadway-dimension attributes from every crash row, renaming
# roadway_surface_cond to roadway_surface_condition.
# The id is provisional: Final_Roadway_Data re-selects the columns without it and
# renumbers the de-duplicated rows. The ordering is built inline rather than read from
# the shared `window` variable, which other cells rebind to the date columns.
Roadway_Data = (
    raw_data
    .select(
        col("trafficway_type"),
        col("alignment"),
        col("roadway_surface_cond").alias("roadway_surface_condition"),
        col("road_defect"),
    )
    .withColumn(
        "id",
        row_number().over(
            Window.orderBy("trafficway_type", "alignment", "roadway_surface_condition", "road_defect")
        ),
    )
)

In [144]:
# Keep one row per distinct combination of roadway attributes (an arbitrary row is kept per key).
Clean_Roadway_Data = Roadway_Data.drop_duplicates(
    subset=["trafficway_type", "alignment", "roadway_surface_condition", "road_defect"]
)

In [145]:
# Restore the date ordering (crash timestamp, then crash hour); the previous window cell
# rebound `window` to the roadway columns.
window = Window.orderBy(col("crash_date"), col("crash_hour"))

In [146]:
# Number the de-duplicated date rows 1..N; this id becomes date_id in the fact table.
# Clean_Date_Data is unique on (crash_date, crash_day, crash_hour) because year, month and
# week are all derived from crash_date. crash_day is therefore added as the last sort key:
# without it, rows sharing a (crash_date, crash_hour) pair would tie, and row_number
# would number them in an arbitrary, run-dependent order.
# The ordering is built inline rather than read from the shared `window` variable,
# which other cells rebind to the roadway columns.
Final_Date_Data = Clean_Date_Data.select(
    col("crash_date"),
    col("crash_year"),
    col("crash_month"),
    col("crash_week"),
    col("crash_day"),
    col("crash_hour"),
).withColumn("id", row_number().over(Window.orderBy("crash_date", "crash_hour", "crash_day")))

In [147]:
# Restore the roadway ordering; the previous window cell rebound `window` to the date columns.
window = Window.orderBy(
    col("trafficway_type"), col("alignment"), col("roadway_surface_condition"), col("road_defect")
)

In [148]:
# Number the de-duplicated roadway rows 1..N; this id becomes road_id in the fact table.
# The sort covers every de-duplication key, so the numbering has no ties.
# Every column is referenced uniformly with col(): the original's bare ("alignment")
# forms were plain strings, not tuples, despite the parentheses.
# The ordering is built inline rather than read from the shared `window` variable,
# which other cells rebind to the date columns.
Final_Roadway_Data = Clean_Roadway_Data.select(
    col("trafficway_type"),
    col("alignment"),
    col("roadway_surface_condition"),
    col("road_defect"),
).withColumn(
    "id",
    row_number().over(
        Window.orderBy("trafficway_type", "alignment", "roadway_surface_condition", "road_defect")
    ),
)

In [149]:
# Preview the roadway dimension: first 20 rows with long cell values truncated (show()'s defaults).
Final_Roadway_Data.show(n=20)

+---------------+------------------+-------------------------+-----------------+---+
|trafficway_type|         alignment|roadway_surface_condition|      road_defect| id|
+---------------+------------------+-------------------------+-----------------+---+
|          ALLEY|      CURVE, LEVEL|                      WET|       RUT, HOLES|  1|
|          ALLEY|STRAIGHT AND LEVEL|                      DRY|       NO DEFECTS|  2|
|          ALLEY|STRAIGHT AND LEVEL|                      DRY|       RUT, HOLES|  3|
|          ALLEY|STRAIGHT AND LEVEL|                      DRY|  SHOULDER DEFECT|  4|
|          ALLEY|STRAIGHT AND LEVEL|                      DRY|          UNKNOWN|  5|
|          ALLEY|STRAIGHT AND LEVEL|                      DRY|     WORN SURFACE|  6|
|          ALLEY|STRAIGHT AND LEVEL|                      ICE|       NO DEFECTS|  7|
|          ALLEY|STRAIGHT AND LEVEL|                      ICE|  SHOULDER DEFECT|  8|
|          ALLEY|STRAIGHT AND LEVEL|                      ICE|   

In [150]:
# Preview the date dimension: first 20 rows with long cell values truncated (show()'s defaults).
Final_Date_Data.show(n=20)

+-------------------+----------+-----------+----------+---------+----------+---+
|         crash_date|crash_year|crash_month|crash_week|crash_day|crash_hour| id|
+-------------------+----------+-----------+----------+---------+----------+---+
|2013-03-03 16:48:00|      2013|          3|         1|        1|        16|  1|
|2013-06-01 20:29:00|      2013|          6|         1|        7|        20|  2|
|2015-02-13 08:00:00|      2015|          2|         2|        6|         8|  3|
|2015-05-25 23:38:00|      2015|          5|         4|        2|        23|  4|
|2015-08-02 19:55:00|      2015|          8|         1|        1|        19|  5|
|2015-08-03 15:30:00|      2015|          8|         1|        2|        15|  6|
|2015-08-04 12:40:00|      2015|          8|         1|        3|        12|  7|
|2015-08-04 14:30:00|      2015|          8|         1|        3|        14|  8|
|2015-08-06 10:00:00|      2015|          8|         1|        5|        10|  9|
|2015-08-07 08:18:00|      2

In [159]:
# Give each dimension's generic "id" a distinct surrogate-key name before joining.
Join_Final_Date_Data = Final_Date_Data.withColumnRenamed("id", "date_id")
Join_Final_Roadway_Data = Final_Roadway_Data.withColumnRenamed("id", "road_id")

# Match crashes to the date dimension on every column that distinguishes one of its rows.
# crash_year, crash_month and crash_week are derived from crash_date, so the dimension's
# de-duplication key reduces to (crash_date, crash_day, crash_hour). Joining on date and
# hour alone would let one crash match several date rows whenever a (crash_date,
# crash_hour) pair carries more than one crash_day value.
# eqNullSafe (<=>) is used instead of ==. dropDuplicates keeps a null-keyed combination
# as an ordinary dimension row, but NULL == NULL is never true, so an inner join on ==
# would silently drop every crash with a null key component.
Date_Temp = raw_data.join(
    Join_Final_Date_Data,
    raw_data["crash_date"].eqNullSafe(Join_Final_Date_Data["crash_date"])
    & raw_data["crash_day_of_week"].eqNullSafe(Join_Final_Date_Data["crash_day"])
    & raw_data["crash_hour"].eqNullSafe(Join_Final_Date_Data["crash_hour"]),
    "inner",
)

# Match crashes to the roadway dimension on all four of its key columns (the raw
# roadway_surface_cond was renamed to roadway_surface_condition in the dimension),
# again null-safe for the same reason.
Crash_Temp = Date_Temp.join(
    Join_Final_Roadway_Data,
    Date_Temp["trafficway_type"].eqNullSafe(Join_Final_Roadway_Data["trafficway_type"])
    & Date_Temp["alignment"].eqNullSafe(Join_Final_Roadway_Data["alignment"])
    & Date_Temp["roadway_surface_cond"].eqNullSafe(Join_Final_Roadway_Data["roadway_surface_condition"])
    & Date_Temp["road_defect"].eqNullSafe(Join_Final_Roadway_Data["road_defect"]),
    "inner",
)

# Fact table: the two surrogate keys followed by the remaining crash attributes and
# injury columns. The dimension attributes themselves are dropped here.
Crash_Data = Crash_Temp.select("date_id", "road_id", "traffic_control_device", "weather_condition", "lighting_condition",
                                   "first_crash_type", "crash_type", "intersection_related_i", "damage", "prim_contributory_cause",
                                   "num_units", "most_severe_injury", "injuries_total", "injuries_fatal", "injuries_incapacitating",
                                   "injuries_non_incapacitating", "injuries_reported_not_evident", "injuries_no_indication")



In [160]:
# Inspect the fact table's column names, types and nullability before it is exported.
Crash_Data.printSchema()

root
 |-- date_id: integer (nullable = false)
 |-- road_id: integer (nullable = false)
 |-- traffic_control_device: string (nullable = true)
 |-- weather_condition: string (nullable = true)
 |-- lighting_condition: string (nullable = true)
 |-- first_crash_type: string (nullable = true)
 |-- crash_type: string (nullable = true)
 |-- intersection_related_i: string (nullable = true)
 |-- damage: string (nullable = true)
 |-- prim_contributory_cause: string (nullable = true)
 |-- num_units: integer (nullable = true)
 |-- most_severe_injury: string (nullable = true)
 |-- injuries_total: double (nullable = true)
 |-- injuries_fatal: double (nullable = true)
 |-- injuries_incapacitating: double (nullable = true)
 |-- injuries_non_incapacitating: double (nullable = true)
 |-- injuries_reported_not_evident: double (nullable = true)
 |-- injuries_no_indication: double (nullable = true)



In [166]:
# Export the fact table as one standalone CSV file.
# Spark writes a directory of part files, so coalesce(1) limits the output to a single
# part file. That file is moved to Datasets/Clean_traffic_accidents.csv, and the scratch
# directory is then deleted.
# NOTE(review): os.listdir/shutil.move act on the driver's local filesystem, so this
# assumes Spark writes to a local path visible to the driver (e.g. local mode) — confirm
# before running against a cluster.
tmp_path = os.path.join("Datasets", "_tmp_output")
final_output_path = os.path.join("Datasets", "Clean_traffic_accidents.csv")

Crash_Data.coalesce(1).write.mode("overwrite").option("header", "true").csv(tmp_path)

try:
    # Only the data file matters. Other entries Spark/Hadoop may write (e.g. _SUCCESS,
    # .crc checksum files) are filtered out. Sorting makes the pick deterministic should
    # more than one part file ever appear.
    part_files = sorted(
        name
        for name in os.listdir(tmp_path)
        if name.startswith("part-") and name.endswith(".csv")
    )
    if not part_files:
        raise FileNotFoundError("CSV part file not found in temp folder.")
    full_temp_file_path = os.path.join(tmp_path, part_files[0])
    shutil.move(full_temp_file_path, final_output_path)
finally:
    # Remove the scratch directory even when no part file was found or the move failed.
    # This is best-effort, so a cleanup problem cannot mask the original error.
    shutil.rmtree(tmp_path, ignore_errors=True)

print(f"Saved: {final_output_path}")


Saved: Datasets/Clean_traffic_accidents.csv
