-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
_make_forward.py
789 lines (682 loc) · 30.7 KB
/
_make_forward.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
# Authors: Matti Hämäläinen <msh@nmr.mgh.harvard.edu>
# Alexandre Gramfort <alexandre.gramfort@inria.fr>
# Martin Luessi <mluessi@nmr.mgh.harvard.edu>
# Eric Larson <larson.eric.d@gmail.com>
#
# License: BSD-3-Clause
# The computations in this code were primarily derived from Matti Hämäläinen's
# C code.
from copy import deepcopy
from contextlib import contextmanager
from pathlib import Path
import os
import os.path as op
import numpy as np
from ._compute_forward import _compute_forwards
from ..io import read_info, _loc_to_coil_trans, _loc_to_eeg_loc, Info
from ..io.compensator import get_current_comp, make_compensator
from ..io.pick import _has_kit_refs, pick_types, pick_info
from ..io.constants import FIFF, FWD
from ..transforms import (_ensure_trans, transform_surface_to, apply_trans,
_get_trans, _print_coord_trans, _coord_frame_name,
Transform, invert_transform)
from ..utils import logger, verbose, warn, _pl, _validate_type, _check_fname
from ..source_space import (_ensure_src, _filter_source_spaces,
_make_discrete_source_space, _complete_vol_src)
from ..source_estimate import VolSourceEstimate
from ..surface import _normalize_vectors, _CheckInside
from ..bem import read_bem_solution, _bem_find_surface, ConductorModel
from .forward import (Forward, _merge_fwds, convert_forward_solution,
_FWD_ORDER)
_accuracy_dict = dict(normal=FWD.COIL_ACCURACY_NORMAL,
accurate=FWD.COIL_ACCURACY_ACCURATE)
_extra_coil_def_fname = None
@verbose
def _read_coil_defs(verbose=None):
"""Read a coil definition file.
Parameters
----------
%(verbose)s
Returns
-------
res : list of dict
The coils. It is a dictionary with valid keys:
'cosmag' | 'coil_class' | 'coord_frame' | 'rmag' | 'type' |
'chname' | 'accuracy'.
cosmag contains the direction of the coils and rmag contains the
position vector.
Notes
-----
The global variable "_extra_coil_def_fname" can be used to prepend
additional definitions. These are never added to the registry.
"""
coil_dir = op.join(op.split(__file__)[0], '..', 'data')
coils = list()
if _extra_coil_def_fname is not None:
coils += _read_coil_def_file(_extra_coil_def_fname, use_registry=False)
coils += _read_coil_def_file(op.join(coil_dir, 'coil_def.dat'))
return coils
# Typically we only have 1 or 2 coil def files, but they can end up being
# read a lot. Let's keep a list of them and just reuse them:
_coil_registry = {}
def _read_coil_def_file(fname, use_registry=True):
"""Read a coil def file."""
if not use_registry or fname not in _coil_registry:
big_val = 0.5
coils = list()
with open(fname, 'r') as fid:
lines = fid.readlines()
lines = lines[::-1]
while len(lines) > 0:
line = lines.pop().strip()
if line[0] == '#' and len(line) > 0:
continue
desc_start = line.find('"')
desc_end = len(line) - 1
assert line.strip()[desc_end] == '"'
desc = line[desc_start:desc_end]
vals = np.fromstring(line[:desc_start].strip(),
dtype=float, sep=' ')
assert len(vals) == 6
npts = int(vals[3])
coil = dict(coil_type=vals[1], coil_class=vals[0], desc=desc,
accuracy=vals[2], size=vals[4], base=vals[5])
# get parameters of each component
rmag = list()
cosmag = list()
w = list()
for p in range(npts):
# get next non-comment line
line = lines.pop()
while line[0] == '#':
line = lines.pop()
vals = np.fromstring(line, sep=' ')
if len(vals) != 7:
raise RuntimeError(
f'Could not interpret line {p + 1} as 7 points:\n'
f'{line}')
# Read and verify data for each integration point
w.append(vals[0])
rmag.append(vals[[1, 2, 3]])
cosmag.append(vals[[4, 5, 6]])
w = np.array(w)
rmag = np.array(rmag)
cosmag = np.array(cosmag)
size = np.sqrt(np.sum(cosmag ** 2, axis=1))
if np.any(np.sqrt(np.sum(rmag ** 2, axis=1)) > big_val):
raise RuntimeError('Unreasonable integration point')
if np.any(size <= 0):
raise RuntimeError('Unreasonable normal')
cosmag /= size[:, np.newaxis]
coil.update(dict(w=w, cosmag=cosmag, rmag=rmag))
coils.append(coil)
if use_registry:
_coil_registry[fname] = coils
if use_registry:
coils = deepcopy(_coil_registry[fname])
logger.info('%d coil definition%s read', len(coils), _pl(coils))
return coils
def _create_meg_coil(coilset, ch, acc, do_es):
"""Create a coil definition using templates, transform if necessary."""
# Also change the coordinate frame if so desired
if ch['kind'] not in [FIFF.FIFFV_MEG_CH, FIFF.FIFFV_REF_MEG_CH]:
raise RuntimeError('%s is not a MEG channel' % ch['ch_name'])
# Simple linear search from the coil definitions
for coil in coilset:
if coil['coil_type'] == (ch['coil_type'] & 0xFFFF) and \
coil['accuracy'] == acc:
break
else:
raise RuntimeError('Desired coil definition not found '
'(type = %d acc = %d)' % (ch['coil_type'], acc))
# Apply a coordinate transformation if so desired
coil_trans = _loc_to_coil_trans(ch['loc'])
# Create the result
res = dict(chname=ch['ch_name'], coil_class=coil['coil_class'],
accuracy=coil['accuracy'], base=coil['base'], size=coil['size'],
type=ch['coil_type'], w=coil['w'], desc=coil['desc'],
coord_frame=FIFF.FIFFV_COORD_DEVICE, rmag_orig=coil['rmag'],
cosmag_orig=coil['cosmag'], coil_trans_orig=coil_trans,
r0=coil_trans[:3, 3],
rmag=apply_trans(coil_trans, coil['rmag']),
cosmag=apply_trans(coil_trans, coil['cosmag'], False))
if do_es:
r0_exey = (np.dot(coil['rmag'][:, :2], coil_trans[:3, :2].T) +
coil_trans[:3, 3])
res.update(ex=coil_trans[:3, 0], ey=coil_trans[:3, 1],
ez=coil_trans[:3, 2], r0_exey=r0_exey)
return res
def _create_eeg_el(ch, t=None):
"""Create an electrode definition, transform coords if necessary."""
if ch['kind'] != FIFF.FIFFV_EEG_CH:
raise RuntimeError('%s is not an EEG channel. Cannot create an '
'electrode definition.' % ch['ch_name'])
if t is None:
t = Transform('head', 'head') # identity, no change
if t.from_str != 'head':
raise RuntimeError('Inappropriate coordinate transformation')
r0ex = _loc_to_eeg_loc(ch['loc'])
if r0ex.shape[1] == 1: # no reference
w = np.array([1.])
else: # has reference
w = np.array([1., -1.])
# Optional coordinate transformation
r0ex = apply_trans(t['trans'], r0ex.T)
# The electrode location
cosmag = r0ex.copy()
_normalize_vectors(cosmag)
res = dict(chname=ch['ch_name'], coil_class=FWD.COILC_EEG, w=w,
accuracy=_accuracy_dict['normal'], type=ch['coil_type'],
coord_frame=t['to'], rmag=r0ex, cosmag=cosmag)
return res
def _create_meg_coils(chs, acc, t=None, coilset=None, do_es=False):
"""Create a set of MEG coils in the head coordinate frame."""
acc = _accuracy_dict[acc] if isinstance(acc, str) else acc
coilset = _read_coil_defs(verbose=False) if coilset is None else coilset
coils = [_create_meg_coil(coilset, ch, acc, do_es) for ch in chs]
_transform_orig_meg_coils(coils, t, do_es=do_es)
return coils
def _transform_orig_meg_coils(coils, t, do_es=True):
"""Transform original (device) MEG coil positions."""
if t is None:
return
for coil in coils:
coil_trans = np.dot(t['trans'], coil['coil_trans_orig'])
coil.update(
coord_frame=t['to'], r0=coil_trans[:3, 3],
rmag=apply_trans(coil_trans, coil['rmag_orig']),
cosmag=apply_trans(coil_trans, coil['cosmag_orig'], False))
if do_es:
r0_exey = (np.dot(coil['rmag_orig'][:, :2],
coil_trans[:3, :2].T) + coil_trans[:3, 3])
coil.update(ex=coil_trans[:3, 0], ey=coil_trans[:3, 1],
ez=coil_trans[:3, 2], r0_exey=r0_exey)
def _create_eeg_els(chs):
"""Create a set of EEG electrodes in the head coordinate frame."""
return [_create_eeg_el(ch) for ch in chs]
@verbose
def _setup_bem(bem, bem_extra, neeg, mri_head_t, allow_none=False,
verbose=None):
"""Set up a BEM for forward computation, making a copy and modifying."""
if allow_none and bem is None:
return None
logger.info('')
_validate_type(bem, ('path-like', ConductorModel), bem)
if not isinstance(bem, ConductorModel):
logger.info('Setting up the BEM model using %s...\n' % bem_extra)
bem = read_bem_solution(bem)
else:
bem = bem.copy()
if bem['is_sphere']:
logger.info('Using the sphere model.\n')
if len(bem['layers']) == 0 and neeg > 0:
raise RuntimeError('Spherical model has zero shells, cannot use '
'with EEG data')
if bem['coord_frame'] != FIFF.FIFFV_COORD_HEAD:
raise RuntimeError('Spherical model is not in head coordinates')
else:
if bem['surfs'][0]['coord_frame'] != FIFF.FIFFV_COORD_MRI:
raise RuntimeError(
'BEM is in %s coordinates, should be in MRI'
% (_coord_frame_name(bem['surfs'][0]['coord_frame']),))
if neeg > 0 and len(bem['surfs']) == 1:
raise RuntimeError('Cannot use a homogeneous (1-layer BEM) model '
'for EEG forward calculations, consider '
'using a 3-layer BEM instead')
logger.info('Employing the head->MRI coordinate transform with the '
'BEM model.')
# fwd_bem_set_head_mri_t: Set the coordinate transformation
bem['head_mri_t'] = _ensure_trans(mri_head_t, 'head', 'mri')
logger.info('BEM model %s is now set up' % op.split(bem_extra)[1])
logger.info('')
return bem
@verbose
def _prep_meg_channels(info, accuracy='accurate', exclude=(), *,
ignore_ref=False, head_frame=True, do_es=False,
verbose=None):
"""Prepare MEG coil definitions for forward calculation."""
# Find MEG channels
ref_meg = True if not ignore_ref else False
picks = pick_types(info, meg=True, ref_meg=ref_meg, exclude=exclude)
# Make sure MEG coils exist
if len(picks) <= 0:
raise RuntimeError('Could not find any MEG channels')
info_meg = pick_info(info, picks)
del picks
# Get channel info and names for MEG channels
logger.info(f'Read {len(info_meg["chs"])} MEG channels from info')
# Get MEG compensation channels
compensator = post_picks = None
ch_names = info_meg['ch_names']
if not ignore_ref:
ref_picks = pick_types(info, meg=False, ref_meg=True, exclude=exclude)
ncomp = len(ref_picks)
if (ncomp > 0):
logger.info(f'Read {ncomp} MEG compensation channels from info')
# We need to check to make sure these are NOT KIT refs
if _has_kit_refs(info, ref_picks):
raise NotImplementedError(
'Cannot create forward solution with KIT reference '
'channels. Consider using "ignore_ref=True" in '
'calculation')
logger.info(
f'{len(info["comps"])} compensation data sets in info')
# Compose a compensation data set if necessary
# adapted from mne_make_ctf_comp() from mne_ctf_comp.c
logger.info('Setting up compensation data...')
comp_num = get_current_comp(info)
if comp_num is None or comp_num == 0:
logger.info(' No compensation set. Nothing more to do.')
else:
compensator = make_compensator(
info_meg, 0, comp_num, exclude_comp_chs=False)
logger.info(
f' Desired compensation data ({comp_num}) found.')
logger.info(' All compensation channels found.')
logger.info(' Preselector created.')
logger.info(' Compensation data matrix created.')
logger.info(' Postselector created.')
post_picks = pick_types(
info_meg, meg=True, ref_meg=False, exclude=exclude)
ch_names = [ch_names[pick] for pick in post_picks]
# Create coil descriptions with transformation to head or device frame
templates = _read_coil_defs()
if head_frame:
_print_coord_trans(info['dev_head_t'])
transform = info['dev_head_t']
else:
transform = None
megcoils = _create_meg_coils(
info_meg['chs'], accuracy, transform, templates, do_es=do_es)
# Check that coordinate frame is correct and log it
if head_frame:
assert megcoils[0]['coord_frame'] == FIFF.FIFFV_COORD_HEAD
logger.info('MEG coil definitions created in head coordinates.')
else:
assert megcoils[0]['coord_frame'] == FIFF.FIFFV_COORD_DEVICE
logger.info('MEG coil definitions created in device coordinate.')
return dict(
defs=megcoils, ch_names=ch_names, compensator=compensator,
info=info_meg, post_picks=post_picks)
@verbose
def _prep_eeg_channels(info, exclude=(), verbose=None):
"""Prepare EEG electrode definitions for forward calculation."""
info_extra = 'info'
# Find EEG electrodes
picks = pick_types(info, meg=False, eeg=True, ref_meg=False,
exclude=exclude)
# Make sure EEG electrodes exist
neeg = len(picks)
if neeg <= 0:
raise RuntimeError('Could not find any EEG channels')
# Get channel info and names for EEG channels
eegchs = pick_info(info, picks)['chs']
eegnames = [info['ch_names'][p] for p in picks]
logger.info('Read %3d EEG channels from %s' % (len(picks), info_extra))
# Create EEG electrode descriptions
eegels = _create_eeg_els(eegchs)
logger.info('Head coordinate coil definitions created.')
return dict(defs=eegels, ch_names=eegnames)
@verbose
def _prepare_for_forward(src, mri_head_t, info, bem, mindist, n_jobs,
bem_extra='', trans='', info_extra='',
meg=True, eeg=True, ignore_ref=False,
allow_bem_none=False, verbose=None):
"""Prepare for forward computation.
The sensors dict contains keys for each sensor type, e.g. 'meg', 'eeg'.
The vale for each of these is a dict that comes from _prep_meg_channels or
_prep_eeg_channels. Each dict contains:
- defs : a list of dicts (one per channel) with 'rmag', 'cosmag', etc.
- ch_names: a list of str channel names corresponding to the defs
- compensator (optional): the ndarray compensation matrix to apply
- post_picks (optional): the ndarray of indices to pick after applying the
compensator
"""
# Read the source locations
logger.info('')
# let's make a copy in case we modify something
src = _ensure_src(src).copy()
nsource = sum(s['nuse'] for s in src)
if nsource == 0:
raise RuntimeError('No sources are active in these source spaces. '
'"do_all" option should be used.')
logger.info('Read %d source spaces a total of %d active source locations'
% (len(src), nsource))
# Delete some keys to clean up the source space:
for key in ['working_dir', 'command_line']:
if key in src.info:
del src.info[key]
# Read the MRI -> head coordinate transformation
logger.info('')
_print_coord_trans(mri_head_t)
# make a new dict with the relevant information
arg_list = [info_extra, trans, src, bem_extra, meg, eeg, mindist,
n_jobs, verbose]
cmd = 'make_forward_solution(%s)' % (', '.join([str(a) for a in arg_list]))
mri_id = dict(machid=np.zeros(2, np.int32), version=0, secs=0, usecs=0)
info_trans = str(trans) if isinstance(trans, Path) else trans
info = Info(chs=info['chs'], comps=info['comps'],
dev_head_t=info['dev_head_t'], mri_file=info_trans,
mri_id=mri_id,
meas_file=info_extra, meas_id=None, working_dir=os.getcwd(),
command_line=cmd, bads=info['bads'], mri_head_t=mri_head_t)
info._update_redundant()
info._check_consistency()
logger.info('')
sensors = dict()
if meg and len(pick_types(info, meg=True, ref_meg=False, exclude=[])) > 0:
sensors['meg'] = _prep_meg_channels(info, ignore_ref=ignore_ref)
if eeg and len(pick_types(info, eeg=True, exclude=[])) > 0:
sensors['eeg'] = _prep_eeg_channels(info)
# Check that some channels were found
if len(sensors) == 0:
raise RuntimeError('No MEG or EEG channels found.')
# pick out final info
info = pick_info(info, pick_types(info, meg=meg, eeg=eeg, ref_meg=False,
exclude=[]))
# Transform the source spaces into the appropriate coordinates
# (will either be HEAD or MRI)
for s in src:
transform_surface_to(s, 'head', mri_head_t)
logger.info('Source spaces are now in %s coordinates.'
% _coord_frame_name(s['coord_frame']))
# Prepare the BEM model
eegnames = sensors.get('eeg', dict()).get('ch_names', [])
bem = _setup_bem(bem, bem_extra, len(eegnames), mri_head_t,
allow_none=allow_bem_none)
del eegnames
# Circumvent numerical problems by excluding points too close to the skull,
# and check that sensors are not inside any BEM surface
if bem is not None:
if not bem['is_sphere']:
check_surface = 'inner skull surface'
inner_skull = _bem_find_surface(bem, 'inner_skull')
check_inside = _filter_source_spaces(
inner_skull, mindist, mri_head_t, src, n_jobs)
logger.info('')
if len(bem['surfs']) == 3:
check_surface = 'scalp surface'
check_inside = _CheckInside(_bem_find_surface(bem, 'head'))
else:
check_surface = 'outermost sphere shell'
if len(bem['layers']) == 0:
def check_inside(x):
return np.zeros(len(x), bool)
else:
def check_inside(x):
return (np.linalg.norm(x - bem['r0'], axis=1) <
bem['layers'][-1]['rad'])
if 'meg' in sensors:
meg_loc = apply_trans(
invert_transform(mri_head_t),
np.array([coil['r0'] for coil in sensors['meg']['defs']]))
n_inside = check_inside(meg_loc).sum()
if n_inside:
raise RuntimeError(
f'Found {n_inside} MEG sensor{_pl(n_inside)} inside the '
f'{check_surface}, perhaps coordinate frames and/or '
'coregistration must be incorrect')
rr = np.concatenate([s['rr'][s['vertno']] for s in src])
if len(rr) < 1:
raise RuntimeError('No points left in source space after excluding '
'points close to inner skull.')
# deal with free orientations:
source_nn = np.tile(np.eye(3), (len(rr), 1))
update_kwargs = dict(nchan=len(info['ch_names']), nsource=len(rr),
info=info, src=src, source_nn=source_nn,
source_rr=rr, surf_ori=False, mri_head_t=mri_head_t)
return sensors, rr, info, update_kwargs, bem
@verbose
def make_forward_solution(info, trans, src, bem, meg=True, eeg=True, *,
mindist=0.0, ignore_ref=False, n_jobs=None,
verbose=None):
"""Calculate a forward solution for a subject.
Parameters
----------
%(info_str)s
%(trans)s
.. versionchanged:: 0.19
Support for 'fsaverage' argument.
src : path-like | instance of SourceSpaces
If string, should be a source space filename. Can also be an
instance of loaded or generated SourceSpaces.
bem : path-like | dict
Filename of the BEM (e.g., "sample-5120-5120-5120-bem-sol.fif") to
use, or a loaded sphere model (dict).
meg : bool
If True (Default), include MEG computations.
eeg : bool
If True (Default), include EEG computations.
mindist : float
Minimum distance of sources from inner skull surface (in mm).
ignore_ref : bool
If True, do not include reference channels in compensation. This
option should be True for KIT files, since forward computation
with reference channels is not currently supported.
%(n_jobs)s
%(verbose)s
Returns
-------
fwd : instance of Forward
The forward solution.
See Also
--------
convert_forward_solution
Notes
-----
The ``--grad`` option from MNE-C (to compute gradients) is not implemented
here.
To create a fixed-orientation forward solution, use this function
followed by :func:`mne.convert_forward_solution`.
.. note::
If the BEM solution was computed with :doc:`OpenMEEG <openmeeg:index>`
in :func:`mne.make_bem_solution`, then OpenMEEG will automatically
be used to compute the forward solution.
.. versionchanged:: 1.2
Added support for OpenMEEG-based forward solution calculations.
"""
# Currently not (sup)ported:
# 1. --grad option (gradients of the field, not used much)
# 2. --fixed option (can be computed post-hoc)
# 3. --mricoord option (probably not necessary)
# read the transformation from MRI to HEAD coordinates
# (could also be HEAD to MRI)
mri_head_t, trans = _get_trans(trans)
if isinstance(bem, ConductorModel):
bem_extra = 'instance of ConductorModel'
else:
bem_extra = bem
_validate_type(info, ('path-like', Info), 'info')
if not isinstance(info, Info):
info_extra = op.split(info)[1]
info = _check_fname(info, must_exist=True, overwrite='read',
name='info')
info = read_info(info, verbose=False)
else:
info_extra = 'instance of Info'
# Report the setup
logger.info('Source space : %s' % src)
logger.info('MRI -> head transform : %s' % trans)
logger.info('Measurement data : %s' % info_extra)
if isinstance(bem, ConductorModel) and bem['is_sphere']:
logger.info('Sphere model : origin at %s mm'
% (bem['r0'],))
logger.info('Standard field computations')
else:
logger.info('Conductor model : %s' % bem_extra)
logger.info('Accurate field computations')
logger.info('Do computations in %s coordinates',
_coord_frame_name(FIFF.FIFFV_COORD_HEAD))
logger.info('Free source orientations')
# Create MEG coils and EEG electrodes in the head coordinate frame
sensors, rr, info, update_kwargs, bem = _prepare_for_forward(
src, mri_head_t, info, bem, mindist, n_jobs, bem_extra, trans,
info_extra, meg, eeg, ignore_ref)
del (src, mri_head_t, trans, info_extra, bem_extra, mindist,
meg, eeg, ignore_ref)
# Time to do the heavy lifting: MEG first, then EEG
fwds = _compute_forwards(rr, bem=bem, sensors=sensors, n_jobs=n_jobs)
# merge forwards
fwds = {key: _to_forward_dict(fwds[key], sensors[key]['ch_names'])
for key in _FWD_ORDER if key in fwds}
fwd = _merge_fwds(fwds, verbose=False)
del fwds
logger.info('')
# Don't transform the source spaces back into MRI coordinates (which is
# done in the C code) because mne-python assumes forward solution source
# spaces are in head coords.
fwd.update(**update_kwargs)
logger.info('Finished.')
return fwd
@verbose
def make_forward_dipole(dipole, bem, info, trans=None, n_jobs=None, *,
verbose=None):
"""Convert dipole object to source estimate and calculate forward operator.
The instance of Dipole is converted to a discrete source space,
which is then combined with a BEM or a sphere model and
the sensor information in info to form a forward operator.
The source estimate object (with the forward operator) can be projected to
sensor-space using :func:`mne.simulation.simulate_evoked`.
.. note:: If the (unique) time points of the dipole object are unevenly
spaced, the first output will be a list of single-timepoint
source estimates.
Parameters
----------
%(dipole)s
bem : str | dict
The BEM filename (str) or a loaded sphere model (dict).
info : instance of Info
The measurement information dictionary. It is sensor-information etc.,
e.g., from a real data file.
trans : str | None
The head<->MRI transform filename. Must be provided unless BEM
is a sphere model.
%(n_jobs)s
%(verbose)s
Returns
-------
fwd : instance of Forward
The forward solution corresponding to the source estimate(s).
stc : instance of VolSourceEstimate | list of VolSourceEstimate
The dipoles converted to a discrete set of points and associated
time courses. If the time points of the dipole are unevenly spaced,
a list of single-timepoint source estimates are returned.
See Also
--------
mne.simulation.simulate_evoked
Notes
-----
.. versionadded:: 0.12.0
"""
if isinstance(dipole, list):
from ..dipole import _concatenate_dipoles # To avoid circular import
dipole = _concatenate_dipoles(dipole)
# Make copies to avoid mangling original dipole
times = dipole.times.copy()
pos = dipole.pos.copy()
amplitude = dipole.amplitude.copy()
ori = dipole.ori.copy()
# Convert positions to discrete source space (allows duplicate rr & nn)
# NB information about dipole orientation enters here, then no more
sources = dict(rr=pos, nn=ori)
# Dipole objects must be in the head frame
src = _complete_vol_src(
[_make_discrete_source_space(sources, coord_frame='head')])
# Forward operator created for channels in info (use pick_info to restrict)
# Use defaults for most params, including min_dist
fwd = make_forward_solution(info, trans, src, bem, n_jobs=n_jobs,
verbose=verbose)
# Convert from free orientations to fixed (in-place)
convert_forward_solution(fwd, surf_ori=False, force_fixed=True,
copy=False, use_cps=False, verbose=None)
# Check for omissions due to proximity to inner skull in
# make_forward_solution, which will result in an exception
if fwd['src'][0]['nuse'] != len(pos):
inuse = fwd['src'][0]['inuse'].astype(bool)
head = ('The following dipoles are outside the inner skull boundary')
msg = len(head) * '#' + '\n' + head + '\n'
for (t, pos) in zip(times[np.logical_not(inuse)],
pos[np.logical_not(inuse)]):
msg += ' t={:.0f} ms, pos=({:.0f}, {:.0f}, {:.0f}) mm\n'.\
format(t * 1000., pos[0] * 1000.,
pos[1] * 1000., pos[2] * 1000.)
msg += len(head) * '#'
logger.error(msg)
raise ValueError('One or more dipoles outside the inner skull.')
# multiple dipoles (rr and nn) per time instant allowed
# uneven sampling in time returns list
timepoints = np.unique(times)
if len(timepoints) > 1:
tdiff = np.diff(timepoints)
if not np.allclose(tdiff, tdiff[0]):
warn('Unique time points of dipoles unevenly spaced: returned '
'stc will be a list, one for each time point.')
tstep = -1.0
else:
tstep = tdiff[0]
elif len(timepoints) == 1:
tstep = 0.001
# Build the data matrix, essentially a block-diagonal with
# n_rows: number of dipoles in total (dipole.amplitudes)
# n_cols: number of unique time points in dipole.times
# amplitude with identical value of times go together in one col (others=0)
data = np.zeros((len(amplitude), len(timepoints))) # (n_d, n_t)
row = 0
for tpind, tp in enumerate(timepoints):
amp = amplitude[np.in1d(times, tp)]
data[row:row + len(amp), tpind] = amp
row += len(amp)
if tstep > 0:
stc = VolSourceEstimate(data, vertices=[fwd['src'][0]['vertno']],
tmin=timepoints[0],
tstep=tstep, subject=None)
else: # Must return a list of stc, one for each time point
stc = []
for col, tp in enumerate(timepoints):
stc += [VolSourceEstimate(data[:, col][:, np.newaxis],
vertices=[fwd['src'][0]['vertno']],
tmin=tp, tstep=0.001, subject=None)]
return fwd, stc
def _to_forward_dict(fwd, names, fwd_grad=None,
coord_frame=FIFF.FIFFV_COORD_HEAD,
source_ori=FIFF.FIFFV_MNE_FREE_ORI):
"""Convert forward solution matrices to dicts."""
assert names is not None
sol = dict(data=fwd.T, nrow=fwd.shape[1], ncol=fwd.shape[0],
row_names=names, col_names=[])
fwd = Forward(sol=sol, source_ori=source_ori, nsource=sol['ncol'],
coord_frame=coord_frame, sol_grad=None,
nchan=sol['nrow'], _orig_source_ori=source_ori,
_orig_sol=sol['data'].copy(), _orig_sol_grad=None)
if fwd_grad is not None:
sol_grad = dict(data=fwd_grad.T, nrow=fwd_grad.shape[1],
ncol=fwd_grad.shape[0], row_names=names,
col_names=[])
fwd.update(dict(sol_grad=sol_grad),
_orig_sol_grad=sol_grad['data'].copy())
return fwd
@contextmanager
def use_coil_def(fname):
"""Use a custom coil definition file.
Parameters
----------
fname : str
The filename of the coil definition file.
Returns
-------
context : contextmanager
The context for using the coil definition.
Notes
-----
This is meant to be used a context manager such as:
>>> with use_coil_def(my_fname): # doctest:+SKIP
... make_forward_solution(...)
This allows using custom coil definitions with functions that require
forward modeling.
"""
global _extra_coil_def_fname
_extra_coil_def_fname = fname
try:
yield
finally:
_extra_coil_def_fname = None