# USE CASE 5 - ESG REPORTING

## 1.Imports & Config

In [0]:
# Run at top of notebook
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# For anomaly detection (small sampled Pandas)
import pandas as pd
from sklearn.ensemble import IsolationForest

# Paths
catalog_table = "data.energy_volume.energy_power_data"   # your table
report_local_path = "/databricks/driver/esg_report_by_location.csv"

## 2. Load data & basic cleaning

In [0]:
# Load from Unity Catalog
df = spark.table(catalog_table)

# Ensure timestamp type and select core columns
df = (df
      .withColumn("timestamp", F.to_timestamp("timestamp"))
      .select("timestamp", "device_id", "device_type", "location", F.col("energy_kWh").alias("energy_kwh"), F.col("carbon_kg").alias("carbon_kg"))
     )

df = df.na.drop(subset=["timestamp","energy_kwh"])  # drop totally bad rows
df.printSchema()
display(df.limit(5))

root
 |-- timestamp: timestamp (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- location: string (nullable = true)
 |-- energy_kwh: string (nullable = true)
 |-- carbon_kg: string (nullable = true)



timestamp,device_id,device_type,location,energy_kwh,carbon_kg
2025-10-12T08:33:55.663Z,device_001,Smartphone,Home,0.044,0.026
2025-10-12T08:33:55.663Z,device_002,Fan,Office,0.081,0.032
2025-10-12T08:33:55.663Z,device_003,Laptop,Office,0.106,0.042
2025-10-12T08:33:55.663Z,device_004,Fan,Home,0.103,0.062
2025-10-12T08:33:55.663Z,device_005,AC,Office,1.457,0.583


## 3. KPI aggregation (hourly + daily + location)

In [0]:
# Hourly aggregate: total energy & avg carbon
hourly = (df
          .withColumn("hour_ts", F.date_trunc("hour","timestamp"))
          .groupBy("hour_ts","location")
          .agg(F.sum("energy_kwh").alias("hour_energy_kwh"),
               F.avg("carbon_kg").alias("hour_avg_carbon_kg"))
         )

# Daily aggregate per location (features for ESG)
daily = (hourly
         .withColumn("date", F.to_date("hour_ts"))
         .groupBy("date","location")
         .agg(F.sum("hour_energy_kwh").alias("daily_energy_kwh"),
              F.avg("hour_avg_carbon_kg").alias("daily_avg_carbon_kg"),
              F.max("hour_energy_kwh").alias("daily_peak_kwh"),
              F.stddev("hour_energy_kwh").alias("daily_std_kwh"))
        )

# Location-level aggregation over whole dataset (can be used to score)
location_kpis = (daily.groupBy("location")
                 .agg(
                     F.avg("daily_energy_kwh").alias("avg_daily_energy_kwh"),
                     F.avg("daily_avg_carbon_kg").alias("avg_daily_carbon_kg"),
                     F.avg("daily_peak_kwh").alias("avg_daily_peak_kwh"),
                     F.avg("daily_std_kwh").alias("avg_daily_std_kwh"),
                     F.count("*").alias("days_of_data")
                 )
                )

display(location_kpis)

location,avg_daily_energy_kwh,avg_daily_carbon_kg,avg_daily_peak_kwh,avg_daily_std_kwh,days_of_data
Home,38.8751935483871,0.1672088933691755,2.3808064516129037,0.283898266476068,31
Office,103.0244193548387,0.2535784370199692,6.279645161290323,0.5951447700562937,31
Factory,120.08335483870968,0.5189619815668203,7.643419354838709,0.7531987063199371,31


##4. Feature engineering for modeling(per-hour forecasting)

In [0]:
# ============================================
# FORECASTING (FREE TIER SAFE)
# ============================================

import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# 1) Feature Engineering
model_df = (
    hourly
    .withColumn("hour", F.hour("hour_ts"))
    .withColumn("day", F.dayofmonth("hour_ts"))
    .withColumn("dow", F.dayofweek("hour_ts"))
)

# 2) Downsample dataset to keep model size small
model_df_small = model_df.sample(fraction=0.01, seed=42)
print(f"Using {model_df_small.count()} records for training (1% sample).")

# 3) Assemble features
assembler = VectorAssembler(
    inputCols=["hour", "day", "dow"],
    outputCol="features"
)

data = assembler.transform(model_df_small).select("features", "hour_energy_kwh")

# 4) Train-test split
train, test = data.randomSplit([0.8, 0.2], seed=42)

# 5) Very compact Linear Regression (no summary)
lr = LinearRegression(
    featuresCol="features",
    labelCol="hour_energy_kwh",
    maxIter=5,
    regParam=0.1,
    elasticNetParam=0.5
)

model = lr.fit(train)   # Model now below 10MB ✅

# 6) Evaluate
pred = model.transform(test)

rmse = RegressionEvaluator(
    labelCol="hour_energy_kwh", predictionCol="prediction", metricName="rmse"
).evaluate(pred)

r2 = RegressionEvaluator(
    labelCol="hour_energy_kwh", predictionCol="prediction", metricName="r2"
).evaluate(pred)

print(f"✅ Forecast Model Trained Successfully")
print(f"RMSE: {rmse:.3f}, R2: {r2:.3f}")

display(
    pred.select("features", "hour_energy_kwh", "prediction").limit(50)
)


# 7) Save compact model to Unity Catalog volume
model.write().overwrite().save(
    "dbfs:/Volumes/data/energy_volume/energy_power_data/energy_forecast_lr_compact"
)
print("📦 Model saved to: dbfs:/Volumes/<catalog>/<schema>/<volume>/energy_forecast_lr_compact")

Using 21 records for training (1% sample).
✅ Forecast Model Trained Successfully
RMSE: 1.979, R2: -0.635


features,hour_energy_kwh,prediction
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.0"",""31.0"",""6.0""]}",3.837,3.787417972611084
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""4.0"",""24.0"",""6.0""]}",1.649,3.71237663106874
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""7.0"",""23.0"",""5.0""]}",2.0610000000000004,3.673898990081166
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""16.0"",""25.0"",""7.0""]}",5.745,2.552654195437764
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""22.0"",""20.0"",""2.0""]}",4.667000000000001,3.0791860364710217


📦 Model saved to: dbfs:/Volumes/<catalog>/<schema>/<volume>/energy_forecast_lr_compact


## 4. Feature importance(explainability)

In [0]:
# ============================================
# FEATURE IMPORTANCE FOR LINEAR REGRESSION MODEL
# ============================================

# Feature importance = absolute value of coefficients
coefficients = model.coefficients.toArray()
feature_names = ["hour", "day", "dow"]

print("Feature importance (|weights|):")
for name, coef in zip(feature_names, coefficients):
    print(f"{name}: {abs(coef):.4f}")

Feature importance (|weights|):
hour: 0.0799
day: 0.0349
dow: 0.1662


##5. Anomaly detection (device / location level)

In [0]:
# Sample aggregated hourly per location to Pandas (limit rows)
sample_pd = (hourly.select("hour_ts","location","hour_energy_kwh")
             .orderBy("hour_ts")
             .limit(10000)   # tune to fit memory
             .toPandas())

# Feature for anomaly detection: energy + hour of day
sample_pd["hour"] = pd.to_datetime(sample_pd["hour_ts"]).dt.hour
X = sample_pd[["hour_energy_kwh","hour"]].values

iso = IsolationForest(contamination=0.01, random_state=42)
sample_pd["anomaly_score"] = iso.fit_predict(X)  # -1 = anomaly, 1 = normal
anomalies_pd = sample_pd[sample_pd["anomaly_score"] == -1]

# Convert anomalies back to Spark if desired
anomalies_spark = spark.createDataFrame(anomalies_pd)
display(anomalies_spark.limit(20))

hour_ts,location,hour_energy_kwh,hour,anomaly_score
2025-10-12T10:00:00.000Z,Office,9.425,10,-1
2025-10-17T04:00:00.000Z,Factory,8.302999999999999,4,-1
2025-10-23T19:00:00.000Z,Office,7.830999999999999,19,-1
2025-10-23T21:00:00.000Z,Factory,10.091,21,-1
2025-10-25T08:00:00.000Z,Factory,8.527,8,-1
2025-10-28T01:00:00.000Z,Office,10.516,1,-1
2025-10-28T02:00:00.000Z,Factory,13.525000000000002,2,-1
2025-10-28T17:00:00.000Z,Factory,8.414,17,-1
2025-10-29T21:00:00.000Z,Office,8.815000000000001,21,-1
2025-10-30T09:00:00.000Z,Factory,10.449000000000002,9,-1


##6. Compute ESG score per location (interpretable weighted score)

In [0]:
# First compute anomaly counts per location (from anomalies_spark)
anomaly_counts = (anomalies_spark.groupBy("location")
                  .agg(F.count("*").alias("anomaly_count"))
                  .withColumnRenamed("location","loc")
                 )

# Prepare metrics table (join location_kpis + anomalies)
loc_metrics = location_kpis.alias("lk").join(
    anomaly_counts.alias("ac"),
    F.col("lk.location") == F.col("ac.loc"),
    how="left"
).select(
    F.col("lk.location"),
    "avg_daily_energy_kwh","avg_daily_carbon_kg","avg_daily_peak_kwh","avg_daily_std_kwh",
    F.coalesce(F.col("ac.anomaly_count"), F.lit(0)).alias("anomaly_count")
)

# Normalize columns to 0-1 (min-max)
# compute mins and maxs
stats = loc_metrics.select(
    F.min("avg_daily_energy_kwh").alias("min_e"),
    F.max("avg_daily_energy_kwh").alias("max_e"),
    F.min("avg_daily_carbon_kg").alias("min_c"),
    F.max("avg_daily_carbon_kg").alias("max_c"),
    F.min("avg_daily_peak_kwh").alias("min_p"),
    F.max("avg_daily_peak_kwh").alias("max_p"),
    F.min("anomaly_count").alias("min_a"),
    F.max("anomaly_count").alias("max_a")
).collect()[0]

min_e, max_e = stats["min_e"], stats["max_e"]
min_c, max_c = stats["min_c"], stats["max_c"]
min_p, max_p = stats["min_p"], stats["max_p"]
min_a, max_a = stats["min_a"], stats["max_a"]

# Avoid divide by zero by adding small epsilon
eps = 1e-9
scored = (loc_metrics
          .withColumn("s_energy", (F.col("avg_daily_energy_kwh") - F.lit(min_e)) / (F.lit(max_e - min_e + eps)))
          .withColumn("s_carbon", (F.col("avg_daily_carbon_kg") - F.lit(min_c)) / (F.lit(max_c - min_c + eps)))
          .withColumn("s_peak", (F.col("avg_daily_peak_kwh") - F.lit(min_p)) / (F.lit(max_p - min_p + eps)))
          .withColumn("s_anomaly", (F.col("anomaly_count") - F.lit(min_a)) / (F.lit(max_a - min_a + eps)))
         )

# Weighted ESG "risk" score (higher = worse) — you can invert to make "ESG rating"
w_energy, w_carbon, w_peak, w_anomaly = 0.35, 0.35, 0.2, 0.1
scored = scored.withColumn("esg_risk_score",
                           F.round(w_energy*F.col("s_energy") + w_carbon*F.col("s_carbon") + w_peak*F.col("s_peak") + w_anomaly*F.col("s_anomaly"), 4)
                          )

# Convert risk to rating 0-100 (lower risk -> higher rating)
scored = scored.withColumn("esg_rating", F.round((1 - F.col("esg_risk_score")) * 100, 2))

display(scored.select("location","avg_daily_energy_kwh","avg_daily_carbon_kg","anomaly_count","esg_risk_score","esg_rating"))

location,avg_daily_energy_kwh,avg_daily_carbon_kg,anomaly_count,esg_risk_score,esg_rating
Office,103.02441935483871,0.2535784370199693,7,0.556,44.4
Home,38.8751935483871,0.1672088933691756,2,0.0,100.0
Factory,120.08335483870968,0.5189619815668204,13,1.0,0.0


##7. Save ESG report (CSV) for submission or download

In [0]:
# Save CSV in Workspace Files (parallel to notebook)
pd_report = scored.select(
    "location",
    "avg_daily_energy_kwh",
    "avg_daily_carbon_kg",
    "anomaly_count",
    "esg_risk_score",
    "esg_rating"
).toPandas()

pd_report.to_csv(
    "esg_report.csv",
    index=False
)

display(pd_report)

location,avg_daily_energy_kwh,avg_daily_carbon_kg,anomaly_count,esg_risk_score,esg_rating
Office,103.02441935483871,0.2535784370199693,7,0.556,44.4
Home,38.8751935483871,0.1672088933691756,2,0.0,100.0
Factory,120.08335483870968,0.5189619815668204,13,1.0,0.0
