In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, avg, isnan, count, round,
    to_timestamp, hour, dayofweek, date_format
)
from pyspark.sql.types import (
    StringType, DoubleType, LongType, TimestampType
)

In [0]:
# Obtain the Spark session (getOrCreate returns the active one when it exists)
# and load the raw IoT sensor table that the rest of the notebook cleans.
spark = (
    SparkSession.builder
    .appName("IoT Traffic Signal Preprocessing")
    .getOrCreate()
)
df_raw = spark.table("iot_edge_computing_public_management")

In [0]:
# The source column name contains a space and parentheses; give it an
# identifier-friendly name so later col() references stay simple.
# NOTE: withColumnRenamed is a no-op when the column is absent, in which case
# the cast cell below fails on col("vehicle_speed_kmph").
df = df_raw.withColumnRenamed("vehicle_speed (km/h)", "vehicle_speed_kmph")


In [0]:
# Enforce an explicit type for every expected column. Values that fail to
# parse are expected to become NULL (unless Spark ANSI mode is enabled -
# NOTE(review): confirm), which the missing-value audit in the next cell
# surfaces.
column_types = [
    ("timestamp", TimestampType()),
    ("sensor_id", StringType()),
    ("vehicle_speed_kmph", LongType()),
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ("traffic_pattern", StringType()),
    ("incident_report", StringType()),
    ("accident_hotspot", LongType()),
    ("event_type", StringType()),
]

# Build on a separate handle so `df` is only replaced once every cast succeeds.
typed_df = df
for column_name, spark_type in column_types:
    typed_df = typed_df.withColumn(column_name, col(column_name).cast(spark_type))
df = typed_df

In [0]:
# Classify columns by Spark type so each gets a suitable missing-value check
# (here) and imputation strategy (next cell).
# DataType.typeName() yields "byte", "short", "integer", "long", "float" and
# "double". These are not the SQL aliases "int"/"bigint", which never match,
# so integer-typed columns used to be skipped.
NUMERIC_TYPE_NAMES = ("byte", "short", "integer", "long", "float", "double")

numeric_cols = [f.name for f in df.schema.fields if f.dataType.typeName() in NUMERIC_TYPE_NAMES]
string_cols = [f.name for f in df.schema.fields if f.dataType.typeName() == "string"]

# Count missing values per column. Every column counts NULLs; numeric columns
# also count NaN (isnan implicitly casts integral types to double, where it is
# simply never true).
missing_counts = df.select([
    (
        count(when(col(c).isNull() | isnan(col(c)), c))
        if c in numeric_cols
        else count(when(col(c).isNull(), c))
    ).alias(c)
    for c in df.columns
])

display(missing_counts)


timestamp,sensor_id,vehicle_speed_kmph,latitude,longitude,traffic_pattern,incident_report,accident_hotspot,event_type
0,0,0,0,0,0,0,0,0


In [0]:
# Impute missing values: numeric columns get their column mean, string columns
# get the sentinel "Unknown".
# avg() ignores NULLs and returns None when a column has no non-NULL values.
# That happens for every column when the table is empty, because a global
# aggregate still returns one row. fillna cannot use None as a replacement
# value, so those columns are left out of the mapping instead of failing.
# NOTE(review): fillna casts the float mean to the column type, so for long
# columns it is truncated. For accident_hotspot, which appears to be a 0/1
# flag, this is almost always 0. Confirm mean imputation is intended there.
mean_row = df.select([avg(c).alias(c) for c in numeric_cols]).first()
means = {
    c: m
    for c, m in (mean_row.asDict().items() if mean_row is not None else [])
    if m is not None
}
if means:
    df = df.fillna(means)
df = df.fillna("Unknown", subset=string_cols)

In [0]:
# Derive calendar features from the event timestamp.
# NOTE(review): hour/dayofweek/date_format evaluate in the Spark session time
# zone; the displayed output (08:00Z -> hour 8) suggests UTC - confirm.
df = df.withColumn("hour_of_day", hour("timestamp"))
df = df.withColumn("day_of_week", dayofweek("timestamp"))  # 1 = Sunday ... 7 = Saturday
df = df.withColumn("date", date_format("timestamp", "yyyy-MM-dd"))  # string, not DateType


In [0]:
# Keep only physically plausible readings. All bounds are inclusive: speed in
# [0, 200] km/h, and latitude/longitude within valid geographic ranges.
# Rows whose compared value is NULL are also dropped, since the predicate is
# not true for them.
df = df.filter(
    col("vehicle_speed_kmph").between(0, 200)
    & col("latitude").between(-90, 90)
    & col("longitude").between(-180, 180)
)
display(df)

timestamp,sensor_id,vehicle_speed_kmph,latitude,longitude,traffic_pattern,incident_report,accident_hotspot,event_type,hour_of_day,day_of_week,date
2024-12-06T08:00:00.000Z,Vehicle_1,114,34.54926150831106,-79.30136853880121,Clear,,0,Accident,8,6,2024-12-06
2024-12-06T08:15:00.000Z,Vehicle_2,63,36.8189725647745,-97.57893133276576,Heavy,Breakdown,1,Normal,8,6,2024-12-06
2024-12-06T08:30:00.000Z,Vehicle_3,67,39.89604505169282,-111.73397542150222,Moderate,Police Blockade,0,Accident,8,6,2024-12-06
2024-12-06T08:45:00.000Z,Vehicle_4,24,34.22040303269379,-99.54087923662124,Moderate,Police Blockade,0,Congestion,8,6,2024-12-06
2024-12-06T09:00:00.000Z,Vehicle_5,62,38.48441454374566,-109.6230577482927,Heavy,Accident,0,Congestion,9,6,2024-12-06
2024-12-06T09:15:00.000Z,Vehicle_6,33,38.677279519159455,-87.59237957097622,Heavy,Police Blockade,0,Accident,9,6,2024-12-06
2024-12-06T09:30:00.000Z,Vehicle_7,79,37.98355446411157,-97.84793593532116,Clear,Traffic Jam,0,Normal,9,6,2024-12-06
2024-12-06T09:45:00.000Z,Vehicle_8,44,37.95666123733266,-106.16079872767178,Clear,Traffic Jam,0,Accident,9,6,2024-12-06
2024-12-06T10:00:00.000Z,Vehicle_9,34,38.23071478990284,-73.82381311916097,Heavy,Accident,0,Normal,10,6,2024-12-06
2024-12-06T10:15:00.000Z,Vehicle_10,39,36.13303754783956,-106.65777975084892,Light,Accident,0,Congestion,10,6,2024-12-06


In [0]:
# Persist the cleaned data as a managed table, replacing any previous run's output.
(
    df.write
    .mode("overwrite")
    .saveAsTable("iot_edge_computing_public_management_cleaned")
)

print("✅ Data preprocessing complete — cleaned table saved as 'iot_edge_computing_public_management_cleaned'.")

✅ Data preprocessing complete — cleaned table saved as 'iot_edge_computing_public_management_cleaned'.
