# Benchmark

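> Benchmark specifications, the training context handed to user code, and the runtime functions that build a model and score it on a benchmark's test data.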

In [None]:
#| default_exp benchmark

In [None]:
#| export
from typing import List, Optional, Callable, Dict, Any, Iterator, Tuple, Union
from pathlib import Path
import random
import numpy as np
import itertools
import time
from identibench.utils import get_default_data_root,_load_sequences_from_files

In [None]:
#| hide
from fastcore.test import test_eq, test_ne# Import nbdev testing functions
import identibench.metrics
from identibench.utils import _dummy_dataset_loader

## Benchmark Specifications

In [None]:
#| exporti
def _test_simulation(specs, model):
    """Score `model` on every test sequence of `specs` and average the metric.

    All `*.hdf5` files in `<dataset_path>/test` are loaded in sorted order. For each
    sequence the model is called as `model(u, y[:specs.init_window])` and its output
    is compared against the full `y` with `specs.metric_func`.

    Returns a dict with the single key `'metric_score'`: the mean score as a Python
    float, or NaN (after printing a warning) if the loader yielded no sequences.

    Raises `RuntimeError` if the test directory contains no `*.hdf5` files.
    """
    test_dir = specs.dataset_path / 'test'
    hdf5_files = sorted(test_dir.glob('*.hdf5'))
    if len(hdf5_files) == 0:
        raise RuntimeError(f"No test files found in {test_dir}")

    sequences = _load_sequences_from_files(hdf5_files, specs.u_cols, specs.y_cols, specs.x_cols)
    # The model is evaluated first, then the metric, one sequence at a time.
    scores = [
        specs.metric_func(y_true, model(u_seq, y_true[:specs.init_window]))
        for u_seq, y_true, _ in sequences
    ]

    if scores:
        # .item() turns the numpy scalar into a plain Python float.
        return {'metric_score': np.mean(scores).item()}

    print(f"Warning: No valid scores calculated for benchmark {specs.name}.")
    return {'metric_score': np.nan}

In [None]:
#| exporti
class BenchmarkSpecBase:
    """
    Base class for benchmark specifications, holding common attributes.

    A spec bundles the dataset identity, the column selection, the scoring metric and
    the functions used to prepare the dataset (`download_func`) and to evaluate a
    trained model (`test_func`).

    Raises `ValueError` if any of `name`, `dataset_id`, `u_cols`, `y_cols` or
    `metric_func` is missing or empty.
    """
    def __init__(self,
                 name: str, # Unique name identifying this benchmark task.
                 dataset_id: str, # Identifier for the raw dataset source.
                 u_cols: List[str], # List of column names for input signals (u).
                 y_cols: List[str], # List of column names for output signals (y).
                 metric_func: Callable[[np.ndarray, np.ndarray], float], # Primary metric: `func(y_true, y_pred)`.
                 x_cols: Optional[List[str]] = None, # Optional state inputs (x).
                 sampling_time: Optional[float] = None, # Optional sampling time (seconds).
                 download_func: Optional[Callable[[Path, bool], None]] = None, # Dataset preparation func.
                 test_func: Callable[['BenchmarkSpecBase', Callable], Dict[str, Any]] = _test_simulation, # Evaluation func.
                 init_window: Optional[int] = None, # Steps for warm-up, potentially ignored in evaluation.
                 data_root: Union[Path, Callable[[], Path]] = get_default_data_root # root dir for dataset, may be a callable or path
                ):
        self.name = name
        self.dataset_id = dataset_id
        self.u_cols = u_cols
        self.y_cols = y_cols
        self.metric_func = metric_func
        self.x_cols = x_cols
        self.sampling_time = sampling_time
        self.download_func = download_func
        self.test_func = test_func
        self.init_window = init_window
        # Stored unevaluated: a callable root is resolved on every `data_root` access.
        self._data_root = data_root

        # Ensure required parameters have valid values if needed (basic checks)
        if not self.name or not self.dataset_id or not self.u_cols or not self.y_cols or not self.metric_func:
             raise ValueError("Core benchmark parameters (name, dataset_id, u_cols, y_cols, metric_func) are required.")

    @property
    def data_root(self) -> Path:
        """Returns the evaluated data root path (calls `data_root` first if it is a callable)."""
        root = self._data_root() if callable(self._data_root) else self._data_root
        # A plain string root would break the `/` join in `dataset_path`, so coerce it.
        if isinstance(root, str):
            root = Path(root)
        return root

    @property
    def dataset_path(self) -> Path:
        """Returns the full path to the dataset directory (`data_root / dataset_id`)."""
        return self.data_root / self.dataset_id

    def ensure_dataset_exists(self, force_download: bool = False) -> None:
        """
        Checks if the dataset exists, downloads/prepares it if needed.

        Without a `download_func` only warnings are printed and nothing is created.
        Otherwise `download_func(dataset_path, force_download)` is called when the
        dataset directory is missing or `force_download` is True. Any exception from
        `download_func` is reported and re-raised.
        """
        dataset_path = self.dataset_path
        if self.download_func is None:
            print(f"Warning: No download function for '{self.name}'. Assuming data exists at {dataset_path}")
            if not dataset_path.is_dir():
                 print(f"Warning: Dataset directory {dataset_path} not found.")
            return

        if not dataset_path.is_dir() or force_download:
            print(f"Preparing dataset for '{self.name}' at {dataset_path}...")
            # Only the root is created here; the dataset directory itself is left to download_func.
            self.data_root.mkdir(parents=True, exist_ok=True)
            try:
                self.download_func(dataset_path, force_download)
                print(f"Dataset '{self.name}' prepared successfully.")
            except Exception as e:
                print(f"Error preparing dataset '{self.name}': {e}")
                raise


In [None]:
#| export
class BenchmarkSpecSimulation(BenchmarkSpecBase):
    """
    Specification for a simulation benchmark task.

    Inherits all parameters from `BenchmarkSpecBase` and adds none of its own.
    Use this when the goal is to simulate the system's output given the input `u`.
    """

In [None]:
#| exporti
def _test_prediction(specs: BenchmarkSpecBase, model: Callable):
    """Evaluate `model` on the test split of `specs` and return the mean metric.

    Every `*.hdf5` file under `<dataset_path>/test` is loaded (sorted by path). The
    model is called with the inputs and the first `specs.init_window` output samples
    of each sequence; its prediction is scored against the full output sequence with
    `specs.metric_func`. Note that `pred_horizon` and `pred_step` are not consulted here.

    Returns `{'metric_score': <float>}`; the value is NaN (with a printed warning)
    when no sequence was scored.

    Raises `RuntimeError` when no test files are found.
    """
    test_dir = specs.dataset_path / 'test'
    test_files = sorted(test_dir.glob('*.hdf5'))
    if not test_files:
        raise RuntimeError(f"No test files found in {test_dir}")

    per_sequence = []
    loader = _load_sequences_from_files(test_files, specs.u_cols, specs.y_cols, specs.x_cols)
    for inputs, targets, _states in loader:
        prediction = model(inputs, targets[:specs.init_window])
        per_sequence.append(specs.metric_func(targets, prediction))

    if per_sequence:
        final_score = np.mean(per_sequence).item()  # plain Python float
    else:
        final_score = np.nan
        print(f"Warning: No valid scores calculated for benchmark {specs.name}.")

    return {'metric_score': final_score}

In [None]:
#| export
class BenchmarkSpecPrediction(BenchmarkSpecBase):
    """
    Specification for a k-step ahead prediction benchmark task.

    Inherits common parameters from BenchmarkSpecBase and adds prediction-specific ones.
    Use this when the goal is to predict `y` some steps ahead based on past `u` and `y`.

    Raises `ValueError` if `pred_horizon` is not positive, or if the base class
    rejects the common parameters.
    """
    def __init__(self,
                 pred_horizon: int, # The 'k' in k-step ahead prediction (mandatory for this type).
                 pred_step: int, # Step size for k-step ahead prediction (e.g., predict y[t+k] using data up to t).
                 test_func: Callable[[BenchmarkSpecBase, Callable], Dict[str, Any]] = _test_prediction, # Evaluation func.
                 **kwargs # Capture all base class arguments
                ):
        # test_func must be forwarded explicitly: it is a named parameter here, so it is
        # not part of **kwargs and the base class would otherwise fall back to its own default.
        super().__init__(test_func=test_func, **kwargs) # Initialize base class attributes
        if pred_horizon <= 0:
             raise ValueError("pred_horizon must be a positive integer for BenchmarkSpecPrediction.")
        self.pred_horizon = pred_horizon
        self.pred_step = pred_step

In [None]:
# Render the generated API documentation for BenchmarkSpecPrediction in the notebook.
from nbdev import show_doc
show_doc(BenchmarkSpecPrediction)

---

[source](https://github.com/daniel-om-weber/identibench/blob/main/identibench/benchmark.py#L140){target="_blank" style="float:right; font-size:smaller"}

### BenchmarkSpecPrediction

>      BenchmarkSpecPrediction (pred_horizon:int, pred_step:int, test_func:Calla
>                               ble[[__main__.BenchmarkSpecBase,Callable],Dict[s
>                               tr,Any]]=<function _test_prediction>, **kwargs)

*Specification for a k-step ahead prediction benchmark task.

Inherits common parameters from BaseBenchmarkSpec and adds prediction-specific ones.
Use this when the goal is to predict `y` some steps ahead based on past `u` and `y`.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| pred_horizon | int |  | The 'k' in k-step ahead prediction (mandatory for this type). |
| pred_step | int |  | Step size for k-step ahead prediction (e.g., predict y[t+k] using data up to t). |
| test_func | Callable | _test_prediction | Evaluation func. |
| kwargs | VAR_KEYWORD |  |  |

In [None]:
# Test: BenchmarkSpec basic initialization and defaults
# Only the required arguments plus a download function are given, so every optional
# parameter keeps its default. `_spec_sim` is reused by later example cells.
_spec_sim = BenchmarkSpecSimulation(
    name='_spec_default', dataset_id='_dummy_default',
    u_cols=['u0'], y_cols=['y0'], metric_func=identibench.metrics.rmse, 
    download_func=_dummy_dataset_loader
)
test_eq(_spec_sim.init_window, None)  # init_window defaults to None
test_eq(_spec_sim.name, '_spec_default') 

In [None]:
# Test: BenchmarkSpec initialization with prediction-related parameters
# pred_horizon and pred_step are handled by BenchmarkSpecPrediction itself; all other
# keyword arguments are passed through **kwargs to BenchmarkSpecBase.
# `_spec_pred` is reused by the run_multiple_benchmarks example further down.
_spec_pred = BenchmarkSpecPrediction(
    name='_spec_pred_params', dataset_id='_dummy_pred_params',
    u_cols=['u0'], y_cols=['y0'], metric_func=identibench.metrics.rmse, 
    download_func=_dummy_dataset_loader, 
    init_window=20, pred_horizon=5, pred_step=2
)
test_eq(_spec_pred.init_window, 20)
test_eq(_spec_pred.pred_horizon, 5)
test_eq(_spec_pred.pred_step, 2)

In [None]:
# Test: BenchmarkSpec ensure_dataset_exists - first call (creation)
# The default data root is used, so this cell writes to the real filesystem.
# The next two cells depend on `_spec_ensure` and `_dataset_path_ensure` defined here.
_spec_ensure = BenchmarkSpecSimulation(
    name='_spec_ensure', dataset_id='_dummy_ensure',
    u_cols=['u0'], y_cols=['y0'], metric_func=identibench.metrics.rmse, 
    download_func=_dummy_dataset_loader
)
_spec_ensure.ensure_dataset_exists()
_dataset_path_ensure = _spec_ensure.dataset_path
test_eq(_dataset_path_ensure.is_dir(), True)
# The dummy loader is expected to have produced at least this training file.
test_eq((_dataset_path_ensure / 'train' / 'train_0.hdf5').is_file(), True)

In [None]:
# Test: BenchmarkSpec ensure_dataset_exists - second call (skip)
# The dataset directory already exists and force_download is False, so download_func
# must not run again: the file's modification time has to stay the same.
_mtime_before_skip = (_dataset_path_ensure / 'train' / 'train_0.hdf5').stat().st_mtime
time.sleep(0.1)  # leave a gap so that a rewrite would show up as a different mtime
_spec_ensure.ensure_dataset_exists() 
_mtime_after_skip = (_dataset_path_ensure / 'train' / 'train_0.hdf5').stat().st_mtime
test_eq(_mtime_before_skip, _mtime_after_skip)

In [None]:
# Test: BenchmarkSpec ensure_dataset_exists - third call (force_download=True)
# Forcing the download invokes download_func even though the directory exists;
# the test expects the dummy loader to rewrite the file, changing its mtime.
_mtime_before_force = (_dataset_path_ensure / 'train' / 'train_0.hdf5').stat().st_mtime
time.sleep(0.1)  # make sure the rewritten file gets a distinguishable mtime
_spec_ensure.ensure_dataset_exists(force_download=True) 
_mtime_after_force = (_dataset_path_ensure / 'train' / 'train_0.hdf5').stat().st_mtime
test_ne(_mtime_before_force, _mtime_after_force)

Preparing dataset for '_spec_ensure' at /Users/daniel/.identibench_data/_dummy_ensure...
Dataset '_spec_ensure' prepared successfully.


## Training Context

In [None]:
#| export
class TrainingContext:
    """
    Everything a user's training function (`build_predictor`) receives from the runtime.

    Carries the benchmark specification, the hyperparameters and the seed, and offers
    accessors for the raw, full-length training and validation sequences of the spec's
    dataset. Windowing/batching is not done here; it is the responsibility of the
    user's `build_predictor` function.
    """
    # Explicit __init__ for nbdev documentation compatibility
    def __init__(self, 
                 spec: BenchmarkSpecBase, # The benchmark specification.
                 hyperparameters: Dict[str, Any], # User-provided dictionary containing model and training hyperparameters.
                 seed: Optional[int] = None # Optional random seed for reproducibility.
                ):
        self.spec, self.hyperparameters, self.seed = spec, hyperparameters, seed

    # --- Data Access Methods ---

    def _get_file_paths(self, subset: str) -> List[Path]:
        """Return the sorted `*.hdf5` files of `<dataset_path>/<subset>`, or `[]` if that directory is missing."""
        directory = self.spec.dataset_path / subset
        return sorted(directory.glob('*.hdf5')) if directory.is_dir() else []

    def _get_sequences_from_subset(self, subset: str
                                  ) -> Iterator[Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]]]:
        """Return the loader's iterator for one subset; warn and return an empty iterator if it has no files."""
        hdf5_files = self._get_file_paths(subset)
        if hdf5_files:
            return _load_sequences_from_files(
                file_paths=hdf5_files,
                u_cols=self.spec.u_cols,
                y_cols=self.spec.y_cols,
                x_cols=self.spec.x_cols,
            )

        print(f"Warning: No HDF5 files found in {self.spec.dataset_path / subset}. Returning empty iterator.")
        return iter([])

    def get_train_sequences(self) -> Iterator[Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]]]:
        """Return an iterator over the raw (u, y, x) tuples of the 'train' subset."""
        return self._get_sequences_from_subset('train')

    def get_valid_sequences(self) -> Iterator[Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]]]:
        """Return an iterator over the raw (u, y, x) tuples of the 'valid' subset."""
        return self._get_sequences_from_subset('valid')

    def get_train_valid_sequences(self) -> Iterator[Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]]]:
        """
        Return an iterator over the raw (u, y, x) tuples for combined training and validation.

        A 'train_valid' subset directory containing HDF5 files takes precedence. When it
        is absent or empty, the 'train' sequences are chained with the 'valid' sequences.
        """
        combined_files = self._get_file_paths('train_valid')
        if not combined_files:
            # No dedicated directory: fall back to train followed by valid.
            return itertools.chain(
                self._get_sequences_from_subset('train'),
                self._get_sequences_from_subset('valid'),
            )

        return _load_sequences_from_files(
            file_paths=combined_files, u_cols=self.spec.u_cols, y_cols=self.spec.y_cols,
            x_cols=self.spec.x_cols
        )

In [None]:
#todo: test

## Benchmark Runtime

In [None]:
#| export
def run_benchmark(spec: 'BenchmarkSpecBase',
                  build_model: Callable[['TrainingContext'], Callable],
                  hyperparameters: Optional[Dict[str, Any]] = None,
                  seed: Optional[int] = None) -> Dict[str, Any]:
    """
    Build a model for one benchmark spec, evaluate it and return a result dictionary.

    Args:
        spec: The benchmark specification to run.
        build_model: Callable that receives a `TrainingContext` and returns the trained model.
        hyperparameters: Hyperparameters made available to `build_model` through the context.
            `None` is treated as an empty dictionary.
        seed: Seed stored in the context and the results. If `None`, a random integer in
            `[0, 2**32 - 1]` is drawn.

    Returns:
        A dictionary with the keys `benchmark_name`, `dataset_id`, `hyperparameters`,
        `seed`, `training_time_seconds`, `test_time_seconds` and `benchmark_type`,
        merged with whatever dictionary `spec.test_func(spec, model)` returns.

    Raises:
        RuntimeError: If `build_model` returns `None`.
    """
    # A fresh dict per call: a `{}` default would be shared between calls and would also
    # leak into every returned result dictionary.
    if hyperparameters is None:
        hyperparameters = {}

    if seed is None:
        seed = random.randint(0, 2**32 - 1)

    results = {
        'benchmark_name': spec.name,
        'dataset_id': spec.dataset_id,
        'hyperparameters': hyperparameters,
        'seed': seed,
        'training_time_seconds': np.nan,
        'test_time_seconds': np.nan,
        'benchmark_type' : type(spec).__name__
    }

    spec.ensure_dataset_exists()

    context = TrainingContext(spec=spec, hyperparameters=hyperparameters, seed=seed)

    # time.monotonic is unaffected by system clock adjustments during the run.
    train_start_time = time.monotonic()
    model = build_model(context)
    train_end_time = time.monotonic()
    results['training_time_seconds'] = train_end_time - train_start_time

    if model is None:
        raise RuntimeError(f"build_model for {spec.name} did not return a model.")

    test_start_time = time.monotonic()
    test_results = spec.test_func(spec, model)
    test_end_time = time.monotonic()

    results['test_time_seconds'] = test_end_time - test_start_time

    results.update(test_results) # Merge test results

    return results

In [None]:
#| exporti
# Define a very simple build_model function for the example
def _dummy_build_model(context):
    """Return a placeholder model that predicts zeros, for the examples in this notebook.

    Prints the spec name and seed taken from `context`. The returned callable ignores
    its second argument and produces an array of zeros with one row per row of
    `u_test` and one column per entry of `context.spec.y_cols`.
    """
    print(f"Building model with spec: {context.spec.name}, seed: {context.seed}")

    def dummy_model(u_test,y_test):
        # Column count is looked up on every call, so it follows the spec in the context.
        n_outputs = len(context.spec.y_cols)
        n_steps = u_test.shape[0]
        return np.zeros((n_steps, n_outputs))

    return dummy_model

In [None]:
# Example usage of run_benchmark
hyperparams = {'learning_rate': 0.01, 'epochs': 5} # Example hyperparameters

results = run_benchmark(
    spec=_spec_sim, 
    build_model=_dummy_build_model,
    hyperparameters=hyperparams
)

print("\nBenchmark Results:")
print(results)

Building model with spec: _spec_default, seed: 2196568183

Benchmark Results:
{'benchmark_name': '_spec_default', 'dataset_id': '_dummy_default', 'hyperparameters': {'learning_rate': 0.01, 'epochs': 5}, 'seed': 2196568183, 'training_time_seconds': 1.8916005501523614e-05, 'test_time_seconds': 0.001395874991430901, 'benchmark_type': 'BenchmarkSpecSimulation', 'metric_score': 0.5947432613358903}


In [None]:
def custom_test_logic(spec, model):
    """Example of a user-defined `test_func`: report the maximum absolute error per sequence.

    Loads every `*.hdf5` file under `<dataset_path>/test`, calls the model as
    `model(u, y[:spec.init_window])` and records `max(|y - y_pred|)` for each sequence.

    Returns a dict with `'avg_max_abs_error'` (mean) and `'median_max_abs_error'`
    (median) over the sequences; both are NaN when no sequence was loaded.
    """
    test_files = sorted((spec.dataset_path / 'test').glob('*.hdf5'))
    sequences = _load_sequences_from_files(test_files, spec.u_cols, spec.y_cols, spec.x_cols)
    peak_errors = [
        np.max(np.abs(y_true - model(u_seq, y_true[:spec.init_window])))
        for u_seq, y_true, _ in sequences
    ]

    if not peak_errors:
        return {'avg_max_abs_error': np.nan, 'median_max_abs_error': np.nan}
    return {'avg_max_abs_error': np.mean(peak_errors), 'median_max_abs_error': np.median(peak_errors)}

In [None]:
# A simulation spec whose evaluation is replaced by `custom_test_logic`:
# run_benchmark calls `spec.test_func`, so the default `_test_simulation` is bypassed.
spec_with_custom_test = BenchmarkSpecSimulation(
    name="CustomTestExampleBench",
    dataset_id="dummy_core_data_v1", # Same dataset ID as before
    download_func=_dummy_dataset_loader, 
    u_cols=['u0', 'u1'], 
    y_cols=['y0'],      
    test_func=custom_test_logic,
    metric_func=identibench.metrics.rmse  # still required by the base class, although custom_test_logic does not use it
)

In [None]:
# Run benchmark using the spec with the custom test function
hyperparams = {'model_type': 'dummy_v2'} 

results_custom_test = run_benchmark(
    spec=spec_with_custom_test, 
    build_model=_dummy_build_model,
    hyperparameters=hyperparams
)

print("\nBenchmark Results (Custom Test Example):")
print(results_custom_test)

# Note: The result dictionary now contains 'avg_max_abs_error' instead of 'metric_score'

Building model with spec: CustomTestExampleBench, seed: 218001267

Benchmark Results (Custom Test Example):
{'benchmark_name': 'CustomTestExampleBench', 'dataset_id': 'dummy_core_data_v1', 'hyperparameters': {'model_type': 'dummy_v2'}, 'seed': 218001267, 'training_time_seconds': 8.095901284832507e-05, 'test_time_seconds': 0.0012364580034045503, 'benchmark_type': 'BenchmarkSpecSimulation', 'avg_max_abs_error': np.float64(0.9871239066123962), 'median_max_abs_error': np.float64(0.9871239066123962)}


In [None]:
#| export
def run_multiple_benchmarks(
    specs: Union[List[BenchmarkSpecBase], Dict[str, BenchmarkSpecBase]], # Collection of specs to run
    build_model: Callable[[TrainingContext], Callable], # User function to build the model/predictor
    hyperparameters: Optional[Dict[str, Any]] = None, # Hyperparameters passed to build_model
    seed: Optional[int] = None, # Base random seed
    continue_on_error: bool = True, # If True, continue running benchmarks even if one fails
) -> List[Dict[str, Any]]:
    """
    Run several benchmarks one after another with a single build_model function.

    Args:
        specs: The BenchmarkSpec objects to run, given as a list or as the values of a dictionary.
        build_model: Callable taking a TrainingContext and returning a trained model/predictor function.
        hyperparameters: Hyperparameter dictionary handed to build_model; a falsy value becomes `{}`.
        seed: Optional integer seed forwarded unchanged to every run_benchmark call.
        continue_on_error: When True, an exception in one benchmark is printed and the run moves on
                           to the next spec. When False, the first exception is re-raised.

    Returns:
        The result dictionaries of the benchmarks that completed without an exception, in run order.
    """
    shared_hyperparameters = hyperparameters or {}
    if isinstance(specs, dict):
        pending = list(specs.values())
    else:
        pending = list(specs)
    total = len(pending)
    completed = []

    print(f"--- Starting benchmark run for {total} specifications ---")

    for position, spec in enumerate(pending, start=1):
        # Fall back to a positional label if the object has no `name` attribute.
        label = getattr(spec, 'name', f'Unnamed Spec {position}')
        print(f"\n[{position}/{total}] Running benchmark: {label}")

        try:
            outcome = run_benchmark(
                spec=spec,
                build_model=build_model,
                hyperparameters=shared_hyperparameters,
                seed=seed,
            )
            completed.append(outcome)
            print(f"  -> Success: {label} completed.")
        except Exception as e:
            print(f"  -> ERROR running benchmark '{label}': {e}")
            if not continue_on_error:
                print("Stopping due to error (continue_on_error=False).")
                raise
            # Otherwise the failure is only reported and the next spec is tried.

    print(f"\n--- Benchmark run finished. {len(completed)}/{total} completed successfully. ---")
    return completed

In [None]:
# Run the three specs defined in earlier cells with the same dummy model builder.
# No seed is given, so each run_benchmark call draws its own random seed.
run_multiple_benchmarks(
    specs=[_spec_sim,_spec_pred,spec_with_custom_test], 
    build_model=_dummy_build_model
)

--- Starting benchmark run for 3 specifications ---

[1/3] Running benchmark: _spec_default
Building model with spec: _spec_default, seed: 4176054161
  -> Success: _spec_default completed.

[2/3] Running benchmark: _spec_pred_params
Building model with spec: _spec_pred_params, seed: 1541742625
  -> Success: _spec_pred_params completed.

[3/3] Running benchmark: CustomTestExampleBench
Building model with spec: CustomTestExampleBench, seed: 1357892571
  -> Success: CustomTestExampleBench completed.

--- Benchmark run finished. 3/3 completed successfully. ---


[{'benchmark_name': '_spec_default',
  'dataset_id': '_dummy_default',
  'hyperparameters': {},
  'seed': 4176054161,
  'training_time_seconds': 1.3541997759602964e-05,
  'test_time_seconds': 0.0019185830024071038,
  'benchmark_type': 'BenchmarkSpecSimulation',
  'metric_score': 0.5947432613358903},
 {'benchmark_name': '_spec_pred_params',
  'dataset_id': '_dummy_pred_params',
  'hyperparameters': {},
  'seed': 1541742625,
  'training_time_seconds': 5.750000127591193e-06,
  'test_time_seconds': 0.0007422499911626801,
  'benchmark_type': 'BenchmarkSpecPrediction',
  'metric_score': 0.5681130975059453},
 {'benchmark_name': 'CustomTestExampleBench',
  'dataset_id': 'dummy_core_data_v1',
  'hyperparameters': {},
  'seed': 1357892571,
  'training_time_seconds': 5.832989700138569e-06,
  'test_time_seconds': 0.0006590000120922923,
  'benchmark_type': 'BenchmarkSpecSimulation',
  'avg_max_abs_error': np.float64(0.9871239066123962),
  'median_max_abs_error': np.float64(0.9871239066123962)}]

In [None]:
#| hide
# Trigger nbdev's export step for this project when the cell is executed.
import nbdev; nbdev.nbdev_export()