-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
nirs.py
312 lines (262 loc) · 11.3 KB
/
nirs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# Authors: Robert Luke <mail@robertluke.net>
# Eric Larson <larson.eric.d@gmail.com>
# Alexandre Gramfort <alexandre.gramfort@inria.fr>
#
# License: BSD-3-Clause
import re
import numpy as np
from ...io.pick import _picks_to_idx, pick_types
from ...utils import fill_doc, _check_option, _validate_type
# Standardized fNIRS channel name regexs
_S_D_F_RE = re.compile(r'S(\d+)_D(\d+) (\d+\.?\d*)')
_S_D_H_RE = re.compile(r'S(\d+)_D(\d+) (\w+)')
@fill_doc
def source_detector_distances(info, picks=None):
r"""Determine the distance between NIRS source and detectors.
Parameters
----------
%(info_not_none)s
%(picks_all)s
Returns
-------
dists : array of float
Array containing distances in meters.
Of shape equal to number of channels, or shape of picks if supplied.
"""
return np.array([
np.linalg.norm(np.diff(
info['chs'][pick]['loc'][3:9].reshape(2, 3), axis=0)[0])
for pick in _picks_to_idx(info, picks, exclude=[])], float)
@fill_doc
def short_channels(info, threshold=0.01):
r"""Determine which NIRS channels are short.
Channels with a source to detector distance of less than
``threshold`` are reported as short. The default threshold is 0.01 m.
Parameters
----------
%(info_not_none)s
threshold : float
The threshold distance for what is considered short in meters.
Returns
-------
short : array of bool
Array indicating which channels are short.
Of shape equal to number of channels.
"""
return source_detector_distances(info) < threshold
def _channel_frequencies(info):
"""Return the light frequency for each channel."""
# Only valid for fNIRS data before conversion to haemoglobin
picks = _picks_to_idx(info, ['fnirs_cw_amplitude', 'fnirs_od'],
exclude=[], allow_empty=True)
freqs = list()
for pick in picks:
freqs.append(round(float(
_S_D_F_RE.match(info['ch_names'][pick]).groups()[2])))
return np.array(freqs, int)
def _channel_chromophore(info):
"""Return the chromophore of each channel."""
# Only valid for fNIRS data after conversion to haemoglobin
picks = _picks_to_idx(info, ['hbo', 'hbr'], exclude=[], allow_empty=True)
chroma = []
for ii in picks:
chroma.append(info['ch_names'][ii].split(" ")[1])
return chroma
def _check_channels_ordered(info, pair_vals, *, throw_errors=True,
check_bads=True):
"""Check channels follow expected fNIRS format.
If the channels are correctly ordered then an array of valid picks
will be returned.
If throw_errors is True then any errors in fNIRS formatting will be
thrown to inform the user. If throw_errors is False then an empty array
will be returned if the channels are not sufficiently formatted.
"""
# Every second channel should be same SD pair
# and have the specified light frequencies.
# All wavelength based fNIRS data.
picks_wave = _picks_to_idx(info, ['fnirs_cw_amplitude', 'fnirs_od'],
exclude=[], allow_empty=True)
# All chromophore fNIRS data
picks_chroma = _picks_to_idx(info, ['hbo', 'hbr'],
exclude=[], allow_empty=True)
if (len(picks_wave) > 0) & (len(picks_chroma) > 0):
picks = _throw_or_return_empty(
'MNE does not support a combination of amplitude, optical '
'density, and haemoglobin data in the same raw structure.',
throw_errors)
# All continuous wave fNIRS data
if len(picks_wave):
error_word = "frequencies"
use_RE = _S_D_F_RE
picks = picks_wave
else:
error_word = "chromophore"
use_RE = _S_D_H_RE
picks = picks_chroma
pair_vals = np.array(pair_vals)
if pair_vals.shape != (2,):
raise ValueError(
f'Exactly two {error_word} must exist in info, got '
f'{list(pair_vals)}')
# In principle we do not need to require that these be sorted --
# all we need to do is change our sorted() below to make use of a
# pair_vals.index(...) in a sort key -- but in practice we always want
# (hbo, hbr) or (lower_freq, upper_freq) pairings, both of which will
# work with a naive string sort, so let's just enforce sorted-ness here
is_str = pair_vals.dtype.kind == 'U'
pair_vals = list(pair_vals)
if is_str:
if pair_vals != ['hbo', 'hbr']:
raise ValueError(
f'The {error_word} in info must be ["hbo", "hbr"], but got '
f'{pair_vals} instead')
elif not np.array_equal(np.unique(pair_vals), pair_vals):
raise ValueError(
f'The {error_word} in info must be unique and sorted, but got '
f'got {pair_vals} instead')
if len(picks) % 2 != 0:
picks = _throw_or_return_empty(
'NIRS channels not ordered correctly. An even number of NIRS '
f'channels is required. {len(info.ch_names)} channels were'
f'provided', throw_errors)
# Ensure wavelength info exists for waveform data
all_freqs = [info["chs"][ii]["loc"][9] for ii in picks_wave]
if np.any(np.isnan(all_freqs)):
picks = _throw_or_return_empty(
f'NIRS channels is missing wavelength information in the '
f'info["chs"] structure. The encoded wavelengths are {all_freqs}.',
throw_errors)
# Validate the channel naming scheme
for pick in picks:
ch_name_info = use_RE.match(info['chs'][pick]['ch_name'])
if not bool(ch_name_info):
picks = _throw_or_return_empty(
'NIRS channels have specified naming conventions. '
'The provided channel name can not be parsed: '
f'{repr(info.ch_names[pick])}', throw_errors)
break
value = ch_name_info.groups()[2]
if len(picks_wave):
value = value
else: # picks_chroma
if value not in ["hbo", "hbr"]:
picks = _throw_or_return_empty(
"NIRS channels have specified naming conventions."
"Chromophore data must be labeled either hbo or hbr. "
f"The failing channel is {info['chs'][pick]['ch_name']}",
throw_errors)
break
# Reorder to be paired (naive sort okay here given validation above)
picks = picks[np.argsort([info['ch_names'][pick] for pick in picks])]
# Validate our paired ordering
for ii, jj in zip(picks[::2], picks[1::2]):
ch1_name = info['chs'][ii]['ch_name']
ch2_name = info['chs'][jj]['ch_name']
ch1_re = use_RE.match(ch1_name)
ch2_re = use_RE.match(ch2_name)
ch1_S, ch1_D, ch1_value = ch1_re.groups()[:3]
ch2_S, ch2_D, ch2_value = ch2_re.groups()[:3]
if len(picks_wave):
ch1_value, ch2_value = float(ch1_value), float(ch2_value)
if (ch1_S != ch2_S) or (ch1_D != ch2_D) or \
(ch1_value != pair_vals[0]) or (ch2_value != pair_vals[1]):
picks = _throw_or_return_empty(
'NIRS channels not ordered correctly. Channels must be '
'ordered as source detector pairs with alternating'
f' {error_word} {pair_vals[0]} & {pair_vals[1]}, but got '
f'S{ch1_S}_D{ch1_D} pair '
f'{repr(ch1_name)} and {repr(ch2_name)}',
throw_errors)
break
if check_bads:
for ii, jj in zip(picks[::2], picks[1::2]):
want = [info.ch_names[ii], info.ch_names[jj]]
got = list(set(info['bads']).intersection(want))
if len(got) == 1:
raise RuntimeError(
f'NIRS bad labelling is not consistent, found {got} but '
f'needed {want}')
return picks
def _throw_or_return_empty(msg, throw_errors):
if throw_errors:
raise ValueError(msg)
else:
return []
def _validate_nirs_info(info, *, throw_errors=True, fnirs=None, which=None,
check_bads=True, allow_empty=True):
"""Apply all checks to fNIRS info. Works on all continuous wave types."""
_validate_type(fnirs, (None, str), 'fnirs')
kinds = dict(
od='optical density',
cw_amplitude='continuous wave',
hb='chromophore',
)
_check_option('fnirs', fnirs, (None,) + tuple(kinds))
if fnirs is not None:
kind = kinds[fnirs]
fnirs = ['hbo', 'hbr'] if fnirs == 'hb' else f'fnirs_{fnirs}'
if not len(pick_types(info, fnirs=fnirs)):
raise RuntimeError(
f'{which} must operate on {kind} data, but none was found.')
freqs = np.unique(_channel_frequencies(info))
if freqs.size > 0:
pair_vals = freqs
else:
pair_vals = np.unique(_channel_chromophore(info))
out = _check_channels_ordered(
info, pair_vals, throw_errors=throw_errors,
check_bads=check_bads)
return out
def _fnirs_spread_bads(info):
"""Spread bad labeling across fnirs channels."""
# For an optode pair if any component (light frequency or chroma) is marked
# as bad, then they all should be. This function will find any pairs marked
# as bad and spread the bad marking to all components of the optode pair.
picks = _validate_nirs_info(info, check_bads=False)
new_bads = set(info['bads'])
for ii, jj in zip(picks[::2], picks[1::2]):
ch1_name, ch2_name = info.ch_names[ii], info.ch_names[jj]
if ch1_name in new_bads:
new_bads.add(ch2_name)
elif ch2_name in new_bads:
new_bads.add(ch1_name)
info['bads'] = sorted(new_bads)
return info
def _fnirs_optode_names(info):
"""Return list of unique optode names."""
picks_wave = _picks_to_idx(info, ['fnirs_cw_amplitude', 'fnirs_od'],
exclude=[], allow_empty=True)
picks_chroma = _picks_to_idx(info, ['hbo', 'hbr'],
exclude=[], allow_empty=True)
if len(picks_wave) > 0:
regex = _S_D_F_RE
elif len(picks_chroma) > 0:
regex = _S_D_H_RE
else:
return [], []
sources = np.unique([int(regex.match(ch).groups()[0])
for ch in info.ch_names])
detectors = np.unique([int(regex.match(ch).groups()[1])
for ch in info.ch_names])
src_names = [f"S{s}" for s in sources]
det_names = [f"D{d}" for d in detectors]
return src_names, det_names
def _optode_position(info, optode):
"""Find the position of an optode."""
idx = [optode in a for a in info.ch_names].index(True)
if "S" in optode:
loc_idx = range(3, 6)
elif "D" in optode:
loc_idx = range(6, 9)
return info["chs"][idx]["loc"][loc_idx]
def _reorder_nirx(raw):
# Maybe someday we should make this public like
# mne.preprocessing.nirs.reorder_standard(raw, order='nirx')
info = raw.info
picks = pick_types(info, fnirs=True, exclude=[])
prefixes = [info['ch_names'][pick].split()[0] for pick in picks]
nirs_names = [info['ch_names'][pick] for pick in picks]
nirs_sorted = sorted(nirs_names,
key=lambda name: (prefixes.index(name.split()[0]),
name.split(maxsplit=1)[1]))
raw.reorder_channels(nirs_sorted)