/
_compute_beamformer.py
609 lines (541 loc) · 20.1 KB
/
_compute_beamformer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
"""Functions shared between different beamformer types."""
# Authors: Alexandre Gramfort <alexandre.gramfort@inria.fr>
# Roman Goj <roman.goj@gmail.com>
# Britta Westner <britta.wstnr@gmail.com>
#
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
from copy import deepcopy
import numpy as np
from .._fiff.proj import Projection, make_projector
from ..cov import Covariance, make_ad_hoc_cov
from ..forward.forward import _restrict_forward_to_src_sel, is_fixed_orient
from ..minimum_norm.inverse import _get_vertno, _prepare_forward
from ..source_space._source_space import label_src_vertno_sel
from ..time_frequency.csd import CrossSpectralDensity
from ..utils import (
_check_option,
_check_src_normal,
_import_h5io_funcs,
_pl,
_reg_pinv,
_sym_mat_pow,
check_fname,
logger,
verbose,
warn,
)
def _check_proj_match(proj, filters):
"""Check whether SSP projections in data and spatial filter match."""
proj_data, _, _ = make_projector(proj, filters["ch_names"])
if not np.allclose(
proj_data, filters["proj"], atol=np.finfo(float).eps, rtol=1e-13
):
raise ValueError(
"The SSP projections present in the data "
"do not match the projections used when "
"calculating the spatial filter."
)
def _check_src_type(filters):
"""Check whether src_type is in filters and set custom warning."""
if "src_type" not in filters:
filters["src_type"] = None
warn_text = (
"The spatial filter does not contain src_type and a robust "
"guess of src_type is not possible without src. Consider "
"recomputing the filter."
)
return filters, warn_text
def _prepare_beamformer_input(
info,
forward,
label=None,
pick_ori=None,
noise_cov=None,
rank=None,
pca=False,
loose=None,
combine_xyz="fro",
exp=None,
limit=None,
allow_fixed_depth=True,
limit_depth_chs=False,
):
"""Input preparation common for LCMV, DICS, and RAP-MUSIC."""
_check_option("pick_ori", pick_ori, ("normal", "max-power", "vector", None))
# Restrict forward solution to selected vertices
if label is not None:
_, src_sel = label_src_vertno_sel(label, forward["src"])
forward = _restrict_forward_to_src_sel(forward, src_sel)
if loose is None:
loose = 0.0 if is_fixed_orient(forward) else 1.0
# TODO: Deduplicate with _check_one_ch_type, should not be necessary
# (DICS hits this code path, LCMV does not)
if noise_cov is None:
noise_cov = make_ad_hoc_cov(info, std=1.0)
(
forward,
info_picked,
gain,
_,
orient_prior,
_,
trace_GRGT,
noise_cov,
whitener,
) = _prepare_forward(
forward,
info,
noise_cov,
"auto",
loose,
rank=rank,
pca=pca,
use_cps=True,
exp=exp,
limit_depth_chs=limit_depth_chs,
combine_xyz=combine_xyz,
limit=limit,
allow_fixed_depth=allow_fixed_depth,
)
is_free_ori = not is_fixed_orient(forward) # could have been changed
nn = forward["source_nn"]
if is_free_ori: # take Z coordinate
nn = nn[2::3]
nn = nn.copy()
vertno = _get_vertno(forward["src"])
if forward["surf_ori"]:
nn[...] = [0, 0, 1] # align to local +Z coordinate
if pick_ori is not None and not is_free_ori:
raise ValueError(
"Normal or max-power orientation (got %r) can only be picked when "
"a forward operator with free orientation is used." % (pick_ori,)
)
if pick_ori == "normal" and not forward["surf_ori"]:
raise ValueError(
"Normal orientation can only be picked when a "
"forward operator oriented in surface coordinates is "
"used."
)
_check_src_normal(pick_ori, forward["src"])
del forward, info
# Undo the scaling that MNE prefers
scale = np.sqrt((noise_cov["eig"] > 0).sum() / trace_GRGT)
gain /= scale
if orient_prior is not None:
orient_std = np.sqrt(orient_prior)
else:
orient_std = np.ones(gain.shape[1])
# Get the projector
proj, _, _ = make_projector(info_picked["projs"], info_picked["ch_names"])
return (is_free_ori, info_picked, proj, vertno, gain, whitener, nn, orient_std)
def _reduce_leadfield_rank(G):
"""Reduce the rank of the leadfield."""
# decompose lead field
u, s, v = np.linalg.svd(G, full_matrices=False)
# backproject, omitting one direction (equivalent to setting the smallest
# singular value to zero)
G = np.matmul(u[:, :, :-1], s[:, :-1, np.newaxis] * v[:, :-1, :])
return G
def _sym_inv_sm(x, reduce_rank, inversion, sk):
"""Symmetric inversion with single- or matrix-style inversion."""
if x.shape[1:] == (1, 1):
with np.errstate(divide="ignore", invalid="ignore"):
x_inv = 1.0 / x
x_inv[~np.isfinite(x_inv)] = 1.0
else:
assert x.shape[1:] == (3, 3)
if inversion == "matrix":
x_inv = _sym_mat_pow(x, -1, reduce_rank=reduce_rank)
# Reapply source covariance after inversion
x_inv *= sk[:, :, np.newaxis]
x_inv *= sk[:, np.newaxis, :]
else:
# Invert for each dipole separately using plain division
diags = np.diagonal(x, axis1=1, axis2=2)
assert not reduce_rank # guaranteed earlier
with np.errstate(divide="ignore"):
diags = 1.0 / diags
# set the diagonal of each 3x3
x_inv = np.zeros_like(x)
for k in range(x.shape[0]):
this = diags[k]
# Reapply source covariance after inversion
this *= sk[k] * sk[k]
x_inv[k].flat[::4] = this
return x_inv
def _compute_beamformer(
G,
Cm,
reg,
n_orient,
weight_norm,
pick_ori,
reduce_rank,
rank,
inversion,
nn,
orient_std,
whitener,
):
"""Compute a spatial beamformer filter (LCMV or DICS).
For more detailed information on the parameters, see the docstrings of
`make_lcmv` and `make_dics`.
Parameters
----------
G : ndarray, shape (n_dipoles, n_channels)
The leadfield.
Cm : ndarray, shape (n_channels, n_channels)
The data covariance matrix.
reg : float
Regularization parameter.
n_orient : int
Number of dipole orientations defined at each source point
weight_norm : None | 'unit-noise-gain' | 'nai'
The weight normalization scheme to use.
pick_ori : None | 'normal' | 'max-power'
The source orientation to compute the beamformer in.
reduce_rank : bool
Whether to reduce the rank by one during computation of the filter.
rank : dict | None | 'full' | 'info'
See compute_rank.
inversion : 'matrix' | 'single'
The inversion scheme to compute the weights.
nn : ndarray, shape (n_dipoles, 3)
The source normals.
orient_std : ndarray, shape (n_dipoles,)
The std of the orientation prior used in weighting the lead fields.
whitener : ndarray, shape (n_channels, n_channels)
The whitener.
Returns
-------
W : ndarray, shape (n_dipoles, n_channels)
The beamformer filter weights.
"""
_check_option(
"weight_norm",
weight_norm,
["unit-noise-gain-invariant", "unit-noise-gain", "nai", None],
)
# Whiten the data covariance
Cm = whitener @ Cm @ whitener.T.conj()
# Restore to properly Hermitian as large whitening coefs can have bad
# rounding error
Cm[:] = (Cm + Cm.T.conj()) / 2.0
assert Cm.shape == (G.shape[0],) * 2
s, _ = np.linalg.eigh(Cm)
if not (s >= -s.max() * 1e-7).all():
# This shouldn't ever happen, but just in case
warn(
"data covariance does not appear to be positive semidefinite, "
"results will likely be incorrect"
)
# Tikhonov regularization using reg parameter to control for
# trade-off between spatial resolution and noise sensitivity
# eq. 25 in Gross and Ioannides, 1999 Phys. Med. Biol. 44 2081
Cm_inv, loading_factor, rank = _reg_pinv(Cm, reg, rank)
assert orient_std.shape == (G.shape[1],)
n_sources = G.shape[1] // n_orient
assert nn.shape == (n_sources, 3)
logger.info(
"Computing beamformer filters for %d source%s" % (n_sources, _pl(n_sources))
)
n_channels = G.shape[0]
assert n_orient in (3, 1)
Gk = np.reshape(G.T, (n_sources, n_orient, n_channels)).transpose(0, 2, 1)
assert Gk.shape == (n_sources, n_channels, n_orient)
sk = np.reshape(orient_std, (n_sources, n_orient))
del G, orient_std
_check_option("reduce_rank", reduce_rank, (True, False))
# inversion of the denominator
_check_option("inversion", inversion, ("matrix", "single"))
if (
inversion == "single"
and n_orient > 1
and pick_ori == "vector"
and weight_norm == "unit-noise-gain-invariant"
):
raise ValueError(
'Cannot use pick_ori="vector" with inversion="single" and '
'weight_norm="unit-noise-gain-invariant"'
)
if reduce_rank and inversion == "single":
raise ValueError(
'reduce_rank cannot be used with inversion="single"; '
'consider using inversion="matrix" if you have a '
"rank-deficient forward model (i.e., from a sphere "
"model with MEG channels), otherwise consider using "
"reduce_rank=False"
)
if n_orient > 1:
_, Gk_s, _ = np.linalg.svd(Gk, full_matrices=False)
assert Gk_s.shape == (n_sources, n_orient)
if not reduce_rank and (Gk_s[:, 0] > 1e6 * Gk_s[:, 2]).any():
raise ValueError(
"Singular matrix detected when estimating spatial filters. "
"Consider reducing the rank of the forward operator by using "
"reduce_rank=True."
)
del Gk_s
#
# 1. Reduce rank of the lead field
#
if reduce_rank:
Gk = _reduce_leadfield_rank(Gk)
def _compute_bf_terms(Gk, Cm_inv):
bf_numer = np.matmul(Gk.swapaxes(-2, -1).conj(), Cm_inv)
bf_denom = np.matmul(bf_numer, Gk)
return bf_numer, bf_denom
#
# 2. Reorient lead field in direction of max power or normal
#
if pick_ori == "max-power":
assert n_orient == 3
_, bf_denom = _compute_bf_terms(Gk, Cm_inv)
if weight_norm is None:
ori_numer = np.eye(n_orient)[np.newaxis]
ori_denom = bf_denom
else:
# compute power, cf Sekihara & Nagarajan 2008, eq. 4.47
ori_numer = bf_denom
# Cm_inv should be Hermitian so no need for .T.conj()
ori_denom = np.matmul(
np.matmul(Gk.swapaxes(-2, -1).conj(), Cm_inv @ Cm_inv), Gk
)
ori_denom_inv = _sym_inv_sm(ori_denom, reduce_rank, inversion, sk)
ori_pick = np.matmul(ori_denom_inv, ori_numer)
assert ori_pick.shape == (n_sources, n_orient, n_orient)
# pick eigenvector that corresponds to maximum eigenvalue:
eig_vals, eig_vecs = np.linalg.eig(ori_pick.real) # not Hermitian!
# sort eigenvectors by eigenvalues for picking:
order = np.argsort(np.abs(eig_vals), axis=-1)
# eig_vals = np.take_along_axis(eig_vals, order, axis=-1)
max_power_ori = eig_vecs[np.arange(len(eig_vecs)), :, order[:, -1]]
assert max_power_ori.shape == (n_sources, n_orient)
# set the (otherwise arbitrary) sign to match the normal
signs = np.sign(np.sum(max_power_ori * nn, axis=1, keepdims=True))
signs[signs == 0] = 1.0
max_power_ori *= signs
# Compute the lead field for the optimal orientation,
# and adjust numer/denom
Gk = np.matmul(Gk, max_power_ori[..., np.newaxis])
n_orient = 1
else:
max_power_ori = None
if pick_ori == "normal":
Gk = Gk[..., 2:3]
n_orient = 1
#
# 3. Compute numerator and denominator of beamformer formula (unit-gain)
#
bf_numer, bf_denom = _compute_bf_terms(Gk, Cm_inv)
assert bf_denom.shape == (n_sources,) + (n_orient,) * 2
assert bf_numer.shape == (n_sources, n_orient, n_channels)
del Gk # lead field has been adjusted and should not be used anymore
#
# 4. Invert the denominator
#
# Here W is W_ug, i.e.:
# G.T @ Cm_inv / (G.T @ Cm_inv @ G)
bf_denom_inv = _sym_inv_sm(bf_denom, reduce_rank, inversion, sk)
assert bf_denom_inv.shape == (n_sources, n_orient, n_orient)
W = np.matmul(bf_denom_inv, bf_numer)
assert W.shape == (n_sources, n_orient, n_channels)
del bf_denom_inv, sk
#
# 5. Re-scale filter weights according to the selected weight_norm
#
# Weight normalization is done by computing, for each source::
#
# W_ung = W_ug / sqrt(W_ug @ W_ug.T)
#
# with W_ung referring to the unit-noise-gain (weight normalized) filter
# and W_ug referring to the above-calculated unit-gain filter stored in W.
if weight_norm is not None:
# Three different ways to calculate the normalization factors here.
# Only matters when in vector mode, as otherwise n_orient == 1 and
# they are all equivalent.
#
# In MNE < 0.21, we just used the Frobenius matrix norm:
#
# noise_norm = np.linalg.norm(W, axis=(1, 2), keepdims=True)
# assert noise_norm.shape == (n_sources, 1, 1)
# W /= noise_norm
#
# Sekihara 2008 says to use sqrt(diag(W_ug @ W_ug.T)), which is not
# rotation invariant:
if weight_norm in ("unit-noise-gain", "nai"):
noise_norm = np.matmul(W, W.swapaxes(-2, -1).conj()).real
noise_norm = np.reshape( # np.diag operation over last two axes
noise_norm, (n_sources, -1, 1)
)[:, :: n_orient + 1]
np.sqrt(noise_norm, out=noise_norm)
noise_norm[noise_norm == 0] = np.inf
assert noise_norm.shape == (n_sources, n_orient, 1)
W /= noise_norm
else:
assert weight_norm == "unit-noise-gain-invariant"
# Here we use sqrtm. The shortcut:
#
# use = W
#
# ... does not match the direct route (it is rotated!), so we'll
# use the direct one to match FieldTrip:
use = bf_numer
inner = np.matmul(use, use.swapaxes(-2, -1).conj())
W = np.matmul(_sym_mat_pow(inner, -0.5), use)
noise_norm = 1.0
if weight_norm == "nai":
# Estimate noise level based on covariance matrix, taking the
# first eigenvalue that falls outside the signal subspace or the
# loading factor used during regularization, whichever is largest.
if rank > len(Cm):
# Covariance matrix is full rank, no noise subspace!
# Use the loading factor as noise ceiling.
if loading_factor == 0:
raise RuntimeError(
"Cannot compute noise subspace with a full-rank "
"covariance matrix and no regularization. Try "
"manually specifying the rank of the covariance "
"matrix or using regularization."
)
noise = loading_factor
else:
noise, _ = np.linalg.eigh(Cm)
noise = noise[-rank]
noise = max(noise, loading_factor)
W /= np.sqrt(noise)
W = W.reshape(n_sources * n_orient, n_channels)
logger.info("Filter computation complete")
return W, max_power_ori
def _compute_power(Cm, W, n_orient):
"""Use beamformer filters to compute source power.
Parameters
----------
Cm : ndarray, shape (n_channels, n_channels)
Data covariance matrix or CSD matrix.
W : ndarray, shape (nvertices*norient, nchannels)
Beamformer weights.
Returns
-------
power : ndarray, shape (nvertices,)
Source power.
"""
n_sources = W.shape[0] // n_orient
Wk = W.reshape(n_sources, n_orient, W.shape[1])
source_power = np.trace(
(Wk @ Cm @ Wk.conj().transpose(0, 2, 1)).real, axis1=1, axis2=2
)
return source_power
class Beamformer(dict):
"""A computed beamformer.
Notes
-----
.. versionadded:: 0.17
"""
def copy(self):
"""Copy the beamformer.
Returns
-------
beamformer : instance of Beamformer
A deep copy of the beamformer.
"""
return deepcopy(self)
def __repr__(self): # noqa: D105
n_verts = sum(len(v) for v in self["vertices"])
n_channels = len(self["ch_names"])
if self["subject"] is None:
subject = "unknown"
else:
subject = '"%s"' % (self["subject"],)
out = "<Beamformer | %s, subject %s, %s vert, %s ch" % (
self["kind"],
subject,
n_verts,
n_channels,
)
if self["pick_ori"] is not None:
out += ", %s ori" % (self["pick_ori"],)
if self["weight_norm"] is not None:
out += ", %s norm" % (self["weight_norm"],)
if self.get("inversion") is not None:
out += ", %s inversion" % (self["inversion"],)
if "rank" in self:
out += ", rank %s" % (self["rank"],)
out += ">"
return out
@verbose
def save(self, fname, overwrite=False, verbose=None):
"""Save the beamformer filter.
Parameters
----------
fname : path-like
The filename to use to write the HDF5 data.
Should end in ``'-lcmv.h5'`` or ``'-dics.h5'``.
%(overwrite)s
%(verbose)s
"""
_, write_hdf5 = _import_h5io_funcs()
ending = "-%s.h5" % (self["kind"].lower(),)
check_fname(fname, self["kind"], (ending,))
csd_orig = None
try:
if "csd" in self:
csd_orig = self["csd"]
self["csd"] = self["csd"].__getstate__()
write_hdf5(fname, self, overwrite=overwrite, title="mnepython")
finally:
if csd_orig is not None:
self["csd"] = csd_orig
def read_beamformer(fname):
"""Read a beamformer filter.
Parameters
----------
fname : path-like
The filename of the HDF5 file.
Returns
-------
filter : instance of Beamformer
The beamformer filter.
"""
read_hdf5, _ = _import_h5io_funcs()
beamformer = read_hdf5(fname, title="mnepython")
if "csd" in beamformer:
beamformer["csd"] = CrossSpectralDensity(**beamformer["csd"])
# h5io seems to cast `bool` to `int` on round-trip, probably a bug
# we should fix at some point (if possible -- could be HDF5 limitation)
for key in ("normalize_fwd", "is_free_ori", "is_ssp"):
if key in beamformer:
beamformer[key] = bool(beamformer[key])
for key in ("data_cov", "noise_cov"):
if beamformer.get(key) is not None:
for pi, p in enumerate(beamformer[key]["projs"]):
p = Projection(**p)
p["active"] = bool(p["active"])
beamformer[key]["projs"][pi] = p
beamformer[key] = Covariance(
*[
beamformer[key].get(arg)
for arg in (
"data",
"names",
"bads",
"projs",
"nfree",
"eig",
"eigvec",
"method",
"loglik",
)
]
)
return Beamformer(beamformer)
def _proj_whiten_data(M, proj, filters):
if filters.get("is_ssp", True):
# check whether data and filter projs match
_check_proj_match(proj, filters)
if filters["whitener"] is None:
M = np.dot(filters["proj"], M)
if filters["whitener"] is not None:
M = np.dot(filters["whitener"], M)
return M