In [5]:
import sys
from pathlib import Path

def find_src_dir(start: Path) -> Path:
    """Locate the project's ``src`` directory by walking up from ``start``.

    Each directory from ``start`` up to the filesystem root is inspected:

    * if the directory itself is named ``src`` it is returned as-is
      (the previous code appended ``src/src`` in this case, which made the
      project imports fail when the kernel was started inside ``src``);
    * otherwise, if it contains a ``src`` sub-directory, that sub-directory
      is returned.

    Args:
        start: Directory to start searching from (normally the notebook's
            working directory).

    Returns:
        The path of the ``src`` directory. If none is found, falls back to
        ``<filesystem root>/src`` (same fallback as before), so the project
        import further down fails with a regular ``ModuleNotFoundError``.
    """
    for candidate in (start, *start.parents):
        if candidate.name == "src":
            return candidate
        if (candidate / "src").is_dir():
            return candidate / "src"
    return Path(start.anchor) / "src"


cwd = Path.cwd()
src_dir = find_src_dir(cwd)
root = src_dir.parent  # project root, i.e. the directory that holds src/

# Guard against piling up duplicate entries when the cell is re-run.
if str(src_dir) not in sys.path:
    sys.path.append(str(src_dir))

import pandas as pd
from datetime import datetime, timedelta
import time
from pyspark.sql import SparkSession
from api.elhub_api import fetch_elhub_data
from cassandra.cluster import Cluster

In [4]:
# Spark settings for the local session, including the Cassandra connector
# package and its connection options. Applied in the order listed.
spark_conf = {
    "spark.jars.packages": "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1",
    "spark.cassandra.connection.host": "127.0.0.1",
    "spark.cassandra.connection.port": "9042",
    "spark.sql.extensions": "com.datastax.spark.connector.CassandraSparkExtensions",
    "spark.sql.catalog.mycatalog": "com.datastax.spark.connector.datasource.CassandraCatalog",
    "spark.cassandra.output.consistency.level": "ONE",
    "spark.cassandra.connection.keepAliveMS": "60000",
}

builder = SparkSession.builder.appName("ElhubBronze").master("local[*]")
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)

# Reuses an already running session if one exists in this kernel.
spark = builder.getOrCreate()

print("âœ… SparkSession started with Cassandra integration")

25/11/14 10:26:06 WARN Utils: Your hostname, Fabians-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.22 instead (on interface en0)
25/11/14 10:26:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/fabianheflo/.ivy2/cache
The jars for the packages stored in: /Users/fabianheflo/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c5a64fdb-12ff-46c8-a715-8762b6a2e124;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/fabianheflo/UNI_courses/IND320/IND320/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found com.datastax.spark#spark-cassandra-connector_2.12;3.5.1 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.5.1 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 in central
	found org.apache.cassandra#java-driver-core-shaded;4.18.1 in central
	found com.datastax.oss#native-protocol;1.5.1 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found org.apache.cassandra#java-driver-mapper-runtime;4.18.1 in central
	found org.apache.cassandra#java-driver-query-builder;4.18.1 in central
	found org.apache.commons#commons-lang3;3.10 in central
	found com.thoughtworks.paranamer#paranamer;2.8 in central
	found org.scala-lang#scala-reflect

âœ… SparkSession started with Cassandra integration


In [11]:
# Smoke test: fetch a single day from the Elhub API and preview the result.
start = datetime(2021, 1, 1)
end = datetime(2021, 1, 2)  # one day after `start`
df = fetch_elhub_data(start, end)
df.head()

Unnamed: 0,endTime,lastUpdatedTime,priceArea,productionGroup,quantityKwh,startTime,meteringgridarea
0,2021-01-01T01:00:00+01:00,2024-12-20T10:35:40+01:00,NO1,hydro,2507716.8,2021-01-01T00:00:00+01:00,NO1
1,2021-01-01T02:00:00+01:00,2024-12-20T10:35:40+01:00,NO1,hydro,2494728.0,2021-01-01T01:00:00+01:00,NO1
2,2021-01-01T03:00:00+01:00,2024-12-20T10:35:40+01:00,NO1,hydro,2486777.5,2021-01-01T02:00:00+01:00,NO1
3,2021-01-01T04:00:00+01:00,2024-12-20T10:35:40+01:00,NO1,hydro,2461176.0,2021-01-01T03:00:00+01:00,NO1
4,2021-01-01T05:00:00+01:00,2024-12-20T10:35:40+01:00,NO1,hydro,2466969.2,2021-01-01T04:00:00+01:00,NO1


In [12]:
# Connect to the local Cassandra node. `cluster` / `session` are kept open
# because later cells reuse `session`.
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

# Make sure the keyspace exists so the notebook also runs against a fresh
# single-node cluster. No-op when the keyspace is already there.
session.execute("""
CREATE KEYSPACE IF NOT EXISTS elhub_data
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
""")

# Bronze table for the raw Elhub production data.
# The column list must cover every (lower-cased) column of the DataFrame that
# is written by the ingestion cell; `lastupdatedtime` and `meteringgridarea`
# were missing here, so on a fresh database the Spark write would be rejected
# because of columns unknown to the table.
# NOTE: IF NOT EXISTS leaves an already existing table untouched, so an older
# table with a different schema is not migrated by this statement.
session.execute("""
CREATE TABLE IF NOT EXISTS elhub_data.production_raw (
    pricearea TEXT,
    productiongroup TEXT,
    starttime TIMESTAMP,
    endtime TIMESTAMP,
    lastupdatedtime TIMESTAMP,
    meteringgridarea TEXT,
    quantitykwh DOUBLE,
    PRIMARY KEY ((pricearea), productiongroup, starttime)
);
""")

print("âœ… production_raw table created/verified.")

âœ… production_raw table created/verified.


In [15]:
# Start from an empty table (testing only): removes every row of production_raw.
truncate_stmt = "TRUNCATE elhub_data.production_raw"
session.execute(truncate_stmt)
print("ðŸ§¹ Cleared existing data from production_raw")

ðŸ§¹ Cleared existing data from production_raw


In [16]:
years = [2021, 2022, 2023, 2024]
price_areas = ["NO1","NO2","NO3","NO4","NO5"]
BATCH_DAYS = 7  # size of one API request window


def iter_batches(year, batch_days=BATCH_DAYS):
    """Yield consecutive fetch windows that together cover one calendar year.

    The windows are contiguous, at most ``batch_days`` long, and the last one
    is clamped to end at 1 January of the following year.

    The previous loop used ``while current < datetime(year, 12, 31)``. In a
    365-day year the 52 weekly windows end exactly on 31 December, so the loop
    stopped there and 31 December was never requested. Using an exclusive
    year end fixes that for leap and non-leap years alike.

    Args:
        year: Calendar year to cover.
        batch_days: Maximum window length in days.

    Yields:
        Tuples ``(batch_no, window_start, window_end)`` where ``batch_no``
        starts at 1 and ``window_end`` is the start of the next window.
    """
    year_end = datetime(year + 1, 1, 1)  # exclusive upper bound
    step = timedelta(days=batch_days)
    current = datetime(year, 1, 1)
    batch = 1
    while current < year_end:
        next_batch = min(current + step, year_end)
        yield batch, current, next_batch
        current = next_batch
        batch += 1


for year in years:
    for batch, current, next_batch in iter_batches(year):
        print(f"Batch {batch}: {current.date()} â†’ {next_batch.date()}")

        # NOTE(review): assumes fetch_elhub_data treats the second argument as
        # an exclusive end, as the original last-batch handling implies - confirm.
        df = fetch_elhub_data(current, next_batch)

        if df.empty:
            print("  No data.")
            continue

        # The Cassandra table uses lower-case column names.
        df.columns = [c.lower() for c in df.columns]
        sdf = spark.createDataFrame(df)

        (
            sdf.write
            .format("org.apache.spark.sql.cassandra")
            .mode("append")
            .options(table="production_raw", keyspace="elhub_data")
            .save()
        )

        print(f"  Saved {len(df)} rows.")


Batch 1: 2021-01-01 â†’ 2021-01-08
  Saved 4032 rows.
Batch 2: 2021-01-08 â†’ 2021-01-15
  Saved 4032 rows.
Batch 3: 2021-01-15 â†’ 2021-01-22
  Saved 4032 rows.
Batch 4: 2021-01-22 â†’ 2021-01-29
  Saved 4032 rows.
Batch 5: 2021-01-29 â†’ 2021-02-05
  Saved 4032 rows.
Batch 6: 2021-02-05 â†’ 2021-02-12
  Saved 4032 rows.
Batch 7: 2021-02-12 â†’ 2021-02-19
  Saved 4032 rows.
Batch 8: 2021-02-19 â†’ 2021-02-26
  Saved 4032 rows.
Batch 9: 2021-02-26 â†’ 2021-03-05
  Saved 4032 rows.
Batch 10: 2021-03-05 â†’ 2021-03-12
  Saved 4032 rows.
Batch 11: 2021-03-12 â†’ 2021-03-19
  Saved 4032 rows.
Batch 12: 2021-03-19 â†’ 2021-03-26
  Saved 4032 rows.
Batch 13: 2021-03-26 â†’ 2021-04-02
  Saved 4008 rows.
Batch 14: 2021-04-02 â†’ 2021-04-09
  Saved 4032 rows.
Batch 15: 2021-04-09 â†’ 2021-04-16
  Saved 4032 rows.
Batch 16: 2021-04-16 â†’ 2021-04-23
  Saved 4032 rows.
Batch 17: 2021-04-23 â†’ 2021-04-30
  Saved 4032 rows.
Batch 18: 2021-04-30 â†’ 2021-05-07
  Saved 4032 rows.
Batch 19: 2021-05-0

                                                                                

  Saved 4200 rows.
Batch 34: 2024-08-19 â†’ 2024-08-26
  Saved 4200 rows.
Batch 35: 2024-08-26 â†’ 2024-09-02
  Saved 4200 rows.
Batch 36: 2024-09-02 â†’ 2024-09-09
  Saved 4200 rows.
Batch 37: 2024-09-09 â†’ 2024-09-16
  Saved 4200 rows.
Batch 38: 2024-09-16 â†’ 2024-09-23
  Saved 4200 rows.
Batch 39: 2024-09-23 â†’ 2024-09-30
  Saved 4200 rows.
Batch 40: 2024-09-30 â†’ 2024-10-07
  Saved 4200 rows.
Batch 41: 2024-10-07 â†’ 2024-10-14
  Saved 4200 rows.
Batch 42: 2024-10-14 â†’ 2024-10-21
  Saved 4200 rows.
Batch 43: 2024-10-21 â†’ 2024-10-28
  Saved 4225 rows.
Batch 44: 2024-10-28 â†’ 2024-11-04
  Saved 4200 rows.
Batch 45: 2024-11-04 â†’ 2024-11-11
  Saved 4200 rows.
Batch 46: 2024-11-11 â†’ 2024-11-18
  Saved 4200 rows.
Batch 47: 2024-11-18 â†’ 2024-11-25
  Saved 4200 rows.
Batch 48: 2024-11-25 â†’ 2024-12-02
  Saved 4200 rows.
Batch 49: 2024-12-02 â†’ 2024-12-09
  Saved 4200 rows.
Batch 50: 2024-12-09 â†’ 2024-12-16
  Saved 4200 rows.
Batch 51: 2024-12-16 â†’ 2024-12-23
  Saved 42

In [None]:
# Append the pandas frame currently bound to `df` to the bronze table.
# NOTE(review): this repeats the write performed inside the batch loop and
# depends on whichever `df` the kernel holds at this point - confirm it is
# still needed.
df.columns = [name.lower() for name in df.columns]  # table columns are lower-case
sdf = spark.createDataFrame(df)
(
    sdf.write
    .format("org.apache.spark.sql.cassandra")
    .mode("append")
    .options(table="production_raw", keyspace="elhub_data")
    .save()
)

print("âœ… Data written to Cassandra (bronze layer)")

âœ… Data written to Cassandra (bronze layer)


In [20]:
# Load the bronze table back through the Cassandra connector; as the last
# expression of the cell, the resulting DataFrame (schema) is displayed.
(
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="production_raw", keyspace="elhub_data")
    .load()
)

DataFrame[pricearea: string, starttime: timestamp, productiongroup: string, endtime: timestamp, lastupdatedtime: timestamp, meteringgridarea: string, quantitykwh: double]