In [None]:
from typing import Any
from datetime import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import enfobench as efb
from enfobench.datasets import ElectricityDemandDataset, PVGenerationDataset, GasDemandDataset
from enfobench.datasets.utils import create_perfect_forecasts_from_covariates
from enfobench.evaluation.metrics import (
    mean_bias_error,
    mean_absolute_error,
    root_mean_squared_error,
)
from enfobench.evaluation.utils import periods_in_duration, create_forecast_index

## Loading the datasets

In [None]:
edd = ElectricityDemandDataset("../data/electricity-demand")
edd

In [None]:
pvd = PVGenerationDataset("../data/pv-generation")
pvd

In [None]:
gdd = GasDemandDataset("../data/gas-demand")
gdd

In [None]:
print(f"Number of buildings in the {edd.__class__.__name__}: {len(edd.metadata_subset.list_unique_ids())}")
print(f"Number of buildings in the {pvd.__class__.__name__}: {len(pvd.metadata_subset.list_unique_ids())}")
print(f"Number of buildings in the {gdd.__class__.__name__}: {len(gdd.metadata_subset.list_unique_ids())}")

In [None]:
edd.metadata_subset.read().freq.unique()
print(f"Available frequencies in the {edd.__class__.__name__}: {edd.metadata_subset.read().freq.unique().tolist()}")
print(f"Available frequencies in the {pvd.__class__.__name__}: {pvd.metadata_subset.read().freq.unique().tolist()}")
print(f"Available frequencies in the {gdd.__class__.__name__}: {gdd.metadata_subset.read().freq.unique().tolist()}")

## Models
### Naive Seasonal Model

In [None]:
from statsforecast.models import SeasonalNaive, HistoricAverage, SeasonalWindowAverage


class SeasonalNaiveModel:
    def __init__(self, seasonality: str = "1D"):
        self.seasonality = seasonality.upper()

    def forecast(
            self,
            horizon: int,
            history: pd.DataFrame,
            past_covariates: pd.DataFrame | None = None,
            future_covariates: pd.DataFrame | None = None,
            metadata: dict | None = None,
            level: list[int] | None = None,
            **kwargs,
    ) -> pd.DataFrame:
        # Fill missing values
        y = history.y.fillna(history.y.mean())

        # Create model
        periods = periods_in_duration(y.index, duration=self.seasonality)
        model = SeasonalNaive(season_length=periods)

        # Make forecast
        pred = model.forecast(y=y.values, h=horizon, level=level, **kwargs)

        # Create index for forecast
        index = create_forecast_index(history=history, horizon=horizon)

        # Postprocess forecast
        forecast = pd.DataFrame(index=index, data=pred).rename(columns={"mean": "yhat"}).fillna(y.mean())
        return forecast


daily_naive_model = SeasonalNaiveModel(seasonality='1d')
weekly_naive_model = SeasonalNaiveModel(seasonality='7d')

## Benchmarking

In [None]:
def evaluate_model_on_building(
    *,
    dataset: efb.Dataset,
    unique_id: str,
    model: Any,
    cv_folds: int,
    cv_step: pd.Timedelta,
    cv_horizon: pd.Timedelta,
    cv_time: str = time(hour=0, minute=0), 
    initial_training_period: pd.Timedelta = pd.Timedelta("28 days"),
    metrics: dict[str, Any] | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Evaluate model on a single building.
    
    Args:
        dataset: Dataset object.
        unique_id: Unique identifier of the building.
        model: Model object.
        cv_folds: Number of cross-validation folds.
        cv_step: Step size for cross-validation.
        cv_horizon: Forecast horizon for cross-validation.
        cv_time: Time when the forecasts are made, defaults to midnight.
        initial_training_period: Initial available training period at the first prediction.
        metrics: Metrics to evaluate. Defaults to [MBE, MAE, RMSE].
        
    Returns:
        Cross-validation results and metrics.
    """
    # Load data for building
    target, past_covariates, metadata = dataset.get_data_by_unique_id(unique_id)

    # Create perfect forecasts from past covariates
    future_covariates = create_perfect_forecasts_from_covariates(
        past_covariates[
            ['temperature_2m', 'relative_humidity_2m', 'wind_speed_10m', 'wind_direction_10m', 'cloud_cover']],
        start=pd.Timestamp("2013-01-01T00:00:00"),
        horizon=pd.Timedelta("4 days"),
        step=pd.Timedelta("24 hour"),
    )

    # Create dataset
    dataset = efb.Dataset(
        target=target,
        past_covariates=past_covariates,
        future_covariates=future_covariates,
        metadata=metadata,
    )

    # Cross-validate model
    start_date = (dataset.target_available_since + initial_training_period + pd.Timedelta("1 day")).replace(hour=cv_time.hour, minute=cv_time.minute)
    end_date = start_date + cv_folds * cv_step + cv_horizon
    print(f"Training data is available from '{dataset.target_available_since}' until '{dataset.target_available_until}'")
    print(f"Evaluating model from '{start_date}' to '{end_date}', with {cv_folds} folds, '{cv_step}' step, and '{cv_horizon}' horizon.")
    crossval_df = efb.evaluation.cross_validate(
        model,
        dataset,
        start_date=start_date,
        end_date=end_date,
        horizon=cv_horizon,
        step=cv_step,
    )

    # Evaluate metrics
    default_metrics = {
        "MBE": mean_bias_error,
        "MAE": mean_absolute_error,
        "RMSE": root_mean_squared_error,
    }
    metrics = efb.evaluation.evaluate_metrics(
        crossval_df,
        metrics=metrics or default_metrics,
        groupby="cutoff_date",
    )
    return crossval_df, metrics

In [None]:
# Example evaluation
evaluate_model_on_building(
    dataset=edd, 
    unique_id=edd.metadata_subset.list_unique_ids()[0], 
    model=daily_naive_model,
    cv_folds=10,
    cv_step=pd.Timedelta("1 day"),
    cv_horizon=pd.Timedelta("1 day"),
)

In [None]:
def sample_buildings(dataset, n_buildings: int, seed: int = 42) -> list[str]:
    """Sample buildings from dataset."""
    np.random.seed(seed)
    metadata = dataset.metadata_subset.read().dropna()
    return np.random.choice(metadata.unique_id.values, n_buildings)

def benchmark_model(
    dataset, 
    unique_ids: list[str],
    model: Any,
    **kwargs,
) -> pd.DataFrame:
    """Benchmark model on multiple buildings."""    
    results = []
    for unique_id in unique_ids:
        print(f"Benchmarking model on building '{unique_id}'")
        _, metrics = evaluate_model_on_building(dataset=dataset, unique_id=unique_id, model=model, **kwargs)
        metrics['unique_id'] = unique_id
        results.append(metrics)
        
    return pd.concat(results).set_index(['unique_id', 'cutoff_date']).drop(columns='weight')

### Run benchmarking with baseline model

In [None]:
ed_leaderboard = pd.DataFrame()

In [None]:
dataset = edd
buildings = sample_buildings(dataset, n_buildings=5)
cv_folds = 10
cv_step = pd.Timedelta("1 day")
cv_horizon = pd.Timedelta("1 day")
baseline_model = daily_naive_model

In [None]:
baseline_results_df = benchmark_model(dataset, buildings, baseline_model, cv_folds=cv_folds, cv_step=cv_step, cv_horizon=cv_horizon)

In [None]:
model = SeasonalNaiveModel(seasonality='1D')
model_results_df = benchmark_model(dataset, buildings, model, cv_folds=cv_folds, cv_step=cv_step, cv_horizon=cv_horizon)
model_results_df['rMAE'] = (model_results_df / baseline_results_df).loc[:, "MAE"]
ed_leaderboard['DailyNaiveSeasonal'] = model_results_df.mean()

In [None]:
model = SeasonalNaiveModel(seasonality='7D')
model_results_df = benchmark_model(dataset, buildings, model, cv_folds=cv_folds, cv_step=cv_step, cv_horizon=cv_horizon)
model_results_df['rMAE'] = (model_results_df / baseline_results_df).loc[:, "MAE"]
ed_leaderboard['WeeklyNaiveSeasonal'] = model_results_df.mean()

In [None]:
# Placeholder model to benchmark, change this to the model you want to benchmark
class HistoricAverageModel:

    def forecast(
        self,
        horizon: int,
        history: pd.DataFrame,
        past_covariates: pd.DataFrame | None = None,
        future_covariates: pd.DataFrame | None = None,
        metadata: dict | None = None,
        level: list[int] | None = None,
        **kwargs,
    ) -> pd.DataFrame:
        # Fill missing values
        y = history.y.fillna(history.y.mean())

        # Create model
        model = HistoricAverage()

        # Make forecast
        pred = model.forecast(y=y.values, h=horizon, level=level, **kwargs)

        # Create index for forecast
        index = create_forecast_index(history=history, horizon=horizon)

        # Postprocess forecast
        forecast = pd.DataFrame(index=index, data=pred).rename(columns={"mean": "yhat"}).fillna(y.mean())
        return forecast
    

model = HistoricAverageModel()
model_results_df = benchmark_model(dataset, buildings, model, cv_folds=cv_folds, cv_step=cv_step, cv_horizon=cv_horizon)
model_results_df['rMAE'] = (model_results_df / baseline_results_df).loc[:, "MAE"]
ed_leaderboard['HistoricAverage'] = model_results_df.mean()

In [None]:
# Placeholder model to benchmark, change this to the model you want to benchmark
class SeasonalWindowAverageModel:
    
    def __init__(self, seasonality: str, window_size: int):
        self.seasonality = seasonality.upper()
        self.window_size = window_size

    def forecast(
        self,
        horizon: int,
        history: pd.DataFrame,
        past_covariates: pd.DataFrame | None = None,
        future_covariates: pd.DataFrame | None = None,
        metadata: dict | None = None,
        level: list[int] | None = None,
        **kwargs,
    ) -> pd.DataFrame:
        # Fill missing values
        y = history.y.fillna(history.y.mean())

        # Create model
        periods = periods_in_duration(y.index, duration=self.seasonality)
        model = SeasonalWindowAverage(season_length=periods, window_size=self.window_size)

        # Make forecast
        pred = model.forecast(y=y.values, h=horizon, **kwargs)

        # Create index for forecast
        index = create_forecast_index(history=history, horizon=horizon)

        # Postprocess forecast
        forecast = pd.DataFrame(
            index=index,
            data={
                "yhat": pred["mean"],
            },
        ).fillna(y.mean())
        return forecast
    

model = SeasonalWindowAverageModel("1D", 28)
model_results_df = benchmark_model(dataset, buildings, model, cv_folds=cv_folds, cv_step=cv_step, cv_horizon=cv_horizon)
model_results_df['rMAE'] = (model_results_df / baseline_results_df).loc[:, "MAE"]
ed_leaderboard['SeasonalWindowAverageS1DW28'] = model_results_df.mean()

In [None]:
model = SeasonalWindowAverageModel("7D", 4)
model_results_df = benchmark_model(dataset, buildings, model, cv_folds=cv_folds, cv_step=cv_step, cv_horizon=cv_horizon)
model_results_df['rMAE'] = (model_results_df / baseline_results_df).loc[:, "MAE"]
ed_leaderboard['SeasonalWindowAverageS7DW4'] = model_results_df.mean()

In [None]:
ed_leaderboard.T.round(2).sort_values("rMAE")

In [None]:
fig = ed_leaderboard.T.sort_values("rMAE", ascending=False).rMAE.plot(kind='barh', figsize=(10, 5), title='Electricity Demand Leaderboard', xlabel='rMAE')
plt.show()