Skip to content

Commit

Permalink
Estimation of historical forecast on backtest folds without metrics (#143)
Browse files Browse the repository at this point in the history

* added `get_historical_forecasts`

* added tests

* updated code to use `get_historical_forecasts`

* added test

* fixed test

* updated changelog

* changed tests names

* removed duplicated tests
  • Loading branch information
brsnw250 authored Nov 15, 2023
1 parent f8e5ba5 commit 51e6d92
Show file tree
Hide file tree
Showing 13 changed files with 220 additions and 30 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Add params_to_tune for DeepStateModel ([#115](https://github.com/etna-team/etna/issues/115))
- Handle new functionality for prediction intervals in the `plot_forecast` ([#130](https://github.com/etna-team/etna/pull/130))
-
- Add `get_historical_forecasts` to pipelines for forecast estimation at each fold on the historical dataset ([#143](https://github.com/etna-team/etna/pull/143))
-
-
-
Expand Down
5 changes: 1 addition & 4 deletions etna/ensembles/stacking_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
from etna.distributions import BaseDistribution
from etna.ensembles.mixins import EnsembleMixin
from etna.ensembles.mixins import SaveEnsembleMixin
from etna.loggers import tslogger
from etna.metrics import MAE
from etna.pipeline.base import BasePipeline


Expand Down Expand Up @@ -133,8 +131,7 @@ def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set

def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> TSDataset:
"""Get forecasts from backtest for given pipeline."""
with tslogger.disable():
_, forecasts, _ = pipeline.backtest(ts=ts, metrics=[MAE()], n_folds=self.n_folds)
forecasts = pipeline.get_historical_forecasts(ts=ts, n_folds=self.n_folds)
forecasts = TSDataset(df=forecasts, freq=ts.freq)
return forecasts

Expand Down
5 changes: 1 addition & 4 deletions etna/ensembles/voting_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from etna.distributions import BaseDistribution
from etna.ensembles.mixins import EnsembleMixin
from etna.ensembles.mixins import SaveEnsembleMixin
from etna.loggers import tslogger
from etna.metrics import MAE
from etna.pipeline.base import BasePipeline


Expand Down Expand Up @@ -126,8 +124,7 @@ def _validate_weights(weights: Optional[Union[List[float], Literal["auto"]]], pi

def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> TSDataset:
"""Get forecasts from backtest for given pipeline."""
with tslogger.disable():
_, forecasts, _ = pipeline.backtest(ts, metrics=[MAE()], n_folds=self.n_folds)
forecasts = pipeline.get_historical_forecasts(ts=ts, n_folds=self.n_folds)
forecasts = TSDataset(df=forecasts, freq=ts.freq)
return forecasts

Expand Down
7 changes: 1 addition & 6 deletions etna/experimental/prediction_intervals/naive_variance.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

from etna.datasets import TSDataset
from etna.experimental.prediction_intervals import BasePredictionIntervals
from etna.loggers import tslogger
from etna.pipeline import BasePipeline
from etna.pipeline.base import _DummyMetric


class NaiveVariancePredictionIntervals(BasePredictionIntervals):
Expand Down Expand Up @@ -93,10 +91,7 @@ def _compute_resids_matrices(self, ts: TSDataset, n_folds: int) -> np.ndarray:
:
Residuals matrices for each segment. Array with shape: ``(n_folds, horizon, n_segments)``.
"""
with tslogger.disable():
_, backtest_forecasts, _ = self.backtest(
ts=ts, metrics=[_DummyMetric()], n_folds=n_folds, stride=self.stride
)
backtest_forecasts = self.get_historical_forecasts(ts=ts, n_folds=n_folds, stride=self.stride)

residuals = backtest_forecasts.loc[:, pd.IndexSlice[:, "target"]] - ts[backtest_forecasts.index, :, "target"]

Expand Down
71 changes: 69 additions & 2 deletions etna/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,7 @@ def _forecast_prediction_interval(
self, ts: TSDataset, predictions: TSDataset, quantiles: Sequence[float], n_folds: int
) -> TSDataset:
"""Add prediction intervals to the forecasts."""
with tslogger.disable():
_, forecasts, _ = self.backtest(ts=ts, metrics=[_DummyMetric()], n_folds=n_folds)
forecasts = self.get_historical_forecasts(ts=ts, n_folds=n_folds)

self._add_forecast_borders(ts=ts, backtest_forecasts=forecasts, quantiles=quantiles, predictions=predictions)

Expand Down Expand Up @@ -994,3 +993,71 @@ def backtest(
tslogger.finish_experiment()

return metrics_df, forecast_df, fold_info_df

def get_historical_forecasts(
    self,
    ts: TSDataset,
    n_folds: Union[int, List[FoldMask]] = 5,
    mode: Optional[str] = None,
    n_jobs: int = 1,
    refit: Union[bool, int] = True,
    stride: Optional[int] = None,
    joblib_params: Optional[Dict[str, Any]] = None,
    forecast_params: Optional[Dict[str, Any]] = None,
) -> pd.DataFrame:
    """Estimate forecast for each fold on the historical dataset.

    This is a thin wrapper over ``backtest``: it runs the backtest with the
    logger disabled and a placeholder metric, and returns only the forecasts.

    If ``refit != True`` and some component of the pipeline doesn't support
    forecasting with gap, this component will raise an exception.

    Parameters
    ----------
    ts:
        Dataset to fit models in backtest
    n_folds:
        Number of folds or the list of fold masks
    mode:
        Train generation policy: 'expand' or 'constant'. Works only if ``n_folds`` is integer.
        By default, is set to 'expand'.
    n_jobs:
        Number of jobs to run in parallel
    refit:
        Determines how often pipeline should be retrained during iteration over folds.

        * If ``True``: pipeline is retrained on each fold.

        * If ``False``: pipeline is trained only on the first fold.

        * If ``value: int``: pipeline is trained every ``value`` folds starting from the first.

    stride:
        Number of points between folds. Works only if ``n_folds`` is integer. By default, is set to ``horizon``.
    joblib_params:
        Additional parameters for :py:class:`joblib.Parallel`
    forecast_params:
        Additional parameters for :py:func:`~etna.pipeline.base.BasePipeline.forecast`

    Returns
    -------
    :
        Forecast dataframe

    Raises
    ------
    ValueError:
        If ``mode`` is set when ``n_folds`` are ``List[FoldMask]``.
    ValueError:
        If ``stride`` is set when ``n_folds`` are ``List[FoldMask]``.
    """
    # Only the forecasts are of interest: logging is silenced and a dummy
    # metric is passed because ``backtest`` is called with a metrics list.
    with tslogger.disable():
        backtest_output = self.backtest(
            ts=ts,
            metrics=[_DummyMetric()],
            n_folds=n_folds,
            mode=mode,
            n_jobs=n_jobs,
            refit=refit,
            stride=stride,
            joblib_params=joblib_params,
            forecast_params=forecast_params,
        )
        # ``backtest`` returns (metrics, forecasts, fold info); keep the middle one.
        _metrics_df, forecast_df, _fold_info_df = backtest_output
    return forecast_df
5 changes: 1 addition & 4 deletions etna/pipeline/hierarchical_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

from etna.datasets.hierarchical_structure import HierarchicalStructure
from etna.datasets.tsdataset import TSDataset
from etna.loggers import tslogger
from etna.metrics import MAE
from etna.metrics import Metric
from etna.models.base import ModelType
from etna.pipeline.pipeline import Pipeline
Expand Down Expand Up @@ -306,8 +304,7 @@ def _forecast_prediction_interval(
try:
# TODO: rework intervals estimation for `BottomUpReconciliator`

with tslogger.disable():
_, forecasts, _ = self.backtest(ts=ts, metrics=[MAE()], n_folds=n_folds)
forecasts = self.get_historical_forecasts(ts=ts, n_folds=n_folds)

source_ts = self.reconciliator.aggregate(ts=ts)
self._add_forecast_borders(
Expand Down
9 changes: 9 additions & 0 deletions tests/test_ensembles/test_stacking_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,15 @@ def test_backtest(stacking_ensemble_pipeline: StackingEnsemble, example_tsds: TS
assert isinstance(df, pd.DataFrame)


@pytest.mark.parametrize("n_jobs", (1, 5))
def test_get_historical_forecasts(stacking_ensemble_pipeline: StackingEnsemble, example_tsds: TSDataset, n_jobs: int):
    """Check that get_historical_forecasts works with StackingEnsemble."""
    fold_count = 3
    historical_df = stacking_ensemble_pipeline.get_historical_forecasts(
        ts=example_tsds, n_jobs=n_jobs, n_folds=fold_count
    )
    assert isinstance(historical_df, pd.DataFrame)
    # The test expects ``horizon`` rows for each requested fold.
    expected_rows = fold_count * stacking_ensemble_pipeline.horizon
    assert len(historical_df) == expected_rows


@pytest.mark.parametrize("load_ts", [True, False])
def test_save_load(stacking_ensemble_pipeline, example_tsds, load_ts):
assert_pipeline_equals_loaded_original(pipeline=stacking_ensemble_pipeline, ts=example_tsds, load_ts=load_ts)
Expand Down
9 changes: 9 additions & 0 deletions tests/test_ensembles/test_voting_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,15 @@ def test_backtest(voting_ensemble_pipeline: VotingEnsemble, example_tsds: TSData
assert isinstance(df, pd.DataFrame)


@pytest.mark.parametrize("n_jobs", (1, 5))
def test_get_historical_forecasts(voting_ensemble_pipeline: VotingEnsemble, example_tsds: TSDataset, n_jobs: int):
    """Check that get_historical_forecasts works with VotingEnsemble."""
    fold_count = 3
    historical_df = voting_ensemble_pipeline.get_historical_forecasts(
        ts=example_tsds, n_jobs=n_jobs, n_folds=fold_count
    )
    assert isinstance(historical_df, pd.DataFrame)
    # The test expects ``horizon`` rows for each requested fold.
    expected_rows = fold_count * voting_ensemble_pipeline.horizon
    assert len(historical_df) == expected_rows


@pytest.mark.parametrize("load_ts", [True, False])
def test_save_load(load_ts, voting_ensemble_pipeline, example_tsds):
assert_pipeline_equals_loaded_original(pipeline=voting_ensemble_pipeline, ts=example_tsds, load_ts=load_ts)
Expand Down
11 changes: 11 additions & 0 deletions tests/test_experimental/test_prediction_intervals/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ def test_backtest(example_tsds, pipeline_name, request):
pd.testing.assert_frame_equal(pipeline_results, intervals_pipeline_results)


@pytest.mark.parametrize("pipeline_name", ("naive_pipeline", "naive_pipeline_with_transforms"))
def test_get_historical_forecasts(example_tsds, pipeline_name, request):
    """Check that the intervals wrapper yields the same historical forecasts as the wrapped pipeline."""
    base_pipeline = request.getfixturevalue(pipeline_name)
    base_forecasts = base_pipeline.get_historical_forecasts(ts=example_tsds)

    # Wrap the very same pipeline and repeat the call through the wrapper.
    wrapped_pipeline = DummyPredictionIntervals(pipeline=base_pipeline)
    wrapped_forecasts = wrapped_pipeline.get_historical_forecasts(ts=example_tsds)

    pd.testing.assert_frame_equal(base_forecasts, wrapped_forecasts)


@pytest.mark.parametrize(
"expected_columns",
({"target", "target_lower", "target_upper"},),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


def test_invalid_stride_error():
with pytest.raises(ValueError, match="Parameter ``stride`` must be positive!"):
with pytest.raises(ValueError, match="Parameter `stride` must be positive!"):
NaiveVariancePredictionIntervals(pipeline=Pipeline(model=NaiveModel()), stride=-1)


Expand Down
9 changes: 9 additions & 0 deletions tests/test_pipeline/test_autoregressive_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,15 @@ def test_backtest_forecasts_sanity(step_ts: TSDataset):
assert np.all(forecast_df == expected_forecast_df)


def test_get_historical_forecasts_sanity(step_ts):
    """Check that AutoRegressivePipeline.get_historical_forecasts gives correct forecasts according to the simple case.

    The ``step_ts`` fixture is unpacked here as a 3-tuple whose first item is the
    dataset and whose last item is the expected forecast dataframe; the middle
    item is not needed by this test.
    """
    ts, _, expected_forecast_df = step_ts
    pipeline = AutoRegressivePipeline(model=NaiveModel(), horizon=5, step=1)
    forecast_df = pipeline.get_historical_forecasts(ts, n_folds=3)

    assert np.all(forecast_df == expected_forecast_df)


@pytest.mark.parametrize(
"model, transforms",
[
Expand Down
18 changes: 18 additions & 0 deletions tests/test_pipeline/test_hierarchical_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,24 @@ def test_backtest(market_level_constant_hierarchical_ts, reconciliator):
np.testing.assert_allclose(metrics["MAE"], 0)


@pytest.mark.parametrize(
    "reconciliator",
    (
        TopDownReconciliator(target_level="market", source_level="total", period=1, method="AHP"),
        TopDownReconciliator(target_level="market", source_level="total", period=1, method="PHA"),
        BottomUpReconciliator(target_level="total", source_level="market"),
    ),
)
def test_get_historical_forecasts(market_level_constant_hierarchical_ts, reconciliator):
    """Check that get_historical_forecasts works with HierarchicalPipeline for each reconciliator."""
    fold_count = 2
    hierarchical_pipeline = HierarchicalPipeline(
        reconciliator=reconciliator, model=NaiveModel(), transforms=[], horizon=1
    )
    historical_df = hierarchical_pipeline.get_historical_forecasts(
        ts=market_level_constant_hierarchical_ts, n_folds=fold_count
    )
    assert isinstance(historical_df, pd.DataFrame)
    # The test expects ``horizon`` rows for each requested fold.
    assert len(historical_df) == fold_count * hierarchical_pipeline.horizon


@pytest.mark.parametrize(
"reconciliator",
(
Expand Down
Loading

0 comments on commit 51e6d92

Please sign in to comment.