# Harnessing Weather Insights for Accurate Energy Load Forecasting

In [1]:
%pip install -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


### Import important libraries

In [2]:
import platform
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pathlib import Path

In [3]:
# Initialize via the full spark path
if platform.system() == 'Windows':
    print("Windows OS detected")
    findspark.init("C:/Spark/spark-3.5.4-bin-hadoop3") # For my local machine
else:
    findspark.init("/usr/local/spark/")

Windows OS detected


In [4]:
# Build the SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Linear Regression Model") \
    .config("spark.executor.memory", "16g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.15.0") \
    .config("spark.executor.heartbeatInterval", "100s") \
    .config("spark.sql.session.timeZone", "UTC") \
    .getOrCreate()

   
# Main entry point for Spark functionality. A SparkContext represents the
# connection to a Spark cluster, and can be used to create :class:`RDD` and
# broadcast variables on that cluster.      
sc = spark.sparkContext

### Preprocessing I: Read in Weather and Load Data

In [5]:
# Read in the data

# Folder Structure
# data
# |-- geosphere
# |   |-- YYYY
# |      |-- MM.csv
# |      |-- MM.csv
# |
# |-- transparency
# |   |-- YYYY
# |      |-- MM.xml
# |      |-- MM.xml

# Loop through the geosphere folder and read in the data

# Define the base data folder
base_path = Path("./data/geosphere")

# Collect all data frames first to optimize the union operation
dfs = []

for year_folder in base_path.iterdir():
    if year_folder.is_dir():
        for month_file in year_folder.glob("*.csv"):
            print(f"Reading in {month_file}")

            df = spark.read.csv(str(month_file), header=True, inferSchema=True)

            # Convert the time column (string) to a timestamp
            df = df.withColumn("time", to_timestamp(col("time"), "yyyy-MM-dd'T'HH:mmXXX"))
            
            dfs.append(df)
            
            print(f"Read in {df.count()} rows")

if dfs:
    # Combine all DataFrames
    weather = dfs[0]
    for df in dfs[1:]:
        weather = weather.union(df)

    # Aggregate measurements (average from different stations)
    weather = (
        weather.groupBy("time")
        .agg(
            avg("rr").alias("avg_rr"),
            avg("tl").alias("avg_tl"),
            avg("p").alias("avg_p"),
            avg("so_h").alias("avg_so_h"),
            avg("ff").alias("avg_ff"),
        )
        .orderBy("time")
    )

    # Log-transform the measurements (data for rainfall and windspeed is skewed)
    weather = weather.withColumn("log_rr", log1p(col("avg_rr")))
    weather = weather.withColumn("log_ff", log1p(col("avg_ff")))
    
    weather.drop("avg_rr", "avg_ff")
    
    weather.show(10, truncate=False)
    
    print(weather.count())
    weather.printSchema()
else:
    print("No data found")


Reading in data\geosphere\2022\01.csv
Read in 8940 rows
Reading in data\geosphere\2022\02.csv
Read in 8076 rows
Reading in data\geosphere\2022\03.csv
Read in 8940 rows
Reading in data\geosphere\2022\04.csv
Read in 8652 rows
Reading in data\geosphere\2022\05.csv
Read in 8940 rows
Reading in data\geosphere\2022\06.csv
Read in 8652 rows
Reading in data\geosphere\2022\07.csv
Read in 8940 rows
Reading in data\geosphere\2022\08.csv
Read in 8940 rows
Reading in data\geosphere\2022\09.csv
Read in 8652 rows
Reading in data\geosphere\2022\10.csv
Read in 8940 rows
Reading in data\geosphere\2022\11.csv
Read in 8652 rows
Reading in data\geosphere\2022\12.csv
Read in 8940 rows
Reading in data\geosphere\2023\01.csv
Read in 8940 rows
Reading in data\geosphere\2023\02.csv
Read in 8076 rows
Reading in data\geosphere\2023\03.csv
Read in 8940 rows
Reading in data\geosphere\2023\04.csv
Read in 8652 rows
Reading in data\geosphere\2023\05.csv
Read in 8940 rows
Reading in data\geosphere\2023\06.csv
Read in 86

In [6]:
# Loop through the transparency folder and read in the energy data

# Define base path for transparency data
base_path = Path("./data/transparency")

# Collect DataFrames before performing union (optimization)
dfs = []

for year_folder in base_path.iterdir():
    if year_folder.is_dir():
        for month_file in year_folder.glob("*.xml"):
            print(f"Reading transparency data: {month_file}")

            # Read XML data
            df = spark.read.format('xml').option("rowTag", "GL_MarketDocument").load(str(month_file))

            # Extract and explode relevant fields
            df_filtered = df.select(
                col("TimeSeries.Period.timeInterval.start").alias("start_time"),
                col("TimeSeries.Period.timeInterval.end").alias("end_time"),
                col("TimeSeries.Period.resolution").alias("resolution"),
                explode(col("TimeSeries.Period.Point")).alias("Point")  # Flatten Points
            ).select(
                col("start_time"),
                col("end_time"),
                col("resolution"),
                col("Point.position").cast("int").alias("position"),
                col("Point.quantity").cast("double").alias("quantity")
            )

            # Convert ISO 8601 duration (e.g., "PT15M") to minutes dynamically
            df_fixed = df_filtered.withColumn(
                "interval_minutes",
                expr("CAST(SUBSTRING(resolution, 3, LENGTH(resolution) - 3) AS INT)")  # Extracts "15" from "PT15M"
            ).withColumn(
                "actual_time",
                expr("start_time + (position - 1) * interval_minutes * interval 1 minute")
            ).select(
                col("actual_time"),
                col("quantity")
            )
            
            # Aggregate to hourly intervals
            df_hourly = df_fixed.withColumn(
                "hourly_time", date_trunc("hour", col("actual_time"))  # Round down to the hour
            ).groupBy("hourly_time").agg(
                sum("quantity").alias("hourly_quantity")  # Sum all sub-hourly measurements
            )

            # Rename to match the structure of other time series
            df_hourly = df_hourly.select(
                col("hourly_time").alias("timestamp"), col("hourly_quantity").alias("quantity")
            ).orderBy("timestamp")
            
            print(f"Read in {df_hourly.count()} values")
            
            # Append DataFrame to list
            dfs.append(df_hourly)

# Merge all collected DataFrames
if dfs:
    Load = dfs[0]
    for df in dfs[1:]:
        Load = Load.union(df)

    Load.show(10)
    Load.printSchema()
else:
    print("No data found.")



Reading transparency data: data\transparency\2022\01.xml
Read in 744 values
Reading transparency data: data\transparency\2022\02.xml
Read in 672 values
Reading transparency data: data\transparency\2022\03.xml
Read in 744 values
Reading transparency data: data\transparency\2022\04.xml
Read in 720 values
Reading transparency data: data\transparency\2022\05.xml
Read in 744 values
Reading transparency data: data\transparency\2022\06.xml
Read in 720 values
Reading transparency data: data\transparency\2022\07.xml
Read in 744 values
Reading transparency data: data\transparency\2022\08.xml
Read in 744 values
Reading transparency data: data\transparency\2022\09.xml
Read in 720 values
Reading transparency data: data\transparency\2022\10.xml
Read in 744 values
Reading transparency data: data\transparency\2022\11.xml
Read in 720 values
Reading transparency data: data\transparency\2022\12.xml
Read in 744 values
Reading transparency data: data\transparency\2023\01.xml
Read in 744 values
Reading tran

### Preprocessing II: Combine both Data Frames

In [20]:
if Load is not None and weather is not None:
    # Join the data into a single DataFrame
    data = Load.join(weather, Load.timestamp == weather.time, "inner").drop("time")
    
    # Rename columns for better understanding
    data = data.withColumnRenamed("timestamp", "time")
    data = data.withColumnRenamed("quantity", "load")

    data = data.withColumnRenamed("log_rr", "rainfall")
    data = data.withColumnRenamed("avg_tl", "temperature")
    data = data.withColumnRenamed("avg_p", "pressure")
    data = data.withColumnRenamed("avg_so_h", "sunshine_duration")
    data = data.withColumnRenamed("log_ff", "wind_speed")
    
    # Add Lag features
    
    # Define window for lagging (ordered by time)
    window_spec = Window().partitionBy().orderBy("time")

    # Add lag features (previous hour's values)
    data = data.withColumn("rainfall_lag_1", lag("rainfall", 1).over(window_spec))
    data = data.withColumn("wind_speed_lag_1", lag("wind_speed", 1).over(window_spec))
    
    # Add lag features (previous day's values)
    data = data.withColumn("load_lag_24", lag("load", 24).over(window_spec))
    
    # Drop rows with missing values
    data = data.dropna()
    
    # Reorder the columns
    data = data.select("time",
                        "load", 
                        "load_lag_24",
                        "rainfall", 
                        "rainfall_lag_1",
                        "temperature", 
                        "pressure", 
                        "sunshine_duration", 
                        "wind_speed", 
                        "wind_speed_lag_1")
    
    # Print the schema and stats
    data.describe().show()
    data.show(10)
    data.printSchema()

+-------+------------------+------------------+-------------------+-------------------+------------------+-----------------+-------------------+------------------+-------------------+
|summary|              load|       load_lag_24|           rainfall|     rainfall_lag_1|       temperature|         pressure|  sunshine_duration|        wind_speed|   wind_speed_lag_1|
+-------+------------------+------------------+-------------------+-------------------+------------------+-----------------+-------------------+------------------+-------------------+
|  count|             26280|             26280|              26280|              26280|             26280|            26280|              26280|             26280|              26280|
|   mean|27150.157420091324|27146.609741248096|0.08296694568704333|0.08296694568704333|11.623370716295403|971.6488151146115|0.21699808588165032|1.0623818055673144| 1.0623803538258394|
| stddev| 5199.763342043908| 5200.060296507467|0.18603561609247393| 0.1860356160

### Machine Learning

In [31]:
# Keep original timestamp before converting to Unix seconds
unix_time_data = data.withColumn("unix_time", unix_timestamp("time"))  # Store Unix time separately

# Extract day of the year (1-365/366) from the original timestamp column
unix_time_data = unix_time_data.withColumn("day_of_year", dayofyear(col("time")))

# Extract time of day in seconds (seconds since midnight)
unix_time_data = unix_time_data.withColumn(
    "time_of_day", expr("hour(time) * 3600 + minute(time) * 60 + second(time)")
)

# Normalize day of the year to range (0, 2π) for periodic encoding
ml_data = unix_time_data.withColumn("day_sin", sin(2 * 3.1416 * col("day_of_year") / 365))
ml_data = ml_data.withColumn("day_cos", cos(2 * 3.1416 * col("day_of_year") / 365))

# Normalize time of day
ml_data = ml_data.withColumn("time_sin", sin(2 * 3.1416 * col("time_of_day") / 86400))
ml_data = ml_data.withColumn("time_cos", cos(2 * 3.1416 * col("time_of_day") / 86400))

# Define new feature columns
feature_columns = ["day_sin", "day_cos", "time_sin", "time_cos", "is_weekend", "rainfall", "temperature", "pressure", "sunshine_duration", "wind_speed"]

feature_columns = [ "load_lag_24",
                    "rainfall", 
                    "rainfall_lag_1",
                    "temperature", 
                    "pressure", 
                    "sunshine_duration", 
                    "wind_speed", 
                    "wind_speed_lag_1",
                    "day_sin",
                    "day_cos",
                    "time_sin",
                    "time_cos"
                    ]


assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transform data
df_ml = assembler.transform(ml_data).select("load", "features")

df_ml.cache()

# Show transformed data
df_ml.show(5, truncate=False)



+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|load   |features                                                                                                                                                                       |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|21458.0|[22394.0,0.0,0.0,2.972727272727272,980.6909090909091,0.0,0.7548407495652851,0.7548407495652851,0.034421692083641306,0.9994073979684656,0.0,1.0]                                |
|21151.0|[21804.0,0.0,0.0,2.5090909090909093,980.2636363636365,0.0,0.6370577139089018,0.7548407495652851,0.034421692083641306,0.9994073979684656,0.25881963644308476,0.9659256678396477]|
|20691.0|[20917.0,0.0,0.0,1.9909090909090912,979.8272727272725,0.0,0.5

In [32]:
scaler = StandardScaler(inputCol="features", outputCol="features_scaled")
scaled_df = scaler.fit(df_ml).transform(df_ml).select("load", "features_scaled")
scaled_df = scaled_df.withColumnRenamed("features_scaled", "features")
scaled_df.cache()

scaled_df.show(5, truncate=False)

+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|load   |features                                                                                                                                                                                 |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|21458.0|[4.306488525727395,0.0,0.0,0.3589592808443017,124.47491573632132,0.0,2.5544669470788173,2.5544546956943335,0.04867875449108109,1.4133469539006789,0.0,1.4141850399325533]                |
|21151.0|[4.193028302891851,0.0,0.0,0.30297480585023634,124.42068383080381,0.0,2.1558757585610926,2.5544546956943335,0.04867875449108109,1.4133469539006789,0.3660196941658062,1.3659976291456903]|
|20691.0|[4.02245335

In [33]:
# Randomly splits this :class:`DataFrame` with the provided weights.
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

train_data.take(5)

[Row(load=16055.0, features=DenseVector([3.438, 0.0, 0.0, 1.483, 123.2208, 0.0, 1.9513, 1.9513, 0.3257, -1.3762, 0.7071, 1.2247])),
 Row(load=16158.0, features=DenseVector([3.1484, 0.0, 0.0, 1.4732, 123.3464, 0.0, 2.0451, 1.8549, 0.7711, -1.1855, 0.366, 1.366])),
 Row(load=16168.0, features=DenseVector([3.2938, 0.0486, 0.192, 2.2778, 122.9414, 0.0, 3.7959, 3.9783, -0.6663, -1.2474, 0.366, 1.366])),
 Row(load=16209.0, features=DenseVector([3.449, 0.0, 0.0, 1.5588, 123.2157, 0.0, 1.9513, 2.1542, 0.3257, -1.3762, 0.366, 1.366])),
 Row(load=16216.0, features=DenseVector([3.1513, 0.0, 0.0, 1.4051, 123.3464, 0.0, 2.4621, 2.0451, 0.7711, -1.1855, 0.7071, 1.2247]))]

In [37]:
# Linear regression.
# The learning objective is to minimize the specified loss function, with regularization.
#More about regParam and elasticNetParam (seen as penalties on the loss function that minimises error in predicitons) can be found here: https://runawayhorse001.github.io/LearningApacheSpark/reg.html
# The maxIter parameter sets an upper limit on the number of iterations the optimization algorithm can perform regarding the loss function
# By default the LinearRegression function expects another column named "features"

lr = LinearRegression(labelCol="load", maxIter=500)

# Fits a model to the input dataset with optional parameters.
linearModel = lr.fit(train_data)

# Get the summary of the model
trainingSummary = linearModel.summary

# Print the Score of the model
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 2851.256628
r2: 0.699920


In [None]:
# Define the model
lr = LinearRegression(featuresCol="features", labelCol="load")

# Define hyperparameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.001, 0.01, 0.1, 1.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  # 0 = Ridge, 1 = Lasso
             .build())

# Define evaluator
evaluator = RegressionEvaluator(labelCol="load", predictionCol="prediction", metricName="rmse")

# Define cross-validation
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Fit the model with cross-validation
cv_model = cv.fit(train_data)

# Get the best model
best_model = cv_model.bestModel
print(f"Best regParam: {best_model._java_obj.getRegParam()}")
print(f"Best elasticNetParam: {best_model._java_obj.getElasticNetParam()}")

# Make predictions
predictions = best_model.transform(test_data)

# Evaluate performance
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

# Show predictions
predictions.select("load", "prediction").show(10, truncate=False)

r2 = evaluator.setMetricName("r2").evaluate(predictions)
print(f"R2: {r2}")

In [36]:
import pandas as pd

# Get coefficients
coefficients = best_model.coefficients.toArray()

# Create a DataFrame for feature importance
coef_df = pd.DataFrame({"Feature": feature_columns, "Coefficient": coefficients})

# Sort by absolute coefficient values (fixing the issue)
coef_df = coef_df.reindex(coef_df["Coefficient"].abs().sort_values(ascending=False).index)

# Display feature importance
print("\nFeature Importance (Lasso Regression Coefficients):")
print(coef_df)

# Identify removed features (Lasso sets them to exactly 0)
removed_features = coef_df[coef_df["Coefficient"] == 0]["Feature"].tolist()
print(f"\nFeatures removed by Lasso: {removed_features}")




Feature Importance (Lasso Regression Coefficients):
              Feature  Coefficient
0         load_lag_24  3020.223594
11           time_cos -1746.120104
3         temperature  -635.724839
5   sunshine_duration  -615.834222
9             day_cos   321.831150
1            rainfall   119.955653
7    wind_speed_lag_1   -79.763799
10           time_sin   -79.315728
8             day_sin    60.714177
6          wind_speed   -52.349136
2      rainfall_lag_1    43.061944
4            pressure    26.586304

Features removed by Lasso: []


In [14]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Define the Random Forest model
rf = RandomForestRegressor(featuresCol="features", labelCol="load", numTrees=100, maxDepth=10, seed=42)

# Train the Random Forest model
rf_model = rf.fit(train_data)

# Make predictions
rf_predictions = rf_model.transform(test_data)

# Evaluate performance
evaluator = RegressionEvaluator(labelCol="load", predictionCol="prediction", metricName="rmse")
rf_rmse = evaluator.evaluate(rf_predictions)

print(f"Random Forest RMSE: {rf_rmse}")

# Show predictions
rf_predictions.select("load", "prediction").show(10, truncate=False)

# Feature importance
rf_feature_importance = rf_model.featureImportances
print("\nFeature Importances (Random Forest):")
for i, imp in enumerate(rf_feature_importance):
    print(f"{feature_columns[i]}: {imp}")


Random Forest RMSE: 2655.331627131015
+-------+------------------+
|load   |prediction        |
+-------+------------------+
|16120.0|19756.220040786684|
|16170.0|19363.87555976215 |
|16555.0|19883.084436726895|
|16602.0|19352.431212810618|
|16603.0|19653.89031174189 |
|16634.0|19261.425831555174|
|16643.0|19476.182713137518|
|16746.0|19316.405360232406|
|16833.0|19518.02307237193 |
|16859.0|19405.34351009589 |
+-------+------------------+
only showing top 10 rows


Feature Importances (Random Forest):
load_lag_24: 0.6383276555146126
rainfall: 0.011059557408270014
rainfall_lag_1: 0.010013318455556208
temperature: 0.1612212502922963
pressure: 0.03209565387580752
sunshine_duration: 0.10765030750084678
wind_speed: 0.021534098553229224
wind_speed_lag_1: 0.01809815839938156


In [17]:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.regression import GeneralizedLinearRegression

# Define the Polynomial Expansion (degree = 2 for quadratic features)
poly_expansion = PolynomialExpansion(degree=2, inputCol="features", outputCol="poly_features")

# Apply transformation
poly_data = poly_expansion.transform(train_data)

# Define the Generalized Linear Model (GLM)
glm = GeneralizedLinearRegression(featuresCol="poly_features", labelCol="load", family="gaussian", link="identity")

# Train the model
glm_model = glm.fit(poly_data)

# Make predictions
poly_predictions = glm_model.transform(poly_expansion.transform(test_data))

# Evaluate performance
evaluator = RegressionEvaluator(labelCol="load", predictionCol="prediction", metricName="rmse")
poly_rmse = evaluator.evaluate(poly_predictions)

print(f"Polynomial Regression RMSE: {poly_rmse}")

# Show predictions
poly_predictions.select("load", "prediction").show(10, truncate=False)


Polynomial Regression RMSE: 2930.561547709331
+-------+------------------+
|load   |prediction        |
+-------+------------------+
|16120.0|19007.793203885783|
|16170.0|19134.1688077703  |
|16555.0|19794.277979894483|
|16602.0|18408.548487121356|
|16603.0|20398.044320557383|
|16634.0|18177.71089276485 |
|16643.0|19466.786875656457|
|16746.0|18337.63203177735 |
|16833.0|19769.41339639749 |
|16859.0|19106.392148101062|
+-------+------------------+
only showing top 10 rows

