diff --git a/etna/ensembles/direct_ensemble.py b/etna/ensembles/direct_ensemble.py index 57dc3ca92..5725d51bd 100644 --- a/etna/ensembles/direct_ensemble.py +++ b/etna/ensembles/direct_ensemble.py @@ -125,7 +125,9 @@ def _merge(self, forecasts: List[TSDataset]) -> TSDataset: # TODO: Fix slicing with explicit passing the segments in issue #775 horizon, forecast = horizons[idx], forecasts[idx][:, segments, "target"] forecast_df.iloc[:horizon] = forecast - forecast_dataset = TSDataset(df=forecast_df, freq=forecasts[0].freq) + forecast_dataset = TSDataset( + df=forecast_df, freq=forecasts[0].freq, hierarchical_structure=forecasts[0].hierarchical_structure + ) return forecast_dataset def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset: diff --git a/etna/ensembles/stacking_ensemble.py b/etna/ensembles/stacking_ensemble.py index a1f044ac1..b7e614e6e 100644 --- a/etna/ensembles/stacking_ensemble.py +++ b/etna/ensembles/stacking_ensemble.py @@ -103,6 +103,12 @@ def __init__( self.joblib_params = joblib_params super().__init__(horizon=self._get_horizon(pipelines=pipelines)) + def _make_same_level(self, ts: TSDataset, forecasts: List[TSDataset]) -> TSDataset: + if ts.has_hierarchy(): + if ts.current_df_level != forecasts[0].current_df_level: + ts = ts.get_level_dataset(forecasts[0].current_df_level) # type: ignore + return ts + def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set[str]]: """Return all the features from ``features_to_use`` which can be obtained from base models' forecasts.""" features_df = pd.concat([forecast.df for forecast in forecasts], axis=1) @@ -132,7 +138,7 @@ def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> TSDataset: """Get forecasts from backtest for given pipeline.""" forecasts = pipeline.get_historical_forecasts(ts=ts, n_folds=self.n_folds) - forecasts = TSDataset(df=forecasts, freq=ts.freq) + forecasts = TSDataset(df=forecasts, freq=ts.freq, hierarchical_structure=ts.hierarchical_structure) return forecasts def fit(self, ts: TSDataset, save_ts: bool = True) -> "StackingEnsemble": @@ -174,6 +180,8 @@ def _make_features( self, ts: TSDataset, forecasts: List[TSDataset], train: bool = False ) -> Tuple[pd.DataFrame, Optional[pd.Series]]: """Prepare features for the ``final_model``.""" + ts = self._make_same_level(ts=ts, forecasts=forecasts) + # Stack targets from the forecasts targets = [ forecast[:, :, "target"].rename({"target": f"regressor_target_{i}"}, level="feature", axis=1) @@ -210,6 +218,8 @@ def _make_features( return x, None def _process_forecasts(self, ts: TSDataset, forecasts: List[TSDataset]) -> TSDataset: + ts = self._make_same_level(ts=ts, forecasts=forecasts) + x, _ = self._make_features(ts=ts, forecasts=forecasts, train=False) y = self.final_model.predict(x) num_segments = len(forecasts[0].segments) @@ -225,7 +235,7 @@ def _process_forecasts(self, ts: TSDataset, forecasts: List[TSDataset]) -> TSDat df = forecasts[0][:, :, "target"].copy() df.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = np.NAN - result = TSDataset(df=df, freq=ts.freq, df_exog=df_exog) + result = TSDataset(df=df, freq=ts.freq, df_exog=df_exog, hierarchical_structure=ts.hierarchical_structure) result.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = y return result diff --git a/etna/ensembles/voting_ensemble.py b/etna/ensembles/voting_ensemble.py index e6addb561..bd7046496 100644 --- a/etna/ensembles/voting_ensemble.py +++ b/etna/ensembles/voting_ensemble.py @@ -192,7 +192,9 @@ def _vote(self, forecasts: List[TSDataset]) -> TSDataset: forecast_df = sum( [forecast[:, :, "target"] * weight for forecast, weight in zip(forecasts, self.processed_weights)] ) - forecast_dataset = TSDataset(df=forecast_df, freq=forecasts[0].freq) + forecast_dataset = TSDataset( + df=forecast_df, freq=forecasts[0].freq, hierarchical_structure=forecasts[0].hierarchical_structure + ) return forecast_dataset def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset: diff --git a/etna/pipeline/base.py b/etna/pipeline/base.py index aee810495..f83f68839 100644 --- a/etna/pipeline/base.py +++ b/etna/pipeline/base.py @@ -591,7 +591,9 @@ def _validate_backtest_stride(n_folds: Union[int, List[FoldMask]], horizon: int, return stride @staticmethod - def _validate_backtest_dataset(ts: TSDataset, n_folds: int, horizon: int, stride: int): + def _validate_backtest_dataset( + ts: TSDataset, n_folds: int, horizon: int, stride: int + ): # TODO: try to optimize, works really slow on datasets with large number of segments """Check all segments have enough timestamps to validate forecaster with given number of splits.""" min_required_length = horizon + (n_folds - 1) * stride segments = set(ts.df.columns.get_level_values("segment")) @@ -673,6 +675,10 @@ def _compute_metrics( self, metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset ) -> Dict[str, Dict[str, float]]: """Compute metrics for given y_true, y_pred.""" + if y_true.has_hierarchy(): + if y_true.current_df_level != y_pred.current_df_level: + y_true = y_true.get_level_dataset(y_pred.current_df_level) # type: ignore + metrics_values: Dict[str, Dict[str, float]] = {} for metric in metrics: metrics_values[metric.name] = metric(y_true=y_true, y_pred=y_pred) # type: ignore diff --git a/etna/pipeline/hierarchical_pipeline.py b/etna/pipeline/hierarchical_pipeline.py index ea7d6f7d9..abca3eb76 100644 --- a/etna/pipeline/hierarchical_pipeline.py +++ b/etna/pipeline/hierarchical_pipeline.py @@ -1,5 +1,3 @@ -from typing import Dict -from typing import List from typing import Optional from typing import Sequence @@ -7,7 +5,6 @@ from etna.datasets.hierarchical_structure import HierarchicalStructure from etna.datasets.tsdataset import TSDataset -from etna.metrics import Metric from etna.models.base import ModelType from etna.pipeline.pipeline import Pipeline from etna.reconciliation.base import BaseReconciliator @@ -278,21 +275,6 @@ def predict( forecast_reconciled = self.reconciliator.reconcile(forecast) return forecast_reconciled - def _compute_metrics( - self, metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset - ) -> Dict[str, Dict[str, float]]: - """Compute metrics for given y_true, y_pred.""" - if y_true.current_df_level != self.reconciliator.target_level: - y_true = y_true.get_level_dataset(self.reconciliator.target_level) - - if y_pred.current_df_level == self.reconciliator.source_level: - y_pred = self.reconciliator.reconcile(y_pred) - - metrics_values: Dict[str, Dict[str, float]] = {} - for metric in metrics: - metrics_values[metric.name] = metric(y_true=y_true, y_pred=y_pred) # type: ignore - return metrics_values - def _forecast_prediction_interval( self, ts: TSDataset, predictions: TSDataset, quantiles: Sequence[float], n_folds: int ) -> TSDataset: diff --git a/tests/conftest.py b/tests/conftest.py index d84aab260..552e7784a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -548,6 +548,20 @@ def product_level_df(): return df +@pytest.fixture +def product_level_df_long_history(periods=200): + n_segments = 4 + df = pd.DataFrame( + { + "timestamp": list(pd.date_range(start="2000-01-01", periods=periods, freq="D")) * n_segments, + "segment": ["a"] * periods + ["b"] * periods + ["c"] * periods + ["d"] * periods, + "target": np.random.uniform(low=0, high=100, size=periods * n_segments), + } + ) + df = TSDataset.to_dataset(df) + return df + + @pytest.fixture def product_level_df_w_nans(): df = pd.DataFrame( @@ -644,6 +658,12 @@ def product_level_simple_hierarchical_ts(product_level_df, hierarchical_structur return ts +@pytest.fixture +def product_level_simple_hierarchical_ts_long_history(product_level_df_long_history, hierarchical_structure): + ts = TSDataset(df=product_level_df_long_history, freq="D", hierarchical_structure=hierarchical_structure) + return ts + + @pytest.fixture def simple_no_hierarchy_ts(market_level_df): ts = TSDataset(df=market_level_df, freq="D") diff --git a/tests/test_ensembles/conftest.py b/tests/test_ensembles/conftest.py index af88d747d..804e406e9 100644 --- a/tests/test_ensembles/conftest.py +++ b/tests/test_ensembles/conftest.py @@ -15,7 +15,10 @@ from etna.models import CatBoostPerSegmentModel from etna.models import NaiveModel from etna.models import ProphetModel +from etna.pipeline import HierarchicalPipeline from etna.pipeline import Pipeline +from etna.reconciliation import BottomUpReconciliator +from etna.reconciliation import TopDownReconciliator from etna.transforms import DateFlagsTransform from etna.transforms import LagTransform @@ -45,6 +48,42 @@ def naive_pipeline() -> Pipeline: return pipeline +@pytest.fixture +def naive_pipeline_top_down_market_14() -> Pipeline: + """Generate pipeline with NaiveModel.""" + pipeline = HierarchicalPipeline( + model=NaiveModel(14), + transforms=[], + horizon=14, + reconciliator=TopDownReconciliator(source_level="total", target_level="market", period=7, method="AHP"), + ) + return pipeline + + +@pytest.fixture +def naive_pipeline_top_down_product_14() -> Pipeline: + """Generate pipeline with NaiveModel.""" + pipeline = HierarchicalPipeline( + model=NaiveModel(14), + transforms=[], + horizon=14, + reconciliator=TopDownReconciliator(source_level="total", target_level="product", period=7, method="AHP"), + ) + return pipeline + + +@pytest.fixture +def naive_pipeline_bottom_up_market_14() -> Pipeline: + """Generate pipeline with NaiveModel.""" + pipeline = HierarchicalPipeline( + model=NaiveModel(14), + transforms=[], + horizon=14, + reconciliator=BottomUpReconciliator(source_level="product", target_level="market"), + ) + return pipeline + + @pytest.fixture def naive_pipeline_1() -> Pipeline: """Generate pipeline with NaiveModel(1).""" @@ -67,6 +106,22 @@ def voting_ensemble_pipeline( return pipeline +@pytest.fixture +def voting_ensemble_hierarchical_pipeline( + naive_pipeline_top_down_market_14: HierarchicalPipeline, naive_pipeline_bottom_up_market_14: HierarchicalPipeline +) -> VotingEnsemble: + pipeline = VotingEnsemble(pipelines=[naive_pipeline_top_down_market_14, naive_pipeline_bottom_up_market_14]) + return pipeline + + +@pytest.fixture +def voting_ensemble_mix_pipeline( + naive_pipeline: Pipeline, naive_pipeline_top_down_product_14: HierarchicalPipeline +) -> VotingEnsemble: + pipeline = VotingEnsemble(pipelines=[naive_pipeline, naive_pipeline_top_down_product_14]) + return pipeline + + @pytest.fixture def voting_ensemble_naive(naive_pipeline_1: Pipeline, naive_pipeline_2: Pipeline) -> VotingEnsemble: pipeline = VotingEnsemble(pipelines=[naive_pipeline_1, naive_pipeline_2]) @@ -81,6 +136,22 @@ def stacking_ensemble_pipeline( return pipeline +@pytest.fixture +def stacking_ensemble_hierarchical_pipeline( + naive_pipeline_top_down_market_14: HierarchicalPipeline, naive_pipeline_bottom_up_market_14: HierarchicalPipeline +) -> StackingEnsemble: + pipeline = StackingEnsemble(pipelines=[naive_pipeline_top_down_market_14, naive_pipeline_bottom_up_market_14]) + return pipeline + + +@pytest.fixture +def stacking_ensemble_mix_pipeline( + naive_pipeline: Pipeline, naive_pipeline_top_down_product_14: HierarchicalPipeline +) -> StackingEnsemble: + pipeline = StackingEnsemble(pipelines=[naive_pipeline, naive_pipeline_top_down_product_14]) + return pipeline + + @pytest.fixture def naive_featured_pipeline_1() -> Pipeline: """Generate pipeline with NaiveModel(1).""" diff --git a/tests/test_ensembles/test_direct_ensemble.py b/tests/test_ensembles/test_direct_ensemble.py index 00a6da5c0..04f8b5693 100644 --- a/tests/test_ensembles/test_direct_ensemble.py +++ b/tests/test_ensembles/test_direct_ensemble.py @@ -6,8 +6,11 @@ from etna.datasets import TSDataset from etna.datasets import generate_from_patterns_df from etna.ensembles import DirectEnsemble +from etna.metrics import MAE from etna.models import NaiveModel +from etna.pipeline import HierarchicalPipeline from etna.pipeline import Pipeline +from etna.reconciliation import TopDownReconciliator from tests.test_pipeline.utils import assert_pipeline_equals_loaded_original from tests.test_pipeline.utils import assert_pipeline_forecast_raise_error_if_no_ts from tests.test_pipeline.utils import assert_pipeline_forecasts_given_ts @@ -26,6 +29,54 @@ def direct_ensemble_pipeline() -> DirectEnsemble: return ensemble +@pytest.fixture +def naive_pipeline_top_down_market_7() -> Pipeline: + """Generate pipeline with NaiveModel.""" + pipeline = HierarchicalPipeline( + model=NaiveModel(), + transforms=[], + horizon=7, + reconciliator=TopDownReconciliator(source_level="total", target_level="market", period=14, method="AHP"), + ) + return pipeline + + +@pytest.fixture +def naive_pipeline_top_down_product_7() -> Pipeline: + """Generate pipeline with NaiveModel.""" + pipeline = HierarchicalPipeline( + model=NaiveModel(), + transforms=[], + horizon=7, + reconciliator=TopDownReconciliator(source_level="total", target_level="product", period=14, method="AHP"), + ) + return pipeline + + +@pytest.fixture +def direct_ensemble_hierarchical_pipeline( + naive_pipeline_top_down_market_7, naive_pipeline_bottom_up_market_14 +) -> DirectEnsemble: + ensemble = DirectEnsemble( + pipelines=[ + naive_pipeline_top_down_market_7, + naive_pipeline_bottom_up_market_14, + ] + ) + return ensemble + + +@pytest.fixture +def direct_ensemble_mix_pipeline(naive_pipeline_top_down_product_7) -> DirectEnsemble: + ensemble = DirectEnsemble( + pipelines=[ + naive_pipeline_top_down_product_7, + Pipeline(model=NaiveModel(), transforms=[], horizon=14), + ] + ) + return ensemble + + @pytest.fixture def simple_ts_train(): df = generate_from_patterns_df(patterns=[[1, 3, 5], [2, 4, 6], [7, 9, 11]], periods=3, start_time="2000-01-01") @@ -123,3 +174,32 @@ def test_predict_with_return_components_fails(example_tsds, direct_ensemble_pipe def test_params_to_tune_not_implemented(direct_ensemble_pipeline): with pytest.raises(NotImplementedError, match="DirectEnsemble doesn't support this method"): _ = direct_ensemble_pipeline.params_to_tune() + + +@pytest.mark.parametrize("n_jobs", (1, 5)) +def test_backtest(direct_ensemble_pipeline, example_tsds, n_jobs: int): + results = direct_ensemble_pipeline.backtest(ts=example_tsds, metrics=[MAE()], n_jobs=n_jobs, n_folds=3) + for df in results: + assert isinstance(df, pd.DataFrame) + + +@pytest.mark.parametrize("n_jobs", (1, 5)) +def test_backtest_hierarchical_pipeline( + direct_ensemble_hierarchical_pipeline, product_level_simple_hierarchical_ts_long_history: TSDataset, n_jobs: int +): + results = direct_ensemble_hierarchical_pipeline.backtest( + ts=product_level_simple_hierarchical_ts_long_history, metrics=[MAE()], n_jobs=n_jobs, n_folds=3 + ) + for df in results: + assert isinstance(df, pd.DataFrame) + + +@pytest.mark.parametrize("n_jobs", (1, 5)) +def test_backtest_mix_pipeline( + direct_ensemble_mix_pipeline, product_level_simple_hierarchical_ts_long_history: TSDataset, n_jobs: int +): + results = direct_ensemble_mix_pipeline.backtest( + ts=product_level_simple_hierarchical_ts_long_history, metrics=[MAE()], n_jobs=n_jobs, n_folds=3 + ) + for df in results: + assert isinstance(df, pd.DataFrame) diff --git a/tests/test_ensembles/test_stacking_ensemble.py b/tests/test_ensembles/test_stacking_ensemble.py index d76dd1e50..198bd33db 100644 --- a/tests/test_ensembles/test_stacking_ensemble.py +++ b/tests/test_ensembles/test_stacking_ensemble.py @@ -331,6 +331,34 @@ def test_backtest(stacking_ensemble_pipeline: StackingEnsemble, example_tsds: TS assert isinstance(df, pd.DataFrame) +@pytest.mark.parametrize("n_jobs", (1, 5)) +def test_backtest_hierarchical_pipeline( + stacking_ensemble_hierarchical_pipeline: StackingEnsemble, + product_level_simple_hierarchical_ts_long_history: TSDataset, + n_jobs: int, +): + """Check that backtest works with StackingEnsemble of hierarchical pipelines.""" + results = stacking_ensemble_hierarchical_pipeline.backtest( + ts=product_level_simple_hierarchical_ts_long_history, metrics=[MAE()], n_jobs=n_jobs, n_folds=3 + ) + for df in results: + assert isinstance(df, pd.DataFrame) + + +@pytest.mark.parametrize("n_jobs", (1, 5)) +def test_backtest_mix_pipeline( + stacking_ensemble_mix_pipeline: StackingEnsemble, + product_level_simple_hierarchical_ts_long_history: TSDataset, + n_jobs: int, +): + """Check that backtest works with StackingEnsemble of pipeline and hierarchical pipeline.""" + results = stacking_ensemble_mix_pipeline.backtest( + ts=product_level_simple_hierarchical_ts_long_history, metrics=[MAE()], n_jobs=n_jobs, n_folds=3 + ) + for df in results: + assert isinstance(df, pd.DataFrame) + + @pytest.mark.parametrize("n_jobs", (1, 5)) def test_get_historical_forecasts(stacking_ensemble_pipeline: StackingEnsemble, example_tsds: TSDataset, n_jobs: int): """Check that get_historical_forecasts works with StackingEnsemble.""" diff --git a/tests/test_ensembles/test_voting_ensemble.py b/tests/test_ensembles/test_voting_ensemble.py index 7a3986a28..132989a44 100644 --- a/tests/test_ensembles/test_voting_ensemble.py +++ b/tests/test_ensembles/test_voting_ensemble.py @@ -212,6 +212,34 @@ def test_backtest(voting_ensemble_pipeline: VotingEnsemble, example_tsds: TSData assert isinstance(df, pd.DataFrame) +@pytest.mark.parametrize("n_jobs", (1, 5)) +def test_backtest_hierarchical_pipeline( + voting_ensemble_hierarchical_pipeline: VotingEnsemble, + product_level_simple_hierarchical_ts_long_history: TSDataset, + n_jobs: int, +): + """Check that backtest works with VotingEnsemble of hierarchical pipelines.""" + results = voting_ensemble_hierarchical_pipeline.backtest( + ts=product_level_simple_hierarchical_ts_long_history, metrics=[MAE()], n_jobs=n_jobs, n_folds=3 + ) + for df in results: + assert isinstance(df, pd.DataFrame) + + +@pytest.mark.parametrize("n_jobs", (1, 5)) +def test_backtest_mix_pipeline( + voting_ensemble_mix_pipeline: VotingEnsemble, + product_level_simple_hierarchical_ts_long_history: TSDataset, + n_jobs: int, +): + """Check that backtest works with VotingEnsemble of pipeline and hierarchical pipeline.""" + results = voting_ensemble_mix_pipeline.backtest( + ts=product_level_simple_hierarchical_ts_long_history, metrics=[MAE()], n_jobs=n_jobs, n_folds=3 + ) + for df in results: + assert isinstance(df, pd.DataFrame) + + @pytest.mark.parametrize("n_jobs", (1, 5)) def test_get_historical_forecasts(voting_ensemble_pipeline: VotingEnsemble, example_tsds: TSDataset, n_jobs: int): """Check that get_historical_forecasts works with VotingEnsemble."""