In [None]:
from pyspark.sql import SparkSession
!java -version
spark = (SparkSession.builder
    .appName("Elhub-2021-Prod")
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1")
    .config("spark.cassandra.connection.host", "127.0.0.1")   # match docker ps
    .config("spark.cassandra.connection.port", "9042")
    .getOrCreate()
)

spark


In [None]:
import requests
import pandas as pd

base_url = "https://api.elhub.no/energy-data/v0/price-areas"
REQUEST_TIMEOUT_S = 60  # seconds; requests has no default timeout and would otherwise wait forever

# First day of every month in 2021; the API is queried one calendar month at a time.
months = pd.date_range("2021-01-01", "2021-12-31", freq="MS")

all_data = []

# One Session reuses the HTTPS connection across the monthly requests and is closed afterwards.
with requests.Session() as session:
    for start in months:
        # Last calendar day of the same month (e.g. 2021-01-01 -> 2021-01-31).
        # NOTE(review): confirm whether the API treats endDate as inclusive; if it is
        # exclusive, the last day of every month is missing from the result.
        end = start + pd.offsets.MonthEnd(1)

        params = {
            "dataset": "PRODUCTION_PER_GROUP_MBA_HOUR",
            "startDate": start.strftime("%Y-%m-%d"),
            "endDate": end.strftime("%Y-%m-%d"),
        }

        print(f"Fetching {params['startDate']} to {params['endDate']}...")

        r = session.get(base_url, params=params, timeout=REQUEST_TIMEOUT_S)
        r.raise_for_status()
        data = r.json()

        # Flatten: each item in data["data"] (presumably one per price area) carries its
        # hourly rows under attributes.productionPerGroupMbaHour; a missing key adds nothing.
        for item in data["data"]:
            all_data.extend(item["attributes"].get("productionPerGroupMbaHour", []))

# Convert to DataFrame
df = pd.DataFrame(all_data)

# Fail here with a clear message instead of a KeyError on "startTime" in the next cell.
if df.empty:
    raise ValueError("Elhub API returned no productionPerGroupMbaHour records for 2021")

print(df.head())


In [None]:
import pandas as pd

# Parse startTime strings like "2021-01-01T00:00:00+01:00". Values may carry different
# UTC offsets (e.g. across DST); utc=True normalises them all to one tz-aware UTC column.
start_utc = pd.to_datetime(df["startTime"], utc=True, errors="coerce")

# errors="coerce" turns unparseable (or missing) values into NaT. Stop here with a clear
# count rather than letting null timestamps reach the Cassandra write.
n_bad = int(start_utc.isna().sum())
if n_bad:
    raise ValueError(f"{n_bad} startTime value(s) could not be parsed")

# Drop the tz-info while keeping the UTC wall-clock time: the column becomes tz-naive UTC.
# NOTE(review): Spark interprets naive timestamps in spark.sql.session.timeZone, which must
# be "UTC" for these values to be written unshifted.
df["startTime"] = start_utc.dt.tz_localize(None)

# Keep only the columns we want in Cassandra
df4 = df[["priceArea", "productionGroup", "startTime", "quantityKwh"]].copy()

print(df4.head(), df4.dtypes)

In [None]:
from pyspark.sql import functions as F

# createDataFrame reads tz-naive pandas timestamps as wall-clock time in the session time
# zone, which defaults to the JVM's local zone. df4["startTime"] holds naive UTC values,
# so pin the zone to UTC; otherwise each row is shifted by the local UTC offset.
# Setting it is idempotent, so re-running this cell is harmless.
spark.conf.set("spark.sql.session.timeZone", "UTC")

df_spark = spark.createDataFrame(df4)

# Optional: spread the write over 8 partitions keyed by (priceArea, productionGroup); tune if needed.
df_spark = df_spark.repartition(8, "priceArea", "productionGroup")

(df_spark.write
    .format("org.apache.spark.sql.cassandra")
    .options(keyspace="elhub2021", table="prod_by_group_hour")
    .mode("append")  # safe to re-run; rows with the same primary key are overwritten
    .save())

# createDataFrame and repartition keep every pandas row, so len(df4) is the number of rows
# sent. df_spark.count() would re-execute the whole Spark lineage just to report it.
print("Wrote rows to Cassandra:", len(df4))