-
Notifications
You must be signed in to change notification settings - Fork 76
/
flyers.py
434 lines (346 loc) · 13.1 KB
/
flyers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
import time as ttime
import functools
import logging
from collections import OrderedDict
from .signal import (Signal, EpicsSignal, EpicsSignalRO)
from .status import DeviceStatus, StatusBase
from .device import (Device, Component as Cpt, BlueskyInterface)
from .utils import OrderedDefaultDict
from typing import Generator, Dict, Iterable, Any
logger = logging.getLogger(__name__)
class FlyerInterface(BlueskyInterface):
    '''Method signatures and contracts for a flyer

    The methods defined here contain only docstrings, so calling one of
    them directly returns ``None``.
    '''

    def kickoff(self) -> StatusBase:
        '''Start a flyer

        The status object returned is marked as done once flying
        has started.

        Returns
        -------
        kickoff_status : StatusBase
            Indicate when flying has started.
        '''

    def complete(self) -> StatusBase:
        '''Wait for flying to be complete.

        This can either be a question ("are you done yet") or a
        command ("please wrap up") to accommodate flyers that have a
        fixed trajectory (ex. high-speed raster scans) or that are
        passive collectors (ex. MAIA or a hardware buffer).

        In either case, the returned status object should indicate when
        the device is actually finished flying.

        Returns
        -------
        complete_status : StatusBase
            Indicate when flying has completed
        '''

    def collect(self) -> Generator[Dict, None, None]:
        '''Retrieve data from the flyer as proto-events

        The events can be from a mixture of event streams; it is
        the responsibility of the consumer (i.e. the RunEngine) to sort
        them out.

        Yields
        ------
        event_data : dict
            Must have the keys {'time', 'timestamps', 'data'}.
        '''

    def collect_tables(self) -> Iterable[Any]:
        '''Retrieve data from the flyer as tables

        PROPOSED

        Yields
        ------
        time : Iterable[Float]
        data : dict
        timestamps : dict
        '''

    def describe_collect(self) -> Dict[str, Dict]:
        '''Provide schema & meta-data from :meth:`collect`

        This is analogous to :meth:`describe`, but nested by stream name.

        This provides schema related information (ex. shape, dtype), the
        source (ex. PV name), and if available, units, limits, precision etc.

        The data_keys are mapped to events from `collect` by matching the
        keys.

        Returns
        -------
        data_keys_by_stream : dict
            The keys must be strings and the values must be dict-like
            with keys that are str and the inner values are dict-like
            with the ``event_model.event_descriptor.data_key`` schema.
        '''
class AreaDetectorTimeseriesCollector(Device):
    '''Flyer that reads back a buffered time series and its timestamps

    ``kickoff`` writes 'Erase/Start' to ``control``, ``complete`` writes
    'Stop' (via ``pause``), and ``collect`` turns the first ``cur_point``
    entries of ``waveform`` / ``waveform_ts`` into one event per point.

    Parameters
    ----------
    stream_name : str, optional
        Key under which :meth:`describe_collect` nests its description
    '''
    control = Cpt(EpicsSignal, "TSControl")
    num_points = Cpt(EpicsSignal, "TSNumPoints")
    cur_point = Cpt(EpicsSignalRO, "TSCurrentPoint")
    waveform = Cpt(EpicsSignalRO, "TSTotal")
    waveform_ts = Cpt(EpicsSignalRO, "TSTimestamp")

    _default_configuration_attrs = ('num_points', )
    _default_read_attrs = ()

    def __init__(self, *args, stream_name=None, **kwargs):
        # Stored before the base initializer runs, as in the original order
        self.stream_name = stream_name
        super().__init__(*args, **kwargs)

    def _get_waveforms(self):
        '''Return ``(values, timestamps)`` truncated to the current point

        Both are empty lists when ``cur_point`` reads as falsy.
        '''
        point_count = self.cur_point.get()
        if not point_count:
            return ([], [])

        values = self.waveform.get(count=point_count)
        stamps = self.waveform_ts.get(count=point_count)
        return (values, stamps)

    def kickoff(self):
        '''Erase the buffer and start collecting

        Returns
        -------
        DeviceStatus
            Already marked finished when returned
        '''
        self.control.put('Erase/Start', wait=True)

        # Nothing to wait on: the status is finished before it is handed back
        started = DeviceStatus(self)
        started.set_finished()
        return started

    def pause(self):
        '''Stop collecting; the buffers are not erased'''
        self.control.put('Stop', wait=True)
        super().pause()

    def resume(self):
        '''Start collecting again without erasing'''
        self.control.put('Start', wait=True)
        super().resume()

    def complete(self):
        '''Stop the acquisition so the data can be collected

        Returns
        -------
        DeviceStatus
            Already marked finished when returned

        Raises
        ------
        RuntimeError
            If ``control`` already reads 'Stop'
        '''
        already_stopped = self.control.get(as_string=True) == 'Stop'
        if already_stopped:
            raise RuntimeError('Not acquiring')

        self.pause()

        # The data can be read back right away
        finished = DeviceStatus(self)
        finished.set_finished()
        return finished

    def collect(self):
        '''Yield one event per buffered point

        Yields
        ------
        event_data : dict
            Keys 'data', 'timestamps' and 'time'; data and timestamps are
            keyed by ``self.name``

        Raises
        ------
        RuntimeError
            If ``control`` does not read 'Stop' (raised when iteration
            starts, since this is a generator)
        '''
        state = self.control.get(as_string=True)
        if state != 'Stop':
            raise RuntimeError('Acquisition still in progress. Call complete()'
                               ' first.')

        values, stamps = self._get_waveforms()
        for value, stamp in zip(values, stamps):
            yield dict(data={self.name: value},
                       timestamps={self.name: stamp},
                       time=stamp)

    def describe_collect(self):
        '''Describe details for the flyer collect() method

        NOTE(review): collect() keys its events by ``self.name`` while the
        keys described here come from the two signals' ``describe()`` --
        confirm these line up.
        '''
        described = OrderedDict()
        for signal in (self.waveform, self.waveform_ts):
            described.update(signal.describe())
        return {self.stream_name: described}
class WaveformCollector(Device):
    '''Waveform collector

    See: https://github.com/NSLS-II-CSX/timestamp

    Parameters
    ----------
    data_is_time : bool, optional
        Use time as the data being acquired
    stream_name : str, optional
        Key under which :meth:`describe_collect` nests its description
    '''
    _default_configuration_attrs = ()
    _default_read_attrs = ()

    select = Cpt(EpicsSignal, "Sw-Sel")
    reset = Cpt(EpicsSignal, "Rst-Sel")
    waveform_count = Cpt(EpicsSignalRO, "Val:TimeN-I")
    waveform = Cpt(EpicsSignalRO, "Val:Time-Wfrm")
    waveform_nord = Cpt(EpicsSignalRO, "Val:Time-Wfrm.NORD")
    data_is_time = Cpt(Signal)

    def __init__(self, *args,
                 data_is_time=True, stream_name=None,
                 **kwargs):
        self.stream_name = stream_name
        super().__init__(*args, **kwargs)
        # The component only exists once the base initializer has run
        self.data_is_time.put(data_is_time)

    def _get_waveform(self):
        '''Return the waveform truncated to ``waveform_nord`` elements

        An empty list is returned when ``waveform_count`` reads as falsy.
        '''
        if self.waveform_count.get():
            return self.waveform.get(count=int(self.waveform_nord.get()))
        else:
            return []

    def pause(self):
        '''Stop without clearing buffers'''
        self.select.put(0, wait=True)
        # Chain up like the other flyers in this module do
        super().pause()

    def resume(self):
        '''Resume without erasing'''
        self.select.put(1, wait=True)
        # Chain up like the other flyers in this module do
        super().resume()

    def complete(self):
        '''Stop buffering so the data can be collected

        Returns
        -------
        DeviceStatus
            Already marked finished when returned
        '''
        self.pause()

        st = DeviceStatus(self)
        st.set_finished()
        return st

    def kickoff(self):
        '''Reset the buffer and start collecting

        Returns
        -------
        DeviceStatus
            Already marked finished when returned
        '''
        # Put us in reset mode
        self.select.put(2, wait=True)
        # Trigger processing
        self.reset.put(1, wait=True)
        # Start Buffer
        self.select.put(1, wait=True)

        # The status is finished before it is handed back, so there is
        # nothing for the scan to wait on
        status = DeviceStatus(self)
        status.set_finished()
        return status

    def collect(self):
        '''Yield one event per waveform element

        Yields
        ------
        event_data : dict
            Keys 'data', 'timestamps' and 'time'. The timestamp and time
            are always the waveform element; the data is the element
            itself when ``data_is_time`` is set, otherwise its index.
        '''
        payload = self._get_waveform()

        # Use the length, not truthiness: the waveform may come back as an
        # array-like (e.g. a numpy array) whose truth value is ambiguous
        # for more than one element, and a one-element array holding 0
        # would otherwise be mistaken for "no data".
        if len(payload) == 0:
            return

        data_is_time = self.data_is_time.get()
        for i, v in enumerate(payload):
            x = v if data_is_time else i
            yield {'data': {self.name: x},
                   'timestamps': {self.name: v},
                   'time': v}

    def _repr_info(self):
        yield from super()._repr_info()
        yield ('data_is_time', self.data_is_time.get())

    def describe_collect(self):
        '''Describe details for the flyer collect() method'''
        # NOTE(review): _describe_attr_list is not defined in this class;
        # presumably it is inherited from Device -- confirm.
        desc = self._describe_attr_list(['waveform'])
        return {self.stream_name: desc}
class MonitorFlyerMixin(BlueskyInterface):
    '''A bluesky-compatible flyer mixin, using monitor_attrs

    At kickoff(), all monitor_attrs are subscribed to and monitored until
    complete() is called. `complete` returns a DeviceStatus instance,
    which indicates when the data is ready to be collected. The acquired
    values are then retrievable as bluesky bulk-readable documents in
    collect().

    Parameters
    ----------
    monitor_attrs : list, optional
        List of signal attribute names to monitor
    stream_names : dict, optional
        A mapping of attribute -> stream name
        If an attribute is not in this dictionary, the stream name will default
        to the object's name.
    pivot : bool, optional
        If set, each value and timestamp pair will be in separate events.
        Otherwise, a single event will be generated with an array. Defaults to
        False.
    '''

    def __init__(self, *args, monitor_attrs=None, stream_names=None,
                 pivot=False, **kwargs):
        if monitor_attrs is None:
            monitor_attrs = []

        if stream_names is None:
            stream_names = {}

        self.monitor_attrs = monitor_attrs
        self.stream_names = stream_names

        self._acquiring = False
        self._paused = False
        self._collected_data = None
        # Set for real in kickoff(); defined here so the attribute always
        # exists
        self._start_time = None
        self._monitors = {}
        self._pivot = pivot

        super().__init__(*args, **kwargs)

    def kickoff(self):
        '''Start collection

        Returns
        -------
        DeviceStatus
            This will be set to done when acquisition has begun
        '''
        self._collected_data = OrderedDefaultDict(lambda: {'values': [],
                                                           'timestamps': []})
        self._start_time = ttime.time()
        # The flags are set before subscribing because _monitor_callback
        # discards anything that arrives while they say "not acquiring"
        self._acquiring = True
        self._paused = False
        self._add_monitors()

        st = DeviceStatus(self)
        st.set_finished()
        return st

    def _add_monitors(self):
        '''Subscribe the monitor callback to every attribute in monitor_attrs

        Raises
        ------
        ValueError
            If any monitored attribute is a Device. Every attribute is
            checked before the first subscription is made, so a failure
            leaves no subscriptions behind.
        '''
        targets = []
        for attr in self.monitor_attrs:
            obj = getattr(self, attr)
            if isinstance(obj, Device):
                raise ValueError('Cannot monitor Devices, only Signals.')
            targets.append((attr, obj))

        for attr, obj in targets:
            cb = functools.partial(self._monitor_callback, attribute=attr)
            self._monitors[obj] = cb
            obj.subscribe(cb)

    def _monitor_callback(self, attribute=None, obj=None, value=None,
                          timestamp=None, **kwargs):
        '''A monitor_attr signal has changed'''
        if not self._acquiring or self._paused:
            return

        # Fall back to an explicit read when the subscription did not
        # deliver both the value and its timestamp
        if value is None or timestamp is None:
            data = obj.read()[obj.name]
            value = data['value']
            timestamp = data['timestamp']

        collected = self._collected_data[attribute]
        collected['values'].append(value)
        collected['timestamps'].append(timestamp)

    def _get_stream_name(self, attr):
        '''Stream name for ``attr``, defaulting to the object's name'''
        obj = getattr(self, attr)
        return self.stream_names.get(attr, obj.name)

    def _describe_attr_list(self, attrs):
        '''Merge the ``describe()`` output of each named attribute'''
        desc = OrderedDict()
        for attr in attrs:
            desc.update(getattr(self, attr).describe())
        return desc

    def _describe_with_dtype(self, attr, *, dtype='array'):
        '''Describe an attribute and change its dtype'''
        desc = self._describe_attr_list([attr])

        obj = getattr(self, attr)
        desc[obj.name]['dtype'] = dtype
        return desc

    def describe_collect(self):
        '''Description of monitored attributes retrieved by collect

        Returns
        -------
        dict
            Stream name -> description. Without ``pivot`` each description
            has its dtype overridden to 'array'.
        '''
        if self._pivot:
            return {self._get_stream_name(attr):
                    self._describe_attr_list([attr])
                    for attr in self.monitor_attrs
                    }
        else:
            return {self._get_stream_name(attr):
                    self._describe_with_dtype(attr, dtype='array')
                    for attr in self.monitor_attrs
                    }

    def _clear_monitors(self):
        '''Clear all subscriptions'''
        for obj, monitor in self._monitors.items():
            try:
                obj.clear_sub(monitor, event_type=obj._default_sub)
            except Exception as ex:
                # Best effort: one failure must not stop the remaining
                # subscriptions from being cleared
                logger.debug('Failed to clear subscription',
                             exc_info=ex)

        self._monitors.clear()

    def pause(self):
        '''Pause acquisition'''
        if not self._acquiring:
            # nothing to do
            return

        self._paused = True
        self._clear_monitors()
        super().pause()

    def resume(self):
        '''Resume acquisition'''
        if not self._acquiring:
            # nothing to do
            return

        self._paused = False
        self._add_monitors()
        super().resume()

    def complete(self):
        '''Acquisition completed

        Returns
        -------
        DeviceStatus
            Already marked finished when returned

        Raises
        ------
        RuntimeError
            If no acquisition is in progress
        '''
        if not self._acquiring:
            raise RuntimeError('Not acquiring')

        self._acquiring = False
        self._paused = False
        self._clear_monitors()

        # Data is ready immediately
        st = DeviceStatus(self)
        st.set_finished()
        return st

    def collect(self):
        '''Retrieve all collected data

        The collected data is handed out once: it is dropped from the
        instance when iteration starts. Iterating before any kickoff(), or
        a second time, yields nothing.

        Yields
        ------
        event_data : dict
            Keys 'time', 'timestamps' and 'data'. With ``pivot`` there is
            one event per recorded value; otherwise one event per
            attribute carrying the full lists, with 'time' set to the
            kickoff time.

        Raises
        ------
        RuntimeError
            If acquisition is still in progress (raised when iteration
            starts, since this is a generator)
        '''
        if self._acquiring:
            raise RuntimeError('Acquisition still in progress. Call complete()'
                               ' first.')

        collected = self._collected_data
        self._collected_data = None

        if collected is None:
            # Never kicked off, or already collected: there is no data
            return

        if self._pivot:
            for attr, data in collected.items():
                name = getattr(self, attr).name
                for ts, value in zip(data['timestamps'], data['values']):
                    yield dict(time=ts,
                               timestamps={name: ts},
                               data={name: value},
                               )
        else:
            for attr, data in collected.items():
                name = getattr(self, attr).name
                yield dict(time=self._start_time,
                           timestamps={name: data['timestamps']},
                           data={name: data['values']},
                           )