-
Notifications
You must be signed in to change notification settings - Fork 76
/
_pyepics_shim.py
223 lines (170 loc) · 6.23 KB
/
_pyepics_shim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import atexit
import ctypes
import epics
import queue
import threading
import warnings
from epics import get_pv as _get_pv, caget, caget, caput
# Module-wide handle on the monitor dispatcher; remains None until setup()
# installs one.
_dispatcher = None

# Default to pyepics' CAThread and fall back to a plain thread only when the
# Channel Access library cannot be located.
thread_class = epics.ca.CAThread
try:
    epics.ca.find_libca()
except epics.ca.ChannelAccessException:
    thread_class = threading.Thread
def get_pv(*args, **kwargs):
    '''Thin wrapper around ``epics.get_pv`` that supplies a default context

    All positional and keyword arguments are forwarded unchanged.  When the
    caller does not pass ``context``, the value of
    ``epics.ca.current_context()`` at call time is used for it.
    '''
    import epics

    # Always query the context (as the setdefault-based form does), then only
    # apply it when the caller has not chosen one explicitly.
    active_context = epics.ca.current_context()
    if 'context' not in kwargs:
        kwargs['context'] = active_context
    return _get_pv(*args, **kwargs)
class MonitorDispatcher(epics.ca.CAThread):
    '''A monitor dispatcher which works with pyepics

    The monitor dispatcher works around having callbacks from libca threads.
    Using epics CA calls (caget, caput, etc.) from those callbacks is not
    possible without this dispatcher workaround.

    The thread is created as a daemon and is started by the constructor.

    .. note::

        Without `all_contexts` set, only the callbacks that are run with
        the same context as the main thread are affected.

    .. note::

        Ensure that you call epics.ca.use_initial_context() at startup in
        the main thread

    Parameters
    ----------
    all_contexts : bool, optional
        re-route _all_ callbacks from _any_ context to the dispatcher callback
        thread
    timeout : float, optional
        How long (passed to ``Queue.get``) the dispatcher blocks waiting for
        a queued callback before re-checking whether it was asked to stop
    callback_logger : logging.Logger, optional
        A logger to notify about failed callbacks

    Attributes
    ----------
    main_context : ctypes long
        The main CA context
    callback_logger : logging.Logger
        A logger to notify about failed callbacks
    queue : Queue
        The event queue
    '''

    def __init__(self, all_contexts=False, timeout=0.1,
                 callback_logger=None):
        epics.ca.CAThread.__init__(self, name='monitor_dispatcher')
        self.daemon = True
        self.queue = queue.Queue()

        # The dispatcher thread will stop if this event is set
        self._stop_event = threading.Event()
        # Context of the thread constructing the dispatcher; compared against
        # the context each monitor event arrives in.
        self.main_context = epics.ca.current_context()
        self.callback_logger = callback_logger

        self._all_contexts = bool(all_contexts)
        self._timeout = timeout

        self.start()

    def run(self):
        '''The dispatcher itself

        Installs the re-routing monitor handler, then executes queued
        callbacks until `stop` is called.  The default handler is restored
        and the CA context detached however the loop ends.
        '''
        self._setup_pyepics(True)

        try:
            while not self._stop_event.is_set():
                try:
                    callback, args, kwargs = self.queue.get(True,
                                                            self._timeout)
                except queue.Empty:
                    # Nothing queued within the timeout; go around again so
                    # that a stop request is noticed.
                    continue

                try:
                    callback(*args, **kwargs)
                except Exception as ex:
                    # A failing user callback must not kill the dispatcher
                    if self.callback_logger is not None:
                        self.callback_logger.error(ex, exc_info=ex)
        finally:
            # Always undo the re-routing: otherwise monitor events would keep
            # being queued for a thread that is no longer draining the queue.
            self._setup_pyepics(False)
            epics.ca.detach_context()

    def stop(self):
        '''Stop the dispatcher thread and re-enable normal callbacks'''
        self._stop_event.set()

    def _setup_pyepics(self, enable):
        '''Install (enable=True) or remove (enable=False) the re-routing'''
        # Re-route monitor events to our new handler
        if enable:
            fcn = self._monitor_event
        else:
            fcn = epics.ca._onMonitorEvent

        epics.ca._CB_EVENT = (
            ctypes.CFUNCTYPE(None, epics.dbr.event_handler_args)(fcn))

    def _monitor_event(self, args):
        '''Monitor handler that defers user callbacks to the dispatcher

        For events in scope (any context when `all_contexts` is set,
        otherwise only `main_context`), a callable ``args.usr`` is replaced
        by a wrapper that queues the original callback instead of running
        it.  The event is then passed on to pyepics' own handler.
        '''
        if (self._all_contexts or
                self.main_context == epics.ca.current_context()):
            if callable(args.usr):
                # Wrap only once per dispatcher: the tag marks callbacks
                # that have already been wrapped by this instance.
                if (not hasattr(args.usr, '_disp_tag') or
                        args.usr._disp_tag is not self):
                    args.usr = lambda orig_cb=args.usr, **kwargs: \
                        self.queue.put((orig_cb, [], kwargs))
                    args.usr._disp_tag = self

        return epics.ca._onMonitorEvent(args)
def setup(logger):
    '''Setup ophyd for use

    Must be called once per session using ophyd

    Parameters
    ----------
    logger : logging.Logger
        Receives debug messages about setup and cleanup progress

    Returns
    -------
    MonitorDispatcher or None
        The newly installed dispatcher; None when libca cannot be found or
        when a dispatcher was already installed by an earlier call
    '''
    global _dispatcher

    try:
        epics.ca.find_libca()
    except epics.ca.ChannelAccessException:
        # if we can not find libca, then we clearly are not
        # going to be using CA threads so no need to install
        # the trampoline
        return None

    if _dispatcher is not None:
        logger.debug('ophyd already setup')
        return None

    def _cleanup():
        '''Clean up the ophyd session'''
        global _dispatcher

        active = _dispatcher
        if active is None:
            return

        logger.debug('Performing ophyd cleanup')
        if active.is_alive():
            logger.debug('Joining the dispatcher thread')
            active.stop()
            active.join()

        _dispatcher = None

        logger.debug('Finalizing libca')
        epics.ca.finalize_libca()

    # It's important to use the same context in the callback dispatcher
    # as the main thread, otherwise not-so-savvy users will be very
    # confused
    epics.ca.use_initial_context()

    logger.debug('Installing monitor dispatcher')
    _dispatcher = MonitorDispatcher()
    # Registered only once the dispatcher exists, so cleanup runs at exit
    atexit.register(_cleanup)
    return _dispatcher
def get_pv_form(version):
    '''Get the PV form that should be used for pyepics

    Due to a bug in certain versions of PyEpics, form='time' cannot be used
    with some large arrays.

    native: gives time.time() timestamps from this machine
    time: gives timestamps from the PVs themselves

    Parameters
    ----------
    version : str
        The PyEpics version string to evaluate (e.g. ``epics.__version__``)

    Returns
    -------
    {'native', 'time'}
        'native' (with an ImportWarning) when the version is <= 3.2.3 or
        cannot be parsed, 'time' otherwise
    '''
    def _fix_git_versioning(in_str):
        # Turn a '-g<hash>' suffix into a '+g<hash>' local version label
        return in_str.replace('-g', '+g')

    def _naive_parse_version(version):
        # Best-effort fallback used when pkg_resources is unavailable:
        # returns a tuple of ints, or None if the string is not understood.
        try:
            version = version.lower()
            # Drop any local version label (such as the '+g<hash>' produced
            # by _fix_git_versioning); it does not affect the comparison
            version = version.split('+', 1)[0]
            # Strip off the release-candidate version number (best-effort)
            if 'rc' in version:
                version = version[:version.index('rc')]
            version_tuple = tuple(int(v) for v in version.split('.'))
        except (AttributeError, TypeError, ValueError):
            return None
        return version_tuple

    try:
        from pkg_resources import parse_version
    except ImportError:
        parse_version = _naive_parse_version

    try:
        parsed = parse_version(_fix_git_versioning(version))
    except (AttributeError, TypeError, ValueError):
        # Strict parsers raise (a ValueError subclass) on versions they do
        # not understand; treat that like any other unrecognized version
        # instead of failing.
        parsed = None

    if parsed is None:
        warnings.warn('Unrecognized PyEpics version; using local timestamps',
                      ImportWarning)
        return 'native'
    elif parsed <= parse_version('3.2.3'):
        # Report the version that was actually evaluated
        warnings.warn('PyEpics versions <= 3.2.3 will use local timestamps '
                      '(version: %s)' % (version,),
                      ImportWarning)
        return 'native'
    else:
        return 'time'
# PV form ('native' or 'time') computed once at import time from the installed
# PyEpics version.
pv_form = get_pv_form(epics.__version__)