-
Notifications
You must be signed in to change notification settings - Fork 89
/
difference.py
389 lines (306 loc) · 12.5 KB
/
difference.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
"""Class to iteratively apply differences to a time series."""
__author__ = ["RNKuhns", "fkiraly"]
__all__ = ["Differencer"]
from typing import Union
import numpy as np
import pandas as pd
from sklearn.utils import check_array
from aeon.datatypes._utilities import get_cutoff, update_data
from aeon.transformations.base import BaseTransformer
from aeon.utils.validation import is_int
def _check_lags(lags):
msg = " ".join(
[
"`lags` should be provided as a positive integer scaler, or",
"a list, tuple or np.ndarray of positive integers,"
f"but found {type(lags)}.",
]
)
non_positive_msg = "`lags` should be positive integers."
if isinstance(lags, int):
if lags <= 0:
raise ValueError(non_positive_msg)
lags = check_array([lags], ensure_2d=False)
elif isinstance(lags, (list, tuple, np.ndarray)):
if not all([is_int(lag) for lag in lags]):
raise TypeError(msg)
lags = check_array(lags, ensure_2d=False)
if (lags <= 0).any():
raise ValueError(non_positive_msg)
else:
raise TypeError(msg)
return lags
def _diff_transform(X: Union[pd.Series, pd.DataFrame], lags: np.array):
"""Perform differencing on Series or DataFrame.
Parameters
----------
X : pd.DataFrame
lags : int or iterable of int, e.g., list of int
Returns
-------
`X` differenced at lags `lags`, always a copy (no reference)
if `lags` is int, applies diff to X at period `lags`
returns X.diff(periods=lag)
if `lags` is list of int, loops over elements from start to end
and applies diff to X at period lags[value], for value in the list `lags`
"""
if isinstance(lags, int):
lags = [lags]
Xt = X
for lag in lags:
# converting lag to int since pandas complains if it's np.int64
Xt = Xt.diff(periods=int(lag))
return Xt
def _diff_to_seq(X: Union[pd.Series, pd.DataFrame], lags: np.array):
"""Difference a series multiple times and return intermediate results.
Parameters
----------
X : pd.DataFrame
lags : int or iterable of int, e.g., list of int
Returns
-------
list, i-th element is _diff_transform(X, lags[0:i])
"""
if X is None:
return None
if isinstance(lags, int):
lags = [lags]
ret = [X]
Xd = X
for lag in lags:
# converting lag to int since pandas complains if it's np.int64
Xd = Xd.diff(periods=int(lag))
ret += [Xd]
return ret
def _shift(ix, periods):
"""Shift pandas index by periods."""
if isinstance(ix, (pd.DatetimeIndex, pd.PeriodIndex, pd.TimedeltaIndex)):
return ix.shift(periods)
else:
return ix + periods
def _inverse_diff(X, lags, X_diff_seq=None):
"""Inverse to difference.
Parameters
----------
X : pd.Series or pd.DataFrame
lags : int or iterable of int, e.g., list of int
X_diff_seq : list of pd.Series or pd.DataFrame
elements must match type, columns and index type of X
length must be equal or longer than length of lags
Returns
-------
`X` inverse differenced at lags `lags`, always a copy (no reference)
if `lags` is int, applies cumsum to X at period `lag`
for i in range(lag), X.iloc[i::lag] = X.iloc[i::lag].cumsum()
if `lags` is list of int, loops over elements from start to end
and applies cumsum to X at period lag[value], for value in the list `lag`
if `X_diff_seq` is provided, uses values stored for indices outside `X` to invert
"""
if isinstance(lags, int):
lags = [lags]
# if lag is numpy, convert to list
if isinstance(lags, (np.ndarray, list, tuple)):
lags = list(lags)
# if lag is a list, recurse
if isinstance(lags, (list, tuple)):
if len(lags) == 0:
return X
lags = lags.copy()
# lag_first = pop last element of lags
lag_last = lags.pop()
# invert last lag index
if X_diff_seq is not None:
X_diff_orig = X_diff_seq[len(lags)]
X_ix_shift = X.index.shift(-lag_last)
X_update = X_diff_orig.loc[X_ix_shift.intersection(X_diff_orig.index)]
X = X.combine_first(X_update)
X_diff_last = X.copy()
if lag_last < 0:
X_diff_last = X_diff_last.iloc[::-1]
abs_lag = abs(lag_last)
for i in range(abs_lag):
X_diff_last.iloc[i::abs_lag] = X_diff_last.iloc[i::abs_lag].cumsum()
if lag_last < 0:
X_diff_last = X_diff_last.iloc[::-1]
# if any more lags, recurse
if len(lags) > 0:
return _inverse_diff(X_diff_last, lags, X_diff_seq=X_diff_seq)
# else return
else:
return X_diff_last
class Differencer(BaseTransformer):
    """Apply iterative differences to a timeseries.

    The transformation works for univariate and multivariate timeseries. However,
    the multivariate case applies the same differencing to every series.

    Difference transformations are applied at the specified lags in the order
    provided.

    For example, given a timeseries with monthly periodicity, using lags=[1, 12]
    corresponds to applying a standard first difference to handle trend, and
    followed by a seasonal difference (at lag 12) to attempt to account for
    seasonal dependence.

    To provide a higher-order difference at the same lag list the lag multiple
    times. For example, lags=[1, 1] takes iterative first differences like may
    be needed for a series that is integrated of order 2.

    Parameters
    ----------
    lags : int or array-like, default = 1
        The lags used to difference the data.
        If a single `int` value is passed, one difference at that lag is applied.
        If a list, tuple or np.ndarray of int is passed, one difference per
        element is applied, in the order given.
        All lags must be positive integers.
    na_handling : str, optional, default = "fill_zero"
        How to handle the NaNs that appear at the start of the series from
        differencing.
        Example: there are only 3 differences in a series of length 4,
        differencing [a, b, c, d] gives [?, b-a, c-b, d-c]
        so we need to determine what happens with the "?" (= unknown value)
        "drop_na" - unknown value(s) are dropped, the series is shortened
        "keep_na" - unknown value(s) is/are replaced by NaN
        "fill_zero" - unknown value(s) is/are replaced by zero
    memory : str, optional, default = "all"
        how much of previously seen X to remember, for exact reconstruction of
        inverse
        "all" : estimator remembers all X, inverse is correct for all indices seen
        "latest" : estimator only remembers latest X necessary for future
            reconstruction
            inverses at any time stamps after fit are correct, but not past time
            stamps
        "none" : estimator does not remember any X, inverse is direct cumsum

    Examples
    --------
    >>> from aeon.transformations.difference import Differencer
    >>> from aeon.datasets import load_airline
    >>> y = load_airline()
    >>> transformer = Differencer(lags=[1, 12])
    >>> y_transform = transformer.fit_transform(y)
    """

    _tags = {
        "input_data_type": "Series",
        # what is the scitype of X: Series, or Panel
        "output_data_type": "Series",
        # what scitype is returned: Primitives, Series, Panel
        "instancewise": True,  # is this an instance-wise transform?
        "X_inner_type": ["pd.DataFrame", "pd.Series"],
        # which mtypes do _fit/_predict support for X?
        "y_inner_type": "None",  # which mtypes do _fit/_predict support for y?
        "fit_is_empty": False,
        "transform-returns-same-time-index": False,
        "univariate-only": False,
        "capability:inverse_transform": True,
    }

    # accepted values of the `na_handling` parameter
    VALID_NA_HANDLING_STR = ["drop_na", "keep_na", "fill_zero"]

    def __init__(self, lags=1, na_handling="fill_zero", memory="all"):
        self.lags = lags
        self.na_handling = self._check_na_handling(na_handling)
        self.memory = memory

        # data remembered from fit; stays None unless `_fit` stores something
        self._X = None
        # validated lags in array form, used by all internal computations
        self._lags = _check_lags(self.lags)
        self._cumulative_lags = None
        super().__init__()

        # if the na_handling is "fill_zero" or "keep_na"
        # then the returned indices are same to the passed indices
        if self.na_handling in ["fill_zero", "keep_na"]:
            self.set_tags(**{"transform-returns-same-time-index": True})

    def _check_na_handling(self, na_handling):
        """Check na_handling parameter, should be a valid string as per docstring.

        Parameters
        ----------
        na_handling : object
            Value to validate against `VALID_NA_HANDLING_STR`.

        Returns
        -------
        na_handling : the input, unchanged, if it is valid

        Raises
        ------
        ValueError
            If `na_handling` is not one of `VALID_NA_HANDLING_STR`.
        """
        if na_handling not in self.VALID_NA_HANDLING_STR:
            raise ValueError(
                f'invalid na_handling parameter value encountered: "{na_handling}", '
                f"na_handling must be one of: {self.VALID_NA_HANDLING_STR}"
            )

        return na_handling

    def _fit(self, X, y=None):
        """
        Fit transformer to X and y.

        private _fit containing the core logic, called from fit

        Stores the sum of all lags, the part of X selected by `memory`, and the
        value returned by `get_cutoff` for X.

        Parameters
        ----------
        X : pd.Series or pd.DataFrame
            Data to fit transform to
        y : ignored argument for interface compatibility
            Additional data, e.g., labels for transformation

        Returns
        -------
        self: a fitted instance of the estimator
        """
        memory = self.memory

        # total number of leading values that differencing at all lags leaves unknown
        lagsum = self._lags.sum()
        self._lagsum = lagsum

        # remember X or part of X
        # for any other `memory` value nothing is stored and `_X` is left as is
        if memory == "all":
            self._X = X
        elif memory == "latest":
            n_memory = min(len(X), lagsum)
            self._X = X.iloc[-n_memory:]

        # NOTE(review): presumably the last index of X, returned as a pandas
        # index so that its `freq` can be read in `_check_freq` - confirm
        self._freq = get_cutoff(X, return_index=True)
        return self

    def _check_freq(self, X):
        """Ensure X carries same freq as X seen in _fit.

        If the object stored in `_fit` has a `freq` attribute and the index of
        `X` has a `freq` attribute that is None, the stored freq is written to
        `X.index.freq`. This modifies the index of `X` in place.

        Parameters
        ----------
        X : pd.Series or pd.DataFrame

        Returns
        -------
        X : the same object that was passed in
        """
        if self._freq is not None and hasattr(self._freq, "freq"):
            if hasattr(X.index, "freq") and X.index.freq is None:
                X.index.freq = self._freq.freq
        return X

    def _transform(self, X, y=None):
        """Transform X and return a transformed version.

        private _transform containing the core logic, called from transform

        Parameters
        ----------
        X : pd.Series or pd.DataFrame
            Data to be transformed
        y : ignored argument for interface compatibility
            Additional data, e.g., labels for transformation

        Returns
        -------
        Xt : pd.Series or pd.DataFrame, same type as X
            transformed version of X
        """
        X_orig_index = X.index

        # NOTE(review): presumably combines the remembered data with the new X
        # so that differences at the start of X can use earlier values - confirm
        X = update_data(X=self._X, X_new=X)

        X = self._check_freq(X)

        Xt = _diff_transform(X, self._lags)

        # restrict the result to the indices of the X that was passed in
        Xt = Xt.loc[X_orig_index]

        na_handling = self.na_handling
        if na_handling == "drop_na":
            # drops the first `_lagsum` rows of the result
            Xt = Xt.iloc[self._lagsum :]
        elif na_handling == "fill_zero":
            # overwrites the first `_lagsum` rows of the result with zero
            Xt.iloc[: self._lagsum] = 0
        elif na_handling == "keep_na":
            pass
        else:
            raise RuntimeError(
                "unreachable condition, invalid na_handling value encountered: "
                f"{na_handling}"
            )

        return Xt

    def _inverse_transform(self, X, y=None):
        """Logic used by `inverse_transform` to reverse transformation on `X`.

        Parameters
        ----------
        X : pd.Series or pd.DataFrame
            Data to be inverse transformed
        y : ignored argument for interface compatibility
            Additional data, e.g., labels for transformation

        Returns
        -------
        Xt : pd.Series or pd.DataFrame, same type as X
            inverse transformed version of X
        """
        lags = self._lags

        # intermediate differences of the remembered data; None if nothing is
        # remembered, in which case the inverse is a plain cumulative sum
        X_diff_seq = _diff_to_seq(self._X, lags)

        X = self._check_freq(X)
        X_orig_index = X.index

        Xt = _inverse_diff(X, lags, X_diff_seq=X_diff_seq)

        # the inverse may have added rows from the remembered data; drop them
        Xt = Xt.loc[X_orig_index]

        return Xt

    @classmethod
    def get_test_params(cls):
        """Return testing parameter settings for the estimator.

        Returns
        -------
        params : dict or list of dict, default = {}
            Parameters to create testing instances of the class
            Each dict are parameters to construct an "interesting" test instance,
            i.e., `MyClass(**params)` or `MyClass(**params[i])` creates a valid
            test instance.
            `create_test_instance` uses the first (or only) dictionary in `params`
        """
        # we're testing that inverse_transform is inverse to transform
        # and that is only correct if the first observation is not dropped
        # todo: ensure that we have proper tests or escapes for "incomplete inverses"
        # "drop_na" is excluded by name, because the inverse has problems there
        # need to deal with this in a better way in testing
        params = [
            {"na_handling": x} for x in cls.VALID_NA_HANDLING_STR if x != "drop_na"
        ]

        return params