-
Notifications
You must be signed in to change notification settings - Fork 89
/
_sast_classifier.py
205 lines (169 loc) · 6.17 KB
/
_sast_classifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""Scalable and Accurate Subsequence Transform (SAST).
Pipeline classifier using the SAST transformer and an sklearn classifier.
"""
__author__ = ["MichaelMbouopda"]
__all__ = ["SASTClassifier"]
from operator import itemgetter
import numpy as np
from sklearn.linear_model import RidgeClassifierCV
from sklearn.pipeline import make_pipeline
from aeon.base._base import _clone_estimator
from aeon.classification import BaseClassifier
from aeon.transformations.collection.shapelet_based import SAST
from aeon.utils.numba.general import z_normalise_series
class SASTClassifier(BaseClassifier):
    """Classification pipeline using SAST [1]_ transformer and an sklearn classifier.

    The SAST transform converts each series into a vector of distances to
    reference subsequences; any sklearn classifier can then be fitted on
    that tabular representation.

    Parameters
    ----------
    length_list : int[], default = None
        an array containing the lengths of the subsequences to be generated.
        If None, will be inferred during fit as np.arange(3, X.shape[1])
    stride : int, default = 1
        the stride used when generating subsequences
    nb_inst_per_class : int default = 1
        the number of reference time series to select per class
    seed : int, default = None
        the seed of the random generator
    classifier : sklearn compatible classifier, default = None
        if None, a RidgeClassifierCV(alphas=np.logspace(-3, 3, 10)) is used.
    n_jobs : int, default -1
        Number of threads to use for the transform.

    Reference
    ---------
    .. [1] Mbouopda, Michael Franklin, and Engelbert Mephu Nguifo.
    "Scalable and accurate subsequence transform for time series classification."
    Pattern Recognition 147 (2023): 110121.
    https://www.sciencedirect.com/science/article/abs/pii/S003132032300818X,
    https://uca.hal.science/hal-03087686/document

    Examples
    --------
    >>> from aeon.classification.shapelet_based import SASTClassifier
    >>> from aeon.datasets import load_unit_test
    >>> X_train, y_train = load_unit_test(split="train")
    >>> X_test, y_test = load_unit_test(split="test")
    >>> clf = SASTClassifier()
    >>> clf.fit(X_train, y_train)
    SASTClassifier(...)
    >>> y_pred = clf.predict(X_test)
    """

    _tags = {
        "capability:multithreading": True,
        "capability:multivariate": False,
        "algorithm_type": "subsequence",
    }

    def __init__(
        self,
        length_list=None,
        stride=1,
        nb_inst_per_class=1,
        seed=None,
        classifier=None,
        n_jobs=-1,
    ):
        super().__init__()
        self.length_list = length_list
        self.stride = stride
        self.nb_inst_per_class = nb_inst_per_class
        self.n_jobs = n_jobs
        self.seed = seed
        self.classifier = classifier

    def _fit(self, X, y):
        """Fit SASTClassifier to the training data.

        Builds a SAST transformer + classifier pipeline and fits it on (X, y).

        Parameters
        ----------
        X: np.ndarray shape (n_time_series, n_channels, n_timepoints)
            The training input samples.
        y: array-like or list
            The class values for X.

        Return
        ------
        self : SASTClassifier
            This pipeline classifier
        """
        self._transformer = SAST(
            self.length_list,
            self.stride,
            self.nb_inst_per_class,
            self.seed,
            self.n_jobs,
        )
        # clone so repeated fits (and user-supplied estimators) stay independent;
        # seed is threaded through for reproducibility
        self._classifier = _clone_estimator(
            (
                RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))
                if self.classifier is None
                else self.classifier
            ),
            self.seed,
        )
        self._pipeline = make_pipeline(self._transformer, self._classifier)
        self._pipeline.fit(X, y)
        return self

    def _predict(self, X):
        """Predict labels for the input.

        Parameters
        ----------
        X: np.ndarray shape (n_time_series, n_channels, n_timepoints)
            The input samples to classify.

        Return
        ------
        array-like or list
            Predicted class labels.
        """
        return self._pipeline.predict(X)

    def _predict_proba(self, X):
        """Predict label probabilities for the input.

        Parameters
        ----------
        X: np.ndarray shape (n_time_series, n_channels, n_timepoints)
            The input samples to classify.

        Return
        ------
        dists : np.ndarray shape (n_time_series, n_classes)
            Predicted class probabilities.
        """
        m = getattr(self._classifier, "predict_proba", None)
        if callable(m):
            dists = self._pipeline.predict_proba(X)
        else:
            # classifier (e.g. RidgeClassifierCV) has no predict_proba:
            # fall back to a one-hot encoding of the hard predictions
            dists = np.zeros((X.shape[0], self.n_classes_))
            preds = self._pipeline.predict(X)
            for i in range(X.shape[0]):
                dists[i, np.where(self.classes_ == preds[i])] = 1
        return dists

    def plot_most_important_feature_on_ts(self, ts, feature_importance, limit=5):
        """Plot the most important features on ts.

        For each of the top-`limit` subsequences (by importance), finds its
        best-matching (z-normalised SSD) position in `ts` and overlays the
        subsequence on the series at that position.

        Parameters
        ----------
        ts : float[:]
            The time series
        feature_importance : float[:]
            The importance of each feature in the transformed data
        limit : int, default = 5
            The maximum number of features to plot

        Returns
        -------
        fig : plt.figure
            The figure
        """
        import matplotlib.pyplot as plt

        features = zip(self._transformer._kernel_orig, feature_importance)
        sorted_features = sorted(features, key=itemgetter(1), reverse=True)
        max_ = min(limit, len(sorted_features))
        fig, axes = plt.subplots(
            1, max_, sharey=True, figsize=(3 * max_, 3), tight_layout=True
        )
        # plt.subplots returns a bare Axes (not an array) when max_ == 1;
        # normalise so axes[f] indexing works for any max_
        axes = np.atleast_1d(axes)
        for f in range(max_):
            kernel, _ = sorted_features[f]
            znorm_kernel = z_normalise_series(kernel)
            d_best = np.inf
            start_pos = 0  # guard against an empty scan leaving this unbound
            # + 1 so the final valid alignment (ending at ts[-1]) is tested;
            # the original range(ts.size - kernel.size) skipped it
            for i in range(ts.size - kernel.size + 1):
                s = z_normalise_series(ts[i : i + kernel.size])
                d = np.sum((s - znorm_kernel) ** 2)
                if d < d_best:
                    d_best = d
                    start_pos = i
            axes[f].plot(range(start_pos, start_pos + kernel.size), kernel, linewidth=5)
            axes[f].plot(range(ts.size), ts, linewidth=2)
            axes[f].set_title(f"feature: {f+1}")
        return fig