"""Implements ensemble forecasters.
Creates univariate (optionally weighted)
combination of the predictions from underlying forecasts.
"""
__author__ = ["mloning", "GuzalBulatova", "aiwalter", "RNKuhns", "AnH0ang"]
__all__ = ["EnsembleForecaster", "AutoEnsembleForecaster"]

import warnings

import numpy as np
import pandas as pd
from scipy.stats import gmean
from sklearn.pipeline import Pipeline

from aeon.forecasting.base import ForecastingHorizon
from aeon.forecasting.base._meta import _HeterogenousEnsembleForecaster
from aeon.forecasting.model_selection import temporal_train_test_split
from aeon.utils.stats import (
_weighted_geometric_mean,
_weighted_max,
_weighted_median,
_weighted_min,
)
from aeon.utils.validation.forecasting import check_regressor

VALID_AGG_FUNCS = {
"mean": {"unweighted": np.mean, "weighted": np.average},
"median": {"unweighted": np.median, "weighted": _weighted_median},
"min": {"unweighted": np.min, "weighted": _weighted_min},
"max": {"unweighted": np.max, "weighted": _weighted_max},
"gmean": {"unweighted": gmean, "weighted": _weighted_geometric_mean},
}
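# Illustrative note: ``_check_aggfunc`` (defined at the end of this module)
# looks an ``aggfunc`` name up in this table, e.g. ("mean", weighted) resolves
# to ``np.average``, which accepts a ``weights`` argument.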


class AutoEnsembleForecaster(_HeterogenousEnsembleForecaster):
    """Automatically find best weights for the ensembled forecasters.

    The AutoEnsembleForecaster finds optimal weights for the ensembled
    forecasters using a given method or a meta-model (regressor).
    The regressor has to be sklearn-like and needs to expose either a
    ``feature_importances_`` or a ``coef_`` attribute, as these values are
    used as weights. The regressor can also be a sklearn.Pipeline.

    Parameters
----------
forecasters : list of (str, estimator) tuples
Estimators to apply to the input series.
method : str, optional, default="feature-importance"
Strategy used to compute weights. Available choices:
- feature-importance:
use the ``feature_importances_`` or ``coef_`` from
given ``regressor`` as optimal weights.
        - inverse-variance:
            use the inverse variance of the forecasting error
            (based on the internal train-test split) to compute optimal
            weights; any given ``regressor`` will be ignored.
regressor : sklearn-like regressor, optional, default=None.
Used to infer optimal weights from coefficients (linear models) or from
feature importance scores (decision tree-based models). If None, then
a GradientBoostingRegressor(max_depth=5) is used.
The regressor can also be a sklearn.Pipeline().
    test_size : int or float, optional, default=None
        Used for an internal temporal_train_test_split(). The test_size data
        is the most recent data and serves as the endogenous (target) data of
        the regressor; the exogenous data of the regressor are the predictions
        from the temporarily trained ensemble models. If None, it will be set
        to 0.25.
random_state : int, RandomState instance or None, default=None
Used to set random_state of the default regressor.
n_jobs : int or None, optional, default=None
The number of jobs to run in parallel for fit. None means 1 unless
in a joblib.parallel_backend context.
-1 means using all processors.

    Attributes
    ----------
    regressor_ : sklearn-like regressor
        Fitted regressor.
    weights_ : np.array
        The weights based on either ``regressor.feature_importances_`` or
        ``regressor.coef_`` values.

    See Also
    --------
    EnsembleForecaster

    Examples
    --------
>>> from aeon.forecasting.compose import AutoEnsembleForecaster
>>> from aeon.forecasting.naive import NaiveForecaster
>>> from aeon.forecasting.trend import PolynomialTrendForecaster
>>> from aeon.datasets import load_airline
>>> y = load_airline()
>>> forecasters = [
... ("trend", PolynomialTrendForecaster()),
... ("naive", NaiveForecaster()),
... ]
>>> forecaster = AutoEnsembleForecaster(forecasters=forecasters)
>>> forecaster.fit(y=y, fh=[1,2,3])
AutoEnsembleForecaster(...)
>>> y_pred = forecaster.predict()
"""

    _tags = {
"ignores-exogeneous-X": False,
"requires-fh-in-fit": False,
"capability:missing_values": False,
"y_input_type": "univariate",
}

    def __init__(
self,
forecasters,
method="feature-importance",
regressor=None,
test_size=None,
random_state=None,
n_jobs=None,
):
super(AutoEnsembleForecaster, self).__init__(
forecasters=forecasters,
n_jobs=n_jobs,
)
self.method = method
self.regressor = regressor
self.test_size = test_size
self.random_state = random_state

    def _fit(self, y, X=None, fh=None):
        """Fit to training data.

        Parameters
        ----------
        y : pd.Series
            Target time series to which to fit the forecaster.
        fh : int, list or np.array, optional, default=None
            The forecasting horizon with the steps ahead to predict.
        X : pd.DataFrame, optional, default=None
            Exogenous variables are ignored.

        Returns
        -------
        self : returns an instance of self.
        """
_, forecasters = self._check_forecasters()
# get training data for meta-model
if X is not None:
y_train, y_test, X_train, X_test = temporal_train_test_split(
y, X, test_size=self.test_size
)
else:
y_train, y_test = temporal_train_test_split(y, test_size=self.test_size)
X_train, X_test = None, None
# fit ensemble models
fh_test = ForecastingHorizon(y_test.index, is_relative=False)
self._fit_forecasters(forecasters, y_train, X_train, fh_test)
if self.method == "feature-importance":
self.regressor_ = check_regressor(
regressor=self.regressor, random_state=self.random_state
)
X_meta = pd.concat(self._predict_forecasters(fh_test, X_test), axis=1)
# fit meta-model (regressor) on predictions of ensemble models
# with y_test as endog/target
self.regressor_.fit(X=X_meta, y=y_test)
# check if regressor is a sklearn.Pipeline
if isinstance(self.regressor_, Pipeline):
# extract regressor from pipeline to access its attributes
self.weights_ = _get_weights(self.regressor_.steps[-1][1])
else:
self.weights_ = _get_weights(self.regressor_)
        elif self.method == "inverse-variance":
            # get forecasts on the internal test split
            if self.regressor is not None:
                warnings.warn(
                    f"regressor will not be used because method "
                    f"'{self.method}' is set."
                )
inv_var = np.array(
[
1 / np.var(y_test - y_pred_test)
for y_pred_test in self._predict_forecasters(fh_test, X_test)
]
)
            # normalize the inverse variances so the weights sum to one
            self.weights_ = list(inv_var / np.sum(inv_var))
else:
            raise NotImplementedError(
                f"Given method {self.method} does not exist. "
                f"Please provide a valid method parameter."
            )
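
        # refit the component forecasters on the full series for prediction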
self._fit_forecasters(forecasters, y, X, fh)
return self

    def _predict(self, fh, X=None):
        """Aggregate the predictions of the underlying forecasters.

        Parameters
        ----------
        fh : int, list or np.array, optional, default=None
            The forecasting horizon with the steps ahead to predict.
        X : pd.DataFrame, optional, default=None
            Exogenous data.

        Returns
        -------
        y_pred : pd.Series
            Aggregated predictions.
        """
y_pred_df = pd.concat(self._predict_forecasters(fh, X), axis=1)
# apply weights
y_pred = y_pred_df.apply(lambda x: np.average(x, weights=self.weights_), axis=1)
return y_pred

    @classmethod
def get_test_params(cls, parameter_set="default"):
"""Return testing parameter settings for the estimator.
Parameters
----------
parameter_set : str, default="default"
Name of the set of test parameters to return, for use in tests. If no
special parameters are defined for a value, will return `"default"` set.
Returns
-------
params : dict or list of dict
"""
        from sklearn.linear_model import LinearRegression

        from aeon.forecasting.naive import NaiveForecaster

        FORECASTER = NaiveForecaster()
params1 = {"forecasters": [("f1", FORECASTER), ("f2", FORECASTER)]}
params2 = {
"forecasters": [("f1", FORECASTER), ("f2", FORECASTER)],
"method": "inverse-variance",
"regressor": LinearRegression(),
"test_size": 0.2,
}
return [params1, params2]


def _get_weights(regressor):
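    """Extract the weight vector from a fitted sklearn-like regressor.

    A minimal doctest sketch, assuming scikit-learn's LinearRegression
    (which exposes ``coef_`` after fitting):

    >>> import numpy as np
    >>> from sklearn.linear_model import LinearRegression
    >>> X = np.array([[1.0, 2.0], [2.0, 3.0], [3.0, 5.0]])
    >>> reg = LinearRegression().fit(X, [1.0, 2.0, 3.0])
    >>> len(_get_weights(reg))
    2
    """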
# tree-based models from sklearn which have feature importance values
if hasattr(regressor, "feature_importances_"):
weights = regressor.feature_importances_
# linear regression models from sklearn which have coefficient values
elif hasattr(regressor, "coef_"):
weights = regressor.coef_
else:
        raise NotImplementedError(
            "The given regressor is not supported. It must have either a "
            "feature_importances_ or a coef_ attribute after fitting."
        )
# avoid ZeroDivisionError if all weights are 0
if weights.sum() == 0:
weights += 1
return list(weights)


class EnsembleForecaster(_HeterogenousEnsembleForecaster):
    """Ensemble of forecasters.

    Overview: Input one series of length `n` and EnsembleForecaster performs
    fitting and prediction for each estimator passed in `forecasters`. It then
    applies the `aggfunc` aggregation function by row to the prediction
    DataFrame and returns the final prediction, one series.

    Parameters
----------
forecasters : list of (str, estimator) tuples
Estimators to apply to the input series.
n_jobs : int or None, optional, default=None
The number of jobs to run in parallel for fit. None means 1 unless
in a joblib.parallel_backend context.
-1 means using all processors.
    aggfunc : str, {'mean', 'median', 'min', 'max', 'gmean'}, default='mean'
        The function to aggregate the predictions from individual forecasters.
    weights : list of floats, optional, default=None
        Weights to apply in aggregation. If None, the unweighted variant of
        ``aggfunc`` is used.

    See Also
    --------
    AutoEnsembleForecaster

    Examples
    --------
>>> from aeon.forecasting.compose import EnsembleForecaster
>>> from aeon.forecasting.naive import NaiveForecaster
>>> from aeon.forecasting.trend import PolynomialTrendForecaster
>>> from aeon.datasets import load_airline
>>> y = load_airline()
>>> forecasters = [
... ("trend", PolynomialTrendForecaster()),
... ("naive", NaiveForecaster()),
... ]
>>> forecaster = EnsembleForecaster(forecasters=forecasters, weights=[4, 10])
>>> forecaster.fit(y=y, fh=[1,2,3])
EnsembleForecaster(...)
>>> y_pred = forecaster.predict()
"""

    _tags = {
"ignores-exogeneous-X": False,
"requires-fh-in-fit": False,
"capability:missing_values": False,
"X_inner_type": ["pd.DataFrame", "pd-multiindex", "pd_multiindex_hier"],
"y_inner_type": ["pd.DataFrame", "pd-multiindex", "pd_multiindex_hier"],
"y_input_type": "both",
}

    def __init__(self, forecasters, n_jobs=None, aggfunc="mean", weights=None):
super(EnsembleForecaster, self).__init__(forecasters=forecasters, n_jobs=n_jobs)
self.aggfunc = aggfunc
self.weights = weights
# the ensemble requires fh in fit
# iff any of the component forecasters require fh in fit
self._anytagis_then_set("requires-fh-in-fit", True, False, self.forecasters)

    def _fit(self, y, X=None, fh=None):
        """Fit to training data.

        Parameters
        ----------
        y : pd.DataFrame - Series, Panel, or Hierarchical mtype format.
            Target time series to which to fit the forecaster.
        fh : ForecastingHorizon, optional, default=None
            The forecasting horizon with the steps ahead to predict.
        X : pd.DataFrame, optional, default=None, must be of same mtype as y
            Exogenous data to which to fit the forecaster.

        Returns
        -------
        self : returns an instance of self.
        """
names, forecasters = self._check_forecasters()
self._fit_forecasters(forecasters, y, X, fh)
return self

    def _predict(self, fh, X=None):
        """Aggregate the predictions of the underlying forecasters.

        Parameters
        ----------
        fh : ForecastingHorizon, optional, default=None
            The forecasting horizon with the steps ahead to predict.
        X : pd.DataFrame, optional, default=None, must be of same mtype as y
            Exogenous data.

        Returns
        -------
        y_pred : pd.DataFrame - Series, Panel, or Hierarchical mtype format,
            will be of same mtype as y in _fit.
            Ensembled predictions.
        """
names, _ = self._check_forecasters()
y_pred = pd.concat(self._predict_forecasters(fh, X), axis=1, keys=names)
y_pred = y_pred.groupby(level=1, axis=1).agg(
_aggregate, self.aggfunc, self.weights
)
return y_pred

    @classmethod
def get_test_params(cls, parameter_set="default"):
"""Return testing parameter settings for the estimator.
Parameters
----------
parameter_set : str, default="default"
Name of the set of test parameters to return, for use in tests. If no
special parameters are defined for a value, will return `"default"` set.
Returns
-------
params : dict or list of dict
"""
        from aeon.forecasting.compose._reduce import DirectReductionForecaster
        from aeon.forecasting.naive import NaiveForecaster

        # univariate case
        FORECASTER = NaiveForecaster()
params = [{"forecasters": [("f1", FORECASTER), ("f2", FORECASTER)]}]
        # multivariate case: ensembling multiple variables at the same time
FORECASTER = DirectReductionForecaster.create_test_instance()
params = params + [{"forecasters": [("f1", FORECASTER), ("f2", FORECASTER)]}]
return params


def _aggregate(y, aggfunc, weights):
    """Apply the aggregation function by row.

    Parameters
    ----------
    y : pd.DataFrame
        Multivariate series to transform.
    aggfunc : str
        Aggregation function used for transformation.
    weights : list of floats
        Weights to apply in aggregation.

    Returns
    -------
    y_agg : pd.Series
        Transformed univariate series.
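
    Examples
    --------
    A minimal sketch of unweighted vs. weighted aggregation (the values are
    chosen for illustration only):

    >>> import numpy as np
    >>> import pandas as pd
    >>> y = pd.DataFrame({"a": [1.0, 2.0], "b": [3.0, 4.0]})
    >>> _aggregate(y, "mean", None).tolist()
    [2.0, 3.0]
    >>> _aggregate(y, "mean", [3.0, 1.0]).tolist()
    [1.5, 2.5]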
"""
if weights is None:
aggfunc = _check_aggfunc(aggfunc, weighted=False)
y_agg = aggfunc(y, axis=1)
else:
aggfunc = _check_aggfunc(aggfunc, weighted=True)
y_agg = aggfunc(y, axis=1, weights=np.array(weights))
return pd.Series(y_agg, index=y.index)


def _check_aggfunc(aggfunc, weighted=False):
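    """Resolve an ``aggfunc`` name to a callable from VALID_AGG_FUNCS.

    A minimal doctest sketch:

    >>> import numpy as np
    >>> _check_aggfunc("mean", weighted=True) is np.average
    True
    """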
_weighted = "weighted" if weighted else "unweighted"
    if aggfunc not in VALID_AGG_FUNCS:
        raise ValueError(f"Aggregation function {aggfunc} not recognized.")
return VALID_AGG_FUNCS[aggfunc][_weighted]