-
-
Notifications
You must be signed in to change notification settings - Fork 38
/
configuration.py
420 lines (356 loc) · 13.8 KB
/
configuration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
import re
import os
import warnings
import yaml
from itertools import repeat
from warnings import warn
# `${...}` placeholders that `Configuration.check_pattern` may leave
# unresolved without emitting a warning
SPECIAL_REPLACEMENT_STRINGS = {
    r'${resolution_for_anat}',
    r'${func_resolution}',
}

# Locate the default pipeline configuration. Candidates, in order:
#   1. the in-container location,
#   2. the package location,
#   3. the source code (developer) location (used without checking that
#      it exists, so a missing file surfaces when it is opened below).
DEFAULT_PIPELINE_FILE = '/cpac_resources/default_pipeline.yml'
if not os.path.exists(DEFAULT_PIPELINE_FILE):
    # three `..` components appended to this file's own path
    CPAC_DIRECTORY = os.path.abspath(os.path.join(
        __file__, os.path.pardir, os.path.pardir, os.path.pardir))
    DEFAULT_PIPELINE_FILE = os.path.join(
        CPAC_DIRECTORY, 'CPAC/resources/configs/default_pipeline.yml')
    if not os.path.exists(DEFAULT_PIPELINE_FILE):
        DEFAULT_PIPELINE_FILE = os.path.join(
            CPAC_DIRECTORY, 'dev/docker_data/default_pipeline.yml')
    # only needed while resolving the path; keep the namespace clean
    del CPAC_DIRECTORY

# Parsed once at import time; used as the base of every Configuration
with open(DEFAULT_PIPELINE_FILE, 'r') as dp_fp:
    default_config = yaml.safe_load(dp_fp)
class ConfigurationDictUpdateConflation(SyntaxError):
    """Error raised when `Configuration.update` is given a dict.

    `Configuration.update` takes a key and a value, unlike
    `dict.update`, which takes a mapping. The message points the caller
    at `Configuration().dict().update`.
    """
    def __init__(self):
        # Initialise the base class so `args` is populated as well as
        # `msg` (SyntaxError stores its first argument as `msg`).
        super().__init__(
            '`Configuration().update` requires a key and a value. '
            'Perhaps you meant `Configuration().dict().update`?')
class Configuration(object):
    """Class to set dictionary keys as map attributes.

    If the given dictionary includes the key `FROM`, that key's value
    will form the base of the Configuration object with the values in
    the given dictionary overriding matching keys in the base at any
    depth. If no `FROM` key is included, the base Configuration is
    the default Configuration.

    `FROM` accepts either the name of a preconfigured pipeline or a
    path to a YAML file.

    Given a Configuration `c`, and a list or tuple of an attribute name
    and nested keys `keys = ['attribute', 'key0', 'key1']` or
    `keys = ('attribute', 'key0', 'key1')`, the value 'value' nested in

        c.attribute = {'key0': {'key1': 'value'}}

    can be accessed (get and set) in any of the following ways (and
    more):

        c.attribute['key0']['key1']
        c['attribute']['key0']['key1']
        c['attribute', 'key0', 'key1']
        c[keys]

    Examples
    --------
    >>> c = Configuration({})
    >>> c['pipeline_setup', 'pipeline_name']
    'cpac-default-pipeline'
    >>> c = Configuration({'pipeline_setup': {
    ...     'pipeline_name': 'example_pipeline'}})
    >>> c['pipeline_setup', 'pipeline_name']
    'example_pipeline'
    >>> c['pipeline_setup', 'pipeline_name'] = 'new_pipeline2'
    >>> c['pipeline_setup', 'pipeline_name']
    'new_pipeline2'
    """
    def __init__(self, config_map=None):
        '''Build a Configuration from a (possibly partial) nested dict

        Parameters
        ----------
        config_map : dict or None
            user-provided settings; ``None`` is treated as ``{}``
        '''
        from CPAC.pipeline.schema import schema
        from CPAC.utils.utils import load_preconfig, lookup_nested_value, \
            update_nested_dict
        from optparse import OptionError

        if config_map is None:
            config_map = {}

        base_config = config_map.get('FROM', 'default_pipeline')

        # import another config (specified with 'FROM' key)
        if base_config not in ['default', 'default_pipeline']:
            try:
                base_config = load_preconfig(base_config)
            except OptionError:
                # not a known preconfig: keep the value as given and
                # open it as a file path below
                pass
            # context manager so the file handle is closed promptly
            with open(base_config, 'r') as from_fp:
                from_config = yaml.safe_load(from_fp)
            config_map = update_nested_dict(
                Configuration(from_config).dict(), config_map)

        # base everything on default pipeline
        config_map = _enforce_forkability(
            update_nested_dict(default_config, config_map))

        config_map = self._nonestr_to_None(config_map)

        try:
            regressors = lookup_nested_value(
                config_map,
                ['nuisance_corrections', '2-nuisance_regression', 'Regressors']
            )
        except KeyError:
            regressors = []

        if isinstance(regressors, list):
            for i, regressor in enumerate(regressors):
                # set Regressor 'Name's if not provided
                if 'Name' not in regressor:
                    regressor['Name'] = f'Regressor-{str(i + 1)}'
                # replace spaces with hyphens in Regressor 'Name's
                regressor['Name'] = regressor['Name'].replace(' ', '-')

        # Don't double-run FreeSurfer
        try:
            if 'FreeSurfer-ABCD' in config_map['anatomical_preproc'][
                    'brain_extraction']['using']:
                config_map['surface_analysis']['freesurfer']['run'] = False
        except TypeError:
            # best effort: a non-container value along the path (e.g.
            # None) simply means there is nothing to switch off
            pass

        config_map = schema(config_map)

        # remove 'FROM' before setting attributes now that it's imported
        if 'FROM' in config_map:
            del config_map['FROM']

        # set FSLDIR to the environment $FSLDIR if the user sets it to
        # 'FSLDIR' in the pipeline config file
        _FSLDIR = config_map.get('FSLDIR')
        if _FSLDIR and bool(re.match(r'^[\$\{]{0,2}?FSLDIR[\}]?$', _FSLDIR)):
            config_map['FSLDIR'] = os.environ['FSLDIR']

        for key in config_map:
            # set attribute
            setattr(self, key, set_from_ENV(config_map[key]))

        self.__update_attr()

    def __str__(self):
        return 'C-PAC Configuration'

    def __repr__(self):
        # same fixed label as __str__; use .dict() to see the contents
        return self.__str__()

    def __copy__(self):
        '''Shallow copy: nested containers are shared with the original'''
        newone = type(self)({})
        newone.__dict__.update(self.__dict__)
        newone.__update_attr()
        return newone

    def __getitem__(self, key):
        '''Get an attribute by name (str) or a nested value by a
        list/tuple of keys; any other key type raises KeyError'''
        if isinstance(key, str):
            return getattr(self, key)
        if isinstance(key, (tuple, list)):
            return self.get_nested(self, key)
        self.key_type_error(key)

    def __setitem__(self, key, value):
        '''Set an attribute by name (str) or a nested value by a
        list/tuple of keys; any other key type raises KeyError'''
        if isinstance(key, str):
            setattr(self, key, value)
        elif isinstance(key, (tuple, list)):
            self.set_nested(self, key, value)
        else:
            self.key_type_error(key)

    def dict(self):
        '''Show contents of a C-PAC configuration as a dict'''
        return {k: self[k] for k in self.__dict__ if not callable(
            self.__dict__[k])}

    def _nonestr_to_None(self, d):
        '''Recursive method to type convert 'None' to None in nested
        config

        Parameters
        ----------
        d : any
            config item to check

        Returns
        -------
        d : any
            same item, same type, but with 'none' strings (any case)
            converted to Nonetypes
        '''
        if isinstance(d, str) and d.lower() == 'none':
            return None
        elif isinstance(d, list):
            return [self._nonestr_to_None(i) for i in d]
        elif isinstance(d, set):
            return {self._nonestr_to_None(i) for i in d}
        elif isinstance(d, dict):
            return {i: self._nonestr_to_None(d[i]) for i in d}
        else:
            return d

    def _data_attributes(self):
        '''Return ``(name, value)`` pairs, in ``dir()`` order, for every
        attribute that is neither callable nor named with a leading
        double underscore'''
        pairs = []
        for name in dir(self):
            if name.startswith("__"):
                continue
            value = getattr(self, name)
            # test the value, not the name: a name is a str and is
            # never callable, which would let every method through
            if not callable(value):
                pairs.append((name, value))
        return pairs

    def return_config_elements(self):
        '''Return the configuration as a list of tuples

        Returns
        -------
        attributes : list of tuple
            each tuple contains the name of the element in the yaml
            config file and its value; methods are not included
        '''
        return self._data_attributes()

    def sub_pattern(self, pattern, orig_key):
        '''Replace ``pattern`` (like ``${a.b}``) in ``orig_key`` with
        the value found at ``self['a', 'b']``'''
        return orig_key.replace(pattern, self[pattern[2:-1].split('.')])

    def check_pattern(self, orig_key, tags=None):
        '''Resolve ``${...}`` placeholders against this Configuration

        Parameters
        ----------
        orig_key : any
            dicts and lists are processed recursively; strings have
            their placeholders substituted; anything else is returned
            unchanged

        tags : container of str, optional
            placeholders (including the ``${`` and ``}``) to leave alone

        Returns
        -------
        orig_key : any
            the item with resolvable placeholders substituted
        '''
        if tags is None:
            tags = []
        if isinstance(orig_key, dict):
            return {k: self.check_pattern(orig_key[k], tags) for k in orig_key}
        if isinstance(orig_key, list):
            # forward `tags` here too, as the dict branch does
            return [self.check_pattern(item, tags) for item in orig_key]
        if not isinstance(orig_key, str):
            return orig_key
        # `[^}]*` rather than `.*` so that each placeholder in a string
        # holding several of them is matched separately
        template_pattern = r'\${[^}]*}'
        for found in re.finditer(template_pattern, orig_key):
            pattern = found.group(0)
            if pattern in tags:
                continue
            try:
                orig_key = self.sub_pattern(pattern, orig_key)
            except AttributeError as ae:
                # unknown placeholder: warn unless it is one of the
                # known special replacement strings
                if pattern not in SPECIAL_REPLACEMENT_STRINGS:
                    warn(str(ae), category=SyntaxWarning)
        return orig_key

    # method to find any pattern ($) in the configuration
    # and update the attributes with its pattern value
    def __update_attr(self):
        template_list = ['template_brain_only_for_anat',
                         'template_skull_for_anat',
                         'ref_mask',
                         'template_brain_only_for_func',
                         'template_skull_for_func',
                         'template_symmetric_brain_only',
                         'template_symmetric_skull',
                         'dilated_symmetric_brain_mask']
        # only data attributes are rewritten; methods must not be
        # copied into the instance __dict__ (they would stay bound to
        # this object when __dict__ is copied in __copy__)
        for attr_key, attr_value in self._data_attributes():
            if attr_key in template_list:
                # NOTE(review): `tags` is a bare string here, so the
                # `in` test in check_pattern is a substring test that a
                # `${...}` placeholder can never satisfy
                new_key = self.check_pattern(attr_value, 'FSLDIR')
            else:
                new_key = self.check_pattern(attr_value)
            setattr(self, attr_key, new_key)

    def update(self, key, val=ConfigurationDictUpdateConflation):
        '''Set attribute ``key`` to ``val``

        Raises
        ------
        ConfigurationDictUpdateConflation
            if ``key`` is a dict (i.e., called like ``dict.update``)
        '''
        if isinstance(key, dict):
            raise ConfigurationDictUpdateConflation
        setattr(self, key, val)

    def get_nested(self, d, keys):
        '''Return the value in ``d`` at ``keys`` (a single str key or a
        list/tuple of successively nested keys)'''
        if isinstance(keys, str):
            return d[keys]
        elif isinstance(keys, (tuple, list)):
            if len(keys) > 1:
                return self.get_nested(d[keys[0]], keys[1:])
            else:
                return d[keys[0]]

    def set_nested(self, d, keys, value):
        '''Set the value in ``d`` at ``keys`` (a single str key or a
        list/tuple of successively nested keys) and return ``d``'''
        if isinstance(keys, str):
            d[keys] = value
        elif isinstance(keys, (tuple, list)):
            if len(keys) > 1:
                d[keys[0]] = self.set_nested(d[keys[0]], keys[1:], value)
            else:
                d[keys[0]] = value
        return d

    def key_type_error(self, key):
        '''Raise a KeyError describing an unsupported key type'''
        raise KeyError(' '.join([
            'Configuration key must be a string, list, or tuple;',
            type(key).__name__,
            f'`{str(key)}`',
            'was given.'
        ]))
def collect_key_list(config_dict):
    '''Function to list the key path to every leaf of a nested dictionary

    Any value that is not a dict counts as a leaf. Paths are produced
    depth-first, following each dict's own key order. A nested dict
    that is empty contributes no path.

    Parameters
    ----------
    config_dict : dict

    Returns
    -------
    key_list : list
        list of lists of keys

    Examples
    --------
    >>> collect_key_list({'test': {'nested': 1, 'dict': 2}})
    [['test', 'nested'], ['test', 'dict']]
    '''
    def _leaf_paths(mapping, prefix):
        # yield the full key path for each leaf below `mapping`
        for key in mapping:
            branch = mapping[key]
            path = [*prefix, key]
            if isinstance(branch, dict):
                yield from _leaf_paths(branch, path)
            else:
                yield path

    return list(_leaf_paths(config_dict, []))
def _enforce_forkability(config_dict):
    '''Function to set forkable booleans as lists of booleans.

    A key is handled here when its schema entry has a ``validators``
    attribute containing both ``bool`` and ``[bool]``. A bare boolean
    found at such a key is wrapped in a one-item list; everything else
    is left as it is.

    Parameters
    ----------
    config_dict : dict

    Returns
    -------
    config_dict : dict

    Examples
    --------
    >>> c = Configuration().dict()
    >>> c['functional_preproc']['despiking']['run']
    [False]
    >>> c['functional_preproc']['despiking']['run'] = True
    >>> c['functional_preproc']['despiking']['run']
    True
    >>> _enforce_forkability(c)['functional_preproc']['despiking']['run']
    [True]
    '''
    from CPAC.pipeline.schema import schema
    from CPAC.utils.utils import lookup_nested_value, set_nested_value

    for key_path in collect_key_list(config_dict):
        # keys the schema does not know about are skipped
        try:
            validator = lookup_nested_value(schema.schema, key_path)
        except KeyError:
            continue
        # only entries that expose a set of alternatives can be forkable
        if not hasattr(validator, 'validators'):
            continue
        alternatives = validator.validators
        if not (bool in alternatives and [bool] in alternatives):
            continue
        try:
            current = lookup_nested_value(config_dict, key_path)
        except KeyError:
            continue
        if isinstance(current, bool):
            config_dict = set_nested_value(
                config_dict, key_path, [current])
    return config_dict
def set_from_ENV(conf):
    '''Function to replace strings like $VAR and ${VAR} with
    environment variable values

    Lists and dicts are processed recursively; values that are not
    lists, dicts or strings are returned unchanged. Only names made of
    capital letters, `-` and `_` are recognized. The `$VAR` form is
    only recognized when followed by `/` or the end of the string.
    Each variable in a string is looked up separately, and its value is
    inserted literally (backslashes in a value are kept as they are).
    A variable that is not set in the environment is written back as
    `$VAR` (without braces).

    Parameters
    ----------
    conf : any

    Returns
    -------
    conf : any

    Examples
    --------
    >>> import os
    >>> os.environ['SAMPLE_VALUE_SFE'] = '/example/path'
    >>> set_from_ENV({'key': {'nested_list': [
    ...     1, '1', '$SAMPLE_VALUE_SFE/extended']}})
    {'key': {'nested_list': [1, '1', '/example/path/extended']}}
    >>> set_from_ENV(['${SAMPLE_VALUE_SFE}', 'SAMPLE_VALUE_SFE'])
    ['/example/path', 'SAMPLE_VALUE_SFE']
    >>> del os.environ['SAMPLE_VALUE_SFE']
    '''
    if isinstance(conf, list):
        return [set_from_ENV(item) for item in conf]
    if isinstance(conf, dict):
        return {key: set_from_ENV(conf[key]) for key in conf}
    if isinstance(conf, str):
        def _lookup(match):
            # A callable replacement is applied per match and its
            # return value is inserted verbatim, so every variable gets
            # its own value and the value is never parsed as a regex
            # replacement template.
            name = match.group(1)
            return os.environ.get(name, '$' + name)

        # set any specified environment variables
        # (only matching all-caps plus `-` and `_`)
        for _pattern in (
            r'\${([A-Z\-_]*)}',         # like `${VAR}`
            r'\$([A-Z\-_]*)(?=/|$)',    # like `$VAR`
        ):
            conf = re.sub(_pattern, _lookup, conf)
    return conf