# IND320 – Project Part 2: Elhub Data Analysis
## *Welcome to my project work, Part 2: Data Sources*
### Author: Jules Sylvain MUAMBA MVELE
### Description:
This notebook demonstrates the Elhub data analysis workflow for 2021.
It includes:
- Data ingestion from Elhub (CSV or API)
- Storage in Cassandra and MongoDB
- Visualizations (pie chart and line chart)
- Documentation of AI assistance and work log


## Part 1: Diagnostics and Troubleshooting – Spark & Cassandra Configuration on Windows

During the setup of the Spark + Cassandra environment, several technical issues came up.
This section documents the problems, the attempted fixes, and the final configuration I tried. That configuration still does not work, so I am open to any help.
I tried many things over several days; I hope this is clear enough to follow (even if it is not that clear to me anymore, haha).

### Initial Context

The goal was to run a Jupyter (Python) notebook capable of:

- Starting a local Spark 3.5.1 session
- Connecting to Cassandra 5.0 running in a Docker container
- Using the connector `spark-cassandra-connector_2.12-3.5.1.jar`

System setup: Windows 11, Anaconda, environment `IND320_env`, Java JDK 17 (Temurin).

### Problems Encountered

#### **1. Error ModuleNotFoundError: No module named 'pyspark'**

- Cause: The PySpark package was not installed in the conda environment.
- Solution: `pip install pyspark`


#### **2. Error PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number**

- The error appeared when launching Spark with `from pyspark.sql import SparkSession` followed by `spark = SparkSession.builder.appName("Test").getOrCreate()`.
- Analysis:
  - Communication issue between Python and the JVM
  - Missing environment variables: `JAVA_HOME`, `HADOOP_HOME`, `SPARK_HOME`
  - Missing `winutils` on Windows
- Complete solution:
  - Manually install Hadoop `winutils` (v3.3.1) from the kontext-tech/winutils repository
  - Download Spark 3.5.1 prebuilt for Hadoop 3 and extract it into `C:\spark\spark-3.5.1-bin-hadoop3`
  - Set the environment variables:
```
import os

os.environ["SPARK_HOME"] = r"C:\spark\spark-3.5.1-bin-hadoop3"
os.environ["HADOOP_HOME"] = r"C:\Hadoop\hadoop-3.3.1"
os.environ["JAVA_HOME"] = r"C:\Program Files\Eclipse Adoptium\jdk-17.0.13.11-hotspot"
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["JAVA_TOOL_OPTIONS"] = "-Djava.net.preferIPv4Stack=true"

```

- Test with: `%SPARK_HOME%\bin\spark-submit.cmd --version`
- Result: Spark 3.5.1 runs successfully with Scala 2.12.18 and Java 17.
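Since most of these failures come from a variable pointing to the wrong place, a quick preflight check before creating the session can save a round trip through the Java gateway error. This is a minimal sketch, not part of PySpark: `check_spark_env` and `EXPECTED_FILES` are hypothetical names, and the file names assume the Windows layout used here (`java.exe`, `winutils.exe` and `spark-submit.cmd` under each `bin` folder).

```python
import os
from pathlib import Path

# Executable expected under each home directory on Windows (assumed layout)
EXPECTED_FILES = {
    "JAVA_HOME": ("bin", "java.exe"),
    "HADOOP_HOME": ("bin", "winutils.exe"),
    "SPARK_HOME": ("bin", "spark-submit.cmd"),
}


def check_spark_env(env=None):
    """Return a list of problems; an empty list means every check passed."""
    env = os.environ if env is None else env
    problems = []
    for var, parts in EXPECTED_FILES.items():
        value = env.get(var)
        if not value:
            problems.append(f"{var} is not set")
        elif not Path(value).is_dir():
            problems.append(f"{var} is not a directory: {value}")
        elif not Path(value, *parts).is_file():
            problems.append(f"{var}: missing {Path(value, *parts)}")
    return problems
```

In the notebook, `print(check_spark_env())` should print an empty list when all three homes look valid.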

#### **3. Error TypeError: 'JavaPackage' object is not callable**

- Cause: Spark environment not properly initialized or incorrect path.
- Solution: Explicitly define SPARK_HOME and use findspark for diagnostics.
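As far as I understand it, `findspark.init()` makes a standalone Spark install importable by putting `SPARK_HOME\python` and the bundled py4j zip (`py4j-0.10.9.7-src.zip` in Spark 3.5.1, visible in the traceback further down) on `sys.path`. Doing that step by hand, as in the hypothetical helper `add_pyspark_to_path` below, turns a wrong `SPARK_HOME` into an immediate, explicit error. It is a simplified sketch of the idea, not findspark's actual code.

```python
import sys
from pathlib import Path


def add_pyspark_to_path(spark_home):
    """Put SPARK_HOME/python and its py4j zip at the front of sys.path; return the zip path."""
    spark_python = Path(spark_home) / "python"
    # Spark distributions ship py4j as python/lib/py4j-<version>-src.zip
    py4j_zips = sorted((spark_python / "lib").glob("py4j-*-src.zip"))
    if not spark_python.is_dir() or not py4j_zips:
        raise FileNotFoundError(f"No PySpark sources or py4j zip found under {spark_python}")
    py4j_zip = str(py4j_zips[-1])
    for entry in (py4j_zip, str(spark_python)):
        if entry not in sys.path:
            sys.path.insert(0, entry)
    return py4j_zip
```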

#### **4. Hadoop-related error: /tmp not accessible**

- Cause: Spark on Windows requires a valid /tmp directory with proper permissions.
- Solution:
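```
import os, subprocess

# Spark on Windows expects a writable C:\tmp directory
os.makedirs(r"C:\tmp", exist_ok=True)
# winutils.exe provides the chmod command Hadoop relies on under Windows
subprocess.run(r'C:\Hadoop\hadoop-3.3.1\bin\winutils.exe chmod 777 /tmp', shell=True, check=False)
```

- Result: Spark starts without warnings.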

#### **5. Spark–Cassandra Connector Compatibility Error**

- Cause: The connector was not automatically downloaded.
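- Solution: download the JAR manually and reference it explicitly when building the session:
```
# JAR downloaded manually: spark-cassandra-connector_2.12-3.5.1.jar
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config('spark.jars', r'C:\...\spark-cassandra-connector_2.12-3.5.1.jar')
    .getOrCreate())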
```
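`spark.jars` expects a comma-separated list of paths, and a mistyped path can be easy to miss until a Cassandra class fails to load. A minimal sketch of a guard that checks each JAR exists before the session is built (the helper name `build_spark_jars` is hypothetical):

```python
from pathlib import Path


def build_spark_jars(*jar_paths):
    """Return the comma-separated value expected by spark.jars, failing if a JAR is missing."""
    missing = [str(p) for p in jar_paths if not Path(p).is_file()]
    if missing:
        raise FileNotFoundError(f"JAR(s) not found: {missing}")
    # Absolute paths avoid surprises if the notebook's working directory changes
    return ",".join(str(Path(p).resolve()) for p in jar_paths)
```

With the paths defined later in this notebook, this would be used as `.config("spark.jars", build_spark_jars(jar_main, jar_driver))`.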

#### **Lessons Learned**

On Windows, Spark requires Java + Hadoop (winutils) + IPv4 configuration.

The JAVA_GATEWAY_EXITED error is almost always caused by incorrect environment settings.

Testing Spark via spark-submit.cmd --version before using PySpark in Jupyter helps isolate configuration issues.

Version compatibility between Spark (3.5.1), Scala (2.12), and the Cassandra connector is essential.
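The last point can be checked mechanically from the JAR name. The sketch below is a hypothetical helper; it assumes the naming pattern seen in this notebook (`spark-cassandra-connector[-assembly|-driver]_<scala>-<version>.jar`) and that, in the 3.x connector line, the connector's major.minor version tracks Spark's (3.5.x for Spark 3.5). The connector's own compatibility table remains the reference.

```python
import re

JAR_PATTERN = re.compile(
    r"spark-cassandra-connector(?:-assembly|-driver)?_(\d+\.\d+)-(\d+)\.(\d+)\.\d+\.jar$"
)


def check_connector_compat(jar_name, spark_version, scala_version):
    """Return True if the JAR matches Spark's major.minor and Scala's binary version."""
    match = JAR_PATTERN.search(jar_name)
    if match is None:
        raise ValueError(f"Unrecognised connector JAR name: {jar_name}")
    jar_scala, jar_major, jar_minor = match.group(1), int(match.group(2)), int(match.group(3))
    spark_major, spark_minor = (int(x) for x in spark_version.split(".")[:2])
    scala_binary = ".".join(scala_version.split(".")[:2])  # "2.12.18" -> "2.12"
    return jar_scala == scala_binary and (jar_major, jar_minor) == (spark_major, spark_minor)
```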


### Final Configuration Tried

In [1]:
import os

# --- Spark and Cassandra paths ---
os.environ["SPARK_HOME"] = r"C:\spark\spark-3.5.1-bin-hadoop3"
jar_path = r"C:\Users\muamb\Desktop\ESILV\2025-2026\NMBU\Cours\IND320_DataToDecision\spark-cassandra-connector_2.12-3.5.1.jar"

# --- Force Spark to include the JAR in the classpath ---
os.environ["PYSPARK_SUBMIT_ARGS"] = f'--jars "{jar_path}" pyspark-shell'

# --- Java and Hadoop ---
os.environ["JAVA_HOME"] = r"C:\Program Files\Eclipse Adoptium\jdk-17.0.13.11-hotspot"
os.environ["HADOOP_HOME"] = r"C:\Hadoop\hadoop-3.3.1"
os.environ["PATH"] = (
    os.environ["JAVA_HOME"] + r"\bin;"
    + os.environ["HADOOP_HOME"] + r"\bin;"
    + os.environ["SPARK_HOME"] + r"\bin;"
    + os.environ["PATH"]
)

# --- IPv4 / Python configuration ---
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["JAVA_TOOL_OPTIONS"] = "-Djava.net.preferIPv4Stack=true"
os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"

print("Spark environment + Cassandra ready.")


Spark environment + Cassandra ready.


In [2]:
import os, subprocess
os.makedirs(r"C:\tmp", exist_ok=True)
subprocess.run(r'C:\Hadoop\hadoop-3.3.1\bin\winutils.exe chmod 777 /tmp', shell=True, check=False)
print("winutils chmod done")


winutils chmod done


import os, findspark
# make sure the notebook knows where SPARK_HOME is
os.environ["SPARK_HOME"] = r"C:\spark\spark-3.5.1-bin-hadoop3"
findspark.init(os.environ["SPARK_HOME"])  # <-- key step
print("findspark OK ->", findspark.find())


import os, sys, findspark

# --- Local paths ---
SPARK_HOME  = r"C:\spark\spark-3.5.1-bin-hadoop3"
JAVA_HOME   = r"C:\Program Files\Eclipse Adoptium\jdk-17.0.13.11-hotspot"
HADOOP_HOME = r"C:\Hadoop\hadoop-3.3.1"

# Cassandra JARs (assembly + driver)
jar_main   = r"C:\Users\muamb\Desktop\ESILV\2025-2026\NMBU\Cours\IND320_DataToDecision\spark-cassandra-connector-assembly_2.12-3.5.1.jar"
jar_driver = r"C:\Users\muamb\Desktop\ESILV\2025-2026\NMBU\Cours\IND320_DataToDecision\spark-cassandra-connector-driver_2.12-3.5.1.jar"
all_jars   = f"{jar_main},{jar_driver}"

# --- Environment variables for Spark / Java / Hadoop / Python ---
os.environ["SPARK_HOME"]  = SPARK_HOME
os.environ["JAVA_HOME"]   = JAVA_HOME
os.environ["HADOOP_HOME"] = HADOOP_HOME
os.environ["PATH"] = (
    SPARK_HOME + r"\bin;"
    + HADOOP_HOME + r"\bin;"
    + JAVA_HOME + r"\bin;"
    + os.environ["PATH"]
)

# Force PySpark to use the current Python interpreter (important on Windows)
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Network configuration (Windows)
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["JAVA_TOOL_OPTIONS"] = "-Djava.net.preferIPv4Stack=true"

# Load JARs on both driver and executors (also through submit args)
os.environ["PYSPARK_SUBMIT_ARGS"] = f'--jars "{all_jars}" pyspark-shell'

# Initialize Spark with findspark
findspark.init(SPARK_HOME)

from pyspark.sql import SparkSession

# --- Create Spark Session ---
spark = (
    SparkSession.builder
    .master("local[1]")  # More stable on Windows
    .appName("SparkCassandraApp")
    .config("spark.cassandra.connection.host", "localhost")  # Docker Desktop → Cassandra
    .config("spark.cassandra.connection.port", "9042")
    .config("spark.sql.execution.arrow.pyspark.enabled", "false")
    .config("spark.python.worker.reuse", "false")
    .config("spark.driver.extraClassPath", all_jars)
    .config("spark.executor.extraClassPath", all_jars)
    .getOrCreate()
)

print("✅ Spark initialized, version:", spark.version)
print("✅ Spark JARs:", spark.sparkContext.getConf().get("spark.jars"))


print("JARs used :", spark.sparkContext._conf.get("spark.jars"))


In [3]:
import socket
print(socket.gethostbyname("localhost"))


127.0.0.1


In [4]:
import socket
s = socket.socket()
s.settimeout(2)
try:
    s.connect(("localhost", 9042))
    print("Cassandra is on 9042")
except Exception as e:
    print("Cassandra not available:", e)
finally:
    s.close()  # always release the socket, even after a failed connect


Cassandra is on 9042
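The same check can be wrapped in a small reusable function: `socket.create_connection` handles name resolution and the timeout, and the `with` block closes the socket on every path. `is_port_open` is a hypothetical name. Note that an open port may only show that something is listening on 9042 (for example the Docker port mapping), not that Cassandra has finished starting.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts and name-resolution failures
        return False
```

Usage: `print("Cassandra reachable:", is_port_open("localhost", 9042))`.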


import os

os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["JAVA_TOOL_OPTIONS"] = "-Djava.net.preferIPv4Stack=true"
os.environ["HADOOP_HOME"] = r"C:\Hadoop\hadoop-3.3.1"
os.environ["PATH"] = os.environ["HADOOP_HOME"] + r"\bin;" + os.environ["PATH"]


import os, sys
from pyspark.sql import SparkSession

PYTHON_EXE = r"C:\Users\muamb\anaconda3\envs\IND320_env\python.exe"
os.environ["PYSPARK_PYTHON"] = PYTHON_EXE
os.environ["PYSPARK_DRIVER_PYTHON"] = PYTHON_EXE
for bad in ("PYTHONHOME", "PYTHONPATH"):
    os.environ.pop(bad, None)
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["JAVA_TOOL_OPTIONS"] = "-Djava.net.preferIPv4Stack=true"

spark = (SparkSession.builder
    .master("local[1]")
    .appName("worker-smoke-test")
    .config("spark.sql.execution.arrow.pyspark.enabled", "false")
    .getOrCreate())

print("✅ Spark:", spark.version)
print("✅ Python:", sys.executable)
print("➡️ Test workers =", spark.sparkContext.parallelize(range(10)).count())


### **Main Errors**

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[17], line 20
     18 print("✅ Spark:", spark.version)
     19 print("✅ Python:", sys.executable)
---> 20 print("➡️ Test workers =", spark.sparkContext.parallelize(range(10)).count())

File C:\spark\spark-3.5.1-bin-hadoop3\python\pyspark\rdd.py:2316, in RDD.count(self)
   2295 def count(self) -> int:
   2296     """
   2297     Return the number of elements in this RDD.
   2298 
   (...)   2314     3
   2315     """
-> 2316     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

File C:\spark\spark-3.5.1-bin-hadoop3\python\pyspark\rdd.py:2291, in RDD.sum(self)
   2270 def sum(self: "RDD[NumberOrArray]") -> "NumberOrArray":
   2271     """
   2272     Add up the elements in this RDD.
   2273 
   (...)   2289     6.0
   2290     """
-> 2291     return self.mapPartitions(lambda x: [sum(x)]).fold(  # type: ignore[return-value]
   2292         0, operator.add
   2293     )

File C:\spark\spark-3.5.1-bin-hadoop3\python\pyspark\rdd.py:2044, in RDD.fold(self, zeroValue, op)
   2039     yield acc
   2041 # collecting result of mapPartitions here ensures that the copy of
   2042 # zeroValue provided to each partition is unique from the one provided
   2043 # to the final reduce call
-> 2044 vals = self.mapPartitions(func).collect()
   2045 return reduce(op, vals, zeroValue)

File C:\spark\spark-3.5.1-bin-hadoop3\python\pyspark\rdd.py:1833, in RDD.collect(self)
   1831 with SCCallSiteSync(self.context):
   1832     assert self.ctx._jvm is not None
-> 1833     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   1834 return list(_load_from_socket(sock_info, self._jrdd_deserializer))

File C:\spark\spark-3.5.1-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File C:\spark\spark-3.5.1-bin-hadoop3\python\pyspark\errors\exceptions\captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    177 def deco(*a: Any, **kw: Any) -> Any:
    178     try:
--> 179         return f(*a, **kw)
    180     except Py4JJavaError as e:
    181         converted = convert_exception(e.java_exception)

File C:\spark\spark-3.5.1-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 failed 1 times, most recent failure: Lost task 9.0 in stage 1.0 (TID 21) (kubernetes.docker.internal executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:386)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)

In [5]:
import sys, platform
print("Python:", sys.executable)
print("Version:", platform.python_version())


Python: C:\Users\muamb\anaconda3\envs\IND320_py311\python.exe
Version: 3.11.14
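This output is worth comparing with the earlier cell that hard-codes `PYTHON_EXE` to `...\envs\IND320_env\python.exe`: the kernel here runs from `IND320_py311`. A mismatch between the driver and worker interpreters is one possible cause of "Python worker exited unexpectedly (crashed)" worth ruling out, although I have not confirmed it is the cause here. The hypothetical helper below flags any `PYSPARK_PYTHON` / `PYSPARK_DRIVER_PYTHON` value that does not resolve to the kernel's own interpreter; bare names such as `"python"` are resolved through `PATH` with `shutil.which`.

```python
import os
import shutil
import sys


def _normalise(path):
    """Absolute, case-normalised path so Windows comparisons ignore case and separators."""
    return os.path.normcase(os.path.abspath(path))


def check_worker_python(env=None, kernel_python=None):
    """List PySpark interpreter settings that differ from the interpreter running this notebook."""
    env = os.environ if env is None else env
    kernel_python = sys.executable if kernel_python is None else kernel_python
    issues = []
    for var in ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"):
        value = env.get(var)
        if not value:
            issues.append(f"{var} is not set")
            continue
        resolved = shutil.which(value) or value  # e.g. "python" -> first match on PATH
        if _normalise(resolved) != _normalise(kernel_python):
            issues.append(f"{var}={value} resolves to {resolved}, not {kernel_python}")
    return issues
```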



## Part 2: Working without Spark

### Install all the dependencies

In [6]:
!pip install pandas cassandra-driver pymongo requests matplotlib plotly tqdm python-dotenv




### Imports

In [7]:
import pandas as pd
import matplotlib.pyplot as plt
from cassandra.cluster import Cluster
from pymongo import MongoClient
from tqdm import tqdm

print("✅ All modules imported successfully!")


✅ All modules imported successfully!


### Connection test

In [9]:
# --- Imports ---
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider  # only if your Cassandra requires authentication
from cassandra.query import SimpleStatement

# --- Connection settings ---
CASSANDRA_HOST = "localhost"
CASSANDRA_PORT = 9042


# Set these only if authentication is enabled on the Cassandra node
CASSANDRA_USER = None
CASSANDRA_PASS = None

KEYSPACE = "ind320"
TABLE    = "elhub_data_test"  # just a temporary test table


In [10]:
# --- Connect to Cassandra cluster ---
try:
    if CASSANDRA_USER and CASSANDRA_PASS:
        auth_provider = PlainTextAuthProvider(
            username=CASSANDRA_USER, password=CASSANDRA_PASS
        )
        cluster = Cluster([CASSANDRA_HOST], port=CASSANDRA_PORT, auth_provider=auth_provider)
    else:
        cluster = Cluster([CASSANDRA_HOST], port=CASSANDRA_PORT)

    session = cluster.connect()
    print("✅ Connected to Cassandra")
except Exception as e:
    raise SystemExit(f"❌ Unable to connect to Cassandra: {e}")


✅ Connected to Cassandra


In [11]:
# --- Create keyspace if it does not exist ---
session.execute(f"""
CREATE KEYSPACE IF NOT EXISTS {KEYSPACE}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}};
""")

# --- Switch to the new keyspace ---
session.set_keyspace(KEYSPACE)

# --- Create a simple test table ---
session.execute(f"""
CREATE TABLE IF NOT EXISTS {TABLE} (
    id int PRIMARY KEY,
    txt text
);
""")

print(f"✅ Keyspace `{KEYSPACE}` and table `{TABLE}` are ready.")


✅ Keyspace `ind320` and table `elhub_data_test` are ready.


In [13]:
# Sanity check 
# --- Read without ORDER BY and sort in Python ---
rows = session.execute(f"SELECT id, txt FROM {TABLE};")
rows_sorted = sorted(rows, key=lambda r: r.id)  # sort locally in Python

print("📄 Rows in the test table (sorted client-side):")
for r in rows_sorted:
    print(r)


📄 Rows in the test table (sorted client-side):
Row(id=1, txt='hello')
Row(id=2, txt='world')
Row(id=3, txt='ind320')


## Load & clean (CSV → pandas DataFrame)

### Load the CSV, inspect it, keep only the useful columns

In [19]:
import pandas as pd
from pathlib import Path

# --- File path (adapt if you move the CSV) ---
CSV_PATH = Path(r"C:\Users\muamb\Desktop\ESILV\2025-2026\NMBU\Cours\IND320_DataToDecision\IND320_DataTo_Decision\data\elhub_data.csv")

# --- Columns we care about for the assignment ---
USECOLS = ["START_TIME", "PRICE_AREA", "PRODUCTION_GROUP", "QUANTITY_KWH"]

# --- Read CSV efficiently: only needed columns, keep strings initially ---
df_raw = pd.read_csv(
    CSV_PATH,
    usecols=USECOLS,
    dtype={
        "START_TIME": "string",
        "PRICE_AREA": "string",
        "PRODUCTION_GROUP": "string",
        "QUANTITY_KWH": "float64"
    }
)

print("✅ Loaded CSV with shape:", df_raw.shape)
df_raw.head()


✅ Loaded CSV with shape: (661344, 4)


| | START_TIME | PRICE_AREA | PRODUCTION_GROUP | QUANTITY_KWH |
|---|---|---|---|---|
| 0 | 2022-10-24T00:00:00.000+02:00 | NO1 | hydro | 1518343.114 |
| 1 | 2022-10-24T01:00:00.000+02:00 | NO1 | hydro | 1508836.767 |
| 2 | 2022-10-24T02:00:00.000+02:00 | NO1 | hydro | 1495758.356 |
| 3 | 2022-10-24T03:00:00.000+02:00 | NO1 | hydro | 1491274.714 |
| 4 | 2022-10-24T04:00:00.000+02:00 | NO1 | hydro | 1496936.723 |


### Minimal cleaning + time parsing (handle DST → store in UTC)

In [20]:
# --- Basic column rename to a consistent snake_case schema ---
df = df_raw.rename(columns={
    "START_TIME": "start_time",
    "PRICE_AREA": "price_area",
    "PRODUCTION_GROUP": "production_group",
    "QUANTITY_KWH": "quantity_kwh"
}).copy()

# --- Strip whitespace just in case ---
df["price_area"] = df["price_area"].str.strip()
df["production_group"] = df["production_group"].str.strip()

# --- Parse ISO-8601 timestamps (they contain timezone offsets) to UTC ---
# Note: errors='coerce' will produce NaT for bad rows; we'll drop those later.
df["start_time"] = pd.to_datetime(df["start_time"], utc=True, errors="coerce")

# --- Basic integrity checks ---
n_before = len(df)
df = df.dropna(subset=["start_time", "price_area", "production_group", "quantity_kwh"])
n_after = len(df)
print(f"Dropped {n_before - n_after} rows with missing critical fields.")

# --- Ensure proper dtypes ---
df["price_area"] = df["price_area"].astype("string")
df["production_group"] = df["production_group"].astype("string")
df["quantity_kwh"] = df["quantity_kwh"].astype("float64")

# --- Optional: remove obvious duplicates if any (same key/hour) ---
dedup_cols = ["price_area", "production_group", "start_time"]
n_before = len(df)
df = df.drop_duplicates(subset=dedup_cols, keep="last")
print(f"Removed {n_before - len(df)} duplicated hourly rows on key {dedup_cols}.")

print("✅ Clean DataFrame shape:", df.shape)
df.head()


Dropped 0 rows with missing critical fields.
Removed 0 duplicated hourly rows on key ['price_area', 'production_group', 'start_time'].
✅ Clean DataFrame shape: (661344, 4)


| | start_time | price_area | production_group | quantity_kwh |
|---|---|---|---|---|
| 0 | 2022-10-23 22:00:00+00:00 | NO1 | hydro | 1518343.114 |
| 1 | 2022-10-23 23:00:00+00:00 | NO1 | hydro | 1508836.767 |
| 2 | 2022-10-24 00:00:00+00:00 | NO1 | hydro | 1495758.356 |
| 3 | 2022-10-24 01:00:00+00:00 | NO1 | hydro | 1491274.714 |
| 4 | 2022-10-24 02:00:00+00:00 | NO1 | hydro | 1496936.723 |
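Converting to UTC is what makes the autumn DST change safe: the local hour 02:00 occurs twice on the night summer time ends, but the two occurrences carry different offsets and therefore map to distinct UTC instants. The four timestamps below are written by hand in the CSV's `START_TIME` format for 2022-10-30 (the date DST ended in Norway that year); they are illustrative and not read from the file.

```python
import pandas as pd

# Hand-written readings around the end of DST (same string format as START_TIME)
raw = pd.Series([
    "2022-10-30T01:00:00.000+02:00",
    "2022-10-30T02:00:00.000+02:00",  # first 02:00 (summer time, UTC+2)
    "2022-10-30T02:00:00.000+01:00",  # second 02:00 (standard time, UTC+1)
    "2022-10-30T03:00:00.000+01:00",
])
utc = pd.to_datetime(raw, utc=True)

# Local wall-clock hours repeat, but the UTC instants are unique and one hour apart
print(pd.DataFrame({"raw": raw, "utc": utc}))
```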


In [21]:
# --- Quick sanity stats for documentation/log ---
print("Price areas:", sorted(df["price_area"].dropna().unique().tolist()))
print("Production groups:", sorted(df["production_group"].dropna().unique().tolist()))
print("Date range (UTC):", df["start_time"].min(), "→", df["start_time"].max())

# --- Each (area, group) should have one row per hour over the covered period; quick check of counts ---
counts = df.groupby(["price_area", "production_group"]).size().reset_index(name="n_rows")
counts.sort_values("n_rows", ascending=False).head(10)


Price areas: ['NO1', 'NO2', 'NO3', 'NO4', 'NO5']
Production groups: ['*', 'hydro', 'other', 'solar', 'thermal', 'wind']
Date range (UTC): 2022-10-23 22:00:00+00:00 → 2025-10-23 21:00:00+00:00


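| | price_area | production_group | n_rows |
|---|---|---|---|
| 0 | NO1 | hydro | 26304 |
| 1 | NO1 | other | 26304 |
| 2 | NO1 | solar | 26304 |
| 3 | NO1 | thermal | 26304 |
| 4 | NO1 | wind | 26304 |
| 6 | NO2 | hydro | 26304 |
| 7 | NO2 | other | 26304 |
| 9 | NO2 | thermal | 26304 |
| 8 | NO2 | solar | 26304 |
| 10 | NO2 | wind | 26304 |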
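The uniform 26304 rows per group is what a gap-free hourly series would give: 2022-10-23 22:00 to 2025-10-23 21:00 UTC spans 1 096 days (2024 is a leap year), and 1 096 × 24 = 26304 hourly slots. A hypothetical helper that makes this check explicit for every group, rather than eyeballing the top 10 rows, could look like this:

```python
import pandas as pd


def hourly_coverage(frame, keys=("price_area", "production_group"), col="start_time"):
    """Per group: hourly slots between first and last timestamp vs. distinct hours present."""
    records = []
    for key, grp in frame.groupby(list(keys)):
        first, last = grp[col].min(), grp[col].max()
        expected = int((last - first) / pd.Timedelta(hours=1)) + 1
        present = grp[col].nunique()
        records.append((*key, expected, present, expected - present))
    return pd.DataFrame(records, columns=[*keys, "expected", "present", "missing"])
```

Usage: `hourly_coverage(df).query("missing > 0")` should come back empty if every series is complete.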


### Create Cassandra table (final schema) + insert batches

#### Creating the final table (key: (price_area, production_group), clustering: start_time)

In [23]:
# --- Create final table for Elhub hourly production ---
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

CASSANDRA_HOST = "localhost"
CASSANDRA_PORT = 9042
KEYSPACE = "ind320"
TABLE = "elhub_production_mba_hour"

cluster = Cluster([CASSANDRA_HOST], port=CASSANDRA_PORT)
session = cluster.connect()
session.set_keyspace(KEYSPACE)

# Note: clustering order on start_time for efficient time-range scans within a partition
session.execute(f"""
CREATE TABLE IF NOT EXISTS {TABLE} (
    price_area text,
    production_group text,
    start_time timestamp,
    quantity_kwh double,
    PRIMARY KEY ((price_area, production_group), start_time)
) WITH CLUSTERING ORDER BY (start_time ASC);
""")

print(f"Table `{KEYSPACE}.{TABLE}` ready.")


Table `ind320.elhub_production_mba_hour` ready.


In [26]:
# --- Optional year filter: keep only a given year if present ---
TARGET_YEAR = 2021  # change to 2022/2023/etc. or set to None for "no filter"

df_to_load = df.copy()
if TARGET_YEAR is not None:
    mask = df_to_load["start_time"].dt.year == TARGET_YEAR
    if mask.any():
        df_to_load = df_to_load[mask].copy()
        print(f"ℹ️ Using only rows for year {TARGET_YEAR}: {len(df_to_load)} rows.")
    else:
        print(f"⚠️ No rows found for year {TARGET_YEAR}. Proceeding with ALL rows: {len(df_to_load)} rows.")
else:
    print(f"ℹ️ No year filter. Proceeding with ALL rows: {len(df_to_load)} rows.")


⚠️ No rows found for year 2021. Proceeding with ALL rows: 661344 rows.


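#### Remark: there are no rows for 2021. I believe I downloaded the right dataset, but the date range printed above shows that this file covers 2022-10-23 22:00 to 2025-10-23 21:00 (UTC); I still need to find a solution.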
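Separately from the missing 2021 data, the filter above tests `.dt.year` on UTC timestamps, while the CSV timestamps carry Norwegian local offsets (such as `+02:00`). The first local hour of a year falls at 23:00 UTC on 31 December, so a UTC-based filter assigns it to the previous year. A minimal sketch of a local-calendar filter, assuming `Europe/Oslo` is the right zone for all price areas (`filter_local_year` is a hypothetical name):

```python
import pandas as pd


def filter_local_year(frame, year, tz="Europe/Oslo", col="start_time"):
    """Keep rows whose timestamp falls in `year` on the local (not UTC) calendar."""
    local_year = frame[col].dt.tz_convert(tz).dt.year
    return frame[local_year == year].copy()
```

Usage: `df_to_load = filter_local_year(df, TARGET_YEAR)` in place of the UTC year mask.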

In [27]:
# --- Fast concurrent insert using PreparedStatement + execute_concurrent_with_args ---
from cassandra.query import PreparedStatement
from cassandra.concurrent import execute_concurrent_with_args
from tqdm import tqdm

# Prepare the insert statement once (faster and safer)
insert_ps: PreparedStatement = session.prepare(f"""
    INSERT INTO {TABLE} (price_area, production_group, start_time, quantity_kwh)
    VALUES (?, ?, ?, ?)
""")

# Convert DataFrame rows to the tuples expected by the driver
# (itertuples is much faster than iterrows on ~660k rows)
cols = ["price_area", "production_group", "start_time", "quantity_kwh"]
params = [
    (
        str(area),
        str(group),
        ts.to_pydatetime(),   # native datetime, timezone-aware (UTC)
        float(qty)
    )
    for area, group, ts, qty in df_to_load[cols].itertuples(index=False, name=None)
]

# Tune concurrency for your machine; start modestly on Windows
CONCURRENCY = 64  # you can try 128/256 if it runs smoothly
BATCH_SIZE = 20_000  # chunk to avoid huge single job; adjust as needed

total = len(params)
print(f"🚀 Inserting {total} rows into `{KEYSPACE}.{TABLE}` (concurrency={CONCURRENCY})")

errors_total = 0
for i in tqdm(range(0, total, BATCH_SIZE), desc="Batches"):
    chunk = params[i:i+BATCH_SIZE]
    results = execute_concurrent_with_args(
        session,
        insert_ps,
        chunk,
        concurrency=CONCURRENCY,
        raise_on_first_error=False
    )
    # Count failed writes in this chunk
    chunk_errors = sum(0 if ok else 1 for ok, _ in results)
    errors_total += chunk_errors

print(f"✅ Insert complete. Errors: {errors_total}")


🚀 Inserting 661344 rows into `ind320.elhub_production_mba_hour` (concurrency=64)


Batches: 100%|██████████| 34/34 [04:03<00:00,  7.16s/it]

✅ Insert complete. Errors: 0
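The cell above first materialises all 661344 parameter tuples in memory and then slices them into chunks; 661344 / 20000 rounded up gives the 34 batches reported. A generator that builds each chunk lazily keeps the same batching while holding only one chunk at a time. This is a hypothetical sketch (`iter_param_chunks` is my own name), reusing the column order of the prepared INSERT.

```python
from itertools import islice

COLS = ["price_area", "production_group", "start_time", "quantity_kwh"]


def iter_param_chunks(frame, batch_size):
    """Yield lists of at most `batch_size` insert tuples, built lazily from `frame`."""
    rows = (
        (str(area), str(group), ts.to_pydatetime(), float(qty))
        for area, group, ts, qty in frame[COLS].itertuples(index=False, name=None)
    )
    while chunk := list(islice(rows, batch_size)):
        yield chunk


# Usage with the objects defined above (session, insert_ps, CONCURRENCY, BATCH_SIZE):
# for chunk in iter_param_chunks(df_to_load, BATCH_SIZE):
#     execute_concurrent_with_args(session, insert_ps, chunk,
#                                  concurrency=CONCURRENCY, raise_on_first_error=False)
```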





In [None]:
# --- Quick counts per area/group (rough sanity check) ---
rows = session.execute(f"""
    SELECT price_area, production_group, count(*) as n
    FROM {TABLE}
    GROUP BY price_area, production_group
    ALLOW FILTERING;
""")
# Note: ALLOW FILTERING is okay here for small ad-hoc checks; avoid in prod.

print("🔎 Counts by (price_area, production_group):")
for r in rows:
    print(r)

# --- Sample read: pick one area/group and a small time window ---
sample_area = "NO1"
sample_group = "hydro"

rows = session.execute(f"""
    SELECT price_area, production_group, start_time, quantity_kwh
    FROM {TABLE}
    WHERE price_area = %s AND production_group = %s
    LIMIT 5;
""", (sample_area, sample_group))

print("\n🔎 Sample rows:")
for r in rows:
    print(r)
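To turn the count query into a pass/fail check, the rows returned by Cassandra can be compared with the per-group counts already computed in pandas (`counts`, column `n_rows`). The sketch below is hypothetical; it only relies on the driver's default named-tuple rows exposing the `n` alias as an attribute.

```python
def compare_counts(cassandra_rows, expected):
    """Return {(area, group): (expected, stored)} for every key whose counts differ."""
    stored = {(r.price_area, r.production_group): r.n for r in cassandra_rows}
    mismatches = {}
    for key in sorted(set(stored) | set(expected)):
        want, got = expected.get(key, 0), stored.get(key, 0)
        if want != got:
            mismatches[key] = (want, got)
    return mismatches


# Usage:
# expected = {(a, g): n for a, g, n in
#             counts[["price_area", "production_group", "n_rows"]].itertuples(index=False)}
# rows = session.execute(f"SELECT price_area, production_group, count(*) AS n "
#                        f"FROM {TABLE} GROUP BY price_area, production_group")
# print(compare_counts(rows, expected) or "All counts match")
```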
