/
utils.py
382 lines (290 loc) · 14.5 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# standard modules
import logging       # logging lib (terminal & file)
import os            # for path management
import shutil        # for copyfile
import sys           # to stop script execution on case of error
import re            # regular expressions
import json          # to write json files
import time          # to time execution of code
import traceback     # to get the current function name
import inspect       # to get the current module name
import runpy         # to run config script
from datetime import datetime             # to get current time
from functools import wraps               # for decorator
from typing import List, Tuple, Union     # for function signature
# dependency modules
# local modules
from niix2bids import metadata
from niix2bids.classes import Volume
########################################################################################################################
def init_logger(write_file: bool, out_dir: str) -> None:
    """Configure the root logger: console output always, optional logfile.

    Args:
        write_file: when True, also log to a timestamped file in ``out_dir``.
        out_dir:    directory receiving the logfile (only used if write_file).

    Returns:
        None. Handlers are attached to the root logger as a side effect.
    """
    # create logger (root), let handlers do the level filtering
    log = logging.getLogger()
    log.setLevel(logging.DEBUG)

    # create formatter shared by all handlers
    formatter = logging.Formatter(
        fmt='%(asctime)s - %(name)-55s - %(levelname)-8s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S')

    # create console handler
    consoleHandler = logging.StreamHandler()      # create
    consoleHandler.setLevel(logging.DEBUG)        # and set level to debug
    consoleHandler.setFormatter(formatter)        # add formatter to handler
    log.addHandler(consoleHandler)                # add handler to logger

    # same thing but for a file handler
    if write_file:
        upperstack = inspect.stack()[1]
        # module name of the caller — used to name the logfile
        mdl_name = inspect.getmodule(upperstack[0]).__name__.split('.')[0]
        # BUGFIX: format was '%Hh%Sm%S' (seconds printed twice); the middle
        # field is meant to be minutes -> '%Hh%Mm%S'
        logfile = os.path.join(out_dir, datetime.now().strftime('%Y-%m-%d_%Hh%Mm%S') + "_" + mdl_name + ".log")
        fileHandler = logging.FileHandler(logfile)
        fileHandler.setLevel(logging.DEBUG)
        fileHandler.setFormatter(formatter)
        log.addHandler(fileHandler)
########################################################################################################################
def get_logger() -> logging.Logger:
    """Return a logger named '<caller module>:<caller function>'.

    NOTE: both lookups below depend on the exact call depth — this function
    must be called directly by the function that wants the logger, never
    through a wrapper, or the wrong frame is inspected.
    """
    fcn_name = traceback.extract_stack(None, 2)[0][2]  # get function name of the caller
    upperstack = inspect.stack()[1]
    mdl_name = inspect.getmodule(upperstack[0]).__name__  # get module name of the caller
    name = mdl_name + ':' + fcn_name  # ex : niix2bids.utils:apply_bids_architecture
    log = logging.getLogger(name)
    return log
########################################################################################################################
def logit(message, level=logging.INFO):
    """Decorator factory: log ``message`` before and after the wrapped call.

    The wrapped function is timed; the closing log line reports the elapsed
    seconds. Logging uses a logger named '<module>:<function>'.
    """
    def log_time(func):
        @wraps(func)  # keep function metadata such as __name__
        def wrapper(*args, **kwargs):
            log = logging.getLogger(f"{func.__module__}:{func.__name__}")
            log.log(level, message + ' # start...')
            tic = time.time()
            result = func(*args, **kwargs)
            toc = time.time()
            log.log(level, message + f" # ...done in {toc-tic:.3f}s")
            return result
        return wrapper
    return log_time
########################################################################################################################
def load_config_file(config_file: Union[str, List[str]]) -> list:
    """Locate and execute a config script, returning its ``config`` variable.

    Args:
        config_file: a single candidate path, or a list of candidate paths.
            Every candidate is probed; when several exist, the LAST existing
            one wins (behavior kept from the original — NOTE(review): confirm
            last-wins is intended; a first-wins ``break`` may have been meant).

    Returns:
        The ``config`` object defined by the script.

    Exits:
        sys.exit(1) when no candidate exists or the script defines no
        ``config`` variable.
    """
    log = get_logger()

    fpath = ''
    if isinstance(config_file, list):  # was `type(...) is list` — isinstance is the idiom
        for file in config_file:       # unused enumerate() index removed
            log.info(f"Trying config_file location : {file}")
            if os.path.exists(file):
                fpath = file
    else:
        log.info(f"Trying config_file location : {config_file}")
        if os.path.exists(config_file):
            fpath = config_file

    if fpath == '':
        log.critical("No config file found")
        sys.exit(1)

    # execute the config script in isolation and fetch its globals
    script_content = runpy.run_path(fpath)
    if "config" in script_content:
        config = script_content['config']
        log.info(f"Using config_file : {fpath}")
        return config
    else:
        log.critical(f"Config_file incorrect (no 'config' variable inside) : {fpath}")
        sys.exit(1)
########################################################################################################################
@logit('Fetch all files recursively. This might take time, it involves exploring the whole disk tree.', logging.INFO)
def fetch_all_files(in_dir: List[str]) -> List[str]:
    """Walk every directory of ``in_dir`` and return all file paths, sorted.

    Args:
        in_dir: list of root directories to explore recursively.
            BUGFIX (annotation only): was typed ``str`` but is iterated as a
            list of directories.

    Returns:
        Sorted list of every file path found under the given roots.

    Exits:
        sys.exit(1) when no file at all is found.
    """
    file_list = []
    for one_dir in in_dir:
        for root, _dirs, files in os.walk(one_dir):
            for file in files:
                file_list.append(os.path.join(root, file))

    if len(file_list) == 0:
        log = get_logger()
        log.error(f"no file found in {in_dir}")
        sys.exit(1)

    file_list.sort()
    return file_list
########################################################################################################################
@logit('Keep only nifti files (.nii, .nii.gz).', logging.INFO)
def isolate_nii_files(in_list: List[str]) -> List[str]:
    """Filter ``in_list``, keeping only nifti paths (.nii or .nii.gz).

    BUGFIX: the previous pattern r"(.*nii$)|(.*nii.gz$)$" had an unescaped
    dot (so 'foo.niiXgz' matched) and did not require a '.' before 'nii'
    (so 'foonii' matched); it also carried a redundant trailing '$'. The
    escaped, anchored pattern below matches only real nifti extensions.

    Exits:
        sys.exit(1) when nothing matches.
    """
    log = get_logger()

    r = re.compile(r".*\.nii(\.gz)?$")
    file_list_nii = list(filter(r.match, in_list))

    log.info(f"found {len(file_list_nii)} nifti files")
    if len(file_list_nii) == 0:
        log.error(f"no .nii file found in {in_list}")
        sys.exit(1)

    return file_list_nii
########################################################################################################################
@logit('Check if .json exist for each nifti file.', logging.INFO)
def check_if_json_exists(file_list_nii: List[str]) -> Tuple[List[str], List[str]]:
    """Pair each nifti with its sidecar .json; drop niftis without one.

    BUGFIX: the previous implementation called ``file_list_nii.remove(file)``
    while iterating the same list, which skips the element following every
    removal (classic mutate-during-iteration bug). We now build a fresh list
    of kept niftis instead; the input list is no longer mutated in place.

    Args:
        file_list_nii: nifti file paths ('.nii' or '.nii.gz').

    Returns:
        (kept nifti paths, matching .json paths), index-aligned.
    """
    log = get_logger()

    file_list_nii_kept = []
    file_list_json = []
    for file in file_list_nii:
        root, ext = os.path.splitext(file)
        if ext == ".gz":
            # '.nii.gz' : strip both extensions before adding '.json'
            jsonfile = os.path.splitext(root)[0] + ".json"
        else:
            jsonfile = os.path.splitext(file)[0] + ".json"
        if not os.path.exists(jsonfile):
            log.warning(f"this file has no .json associated : {file}")
        else:
            file_list_nii_kept.append(file)
            file_list_json.append(jsonfile)

    log.info(f"remaining {len(file_list_nii_kept)} nifti files")
    return file_list_nii_kept, file_list_json
########################################################################################################################
@logit('Creation of internal object that will store all info, 1 per nifti.', logging.DEBUG)
def create_volume_list(file_list_nii: List[str]) -> List[Volume]:
    """Instantiate one Volume per nifti path and return the full registry.

    NOTE(review): presumably the Volume constructor registers each instance
    in ``Volume.instances`` — confirm in niix2bids.classes.
    """
    for nii_path in file_list_nii:
        Volume(nii_path)
    return Volume.instances
########################################################################################################################
@logit('Read all .json files. This step might take time, it involves reading lots of files', logging.INFO)
def read_all_json(volume_list: List[Volume]) -> None:
    """Load the sidecar .json of every Volume, in place (returns nothing)."""
    for vol in volume_list:
        vol.load_json()
########################################################################################################################
def assemble_bids_name(vol: Volume) -> str:
    """Build the BIDS file name (no extension) for a Volume.

    Concatenates 'sub-<sub>' with every '_key-value' pair from
    ``vol.bidsfields`` and, when a suffix is present, the 'ses-<ses>' entity
    and the suffix. NOTE(review): 'ses-' only appears on the suffix branch —
    presumably intentional, confirm against the naming rules.
    """
    bidsfields = ''.join(f'_{key}-{value}' for key, value in vol.bidsfields.items())

    if len(vol.suffix) > 0:
        return f"sub-{vol.sub}_ses-{vol.ses}{bidsfields}_{vol.suffix}"
    return f"sub-{vol.sub}{bidsfields}"
########################################################################################################################
def write_json(out_path_json: str, json_dict: dict) -> None:
    """Write ``json_dict`` to ``out_path_json``, pretty-printed.

    Does nothing if the file already exists (never overwrites).

    BUGFIX (annotation only): ``json_dict`` was typed ``str`` but is a dict
    passed to ``json.dump``.
    """
    if not os.path.exists(out_path_json):
        with open(out_path_json, 'w') as fp:    # write file
            json.dump(json_dict, fp, indent=4)  # indent for prettiness
            fp.write('\n')                      # for prettiness too
########################################################################################################################
def ln_or_cp_file(symlink_or_copyfile: str, in_path: str, out_path: str) -> None:
    """Materialise ``in_path`` at ``out_path`` as a symlink or a real copy.

    Does nothing when ``out_path`` already exists. ``symlink_or_copyfile``
    must be "symlink" or "copyfile"; anything else is a programming error.
    """
    if os.path.exists(out_path):
        return  # never overwrite an existing destination
    if symlink_or_copyfile == "symlink":
        os.symlink(in_path, out_path)
    elif symlink_or_copyfile == "copyfile":
        shutil.copyfile(in_path, out_path)
    else:
        raise RuntimeError('??? coding error')
########################################################################################################################
@logit('Apply BIDS architecture. This might take time, it involves lots of disk writing.', logging.INFO)
def apply_bids_architecture(out_dir: str, volume_list: List[Volume], symlink_or_copyfile: str) -> None:
    """Write the BIDS directory tree under ``out_dir`` for every parsed volume.

    Each tagged Volume gets a destination directory (sub-*/ses-*/<tag>, or a
    DISCARD / UNKNOWN bin) and its nifti/json/bval/bvec files are placed
    there via symlink or copy, per ``symlink_or_copyfile``. Untagged volumes
    are only reported. Messages are buffered and emitted at the end, grouped
    by severity so the terminal output stays readable.
    """
    log = get_logger()

    # message buffers — flushed at the very end, grouped by severity
    log_info = []
    log_info_discard = []
    log_warning = []
    log_warning_unknown = []
    log_error_not_interpreted = []

    for vol in volume_list:

        if len(vol.tag) > 0:  # only process correctly parsed volumes

            # choose the destination directory from the tag
            if vol.tag == 'DISCARD':
                dir_path = os.path.join(
                    out_dir,
                    'DISCARD')
                log_info_discard.append(f'{vol.reason_not_ready} : {vol.nii.path}')
            elif vol.tag == 'UNKNOWN':
                dir_path = os.path.join(
                    out_dir,
                    'UNKNOWN')
                log_warning_unknown.append(f'{vol.reason_not_ready} : {vol.nii.path}')
            else:
                dir_path = os.path.join(
                    out_dir,
                    f"sub-{vol.sub}",
                    f"ses-{vol.ses}",
                    vol.tag)

            # recursive directory creation, and do not raise error if already exists
            os.makedirs(dir_path, exist_ok=True)

            out_name = assemble_bids_name(vol)

            # ----------------------------------------------------------------------------------------------------------
            # nii
            in_path_nii = vol.nii.path
            out_path_nii = os.path.join(dir_path, out_name + vol.ext)
            ln_or_cp_file(symlink_or_copyfile, in_path_nii, out_path_nii)

            # ----------------------------------------------------------------------------------------------------------
            # json — func and fmap/phasediff need extra fields, so their json
            # is re-written; everything else is linked/copied untouched
            in_path_json = vol.json.path
            out_path_json = os.path.join(dir_path, out_name + '.json')
            if vol.tag == 'func':
                # for func, the .json file needs to have 'TaskName' field
                # NOTE(review): vol.seqparam is aliased here, not copied — the
                # `del` below mutates the Volume's own dict. Presumably harmless
                # since each volume is written once, but confirm.
                json_dict = vol.seqparam                        # copy original the json dict
                del json_dict['Volume']                         # remove the pointer to Volume instance
                json_dict['TaskName'] = vol.bidsfields['task']  # add TaskName
                write_json(out_path_json, json_dict)
            elif vol.tag == 'fmap' and vol.suffix == 'phasediff':
                # for fmap, the phasediff .json must contain EchoTime1 and EchoTime2
                # in Siemens gre_field_mapping, EchoTime2-EchoTime1 = 2.46ms. This seems constant
                json_dict = vol.seqparam
                del json_dict['Volume']
                json_dict['EchoTime1'] = json_dict['EchoTime'] - 0.00246
                json_dict['EchoTime2'] = json_dict['EchoTime']
                write_json(out_path_json, json_dict)
            else:
                ln_or_cp_file(symlink_or_copyfile, in_path_json, out_path_json)

            # ----------------------------------------------------------------------------------------------------------
            # bval — only present on some volumes (hasattr guard)
            if hasattr(vol, 'bval'):
                in_path_bval = vol.bval.path
                out_path_bval = os.path.join(dir_path, out_name + '.bval')
                ln_or_cp_file(symlink_or_copyfile, in_path_bval, out_path_bval)

            # ----------------------------------------------------------------------------------------------------------
            # bvec — only present on some volumes (hasattr guard)
            if hasattr(vol, 'bvec'):
                in_path_bvec = vol.bvec.path
                out_path_bvec = os.path.join(dir_path, out_name + '.bvec')
                ln_or_cp_file(symlink_or_copyfile, in_path_bvec, out_path_bvec)

        elif len(vol.reason_not_ready) > 0:
            # parsed but rejected for a known reason
            log_warning.append(f'{vol.reason_not_ready} : {vol.nii.path}')
        else:
            # no tag and no recorded reason : the parser never handled this file
            log_error_not_interpreted.append(f'file not interpreted : {vol.nii.path}')

    # print them all, but in order
    for msg in log_error_not_interpreted:
        log.error(msg)
    for msg in log_warning_unknown:
        log.warning(msg)
    for msg in log_warning:
        log.warning(msg)
    # for msg in log_info_discard:
    #     log.info(msg)
    for msg in log_info:
        log.info(msg)
########################################################################################################################
@logit('Writing dataset_description.json', logging.INFO)
def write_bids_dataset_description(out_dir: str) -> None:
    """Write a minimal BIDS dataset_description.json into ``out_dir``.

    Only 'BIDSVersion', 'DatasetType' and 'License' carry real values; the
    remaining required/recommended keys are written empty for the user to
    fill in.
    """
    content = {
        'Name': '',
        'BIDSVersion': metadata.get_bids_version(),
        'HEDVersion': '',
        'DatasetType': 'raw',
        'License': 'PDDL',
        'Authors': [''],
        'Acknowledgements': '',
        'HowToAcknowledge': '',
        'Funding': [''],
        'EthicsApprovals': [''],
        'ReferencesAndLinks': [''],
        'DatasetDOI': '',
    }
    out_path = os.path.join(out_dir, 'dataset_description.json')
    with open(out_path, 'w') as fp:
        json.dump(content, fp, indent=4)  # indent is for prettiness
        fp.write('\n')                    # trailing newline, for prettiness too
########################################################################################################################
@logit('Writing README, CHANGES, LICENSE, .bidsignore files', logging.INFO)
def write_bids_other_files(out_dir: str) -> None:
    """Write the small BIDS side-car text files into ``out_dir``.

    Produces README, CHANGES, LICENSE and .bidsignore (the latter excludes
    logfiles plus the UNKNOWN and DISCARD bins from BIDS validation).
    """
    contents = {
        'README': (
            f"GeneratedBy : niix2bids=={metadata.get_niix2bids_version()} \n"
            f"BIDSVersion : {metadata.get_bids_version()} \n"
        ),
        'CHANGES': (
            f"1.0.0 {datetime.now().strftime('%Y-%m-%d')} \n"
            " - Initial release \n"
        ),
        'LICENSE': 'PDDL \n',
        '.bidsignore': '*.log \nUNKNOWN \nDISCARD \n',
    }
    for filename, text in contents.items():
        with open(os.path.join(out_dir, filename), 'w') as fp:
            fp.write(text)