# IND320 Project Work

 - **GitHub Link:** https://github.com/Mobashra/M-Abeer-Project
 - **Streamlit Link:** https://m-abeer-project.streamlit.app/



### Project Log

For this project, I worked with a weather dataset (`open-meteo-subset.csv`) and combined Jupyter Notebook analysis with a Streamlit interactive application.  

To maintain reproducibility, I created a virtual environment called **D2D_env**, installing key libraries such as `pandas`, `plotly`, `scikit-learn`, and `streamlit`. This setup allowed me to keep the workflow organized and isolated from other projects.  

In the ***Jupyter Notebook***, I processed the dataset by converting the `time` column to `pandas` datetime format for time-series analysis. I then used `plotly` to plot temperature, precipitation, wind speed, and wind direction over time. Because these variables have very different scales, I first tried a multi-axis plot. To compare them on a single Y-axis instead, I applied **Min-Max normalization** from `scikit-learn`, which rescales each variable to the range 0 to 1. This gave a clearer view of how the parameters varied relative to each other.  
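
The Min-Max rescaling described above maps each value `x` to `(x - min) / (max - min)`. The sketch below writes that formula by hand in plain Python to show what the scaler computes; it is not the notebook's actual code, and the sample values and variable names are made up for illustration.

```python
def min_max_scale(values):
    """Rescale a list of numbers to the range [0, 1]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant series has no spread; returning zeros is a choice made here.
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


# Hypothetical sample values on very different scales.
temperature_c = [-2.0, 0.0, 3.0, 8.0]
wind_direction_deg = [0.0, 90.0, 180.0, 360.0]

scaled_temperature = min_max_scale(temperature_c)
scaled_wind_direction = min_max_scale(wind_direction_deg)

print(scaled_temperature)
print(scaled_wind_direction)
```

After scaling, both series share the same 0 to 1 axis, at the cost of losing their original units.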

In the ***Streamlit App***, I built a simple multi-page dashboard to make the weather dataset interactive and easier to explore. 
- *Page 1:* An introduction and description of the dataset using styled text.
- *Page 2:* A compact trend chart for January, so users can quickly see how each variable changed over the month.
- *Page 3:* An interactive plot where users can choose a range of months and select which variable to graph. The plot updates automatically, making it more engaging compared to static notebook graphs.
- *Page 4:* For the last page, I included a fun element just to make the app a bit more personal and light-hearted.
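
The selection logic behind Page 3 (a month range plus one variable) can be sketched without any Streamlit widgets. This is only an illustration of the idea, not the app's code: the function name, the sample rows, and the column names other than `time` are assumptions.

```python
from datetime import datetime


def select_series(rows, first_month, last_month, variable):
    """Return (time, value) pairs for one variable within a month range."""
    return [
        (row["time"], row[variable])
        for row in rows
        if first_month <= row["time"].month <= last_month
    ]


# Hypothetical rows; the real dataset has many more records and columns.
rows = [
    {"time": datetime(2020, 1, 15), "temperature": -3.2, "wind_speed": 4.1},
    {"time": datetime(2020, 3, 10), "temperature": 1.5, "wind_speed": 6.0},
    {"time": datetime(2020, 7, 1), "temperature": 14.8, "wind_speed": 2.3},
]

selection = select_series(rows, 1, 3, "temperature")
print(selection)
```

In an interactive app, the two month bounds and the variable name would come from user input, and the returned pairs would feed the plot.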

 

 


### AI Usage

I used ChatGPT to assist with **styling and formatting** in both the Jupyter Notebook and the Streamlit app. To plot variables with different scales on a single Y-axis, I normalized the data with **MinMaxScaler**, following guidance from ChatGPT. Since **[Plotly](https://plotly.com/python/)** was new to me, I referred to both its official documentation and ChatGPT for implementation.

For the Streamlit app, most tasks were completed using the official **[Streamlit documentation](https://docs.streamlit.io/)**, while AI support was primarily used for text formatting and styling using Markdown.


In [1]:
# Standard library
import json
import os
import time
from datetime import datetime, timedelta
from typing import Dict, List
from zoneinfo import ZoneInfo

# Third-party
import matplotlib.pyplot as plt
import pandas as pd
import requests
import seaborn as sns
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.server_api import ServerApi
from pyspark.sql import SparkSession
from pyspark.sql.functions import month
from pyspark.sql.functions import sum as spark_sum



In [None]:
import requests

response = requests.get('https://api.elhub.no/energy-data/v0/price-areas?dataset=PRODUCTION_PER_GROUP_MBA_HOUR&startDate=2021-01-01T00%3A00%3A00%2B02%3A00&endDate=2021-01-02T03%3A00%3A00%2B02%3A00')

if response.status_code == 200:
    print(response.json())
else:
    print('Error:', response.status_code)
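
The `%3A` and `%2B` sequences in the request URL above are the percent-encoded forms of `:` and `+` in the ISO 8601 timestamps. As a small sketch, the same query string can be built from readable values with the standard library; `requests` applies equivalent encoding itself when the values are passed through `params`. The constant name `BASE_URL` is introduced here for illustration.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.elhub.no/energy-data/v0/price-areas"

# Readable parameter values; urlencode percent-encodes ':' and '+'.
params = {
    "dataset": "PRODUCTION_PER_GROUP_MBA_HOUR",
    "startDate": "2021-01-01T00:00:00+02:00",
    "endDate": "2021-01-02T03:00:00+02:00",
}

query = urlencode(params)
url = f"{BASE_URL}?{query}"
print(url)
```

Leaving the `+` unencoded would be risky, because a bare `+` in a query string is commonly decoded as a space.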

In [None]:


# Initialize Spark session with the Cassandra and MongoDB connectors.
# The MongoDB connection string is read from the environment instead of being
# hard-coded; MONGO_URI is an assumed variable name (e.g. set in a local .env file).
load_dotenv()
spark = SparkSession.builder \
    .appName("SparkCassandraWriteApp") \
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1") \
    .config("spark.cassandra.connection.host", "localhost") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
    .config("spark.sql.catalog.mycatalog", "com.datastax.spark.connector.datasource.CassandraCatalog") \
    .config("spark.jars", "mongo-spark-connector_2.12-10.1.1.jar") \
    .config("spark.mongodb.write.connection.uri", os.environ["MONGO_URI"]) \
    .getOrCreate()
    
    





25/10/23 01:53:19 WARN Utils: Your hostname, Mobashras-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.20.3.77 instead (on interface en0)
25/10/23 01:53:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/mobashraabeer/.ivy2/cache
The jars for the packages stored in: /Users/mobashraabeer/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-964381a0-e628-4787-9c81-00fab1ef201b;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;10.1.1 in central


:: loading settings :: url = jar:file:/Users/mobashraabeer/miniconda3/envs/D2D_env/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.mongodb#mongodb-driver-sync;4.8.2 in central
	[4.8.2] org.mongodb#mongodb-driver-sync;[4.8.1,4.8.99)
	found org.mongodb#bson;4.8.2 in central
	found org.mongodb#mongodb-driver-core;4.8.2 in central
	found org.mongodb#bson-record-codec;4.8.2 in central
:: resolution report :: resolve 743ms :: artifacts dl 4ms
	:: modules in use:
	org.mongodb#bson;4.8.2 from central in [default]
	org.mongodb#bson-record-codec;4.8.2 from central in [default]
	org.mongodb#mongodb-driver-core;4.8.2 from central in [default]
	org.mongodb#mongodb-driver-sync;4.8.2 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;10.1.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   5   |   1   |   0   |   0   |

In [3]:
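def fetch_elhub_production_data(start_date: str, end_date: str) -> List[Dict]:
    """Fetch hourly production records per production group from the Elhub API.

    Both dates are ISO 8601 strings with a UTC offset. Returns a flat list of
    records across all price areas, or an empty list if the request fails.
    """
    base_url = "https://api.elhub.no/energy-data/v0/price-areas"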

    params = {
        'dataset': 'PRODUCTION_PER_GROUP_MBA_HOUR',
        'startDate': start_date,
        'endDate': end_date
    }

    try:
        response = requests.get(base_url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()

        all_production_records = []

        if 'data' in data:
            for price_area_data in data['data']:
                if 'attributes' in price_area_data and 'productionPerGroupMbaHour' in price_area_data['attributes']:
                    production_records = price_area_data['attributes']['productionPerGroupMbaHour']
                    all_production_records.extend(production_records)

        if not all_production_records:
            print(f"Warning: No production data found for {start_date} to {end_date}")

        return all_production_records

    except requests.exceptions.RequestException as e:
        print(f"Error fetching data for {start_date} to {end_date}: {e}")
        return []


def fetch_full_year_2021() -> pd.DataFrame:
    """Fetch 2021 production data one month at a time and return a single DataFrame."""
    all_records = []

    for month in range(1, 13):
        month_start = datetime(2021, month, 1, 0, 0, 0)

        if month == 12:
            month_end = datetime(2022, 1, 1, 0, 0, 0)
        else:
            month_end = datetime(2021, month + 1, 1, 0, 0, 0)

        start_str = month_start.strftime('%Y-%m-%dT%H:%M:%S+01:00')
        end_str = month_end.strftime('%Y-%m-%dT%H:%M:%S+01:00')

        print(f"Fetching data for {month_start.strftime('%B %Y')}...")

        records = fetch_elhub_production_data(start_str, end_str)
        all_records.extend(records)

        print(f"  Retrieved {len(records)} records")

        time.sleep(0.5)

    df = pd.DataFrame(all_records)

    print(f"\nTotal records retrieved: {len(df)}")

    if not df.empty:
        if 'startTime' in df.columns:
            df['startTime'] = pd.to_datetime(df['startTime'], utc=True).dt.tz_convert("Europe/Oslo")
        if 'endTime' in df.columns:
            df['endTime'] = pd.to_datetime(df['endTime'], utc=True).dt.tz_convert("Europe/Oslo")
        if 'lastUpdatedTime' in df.columns:
            df['lastUpdatedTime'] = pd.to_datetime(df['lastUpdatedTime'], utc=True).dt.tz_convert("Europe/Oslo")

    return df


if __name__ == "__main__":
    print("Fetching Elhub production data for all of 2021...")
    print("=" * 60)

    df = fetch_full_year_2021()
    if not df.empty:
        print("\n" + "=" * 60)
        print("Data retrieval complete!")
        print(f"Shape: {df.shape}")
        print(f"\nColumns: {list(df.columns)}")
        print(f"\nFirst few records:")
        print(df.head())
        print(f"\nData types:")
        print(df.dtypes)
        print(f"\nPrice areas: {df['priceArea'].unique()}")
        print(f"Production groups: {df['productionGroup'].unique()}")

    else:
        print("\nNo data retrieved.")


Fetching Elhub production data for all of 2021...
Fetching data for January 2021...
  Retrieved 17856 records
Fetching data for February 2021...
  Retrieved 16128 records
Fetching data for March 2021...
  Retrieved 17832 records
Fetching data for April 2021...
  Retrieved 17280 records
Fetching data for May 2021...
  Retrieved 17856 records
Fetching data for June 2021...
  Retrieved 17976 records
Fetching data for July 2021...
  Retrieved 18600 records
Fetching data for August 2021...
  Retrieved 18600 records
Fetching data for September 2021...
  Retrieved 18000 records
Fetching data for October 2021...
  Retrieved 18625 records
Fetching data for November 2021...
  Retrieved 18000 records
Fetching data for December 2021...
  Retrieved 18600 records

Total records retrieved: 215353

Data retrieval complete!
Shape: (215353, 6)

Columns: ['endTime', 'lastUpdatedTime', 'priceArea', 'productionGroup', 'quantityKwh', 'startTime']

First few records:
                    endTime           las
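
The month-by-month request windows used by `fetch_full_year_2021` can be reproduced in isolation, which makes it easy to check that they cover the year without gaps. The sketch below mirrors the date arithmetic in the function; the helper name `month_windows` is introduced here for illustration.

```python
from datetime import datetime

OFFSET_FORMAT = "%Y-%m-%dT%H:%M:%S+01:00"  # same fixed offset as the fetch code


def month_windows(year):
    """Return 12 (start, end) timestamp strings, one pair per calendar month."""
    windows = []
    for m in range(1, 13):
        start = datetime(year, m, 1)
        # December rolls over into January of the following year.
        end = datetime(year + 1, 1, 1) if m == 12 else datetime(year, m + 1, 1)
        windows.append((start.strftime(OFFSET_FORMAT), end.strftime(OFFSET_FORMAT)))
    return windows


windows = month_windows(2021)
for start, end in windows[:2]:
    print(start, "->", end)
```

Because the offset is fixed at `+01:00`, windows that fall in summer time (`+02:00` in Norway) begin at 01:00 local time rather than midnight. Each window still ends exactly where the next one starts, so the yearly total is unaffected, although the per-month record counts are shifted by an hour during those months.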

In [4]:
# Convert to Spark DataFrame
spark_df = spark.createDataFrame(df)

# Show schema and few rows to check
spark_df.printSchema()
spark_df.show(5)

root
 |-- endTime: timestamp (nullable = true)
 |-- lastUpdatedTime: timestamp (nullable = true)
 |-- priceArea: string (nullable = true)
 |-- productionGroup: string (nullable = true)
 |-- quantityKwh: double (nullable = true)
 |-- startTime: timestamp (nullable = true)



25/10/23 01:54:09 WARN TaskSetManager: Stage 0 contains a task of very large size (1392 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-------------------+-------------------+---------+---------------+-----------+-------------------+
|            endTime|    lastUpdatedTime|priceArea|productionGroup|quantityKwh|          startTime|
+-------------------+-------------------+---------+---------------+-----------+-------------------+
|2021-01-01 01:00:00|2024-12-20 10:35:40|      NO1|          hydro|  2507716.8|2021-01-01 00:00:00|
|2021-01-01 02:00:00|2024-12-20 10:35:40|      NO1|          hydro|  2494728.0|2021-01-01 01:00:00|
|2021-01-01 03:00:00|2024-12-20 10:35:40|      NO1|          hydro|  2486777.5|2021-01-01 02:00:00|
|2021-01-01 04:00:00|2024-12-20 10:35:40|      NO1|          hydro|  2461176.0|2021-01-01 03:00:00|
|2021-01-01 05:00:00|2024-12-20 10:35:40|      NO1|          hydro|  2466969.2|2021-01-01 04:00:00|
+-------------------+-------------------+---------+---------------+-----------+-------------------+
only showing top 5 rows



In [5]:
print(spark_df.columns)


['endTime', 'lastUpdatedTime', 'priceArea', 'productionGroup', 'quantityKwh', 'startTime']


In [6]:
spark_df = spark_df \
    .withColumnRenamed("priceArea", "price_area") \
    .withColumnRenamed("productionGroup", "production_group") \
    .withColumnRenamed("startTime", "start_time") \
    .withColumnRenamed("endTime", "end_time") \
    .withColumnRenamed("lastUpdatedTime", "last_updated_time") \
    .withColumnRenamed("quantityKwh", "value")
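
The renames above follow a camelCase to snake_case pattern, with one exception: `quantityKwh` becomes `value`. As a hedged sketch, the mapping could be derived rather than typed out; the helper `camel_to_snake` and the `overrides` dictionary are names introduced here, not part of the notebook.

```python
import re


def camel_to_snake(name):
    """Insert '_' before each capital that follows a lowercase letter or digit."""
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name).lower()


columns = ["endTime", "lastUpdatedTime", "priceArea",
           "productionGroup", "quantityKwh", "startTime"]

# Explicit exceptions to the mechanical conversion.
overrides = {"quantityKwh": "value"}

rename_map = {c: overrides.get(c, camel_to_snake(c)) for c in columns}
print(rename_map)

# With a Spark DataFrame, the map could then be applied in a loop:
# for old, new in rename_map.items():
#     spark_df = spark_df.withColumnRenamed(old, new)
```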


In [7]:
spark_df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .option("keyspace", "energy_data") \
    .option("table", "production_2021") \
    .save()
print("Data inserted into Cassandra!")

Py4JJavaError: An error occurred while calling o73.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: org.apache.spark.sql.cassandra. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more
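
The `DATA_SOURCE_NOT_FOUND` error above means the Cassandra connector classes were not on the Spark classpath when `save()` ran. The Ivy log printed at session start lists only the MongoDB connector as a resolved dependency, which is consistent with that. One possible cause (an assumption, not confirmed by the notebook) is that `getOrCreate()` returned a session created earlier in the same kernel, so the later `spark.jars.packages` setting never took effect. The sketch below shows how both connectors could be requested together, since `spark.jars.packages` accepts a comma-separated list of Maven coordinates.

```python
# Maven coordinates taken from the session config and the Ivy log in this notebook.
CASSANDRA_CONNECTOR = "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1"
MONGODB_CONNECTOR = "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1"

packages = ",".join([CASSANDRA_CONNECTOR, MONGODB_CONNECTOR])
print(packages)

# In a freshly restarted kernel, the value would be passed to the builder:
# spark = (
#     SparkSession.builder
#     .appName("SparkCassandraWriteApp")
#     .config("spark.jars.packages", packages)
#     .getOrCreate()
# )
```

Whether these connector versions match the installed Spark and Scala versions still needs to be checked separately.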


In [None]:
price_area = "NO1"  # Replace as needed

df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .option("keyspace", "energy_data") \
    .option("table", "production_2021") \
    .load() \
    .select("price_area", "production_group", "start_time", "value") \
    .filter(f"price_area = '{price_area}'")

df.show(5)


In [None]:


agg_df = df.groupBy("production_group").agg(spark_sum("value").alias("total_quantity"))
agg_pd = agg_df.toPandas()


In [None]:


plt.figure(figsize=(8, 8))
plt.pie(agg_pd['total_quantity'], labels=agg_pd['production_group'], autopct='%1.1f%%', startangle=140)
plt.title(f'Total Production in {price_area} (Year)')
plt.show()


In [None]:


jan_df = df.filter(month("start_time") == 1)


In [None]:
jan_pd = jan_df.toPandas()


In [None]:


plt.figure(figsize=(12, 6))
sns.lineplot(data=jan_pd, x='start_time', y='value', hue='production_group')
plt.title(f'Production in {price_area} - January')
plt.xlabel('Date')
plt.ylabel('Quantity (kWh)')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [None]:
# #load_dotenv()

# # Read secrets from environment
# username = os.getenv("MONGO_USER")
# password = os.getenv("MONGO_PASS")
# cluster = os.getenv("MONGO_CLUSTER")



# uri = f"mongodb+srv://{username}:{password}@{cluster}.nj1bpxy.mongodb.net/"

# client = MongoClient(uri)

# # Step 1: Convert the Spark DataFrame to pandas
# pandas_df = spark_df.toPandas()

# # Step 2: Select the target collection
# collection = client['energy_data']['production_data']

# # Step 3: Insert the data as a list of JSON-style records
# data_dict = json.loads(pandas_df.to_json(orient='records'))
# collection.insert_many(data_dict)

# print("Data successfully inserted into MongoDB!")




ConfigurationError: Invalid SRV host: ac-bix83oj-shard-00-00.nj1bpxy.mongodb.net

In [9]:
# Set the write configuration
write_config = {
    "uri": os.environ["MONGO_URI"],  # assumed env var; keeps credentials out of the notebook
    "writeConcern.w": "majority"
}


In [10]:
# Write directly to MongoDB
spark_df.write \
    .format("mongo") \
    .mode("append") \
    .options(**write_config) \
    .save()


Py4JJavaError: An error occurred while calling o80.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongo. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: mongo.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more


In [None]:
# MONGO_URI is an assumed environment variable holding the connection string
spark_df.write \
    .format("mongo") \
    .mode("append") \
    .option("uri", os.environ["MONGO_URI"]) \
    .save()


In [11]:
spark_df.write.format("mongo").mode("overwrite").save()


Py4JJavaError: An error occurred while calling o85.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongo. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: mongo.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more
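
The `Failed to find the data source: mongo` errors above are consistent with the connector version in use. To my understanding, MongoDB Spark Connector 10.x registers the short name `mongodb` rather than `mongo`, and its write options are named `connection.uri`, `database`, and `collection`. The sketch below only builds those options; the helper name, the `MONGO_URI` environment variable, and the localhost fallback are assumptions for illustration, and the option names should be checked against the connector documentation for the installed version.

```python
import os


def mongo_write_options(uri, database, collection):
    """Build write options using the 10.x-style option names."""
    return {
        "connection.uri": uri,
        "database": database,
        "collection": collection,
    }


# Assumed environment variable; the fallback is a local placeholder only.
uri = os.environ.get("MONGO_URI", "mongodb://localhost:27017")
opts = mongo_write_options(uri, "energy_data", "production_data")
print(sorted(opts))

# With a session that has the connector loaded, the write would look like:
# spark_df.write.format("mongodb").mode("append").options(**opts).save()
```

Passing the database and collection as separate options keeps them out of the connection string, so the same URI can be reused for other collections.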


In [None]:
import pyspark
print(pyspark.__version__)
