# Project Work, Part 4 - Machine Learning
## 1. Introduction
This project involves analysing data and implementing a machine learning model in a Jupyter Notebook, and building a multi-page online app with Streamlit, with all work and code shared on GitHub. AI tools (e.g., ChatGPT) were used during the project to clarify requirements and to gain a deeper understanding of the technologies involved.

- Task: Analysis of Norwegian electricity production (Elhub) and meteorological data (Open‑Meteo API).
- Goal: automate data collection, perform time‑series decomposition, periodic analysis, and anomaly detection; then visualize results in a Jupyter Notebook and Streamlit dashboard.

## 2. Repository and App Links
- GitHub: https://github.com/Indraadhikari/IND320_Indra
- Streamlit app: https://ind320-k2r8aymxk9takanegm8e3y.streamlit.app

## 3. Project Overview
### 3.1 AI Usage Description
In this project, I used AI (ChatGPT) as a helpful assistant during development. It supported me in solving coding errors, generating code ideas, and improving my understanding of concepts. The AI explained topics such as STL decomposition, Discrete Cosine Transform (DCT) filtering, and Local Outlier Factor (LOF) anomaly detection, giving both theory and example code.

I also used it to debug Python and Streamlit issues, like fixing empty DataFrames, using st.session_state, avoiding runtime errors, and organizing the multi-page layout. During implementation, I followed AI suggestions to clean up functions, set better parameter defaults, and make the visualizations easier to read.

All AI outputs were carefully checked, tested, and modified to fit the project’s goals and my own coding style. Overall, the AI acted as a learning and support tool, helping me work faster and understand data analysis and software design more deeply.

### 3.2 Project Log
For the compulsory work, I began by defining representative cities for Norway’s five electricity price areas (NO1–NO5) and storing their latitude and longitude in a Pandas DataFrame. This mapping created the geographic foundation for the rest of the analyses. I then downloaded hourly electricity production data from the Elhub API for 2021, focusing on the *PRODUCTION_PER_GROUP_MBA_HOUR* dataset. The raw *JSON* responses were normalized into a clean DataFrame.
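
As an illustration of the mapping described above, a minimal sketch could look like the following. The city choices and the rounded coordinates are assumptions made for this example, and `coordinates_for` is a hypothetical helper, not a function from the notebook.

```python
import pandas as pd

# Hypothetical representative cities; coordinates are rounded and illustrative only.
price_areas = pd.DataFrame(
    {
        "price_area": ["NO1", "NO2", "NO3", "NO4", "NO5"],
        "city": ["Oslo", "Kristiansand", "Trondheim", "Tromsø", "Bergen"],
        "latitude": [59.91, 58.15, 63.43, 69.65, 60.39],
        "longitude": [10.75, 8.00, 10.40, 18.96, 5.32],
    }
)


def coordinates_for(area: str) -> tuple[float, float]:
    """Return (latitude, longitude) for a price area code such as 'NO1'."""
    row = price_areas.loc[price_areas["price_area"] == area].iloc[0]
    return float(row["latitude"]), float(row["longitude"])
```

Keeping the mapping in one DataFrame means every later step (weather download, plots, app selectors) can look up coordinates from the same place.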

Next, I replaced my earlier CSV‑based meteorological import with live calls to the Open‑Meteo API. For each selected price area, the application automatically queries the API using the corresponding city’s coordinates, returning hourly temperature, precipitation, and wind observations (for 2019 in the notebook and for 2021 in the Streamlit app). The fetched data are transformed into a tidy Pandas DataFrame and cached for efficient reuse.
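
A rough sketch of such a download step is shown here. The archive endpoint URL, the hourly variable names, and the helper names (`build_params`, `payload_to_frame`, `fetch_weather`) are assumptions for illustration and may differ from what the notebook and app actually use.

```python
import pandas as pd
import requests

# Assumed endpoint of the Open-Meteo historical archive.
ARCHIVE_URL = "https://archive-api.open-meteo.com/v1/archive"


def build_params(lat, lon, year,
                 variables=("temperature_2m", "precipitation", "wind_speed_10m")):
    """Build the query parameters for one location and one calendar year."""
    return {
        "latitude": lat,
        "longitude": lon,
        "start_date": f"{year}-01-01",
        "end_date": f"{year}-12-31",
        "hourly": ",".join(variables),  # variable names are assumptions
        "timezone": "Europe/Oslo",
    }


def payload_to_frame(payload):
    """Turn the 'hourly' part of the JSON response into a time-indexed DataFrame."""
    df = pd.DataFrame(payload["hourly"])
    df["time"] = pd.to_datetime(df["time"])
    return df.set_index("time")


def fetch_weather(lat, lon, year):
    """Download one year of hourly data (performs a network request)."""
    response = requests.get(ARCHIVE_URL, params=build_params(lat, lon, year), timeout=30)
    response.raise_for_status()
    return payload_to_frame(response.json())
```

Separating the parsing from the request makes it possible to test the DataFrame conversion on a small hand-written payload without calling the API.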

Analytical development was divided into three main components, implemented and tested first in a Jupyter Notebook.
- Seasonal‑Trend decomposition using LOESS (STL): using the *statsmodels.tsa.seasonal.STL* class, I decomposed the production time series into trend, seasonal, and residual components.
- Spectrogram analysis: applying *scipy.signal.spectrogram*, I generated time–frequency plots to reveal changes in periodic behavior across the year.
- Outlier and Anomaly detection: I implemented a robust Statistical Process Control (SPC) method using *Median ± k × MAD* boundaries on filtered temperature data and applied the Local Outlier Factor (LOF) algorithm from *scikit‑learn* to identify precipitation anomalies.

Each analytical block was wrapped in a modular Python function with configurable parameters (area, group, window length, etc.) and tested interactively in the notebook before integration into the Streamlit app.
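
To make the SPC step from the list above more concrete, here is a minimal sketch of one way to combine a DCT high-pass filter with *Median ± k × MAD* limits. The function name `spc_outliers`, the `cutoff` and `k` defaults, and the use of `scipy.fft` are assumptions for this example, not the notebook's actual implementation.

```python
import numpy as np
from scipy.fft import dct, idct


def spc_outliers(values, cutoff=100, k=3.0):
    """Flag points whose high-pass filtered value leaves median ± k * MAD.

    cutoff: number of lowest DCT coefficients set to zero (assumed default).
    k:      width of the control limits in MAD units (assumed default).
    """
    x = np.asarray(values, dtype=float)

    # High-pass filter: zero the lowest-frequency DCT terms to remove slow seasonal variation.
    coeffs = dct(x, norm="ortho")
    coeffs[:cutoff] = 0.0
    filtered = idct(coeffs, norm="ortho")

    # Robust centre and spread of the filtered series.
    median = np.median(filtered)
    mad = np.median(np.abs(filtered - median))

    lower, upper = median - k * mad, median + k * mad
    mask = (filtered < lower) | (filtered > upper)
    return mask, (lower, upper)
```

A quick check on synthetic data: a slow sine wave with small noise and one injected spike should have the spike flagged, while the seasonal swing itself is filtered out before the limits are computed.

```python
rng = np.random.default_rng(0)
t = np.arange(1000)
series = 10 * np.sin(2 * np.pi * t / 1000) + rng.normal(0, 0.5, size=1000)
series[500] += 15  # injected anomaly
mask, limits = spc_outliers(series, cutoff=10, k=3.0)
print(mask[500], limits)
```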

I then updated the Streamlit dashboard to follow the new required page order. The global area selector was moved to the second page (named *Energy Production(4)* in the app), ensuring that all subsequent analyses depend on the user’s chosen region. Between existing pages, I added *new A (STL and Spectrogram(A))* and *new B (Outliers and Anomalies (B))* pages, each built with *st.tabs()* for navigation. Both pages render Matplotlib plots directly. Communication between pages is managed through *st.session_state*, allowing the selected price area, the imported meteorological data, and the production data to persist throughout the session.

The completed workflow demonstrates a full data pipeline: acquiring data dynamically via APIs, performing time‑series analysis, detecting anomalies, and presenting interactive results through a structured Streamlit interface and Jupyter Notebook.

## 4. Importing Libraries

In [1]:
import requests 
import pandas as pd
import calendar
import numpy as np
import matplotlib.pyplot as plt
from scipy.fftpack import dct, idct
from sklearn.neighbors import LocalOutlierFactor
from statsmodels.tsa.seasonal import STL
from scipy.signal import spectrogram

## 5. Data Extraction and Loading

### 5.1 Connection Check for Cassandra

In [2]:
from cassandra.cluster import Cluster

try:
    cluster = Cluster(['localhost'], port=9042)
    session = cluster.connect()
    print("✅ Connected to Cassandra!")
    print("Cluster name:", cluster.metadata.cluster_name)
    print("Hosts:", cluster.metadata.all_hosts())
    cluster.shutdown()
except Exception as e:
    print("❌ Connection failed:", e)

✅ Connected to Cassandra!
Cluster name: Test Cluster
Hosts: [<Host: ::1:9042 datacenter1>]


### 5.2 Connection Check for MongoDB

In [24]:
from pymongo.mongo_client import MongoClient

c_file = '/Users/indra/Documents/Masters in Data Science/Data to Decision/IND320_Indra/No_sync/MongoDB.txt' # credential file
# read user name and password (one per line) and close the file afterwards
with open(c_file) as f:
    USR, PWD = f.read().splitlines()

uri = "mongodb+srv://"+USR+":"+PWD+"@cluster0.wmoqhtp.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

# Create a new client and connect to the server
client = MongoClient(uri)

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!
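
One detail worth noting for the connection string above: PyMongo requires the user name and password in a URI to be percent-escaped, which matters as soon as the password contains characters such as `@` or `/`. The sketch below shows one way to do that with the standard library. The helper names and the example host are hypothetical.

```python
from urllib.parse import quote_plus


def read_credentials(path):
    """Read user name and password from the first two lines of a text file."""
    with open(path, encoding="utf-8") as handle:
        user, password = handle.read().splitlines()[:2]
    return user.strip(), password.strip()


def build_mongo_uri(user, password, host, app_name="Cluster0"):
    """Build a mongodb+srv URI with percent-escaped credentials."""
    return (
        f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}@{host}/"
        f"?retryWrites=true&w=majority&appName={app_name}"
    )


# Example with a made-up host and password:
print(build_mongo_uri("indra", "p@ss/word", "cluster0.example.mongodb.net"))
```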


### 5.3 Reading Data from the Elhub API

In [6]:
import requests

headers = {}  # no extra request headers are sent

endpoint = "https://api.elhub.no/energy-data/v0/"
entity = 'price-areas'
dataset = "PRODUCTION_PER_GROUP_MBA_HOUR"
#startdate = '2022-01-01T00:20:00%2B02:00'
#enddate = '2024-12-31T23:59:59%2B02:00'
year = [2022, 2023, 2024]

In [7]:
import calendar
import pandas as pd

dates = []
for yr in year:  # separate loop variable so the `year` list is not overwritten
    # request one month at a time, since the endpoint does not return a whole year in one call
    for month in range(1, 13):
        # Get number of days in month
        _, last_day = calendar.monthrange(yr, month)
        
        # Format month and day properly (e.g. 01, 02, …)
        startdate = f"{yr}-{month:02d}-01T00:20:00%2B02:00"
        enddate = f"{yr}-{month:02d}-{last_day:02d}T23:59:59%2B02:00"
        
        dates.append((startdate, enddate))

all_data = []

for startdate, enddate in dates:
    url = f"{endpoint}{entity}?dataset={dataset}&startDate={startdate}&endDate={enddate}"
    response = requests.get(url, headers=headers, timeout=60)
    response.raise_for_status()  # stop on an HTTP error instead of failing later in .json()
    data = response.json()
    # each entry in data['data'] holds the hourly records of one price area
    for item in data['data']:
        all_data.extend(item['attributes']['productionPerGroupMbaHour'])
df = pd.DataFrame(all_data)
print(df.shape)

(656700, 6)


In [15]:
df.head(2), df.tail(2)

(                     endTime            lastUpdatedTime priceArea  \
 0  2022-01-01T02:00:00+01:00  2025-02-01T18:02:57+01:00       NO1   
 1  2022-01-01T03:00:00+01:00  2025-02-01T18:02:57+01:00       NO1   
 
   productionGroup  quantityKwh                  startTime  
 0           hydro    1246209.4  2022-01-01T01:00:00+01:00  
 1           hydro    1271757.0  2022-01-01T02:00:00+01:00  ,
                           endTime            lastUpdatedTime priceArea  \
 656698  2024-12-31T23:00:00+01:00  2025-03-30T18:39:27+02:00       NO5   
 656699  2025-01-01T00:00:00+01:00  2025-03-30T18:39:27+02:00       NO5   
 
        productionGroup  quantityKwh                  startTime  
 656698            wind          0.0  2024-12-31T22:00:00+01:00  
 656699            wind          0.0  2024-12-31T23:00:00+01:00  )

### 5.4 Creating Keyspace and Table in Cassandra

In [16]:
from cassandra.cluster import Cluster

# starting the Cassandra connection session

cluster = Cluster(['localhost'], port=9042)
session = cluster.connect()

# add an id column to serve as the primary key of the table
if "id" not in df.columns:
    df = df.reset_index().rename(columns={'index': 'id'})
df.columns

columns = ", ".join([f"{col} text" for col in df.columns]) # type is text
primary_key = df.columns[0]  # first column as primary key (id; index of the df)

# Create a keyspace (database)
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS infindra
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
""")

#ALTER KEYSPACE infindra WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

session.set_keyspace('infindra')

#Creating Tables
create_query = f"""
CREATE TABLE IF NOT EXISTS production_per_group_4 (
    {columns},
    PRIMARY KEY ({primary_key})
)
"""
session.execute(create_query)
#session.execute("TRUNCATE TABLE production_per_group;")

<cassandra.cluster.ResultSet at 0x1451bd210>

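The table above stores every column as `text`, which is why numeric values come back as strings later on. As a possible alternative, the column types could be derived from the DataFrame dtypes. The sketch below only builds the CQL statement. The dtype-to-CQL mapping and the helper names are assumptions, and the Spark DataFrame written to such a table would need matching column types.

```python
import pandas as pd


def cql_type(dtype) -> str:
    """Map a pandas dtype to a CQL type (assumed mapping, falls back to text)."""
    if pd.api.types.is_integer_dtype(dtype):
        return "bigint"
    if pd.api.types.is_float_dtype(dtype):
        return "double"
    if pd.api.types.is_datetime64_any_dtype(dtype):
        return "timestamp"
    return "text"


def create_table_cql(df, table, primary_key):
    """Build a CREATE TABLE statement with lower-cased, typed columns."""
    columns = ",\n    ".join(
        f"{col.lower()} {cql_type(dtype)}" for col, dtype in df.dtypes.items()
    )
    return (
        f"CREATE TABLE IF NOT EXISTS {table} (\n"
        f"    {columns},\n"
        f"    PRIMARY KEY ({primary_key})\n"
        f")"
    )


# Small demo frame; the table name is hypothetical.
sample = pd.DataFrame(
    {
        "id": [0, 1],
        "priceArea": ["NO1", "NO2"],
        "quantityKwh": [1246209.4, 1271757.0],
        "startTime": pd.to_datetime(["2022-01-01 01:00", "2022-01-01 02:00"]),
    }
)
print(create_table_cql(sample, "production_demo", "id"))
```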
### 5.5 Inserting data to Cassandra using Spark 

In [43]:
from pyspark.sql import SparkSession
import os

os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/microsoft-17.jdk/Contents/Home"
os.environ['PATH'] = os.path.join(os.environ['JAVA_HOME'], 'bin') + ":" + os.environ['PATH']

spark = (
    SparkSession.builder
    .appName("CassandraReadTest")
    .master("local[*]")
    .config("spark.jars.repositories",
            "https://repos.spark-packages.org,https://oss.sonatype.org/content/repositories/releases/")
    .config("spark.jars.packages",
            "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,"
            "org.mongodb.spark:mongo-spark-connector_2.12:3.5.1")
    .config("spark.cassandra.connection.host", "localhost")
    .config("spark.cassandra.connection.port", "9042")
    .getOrCreate()
)

print(f"✅ Spark version: {spark.version}")
#spark.stop()

✅ Spark version: 3.5.1


In [19]:
# Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(df)

spark_df = spark_df.toDF(*[c.lower() for c in spark_df.columns]) # lower-case the column names to match the Cassandra table
#spark_df.printSchema()
# Show the data
spark_df.show(2)

25/11/25 18:29:34 WARN TaskSetManager: Stage 0 contains a task of very large size (9102 KiB). The maximum recommended task size is 1000 KiB.
25/11/25 18:29:39 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker

+---+--------------------+--------------------+---------+---------------+-----------+--------------------+
| id|             endtime|     lastupdatedtime|pricearea|productiongroup|quantitykwh|           starttime|
+---+--------------------+--------------------+---------+---------------+-----------+--------------------+
|  0|2022-01-01T02:00:...|2025-02-01T18:02:...|      NO1|          hydro|  1246209.4|2022-01-01T01:00:...|
|  1|2022-01-01T03:00:...|2025-02-01T18:02:...|      NO1|          hydro|  1271757.0|2022-01-01T02:00:...|
+---+--------------------+--------------------+---------+---------------+-----------+--------------------+
only showing top 2 rows



In [None]:
# Optimize Cassandra write settings
spark.conf.set("spark.cassandra.output.concurrent.writes", "5")
spark.conf.set("spark.cassandra.output.throughput_mb_per_sec", "200")
spark.conf.set("spark.cassandra.output.batch.size.rows", "1000")

# Write DataFrame to Cassandra
# keyspace='infindra' and table='production_per_group_4' must already exist in Cassandra
spark_df.write \
    .format("org.apache.spark.sql.cassandra") \
    .option("keyspace", "infindra") \
    .option("table", "production_per_group_4") \
    .option("confirm.truncate", "true") \
    .mode("overwrite") \
    .save()

print("Data successfully written to Cassandra!")

### 5.6 Reading Data from Cassandra

In [82]:
# DataFrame with the Elhub API's PRODUCTION_PER_GROUP_MBA_HOUR data for 2021
df_c = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .option("keyspace", "infindra") \
    .option("table", "production_per_group") \
    .load()

df_c_new = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .option("keyspace", "infindra") \
    .option("table", "production_per_group_4") \
    .load()

df_merge = df_c.union(df_c_new)

selected_df = df_merge.select("priceArea", "productionGroup", "startTime", "quantityKwh")

#selected_df.show()
selected_df.tail(5)


[Row(priceArea='NO1', productionGroup='thermal', startTime='2023-03-15T12:00:00+01:00', quantityKwh='42119.51'),
 Row(priceArea='NO3', productionGroup='solar', startTime='2023-10-04T08:00:00+02:00', quantityKwh='84.075'),
 Row(priceArea='NO4', productionGroup='thermal', startTime='2023-08-05T07:00:00+02:00', quantityKwh='225001.0'),
 Row(priceArea='NO1', productionGroup='solar', startTime='2024-03-01T03:00:00+01:00', quantityKwh='308.645'),
 Row(priceArea='NO5', productionGroup='other', startTime='2022-10-02T20:00:00+02:00', quantityKwh='0.003')]
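
The rows above show `quantityKwh` and `startTime` as strings, since the Cassandra columns were created as `text`. Before any numerical analysis they need to be cast. A minimal sketch with pandas follows. The helper name `to_typed_frame` is hypothetical, and converting all timestamps to UTC is a design choice made here to handle the mixed `+01:00`/`+02:00` offsets.

```python
import pandas as pd


def to_typed_frame(rows):
    """Cast string columns to numeric and datetime types and sort by time."""
    df = pd.DataFrame(rows)
    # Non-numeric strings become NaN instead of raising an error.
    df["quantityKwh"] = pd.to_numeric(df["quantityKwh"], errors="coerce")
    # Mixed UTC offsets (winter/summer time) are normalised to UTC.
    df["startTime"] = pd.to_datetime(df["startTime"], utc=True)
    return df.sort_values("startTime").reset_index(drop=True)


# Two rows copied from the output above, as plain dictionaries.
rows = [
    {"priceArea": "NO3", "productionGroup": "solar",
     "startTime": "2023-10-04T08:00:00+02:00", "quantityKwh": "84.075"},
    {"priceArea": "NO1", "productionGroup": "thermal",
     "startTime": "2023-03-15T12:00:00+01:00", "quantityKwh": "42119.51"},
]
typed = to_typed_frame(rows)
print(typed.dtypes)
```

With a Spark DataFrame, the same function could be applied to the result of `selected_df.toPandas()`, assuming the data fit in memory.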

### 5.7 Inserting the Spark DataFrame into MongoDB Atlas

In [83]:

selected_df.write \
  .format("com.mongodb.spark.sql.DefaultSource") \
  .option("spark.mongodb.output.uri", uri) \
  .option("spark.mongodb.output.database", "indra") \
  .option("spark.mongodb.output.collection", "production_per_group") \
  .mode("overwrite") \
  .save()

print("Success!")

df_mongo = (
        spark.read
        .format("com.mongodb.spark.sql.DefaultSource")  # source name of the pre-10.x connector API; the v10+ connector uses format("mongodb")
        .option("spark.mongodb.input.uri", uri)
        .option("spark.mongodb.input.database", "indra")
        .option("spark.mongodb.input.collection", "production_per_group")
        .load()
    )

print("Data Loaded.")
df_mongo.show(5, truncate=False)


Success!



Data Loaded.
+--------------------------+---------+---------------+-----------+-------------------------+
|_id                       |priceArea|productionGroup|quantityKwh|startTime                |
+--------------------------+---------+---------------+-----------+-------------------------+
|{69260eedd334ff502e7d5101}|NO5      |thermal        |76033.0    |2021-10-07T23:00:00+02:00|
|{69260eedd334ff502e7d5102}|NO3      |other          |0.958      |2021-10-06T23:00:00+02:00|
|{69260eedd334ff502e7d5103}|NO2      |solar          |4247.242   |2021-03-30T15:00:00+02:00|
|{69260eedd334ff502e7d5104}|NO3      |hydro          |2643314.8  |2021-01-05T02:00:00+01:00|
|{69260eedd334ff502e7d5105}|NO3      |other          |1.915      |2021-11-27T20:00:00+01:00|
+--------------------------+---------+---------------+-----------+-------------------------+
only showing top 5 rows



In [84]:
df_mongo.orderBy("startTime").tail(5)


[Row(_id=Row(oid='69260fcdd334ff502e896c2b'), priceArea='NO1', productionGroup='solar', quantityKwh='325.507', startTime='2024-12-31T23:00:00+01:00'),
 Row(_id=Row(oid='69260fd9d334ff502e89e2cf'), priceArea='NO4', productionGroup='hydro', quantityKwh='2677024.0', startTime='2024-12-31T23:00:00+01:00'),
 Row(_id=Row(oid='69260fdbd334ff502e89f407'), priceArea='NO3', productionGroup='other', quantityKwh='25.577', startTime='2024-12-31T23:00:00+01:00'),
 Row(_id=Row(oid='69260febd334ff502e8a4acb'), priceArea='NO4', productionGroup='solar', quantityKwh='0.0', startTime='2024-12-31T23:00:00+01:00'),
 Row(_id=Row(oid='69260fffd334ff502e8a7605'), priceArea='NO3', productionGroup='solar', quantityKwh='66.082', startTime='2024-12-31T23:00:00+01:00')]