# Energy Data Query Notebook

This notebook queries the Delta tables created by the data ingestion script.

In [29]:
# Import required libraries
from pyspark.sql import SparkSession

# Initialize Spark session with Delta support
spark = SparkSession.builder \
    .appName("Energy Data Query") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .getOrCreate()

In [30]:
# Define the paths to the Delta tables (adjust these paths as per your config)
STG_public_power_path = "/workspaces/baywa-data-pipeline/Data/Staging_Schema/public_power"  # Replace with your actual path
STG_price_path = "/workspaces/baywa-data-pipeline/data/Staging_Schema/price"                  # Replace with your actual path
STG_installed_power_path = "/workspaces/baywa-data-pipeline/data/Staging_Schema/installed_power"  # Replace with your actual path

In [36]:
# Querying public power data
stg_public_power_df = spark.read.format("delta").load(STG_public_power_path)
stg_public_power_df.createOrReplaceTempView("stg_public_power")

# Example SQL query to get average production values
stg_public_power = spark.sql("""
    SELECT *
    FROM stg_public_power
    LIMIT 100
""")
stg_public_power.show()

+-------+----------+--------------------+----------+-------------------+-------+
|country|       end|     production_type|     start|          timestamp|  value|
+-------+----------+--------------------+----------+-------------------+-------+
|     de|2024-01-01|Hydro pumped stor...|2024-01-01|2024-01-01 15:00:00|-1683.1|
|     de|2024-01-01|Hydro pumped stor...|2024-01-01|2024-01-01 15:00:00|-2070.5|
|     de|2024-01-01|Hydro pumped stor...|2024-01-01|2024-01-01 15:00:00|-2412.2|
|     de|2024-01-01|Hydro pumped stor...|2024-01-01|2024-01-01 15:00:00|-2535.5|
|     de|2024-01-01|Hydro pumped stor...|2024-01-01|2024-01-01 15:00:00|-3244.7|
|     de|2024-01-01|Hydro pumped stor...|2024-01-01|2024-01-01 15:00:00|-2564.3|
|     de|2024-01-01|Hydro pumped stor...|2024-01-01|2024-01-01 15:00:00|-2578.4|
|     de|2024-01-01|Hydro pumped stor...|2024-01-01|2024-01-01 15:00:00|-2614.3|
|     de|2024-01-01|Hydro pumped stor...|2024-01-01|2024-01-01 15:00:00|-3377.2|
|     de|2024-01-01|Hydro pu

In [33]:
# Querying public power data
stg_price_df = spark.read.format("delta").load(STG_price_path)
stg_price_df.createOrReplaceTempView("stg_price")

# Example SQL query to get average production values
stg_price = spark.sql("""
    SELECT *
    FROM stg_price
    LIMIT 100
""")
stg_price.show()

+-----+----------+-----+----------+-------------------+-------------------+
|  bzn|       end|price|     start|          timestamp|               unit|
+-----+----------+-----+----------+-------------------+-------------------+
|DE-LU|2024-01-01| 2.24|2024-01-01|2024-01-01 11:00:00|EUR / megawatt_hour|
|DE-LU|2024-01-01| 1.96|2024-01-01|2024-01-01 12:00:00|EUR / megawatt_hour|
|DE-LU|2024-01-01|48.01|2024-01-01|2024-01-01 19:00:00|EUR / megawatt_hour|
|DE-LU|2024-01-01|42.95|2024-01-01|2024-01-01 20:00:00|EUR / megawatt_hour|
|DE-LU|2024-01-01| 1.04|2024-01-01|2024-01-01 13:00:00|EUR / megawatt_hour|
|DE-LU|2024-01-01| 3.66|2024-01-01|2024-01-01 14:00:00|EUR / megawatt_hour|
|DE-LU|2024-01-01|43.06|2024-01-01|2024-01-01 15:00:00|EUR / megawatt_hour|
|DE-LU|2024-01-01|49.93|2024-01-01|2024-01-01 16:00:00|EUR / megawatt_hour|
|DE-LU|2024-01-01| 58.0|2024-01-01|2024-01-01 17:00:00|EUR / megawatt_hour|
|DE-LU|2024-01-01|54.25|2024-01-01|2024-01-01 18:00:00|EUR / megawatt_hour|
|DE-LU|2024-

In [37]:
# Querying public power data
stg_installed_power_df = spark.read.format("delta").load(STG_installed_power_path)
stg_installed_power_df.createOrReplaceTempView("stg_price")

# Example SQL query to get average production values
stg_installed_power = spark.sql("""
    SELECT *
    FROM stg_price
    LIMIT 100
""")
stg_installed_power.show()

+-------+-------------------------+---------------+---------------+---------+----+
|country|installation_decommission|installed_power|production_type|time_step|year|
+-------+-------------------------+---------------+---------------+---------+----+
|     de|                    false|          5.406|  Wind offshore|   yearly|2005|
|     de|                    false|          6.393|  Wind offshore|   yearly|2005|
|     de|                    false|          7.555|  Wind offshore|   yearly|2005|
|     de|                    false|          7.787|  Wind offshore|   yearly|2005|
|     de|                    false|          7.787|  Wind offshore|   yearly|2005|
|     de|                    false|          8.129|  Wind offshore|   yearly|2005|
|     de|                    false|          8.473|  Wind offshore|   yearly|2005|
|     de|                    false|          9.215|  Wind offshore|   yearly|2005|
|     de|                    false|         11.976|   Wind onshore|   yearly|2005|
|   

In [41]:
# Define the paths to the Delta tables (adjust these paths as per your config)
CDC_public_power_path = "/workspaces/baywa-data-pipeline/Data/CDC_Schema/public_power"  # Replace with your actual path
CDC_price_path = "/workspaces/baywa-data-pipeline/data/CDC_Schema/price"                  # Replace with your actual path
CDC_installed_power_path = "/workspaces/baywa-data-pipeline/data/CDC_Schema/installed_power"  # Replace with your actual path

In [46]:
# Querying public power data
public_power_df = spark.read.format("delta").load(CDC_public_power_path)
public_power_df.createOrReplaceTempView("public_power")

# Example SQL query to get average production values
average_public_power = spark.sql("""
    SELECT *
    FROM public_power
    LIMIT 100
""")
average_public_power.show()

24/11/08 11:29:13 WARN DeltaLog: Change in the table id detected while updating snapshot. 
Previous snapshot = Snapshot(path=file:/workspaces/baywa-data-pipeline/Data/CDC_Schema/public_power/_delta_log, version=0, metadata=Metadata(b896bb0f-4708-419b-9d85-f94a1c9a4067,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"country","type":"string","nullable":true,"metadata":{}},{"name":"end","type":"string","nullable":true,"metadata":{}},{"name":"production_type","type":"string","nullable":true,"metadata":{}},{"name":"start","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"timestamp","nullable":true,"metadata":{}},{"name":"value","type":"double","nullable":true,"metadata":{}}]},List(),Map(),Some(1731064740222)), logSegment=LogSegment(file:/workspaces/baywa-data-pipeline/Data/CDC_Schema/public_power/_delta_log,0,WrappedArray(DeprecatedRawLocalFileStatus{path=file:/workspaces/baywa-data-pipeline/Data/CDC_Schema/public_power/_delta_log/000000000000000

+-------+----------+--------------------+----------+-------------------+--------+
|country|       end|     production_type|     start|          timestamp|   value|
+-------+----------+--------------------+----------+-------------------+--------+
|     de|2024-01-01|Cross border elec...|2024-01-01|2024-01-01 08:30:00|-11869.6|
|     de|2024-01-01|                Load|2024-01-01|2024-01-01 07:00:00| 40592.6|
|     de|2024-01-01|       Wind offshore|2024-01-01|2024-01-01 07:45:00|  5838.9|
|     de|2024-01-01|                Load|2024-01-01|2024-01-01 07:30:00| 40592.6|
|     de|2024-01-01|       Residual load|2024-01-01|2024-01-01 08:15:00|  5384.3|
|     de|2024-01-01|          Fossil gas|2024-01-01|2024-01-01 07:00:00|  2680.2|
|     de|2024-01-01|             Biomass|2024-01-01|2024-01-01 08:15:00|  4180.6|
|     de|2024-01-01|Cross border elec...|2024-01-01|2024-01-01 08:45:00|-11869.6|
|     de|2024-01-01|Hydro water reser...|2024-01-01|2024-01-01 07:30:00|    86.4|
|     de|2024-01

In [3]:
# Querying public power data
public_power_df = spark.read.format("delta").load(public_power_path)
public_power_df.createOrReplaceTempView("public_power")

# Example SQL query to get average production values
average_public_power = spark.sql("""
    SELECT production_type, AVG(value) AS average_value
    FROM public_power
    GROUP BY production_type
""")
average_public_power.show()

24/10/30 10:00:17 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 9:>                                                        (0 + 12) / 12]

+--------------------+-------------------+
|     production_type|      average_value|
+--------------------+-------------------+
|               Waste| 1115.4239583333328|
|          Fossil gas| 2713.9281249999976|
|          Fossil oil| 392.81145833333363|
|Hydro water reser...|  75.12604166666678|
|Renewable share o...|  92.49791666666657|
|               Solar| 1135.5500000000018|
|Renewable share o...|  80.11875000000002|
|       Wind offshore|  5707.162499999985|
|             Nuclear|               null|
|Cross border elec...| -6802.878124999999|
|Hydro pumped storage| 1050.1208333333345|
|          Geothermal|  20.22395833333327|
|  Hydro Run-of-River| 2080.4947916666647|
|        Wind onshore| 26499.397916666672|
|       Residual load|  10486.62499999999|
|Fossil brown coal...| 3250.2406250000004|
|              Others|  211.0635416666664|
|    Fossil hard coal| 1713.7614583333345|
|             Biomass|  4531.489583333334|
|Hydro pumped stor...|-2010.4166666666622|
+----------

                                                                                

In [14]:
# Querying price data
price_df = spark.read.format("delta").load(price_path)
price_df.createOrReplaceTempView("price")

# Example SQL query to get maximum price
max_price = spark.sql("""
    SELECT *
    FROM price
""")
max_price.show()

                                                                                

+------+-------------------+-------------------+
| price|          timestamp|               unit|
+------+-------------------+-------------------+
|115.34|2024-10-30 21:00:00|EUR / megawatt_hour|
|112.37|2024-10-30 22:00:00|EUR / megawatt_hour|
|102.53|2024-10-30 11:00:00|EUR / megawatt_hour|
|106.95|2024-10-30 12:00:00|EUR / megawatt_hour|
| 160.0|2024-10-30 15:00:00|EUR / megawatt_hour|
|193.88|2024-10-30 16:00:00|EUR / megawatt_hour|
| 115.0|2024-10-30 09:00:00|EUR / megawatt_hour|
|109.68|2024-10-30 10:00:00|EUR / megawatt_hour|
|123.54|2024-10-30 05:00:00|EUR / megawatt_hour|
|139.68|2024-10-30 06:00:00|EUR / megawatt_hour|
| 98.16|2024-10-30 01:00:00|EUR / megawatt_hour|
| 98.05|2024-10-30 02:00:00|EUR / megawatt_hour|
|111.31|2024-10-30 13:00:00|EUR / megawatt_hour|
|126.16|2024-10-30 14:00:00|EUR / megawatt_hour|
|192.89|2024-10-30 17:00:00|EUR / megawatt_hour|
|159.44|2024-10-30 18:00:00|EUR / megawatt_hour|
|100.71|2024-10-29 23:00:00|EUR / megawatt_hour|
|100.52|2024-10-30 0

In [13]:
# Querying price data
price_df = spark.read.format("delta").load(price_path)
price_df.createOrReplaceTempView("price")

# Example SQL query to get maximum price
max_price = spark.sql("""
    SELECT MAX(price) AS max_price
    FROM price
""")
max_price.show()

                                                                                

+---------+
|max_price|
+---------+
|   193.88|
+---------+



In [16]:
# Querying installed power data
installed_power_df = spark.read.format("delta").load(installed_power_path)
installed_power_df.createOrReplaceTempView("installed_power")

# Example SQL query to get total installed power by year
total_installed_power = spark.sql("""
    SELECT year, SUM(installed_power) AS total_power
    FROM installed_power
    GROUP BY year
    ORDER BY year
""")
total_installed_power.show()


+----+------------------+
|year|       total_power|
+----+------------------+
|2002| 4270.264999999997|
|2003| 4270.264999999997|
|2004| 4270.264999999997|
|2005| 4270.264999999998|
|2006| 4270.264999999997|
|2007|4270.2649999999985|
|2008| 4270.264999999997|
|2009|          4270.265|
|2010| 4270.264999999997|
|2011| 4270.264999999999|
|2012| 4270.264999999997|
|2013|          4270.265|
|2014| 4270.264999999997|
|2015| 4270.264999999999|
|2016| 4270.264999999997|
|2017| 4270.264999999999|
|2018| 4270.264999999997|
|2019| 4270.264999999999|
|2020| 4270.264999999997|
|2021| 4270.264999999999|
+----+------------------+
only showing top 20 rows

