-
Notifications
You must be signed in to change notification settings - Fork 6
/
arm_control.py
444 lines (342 loc) · 14 KB
/
arm_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
'''Control of the arm for action execution.'''
# ######################################################################
# Imports
# ######################################################################
# Core ROS imports come first.
import rospy
# System builtins
import threading
# ROS builtins
from std_srvs.srv import Empty, EmptyResponse
from tf import TransformListener
# Local
from fetch_arm_control import Arm
from fetch_arm_control.msg import GripperState
from fetch_pbd_interaction.msg import ArmState, ExecutionStatus
from fetch_pbd_interaction.srv import MoveArm, MoveArmResponse, \
GetEEPose, GetEEPoseResponse, \
GetGripperState, \
GetGripperStateResponse, \
SetGripperState, \
SetGripperStateResponse, \
GetJointStates, GetJointStatesResponse, \
GetArmMovement, GetArmMovementResponse, \
MoveArmTraj, MoveArmTrajResponse
# ######################################################################
# Module level constants
# ######################################################################
# The minimum arm movement that 'counts' as moved.
# TODO(mbforbes): This is duplicated in arm.py. Use theirs.
ARM_MOVEMENT_THRESHOLD = 0.04
# How long to sleep between checking whether arms have successfully
# completed their movement.
MOVE_TO_JOINTS_SLEEP_INTERVAL = 0.01 # seconds
# How long to sleep between checking whether arms have successfully
# completed their trajectory.
TRAJECTORY_COMPLETE_SLEEP_INTERVAL = 0.01 # seconds
# How long to sleep between checking whether grippers have finished
# opening/closing.
GRIPPER_FINISH_SLEEP_INTERVAL = 0.01 # seconds
# ######################################################################
# Classes
# ######################################################################
class ArmControl:
'''Higher-level interface that exposes capabilities from
fetch_arm_control/Arm.py class as services.
'''
def __init__(self):
# Initialize arm state.
self._tf_listener = TransformListener()
self._arm = Arm(self._tf_listener)
self._arm.close_gripper()
self._status = ExecutionStatus.NOT_EXECUTING
rospy.Service('move_arm_to_joints_plan', MoveArm,
self._move_to_joints_plan)
rospy.Service('move_arm_to_joints', MoveArmTraj, self._move_to_joints)
rospy.Service('move_arm_to_pose', MoveArm, self._move_to_pose)
rospy.Service('start_move_arm_to_pose', MoveArm,
self._start_move_to_pose)
rospy.Service('is_reachable', MoveArm, self._is_reachable)
rospy.Service('is_arm_moving', GetArmMovement, self._is_arm_moving)
rospy.Service('relax_arm', Empty, self._relax_arm)
rospy.Service('reset_arm_movement_history', Empty,
self._reset_movement_history)
rospy.Service('get_gripper_state', GetGripperState,
self._get_gripper_state)
rospy.Service('get_ee_pose', GetEEPose, self._get_ee_pose)
rospy.Service('get_joint_states', GetJointStates,
self._get_joint_states)
rospy.Service('set_gripper_state', SetGripperState,
self._set_gripper_state)
rospy.loginfo('Arm initialized.')
# ##################################################################
# Instance methods: Public (API)
# ##################################################################
def update(self):
'''Periodic update for the arm.'''
self._arm.update()
# ##################################################################
# Instance methods: Internal ("private")
# ##################################################################
def _reset_movement_history(self, req):
'''
Args:
req (EmptyRequest)
Returns:
EmptyResponse
'''
self._arm.reset_movement_history()
return EmptyResponse()
def _set_gripper_state(self, req):
'''Set gripper to gripper_state
(open or closed).
Args:
req (SetGripperStateRequest)
Returns:
SetGripperStateResponse
'''
gripper_state = req.gripper_state
if gripper_state == self._arm.get_gripper_state():
# Already in that mode; do nothing and return False.
return SetGripperStateResponse()
else:
# Change gripper mode.
if gripper_state == GripperState.OPEN:
self._arm.open_gripper()
else:
self._arm.close_gripper()
return SetGripperStateResponse()
def _is_reachable(self, req):
'''.
Args:
req (MoveArmRequest): The arm's state
Returns:
MoveArmResponse
'''
arm_state = req.arm_state
solution, reachable = self._solve_ik_for_arm(arm_state)
return MoveArmResponse(reachable)
def _solve_ik_for_arm(self, arm_state, z_offset=0.0):
'''Finds an IK solution for a particular arm pose.
Args:
arm_state (ArmState): The arm's state,
z_offset (float, optional): Offset to add to z-values of
pose positions. Defaults to 0.0.
Returns:
(ArmState, bool): Tuple of
the ArmState, which will be an updated ArmState object
with IK solved if an IK solution was found. If IK was
not found, what is returned will depend on whether the
reference frame is relative (ArmState.OBJECT) or
absolute (ArmState.ROBOT_BASE). If the reference frame
is relative and no IK was found, an empty ArmState is
returned. If the reference frame is absolute and no IK
was found, then the original passed arm_state is
returned;
the bool, which is whether an IK solution was
successfully found.
'''
# Ideally we need to find IK only if the frame is relative to an
# object, but users can edit absolute poses in the GUI to make
# them unreachable, so we try IK for absolute poses too.
if arm_state.ref_type == ArmState.OBJECT:
# Arm is relative.
solution = ArmState()
target_pose = self._tf_listener.transformPose(
'base_link', arm_state.ee_pose)
target_pose.pose.position.z = target_pose.pose.position.z + \
z_offset
# Try solving IK.
target_joints = self._arm.get_ik_for_ee(
target_pose, arm_state.joint_pose)
# Check whether solution found.
if target_joints is None:
# No solution: RETURN EMPTY ArmState.
rospy.loginfo('No IK for relative end-effector pose.')
return solution, False
else:
# Found a solution; update the solution arm_state and
# return.
solution.ref_type = ArmState.ROBOT_BASE
solution.ee_pose = target_pose
solution.joint_pose = target_joints
return solution, True
elif arm_state.ref_type == ArmState.ROBOT_BASE:
# Arm is absolute.
target_pose = arm_state.ee_pose
target_pose.pose.position.z = target_pose.pose.position.z + \
z_offset
# Try solving IK.
target_joints = self._arm.get_ik_for_ee(
target_pose, arm_state.joint_pose)
if target_joints is None:
# No IK found; return the original.
rospy.logdebug('No IK for absolute end-effector pose.')
return arm_state, False
else:
# IK found; fill in solution ArmState and return.
solution = ArmState()
solution.ref_type = ArmState.ROBOT_BASE
solution.ee_pose = target_pose
solution.joint_pose = target_joints
return solution, True
else:
return arm_state, True
def _get_joint_states(self, req):
'''Get joint positions.
Args:
req (GetJointStatesRequest)
Returns:
GetJointStatesResponse
'''
return GetJointStatesResponse(self._arm.get_joint_state())
def _get_gripper_state(self, req):
''' Get gripper status on the indicated side.
Args:
req (GetGripperStateRequest)
Returns:
GetGripperStateResponse
'''
return GetGripperStateResponse(self._arm.get_gripper_state())
def _get_ee_pose(self, req):
''' Get current pose of the arm's end-effector on the indicated
side.
Args:
req (GetEEPoseRequest)
Returns:
GetEEPoseResponse
'''
return GetEEPoseResponse(self._arm.get_ee_state())
def _relax_arm(self, req):
''' Turns on gravity comp controller and turns off other controllers
Args:
req (EmptyRequest): Unused
Returns:
EmptyResponse
'''
self._arm.relax_arm()
return EmptyResponse()
def _start_move_to_pose(self, req):
'''Creates a thread for moving to a target pose.
Args:
req (MoveArmRequest): Arm state that contains the pose to
move to.
'''
thread = threading.Thread(
group=None,
target=self._move_to_pose,
args=(req,),
name='move_to_arm_state_thread'
)
thread.start()
# Log
rospy.loginfo('Started thread to move arm.')
return MoveArmResponse(True)
def _move_to_pose(self, req):
'''The thread function that makes the arm move to the
target end-effector pose (within arm_state).
Args:
req (MoveArmRequest): Arm state that contains the pose to
move to.
'''
arm_state = req.arm_state
rospy.loginfo("Move to pose")
self._status = ExecutionStatus.EXECUTING
# Should we transform pose to base_link?
if self._arm.move_to_pose(arm_state.ee_pose):
self._status = ExecutionStatus.SUCCEEDED
success = True
else:
self._status = ExecutionStatus.NO_IK
success = False
self._arm.relax_arm()
return MoveArmResponse(success)
def _move_to_joints_plan(self, req):
'''
Makes the arms move to the joint positions contained in the
passed arm states.
This assumes that the joint positions are valid (i.e. IK has
been called previously to set each ArmState's joints to good
values).
Args:
req (MoveArmRequest)
Returns:
MoveArmResponse: Whether the arms successfully moved to the passed
joint positions.
'''
# Estimate times to get to both poses.
arm_state = req.arm_state
# Move arms to target.
self._status = ExecutionStatus.EXECUTING
suc = self._arm.move_to_joints_plan(arm_state.joint_pose,
arm_state.velocities)
# Wait until both arms complete the trajectory.
while self._arm.is_executing():
rospy.sleep(MOVE_TO_JOINTS_SLEEP_INTERVAL)
rospy.loginfo('\tArms reached target.')
# Verify that both arms succeeded
# DEBUG: remove
if not suc:
self._status = ExecutionStatus.NO_IK
success = False
else:
self._status = ExecutionStatus.SUCCEEDED
success = True
self._arm.relax_arm()
return MoveArmResponse(success)
def _move_to_joints(self, req):
'''
Makes the arms move to the joint positions contained in the
passed arm states.
Note: This moves directly to the joint positions using interpolation
This assumes that the joint positions are valid (i.e. IK has
been called previously to set each ArmState's joints to good
values).
Args:
req (MoveArmRequest)
Returns:
MoveArmResponse: Whether the arms successfully moved to the passed
joint positions.
'''
# Estimate times to get to both poses.
joints = []
# times_to_poses = []
# arm_state = req.arm_state[i]
# time_to_pose = None
# if arm_state is not None:
# time_to_pose = self._arm.get_time_to_pose(arm_state.ee_pose)
for arm_state in req.arm_states:
joints.append(arm_state.joint_pose)
# If both arms are moving, adjust velocities and find most
# moving arm. Look at it.
# Move arms to target.
suc = False
self._status = ExecutionStatus.EXECUTING
suc = self._arm.move_to_joints(
joints, req.times)
# Wait until both arms complete the trajectory.
while self._arm.is_executing():
rospy.sleep(MOVE_TO_JOINTS_SLEEP_INTERVAL)
rospy.loginfo('\tArms reached target.')
# Verify that both arms succeeded
# DEBUG: remove
if not suc:
self._status = ExecutionStatus.NO_IK
success = False
else:
self._status = ExecutionStatus.SUCCEEDED
success = True
self._arm.relax_arm()
return MoveArmTrajResponse(success)
def _is_arm_moving(self, req):
'''
Returns true is arm is moving
Args:
req (IsArmMovingRequest)
Returns:
IsArmMovingResponse: Whether the arm is currently moving
'''
if self._arm.get_movement() < ARM_MOVEMENT_THRESHOLD:
return GetArmMovementResponse(False)
else:
return GetArmMovementResponse(True)