-
Notifications
You must be signed in to change notification settings - Fork 3
/
FeatureExtractor.py
142 lines (135 loc) · 7.92 KB
/
FeatureExtractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import numpy as np
from scipy.signal import welch
import warnings
import antropy
from ..Configuration.FeatureSettings import GeneralFeatureChoices
# Filter out UserWarning messages from the scipy package, could be worth moving to init and applying printdebug print levels? (typically nans, 0 and infs causing errors)
warnings.filterwarnings("ignore", category=UserWarning, module="scipy") # used to reduce print statements from constant signals being applied
warnings.filterwarnings("ignore", category=UserWarning, module="antropy") # used to reduce print statements from constant signals being applied
warnings.filterwarnings("ignore", category=RuntimeWarning, module="antropy") # used to reduce print statements from constant signals being applied
warnings.filterwarnings("ignore", category=RuntimeWarning, module="numpy") # used to reduce print statements from constant signals being applied
#warnings.filterwarnings("ignore", category=RuntimeWarning, module="pybci") # used to reduce print statements from constant signals being applied
class GenericFeatureExtractor():
def __init__(self, freqbands = [[1.0, 4.0], [4.0, 8.0], [8.0, 12.0], [12.0, 20.0]], featureChoices = GeneralFeatureChoices()):
super().__init__()
self.freqbands = freqbands
self.featureChoices = featureChoices
#for key, value in self.featureChoices.__dict__.items():
# print(f"{key} = {value}")
selFeats = sum([#self.featureChoices.appr_entropy,
self.featureChoices.perm_entropy,
self.featureChoices.spec_entropy,
self.featureChoices.svd_entropy,
self.featureChoices.rms,
self.featureChoices.meanPSD,
self.featureChoices.medianPSD,
self.featureChoices.variance,
self.featureChoices.meanAbs,
self.featureChoices.waveformLength,
self.featureChoices.zeroCross,
self.featureChoices.slopeSignChange]
)
self.numFeatures = (len(self.freqbands)*self.featureChoices.psdBand)+selFeats
def ProcessFeatures(self, epoch, sr, target):
"""Allows 2D time series data to be passed with given sample rate to get various time+frequency based features.
Best for EEG, EMG, EOG, or other consistent data with a consistent sample rate (pupil labs does not)
Which features are chosen is based on self.featureChoices with initialisation. self.freqbands sets the limits for
desired frequency bands average power.
Inputs:
epoch = 2D list or 2D numpy array [chs, samples]
target = string of received marker type
sr = samplerate of current device
Returns:
features = 2D numpy array of size (chs, (len(freqbands) + sum(True in self.featureChoices)))
target = same as input target, can be useful for using a baseline number differently
NOTE: Any channels with a constant value will generate warnings in any frequency based features (constant level == no frequency components).
"""
numchs = epoch.shape[1]
features = np.zeros(numchs * self.numFeatures)
ct = 0
for ch in range(epoch.shape[1]):
#ch = np.isnan(ch)
if self.featureChoices.psdBand: # get custom average power within given frequency band from freqbands
freqs, psd = welch(epoch[:,ch], sr, nperseg = len(epoch[:,ch]))
for ct, band in enumerate(self.freqbands):
if len(freqs) > 0: # len(freqs) can be 0 if signal is all DC
idx_band = np.logical_and(freqs >= band[0], freqs <= band[1])
#if len(psd[idx_band]) == 1: # if freq band is only in one field just pass single value instead of calculating average
#print(ch)
bp = np.mean(psd[idx_band])
#else:
# bp = simps(psd[idx_band], dx=(freqs[1]-freqs[0])) / (band[1] - band[0])
#bp = simpson(psd[idx_band], dx=freq_res)
features[(ch* self.numFeatures)+ct] = bp
else:
features[(ch* self.numFeatures)+ct] = 0
else:
freqs, psd = welch(epoch[:,ch], sr, nperseg = len(epoch[:,ch]))# calculate for mean and median
ct = -1 # accounts for no freqbands being selected
if self.featureChoices.meanPSD: # mean power
ct += 1
if len(freqs) > 0:
features[(ch* self.numFeatures)+ct] = np.mean(psd) # len(freqs) can be 0 if signal is all DC
else:
features[(ch* self.numFeatures)+ct] = 0
if self.featureChoices.medianPSD: # median Power
ct += 1
if len(freqs) > 0:
features[(ch* self.numFeatures)+ct] = np.median(psd) # len(freqs) can be 0 if signal is all DC
else:
features[(ch* self.numFeatures)+ct] = 0
#if self.featureChoices.appr_entropy: # Approximate entropy(X,M,R) X = data, M is , R is 30% standard deviation of X
# ct += 1
# features[(ch* self.numFeatures)+ct] = antropy.app_entropy(epoch[:,ch])
if self.featureChoices.perm_entropy: # permutation_entropy
ct += 1
features[(ch* self.numFeatures)+ct] = antropy.perm_entropy(epoch[:,ch],normalize=True)
if self.featureChoices.spec_entropy: # spectral Entropy
ct += 1
features[(ch* self.numFeatures)+ct] = antropy.spectral_entropy(epoch[:,ch], sf=sr, method='welch', nperseg = len(epoch[:,ch]), normalize=True)
if self.featureChoices.svd_entropy:# svd Entropy
ct += 1
features[(ch* self.numFeatures)+ct] = antropy.svd_entropy(epoch[:,ch], normalize=True)
if self.featureChoices.rms: # rms
ct += 1
features[(ch* self.numFeatures)+ct] = np.sqrt(np.mean(np.array(epoch[:,ch])**2))
if self.featureChoices.variance: # variance
ct += 1
features[(ch* self.numFeatures)+ct] = np.var(epoch[:,ch])
if self.featureChoices.meanAbs: # Mean Absolute Value
ct += 1
try:
features[(ch* self.numFeatures)+ct] = sum([np.linalg.norm(c) for c in epoch[:,ch]])/len(epoch[:,ch])
except ImportError:
features[(ch* self.numFeatures)+ct] = 0
if self.featureChoices.waveformLength: # waveformLength
ct += 1
try:
features[(ch* self.numFeatures)+ct] = sum([np.linalg.norm(c-epoch[inum,ch]) for inum, c in enumerate(epoch[1:,ch])])
except ImportError:
features[(ch* self.numFeatures)+ct] = 0
if self.featureChoices.zeroCross: # zeroCross
ct += 1
features[(ch* self.numFeatures)+ct] = sum([1 if c*epoch[inum+1,ch]<0 else 0 for inum, c in enumerate(epoch[:-1,ch])])
if self.featureChoices.slopeSignChange: # slopeSignChange
ct += 1
ssc = sum([1 if (c-epoch[inum+1,ch])*(c-epoch[inum+1,ch])>=0.1 else 0 for inum, c in enumerate(epoch[:-1,ch])])
features[(ch* self.numFeatures)+ct] = ssc
features[np.isnan(features)] = 0 # checks for nans
features[features == np.inf] = 0#np.iinfo(np.int32).max
#print(features)
return features
class GazeFeatureExtractor():
def __init__(self):
super().__init__()
'''pupil channels in order
confidence: 1 channel
norm_pos_x/y: 2 channels
gaze_point_3d_x/y/z: 3 channels
eye_center0_3d_x/y/z (right/left, x/y/z): 6 channels (3 channels for each eye)
gaze_normal0/1_x/y/z (right/left, x/y/z): 6 channels (3 channels for each eye)
norm_pos_x/y: 2 channels
diameter0/1_2d (right/left): 2 channels
diameter0/1_3d (right/left): 2 channels
22 total
'''