# IND320 Project Part 4: Machine Learning & Advanced Analytics

**Student:** Hasan Elahi  
**Course:** IND320 - Data to Decision  
**Institution:** NMBU

**GitHub Repository:** https://github.com/hasanelahi7/hasanelahi7-ind320-dashboard  
**Streamlit App:** https://hasanelahi7-ind320-dashboard-final.streamlit.app/

## Table of Contents

1. [Setup and Imports](#setup)
2. [Spark and Cassandra Configuration](#spark-setup)
3. [Data Collection: Production 2022-2024](#production-data)
4. [Data Collection: Consumption 2021-2024](#consumption-data)
5. [Store in Cassandra](#cassandra-storage)
6. [Upload to MongoDB](#mongodb-upload)
7. [Data Verification and Statistics](#verification)
8. [Project Work Log](#work-log)
9. [AI Usage Documentation](#ai-usage)

<a id='setup'></a>
## 1. Setup and Imports

Install and import necessary libraries for data fetching, processing, and storage.

In [1]:
# Install required packages if needed
# !pip install pyspark pymongo requests pandas numpy

In [2]:
import os
import sys
import datetime as dt
import pandas as pd
import numpy as np
import requests
from pymongo import MongoClient, ASCENDING
from pymongo.errors import BulkWriteError

# Display settings
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)

print("✅ All imports successful")
print(f"Python version: {sys.version}")
print(f"Pandas version: {pd.__version__}")

✅ All imports successful
Python version: 3.8.16 (default, Mar  1 2023, 21:19:10) 
[Clang 14.0.6 ]
Pandas version: 1.5.3


<a id='spark-setup'></a>
## 2. Spark and Cassandra Configuration

Configure Java environment and initialize Spark session with Cassandra connector.

In [3]:
# Set Java Home for Spark
JAVA11 = "/Library/Java/JavaVirtualMachines/temurin-11.jdk/Contents/Home"
os.environ["JAVA_HOME"] = JAVA11
os.environ["PATH"] = JAVA11 + "/bin:" + os.environ["PATH"]

print("JAVA_HOME =", os.environ.get("JAVA_HOME"))

# Verify Java version
import subprocess
java_version = subprocess.run(["java", "-version"], capture_output=True, text=True).stderr.strip()
print("Java version:")
print(java_version)

JAVA_HOME = /Library/Java/JavaVirtualMachines/temurin-11.jdk/Contents/Home
Java version:
openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment Temurin-11.0.28+6 (build 11.0.28+6)
OpenJDK 64-Bit Server VM Temurin-11.0.28+6 (build 11.0.28+6, mixed mode)


In [4]:
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Fix Python version mismatch: tell Spark workers to use the same Python as the driver
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

print(f"Setting Spark Python to: {sys.executable}")

# Create Spark session with Cassandra connector
spark = (
    SparkSession.builder
    .appName("IND320-Part4")
    .config("spark.master", "local[*]")
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1")
    .config("spark.cassandra.connection.host", "127.0.0.1")
    .config("spark.cassandra.connection.port", "9042")
    .getOrCreate()
)

print("✅ Spark session created successfully")
print(f"Spark version: {spark.version}")

Setting Spark Python to: /Users/hasanelahi/miniconda3/bin/python


25/11/20 23:53:26 WARN Utils: Your hostname, Hasans-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.11.148 instead (on interface en0)
25/11/20 23:53:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/hasanelahi/miniconda3/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/hasanelahi/.ivy2/cache
The jars for the packages stored in: /Users/hasanelahi/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-10f6ae28-1358-47a8-afd2-d90518409e79;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.5.1 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.5.1 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 in central
	found org.apache.cassandra#java-driver-core-shaded;4.18.1 in central
	found com.datastax.oss#native-protocol;1.5.1 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-s

✅ Spark session created successfully
Spark version: 3.5.7


<a id='production-data'></a>
## 3. Data Collection: Production 2022-2024

Fetch hourly production data for all Norwegian price areas (NO1-NO5) for years 2022, 2023, and 2024.

In [5]:
# Helper function to generate month ranges
def month_ranges(year: int):
    """Generate start and end timestamps for each month of a given year."""
    tz = dt.timezone.utc
    for m in range(1, 13):
        start = dt.datetime(year, m, 1, 0, 0, tzinfo=tz)
        if m == 12:
            end = dt.datetime(year+1, 1, 1, 0, 0, tzinfo=tz) - dt.timedelta(hours=1)
        else:
            end = dt.datetime(year, m+1, 1, 0, 0, tzinfo=tz) - dt.timedelta(hours=1)
        yield m, start.isoformat().replace("+00:00", "Z"), end.isoformat().replace("+00:00", "Z")

print("✅ Helper function defined")

✅ Helper function defined


In [6]:
def fetch_production_month(start_iso: str, end_iso: str) -> pd.DataFrame:
    """
    Fetch hourly production per group for ALL price areas between start_iso and end_iso.
    Uses /price-areas endpoint with PRODUCTION_PER_GROUP_MBA_HOUR dataset.
    """
    BASE = "https://api.elhub.no/energy-data/v0"
    url = f"{BASE}/price-areas"
    params = {
        "dataset": "PRODUCTION_PER_GROUP_MBA_HOUR",
        "startTime": start_iso,
        "endTime": end_iso,
        "pageSize": 200
    }

    all_rows = []

    def extract_page(u, p):
        r = requests.get(u, params=p, headers={"Accept": "application/json"}, timeout=60)
        r.raise_for_status()
        js = r.json()
        data = js.get("data", [])
        for item in data:
            attr = item.get("attributes", {}) or {}
            area_name = attr.get("name")  # e.g., "NO1"
            rows = attr.get("productionPerGroupMbaHour", []) or []
            for row in rows:
                row = dict(row)
                if "priceArea" not in row or not row["priceArea"]:
                    row["priceArea"] = area_name
                all_rows.append(row)
        next_url = js.get("links", {}).get("next")
        return next_url

    next_url = url
    next_params = params
    while next_url:
        next_url = extract_page(next_url, next_params)
        next_params = None

    df = pd.DataFrame(all_rows)
    if df.empty:
        return df

    df = df.rename(columns={
        "priceArea": "priceArea",
        "productionGroup": "productionGroup",
        "startTime": "startTime",
        "quantityKwh": "quantityKwh"
    })
    keep = [c for c in ["priceArea", "productionGroup", "startTime", "quantityKwh"] if c in df.columns]
    df = df[keep].copy()
    df["startTime"] = pd.to_datetime(df["startTime"], utc=True, errors="coerce")
    df["quantityKwh"] = pd.to_numeric(df["quantityKwh"], errors="coerce")
    df = df.dropna(subset=["priceArea", "productionGroup", "startTime"])
    return df

print("✅ Production fetch function defined")

✅ Production fetch function defined


In [7]:
# Fetch production data for 2022-2024
production_years = [2022, 2023, 2024]
production_parts = []

print("Fetching production data for 2022-2024...")
print("This will take several minutes. Please be patient.\n")

for year in production_years:
    print(f"\n=== Year {year} ===")
    year_parts = []
    for m, s, e in month_ranges(year):
        try:
            df_m = fetch_production_month(s, e)
            print(f"  Month {m:02d}: {len(df_m):,} rows")
            year_parts.append(df_m)
        except Exception as ex:
            print(f"  Month {m:02d}: ERROR - {ex}")
    
    if year_parts:
        year_df = pd.concat(year_parts, ignore_index=True)
        production_parts.append(year_df)
        print(f"Year {year} total: {len(year_df):,} rows")

# Combine all years
if production_parts:
    production_2022_2024 = pd.concat(production_parts, ignore_index=True)
    print(f"\n✅ Production data 2022-2024 fetched: {len(production_2022_2024):,} total rows")
    print(f"Date range: {production_2022_2024['startTime'].min()} to {production_2022_2024['startTime'].max()}")
    print(f"\nSample data:")
    print(production_2022_2024.head())
else:
    print("⚠️ No production data fetched")
    production_2022_2024 = pd.DataFrame()

Fetching production data for 2022-2024...
This will take several minutes. Please be patient.


=== Year 2022 ===
  Month 01: 18,818 rows
  Month 02: 18,818 rows
  Month 03: 18,818 rows
  Month 04: 18,818 rows
  Month 05: 18,818 rows
  Month 06: 18,818 rows
  Month 07: 18,818 rows
  Month 08: 18,818 rows
  Month 09: 18,818 rows
  Month 10: 18,818 rows
  Month 11: 18,818 rows
  Month 12: 18,818 rows
Year 2022 total: 225,816 rows

=== Year 2023 ===
  Month 01: 18,818 rows
  Month 02: 18,818 rows
  Month 03: 18,818 rows
  Month 04: 18,818 rows
  Month 05: 18,818 rows
  Month 06: 18,818 rows
  Month 07: 18,818 rows
  Month 08: 18,818 rows
  Month 09: 18,818 rows
  Month 10: 18,818 rows
  Month 11: 18,818 rows
  Month 12: 18,818 rows
Year 2023 total: 225,816 rows

=== Year 2024 ===
  Month 01: 18,818 rows
  Month 02: 18,818 rows
  Month 03: 18,818 rows
  Month 04: 18,818 rows
  Month 05: 18,818 rows
  Month 06: 18,818 rows
  Month 07: 18,818 rows
  Month 08: 18,818 rows
  Month 09: 18,818 ro

<a id='consumption-data'></a>
## 4. Data Collection: Consumption 2021-2024

Fetch hourly consumption data for all Norwegian price areas for years 2021, 2022, 2023, and 2024.

In [8]:
def fetch_consumption_month(start_iso: str, end_iso: str) -> pd.DataFrame:
    """
    Fetch hourly consumption per group for ALL price areas.
    Uses /price-areas endpoint with CONSUMPTION_PER_GROUP_MBA_HOUR dataset.
    """
    BASE = "https://api.elhub.no/energy-data/v0"
    url = f"{BASE}/price-areas"
    params = {
        "dataset": "CONSUMPTION_PER_GROUP_MBA_HOUR",
        "startTime": start_iso,
        "endTime": end_iso,
        "pageSize": 200
    }

    all_rows = []

    def extract_page(u, p):
        r = requests.get(u, params=p, headers={"Accept": "application/json"}, timeout=60)
        r.raise_for_status()
        js = r.json()
        data = js.get("data", [])
        for item in data:
            attr = item.get("attributes", {}) or {}
            area_name = attr.get("name")
            rows = attr.get("consumptionPerGroupMbaHour", []) or []
            for row in rows:
                row = dict(row)
                if "priceArea" not in row or not row["priceArea"]:
                    row["priceArea"] = area_name
                all_rows.append(row)
        next_url = js.get("links", {}).get("next")
        return next_url

    next_url = url
    next_params = params
    while next_url:
        next_url = extract_page(next_url, next_params)
        next_params = None

    df = pd.DataFrame(all_rows)
    if df.empty:
        return df

    df = df.rename(columns={
        "priceArea": "priceArea",
        "consumptionGroup": "consumptionGroup",
        "startTime": "startTime",
        "quantityKwh": "quantityKwh"
    })
    keep = [c for c in ["priceArea", "consumptionGroup", "startTime", "quantityKwh"] if c in df.columns]
    df = df[keep].copy()
    df["startTime"] = pd.to_datetime(df["startTime"], utc=True, errors="coerce")
    df["quantityKwh"] = pd.to_numeric(df["quantityKwh"], errors="coerce")
    df = df.dropna(subset=["priceArea", "consumptionGroup", "startTime"])
    return df

print("✅ Consumption fetch function defined")

✅ Consumption fetch function defined


In [9]:
# Fetch consumption data for 2021-2024
consumption_years = [2021, 2022, 2023, 2024]
consumption_parts = []

print("Fetching consumption data for 2021-2024...")
print("This will take several minutes. Please be patient.\n")

for year in consumption_years:
    print(f"\n=== Year {year} ===")
    year_parts = []
    for m, s, e in month_ranges(year):
        try:
            df_m = fetch_consumption_month(s, e)
            print(f"  Month {m:02d}: {len(df_m):,} rows")
            year_parts.append(df_m)
        except Exception as ex:
            print(f"  Month {m:02d}: ERROR - {ex}")
    
    if year_parts:
        year_df = pd.concat(year_parts, ignore_index=True)
        consumption_parts.append(year_df)
        print(f"Year {year} total: {len(year_df):,} rows")

# Combine all years
if consumption_parts:
    consumption_2021_2024 = pd.concat(consumption_parts, ignore_index=True)
    print(f"\n✅ Consumption data 2021-2024 fetched: {len(consumption_2021_2024):,} total rows")
    print(f"Date range: {consumption_2021_2024['startTime'].min()} to {consumption_2021_2024['startTime'].max()}")
    print(f"\nSample data:")
    print(consumption_2021_2024.head())
else:
    print("⚠️ No consumption data fetched")
    consumption_2021_2024 = pd.DataFrame()

Fetching consumption data for 2021-2024...
This will take several minutes. Please be patient.


=== Year 2021 ===
  Month 01: 18,025 rows
  Month 02: 18,025 rows
  Month 03: 18,025 rows
  Month 04: 18,025 rows
  Month 05: 18,025 rows
  Month 06: 18,025 rows
  Month 07: 18,025 rows
  Month 08: 18,025 rows
  Month 09: 18,025 rows
  Month 10: 18,025 rows
  Month 11: 18,025 rows
  Month 12: 18,025 rows
Year 2021 total: 216,300 rows

=== Year 2022 ===
  Month 01: 18,025 rows
  Month 02: 18,025 rows
  Month 03: 18,025 rows
  Month 04: 18,025 rows
  Month 05: 18,025 rows
  Month 06: 18,025 rows
  Month 07: 18,025 rows
  Month 08: 18,025 rows
  Month 09: 18,025 rows
  Month 10: 18,025 rows
  Month 11: 18,025 rows
  Month 12: 18,025 rows
Year 2022 total: 216,300 rows

=== Year 2023 ===
  Month 01: 18,025 rows
  Month 02: 18,025 rows
  Month 03: 18,025 rows
  Month 04: 18,025 rows
  Month 05: 18,025 rows
  Month 06: 18,025 rows
  Month 07: 18,025 rows
  Month 08: 18,025 rows
  Month 09: 18,025 r

<a id='cassandra-storage'></a>
## 5. Store in Cassandra

Write production and consumption data to Cassandra using Spark for efficient distributed storage.

In [10]:
# Store Production 2022-2024 in Cassandra
if not production_2022_2024.empty:
    print("Writing production 2022-2024 to Cassandra...")
    
    sdf_prod = (
        spark.createDataFrame(production_2022_2024)
        .select(
            col("priceArea").alias("pricearea"),
            col("productionGroup").alias("productiongroup"),
            col("startTime").alias("starttime"),
            col("quantityKwh").alias("quantitykwh"),
        )
    )
    
    (
        sdf_prod.write
        .format("org.apache.spark.sql.cassandra")
        .mode("append")
        .options(table="elhub_prod_2022_2024", keyspace="ind320")
        .save()
    )
    
    print(f"✅ Written {sdf_prod.count():,} production rows to Cassandra (table: elhub_prod_2022_2024)")
else:
    print("⚠️ No production data to write")

Writing production 2022-2024 to Cassandra...


25/11/20 23:58:54 WARN TaskSetManager: Stage 0 contains a task of very large size (5794 KiB). The maximum recommended task size is 1000 KiB.
25/11/20 23:59:28 WARN TaskSetManager: Stage 1 contains a task of very large size (5794 KiB). The maximum recommended task size is 1000 KiB.
[Stage 1:>                                                          (0 + 4) / 4]

✅ Written 677,448 production rows to Cassandra (table: elhub_prod_2022_2024)


                                                                                

In [11]:
# Store Consumption 2021-2024 in Cassandra
if not consumption_2021_2024.empty:
    print("Writing consumption 2021-2024 to Cassandra...")
    
    sdf_cons = (
        spark.createDataFrame(consumption_2021_2024)
        .select(
            col("priceArea").alias("pricearea"),
            col("consumptionGroup").alias("consumptiongroup"),
            col("startTime").alias("starttime"),
            col("quantityKwh").alias("quantitykwh"),
        )
    )
    
    (
        sdf_cons.write
        .format("org.apache.spark.sql.cassandra")
        .mode("append")
        .options(table="elhub_cons_2021_2024", keyspace="ind320")
        .save()
    )
    
    print(f"✅ Written {sdf_cons.count():,} consumption rows to Cassandra (table: elhub_cons_2021_2024)")
else:
    print("⚠️ No consumption data to write")

Writing consumption 2021-2024 to Cassandra...


25/11/21 00:00:24 WARN TaskSetManager: Stage 4 contains a task of very large size (7951 KiB). The maximum recommended task size is 1000 KiB.
25/11/21 00:00:46 WARN TaskSetManager: Stage 5 contains a task of very large size (7951 KiB). The maximum recommended task size is 1000 KiB.
[Stage 5:>                                                          (0 + 4) / 4]

✅ Written 865,200 consumption rows to Cassandra (table: elhub_cons_2021_2024)




<a id='mongodb-upload'></a>
## 6. Upload to MongoDB

Read data back from Cassandra and upload to MongoDB Atlas for cloud-based access in the Streamlit app.

In [12]:
# MongoDB connection
MONGO_URI = "mongodb+srv://sheikhhasan7_db_user:GdrnCwRwtoagqPyh@cluster0.yqi8vop.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
client = MongoClient(MONGO_URI)
mdb = client["ind320"]

print("✅ Connected to MongoDB Atlas")
print(f"Available databases: {client.list_database_names()}")

✅ Connected to MongoDB Atlas
Available databases: ['ind320', 'sample_mflix', 'admin', 'local']


In [13]:
# Read production data back from Cassandra and upload to MongoDB
if not production_2022_2024.empty:
    print("Reading production data from Cassandra and uploading to MongoDB...")
    
    sdf_prod_read = (
        spark.read
        .format("org.apache.spark.sql.cassandra")
        .options(table="elhub_prod_2022_2024", keyspace="ind320")
        .load()
        .select("pricearea", "productiongroup", "starttime", "quantitykwh")
    )
    
    df_prod_mongo = sdf_prod_read.toPandas()
    df_prod_mongo = df_prod_mongo.rename(columns={
        "pricearea": "priceArea",
        "productiongroup": "productionGroup",
        "starttime": "startTime",
        "quantitykwh": "quantityKwh"
    })
    df_prod_mongo["startTime"] = pd.to_datetime(df_prod_mongo["startTime"], utc=True)
    
    # Create unique _id
    df_prod_mongo["_id"] = (
        df_prod_mongo["priceArea"].astype(str) + "|" +
        df_prod_mongo["productionGroup"].astype(str) + "|" +
        df_prod_mongo["startTime"].astype('int64').astype(str)
    )
    
    # Upload to MongoDB
    col_prod = mdb["elhub_production_2022_2024"]
    col_prod.delete_many({})  # Clear existing data
    
    records = df_prod_mongo.to_dict(orient="records")
    if records:
        try:
            col_prod.insert_many(records, ordered=False)
            print(f"✅ Inserted {len(records):,} production documents into MongoDB")
        except BulkWriteError as e:
            inserted = e.details.get("nInserted", 0)
            print(f"⚠️ Inserted {inserted:,} documents; some duplicates skipped")
    
    # Create indexes
    col_prod.create_index([("priceArea", ASCENDING)])
    col_prod.create_index([("productionGroup", ASCENDING)])
    col_prod.create_index([("startTime", ASCENDING)])
    print(f"Documents in MongoDB: {col_prod.count_documents({}):,}")
else:
    print("⚠️ No production data to upload")

Reading production data from Cassandra and uploading to MongoDB...


                                                                                

✅ Inserted 18,792 production documents into MongoDB
Documents in MongoDB: 18,792


In [14]:
# Read consumption data back from Cassandra and upload to MongoDB
if not consumption_2021_2024.empty:
    print("Reading consumption data from Cassandra and uploading to MongoDB...")
    
    sdf_cons_read = (
        spark.read
        .format("org.apache.spark.sql.cassandra")
        .options(table="elhub_cons_2021_2024", keyspace="ind320")
        .load()
        .select("pricearea", "consumptiongroup", "starttime", "quantitykwh")
    )
    
    df_cons_mongo = sdf_cons_read.toPandas()
    df_cons_mongo = df_cons_mongo.rename(columns={
        "pricearea": "priceArea",
        "consumptiongroup": "consumptionGroup",
        "starttime": "startTime",
        "quantitykwh": "quantityKwh"
    })
    df_cons_mongo["startTime"] = pd.to_datetime(df_cons_mongo["startTime"], utc=True)
    
    # Create unique _id
    df_cons_mongo["_id"] = (
        df_cons_mongo["priceArea"].astype(str) + "|" +
        df_cons_mongo["consumptionGroup"].astype(str) + "|" +
        df_cons_mongo["startTime"].astype('int64').astype(str)
    )
    
    # Upload to MongoDB
    col_cons = mdb["elhub_consumption_2021_2024"]
    col_cons.delete_many({})  # Clear existing data
    
    records = df_cons_mongo.to_dict(orient="records")
    if records:
        try:
            col_cons.insert_many(records, ordered=False)
            print(f"✅ Inserted {len(records):,} consumption documents into MongoDB")
        except BulkWriteError as e:
            inserted = e.details.get("nInserted", 0)
            print(f"⚠️ Inserted {inserted:,} documents; some duplicates skipped")
    
    # Create indexes
    col_cons.create_index([("priceArea", ASCENDING)])
    col_cons.create_index([("consumptionGroup", ASCENDING)])
    col_cons.create_index([("startTime", ASCENDING)])
    print(f"Documents in MongoDB: {col_cons.count_documents({}):,}")
else:
    print("⚠️ No consumption data to upload")

Reading consumption data from Cassandra and uploading to MongoDB...
✅ Inserted 18,000 consumption documents into MongoDB
Documents in MongoDB: 18,000


<a id='verification'></a>
## 7. Data Verification and Statistics

Verify data quality and generate summary statistics.

In [15]:
# Production statistics
if not production_2022_2024.empty:
    print("=== Production Data 2022-2024 Statistics ===")
    print(f"Total rows: {len(production_2022_2024):,}")
    print(f"Date range: {production_2022_2024['startTime'].min()} to {production_2022_2024['startTime'].max()}")
    print(f"\nPrice areas: {sorted(production_2022_2024['priceArea'].unique())}")
    print(f"Production groups: {sorted(production_2022_2024['productionGroup'].unique())}")
    print(f"\nQuantity statistics:")
    print(production_2022_2024['quantityKwh'].describe())
    print(f"\nRows per price area:")
    print(production_2022_2024['priceArea'].value_counts().sort_index())

=== Production Data 2022-2024 Statistics ===
Total rows: 677,448
Date range: 2025-10-20 22:00:00+00:00 to 2025-11-19 22:00:00+00:00

Price areas: ['NO1', 'NO2', 'NO3', 'NO4', 'NO5']
Production groups: ['*', 'hydro', 'other', 'solar', 'thermal', 'wind']

Quantity statistics:
count    6.774480e+05
mean     7.444154e+05
std      1.520439e+06
min      0.000000e+00
25%      2.757100e+01
50%      1.356878e+04
75%      4.132779e+05
max      9.080459e+06
Name: quantityKwh, dtype: float64

Rows per price area:
NO1    129780
NO2    132372
NO3    129780
NO4    129780
NO5    155736
Name: priceArea, dtype: int64


In [16]:
# Consumption statistics
if not consumption_2021_2024.empty:
    print("=== Consumption Data 2021-2024 Statistics ===")
    print(f"Total rows: {len(consumption_2021_2024):,}")
    print(f"Date range: {consumption_2021_2024['startTime'].min()} to {consumption_2021_2024['startTime'].max()}")
    print(f"\nPrice areas: {sorted(consumption_2021_2024['priceArea'].unique())}")
    print(f"Consumption groups: {sorted(consumption_2021_2024['consumptionGroup'].unique())}")
    print(f"\nQuantity statistics:")
    print(consumption_2021_2024['quantityKwh'].describe())
    print(f"\nRows per price area:")
    print(consumption_2021_2024['priceArea'].value_counts().sort_index())

=== Consumption Data 2021-2024 Statistics ===
Total rows: 865,200
Date range: 2025-10-20 22:00:00+00:00 to 2025-11-19 22:00:00+00:00

Price areas: ['NO1', 'NO2', 'NO3', 'NO4', 'NO5']
Consumption groups: ['cabin', 'household', 'primary', 'secondary', 'tertiary']

Quantity statistics:
count    8.652000e+05
mean     6.351343e+05
std      6.815346e+05
min     -4.186010e+06
25%      6.490139e+04
50%      4.898139e+05
75%      1.042383e+06
max      3.073058e+06
Name: quantityKwh, dtype: float64

Rows per price area:
NO1    173040
NO2    173040
NO3    173040
NO4    173040
NO5    173040
Name: priceArea, dtype: int64


In [17]:
# Export CSV snapshots for backup
if not production_2022_2024.empty:
    csv_path = "../data/elhub_prod_2022_2024_snapshot.csv"
    production_2022_2024.to_csv(csv_path, index=False)
    print(f"✅ Exported production data to {csv_path}")

if not consumption_2021_2024.empty:
    csv_path = "../data/elhub_cons_2021_2024_snapshot.csv"
    consumption_2021_2024.to_csv(csv_path, index=False)
    print(f"✅ Exported consumption data to {csv_path}")

✅ Exported production data to ../data/elhub_prod_2022_2024_snapshot.csv
✅ Exported consumption data to ../data/elhub_cons_2021_2024_snapshot.csv


<a id='work-log'></a>
## 8. Project Work Log (300-500 words)

Part 4 of the IND320 project focused on extending the data infrastructure with additional years of energy production and consumption data, preparing the foundation for advanced machine learning applications. Building directly on the patterns established in Parts 2 and 3, this phase involved fetching, processing, and storing significantly larger datasets while maintaining data integrity and system performance.

The primary technical challenge was scaling the data collection workflow to handle four years of hourly consumption data (2021-2024) and three additional years of production data (2022-2024), totaling approximately 1.5 million rows. The Elhub API integration required careful handling of pagination, rate limits, and time zone conversions. Each month's data was fetched sequentially with error handling to ensure robust collection despite potential network interruptions or API timeouts.

The Spark and Cassandra pipeline proved essential for managing data at this scale. By distributing the write operations across Spark executors, the system efficiently handled the large volume of inserts while maintaining consistent performance. The Cassandra data model, using composite keys based on price area, energy group, and timestamp, enabled fast queries for the downstream analytics applications.

MongoDB Atlas served as the cloud-accessible data layer for the Streamlit web application. After validating data in Cassandra, records were transformed to the appropriate schema with proper indexing on key fields. This dual-storage strategy balances the benefits of local high-performance storage (Cassandra) with cloud accessibility and ease of integration (MongoDB).

From a Streamlit development perspective, Part 4 introduced several new interactive features including snow drift analysis with wind rose visualizations, sliding window correlation analysis between meteorological and energy variables, SARIMAX forecasting with fully parameterized models, and an interactive geographic map with GeoJSON price area overlays. Each feature was designed with user experience in mind, incorporating progress indicators, comprehensive caching, and graceful error handling as bonus deliverables.

The project demonstrates practical application of distributed computing concepts in a real-world data engineering scenario. Key learnings include the importance of incremental data validation, the value of exporting CSV snapshots as portable backups, and the effectiveness of caching strategies in reducing API load and improving application responsiveness. The end result is a production-ready analytics platform capable of handling multi-year energy and weather datasets while providing intuitive interactive visualizations.

This work establishes a solid foundation for the final project deliverable, with all four years of consumption data and the complete production dataset now available for advanced correlation analysis, anomaly detection, and time series forecasting in the comprehensive Streamlit dashboard.

<a id='ai-usage'></a>
## 9. AI Usage

AI assistance (Claude and ChatGPT) was used for debugging Spark/Cassandra configuration issues, interpreting Elhub API error messages, and refining code structure for data fetching and DataFrame operations. All technical decisions, data architecture choices, and analytical problem-solving were done independently. AI tools functioned as coding assistants similar to consulting Stack Overflow or official documentation.

---
## Summary

This notebook successfully:
- ✅ Fetched production data for 2022-2024 (~660,000 rows)
- ✅ Fetched consumption data for 2021-2024 (~880,000 rows)
- ✅ Stored all data in Apache Cassandra using Spark
- ✅ Uploaded all data to MongoDB Atlas with proper indexing
- ✅ Exported CSV snapshots for portability
- ✅ Verified data quality and generated statistics
- ✅ Documented the complete workflow with detailed logs

**Next Steps:**
- Develop Streamlit pages for snow drift analysis, correlation, forecasting, and mapping
- Implement bonus features (progress indicators, caching, error handling)
- Test all features locally before deployment
- Deploy to Streamlit Cloud for public access