In [1]:
import os
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.io as pio
import requests
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement, BatchType
from dotenv import load_dotenv
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from pyspark.sql import SparkSession, functions

In [2]:
# Setting the environment for PySpark (paths come from .env / the environment)
load_dotenv()

HADOOP_PATH = os.getenv("HADOOP_PATH")
# os.environ only accepts strings, so a missing HADOOP_PATH would otherwise fail
# below with an unhelpful "str expected, not NoneType" TypeError.
if not HADOOP_PATH:
    raise RuntimeError("HADOOP_PATH is not set; add it to .env or the environment")

# JAVA_PATH may be set in .env just like HADOOP_PATH; the default is the local JDK 11 install.
JAVA_PATH = os.getenv("JAVA_PATH", r"C:\Program Files\Microsoft\jdk-11.0.28.6-hotspot")

os.environ["JAVA_HOME"] = JAVA_PATH
os.environ["PYSPARK_HADOOP_VERSION"] = "without"
os.environ["HADOOP_HOME"] = HADOOP_PATH
# Spark workers and the driver both use the interpreter called "python"
os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"

In [8]:
# Setting the environment for MongoDB (credentials come from .env / the environment)
load_dotenv()

USR, PWD = os.getenv("DB_USER"), os.getenv("DB_PWD")
if not USR or not PWD:
    raise RuntimeError("DB_USER and DB_PWD must be set in the environment or .env file")

# Username and password must be percent-escaped inside a connection string;
# characters such as '@', ':' or '/' in a password would otherwise break the URI.
uri = (
    f"mongodb+srv://{quote_plus(USR)}:{quote_plus(PWD)}"
    "@ind320.nxw58bh.mongodb.net/?retryWrites=true&w=majority&appName=IND320"
)

# Create a client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))

# Send a ping to confirm a successful connection; report (not raise) on failure
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


# Collecting data

In [2]:
# Fetch PRODUCTION_PER_GROUP_MBA_HOUR for all price areas for 2021-01-01 as a quick
# check of the Elhub API before the monthly bulk download below.
# The '+' of the UTC offset is pre-encoded as %2B; a literal '+' in a query string
# would be decoded as a space.
# NOTE(review): winter rows come back with +01:00 offsets, so this +02:00 window
# starts an hour before local midnight — confirm that is intended.
entity = 'price-areas'
dataset = 'PRODUCTION_PER_GROUP_MBA_HOUR'
start = '2021-01-01T00:00:00%2B02:00'
end = '2021-01-01T23:59:59%2B02:00'
# Without a timeout, requests can wait indefinitely on an unresponsive server.
res = requests.get(
    f'https://api.elhub.no/energy-data/v0/{entity}?dataset={dataset}&startDate={start}&endDate={end}',
    timeout=60,
)

In [3]:
# Check that we got a connection to the API; on failure show the status code and
# the start of the response body so a rejected request can be diagnosed.
assert res.status_code == 200, f"Elhub API returned {res.status_code}: {res.text[:200]}"

In [4]:
# Dump the response headers, with each name padded to a 16-character column
for name, value in res.headers.items():
    print('{:16s}: {}'.format(name, value))

Date            : Mon, 17 Nov 2025 15:29:22 GMT
Content-Type    : application/json; charset=utf-8
Transfer-Encoding: chunked
Connection      : keep-alive
Cache-Control   : public, max-age=3600
strict-transport-security: max-age=63072000; includeSubDomains


In [6]:
# Download PRODUCTION_PER_GROUP_MBA_HOUR one calendar month per request for
# 2022-2024, reusing `entity` and `dataset` from the single-day request above.
# Every entry in a response becomes its own DataFrame in `data`; they are
# concatenated in the next cell.
data = []

years = [2022, 2023, 2024]
months = list(range(1, 13))


def _month_window(year, month, utc_offset='%2B02:00'):
    """Return the (startDate, endDate) query values for one calendar month.

    Both bounds are the first day of a month at 00:00 (zero-padded), followed by
    the URL-encoded UTC offset. December rolls over to January of the following
    year, so every month, December included, ends exactly where the next starts.
    """
    next_year, next_month = (year + 1, 1) if month == 12 else (year, month + 1)
    start = f'{year}-{month:02d}-01T00:00:00{utc_offset}'
    end = f'{next_year}-{next_month:02d}-01T00:00:00{utc_offset}'
    return start, end


for year in years:
    for month in months:
        start, end = _month_window(year, month)
        print(start)
        res = requests.get(
            f'https://api.elhub.no/energy-data/v0/{entity}?dataset={dataset}&startDate={start}&endDate={end}',
            timeout=60,  # don't hang forever on a stalled request
        )
        assert res.status_code == 200, f'{start}: Elhub API returned {res.status_code}'
        payload = res.json()
        data.extend(
            pd.DataFrame(entry['attributes']['productionPerGroupMbaHour'])
            for entry in payload['data']
        )

2022-01-01T00:00:00%2B02:00
2022-02-01T00:00:00%2B02:00
2022-03-01T00:00:00%2B02:00
2022-04-01T00:00:00%2B02:00
2022-05-01T00:00:00%2B02:00
2022-06-01T00:00:00%2B02:00
2022-07-01T00:00:00%2B02:00
2022-08-01T00:00:00%2B02:00
2022-09-01T00:00:00%2B02:00
2022-10-01T00:00:00%2B02:00
2022-11-01T00:00:00%2B02:00
2022-12-01T00:00:00%2B02:00
2023-01-01T00:00:00%2B02:00
2023-02-01T00:00:00%2B02:00
2023-03-01T00:00:00%2B02:00
2023-04-01T00:00:00%2B02:00
2023-05-01T00:00:00%2B02:00
2023-06-01T00:00:00%2B02:00
2023-07-01T00:00:00%2B02:00
2023-08-01T00:00:00%2B02:00
2023-09-01T00:00:00%2B02:00
2023-10-01T00:00:00%2B02:00
2023-11-01T00:00:00%2B02:00
2023-12-01T00:00:00%2B02:00
2024-01-01T00:00:00%2B02:00
2024-02-01T00:00:00%2B02:00
2024-03-01T00:00:00%2B02:00
2024-04-01T00:00:00%2B02:00
2024-05-01T00:00:00%2B02:00
2024-06-01T00:00:00%2B02:00
2024-07-01T00:00:00%2B02:00
2024-08-01T00:00:00%2B02:00
2024-09-01T00:00:00%2B02:00
2024-10-01T00:00:00%2B02:00
2024-11-01T00:00:00%2B02:00
2024-12-01T00:00:00%

In [7]:
# Stack the per-entry DataFrames into one long DataFrame with a fresh 0..n-1 index
df = pd.concat(data, axis=0, ignore_index=True)

In [8]:
df.head(n=5)  # preview the first rows of the combined download

Unnamed: 0,endTime,lastUpdatedTime,priceArea,productionGroup,quantityKwh,startTime
0,2022-01-01T01:00:00+01:00,2025-02-01T18:02:57+01:00,NO1,hydro,1291422.4,2022-01-01T00:00:00+01:00
1,2022-01-01T02:00:00+01:00,2025-02-01T18:02:57+01:00,NO1,hydro,1246209.4,2022-01-01T01:00:00+01:00
2,2022-01-01T03:00:00+01:00,2025-02-01T18:02:57+01:00,NO1,hydro,1271757.0,2022-01-01T02:00:00+01:00
3,2022-01-01T04:00:00+01:00,2025-02-01T18:02:57+01:00,NO1,hydro,1204251.8,2022-01-01T03:00:00+01:00
4,2022-01-01T05:00:00+01:00,2025-02-01T18:02:57+01:00,NO1,hydro,1202086.9,2022-01-01T04:00:00+01:00


In [9]:
(len(df), len(df.columns))  # (rows, columns) of the combined download

(657600, 6)

In [10]:
# Neighbouring month windows share a boundary instant, so the same hourly row may
# be returned twice; keep only the first copy of each fully identical row.
df = df.drop_duplicates(keep='first')
df.shape

(657600, 6)

In [11]:
# Inspect the column dtypes: the three time columns are still ISO-8601 strings
# (object dtype) here and are converted to datetime in the next cell.
df.dtypes

endTime             object
lastUpdatedTime     object
priceArea           object
productionGroup     object
quantityKwh        float64
startTime           object
dtype: object

In [12]:
# Parse the offset-carrying ISO-8601 strings, normalise them to UTC, then drop
# the timezone so each column holds naive UTC datetimes.
for time_col in ('endTime', 'lastUpdatedTime', 'startTime'):
    df[time_col] = pd.to_datetime(df[time_col], utc=True).dt.tz_localize(None)

In [13]:
df.dtypes  # endTime, lastUpdatedTime and startTime should now be datetime64[ns]

endTime            datetime64[ns]
lastUpdatedTime    datetime64[ns]
priceArea                  object
productionGroup            object
quantityKwh               float64
startTime          datetime64[ns]
dtype: object

In [14]:
# Cassandra target: the keyspace and table the Elhub rows are written to
keyspace, table_name = 'my_first_keyspace', 'elhub'


In [15]:
# Function for converting pandas datatypes to Cassandra compatible datatypes
def pandas_to_cassandra_type(dtype):
    """Map a pandas/numpy dtype to a CQL column type name.

    Args:
        dtype: a numpy dtype or pandas extension dtype (e.g. ``df[col].dtype``).

    Returns:
        One of 'boolean', 'int', 'double', 'timestamp' or 'text'; anything that
        is not boolean, integer, float or datetime falls back to 'text'.
    """
    # pandas' predicates accept extension dtypes (tz-aware datetimes, StringDtype,
    # nullable Int64, ...); np.issubdtype raises TypeError on those.
    if pd.api.types.is_bool_dtype(dtype):
        return 'boolean'
    if pd.api.types.is_integer_dtype(dtype):
        # NOTE(review): CQL 'int' is 32-bit; values beyond 2**31-1 would need 'bigint'.
        return 'int'
    if pd.api.types.is_float_dtype(dtype):
        return 'double'
    if pd.api.types.is_datetime64_any_dtype(dtype):
        return 'timestamp'
    return 'text'

In [16]:
# Open a session against the local Cassandra node, bound to the target keyspace
cluster = Cluster(contact_points=['localhost'], port=9042)
session = cluster.connect(keyspace)

In [17]:
# Setting column definitions for Cassandra, derived from df's own dtypes so this
# cell does not depend on a `columns` list that is only assigned further down.
column_types = {col: pandas_to_cassandra_type(dtype) for col, dtype in df.dtypes.items()}
# row_id is only added to df later (max_id + running number), so declare it here;
# setdefault keeps the inferred type if df already carries a row_id column.
column_types.setdefault('row_id', 'int')
columns_cql = ', '.join(f'{col} {cql_type}' for col, cql_type in column_types.items())
columns_cql

'endTime timestamp, lastUpdatedTime timestamp, priceArea text, productionGroup text, quantityKwh double, startTime timestamp, row_id int'

In [20]:
# Read the current max row_id so newly inserted rows get ids that continue after it.
# On an empty table the aggregate comes back as None; fall back to 0 so the id
# arithmetic further down does not fail on None + Series.
result = session.execute(f"SELECT max(row_id) FROM {table_name}")
row = result.one()
max_id = row[0] if row is not None and row[0] is not None else 0
max_id

215353

In [27]:
# Remove a row_id column left over from an earlier run of the id-assignment cell;
# errors='ignore' makes this a no-op on a fresh kernel where df has no row_id yet.
df = df.drop(columns='row_id', errors='ignore')

In [28]:
df.iloc[:5]  # preview the first rows with naive-UTC datetime columns

Unnamed: 0,endTime,lastUpdatedTime,priceArea,productionGroup,quantityKwh,startTime
0,2022-01-01 00:00:00,2025-02-01 17:02:57,NO1,hydro,1291422.4,2021-12-31 23:00:00
1,2022-01-01 01:00:00,2025-02-01 17:02:57,NO1,hydro,1246209.4,2022-01-01 00:00:00
2,2022-01-01 02:00:00,2025-02-01 17:02:57,NO1,hydro,1271757.0,2022-01-01 01:00:00
3,2022-01-01 03:00:00,2025-02-01 17:02:57,NO1,hydro,1204251.8,2022-01-01 02:00:00
4,2022-01-01 04:00:00,2025-02-01 17:02:57,NO1,hydro,1202086.9,2022-01-01 03:00:00


In [29]:
# Give the new rows consecutive ids max_id+1 ... max_id+len(df) so they continue
# after the rows already in Cassandra. A range is assigned by position, which is
# what we want even though df's index may have gaps after drop_duplicates.
# NOTE(review): row_id is assumed to be the table's primary key — confirm against the schema.
primary_key = 'row_id'
df[primary_key] = range(max_id + 1, max_id + len(df) + 1)
df.head()

Unnamed: 0,endTime,lastUpdatedTime,priceArea,productionGroup,quantityKwh,startTime,row_id
0,2022-01-01 00:00:00,2025-02-01 17:02:57,NO1,hydro,1291422.4,2021-12-31 23:00:00,215354
1,2022-01-01 01:00:00,2025-02-01 17:02:57,NO1,hydro,1246209.4,2022-01-01 00:00:00,215355
2,2022-01-01 02:00:00,2025-02-01 17:02:57,NO1,hydro,1271757.0,2022-01-01 01:00:00,215356
3,2022-01-01 03:00:00,2025-02-01 17:02:57,NO1,hydro,1204251.8,2022-01-01 02:00:00,215357
4,2022-01-01 04:00:00,2025-02-01 17:02:57,NO1,hydro,1202086.9,2022-01-01 03:00:00,215358


In [31]:
# Insert df into Cassandra in batches of BATCH_SIZE prepared INSERT statements
columns = list(df.columns)
placeholders = ", ".join(["?"] * len(columns))
columns_str = ", ".join(columns)

insert_cql = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})"

BATCH_SIZE = 100

prepared = session.prepare(insert_cql)
# The rows are independent, so atomicity is not needed: an UNLOGGED batch skips the
# batchlog write that the default LOGGED BatchStatement performs for multi-partition
# batches. NOTE(review): assumes each row_id is its own partition.
batch = BatchStatement(batch_type=BatchType.UNLOGGED)

# itertuples avoids building a pandas Series per row, as iterrows does.
for i, row in enumerate(df.itertuples(index=False, name=None), 1):
    # Hand the driver plain datetime objects instead of pandas Timestamps
    values = tuple(v.to_pydatetime() if isinstance(v, pd.Timestamp) else v for v in row)
    batch.add(prepared, values)

    if i % BATCH_SIZE == 0:
        session.execute(batch)
        batch = BatchStatement(batch_type=BatchType.UNLOGGED)  # start a fresh batch

# Flush the final, partially filled batch
if len(batch) > 0:
    session.execute(batch)

print("Bulk insert completed")

Bulk insert completed


# Reading data from Cassandra using Spark

In [3]:
# Start (or reuse) a Spark session with the DataStax Cassandra connector on the classpath
spark_settings = {
    'spark.jars.packages': 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.1',
    'spark.cassandra.connection.host': 'localhost',
    'spark.cassandra.connection.port': '9042',
}
spark_builder = SparkSession.builder.appName('CassandraReader')
for key, value in spark_settings.items():
    spark_builder = spark_builder.config(key, value)
spark = spark_builder.getOrCreate()

In [4]:
# Load the elhub table through the Spark Cassandra connector, keeping only the
# columns used downstream (names are lower-case here, unlike the camelCase CQL names)
wanted_columns = ['pricearea', 'productiongroup', 'starttime', 'quantitykwh']
df = (
    spark.read.format('org.apache.spark.sql.cassandra')
    .options(keyspace='my_first_keyspace', table='elhub')
    .load()
    .select(*wanted_columns)
)

In [5]:
# Check the dimensions of the data read back from Cassandra. 872,953 rows is
# consistent with the 215,353 rows present before the insert (the earlier max
# row_id) plus the 657,600 newly inserted rows.
row_count, col_count = df.count(), len(df.columns)
print((row_count, col_count))

(872953, 4)


In [6]:
# Spot-check 20 rows; they are not returned in insertion or time order
df.show(n=20)

+---------+---------------+-------------------+-----------+
|pricearea|productiongroup|          starttime|quantitykwh|
+---------+---------------+-------------------+-----------+
|      NO3|          hydro|2024-05-07 21:00:00|  2676882.2|
|      NO1|          solar|2022-02-10 12:00:00|   2922.939|
|      NO1|          other|2023-08-25 20:00:00|     16.362|
|      NO5|          other|2024-09-15 12:00:00|     15.321|
|      NO2|          solar|2024-04-11 00:00:00|     42.247|
|      NO5|        thermal|2022-10-10 18:00:00|    16207.0|
|      NO1|        thermal|2024-07-22 15:00:00|  21147.885|
|      NO2|        thermal|2021-04-25 02:00:00|   25452.85|
|      NO3|        thermal|2022-08-03 16:00:00|    18202.0|
|      NO1|          hydro|2022-08-30 17:00:00|  1317006.4|
|      NO4|          hydro|2021-04-26 03:00:00|  2285321.8|
|      NO3|          other|2023-05-02 20:00:00|     42.152|
|      NO2|          other|2021-02-26 19:00:00|      3.822|
|      NO3|          hydro|2021-07-02 08

In [7]:
# Look at the five rows with the highest row_id, i.e. the last rows inserted
df.sort(functions.desc("row_id")).limit(5).collect()

[Row(pricearea='NO5', productiongroup='wind', starttime=datetime.datetime(2024, 12, 31, 23, 0), quantitykwh=0.0),
 Row(pricearea='NO5', productiongroup='wind', starttime=datetime.datetime(2024, 12, 31, 22, 0), quantitykwh=0.0),
 Row(pricearea='NO5', productiongroup='wind', starttime=datetime.datetime(2024, 12, 31, 21, 0), quantitykwh=0.0),
 Row(pricearea='NO5', productiongroup='wind', starttime=datetime.datetime(2024, 12, 31, 20, 0), quantitykwh=0.0),
 Row(pricearea='NO5', productiongroup='wind', starttime=datetime.datetime(2024, 12, 31, 19, 0), quantitykwh=0.0)]

# Inserting data into MongoDB

In [11]:
# Convert the Spark DataFrame to a pandas DataFrame (all rows are collected into local memory)
pdf = df.toPandas()

In [12]:
pdf.head()

Unnamed: 0,pricearea,productiongroup,starttime,quantitykwh
0,NO1,other,2022-10-03 06:00:00,11.3
1,NO4,thermal,2021-12-13 22:00:00,20695.977
2,NO2,wind,2022-05-27 11:00:00,569478.9
3,NO2,solar,2022-10-27 14:00:00,3084.904
4,NO1,hydro,2024-09-21 02:00:00,2692534.0


In [10]:
# Insert every row of pdf as one MongoDB document.
# NOTE(review): the collection is named production_NO1 but pdf holds all price areas (NO1-NO5).
# NOTE(review): the saved Spark DATA_SOURCE_NOT_FOUND traceback under this cell does not
# come from insert_many and appears to be stale output from an earlier version.
records = pdf.to_dict(orient='records')
collection = client['IND320']['production_NO1']
x = collection.insert_many(records)

Py4JJavaError: An error occurred while calling o60.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongodb. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: mongodb.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more
