# Assignment 2 Cassandra and Spark
## Local database: Cassandra

Start the `cassandra_db` Docker container and check that it is running:

`docker start cassandra_db`
`docker ps` to confirm that `cassandra_db` is listed as running

In [None]:
from cassandra.cluster import Cluster

# Connect to the local Cassandra node on the CQL native port 9042 and open a
# session for the CREATE KEYSPACE / CREATE TABLE statements used later.
# 127.0.0.1 is used instead of 0.0.0.0: 0.0.0.0 is a bind-all address, not a
# portable address to connect to (it fails e.g. on Windows), and 127.0.0.1 is
# also the host given to the Spark Cassandra connector.
cluster = Cluster(['127.0.0.1'], port=9042)
try:
    session = cluster.connect()
except Exception as e:  # e.g. NoHostAvailable when the container is not running
    print('Could not connect to Cassandra:', e)
    session = None




## 1) Fetch Elhub PRODUCTION_PER_GROUP_MBA_HOUR for 2021

The API returns JSON:API formatted responses. We'll request the `price-areas` entity with dataset `PRODUCTION_PER_GROUP_MBA_HOUR` and extract any `productionPerGroupMbaHour` lists found in the response. The code below is defensive and follows `links.next` for pagination if present.

Note about timezones and DST: Elhub timestamps are ISO 8601 strings — we parse them with pandas and convert them to `Europe/Oslo`, which handles the summer/winter (DST) transitions. If timestamps are timezone-naive, we assume UTC before converting; adjust this if your data uses local times without offsets.

In [None]:
import requests
import time
import pandas as pd
from urllib.parse import urljoin

BASE = 'https://api.elhub.no/energy-data/v0/price-areas'
DATASET = 'PRODUCTION_PER_GROUP_MBA_HOUR'
# Full year 2021 (ISO 8601, UTC).
# NOTE(review): confirm against the API docs whether endTime is inclusive and
# whether the year should instead be bounded in local (Europe/Oslo) time.
START = '2021-01-01T00:00:00Z'
END = '2021-12-31T23:00:00Z'

# Key under which the API nests the hourly production records.
RECORDS_KEY = 'productionPerGroupMbaHour'


def extract_production_items(payload):
    """Return every production record found in one decoded JSON response.

    Records are collected, in this order, from a top-level
    ``productionPerGroupMbaHour`` list and from
    ``data[].attributes.productionPerGroupMbaHour`` (JSON:API style).
    ``data`` may be a list of resources or a single resource object.
    Anything not shaped as expected (non-dict payload, ``null`` data or
    attributes, non-list record fields) is skipped rather than raising.
    """
    if not isinstance(payload, dict):
        return []
    items = []
    top_level = payload.get(RECORDS_KEY)
    if isinstance(top_level, list):
        items.extend(top_level)
    resources = payload.get('data')
    if isinstance(resources, dict):  # JSON:API allows a single resource object
        resources = [resources]
    elif not isinstance(resources, list):
        resources = []
    for resource in resources:
        attrs = resource.get('attributes') if isinstance(resource, dict) else None
        records = attrs.get(RECORDS_KEY) if isinstance(attrs, dict) else None
        if isinstance(records, list):
            items.extend(records)
    return items


def next_page_url(current_url, payload):
    """Return the absolute URL of the next page, or None when there is none.

    ``links.next`` may be absolute or relative; a relative link is resolved
    against *current_url* (an absolute link is returned unchanged).
    """
    if not isinstance(payload, dict):
        return None
    links = payload.get('links')
    next_link = links.get('next') if isinstance(links, dict) else None
    if not next_link:
        return None
    return urljoin(current_url, next_link)


def fetch_production_per_group(start=START, end=END, page_size=1000, sleep_between=0.2):
    """Download all PRODUCTION_PER_GROUP_MBA_HOUR records between *start* and *end*.

    Follows ``links.next`` pagination until no next link is returned (or a
    link repeats an already-requested URL, which would otherwise loop forever),
    sleeping *sleep_between* seconds between pages.

    Returns:
        list of the raw record dicts, in response order.

    Raises:
        requests.HTTPError: if any page returns a non-success status.
    """
    params = {'dataset': DATASET, 'startTime': start, 'endTime': end, 'page[size]': page_size}
    url = BASE
    all_items = []
    requested = set()
    while True:
        print('Requesting', url, 'params=', params)
        r = requests.get(url, params=params, timeout=60)
        r.raise_for_status()
        payload = r.json()
        all_items.extend(extract_production_items(payload))
        requested.add(r.url)
        # Resolve against the URL actually fetched so relative links keep working.
        next_url = next_page_url(r.url, payload)
        if next_url is None or next_url in requested:
            break
        url = next_url
        params = {}  # the next link already carries its own query string
        time.sleep(sleep_between)
    return all_items

# Download every record for the configured START..END period. A full year can
# be slow; consider splitting it into smaller ranges if needed.
items = fetch_production_per_group()
n_items = len(items)
n_items

## 2) Normalize to DataFrame and parse times

We'll convert the list of objects to a pandas DataFrame. Typical object fields include: `priceArea`, `productionGroup`, `startTime`, and `quantityKwh`. We'll parse `startTime` to timezone-aware datetimes in Europe/Oslo. If values are missing or the API uses other key names, adjust the normalization accordingly.

In [None]:
import pytz
from dateutil import parser as date_parser

# Flatten the fetched list of record dicts into a DataFrame and show which
# columns the API actually returned.
df = pd.json_normalize(items)
print('Columns found:', df.columns.tolist())

# Canonical column name -> accepted source names, most preferred first.
possible_cols = dict(
    priceArea=['priceArea', 'price_area', 'priceAreaCode'],
    productionGroup=['productionGroup', 'production_group', 'productionGroupCode'],
    startTime=['startTime', 'start_time', 'time'],
    quantityKwh=['quantityKwh', 'quantity_kwh', 'quantity'],
)

def pick_col(df, options):
    """Return the first name in *options* that is a column of *df*, else None."""
    available = df.columns
    return next((name for name in options if name in available), None)
# Resolve each canonical name to the source column that is present (or None).
cols = {canonical: pick_col(df, aliases) for canonical, aliases in possible_cols.items()}
print('Mapped columns:', cols)
# Keep only the columns that were found, renamed to their canonical names.
found = {source: canonical for canonical, source in cols.items() if source is not None}
df_small = df[list(found)].rename(columns=found)

# Parse startTime as UTC (unparseable values become NaT) and add an Oslo-local copy.
if 'startTime' not in df_small.columns:
    print('startTime column not present; check API response schema')
else:
    parsed = pd.to_datetime(df_small['startTime'], utc=True, errors='coerce')
    df_small['startTime'] = parsed
    df_small['startTime_local'] = parsed.dt.tz_convert('Europe/Oslo')

# Quantities must be numeric; anything unparseable becomes NaN.
if 'quantityKwh' in df_small.columns:
    df_small['quantityKwh'] = pd.to_numeric(df_small['quantityKwh'], errors='coerce')
df_small.head()

## 3) Write to Cassandra using Spark

We create a Spark session configured with the Spark Cassandra connector jar. The exact connector coordinates depend on your Spark/Scala version. The example below uses a commonly used artifact; update the package string if you use different Spark/Scala versions. We also create the keyspace and table (if not exists) using the cassandra-driver before writing.

In [None]:
from pyspark.sql import SparkSession
from cassandra.cluster import Cluster
import math

# Spark + Cassandra connector - update the version to match your Spark/Scala setup.
SPARK_PACKAGES = 'com.datastax.spark:spark-cassandra-connector_2.12:3.1.0'

# The session time zone is pinned to UTC. Spark interprets timezone-naive
# pandas timestamps in createDataFrame, and renders timestamps in toPandas(),
# in the session time zone. The rest of this notebook treats naive times as UTC.
spark = (
    SparkSession.builder
    .appName('ElhubToCassandra')
    .config('spark.master', 'local[*]')
    .config('spark.jars.packages', SPARK_PACKAGES)
    .config('spark.cassandra.connection.host', '127.0.0.1')
    .config('spark.sql.session.timeZone', 'UTC')
    .getOrCreate()
)

print('Spark started:', spark)
# Create keyspace and table if they do not exist (Cassandra must be reachable).
KEYSPACE = 'elhubb'
TABLE = 'production_per_group'

# Reuse a CQL `session` from an earlier cell if one exists; otherwise try to
# connect here so this cell also works on its own.
session = globals().get('session')
if session is None:
    try:
        session = Cluster(['127.0.0.1'], port=9042).connect()
    except Exception as e:  # e.g. NoHostAvailable when the container is down
        print('Could not connect to Cassandra:', e)
        session = None

# Column identifiers are double-quoted so CQL keeps their camelCase spelling.
# Unquoted identifiers are folded to lower case and would no longer match the
# priceArea/startTime/... column names used when writing and reading with Spark.
# NOTE: IF NOT EXISTS leaves a table created earlier with unquoted names
# unchanged; drop that table once so it is recreated with these columns.
CREATE_KEYSPACE_CQL = (
    f"CREATE KEYSPACE IF NOT EXISTS {KEYSPACE} "
    "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"
)
CREATE_TABLE_CQL = (
    f'CREATE TABLE IF NOT EXISTS {KEYSPACE}.{TABLE} ('
    '"priceArea" text, "startTime" timestamp, "productionGroup" text, '
    '"quantityKwh" double, '
    'PRIMARY KEY (("priceArea"), "startTime", "productionGroup"))'
)

if session is not None:
    try:
        session.execute(CREATE_KEYSPACE_CQL)
        session.execute(CREATE_TABLE_CQL)
        print('Keyspace/table ensured')
    except Exception as e:
        print('Could not create keyspace/table:', e)
else:
    print('No Cassandra session available; ensure Cassandra is running and reachable on 127.0.0.1:9042')

# Convert the pandas DataFrame to a Spark DataFrame and append it to Cassandra.

def prepare_for_cassandra(frame):
    """Return a copy of *frame* shaped for the Cassandra table.

    - Keeps only the table columns. Extra columns such as ``startTime_local``
      have no matching table column and would make the write fail.
    - Converts ``startTime`` to timezone-naive UTC. Tz-aware values are
      converted and naive values are taken as UTC.
    - Coerces ``quantityKwh`` to numbers, when that column is present.
    - Drops rows whose primary-key columns are missing, since Cassandra
      rejects null key values.

    Raises:
        KeyError: if a primary-key column is absent from *frame*.
    """
    key_columns = ['priceArea', 'startTime', 'productionGroup']
    missing = [c for c in key_columns if c not in frame.columns]
    if missing:
        raise KeyError(f'Columns required by the Cassandra primary key are missing: {missing}')
    columns = key_columns + (['quantityKwh'] if 'quantityKwh' in frame.columns else [])
    out = frame[columns].copy()
    out['startTime'] = pd.to_datetime(out['startTime'], utc=True, errors='coerce').dt.tz_localize(None)
    if 'quantityKwh' in out.columns:
        out['quantityKwh'] = pd.to_numeric(out['quantityKwh'], errors='coerce')
    return out.dropna(subset=key_columns).reset_index(drop=True)


if 'df_small' in globals() and len(df_small):
    write_df = prepare_for_cassandra(df_small)
    if write_df.empty:
        print('No rows with a complete primary key; check earlier steps')
    else:
        # Naive pandas timestamps are interpreted in the Spark session time zone;
        # pin it to UTC so the stored instants match write_df['startTime'].
        spark.conf.set('spark.sql.session.timeZone', 'UTC')
        sdf = spark.createDataFrame(write_df)
        sdf.write.format('org.apache.spark.sql.cassandra').options(keyspace=KEYSPACE, table=TABLE).mode('append').save()
        print('Wrote', len(write_df), 'rows to Cassandra')
else:
    print('No data to write; check earlier steps')

## 4) Read selected columns from Cassandra using Spark and plot

We'll read back the columns `priceArea`, `productionGroup`, `startTime`, and `quantityKwh`. Then we'll create:

- A pie chart showing total production for the year 2021 for a chosen price area (each slice = productionGroup).
- A line plot for the first month (January 2021) for the chosen price area with separate lines per production group.

In [None]:
# Read from Cassandra and plot one price area.
import plotly.express as px

# Pin the Spark session time zone to UTC so toPandas() returns naive UTC
# timestamps, which is what pd.to_datetime(..., utc=True) below assumes.
spark.conf.set('spark.sql.session.timeZone', 'UTC')

READ_COLUMNS = ['priceArea', 'productionGroup', 'startTime', 'quantityKwh']
df_cass = spark.read.format('org.apache.spark.sql.cassandra').options(keyspace=KEYSPACE, table=TABLE).load()
# Select and cache. toDF() re-applies the camelCase names. If the table was
# created with unquoted CQL identifiers, its columns are lower-case; Spark's
# default case-insensitive resolution still finds them but keeps those
# lower-case names.
df_selected = df_cass.select(*READ_COLUMNS).toDF(*READ_COLUMNS).cache()
print('Rows in table:', df_selected.count())
df_selected.printSchema()

# Convert to pandas for plotting (filter by price area first to limit data transferred)
chosen_area = 'NO1'  # change to the price area code you want
pdf = df_selected.filter(df_selected['priceArea'] == chosen_area).toPandas()

if pdf.empty:
    print('No data found for', chosen_area)
else:
    # startTime is naive UTC here; keep a tz-aware UTC copy plus an Oslo-local column
    pdf['startTime'] = pd.to_datetime(pdf['startTime'], utc=True)
    pdf['startTime_local'] = pdf['startTime'].dt.tz_convert('Europe/Oslo')

    # Pie chart: total production per group over the 2021 calendar year (Oslo
    # time); the table may also hold rows from other fetches or years.
    in_2021 = pdf[pdf['startTime_local'].dt.year == 2021]
    year_total = in_2021.groupby('productionGroup', as_index=False)['quantityKwh'].sum()
    fig_pie = px.pie(year_total, names='productionGroup', values='quantityKwh', title=f'Total production 2021 - {chosen_area}')
    fig_pie.show()

    # Line plot: January 2021 in local time, with explicit tz-aware bounds
    jan_start = pd.Timestamp('2021-01-01', tz='Europe/Oslo')
    jan_end = pd.Timestamp('2021-02-01', tz='Europe/Oslo')
    in_january = (pdf['startTime_local'] >= jan_start) & (pdf['startTime_local'] < jan_end)
    jan = pdf[in_january].copy()
    if jan.empty:
        print('No January data found for', chosen_area)
    else:
        # One column per productionGroup, indexed by local time (DST-aware).
        # Hours missing for a group are plotted as 0.
        pivot = jan.pivot_table(index='startTime_local', columns='productionGroup', values='quantityKwh', aggfunc='sum').fillna(0)
        fig_line = px.line(pivot.reset_index(), x='startTime_local', y=list(pivot.columns), title=f'January 2021 production - {chosen_area}')
        fig_line.update_xaxes(rangeslider_visible=True)
        fig_line.show()

## Notes and troubleshooting

- If the API enforces smaller time ranges per request, split the year into monthly ranges and fetch each separately (call `fetch_production_per_group` per-month).
- If timestamps are returned without timezone, verify with a small sample and adjust assumptions (we assumed UTC).
- Spark Cassandra connector coordinate (`SPARK_PACKAGES`) must match your Spark/Scala version. If you get `NoClassDefFoundError` for the connector, install the compatible connector jar or use the correct `spark.jars.packages` string.
- On Windows, running Spark with the connector may require extra steps or running Spark via WSL or a Linux container. Alternatively, write to Cassandra using the `cassandra-driver` in Python in batches as a fallback.

Possible next steps: split the year into monthly fetches (recommended for reliability), and add a small helper that writes directly to Cassandra with the Python driver in case the Spark setup causes issues.