-
Notifications
You must be signed in to change notification settings - Fork 38
/
regressor.py
378 lines (324 loc) · 14.6 KB
/
regressor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
import logging
import tempfile
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union
import numpy
import pandas
from sklearn.base import BaseEstimator, RegressorMixin
from hts import defaults
from hts import model as hts_models
from hts._t import ExogT, MethodT, ModelT, NodesT, TimeSeriesModelT, Transform
from hts.core.exceptions import InvalidArgumentException, MissingRegressorException
from hts.core.result import HTSResult
from hts.core.utils import _do_fit, _do_predict, _model_mapping_to_iterable
from hts.functions import to_sum_mat
from hts.hierarchy import HierarchyTree
from hts.hierarchy.utils import make_iterable
from hts.model.base import TimeSeriesModel
from hts.revision import RevisionMethod
from hts.utilities.distribution import DistributorBaseClass
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class HTSRegressor(BaseEstimator, RegressorMixin):
    """
    Main regressor class for scikit-hts. Likely the only import you'll need for using
    this project. It takes a pandas dataframe, the nodes specifying the hierarchies, model kind, revision
    method, and a few other parameters.

    Attributes
    ----------
    model : str
        Key of the time series model; resolved against ``hts.model.MODEL_MAPPING`` when fitting
    method : str
        Name of the revision method, as passed through the ``revision_method`` constructor argument.
        One of: ``"OLS", "WLSS", "WLSV", "FP", "PHA", "AHP", "BU", "NONE"``
    transform : Union[NamedTuple[str, Callable], bool]
        Function transform to be applied to input and outputs. If True, it will use ``scipy.stats.boxcox``
        and ``scipy.special._ufuncs.inv_boxcox`` on input and output data
    n_jobs : int
        Number of parallel jobs forwarded to the fit and predict helpers
    low_memory : bool
        Low-memory flag forwarded to the fit and predict helpers
    tmp_dir : Optional[str]
        Temporary directory created with ``tempfile.mkdtemp`` when ``low_memory`` is truthy, else ``None``
    sum_mat : array_like
        The summing matrix, explained in depth in `Forecasting <https://otexts.com/fpp2/gts.html>`_.
        ``None`` until ``fit`` is called
    nodes : HierarchyTree
        The hierarchy used for fitting: either the tree passed to ``fit`` or the one built by
        ``HierarchyTree.from_nodes``. ``None`` until ``fit`` is called
    exogenous
        ``False`` until ``fit`` is called, afterwards whatever was passed as ``exogenous`` to ``fit``
    revision_method : RevisionMethod
        The revision (reconciliation) object built during ``fit``. ``None`` until then
    hts_result : HTSResult
        Container receiving the fitted models during ``fit`` and the forecasts, errors and
        residuals during ``predict``
    model_instance : TimeSeriesModel
        Reference to the class implementing the actual time series model
    model_args : dict
        Extra keyword arguments given to the constructor, forwarded to the underlying model
    """

    def __init__(
        self,
        model: str = defaults.MODEL,
        revision_method: str = defaults.REVISION,
        transform: Optional[Union[Transform, bool]] = False,
        n_jobs: int = defaults.N_PROCESSES,
        low_memory: bool = defaults.LOW_MEMORY,
        **kwargs: Any,
    ):
        """
        Parameters
        ----------
        model : str
            One of the models supported by ``hts``, i.e. a key of ``hts.model.MODEL_MAPPING``
        revision_method : str
            The revision method to be used. One of: ``"OLS", "WLSS", "WLSV", "FP", "PHA", "AHP", "BU", "NONE"``
        transform : Boolean or NamedTuple
            If True, ``scipy.stats.boxcox`` and ``scipy.special._ufuncs.inv_boxcox`` will be applied prior and after
            fitting.
            If False (default), no transform is applied.
            If you desire to use custom functions, use a NamedTuple like:

            .. highlight:: python
            .. code-block:: python

                from collections import namedtuple

                Transform = namedtuple('Transform', ['func', 'inv_func'])
                transform = Transform(func=numpy.exp, inv_func=numpy.log)
                ht = HTSRegressor(transform=transform, ...)

            The signatures for the ``func`` as well as ``inv_func`` parameters must both be
            ``Callable[[numpy.ndarray], numpy.ndarray]``, i.e. they must take an array and return an array,
            both of equal dimensions
        n_jobs : int
            Number of parallel jobs to run the forecasting on
        low_memory : Bool
            If True, models will be fit, serialized, and released from memory. Usually a good idea if
            you are dealing with a large amount of nodes
        kwargs
            Keyword arguments to be passed to the underlying model to be instantiated
        """
        self.model = model
        # The constructor argument is stored as ``method``; ``revision_method``
        # (below) is reserved for the RevisionMethod object built during ``fit``.
        self.method: str = revision_method
        self.n_jobs: int = n_jobs
        self.low_memory: bool = low_memory
        if self.low_memory:
            # Scratch directory handed to the fit/predict helpers in low-memory mode.
            self.tmp_dir: Optional[str] = tempfile.mkdtemp(prefix="hts_")
        else:
            self.tmp_dir = None
        self.transform = transform
        self.sum_mat: Optional[numpy.ndarray] = None
        self.nodes: Optional[NodesT] = None
        self.model_instance: Optional[TimeSeriesModelT] = None
        # Starts as False and is replaced by the ``exogenous`` argument of ``fit``.
        self.exogenous: Union[bool, ExogT] = False
        self.revision_method: Optional[RevisionMethod] = None
        self.hts_result: HTSResult = HTSResult()
        self.model_args = kwargs

    def __init_hts(
        self,
        nodes: Optional[NodesT] = None,
        df: Optional[pandas.DataFrame] = None,
        tree: Optional[HierarchyTree] = None,
        root: str = "root",
        exogenous: Optional[ExogT] = None,
    ) -> None:
        """
        Resolve the hierarchy and prepare everything ``fit`` needs.

        Sets ``self.nodes``, ``self.exogenous``, ``self.sum_mat``, ``self.model_instance``
        and ``self.revision_method``.

        Parameters
        ----------
        nodes : Dict[str, List[str]] or None
            Hierarchy definition. When given, the tree is built with ``HierarchyTree.from_nodes``
        df : pandas.DataFrame or None
            Data forwarded to ``HierarchyTree.from_nodes`` together with ``nodes``
        tree : HierarchyTree or None
            Pre-built tree, used only when ``nodes`` is not given
        root : str
            Name of the root node, forwarded to ``HierarchyTree.from_nodes``
        exogenous
            Exogenous specification, forwarded to ``HierarchyTree.from_nodes`` and stored on the instance

        Raises
        ------
        InvalidArgumentException
            If neither ``nodes`` nor ``tree`` is provided, or if the configured model is unknown
        """
        # ``df`` is deliberately never evaluated for truthiness: ``not df`` on a
        # pandas.DataFrame raises ValueError ("truth value ... is ambiguous").
        if nodes:
            self.nodes = HierarchyTree.from_nodes(
                nodes=nodes, df=df, exogenous=exogenous, root=root
            )
        elif tree:
            self.nodes = tree
        else:
            raise InvalidArgumentException(
                "Either nodes and df must be passed, or a pre-built hierarchy tree"
            )
        self.exogenous = exogenous
        # The second return value of to_sum_mat (labels) is not used here.
        self.sum_mat, _ = to_sum_mat(self.nodes)
        self._set_model_instance()
        self._init_revision()

    def _init_revision(self):
        """Build the ``RevisionMethod`` from the summing matrix, transform and method name."""
        self.revision_method = RevisionMethod(
            sum_mat=self.sum_mat, transformer=self.transform, name=self.method
        )

    def _set_model_instance(self):
        """
        Look up ``self.model`` in ``hts.model.MODEL_MAPPING`` and store the result.

        Raises
        ------
        InvalidArgumentException
            If ``self.model`` is not a key of the mapping
        """
        try:
            self.model_instance = hts_models.MODEL_MAPPING[self.model]
        except KeyError as err:
            raise InvalidArgumentException(
                f'Model {self.model} not valid. Pick one of: {" ".join(ModelT.names())}'
            ) from err

    def fit(
        self,
        df: Optional[pandas.DataFrame] = None,
        nodes: Optional[NodesT] = None,
        tree: Optional[HierarchyTree] = None,
        exogenous: Optional[ExogT] = None,
        root: str = "total",
        distributor: Optional[DistributorBaseClass] = None,
        disable_progressbar=defaults.DISABLE_PROGRESSBAR,
        show_warnings=defaults.SHOW_WARNINGS,
        **fit_kwargs: Any,
    ) -> "HTSRegressor":
        """
        Fit hierarchical model to dataframe containing hierarchical data as specified in the ``nodes`` parameter.

        Exogenous can also be passed as a dict of (string, list), where string is the specific node key and the list
        contains the names of the columns to be used as exogenous variables for that node.

        Alternatively, a pre-built HierarchyTree can be passed without specifying the node and df. See more at
        :class:`hts.hierarchy.HierarchyTree`

        Parameters
        ----------
        df : pandas.DataFrame
            A Dataframe of time series with a DateTimeIndex. Each column represents a node in the hierarchy.
            Only used together with ``nodes``
        nodes : Dict[str, List[str]]
            The hierarchy defined as a dict of (string, list), as specified in
            :py:func:`HierarchyTree.from_nodes <hts.hierarchy.HierarchyTree.from_nodes>`
        tree : HierarchyTree
            A pre-built HierarchyTree. Ignored if nodes are passed, as the tree will be built from those
        distributor : Optional[DistributorBaseClass]
            A distributor, for parallel/distributed processing
        exogenous : Dict[str, List[str]] or None
            Node key mapping to columns that contain the exogenous variable for that node
        root : str
            The name of the root node
        disable_progressbar : Bool
            Disable or enable progressbar
        show_warnings : Bool
            Forwarded to the fitting helper to control warning output
        fit_kwargs : Any
            Any arguments to be passed to the underlying forecasting model's fit function

        Returns
        -------
        HTSRegressor
            The fitted HTSRegressor instance

        Raises
        ------
        InvalidArgumentException
            If neither ``nodes`` nor ``tree`` is provided, or if the configured model is unknown
        """
        self.__init_hts(nodes=nodes, df=df, tree=tree, root=root, exogenous=exogenous)

        nodes = make_iterable(self.nodes, prop=None)

        fit_function_kwargs = {
            "fit_kwargs": fit_kwargs,
            "low_memory": self.low_memory,
            "tmp_dir": self.tmp_dir,
            "model_instance": self.model_instance,
            "model_args": self.model_args,
            "transform": self.transform,
        }

        fitted_models = _do_fit(
            nodes=nodes,
            function_kwargs=fit_function_kwargs,
            n_jobs=self.n_jobs,
            disable_progressbar=disable_progressbar,
            show_warnings=show_warnings,
            distributor=distributor,
        )

        # Each result is either already a tuple or a fitted model object; in the
        # latter case it is paired with its node key before being handed to the
        # result container.
        for model in fitted_models:
            if isinstance(model, tuple):
                self.hts_result.models = model
            else:
                self.hts_result.models = (model.node.key, model)
        return self

    def __validate_exogenous(
        self, exogenous_df: Optional[pandas.DataFrame]
    ) -> Optional[pandas.DataFrame]:
        """
        Check ``exogenous_df`` against the model kind and the fit-time configuration.

        Logs a warning when a dataframe is supplied for a model other than ``prophet`` or
        ``auto_arima``. Returns ``exogenous_df`` unchanged.

        Raises
        ------
        MissingRegressorException
            If exogenous variables were configured at fit time but ``exogenous_df`` is ``None``
        """
        if exogenous_df is not None:
            if self.model not in [ModelT.prophet.value, ModelT.auto_arima.value]:
                logger.warning(
                    "Providing `exogenous_df` with a model that is not `prophet` or `auto_arima` has no effect"
                )
        if self.exogenous and exogenous_df is None:
            raise MissingRegressorException(
                "Exogenous variables were provided at fit step, hence are required at "
                "predict step. Please pass the 'exogenous_df' variable to predict "
                "function"
            )
        return exogenous_df

    def __validate_steps_ahead(
        self, exogenous_df: Optional[pandas.DataFrame], steps_ahead: Optional[int]
    ) -> int:
        """
        Work out the forecasting horizon.

        Without ``exogenous_df``, a falsy ``steps_ahead`` defaults to 1 and any other value is
        returned as given. With ``exogenous_df``, the horizon is the dataframe's length
        (overriding ``steps_ahead``) and every node's exogenous columns must be selectable from it.

        Raises
        ------
        MissingRegressorException
            If selecting a node's exogenous columns from ``exogenous_df`` raises ``KeyError``
        """
        if exogenous_df is None and not steps_ahead:
            logger.info(
                "No arguments passed for 'steps_ahead', defaulting to predicting 1-step-ahead"
            )
            steps_ahead = 1
        elif exogenous_df is not None:
            steps_ahead = len(exogenous_df)
            for node in make_iterable(self.nodes, prop=None):
                exog_cols = node.exogenous
                try:
                    # Selection is done only for its KeyError side effect.
                    _ = exogenous_df[exog_cols]
                except KeyError as err:
                    raise MissingRegressorException(
                        f"Node {node.key} has as exogenous variables {node.exogenous} but "
                        f"these columns were not found in 'exogenous_df'"
                    ) from err
        return steps_ahead

    def predict(
        self,
        exogenous_df: Optional[pandas.DataFrame] = None,
        steps_ahead: Optional[int] = None,
        distributor: Optional[DistributorBaseClass] = None,
        disable_progressbar: bool = defaults.DISABLE_PROGRESSBAR,
        show_warnings: bool = defaults.SHOW_WARNINGS,
        **predict_kwargs,
    ) -> pandas.DataFrame:
        """
        Forecast every node of the fitted hierarchy and reconcile the forecasts.

        Parameters
        ----------
        distributor : Optional[DistributorBaseClass]
            A distributor, for parallel/distributed processing
        disable_progressbar : Bool
            Disable or enable progressbar
        show_warnings : Bool
            Forwarded to the prediction helper to control warning output
        predict_kwargs : Any
            Any arguments to be passed to the underlying forecasting model's predict function
        exogenous_df : pandas.DataFrame
            A dataframe of length == steps_ahead containing the exogenous data for each of the nodes.
            Only required when using ``prophet`` or ``auto_arima`` models. See
            `fbprophet's additional regression docs <https://facebook.github.io/prophet/docs/seasonality,_holiday_effects,_and_regressors.html#additional-regressors>`_
            and
            `AutoARIMA's exogenous handling docs <https://alkaline-ml.com/pmdarima/modules/generated/pmdarima.arima.AutoARIMA.html>`_
            for more information.

            Other models do not require additional regressors at predict time.
        steps_ahead : int
            The number of forecasting steps for which to produce a forecast. Defaults to 1 when
            neither this nor ``exogenous_df`` is given; overridden by ``len(exogenous_df)`` otherwise

        Returns
        -------
        Revised Forecasts, as a pandas.DataFrame in the same format as the one passed for fitting, extended by
        ``steps_ahead`` time steps

        Raises
        ------
        MissingRegressorException
            If exogenous variables were used at fit time but ``exogenous_df`` is missing, or if a
            node's exogenous columns cannot be selected from ``exogenous_df``
        """
        exogenous_df = self.__validate_exogenous(exogenous_df)
        steps_ahead = self.__validate_steps_ahead(
            exogenous_df=exogenous_df, steps_ahead=steps_ahead
        )

        if exogenous_df is not None:
            predict_kwargs["exogenous_df"] = exogenous_df

        # NOTE(review): predict_kwargs is passed under both "fit_kwargs" and
        # "predict_kwargs". Which key _do_predict consumes is not visible from
        # this module, so both are kept - confirm before removing either.
        predict_function_kwargs = {
            "fit_kwargs": predict_kwargs,
            "steps_ahead": steps_ahead,
            "low_memory": self.low_memory,
            "tmp_dir": self.tmp_dir,
            "predict_kwargs": predict_kwargs,
        }

        fit_models = _model_mapping_to_iterable(self.hts_result.models, self.nodes)
        results = _do_predict(
            models=fit_models,
            function_kwargs=predict_function_kwargs,
            n_jobs=self.n_jobs,
            disable_progressbar=disable_progressbar,
            show_warnings=show_warnings,
            distributor=distributor,
        )

        # Hand each per-node result to the result container as (key, value) pairs.
        for key, forecast, error, residual in results:
            self.hts_result.forecasts = (key, forecast)
            self.hts_result.errors = (key, error)
            self.hts_result.residuals = (key, residual)
        return self._revise(steps_ahead=steps_ahead)

    def _revise(self, steps_ahead: int = 1) -> pandas.DataFrame:
        """
        Reconcile the stored forecasts and wrap them in a dataframe.

        The frame's columns come from ``make_iterable(self.nodes)`` and its index from
        ``_get_predict_index``.
        """
        logger.info(f"Reconciling forecasts using {self.revision_method}")
        revised = self.revision_method.revise(
            forecasts=self.hts_result.forecasts,
            mse=self.hts_result.errors,
            nodes=self.nodes,
        )

        revised_columns = list(make_iterable(self.nodes))
        revised_index = self._get_predict_index(steps_ahead=steps_ahead)
        return pandas.DataFrame(revised, index=revised_index, columns=revised_columns)

    def _get_predict_index(self, steps_ahead=1) -> Any:
        """
        Return the fitted data's index extended by ``steps_ahead`` future timestamps.

        The frequency is read from the ``freq`` attribute of the root item's index and falls
        back to ``1`` when that attribute is missing or falsy.
        """
        freq = getattr(self.nodes.item.index, "freq", 1) or 1
        try:
            # Numeric freq: timedelta's first positional argument is days, and
            # date_range without ``freq`` produces daily timestamps.
            start = self.nodes.item.index[-1] + timedelta(freq)
            end = self.nodes.item.index[-1] + timedelta(steps_ahead * freq)
            future = pandas.date_range(start=start, end=end)
        except TypeError:
            # freq is not usable as a number of days (presumably a pandas
            # offset): add it to the last timestamp directly and step by it.
            start = self.nodes.item.index[-1] + freq
            end = self.nodes.item.index[-1] + (steps_ahead * freq)
            future = pandas.date_range(freq=freq, start=start, end=end)

        return self.nodes.item.index.append(future)