-
Notifications
You must be signed in to change notification settings - Fork 89
/
_cboss.py
474 lines (400 loc) · 17.3 KB
/
_cboss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
"""ContractableBOSS classifier.
Dictionary based cBOSS classifier based on SFA transform. Improves the ensemble
structure of the original BOSS algorithm.
"""
__author__ = ["MatthewMiddlehurst", "BINAYKUMAR943"]
__all__ = ["ContractableBOSS", "pairwise_distances"]
import math
import time
import numpy as np
from sklearn.utils import check_random_state
from aeon.classification.base import BaseClassifier
from aeon.classification.dictionary_based import IndividualBOSS
from aeon.classification.dictionary_based._boss import pairwise_distances
from aeon.utils.validation.panel import check_X_y
class ContractableBOSS(BaseClassifier):
    """
    Contractable Bag of Symbolic Fourier Approximation Symbols (cBOSS).

    Implementation of BOSS Ensemble from [1]_ with refinements
    described in [2]_.

    Overview: Input "n" series of length "m" and cBOSS randomly samples
    `n_parameter_samples` parameter sets, evaluating each with LOOCV. It then
    retains `max_ensemble_size` classifiers with the highest accuracy.
    There are three primary parameters:
        - alpha: alphabet size
        - w: window length
        - l: word length.

    For any combination, a single BOSS slides a window length "w" along the
    series. The "w" length window is shortened to an "l" length word by
    taking a Fourier transform and keeping the first l/2 complex coefficients.
    These "l" coefficients are then discretised into "alpha" possible values,
    to form a word length "l". A histogram of words for each
    series is formed and stored.

    Fit involves finding "n" histograms.
    Predict uses 1 nearest neighbor with a bespoke BOSS distance function.

    Parameters
    ----------
    n_parameter_samples : int, default = 250
        If search is randomised, number of parameter combos to try.
    max_ensemble_size : int or None, default = 50
        Maximum number of classifiers to retain. Will limit number of retained
        classifiers even if more than `max_ensemble_size` are within threshold.
    max_win_len_prop : int or float, default = 1
        Maximum window length as a proportion of the series length.
    min_window : int, default = 10
        Minimum window size.
    time_limit_in_minutes : int, default = 0
        Time contract to limit build time in minutes. Default of 0 means no limit.
    contract_max_n_parameter_samples : int, default=np.inf
        Max number of parameter combinations to consider when time_limit_in_minutes
        is set.
    save_train_predictions : bool, default=False
        Save the ensemble member train predictions in fit for use in _get_train_probs
        leave-one-out cross-validation.
    n_jobs : int, default = 1
        The number of jobs to run in parallel for both `fit` and `predict`.
        ``-1`` means using all processors.
    feature_selection : str, default = "none"
        Sets the feature selection strategy to be used. One of {"chi2", "none",
        "random"}. "chi2" reduces the number of words significantly and is thus
        much faster (preferred). "random" also reduces the number significantly.
        "none" applies no feature selection and yields a large bag of words, so
        much memory may be needed.
    random_state : int or None, default=None
        Seed for random integer.

    Attributes
    ----------
    n_classes_ : int
        Number of classes. Extracted from the data.
    classes_ : list
        The classes labels.
    n_instances_ : int
        Number of instances. Extracted from the data.
    n_estimators_ : int
        The final number of classifiers used. Will be <= `max_ensemble_size` if
        `max_ensemble_size` has been specified.
    series_length_ : int
        Length of all series (assumed equal).
    estimators_ : list
        List of fitted IndividualBOSS ensemble members.
    weights_ : list
        Weight of each classifier in the ensemble.

    See Also
    --------
    BOSSEnsemble, IndividualBOSS
        Variants of the BOSS classifier.

    Notes
    -----
    For the Java version, see
    `TSML <https://github.com/uea-machine-learning/tsml/blob/master/src/main/java/
    tsml/classifiers/dictionary_based/cBOSS.java>`_.

    References
    ----------
    .. [1] Patrick Schäfer, "The BOSS is concerned with time series classification
       in the presence of noise", Data Mining and Knowledge Discovery, 29(6): 2015
       https://link.springer.com/article/10.1007/s10618-014-0377-7
    .. [2] Matthew Middlehurst, William Vickers and Anthony Bagnall
       "Scalable Dictionary Classifiers for Time Series Classification",
       in proc 20th International Conference on Intelligent Data Engineering
       and Automated Learning, LNCS, volume 11871
       https://link.springer.com/chapter/10.1007/978-3-030-33607-3_2

    Examples
    --------
    >>> from aeon.classification.dictionary_based import ContractableBOSS
    >>> from aeon.datasets import load_unit_test
    >>> X_train, y_train = load_unit_test(split="train", return_X_y=True)
    >>> X_test, y_test = load_unit_test(split="test", return_X_y=True)
    >>> clf = ContractableBOSS(n_parameter_samples=10, max_ensemble_size=3)
    >>> clf.fit(X_train, y_train)
    ContractableBOSS(...)
    >>> y_pred = clf.predict(X_test)
    """

    _tags = {
        "capability:train_estimate": True,
        "capability:contractable": True,
        "capability:multithreading": True,
        "algorithm_type": "dictionary",
    }

    def __init__(
        self,
        n_parameter_samples=250,
        max_ensemble_size=50,
        max_win_len_prop=1,
        min_window=10,
        time_limit_in_minutes=0.0,
        contract_max_n_parameter_samples=np.inf,
        save_train_predictions=False,
        feature_selection="none",
        n_jobs=1,
        random_state=None,
    ):
        self.n_parameter_samples = n_parameter_samples
        self.max_ensemble_size = max_ensemble_size
        self.max_win_len_prop = max_win_len_prop
        self.min_window = min_window
        self.time_limit_in_minutes = time_limit_in_minutes
        self.contract_max_n_parameter_samples = contract_max_n_parameter_samples
        self.save_train_predictions = save_train_predictions
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.feature_selection = feature_selection
        # Ensemble state, populated by _fit.
        self.estimators_ = []
        self.weights_ = []
        self.n_estimators_ = 0
        self.series_length_ = 0
        self.n_instances_ = 0
        # Sum of all member weights; cached so _predict_proba can normalise.
        self._weight_sum = 0
        # Fixed parameter spaces enumerated by _unique_parameters; window
        # sizes are derived from the series length at fit time.
        self._word_lengths = [16, 14, 12, 10, 8]
        self._norm_options = [True, False]
        self._alphabet_size = 4
        super(ContractableBOSS, self).__init__()

    def _fit(self, X, y):
        """Fit a cBOSS ensemble on cases (X,y), where y is the target variable.

        Build an ensemble of BOSS classifiers from the training set (X,
        y), through randomising over the para space to make a fixed size
        ensemble of the best.

        Parameters
        ----------
        X : 3D np.ndarray
            The training data shape = (n_instances, n_channels, n_timepoints).
        y : 1D np.ndarray
            The training labels, shape = (n_instances).

        Returns
        -------
        self :
            Reference to self.

        Notes
        -----
        Changes state by creating a fitted model that updates attributes
        ending in "_" and sets is_fitted flag to True.
        """
        # Convert the contract from minutes to seconds; 0 means no contract.
        time_limit = self.time_limit_in_minutes * 60
        self.n_instances_, _, self.series_length_ = X.shape
        self.estimators_ = []
        self.weights_ = []
        # Window length parameter space dependent on series length.
        # At most series_length/4 window sizes are searched, stepping by
        # win_inc (at least 1) from min_window up to max_window.
        max_window_searches = self.series_length_ / 4
        max_window = int(self.series_length_ * self.max_win_len_prop)
        win_inc = int((max_window - self.min_window) / max_window_searches)
        win_inc = max(win_inc, 1)
        if self.min_window > max_window + 1:
            raise ValueError(
                f"Error in ContractableBOSS, min_window ="
                f"{self.min_window} is bigger"
                f" than max_window ={max_window}."
                f" Try set min_window to be smaller than series length in "
                f"the constructor, but the classifier may not work at "
                f"all with very short series"
            )
        possible_parameters = self._unique_parameters(max_window, win_inc)
        num_classifiers = 0
        start_time = time.time()
        train_time = 0
        # Each ensemble member is trained on a random 70% subsample of the
        # training data (without replacement).
        subsample_size = int(self.n_instances_ * 0.7)
        # Track the weakest retained member so it can be replaced once the
        # ensemble is full.
        lowest_acc = 1
        lowest_acc_idx = 0
        rng = check_random_state(self.random_state)
        # Under a time contract, loop until the contract expires (bounded by
        # contract_max_n_parameter_samples); otherwise sample exactly
        # n_parameter_samples parameter sets.
        if time_limit > 0:
            n_parameter_samples = 0
            contract_max_n_parameter_samples = self.contract_max_n_parameter_samples
        else:
            n_parameter_samples = self.n_parameter_samples
            contract_max_n_parameter_samples = np.inf
        while (
            (
                train_time < time_limit
                and num_classifiers < contract_max_n_parameter_samples
            )
            or num_classifiers < n_parameter_samples
        ) and len(possible_parameters) > 0:
            # Sample a parameter set without replacement.
            parameters = possible_parameters.pop(
                rng.randint(0, len(possible_parameters))
            )
            subsample = rng.choice(
                self.n_instances_, size=subsample_size, replace=False
            )
            X_subsample = X[subsample]
            y_subsample = y[subsample]
            boss = IndividualBOSS(
                *parameters,
                alphabet_size=self._alphabet_size,
                save_words=False,
                n_jobs=self.n_jobs,
                feature_selection=self.feature_selection,
                random_state=self.random_state,
            )
            boss.fit(X_subsample, y_subsample)
            boss._clean()
            boss._subsample = subsample
            # LOOCV accuracy on the subsample. While the ensemble is not yet
            # full, pass 0 so the accuracy estimate is never early-abandoned.
            boss._accuracy = self._individual_train_acc(
                boss,
                y_subsample,
                subsample_size,
                0 if num_classifiers < self.max_ensemble_size else lowest_acc,
            )
            # Weight by accuracy^4, the exponential weighting scheme from [2]_;
            # a tiny positive weight avoids division issues for zero accuracy.
            if boss._accuracy > 0:
                weight = math.pow(boss._accuracy, 4)
            else:
                weight = 0.000000001
            # Only keep the classifier, if its accuracy is non-zero
            # (_individual_train_acc returns -1 when early-abandoned).
            if boss._accuracy > 0:
                if num_classifiers < self.max_ensemble_size:
                    if boss._accuracy < lowest_acc:
                        lowest_acc = boss._accuracy
                        lowest_acc_idx = num_classifiers
                    self.weights_.append(weight)
                    self.estimators_.append(boss)
                elif boss._accuracy > lowest_acc:
                    # Ensemble full: replace the current weakest member.
                    self.weights_[lowest_acc_idx] = weight
                    self.estimators_[lowest_acc_idx] = boss
                    lowest_acc, lowest_acc_idx = self._worst_ensemble_acc()
            num_classifiers += 1
            train_time = time.time() - start_time
        self.n_estimators_ = len(self.estimators_)
        self._weight_sum = np.sum(self.weights_)
        return self

    def _predict(self, X) -> np.ndarray:
        """Predict class values of n instances in X.

        Parameters
        ----------
        X : 3D np.ndarray
            The data to make predictions for, shape = (n_instances, n_channels,
            n_timepoints).

        Returns
        -------
        1D np.ndarray
            Predicted class labels shape = (n_instances).
        """
        rng = check_random_state(self.random_state)
        # Take the argmax of the ensemble probabilities; ties are broken
        # uniformly at random among the tied classes.
        return np.array(
            [
                self.classes_[int(rng.choice(np.flatnonzero(prob == prob.max())))]
                for prob in self.predict_proba(X)
            ]
        )

    def _predict_proba(self, X) -> np.ndarray:
        """Predict class probabilities for n instances in X.

        Parameters
        ----------
        X : 3D np.ndarray
            The data to make predictions for, shape = (n_instances, n_channels,
            n_timepoints).

        Returns
        -------
        2D np.ndarray
            Predicted class probabilities, shape = (n_instances, n_classes_).
        """
        sums = np.zeros((X.shape[0], self.n_classes_))
        # Weighted vote: each member adds its weight to the class it predicts,
        # then counts are normalised by the total weight.
        for n, clf in enumerate(self.estimators_):
            preds = clf.predict(X)
            for i in range(X.shape[0]):
                sums[i, self._class_dictionary[preds[i]]] += self.weights_[n]
        return sums / (np.ones(self.n_classes_) * self._weight_sum)

    def _worst_ensemble_acc(self):
        """Return (accuracy, index) of the least accurate ensemble member."""
        min_acc = 1.0
        min_acc_idx = -1
        for c, classifier in enumerate(self.estimators_):
            if classifier._accuracy < min_acc:
                min_acc = classifier._accuracy
                min_acc_idx = c
        return min_acc, min_acc_idx

    def _unique_parameters(self, max_window, win_inc):
        """Enumerate all candidate [window size, word length, normalise] combos."""
        return [
            [win_size, word_len, normalise]
            for n, normalise in enumerate(self._norm_options)
            for win_size in range(self.min_window, max_window + 1, win_inc)
            for g, word_len in enumerate(self._word_lengths)
        ]

    def _get_train_probs(self, X, y) -> np.ndarray:
        """Estimate class probabilities on the training data via member LOOCV.

        X must be the same data used in fit. Each member contributes weighted
        votes only for the instances in its training subsample; instances seen
        by no member fall back to a uniform distribution.
        """
        self.check_is_fitted()
        X, y = check_X_y(X, y, coerce_to_numpy=True, enforce_univariate=True)
        n_instances, _, series_length = X.shape
        if n_instances != self.n_instances_ or series_length != self.series_length_:
            raise ValueError(
                "n_instances, series_length mismatch. X should be "
                "the same as the training data used in fit for generating train "
                "probabilities."
            )
        results = np.zeros((n_instances, self.n_classes_))
        # Per-instance sum of weights of the members that voted on it.
        divisors = np.zeros(n_instances)
        for i, clf in enumerate(self.estimators_):
            subsample = clf._subsample
            if self.save_train_predictions:
                # Reuse predictions cached during fit.
                preds = clf._train_predictions
            else:
                # Recompute leave-one-out predictions on the member's subsample.
                distance_matrix = pairwise_distances(
                    clf._transformed_data, n_jobs=self.n_jobs
                )
                preds = [
                    clf._train_predict(j, distance_matrix)
                    for j in range(len(subsample))
                ]
            for n, pred in enumerate(preds):
                results[subsample[n]][self._class_dictionary[pred]] += self.weights_[i]
                divisors[subsample[n]] += self.weights_[i]
        # Normalise; unseen instances get a uniform distribution.
        for i in range(n_instances):
            results[i] = (
                np.ones(self.n_classes_) * (1 / self.n_classes_)
                if divisors[i] == 0
                else results[i] / (np.ones(self.n_classes_) * divisors[i])
            )
        return results

    def _individual_train_acc(self, boss, y, train_size, lowest_acc):
        """Leave-one-out accuracy of one member, early-abandoning below lowest_acc.

        Returns -1 when it becomes impossible to reach `lowest_acc` even if all
        remaining predictions were correct, so hopeless candidates are dropped
        cheaply.
        """
        correct = 0
        required_correct = int(lowest_acc * train_size)
        # there may be no words if feature selection is too aggressive
        if boss._transformed_data.shape[1] > 0:
            distance_matrix = pairwise_distances(
                boss._transformed_data, n_jobs=self.n_jobs
            )
            for i in range(train_size):
                # Early abandon: even a perfect run from here cannot reach the
                # required number of correct predictions.
                if correct + train_size - i < required_correct:
                    return -1
                c = boss._train_predict(i, distance_matrix)
                if c == y[i]:
                    correct += 1
                if self.save_train_predictions:
                    boss._train_predictions.append(c)
        return correct / train_size

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If no
            special parameters are defined for a value, will return `"default"` set.
            ContractableBOSS provides the following special sets:
                "results_comparison" - used in some classifiers to compare against
                    previously generated results where the default set of parameters
                    cannot produce suitable probability estimates
                "contracting" - used in classifiers that set the
                    "capability:contractable" tag to True to test contracting
                    functionality
                "train_estimate" - used in some classifiers that set the
                    "capability:train_estimate" tag to True to allow for more
                    efficient testing when relevant parameters are available

        Returns
        -------
        params : dict or list of dict, default={}
            Parameters to create testing instances of the class.
            Each dict are parameters to construct an "interesting" test instance, i.e.,
            `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance.
            `create_test_instance` uses the first (or only) dictionary in `params`.
        """
        if parameter_set == "results_comparison":
            return {"n_parameter_samples": 10, "max_ensemble_size": 5}
        elif parameter_set == "contracting":
            return {
                "time_limit_in_minutes": 5,
                "contract_max_n_parameter_samples": 4,
                "max_ensemble_size": 2,
            }
        elif parameter_set == "train_estimate":
            return {
                "n_parameter_samples": 4,
                "max_ensemble_size": 2,
                "save_train_predictions": True,
            }
        else:
            return {"n_parameter_samples": 4, "max_ensemble_size": 2}