# 1. Introduction and Setup

## Time Series, using PySpark
PJM Interconnection LLC (PJM) is a regional transmission organization (RTO) in the United States. It is part of the Eastern Interconnection grid operating an electric transmission system serving all or parts of Delaware, Illinois, Indiana, Kentucky, Maryland, Michigan, New Jersey, North Carolina, Ohio, Pennsylvania, Tennessee, Virginia, West Virginia, and the District of Columbia.

The hourly power consumption data comes from PJM's website and are in megawatts (MW).

## Library Imports and Configuration 

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp

In [16]:
spark = SparkSession.builder.appName("TimeSeriesForecast").getOrCreate()

## Loading Data

In [43]:
data_path = "./data/PJME_hourly.csv"

df = spark.read\
    .option("header", True)\
    .option("inferSchema", True)\
    .csv(data_path)

df.printSchema()

root
 |-- Datetime: timestamp (nullable = true)
 |-- PJME_MW: double (nullable = true)



In [45]:
df.describe().show()

+-------+------------------+
|summary|           PJME_MW|
+-------+------------------+
|  count|            145366|
|   mean|32080.222830648156|
| stddev|6464.0121664127355|
|    min|           14544.0|
|    max|           62009.0|
+-------+------------------+



In [None]:
df = df.withColumnRenamed("PJME_MW", "MW")\
       .withColumn("Datetime", to_timestamp(col("Datetime"), "MM/dd/yyyy HH:mm")) \

print("\nSchema after conversions:")
df.printSchema()


df.show(5, truncate=False)


Schema after conversions:
root
 |-- Datetime: timestamp (nullable = true)
 |-- MW: double (nullable = true)

+-------------------+-------+
|Datetime           |MW     |
+-------------------+-------+
|2002-12-31 01:00:00|26498.0|
|2002-12-31 02:00:00|25147.0|
|2002-12-31 03:00:00|24574.0|
|2002-12-31 04:00:00|24393.0|
|2002-12-31 05:00:00|24860.0|
+-------------------+-------+
only showing top 5 rows


In [48]:
df_clean = df.filter(col("MW") > 19000.0)


df_sorted = df_clean.orderBy("Datetime")

print(f"\nOriginal number of rows: {df.count()}")
print(f"Number of rows after outlier removal: {df_sorted.count()}")


df_sorted.show(5, truncate=False)


Original number of rows: 145366
Number of rows after outlier removal: 145351
+-------------------+-------+
|Datetime           |MW     |
+-------------------+-------+
|2002-01-01 01:00:00|30393.0|
|2002-01-01 02:00:00|29265.0|
|2002-01-01 03:00:00|28357.0|
|2002-01-01 04:00:00|27899.0|
|2002-01-01 05:00:00|28057.0|
+-------------------+-------+
only showing top 5 rows


In [20]:
from pyspark.sql.functions import col, hour, dayofweek, month, year, dayofyear, weekofyear, quarter

# Create a new DataFrame with all the required time features
df_features = df_sorted.withColumn("hour", hour(col("Datetime"))) \
    .withColumn("dayofweek", dayofweek(col("Datetime"))) \
    .withColumn("dayofyear", dayofyear(col("Datetime"))) \
    .withColumn("month", month(col("Datetime"))) \
    .withColumn("year", year(col("Datetime"))) \
    .withColumn("quarter", quarter(col("Datetime"))) \
    .withColumn("weekofyear", weekofyear(col("Datetime")))

# Display the new features created
print("DataFrame Schema with new time features:")
df_features.printSchema()

# Show the first few rows to confirm the features were added
df_features.select("Datetime", "MW", "hour", "dayofweek", "month", "year").show(5, truncate=False)

DataFrame Schema with new time features:
root
 |-- Datetime: timestamp (nullable = true)
 |-- MW: double (nullable = true)
 |-- hour: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- dayofyear: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)

+-------------------+-------+----+---------+-----+----+
|Datetime           |MW     |hour|dayofweek|month|year|
+-------------------+-------+----+---------+-----+----+
|2002-01-01 01:00:00|30393.0|1   |3        |1    |2002|
|2002-01-01 02:00:00|29265.0|2   |3        |1    |2002|
|2002-01-01 03:00:00|28357.0|3   |3        |1    |2002|
|2002-01-01 04:00:00|27899.0|4   |3        |1    |2002|
|2002-01-01 05:00:00|28057.0|5   |3        |1    |2002|
+-------------------+-------+----+---------+-----+----+
only showing top 5 rows


In [None]:
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# 1. Define the Window Specification
# This window applies over the entire dataset (no partition)
# and orders the rows strictly by the 'Datetime' column.
# This order is essential for the lag function to work correctly.
window_spec = Window.orderBy("Datetime")

# Define the lag offsets in hours 
LAG_1_YR = 8736
LAG_2_YR = 17472
LAG_3_YR = 26208

# 2. Apply the lag() function using the Window Specification
df_lags = df_features.withColumn("lag1", lag(col("MW"), LAG_1_YR).over(window_spec)) \
                     .withColumn("lag2", lag(col("MW"), LAG_2_YR).over(window_spec)) \
                     .withColumn("lag3", lag(col("MW"), LAG_3_YR).over(window_spec))

# 3. Handle Missing Values (NULLs)
# The first 26208 rows (3 years) will have NULLs in the lag columns.
# We drop these rows because a machine learning model can't train on NULLs.
df_final_features = df_lags.dropna()

print("DataFrame Schema with new Lag features:")
df_final_features.printSchema()

print("\nFirst rows after adding and dropping NULLs (data starts after 3 years):")
# Show the columns to confirm the lag values are present
df_final_features.select("Datetime", "MW", "year", "lag1", "lag2", "lag3").show(5, truncate=False)

DataFrame Schema with new Lag features:
root
 |-- Datetime: timestamp (nullable = true)
 |-- MW: double (nullable = true)
 |-- hour: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- dayofyear: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- lag1: double (nullable = true)
 |-- lag2: double (nullable = true)
 |-- lag3: double (nullable = true)


First rows after adding and dropping NULLs (data starts after 3 years):
+-------------------+-------+----+-------+-------+-------+
|Datetime           |MW     |year|lag1   |lag2   |lag3   |
+-------------------+-------+----+-------+-------+-------+
|2004-12-28 07:00:00|37755.0|2004|24659.0|24574.0|30393.0|
|2004-12-28 08:00:00|39493.0|2004|26076.0|24393.0|29265.0|
|2004-12-28 09:00:00|40070.0|2004|28615.0|24860.0|28357.0|
|2004-12-28 10:00:00|40030.0|2004|30876.0|26222.0|27899.0|
|200

In [None]:
from pyspark.ml.feature import VectorAssembler

# List all the feature columns we want to use for the model.
FEATURE_COLS = [
    "hour", "dayofweek", "dayofyear", "month", "year", "quarter", "weekofyear",
    "lag1", "lag2", "lag3"
]
TARGET_COL = "MW" # Our energy consumption target

# Initialize the VectorAssembler
assembler = VectorAssembler(
    inputCols=FEATURE_COLS,
    outputCol="features"
)

# Apply the assembler transformation to create the features vector column
df_model_ready = assembler.transform(df_final_features)

# Show the resulting DataFrame structure
print("DataFrame Schema after VectorAssembler:")
df_model_ready.printSchema()

# Show the new features column
df_model_ready.select("Datetime", "MW", "features").show(5, truncate=False)

DataFrame Schema after VectorAssembler:
root
 |-- Datetime: timestamp (nullable = true)
 |-- MW: double (nullable = true)
 |-- hour: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- dayofyear: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- lag1: double (nullable = true)
 |-- lag2: double (nullable = true)
 |-- lag3: double (nullable = true)
 |-- features: vector (nullable = true)

+-------------------+-------+-------------------------------------------------------------+
|Datetime           |MW     |features                                                     |
+-------------------+-------+-------------------------------------------------------------+
|2004-12-28 07:00:00|37755.0|[7.0,3.0,363.0,12.0,2004.0,4.0,53.0,24659.0,24574.0,30393.0] |
|2004-12-28 08:00:00|39493.0|[8.0,3.0,363.0,12.0,2004.0,4.0,53.0,26076.0,24393.0,2

In [None]:
from pyspark.sql.functions import to_date

# Define the cutoff date for the split
CUTOFF_DATE = "2015-01-01"

# Convert Datetime to Date for easy comparison 
df_split = df_model_ready.withColumn("Date", to_date(col("Datetime")))

# Create the training set (data before the cutoff date)
train_df = df_split.filter(col("Date") < CUTOFF_DATE)

# Create the testing set
test_df = df_split.filter(col("Date") >= CUTOFF_DATE)

print("-" * 50)
print(f"Train Set Size (before {CUTOFF_DATE}): {train_df.count()} rows")
print(f"Test Set Size (on or after {CUTOFF_DATE}): {test_df.count()} rows")
print("-" * 50)

--------------------------------------------------
Train Set Size (before 2015-01-01): 87703 rows
Test Set Size (on or after 2015-01-01): 31440 rows
--------------------------------------------------


In [35]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# The target column ('MW') and the feature vector column ('features')
# are automatically handled by the regressor.

# Define the Gradient-Boosted Tree Regressor model
# We set parameters similar to a robust XGBoost setup:
gbt_regressor = GBTRegressor(
    featuresCol="features",
    labelCol="MW", # TARGET_COL from the previous step
    maxIter=50,     # Number of trees/iterations (a typical default)
    maxDepth=5      # Depth of each decision tree
)

print("Starting model training with GBTRegressor...")

# Fit the model to the training set (train_df)
model = gbt_regressor.fit(train_df)

print("Model training complete.")

Starting model training with GBTRegressor...
Model training complete.


In [36]:
# Make predictions on the test set
predictions_df = model.transform(test_df)

# Show the actual MW value vs. the predicted MW value
print("\nFirst 5 predictions:")
predictions_df.select("Datetime", "MW", "prediction").show(5, truncate=False)

# Initialize the evaluator to calculate RMSE
evaluator = RegressionEvaluator(
    labelCol="MW",
    predictionCol="prediction",
    metricName="rmse" # Use Root Mean Squared Error
)

# Calculate the final RMSE score on the test set
rmse_score = evaluator.evaluate(predictions_df)

print("-" * 50)
print(f"✅ Final PySpark Model RMSE on Test Set: {rmse_score:,.2f}")
print("-" * 50)


First 5 predictions:
+-------------------+-------+-----------------+
|Datetime           |MW     |prediction       |
+-------------------+-------+-----------------+
|2015-01-01 00:00:00|32802.0|34421.31684998449|
|2015-01-01 01:00:00|31647.0|33213.17155640886|
|2015-01-01 02:00:00|30755.0|33165.12554789061|
|2015-01-01 03:00:00|30189.0|33056.68809190518|
|2015-01-01 04:00:00|29890.0|33056.68809190518|
+-------------------+-------+-----------------+
only showing top 5 rows
--------------------------------------------------
✅ Final PySpark Model RMSE on Test Set: 4,001.13
--------------------------------------------------


In [None]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import to_date, col

# Add a simple Date column for filtering (easier than comparing Timestamps)
df_split = df_model_ready.withColumn("Date", to_date(col("Datetime")))

In [38]:
# Define the GBT Regressor (same as before)
gbt_regressor = GBTRegressor(
    featuresCol="features",
    labelCol="MW",
    maxIter=50,
    maxDepth=5
)

# Define the training and test periods for each fold (e.g., three years for training, one year for testing)
# We will test in 2015, 2016, and 2017.
FOLD_CUTOFFS = [
    # Fold 1: Train up to 2015-01-01, Test 2015
    ("2015-01-01", "2016-01-01"), 
    # Fold 2: Train up to 2016-01-01, Test 2016
    ("2016-01-01", "2017-01-01"), 
    # Fold 3: Train up to 2017-01-01, Test 2017
    ("2017-01-01", "2018-01-01") 
]

rmse_results = []
evaluator = RegressionEvaluator(labelCol="MW", predictionCol="prediction", metricName="rmse")

print("Starting 3-Fold Time Series Cross-Validation...\n")

for i, (train_cutoff, test_cutoff) in enumerate(FOLD_CUTOFFS):
    # 1. TEMPORAL SPLIT
    # Train: Data before the first cutoff (e.g., before 2015-01-01)
    train_fold_df = df_split.filter(col("Date") < train_cutoff)
    # Test: Data between the cutoffs (e.g., between 2015-01-01 and 2016-01-01)
    test_fold_df = df_split.filter((col("Date") >= train_cutoff) & (col("Date") < test_cutoff))

    # Check that both DataFrames have data
    if train_fold_df.count() == 0 or test_fold_df.count() == 0:
        print(f"Skipping Fold {i+1}: Insufficient data for this period.")
        continue

    print(f"--- FOLD {i+1} ---")
    print(f"Train size: {train_fold_df.count()} (up to {train_cutoff})")
    print(f"Test size: {test_fold_df.count()} (from {train_cutoff} to {test_cutoff})")
    
    # 2. TRAINING
    model_fold = gbt_regressor.fit(train_fold_df)
    
    # 3. PREDICTION & EVALUATION
    predictions_fold_df = model_fold.transform(test_fold_df)
    rmse_fold = evaluator.evaluate(predictions_fold_df)
    
    print(f"RMSE for Fold {i+1}: {rmse_fold:,.2f}\n")
    rmse_results.append(rmse_fold)

# Calculate the final average RMSE
if rmse_results:
    avg_rmse = sum(rmse_results) / len(rmse_results)
    
    print("-" * 50)
    print(f"ALL FOLDS RMSEs: {rmse_results}")
    print(f"** ROBUST AVERAGE RMSE: {avg_rmse:,.2f} **")
    print("-" * 50)

Starting 3-Fold Time Series Cross-Validation...

--- FOLD 1 ---
Train size: 87703 (up to 2015-01-01)
Test size: 8760 (from 2015-01-01 to 2016-01-01)
RMSE for Fold 1: 3,713.54

--- FOLD 2 ---
Train size: 96463 (up to 2016-01-01)
Test size: 8784 (from 2016-01-01 to 2017-01-01)
RMSE for Fold 2: 3,967.38

--- FOLD 3 ---
Train size: 105247 (up to 2017-01-01)
Test size: 8760 (from 2017-01-01 to 2018-01-01)
RMSE for Fold 3: 4,247.95

--------------------------------------------------
ALL FOLDS RMSEs: [3713.5388239589797, 3967.3814679357106, 4247.950879227678]
** ROBUST AVERAGE RMSE: 3,976.29 **
--------------------------------------------------
