Skip to content
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 361 lines (311 sloc) 13.1 KB
#!/usr/bin/env python
# Copyright (c) 2013-2018, Rethink Robotics Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
SDK Joint Trajectory Example: file playback
import argparse
import operator
import sys
import threading
from bisect import bisect
from copy import copy
from os import path
import rospy
import actionlib
from control_msgs.msg import (
from trajectory_msgs.msg import (
import intera_interface
from intera_interface import CHECK_VERSION
class Trajectory(object):
def __init__(self, limb="right"):
#create our action server clients
self._limb_client = actionlib.SimpleActionClient(
#verify joint trajectory action servers are available
is_server_up = self._limb_client.wait_for_server(rospy.Duration(10.0))
if not is_server_up:
msg = ("Action server not available."
" Verify action server availability.")
#create our goal request
self.goal = FollowJointTrajectoryGoal()
#limb interface - current angles needed for start move
self.arm = intera_interface.Limb(limb)
self.limb = limb
self.gripper_name = '_'.join([limb, 'gripper'])
#gripper interface - for gripper command playback
self.gripper = intera_interface.Gripper(self.gripper_name)
self.gripper = None
rospy.loginfo("Did not detect a connected electric gripper.")
#flag to signify the arm trajectories have begun executing
self._arm_trajectory_started = False
#reentrant lock to prevent same-thread lockout
self._lock = threading.RLock()
# Verify Grippers Have No Errors and are Calibrated
if self.gripper:
if self.gripper.has_error():
if not self.gripper.is_calibrated():
#gripper goal trajectories
self.grip = FollowJointTrajectoryGoal()
#gripper control rate
self._gripper_rate = 20.0 # Hz
# Timing offset to prevent gripper playback before trajectory has started
self._slow_move_offset = 0.0
self._trajectory_start_offset = rospy.Duration(0.0)
self._trajectory_actual_offset = rospy.Duration(0.0)
#param namespace
self._param_ns = '/rsdk_joint_trajectory_action_server/'
def _execute_gripper_commands(self):
start_time = rospy.get_time() - self._trajectory_actual_offset.to_sec()
grip_cmd = self.grip.trajectory.points
pnt_times = [pnt.time_from_start.to_sec() for pnt in grip_cmd]
end_time = pnt_times[-1]
rate = rospy.Rate(self._gripper_rate)
now_from_start = rospy.get_time() - start_time
while(now_from_start < end_time + (1.0 / self._gripper_rate) and
not rospy.is_shutdown()):
idx = bisect(pnt_times, now_from_start) - 1
if self.gripper:
now_from_start = rospy.get_time() - start_time
def _clean_line(self, line, joint_names):
Cleans a single line of recorded joint positions
@param line: the line described in a list to process
@param joint_names: joint name keys
@return command: returns dictionary {joint: value} of valid commands
@return line: returns list of current line values stripped of commas
def try_float(x):
return float(x)
except ValueError:
return None
#convert the line of strings to a float or None
line = [try_float(x) for x in line.rstrip().split(',')]
#zip the values with the joint names
combined = zip(joint_names[1:], line[1:])
#take out any tuples that have a none value
cleaned = [x for x in combined if x[1] is not None]
#convert it to a dictionary with only valid commands
command = dict(cleaned)
return (command, line,)
def _add_point(self, positions, side, time):
Appends trajectory with new point
@param positions: joint positions
@param side: limb to command point
@param time: time from start for point in seconds
#creates a point in trajectory with time_from_start and positions
point = JointTrajectoryPoint()
point.positions = copy(positions)
point.time_from_start = rospy.Duration(time)
if side == self.limb:
elif self.gripper and side == self.gripper_name:
def parse_file(self, filename):
Parses input file into FollowJointTrajectoryGoal format
@param filename: input filename
#open recorded file
with open(filename, 'r') as f:
lines = f.readlines()
#read joint names specified in file
joint_names = lines[0].rstrip().split(',')
#parse joint names for right limb
for name in joint_names:
if self.limb == name[:-3]:
def find_start_offset(pos):
#create empty lists
cur = []
cmd = []
dflt_vel = []
vel_param = self._param_ns + "%s_default_velocity"
#for all joints find our current and first commanded position
#reading default velocities from the parameter server if specified
for name in joint_names:
if self.limb == name[:-3]:
prm = rospy.get_param(vel_param % name, 0.25)
diffs = map(operator.sub, cmd, cur)
diffs = map(operator.abs, diffs)
#determine the largest time offset necessary across all joints
offset = max(map(operator.div, diffs, dflt_vel))
return offset
for idx, values in enumerate(lines[1:]):
#clean each line of file
cmd, values = self._clean_line(values, joint_names)
#find allowable time offset for move to start position
if idx == 0:
# Set the initial position to be the current pose.
# This ensures we move slowly to the starting point of the
# trajectory from the current pose - The user may have moved
# arm since recording
cur_cmd = [self.arm.joint_angle(jnt) for jnt in self.goal.trajectory.joint_names]
self._add_point(cur_cmd, self.limb, 0.0)
start_offset = find_start_offset(cmd)
# Gripper playback won't start until the starting movement's
# duration has passed, and the actual trajectory playback begins
self._slow_move_offset = start_offset
self._trajectory_start_offset = rospy.Duration(start_offset + values[0])
#add a point for this set of commands with recorded time
cur_cmd = [cmd[jnt] for jnt in self.goal.trajectory.joint_names]
self._add_point(cur_cmd, self.limb, values[0] + start_offset)
if self.gripper:
cur_cmd = [cmd[self.gripper_name]]
self._add_point(cur_cmd, self.gripper_name, values[0] + start_offset)
def _feedback(self, data):
# Test to see if the actual playback time has exceeded
# the move-to-start-pose timing offset
if (not self._get_trajectory_flag() and
data.actual.time_from_start >= self._trajectory_start_offset):
self._trajectory_actual_offset = data.actual.time_from_start
def _set_trajectory_flag(self, value=False):
with self._lock:
# Assign a value to the flag
self._arm_trajectory_started = value
def _get_trajectory_flag(self):
temp_flag = False
with self._lock:
# Copy to external variable
temp_flag = self._arm_trajectory_started
return temp_flag
def start(self):
Sends FollowJointTrajectoryAction request
self._limb_client.send_goal(self.goal, feedback_cb=self._feedback)
# Syncronize playback by waiting for the trajectories to start
while not rospy.is_shutdown() and not self._get_trajectory_flag():
if self.gripper:
def stop(self):
Preempts trajectory execution by sending cancel goals
if ( is not None and
self._limb_client.get_state() == actionlib.GoalStatus.ACTIVE):
#delay to allow for terminating handshake
def wait(self):
Waits for and verifies trajectory execution result
#create a timeout for our trajectory execution
#total time trajectory expected for trajectory execution plus a buffer
last_time = self.goal.trajectory.points[-1].time_from_start.to_sec()
time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
timeout = rospy.Duration(self._slow_move_offset +
last_time +
finish = self._limb_client.wait_for_result(timeout)
result = (self._limb_client.get_result().error_code == 0)
#verify result
if all([finish, result]):
return True
msg = ("Trajectory action failed or did not finish before "
return False
def main():
"""SDK Joint Trajectory Example: File Playback
Plays back joint positions honoring timestamps recorded
via the joint_recorder example.
Run the example first to create a recording
file for use with this example. Then make sure to start the
joint_trajectory_action_server before running this example.
This example will use the joint trajectory action server
with velocity control to follow the positions and times of
the recorded motion, accurately replicating movement speed
necessary to hit each trajectory point on time.
epilog = """
Related examples:;
rp = intera_interface.RobotParams()
valid_limbs = rp.get_limb_names()
if not valid_limbs:
rp.log_message(("Cannot detect any limb parameters on this robot. "
"Exiting."), "ERROR")
arg_fmt = argparse.RawDescriptionHelpFormatter
parser = argparse.ArgumentParser(formatter_class=arg_fmt,
'-l', '--limb', choices=valid_limbs, default=valid_limbs[0],
help='send joint trajectory to which limb'
'-f', '--file', metavar='PATH', required=True,
help='path to input file'
'-n', '--number_loops', type=int, default=1,
help='number of playback loops. 0=infinite.'
# remove ROS args and filename (sys.arv[0]) for argparse
args = parser.parse_args(rospy.myargv()[1:])
print("Initializing node... ")
print("Getting robot state... ")
rs = intera_interface.RobotEnable(CHECK_VERSION)
print("Enabling robot... ")
print("Running. Ctrl-c to quit")
traj = Trajectory(args.limb)
#for safe interrupt handling
result = True
loop_cnt = 1
loopstr = str(args.number_loops)
if args.number_loops == 0:
args.number_loops = float('inf')
loopstr = "forever"
while (result == True and loop_cnt <= args.number_loops
and not rospy.is_shutdown()):
print("Playback loop %d of %s" % (loop_cnt, loopstr,))
result = traj.wait()
loop_cnt = loop_cnt + 1
print("Exiting - File Playback Complete")
if __name__ == "__main__":
You can’t perform that action at this time.