# Part 4 — Machine Learning

## Links
- **Live updated app:** https://ind320-project-work-nonewthing.streamlit.app/
- **Repo:** https://github.com/TaoM29/IND320-dashboard-basics



## AI Usage

I Used an ChatGPT 5 as a coding and troubleshooting partner. 

## Work Log

...


In [1]:
# Imports, paths, env
import os, sys
from pathlib import Path
import pandas as pd

# Make PySpark use THIS Python
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
os.environ.setdefault("PYSPARK_HADOOP_VERSION", "without")

# Cassandra contact info (Docker on Mac)
CASS_HOST = "127.0.0.1"
CASS_PORT = "9042"
CASS_DC   = "datacenter1"
KS        = "ind320"

# Mongo (optional – set our secrets)
MONGO_URI = os.getenv("MONGO_URI", "")
MONGO_DB  = os.getenv("MONGO_DB", "ind320")

In [2]:
from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"], port=9042)
session = cluster.connect()
row = session.execute("SELECT cluster_name,data_center,release_version FROM system.local").one()
print("✅ Cassandra:", row.cluster_name, "|", row.data_center, "|", row.release_version)

session.set_keyspace("ind320")
tables = [r.table_name for r in session.execute(
    "SELECT table_name FROM system_schema.tables WHERE keyspace_name='ind320'"
)]
print("Tables in ind320:", sorted(tables))

✅ Cassandra: Test Cluster | datacenter1 | 5.0.6
Tables in ind320: ['consumption_mba_hour', 'production_mba_hour']


In [None]:
# Spark session (DataStax connector assembly contains the Java driver)
try:
    spark.stop()
except:
    pass

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("IND320-Cassandra")
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.5.1")
    .config("spark.cassandra.connection.host", CASS_HOST)
    .config("spark.cassandra.connection.port", CASS_PORT)
    .config("spark.cassandra.connection.localDC", CASS_DC)
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
print("Spark ready (assembly).")

25/11/19 13:23:45 WARN Utils: Your hostname, Taofiks-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.42.69.134 instead (on interface en0)
25/11/19 13:23:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/taom/.ivy2/cache
The jars for the packages stored in: /Users/taom/.ivy2/jars
com.datastax.spark#spark-cassandra-connector-assembly_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-39f684d9-a17b-4a19-889f-c23ba2a1b733;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector-assembly_2.12;3.5.1 in central
:: resolution report :: resolve 54ms :: artifacts dl 2ms
	:: modules in use:
	com.datastax.spark#spark-cassandra-connector-assembly_2.12;3.5.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnld

:: loading settings :: url = jar:file:/opt/miniconda3/envs/IND320env/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	0 artifacts copied, 1 already retrieved (0kB/3ms)
25/11/19 13:23:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark ready (assembly).


In [4]:
# Imports
from datetime import datetime, timezone
import os, requests, pandas as pd
from tqdm.auto import tqdm
import itertools
from pyspark.sql import functions as F

# Constants
BASE_V0 = "https://api.elhub.no/energy-data/v0"
ELHUB_API_TOKEN = os.getenv("ELHUB_API_TOKEN")  
PRICE_AREAS = ["NO1","NO2","NO3","NO4","NO5"]


# Common headers for JSON:API
def headers_jsonapi():
    h = {"Accept": "application/vnd.api+json"}
    if ELHUB_API_TOKEN:
        h["Authorization"] = f"Bearer {ELHUB_API_TOKEN}"
    return h

# ISO 8601 UTC offset formatting
def iso_utc_offset(dt: datetime) -> str:
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    dt = dt.astimezone(timezone.utc)
    off = dt.strftime("%z")
    off = off[:-2] + ":" + off[-2:]
    return dt.strftime("%Y-%m-%dT%H:%M:%S") + off


In [5]:
# Groups (ids)
def list_groups(kind="production"):
    url = f"{BASE_V0}/{kind}-groups"
    r = requests.get(url, headers=headers_jsonapi(), timeout=30)
    r.raise_for_status()
    rows = []
    for item in r.json().get("data", []):
        attrs = item.get("attributes", {}) or {}
        rows.append({"id": item.get("id"), "name": attrs.get("name")})
    df = pd.DataFrame(rows)
  
    ids = [g for g in df["id"].tolist() if g != "*"]
    return ids, df

prod_group_ids, production_groups_df = list_groups("production")
cons_group_ids, consumption_groups_df = list_groups("consumption")

print("Production groups:", prod_group_ids)
print("Consumption groups:", cons_group_ids)

Production groups: ['solar', 'hydro', 'wind', 'thermal', 'nuclear', 'other']
Consumption groups: ['household', 'cabin', 'primary', 'secondary', 'tertiary', 'industry', 'private', 'business']


In [None]:
# Generic monthly fetch
def fetch_month_generic(
    price_area: str,
    group_id: str,
    year: int,
    month: int,
    dataset: str,
    group_param_name: str,
    inner_key: str,
    group_col_out: str,
    verbose: bool = False,
) -> pd.DataFrame:
    start = datetime(year, month, 1, tzinfo=timezone.utc)
    end   = datetime(year + (month==12), (month % 12) + 1, 1, tzinfo=timezone.utc)

    params = {
        "dataset": dataset,
        "priceArea": price_area,
        group_param_name: group_id,
        "startDate": iso_utc_offset(start),
        "endDate":   iso_utc_offset(end),
        "pageSize":  10000,
    }

    url = f"{BASE_V0}/price-areas"
    r = requests.get(url, headers=headers_jsonapi(), params=params, timeout=90)

    if verbose:
        print("HTTP", r.status_code, "|", r.headers.get("Content-Type"))
        print("URL:", r.url)

    if r.status_code != 200:
        if verbose: print("Body preview:", r.text[:400])
        return pd.DataFrame()

    data = r.json().get("data", [])
    if not data:
        return pd.DataFrame(columns=["priceArea", group_col_out, "startTime", "quantityKwh"])

    rows = []
    for rec in data:
        attrs = rec.get("attributes", {}) or {}
        area  = attrs.get("name") or rec.get("id") or price_area
        inner = attrs.get(inner_key, []) or []
        for item in inner:
            rows.append({
                "priceArea": area,
                group_col_out: item.get(group_col_out),
                "startTime": item.get("startTime"),
                "quantityKwh": item.get("quantityKwh")
            })

    df = pd.DataFrame(rows)
    if df.empty:
        return df

    df["startTime"]   = pd.to_datetime(df["startTime"], utc=True, errors="coerce")
    df["quantityKwh"] = pd.to_numeric(df["quantityKwh"], errors="coerce")
    df = df.dropna(subset=["startTime","quantityKwh"]).reset_index(drop=True)
    return df


# Thin wrappers (so our code stays readable)
def fetch_month_prod(area, group_id, year, month, verbose=False):
    return fetch_month_generic(
        area, group_id, year, month,
        dataset="PRODUCTION_PER_GROUP_MBA_HOUR",
        group_param_name="productionGroup",
        inner_key="productionPerGroupMbaHour",
        group_col_out="productionGroup",
        verbose=verbose,
    )

def fetch_month_cons(area, group_id, year, month, verbose=False):
    return fetch_month_generic(
        area, group_id, year, month,
        dataset="CONSUMPTION_PER_GROUP_MBA_HOUR",
        group_param_name="consumptionGroup",
        inner_key="consumptionPerGroupMbaHour",
        group_col_out="consumptionGroup",
        verbose=verbose,
    )

In [7]:
# Fetch production data for 2022–2024
YEARS_PROD = [2022, 2023, 2024]

total_months = len(PRICE_AREAS) * len(prod_group_ids) * len(YEARS_PROD) * 12
print(f"Planned requests (prod): {total_months} months "
      f"= {len(PRICE_AREAS)} areas × {len(prod_group_ids)} groups × {len(YEARS_PROD)} years × 12 months")

parts, runs, non_empty, row_count = [], 0, 0, 0
pbar = tqdm(total=total_months, desc="Production (2022–2024) months", leave=True)

for area in PRICE_AREAS:
    for g in prod_group_ids:
        for y, m in itertools.product(YEARS_PROD, range(1, 13)):
            df_m = fetch_month_prod(area, g, y, m)
            runs += 1
            if not df_m.empty:
                parts.append(df_m)
                non_empty += 1
                row_count += len(df_m)
            # update progress every month
            pbar.set_postfix_str(f"rows_so_far={row_count:,}")
            pbar.update(1)

pbar.close()

prod_2224 = (pd.concat(parts, ignore_index=True)
             if parts else pd.DataFrame(columns=["priceArea","productionGroup","startTime","quantityKwh"]))
prod_2224 = prod_2224.drop_duplicates(subset=["priceArea","productionGroup","startTime"]).reset_index(drop=True)

print("\n=== PRODUCTION 2022–2024 ===")
print(f"Requests run: {runs} | Non-empty months: {non_empty} | Rows: {len(prod_2224):,}")
print("Span:", prod_2224["startTime"].min() if not prod_2224.empty else None,
      "→",  prod_2224["startTime"].max() if not prod_2224.empty else None)
display(prod_2224.head())

Planned requests (prod): 1080 months = 5 areas × 6 groups × 3 years × 12 months


Production (2022–2024) months:   0%|          | 0/1080 [00:00<?, ?it/s]


=== PRODUCTION 2022–2024 ===
Requests run: 1080 | Non-empty months: 900 | Rows: 657,600
Span: 2021-12-31 23:00:00+00:00 → 2024-12-31 22:00:00+00:00


Unnamed: 0,priceArea,productionGroup,startTime,quantityKwh
0,NO1,solar,2021-12-31 23:00:00+00:00,6.448
1,NO1,solar,2022-01-01 00:00:00+00:00,6.062
2,NO1,solar,2022-01-01 01:00:00+00:00,4.697
3,NO1,solar,2022-01-01 02:00:00+00:00,10.907
4,NO1,solar,2022-01-01 03:00:00+00:00,5.975


In [8]:
# Fetch consumption data for 2021–2024
YEARS_CONS = [2021, 2022, 2023, 2024]

total_months_c = len(PRICE_AREAS) * len(cons_group_ids) * len(YEARS_CONS) * 12
print(f"\nPlanned requests (cons): {total_months_c} months "
      f"= {len(PRICE_AREAS)} areas × {len(cons_group_ids)} groups × {len(YEARS_CONS)} years × 12 months")

parts_c, runs_c, non_empty_c, row_count_c = [], 0, 0, 0
pbar_c = tqdm(total=total_months_c, desc="Consumption (2021–2024) months", leave=True)

for area in PRICE_AREAS:
    for g in cons_group_ids:
        for y, m in itertools.product(YEARS_CONS, range(1, 13)):
            df_m = fetch_month_cons(area, g, y, m)
            runs_c += 1
            if not df_m.empty:
                parts_c.append(df_m)
                non_empty_c += 1
                row_count_c += len(df_m)
            pbar_c.set_postfix_str(f"rows_so_far={row_count_c:,}")
            pbar_c.update(1)

pbar_c.close()

cons_2124 = (pd.concat(parts_c, ignore_index=True)
             if parts_c else pd.DataFrame(columns=["priceArea","consumptionGroup","startTime","quantityKwh"]))
cons_2124 = cons_2124.drop_duplicates(subset=["priceArea","consumptionGroup","startTime"]).reset_index(drop=True)

print("\n=== CONSUMPTION 2021–2024 ===")
print(f"Requests run: {runs_c} | Non-empty months: {non_empty_c} | Rows: {len(cons_2124):,}")
print("Span:", cons_2124["startTime"].min() if not cons_2124.empty else None,
      "→",  cons_2124["startTime"].max() if not cons_2124.empty else None)
display(cons_2124.head())


Planned requests (cons): 1920 months = 5 areas × 8 groups × 4 years × 12 months


Consumption (2021–2024) months:   0%|          | 0/1920 [00:00<?, ?it/s]


=== CONSUMPTION 2021–2024 ===
Requests run: 1920 | Non-empty months: 1200 | Rows: 876,600
Span: 2020-12-31 23:00:00+00:00 → 2024-12-31 22:00:00+00:00


Unnamed: 0,priceArea,consumptionGroup,startTime,quantityKwh
0,NO1,household,2020-12-31 23:00:00+00:00,2366888.8
1,NO1,household,2021-01-01 00:00:00+00:00,2325218.2
2,NO1,household,2021-01-01 01:00:00+00:00,2273791.2
3,NO1,household,2021-01-01 02:00:00+00:00,2221311.8
4,NO1,household,2021-01-01 03:00:00+00:00,2188174.2


In [18]:
import pandas as pd
from pyspark.sql import types as T

# === PRODUCTION ===
prod_pdf = (
    prod_2224
    .rename(columns={"priceArea":"price_area", "productionGroup":"production_group",
                     "startTime":"start_time", "quantityKwh":"quantity_kwh"})
    .assign(
        year=lambda d: d["start_time"].dt.year,
        # make timestamps naive UTC for Spark→Cassandra
        start_time=lambda d: pd.to_datetime(d["start_time"], utc=True).dt.tz_convert("UTC").dt.tz_localize(None),
        quantity_kwh=lambda d: pd.to_numeric(d["quantity_kwh"], errors="coerce")
    )[["price_area","production_group","year","start_time","quantity_kwh"]]
)

prod_schema = T.StructType([
    T.StructField("price_area",       T.StringType(),  False),
    T.StructField("production_group", T.StringType(),  False),
    T.StructField("year",             T.IntegerType(), False),
    T.StructField("start_time",       T.TimestampType(), False),
    T.StructField("quantity_kwh",     T.DoubleType(),  False),
])

prod_sdf = spark.createDataFrame(prod_pdf.dropna(), schema=prod_schema)
prod_sdf.printSchema()
prod_sdf.show(3, truncate=False)

# === CONSUMPTION ===
cons_pdf = (
    cons_2124
    .rename(columns={"priceArea":"price_area", "consumptionGroup":"consumption_group",
                     "startTime":"start_time", "quantityKwh":"quantity_kwh"})
    .assign(
        year=lambda d: d["start_time"].dt.year,
        start_time=lambda d: pd.to_datetime(d["start_time"], utc=True).dt.tz_convert("UTC").dt.tz_localize(None),
        quantity_kwh=lambda d: pd.to_numeric(d["quantity_kwh"], errors="coerce")
    )[["price_area","consumption_group","year","start_time","quantity_kwh"]]
)

cons_schema = T.StructType([
    T.StructField("price_area",         T.StringType(),  False),
    T.StructField("consumption_group",  T.StringType(),  False),
    T.StructField("year",               T.IntegerType(), False),
    T.StructField("start_time",         T.TimestampType(), False),
    T.StructField("quantity_kwh",       T.DoubleType(),  False),
])

cons_sdf = spark.createDataFrame(cons_pdf.dropna(), schema=cons_schema)
cons_sdf.printSchema()
cons_sdf.show(3, truncate=False)


Py4JJavaError: An error occurred while calling o37.applySchemaToPythonRDD.
: java.lang.NoSuchMethodError: 'scala.collection.immutable.Seq org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.expressions()'
	at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.findMetadataExpressions(CassandraMetadataFunctions.scala:191)
	at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply$1.applyOrElse(CassandraMetadataFunctions.scala:196)
	at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply$1.applyOrElse(CassandraMetadataFunctions.scala:195)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:405)
	at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.apply(CassandraMetadataFunctions.scala:195)
	at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.apply(CassandraMetadataFunctions.scala:102)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
	at org.apache.spark.sql.SparkSession.internalCreateDataFrame(SparkSession.scala:572)
	at org.apache.spark.sql.SparkSession.applySchemaToPythonRDD(SparkSession.scala:877)
	at org.apache.spark.sql.SparkSession.applySchemaToPythonRDD(SparkSession.scala:862)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
# PRODUCTION (append after our 2021 data)
(prod_sdf.write
 .format("org.apache.spark.sql.cassandra")
 .options(keyspace="ind320", table="production_mba_hour")
 .mode("append")
 .save()
)

# CONSUMPTION (new table created: full 2021–2024)
(cons_sdf.write
 .format("org.apache.spark.sql.cassandra")
 .options(keyspace="ind320", table="consumption_mba_hour")
 .mode("append")
 .save()
)
print("Cassandra writes done.")