In [1]:
hours = ""

In [2]:
# Parameters
hours = 24


In [3]:
import os
import requests
import json
from dotenv import load_dotenv
from datetime import datetime, timedelta, timezone
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, posexplode, col, size, from_unixtime, to_date, concat_ws, lit, sum
from pyspark.sql.types import StructType, StructField, StringType, FloatType, BooleanType, ArrayType, LongType


In [4]:
load_dotenv()
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home"


In [5]:
# Get Jar path needed for spark session
# For simplicity using locally downloaded jars for delta format
cwd = os.getcwd()
if cwd.endswith("notebooks"):
    proj_dir = os.path.abspath("..")
else:
    proj_dir = cwd
jar_dir = os.path.join(proj_dir, "jars")
jar1 = os.path.join(jar_dir, "delta-spark_2.13-4.0.0.jar")
jar2 = os.path.join(jar_dir, "delta-storage-4.0.0.jar")

In [6]:
spark = SparkSession.builder.appName("EnergyUseCase") \
            .config("spark.jars", f"{jar1},{jar2}") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .getOrCreate()





25/06/29 19:21:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Data Ingestion

In [7]:
# API Configuration
BASE_URL = "https://api.energy-charts.info"
ENDPOINT_CONFIG = {
    "public_power": {
        "path": "/public_power",
        "params": ["country", "start", "end"]
    },
    "installed_power": {
        "path": "/installed_power",
        "params": ["country", "time_step", "installation_decommission"]
    },
    "price": {
        "path": "/price",
        "params": ["bzn", "start", "end"]
    },
}

In [8]:
# Pipeline Parameters
now_utc = datetime.now(timezone.utc)
back_utc = now_utc - timedelta(hours=hours)  # <- set it here when running notebook locally
start_time = back_utc.strftime("%Y-%m-%d %H:%M:%S")
end_time = now_utc.strftime("%Y-%m-%d %H:%M:%S")
# For Testing locally fetched 2 months data
#start_time = "2025-04-01 00:00"
#end_time = "2025-05-30 23:59"
country = "de"
bidding_zone = "DE-LU"

In [9]:
def fetch_api_data(endpoint_name, **kwargs):
    """
    Fetch JSON data from the energy charts 
    """

    if endpoint_name not in ENDPOINT_CONFIG:
        raise ValueError(f"Unsupported endpoint: {endpoint_name}")
    
    config = ENDPOINT_CONFIG[endpoint_name]
    path = config["path"]
    required_params = config["params"]
    missing = [p for p in required_params if p not in kwargs]
    if missing:
        raise ValueError(f"Missing required params: {missing} for endpoint '{endpoint_name}'")
    params = {k: v for k, v in kwargs.items() if k in required_params}
    url = f"{BASE_URL}{path}"
    try:
        print(f"Fetching data from api with these params - {params}")
        response = requests.get(url, params=params, verify=False)
        print(f"Status Code: {response.status_code}")
        data = response.json() 
        if not data:
            print("Empty response received.")
            return None
        return data
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

## Get public power data

In [10]:
# Fetch public power
public_power = fetch_api_data("public_power",
                            country=country,
                            start=start_time,
                            end=end_time
                        )
# Create raw dataframe 
if public_power is not None and isinstance(public_power, dict):
    # Define expected schema
    public_power_schema = StructType([
        StructField("unix_seconds", ArrayType(LongType()), True),
        StructField("production_types", ArrayType(
            StructType([
                StructField("name", StringType(), True),
                StructField("data", ArrayType(FloatType()), True)
            ])
        ), True),
        StructField("deprecated", BooleanType(), True)
    ])
    public_power_df = spark.createDataFrame([public_power], schema=public_power_schema)
    public_power_df.show(3)

Fetching data from api with these params - {'country': 'de', 'start': '2025-06-28 17:21:32', 'end': '2025-06-29 17:21:32'}




Status Code: 200


[Stage 0:>                                                          (0 + 1) / 1]

                                                                                

+--------------------+--------------------+----------+
|        unix_seconds|    production_types|deprecated|
+--------------------+--------------------+----------+
|[1751124600, 1751...|[{Hydro pumped st...|     false|
+--------------------+--------------------+----------+



In [11]:

""" 
Transform public power data and write to storage 
"""
public_power_df = public_power_df.withColumn("production_type", explode("production_types"))
public_power_df = public_power_df.select(
                col("unix_seconds"),
                col("production_type.name").alias("production_type"),
                posexplode(col("production_type.data")).alias("pos", "value")
                )
# Handle edge cases such as more data points / missing ts for data points
public_power_df = public_power_df.filter(col("pos") < size(col("unix_seconds")))
public_power_df = public_power_df.withColumn("ts", col("unix_seconds")[col("pos")])
public_power_df = public_power_df.select("ts", 
                                    "production_type", 
                                    col("value").alias("net_power_produced"))
public_power_data = public_power_df.withColumn("timestamp", from_unixtime(col("ts")).cast("timestamp")).drop("ts") 
public_power_data = public_power_data.dropDuplicates() 
# Write to storage in Delta format
public_power_data.write \
                .format("delta") \
                .mode("append") \
                .option("mergeSchema", "true") \
                .partitionBy("production_type") \
                .save(f"{proj_dir}/data/silver/public_power_data")  

public_power_data.show(2)
public_power_data.printSchema()




[Stage 5:>                                                          (0 + 1) / 1]

                                                                                

+--------------------+------------------+-------------------+
|     production_type|net_power_produced|          timestamp|
+--------------------+------------------+-------------------+
|Fossil brown coal...|            5366.6|2025-06-28 22:30:00|
|Fossil brown coal...|            5429.7|2025-06-28 23:45:00|
+--------------------+------------------+-------------------+
only showing top 2 rows
root
 |-- production_type: string (nullable = true)
 |-- net_power_produced: float (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [12]:
#public_power_data.select("timestamp").distinct().sort("timestamp").show(50)


## Get Price Data

In [13]:
# Fetch price data
price_data = fetch_api_data("price",
                            bzn=bidding_zone,
                            start=start_time,
                            end=end_time
                       )
# Create raw dataframe 
if price_data is not None and isinstance(price_data, dict):
    # Define price schema
    price_data_schema = StructType([
        StructField("license_info", StringType(), True),
        StructField("unix_seconds", ArrayType(LongType()), True),
        StructField("price", ArrayType(FloatType()), True),
        StructField("unit", StringType(), True),
        StructField("deprecated", BooleanType(), True)
    ])
    price_df = spark.createDataFrame([price_data], schema=price_data_schema)
    price_df.show(3)

Fetching data from api with these params - {'bzn': 'DE-LU', 'start': '2025-06-28 17:21:32', 'end': '2025-06-29 17:21:32'}




Status Code: 200


+--------------------+--------------------+--------------------+---------+----------+
|        license_info|        unix_seconds|               price|     unit|deprecated|
+--------------------+--------------------+--------------------+---------+----------+
|CC BY 4.0 (creati...|[1751126400, 1751...|[62.73, 95.03, 10...|EUR / MWh|     false|
+--------------------+--------------------+--------------------+---------+----------+



In [14]:
""" Transform price data """
price_df = price_df.select(
                posexplode("unix_seconds").alias("pos", "unix_ts"),
                col("price")[col("pos")].alias("price")
            )
price_data = price_df.withColumn("timestamp", from_unixtime(col("unix_ts")).cast("timestamp")).drop("unix_ts", "pos")
price_data = price_data.dropDuplicates()

# Write to storage in Delta format
price_data.write \
            .format("delta") \
            .mode("append") \
            .option("mergeSchema", "true") \
            .save(f"{proj_dir}/data/silver/price_data") 

price_data.show(2)
price_data.printSchema()

25/06/29 19:21:41 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-----+-------------------+
|price|          timestamp|
+-----+-------------------+
| -5.4|2025-06-29 12:00:00|
|95.07|2025-06-29 00:00:00|
+-----+-------------------+
only showing top 2 rows
root
 |-- price: float (nullable = true)
 |-- timestamp: timestamp (nullable = true)

