In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, when, to_timestamp, lit
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Build (or reuse) the Spark session shared by the whole pipeline.
#   * spark.jars.packages pulls in the Structured Streaming Kafka connector.
#   * spark.driver.extraClassPath adds the SQL Server JDBC driver jar to the
#     driver's classpath.
# NOTE(review): getOrCreate() hands back an already-running session if one
# exists; presumably the two settings below are then not re-applied — confirm
# by restarting the kernel after changing them.
spark = (
    SparkSession.builder
    .appName("IoT Streaming ETL + Alerts")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    )
    .config(
        "spark.driver.extraClassPath",
        "/home/jovyan/work/Graduation/mssql-jdbc-13.2.1.jre8.jar",
    )
    .getOrCreate()
)

print("Spark Session started")

# Kafka topic the sensor readings are consumed from.
KAFKA_TOPIC = "iot_sensors"

# Schema applied to the JSON payload of every Kafka message.
# Every field is declared nullable; "timestamp" is read as a plain string here
# and converted to a real timestamp after parsing.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("device_id", StringType()),
        ("temperature", DoubleType()),
        ("humidity", IntegerType()),
        ("pressure", IntegerType()),
        ("air_quality", IntegerType()),
        ("noise_level", IntegerType()),
        ("battery_level", IntegerType()),
        ("signal_strength", IntegerType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("timestamp", StringType()),
    )
])

# Streaming source: subscribe to KAFKA_TOPIC on the broker at kafka:9092,
# starting from the latest offsets.
# NOTE(review): the run recorded with this script ended in
# "Failed to create new KafkaAdminClient". The cause is not visible here —
# verify that kafka:9092 is resolvable/reachable from the Spark driver and
# that the Kafka connector package was actually loaded into the session.
raw_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": KAFKA_TOPIC,
        "startingOffsets": "latest",
    })
    .load()
)

# Decode the message value as a string, parse it as JSON against `schema`,
# flatten the parsed struct into top-level columns, and finally turn the
# "timestamp" string into a timestamp using the pattern below.
df = (
    raw_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn(
        "timestamp",
        to_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"),
    )
)

# Enrich the parsed readings in four steps, applied in this order:
#   1. temperature_status: "High" above 30, "Low" below 22, else "Normal".
#   2. battery_status: "Low Battery" below 20, else "OK".
#   3. keep only rows whose temperature lies in [-10, 60] (both ends included).
#   4. anomaly_flag: 1 when the temperature is above 40 or below 10, else 0.
df = (
    df
    .withColumn(
        "temperature_status",
        when(col("temperature") > 30, "High")
        .when(col("temperature") < 22, "Low")
        .otherwise("Normal"),
    )
    .withColumn(
        "battery_status",
        when(col("battery_level") < 20, "Low Battery").otherwise("OK"),
    )
    .where(col("temperature").between(-10, 60))
    .withColumn(
        "anomaly_flag",
        when(
            (col("temperature") > 40) | (col("temperature") < 10),
            1,
        ).otherwise(0),
    )
)

# Alert stream: one row per reading that trips a rule, carrying the device id,
# the reading's timestamp and a message.
# The rules are evaluated top to bottom and only the first match is reported,
# so a reading that trips several rules yields a single alert.
# A `when` chain without `otherwise` evaluates to null for unmatched rows;
# those rows are removed by the final filter.
alerts_df = (
    df.select(
        col("device_id"),
        col("timestamp"),
        (
            when(col("temperature") > 40, "High temperature alert")
            .when(col("battery_level") < 20, "Low battery alert")
            .when(col("air_quality") < 30, "Poor air quality alert")
            .when(col("noise_level") > 80, "Noise alert")
        ).alias("alert_message"),
    )
    .where(col("alert_message").isNotNull())
)

# Connection settings for the SQL Server sink used by both writers.
#
# The URL, user and password can each be overridden through an environment
# variable (IOT_DB_JDBC_URL, IOT_DB_USER, IOT_DB_PASSWORD) so credentials do
# not have to be edited into the source. The literals are kept only as
# backward-compatible defaults for when the variables are unset.
# NOTE(review): the default password is still committed in plain text —
# rotate it and drop the fallback once the environment variables are
# provisioned.
jdbc_url = os.environ.get(
    "IOT_DB_JDBC_URL",
    "jdbc:sqlserver://host.docker.internal:1433;databaseName=IoT_DB_str;encrypt=false;",
)
connection_properties = {
    "user": os.environ.get("IOT_DB_USER", "sa"),
    "password": os.environ.get("IOT_DB_PASSWORD", "Salma_SQL@2005DEPI"),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}

def write_to_sql(batch_df, batch_id):
    """Append one micro-batch of enriched readings to the ``iot_data`` table.

    Intended as a ``foreachBatch`` callback.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; not used by this writer.
    """
    appender = batch_df.write.mode("append")
    appender.jdbc(jdbc_url, "iot_data", properties=connection_properties)

def write_alerts_to_sql(batch_df, batch_id):
    """Append one micro-batch of alert rows to the ``iot_alerts`` table.

    Intended as a ``foreachBatch`` callback.

    Args:
        batch_df: DataFrame holding the alert rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; not used by this writer.
    """
    appender = batch_df.write.mode("append")
    appender.jdbc(jdbc_url, "iot_alerts", properties=connection_properties)

# Forget terminations recorded earlier in this session (e.g. from a previous
# run of this cell) so that awaitAnyTermination() below does not return
# immediately because of a stale query.
spark.streams.resetTerminated()

# NOTE(review): neither query sets a checkpointLocation option. Confirm
# whether consumed offsets must survive a restart and, if so, give each query
# its own checkpoint directory.

# Sink 1: every enriched reading is appended to SQL Server.
query_data = (
    df.writeStream
    .foreachBatch(write_to_sql)
    .outputMode("append")
    .start()
)

# Sink 2: alert rows are appended to SQL Server.
query_alerts = (
    alerts_df.writeStream
    .foreachBatch(write_alerts_to_sql)
    .outputMode("append")
    .start()
)

# Waiting on the two queries one after the other would block on the first one
# indefinitely and hide a failure of the second. Instead, wake up as soon as
# any query in the session terminates (re-raising its exception if it failed),
# and make sure neither of the two queries is left running in the background.
try:
    spark.streams.awaitAnyTermination()
finally:
    for running_query in (query_data, query_alerts):
        if running_query.isActive:
            running_query.stop()


Spark Session started


StreamingQueryException: [STREAM_FAILED] Query [id = 0d123d9e-2d15-4c56-a549-ed1d2907ad88, runId = f5366fd7-49d7-487d-85f1-a142cf9b9ee0] terminated with exception: Failed to create new KafkaAdminClient