-
Notifications
You must be signed in to change notification settings - Fork 76
/
epics_motor.py
366 lines (302 loc) · 12 KB
/
epics_motor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
import logging
from .utils.epics_pvs import fmt_time
from .signal import EpicsSignal, EpicsSignalRO
from .utils import DisconnectedError
from .utils.epics_pvs import raise_if_disconnected, AlarmSeverity
from .positioner import PositionerBase
from .device import Device, Component as Cpt, required_for_connection
from .status import wait as status_wait
from enum import Enum
logger = logging.getLogger(__name__)
class HomeEnum(str, Enum):
forward = "forward"
reverse = "reverse"
class EpicsMotor(Device, PositionerBase):
"""An EPICS motor record, wrapped in a :class:`Positioner`
Keyword arguments are passed through to the base class, Positioner
Parameters
----------
prefix : str
The record to use
read_attrs : sequence of attribute names
The signals to be read during data acquisition (i.e., in read() and
describe() calls)
name : str, optional
The name of the device
parent : instance or None
The instance of the parent device, if applicable
settle_time : float, optional
The amount of time to wait after moves to report status completion
timeout : float, optional
The default timeout to use for motion requests, in seconds.
"""
# position
user_readback = Cpt(EpicsSignalRO, ".RBV", kind="hinted", auto_monitor=True)
user_setpoint = Cpt(EpicsSignal, ".VAL", limits=True, auto_monitor=True)
# calibration dial <-> user
user_offset = Cpt(EpicsSignal, ".OFF", kind="config", auto_monitor=True)
user_offset_dir = Cpt(EpicsSignal, ".DIR", kind="config", auto_monitor=True)
offset_freeze_switch = Cpt(EpicsSignal, ".FOFF", kind="omitted", auto_monitor=True)
set_use_switch = Cpt(EpicsSignal, ".SET", kind="omitted", auto_monitor=True)
# configuration
velocity = Cpt(EpicsSignal, ".VELO", kind="config", auto_monitor=True)
acceleration = Cpt(EpicsSignal, ".ACCL", kind="config", auto_monitor=True)
motor_egu = Cpt(EpicsSignal, ".EGU", kind="config", auto_monitor=True)
# motor status
motor_is_moving = Cpt(EpicsSignalRO, ".MOVN", kind="omitted", auto_monitor=True)
motor_done_move = Cpt(EpicsSignalRO, ".DMOV", kind="omitted", auto_monitor=True)
high_limit_switch = Cpt(EpicsSignalRO, ".HLS", kind="omitted", auto_monitor=True)
low_limit_switch = Cpt(EpicsSignalRO, ".LLS", kind="omitted", auto_monitor=True)
high_limit_travel = Cpt(EpicsSignal, ".HLM", kind="omitted", auto_monitor=True)
low_limit_travel = Cpt(EpicsSignal, ".LLM", kind="omitted", auto_monitor=True)
direction_of_travel = Cpt(EpicsSignal, ".TDIR", kind="omitted", auto_monitor=True)
# commands
motor_stop = Cpt(EpicsSignal, '.STOP', kind='omitted')
home_forward = Cpt(EpicsSignal, '.HOMF', kind='omitted')
home_reverse = Cpt(EpicsSignal, '.HOMR', kind='omitted')
# alarm information
tolerated_alarm = AlarmSeverity.NO_ALARM
def __init__(self, prefix='', *, name, kind=None, read_attrs=None,
configuration_attrs=None, parent=None, **kwargs):
super().__init__(prefix=prefix, name=name, kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent, **kwargs)
# Make the default alias for the user_readback the name of the
# motor itself.
self.user_readback.name = self.name
def on_limit_changed(value, old_value, **kwargs):
"""
update EpicsSignal object when a limit CA monitor received from EPICS
"""
if (
self.connected
and old_value is not None
and value != old_value
):
self.user_setpoint._metadata_changed(
self.user_setpoint.pvname,
self.user_setpoint._read_pv.get_ctrlvars(),
from_monitor=True,
update=True,
)
self.low_limit_travel.subscribe(on_limit_changed)
self.high_limit_travel.subscribe(on_limit_changed)
@property
def precision(self):
'''The precision of the readback PV, as reported by EPICS'''
return self.user_readback.precision
@property
def egu(self):
'''The engineering units (EGU) for a position'''
return self.motor_egu.get()
@property
@raise_if_disconnected
def limits(self):
return self.user_setpoint.limits
@property
@raise_if_disconnected
def moving(self):
'''Whether or not the motor is moving
Returns
-------
moving : bool
'''
return bool(self.motor_is_moving.get(use_monitor=False))
@raise_if_disconnected
def stop(self, *, success=False):
self.motor_stop.put(1, wait=False)
super().stop(success=success)
@raise_if_disconnected
def move(self, position, wait=True, **kwargs):
'''Move to a specified position, optionally waiting for motion to
complete.
Parameters
----------
position
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than `timeout`
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out
'''
self._started_moving = False
status = super().move(position, **kwargs)
self.user_setpoint.put(position, wait=False)
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
return status
@property
@raise_if_disconnected
def position(self):
'''The current position of the motor in its engineering units
Returns
-------
position : float
'''
return self._position
@raise_if_disconnected
def set_current_position(self, pos):
'''Configure the motor user position to the given value
Parameters
----------
pos
Position to set.
'''
self.set_use_switch.put(1, wait=True)
try:
self.user_setpoint.put(pos, wait=True, force=True)
finally:
self.set_use_switch.put(0, wait=True)
@raise_if_disconnected
def home(self, direction, wait=True, **kwargs):
'''Perform the default homing function in the desired direction
Parameters
----------
direction : HomeEnum
Direction in which to perform the home search.
'''
direction = HomeEnum(direction)
self._started_moving = False
position = (self.low_limit + self.high_limit) / 2
status = super().move(position, **kwargs)
if direction == HomeEnum.forward:
self.home_forward.put(1, wait=False)
else:
self.home_reverse.put(1, wait=False)
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
return status
def check_value(self, pos):
'''Check that the position is within the soft limits'''
self.user_setpoint.check_value(pos)
@required_for_connection
@user_readback.sub_value
def _pos_changed(self, timestamp=None, value=None, **kwargs):
'''Callback from EPICS, indicating a change in position'''
self._set_position(value)
@required_for_connection
@motor_done_move.sub_value
def _move_changed(self, timestamp=None, value=None, sub_type=None,
**kwargs):
'''Callback from EPICS, indicating that movement status has changed'''
was_moving = self._moving
self._moving = (value != 1)
started = False
if not self._started_moving:
started = self._started_moving = (not was_moving and self._moving)
self.log.debug('[ts=%s] %s moving: %s (value=%s)', fmt_time(timestamp),
self, self._moving, value)
if started:
self._run_subs(sub_type=self.SUB_START, timestamp=timestamp,
value=value, **kwargs)
if was_moving and not self._moving:
success = True
# Check if we are moving towards the low limit switch
if self.direction_of_travel.get() == 0:
if self.low_limit_switch.get(use_monitor=False) == 1:
success = False
# No, we are going to the high limit switch
else:
if self.high_limit_switch.get(use_monitor=False) == 1:
success = False
# Check the severity of the alarm field after motion is complete.
# If there is any alarm at all warn the user, and if the alarm is
# greater than what is tolerated, mark the move as unsuccessful
severity = self.user_readback.alarm_severity
if severity != AlarmSeverity.NO_ALARM:
status = self.user_readback.alarm_status
if severity > self.tolerated_alarm:
self.log.error('Motion failed: %s is in an alarm state '
'status=%s severity=%s',
self.name, status, severity)
success = False
else:
self.log.warning('Motor %s raised an alarm during motion '
'status=%s severity %s',
self.name, status, severity)
self._done_moving(success=success, timestamp=timestamp,
value=value)
@property
def report(self):
try:
rep = super().report
except DisconnectedError:
# TODO there might be more in this that gets lost
rep = {'position': 'disconnected'}
rep['pv'] = self.user_readback.pvname
return rep
def get_lim(self, flag):
'''
Returns the travel limit of motor
* flag > 0: returns high limit
* flag < 0: returns low limit
* flag == 0: returns None
Included here for compatibility with similar with SPEC command.
Parameters
----------
high : float
Limit of travel in the positive direction.
low : float
Limit of travel in the negative direction.
'''
if flag > 0:
return self.high_limit_travel.get()
elif flag < 0:
return self.low_limit_travel.get()
def set_lim(self, low, high):
'''
Sets the low and high travel limits of motor
* No action taken if motor is moving.
* Low limit is set to lesser of (low, high)
* High limit is set to greater of (low, high)
Included here for compatibility with similar with SPEC command.
Parameters
----------
high : float
Limit of travel in the positive direction.
low : float
Limit of travel in the negative direction.
'''
if not self.moving:
# update EPICS
lo = min(low, high)
hi = max(low, high)
if lo <= self.position <= hi:
self.high_limit_travel.put(hi)
self.low_limit_travel.put(lo)
# and ophyd metadata dictionary will update via CA monitor
else:
self.log.debug(
"Could not set motor limits to (%f, %f) at position %g",
low,
high,
self.position
)
class MotorBundle(Device):
"""Sub-class this to device a bundle of motors
This provides better default behavior for :ref:``hints``.
"""
...