-
Notifications
You must be signed in to change notification settings - Fork 6
/
utils.py
432 lines (358 loc) · 16.1 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
""" Utilities for working with COMBINE/OMEX archives
:Author: Jonathan Karr <karr@mssm.edu>
:Date: 2020-12-06
:Copyright: 2020, Center for Reproducible Biomedical Modeling
:License: MIT
"""
from ..combine.data_model import CombineArchive # noqa: F401
from ..combine.utils import get_sedml_contents
from ..config import Config # noqa: F401
from ..sedml.data_model import SedDocument, Task, Output, Report, Plot2D, Plot3D, DataSet, Curve, Surface
from ..sedml.io import SedmlSimulationReader
from ..warnings import warn
from .data_model import (Status, CombineArchiveLog, SedDocumentLog, # noqa: F401
TaskLog, OutputLog, ReportLog, Plot2DLog, Plot3DLog,
StandardOutputErrorCapturerLevel)
from .warnings import StandardOutputNotLoggedWarning
try:
import capturer
except ModuleNotFoundError:
capturer = None
import contextlib
import io # noqa: F401
import os
import sys
__all__ = [
'init_combine_archive_log',
'init_sed_document_log',
'init_task_log',
'init_output_log',
'init_report_log',
'init_plot2d_log',
'init_plot3d_log',
'StandardOutputErrorCapturer',
'get_summary_combine_archive_log',
]
def init_combine_archive_log(archive, archive_dir,
supported_features=(SedDocument, Task, Report, Plot2D, Plot3D, DataSet, Curve, Surface),
logged_features=(SedDocument, Task, Report, Plot2D, Plot3D, DataSet, Curve, Surface),
config=None):
""" Initialize a log of a COMBINE/OMEX archive
Args:
archive (:obj:`CombineArchive`): COMBINE/OMEX archive
archive_dir (:obj:`str`): path where the content of the archive is located
supported_features (:obj:`list` of :obj:`type`, optional): list of supported elements.
Default: COMBINE/OMEX archives and SED documents, tasks, reports, plots,
data sets, curves, and surfaces.
logged_features (:obj:`list` of :obj:`type`, optional): list of elements which
will be logged. Default: COMBINE/OMEX archives and SED documents, tasks, reports, plots,
data sets, curves, and surfaces.
config (:obj:`Config`, optional): whether to fail on missing includes
Returns:
:obj:`CombineArchiveLog`: initialized log of a COMBINE/OMEX archive
"""
contents = get_sedml_contents(archive)
log = CombineArchiveLog(status=Status.QUEUED)
if SedDocument in logged_features:
log.sed_documents = {}
for content in contents:
content_filename = os.path.join(archive_dir, content.location)
doc = SedmlSimulationReader().run(content_filename, validate_semantics=False, validate_models_with_languages=False,
config=config)
doc_log = init_sed_document_log(doc, supported_features=supported_features, logged_features=logged_features)
doc_log.location = os.path.relpath(content.location, '.')
doc_log.status = Status.QUEUED if isinstance(doc, supported_features) else Status.SKIPPED
doc_log.parent = log
doc_id = os.path.relpath(content_filename, archive_dir)
log.sed_documents[doc_id] = doc_log
else:
log.sed_documents = None
return log
def init_sed_document_log(doc,
supported_features=(Task, Report, Plot2D, Plot3D, DataSet, Curve, Surface),
logged_features=(Task, Report, Plot2D, Plot3D, DataSet, Curve, Surface)):
""" Initialize a log of a SED document
Args:
doc (:obj:`SedDocument`): SED document
supported_features (:obj:`list` of :obj:`type`, optional): list of supported elements.
Default: tasks, reports, plots, data sets, curves, and surfaces.
logged_features (:obj:`list` of :obj:`type`, optional): list of SED elements which
will be logged. Default: tasks, reports, plots, data sets, curves, and surfaces.
Returns:
:obj:`SedDocumentLog`: initialized log of a SED document
"""
log = SedDocumentLog()
if Task in logged_features:
log.tasks = {}
for task in doc.tasks:
task_log = init_task_log(task, supported_features=supported_features, logged_features=logged_features)
task_log.status = Status.QUEUED if isinstance(task, supported_features) else Status.SKIPPED
task_log.parent = log
log.tasks[task.id] = task_log
else:
log.tasks = None
if set([Output, Report, Plot2D, Plot3D]).intersection(logged_features):
log.outputs = {}
for output in doc.outputs:
if isinstance(output, logged_features):
output_log = init_output_log(output, supported_features=supported_features, logged_features=logged_features)
output_log.status = Status.QUEUED if isinstance(output, supported_features) else Status.SKIPPED
output_log.parent = log
log.outputs[output.id] = output_log
else:
log.outputs = None
return log
def init_task_log(task,
supported_features=(),
logged_features=()):
""" Initialize a log of a task
Args:
output (:obj:`Task`): a SED task
supported_features (:obj:`list` of :obj:`type`, optional): list of supported elements.
Default: empty list.
logged_features (:obj:`list` of :obj:`type`, optional): list of elements which
will be logged. Default: empty list.
Returns:
:obj:`OutputLog`: initialized log of a SED document
"""
return TaskLog(id=task.id)
def init_output_log(output,
supported_features=(DataSet, Curve, Surface),
logged_features=(DataSet, Curve, Surface)):
""" Initialize a log of an output
Args:
output (:obj:`Output`): a SED output
supported_features (:obj:`list` of :obj:`type`, optional): list of supported elements.
Default: data sets, curves, and surfaces.
logged_features (:obj:`list` of :obj:`type`, optional): list of elements which
will be logged. Default: data sets, curves, and surfaces.
Returns:
:obj:`OutputLog`: initialized log of a SED document
"""
if isinstance(output, Report):
log = init_report_log(output, supported_features=supported_features, logged_features=logged_features)
elif isinstance(output, Plot2D):
log = init_plot2d_log(output, supported_features=supported_features, logged_features=logged_features)
elif isinstance(output, Plot3D):
log = init_plot3d_log(output, supported_features=supported_features, logged_features=logged_features)
else:
raise NotImplementedError('`{}` outputs are not supported.'.format(
output.__class__.__name__)) # pragma: no cover # unreachable because all cases are enumerated above
return log
def init_report_log(report,
supported_features=(DataSet, Curve, Surface),
logged_features=(DataSet, Curve, Surface)):
""" Initialize a log of a report
Args:
report (:obj:`Report`): a SED report
supported_features (:obj:`list` of :obj:`type`, optional): list of supported elements.
Default: data sets.
logged_features (:obj:`list` of :obj:`type`, optional): list of elements which
will be logged. Default: data sets.
Returns:
:obj:`ReportLog`: initialized log of a report
"""
log = ReportLog(id=report.id)
if DataSet in logged_features:
log.data_sets = {}
for data_set in report.data_sets:
log.data_sets[data_set.id] = (
Status.QUEUED
if isinstance(data_set, supported_features)
else Status.SKIPPED)
else:
log.data_sets = None
return log
def init_plot2d_log(plot,
supported_features=(Curve),
logged_features=(Curve)):
""" Initialize a log of a 2D plot
Args:
plot (:obj:`Plot2D`): a SED 2D plot
supported_features (:obj:`list` of :obj:`type`, optional): list of supported elements.
Default: curves.
logged_features (:obj:`list` of :obj:`type`, optional): list of elements which
will be logged. Default: curves.
Returns:
:obj:`Plot2DLog`: initialized log of a 2D plot
"""
log = Plot2DLog(id=plot.id)
if Curve in logged_features:
log.curves = {}
for curve in plot.curves:
log.curves[curve.id] = (
Status.QUEUED
if isinstance(curve, supported_features)
else Status.SKIPPED)
else:
log.curves = None
return log
def init_plot3d_log(plot,
supported_features=(Surface),
logged_features=(Surface)):
""" Initialize a log of a 3D plot
Args:
plot (:obj:`Plot3D`): a SED 3D plot
supported_features (:obj:`list` of :obj:`type`, optional): list of supported elements.
Default: surfaces.
logged_features (:obj:`list` of :obj:`type`, optional): list of elements which
will be logged. Default: surfaces.
Returns:
:obj:`Plot3DLog`: initialized log of a 3D plot
"""
log = Plot3DLog(id=plot.id)
if Surface in logged_features:
log.surfaces = {}
for surface in plot.surfaces:
log.surfaces[surface.id] = (
Status.QUEUED
if isinstance(surface, supported_features)
else Status.SKIPPED)
else:
log.surfaces = None
return log
class StandardOutputErrorCapturer(contextlib.AbstractContextManager):
""" Context manager for capturing standard output/error. When :obj:`capturer` is available (i.e.,
Linux, MacOS, Unix), :obj:`capturer` is used to capture standard output/error. When :obj:`capturer` is not
available (i.e. Windows), this context manager issues a warn and collects no output. The purpose of this
context manager is to encapsulate the handling of whether :obj:`capturer` is or isn't available so
that the other modules can work seamless in Linux, as well as Windows (except without the ability to log
standard output/error).
Attributes:
level (:obj:`StandardOutputErrorCapturerLevel`, optional): level at which stdout/stderr should be captured
relay (:obj:`bool`): if :obj:`True`, collect the standard output/error streams and continue to pass
them along. if :obj:`False`, collect the stream, squash them, and do not pass them along.
disabled (:obj:`bool`): whether to capture standard output and error
_captured (:obj:`capturer.CaptureOutput`): logged C output
_log (:obj:`str`): logged Python output
_stdout (:obj:`io.IOBase`): overridden stdout
_stderr (:obj:`io.IOBase`): overridden stderr
"""
def __init__(self, level=StandardOutputErrorCapturerLevel.c, relay=False, termination_delay=0.01, disabled=False):
"""
Args:
level (:obj:`StandardOutputErrorCapturerLevel`, optional): level at which stdout/stderr should be captured
relay (:obj:`bool`): if :obj:`True`, collect the standard output/error streams and continue to pass
them along. if :obj:`False`, collect the stream, squash them, and do not pass them along.
termination_delay (:obj:`float`, optional): The number of seconds to wait before terminating
the output relay process.
disabled (:obj:`bool`, optional): whether to capture standard output and error
"""
self.level = level
self.relay = relay
self.disabled = disabled
if not self.disabled:
if self.level >= StandardOutputErrorCapturerLevel.c and capturer:
self._captured = capturer.CaptureOutput(merged=True, relay=relay, termination_delay=termination_delay)
else:
self._log = ''
else:
msg = (
'Standard output and error could not be logged because capturer is not installed. '
'To install capturer, install BioSimulators utils with the `logging` option '
'(`pip install biosimulators-utils[logging]`).'
)
warn(msg, StandardOutputNotLoggedWarning)
def __enter__(self):
""" Enter a context """
if not self.disabled:
if self.level >= StandardOutputErrorCapturerLevel.c and capturer:
self._captured.start_capture()
else:
self._stdout = sys.stdout
self._stderr = sys.stderr
sys.stdout = self
sys.stderr = self
return self
def __exit__(self, exc_type, exc_value, traceback):
""" Exit a context """
if not self.disabled:
if self.level >= StandardOutputErrorCapturerLevel.c and capturer:
self._captured.finish_capture()
else:
sys.stdout = self._stdout
sys.stderr = self._stderr
def write(self, message):
if self.relay:
self._stdout.write(message)
self._log += message
def flush(self):
if self.relay:
self._stdout.flush()
def get_text(self):
""" Get the captured standard output/error
Returns:
:obj:`str`: captured standard output/error
"""
if self.disabled:
return None
else:
if self.level >= StandardOutputErrorCapturerLevel.c and capturer:
bytes = self._captured.get_bytes()
return bytes.decode(errors='ignore')
else:
return self._log
def get_summary_combine_archive_log(log):
""" Get a summary of the log of a COMBINE/OMEX archive
Args:
log (:obj:`CombineArchiveLog`): log of a COMBINE/OMEX archive
Returns:
:obj:`str`: summary of the log
"""
tasks_logged = False
outputs_logged = False
n_archives = 0
n_tasks = 0
n_outputs = 0
sed_doc_status_count = {
Status.SUCCEEDED: 0,
Status.SKIPPED: 0,
Status.FAILED: 0,
None: 0,
}
task_status_count = {
Status.SUCCEEDED: 0,
Status.SKIPPED: 0,
Status.FAILED: 0,
None: 0,
}
output_status_count = {
Status.SUCCEEDED: 0,
Status.SKIPPED: 0,
Status.FAILED: 0,
None: 0,
}
for doc_log in log.sed_documents.values():
n_archives += 1
sed_doc_status_count[doc_log.status] += 1
if doc_log.tasks is not None:
tasks_logged = True
for task_log in doc_log.tasks.values():
n_tasks += 1
task_status_count[task_log.status if task_log else None] += 1
if doc_log.outputs is not None:
outputs_logged = True
for output_log in doc_log.outputs.values():
n_outputs += 1
output_status_count[output_log.status if output_log else None] += 1
msg = ''
msg += 'Executed {} SED documents:\n'.format(n_archives)
msg += ' SED documents ({}):\n'.format(n_archives)
msg += ' Succeeded: {}\n'.format(sed_doc_status_count[Status.SUCCEEDED])
msg += ' Skipped: {}\n'.format(sed_doc_status_count[Status.SKIPPED])
msg += ' Failed: {}\n'.format(sed_doc_status_count[Status.FAILED])
if tasks_logged:
msg += ' Tasks ({}):\n'.format(n_tasks)
msg += ' Succeeded: {}\n'.format(task_status_count[Status.SUCCEEDED])
msg += ' Skipped: {}\n'.format(task_status_count[Status.SKIPPED])
msg += ' Failed: {}\n'.format(task_status_count[Status.FAILED])
if task_status_count[None]:
msg += ' Unknown: {}\n'.format(task_status_count[None])
if outputs_logged:
msg += ' Outputs ({}):\n'.format(n_outputs)
msg += ' Succeeded: {}\n'.format(output_status_count[Status.SUCCEEDED])
msg += ' Skipped: {}\n'.format(output_status_count[Status.SKIPPED])
msg += ' Failed: {}\n'.format(output_status_count[Status.FAILED])
if output_status_count[None]:
msg += ' Unknown: {}\n'.format(output_status_count[None])
return msg