Skip to content

Commit

Permalink
Get rid of using TSDataset in method _backtest_pipeline of ensembles (#409)
Browse files Browse the repository at this point in the history
  • Loading branch information
brsnw250 committed Jun 25, 2024
1 parent ccc5ba4 commit b419f9a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
25 changes: 14 additions & 11 deletions etna/ensembles/stacking_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,16 @@ def __init__(
self.joblib_params = joblib_params
super().__init__(horizon=self._get_horizon(pipelines=pipelines))

def _make_same_level(self, ts: TSDataset, forecasts: List[TSDataset]) -> TSDataset:
def _make_same_level(self, ts: TSDataset, forecasts: List[pd.DataFrame]) -> TSDataset:
if ts.has_hierarchy():
if ts.current_df_level != forecasts[0].current_df_level:
ts = ts.get_level_dataset(forecasts[0].current_df_level) # type: ignore
current_df_level = ts._get_dataframe_level(df=forecasts[0])
if ts.current_df_level != current_df_level:
ts = ts.get_level_dataset(current_df_level) # type: ignore
return ts

def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set[str]]:
def _filter_features_to_use(self, forecasts: List[pd.DataFrame]) -> Union[None, Set[str]]:
"""Return all the features from ``features_to_use`` which can be obtained from base models' forecasts."""
features_df = pd.concat([forecast.df for forecast in forecasts], axis=1)
features_df = pd.concat(forecasts, axis=1)
available_features = set(features_df.columns.get_level_values("feature")) - {"fold_number"}
features_to_use = self.features_to_use
if features_to_use is None:
Expand All @@ -134,10 +135,9 @@ def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set
)
return None

def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> TSDataset:
def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> pd.DataFrame:
"""Get forecasts from backtest for given pipeline."""
forecasts = pipeline.get_historical_forecasts(ts=ts, n_folds=self.n_folds)
forecasts = TSDataset(df=forecasts, freq=ts.freq, hierarchical_structure=ts.hierarchical_structure)
return forecasts

def fit(self, ts: TSDataset, save_ts: bool = True) -> "StackingEnsemble":
Expand Down Expand Up @@ -176,14 +176,16 @@ def fit(self, ts: TSDataset, save_ts: bool = True) -> "StackingEnsemble":
return self

def _make_features(
self, ts: TSDataset, forecasts: List[TSDataset], train: bool = False
self, ts: TSDataset, forecasts: List[pd.DataFrame], train: bool = False
) -> Tuple[pd.DataFrame, Optional[pd.Series]]:
"""Prepare features for the ``final_model``."""
ts = self._make_same_level(ts=ts, forecasts=forecasts)

# Stack targets from the forecasts
targets = [
forecast[:, :, "target"].rename({"target": f"regressor_target_{i}"}, level="feature", axis=1)
forecast.loc[:, pd.IndexSlice[:, "target"]].rename(
{"target": f"regressor_target_{i}"}, level="feature", axis=1
)
for i, forecast in enumerate(forecasts)
]
targets = pd.concat(targets, axis=1)
Expand All @@ -200,7 +202,8 @@ def _make_features(
for forecast in forecasts
]
features = pd.concat(
[forecast[:, :, features_in_forecasts[i]] for i, forecast in enumerate(forecasts)], axis=1
[forecast.loc[:, pd.IndexSlice[:, features_in_forecasts[i]]] for i, forecast in enumerate(forecasts)],
axis=1,
)
features = features.loc[:, ~features.columns.duplicated()]
features_df = pd.concat([features, targets], axis=1)
Expand All @@ -216,7 +219,7 @@ def _make_features(
else:
return x, None

def _process_forecasts(self, ts: TSDataset, forecasts: List[TSDataset]) -> TSDataset:
def _process_forecasts(self, ts: TSDataset, forecasts: List[pd.DataFrame]) -> TSDataset:
ts = self._make_same_level(ts=ts, forecasts=forecasts)

x, _ = self._make_features(ts=ts, forecasts=forecasts, train=False)
Expand Down
5 changes: 2 additions & 3 deletions etna/ensembles/voting_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,9 @@ def _validate_weights(weights: Optional[Union[List[float], Literal["auto"]]], pi
else:
raise ValueError("Invalid format of weights is passed!")

def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> TSDataset:
def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> pd.DataFrame:
"""Get forecasts from backtest for given pipeline."""
forecasts = pipeline.get_historical_forecasts(ts=ts, n_folds=self.n_folds)
forecasts = TSDataset(df=forecasts, freq=ts.freq)
return forecasts

def _process_weights(self, ts: TSDataset) -> List[float]:
Expand All @@ -138,7 +137,7 @@ def _process_weights(self, ts: TSDataset) -> List[float]:

x = pd.concat(
[
forecast[:, :, "target"].rename({"target": f"target_{i}"}, axis=1)
forecast.loc[:, pd.IndexSlice[:, "target"]].rename({"target": f"target_{i}"}, axis=1)
for i, forecast in enumerate(forecasts)
],
axis=1,
Expand Down

0 comments on commit b419f9a

Please sign in to comment.