In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, avg, max, min
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Initialize Spark
spark = SparkSession.builder \
    .appName("IoT_Gold_Layer") \
    .config("spark.sql.streaming.checkpointLocation", "/home/jovyan/lakehouse/checkpoints/gold") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Define Schema for Silver Data (We know it from the previous step)
silver_schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("TP2", DoubleType(), True),
    StructField("TP3", DoubleType(), True),
    StructField("H1", StringType(), True),
    StructField("DV_pressure", StringType(), True),
    StructField("Reservoirs", StringType(), True),
    StructField("Oil_temperature", DoubleType(), True),
    StructField("Motor_current", DoubleType(), True),
    StructField("kafka_arrival_time", StringType(), True)
])

# Read from Silver Layer
silver_df = spark.readStream \
    .schema(silver_schema) \
    .parquet("/home/jovyan/lakehouse/silver")

print("âœ… Connected to Silver Layer")

âœ… Connected to Silver Layer


Feature Engineering

In [2]:
# 2. Define Schema for Silver Data
# We must match the schema of the files written in the Silver step
silver_schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("TP2", DoubleType(), True),
    StructField("TP3", DoubleType(), True),
    StructField("H1", StringType(), True),
    StructField("DV_pressure", StringType(), True),
    StructField("Reservoirs", StringType(), True),
    StructField("Oil_temperature", DoubleType(), True),
    StructField("Motor_current", DoubleType(), True),
    StructField("kafka_arrival_time", StringType(), True)
])

# 3. Read Stream from Silver Layer
silver_df = spark.readStream \
    .schema(silver_schema) \
    .parquet("/home/jovyan/lakehouse/silver")

# 4. Feature Engineering (Windowed Aggregation)
# We calculate KPIs per minute
gold_df = silver_df \
    .withWatermark("timestamp", "2 minutes") \
    .groupBy(window(col("timestamp"), "1 minute", "30 seconds")) \
    .agg(
        avg("TP2").alias("Avg_Pressure_TP2"),
        avg("Oil_temperature").alias("Avg_Oil_Temp"),
        max("Motor_current").alias("Max_Motor_Current"),
        min("TP3").alias("Min_Pressure_TP3")
    ) \
    .select(
        col("window.start").alias("Window_Start"),
        col("window.end").alias("Window_End"),
        col("Avg_Pressure_TP2"),
        col("Avg_Oil_Temp"),
        col("Max_Motor_Current")
    )

print("âœ… Logic Applied with Watermark")
gold_df.printSchema()

âœ… Logic Applied with Watermark
root
 |-- Window_Start: timestamp (nullable = true)
 |-- Window_End: timestamp (nullable = true)
 |-- Avg_Pressure_TP2: double (nullable = true)
 |-- Avg_Oil_Temp: double (nullable = true)
 |-- Max_Motor_Current: double (nullable = true)



Write to Gold Layer

In [None]:
# 5. Write Stream to Gold Layer (Parquet)
query = gold_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", "/home/jovyan/lakehouse/gold") \
    .option("checkpointLocation", "/home/jovyan/lakehouse/checkpoints/gold") \
    .option("truncate", "false") \
    .trigger(processingTime='10 seconds') \
    .start()

print(f"ðŸš€ Streaming to Gold Layer started... RunId: {query.runId}")
query.awaitTermination()

ðŸš€ Streaming to Gold Layer started... RunId: 40b60d8d-fd7a-4621-85cd-6693cd1eac5d
