-
Notifications
You must be signed in to change notification settings - Fork 4
/
aruco_tag_locator.py
212 lines (173 loc) · 8.59 KB
/
aruco_tag_locator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#!/usr/bin/env python3
# Import modules
import rclpy
import time
import tf2_ros
from tf2_ros import TransformException
from rclpy.time import Time
from math import pi
# Import hello_misc script for handling trajectory goals with an action client
import hello_helpers.hello_misc as hm
# We're going to subscribe to a JointState message type, so we need to import
# the definition for it
from sensor_msgs.msg import JointState
# Import the FollowJointTrajectory from the control_msgs.action package to
# control the Stretch robot
from control_msgs.action import FollowJointTrajectory
# Import JointTrajectoryPoint from the trajectory_msgs package to define
# robot trajectories
from trajectory_msgs.msg import JointTrajectoryPoint
# Import TransformStamped from the geometry_msgs package for the publisher
from geometry_msgs.msg import TransformStamped
class LocateArUcoTag(hm.HelloNode):
"""
A class that actuates the RealSense camera to find the docking station's
ArUco tag and returns a Transform between the `base_link` and the requested tag.
"""
def __init__(self):
"""
A function that initializes the subscriber and other needed variables.
:param self: The self reference.
"""
# Initialize the inhereted hm.Hellonode class
hm.HelloNode.__init__(self)
hm.HelloNode.main(self, 'aruco_tag_locator', 'aruco_tag_locator', wait_for_first_pointcloud=False)
# Initialize subscriber
self.joint_states_sub = self.create_subscription(JointState, '/stretch/joint_states', self.joint_states_callback, 1)
# Initialize publisher
self.transform_pub = self.create_publisher(TransformStamped, 'ArUco_transform', 10)
# Initialize the variable that will store the joint state positions
self.joint_state = None
# Provide the min and max joint positions for the head pan. These values
# are needed for sweeping the head to search for the ArUco tag
self.min_pan_position = -3.8
self.max_pan_position = 1.50
# Define the number of steps for the sweep, then create the step size for
# the head pan joint
self.pan_num_steps = 10
self.pan_step_size = abs(self.min_pan_position - self.max_pan_position)/self.pan_num_steps
# Define the min tilt position, number of steps, and step size
self.min_tilt_position = -0.75
self.tilt_num_steps = 3
self.tilt_step_size = pi/16
# Define the head actuation rotational velocity
self.rot_vel = 0.5 # radians per sec
def joint_states_callback(self, msg):
"""
A callback function that stores Stretch's joint states.
:param self: The self reference.
:param msg: The JointState message type.
"""
self.joint_state = msg
def send_command(self, command):
'''
Handles single joint control commands by constructing a FollowJointTrajectoryGoal
message and sending it to the trajectory_client created in hello_misc.
:param self: The self reference.
:param command: A dictionary message type.
'''
if (self.joint_state is not None) and (command is not None):
# Extract the string value from the `joint` key
joint_name = command['joint']
# Set trajectory_goal as a FollowJointTrajectory.Goal and define
# the joint name
trajectory_goal = FollowJointTrajectory.Goal()
trajectory_goal.trajectory.joint_names = [joint_name]
# Create a JointTrajectoryPoint message type
point = JointTrajectoryPoint()
# Check to see if `delta` is a key in the command dictionary
if 'delta' in command:
# Get the current position of the joint and add the delta as a
# new position value
joint_index = self.joint_state.name.index(joint_name)
joint_value = self.joint_state.position[joint_index]
delta = command['delta']
new_value = joint_value + delta
point.positions = [new_value]
# Check to see if `position` is a key in the command dictionary
elif 'position' in command:
# extract the head position value from the `position` key
point.positions = [command['position']]
# Set the rotational velocity
point.velocities = [self.rot_vel]
# Assign goal position with updated point variable
trajectory_goal.trajectory.points = [point]
# Specify the coordinate frame that we want (base_link) and set the time to be now.
trajectory_goal.trajectory.header.stamp = self.get_clock().now().to_msg()
trajectory_goal.trajectory.header.frame_id = 'base_link'
# Make the action call and send the goal. The last line of code waits
# for the result
self.trajectory_client.send_goal(trajectory_goal)
def find_tag(self, tag_name='docking_station'):
"""
A function that actuates the camera to search for a defined ArUco tag
marker. Then the function returns the pose.
:param self: The self reference.
:param tag_name: A string value of the ArUco marker name.
:returns transform: The docking station's TransformStamped message.
"""
# Create dictionaries to get the head in its initial position
pan_command = {'joint': 'joint_head_pan', 'position': self.min_pan_position}
self.send_command(pan_command)
tilt_command = {'joint': 'joint_head_tilt', 'position': self.min_tilt_position}
self.send_command(tilt_command)
# Nested for loop to sweep the joint_head_pan and joint_head_tilt in increments
for i in range(self.tilt_num_steps):
for j in range(self.pan_num_steps):
# Update the joint_head_pan position by the pan_step_size
pan_command = {'joint': 'joint_head_pan', 'delta': self.pan_step_size}
self.send_command(pan_command)
# Give time for system to do a Transform lookup before next step
time.sleep(0.2)
# Use a try-except block
try:
now = Time()
# Look up transform between the base_link and requested ArUco tag
transform = self.tf_buffer.lookup_transform('base_link',
tag_name,
now)
self.get_logger().info("Found Requested Tag: \n%s", transform)
# Publish the transform
self.transform_pub.publish(transform)
# Return the transform
return transform
except TransformException as ex:
continue
# Begin sweep with new tilt angle
pan_command = {'joint': 'joint_head_pan', 'position': self.min_pan_position}
self.send_command(pan_command)
tilt_command = {'joint': 'joint_head_tilt', 'delta': self.tilt_step_size}
self.send_command(tilt_command)
time.sleep(0.25)
# Notify that the requested tag was not found
self.get_logger().info("The requested tag '%s' was not found", tag_name)
def main(self):
"""
Function that initiates the issue_command function.
:param self: The self reference.
"""
# Create a StaticTranformBoradcaster Node. Also, start a Tf buffer that
# will store the tf information for a few seconds.Then set up a tf listener, which
# will subscribe to all of the relevant tf topics, and keep track of the information
self.static_broadcaster = tf2_ros.StaticTransformBroadcaster(self)
self.tf_buffer = tf2_ros.Buffer()
self.listener = tf2_ros.TransformListener(self.tf_buffer, self)
# Give the listener some time to accumulate transforms
time.sleep(1.0)
# Notify Stretch is searching for the ArUco tag with `get_logger().info()`
self.get_logger().info('Searching for docking ArUco tag')
# Search for the ArUco marker for the docking station
pose = self.find_tag("docking_station")
def main():
try:
# Instantiate the `LocateArUcoTag()` object
node = LocateArUcoTag()
# Run the `main()` method
node.main()
node.new_thread.join()
except:
node.get_logger().info('Interrupt received, so shutting down')
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()