-
Notifications
You must be signed in to change notification settings - Fork 89
/
_online_ensemble.py
129 lines (102 loc) · 4.06 KB
/
_online_ensemble.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# -*- coding: utf-8 -*-
# !/usr/bin/env python3 -u
# copyright: aeon developers, BSD-3-Clause License (see LICENSE file)
"""Implements framework for applying online ensembling algorithms to forecasters."""
__author__ = ["magittan, mloning"]
import numpy as np
import pandas as pd
from aeon.forecasting.compose._ensemble import EnsembleForecaster
class OnlineEnsembleForecaster(EnsembleForecaster):
    """Ensemble of forecasters whose combination weights are updated online.

    Predictions of the member forecasters are combined as a weighted sum.
    The weights start out uniform (set in ``_fit``) and, when an
    ``ensemble_algorithm`` is supplied, are re-read from that algorithm on
    every call to ``_predict``.

    Parameters
    ----------
    ensemble_algorithm : object or None, optional (default=None)
        Online weighting algorithm. As used by this class it must provide an
        ``update(predictions, y)`` method and a ``weights`` attribute.
        If None, the uniform weights assigned during fit are kept.
    forecasters : list of (str, estimator) tuples
        The named member forecasters of the ensemble.
    n_jobs : int or None, optional (default=None)
        The number of jobs to run in parallel for fit. None means 1 unless
        in a joblib.parallel_backend context.
        -1 means using all processors.
    """

    _tags = {
        "ignores-exogeneous-X": True,
        "requires-fh-in-fit": False,
        "capability:missing_values": False,
        "y_inner_mtype": ["pd.Series"],
        "scitype:y": "univariate",
    }

    def __init__(self, forecasters, ensemble_algorithm=None, n_jobs=None):
        self.ensemble_algorithm = ensemble_algorithm
        self.n_jobs = n_jobs
        # NOTE: the two-argument form starts the MRO lookup *after*
        # EnsembleForecaster, so EnsembleForecaster.__init__ itself is
        # skipped and its parent's initializer runs instead.
        # NOTE(review): presumably intentional, to avoid the extra arguments
        # of the direct parent -- confirm before "simplifying" to super().
        super(EnsembleForecaster, self).__init__(
            forecasters=forecasters, n_jobs=n_jobs
        )

    def _fit(self, y, X=None, fh=None):
        """Fit the member forecasters and reset the weights to uniform.

        Parameters
        ----------
        y : pd.Series
            Target time series to which to fit the forecasters.
        fh : int, list or np.array, optional (default=None)
            The forecasting horizon with the steps ahead to predict.
        X : pd.DataFrame, optional (default=None)
            Exogenous data; passed on unchanged to ``_fit_forecasters``.

        Returns
        -------
        self : returns an instance of self.
        """
        _, members = self._check_forecasters()
        n_members = len(members)
        # Every member starts with the same share: 1 / n_members.
        self.weights = np.ones(n_members) / n_members
        self._fit_forecasters(members, y, X, fh)
        return self

    def _fit_ensemble(self, y, X=None):
        """Feed fresh predictions and observations to the ensemble algorithm.

        Each member forecaster predicts ``len(y)`` steps ahead; those
        predictions and the observed values ``y`` are handed to
        ``ensemble_algorithm.update`` so that it can revise its weights.

        Parameters
        ----------
        y : pd.Series
            Newly observed target values.
        X : pd.DataFrame, optional (default=None)
            Exogenous data; passed on unchanged to ``_predict_forecasters``.
        """
        # Relative horizon 1, 2, ..., len(y): one step per new observation.
        horizon = np.arange(1, len(y) + 1)
        member_predictions = self._predict_forecasters(horizon, X)
        stacked = np.column_stack(member_predictions)
        observed = np.array(y)
        # The algorithm receives the transposed stack, i.e. one row per
        # member forecaster.
        self.ensemble_algorithm.update(stacked.T, observed)

    def _update(self, y, X=None, update_params=False):
        """Refresh the ensemble weights, then update every member forecaster.

        The weight refresh only happens when ``y`` is non-empty and an
        ``ensemble_algorithm`` was provided; the member forecasters are
        updated in either case.

        Parameters
        ----------
        y : pd.Series
            Newly observed target values.
        X : pd.DataFrame, optional (default=None)
            Exogenous data; forwarded to the ensemble fit and to each
            member's ``update``.
        update_params : bool, optional (default=False)
            Forwarded to each member forecaster's ``update``.

        Returns
        -------
        self : an instance of self
        """
        refresh_weights = len(y) > 0 and self.ensemble_algorithm is not None
        if refresh_weights:
            self._fit_ensemble(y, X)

        for member in self.forecasters_:
            member.update(y, X, update_params=update_params)

        return self

    def _predict(self, fh=None, X=None):
        """Return the weighted sum of the member forecasters' predictions.

        Parameters
        ----------
        fh : int, list or np.array, optional (default=None)
            The forecasting horizon, passed to ``_predict_forecasters``.
        X : pd.DataFrame, optional (default=None)
            Exogenous data, passed to ``_predict_forecasters``.

        Returns
        -------
        pd.Series
            Row-wise sum of the member predictions, each column scaled by
            its weight.
        """
        algorithm = self.ensemble_algorithm
        if algorithm is not None:
            # Pick up whatever weights the online algorithm currently holds.
            self.weights = algorithm.weights

        # One column per member forecaster.
        per_member = pd.concat(self._predict_forecasters(fh, X), axis=1)
        weighted = per_member * self.weights
        return weighted.sum(axis=1)

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests.
            This implementation returns the same set for every value.

        Returns
        -------
        params : dict
            Constructor keyword arguments: two named entries, ``"f1"`` and
            ``"f2"``, that share a single ``NaiveForecaster`` instance.
        """
        from aeon.forecasting.naive import NaiveForecaster

        naive = NaiveForecaster()
        return {"forecasters": [("f1", naive), ("f2", naive)]}