Skip to content

Commit

Permalink
Add smoke test adapted from ROS 1 smoke test (#585)
Browse files Browse the repository at this point in the history
Re-enable the [smoke test from develop](https://github.com/RobotWebTools/rosbridge_suite/blob/a09a964fb5956321aca3b296da367e21d3d2e044/rosbridge_server/test/websocket/test_smoke.py) on the ROS 2 branch, along with various fixes for compatibility with older ROS 2 versions.

- Uses [launch_testing](https://github.com/ros2/launch/blob/master/launch_testing/README.md) instead of a `.test` xml file.
- The server now allows choosing `port:=0` to select an ephemeral port, and sets the port number in the `actual_port` ROS param (similar to the Autobahn server implementation on develop). The test uses the [`GetParameters` service](https://docs.ros2.org/foxy/api/rcl_interfaces/srv/GetParameters.html) to retrieve the port number.

I used various tricks to get  `async`/`await` working in the test. This was difficult because [rclpy has its own executor](ros2/rclpy#279) that doesn't interoperate with other event loops (like twisted or asyncio). Since the test uses both ROS publishers/subscribers and a Twisted WebSocket client, both event loops are needed, so I ran `rclpy.spin_until_future_complete` in a thread managed by Twisted. Mostly this worked nicely, but required care to use `reactor.callFromThread` in certain places, and required some explicit `executor.wake()` since the rclpy executor didn't automatically wake up when futures were resolved.
  • Loading branch information
jtbandes committed Aug 12, 2021
1 parent db81054 commit 0751df3
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 18 deletions.
7 changes: 6 additions & 1 deletion rosbridge_server/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.5)
project(rosbridge_server NONE)
project(rosbridge_server)

find_package(ament_cmake_ros REQUIRED)
find_package(ament_cmake_core REQUIRED)
Expand All @@ -26,3 +26,8 @@ install(FILES
launch/rosbridge_udp_launch.xml
DESTINATION share/${PROJECT_NAME}/launch
)

if(BUILD_TESTING)
find_package(launch_testing_ament_cmake REQUIRED)
add_launch_test(test/websocket/smoke.test.py)
endif()
2 changes: 1 addition & 1 deletion rosbridge_server/launch/rosbridge_websocket_launch.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

<arg name="fragment_timeout" default="600" />
<arg name="delay_between_messages" default="0" />
<arg name="max_message_size" default="Infinity" />
<arg name="max_message_size" default="10000000" />
<arg name="unregister_timeout" default="10.0" />

<arg name="use_compression" default="false" />
Expand Down
7 changes: 7 additions & 0 deletions rosbridge_server/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
<exec_depend>rosapi</exec_depend>
<exec_depend>rosauth</exec_depend>

<test_depend>python3-autobahn</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_ros</test_depend>
<test_depend>launch_testing</test_depend>
<test_depend>launch_testing_ros</test_depend>
<test_depend>launch_testing_ament_cmake</test_depend>

<export>
<build_type>ament_cmake</build_type>
</export>
Expand Down
29 changes: 13 additions & 16 deletions rosbridge_server/scripts/rosbridge_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,16 @@

from socket import error

from threading import Thread
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.ioloop import PeriodicCallback
from tornado.netutil import bind_sockets
from tornado.web import Application

import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.qos import QoSProfile, QoSDurabilityPolicy
from std_msgs.msg import Int32
from rcl_interfaces.msg import ParameterDescriptor

from rosbridge_server import RosbridgeWebSocket, ClientManager

Expand Down Expand Up @@ -88,16 +87,13 @@ def __init__(self):
'delay_between_messages', RosbridgeWebSocket.delay_between_messages).value

RosbridgeWebSocket.max_message_size = self.declare_parameter(
'max_message_size', RosbridgeWebSocket.max_message_size, ParameterDescriptor(dynamic_typing=True)).value
'max_message_size', RosbridgeWebSocket.max_message_size).value

RosbridgeWebSocket.unregister_timeout = self.declare_parameter(
'unregister_timeout', RosbridgeWebSocket.unregister_timeout).value

bson_only_mode = self.declare_parameter('bson_only_mode', False).value

if RosbridgeWebSocket.max_message_size == "None":
RosbridgeWebSocket.max_message_size = None

# get tornado application parameters
tornado_settings = {}
tornado_settings['websocket_ping_interval'] = float(self.declare_parameter('websocket_ping_interval', 0).value)
Expand Down Expand Up @@ -190,12 +186,9 @@ def __init__(self):
idx = sys.argv.index("--max_message_size") + 1
if idx < len(sys.argv):
value = sys.argv[idx]
if value == "None":
RosbridgeWebSocket.max_message_size = None
else:
RosbridgeWebSocket.max_message_size = int(value)
RosbridgeWebSocket.max_message_size = int(value)
else:
print("--max_message_size argument provided without a value. (can be None or <Integer>)")
print("--max_message_size argument provided without a value. (can be <Integer>)")
sys.exit(-1)

if "--unregister_timeout" in sys.argv:
Expand Down Expand Up @@ -281,11 +274,15 @@ def __init__(self):
connected = False
while not connected and self.context.ok():
try:
ssl_options = None
if certfile is not None and keyfile is not None:
application.listen(port, address, ssl_options={ "certfile": certfile, "keyfile": keyfile})
else:
application.listen(port, address)
self.get_logger().info("Rosbridge WebSocket server started on port {}".format(port))
ssl_options = { "certfile": certfile, "keyfile": keyfile }
sockets = bind_sockets(port, address)
actual_port = sockets[0].getsockname()[1]
server = HTTPServer(application, ssl_options=ssl_options)
server.add_sockets(sockets)
self.declare_parameter("actual_port", actual_port)
self.get_logger().info("Rosbridge WebSocket server started on port {}".format(actual_port))
connected = True
except error as e:
self.get_logger().warn(
Expand Down
210 changes: 210 additions & 0 deletions rosbridge_server/test/websocket/smoke.test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
#!/usr/bin/env python
import json
import sys
import unittest

from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol
from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.python import log

import launch
import launch.actions
import launch_ros
import launch_ros.actions
import rclpy
import rclpy.task
from rcl_interfaces.srv import GetParameters
from rclpy.executors import SingleThreadedExecutor
from std_msgs.msg import String

log.startLogging(sys.stderr)

# For consistency, the number of messages must not exceed the the protocol
# Subscriber queue_size.
NUM_MSGS = 10
MSG_SIZE = 10
A_TOPIC = "/a_topic"
B_TOPIC = "/b_topic"
A_STRING = "A" * MSG_SIZE
B_STRING = "B" * MSG_SIZE
WARMUP_DELAY = 1.0 # seconds
TIME_LIMIT = 5.0 # seconds


def generate_test_description(ready_fn):
try:
node = launch_ros.actions.Node(
executable="rosbridge_websocket",
package="rosbridge_server",
parameters=[{"port": 0}],
)
except TypeError:
# Deprecated keyword arg node_executable: https://github.com/ros2/launch_ros/pull/140
node = launch_ros.actions.Node(
node_executable="rosbridge_websocket",
package="rosbridge_server",
parameters=[{"port": 0}],
)

return launch.LaunchDescription(
[
node,
launch.actions.OpaqueFunction(function=lambda context: ready_fn()),
]
)


class TestClientProtocol(WebSocketClientProtocol):
def __init__(self, *args, **kwargs):
self.received = []
self.connected_future = rclpy.task.Future()
super().__init__(*args, **kwargs)

def onOpen(self):
self.connected_future.set_result(None)

def sendDict(self, msg_dict, times=1):
msg = json.dumps(msg_dict).encode("utf-8")
for _ in range(times):
print(f"WebSocket client sent message: {msg}")
self.sendMessage(msg)

def onMessage(self, payload, binary):
print(f"WebSocket client received message: {payload}")
self.received.append(payload)


class TestWebsocketSmoke(unittest.TestCase):
@classmethod
def setUpClass(cls):
rclpy.init()

@classmethod
def tearDownClass(cls):
rclpy.shutdown()

def setUp(self):
self.executor = SingleThreadedExecutor()
self.node = rclpy.create_node("websocket_smoke_test")
self.executor.add_node(self.node)

def tearDown(self):
self.node.destroy_node()

async def get_server_port(self):
"""
Returns the port which the WebSocket server is running on
"""
client = self.node.create_client(
GetParameters, "/rosbridge_websocket/get_parameters"
)
try:
if not client.wait_for_service(5):
raise RuntimeError("GetParameters service not available")
port_param = await client.call_async(
GetParameters.Request(names=["actual_port"])
)
return port_param.values[0].integer_value
finally:
self.node.destroy_client(client)

async def connect_to_server(self):
port = await self.get_server_port()
factory = WebSocketClientFactory("ws://127.0.0.1:" + str(port))
factory.protocol = TestClientProtocol

future = rclpy.task.Future()
future.add_done_callback(lambda _: self.executor.wake())

def connect():
TCP4ClientEndpoint(reactor, "127.0.0.1", port).connect(factory).addCallback(
future.set_result
)

reactor.callFromThread(connect)

protocol = await future
protocol.connected_future.add_done_callback(lambda _: self.executor.wake())
await protocol.connected_future # wait for onOpen before proceeding
return protocol

def sleep(self, duration):
future = rclpy.task.Future()

def callback():
future.set_result(None)
timer.cancel()
self.node.destroy_timer(timer)

timer = self.node.create_timer(duration, callback)
return future

def test_smoke(self):
ros_received = []
sub_a = self.node.create_subscription(
String, A_TOPIC, ros_received.append, NUM_MSGS
)
pub_b = self.node.create_publisher(String, B_TOPIC, NUM_MSGS)

async def run_test():
print("Connecting to server...")
ws_client = await self.connect_to_server()
print("Connected!")

ws_client.sendDict(
{
"op": "subscribe",
"topic": B_TOPIC,
"type": "std_msgs/String",
"queue_length": 0, # Test the roslib default.
}
)
ws_client.sendDict(
{
"op": "advertise",
"topic": A_TOPIC,
"type": "std_msgs/String",
}
)
ws_client.sendDict(
{
"op": "publish",
"topic": A_TOPIC,
"msg": {
"data": A_STRING,
},
},
NUM_MSGS,
)

await self.sleep(WARMUP_DELAY)

for _ in range(NUM_MSGS):
pub_b.publish(String(data=B_STRING))

await self.sleep(TIME_LIMIT)

reactor.callFromThread(reactor.stop)
return ws_client.received

future = self.executor.create_task(run_test)
reactor.callInThread(
rclpy.spin_until_future_complete, self.node, future, self.executor
)
reactor.run(installSignalHandlers=False)

ws_received = future.result()
for received in ws_received:
msg = json.loads(received)
self.assertEqual("publish", msg["op"])
self.assertEqual(B_TOPIC, msg["topic"])
self.assertEqual(B_STRING, msg["msg"]["data"])
self.assertEqual(NUM_MSGS, len(ws_received))

for msg in ros_received:
self.assertEqual(A_STRING, msg.data)
self.assertEqual(NUM_MSGS, len(ros_received))

self.node.destroy_subscription(sub_a)
self.node.destroy_publisher(pub_b)

0 comments on commit 0751df3

Please sign in to comment.