In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, to_timestamp, unix_timestamp
from pyspark.sql.types import DoubleType


spark = SparkSession.builder \
    .appName("US Accidents Severity Prediction") \
    .master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "50") \
    .config("spark.memory.fraction", "0.8") \
    .getOrCreate()


In [4]:
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("dataset.csv")

                                                                                

In [5]:
df2 = df \
    .drop("ID","Source", "Zipcode", "Timezone", "Airport_Code", "Amenity", "Wind_Chill(F)", "Distance(mi)",
          "Bump", "Give_Way", "No_Exit", "Railway", "Description", "County", "Precipitation(in)", "Turning_Loop",
          "Roundabout", "Station", "Stop", "Nautical_Twilight", "Astronomical_Twilight", "Country", "Traffic_Calming")
df2.show()

+--------+-------------------+-------------------+------------------+------------------+-------+-------+--------------------+------------+-----+-------------------+--------------+-----------+------------+--------------+--------------+---------------+-----------------+--------+--------+--------------+--------------+--------------+
|Severity|         Start_Time|           End_Time|         Start_Lat|         Start_Lng|End_Lat|End_Lng|              Street|        City|State|  Weather_Timestamp|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Crossing|Junction|Traffic_Signal|Sunrise_Sunset|Civil_Twilight|
+--------+-------------------+-------------------+------------------+------------------+-------+-------+--------------------+------------+-----+-------------------+--------------+-----------+------------+--------------+--------------+---------------+-----------------+--------+--------+--------------+--------------+--------------+
|   

In [6]:
df3 = df2 \
    .withColumn("Start_TS", to_timestamp(col("Start_Time"), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("End_TS", to_timestamp(col("End_Time"), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("Duration", ((unix_timestamp(col("End_TS")) - unix_timestamp(col("Start_TS"))) / 60).cast(DoubleType())) \
    .withColumn("Start_Lat", round(col("Start_Lat"), 1)) \
    .withColumn("Start_Lng", round(col("Start_Lng"), 1)) \
    .withColumn("Hour", hour(col("Start_TS"))) \
    .withColumn("DayOfWeek", dayofweek(col("Start_TS"))) \
    .withColumn("Month", month(col("Start_TS"))) \
    .withColumn("Weather_Hour", hour(col("Weather_Timestamp"))) \
    .withColumn("Weather_Day", month(col("Weather_Timestamp"))) \
    .withColumn("TimeSlot", when((hour(col("Start_TS")) >= 0) & (hour(col("Start_TS")) < 6), 0) #night
                          .when((hour(col("Start_TS")) >= 6) & (hour(col("Start_TS")) < 12), 1) #morning
                          .when((hour(col("Start_TS")) >= 12) & (hour(col("Start_TS")) < 18), 2) #afternoon
                          .otherwise(3)) #evening
    
df3 = df3.drop("Start_TS", "End_TS", "End_Time", "End_Lat", "End_Lng", "Start_Time", "Weather_Timestamp")

df3.show()

+--------+---------+---------+--------------------+------------+-----+--------------+-----------+------------+--------------+--------------+---------------+-----------------+--------+--------+--------------+--------------+--------------+--------+----+---------+-----+------------+-----------+--------+
|Severity|Start_Lat|Start_Lng|              Street|        City|State|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Crossing|Junction|Traffic_Signal|Sunrise_Sunset|Civil_Twilight|Duration|Hour|DayOfWeek|Month|Weather_Hour|Weather_Day|TimeSlot|
+--------+---------+---------+--------------------+------------+-----+--------------+-----------+------------+--------------+--------------+---------------+-----------------+--------+--------+--------------+--------------+--------------+--------+----+---------+-----+------------+-----------+--------+
|       3|     39.9|    -84.1|              I-70 E|      Dayton|   OH|          36.9|       91

In [None]:
df3 = df3.filter(col("Duration") > 0)

# 3. Uç değerleri ayıkla (%99'luk dilim)
q99 = df3.approxQuantile("Duration", [0.99], 0.01)[0]
df3 = df3.filter(col("Duration") <= q99)

# 4. Kategorik sınıflara böl (label sütunu olarak)
df3 = df3.withColumn("Duration_Class", 
    when(col("Duration") <= 5, 0) #Very Short
    .when(col("Duration") <= 15, 1) #Short
    .when(col("Duration") <= 60, 0) #Medium
    .when(col("Duration") <= 180, 1) #Long
    .otherwise(4) #Very Long
)

# (very short) 5 (short) 15 (medium) 60 (long) 180 (very long) ++
# short 30 medium 120 long
df3 = df3.drop("Duration")

df3.show()

                                                                                

+--------+---------+---------+--------------------+------------+-----+--------------+-----------+------------+--------------+--------------+---------------+-----------------+--------+--------+--------------+--------------+--------------+----+---------+-----+------------+-----------+--------+--------------+
|Severity|Start_Lat|Start_Lng|              Street|        City|State|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Crossing|Junction|Traffic_Signal|Sunrise_Sunset|Civil_Twilight|Hour|DayOfWeek|Month|Weather_Hour|Weather_Day|TimeSlot|Duration_Class|
+--------+---------+---------+--------------------+------------+-----+--------------+-----------+------------+--------------+--------------+---------------+-----------------+--------+--------+--------------+--------------+--------------+----+---------+-----+------------+-----------+--------+--------------+
|       3|     39.9|    -84.1|              I-70 E|      Dayton|   OH|      

In [8]:
# Hangi sütunları işleyeceğimizi tanımla
columns_to_clean = ["City", "Street"]
top_n = 128


# Sık geçen değerleri belirleyip "Other" ile gruplayan fonksiyon
def clean_column(df, column_name, top_n=128):
    top_values_df = df.groupBy(column_name).count().orderBy(col("count").desc()).limit(top_n)
    top_values_list = [row[column_name] for row in top_values_df.collect()]
   
    cleaned_col_name = f"{column_name}_Cleaned"
    df = df.withColumn(
        cleaned_col_name,
        when(col(column_name).isin(top_values_list), col(column_name)).otherwise("Other")
    )
    return df

# Ön işleme (zaten yaptığın kısım)
for col_name in columns_to_clean:
    df3 = clean_column(df3, col_name, top_n=top_n)

df4 = df3.drop("Street", "City")
df4.show()


                                                                                

+--------+---------+---------+-----+--------------+-----------+------------+--------------+--------------+---------------+-----------------+--------+--------+--------------+--------------+--------------+----+---------+-----+------------+-----------+--------+--------------+------------+--------------+
|Severity|Start_Lat|Start_Lng|State|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Crossing|Junction|Traffic_Signal|Sunrise_Sunset|Civil_Twilight|Hour|DayOfWeek|Month|Weather_Hour|Weather_Day|TimeSlot|Duration_Class|City_Cleaned|Street_Cleaned|
+--------+---------+---------+-----+--------------+-----------+------------+--------------+--------------+---------------+-----------------+--------+--------+--------------+--------------+--------------+----+---------+-----+------------+-----------+--------+--------------+------------+--------------+
|       3|     39.9|    -84.1|   OH|          36.9|       91.0|       29.68|          10.0|   

In [17]:
# İşlenecek sütunları tanımla
bool_columns_to_encode = ["Crossing", "Traffic_Signal", "Junction"]

# Boolean değerleri 0 ve 1 olarak etiketleyen fonksiyon
def encode_boolean_column(df, column_name):
    binary_col_name = f"{column_name}_Bin"
    df = df.withColumn(
        binary_col_name,
        when(col(column_name) == True, lit(1)).otherwise(lit(0))
    )
    return df

# Uygulama
for col_name in bool_columns_to_encode:
    df5 = encode_boolean_column(df4, col_name)

df5 = df5.drop("Crossing", "Traffic_Signal", "Junction")
df5.show()


+--------+---------+---------+-----+--------------+-----------+------------+--------------+--------------+---------------+-----------------+--------------+--------------+----+---------+-----+------------+-----------+--------+--------------+------------+--------------+------------+
|Severity|Start_Lat|Start_Lng|State|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Sunrise_Sunset|Civil_Twilight|Hour|DayOfWeek|Month|Weather_Hour|Weather_Day|TimeSlot|Duration_Class|City_Cleaned|Street_Cleaned|Junction_Bin|
+--------+---------+---------+-----+--------------+-----------+------------+--------------+--------------+---------------+-----------------+--------------+--------------+----+---------+-----+------------+-----------+--------+--------------+------------+--------------+------------+
|       3|     39.9|    -84.1|   OH|          36.9|       91.0|       29.68|          10.0|          Calm|           NULL|       Light Rain|         Night

In [18]:
def clean_weather_condition(df, top_n=30):
    # Null'ları "NULL" stringine çevir (gruplama sırasında kolaylık)
    df = df.withColumn("Weather_Condition", when(col("Weather_Condition").isNull(), "NULL").otherwise(col("Weather_Condition")))

    # En sık geçen ilk N değeri al
    top_values_df = df.groupBy("Weather_Condition").count().orderBy(col("count").desc()).limit(top_n)
    top_values_list = [row["Weather_Condition"] for row in top_values_df.collect()]

    # Temizlenmiş yeni sütun
    df = df.withColumn(
        "Weather_Condition_Cleaned",
        when(col("Weather_Condition").isin(top_values_list), col("Weather_Condition")).otherwise("Other")
    )

    # Orijinal sütunu at
    df = df.drop("Weather_Condition")
    return df

df6 = clean_weather_condition(df5)

df6 = df6.drop("Weather_Condition")
df6.show()




+--------+---------+---------+-----+--------------+-----------+------------+--------------+--------------+---------------+--------------+--------------+----+---------+-----+------------+-----------+--------+--------------+------------+--------------+------------+-------------------------+
|Severity|Start_Lat|Start_Lng|State|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Sunrise_Sunset|Civil_Twilight|Hour|DayOfWeek|Month|Weather_Hour|Weather_Day|TimeSlot|Duration_Class|City_Cleaned|Street_Cleaned|Junction_Bin|Weather_Condition_Cleaned|
+--------+---------+---------+-----+--------------+-----------+------------+--------------+--------------+---------------+--------------+--------------+----+---------+-----+------------+-----------+--------+--------------+------------+--------------+------------+-------------------------+
|       3|     39.9|    -84.1|   OH|          36.9|       91.0|       29.68|          10.0|          Calm|           NULL|        

                                                                                

In [11]:
selected_cols = [ "Temperature(F)", "Humidity(%)", "Pressure(in)", "Visibility(mi)",
      "Wind_Speed(mph)", "Weather_Condition_Cleaned", "Wind_Direction", "Junction_Bin", "Duration_Class","Severity",
      "Civil_Twilight", "Sunrise_Sunset", "State", "City_Cleaned", "Street_Cleaned", 
      "DayOfWeek", "Start_Lat", "Start_Lng", "Hour", "Month", "TimeSlot", "Weather_Day", "Weather_Hour"]

In [19]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

df_selected = df6.select(*selected_cols)

categorical_cols = [
    "Wind_Direction", "Civil_Twilight", "Weather_Condition_Cleaned",
    "Sunrise_Sunset", "State", "City_Cleaned", "Street_Cleaned"
]

indexers = [
    StringIndexer(inputCol=col, outputCol=col + "_Idx", handleInvalid="keep")
    for col in categorical_cols
]

feature_cols = ["Start_Lat", "Start_Lng", "State_Idx","Temperature(F)", "Humidity(%)","Pressure(in)","Visibility(mi)",
"Wind_Direction_Idx","Wind_Speed(mph)","Sunrise_Sunset_Idx","Civil_Twilight_Idx","Hour","DayOfWeek","Month","Weather_Hour",
"Weather_Day","TimeSlot","Duration_Class","City_Cleaned_Idx","Street_Cleaned_Idx","Junction_Bin","Weather_Condition_Cleaned_Idx"]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="skip"
)

rf = RandomForestClassifier(
    labelCol="Duration_Class",
    featuresCol="features",
    numTrees=20,
    maxBins=216
)

df_selected.show()

+--------------+-----------+------------+--------------+---------------+-------------------------+--------------+------------+--------------+--------+--------------+--------------+-----+------------+--------------+---------+---------+---------+----+-----+--------+-----------+------------+
|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Speed(mph)|Weather_Condition_Cleaned|Wind_Direction|Junction_Bin|Duration_Class|Severity|Civil_Twilight|Sunrise_Sunset|State|City_Cleaned|Street_Cleaned|DayOfWeek|Start_Lat|Start_Lng|Hour|Month|TimeSlot|Weather_Day|Weather_Hour|
+--------------+-----------+------------+--------------+---------------+-------------------------+--------------+------------+--------------+--------+--------------+--------------+-----+------------+--------------+---------+---------+---------+----+-----+--------+-----------+------------+
|          36.9|       91.0|       29.68|          10.0|           NULL|               Light Rain|          Calm|           0|    

In [20]:
# 9. NA temizliği ve veri bölme
df_no_na = df_selected.dropna().cache()
train, test = df_no_na.randomSplit([0.8, 0.2], seed=42)

df_no_na.show()



+--------------+-----------+------------+--------------+---------------+-------------------------+--------------+------------+--------------+--------+--------------+--------------+-----+------------+--------------+---------+---------+---------+----+-----+--------+-----------+------------+
|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Speed(mph)|Weather_Condition_Cleaned|Wind_Direction|Junction_Bin|Duration_Class|Severity|Civil_Twilight|Sunrise_Sunset|State|City_Cleaned|Street_Cleaned|DayOfWeek|Start_Lat|Start_Lng|Hour|Month|TimeSlot|Weather_Day|Weather_Hour|
+--------------+-----------+------------+--------------+---------------+-------------------------+--------------+------------+--------------+--------+--------------+--------------+-----+------------+--------------+---------+---------+---------+----+-----+--------+-----------+------------+
|          36.0|      100.0|       29.67|          10.0|            3.5|                 Overcast|            SW|           0|    

                                                                                

In [21]:
# 8. Pipeline
pipeline = Pipeline(stages=indexers + [assembler, rf])
model = pipeline.fit(df_no_na)

                                                                                

In [22]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# 11. Tahmin üret
predictions = model.transform(test)
df_no_na.unpersist()

evaluator = MulticlassClassificationEvaluator(labelCol="Duration_Class", predictionCol="prediction")

accuracy = evaluator.setMetricName("accuracy").evaluate(predictions)
precision = evaluator.setMetricName("weightedPrecision").evaluate(predictions)
recall = evaluator.setMetricName("weightedRecall").evaluate(predictions)
f1 = evaluator.setMetricName("f1").evaluate(predictions)

print(f"Accuracy: {accuracy:.4f}")
print(f"Weighted Precision: {precision:.4f}")
print(f"Weighted Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")



Accuracy: 0.9300
Weighted Precision: 0.9389
Weighted Recall: 0.9300
F1 Score: 0.9211


                                                                                

In [23]:
model.write().overwrite().save("models/us_accidents_duration_rf")