## Project part 4 

## Links
- GitHub repository: https://github.com/Gianna-liu/IND320_dashboard_GegeLiu

- Streamlit App: https://weatheringwithyou-gegeliu.streamlit.app/

## Development Log
For Part 4 of the assignment, I felt a bit lost at the beginning because I hadn’t had much experience working with time-series data. Most of my previous work involved tabular data and, later, some image data. This part required developing four features. Since I had already gained some understanding of sliding-window correlation and model prediction from the lectures, I decided to start with those two components.

I began by making sure I fully understood the model’s inputs, outputs and overall workflow, and I frequently referred to and debugged the example code from the lecture. After finishing about 60% of those two tasks, I moved on to the spatial analysis part. There I mainly implemented the map features, such as the snow drift calculation and the wind rose chart. At first I could only mimic the provided code, but as I became more familiar with it, I was also able to complete the bonus task of computing monthly snow drift.

For the time-series part specifically, selecting appropriate parameters was quite challenging, and it’s something I still need to improve on. While developing the Streamlit app, I also spent a significant amount of time designing the subpages and managing how data is passed between them using `session_state`.

Once all the new features were implemented, I started refactoring the entire app. This also took quite a bit of time, as I kept discovering areas that could be optimized. Eventually, I completed a version that I’m genuinely satisfied with.

### Sliding window correlation between meteorology and energy production

- In January 2024, Oslo experienced extreme cold (temperatures dropped to -27 °C). I used a 70-hour sliding window to study the relationship between outdoor temperature and household electricity consumption. The correlation at zero lag was already strong, but it became noticeably stronger when temperature was lagged by 6–12 hours.

- This is plausible. When the air suddenly becomes very cold, buildings cool down slowly because of thermal inertia, so heating demand and electricity consumption build up over several hours rather than immediately. In this period, the correlation peaked at a lag of around 10 hours.

- Thermal power generation behaves differently from household consumption. Its correlation with temperature peaks at a lag of about 0 hours, so it shows no significant delayed response.
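The lag analysis above can be sketched with pandas. For each candidate lag, shift the temperature series by that many hours and compute a rolling Pearson correlation with consumption over a 70-hour window. This is a minimal sketch on synthetic data, not the Oslo data or the app's actual code. It uses a random-walk "temperature" and a "consumption" series that reacts 10 hours later. The function name `lagged_window_corr`, the 0–24 h lag range and the "mean absolute correlation" summary are all assumptions.

```python
import numpy as np
import pandas as pd


def lagged_window_corr(driver, response, window=70, lags=range(0, 25)):
    """Rolling Pearson correlation between `response` and `driver` shifted by each lag.

    Column `lag` compares response at hour t with driver at hour t - lag,
    i.e. a positive lag means the driver (temperature) leads the response.
    """
    return pd.DataFrame(
        {lag: response.rolling(window).corr(driver.shift(lag)) for lag in lags}
    )


# Synthetic hourly data (hypothetical): consumption reacts to temperature 10 h later.
rng = np.random.default_rng(0)
n_hours = 24 * 31
temperature = pd.Series(np.cumsum(rng.normal(0, 1, n_hours)))
consumption = -2 * temperature.shift(10) + rng.normal(0, 0.5, n_hours)

corr = lagged_window_corr(temperature, consumption, window=70)
# Summarise each lag by its mean absolute correlation over all windows.
best_lag = corr.abs().mean().idxmax()
print(best_lag)  # 10
```

The correlation here is negative because colder hours go with higher consumption. Taking the absolute value lets the same code find the peak for either sign.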

### Bonus part

- For the bonus task, I chose to work on calculating the monthly snow drift and plotting it together with the yearly snow drift.

## AI usage


During this assignment, I used GitHub Copilot in my VS Code environment to assist with code review, debugging, and improving overall code structure.

I asked GPT to explain some of the lecture concepts in more detail, such as the SARIMAX model. However, most of my AI usage was focused on helping with the web development process, including Streamlit UI design and styling. I also used it to explore how to optimize query performance when working with the MongoDB connection.

## Task 1

### Library imports

In [None]:
EXECUTE_API = False       # Skip Elhub API calls when exporting the notebook to HTML
EXECUTE_localDB = False   # Skip local Cassandra operations when exporting to HTML
EXECUTE_remoteDB = False  # Skip MongoDB uploads when exporting to HTML

import json
import os
from pyjstat import pyjstat
import requests
from cassandra.cluster import Cluster

from pyspark.sql import SparkSession
# Note: importing `sum` here shadows Python's built-in sum() for the rest of the notebook
from pyspark.sql.functions import sum, to_timestamp, year, count, month

import pandas as pd
import pytz
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "notebook_connected"

from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

### Step 1: Load data from the API and insert it into Cassandra with Spark

#### 1. Preparation with Spark

##### Set environment variables for PySpark

In [2]:
os.environ["JAVA_HOME"] = "/opt/homebrew/Cellar/openjdk@17/17.0.17/libexec/openjdk.jdk/Contents/Home"
os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"

##### Create a Spark session to transfer data

In [3]:
spark = SparkSession.builder.appName('SparkCassandraApp').\
    config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.5.1').\
    config('spark.cassandra.connection.host', 'localhost').\
    config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions').\
    config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog').\
    config('spark.cassandra.connection.port', '9042').getOrCreate()

:: loading settings :: url = jar:file:/opt/miniconda3/envs/D2D_project/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/liugege/.ivy2/cache
The jars for the packages stored in: /Users/liugege/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-937d2822-40c3-4847-bc1a-97d7ea1841f8;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.5.1 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.5.1 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 in central
	found org.apache.cassandra#java-driver-core-shaded;4.18.1 in central
	found com.datastax.oss#native-protocol;1.5.1 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams

#### 2. Preparation in Cassandra

##### Connect to the Cassandra cluster from Python.

In [4]:
# Connecting to Cassandra
cluster = Cluster(['localhost'], port=9042)
session = cluster.connect()

##### Set up a new keyspace

In [5]:
if EXECUTE_localDB:
    session.execute("CREATE KEYSPACE IF NOT EXISTS elnub WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")

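##### Create a table
- `IF NOT EXISTS` prevents an error if the table already exists. Note that the cells below run `DROP TABLE IF EXISTS` first, so executing them (after uncommenting) deletes any existing data in the table.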

In [None]:
# # Create a new table (first time only)
# ##!For a composite primary key like PRIMARY KEY ((priceArea, productionGroup), startTime), you don’t need to create a separate combined column.
# if EXECUTE_localDB:
#     session.set_keyspace('elnub')
#     session.execute("DROP TABLE IF EXISTS elnub.production_data;")
#     session.execute("CREATE TABLE IF NOT EXISTS elnub.production_data (\
#         priceArea TEXT,\
#         productionGroup TEXT,\
#         startTime TIMESTAMP,\
#         quantityKwh DOUBLE,\
#         endTime TIMESTAMP,\
#         lastUpdatedTime TIMESTAMP,\
#         PRIMARY KEY ((priceArea, productionGroup), startTime));")

In [None]:
# # Create a new table (first time only)
# ###!For a composite primary key like PRIMARY KEY ((priceArea, productionGroup), startTime), you don’t need to create a separate combined column.
# if EXECUTE_localDB:
#     session.set_keyspace('elnub')
#     session.execute("DROP TABLE IF EXISTS elnub.consumption_data;")
#     session.execute("CREATE TABLE IF NOT EXISTS elnub.consumption_data (\
#         priceArea TEXT,\
#         consumptionGroup TEXT,\
#         startTime TIMESTAMP,\
#         quantityKwh DOUBLE,\
#         meteringPointCount DOUBLE,\
#         endTime TIMESTAMP,\
#         lastUpdatedTime TIMESTAMP,\
#         PRIMARY KEY ((priceArea, consumptionGroup), startTime));")

#### 3. Retrieve data with the Elhub API

##### Helper functions for code reusability

In [36]:
def get_period_start_end(year, m):
    '''
    Return the first and last hourly timestamps of the given month,
    localized to the Norwegian time zone (Europe/Oslo).
    '''
    norway_tz = pytz.timezone("Europe/Oslo")
    
    start_date = norway_tz.localize(datetime(year, m, 1))
    if m == 12:
        end_date = norway_tz.localize(datetime(year + 1, 1, 1)) - timedelta(hours=1)
    else:
        end_date = norway_tz.localize(datetime(year, m + 1, 1)) - timedelta(hours=1)
    return start_date, end_date
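The following sketch makes the October issue concrete. It counts the clock hours in a month in Europe/Oslo by comparing the month boundaries in UTC. In Norway, Daylight Saving Time ends on the last Sunday of October, so October has 745 hours instead of 744, while March has only 743. This illustrates only the calendar. It does not confirm what limit the Elhub API actually enforces. `hours_in_month` is a hypothetical helper that is not part of the notebook. Converting to UTC before subtracting makes the result independent of how the time-zone library represents offsets.

```python
from datetime import datetime

import pytz

OSLO = pytz.timezone("Europe/Oslo")


def hours_in_month(year, month):
    """Number of clock hours in a month in Europe/Oslo, counting DST shifts."""
    start = OSLO.localize(datetime(year, month, 1))
    first_of_next = datetime(year + 1, 1, 1) if month == 12 else datetime(year, month + 1, 1)
    end = OSLO.localize(first_of_next)
    # Subtract in UTC so the one-hour DST shift is included in the difference.
    delta = end.astimezone(pytz.utc) - start.astimezone(pytz.utc)
    return int(delta.total_seconds() // 3600)


for m in (1, 3, 10):
    print(m, hours_in_month(2024, m))  # 744, 743, 745
```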

In [37]:
def load_parse_data_fra_api(start_dt, end_dt, required_dataset, attr_val):
    '''
    Fetch one Elhub dataset (production or consumption) for a given time period
    from the Elhub API and flatten the records into a single list.
    '''
    url = "https://api.elhub.no/energy-data/v0/price-areas"
    params = {
        "dataset": required_dataset,
        "startDate": start_dt.isoformat(),
        "endDate": end_dt.isoformat()
    }
    # A timeout keeps a stalled request from hanging the notebook
    response = requests.get(url, params=params, timeout=60)
    if response.status_code != 200:
        print(f"Failed to get data for {start_dt} - {end_dt}: status {response.status_code}")
        return []

    data_json = response.json()
    parsed_data = []
    for data in data_json['data']:
        for item in data['attributes'][attr_val]:
            parsed_data.append(item)
    return parsed_data
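The nested loop above assumes a response shaped like `{"data": [{"attributes": {attr_val: [...]}}, ...]}`. The same flattening can be written as a single comprehension, sketched below with a hand-written payload (not real Elhub output). Unlike the loop, this version uses `.get()`, so a missing key yields an empty list instead of a `KeyError`. `flatten_elhub_payload` is a hypothetical name.

```python
def flatten_elhub_payload(data_json, attr_val):
    """Flatten every data[i]["attributes"][attr_val] list into one list of records."""
    return [
        item
        for entry in data_json.get("data", [])
        for item in entry.get("attributes", {}).get(attr_val, [])
    ]


# Hypothetical payload with two entries and three records in total.
sample = {
    "data": [
        {"attributes": {"productionPerGroupMbaHour": [{"priceArea": "NO1"}, {"priceArea": "NO1"}]}},
        {"attributes": {"productionPerGroupMbaHour": [{"priceArea": "NO2"}]}},
    ]
}
rows = flatten_elhub_payload(sample, "productionPerGroupMbaHour")
print(len(rows))  # 3
```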

In [38]:
def write_to_cassandra_ved_spark(data_list, df_col_list, table="production_data", keyspace="elnub"):
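    '''
    Write a list of Elhub records (production or consumption) to a table
    in the local Cassandra database using Spark.
    '''
    if not data_list:
        return

    df = pd.DataFrame(data_list)
    # Rename to the lower-case column names used in the Cassandra table.
    # This assignment is positional, so df_col_list must list the columns in the
    # same order as the keys of the API records.
    # df.columns = ['endtime','lastupdatedtime','pricearea','productiongroup','quantitykwh','starttime']
    df.columns = df_col_list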
    # Use spark to insert data into cassandra
    spark.createDataFrame(df)\
        .write\
        .format("org.apache.spark.sql.cassandra")\
        .options(table=table, keyspace=keyspace)\
        .mode("append")\
        .save()
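`df.columns = df_col_list` assigns names by position, so it would silently mislabel columns if the API ever returned the keys in a different order. A more defensive option is to lower-case the API's camelCase keys and select the expected columns by name. This is an assumption, not what the notebook does. The sketch below uses the hypothetical names `records_to_frame` and `PRODUCTION_COLS`. The resulting frame could then be passed to `spark.createDataFrame` as before.

```python
import pandas as pd

PRODUCTION_COLS = ["endtime", "lastupdatedtime", "pricearea",
                   "productiongroup", "quantitykwh", "starttime"]


def records_to_frame(data_list, expected_cols):
    """Build a DataFrame whose columns are matched by name, not by position."""
    df = pd.DataFrame(data_list).rename(columns=str.lower)
    missing = set(expected_cols) - set(df.columns)
    if missing:
        raise ValueError(f"missing columns: {sorted(missing)}")
    return df[expected_cols]


# Hypothetical record with keys deliberately in a non-alphabetical order.
record = {"startTime": "2024-01-01T00:00:00+01:00", "priceArea": "NO1",
          "productionGroup": "hydro", "quantityKwh": 1.0,
          "endTime": "2024-01-01T01:00:00+01:00",
          "lastUpdatedTime": "2024-12-20T10:35:40+01:00"}
df = records_to_frame([record], PRODUCTION_COLS)
print(df.loc[0, "pricearea"])  # NO1
```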

##### Append the production data from 2022 to 2024.


In [None]:
start_year = 2022
end_year = 2024
required_dataset = "PRODUCTION_PER_GROUP_MBA_HOUR"
attr_val = "productionPerGroupMbaHour"
col_list = ['endtime','lastupdatedtime','pricearea','productiongroup','quantitykwh','starttime']

if EXECUTE_API:
    for one_year in range(start_year, end_year + 1):
        print(f"Processing year: {one_year}")
        for m in range(1, 13):
            start_dt, end_dt = get_period_start_end(one_year, m)
            # Special handling for the October DST change.
            # The API limits retrieval to one month at a time, and October requests initially
            # failed because the Daylight Saving Time transition adds an extra hour.
            # I resolved this by splitting October into two parts.
            if m == 10:
                norway_tz = pytz.timezone("Europe/Oslo")
                end_first_part = norway_tz.localize(datetime(one_year, 10, 20, 23))
                parts = [(start_dt, end_first_part),
                         (norway_tz.localize(datetime(one_year, 10, 21, 0)),
                          norway_tz.localize(datetime(one_year, 10, 31, 23)))]
            else:
                parts = [(start_dt, end_dt)]

            for start_part, end_part in parts:
                data_list = load_parse_data_fra_api(start_part, end_part, required_dataset, attr_val)
                write_to_cassandra_ved_spark(data_list, col_list)
            
            print(f"Month {m} finished.")

Processing year: 2022
Month 1 finished.
Month 2 finished.
Month 3 finished.
Month 4 finished.
Month 5 finished.
Month 6 finished.
Month 7 finished.
Month 8 finished.
Month 9 finished.
Month 10 finished.
Month 11 finished.
Month 12 finished.
Processing year: 2023
Month 1 finished.
Month 2 finished.
Month 3 finished.
Month 4 finished.
Month 5 finished.
Month 6 finished.
Month 7 finished.
Month 8 finished.
Month 9 finished.
Month 10 finished.
Month 11 finished.
Month 12 finished.
Processing year: 2024
Month 1 finished.
Month 2 finished.
Month 3 finished.
Month 4 finished.
Month 5 finished.
Month 6 finished.
Month 7 finished.
Month 8 finished.
Month 9 finished.
Month 10 finished.
Month 11 finished.
Month 12 finished.


##### Load the consumption data from 2021 to 2024.


In [80]:
start_year = 2021
end_year = 2024
required_dataset = "CONSUMPTION_PER_GROUP_MBA_HOUR"
attr_val = "consumptionPerGroupMbaHour"
col_list = ['consumptiongroup','endtime','lastupdatedtime','meteringpointcount','pricearea','quantitykwh','starttime']
table_nm = "consumption_data"
keyspace = "elnub"

if EXECUTE_API:
    for one_year in range(start_year, end_year + 1):
        print(f"Processing year: {one_year}")
        for m in range(1, 13):
            start_dt, end_dt = get_period_start_end(one_year, m)
            # Special handling for the October DST change (same split as in the production loop)
            if m == 10:
                norway_tz = pytz.timezone("Europe/Oslo")
                end_first_part = norway_tz.localize(datetime(one_year, 10, 20, 23))
                parts = [(start_dt, end_first_part),
                         (norway_tz.localize(datetime(one_year, 10, 21, 0)),
                          norway_tz.localize(datetime(one_year, 10, 31, 23)))]
            else:
                parts = [(start_dt, end_dt)]

            for start_part, end_part in parts:
                data_list = load_parse_data_fra_api(start_part, end_part, required_dataset, attr_val)
                write_to_cassandra_ved_spark(data_list, col_list, table_nm, keyspace)
        
            print(f"Month {m} finished.")

Processing year: 2021
Month 1 finished.
Month 2 finished.
Month 3 finished.
Month 4 finished.
Month 5 finished.
Month 6 finished.
Month 7 finished.
Month 8 finished.
Month 9 finished.
Month 10 finished.
Month 11 finished.
Month 12 finished.
Processing year: 2022
Month 1 finished.
Month 2 finished.
Month 3 finished.
Month 4 finished.
Month 5 finished.
Month 6 finished.
Month 7 finished.
Month 8 finished.
Month 9 finished.
Month 10 finished.
Month 11 finished.
Month 12 finished.
Processing year: 2023
Month 1 finished.
Month 2 finished.
Month 3 finished.
Month 4 finished.
Month 5 finished.
Month 6 finished.
Month 7 finished.
Month 8 finished.
Month 9 finished.
Month 10 finished.
Month 11 finished.
Month 12 finished.
Processing year: 2024
Month 1 finished.
Month 2 finished.
Month 3 finished.
Month 4 finished.
Month 5 finished.
Month 6 finished.
Month 7 finished.
Month 8 finished.
Month 9 finished.
Month 10 finished.
Month 11 finished.
Month 12 finished.
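The production and consumption cells above repeat the same month loop, including the October split. One way to avoid this duplication is to pass the fetch and write steps in as functions. The sketch below keeps the same request windows as the loops above, but `month_parts` and `backfill` are hypothetical names. The wiring to the notebook's own helpers is shown only as comments.

```python
from datetime import datetime, timedelta

import pytz

OSLO = pytz.timezone("Europe/Oslo")


def month_parts(year, m):
    """Request windows for one month, splitting October in two as the loops above do."""
    start = OSLO.localize(datetime(year, m, 1))
    first_of_next = datetime(year + 1, 1, 1) if m == 12 else datetime(year, m + 1, 1)
    end = OSLO.localize(first_of_next) - timedelta(hours=1)  # start of the month's last hour
    if m == 10:  # DST ends in October: split at the 20th/21st like the original code
        return [(start, OSLO.localize(datetime(year, 10, 20, 23))),
                (OSLO.localize(datetime(year, 10, 21, 0)), end)]
    return [(start, end)]


def backfill(years, fetch, write):
    """Call fetch(start, end) for every request window and pass the result to write()."""
    n_windows = 0
    for year in years:
        for m in range(1, 13):
            for start, end in month_parts(year, m):
                write(fetch(start, end))
                n_windows += 1
        print(f"Year {year} finished.")
    return n_windows


# Possible wiring to the notebook's helpers (not executed here):
# backfill(range(2022, 2025),
#          fetch=lambda s, e: load_parse_data_fra_api(
#              s, e, "PRODUCTION_PER_GROUP_MBA_HOUR", "productionPerGroupMbaHour"),
#          write=lambda rows: write_to_cassandra_ved_spark(rows, col_list))
```

Injecting `fetch` and `write` also makes the loop testable without the API or Cassandra, because any pair of functions with the same shape can be passed in.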


### Step 2: Briefly explore the data with Spark

#### Load data from Cassandra into the notebook

In [98]:
df_from_cassandra = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="production_data", keyspace="elnub") \
    .load()

df_from_cassandra.show(5)

+---------+---------------+-------------------+-------------------+-------------------+-----------+
|pricearea|productiongroup|          starttime|            endtime|    lastupdatedtime|quantitykwh|
+---------+---------------+-------------------+-------------------+-------------------+-----------+
|      NO2|          hydro|2021-01-01 00:00:00|2021-01-01 01:00:00|2024-12-20 10:35:40|  7245923.5|
|      NO2|          hydro|2021-01-01 01:00:00|2021-01-01 02:00:00|2024-12-20 10:35:40|  6750958.0|
|      NO2|          hydro|2021-01-01 02:00:00|2021-01-01 03:00:00|2024-12-20 10:35:40|  6070989.0|
|      NO2|          hydro|2021-01-01 03:00:00|2021-01-01 04:00:00|2024-12-20 10:35:40|  5851299.0|
|      NO2|          hydro|2021-01-01 04:00:00|2021-01-01 05:00:00|2024-12-20 10:35:40|  5812150.0|
+---------+---------------+-------------------+-------------------+-------------------+-----------+
only showing top 5 rows



#### Explore the dataset briefly

In [99]:
df_spark = df_from_cassandra.select("pricearea", "productiongroup", "starttime", "quantitykwh")
# Add year column
df_spark = df_spark.withColumn("year",year(to_timestamp("starttime")))
df_spark.groupBy("year").agg(
    sum("quantitykwh").alias("total_quantitykwh"),
    count("*").alias("row_count")
).orderBy("year").show(truncate=False)



+----+---------------------+---------+
|year|total_quantitykwh    |row_count|
+----+---------------------+---------+
|2021|1.569205538468405E11 |215033   |
|2022|1.4581949517553506E11|218675   |
|2023|1.5381816021331845E11|218675   |
|2024|1.570131566350712E11 |219275   |
+----+---------------------+---------+

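The cell below checks for nulls and duplicates. The count after `dropna` and `dropDuplicates` (871,658) equals the sum of the yearly row counts above, so there are no null or duplicate rows.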

In [None]:
df_clean = df_spark.dropna(subset=['starttime', 'quantitykwh', 'pricearea', 'productiongroup'])
df_clean = df_clean.dropDuplicates(['pricearea', 'productiongroup', 'starttime'])
df_clean.count()

871658

Select the 2022–2024 data and drop the helper `year` column.

In [103]:
df_spark_new = (df_spark
                .filter((df_spark.year >=2022) & (df_spark.year <=2024))
                .drop("year"))
df_spark_new.count()

656625

Do the same for the consumption data.

In [None]:
df_consump_from_cassandra = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="consumption_data", keyspace="elnub") \
    .load()

df_consump_from_cassandra.show(10)

+---------+----------------+-------------------+-------------------+-------------------+------------------+-----------+
|pricearea|consumptiongroup|          starttime|            endtime|    lastupdatedtime|meteringpointcount|quantitykwh|
+---------+----------------+-------------------+-------------------+-------------------+------------------+-----------+
|      NO5|       secondary|2021-01-01 00:00:00|2021-01-01 01:00:00|2024-12-20 10:35:40|            5984.0|  1094799.6|
|      NO5|       secondary|2021-01-01 01:00:00|2021-01-01 02:00:00|2024-12-20 10:35:40|            5984.0|  1099480.8|
|      NO5|       secondary|2021-01-01 02:00:00|2021-01-01 03:00:00|2024-12-20 10:35:40|            5984.0|  1054455.9|
|      NO5|       secondary|2021-01-01 03:00:00|2021-01-01 04:00:00|2024-12-20 10:35:40|            5984.0|  1049728.6|
|      NO5|       secondary|2021-01-01 04:00:00|2021-01-01 05:00:00|2024-12-20 10:35:40|            5984.0|  1099942.9|
+---------+----------------+------------

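2024 has more rows than the other years because February 2024 has 29 days. In the monthly counts below, February 2024 has 17,375 - 16,775 = 600 more rows than February 2023, which corresponds to 24 extra hours for each of the 25 price-area/consumption-group series.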

In [112]:
df_consump_spark = df_consump_from_cassandra.select("pricearea", "consumptiongroup", "starttime", "quantitykwh")
# Add year and month columns
df_consump_spark = (
    df_consump_spark
        .withColumn("year",  year(to_timestamp("starttime")))
        .withColumn("month", month(to_timestamp("starttime")))
)
df_consump_spark.groupBy("year").agg(
    sum("quantitykwh").alias("total_quantitykwh"),
    count("*").alias("row_count")
).orderBy("year").show(truncate=False)



+----+---------------------+---------+
|year|total_quantitykwh    |row_count|
+----+---------------------+---------+
|2021|1.3145741822485603E11|218675   |
|2022|1.258865769915383E11 |218675   |
|2023|1.2811225028317383E11|218675   |
|2024|1.3071573243972789E11|219275   |
+----+---------------------+---------+

In [114]:
df_consump_spark.filter(df_consump_spark.year ==2023).groupBy("month").agg(
    count("*").alias("row_count")
).orderBy("month").show(truncate=False)



+-----+---------+
|month|row_count|
+-----+---------+
|1    |18575    |
|2    |16775    |
|3    |18550    |
|4    |17975    |
|5    |18575    |
|6    |17975    |
|7    |18575    |
|8    |18575    |
|9    |17975    |
|10   |18575    |
|11   |17975    |
|12   |18575    |
+-----+---------+

In [113]:
df_consump_spark.filter(df_consump_spark.year ==2024).groupBy("month").agg(
    count("*").alias("row_count")
).orderBy("month").show(truncate=False)



+-----+---------+
|month|row_count|
+-----+---------+
|1    |18575    |
|2    |17375    |
|3    |18550    |
|4    |17975    |
|5    |18575    |
|6    |17975    |
|7    |18575    |
|8    |18575    |
|9    |17975    |
|10   |18575    |
|11   |17975    |
|12   |18575    |
+-----+---------+
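The monthly counts can also be checked against the calendar. Expected rows = hours in the month × number of series. The sketch below makes two assumptions: there are 25 series, which is consistent with the 600-row February difference (24 × 25), and the Spark session time zone is Europe/Oslo, so months are counted in local time. `missing_hours_per_series` is a hypothetical helper. Under these assumptions, the 2023 counts come out one hour short per series in every month and two hours short in October. That may be worth checking against how each request window ends.

```python
from datetime import datetime

import pytz

OSLO = pytz.timezone("Europe/Oslo")


def hours_in_month(year, month):
    """Clock hours in a month in Europe/Oslo, including the DST shifts."""
    start = OSLO.localize(datetime(year, month, 1))
    nxt = datetime(year + 1, 1, 1) if month == 12 else datetime(year, month + 1, 1)
    end = OSLO.localize(nxt)
    return int((end.astimezone(pytz.utc) - start.astimezone(pytz.utc)).total_seconds() // 3600)


def missing_hours_per_series(year, observed_counts, n_series=25):
    """Map month -> (expected rows - observed rows) / n_series."""
    return {m: (hours_in_month(year, m) * n_series - rows) / n_series
            for m, rows in observed_counts.items()}


# Monthly row counts for 2023, copied from the table above.
observed_2023 = {1: 18575, 2: 16775, 3: 18550, 4: 17975, 5: 18575, 6: 17975,
                 7: 18575, 8: 18575, 9: 17975, 10: 18575, 11: 17975, 12: 18575}
print(missing_hours_per_series(2023, observed_2023))  # 1.0 for every month, 2.0 for October
```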

In [118]:
df_consump_spark.count()

875300

The cell below confirms there are no null or duplicate values: the count stays at 875,300 after `dropna` and `dropDuplicates`.

In [116]:
df_consump_clean = df_consump_spark.dropna(subset=['starttime', 'quantitykwh', 'pricearea', 'consumptiongroup'])
df_consump_clean = df_consump_clean.dropDuplicates(['pricearea', 'consumptiongroup', 'starttime'])
df_consump_clean.count()

875300

### Step 3: Push data into MongoDB

In [119]:
def write_to_mongodb(df_spark, mongo_collection, execute=True):
    '''
    Insert all rows of a Spark DataFrame into a collection of the remote
    MongoDB database "elhub_db". Nothing is done unless execute is True.
    '''
    if not execute:
        return

    with open("config_local.json") as f:
        config = json.load(f)
    mongo_uri = config["mongo_uri"]

    # Create a new client; the with-block closes the connection afterwards
    with MongoClient(mongo_uri, server_api=ServerApi('1')) as client:
        # Send a ping to confirm a successful connection before inserting anything
        try:
            client.admin.command('ping')
            print("Pinged your deployment. You successfully connected to MongoDB!")
        except Exception as e:
            print(f"Could not connect to MongoDB: {e}")
            return

        collection = client["elhub_db"][mongo_collection]
        # Collect the Spark DataFrame on the driver and convert it to a list of dicts
        data_dict = df_spark.toPandas().to_dict(orient="records")
        collection.insert_many(data_dict)

    print(f"{len(data_dict)} records inserted into MongoDB.")

In [None]:
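# Pass True (or set EXECUTE_remoteDB = True) to upload; the output below is from a run with uploads enabled
write_to_mongodb(df_spark_new, "production_data", EXECUTE_remoteDB)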

Pinged your deployment. You successfully connected to MongoDB!


25/11/21 20:51:32 WARN ControlConnection: [s1] Error connecting to Node(endPoint=localhost/127.0.0.1:9042, hostId=null, hashCode=6a1b1d4d), trying next node (ConnectionInitException: [s1|control|id: 0xc3181e4b, L:/127.0.0.1:62101 - R:localhost/127.0.0.1:9042] Protocol initialization request, step 1 (OPTIONS): unexpected failure (java.lang.IllegalArgumentException: Unsupported request opcode: 0 in protocol 127))
656625 records inserted into MongoDB.


In [None]:
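# Pass True (or set EXECUTE_remoteDB = True) to upload; the output below is from a run with uploads enabled
write_to_mongodb(df_consump_spark, "consumption_data", EXECUTE_remoteDB)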

Pinged your deployment. You successfully connected to MongoDB!


875300 records inserted into MongoDB.
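Both uploads above call `toPandas()`, which collects the whole DataFrame into driver memory before inserting. One alternative is to stream rows with PySpark's `DataFrame.toLocalIterator()` and insert them in fixed-size batches. This is an assumption, not what the notebook does. PyMongo already splits a large `insert_many` into server-sized batches, so the benefit would mainly be lower memory use on the driver. `batched` is a hypothetical helper. On Python 3.12+, `itertools.batched` does the same job but yields tuples. The MongoDB part is shown only as comments.

```python
from itertools import islice


def batched(iterable, size):
    """Yield lists of at most `size` items from any iterable."""
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch


# Possible use inside write_to_mongodb (not executed here):
# rows = (row.asDict() for row in df_spark.toLocalIterator())
# for batch in batched(rows, 10_000):
#     collection.insert_many(batch)

print([len(b) for b in batched(range(25), 10)])  # [10, 10, 5]
```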


Close the Spark session

In [122]:
spark.stop()