-
Notifications
You must be signed in to change notification settings - Fork 6
/
io.py
364 lines (291 loc) · 15.8 KB
/
io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
""" Utilities for reading and writing reports
:Author: Jonathan Karr <karr@mssm.edu>
:Date: 2020-12-06
:Copyright: 2020, Center for Reproducible Biomedical Modeling
:License: MIT
"""
from ..config import get_config
from ..sedml.data_model import Output, Report, Plot2D, Plot3D # noqa: F401
from ..utils.core import pad_arrays_to_consistent_shapes
from ..warnings import warn
from .data_model import DataSetResults, ReportFormat
from .warnings import (RepeatDataSetLabelsWarning, MissingReportMetadataWarning, MissingDataWarning,
ExtraDataWarning, CannotExportMultidimensionalTableWarning)
import enum
import functools
import glob
import h5py
import numpy
import openpyxl
import os
import pandas
__all__ = [
'ReportWriter',
'ReportReader',
]
class Hdf5DataSetType(enum.Enum):
    """ Type of data encoded in an HDF5 data set

    Each member's value is the output class that the member stands for. The
    member's name is the string stored in the ``_type`` attribute of an HDF5
    data set.
    """

    # Data set that holds the results of a report
    SedReport = Report

    # Data set that holds the data behind a two-dimensional plot
    SedPlot2D = Plot2D

    # Data set that holds the data behind a three-dimensional plot
    SedPlot3D = Plot3D
class ReportWriter(object):
    """ Class for writing reports of simulation results """

    def run(self, report, results, base_path, rel_path, format=ReportFormat.h5, type=Report):
        """ Save a report

        Only the data sets of :obj:`report` that have an entry in :obj:`results` are written.

        Args:
            report (:obj:`Report`): report
            results (:obj:`DataSetResults`): results of the data sets
            base_path (:obj:`str`): path to save results

                * CSV: parent directory to save results
                * HDF5: file to save results

            rel_path (:obj:`str`): path to save results relative to :obj:`base_path`

                * CSV: relative path to :obj:`base_path`
                * HDF5: key within HDF5 file

            format (:obj:`ReportFormat`, optional): report format
            type (:obj:`type`): type of output (e.g., subclass of :obj:`Output` such as :obj:`Report`, :obj:`Plot2D`)

        Raises:
            :obj:`TypeError`: if the result of a data set has a generic NumPy dtype
                (``object``, ``void``, or unsized bytes)
            :obj:`NotImplementedError`: if :obj:`format` is not a supported format
        """
        rel_path = os.path.relpath(rel_path, '.')

        # Generic dtypes that cannot be recorded as a specific type name.
        # ``numpy.dtype('a')`` is intentionally not listed: it is a deprecated alias
        # that compares equal to ``numpy.dtype('S')``, so listing it added nothing.
        generic_dtypes = [numpy.dtype('object'), numpy.dtype('void'), numpy.dtype('S')]

        results_array = []
        data_set_ids = []
        data_set_labels = []
        data_set_names = []
        data_set_data_types = []
        data_set_shapes = []
        for data_set in report.data_sets:
            if data_set.id not in results:
                continue

            data_set_result = results[data_set.id]
            results_array.append(data_set_result)
            data_set_ids.append(data_set.id)
            data_set_labels.append(data_set.label)
            data_set_names.append(data_set.name or '')

            if data_set_result is None:
                # sentinel type name and empty shape mark a data set without a result
                data_set_data_types.append('__None__')
                data_set_shapes.append('')
                continue

            data_set_dtype = data_set_result.dtype
            if data_set_dtype in generic_dtypes:
                msg = 'NumPy dtype should be a specific type such as `float64` or `int64` not `{}`.'.format(data_set_dtype.name)
                raise TypeError(msg)

            data_set_data_types.append(data_set_dtype.name)
            data_set_shapes.append(','.join(str(dim_len) for dim_len in data_set_result.shape))

        # the individual results are combined into a single array, so they must share a shape
        results_array = pad_arrays_to_consistent_shapes(results_array)
        results_array = numpy.array(results_array)

        if format in [ReportFormat.csv, ReportFormat.tsv, ReportFormat.xlsx]:
            # tables have one row per data set, which leaves one dimension for its values
            if results_array.ndim > 2:
                msg = 'Report has {} dimensions. Multidimensional reports cannot be exported to {}.'.format(
                    results_array.ndim, format.value.upper())
                warn(msg, CannotExportMultidimensionalTableWarning)
                return

            if len(set(data_set.label for data_set in report.data_sets)) < len(report.data_sets):
                warn('To facilitate machine interpretation, data sets should have unique labels.',
                     RepeatDataSetLabelsWarning)

            msg = 'Reports exported to {} do not contain information about the data type or size of each data set.'.format(
                format.value.upper())
            warn(msg, MissingReportMetadataWarning)

            results_df = pandas.DataFrame(results_array, index=data_set_labels)

            if format in [ReportFormat.csv, ReportFormat.tsv]:
                # one file per report
                filename = os.path.join(base_path, rel_path + '.' + format.value)
                out_dir = os.path.dirname(filename)
                if out_dir:
                    # an empty name means the current directory; ``os.makedirs('')`` would fail
                    os.makedirs(out_dir, exist_ok=True)
                results_df.to_csv(filename, header=False, sep=',' if format == ReportFormat.csv else '\t')

            else:
                # one workbook per parent directory of the report, one sheet per report
                filename = os.path.join(base_path, os.path.dirname(rel_path) + '.' + format.value)
                out_dir = os.path.dirname(filename)
                if out_dir:
                    os.makedirs(out_dir, exist_ok=True)
                # append to an existing workbook so that sheets of other reports are kept
                with pandas.ExcelWriter(filename, mode='a' if os.path.isfile(filename) else 'w', engine='openpyxl') as writer:
                    results_df.to_excel(writer, sheet_name=os.path.basename(rel_path), header=False)

        elif format == ReportFormat.h5:
            filename = os.path.join(base_path, get_config().H5_REPORTS_PATH)
            if base_path:
                os.makedirs(base_path, exist_ok=True)

            # HDF5 keys use forward slashes on every platform
            rel_path = rel_path.replace(os.path.sep, '/')

            with h5py.File(filename, 'a') as file:
                # replace any object previously stored under the same key
                if rel_path in file:
                    del file[rel_path]

                data_set = file.create_dataset(rel_path, data=results_array,
                                               chunks=True, compression="gzip", compression_opts=9)
                data_set.attrs['_type'] = Hdf5DataSetType(type).name
                if report.id:
                    data_set.attrs['uri'] = rel_path
                    data_set.attrs['sedmlId'] = report.id
                if report.name:
                    data_set.attrs['sedmlName'] = report.name
                data_set.attrs['sedmlDataSetIds'] = data_set_ids
                data_set.attrs['sedmlDataSetNames'] = data_set_names
                data_set.attrs['sedmlDataSetLabels'] = data_set_labels
                data_set.attrs['sedmlDataSetDataTypes'] = data_set_data_types
                data_set.attrs['sedmlDataSetShapes'] = data_set_shapes

                # annotate each group above the data set with its own location
                group_ids = rel_path.split('/')[0:-1]
                for n_parts in range(1, len(group_ids) + 1):
                    uri = '/'.join(group_ids[0:n_parts])
                    group = file[uri]
                    group.attrs['uri'] = uri
                    group.attrs['combineArchiveLocation'] = uri

        else:
            raise NotImplementedError('Report format {} is not supported'.format(format))
class ReportReader(object):
    """ Class for reading reports of simulation results """

    def run(self, report, base_path, rel_path, format=ReportFormat.h5):
        """ Read a report from a file

        Warnings are issued for data sets of the report that the file does not
        contain and for data in the file that cannot be mapped to the report.

        Args:
            report (:obj:`Report`): report
            base_path (:obj:`str`): path to read results from

                * CSV: parent directory of the results
                * HDF5: file of the results

            rel_path (:obj:`str`): path to read results from, relative to :obj:`base_path`

                * CSV: relative path to :obj:`base_path`
                * HDF5: key within HDF5 file

            format (:obj:`ReportFormat`, optional): report format

        Returns:
            :obj:`DataSetResults`: report results

        Raises:
            :obj:`NotImplementedError`: if :obj:`format` is not a supported format
        """
        rel_path = os.path.relpath(rel_path, '.')

        if format in [ReportFormat.csv, ReportFormat.tsv, ReportFormat.xlsx]:
            warn('Reports exported to {} do not contain information about the data type or size of each data set.'.format(
                format.value.upper()), MissingReportMetadataWarning)

            if format in [ReportFormat.csv, ReportFormat.tsv]:
                filename = os.path.join(base_path, rel_path + '.' + format.value)
                df = pandas.read_csv(filename,
                                     index_col=0,
                                     header=None,
                                     sep=',' if format == ReportFormat.csv else '\t')
            else:
                # the workbook is named after the parent directory; the sheet after the report
                filename = os.path.join(base_path, os.path.dirname(rel_path) + '.' + format.value)
                df = pandas.read_excel(filename,
                                       sheet_name=os.path.basename(rel_path),
                                       index_col=0,
                                       header=None,
                                       engine='openpyxl')
            df.columns = pandas.RangeIndex(start=0, stop=df.shape[1], step=1)

            results = DataSetResults()

            data_set_labels = [data_set.label for data_set in report.data_sets]
            if df.index.tolist() == data_set_labels:
                # rows are in the order of the report: map them by position,
                # which also works when labels are repeated
                data = df.to_numpy()
                for i_data_set, data_set in enumerate(report.data_sets):
                    results[data_set.id] = data[i_data_set, :]

                extra_data_sets = set()

            else:
                # map each label to its row; labels that occur more than once map to :obj:`None`
                data_set_label_to_index = {}
                for i_data_set, data_set_label in enumerate(df.index):
                    if data_set_label not in data_set_label_to_index:
                        data_set_label_to_index[data_set_label] = i_data_set
                    else:
                        data_set_label_to_index[data_set_label] = None

                read_labels = set()
                unreadable_labels = set()
                unreadable_data_sets = []
                for data_set in report.data_sets:
                    i_data_set = data_set_label_to_index.get(data_set.label, None)
                    if i_data_set is None:
                        unreadable_data_sets.append(data_set.id)
                        unreadable_labels.add(data_set.label)
                    else:
                        results[data_set.id] = df.loc[data_set.label, :].to_numpy()
                        read_labels.add(data_set.label)

                if unreadable_data_sets:
                    warn('Some data sets could not be read because their labels are not unique:\n  - {}'.format(
                        '\n'.join('`' + data_set_id + '`' for data_set_id in sorted(unreadable_data_sets))),
                        RepeatDataSetLabelsWarning)

                # The file index holds labels, so labels (not ids) must be subtracted.
                # Otherwise rows of unreadable data sets are also reported as extra
                # data whenever a data set's id differs from its label.
                extra_data_sets = set(df.index) - read_labels - unreadable_labels

            file_data_set_ids = set(results.keys()) | extra_data_sets

        elif format == ReportFormat.h5:
            filename = os.path.join(base_path, get_config().H5_REPORTS_PATH)

            # HDF5 keys use forward slashes on every platform
            rel_path = rel_path.replace(os.path.sep, '/')

            with h5py.File(filename, 'r') as file:
                data_set = file[rel_path]
                data_set_results = data_set[:]
                file_data_set_ids = ReportReader.parse_dataset_str_list_values(data_set.attrs['sedmlDataSetIds'])
                data_set_data_types = ReportReader.parse_dataset_str_list_values(data_set.attrs['sedmlDataSetDataTypes'])
                data_set_shape_strs = ReportReader.parse_dataset_str_list_values(data_set.attrs['sedmlDataSetShapes'])

            # shapes are stored as comma-separated dimension lengths; an empty string means no dimensions
            data_set_shapes = [
                [int(dim_len) for dim_len in data_set_shape_str.split(',')] if data_set_shape_str else []
                for data_set_shape_str in data_set_shape_strs
            ]

            results = DataSetResults()
            data_set_id_to_index = {data_set_id: i_data_set for i_data_set, data_set_id in enumerate(file_data_set_ids)}

            # the first dimension of the stored array indexes the data sets
            data_set_ndim = data_set_results.ndim - 1

            for data_set in report.data_sets:
                i_data_set = data_set_id_to_index.get(data_set.id, None)
                if i_data_set is None:
                    continue

                data_set_data_type = data_set_data_types[i_data_set]
                if data_set_data_type == '__None__':
                    results[data_set.id] = None
                    continue

                # Crop the stored array back to the recorded shape of the data set.
                # Dimensions beyond the recorded shape are reduced to length one so
                # that the reshape below can drop them.
                data_set_shape = data_set_shapes[i_data_set]
                data_set_slice = tuple([slice(0, dim_len) for dim_len in data_set_shape] +
                                       [slice(0, 1)] * (data_set_ndim - len(data_set_shape)))
                results[data_set.id] = (
                    data_set_results[i_data_set][data_set_slice]
                    .reshape(data_set_shape)
                    .astype(data_set_data_type)
                )

            file_data_set_ids = set(file_data_set_ids)

        else:
            raise NotImplementedError('Report format {} is not supported'.format(format))

        report_data_set_ids = set(data_set.id for data_set in report.data_sets)
        missing_data_set_ids = report_data_set_ids.difference(file_data_set_ids)
        extra_data_set_ids = file_data_set_ids.difference(report_data_set_ids)

        if missing_data_set_ids:
            warn('File does not contain data for the following data sets of the report:\n  - {}'.format(
                '\n'.join('`' + data_set_id + '`' for data_set_id in sorted(missing_data_set_ids))), MissingDataWarning)

        if extra_data_set_ids:
            warn('File contains additional data that could not be mapped to data sets of the report:\n  - {}'.format(
                '\n'.join('`' + data_set_id + '`' for data_set_id in sorted(extra_data_set_ids))), ExtraDataWarning)

        return results

    def get_ids(self, base_path, format=ReportFormat.h5, type=Output):
        """ Get the ids of the reports in a file

        Args:
            base_path (:obj:`str`): path to read results from

                * CSV: parent directory of the results
                * HDF5: file of the results

            format (:obj:`ReportFormat`, optional): report format
            type (:obj:`type`): type of report to get; only used for HDF5 files

        Returns:
            :obj:`list` of :obj:`str`: ids of reports

        Raises:
            :obj:`NotImplementedError`: if :obj:`format` is not a supported format
        """
        if format in [ReportFormat.csv, ReportFormat.tsv]:
            # the id of a report is the path of its file, relative to :obj:`base_path`, without the extension
            ext_len = len(format.value) + 1
            return [
                os.path.relpath(path, base_path)[0:-ext_len]
                for path in glob.glob(os.path.join(base_path, '**/*.' + format.value), recursive=True)
            ]

        elif format == ReportFormat.xlsx:
            # each sheet of each workbook is a report
            ext_len = len(format.value) + 1
            report_ids = []
            for path in glob.glob(os.path.join(base_path, '**/*.' + format.value), recursive=True):
                # only the sheet names are needed, so avoid loading the cell contents
                wb = openpyxl.load_workbook(path, read_only=True)
                try:
                    sheet_names = wb.sheetnames
                finally:
                    wb.close()

                workbook_id = os.path.relpath(path, base_path)[0:-ext_len]
                for sheet_name in sheet_names:
                    report_ids.append(os.path.join(workbook_id, sheet_name))
            return report_ids

        elif format == ReportFormat.h5:
            filename = os.path.join(base_path, get_config().H5_REPORTS_PATH)
            report_ids = []
            with h5py.File(filename, 'r') as file:
                file.visititems(functools.partial(self.append_report_id, type, report_ids))
            return report_ids

        else:
            raise NotImplementedError('Report format {} is not supported'.format(format))

    @staticmethod
    def append_report_id(type, report_ids, name, object):
        """ Append the name of an HDF5 object to a list of report ids if the object is a
        data set of the desired type

        This has the signature of an ``h5py`` ``visititems`` callback once :obj:`type` and
        :obj:`report_ids` are bound. It always returns :obj:`None`.

        Args:
            type (:obj:`type`): type of output to collect
            report_ids (:obj:`list` of :obj:`str`): list to append matching names to
            name (:obj:`str`): name of the visited object
            object (:obj:`object`): visited object

        Raises:
            :obj:`KeyError`: if the ``_type`` attribute is not the name of a member
                of :obj:`Hdf5DataSetType`
        """
        if not isinstance(object, h5py.Dataset):
            return

        # data sets without a ``_type`` attribute are skipped
        data_set_type = ReportReader.parse_dataset_str_value(object.attrs.get('_type', None))
        if not data_set_type:
            return

        # ``issubclass`` is also true when both classes are the same
        output_type = Hdf5DataSetType[data_set_type].value
        if issubclass(output_type, type):
            report_ids.append(name)

    @staticmethod
    def parse_dataset_str_value(raw_value):
        """ Convert the value of a string attribute to :obj:`str`

        Args:
            raw_value (:obj:`str`, :obj:`bytes`, or :obj:`None`): raw value of the attribute

        Returns:
            :obj:`str` or :obj:`None`: bytes decoded as UTF-8; strings and :obj:`None`
                are returned unchanged
        """
        if isinstance(raw_value, (bytes, bytearray)):
            # decode empty values too so that the return type does not depend on the content
            return raw_value.decode('utf-8')
        if raw_value and not isinstance(raw_value, str):
            # other objects that ``str`` can decode (kept for backward compatibility)
            return str(raw_value, encoding='utf-8')
        return raw_value

    @staticmethod
    def parse_dataset_str_list_values(raw_values: list):
        """ Convert the values of a list-of-strings attribute to :obj:`str`

        Args:
            raw_values (:obj:`list`): raw values of the attribute

        Returns:
            :obj:`list` of :obj:`str`: values converted with :obj:`parse_dataset_str_value`
        """
        return [ReportReader.parse_dataset_str_value(raw_value) for raw_value in raw_values]