GitHub: https://github.com/Hakonandreas/HakonsApp

Streamlit: https://hakond2dproject.streamlit.app

## Log

As in all the other parts, I tried to follow the working-car principle so that I always had working code. I started by copying my Cassandra and MongoDB connections from my previous tasks. I then extended my API fetch from 2021 only to 2021-2024. I found it best to fetch all the data from scratch, since that made the indexing easier. Then I wrote everything to Cassandra using Spark, which took a really long time this time. Finally, I fetched the columns I needed and sent them to MongoDB. All of this follows what was done in previous tasks.

I started by changing the function that retrieves data from MongoDB through my API so that it could also get consumption data. I then struggled for a long time, because my whole Streamlit page crashed every time I tried to fetch the Elhub data. After a while I got it up and running by splitting my Elhub function into one for production and one for consumption.

Next, I converted everything I had not already changed from matplotlib to Plotly, and then addressed the comments I got on the last delivery. I then got the GeoJSON file and started working on the map. I tried different approaches but ended up with Folium. I struggled a bit to get the colors to match the functionality, but with some help from AI I managed in the end. I then added my snow drift file and made it work using session state for the click. Once it worked, I merged it into my map page, as I found that the most intuitive.

Then I worked on the SWC before starting on the SARIMAX. I struggled with the SARIMAX and therefore collaborated with a fellow student to make our code work. I then chose the bonus task of also plotting the monthly snow drift. I found it most intuitive to plot them as a common line plot underneath.

## AI usage
First, I generally have Copilot enabled in VS Code to write code efficiently. In addition, I used ChatGPT to clarify concepts, create parts of the code, and debug errors. I made sure to be actively involved in the code and only used AI to assist me in the process. I also made sure to review all code generated by AI.

More concretely, I used AI to understand what was happening when my Streamlit app constantly crashed. I also argued a lot with ChatGPT to get my map working. Finally, when working with my fellow student, I used AI to merge the code that worked for each of us into one version that included everything that worked.

In [1]:
# Imports
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, row_number, lit, col, to_date, expr
from pyspark.sql.window import Window
import requests
import pandas as pd
from datetime import datetime, timedelta
from cassandra.cluster import Cluster
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import toml
import time



In [2]:
os.environ["HADOOP_HOME"] = "C:/Hadoop/hadoop-3.3.1"
os.environ["PYSPARK_HADOOP_VERSION"] = "without"
os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"
os.environ['HADOOP_USER_NAME'] = 'hakhol'


os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/temurin-11.jdk/Contents/Home"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

In [3]:
spark = SparkSession.builder.appName('Elhub_SparkCassandraApp').\
    config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.5.1').\
    config('spark.cassandra.connection.host', 'localhost').\
    config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions').\
    config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog').\
    config('spark.driver.host', '127.0.0.1').\
    config('spark.cassandra.connection.port', '9042').getOrCreate()


:: loading settings :: url = jar:file:/Users/hakhol/miniconda3/envs/D2D_env/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/hakhol/.ivy2/cache
The jars for the packages stored in: /Users/hakhol/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-55635418-1193-426c-a608-797c6ba53deb;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.5.1 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.5.1 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 in central
	found org.apache.cassandra#java-driver-core-shaded;4.18.1 in central
	found com.datastax.oss#native-protocol;1.5.1 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1

In [4]:
# Connecting to Cassandra
cluster = Cluster(['localhost'], port=9042)
session = cluster.connect()

# Create keyspace (safe to run multiple times)
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS elhub
    WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1};
""")

time.sleep(1)  # wait for metadata propagation

# Drop and create table using fully-qualified name
#session.execute("DROP TABLE IF EXISTS elhub.production;")
session.execute("""
    CREATE TABLE IF NOT EXISTS elhub.production (
        pricearea text,
        productiongroup text,
        starttime timestamp,
        ind uuid,
        endtime timestamp,
        lastupdatedtime timestamp,
        quantitykwh double,
        PRIMARY KEY ((pricearea, productiongroup), starttime, ind)
    );
""")



<cassandra.cluster.ResultSet at 0x11c758200>

In [5]:
# Connection URI
secrets = toml.load("../../secrets.toml")
uri = secrets['mongodb']['uri']

# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


### API Connection for Production

In [6]:
def fetch_month(year: int, month: int, tz_offset_hours: int = 2, session: requests.Session = None):
    """Fetch production data for a specific month and year from Elhub API."""
    
    session = session or requests.Session()

    # Start and end of the month
    start = datetime(year, month, 1)
    end = datetime(year + (month == 12), (month % 12) + 1, 1) - timedelta(seconds=1)

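    # Build ISO 8601 timestamps with the requested (non-negative) UTC offset, default +02:00
    start_str = start.strftime(f"%Y-%m-%dT%H:%M:%S+{tz_offset_hours:02d}:00")
    end_str = end.strftime(f"%Y-%m-%dT%H:%M:%S+{tz_offset_hours:02d}:00")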

    url = "https://api.elhub.no/energy-data/v0/price-areas"
    params = {
        "dataset": "PRODUCTION_PER_GROUP_MBA_HOUR",
        "startDate": start_str,
        "endDate": end_str,
    }

    try:
        resp = session.get(url, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        print(f"Error fetching {year}-{month:02d}: {e}")
        return pd.DataFrame()  # return empty DF on error

    # Extract production records
    all_records = []
    for area in data.get("data", []):
        records = area.get("attributes", {}).get("productionPerGroupMbaHour", [])
        if records:
            all_records.extend(records)

    df = pd.DataFrame(all_records)
    print(f"Fetched {len(df)} records for {year}-{month:02d}")
    return df


# Fetch all months from 2021 to 2024 using a Session and skip empty dfs before concat
dfs = []
with requests.Session() as sess:
    for year in range(2021, 2025):   # loop over 2021, 2022, 2023, 2024
        for month in range(1, 13):   # loop over all months
            monthly_df = fetch_month(year, month, session=sess)
            if not monthly_df.empty:
                dfs.append(monthly_df)

if dfs:
    df_all = pd.concat(dfs, ignore_index=True)
else:
    df_all = pd.DataFrame()

print(f"\nTotal records fetched: {len(df_all)}")
print(df_all.head())


Fetched 17856 records for 2021-01
Fetched 16128 records for 2021-02
Fetched 17832 records for 2021-03
Fetched 17280 records for 2021-04
Fetched 17856 records for 2021-05
Fetched 17976 records for 2021-06
Fetched 18600 records for 2021-07
Fetched 18600 records for 2021-08
Fetched 18000 records for 2021-09
Fetched 18625 records for 2021-10
Fetched 18000 records for 2021-11
Fetched 18600 records for 2021-12
Fetched 18600 records for 2022-01
Fetched 16800 records for 2022-02
Fetched 18575 records for 2022-03
Fetched 18000 records for 2022-04
Fetched 18600 records for 2022-05
Fetched 18000 records for 2022-06
Fetched 18600 records for 2022-07
Fetched 18600 records for 2022-08
Fetched 18000 records for 2022-09
Fetched 18625 records for 2022-10
Fetched 18000 records for 2022-11
Fetched 18600 records for 2022-12
Fetched 18600 records for 2023-01
Fetched 16800 records for 2023-02
Fetched 18575 records for 2023-03
Fetched 18000 records for 2023-04
Fetched 18600 records for 2023-05
Fetched 18000 
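
The month window in `fetch_month` relies on a compact expression for "first day of the next month". The sketch below isolates that arithmetic so the December rollover and leap years are easy to check. `month_bounds` is a hypothetical helper name, not part of the notebook.

```python
from datetime import datetime, timedelta


def month_bounds(year: int, month: int) -> tuple[datetime, datetime]:
    """Return the first and the last second of a calendar month (naive datetimes)."""
    start = datetime(year, month, 1)
    # (month == 12) is a bool, so it adds 1 to the year only in December,
    # and month % 12 + 1 wraps December around to January.
    next_month_start = datetime(year + (month == 12), month % 12 + 1, 1)
    return start, next_month_start - timedelta(seconds=1)


# A regular month, a leap-year February, and the December rollover
for y, m in [(2021, 1), (2024, 2), (2024, 12)]:
    first, last = month_bounds(y, m)
    print(first.isoformat(), "->", last.isoformat())
```

Subtracting one second from the start of the next month avoids having to know how many days the month has.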

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, expr

# ---- Create Spark DataFrame ----
spark_df = spark.createDataFrame(df_all)

# ---- Rename columns to match Cassandra ----
col_map = {
    "startTime": "starttime",
    "endTime": "endtime",
    "lastUpdatedTime": "lastupdatedtime",
    "priceArea": "pricearea",
    "productionGroup": "productiongroup",
    "quantityKwh": "quantitykwh",
}

for old, new in col_map.items():
    spark_df = spark_df.withColumnRenamed(old, new)

# ---- Generate UUID per row (FAST, distributed, no shuffle) ----
spark_df = spark_df.withColumn("ind", expr("uuid()"))

# ---- Convert timestamps / ISO strings → timestamp ----
spark_df = (
    spark_df
        .withColumn("starttime", to_timestamp("starttime"))
        .withColumn("endtime", to_timestamp("endtime"))
        .withColumn("lastupdatedtime", to_timestamp("lastupdatedtime"))
)

# ---- Repartition by Cassandra partition key ----
# Matches PRIMARY KEY ((pricearea, productiongroup), starttime, ind)
spark_df = spark_df.repartition("pricearea", "productiongroup")

# ---- Coalesce reduces number of writers → faster for Cassandra ----
spark_df = spark_df.coalesce(32)   # tune based on cluster size

# ---- Write to Cassandra ----
(
    spark_df.write
        .format("org.apache.spark.sql.cassandra")
        .mode("append")
        .options(table="production", keyspace="elhub")
        .option("spark.cassandra.output.concurrent.writes", "1")   # single writer
        .option("spark.cassandra.output.batch.size.rows", "1")     # tiny batches
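        # Deprecated key (see the warning in the output below); the connector maps it
        # to spark.cassandra.output.throughputMBPerSec
        .option("spark.cassandra.output.throughput_mb_per_sec", "1")  # throttle write speed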
        .save()
)



print("✅ Data successfully written to Cassandra!")



25/11/15 17:13:34 WARN DeprecatedConfigParameter: spark.cassandra.output.throughput_mb_per_sec is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.output.throughputMBPerSec. 
25/11/15 17:13:34 WARN DeprecatedConfigParameter: spark.cassandra.output.throughput_mb_per_sec is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.output.throughputMBPerSec. 
25/11/15 17:13:35 WARN DeprecatedConfigParameter: spark.cassandra.output.throughput_mb_per_sec is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.output.throughputMBPerSec. 
25/11/15 17:13:37 WARN TaskSetManager: Stage 6 contains a task of very large size (23495 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

✅ Data successfully written to Cassandra!


In [17]:
# Load table from Cassandra
spark_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="production", keyspace="elhub") \
    .load()

# Select only the columns you want
selected_df = spark_df.select(
    "pricearea",
    "productiongroup",
    "starttime",
    "quantitykwh"
)

# Show a few rows
selected_df.show()

                                                                                

+---------+---------------+-------------------+-----------+
|pricearea|productiongroup|          starttime|quantitykwh|
+---------+---------------+-------------------+-----------+
|      NO1|          other|2021-01-01 00:00:00|        0.0|
|      NO1|          other|2021-01-01 00:00:00|        0.0|
|      NO1|          other|2021-01-01 01:00:00|        0.0|
|      NO1|          other|2021-01-01 01:00:00|        0.0|
|      NO1|          other|2021-01-01 02:00:00|        0.0|
|      NO1|          other|2021-01-01 02:00:00|        0.0|
|      NO1|          other|2021-01-01 03:00:00|        0.0|
|      NO1|          other|2021-01-01 03:00:00|        0.0|
|      NO1|          other|2021-01-01 04:00:00|        0.0|
|      NO1|          other|2021-01-01 04:00:00|        0.0|
|      NO1|          other|2021-01-01 05:00:00|        0.0|
|      NO1|          other|2021-01-01 05:00:00|        0.0|
|      NO1|          other|2021-01-01 06:00:00|        0.0|
|      NO1|          other|2021-01-01 06
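
The `show()` output above lists each `starttime` twice for `NO1` / `other`. That would be consistent with the append having run more than once, because `uuid()` gives every row a fresh `ind` and Cassandra then stores the rerun as new rows. The cause is an assumption here. A minimal sketch of dropping such repeats by their natural key before the data goes to MongoDB, where `drop_duplicate_rows` is a hypothetical helper and the sample rows are made up:

```python
from datetime import datetime


def drop_duplicate_rows(records, key_fields=("pricearea", "productiongroup", "starttime")):
    """Keep the first record seen for each combination of key_fields."""
    seen = set()
    unique = []
    for rec in records:
        key = tuple(rec[field] for field in key_fields)
        if key not in seen:
            seen.add(key)
            unique.append(rec)
    return unique


# Made-up rows shaped like the selected columns
rows = [
    {"pricearea": "NO1", "productiongroup": "other",
     "starttime": datetime(2021, 1, 1, 0), "quantitykwh": 0.0},
    {"pricearea": "NO1", "productiongroup": "other",
     "starttime": datetime(2021, 1, 1, 0), "quantitykwh": 0.0},
    {"pricearea": "NO1", "productiongroup": "other",
     "starttime": datetime(2021, 1, 1, 1), "quantitykwh": 0.0},
]
unique = drop_duplicate_rows(rows)
print(len(rows), "rows ->", len(unique), "unique rows")
```

On the Spark side the same idea is available as `selected_df.dropDuplicates(["pricearea", "productiongroup", "starttime"])`.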

In [19]:
# Connect to MongoDB
db = client["elhub_db"]
collection = db["production"]

# Drop the existing collection
collection.drop()
print("Dropped existing MongoDB collection.")

# Convert Spark DataFrame to Pandas
mongo_df = selected_df.toPandas()

# Insert everything into MongoDB
records = mongo_df.to_dict(orient='records')
result = collection.insert_many(records)
print(f"Inserted {len(result.inserted_ids)} documents into MongoDB")

Dropped existing MongoDB collection.


                                                                                

Inserted 1696647 documents into MongoDB
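
With well over a million documents, a single `insert_many` call gives no progress feedback, and a failure partway through is hard to locate. The sketch below inserts in fixed-size batches instead. `chunked`, `insert_in_batches`, and the batch size are assumptions, and `FakeCollection` is only a stand-in so the example runs without a database. A real pymongo collection exposes the same `insert_many(...).inserted_ids` interface used in the cell above.

```python
from itertools import islice
from types import SimpleNamespace


def chunked(iterable, size):
    """Yield lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def insert_in_batches(collection, records, batch_size=50_000):
    """Insert records batch by batch and return the number of inserted documents."""
    total = 0
    for batch in chunked(records, batch_size):
        result = collection.insert_many(batch)
        total += len(result.inserted_ids)
    return total


class FakeCollection:
    """Stand-in for a MongoDB collection (illustration only)."""

    def __init__(self):
        self.docs = []

    def insert_many(self, docs):
        self.docs.extend(docs)
        return SimpleNamespace(inserted_ids=list(range(len(docs))))


fake = FakeCollection()
inserted = insert_in_batches(fake, [{"i": i} for i in range(5)], batch_size=2)
print(f"Inserted {inserted} documents")
```

In the notebook this would be called as `insert_in_batches(collection, records)`. It does not reduce memory use, since `to_dict(orient='records')` still builds the full list first.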


### API Connection for Consumption

In [6]:
#session.execute("DROP TABLE IF EXISTS elhub.consumption;")
session.execute("""
    CREATE TABLE IF NOT EXISTS elhub.consumption (
        pricearea text,
        consumptiongroup text,
        starttime timestamp,
        ind uuid,
        endtime timestamp,
        lastupdatedtime timestamp,
        meteringpointcount int,
        quantitykwh double,
        PRIMARY KEY ((pricearea, consumptiongroup), starttime, ind)
    );
""")

<cassandra.cluster.ResultSet at 0x108cd2870>

In [11]:
def fetch_month(year: int, month: int, tz_offset_hours: int = 2, session: requests.Session = None):
    """Fetch consumption data for a specific month and year from Elhub API."""
    
    session = session or requests.Session()

    # Start and end of the month
    start = datetime(year, month, 1)
    end = datetime(year + (month == 12), (month % 12) + 1, 1) - timedelta(seconds=1)

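    # Build ISO 8601 timestamps with the requested (non-negative) UTC offset, default +02:00
    start_str = start.strftime(f"%Y-%m-%dT%H:%M:%S+{tz_offset_hours:02d}:00")
    end_str = end.strftime(f"%Y-%m-%dT%H:%M:%S+{tz_offset_hours:02d}:00")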

    url = "https://api.elhub.no/energy-data/v0/price-areas"
    params = {
        "dataset": "CONSUMPTION_PER_GROUP_MBA_HOUR",
        "startDate": start_str,
        "endDate": end_str,
    }

    try:
        resp = session.get(url, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        print(f"Error fetching {year}-{month:02d}: {e}")
        return pd.DataFrame()  # return empty DF on error

    # Extract consumption records
    all_records = []
    for area in data.get("data", []):
        records = area.get("attributes", {}).get("consumptionPerGroupMbaHour", [])
        if records:
            all_records.extend(records)

    df = pd.DataFrame(all_records)
    print(f"Fetched {len(df)} records for {year}-{month:02d}")
    return df


# Fetch all months from 2021 to 2024 using a Session and skip empty dfs before concat
dfs = []
with requests.Session() as sess:
    for year in range(2021, 2025):   # loop over 2021, 2022, 2023, 2024
        for month in range(1, 13):   # loop over all months
            monthly_df = fetch_month(year, month, session=sess)
            if not monthly_df.empty:
                dfs.append(monthly_df)

if dfs:
    df_all_con = pd.concat(dfs, ignore_index=True)
else:
    df_all_con = pd.DataFrame()

print(f"\nTotal records fetched: {len(df_all_con)}")
print(df_all_con.head())



Fetched 18600 records for 2021-01
Fetched 16800 records for 2021-02
Fetched 18575 records for 2021-03
Fetched 18000 records for 2021-04
Fetched 18600 records for 2021-05
Fetched 18000 records for 2021-06
Fetched 18600 records for 2021-07
Fetched 18600 records for 2021-08
Fetched 18000 records for 2021-09
Fetched 18625 records for 2021-10
Fetched 18000 records for 2021-11
Fetched 18600 records for 2021-12
Fetched 18600 records for 2022-01
Fetched 16800 records for 2022-02
Fetched 18575 records for 2022-03
Fetched 18000 records for 2022-04
Fetched 18600 records for 2022-05
Fetched 18000 records for 2022-06
Fetched 18600 records for 2022-07
Fetched 18600 records for 2022-08
Fetched 18000 records for 2022-09
Fetched 18625 records for 2022-10
Fetched 18000 records for 2022-11
Fetched 18600 records for 2022-12
Fetched 18600 records for 2023-01
Fetched 16800 records for 2023-02
Fetched 18575 records for 2023-03
Fetched 18000 records for 2023-04
Fetched 18600 records for 2023-05
Fetched 18000 
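
The two `fetch_month` definitions differ only in the dataset name and the attribute key that holds the records. As a sketch of how the shared part could be factored out, the hypothetical helper below takes the attribute key as a parameter. The payload shape is the one the notebook code reads, and the sample payload is made up.

```python
def extract_records(payload: dict, attribute_key: str) -> list:
    """Collect the hourly records of every price area in an API response."""
    records = []
    for area in payload.get("data", []):
        # A missing or empty attribute contributes no records
        records.extend(area.get("attributes", {}).get(attribute_key) or [])
    return records


# Made-up payload with the same nesting as the notebook expects
sample = {
    "data": [
        {"attributes": {"consumptionPerGroupMbaHour": [{"priceArea": "NO1"}, {"priceArea": "NO1"}]}},
        {"attributes": {"consumptionPerGroupMbaHour": [{"priceArea": "NO2"}]}},
        {"attributes": {}},
    ]
}
consumption = extract_records(sample, "consumptionPerGroupMbaHour")
production = extract_records(sample, "productionPerGroupMbaHour")
print(len(consumption), len(production))
```

With this, one fetch function could serve both datasets by passing the dataset name and the matching attribute key.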

In [12]:
# ---- Create Spark DataFrame ----
con_spark_df = spark.createDataFrame(df_all_con)

# ---- Rename columns to match Cassandra consumption table ----
col_map = {
    "startTime": "starttime",
    "endTime": "endtime",
    "lastUpdatedTime": "lastupdatedtime",
    "priceArea": "pricearea",
    "consumptionGroup": "consumptiongroup",
    "meteringPointCount": "meteringpointcount", 
    "quantityKwh": "quantitykwh",
}

for old, new in col_map.items():
    con_spark_df = con_spark_df.withColumnRenamed(old, new)

# ---- Generate UUID per row ----
con_spark_df = con_spark_df.withColumn("ind", expr("uuid()"))

# ---- Convert timestamps / ISO strings → timestamp ----
con_spark_df = (
    con_spark_df
        .withColumn("starttime", to_timestamp("starttime"))
        .withColumn("endtime", to_timestamp("endtime"))
        .withColumn("lastupdatedtime", to_timestamp("lastupdatedtime"))
)

# ---- Repartition by Cassandra partition key for consumption ----
# Matches PRIMARY KEY ((pricearea, consumptiongroup), starttime, ind)
con_spark_df = con_spark_df.repartition("pricearea", "consumptiongroup")

# ---- Coalesce reduces number of writers → faster for Cassandra ----
con_spark_df = con_spark_df.coalesce(32)   # adjust based on cluster size

# ---- Write to Cassandra ----
(
    con_spark_df.write
        .format("org.apache.spark.sql.cassandra")
        .mode("append")
        .options(table="consumption", keyspace="elhub")  # changed table name
        .option("spark.cassandra.output.concurrent.writes", "1")   # single writer
        .option("spark.cassandra.output.batch.size.rows", "1")     # tiny batches
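        # Deprecated key (see the warning in the output below); the connector maps it
        # to spark.cassandra.output.throughputMBPerSec
        .option("spark.cassandra.output.throughput_mb_per_sec", "1")  # throttle write speed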
        .save()
)

print("✅ Consumption data successfully written to Cassandra!")


25/11/16 12:27:15 WARN DeprecatedConfigParameter: spark.cassandra.output.throughput_mb_per_sec is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.output.throughputMBPerSec. 
25/11/16 12:27:15 WARN DeprecatedConfigParameter: spark.cassandra.output.throughput_mb_per_sec is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.output.throughputMBPerSec. 
25/11/16 12:27:16 WARN DeprecatedConfigParameter: spark.cassandra.output.throughput_mb_per_sec is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.output.throughputMBPerSec. 
25/11/16 12:27:17 WARN TaskSetManager: Stage 6 contains a task of very large size (24910 KiB). The maximum recommended task size is 1000 KiB.

✅ Consumption data successfully written to Cassandra!


                                                                                

In [15]:
# Load table from Cassandra
consumption_spark_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="consumption", keyspace="elhub") \
    .load()

# Select only the columns you want
con_selected_df = consumption_spark_df.select(
    "pricearea",
    "consumptiongroup",
    "starttime",
    "quantitykwh"
)

# Show a few rows
con_selected_df.show()

+---------+----------------+-------------------+-----------+
|pricearea|consumptiongroup|          starttime|quantitykwh|
+---------+----------------+-------------------+-----------+
|      NO3|           cabin|2021-01-01 00:00:00|   72063.35|
|      NO3|           cabin|2021-01-01 01:00:00|  71502.945|
|      NO3|           cabin|2021-01-01 02:00:00|   69903.34|
|      NO3|           cabin|2021-01-01 03:00:00|    68603.0|
|      NO3|           cabin|2021-01-01 04:00:00|   68606.54|
|      NO3|           cabin|2021-01-01 05:00:00|  68929.484|
|      NO3|           cabin|2021-01-01 06:00:00|   70096.02|
|      NO3|           cabin|2021-01-01 07:00:00|   71398.77|
|      NO3|           cabin|2021-01-01 08:00:00|   73275.54|
|      NO3|           cabin|2021-01-01 09:00:00|   76973.98|
|      NO3|           cabin|2021-01-01 10:00:00|    81190.1|
|      NO3|           cabin|2021-01-01 11:00:00|    82732.9|
|      NO3|           cabin|2021-01-01 12:00:00|   81820.91|
|      NO3|           ca

In [16]:
# Connect to MongoDB
db = client["elhub_db"]
collection_con = db["consumption"]

# Drop the existing collection
#collection_con.drop()
#print("Dropped existing MongoDB consumption collection.")

# Convert Spark DataFrame to Pandas
con_df = con_selected_df.toPandas()

# Insert everything into MongoDB
records_con = con_df.to_dict(orient='records')
result_con = collection_con.insert_many(records_con)

print(f"Inserted {len(result_con.inserted_ids)} documents into MongoDB")


                                                                                

Inserted 876600 documents into MongoDB
