In [0]:
%spark

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame

// ---------- VERSIONING CONFIG ----------

// Bump runId (e.g. "v1", "v2", "exp1", "final") for a "fresh" run. The Parquet output folder and
// the streaming checkpoint folder are both derived from it, so a new id gives the query an empty
// checkpoint instead of resuming the previous one.
val runId = "v2"

val outputPath = s"/home/devpandya/parquet_clean_$runId"
val ckPath     = s"/home/devpandya/parquet_clean_ck_$runId"

Seq(
  s"Using output path: $outputPath",
  s"Using checkpoint : $ckPath"
).foreach(println)

// ---------- 1. Ingestion from Kafka ----------

val kafkaBootstrap = "localhost:9092"
val topic = "electricity_topic"

// Kafka source; the message payload arrives in the binary `value` column. "latest" only applies
// when the checkpoint at ckPath is empty. A restarted query resumes from its checkpointed offsets.
val rawKafka = spark.readStream
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> kafkaBootstrap,
    "subscribe"               -> topic,
    "startingOffsets"         -> "latest"
  ))
  .load()

// Shape of the JSON payload. Every field is declared as a string; the numeric and flag fields
// are cast to proper types in `typed` below. Unless ANSI mode is on, values that cannot be cast
// become null.
val jsonSchema = StructType(
  Seq(
    "State_Code",
    "Timestamp_UTC",
    "Gross_Load_MW",
    "Hour_Of_Day",
    "Day_Of_Week",
    "Is_Weekend",
    "Is_Holiday_State",
    "Avg_Temp_C",
    "Temp_Change_6H",
    "Avg_Humidity_Pct"
  ).map(fieldName => StructField(fieldName, StringType))
)

// Decode the Kafka value bytes as a JSON string and flatten the parsed struct into columns.
val rawParsed = rawKafka
  .select(from_json(col("value").cast(StringType), jsonSchema).as("data"))
  .select("data.*")

// Cast each string field to its target type. withColumn replaces an existing column in place,
// so rawParsed's column order is kept.
val typed = Seq[(String, DataType)](
  "Gross_Load_MW"    -> DoubleType,
  "Hour_Of_Day"      -> IntegerType,
  "Day_Of_Week"      -> IntegerType,
  "Is_Weekend"       -> IntegerType,
  "Is_Holiday_State" -> IntegerType,
  "Avg_Temp_C"       -> DoubleType,
  "Temp_Change_6H"   -> DoubleType,
  "Avg_Humidity_Pct" -> DoubleType
).foldLeft(rawParsed) { case (acc, (name, targetType)) =>
  acc.withColumn(name, col(name).cast(targetType))
}

// ---------- 1.5 PREPROCESSING / DATA QUALITY (ROW-LEVEL ONLY) ----------

// Plausible ranges (domain knowledge) used below to clip values and raise anomaly flags.
val minLoad = 0.0
val maxLoad = 1e6
val minTemp = -30.0
val maxTemp = 55.0
val minHumidity = 0.0
val maxHumidity = 100.0

// A string such as "NaN" casts to Double.NaN rather than null. Normalise NaN to null so that
// downstream checks see a single "missing" value. nanvl substitutes its second argument only
// for NaN; a null input stays null.
val cleanedNulls = Seq("Gross_Load_MW", "Avg_Temp_C", "Temp_Change_6H", "Avg_Humidity_Pct")
  .foldLeft(typed) { (acc, name) =>
    acc.withColumn(name, nanvl(col(name), lit(null).cast(DoubleType)))
  }

// Remove obviously invalid categorical / timestamp values: rows missing their key fields or with
// out-of-range calendar fields. `between` is inclusive. A null (failed-cast) value makes the
// predicate null, which also drops the row.
// NOTE(review): Day_Of_Week is assumed to be 0-based (0..6). Confirm against the producer; a
// 1..7 encoding would silently drop day 7.
val filteredInvalid = cleanedNulls
  .where(col("State_Code").isNotNull && col("Timestamp_UTC").isNotNull)
  .where(col("Hour_Of_Day").between(0, 23))
  .where(col("Day_Of_Week").between(0, 6))

/** Clamp column `name` into [lo, hi]; a null value falls through to `otherwise` and stays null. */
def clampToRange(name: String, lo: Double, hi: Double) = {
  val c = col(name)
  when(c < lo, lo).when(c > hi, hi).otherwise(c)
}

/** Predicate: column `name` lies strictly outside [lo, hi] (null when the value is null). */
def outsideRange(name: String, lo: Double, hi: Double) =
  col(name) < lo || col(name) > hi

// Clip load into [minLoad, maxLoad]. The flag is 1 when the raw value was out of range, else 0
// (a null load yields 0).
val clippedLoad = filteredInvalid
  .withColumn("Gross_Load_MW_clipped", clampToRange("Gross_Load_MW", minLoad, maxLoad))
  .withColumn(
    "Load_Anomaly_Flag",
    when(outsideRange("Gross_Load_MW", minLoad, maxLoad), lit(1)).otherwise(lit(0))
  )

// Same treatment for temperature and humidity, with one combined weather anomaly flag.
val clippedWeather = clippedLoad
  .withColumn("Avg_Temp_C_clipped", clampToRange("Avg_Temp_C", minTemp, maxTemp))
  .withColumn(
    "Avg_Humidity_Pct_clipped",
    clampToRange("Avg_Humidity_Pct", minHumidity, maxHumidity)
  )
  .withColumn(
    "Weather_Anomaly_Flag",
    when(
      outsideRange("Avg_Temp_C", minTemp, maxTemp) ||
        outsideRange("Avg_Humidity_Pct", minHumidity, maxHumidity),
      lit(1)
    ).otherwise(lit(0))
  )

// Simple per-row "imputed" values (no aggregation inside the stream): the imputed columns are
// copies of the clipped ones. This block also derives the event time and the date/hour
// partition keys used by the Parquet sink.
//
// NOTE(review): to_timestamp interprets Timestamp_UTC in spark.sql.session.timeZone. Confirm
// that zone is UTC; otherwise event_time is shifted, and if the zone observes DST, transition
// hours can land in the wrong hour partition.
val cleanDf = clippedWeather
  .withColumn("Gross_Load_MW_imputed", col("Gross_Load_MW_clipped"))
  .withColumn("Avg_Temp_C_imputed", col("Avg_Temp_C_clipped"))
  .withColumn("Avg_Humidity_Pct_imputed", col("Avg_Humidity_Pct_clipped"))
  .withColumn("event_time", to_timestamp(col("Timestamp_UTC"), "yyyy-MM-dd HH:mm:ss"))
  // A Timestamp_UTC that does not match the pattern yields a null event_time. Such rows would be
  // written to the __HIVE_DEFAULT_PARTITION__ date/hour folder. In a per-state window ordered by
  // event_time they also sort ahead of every real row and shift the lag features, so they are
  // treated as invalid timestamps and dropped.
  .filter(col("event_time").isNotNull)
  .withColumn("date", date_format(col("event_time"), "yyyy-MM-dd"))
  .withColumn("hour", date_format(col("event_time"), "HH"))

// optional: drop intermediate clipped columns if you don't need them
//  .drop("Gross_Load_MW_clipped", "Avg_Temp_C_clipped", "Avg_Humidity_Pct_clipped")

cleanDf.printSchema()

// ---------- 2. Storage: write clean data to versioned Parquet (append) ----------

// Append-only file sink partitioned by date/hour. Query progress is tracked under ckPath, so
// restarting with the same runId resumes the existing query state.
val parquetSink = cleanDf.writeStream
  .outputMode("append")
  .format("parquet")
  .partitionBy("date", "hour")
  .option("checkpointLocation", ckPath)
  .start(outputPath)

parquetSink

In [1]:
%spark.pyspark

import sys
import os
import re
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType



run_id = "v2"  # manual override, if needed

# Versioned folders: the input is the streaming job's clean Parquet; the output receives predictions.
input_path, output_path = (
    f"/home/devpandya/{stem}_{run_id}"
    for stem in ("parquet_clean", "parquet_with_predictions")
)

print("Reading clean parquet from:", input_path)

# -------------------------------------------------------------------
# 2. Import model helper and read data
# -------------------------------------------------------------------

# Make the project's helper module importable before loading the model wrapper.
sys.path.append("/home/devpandya/bigdata-stack")
from model_inference_helper import ElectricityLoadPredictor

df = spark.read.parquet(input_path)

row_count = df.count()
print("Rows in clean parquet:", row_count)
df.show(5, truncate=False)

base_col = "Gross_Load_MW_imputed"

# -------------------------------------------------------------------
# 3. Create lag and rolling features
# -------------------------------------------------------------------

# Per-state time ordering.
# NOTE(review): lags and rolling windows count ROWS. "24h"/"168h" equal 24/168 hours only if each
# state has exactly one row per hour with no gaps or duplicates; confirm against the data.
w = Window.partitionBy("State_Code").orderBy("event_time")
load = F.col(base_col)

df_feat = df
# Load_lag_{n}h: the value n rows earlier in the same state (null if there is no such row).
for n in (1, 2, 3, 24, 168):
    df_feat = df_feat.withColumn(f"Load_lag_{n}h", F.lag(load, n).over(w))
# Load_roll_mean_{n}h: mean of the non-null values in the n preceding rows, excluding the
# current row.
for n in (3, 24, 168):
    df_feat = df_feat.withColumn(
        f"Load_roll_mean_{n}h", F.avg(load).over(w.rowsBetween(-n, -1))
    )

# Require the 1h/2h/3h/24h lags and the 3h/24h rolling means; 168h features are back-filled below.
required_cols_strict = [
    "Load_lag_1h", "Load_lag_2h", "Load_lag_3h",
    "Load_lag_24h",
    "Load_roll_mean_3h", "Load_roll_mean_24h",
]

cond = F.lit(True)
for c in required_cols_strict:
    cond &= F.col(c).isNotNull()

df_feat = df_feat.filter(cond)

# Fall back to the matching 24h feature when a 168h feature is null (e.g. a state's first 168
# rows have no 168h lag).
df_feat = (
    df_feat
    .withColumn("Load_lag_168h", F.coalesce(F.col("Load_lag_168h"), F.col("Load_lag_24h")))
    .withColumn(
        "Load_roll_mean_168h",
        F.coalesce(F.col("Load_roll_mean_168h"), F.col("Load_roll_mean_24h")),
    )
)


print("Rows after feature engineering:", df_feat.count())
preview_cols = [
    "State_Code", "Timestamp_UTC", base_col,
    "Load_lag_1h", "Load_lag_2h", "Load_lag_3h",
    "Load_lag_24h", "Load_lag_168h",
    "Load_roll_mean_3h", "Load_roll_mean_24h", "Load_roll_mean_168h",
]
df_feat.select(*preview_cols).show(5, truncate=False)

predictor = ElectricityLoadPredictor()
# NOTE(review): `predictor` is referenced inside predict_udf, so it is serialized with the UDF and
# shipped to executors. This assumes ElectricityLoadPredictor is picklable; confirm.

# -------------------------------------------------------------------
# 4. pandas UDF building feature matrix as in training
# -------------------------------------------------------------------

# Model feature name -> df_feat column it is read from, in the column order of the feature
# frame (presumably the training order). The temperature and humidity inputs come from the
# "_imputed" columns.
# NOTE(review): Temp_Change_6H is not imputed upstream, so nulls reach the model as NaN. Confirm
# that predict_batch tolerates this.
FEATURE_SOURCE_COLS = {
    "State_Code": "State_Code",
    "Hour_Of_Day": "Hour_Of_Day",
    "Day_Of_Week": "Day_Of_Week",
    "Is_Weekend": "Is_Weekend",
    "Is_Holiday_State": "Is_Holiday_State",
    "Avg_Temp_C": "Avg_Temp_C_imputed",
    "Temp_Change_6H": "Temp_Change_6H",
    "Avg_Humidity_Pct": "Avg_Humidity_Pct_imputed",
    "Load_lag_1h": "Load_lag_1h",
    "Load_lag_2h": "Load_lag_2h",
    "Load_lag_3h": "Load_lag_3h",
    "Load_lag_24h": "Load_lag_24h",
    "Load_lag_168h": "Load_lag_168h",
    "Load_roll_mean_3h": "Load_roll_mean_3h",
    "Load_roll_mean_24h": "Load_roll_mean_24h",
    "Load_roll_mean_168h": "Load_roll_mean_168h",
}


@pandas_udf(DoubleType())
def predict_udf(pdf: pd.DataFrame) -> pd.Series:
    """Predict load for one batch of rows.

    ``pdf`` holds the struct column's fields as pandas columns and must contain every source
    column listed in FEATURE_SOURCE_COLS. The rows are renamed to the model's feature names,
    passed to ``predictor.predict_batch`` as a list of per-row dicts, and the predictions are
    returned as a Series (one value per input row).
    """
    X = pd.DataFrame(
        {feature: pdf[source] for feature, source in FEATURE_SOURCE_COLS.items()}
    )
    preds = predictor.predict_batch(X.to_dict(orient="records"))
    return pd.Series(preds)


# Only the columns the UDF reads are packed into the struct. Timestamps, partition keys and other
# unused columns of df_feat are therefore not serialized through Arrow for every batch.
df_pred = df_feat.withColumn(
    "Predicted_Load_MW",
    predict_udf(F.struct([F.col(c) for c in FEATURE_SOURCE_COLS.values()])),
)

# df_pred is not cached, so show() and the write below evaluate the plan (including
# predict_udf) independently.
df_pred.show(10, truncate=False)

print("Writing predictions to:", output_path)
# NOTE(review): with the default (static) partition overwrite mode this replaces the whole
# output folder, not just the partitions present in df_pred.
df_pred.write.partitionBy("date", "hour").mode("overwrite").parquet(output_path)

print("Done.")

In [2]:
%spark

// runId selects which versioned predictions folder is read. Keep it equal to the value used by
// the streaming job and the prediction paragraph.
val runId    = "v2"
val predPath = s"/home/devpandya/parquet_with_predictions_$runId"

val predDf = spark.read.parquet(predPath)

println(s"Rows with predictions: ${predDf.count()}")
predDf.show(20, truncate = false)

In [3]:
%spark

import org.apache.spark.sql.functions._

// Mean actual vs. predicted load per state and date/hour partition, ordered by state, then
// date, then hour.
// NOTE(review): "actual" is the raw Gross_Load_MW, not the clipped/imputed value the features
// were built from; confirm this is the intended comparison.
val tsAgg = predDf
  .groupBy(col("State_Code"), col("date"), col("hour"))
  .agg(
    avg(col("Gross_Load_MW")).as("Avg_Actual_Load"),
    avg(col("Predicted_Load_MW")).as("Avg_Predicted_Load")
  )
  .orderBy(col("State_Code"), col("date"), col("hour"))

tsAgg.show(100, truncate = false)

In [4]:
%spark

import org.apache.spark.sql.functions._

// Per-row errors: the absolute error, and the absolute percentage error as a fraction of the
// actual load. ape is left null when the actual load is 0 or null, which avoids dividing by zero.
val withError = predDf
  .withColumn("abs_error", abs(col("Gross_Load_MW") - col("Predicted_Load_MW")))
  .withColumn(
    "ape",
    when(col("Gross_Load_MW") =!= 0.0, col("abs_error") / abs(col("Gross_Load_MW")))
  )

// MAE and MAPE per state. avg skips nulls, so rows without an ape do not count toward MAPE.
// MAPE is reported as a fraction (0.05 = 5%) and is not multiplied by 100.
val errorByState = withError
  .groupBy(col("State_Code"))
  .agg(
    avg(col("abs_error")).as("MAE"),
    avg(col("ape")).as("MAPE")
  )
  .orderBy(col("State_Code"))

errorByState.show(truncate = false)

In [5]:
%spark

// Flat CSV extract for Power BI, written as a single part file (coalesce(1)) with a header row.
// NOTE(review): unlike the Parquet folders, this path is not versioned by runId, so each run
// overwrites the previous export.
val exportDf = predDf.select(
  "State_Code", "Timestamp_UTC", "Gross_Load_MW", "Predicted_Load_MW",
  "Avg_Temp_C", "Avg_Humidity_Pct", "Is_Weekend"
)

val exportPath = "/home/devpandya/powerbi_export_csv"

exportDf
  .coalesce(1)
  .write
  .options(Map("header" -> "true"))
  .mode("overwrite")
  .csv(exportPath)

println(s"Exported for Power BI to: $exportPath")

In [6]:
%spark

// Export target on the Windows host (presumably the C: drive mounted into WSL). The folder is
// labelled with the same `runId` predDf was loaded with in the prediction-reading paragraph, so
// the label always matches the exported data. runId already includes its "v" prefix (e.g. "v2"),
// so it is interpolated as-is with no extra "v".
val exportPath = s"/mnt/c/Users/DEV/Desktop/Minor Project/Project/powerbi_export_$runId"

// Select the columns you want for analysis / Power BI
val exportDf = predDf.select(
  "State_Code",
  "Timestamp_UTC",
  "Gross_Load_MW",
  "Predicted_Load_MW",
  "Avg_Temp_C",
  "Avg_Humidity_Pct",
  "Is_Weekend"
)

// Write as a single CSV file (inside a folder) with header
exportDf
  .coalesce(1)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv(exportPath)

println(s"Exported for Power BI to Windows path mounted at: $exportPath")


In [7]:
%spark
