-
Notifications
You must be signed in to change notification settings - Fork 14
/
hdfwriterpart.py
323 lines (287 loc) · 13.8 KB
/
hdfwriterpart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import os
from xml.etree import cElementTree as ET
from scanpointgenerator import FixedDurationMutator
from malcolm.compat import et_to_string
from malcolm.core import method_takes, REQUIRED, Info
from malcolm.core.vmetas import StringMeta, PointGeneratorMeta
from malcolm.parts.builtin.childpart import ChildPart
from malcolm.controllers.runnablecontroller import RunnableController
from malcolm.parts.ADCore.datasettablepart import DatasetProducedInfo, \
dataset_types
# PV suffixes for the position plugin's dimension attributes: first dim is
# "N", then "X", "Y", then "3".."9" (10 dimensions maximum)
SUFFIXES = "NXY3456789"
# Dataset types accepted by NDAttributeDatasetInfo below
attribute_dataset_types = ["detector", "monitor", "position"]
# Produced by plugins in part_info
class NDAttributeDatasetInfo(Info):
    """Declares an NDAttribute that should be written as its own dataset.

    Args:
        name (str): Dataset name, e.g. "I0"
        type (str): One of attribute_dataset_types
            ("detector", "monitor", "position")
        attr (str): Name of the NDAttribute holding the data
        rank (int): Rank (number of dimensions) of the dataset
    """
    def __init__(self, name, type, attr, rank):
        self.name = name
        # Bug fix: the failure message previously interpolated the unrelated
        # dataset_types list; report the list actually being checked against
        assert type in attribute_dataset_types, \
            "Dataset type %s not in %s" % (type, attribute_dataset_types)
        self.type = type
        self.attr = attr
        self.rank = rank
class CalculatedNDAttributeDatasetInfo(Info):
    """Declares a statistic calculated from an NDAttribute.

    Args:
        name (str): Dataset name
        attr (str): Name of the NDAttribute the value comes from
    """
    def __init__(self, name, attr):
        self.name, self.attr = name, attr
class NDArrayDatasetInfo(Info):
    """Declares the primary NDArray data produced by a detector.

    Args:
        name (str): Dataset name
        rank (int): Rank (number of dimensions) of the detector data
    """
    def __init__(self, name, rank):
        self.name, self.rank = name, rank
class HDFWriterPart(ChildPart):
    """Part for controlling an areaDetector HDF writer plugin during a scan.

    Configures the writer for each run (file location, dimension sizes,
    layout XML), starts it, tracks capture progress via the plugin's
    uniqueId counter, and reports the datasets the file will contain.
    """
    # Attributes
    datasets = None
    # Future for the start action; resolves when the plugin stops writing
    start_future = None
    # Future resolving when the first array has been captured
    array_future = None
    # uniqueId value at which the current run is complete
    done_when_reaches = 0

    def _create_dataset_infos(self, part_info, generator, filename):
        """Yield a DatasetProducedInfo for every dataset in the file.

        Args:
            part_info (dict): Info lists reported by other parts
            generator: Point generator describing the scan
            filename (str): Name of the HDF file being written

        Yields:
            DatasetProducedInfo: for the primary detector data, any
                calculated (secondary) data, NDAttribute datasets, then one
                position_set dataset per generator axis
        """
        # Update the dataset table
        uniqueid = "/entry/NDAttributes/NDArrayUniqueId"
        generator_rank = len(generator.index_dims)

        # Get the detector name from the primary source
        ndarray_infos = NDArrayDatasetInfo.filter_values(part_info)
        assert len(ndarray_infos) in (0, 1), \
            "More than one NDArrayDatasetInfo defined %s" % ndarray_infos

        # Add the primary datasource
        if ndarray_infos:
            ndarray_info = ndarray_infos[0]
            yield DatasetProducedInfo(
                name="%s.data" % ndarray_info.name, filename=filename,
                type="primary", rank=ndarray_info.rank + generator_rank,
                path="/entry/detector/detector",
                uniqueid=uniqueid)

            # Add any secondary datasources; they are calculated from the
            # primary data so share its rank
            for calculated_info in \
                    CalculatedNDAttributeDatasetInfo.filter_values(part_info):
                yield DatasetProducedInfo(
                    name="%s.%s" % (ndarray_info.name, calculated_info.name),
                    filename=filename, type="secondary",
                    rank=ndarray_info.rank + generator_rank,
                    path="/entry/%s/%s" % (
                        calculated_info.name, calculated_info.name),
                    uniqueid=uniqueid)

        # Add all the other datasources
        for dataset_info in NDAttributeDatasetInfo.filter_values(part_info):
            if dataset_info.type == "detector":
                # Something like I0
                name = "%s.data" % dataset_info.name
                type = "primary"
            elif dataset_info.type == "monitor":
                # Something like Iref
                name = "%s.data" % dataset_info.name
                type = "monitor"
            elif dataset_info.type == "position":
                # Something like x
                name = "%s.value" % dataset_info.name
                type = "position_value"
            else:
                raise AttributeError("Bad dataset type %r, should be in %s" % (
                    dataset_info.type, attribute_dataset_types))
            yield DatasetProducedInfo(
                name=name, filename=filename, type=type,
                rank=dataset_info.rank + generator_rank,
                path="/entry/%s/%s" % (dataset_info.name, dataset_info.name),
                uniqueid=uniqueid)

        # Add any setpoint dimensions
        for dim in generator.axes:
            yield DatasetProducedInfo(
                name="%s.value_set" % dim, filename=filename,
                type="position_set", rank=1,
                path="/entry/detector/%s_set" % dim, uniqueid="")

    @RunnableController.Reset
    def reset(self, task):
        """Reset the part, making sure the writer plugin is stopped."""
        super(HDFWriterPart, self).reset(task)
        self.abort(task)

    @RunnableController.Configure
    @method_takes(
        "generator", PointGeneratorMeta("Generator instance"), REQUIRED,
        "filePath", StringMeta("File path to write data to"), REQUIRED)
    def configure(self, task, completed_steps, steps_to_do, part_info, params):
        """Configure and start the HDF writer for a scan.

        Args:
            task: Task used to communicate with the child block
            completed_steps (int): Steps already done in previous runs
            steps_to_do (int): Steps that will be done in this run
            part_info (dict): Info lists reported by other parts
            params: Validated parameters (generator, filePath)

        Returns:
            list: DatasetProducedInfo objects describing the datasets that
                the file will contain
        """
        self.done_when_reaches = completed_steps + steps_to_do
        # For first run then open the file
        # Enable position mode before setting any position related things
        task.put(self.child["positionMode"], True)
        # Setup our required settings
        # TODO: this should be different for windows detectors
        file_path = params.filePath.rstrip(os.sep)
        file_dir, filename = file_path.rsplit(os.sep, 1)
        assert "." in filename, \
            "File extension for %r should be supplied" % filename
        futures = task.put_many_async(self.child, dict(
            enableCallbacks=True,
            fileWriteMode="Stream",
            swmrMode=True,
            positionMode=True,
            dimAttDatasets=True,
            lazyOpen=True,
            arrayCounter=0,
            filePath=file_dir + os.sep,
            fileName=filename,
            fileTemplate="%s%s"))
        futures += self._set_dimensions(task, params.generator)
        xml = self._make_layout_xml(params.generator, part_info)
        layout_filename = os.path.join(
            file_dir, "%s-layout.xml" % self.params.mri)
        # Fix: use a context manager so the layout file is flushed and the
        # handle closed before the plugin is pointed at it, instead of
        # relying on garbage collection to close a leaked handle
        with open(layout_filename, "w") as layout_file:
            layout_file.write(xml)
        futures += task.put_async(self.child["xml"], layout_filename)
        # Wait for the previous puts to finish
        task.wait_all(futures)
        # Reset numCapture back to 0
        task.put(self.child["numCapture"], 0)
        # We want the HDF writer to flush this often:
        flush_time = 1  # seconds
        # (In particular this means that HDF files can be read cleanly by
        # SciSoft at the start of a scan.)
        # To achieve this we'll tell the HDF writer how many frames it should
        # write between each flush. Thus we need to know the exposure time. Get
        # it from the last FDM. (There's probably only one, and we don't care
        # about other cases.)
        # Choose a default exposure time in case there is no FDM.
        exposure_time = 0.1  # seconds
        for mutator in params.generator.mutators:
            if isinstance(mutator, FixedDurationMutator):
                exposure_time = mutator.duration
        # Now do some maths and set the relevant PV. (Xspress3 does not seem to
        # support flushing more often than once per 2 frames.)
        # Fix: int() so an integer is always put to the PV; Python 2 round()
        # returns a float
        n_frames_between_flushes = max(2, int(round(
            flush_time / exposure_time)))
        task.put(self.child["flushDataPerNFrames"], n_frames_between_flushes)
        task.put(self.child["flushAttrPerNFrames"], n_frames_between_flushes)
        # Start the plugin
        self.start_future = task.post_async(self.child["start"])
        # Start a future waiting for the first array
        self.array_future = task.when_matches_async(
            self.child["arrayCounter"], 1)
        # Return the dataset information
        dataset_infos = list(self._create_dataset_infos(
            part_info, params.generator, filename))
        return dataset_infos

    @RunnableController.PostRunReady
    @RunnableController.Seek
    def seek(self, task, completed_steps, steps_to_do, part_info):
        """Prepare for a rerun or continuation from completed_steps."""
        self.done_when_reaches = completed_steps + steps_to_do
        # Just reset the array counter
        task.put(self.child["arrayCounter"], 0)
        # Start a future waiting for the first array
        self.array_future = task.when_matches_async(
            self.child["arrayCounter"], 1)

    @RunnableController.Run
    @RunnableController.Resume
    def run(self, task, update_completed_steps):
        """Wait for the first array, then track progress until complete.

        Args:
            task: Task used to communicate with the child block
            update_completed_steps (callable): Subscribed to the plugin's
                uniqueId so progress is reported as frames are written
        """
        task.wait_all(self.array_future)
        task.unsubscribe_all()
        task.subscribe(self.child["uniqueId"], update_completed_steps, self)
        # TODO: what happens if we miss the last frame?
        task.when_matches(self.child["uniqueId"], self.done_when_reaches)

    @RunnableController.PostRunIdle
    def post_run_idle(self, task):
        """Wait for the writer to finish so the file is closed."""
        # If this is the last one, wait until the file is closed
        task.wait_all(self.start_future)

    @RunnableController.Abort
    def abort(self, task):
        """Stop the writer plugin."""
        task.post(self.child["stop"])

    def _set_dimensions(self, task, generator):
        """Asynchronously set the plugin's dimension name/size attributes.

        Args:
            task: Task used to communicate with the child block
            generator: Point generator describing the scan

        Returns:
            list: Futures for the asynchronous puts
        """
        num_dims = len(generator.index_dims)
        assert num_dims <= 10, \
            "Can only do 10 dims, you gave me %s" % num_dims
        attr_dict = dict(numExtraDims=num_dims-1)
        # Fill in dim name and size
        # NOTE: dims are filled from the end of index_names, so the last
        # (fastest-moving) generator index maps to suffix "N"
        for i in range(10):
            suffix = SUFFIXES[i]
            if i < len(generator.index_names):
                index_name = generator.index_names[-i - 1]
                index_size = generator.index_dims[-i - 1]
            else:
                # Unused dimensions get no name and size 1
                index_name = ""
                index_size = 1
            attr_dict["posNameDim%s" % suffix] = index_name
            attr_dict["extraDimSize%s" % suffix] = index_size
        futures = task.put_many_async(self.child, attr_dict)
        return futures

    def _find_generator_index(self, generator, dim):
        """Locate the sub-generator that produces the given dimension.

        Args:
            generator: Compound point generator with a generators list
            dim (str): Dimension (axis) name to look for

        Returns:
            tuple: (number of index dims before the matching sub-generator,
                the sub-generator whose position_units contains dim)

        Raises:
            ValueError: If no sub-generator produces dim
        """
        ndims = 0
        for g in generator.generators:
            if dim in g.position_units:
                return ndims, g
            else:
                ndims += len(g.index_dims)
        raise ValueError("Can't find generator for %s" % dim)

    def _make_nxdata(self, name, rank, entry_el, generator, link=False):
        """Create an NXdata group element under entry_el.

        Args:
            name (str): Name of the group (used as its signal attribute)
            rank (int): Rank of the data at each scan point
            entry_el: Parent ElementTree element (the NXentry group)
            generator: Point generator describing the scan
            link (bool): If True, hardlink the axis setpoint datasets to
                /entry/detector instead of writing the values again

        Returns:
            Element: The created group element
        """
        # Make a dataset for the data
        data_el = ET.SubElement(entry_el, "group", name=name)
        ET.SubElement(data_el, "attribute", name="signal", source="constant",
                      value=name, type="string")
        pad_dims = []
        for n in generator.index_names:
            if n in generator.position_units:
                pad_dims.append("%s_set" % n)
            else:
                pad_dims.append(".")
        # Pad out the axes attribute with "." for each data dimension
        pad_dims += ["."] * rank
        ET.SubElement(data_el, "attribute", name="axes", source="constant",
                      value=",".join(pad_dims), type="string")
        ET.SubElement(data_el, "attribute", name="NX_class", source="constant",
                      value="NXdata", type="string")
        # Add in the indices into the dimensions array that our axes refer to
        for dim, units in sorted(generator.position_units.items()):
            # Find the generator for this dimension
            ndims, g = self._find_generator_index(generator, dim)
            ET.SubElement(data_el, "attribute",
                          name="%s_set_indices" % dim,
                          source="constant", value=str(ndims), type="string")
            if link:
                ET.SubElement(data_el, "hardlink",
                              name="%s_set" % dim,
                              target="/entry/detector/%s_set" % dim)
            else:
                # Write the axis setpoint values inline as a dataset
                axes_vals = []
                for point in g.iterator():
                    axes_vals.append("%.12g" % point.positions[dim])
                axis_el = ET.SubElement(
                    data_el, "dataset", name="%s_set" % dim,
                    source="constant", type="float", value=",".join(axes_vals))
                ET.SubElement(axis_el, "attribute", name="units",
                              source="constant", value=units, type="string")
        return data_el

    def _make_layout_xml(self, generator, part_info):
        """Build the HDF writer layout XML describing the file structure.

        Args:
            generator: Point generator describing the scan
            part_info (dict): Info lists reported by other parts

        Returns:
            str: The layout XML as a string
        """
        # Make a root element with an NXEntry
        root_el = ET.Element("hdf5_layout")
        entry_el = ET.SubElement(root_el, "group", name="entry")
        ET.SubElement(entry_el, "attribute", name="NX_class",
                      source="constant", value="NXentry", type="string")
        # Check that there is only one primary source of detector data
        ndarray_infos = NDArrayDatasetInfo.filter_values(part_info)
        if not ndarray_infos:
            # Still need to put the data in the file, so manufacture something
            primary_rank = 1
        else:
            primary_rank = ndarray_infos[0].rank
        # Make an NXData element with the detector data in it in
        # /entry/detector/detector
        data_el = self._make_nxdata(
            "detector", primary_rank, entry_el, generator)
        det_el = ET.SubElement(data_el, "dataset", name="detector",
                               source="detector", det_default="true")
        ET.SubElement(det_el, "attribute", name="NX_class",
                      source="constant", value="SDS", type="string")
        # Now add any calculated sources of data
        for dataset_info in \
                CalculatedNDAttributeDatasetInfo.filter_values(part_info):
            # if we are a secondary source, use the same rank as the det
            attr_el = self._make_nxdata(
                dataset_info.name, primary_rank, entry_el, generator,
                link=True)
            ET.SubElement(attr_el, "dataset", name=dataset_info.name,
                          source="ndattribute", ndattribute=dataset_info.attr)
        # And then any other attribute sources of data
        for dataset_info in NDAttributeDatasetInfo.filter_values(part_info):
            # if we are a secondary source, use the same rank as the det
            attr_el = self._make_nxdata(dataset_info.name, dataset_info.rank,
                                        entry_el, generator, link=True)
            ET.SubElement(attr_el, "dataset", name=dataset_info.name,
                          source="ndattribute", ndattribute=dataset_info.attr)
        # Add a group for attributes
        NDAttributes_el = ET.SubElement(entry_el, "group", name="NDAttributes",
                                        ndattr_default="true")
        ET.SubElement(NDAttributes_el, "attribute", name="NX_class",
                      source="constant", value="NXcollection", type="string")
        xml = et_to_string(root_el)
        return xml