In [1]:

from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from dotenv import load_dotenv
import os

load_dotenv()
uri = os.getenv("URI")

# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


In [2]:
os.environ["HADOOP_HOME"] = "C:/Hadoop/hadoop-3.3.1"
os.environ["PYSPARK_HADOOP_VERSION"] = "without"

In [3]:
# Connecting to Cassandra
import asyncore
from cassandra.cluster import Cluster
cluster = Cluster(['localhost'], port=9042)
session = cluster.connect()

  import asyncore


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkCassandraApp').\
    config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.5.1').\
    config('spark.cassandra.connection.host', 'localhost').\
    config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions').\
    config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog').\
    config('spark.cassandra.connection.port', '9042').getOrCreate()
# Some warnings are to be expected.
# If running this cell does not give any output after ~30 seconds, there is likely an error in the configuration (JAVA_HOME, HADOOP_HOME, etc.).

In [5]:
session.execute("CREATE KEYSPACE IF NOT EXISTS my_first_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")

<cassandra.cluster.ResultSet at 0x12017e85a10>

In [6]:
# Create a new table (first time only)
session.set_keyspace('my_first_keyspace')
session.execute("DROP TABLE IF EXISTS my_first_keyspace.my_first_table;") # Starting from scratch every time
session.execute("CREATE TABLE IF NOT EXISTS my_first_table (ind int PRIMARY KEY, company text, model text);")

<cassandra.cluster.ResultSet at 0x120379ac890>

In [7]:
# Insert some data (ind is the primary key, must be unique)
session.execute("INSERT INTO my_first_table (ind, company, model) VALUES (1, 'Tesla', 'Model S');")
session.execute("INSERT INTO my_first_table (ind, company, model) VALUES (2, 'Tesla', 'Model 3');")
session.execute("INSERT INTO my_first_table (ind, company, model) VALUES (3, 'Polestar', '3');")

<cassandra.cluster.ResultSet at 0x120379d4810>

In [8]:
# Query the data
rows = session.execute("SELECT * FROM my_first_table;")
for i in rows:
    print(i)

Row(ind=1, company='Tesla', model='Model S')
Row(ind=2, company='Tesla', model='Model 3')
Row(ind=3, company='Polestar', model='3')


In [9]:
session.execute_async("INSERT INTO my_first_table (ind, company, model) VALUES (5, 'Volkswagen', 'ID.3');")

<ResponseFuture: query='<SimpleStatement query="INSERT INTO my_first_table (ind, company, model) VALUES (5, 'Volkswagen', 'ID.3');", consistency=Not Set>' request_id=25 result=(no result yet) exception=None coordinator_host=None>

In [10]:
# Query the data
rows = session.execute("SELECT * FROM my_first_table;")
for i in rows:
    print(i)

Row(ind=5, company='Volkswagen', model='ID.3')
Row(ind=1, company='Tesla', model='Model S')
Row(ind=2, company='Tesla', model='Model 3')
Row(ind=3, company='Polestar', model='3')


In [11]:
# More specific query
prepared_statement = session.prepare("SELECT * FROM my_first_table WHERE company=? ALLOW FILTERING;")
teslas = session.execute(prepared_statement, ['Tesla'])
for i in teslas:
    print(i)

Row(ind=1, company='Tesla', model='Model S')
Row(ind=2, company='Tesla', model='Model 3')


In [12]:
# Selecting a database and a collection.
database = client['example']
collection = database['data']

In [13]:
# Inserting a single document (dictionary).
collection.insert_one({'name': 'Hallvard', 'age': 23})

# Inserting multiple documents (list of dictionaries).
collection.insert_many([
    {'name': 'Kristian', 'age': 27},
    {'name': 'Ihn Duck', 'age': 15}
])

# Note that an _id field is automatically generated by MongoDB.

<pymongo.results.InsertManyResult at 0x12037337380>

In [14]:
# Reading ALL documents from a collection.
# ........................................

documents = collection.find({})
# A cursor is returned.

# The cursor can be iterated over:
for document in documents:
    print(document)

# Or directly converted to a list:
#documents = list(documents)

{'_id': ObjectId('68f24cb2a6f0e9d0a1f97b7f'), 'name': 'Hallvard Lavik', 'age': 33}
{'_id': ObjectId('68f24cb2a6f0e9d0a1f97b80'), 'name': 'Kristian', 'age': 37}
{'_id': ObjectId('68f24cb2a6f0e9d0a1f97b81'), 'name': 'Ihn Duck', 'age': 25}
{'_id': ObjectId('68f25da8d8698bfd0c7e6a7b'), 'name': 'Hallvard Lavik', 'age': 32}
{'_id': ObjectId('68f25da8d8698bfd0c7e6a7c'), 'name': 'Kristian', 'age': 36}
{'_id': ObjectId('68f25da8d8698bfd0c7e6a7d'), 'name': 'Ihn Duck', 'age': 24}
{'_id': ObjectId('68f9e1224e8c45d5ee0fb385'), 'name': 'Hallvard Lavik', 'age': 31}
{'_id': ObjectId('68f9e1224e8c45d5ee0fb386'), 'name': 'Kristian', 'age': 35}
{'_id': ObjectId('68f9e1224e8c45d5ee0fb387'), 'name': 'Ihn Duck', 'age': 23}
{'_id': ObjectId('68f9e24dd4c233458872dfec'), 'name': 'Hallvard Lavik', 'age': 30}
{'_id': ObjectId('68f9e24ed4c233458872dfed'), 'name': 'Kristian', 'age': 34}
{'_id': ObjectId('68f9e24ed4c233458872dfee'), 'name': 'Ihn Duck', 'age': 22}
{'_id': ObjectId('68f9e32c6dfbcb4154919b7f'), 'name'

In [15]:
# Reading SPECIFIC documents from a collection.
# .............................................

hallvard = collection.find({'name': 'Hallvard'})

for document in hallvard:
    print(document)

hallvard = list(hallvard)

{'_id': ObjectId('68f9fe1aeec11bd59064fc54'), 'name': 'Hallvard', 'age': 23}


In [16]:
# Updating a single document.
# ...........................
collection.update_one(
    {'name': 'Hallvard'},
    {'$set': {'name': 'Hallvard Lavik'}}  # Sets the `name` to `Hallvard Lavik`.
)

# Updating multiple documents.
# ............................
collection.update_many(
    {},
       {'$inc': {'age': 1}}  # Increments the `age` of all documents by `1`.
)

<pymongo.results.UpdateResult at 0x120379d7680>

In [24]:
import requests
from datetime import datetime, timedelta
import pytz

headers = {
    
}

base_url = "https://api.elhub.no/energy-data/v0/price-areas"
dataset = "PRODUCTION_PER_GROUP_MBA_HOUR"

# Use timezone-aware datetime for Norwegian time zone
oslo_tz = pytz.timezone('Europe/Oslo')
start = oslo_tz.localize(datetime(2021, 1, 1, 0, 0, 0))
end = oslo_tz.localize(datetime(2021, 12, 31, 23, 59, 59))

responses = []

while start <= end:
    # Calculate month end in the same timezone
    month_end = (start.replace(day=28) + timedelta(days=4)).replace(day=1) - timedelta(seconds=1)
    if month_end > end:
        month_end = end

    params = {
        "dataset": dataset,
        "startDate": start.isoformat(),
        "endDate": month_end.isoformat()
    }

    response = requests.get(base_url, headers=headers, params=params)
    responses.append(response.json())

    start = month_end + timedelta(seconds=1)


In [26]:
# Extract all productionPerGroupMbaHour lists from all responses
all_production_data = []

for response in responses:
    # Navigate through the response structure
    if 'data' in response:
        for price_area in response['data']:
            if 'attributes' in price_area and 'productionPerGroupMbaHour' in price_area['attributes']:
                production_list = price_area['attributes']['productionPerGroupMbaHour']
                # Only add non-empty lists
                if production_list:
                    all_production_data.extend(production_list)

print(f"Total records extracted: {len(all_production_data)}")
print(f"Sample record: {all_production_data[0] if all_production_data else 'No data'}")

Total records extracted: 215353
Sample record: {'endTime': '2021-01-01T01:00:00+01:00', 'lastUpdatedTime': '2024-12-20T10:35:40+01:00', 'priceArea': 'NO1', 'productionGroup': 'hydro', 'quantityKwh': 2507716.8, 'startTime': '2021-01-01T00:00:00+01:00'}


In [27]:
# Convert to pandas DataFrame
import pandas as pd

df = pd.DataFrame(all_production_data)
print(f"DataFrame shape: {df.shape}")
print(f"\nDataFrame columns: {df.columns.tolist()}")
print(f"\nFirst few rows:")
print(df.head())

DataFrame shape: (215353, 6)

DataFrame columns: ['endTime', 'lastUpdatedTime', 'priceArea', 'productionGroup', 'quantityKwh', 'startTime']

First few rows:
                     endTime            lastUpdatedTime priceArea  \
0  2021-01-01T01:00:00+01:00  2024-12-20T10:35:40+01:00       NO1   
1  2021-01-01T02:00:00+01:00  2024-12-20T10:35:40+01:00       NO1   
2  2021-01-01T03:00:00+01:00  2024-12-20T10:35:40+01:00       NO1   
3  2021-01-01T04:00:00+01:00  2024-12-20T10:35:40+01:00       NO1   
4  2021-01-01T05:00:00+01:00  2024-12-20T10:35:40+01:00       NO1   

  productionGroup  quantityKwh                  startTime  
0           hydro    2507716.8  2021-01-01T00:00:00+01:00  
1           hydro    2494728.0  2021-01-01T01:00:00+01:00  
2           hydro    2486777.5  2021-01-01T02:00:00+01:00  
3           hydro    2461176.0  2021-01-01T03:00:00+01:00  
4           hydro    2466969.2  2021-01-01T04:00:00+01:00  


In [28]:
# Create Cassandra table for production data
session.execute("DROP TABLE IF EXISTS my_first_keyspace.production_data;")

# Create table with appropriate schema based on the data structure
create_table_query = """
CREATE TABLE IF NOT EXISTS my_first_keyspace.production_data (
    price_area text,
    production_group text,
    start_time timestamp,
    end_time timestamp,
    quantity_kwh double,
    last_updated_time timestamp,
    PRIMARY KEY ((price_area, production_group), start_time)
) WITH CLUSTERING ORDER BY (start_time ASC);
"""

session.execute(create_table_query)
print("Table 'production_data' created successfully!")

Table 'production_data' created successfully!


In [29]:
# Convert pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(df)

# Rename columns to match Cassandra table schema (convert camelCase to snake_case)
spark_df = spark_df.withColumnRenamed("priceArea", "price_area") \
                   .withColumnRenamed("productionGroup", "production_group") \
                   .withColumnRenamed("startTime", "start_time") \
                   .withColumnRenamed("endTime", "end_time") \
                   .withColumnRenamed("quantityKwh", "quantity_kwh") \
                   .withColumnRenamed("lastUpdatedTime", "last_updated_time")

# Convert timestamp strings to timestamp type
from pyspark.sql.functions import to_timestamp

spark_df = spark_df.withColumn("start_time", to_timestamp("start_time")) \
                   .withColumn("end_time", to_timestamp("end_time")) \
                   .withColumn("last_updated_time", to_timestamp("last_updated_time"))

print("Spark DataFrame schema:")
spark_df.printSchema()
print(f"\nTotal rows: {spark_df.count()}")
print("\nFirst few rows:")
spark_df.show(5, truncate=False)

Spark DataFrame schema:
root
 |-- end_time: timestamp (nullable = true)
 |-- last_updated_time: timestamp (nullable = true)
 |-- price_area: string (nullable = true)
 |-- production_group: string (nullable = true)
 |-- quantity_kwh: double (nullable = true)
 |-- start_time: timestamp (nullable = true)


Total rows: 215353

First few rows:
+-------------------+-------------------+----------+----------------+------------+-------------------+
|end_time           |last_updated_time  |price_area|production_group|quantity_kwh|start_time         |
+-------------------+-------------------+----------+----------------+------------+-------------------+
|2021-01-01 01:00:00|2024-12-20 10:35:40|NO1       |hydro           |2507716.8   |2021-01-01 00:00:00|
|2021-01-01 02:00:00|2024-12-20 10:35:40|NO1       |hydro           |2494728.0   |2021-01-01 01:00:00|
|2021-01-01 03:00:00|2024-12-20 10:35:40|NO1       |hydro           |2486777.5   |2021-01-01 02:00:00|
|2021-01-01 04:00:00|2024-12-20 10:35:40|

In [30]:
# Insert data into Cassandra using Spark
spark_df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="production_data", keyspace="my_first_keyspace") \
    .save()

print("Data successfully inserted into Cassandra!")

Data successfully inserted into Cassandra!


In [31]:
# Verify the data was inserted by querying from Cassandra
rows = session.execute("SELECT * FROM my_first_keyspace.production_data LIMIT 10;")
print("Sample data from Cassandra:")
for row in rows:
    print(row)

Sample data from Cassandra:
Row(price_area='NO3', production_group='wind', start_time=datetime.datetime(2020, 12, 31, 23, 0), end_time=datetime.datetime(2021, 1, 1, 0, 0), last_updated_time=datetime.datetime(2024, 12, 20, 9, 35, 40), quantity_kwh=259312.2)
Row(price_area='NO3', production_group='wind', start_time=datetime.datetime(2021, 1, 1, 0, 0), end_time=datetime.datetime(2021, 1, 1, 1, 0), last_updated_time=datetime.datetime(2024, 12, 20, 9, 35, 40), quantity_kwh=225762.9)
Row(price_area='NO3', production_group='wind', start_time=datetime.datetime(2021, 1, 1, 1, 0), end_time=datetime.datetime(2021, 1, 1, 2, 0), last_updated_time=datetime.datetime(2024, 12, 20, 9, 35, 40), quantity_kwh=248005.1)
Row(price_area='NO3', production_group='wind', start_time=datetime.datetime(2021, 1, 1, 2, 0), end_time=datetime.datetime(2021, 1, 1, 3, 0), last_updated_time=datetime.datetime(2024, 12, 20, 9, 35, 40), quantity_kwh=243180.3)
Row(price_area='NO3', production_group='wind', start_time=datetim