# Energy Data Analytics - Delta Lake Queries

This notebook demonstrates reading Delta format data and executing three core energy analytics queries using Spark SQL:

1. **Daily Production Trends** - Daily electricity production by production type
2. **Underperformance Prediction Features** - ML features for energy production forecasting
3. **Wind Price Analysis** - Wind power production vs electricity prices

## Data Sources
- Delta tables in the Gold layer: `gold_fact_power`, `gold_dim_production_type`, `gold_fact_power_30min_agg`
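
Each view name above corresponds to a Delta directory under the Gold layer: a `gold_<name>` view is loaded from `<base>/gold/<name>` later in this notebook. The following is a minimal sketch of that naming convention in plain Python. The helper name `gold_table_paths` and the example base path `/tmp/delta_lake` are illustrative assumptions, not part of the pipeline.

```python
# Hypothetical helper (not part of the pipeline): derive Delta table
# directories from the Gold-layer view names used in this notebook.
GOLD_VIEWS = [
    "gold_fact_power",
    "gold_dim_production_type",
    "gold_fact_power_30min_agg",
]


def gold_table_paths(base_path, views=GOLD_VIEWS):
    """Map each view name gold_<name> to the directory <base_path>/gold/<name>."""
    prefix = "gold_"
    root = base_path.rstrip("/")
    return {
        view: f"{root}/gold/{view[len(prefix):]}"
        for view in views
        if view.startswith(prefix)
    }


# The base path here is an assumed example location.
paths = gold_table_paths("/tmp/delta_lake")
for view, path in paths.items():
    print(f"{view} -> {path}")
```

Keeping the mapping in one place means only the base path has to change when the notebook moves to another environment.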

## 1. Setup Spark Session and Delta Lake

In [8]:
# Import required libraries
from pyspark.sql import SparkSession
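# Import only the functions used below; a wildcard import would shadow Python builtins such as sum, min, and max
from pyspark.sql.functions import avg, count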
from pyspark.sql.types import StringType

# Try to initialize a minimal Spark session
try:
    # Stop any existing session first
    if 'spark' in globals() and spark is not None:
        spark.stop()
    
    # Initialize with minimal configuration
    spark = (
        SparkSession.builder.master("local[*]")
        .appName("NotebookQueries")
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
        .config("spark.sql.warehouse.dir", "./delta_lake")
        .config("spark.driver.memory", "4g")
        .config("spark.driver.maxResultSize", "2g")
        .config("spark.sql.shuffle.partitions", "200")
        .config("spark.sql.autoBroadcastJoinThreshold", "10485760")
        .config("spark.executor.memory", "4g")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
    )
    
    print("✅ Spark session initialized successfully!")
    print(f"Spark Version: {spark.version}")
    print(f"Application Name: {spark.sparkContext.appName}")
    
    # Set log level to reduce verbosity
    spark.sparkContext.setLogLevel("WARN")
    
except Exception as e:
    print(f"❌ Failed to initialize Spark session: {e}")
    print("\n📝 Note: If Spark initialization fails in the notebook environment,")
    print("   you can still verify the pipeline functionality by running:")
    print("   python run_notebook_queries.py")
    spark = None

Ivy Default Cache set to: /Users/srsu/.ivy2/cache
The jars for the packages stored in: /Users/srsu/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e648c4f8-5ead-4e61-8345-bd5cafca5404;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.0.0 in central
	found io.delta#delta-storage;2.0.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in local-m2-cache
:: resolution report :: resolve 86ms :: artifacts dl 3ms
	:: modules in use:
	io.delta#delta-core_2.12;2.0.0 from central in [default]
	io.delta#delta-storage;2.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from local-m2-cache in [default]

:: loading settings :: url = jar:file:/Users/srsu/spark-3.2.0-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


25/07/03 23:57:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


✅ Spark session initialized successfully!
Spark Version: 3.2.0
Application Name: NotebookQueries


## 2. Read Delta Lake Tables

In [9]:
# Define Delta table paths (update these paths to match your environment)
delta_base_path = "/Users/srsu/Documents/Personal/git/delta_pipeline/delta_lake"

# Read Delta tables from the Gold layer
print("📊 Reading Delta tables...")

try:
    # Check if spark session exists
    if spark is None:
        print("❌ Spark session not initialized. Please run the first cell.")
    else:
        # Read dimension table
        gold_dim_production_type = spark.read.format("delta").load(f"{delta_base_path}/gold/dim_production_type")
        gold_dim_production_type.createOrReplaceTempView("gold_dim_production_type")
        print(f"✅ Loaded gold_dim_production_type: {gold_dim_production_type.count()} records")
        
        # Show schema of dimension table
        print("\nDimension table schema:")
        gold_dim_production_type.printSchema()
        
        # Read fact table for power generation
        gold_fact_power = spark.read.format("delta").load(f"{delta_base_path}/gold/fact_power")
        gold_fact_power.createOrReplaceTempView("gold_fact_power")
        print(f"✅ Loaded gold_fact_power: {gold_fact_power.count()} records")
        
        # Read 30-minute aggregated fact table  
        gold_fact_power_30min_agg = spark.read.format("delta").load(f"{delta_base_path}/gold/fact_power_30min_agg")
        gold_fact_power_30min_agg.createOrReplaceTempView("gold_fact_power_30min_agg")
        print(f"✅ Loaded gold_fact_power_30min_agg: {gold_fact_power_30min_agg.count()} records")
        
        print("✅ Successfully loaded all Delta tables")
    
except Exception as e:
    print(f"❌ Error loading Delta tables: {str(e)}")
    print("Please ensure the pipeline has been run and Delta tables exist")


📊 Reading Delta tables...

✅ Loaded gold_dim_production_type: 15 records

Dimension table schema:
root
 |-- production_type_id: string (nullable = true)
 |-- production_type: string (nullable = true)
 |-- energy_category: string (nullable = true)
 |-- controllability_type: string (nullable = true)
 |-- description: string (nullable = true)
 |-- country: string (nullable = true)
 |-- effective_date: date (nullable = true)
 |-- expiry_date: date (nullable = true)
 |-- active_flag: boolean (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)

✅ Loaded gold_fact_power: 86360 records
✅ Loaded gold_fact_power_30min_agg: 43180 records
✅ Successfully loaded all Delta tables


## 3. Query 1: Daily Production Trends

This query analyzes daily electricity production trends by production type.
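
To make the join and aggregation concrete, the sketch below runs the same SQL shape against a tiny in-memory dataset. It uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, and every row in it is invented for illustration; only the table and column names follow this notebook.

```python
import sqlite3

# SQLite stands in for Spark SQL here; all rows below are made-up toy data.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE gold_dim_production_type (
    production_type_id TEXT,
    production_type    TEXT
);
CREATE TABLE gold_fact_power (
    production_type_id   TEXT,
    country              TEXT,
    year INTEGER, month INTEGER, day INTEGER,
    electricity_produced REAL
);
INSERT INTO gold_dim_production_type VALUES ('w', 'Wind'), ('s', 'Solar');
INSERT INTO gold_fact_power VALUES
    ('w', 'de', 2025, 1, 1, 100.0),
    ('w', 'de', 2025, 1, 1, 150.0),
    ('s', 'de', 2025, 1, 1,  40.0),
    ('w', 'fr', 2025, 1, 1, 999.0),  -- excluded by the country filter
    ('w', 'de', 2025, 1, 2,  80.0);
""")

# Same structure as Query 1: join to the dimension, filter, sum per day and type.
rows = conn.execute("""
SELECT f.year, f.month, f.day,
       d.production_type,
       SUM(f.electricity_produced) AS total_daily_production
FROM gold_fact_power f
JOIN gold_dim_production_type d ON f.production_type_id = d.production_type_id
WHERE f.country = 'de'
GROUP BY f.year, f.month, f.day, d.production_type
ORDER BY f.year, f.month, f.day, d.production_type
""").fetchall()

for row in rows:
    print(row)
conn.close()
```

The two Wind readings on 1 January collapse into a single daily total, and the `fr` row never reaches the aggregation.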

In [10]:
# Query 1: Daily Production Trends
daily_production_query = """
SELECT
  f.year,
  f.month,
  f.day,
  d.production_type AS production_type,
  SUM(f.electricity_produced) AS total_daily_production
FROM gold_fact_power f
JOIN gold_dim_production_type d ON f.production_type_id = d.production_type_id
WHERE f.country = 'de'
GROUP BY f.year, f.month, f.day, d.production_type
ORDER BY f.year, f.month, f.day, d.production_type
"""

print("🔍 Executing Query 1: Daily Production Trends")
print("=" * 50)

try:
    if spark is None:
        print("❌ Spark session not initialized. Please run the first cell.")
    else:
        daily_trends_df = spark.sql(daily_production_query)
        
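        # spark.sql() is lazy; count() is the first action that actually runs the query
        row_count = daily_trends_df.count()
        print("Query executed successfully!")
        print(f"Results: {row_count} records found")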
        
        print("\nSample Results:")
        daily_trends_df.show(10, truncate=False)
        
        # Show schema
        print("\nSchema:")
        daily_trends_df.printSchema()
        
except Exception as e:
    print(f"Query failed: {e}")
    print("This is expected if Delta tables don't exist or Spark session is not initialized")

🔍 Executing Query 1: Daily Production Trends
Query executed successfully!
Results: 810 records found

Sample Results:
+----+-----+---+---------------+----------------------+
|year|month|day|production_type|total_daily_production|
+----+-----+---+---------------+----------------------+
|2025|1    |1  |Biomass        |382260.29999999993    |
|2025|1    |1  |Fossil         |348234.4999999999     |
|2025|1    |1  |Geothermal     |2159.8999999999996    |
|2025|1    |1  |Hydro          |20687.500000000004    |
|2025|1    |1  |Load           |4721965.6000000015    |
|2025|1    |1  |Others         |19598.2               |
|2025|1    |1  |Solar          |215798.59999999998    |
|2025|1    |1  |Waste          |95757.70000000001     |
|2025|1    |1  |Wind           |3517545.0999999987    |
|2025|1    |2  |Biomass        |405472.7              |
+----+-----+---+---------------+----------------------+
only showing top 10 rows


Schema:
root
 |-- year: integer (nullable = true)
 |-- month: integer (

## 4. Query 2: Underperformance Prediction Features

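This query builds ML features for predicting production underperformance: the value one day earlier (`lag_1d`, 48 half-hour intervals back), the value one week earlier (`lag_1w`, 336 intervals back), and a rolling average of earlier values in the same half-hour slot (`rolling_7d_avg`).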
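
The lag offsets follow from the 30-minute grain: a day has 48 half-hour intervals and a week has 336. The plain-Python sketch below mirrors what `LAG(..., n)` and an `AVG` over a `ROWS BETWEEN n PRECEDING AND 1 PRECEDING` frame compute on an ordered series. The helper names `lag` and `rolling_mean_preceding` are illustrative, and the sketch assumes the series has no missing intervals. Note that a `ROWS` frame counts rows inside its partition, so in a partition that holds one row per day, `n` preceding rows means `n` preceding days.

```python
INTERVALS_PER_DAY = 24 * 2                    # 48 half-hour intervals
INTERVALS_PER_WEEK = 7 * INTERVALS_PER_DAY    # 336 half-hour intervals


def lag(values, offset):
    """Value `offset` rows earlier, or None where no such row exists (like SQL LAG)."""
    return [values[i - offset] if i >= offset else None for i in range(len(values))]


def rolling_mean_preceding(values, window):
    """Mean of up to `window` rows before each row, excluding the row itself.

    Mirrors AVG(...) OVER (ROWS BETWEEN window PRECEDING AND 1 PRECEDING):
    the first row has an empty frame and therefore yields None.
    """
    result = []
    for i in range(len(values)):
        prior = values[max(0, i - window):i]
        result.append(sum(prior) / len(prior) if prior else None)
    return result


# Toy half-hourly series: the value equals its position in the series.
series = [float(i) for i in range(400)]
lag_1d = lag(series, INTERVALS_PER_DAY)
lag_1w = lag(series, INTERVALS_PER_WEEK)
print(lag_1d[47], lag_1d[48])     # no value yet, then the first value of the series
print(lag_1w[335], lag_1w[336])

# Toy same-slot series: one value per day for a single half-hour slot.
same_slot_by_day = [10.0, 20.0, 30.0, 40.0]
print(rolling_mean_preceding(same_slot_by_day, window=2))
```

The leading `None` values correspond to the `NULL` features Spark produces at the start of each partition; a downstream model has to drop or impute those rows.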

In [13]:
# Query 2: Underperformance Prediction Features
underperformance_query = """
SELECT
    f.timestamp_30min,
    f.production_type_id,
    d.production_type,
    d.energy_category,
    d.controllability_type,
    f.total_electricity_produced,
    f.year, f.month, f.day, f.hour, f.minute_interval_30,
    LAG(f.total_electricity_produced, 48) OVER (PARTITION BY f.production_type_id ORDER BY f.timestamp_30min) AS lag_1d,
    LAG(f.total_electricity_produced, 336) OVER (PARTITION BY f.production_type_id ORDER BY f.timestamp_30min) AS lag_1w,
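    -- Each partition holds one row per day for a given half-hour slot,
    -- so a 7-day window is the 7 preceding rows
    AVG(f.total_electricity_produced) OVER (
        PARTITION BY f.production_type_id, f.hour, f.minute_interval_30
        ORDER BY f.timestamp_30min
        ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
    ) AS rolling_7d_avg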
FROM gold_fact_power_30min_agg f
JOIN gold_dim_production_type d ON f.production_type_id = d.production_type_id
WHERE f.country = 'de' AND d.active_flag = TRUE
LIMIT 20
"""

print("🔍 Executing Query 2: ML Features for Underperformance Prediction")
print("=" * 65)

try:
    if spark is None:
        print("❌ Spark session not initialized. Please run the first cell.")
    else:
        underperformance_query_df = spark.sql(underperformance_query)
        
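        # spark.sql() is lazy; count() is the first action that actually runs the query
        row_count = underperformance_query_df.count()
        print("Query executed successfully!")
        print(f"Results: {row_count} records found")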
        
        print("\nSample ML Features:")
        underperformance_query_df.show(5, truncate=False)
        
        # Show feature statistics (only for non-null values)
        print("\nFeature Statistics:")
        feature_stats = underperformance_query_df.select("total_electricity_produced", "lag_1d", "lag_1w", "rolling_7d_avg").describe()
        feature_stats.show()
        
except Exception as e:
    print(f"Query failed: {e}")
    print("This is expected if 30-minute Delta table doesn't exist or Spark session is not initialized")

🔍 Executing Query 2: ML Features for Underperformance Prediction
Query executed successfully!
Results: 20 records found

Sample ML Features:
+-------------------+----------------------------------------------------------------+---------------+---------------+--------------------+--------------------------+----+-----+---+----+------------------+-----------------+------+-----------------+
|timestamp_30min    |production_type_id                                              |production_type|energy_category|controllability_type|total_electricity_produced|year|month|day|hour|minute_interval_30|lag_1d           |lag_1w|rolling_7d_avg   |
+-------------------+----------------------------------------------------------------+---------------+---------------+--------------------+--------------------------+----+-----+---+----+------------------+-----------------+------+-----------------+
|2025-01-01 00:00:00|49e077857baab80a41e125aaa88b8ba9f239673011a8ddf982a790d7d6176dd5|Hydro          |Renewable 

## 5. Query 3: Wind Price Analysis

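This query compares daily wind power production with the average daily electricity price. The `LIKE '%Wind%'` filter matches every production type whose name contains "Wind", so separate offshore and onshore types are included if the dimension table defines them.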
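
One simple way to quantify that relationship is a Pearson correlation between daily production and daily average price. The sketch below computes it in plain Python over the first seven daily rows of this query's results as shown in this notebook, rounded for readability. The `pearson` helper is illustrative and not part of the pipeline.

```python
from math import sqrt

# First seven daily rows of the Query 3 results shown in this notebook (rounded).
production = [3517545.1, 1968775.3, 2640197.0, 1010095.5, 1925518.5, 3639525.7, 3380889.7]
price = [0.40, 38.32, 84.20, 102.17, 91.68, 21.01, 29.27]


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    covariance = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = sum((x - mean_x) ** 2 for x in xs)
    spread_y = sum((y - mean_y) ** 2 for y in ys)
    return covariance / sqrt(spread_x * spread_y)


r = pearson(production, price)
print(f"Pearson r over {len(production)} days: {r:.2f}")
```

On these seven days the coefficient is negative: days with more wind production tend to have lower prices. Seven points are too few to generalize, so the full result set should be used for any real conclusion.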

In [31]:
# Query 3: Wind Price Analysis
wind_price_query = """
SELECT
  f.year, f.month, f.day,
  d.production_type AS production_type,
  SUM(f.electricity_produced) AS total_daily_production_mw,
  AVG(f.electricity_price) AS avg_daily_price_eur_per_mwh
FROM gold_fact_power f
JOIN gold_dim_production_type d ON f.production_type_id = d.production_type_id
WHERE f.country = 'de'
  AND d.production_type LIKE '%Wind%'
  AND d.active_flag = TRUE 
GROUP BY f.year, f.month, f.day, d.production_type
ORDER BY f.year, f.month, f.day, d.production_type
"""

print("🔍 Executing Query 3: Wind Power vs Price Analysis")
print("=" * 50)

try:
    if spark is None:
        print("❌ Spark session not initialized. Please run the first cell.")
    else:
        wind_analysis_df = spark.sql(wind_price_query)
        
        # count() triggers execution; keep the result to avoid recomputing it below
        row_count = wind_analysis_df.count()
        print("Query executed successfully!")
        print(f"Results: {row_count} records found")
        
        print("\nWind Power vs Price Results:")
        wind_analysis_df.show(10, truncate=False)
        
        # Summary statistics by wind type
        if row_count > 0:
            print("\nSummary by Wind Type:")
            wind_summary = wind_analysis_df.groupBy("production_type").agg(
                avg("total_daily_production_mw").alias("avg_production"),
                avg("avg_daily_price_eur_per_mwh").alias("avg_price"),
                count("*").alias("total_days")
            )
            wind_summary.show(truncate=False)
        
except Exception as e:
    print(f"Query failed: {e}")
    print("This is expected if Delta tables don't exist or Spark session is not initialized")

🔍 Executing Query 3: Wind Power vs Price Analysis
Query executed successfully!
Results: 90 records found

Wind Power vs Price Results:
+----+-----+---+---------------+-------------------------+---------------------------+
|year|month|day|production_type|total_daily_production_mw|avg_daily_price_eur_per_mwh|
+----+-----+---+---------------+-------------------------+---------------------------+
|2025|1    |1  |Wind           |3517545.0999999987       |0.3966666666666667         |
|2025|1    |2  |Wind           |1968775.3000000007       |38.32111111111111          |
|2025|1    |3  |Wind           |2640196.9999999995       |84.20333333333332          |
|2025|1    |4  |Wind           |1010095.4999999998       |102.16777777777777         |
|2025|1    |5  |Wind           |1925518.5000000002       |91.67555555555555          |
|2025|1    |6  |Wind           |3639525.7                |21.005555555555553         |
|2025|1    |7  |Wind           |3380889.7000000007       |29.265555555555554      