# In [None]:
import os, json
import yaml
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, to_timestamp, window,
    sum as _sum, avg as _avg, when, to_json, struct
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType


# Load runtime settings (Kafka endpoints/topics, window parameters, vehicle
# thresholds, sink locations) from the YAML config in the working directory.
cfg_path = "config_m3.yaml"  # a single-argument os.path.join() was a no-op
# Explicit UTF-8: the platform default encoding (e.g. cp1252 on Windows) would
# mis-decode non-ASCII values such as street names or comments.
with open(cfg_path, encoding="utf-8") as f:
    # safe_load only builds plain YAML types, never arbitrary Python objects.
    cfg = yaml.safe_load(f)

# Kafka connection and topics.
kafka_boot = cfg['kafka']['bootstrap_servers']
input_topic = cfg['kafka']['input_topic']
alert_topic = cfg['kafka']['alert_topic']

# Sliding-window length/step (passed to pyspark's window()) and the
# vehicle-count thresholds used for alert classification.
w_dur = cfg['streaming']['window_duration']
s_dur = cfg['streaming']['slide_duration']
thr_med = cfg['streaming']['vehicle_threshold_moderate']
thr_high = cfg['streaming']['vehicle_threshold_high']

# Sink locations: checkpoint root for the streaming queries and parquet output path.
checkpoint = cfg['streaming']['checkpoint_dir']
parquet_out = cfg['paths']['parquet_output']


# getOrCreate() reuses the kernel's existing SparkSession if there is one.
spark = (
    SparkSession.builder
    .appName("RealTimeTrafficAnalytics")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

# Streaming source: raw records from the input topic. A query with no prior
# checkpoint begins at the newest offset rather than replaying history.
kafka_reader = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_boot)
    .option("subscribe", input_topic)
    .option("startingOffsets", "latest")
)
raw = kafka_reader.load()

# Expected JSON payload of one traffic reading; all fields are nullable, so
# missing keys surface as nulls.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("timestamp", StringType()),
        ("street_name", StringType()),
        ("vehicle_count", IntegerType()),
        ("avg_speed", DoubleType()),
    ]
])

# Kafka delivers bytes: decode to a string, parse with the schema, flatten the
# struct, then derive the event-time column used for windowing.
json_str = raw.selectExpr("CAST(value AS STRING) as json_str")
parsed = (
    json_str
    .select(from_json(col("json_str"), schema).alias("data"))
    .select("data.*")
    .withColumn("event_time", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
)


# Accept events up to 30 seconds late; state for windows older than the
# watermark can then be discarded.
parsed_with_watermark = parsed.withWatermark("event_time", "30 seconds")

# Per-street totals over sliding event-time windows of length w_dur and step s_dur.
window_spec = window(col("event_time"), w_dur, s_dur)
per_window = parsed_with_watermark.groupBy(window_spec, col("street_name")).agg(
    _sum("vehicle_count").alias("vehicles_per_window"),
    _avg("avg_speed").alias("avg_speed"),
)

# Flatten the window struct into explicit start/end columns.
agg = per_window.select(
    col("window").start.alias("window_start"),
    col("window").end.alias("window_end"),
    col("street_name"),
    col("vehicles_per_window"),
    col("avg_speed"),
)


# Congestion severity. Rules are checked in order and the first match wins,
# so low average speed takes precedence over vehicle volume.
alert_level_expr = (
    when(col("avg_speed") < 15, "HIGH")
    .when(col("avg_speed") < 25, "MEDIUM")
    .when(col("vehicles_per_window") > thr_high, "HIGH")
    .when(col("vehicles_per_window") > thr_med, "MEDIUM")
    .otherwise("LOW")
)

# A window is flagged when it is slow OR busy. This should coincide with
# alert_level != "LOW" as long as thr_high >= thr_med. A null avg_speed
# combined with a low count yields null, not False.
is_alert_expr = (
    (col("avg_speed") < 25) | (col("vehicles_per_window") > thr_med)
).cast("boolean")

agg2 = agg.withColumn("alert_level", alert_level_expr).withColumn("is_alert", is_alert_expr)

# Replace missing metrics with 0.0 for the sinks.
# NOTE(review): this runs after classification, so a window whose avg_speed
# was null is reported with avg_speed=0.0 although it was classified as if the
# speed were unknown. Confirm that this is intended.
agg2_clean = agg2.fillna(0.0, subset=['vehicles_per_window', 'avg_speed'])


# Kafka alert feed: only flagged windows. Each is encoded as a single JSON
# object (all columns of agg2_clean) in the Kafka "value" field. Filtering on
# the boolean column directly drops both False and null, like `== True` would.
alert_columns = [col(c) for c in agg2_clean.columns]
alerts_for_kafka = (
    agg2_clean
    .where(col("is_alert"))
    .select(to_json(struct(*alert_columns)).alias("value"))
)

# "update" mode re-emits a window's record every time its aggregate changes,
# so consumers may see the same (window, street) alert more than once.
alerts_query = (
    alerts_for_kafka.writeStream
    .outputMode("update")
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_boot)
    .option("topic", alert_topic)
    .option("checkpointLocation", checkpoint + "/alerts_kafka")
    .start()
)

# Parquet history: "append" writes each window once, only after the
# watermark has moved past the window's end.
parquet_query = (
    agg2_clean.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", parquet_out)
    .option("checkpointLocation", checkpoint + "/parquet")
    .start()
)

# Live debugging view on the driver's stdout, with untruncated column values.
console_query = (
    agg2_clean.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .start()
)



# In [None]:
import os
import yaml
import json
import paho.mqtt.client as mqtt
import pandas as pd
from datetime import datetime 
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, to_timestamp, window, sum as _sum, avg as _avg,
    when, to_json, struct
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType


# Load runtime settings (Kafka, window parameters, lighting topics, MQTT
# broker) from the YAML config in the working directory.
cfg_path = "config_m3.yaml"  # a single-argument os.path.join() was a no-op
# Explicit UTF-8 so non-ASCII values decode the same on every platform.
with open(cfg_path, encoding="utf-8") as f:
    cfg = yaml.safe_load(f)

# Kafka connection and topics.
KAFKA_BOOT = cfg['kafka']['bootstrap_servers']
INPUT_TOPIC = cfg['kafka']['input_topic']
ALERT_TOPIC = cfg['kafka']['alert_topic']

# Sliding-window length/step and vehicle-count thresholds for classification.
WINDOW_DURATION = cfg['streaming']['window_duration']
SLIDE_DURATION = cfg['streaming']['slide_duration']
VEHICLE_THRESHOLD_MED = cfg['streaming']['vehicle_threshold_moderate']
VEHICLE_THRESHOLD_HIGH = cfg['streaming']['vehicle_threshold_high']

# Checkpoint root and the topic prefix for per-street lighting commands.
CHECKPOINT_DIR = cfg['streaming']['checkpoint_dir']
LIGHT_TOPIC_PREFIX = cfg['lighting_control']['lighting_topic_prefix']

# MQTT broker. The port is coerced to int because a quoted YAML value would
# load as a string, and the socket connect needs a numeric port.
MQTT_HOST = cfg['mqtt']['host']
MQTT_PORT = int(cfg['mqtt']['port'])

# (The stray "-" line that stood here was a SyntaxError and has been removed.)
spark = SparkSession.builder.appName("LightingController").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# getOrCreate() can return the session an earlier cell already started, so
# stop any streaming queries it is still running before starting this one.
print("Stopping any previously active streaming queries...")
for q in spark.streams.active:
    # Unnamed queries report name None; fall back to the query id in the log.
    label = q.name or q.id
    try:
        q.stop()
        print(f"Query '{label}' stopped.")
    except Exception as e:  # best effort: keep stopping the remaining queries
        print(f"Error stopping query '{label}': {e}")
print("--- All previous queries stopped. ---")


# Streaming source: traffic readings from the input topic. A query without a
# prior checkpoint begins at the newest offset.
raw = (
    spark.readStream
    .format("kafka")
    .option("subscribe", INPUT_TOPIC)
    .option("kafka.bootstrap.servers", KAFKA_BOOT)
    .option("startingOffsets", "latest")
    .load()
)

# JSON layout of one reading; StructType.add() marks every field nullable.
schema = (
    StructType()
    .add("timestamp", StringType())
    .add("street_name", StringType())
    .add("vehicle_count", IntegerType())
    .add("avg_speed", DoubleType())
)

# Decode the Kafka value, parse it, and flatten the resulting struct.
json_str = raw.selectExpr("CAST(value AS STRING) as json_str")
decoded = from_json(col("json_str"), schema).alias("data")
parsed = json_str.select(decoded).select("data.*")

# Event time used for windowing. Strings not matching the pattern typically
# become null (non-ANSI mode).
parsed = parsed.withColumn(
    "event_time",
    to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"),
)


# Accept readings up to 1 minute late before a window's state can be dropped.
agg_input = parsed.withWatermark("event_time", "1 minute")

# Per-street totals over sliding event-time windows.
time_window = window(col("event_time"), WINDOW_DURATION, SLIDE_DURATION)
agg = (
    agg_input
    .groupBy(time_window, col("street_name"))
    .agg(
        _sum("vehicle_count").alias("vehicles_per_window"),
        _avg("avg_speed").alias("avg_speed"),
    )
    .select(
        col("window").start.alias("window_start"),
        col("window").end.alias("window_end"),
        "street_name",
        "vehicles_per_window",
        "avg_speed",
    )
)

# Reusable predicates: a window is "slow" below 25 and "busy" above the
# moderate vehicle threshold.
is_slow = col("avg_speed") < 25
is_busy = col("vehicles_per_window") > VEHICLE_THRESHOLD_MED

# Severity rules are evaluated in order (first match wins); is_alert flags any
# slow or busy window.
agg2 = agg.withColumn(
    "alert_level",
    when(col("avg_speed") < 15, "HIGH")
    .when(is_slow, "MEDIUM")
    .when(col("vehicles_per_window") > VEHICLE_THRESHOLD_HIGH, "HIGH")
    .when(is_busy, "MEDIUM")
    .otherwise("LOW"),
).withColumn("is_alert", (is_slow | is_busy).cast("boolean"))


# One MQTT connection shared by every micro-batch (paho-mqtt callback API v2).
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqtt_client.connect(MQTT_HOST, MQTT_PORT, 60)
# Run paho's network loop in a background thread. Without it nothing sends
# keep-alive pings or reads broker packets, so an idle connection is dropped
# by the broker (after ~1.5x the 60 s keepalive) and later publishes fail.
# The loop thread also lets paho reconnect after a lost connection.
mqtt_client.loop_start()

def publish_to_mqtt(df, epoch_id):
    
    pdf = df.toPandas()
    for _, row in pdf.iterrows():
        street = row['street_name']
        level = row['alert_level']
        topic = f"{LIGHT_TOPIC_PREFIX}/{street}/set"
        
     
        payload = {
            "street": street, 
            "level": level,
            "avg_speed": row['avg_speed'] if not pd.isna(row['avg_speed']) else 0.0,
            "vehicles": row['vehicles_per_window'] if not pd.isna(row['vehicles_per_window']) else 0
        }
        
        mqtt_client.publish(topic, json.dumps(payload))
        print(f"Published to {topic}: {payload}")


# Each run gets its own checkpoint directory, so the controller always starts
# with fresh state. Offsets and window state are not recovered across restarts.
current_timestamp = datetime.now().strftime('%Y%m%d%H%M%S')

lighting_query = (
    agg2.writeStream
    .foreachBatch(publish_to_mqtt)
    .option("checkpointLocation", os.path.join(CHECKPOINT_DIR, f"lighting_controller_{current_timestamp}"))
    .outputMode("update")
    .queryName("mqtt_lighting_controller")
    .start()
)

try:
    # Block until the query stops or fails.
    lighting_query.awaitTermination()
finally:
    # Release the MQTT connection only once the query has really ended. If just
    # the wait was interrupted (e.g. a notebook interrupt), the query keeps
    # running in the background and still needs the client.
    if not lighting_query.isActive:
        mqtt_client.disconnect()
        mqtt_client.loop_stop()