# Authors: Tal Linzen <linzen@nyu.edu>
# Teon Brooks <teon.brooks@gmail.com>
# Denis A. Engemann <denis.engemann@gmail.com>
# Jona Sassenhagen <jona.sassenhagen@gmail.com>
# Marijn van Vliet <w.m.vanvliet@gmail.com>
#
# License: BSD (3-clause)
from inspect import isgenerator
from collections import namedtuple
import numpy as np
from scipy import linalg, sparse
from ..externals.six import string_types
from ..source_estimate import SourceEstimate
from ..epochs import BaseEpochs
from ..evoked import Evoked, EvokedArray
from ..utils import logger, _reject_data_segments, warn
from ..io.pick import pick_types, pick_info


def linear_regression(inst, design_matrix, names=None):
"""Fit Ordinary Least Squares regression (OLS).

    Parameters
    ----------
inst : instance of Epochs | iterable of SourceEstimate
The data to be regressed. Contains all the trials, sensors, and time
points for the regression. For Source Estimates, accepts either a list
or a generator object.
design_matrix : ndarray, shape (n_observations, n_regressors)
The regressors to be used. Must be a 2d array with as many rows as
the first dimension of `data`. The first column of this matrix will
typically consist of ones (intercept column).
    names : list-like | None
        Optional names for the regressors. If provided, the length must match
        the number of columns in ``design_matrix`` (including the intercept,
        if present). If None, the default names are x0, x1 ... xn for n
        regressors.

    Returns
    -------
results : dict of namedtuple
For each regressor (key) a namedtuple is provided with the
following attributes:
beta : regression coefficients
stderr : standard error of regression coefficients
t_val : t statistics (beta / stderr)
p_val : two-sided p-value of t statistic under the t distribution
mlog10_p_val : -log10 transformed p-value.
The tuple members are numpy arrays. The shape of each numpy array is
the shape of the data minus the first dimension; e.g., if the shape of
the original data was (n_observations, n_channels, n_timepoints),
then the shape of each of the arrays will be
(n_channels, n_timepoints).
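
    Examples
    --------
    A minimal sketch, assuming ``epochs`` is an existing Epochs object; the
    design matrix below (an intercept plus one hypothetical per-trial
    covariate) is illustrative::

        import numpy as np

        n_trials = len(epochs)
        covariate = np.random.randn(n_trials)  # e.g., reaction times
        design = np.c_[np.ones(n_trials), covariate]
        res = linear_regression(epochs, design,
                                names=['intercept', 'covariate'])
        res['covariate'].t_val.plot()  # t values wrapped as an Evoked object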
"""
if names is None:
names = ['x%i' % i for i in range(design_matrix.shape[1])]
if isinstance(inst, BaseEpochs):
picks = pick_types(inst.info, meg=True, eeg=True, ref_meg=True,
stim=False, eog=False, ecg=False,
emg=False, exclude=['bads'])
if [inst.ch_names[p] for p in picks] != inst.ch_names:
warn('Fitting linear model to non-data or bad channels. '
'Check picking')
msg = 'Fitting linear model to epochs'
data = inst.get_data()
out = EvokedArray(np.zeros(data.shape[1:]), inst.info, inst.tmin)
elif isgenerator(inst):
msg = 'Fitting linear model to source estimates (generator input)'
out = next(inst)
data = np.array([out.data] + [i.data for i in inst])
elif isinstance(inst, list) and isinstance(inst[0], SourceEstimate):
msg = 'Fitting linear model to source estimates (list input)'
out = inst[0]
data = np.array([i.data for i in inst])
else:
raise ValueError('Input must be epochs or iterable of source '
'estimates')
    logger.info(msg + ' (%s targets, %s regressors)' %
                (np.product(data.shape[1:]), len(names)))
lm_params = _fit_lm(data, design_matrix, names)
lm = namedtuple('lm', 'beta stderr t_val p_val mlog10_p_val')
lm_fits = {}
for name in names:
parameters = [p[name] for p in lm_params]
for ii, value in enumerate(parameters):
out_ = out.copy()
if not isinstance(out_, (SourceEstimate, Evoked)):
raise RuntimeError('Invalid container.')
out_._data[:] = value
parameters[ii] = out_
lm_fits[name] = lm(*parameters)
logger.info('Done')
return lm_fits


def _fit_lm(data, design_matrix, names):
    """Fit the linear model and compute per-regressor statistics."""
from scipy import stats
n_samples = len(data)
n_features = np.product(data.shape[1:])
if design_matrix.ndim != 2:
raise ValueError('Design matrix must be a 2d array')
n_rows, n_predictors = design_matrix.shape
if n_samples != n_rows:
raise ValueError('Number of rows in design matrix must be equal '
'to number of observations')
    if n_predictors != len(names):
        raise ValueError('Number of regressor names must be equal to '
                         'number of columns in design matrix')
y = np.reshape(data, (n_samples, n_features))
betas, resid_sum_squares, _, _ = linalg.lstsq(a=design_matrix, b=y)
df = n_rows - n_predictors
sqrt_noise_var = np.sqrt(resid_sum_squares / df).reshape(data.shape[1:])
design_invcov = linalg.inv(np.dot(design_matrix.T, design_matrix))
unscaled_stderrs = np.sqrt(np.diag(design_invcov))
tiny = np.finfo(np.float64).tiny
beta, stderr, t_val, p_val, mlog10_p_val = (dict() for _ in range(5))
for x, unscaled_stderr, predictor in zip(betas, unscaled_stderrs, names):
beta[predictor] = x.reshape(data.shape[1:])
stderr[predictor] = sqrt_noise_var * unscaled_stderr
p_val[predictor] = np.empty_like(stderr[predictor])
t_val[predictor] = np.empty_like(stderr[predictor])
stderr_pos = (stderr[predictor] > 0)
beta_pos = (beta[predictor] > 0)
t_val[predictor][stderr_pos] = (beta[predictor][stderr_pos] /
stderr[predictor][stderr_pos])
cdf = stats.t.cdf(np.abs(t_val[predictor][stderr_pos]), df)
p_val[predictor][stderr_pos] = np.clip((1. - cdf) * 2., tiny, 1.)
# degenerate cases
mask = (~stderr_pos & beta_pos)
t_val[predictor][mask] = np.inf * np.sign(beta[predictor][mask])
p_val[predictor][mask] = tiny
# could do NaN here, but hopefully this is safe enough
mask = (~stderr_pos & ~beta_pos)
t_val[predictor][mask] = 0
p_val[predictor][mask] = 1.
mlog10_p_val[predictor] = -np.log10(p_val[predictor])
return beta, stderr, t_val, p_val, mlog10_p_val
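

# The function below is an illustrative sketch, not part of the module's
# API: it checks the statistics computed by ``_fit_lm`` against
# ``scipy.stats.linregress`` on a toy single-regressor problem. The name
# ``_check_fit_lm_against_scipy`` and the toy data are hypothetical.
def _check_fit_lm_against_scipy():
    """Compare _fit_lm statistics with scipy.stats.linregress (sketch)."""
    from scipy import stats
    rng = np.random.RandomState(0)
    n_obs = 50
    x = rng.randn(n_obs)
    y = 2. + .5 * x + rng.randn(n_obs)
    design = np.c_[np.ones(n_obs), x]  # intercept + one regressor
    beta, stderr, t_val, p_val, _ = _fit_lm(
        y[:, np.newaxis], design, ['intercept', 'x'])
    res = stats.linregress(x, y)
    assert np.allclose(beta['x'][0], res.slope)
    assert np.allclose(stderr['x'][0], res.stderr)
    assert np.allclose(t_val['x'][0], res.slope / res.stderr)
    assert np.allclose(p_val['x'][0], res.pvalue)
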
def linear_regression_raw(raw, events, event_id=None, tmin=-.1, tmax=1,
covariates=None, reject=None, flat=None, tstep=1.,
decim=1, picks=None, solver='cholesky'):
"""Estimate regression-based evoked potentials/fields by linear modeling.
This models the full M/EEG time course, including correction for
overlapping potentials and allowing for continuous/scalar predictors.
Internally, this constructs a predictor matrix X of size
n_samples * (n_conds * window length), solving the linear system
``Y = bX`` and returning ``b`` as evoked-like time series split by
condition. See [1]_.
Parameters
----------
raw : instance of Raw
        A raw object. Note: be very careful about data that is not
        downsampled, as the resulting matrices can be enormous and easily
        overload your computer. Typically, a 100 Hz sampling rate is
        appropriate; otherwise, use the ``decim`` keyword (see below).
events : ndarray of int, shape (n_events, 3)
An array where the first column corresponds to samples in raw
and the last to integer codes in event_id.
event_id : dict
As in Epochs; a dictionary where the values may be integers or
iterables of integers, corresponding to the 3rd column of
events, and the keys are condition names.
tmin : float | dict
If float, gives the lower limit (in seconds) for the time window for
which all event types' effects are estimated. If a dict, can be used to
specify time windows for specific event types: keys correspond to keys
in event_id and/or covariates; for missing values, the default (-.1) is
used.
tmax : float | dict
If float, gives the upper limit (in seconds) for the time window for
which all event types' effects are estimated. If a dict, can be used to
specify time windows for specific event types: keys correspond to keys
in event_id and/or covariates; for missing values, the default (1.) is
used.
    covariates : dict-like | None
        If dict-like (e.g., a pandas DataFrame), values have to be array-like
        and have as many entries as there are rows in ``events``. Keys
        correspond to additional event types/conditions to be estimated and
        are matched with the time points given by the first column of
        ``events``. If None, only binary events (from event_id) are used.
reject : None | dict
For cleaning raw data before the regression is performed: set up
rejection parameters based on peak-to-peak amplitude in continuously
selected subepochs. If None, no rejection is done.
        If dict, keys are types ('grad' | 'mag' | 'eeg' | 'eog' | 'ecg')
        and values are the maximal peak-to-peak values used to mark segments
        for rejection, e.g.::

            reject = dict(grad=4000e-12,  # T / m (gradiometers)
                          mag=4e-11,  # T (magnetometers)
                          eeg=40e-5,  # V (EEG channels)
                          eog=250e-5)  # V (EOG channels)
flat : None | dict
        For cleaning raw data before the regression is performed: set up
rejection parameters based on flatness of the signal. If None, no
rejection is done. If a dict, keys are ('grad' | 'mag' |
'eeg' | 'eog' | 'ecg') and values are minimal peak-to-peak values to
select rejected epochs.
tstep : float
Length of windows for peak-to-peak detection for raw data cleaning.
decim : int
Decimate by choosing only a subsample of data points. Highly
recommended for data recorded at high sampling frequencies, as
otherwise huge intermediate matrices have to be created and inverted.
picks : None | list
List of indices of channels to be included. If None, defaults to all
MEG and EEG channels.
    solver : str | function
        Either a function which takes as its inputs the sparse predictor
        matrix X and the observation matrix Y, and returns the coefficient
        matrix b; or a string. If a string, must be ``'cholesky'``, in which
        case the solver used is ``linalg.solve(dot(X.T, X), dot(X.T, y))``.
        See the Examples section for a custom-solver sketch.

    Returns
    -------
evokeds : dict
A dict where the keys correspond to conditions and the values are
Evoked objects with the ER[F/P]s. These can be used exactly like any
other Evoked object, including e.g. plotting or statistics.

    References
    ----------
.. [1] Smith, N. J., & Kutas, M. (2015). Regression-based estimation of ERP
waveforms: II. Non-linear effects, overlap correction, and practical
considerations. Psychophysiology, 52(2), 169-189.
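
    Examples
    --------
    A minimal sketch, assuming ``raw`` and ``events`` already exist; the
    event codes below are hypothetical::

        event_id = {'faces': 1, 'scrambled': 2}
        evokeds = linear_regression_raw(raw, events, event_id=event_id,
                                        tmin=-0.1, tmax=0.5, decim=4)
        evokeds['faces'].plot()  # overlap-corrected ERF/ERP for 'faces'

    A custom solver can be passed as a function (sketch; ``lsqr_solver`` is
    a hypothetical name)::

        import numpy as np
        from scipy.sparse.linalg import lsqr

        def lsqr_solver(X, y):
            # one sparse least-squares fit per channel (row of y)
            return np.stack([lsqr(X, row)[0] for row in y])

        evokeds = linear_regression_raw(raw, events, event_id=event_id,
                                        solver=lsqr_solver)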
"""
if isinstance(solver, string_types):
if solver == 'cholesky':
def solver(X, y):
a = (X.T * X).toarray() # dot product of sparse matrices
return linalg.solve(a, X.T * y.T, sym_pos=True,
overwrite_a=True, overwrite_b=True).T
else:
raise ValueError("No such solver: {0}".format(solver))
# build data
data, info, events = _prepare_rerp_data(raw, events, picks=picks,
decim=decim)
# build predictors
X, conds, cond_length, tmin_s, tmax_s = _prepare_rerp_preds(
n_samples=data.shape[1], sfreq=info["sfreq"], events=events,
event_id=event_id, tmin=tmin, tmax=tmax, covariates=covariates)
# remove "empty" and contaminated data points
X, data = _clean_rerp_input(X, data, reject, flat, decim, info, tstep)
# solve linear system
coefs = solver(X, data)
# construct Evoked objects to be returned from output
evokeds = _make_evokeds(coefs, conds, cond_length, tmin_s, tmax_s, info)
return evokeds


def _prepare_rerp_data(raw, events, picks=None, decim=1):
    """Prepare events and data, primarily for `linear_regression_raw`."""
if picks is None:
picks = pick_types(raw.info, meg=True, eeg=True, ref_meg=True)
info = pick_info(raw.info, picks)
decim = int(decim)
info["sfreq"] /= decim
data, times = raw[:]
data = data[picks, ::decim]
if len(set(events[:, 0])) < len(events[:, 0]):
raise ValueError("`events` contains duplicate time points. Make "
"sure all entries in the first column of `events` "
"are unique.")
events = events.copy()
events[:, 0] -= raw.first_samp
events[:, 0] //= decim
if len(set(events[:, 0])) < len(events[:, 0]):
raise ValueError("After decimating, `events` contains duplicate time "
"points. This means some events are too closely "
"spaced for the requested decimation factor. Choose "
"different events, drop close events, or choose a "
"different decimation factor.")
return data, info, events


def _prepare_rerp_preds(n_samples, sfreq, events, event_id=None, tmin=-.1,
                        tmax=1, covariates=None):
"""Build predictor matrix and metadata (e.g. condition time windows)."""
conds = list(event_id)
if covariates is not None:
conds += list(covariates)
# time windows (per event type) are converted to sample points from times
if isinstance(tmin, (float, int)):
tmin_s = dict((cond, int(tmin * sfreq)) for cond in conds)
else:
tmin_s = dict((cond, int(tmin.get(cond, -.1) * sfreq))
for cond in conds)
if isinstance(tmax, (float, int)):
tmax_s = dict(
(cond, int((tmax * sfreq) + 1.)) for cond in conds)
else:
tmax_s = dict((cond, int((tmax.get(cond, 1.) * sfreq) + 1))
for cond in conds)
    # Construct predictor matrix
    # We do this by creating one array per event type, shape (lags, samples)
    # (where lags depends on tmin/tmax and can differ between event types).
    # Columns correspond to predictors, and predictors correspond to time
    # lags. Thus, each array is mostly sparse, with one diagonal of 1s per
    # event (for binary predictors); see the illustrative sketch after this
    # function.
cond_length = dict()
xs = []
for cond in conds:
tmin_, tmax_ = tmin_s[cond], tmax_s[cond]
n_lags = int(tmax_ - tmin_) # width of matrix
if cond in event_id: # for binary predictors
ids = ([event_id[cond]]
if isinstance(event_id[cond], int)
else event_id[cond])
onsets = -(events[np.in1d(events[:, 2], ids), 0] + tmin_)
values = np.ones((len(onsets), n_lags))
else: # for predictors from covariates, e.g. continuous ones
covs = covariates[cond]
if len(covs) != len(events):
error = ("Condition {0} from ```covariates``` is "
"not the same length as ```events```").format(cond)
raise ValueError(error)
onsets = -(events[np.where(covs != 0), 0] + tmin_)[0]
v = np.asarray(covs)[np.nonzero(covs)].astype(float)
values = np.ones((len(onsets), n_lags)) * v[:, np.newaxis]
cond_length[cond] = len(onsets)
xs.append(sparse.dia_matrix((values, onsets),
shape=(n_samples, n_lags)))
return sparse.hstack(xs), conds, cond_length, tmin_s, tmax_s
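

# Illustrative sketch, not part of the module's API: demonstrates how a
# single ``dia_matrix`` in ``_prepare_rerp_preds`` encodes one binary event
# as a diagonal of ones. The name and the toy numbers are hypothetical.
def _sketch_event_diagonal():
    """Show the diagonal-of-ones predictor encoding (sketch)."""
    n_samples, n_lags = 10, 3
    event_sample, tmin_samples = 5, 0
    # one row of ones per event; the (negative) offset shifts the diagonal
    # down so it starts at the event onset plus tmin
    values = np.ones((1, n_lags))
    offsets = [-(event_sample + tmin_samples)]
    x = sparse.dia_matrix((values, offsets), shape=(n_samples, n_lags))
    # rows 5, 6, 7 each get a 1 in lag columns 0, 1, 2, respectively
    return x.toarray()
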
def _clean_rerp_input(X, data, reject, flat, decim, info, tstep):
"""Remove empty and contaminated points from data & predictor matrices."""
# find only those positions where at least one predictor isn't 0
has_val = np.unique(X.nonzero()[0])
# reject positions based on extreme steps in the data
if reject is not None:
_, inds = _reject_data_segments(data, reject, flat, decim=None,
info=info, tstep=tstep)
for t0, t1 in inds:
has_val = np.setdiff1d(has_val, range(t0, t1))
return X.tocsr()[has_val], data[:, has_val]


def _make_evokeds(coefs, conds, cond_length, tmin_s, tmax_s, info):
    """Create a dictionary of Evoked objects.

    These will be created from a coefs matrix and condition durations.
    """
evokeds = dict()
cumul = 0
for cond in conds:
tmin_, tmax_ = tmin_s[cond], tmax_s[cond]
evokeds[cond] = EvokedArray(
coefs[:, cumul:cumul + tmax_ - tmin_], info=info, comment=cond,
tmin=tmin_ / float(info["sfreq"]), nave=cond_length[cond],
kind='average') # nave and kind are technically incorrect
cumul += tmax_ - tmin_
return evokeds
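

# Illustrative sketch, not part of the module's API: shows how the column
# blocks of the coefficient matrix map onto conditions in ``_make_evokeds``.
# The condition names and window sizes are hypothetical.
def _sketch_coef_split():
    """Show the cumulative column slicing of the coefs matrix (sketch)."""
    tmin_s, tmax_s = dict(a=-2, b=0), dict(a=3, b=4)
    coefs = np.arange(2 * 9).reshape(2, 9)  # 2 channels; 5 + 4 lag columns
    slices, cumul = dict(), 0
    for cond in ('a', 'b'):
        width = tmax_s[cond] - tmin_s[cond]  # 5 lags for 'a', 4 for 'b'
        slices[cond] = coefs[:, cumul:cumul + width]
        cumul += width
    return slices  # slices['a'].shape == (2, 5); slices['b'].shape == (2, 4)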