/
split.py
560 lines (468 loc) · 19.9 KB
/
split.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from functools import lru_cache
from typing import Iterator, List, Optional
import numpy as np
import pandas as pd
from gluonts.core.component import validated
from gluonts.core.exception import GluonTSDateBoundsError
from gluonts.dataset.common import DataEntry
from gluonts.dataset.field_names import FieldName
from ._base import FlatMapTransformation
from .sampler import InstanceSampler, ContinuousTimePointSampler
def shift_timestamp(ts: pd.Timestamp, offset: int) -> pd.Timestamp:
    """
    Return ``ts`` shifted by ``offset`` periods of its own frequency.

    Thin convenience wrapper around the cached helper. The frequency is
    forwarded explicitly because it is not part of a ``Timestamp``'s hash
    and therefore must participate in the cache key separately.
    """
    freq = ts.freq
    return _shift_timestamp_helper(ts, freq, offset)
@lru_cache(maxsize=10000)
def _shift_timestamp_helper(
    ts: pd.Timestamp, freq: str, offset: int
) -> pd.Timestamp:
    """
    Cached implementation behind :func:`shift_timestamp`.

    We are using this helper function which explicitly uses the frequency as a
    parameter, because the frequency is not included in the hash of a time
    stamp.
    I.e.
      pd.Timestamp(x, freq='1D') and pd.Timestamp(x, freq='1min')
    hash to the same value, so omitting ``freq`` would poison the cache.

    Raises
    ------
    GluonTSDateBoundsError
        If the shifted date falls outside the representable range.
    """
    try:
        # this line looks innocent, but can create a date which is out of
        # bounds: values over year 9999 raise a ValueError,
        # values over 2262-04-11 raise a pandas OutOfBoundsDatetime
        # (note: OutOfBoundsDatetime is itself a ValueError subclass; it is
        # listed explicitly for documentation purposes, via the public
        # `pd.errors` path rather than the private `pd._libs` module)
        return ts + offset * ts.freq
    except (ValueError, pd.errors.OutOfBoundsDatetime) as ex:
        # re-raise as a library-specific error, preserving the original cause
        raise GluonTSDateBoundsError(ex) from ex
class InstanceSplitter(FlatMapTransformation):
    """
    Selects training instances, by slicing the target and other time series
    like arrays at random points in training mode or at the last time point in
    prediction mode. Assumption is that all time like arrays start at the same
    time point.
    The target and each time_series_field is removed and instead two
    corresponding fields with prefix `past_` and `future_` are included. E.g.
    If the target array is one-dimensional, the resulting instance has shape
    (len_target). In the multi-dimensional case, the instance has shape (dim,
    len_target).
    target -> past_target and future_target
    The transformation also adds a field 'past_is_pad' that indicates whether
    values were padded or not.
    Convention: time axis is always the last axis.
    Parameters
    ----------
    target_field
        field containing the target
    is_pad_field
        output field indicating whether padding happened
    start_field
        field containing the start date of the time series
    forecast_start_field
        output field that will contain the time point where the forecast starts
    train_sampler
        instance sampler that provides sampling indices given a time-series
    past_length
        length of the target seen before making prediction
    future_length
        length of the target that must be predicted
    output_NTC
        whether to have time series output in (time, dimension) or in
        (dimension, time) layout
    time_series_fields
        fields that contains time-series, they are split in the same interval
        as the target
    pick_incomplete
        whether training examples can be sampled with only a part of
        past_length time-units
        present for the time series. This is useful to train models for
        cold-start. In such case, is_pad_out contains an indicator whether
        data is padded or not.
    """
    @validated()
    def __init__(
        self,
        target_field: str,
        is_pad_field: str,
        start_field: str,
        forecast_start_field: str,
        train_sampler: InstanceSampler,
        past_length: int,
        future_length: int,
        output_NTC: bool = True,
        time_series_fields: Optional[List[str]] = None,
        pick_incomplete: bool = True,
    ) -> None:
        # a zero-length prediction range would make the splitter a no-op
        assert future_length > 0
        self.train_sampler = train_sampler
        self.past_length = past_length
        self.future_length = future_length
        self.output_NTC = output_NTC
        # normalize the optional field list; None means "no extra fields"
        self.ts_fields = (
            time_series_fields if time_series_fields is not None else []
        )
        self.target_field = target_field
        self.is_pad_field = is_pad_field
        self.start_field = start_field
        self.forecast_start_field = forecast_start_field
        self.pick_incomplete = pick_incomplete
    def _past(self, col_name: str) -> str:
        # output field name for the context (past) slice of `col_name`
        return f"past_{col_name}"
    def _future(self, col_name: str) -> str:
        # output field name for the prediction (future) slice of `col_name`
        return f"future_{col_name}"
    def flatmap_transform(
        self, data: DataEntry, is_train: bool
    ) -> Iterator[DataEntry]:
        """
        Yield one entry per sampled split point: each time-series field in
        `data` is replaced by `past_<field>` / `future_<field>` slices, plus
        a `past_is_pad` indicator and the forecast start timestamp.
        """
        pl = self.future_length
        slice_cols = self.ts_fields + [self.target_field]
        target = data[self.target_field]
        # time axis is the last axis by convention
        len_target = target.shape[-1]
        # when incomplete contexts are disallowed, a full past window must fit
        minimum_length = (
            self.future_length
            if self.pick_incomplete
            else self.past_length + self.future_length
        )
        if is_train:
            # valid range of split indices handed to the sampler
            sampling_bounds = (
                (0, len_target - self.future_length)
                if self.pick_incomplete
                else (self.past_length, len_target - self.future_length)
            )
            # We currently cannot handle time series that are
            # too short during training, so we just skip these.
            # If we want to include them we would need to pad and to
            # mask the loss.
            sampled_indices = (
                np.array([], dtype=int)
                if len_target < minimum_length
                else self.train_sampler(target, *sampling_bounds)
            )
        else:
            # prediction mode: exactly one split, at the end of the series
            assert self.pick_incomplete or len_target >= self.past_length
            sampled_indices = np.array([len_target], dtype=int)
        for i in sampled_indices:
            # number of missing past values that must be zero-padded on the left
            pad_length = max(self.past_length - i, 0)
            if not self.pick_incomplete:
                assert (
                    pad_length == 0
                ), f"pad_length should be zero, got {pad_length}"
            d = data.copy()
            for ts_field in slice_cols:
                if i > self.past_length:
                    # truncate to past_length
                    past_piece = d[ts_field][..., i - self.past_length : i]
                elif i < self.past_length:
                    # not enough history: left-pad with zeros up to past_length
                    pad_block = np.zeros(
                        d[ts_field].shape[:-1] + (pad_length,),
                        dtype=d[ts_field].dtype,
                    )
                    past_piece = np.concatenate(
                        [pad_block, d[ts_field][..., :i]], axis=-1
                    )
                else:
                    # i == past_length: history fits exactly
                    past_piece = d[ts_field][..., :i]
                d[self._past(ts_field)] = past_piece
                d[self._future(ts_field)] = d[ts_field][..., i : i + pl]
                # the unsplit field is replaced by its past_/future_ slices
                del d[ts_field]
            # 1 marks positions that were padded rather than observed
            pad_indicator = np.zeros(self.past_length)
            if pad_length > 0:
                pad_indicator[:pad_length] = 1
            if self.output_NTC:
                # switch from (dimension, time) to (time, dimension) layout
                for ts_field in slice_cols:
                    d[self._past(ts_field)] = d[
                        self._past(ts_field)
                    ].transpose()
                    d[self._future(ts_field)] = d[
                        self._future(ts_field)
                    ].transpose()
            d[self._past(self.is_pad_field)] = pad_indicator
            # forecast starts `i` periods after the series start
            d[self.forecast_start_field] = shift_timestamp(
                d[self.start_field], i
            )
            yield d
class CanonicalInstanceSplitter(FlatMapTransformation):
    """
    Selects instances, by slicing the target and other time series
    like arrays at random points in training mode or at the last time point in
    prediction mode. Assumption is that all time like arrays start at the same
    time point.
    In training mode, the returned instances contain past_`target_field`
    as well as past_`time_series_fields`.
    In prediction mode, one can set `use_prediction_features` to get
    future_`time_series_fields`.
    If the target array is one-dimensional, the `target_field` in the resulting instance has shape
    (`instance_length`). In the multi-dimensional case, the instance has shape (`dim`, `instance_length`),
    where `dim` can also take a value of 1.
    In the case of insufficient number of time series values, the
    transformation also adds a field 'past_is_pad' that indicates whether
    values were padded or not, and the value is padded with
    `default_pad_value` with a default value 0.
    This is done only if `allow_target_padding` is `True`,
    and the length of `target` is smaller than `instance_length`.
    Parameters
    ----------
    target_field
        field containing the target
    is_pad_field
        output field indicating whether padding happened
    start_field
        field containing the start date of the time series
    forecast_start_field
        field containing the forecast start date
    instance_sampler
        instance sampler that provides sampling indices given a time-series
    instance_length
        length of the target seen before making prediction
    output_NTC
        whether to have time series output in (time, dimension) or in
        (dimension, time) layout
    time_series_fields
        fields that contain time-series, they are split in the same interval
        as the target
    allow_target_padding
        flag to allow padding
    pad_value
        value to be used for padding
    use_prediction_features
        flag to indicate if prediction range features should be returned
    prediction_length
        length of the prediction range, must be set if
        use_prediction_features is True
    """
    @validated()
    def __init__(
        self,
        target_field: str,
        is_pad_field: str,
        start_field: str,
        forecast_start_field: str,
        instance_sampler: InstanceSampler,
        instance_length: int,
        output_NTC: bool = True,
        time_series_fields: Optional[List[str]] = None,
        allow_target_padding: bool = False,
        pad_value: float = 0.0,
        use_prediction_features: bool = False,
        prediction_length: Optional[int] = None,
    ) -> None:
        self.instance_sampler = instance_sampler
        self.instance_length = instance_length
        self.output_NTC = output_NTC
        # use None as the default instead of a shared mutable list
        # (mirrors the handling in InstanceSplitter); None means "no
        # additional dynamic feature fields"
        self.dynamic_feature_fields = (
            time_series_fields if time_series_fields is not None else []
        )
        self.target_field = target_field
        self.allow_target_padding = allow_target_padding
        self.pad_value = pad_value
        self.is_pad_field = is_pad_field
        self.start_field = start_field
        self.forecast_start_field = forecast_start_field
        assert (
            not use_prediction_features or prediction_length is not None
        ), "You must specify `prediction_length` if `use_prediction_features`"
        self.use_prediction_features = use_prediction_features
        self.prediction_length = prediction_length
    def _past(self, col_name: str) -> str:
        # output field name for the context (past) slice of `col_name`
        return f"past_{col_name}"
    def _future(self, col_name: str) -> str:
        # output field name for the prediction (future) slice of `col_name`
        return f"future_{col_name}"
    def flatmap_transform(
        self, data: DataEntry, is_train: bool
    ) -> Iterator[DataEntry]:
        """
        Yield one entry per sampled window: each time-series field is
        replaced by a fixed-length `past_<field>` slice (left-padded with
        `pad_value` if necessary), with a `past_is_pad` indicator and an
        updated start / forecast-start timestamp.
        """
        ts_fields = self.dynamic_feature_fields + [self.target_field]
        ts_target = data[self.target_field]
        # time axis is the last axis by convention
        len_target = ts_target.shape[-1]
        if is_train:
            if len_target < self.instance_length:
                sampling_indices = (
                    # Returning [] for all time series will cause this to be in loop forever!
                    [len_target]
                    if self.allow_target_padding
                    else []
                )
            else:
                sampling_indices = self.instance_sampler(
                    ts_target, self.instance_length, len_target
                )
        else:
            # prediction mode: exactly one window, ending at the series end
            sampling_indices = [len_target]
        for i in sampling_indices:
            d = data.copy()
            # number of missing values that must be left-padded
            pad_length = max(self.instance_length - i, 0)
            # update start field to the beginning of the sliced window
            d[self.start_field] = shift_timestamp(
                data[self.start_field], i - self.instance_length
            )
            # set is_pad field: 1 marks positions that were padded
            is_pad = np.zeros(self.instance_length)
            if pad_length > 0:
                is_pad[:pad_length] = 1
            d[self.is_pad_field] = is_pad
            # update time series fields
            for ts_field in ts_fields:
                full_ts = data[ts_field]
                if pad_length > 0:
                    pad_pre = self.pad_value * np.ones(
                        shape=full_ts.shape[:-1] + (pad_length,)
                    )
                    past_ts = np.concatenate(
                        [pad_pre, full_ts[..., :i]], axis=-1
                    )
                else:
                    past_ts = full_ts[..., (i - self.instance_length) : i]
                # switch to (time, dimension) layout if requested
                past_ts = past_ts.transpose() if self.output_NTC else past_ts
                d[self._past(ts_field)] = past_ts
                if self.use_prediction_features and not is_train:
                    # future features are emitted for feature fields only,
                    # never for the target itself
                    if not ts_field == self.target_field:
                        future_ts = full_ts[
                            ..., i : i + self.prediction_length
                        ]
                        future_ts = (
                            future_ts.transpose()
                            if self.output_NTC
                            else future_ts
                        )
                        d[self._future(ts_field)] = future_ts
                # the unsplit field is replaced by its past_/future_ slices
                del d[ts_field]
            # forecast starts right after the sliced context window
            d[self.forecast_start_field] = shift_timestamp(
                d[self.start_field], self.instance_length
            )
            yield d
class ContinuousTimeInstanceSplitter(FlatMapTransformation):
    """
    Selects training instances by slicing "intervals" from a continous-time
    process instantiation. Concretely, the input data is expected to describe an
    instantiation from a point (or jump) process, with the "target"
    identifying inter-arrival times and other features (marks), as described
    in detail below.
    The splitter will then take random points in continuous time from each
    given observation, and return a (variable-length) array of points in
    the past (context) and the future (prediction) intervals.
    The transformation is analogous to its discrete counterpart
    `InstanceSplitter` except that
    - It does not allow "incomplete" records. That is, the past and future
      intervals sampled are always complete
    - Outputs a (T, C) layout.
    - Does not accept `time_series_fields` (i.e., only accepts target fields) as these
      would typically not be available in TPP data.
    The target arrays are expected to have (2, T) layout where the first axis
    corresponds to the (i) interarrival times between consecutive points, in
    order and (ii) integer identifiers of marks (from {0, 1, ..., :code:`num_marks`}).
    The returned arrays will have (T, 2) layout.
    For example, the array below corresponds to a target array where points with timestamps
    0.5, 1.1, and 1.5 were observed belonging to categories (marks) 3, 1 and 0
    respectively: :code:`[[0.5, 0.6, 0.4], [3, 1, 0]]`.
    Parameters
    ----------
    past_interval_length
        length of the interval seen before making prediction
    future_interval_length
        length of the interval that must be predicted
    train_sampler
        instance sampler that provides sampling indices given a time-series
    target_field
        field containing the target
    start_field
        field containing the start date of the of the point process observation
    end_field
        field containing the end date of the point process observation
    forecast_start_field
        output field that will contain the time point where the forecast starts
    """
    def __init__(
        self,
        past_interval_length: float,
        future_interval_length: float,
        train_sampler: ContinuousTimePointSampler,
        target_field: str = FieldName.TARGET,
        start_field: str = FieldName.START,
        end_field: str = "end",
        forecast_start_field: str = FieldName.FORECAST_START,
    ) -> None:
        # a zero-length prediction interval would make the splitter a no-op
        assert (
            future_interval_length > 0
        ), "Prediction interval must have length greater than 0."
        self.train_sampler = train_sampler
        self.past_interval_length = past_interval_length
        self.future_interval_length = future_interval_length
        self.target_field = target_field
        self.start_field = start_field
        self.end_field = end_field
        self.forecast_start_field = forecast_start_field
    # noinspection PyMethodMayBeStatic
    def _mask_sorted(self, a: np.ndarray, lb: float, ub: float) -> np.ndarray:
        """
        Return the indices of the entries of sorted array `a` that fall
        in the half-open interval [`lb`, `ub`).
        """
        start = np.searchsorted(a, lb)
        end = np.searchsorted(a, ub)
        return np.arange(start, end)
    def flatmap_transform(
        self, data: DataEntry, is_train: bool
    ) -> Iterator[DataEntry]:
        """
        Yield one entry per sampled forecast start time, containing the
        re-based past (and, in training, future) points of the process
        in (T, C) layout plus the valid lengths and forecast start stamp.
        """
        # start/end must share a frequency so interval lengths are comparable
        assert data[self.start_field].freq == data[self.end_field].freq
        # observation window length, measured in units of the base frequency
        total_interval_length = (
            data[self.end_field] - data[self.start_field]
        ) / data[self.start_field].freq.delta
        # sample forecast start times in continuous time
        if is_train:
            if total_interval_length < (
                self.future_interval_length + self.past_interval_length
            ):
                # window too short for a complete past+future pair: skip
                sampling_times: np.ndarray = np.array([])
            else:
                sampling_times = self.train_sampler(
                    self.past_interval_length,
                    total_interval_length - self.future_interval_length,
                )
        else:
            # prediction mode: forecast starts at the end of the observation
            sampling_times = np.array([total_interval_length])
        ia_times = data[self.target_field][0, :]
        # slicing with 1: (not index 1) keeps the leading axis for concatenate
        marks = data[self.target_field][1:, :]
        # absolute arrival times, relative to the window start
        ts = np.cumsum(ia_times)
        assert ts[-1] < total_interval_length, (
            "Target interarrival times provided are inconsistent with "
            "start and end timestamps."
        )
        # select field names that will be included in outputs
        keep_cols = {
            k: v
            for k, v in data.items()
            if k not in [self.target_field, self.start_field, self.end_field]
        }
        for future_start in sampling_times:
            r: DataEntry = dict()
            past_start = future_start - self.past_interval_length
            future_end = future_start + self.future_interval_length
            # sampler bounds above should guarantee a complete past interval
            assert past_start >= 0
            past_mask = self._mask_sorted(ts, past_start, future_start)
            # re-base interarrival times so the first one is measured from
            # the past interval's start
            past_ia_times = np.diff(np.r_[0, ts[past_mask] - past_start])[
                np.newaxis
            ]
            r[f"past_{self.target_field}"] = np.concatenate(
                [past_ia_times, marks[:, past_mask]], axis=0
            ).transpose()
            # number of actual points in the (variable-length) past array
            r["past_valid_length"] = np.array([len(past_mask)])
            r[self.forecast_start_field] = (
                data[self.start_field]
                + data[self.start_field].freq.delta * future_start
            )
            if is_train:  # include the future only if is_train
                assert future_end <= total_interval_length
                future_mask = self._mask_sorted(ts, future_start, future_end)
                # re-base interarrival times from the forecast start
                future_ia_times = np.diff(
                    np.r_[0, ts[future_mask] - future_start]
                )[np.newaxis]
                r[f"future_{self.target_field}"] = np.concatenate(
                    [future_ia_times, marks[:, future_mask]], axis=0
                ).transpose()
                r["future_valid_length"] = np.array([len(future_mask)])
            # include other fields
            r.update(keep_cols.copy())
            yield r