# IND320 – Part 4  
## Data Retrieval and Storage (Production & Consumption 2021–2024)

This notebook retrieves hourly energy production and consumption data from the Elhub API, using the same approach as in Part 2.  
The new production data (2022–2024) extend the 2021 production data from Part 2, and consumption data are fetched for the full 2021–2024 period. Both datasets are stored in Cassandra and MongoDB.

In [None]:
from cassandra.cluster import Cluster

try:
    cluster = Cluster(["127.0.0.1"], port=9042)
    session = cluster.connect()
    print("✅ Cassandra connected successfully")
except Exception as e:
    print("❌ Failed to connect to Cassandra:", repr(e))



✅ Cassandra connected successfully


Selecting the keyspace and creating tables for Part 4

In [41]:
KEYSPACE = "my_keyspace"

# Switch to the existing keyspace first (USE does not create it)
session.execute(f"USE {KEYSPACE};")

# New table names for part 4
PROD_TABLE = "production_per_group_hour_2022_2024"
CONS_TABLE = "consumption_per_group_hour_2021_2024"

# --- Production table 2022–2024 ---
session.execute(f"""
CREATE TABLE IF NOT EXISTS {PROD_TABLE} (
    pricearea text,
    productiongroup text,
    starttime timestamp,
    quantitykwh double,
    PRIMARY KEY ((pricearea, productiongroup), starttime)
) WITH CLUSTERING ORDER BY (starttime ASC);
""")

print("Production table ready")

# --- Consumption table 2021–2024 ---
session.execute(f"""
CREATE TABLE IF NOT EXISTS {CONS_TABLE} (
    pricearea text,
    consumptiongroup text,
    starttime timestamp,
    quantitykwh double,
    PRIMARY KEY ((pricearea, consumptiongroup), starttime)
) WITH CLUSTERING ORDER BY (starttime ASC);
""")

print("Consumption table ready")


Production table ready
Consumption table ready


## API helper functions

Below I reuse the helper functions from Part 2, generalized so they can fetch
multiple years and price areas. These helpers download hourly data month by month.

In [17]:
# --- Elhub API helper functions (reuse from Part 2, slightly generalized) ---
import requests
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta

BASE_URL = "https://api.elhub.no/energy-data/v0/price-areas"
TIMEZONE_OFFSET = "%2B02:00"   # +02:00 URL encoded
PRICE_AREAS = ["NO1", "NO2", "NO3", "NO4", "NO5"]


def month_edges(year: int):
    """Generate (start, end) pairs for each month in a year."""
    cur = datetime(year, 1, 1)
    end = datetime(year + 1, 1, 1)
    while cur < end:
        nxt = cur + relativedelta(months=1)
        yield cur, min(nxt, end)
        cur = nxt


def fetch_month(start_dt: datetime, end_dt: datetime, area: str, dataset: str):
    """
    Fetch a single month for one price area and one dataset.
    dataset is e.g. 'PRODUCTION_PER_GROUP_MBA_HOUR' or 'CONSUMPTION_PER_GROUP_MBA_HOUR'
    """
    start_str = start_dt.strftime("%Y-%m-%dT%H:%M:%S") + TIMEZONE_OFFSET
    end_str   = end_dt.strftime("%Y-%m-%dT%H:%M:%S")   + TIMEZONE_OFFSET

    url = (
        f"{BASE_URL}?dataset={dataset}"
        f"&startDate={start_str}&endDate={end_str}"
        f"&priceArea={area}"
    )
    r = requests.get(url, timeout=60)
    r.raise_for_status()
    js = r.json()

    # Decide which key to look for in the JSON
    if "PRODUCTION" in dataset.upper():
        key_name = "productionPerGroupMbaHour"
    else:
        key_name = "consumptionPerGroupMbaHour"

    # Form 1: top-level list
    if isinstance(js.get(key_name), list):
        return js[key_name]

    # Form 2: { "data": [ { "attributes": { key_name: [...] }}, ... ] }
    rows = []
    for entity in js.get("data", []):
        attrs = (entity or {}).get("attributes", {}) or {}
        rows.extend(attrs.get(key_name, []))
    return rows
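The `%2B` in `TIMEZONE_OFFSET` matters because a literal `+` in a query string is decoded as a space. As a network-free sketch, the standard library's `urllib.parse.urlencode` builds the same URL as the hand-written f-string in `fetch_month()` while doing the percent-encoding itself. `build_elhub_url` is a hypothetical helper name, and the fixed `+02:00` offset simply mirrors the code above.

```python
from datetime import datetime
from urllib.parse import parse_qs, urlencode, urlsplit

BASE_URL = "https://api.elhub.no/energy-data/v0/price-areas"


def build_elhub_url(start: datetime, end: datetime, area: str, dataset: str,
                    offset: str = "+02:00") -> str:
    """Hypothetical helper: same query as fetch_month(), but urlencode()
    percent-encodes the '+' of the UTC offset as %2B."""
    params = {
        "dataset": dataset,
        "startDate": start.strftime("%Y-%m-%dT%H:%M:%S") + offset,
        "endDate": end.strftime("%Y-%m-%dT%H:%M:%S") + offset,
        "priceArea": area,
    }
    # safe=":" keeps the colons unescaped, matching the hand-written URL
    return f"{BASE_URL}?{urlencode(params, safe=':')}"


url = build_elhub_url(datetime(2022, 1, 1), datetime(2022, 2, 1), "NO1",
                      "PRODUCTION_PER_GROUP_MBA_HOUR")
print(url)
# Decoding the query string gives back the literal offset
print(parse_qs(urlsplit(url).query)["startDate"])  # ['2022-01-01T00:00:00+02:00']
```

Letting the library encode the parameters avoids having to remember which characters need escaping if more parameters are added later.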

## Fetching and cleaning Elhub data

Here I download hourly production (2022–2024) and consumption (2021–2024) data
from the Elhub API, looping month-by-month for all price areas (NO1–NO5).
The `fetch_month()` function handles both possible JSON response formats.

After downloading, I convert the raw data into DataFrames, rename columns to match
the Cassandra schema, convert timestamps, ensure numeric types, remove missing
values, and drop duplicates. The cleaned datasets (`df_prod_norm` and `df_cons_norm`)
are then ready to be inserted into Cassandra.
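In the next cell, the production and consumption cleaning steps are identical apart from the group column. As a sketch (not the code used in this notebook), they could be folded into one function; `normalize_elhub_rows` is a hypothetical name and the example rows are made up.

```python
import pandas as pd


def normalize_elhub_rows(rows: list, group_col: str) -> pd.DataFrame:
    """Clean raw Elhub rows into the Cassandra schema.

    group_col is the API field name, e.g. "productionGroup" or "consumptionGroup".
    """
    mapping = {
        "priceArea": "pricearea",
        group_col: group_col.lower(),
        "startTime": "starttime",
        "quantityKwh": "quantitykwh",
    }
    raw = pd.DataFrame(rows)
    missing = [c for c in mapping if c not in raw.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}")

    df = raw[list(mapping)].rename(columns=mapping).copy()
    df["starttime"] = pd.to_datetime(df["starttime"], errors="coerce", utc=True)
    df["quantitykwh"] = pd.to_numeric(df["quantitykwh"], errors="coerce")

    key = ["pricearea", group_col.lower(), "starttime"]
    df = df.dropna(subset=key + ["quantitykwh"])                  # drop invalid rows
    return df.drop_duplicates(subset=key).reset_index(drop=True)  # one row per key


# Made-up rows: one exact repeat and one non-numeric quantity
rows = [
    {"priceArea": "NO1", "productionGroup": "hydro", "startTime": "2022-01-01T00:00:00+01:00", "quantityKwh": 100.0},
    {"priceArea": "NO1", "productionGroup": "hydro", "startTime": "2022-01-01T00:00:00+01:00", "quantityKwh": 100.0},
    {"priceArea": "NO1", "productionGroup": "hydro", "startTime": "2022-01-01T01:00:00+01:00", "quantityKwh": "n/a"},
    {"priceArea": "NO2", "productionGroup": "wind", "startTime": "2022-01-01T00:00:00+01:00", "quantityKwh": 5.5},
]
clean = normalize_elhub_rows(rows, "productionGroup")
print(clean)
```

With this helper, the production and consumption branches would reduce to `normalize_elhub_rows(prod_rows, "productionGroup")` and `normalize_elhub_rows(cons_rows, "consumptionGroup")`.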

In [44]:
import requests
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta

# ----------------- Configuration -----------------
BASE_URL = "https://api.elhub.no/energy-data/v0/price-areas"
TIMEZONE_OFFSET = "%2B02:00"
PRICE_AREAS = ["NO1", "NO2", "NO3", "NO4", "NO5"]

# Production configuration
DATASET_PROD = "PRODUCTION_PER_GROUP_MBA_HOUR"
YEARS_PROD = [2022, 2023, 2024]

# Consumption configuration
DATASET_CONS = "CONSUMPTION_PER_GROUP_MBA_HOUR"
YEARS_CONS = [2021, 2022, 2023, 2024]


def month_edges(year: int):
    """Generate month start/end pairs for a given year."""
    cur = datetime(year, 1, 1)
    end = datetime(year + 1, 1, 1)
    while cur < end:
        nxt = cur + relativedelta(months=1)
        yield cur, min(nxt, end)
        cur = nxt


def fetch_month(start_dt: datetime, end_dt: datetime, area: str, dataset: str, top_key: str):
    """Download one month of data for one price area for the given dataset."""
    start_str = start_dt.strftime("%Y-%m-%dT%H:%M:%S") + TIMEZONE_OFFSET
    end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S") + TIMEZONE_OFFSET

    url = (
        f"{BASE_URL}?dataset={dataset}"
        f"&startDate={start_str}&endDate={end_str}"
        f"&priceArea={area}"
    )

    r = requests.get(url, timeout=60)
    r.raise_for_status()
    js = r.json()

    # Case 1: data appears directly as a top-level list
    if isinstance(js.get(top_key), list):
        return js[top_key]

    # Case 2: data inside "data" → "attributes"
    rows = []
    for entity in js.get("data", []):
        attrs = (entity or {}).get("attributes", {}) or {}
        rows.extend(attrs.get(top_key, []))
    return rows


# ----------------- Production 2022–2024 -----------------
prod_rows = []

for year in YEARS_PROD:
    for area in PRICE_AREAS:
        for s, e in month_edges(year):
            try:
                rows = fetch_month(s, e, area, DATASET_PROD, "productionPerGroupMbaHour")
                print(f"PROD {year} {s:%b} {area}: {len(rows)} rows")
                prod_rows.extend(rows)
            except Exception as ex:
                print(f"ERROR PROD {year} {s:%b} {area}: {ex}")

print(f"\nTotal production rows fetched: {len(prod_rows):,}")

raw_prod = pd.DataFrame(prod_rows)

expected_prod_cols = {
    "priceArea": "pricearea",
    "productionGroup": "productiongroup",
    "startTime": "starttime",
    "quantityKwh": "quantitykwh",
}

missing = [c for c in expected_prod_cols if c not in raw_prod.columns]
if missing:
    raise ValueError(f"Missing columns in production data: {missing}")

df_prod = (
    raw_prod[list(expected_prod_cols)]
    .rename(columns=expected_prod_cols)
    .copy()
)

df_prod["starttime"] = pd.to_datetime(df_prod["starttime"], errors="coerce", utc=True)
df_prod["quantitykwh"] = pd.to_numeric(df_prod["quantitykwh"], errors="coerce")

df_prod = df_prod.dropna(subset=["pricearea", "productiongroup", "starttime", "quantitykwh"])

before = len(df_prod)
df_prod_norm = df_prod.drop_duplicates(subset=["pricearea", "productiongroup", "starttime"])
after = len(df_prod_norm)

print("Production rows before dedup:", before)
print("Production rows after dedup :", after)
print("Final production rows:", len(df_prod_norm))


# ----------------- Consumption 2021–2024 -----------------
cons_rows = []

for year in YEARS_CONS:
    for area in PRICE_AREAS:
        for s, e in month_edges(year):
            try:
                rows = fetch_month(s, e, area, DATASET_CONS, "consumptionPerGroupMbaHour")
                print(f"CONS {year} {s:%b} {area}: {len(rows)} rows")
                cons_rows.extend(rows)
            except Exception as ex:
                print(f"ERROR CONS {year} {s:%b} {area}: {ex}")

print(f"\nTotal consumption rows fetched: {len(cons_rows):,}")

raw_cons = pd.DataFrame(cons_rows)

expected_cons_cols = {
    "priceArea": "pricearea",
    "consumptionGroup": "consumptiongroup",
    "startTime": "starttime",
    "quantityKwh": "quantitykwh",
}

missing = [c for c in expected_cons_cols if c not in raw_cons.columns]
if missing:
    raise ValueError(f"Missing columns in consumption data: {missing}")

df_cons = (
    raw_cons[list(expected_cons_cols)]
    .rename(columns=expected_cons_cols)
    .copy()
)

df_cons["starttime"] = pd.to_datetime(df_cons["starttime"], errors="coerce", utc=True)
df_cons["quantitykwh"] = pd.to_numeric(df_cons["quantitykwh"], errors="coerce")

df_cons = df_cons.dropna(subset=["pricearea", "consumptiongroup", "starttime", "quantitykwh"])

before = len(df_cons)
df_cons_norm = df_cons.drop_duplicates(subset=["pricearea", "consumptiongroup", "starttime"])
after = len(df_cons_norm)

print("Consumption rows before dedup:", before)
print("Consumption rows after dedup :", after)
print("Final consumption rows:", len(df_cons_norm))

df_cons_norm.head()


PROD 2022 Jan NO1: 18600 rows
PROD 2022 Feb NO1: 16800 rows
PROD 2022 Mar NO1: 18575 rows
PROD 2022 Apr NO1: 18000 rows
PROD 2022 May NO1: 18600 rows
PROD 2022 Jun NO1: 18000 rows
PROD 2022 Jul NO1: 18600 rows
PROD 2022 Aug NO1: 18600 rows
PROD 2022 Sep NO1: 18000 rows
PROD 2022 Oct NO1: 18625 rows
PROD 2022 Nov NO1: 18000 rows
PROD 2022 Dec NO1: 18600 rows
PROD 2022 Jan NO2: 18600 rows
PROD 2022 Feb NO2: 16800 rows
PROD 2022 Mar NO2: 18575 rows
PROD 2022 Apr NO2: 18000 rows
PROD 2022 May NO2: 18600 rows
PROD 2022 Jun NO2: 18000 rows
PROD 2022 Jul NO2: 18600 rows
PROD 2022 Aug NO2: 18600 rows
PROD 2022 Sep NO2: 18000 rows
PROD 2022 Oct NO2: 18625 rows
PROD 2022 Nov NO2: 18000 rows
PROD 2022 Dec NO2: 18600 rows
PROD 2022 Jan NO3: 18600 rows
PROD 2022 Feb NO3: 16800 rows
PROD 2022 Mar NO3: 18575 rows
PROD 2022 Apr NO3: 18000 rows
PROD 2022 May NO3: 18600 rows
PROD 2022 Jun NO3: 18000 rows
PROD 2022 Jul NO3: 18600 rows
PROD 2022 Aug NO3: 18600 rows
PROD 2022 Sep NO3: 18000 rows
...

|   | pricearea | consumptiongroup | starttime | quantitykwh |
|---|---|---|---|---|
| 0 | NO1 | cabin | 2020-12-31 23:00:00+00:00 | 177071.56 |
| 1 | NO1 | cabin | 2021-01-01 00:00:00+00:00 | 171335.12 |
| 2 | NO1 | cabin | 2021-01-01 01:00:00+00:00 | 164912.02 |
| 3 | NO1 | cabin | 2021-01-01 02:00:00+00:00 | 160265.77 |
| 4 | NO1 | cabin | 2021-01-01 03:00:00+00:00 | 159828.69 |


## Interpretation of fetched consumption data

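The Elhub API returned a total of 4.38 million raw consumption rows for the
period 2021–2024 across all price areas. After cleaning (removing invalid rows)
and dropping duplicates on the combination (pricearea, consumptiongroup,
starttime), 876,600 unique hourly records remain. The reduction by a factor of
five suggests that each request returned data for all five price areas regardless
of the `priceArea` parameter, so the loop over NO1–NO5 fetched the same rows five
times and the deduplication step removed the repeats.

This cleaned dataset is written to Cassandra with Spark in the next step.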
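These counts can be checked with a little arithmetic. The sketch assumes 5 price areas × 5 groups (as in the verification output) and one row per hour; the daylight-saving shifts in March and October cancel out over a full year.

```python
import calendar

N_AREAS, N_GROUPS = 5, 5  # NO1–NO5 and five groups, as seen in the data


def expected_hourly_rows(years) -> int:
    """Rows in a complete hourly series for every area/group combination.

    A year has 8,760 local hours (8,784 in leap years); the spring and
    autumn clock changes cancel each other out.
    """
    hours = sum((366 if calendar.isleap(y) else 365) * 24 for y in years)
    return hours * N_AREAS * N_GROUPS


print(expected_hourly_rows(range(2021, 2025)))  # consumption 2021–2024: 876600
print(expected_hourly_rows(range(2022, 2025)))  # production 2022–2024: 657600
print(31 * 24 * N_AREAS * N_GROUPS)             # one January month: 18600
```

The January figure equals the 18,600 rows logged for each single-area January request, i.e. 25 series rather than 5, which is consistent with every request returning all price areas.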

In [19]:
# --- Write production 2022–2024 to Cassandra ---

# Reuses KEYSPACE, PROD_TABLE and spark_sess (a SparkSession with the
# Spark Cassandra Connector) from earlier cells

# Convert the cleaned pandas DataFrame to a Spark DataFrame
sdf_prod = spark_sess.createDataFrame(df_prod_norm)

# Partition by area and group for better distribution
sdf_prod = sdf_prod.repartition("pricearea", "productiongroup")

# Write to Cassandra
(
    sdf_prod.write
    .format("org.apache.spark.sql.cassandra")
    .options(keyspace=KEYSPACE, table=PROD_TABLE)
    .mode("append")
    .save()
)

print(f"Wrote to Cassandra: {KEYSPACE}.{PROD_TABLE}")

Wrote to Cassandra: my_keyspace.production_per_group_hour_2022_2024


In [20]:
# --- Verify production table in Cassandra ---

from pyspark.sql import functions as F

sdf_prod_cas = (
    spark_sess.read
    .format("org.apache.spark.sql.cassandra")
    .options(keyspace=KEYSPACE, table=PROD_TABLE)
    .load()
    .select("pricearea", "productiongroup", "starttime", "quantitykwh")
)

rows_prod = sdf_prod_cas.count()
areas_prod = sdf_prod_cas.select("pricearea").distinct().count()
groups_prod = sdf_prod_cas.select("productiongroup").distinct().count()

print(f"Rows in {PROD_TABLE}: {rows_prod:,}")
print(f"Distinct price areas: {areas_prod} | Distinct production groups: {groups_prod}")

# Quick preview
sdf_prod_cas.orderBy("starttime").show(5)

Rows in production_per_group_hour_2022_2024: 657,525
Distinct price areas: 5 | Distinct production groups: 5
+---------+---------------+-------------------+-----------+
|pricearea|productiongroup|          starttime|quantitykwh|
+---------+---------------+-------------------+-----------+
|      NO2|           wind|2022-01-01 00:00:00|   309870.0|
|      NO1|        thermal|2022-01-01 00:00:00|   30049.92|
|      NO5|          solar|2022-01-01 00:00:00|     42.136|
|      NO1|          hydro|2022-01-01 00:00:00|  1291422.4|
|      NO4|          hydro|2022-01-01 00:00:00|  3840983.2|
+---------+---------------+-------------------+-----------+
only showing top 5 rows



## Verifying the Cassandra production table

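After writing the cleaned production dataset to Cassandra, I verify the table by
reading it back with Spark. Here I check:

- the total number of rows written
- the number of distinct price areas
- the number of distinct production groups
- a preview of a few rows ordered by time

All five price areas and five production groups are present. The table holds
657,525 rows, whereas a complete hourly series for 2022–2024 would have
1,096 days × 24 hours × 25 series = 657,600 rows, so 75 rows (one per series
per year) are missing from the table.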
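To find out which rows are missing rather than just how many, one option is an anti-join against a complete hourly grid. This is a pandas sketch under assumptions: the table has been pulled into pandas (for example with Spark's `toPandas()`), `starttime` is timezone-aware UTC as in `df_prod_norm`, and `find_missing_hours` is a hypothetical helper name.

```python
import pandas as pd


def find_missing_hours(df: pd.DataFrame, group_col: str, start: str, end: str) -> pd.DataFrame:
    """Return (pricearea, group, starttime) combinations absent from df in [start, end)."""
    # Complete hourly grid in UTC
    grid = pd.DataFrame({
        "starttime": pd.date_range(start, end, freq=pd.offsets.Hour(), tz="UTC", inclusive="left")
    })
    keys = ["pricearea", group_col, "starttime"]
    # Every observed series crossed with every expected hour
    expected = df[["pricearea", group_col]].drop_duplicates().merge(grid, how="cross")
    # Left anti-join: keep expected keys that have no match in df
    merged = expected.merge(df[keys].drop_duplicates(), on=keys, how="left", indicator=True)
    return merged.loc[merged["_merge"] == "left_only", keys].reset_index(drop=True)


# Made-up example: one series with six hours, of which 02:00 UTC is missing
hours = pd.date_range("2022-01-01", periods=6, freq=pd.offsets.Hour(), tz="UTC")
sample = pd.DataFrame({
    "pricearea": "NO1",
    "productiongroup": "hydro",
    "starttime": hours.delete(2),
    "quantitykwh": 1.0,
})
print(find_missing_hours(sample, "productiongroup", "2022-01-01", "2022-01-01 06:00"))
```

Run over the whole period, the timestamps returned would show whether the missing rows cluster on particular dates.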


In [22]:
# Write normalized consumption data (2021–2024) to Cassandra

print("Creating Spark DataFrame from df_cons_norm ...")
sdf_cons = spark_sess.createDataFrame(df_cons_norm)

# Fewer, but reasonably sized, shuffle partitions
spark_sess.conf.set("spark.sql.shuffle.partitions", "64")

# Repartition on the Cassandra partition key (pricearea, consumptiongroup)
sdf_cons = sdf_cons.repartition("pricearea", "consumptiongroup")

print("Writing consumption data to Cassandra table:", CONS_TABLE)

(
    sdf_cons.write
    .format("org.apache.spark.sql.cassandra")
    .options(keyspace=KEYSPACE, table=CONS_TABLE)
    .mode("append")
    .save()
)

print("Write complete.")

Creating Spark DataFrame from df_cons_norm ...
Writing consumption data to Cassandra table: consumption_per_group_hour_2021_2024
Write complete.


In [23]:
# Verify consumption data in Cassandra

from pyspark.sql import functions as F

sdf_cons_cas = (
    spark_sess.read
    .format("org.apache.spark.sql.cassandra")
    .options(keyspace=KEYSPACE, table=CONS_TABLE)
    .load()
)

rows = sdf_cons_cas.count()
areas = sdf_cons_cas.select("pricearea").distinct().count()
groups = sdf_cons_cas.select("consumptiongroup").distinct().count()

print(f"Rows in {CONS_TABLE}: {rows:,}")
print(f"Distinct price areas: {areas} | Distinct consumption groups: {groups}")
sdf_cons_cas.show(5)

Rows in consumption_per_group_hour_2021_2024: 876,500
Distinct price areas: 5 | Distinct consumption groups: 5
+---------+----------------+-------------------+-----------+
|pricearea|consumptiongroup|          starttime|quantitykwh|
+---------+----------------+-------------------+-----------+
|      NO2|        tertiary|2021-01-01 00:00:00|  608354.44|
|      NO2|        tertiary|2021-01-01 01:00:00|   606207.2|
|      NO2|        tertiary|2021-01-01 02:00:00|  603700.75|
|      NO2|        tertiary|2021-01-01 03:00:00|  606288.25|
|      NO2|        tertiary|2021-01-01 04:00:00|   615663.1|
+---------+----------------+-------------------+-----------+
only showing top 5 rows



The verification confirms that the consumption table was written to Cassandra:
all five price areas and five consumption groups are present, and the preview
shows plausible timestamps and kWh values. The row count, however, is 876,500,
which is 100 rows fewer than the 876,600 cleaned records reported above (one row
per price area, consumption group and year), so a small number of rows did not
end up in the table. With that caveat, this completes the ingestion pipeline for
both production and consumption data.
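One possible mechanism for a gap of exactly one row per series per year, offered as a hypothesis rather than a confirmed cause: Cassandra treats a write to an existing primary key as an update, so if `starttime` loses its UTC offset somewhere between pandas, Spark and Cassandra (for example by being turned into naive Oslo wall-clock time), the two hours labelled 02:00 at the end of summer time share one key and the later write overwrites the earlier. The sketch below simulates this with plain dictionaries and a hand-written CET/CEST rule; all helper names are hypothetical, and real code would normally use `zoneinfo`.

```python
from datetime import datetime, timedelta, timezone


def eu_dst_switch(year: int, month: int) -> datetime:
    """01:00 UTC on the last Sunday of `month`, when EU clocks change."""
    # Start at 01:00 UTC on the last day of the month and step back to a Sunday
    d = datetime(year + (month == 12), month % 12 + 1, 1, 1, tzinfo=timezone.utc) - timedelta(days=1)
    while d.weekday() != 6:  # Monday == 0, Sunday == 6
        d -= timedelta(days=1)
    return d


def to_oslo_naive(ts_utc: datetime) -> datetime:
    """Convert an aware UTC datetime to naive Oslo wall-clock time (CET/CEST)."""
    summer = eu_dst_switch(ts_utc.year, 3) <= ts_utc < eu_dst_switch(ts_utc.year, 10)
    return (ts_utc + timedelta(hours=2 if summer else 1)).replace(tzinfo=None)


def upsert(table: dict, key: tuple, value: float) -> None:
    """Mimic Cassandra: writing an existing primary key overwrites the earlier row."""
    table[key] = value


# Six consecutive hours around the end of summer time 2021 for one made-up series
hours_utc = [datetime(2021, 10, 30, 22, tzinfo=timezone.utc) + timedelta(hours=h) for h in range(6)]

utc_keys, local_keys = {}, {}
for value, ts in enumerate(hours_utc):
    upsert(utc_keys, ("NO1", "household", ts), float(value))
    upsert(local_keys, ("NO1", "household", to_oslo_naive(ts)), float(value))

print(len(utc_keys), len(local_keys))  # 6 5
```

Keyed on UTC instants all six hours survive; keyed on naive local time one hour is silently overwritten. Checking which timestamps are absent in Cassandra would confirm or rule out this explanation.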

## Writing production and consumption data to MongoDB

In this step I connect to MongoDB Atlas using the connection URI stored in the
`.env` file and use two new collections for Part 4 (MongoDB creates them on the
first insert):
- `elhub_production_data_2022_2024`
- `elhub_consumption_data_2021_2024`

I then insert the cleaned datasets `df_prod_norm` and `df_cons_norm` directly
into these collections. Before inserting, I delete any existing documents with
`delete_many({})` so that re-running the cell does not duplicate data. The final
counts in the output confirm that all rows were inserted.
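`df.to_dict("records")` materialises every row as a Python dict before `insert_many` is called, which for roughly 877,000 rows takes a fair amount of memory. A possible alternative, sketched below with a hypothetical helper `iter_record_batches`, converts and inserts one slice at a time. PyMongo already splits each `insert_many` call into server-sized batches, so the gain is mainly lower peak memory and the option to report progress.

```python
from typing import Iterator

import pandas as pd


def iter_record_batches(df: pd.DataFrame, batch_size: int = 50_000) -> Iterator[list]:
    """Yield df as lists of plain dicts, at most batch_size rows per list."""
    for start in range(0, len(df), batch_size):
        yield df.iloc[start:start + batch_size].to_dict("records")


# Hypothetical usage with the collections defined in this notebook:
# for batch in iter_record_batches(df_prod_norm):
#     prod_coll.insert_many(batch, ordered=False)

demo = pd.DataFrame({"pricearea": ["NO1"] * 5, "quantitykwh": [1.0, 2.0, 3.0, 4.0, 5.0]})
print([len(batch) for batch in iter_record_batches(demo, batch_size=2)])  # [2, 2, 1]
```

`ordered=False` lets the server keep inserting the rest of a batch if an individual document fails, instead of stopping at the first error.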


MongoDB - production 2022-2024 + consumption 2021-2024

In [28]:
# --- MongoDB connection setup ---
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from dotenv import load_dotenv
import os
load_dotenv()  # reads .env locally (same as in app.py)
uri = os.getenv("MONGODB_URI")
if not uri:
    raise RuntimeError("MONGODB_URI is missing in .env")
# Connect to MongoDB Atlas
client = MongoClient(uri, server_api=ServerApi('1'))
client.admin.command("ping")
print("MongoDB connection OK")
db = client["ind320"]
# New collections for part 4
prod_coll = db["elhub_production_data_2022_2024"]
cons_coll = db["elhub_consumption_data_2021_2024"]
print("Using collections:", prod_coll.name, "and", cons_coll.name)

MongoDB connection OK
Using collections: elhub_production_data_2022_2024 and elhub_consumption_data_2021_2024


Save df_prod_norm and df_cons_norm in MongoDB

In [45]:
# --- Insert Production Data (2022–2024) into MongoDB ---
print("Inserting production data into MongoDB...")
prod_coll.delete_many({})      # optional: clean old data
prod_coll.insert_many(df_prod_norm.to_dict("records"))
print("Production data inserted:", prod_coll.count_documents({}))
# --- Insert Consumption Data (2021–2024) into MongoDB ---
print("Inserting consumption data into MongoDB...")
cons_coll.delete_many({})      # optional
cons_coll.insert_many(df_cons_norm.to_dict("records"))
print("Consumption data inserted:", cons_coll.count_documents({}))

Inserting production data into MongoDB...
Production data inserted: 657600
Inserting consumption data into MongoDB...
Consumption data inserted: 876600


The output confirms that both datasets were successfully inserted into MongoDB.
The production collection contains 657,600 documents, and the consumption
collection contains 876,600 documents, matching the cleaned DataFrame sizes.
This completes the MongoDB storage step for Part 4.


## Sliding-Window Correlation – Summary

I implemented the sliding-window correlation in Streamlit with Plotly and tested it using different meteorological variables, energy types, lags, and window lengths.
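The roles of the window length and the lag can be illustrated with a minimal pandas sketch. It is not the dashboard's code: `sliding_corr`, the sign convention for the lag and the synthetic data are assumptions chosen to show the idea.

```python
import numpy as np
import pandas as pd


def sliding_corr(weather: pd.Series, energy: pd.Series, window: int, lag: int = 0) -> pd.Series:
    """Rolling Pearson correlation over `window` steps.

    A positive lag pairs weather at time t with energy at time t + lag,
    i.e. it tests whether energy responds `lag` steps after the weather.
    """
    return weather.rolling(window).corr(energy.shift(-lag))


# Synthetic hourly data in which "energy" follows "temperature" with a 3-hour delay
idx = pd.date_range("2021-01-01", periods=200, freq=pd.offsets.Hour())
temperature = pd.Series(np.random.default_rng(0).normal(size=200), index=idx)
energy = temperature.shift(3)

no_lag = sliding_corr(temperature, energy, window=48, lag=0)
with_lag = sliding_corr(temperature, energy, window=48, lag=3)
print(no_lag.abs().mean(), with_lag.max())  # with_lag is ~1.0 wherever defined
```

A larger `window` averages over more hours and smooths the curve, while choosing the right lag recovers a delayed relationship that the zero-lag correlation misses.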

Findings:

- The correlation varies noticeably through the year rather than staying constant.
- During stable weather, the correlation curve is smoother and more predictable.
- Extreme events (temperature drops, storms, heavy precipitation) cause clear spikes or dips, especially with shorter windows.
- Longer windows smooth the curve, while adjusting the lag reveals delayed energy responses.

The interactive tool made these variations easy to observe and helped link specific weather conditions to changes in energy production and consumption.

## Final Summary

In Part 4, I completed the workflow for downloading, cleaning, and storing the full Elhub datasets (2021–2024) in both Cassandra and MongoDB. After verifying the table schemas, I wrote the data and checked the results with Spark.

One issue I encountered was that old tables from a previous project were still present. These leftover tables made the new Elhub data appear incomplete. After removing the outdated tables and recreating the correct ones, the imports worked properly and the row counts matched the API output.

With the corrected data pipeline, all Streamlit components—STL decomposition, spectrograms, sliding-window correlation, anomaly detection, and forecasting—now work as intended using the updated datasets. This completes the end-to-end setup for the IND320 project and ensures that all analyses run on clean and consistent data.


## Project Links

Below are the required links to the public GitHub repository and the deployed Streamlit application:

- **Streamlit App (Online Dashboard)**  
  https://ind320-dashboard33.streamlit.app/

- **GitHub Repository (Source Code & Notebooks)**  
  https://github.com/AbdirahmanOsma/IND320

These links provide access to the full data pipeline, final dashboard, and all code used throughout the project.