# Assignment 4

## AI usage

## Log

For this assignment I first worked through the requirements for the Jupyter Notebook and then updated the Streamlit app.

I really enjoyed reusing the import structure from Assignment 2 with the Elhub API. However, I realized that the name I chose for the tables in Cassandra and MongoDB (`elhub_api`) wasn't descriptive enough, as we are now using two tables from the same API. I was also unsure whether the new production data from 2022-2024 should be strictly appended to the old tables or whether I could rebuild the tables with data for 2021-2024. Manipulating the data with Spark and inserting it into Cassandra and MongoDB was otherwise fine, mostly following the same structure as in Assignment 2.

Updating the Streamlit app has felt difficult because I haven't caught up with the progress in the lectures.

## Links 

- GitHub: https://github.com/Satheris/IND320_SMAA
- Streamlit app: https://ind320smaa-2eg32uba6uhmrknkwtxzar.streamlit.app/

## Coding 

### Imports and system variables

In [21]:
import numpy as np
import pandas as pd 
import streamlit as st
import pymongo
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyjstat import pyjstat
import requests
import json
import plotly.express as px
from pyspark import SparkConf, SparkContext

import plotly.io as pio
pio.renderers.default = "notebook+pdf+plotly_mimetype"

In [3]:
# Set environment variables for PySpark (system and version dependent!) 
# if not already set persistently (e.g., in .bashrc or .bash_profile or Windows environment variables)
import os
# Set the Java home path to the one you are using ((un)comment and edit as needed):
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jre1.8.0_471"

# If you are using environments in Python, you can set the environment variables like the alternative below.
# The default Python environment is used if the variables are set to "python" (edit if needed):
os.environ["PYSPARK_PYTHON"] = "python" # or similar to "/Users/kristian/miniforge3/envs/tf_M1/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python" # or similar to "/Users/kristian/miniforge3/envs/tf_M1/bin/python"

# On Windows you need to specify where the Hadoop drivers are located (uncomment and edit if needed):
os.environ["HADOOP_HOME"] = r"C:\Users\saraa\Documents\winutils\hadoop-3.3.1"

# Set the Hadoop version to the one you are using, e.g., none:
os.environ["PYSPARK_HADOOP_VERSION"] = "without"
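The comments in the cell above say the variables should only be set "if not already set persistently", yet the assignments overwrite whatever is there. A minimal sketch of a non-overwriting alternative follows; the helper name `apply_env_defaults` is hypothetical and not part of the notebook.

```python
import os


def apply_env_defaults(defaults, environ=None):
    """Set each variable only if it is missing; return what was actually set.

    `apply_env_defaults` is a hypothetical helper, not part of the notebook.
    """
    if environ is None:
        environ = os.environ
    applied = {}
    for name, value in defaults.items():
        if name not in environ:  # keep values that are already set persistently
            environ[name] = value
            applied[name] = value
    return applied
```

Calling it with, for example, `{"PYSPARK_PYTHON": "python", "PYSPARK_DRIVER_PYTHON": "python"}` would leave a persistent system-level setting untouched and fill in only the missing entries.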

### Cassandra and Spark

In [4]:
# Connecting to Cassandra
# Run local Docker container first
cluster = Cluster(['localhost'], port=9042)
session = cluster.connect()



 Traceback (most recent call last):
   File "c:\Users\saraa\anaconda3\envs\IND320_SMAA\Lib\site-packages\cassandra\cluster.py", line 3577, in _reconnect_internal
    return self._try_connect(host)
           ^^^^^^^^^^^^^^^^^^^^^^^
   File "c:\Users\saraa\anaconda3\envs\IND320_SMAA\Lib\site-packages\cassandra\cluster.py", line 3599, in _try_connect
    connection = self._cluster.connection_factory(host.endpoint, is_control_connection=True)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File "c:\Users\saraa\anaconda3\envs\IND320_SMAA\Lib\site-packages\cassandra\cluster.py", line 1670, in connection_factory
    return self.connection_class.factory(endpoint, self.connect_timeout, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File "c:\Users\saraa\anaconda3\envs\IND320_SMAA\Lib\site-packages\cassandra\connection.py", line 852, in factory
    raise conn.last_error
 cassandra.conne
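
The truncated traceback above comes from the driver's reconnection path. If it appears because the Docker container is still starting (an assumption, since the error message is cut off), one option is to retry the connection a few times before giving up. The sketch below is generic: `connect_with_retry` is a hypothetical helper that accepts any callable, so it makes no claims about which exception the driver raises.

```python
import time


def connect_with_retry(connect, attempts=5, delay_seconds=2.0, sleep=time.sleep):
    """Call `connect()` until it succeeds or `attempts` is exhausted.

    Hypothetical helper: `connect` is any zero-argument callable.
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except Exception as error:  # broad on purpose: exact driver error is unknown here
            last_error = error
            if attempt < attempts:
                sleep(delay_seconds)  # give the container time to come up
    raise RuntimeError(f"gave up after {attempts} attempts") from last_error
```

With the objects from the cell above, this could be used as `session = connect_with_retry(cluster.connect)`.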

In [6]:
# Set up new keyspace
#                                              name of keyspace                        replication strategy           replication factor
session.execute("CREATE KEYSPACE IF NOT EXISTS ind320_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")

# Create a new table
session.set_keyspace('ind320_keyspace')
# session.execute("DROP TABLE IF EXISTS ind320_keyspace.elhub_api;") # Starting from scratch every time
session.execute("DROP TABLE IF EXISTS ind320_keyspace.elhub_production;") # Starting from scratch every time
session.execute("DROP TABLE IF EXISTS ind320_keyspace.elhub_consumption;") # Starting from scratch every time
session.execute("CREATE TABLE IF NOT EXISTS elhub_production \
                (ind int PRIMARY KEY, startTime text, endTime text, lastUpdatedTime text, \
                productionGroup text, quantityKwh float, \
                priceArea text);")
session.execute("CREATE TABLE IF NOT EXISTS elhub_consumption \
                (ind int PRIMARY KEY, startTime text, endTime text, lastUpdatedTime text, \
                consumptionGroup text, quantityKwh float, \
                priceArea text, meteringPointCount int)")

<cassandra.cluster.ResultSet at 0x25f0957c510>

In [7]:
spark = SparkSession.builder.appName('SparkCassandraApp').\
    config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.5.1').\
    config('spark.cassandra.connection.host', 'localhost').\
    config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions').\
    config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog').\
    config('spark.cassandra.connection.port', '9042').\
    config('spark.driver.host', 'localhost').\
    config('spark.driver.bindAddress', '127.0.0.1').\
    config('spark.sql.adaptive.enabled', 'true').\
        getOrCreate()

#### Testing that the connection works

In [8]:
# .load() is used to load data from Cassandra table as a Spark DataFrame
spark.read.format("org.apache.spark.sql.cassandra").options(table="my_first_table", keyspace="my_first_keyspace").load().show()

+---+--------+-------+
|ind| company|  model|
+---+--------+-------+
|460|    Ford|Transit|
|459|    Ford| Escort|
|  3|Polestar|      3|
|  1|   Tesla|Model S|
|  2|   Tesla|Model 3|
+---+--------+-------+



### MongoDB

In [9]:
def init_connection():
    return pymongo.MongoClient(st.secrets["mongo"]["uri"])

client = init_connection()

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


### Elhub API

#### Production data

In [10]:
# Initializing the lists used to build the request URLs
years = [2021, 2022, 2023, 2024, 2025]
months = ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']


all_records = []

for i, year in enumerate(years[:-1]):
    for j, month in enumerate(months):

        # catching last month with adapted URL for end time
        if month == months[-1]:
            URL = 'https://api.elhub.no/energy-data/v0/price-areas?dataset=PRODUCTION_PER_GROUP_MBA_HOUR&'\
                f'startDate={year}-{month}-01T00:00:00%2B02:00&endDate={years[i+1]}-{months[0]}-01T00:00:00%2B02:00'
            
        # all other months follow the same structure
        else: 
            URL = 'https://api.elhub.no/energy-data/v0/price-areas?dataset=PRODUCTION_PER_GROUP_MBA_HOUR&'\
                f'startDate={year}-{month}-01T00:00:00%2B02:00&endDate={year}-{months[j+1]}-01T00:00:00%2B02:00'

        payload = { 
            "query": [], 
            "response": { "format": "json-stat2" } }

        response = requests.get(URL, json=payload)
        
        # print(f"Y:{year}, m:{month}, Status Code: {response.status_code}")

        data = response.json()

        for area in data['data']:
            records = area['attributes']['productionPerGroupMbaHour']
            for record in records:
                record['priceArea'] = area['attributes']['name']
                all_records.append(record)

elhub_production = pd.DataFrame(all_records)
elhub_production.index.name = 'ind'
elhub_production = elhub_production.reset_index()

print(f"\nCreated DataFrame with {len(elhub_production)} rows")
elhub_production.head()


Created DataFrame with 872953 rows


| | ind | endTime | lastUpdatedTime | priceArea | productionGroup | quantityKwh | startTime |
|---|---|---|---|---|---|---|---|
| 0 | 0 | 2021-01-01T01:00:00+01:00 | 2024-12-20T10:35:40+01:00 | NO1 | hydro | 2507716.8 | 2021-01-01T00:00:00+01:00 |
| 1 | 1 | 2021-01-01T02:00:00+01:00 | 2024-12-20T10:35:40+01:00 | NO1 | hydro | 2494728.0 | 2021-01-01T01:00:00+01:00 |
| 2 | 2 | 2021-01-01T03:00:00+01:00 | 2024-12-20T10:35:40+01:00 | NO1 | hydro | 2486777.5 | 2021-01-01T02:00:00+01:00 |
| 3 | 3 | 2021-01-01T04:00:00+01:00 | 2024-12-20T10:35:40+01:00 | NO1 | hydro | 2461176.0 | 2021-01-01T03:00:00+01:00 |
| 4 | 4 | 2021-01-01T05:00:00+01:00 | 2024-12-20T10:35:40+01:00 | NO1 | hydro | 2466969.2 | 2021-01-01T04:00:00+01:00 |
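
The loop above special-cases December to roll the end date over into the next year. A possible alternative is to generate the month boundaries with `datetime.date`, which removes the branch and the extra `2025` entry in `years`. This is only a sketch: `month_ranges` and `build_url` are hypothetical names, and the fixed `%2B02:00` offset is copied unchanged from the notebook's URLs.

```python
from datetime import date

BASE_URL = "https://api.elhub.no/energy-data/v0/price-areas"


def month_ranges(first_year, last_year):
    """Yield (start, end) dates for each calendar month, end exclusive."""
    for year in range(first_year, last_year + 1):
        for month in range(1, 13):
            start = date(year, month, 1)
            # December rolls over to 1 January of the following year
            end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
            yield start, end


def build_url(dataset, start, end, offset="%2B02:00"):
    """Build a request URL in the same shape as the notebook's f-strings."""
    return (
        f"{BASE_URL}?dataset={dataset}"
        f"&startDate={start.isoformat()}T00:00:00{offset}"
        f"&endDate={end.isoformat()}T00:00:00{offset}"
    )
```

For example, `[build_url("PRODUCTION_PER_GROUP_MBA_HOUR", s, e) for s, e in month_ranges(2021, 2024)]` gives one URL per month for the four years.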


#### Consumption data

In [11]:
# Initializing the lists used to build the request URLs
years = [2021, 2022, 2023, 2024, 2025]
months = ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']


all_records = []

for i, year in enumerate(years[:-1]):
    for j, month in enumerate(months):
        
        # catching last month with adapted URL for end time
        if month == months[-1]:
            URL = 'https://api.elhub.no/energy-data/v0/price-areas?dataset=CONSUMPTION_PER_GROUP_MBA_HOUR&'\
                f'startDate={year}-{month}-01T00:00:00%2B02:00&endDate={years[i+1]}-{months[0]}-01T00:00:00%2B02:00'
        
        # all other months follow the same structure
        else: 
            URL = 'https://api.elhub.no/energy-data/v0/price-areas?dataset=CONSUMPTION_PER_GROUP_MBA_HOUR&'\
                f'startDate={year}-{month}-01T00:00:00%2B02:00&endDate={year}-{months[j+1]}-01T00:00:00%2B02:00'

        payload = { 
            "query": [], 
            "response": { "format": "json-stat2" } }

        response = requests.get(URL, json=payload)
        
        # print(f"Y:{year}, m:{month}, Status Code: {response.status_code}")

        data = response.json()

        for area in data['data']:
            records = area['attributes']['consumptionPerGroupMbaHour']
            for record in records:
                record['priceArea'] = area['attributes']['name']
                all_records.append(record)

elhub_consumption = pd.DataFrame(all_records)
elhub_consumption.index.name = 'ind'
elhub_consumption = elhub_consumption.reset_index()

print(f"\nCreated DataFrame with {len(elhub_consumption)} rows")
elhub_consumption.head()


Created DataFrame with 876600 rows


| | ind | consumptionGroup | endTime | lastUpdatedTime | meteringPointCount | priceArea | quantityKwh | startTime |
|---|---|---|---|---|---|---|---|---|
| 0 | 0 | cabin | 2021-01-01T01:00:00+01:00 | 2024-12-20T10:35:40+01:00 | 100607 | NO1 | 177071.56 | 2021-01-01T00:00:00+01:00 |
| 1 | 1 | cabin | 2021-01-01T02:00:00+01:00 | 2024-12-20T10:35:40+01:00 | 100607 | NO1 | 171335.12 | 2021-01-01T01:00:00+01:00 |
| 2 | 2 | cabin | 2021-01-01T03:00:00+01:00 | 2024-12-20T10:35:40+01:00 | 100607 | NO1 | 164912.02 | 2021-01-01T02:00:00+01:00 |
| 3 | 3 | cabin | 2021-01-01T04:00:00+01:00 | 2024-12-20T10:35:40+01:00 | 100607 | NO1 | 160265.77 | 2021-01-01T03:00:00+01:00 |
| 4 | 4 | cabin | 2021-01-01T05:00:00+01:00 | 2024-12-20T10:35:40+01:00 | 100607 | NO1 | 159828.69 | 2021-01-01T04:00:00+01:00 |
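
The production and consumption cells flatten the API response in the same way and differ only in the attribute key. A small shared function could cover both; the sketch below assumes the response layout used in the loops above (`data` → `attributes` → record list plus `name`). `flatten_records` is a hypothetical name.

```python
def flatten_records(payload, attribute_key):
    """Return one row per record, tagged with the price area it belongs to.

    Hypothetical helper; assumes the nested layout used in the cells above.
    """
    rows = []
    for area in payload["data"]:
        attributes = area["attributes"]
        for record in attributes[attribute_key]:
            # Copy the record instead of mutating the parsed response
            rows.append({**record, "priceArea": attributes["name"]})
    return rows
```

Inside the download loop, `all_records.extend(flatten_records(data, 'consumptionPerGroupMbaHour'))` would replace the nested `for` loops, with `'productionPerGroupMbaHour'` for the production dataset.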


In [12]:
# Cassandra stores unquoted column names in lowercase, so the DataFrame
# column names must be lowercased before writing to Cassandra.

str_list = ['elhub_production', 'elhub_consumption']

for i, table in enumerate([elhub_production, elhub_consumption]):
    # Lowercase every column name to match the Cassandra schema
    table = table.rename(columns=str.lower)

    # Convert the Pandas DataFrame to Spark DataFrame and save it to Cassandra (append mode)
    spark.createDataFrame(table).write.format('org.apache.spark.sql.cassandra')\
    .options(table=str_list[i], keyspace='ind320_keyspace').mode('append').save()

    print(f'Saved {str_list[i]} to Cassandra')

Saved elhub_production to Cassandra
Saved elhub_consumption to Cassandra


### Saving dfs to MongoDB

In [19]:
# Selecting a database and a collection
database = client['project']

for table in str_list:
    collection = database[table]
    collection.delete_many({}) # starting fresh

    spark.read.format("org.apache.spark.sql.cassandra")\
    .options(table=table, keyspace="ind320_keyspace").load()\
    .createOrReplaceTempView(f"{table}_view")

    df_spark = spark.sql(f"SELECT priceArea, {table.split('_')[1]}Group, startTime, quantityKwh FROM {table}_view")

    # Convert the DataFrame to JSON records and insert them into MongoDB
    df_pd = df_spark.toPandas()
    json_data = df_pd.to_json(orient='records')

    documents = json.loads(json_data)
    try: 
        collection.insert_many(documents)
        print(f'Successfully inserted {table} to MongoDB')
    except Exception as e:
        print(e)

Successfully inserted elhub_production to MongoDB
Successfully inserted elhub_consumption to MongoDB
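
Each collection above is written with a single `insert_many` call on several hundred thousand documents. Splitting the list into explicit batches is optional, but it can make progress visible and limits how much is lost if one call fails. The sketch below is written under that assumption; `batched` and `insert_in_batches` are hypothetical helpers, and `collection` is any object with an `insert_many` method, such as a PyMongo collection.

```python
from itertools import islice


def batched(iterable, size):
    """Yield consecutive lists of at most `size` items."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def insert_in_batches(collection, documents, batch_size=10_000):
    """Insert documents batch by batch and return the number inserted."""
    inserted = 0
    for batch in batched(documents, batch_size):
        collection.insert_many(batch)
        inserted += len(batch)
    return inserted
```

In the loop above, `insert_in_batches(collection, documents)` would take the place of `collection.insert_many(documents)`.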


### Stopping Spark session

In [20]:
# Stop Spark session
try:
    spark.stop()
    print('Spark session terminated successfully')
except ConnectionRefusedError:
    print("Spark session already stopped.")
except NameError:
    print('Spark session is not defined')

Spark session terminated successfully


### Testing for Streamlit app