Skip to content

Commit

Permalink
add rostest
Browse files Browse the repository at this point in the history
  • Loading branch information
T045T committed Nov 15, 2017
1 parent 249671d commit 6d03ae7
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 0 deletions.
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,11 @@ install(DIRECTORY include/
FILES_MATCHING PATTERN "*.h" PATTERN "*.hpp"
)

#############
## Rostest ##
#############
if(CATKIN_ENABLE_TESTING)
find_package(rostest REQUIRED)

add_rostest(test/test_tf2_web_republisher.test)
endif()
3 changes: 3 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@
<run_depend>tf2_ros</run_depend>
<run_depend>tf</run_depend>
<run_depend>message_runtime</run_depend>

<test_depend>rospy</test_depend>
<test_depend>rostest</test_depend>
</package>
42 changes: 42 additions & 0 deletions src/test/dummy_transform_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#! /usr/bin/env python
import geometry_msgs.msg
import rospy
import tf2_ros

if __name__ == '__main__':
"""
Broadcast a transform from parent_frame to target_frame forever,
with changing translation.
"""

rospy.init_node('dummy_transform_publisher', anonymous=True)
br = tf2_ros.TransformBroadcaster()
current_transform = geometry_msgs.msg.TransformStamped()
current_transform.header.frame_id = rospy.get_param('~parent_frame')
current_transform.child_frame_id = rospy.get_param('~child_frame')
# Ensure rotation quaternion is well-formed
current_transform.transform.rotation.w = 1.0

osc_rate_hz = rospy.get_param('~osc_rate')
update_rate_hz = rospy.get_param('~update_rate')
# we oscillate between 0 and 1, so this is our step size
# for osc_rate = 1Hz, update_rate = 60Hz -> 1/60
step_size = (1.0 * osc_rate_hz) / update_rate_hz
update_rate = rospy.Rate(update_rate_hz)
counting_up = True

while not rospy.is_shutdown():
current_transform.header.stamp = rospy.Time.now()
if counting_up:
current_transform.transform.translation.x += step_size
if current_transform.transform.translation.x >= 1.0:
counting_up = False
else:
current_transform.transform.translation.x -= step_size
if current_transform.transform.translation.x <= 0.0:
counting_up = True
br.sendTransform(current_transform)
update_rate.sleep()



72 changes: 72 additions & 0 deletions src/test/test_tf_web_republisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env python
PKG = 'test_tf2_web_republisher'

import actionlib
import rospy
from tf2_web_republisher.msg import TFSubscriptionAction, TFSubscriptionGoal, TFArray
from tf2_web_republisher.srv import RepublishTFs, RepublishTFsRequest

import sys
import unittest


class TestTfRepublisher(unittest.TestCase):
def setUp(self):
self.msgs_received = 0

def transform_cb(self, _):
self.msgs_received += 1

"""
Test the action interface for tf2_web_republisher.
"""
def test_action(self):
client = actionlib.SimpleActionClient('tf2_web_republisher',
TFSubscriptionAction)
client.wait_for_server(timeout=rospy.Duration(2))
goal = TFSubscriptionGoal(
source_frames=['foo', 'bar'],
target_frame='world',
angular_thres=0.1,
trans_thres=0.05,
rate=2.0)

client.send_goal(goal, done_cb=None, active_cb=None,
feedback_cb=self.transform_cb)
rospy.sleep(1.3)
client.cancel_goal()
# We should have gotten two feedback messages by now
self.assertEquals(2, self.msgs_received)
rospy.sleep(1.0)
# We cancelled, so we expect no further feedback
self.assertEquals(2, self.msgs_received)

def test_service(self):
self.assertEquals(0, self.msgs_received)
rospy.wait_for_service('republish_tfs', 2.0)
proxy = rospy.ServiceProxy('republish_tfs', RepublishTFs)
result = proxy(RepublishTFsRequest(source_frames=['foo', 'bar'],
target_frame='world',
angular_thres=0.1,
trans_thres=0.05,
rate=2,
timeout=rospy.Duration(1.0)))
sub = rospy.Subscriber(result.topic_name,
TFArray,
self.transform_cb)
rospy.sleep(1.3)
self.assertTrue(any([topic_tuple[0] == result.topic_name
for topic_tuple in rospy.get_published_topics()]),
msg=str(rospy.get_published_topics()))
sub.unregister()
self.assertEquals(2, self.msgs_received)
rospy.sleep(2.0)
self.assertFalse(any([topic_tuple[0] == result.topic_name
for topic_tuple in rospy.get_published_topics()]))


if __name__ == '__main__':
import rostest
rospy.init_node('test_tf_web_republisher')
rostest.rosrun(PKG, 'test_tf2_web_republisher', TestTfRepublisher)

22 changes: 22 additions & 0 deletions test/test_tf2_web_republisher.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<launch>
<node pkg="tf2_web_republisher" type="dummy_transform_publisher.py"
name="dummy1">
<param name="parent_frame" value="world" />
<param name="child_frame" value="foo" />
<param name="osc_rate" value="1.0" />
<param name="update_rate" value="60" />
</node>
<node pkg="tf2_web_republisher" type="dummy_transform_publisher.py"
name="dummy2">
<param name="parent_frame" value="world" />
<param name="child_frame" value="bar" />
<param name="osc_rate" value="2.0" />
<param name="update_rate" value="60" />
</node>
<node name="tf2_web_republisher"
pkg="tf2_web_republisher"
type="tf2_web_republisher" />
<test test-name="test_tf2_web_republisher"
pkg="tf2_web_republisher"
type="test_tf_web_republisher.py" />
</launch>

0 comments on commit 6d03ae7

Please sign in to comment.