to do:
- Dokumentere bruk av KI

In [1]:
# imports
import json
import os
import requests
import pandas as pd
import pymongo.errors
import plotly.express as px
import uuid
from bs4 import BeautifulSoup
from io import StringIO
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, udf
from pyspark.sql.types import StringType

## MongoDB

### Set up remote database

In [2]:
# Read the MongoDB credentials once; the context manager closes the file handle.
with open("../.nosync/mongoDB.json", "r") as file:
    credentials = json.load(file)

# Build the connection string from the credentials loaded above rather than
# re-opening the credentials file with a bare open() that is never closed.
uri = (
    "mongodb+srv://medvetslos:"
    + credentials["pwd"]
    + "@ind320-project.lunku.mongodb.net/?retryWrites=true&w=majority&appName=IND320-project"
)

mdb_client = MongoClient(uri, server_api=ServerApi("1"))

# Ping the deployment to verify the connection. A failure is printed instead of
# raised so the rest of the notebook can still be inspected.
try:
    mdb_client.admin.command("ping")
    print("Pinged your deployment. Successfully connected to MongoDB.")
except Exception as exceptionMsg:
    print(exceptionMsg)

Pinged your deployment. Successfully connected to MongoDB.


In [3]:
# Creating collections for municipality data and gas prices
database = mdb_client["IND320-project"]
# The names must match the collections accessed below; the earlier spelling
# "muncipalities" created a separate collection that nothing else reads.
collection_names = ["municipalities", "gas"]

for name in collection_names:
    # create_collection raises CollectionInvalid when the collection already
    # exists, which is used here as the existence check.
    try:
        database.create_collection(name)
        print(f"Collection '{name}' was created successfully.")
    except pymongo.errors.CollectionInvalid:
        print(f"Collection '{name}' already exists.")

municipalities = database["municipalities"]
gas = database["gas"]


Collection 'muncipalities' already exists.
Collection 'gas' already exists.


## Cassandra

In [4]:
from cassandra.cluster import Cluster

# Connect to the Cassandra node on localhost:9042.
cluster = Cluster(["localhost"], port=9042)
session = cluster.connect()

# Create the project keyspace if it is missing, then make it the session default.
keyspace = "ind320_project"
replication = "{'class': 'SimpleStrategy', 'replication_factor': 1}"
create_keyspace_cql = (
    f"CREATE KEYSPACE IF NOT EXISTS {keyspace} WITH REPLICATION = {replication};"
)
session.execute(create_keyspace_cql)

session.set_keyspace(keyspace)

## Webscraping



In [18]:
webscrape_url = "https://en.wikipedia.org/wiki/List_of_municipalities_of_Denmark"

# Fail fast on network problems or an HTTP error page instead of parsing it.
page = requests.get(webscrape_url, timeout=30)
page.raise_for_status()

soup = BeautifulSoup(page.content, "html.parser")
wiki_table = soup.find("table", attrs={"class": "wikitable sortable"})
if wiki_table is None:
    # Without this guard, str(None) would be handed to read_html below.
    raise ValueError(f"No 'wikitable sortable' table found at {webscrape_url}")

df_municipalities = pd.read_html(StringIO(str(wiki_table)))[0]
records_municipalities = df_municipalities.to_dict("records")

# The first column of the scraped table is used as the key for de-duplication.
LAU_1_code = df_municipalities.columns.tolist()[0]


# check if data we are writing already exists
existing_entries = list(
    database["municipalities"].find(
        {
            LAU_1_code: {
                "$in": [record[LAU_1_code] for record in records_municipalities]
            }
        }
    )
)

# Keep only the scraped records whose key is not stored yet. A set gives
# constant-time membership checks instead of rescanning existing_entries
# for every scraped record.
existing_codes = {existing_entry[LAU_1_code] for existing_entry in existing_entries}
new_entries = [
    entry for entry in records_municipalities
    if entry[LAU_1_code] not in existing_codes
]

# Writing to MongoDB
if new_entries:
    database["municipalities"].insert_many(new_entries)
    print("Data successfully written into the collection.")
else:
    print("No new data to be inserted into the collection.")

# database["municipalities"].delete_many({}) # delete all records

No new data to be inserted into the collection.


## API

In [5]:
# Base endpoint; a dataset name is appended to it.
api_url = "https://api.energidataservice.dk/dataset/"

# Query string appended after the dataset name: calendar year 2022, newest hour first.
filtering = (
    "?offset=0"
    "&start=2022-01-01T00:00"
    "&end=2023-01-01T00:00"
    "&sort=HourUTC%20DESC"
)

# Dataset names grouped under "remote" and "local" keys.
api_datasets = dict(
    remote={"GasDailyBalancingPrice": "gas"},
    local=dict(
        production="ProductionMunicipalityHour",
        consumption="ConsumptionIndustry",
        prodcons="ProductionConsumptionSettlement",
    ),
)

In [6]:
def get_json_data(dataset: str, timeout: float = 60) -> list:
    """Download the records of one dataset from the API.

    The request URL is ``api_url + dataset + filtering``.

    Args:
        dataset: Name of the dataset to fetch.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The value stored under the ``"records"`` key of the JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    response = requests.get(api_url + dataset + filtering, timeout=timeout)
    # Surface HTTP errors directly instead of failing later while parsing
    # an error payload that has no "records" key.
    response.raise_for_status()
    return response.json()["records"]

In [8]:
# Download each local dataset and load its records into a DataFrame.
local_datasets = api_datasets["local"]

production_records = get_json_data(local_datasets["production"])
df_production = pd.DataFrame.from_records(production_records)

consumption_records = get_json_data(local_datasets["consumption"])
df_consumption = pd.DataFrame.from_records(consumption_records)

prodcons_records = get_json_data(local_datasets["prodcons"])
df_prodcons = pd.DataFrame.from_records(prodcons_records)

In [9]:
# Show the dtype pandas inferred for each production column.
df_production.dtypes

HourUTC                     object
HourDK                      object
MunicipalityNo              object
SolarMWh                   float64
OffshoreWindLt100MW_MWh    float64
OffshoreWindGe100MW_MWh    float64
OnshoreWindMWh             float64
ThermalPowerMWh            float64
dtype: object

In [10]:
# Parse both hour columns into datetimes, then make the municipality number an integer.
for time_col in ("HourDK", "HourUTC"):
    df_production[time_col] = pd.to_datetime(df_production[time_col])
df_production["MunicipalityNo"] = df_production["MunicipalityNo"].astype(int)

In [11]:
# Show the dtype pandas inferred for each consumption column.
df_consumption.dtypes

HourUTC            object
HourDK             object
MunicipalityNo     object
Branche            object
ConsumptionkWh    float64
dtype: object

In [12]:
# Parse the hour columns into datetimes and make the municipality number an integer.
df_consumption['HourDK'] = pd.to_datetime(df_consumption['HourDK'])
df_consumption['HourUTC'] = pd.to_datetime(df_consumption['HourUTC'])
df_consumption['MunicipalityNo'] = df_consumption['MunicipalityNo'].astype(int)
# Cast the Branche column itself to str; the previous code assigned the
# stringified HourUTC column here, overwriting the Branche values.
df_consumption['Branche'] = df_consumption['Branche'].astype(str)

In [13]:
# Show the dtype pandas inferred for each production/consumption settlement column.
df_prodcons.dtypes

HourUTC                        object
HourDK                         object
PriceArea                      object
CentralPowerMWh               float64
LocalPowerMWh                 float64
CommercialPowerMWh            float64
LocalPowerSelfConMWh          float64
OffshoreWindLt100MW_MWh       float64
OffshoreWindGe100MW_MWh       float64
OnshoreWindLt50kW_MWh         float64
OnshoreWindGe50kW_MWh         float64
HydroPowerMWh                 float64
SolarPowerLt10kW_MWh          float64
SolarPowerGe10Lt40kW_MWh      float64
SolarPowerGe40kW_MWh          float64
SolarPowerSelfConMWh          float64
UnknownProdMWh                float64
ExchangeNO_MWh                float64
ExchangeSE_MWh                float64
ExchangeGE_MWh                float64
ExchangeNL_MWh                float64
ExchangeGB_MWh                 object
ExchangeGreatBelt_MWh         float64
GrossConsumptionMWh           float64
GridLossTransmissionMWh       float64
GridLossInterconnectorsMWh    float64
GridLossDist

In [14]:
# Parse both hour columns into datetimes.
for time_col in ("HourDK", "HourUTC"):
    df_prodcons[time_col] = pd.to_datetime(df_prodcons[time_col])

# Give the remaining object-typed columns explicit types.
for column, target_type in {"PriceArea": str, "ExchangeGB_MWh": float}.items():
    df_prodcons[column] = df_prodcons[column].astype(target_type)

In [15]:
# Lower-case every column name in the three frames.
df_prodcons, df_production, df_consumption = (
    frame.rename(columns=str.lower)
    for frame in (df_prodcons, df_production, df_consumption)
)


In [16]:
# Prompt: Can you write me a function which makes the table creation query from a 
#         Pandas DataFrame which assigns the correct datatype to the Cassandra table.
#         The primary key should be named id and be of type timeuuid
def create_cassandra_table_query(df, keyspace, table_name):
    """Build a ``CREATE TABLE IF NOT EXISTS`` CQL statement from a DataFrame.

    The table gets an ``id timeuuid`` primary key followed by one column per
    DataFrame column (a DataFrame column named ``id`` is skipped so it is not
    declared twice).

    Args:
        df: DataFrame whose column names and dtypes define the table columns.
        keyspace: Keyspace the table is created in.
        table_name: Name of the table.

    Returns:
        The CQL statement as a string.
    """
    # Mapping between pandas dtype names and Cassandra column types.
    # NOTE(review): Cassandra `int` is 32-bit; int64 values outside that range
    # would need `bigint`. The mapping is kept so existing tables still match.
    dtype_mapping = {
        'int64': 'int',
        'float64': 'double',
        'object': 'text',
        'bool': 'boolean',
        'datetime64[ns]': 'timestamp'
    }

    def _cassandra_type(dtype):
        # Any datetime dtype (other resolutions such as datetime64[s], or
        # timezone-aware ones) is a timestamp; an exact-string lookup would
        # send those to the 'text' fallback.
        if pd.api.types.is_datetime64_any_dtype(dtype):
            return 'timestamp'
        # Unrecognized dtypes default to 'text'.
        return dtype_mapping.get(str(dtype), 'text')

    # Primary key column first, then the mapped DataFrame columns.
    columns = ["id timeuuid"]
    columns.extend(
        f"{col} {_cassandra_type(dtype)}"
        for col, dtype in df.dtypes.items()
        if col != "id"
    )

    columns_str = ",\n    ".join(columns)
    return (
        f"CREATE TABLE IF NOT EXISTS {keyspace}.{table_name} (\n"
        f"    {columns_str},\n"
        "    PRIMARY KEY (id)\n);"
    )

# def create_cassandra_table_query(df, keyspace, table_name, primary_key):
#     # Define mapping between Pandas and Cassandra datatypes
#     dtype_mapping = {
#         'int64': 'int',
#         'float64': 'double',
#         'object': 'text',
#         'bool': 'boolean',
#         'datetime64[ns]': 'timestamp'
#     }
    
#     # Start constructing the CREATE TABLE query
#     query = f"CREATE TABLE IF NOT EXISTS {keyspace}.{table_name} (\n"
    
#     # Add each column name and its corresponding Cassandra data type
#     columns = []
#     for col, dtype in df.dtypes.items():
#         cassandra_type = dtype_mapping.get(str(dtype), 'text')  # Default to 'text' if type is unrecognized
#         columns.append(f"{col} {cassandra_type}")
    
#     # Join columns with commas and specify primary key
#     columns_str = ",\n    ".join(columns)
#     query += f"    {columns_str},\n"
#     query += f"    PRIMARY KEY ({primary_key})\n);"
    
#     return query

In [29]:
# Build one CREATE TABLE statement per DataFrame.
production_table, consumption_table, prodcons_table = (
    create_cassandra_table_query(frame, keyspace, table_name)
    for frame, table_name in (
        (df_production, "production"),
        (df_consumption, "consumption"),
        (df_prodcons, "prodcons"),
    )
)

# Run the statements in the same order they were built.
for table_query in (production_table, consumption_table, prodcons_table):
    session.execute(table_query)

In [30]:
keyspace = "ind320_project"
# Bind the keyspace as a query parameter instead of repeating it as a literal
# inside an f-string that had no placeholders.
query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
# Execute the query
rows = session.execute(query, (keyspace,))

# Print the table names
print(f"Tables in keyspace '{keyspace}':")
for row in rows:
    print(row.table_name)
    # session.execute(f"DROP TABLE IF EXISTS {keyspace}.{row.table_name}")

Tables in keyspace 'ind320_project':
consumption
prodcons
production


## Spark to Cassandra

In [20]:
# Set the Java and Python environment variables before the Spark session is built.
os.environ.update(
    {
        "JAVA_HOME": "/opt/homebrew/opt/openjdk@11/",
        "PYSPARK_PYTHON": "python",
        "PYSPARK_DRIVER_PYTHON": "python",
        "PYSPARK_HADOOP_VERSION": "without",
    }
)

In [21]:
# Spark session configured for the Cassandra node on localhost:9042 via the
# DataStax Spark-Cassandra connector package.
spark = (
    SparkSession.builder.appName("SparkCassandraApp")
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1")
    .config("spark.cassandra.connection.host", "localhost")
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
    .config("spark.sql.catalog.mycatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
    .config("spark.cassandra.connection.port", "9042")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.task.maxFailures", "10")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)


24/10/07 21:37:38 WARN Utils: Your hostname, Aarons-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.11.132 instead (on interface en0)
24/10/07 21:37:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/aaron/.ivy2/cache
The jars for the packages stored in: /Users/aaron/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f34dfb06-2a06-4fc6-aa27-213c7d040687;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.1 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.1 in central


:: loading settings :: url = jar:file:/Users/aaron/Documents/IND320_projects/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.github.spotbugs#spotbugs-annotations;3.1.12 in central
	found com.google.code.findbugs#jsr305;3.0.2 in central
	found com.datastax.oss#java-driver-mapper-runtime;4.13.0 in central
	found com.datastax.oss#java-driver-query-builder;4.13.0 in central
	found org.apache.commons#commons-lang3;3.10 in central
	found com.thoughtworks.paranamer#paranamer;2.8 in central
	found org.sca

In [31]:
def generate_timeuuid():
    """Return a freshly generated version-1 UUID as a string."""
    time_based_id = uuid.uuid1()
    return str(time_based_id)

# Every call returns a new UUID, so the UDF is marked non-deterministic.
# Spark treats UDFs as deterministic by default and may then drop or repeat
# invocations during optimization.
timeuuid_udf = udf(generate_timeuuid, StringType()).asNondeterministic()

# Append the production rows to Cassandra, adding a generated id to each row.
production_sdf = spark.createDataFrame(df_production).withColumn("id", timeuuid_udf())
(
    production_sdf.write.format("org.apache.spark.sql.cassandra")
    .mode("append")
    .options(table="production", keyspace="ind320_project")
    .save()
)

24/10/07 21:43:33 WARN TaskSetManager: Stage 3 contains a task of very large size (5307 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [32]:
# Append the consumption rows to Cassandra, adding a generated id to each row.
consumption_sdf = spark.createDataFrame(df_consumption).withColumn("id", timeuuid_udf())
(
    consumption_sdf.write.format("org.apache.spark.sql.cassandra")
    .mode("append")
    .options(table="consumption", keyspace="ind320_project")
    .save()
)

24/10/07 21:44:35 WARN TaskSetManager: Stage 4 contains a task of very large size (12495 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [33]:
# Append the production/consumption settlement rows to Cassandra, adding a generated id to each row.
prodcons_sdf = spark.createDataFrame(df_prodcons).withColumn("id", timeuuid_udf())
(
    prodcons_sdf.write.format("org.apache.spark.sql.cassandra")
    .mode("append")
    .options(table="prodcons", keyspace="ind320_project")
    .save()
)

                                                                                

Idea now:
- Use Spark/Cassandra to retrieve data rather than through the API

In [24]:
# WARNING: this cell drops every table in the keyspace. Run top to bottom, it
# deletes the tables that the cells above have just created and filled.
keyspace = "ind320_project"
# Bind the keyspace as a query parameter instead of repeating it as a literal
# inside an f-string that had no placeholders.
query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
# Execute the query
rows = session.execute(query, (keyspace,))

# Print the table names, dropping each table as it is listed
print(f"Tables in keyspace '{keyspace}':")
for row in rows:
    print(row.table_name)
    # Table names are interpolated into the statement rather than bound.
    session.execute(f"DROP TABLE IF EXISTS {keyspace}.{row.table_name}")

Tables in keyspace 'ind320_project':
consumption
prodcons
production
