In [None]:
# This notebook is the streaming version of the pipeline: it reads JSON files
# as a structured stream and appends them to a Delta table.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, TimestampType
from pathlib import Path
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable

# Maven coordinates of the Delta connector requested for the session.
delta_package = "io.delta:delta-spark_2.13:4.0.0"

# Assemble the session builder one setting at a time: application name first,
# then the Delta jar, the Delta SQL extension and the Delta-backed catalog.
# NOTE(review): configure_spark_with_delta_pip presumably sets
# "spark.jars.packages" on its own as well - confirm whether the explicit
# package setting below is still needed.
builder = SparkSession.builder.appName("SmartTech_Streaming")
builder = builder.config("spark.jars.packages", delta_package)
builder = builder.config(
    "spark.sql.extensions",
    "io.delta.sql.DeltaSparkSessionExtension",
)
builder = builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

# Hand the builder to the Delta helper, then create (or reuse) the session.
delta_ready_builder = configure_spark_with_delta_pip(builder)
spark = delta_ready_builder.getOrCreate()

# Quick confirmation that the session is up.
print(f"Spark version : {spark.version}")
print("Connecteur Delta chargé avec succès.")


Spark version : 4.0.1
Connecteur Delta chargé avec succès.


In [None]:
# Explicit schema for the sensor JSON stream. Every field is nullable, and the
# extra '_corrupt_record' string column is declared so that the reader has a
# place to put records it could not parse.
schema_sensor = (
    StructType()
    .add("timestamp", TimestampType(), True)
    .add("device_id", StringType(), True)
    .add("building", StringType(), True)
    .add("floor", IntegerType(), True)
    .add("type", StringType(), True)
    .add("value", FloatType(), True)
    .add("unit", StringType(), True)
    .add("_corrupt_record", StringType(), True)
)

In [None]:
# Locations used by the pipeline, all resolved against the parent of the
# current working directory and kept as Path objects.
INPUT_PATH = Path.cwd().parent.joinpath("data", "sensor_data")            # incoming JSON files
BRONZE_PATH = Path.cwd().parent.joinpath("data", "out", "delta_bronze")   # Delta table storage
CHECKPOINT_PATH = Path.cwd().parent.joinpath("data", "out", "checkpoint") # streaming checkpoint


In [None]:
# Declare the streaming source (lazy: nothing is read until a query starts).
# JSON files under INPUT_PATH are parsed with the explicit sensor schema,
# with maxFilesPerTrigger set to 1 and '_corrupt_record' named as the
# corrupt-record column.
input_location = str(INPUT_PATH)
df_stream = (
    spark.readStream
    .format("json")
    .schema(schema_sensor)
    .options(
        maxFilesPerTrigger=1,
        columnNameOfCorruptRecord="_corrupt_record",
    )
    .load(input_location)
)

In [None]:
# Keep only the rows in which every sensor field is populated (lazy: this only
# extends the streaming plan). '_corrupt_record' is not part of the check.
required_fields = ("timestamp", "device_id", "building", "floor", "type", "value", "unit")

# Chain the per-column checks with AND, in the order the fields are listed.
has_all_fields = col(required_fields[0]).isNotNull()
for field_name in required_fields[1:]:
    has_all_fields = has_all_fields & col(field_name).isNotNull()

df_stream_clean = df_stream.filter(has_all_fields)



In [None]:
# Configure the sink: append the cleaned stream to a Delta table stored at
# BRONZE_PATH, tracking progress in CHECKPOINT_PATH and triggering a
# micro-batch every second.
bronze_writer = df_stream_clean.writeStream.format("delta").outputMode("append")
bronze_writer = bronze_writer.option("checkpointLocation", str(CHECKPOINT_PATH))
bronze_writer = bronze_writer.option("path", str(BRONZE_PATH))
bronze_writer = bronze_writer.trigger(processingTime='1 seconds')

# Start the streaming query, writing to the table 'bronze_sensor_data'; the
# returned handle is kept in `query`.
query = bronze_writer.toTable("bronze_sensor_data")


25/12/16 11:18:15 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


25/12/16 11:18:16 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/12/16 11:18:20 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000} milliseconds, but spent 4367 milliseconds
25/12/16 11:18:21 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000} milliseconds, but spent 1089 milliseconds
25/12/16 11:18:22 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000} milliseconds, but spent 1046 milliseconds
25/12/16 11:18:29 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000} milliseconds, but spent 1537 milliseconds
25/12/16 11:18:30 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000} milliseconds, but spent 1457 milliseconds
25/12/16 11:18:32 WARN ProcessingTimeExecutor: Curr

In [None]:
# Open a DeltaTable handle on the bronze table through its storage location.
bronze_location = str(BRONZE_PATH)
bronze_delta = DeltaTable.forPath(spark, bronze_location)


In [None]:
# Load the bronze Delta table from its path and print its commit history:
# up to 50 entries, untruncated, limited to the columns listed below.
delta_table = DeltaTable.forPath(spark, str(BRONZE_PATH))
history_columns = ["version", "timestamp", "operation", "operationMetrics"]
bronze_history = delta_table.history().select(*history_columns)
bronze_history.show(50, truncate=False)

+-------+-----------------------+----------------+--------------------------------------------------------------------------------------+
|version|timestamp              |operation       |operationMetrics                                                                      |
+-------+-----------------------+----------------+--------------------------------------------------------------------------------------+
|100    |2025-12-16 11:19:58.702|STREAMING UPDATE|{numRemovedFiles -> 0, numAddedFiles -> 1, numOutputRows -> 5, numOutputBytes -> 2591}|
|99     |2025-12-16 11:19:57.854|STREAMING UPDATE|{numRemovedFiles -> 0, numAddedFiles -> 1, numOutputRows -> 5, numOutputBytes -> 2550}|
|98     |2025-12-16 11:19:56.796|STREAMING UPDATE|{numRemovedFiles -> 0, numAddedFiles -> 1, numOutputRows -> 5, numOutputBytes -> 2590}|
|97     |2025-12-16 11:19:55.678|STREAMING UPDATE|{numRemovedFiles -> 0, numAddedFiles -> 1, numOutputRows -> 5, numOutputBytes -> 2548}|
|96     |2025-12-16 11:19:54.648|S

In [17]:
# Preview at most 20 rows of the bronze table, queried by its catalog name.
bronze_preview = spark.sql("SELECT *  FROM bronze_sensor_data LIMIT 20")
bronze_preview.show()

+-------------------+-----------------+--------+-----+------------------+------+----+---------------+
|          timestamp|        device_id|building|floor|              type| value|unit|_corrupt_record|
+-------------------+-----------------+--------+-----+------------------+------+----+---------------+
|2025-01-12 09:37:39|  sensor-temp-003|       A|    1|       temperature|  27.6|  °C|           NULL|
|2025-01-12 09:37:43|   sensor-co2-020|       A|    3|               co2| 758.0| ppm|           NULL|
|2025-01-12 09:37:47|sensor-energy-010|       B|    3|energy_consumption| 171.6| kWh|           NULL|
|2025-01-12 09:37:50|  sensor-temp-001|       B|    2|       temperature|  27.7|  °C|           NULL|
|2025-01-12 09:37:53|   sensor-hum-002|       A|    1|          humidity|  30.9|   %|           NULL|
|2025-01-12 09:37:24|   sensor-co2-020|       A|    1|               co2| 949.0| ppm|           NULL|
|2025-01-12 09:37:27|   sensor-hum-002|       A|    1|          humidity|  53.6|  