Skip to content

Commit

Permalink
Add cbor-raw compression (#452)
Browse files Browse the repository at this point in the history
The CBOR compression is already a huge win over JSON or PNG encoding,
but it’s still suboptimal in some situations. This PR adds support for
getting messages in their raw binary (ROS-serialized) format. This has
benefits in the following cases:
- Your application already knows how to parse messages in bag files
  (e.g. using [rosbag.js](https://github.com/cruise-automation/rosbag.js),
  which means that now you can use consistent code paths for both bags
  and live messages.
- You want to parse messages as late as possible, or in parallel, e.g.
  only in the thread or WebWorker that cares about the message. Delaying
  the parsing of the message means that moving or copying the message to
  the thread is cheaper when its in binary form, since no serialization
  between threads is necessary.
- You only care about part of the message, and don't need to parse the
  rest of it.
- You really care about performance; no conversion between the ROS
  binary format and CBOR is done in the rosbridge_sever.
  • Loading branch information
janpaul123 authored and mvollrath committed Jan 8, 2020
1 parent 856987e commit dc7fcb2
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 7 deletions.
31 changes: 30 additions & 1 deletion ROSBRIDGE_PROTOCOL.md
Expand Up @@ -183,6 +183,35 @@ homogeneous arrays. At the moment, only little-endian packing is supported.

[draft typed array tags]: https://tools.ietf.org/html/draft-ietf-cbor-array-tags-00

#### 3.1.4 CBOR-RAW encoding ( _cbor-raw_ )

While CBOR encodes the entire message as CBOR, sometimes it's desirable to get the raw binary message in the
[ROS serialization format](https://wiki.ros.org/roscpp/Overview/MessagesSerializationAndAdaptingTypes),
which is the same format as sent between ROS nodes and stored in [Bag files](http://wiki.ros.org/Bags/Format/2.0).

This can be useful in several cases:
- Your application already knows how to parse messages in bag files (e.g. using
[rosbag.js](https://github.com/cruise-automation/rosbag.js), which means that now you can use
consistent code paths for both bags and live messages.
- You want to parse messages as late as possible, or in parallel, e.g. only in the thread
or WebWorker that cares about the message. Delaying the parsing of the message means that moving
or copying the message to the thread is cheaper when its in binary form, since no serialization
between threads is necessary.
- You only care about part of the message, and don't need to parse the rest of it.
- You really care about performance; no conversion between the ROS binary format and CBOR is done in
the rosbridge_sever.

The format is similar to CBOR above, but instead of the "msg" field containing the message itself
in CBOR format, instead it contains an object with a "bytes" field which is a byte array containing
the raw message. The "msg" object also includes "secs" and "nsecs" of the `get_rostime()` upon
receiving the message, which is especially useful when `use_sim_time` is set, since it will give you
the simulated time the message was received.

When using this encoding, a client application will need to know exactly how to parse the raw
message. For this it's useful to use the `/rosapi/get_topics_and_raw_types` service, which will give
you all topics and their raw message definitions, similar to `gendeps --cat`. This is the same
format as used by bag files.

### 3.2 Status messages

rosbridge sends status messages to the client relating to the successes and
Expand Down Expand Up @@ -387,7 +416,7 @@ which to send messages.
* **fragment_size** – the maximum size that a message can take before it is to
be fragmented.
* **compression** – an optional string to specify the compression scheme to be
used on messages. Valid values are "none", "png" and "cbor"
used on messages. Valid values are "none", "png", "cbor", and "cbor-raw".

If queue_length is specified, then messages are placed into the queue before
being sent. Messages are sent from the head of the queue. If the queue gets
Expand Down
1 change: 1 addition & 0 deletions rosapi/CMakeLists.txt
Expand Up @@ -33,6 +33,7 @@ add_service_files(
SetParam.srv
Subscribers.srv
Topics.srv
TopicsAndRawTypes.srv
TopicsForType.srv
TopicType.srv
)
Expand Down
7 changes: 7 additions & 0 deletions rosapi/scripts/rosapi_node
Expand Up @@ -65,6 +65,7 @@ def register_services():
rospy.Service('/rosapi/delete_param', DeleteParam, delete_param)
rospy.Service('/rosapi/get_param_names', GetParamNames, get_param_names)
rospy.Service('/rosapi/get_time', GetTime, get_time)
rospy.Service('/rosapi/topics_and_raw_types', TopicsAndRawTypes, get_topics_and_raw_types)

def get_topics(request):
""" Called by the rosapi/Topics service. Returns a list of all the topics being published. """
Expand Down Expand Up @@ -176,6 +177,12 @@ def get_param_names(request):
def get_time(request):
return GetTimeResponse(rospy.get_rostime())

def get_topics_and_raw_types(request):
""" Called by the rosapi/topics_and_raw_types service. Returns a list of all the topics being published, and their
raw types, similar to `gendeps --cat`. """
topics, types = proxy.get_topics_and_types(rosapi.glob_helper.topics_glob)
return TopicsAndRawTypesResponse(topics, types, [objectutils.get_typedef_full_text(type) for type in types])

def dict_to_typedef(typedefdict):
typedef = TypeDef()
typedef.type = typedefdict["type"]
Expand Down
9 changes: 9 additions & 0 deletions rosapi/src/rosapi/objectutils.py
Expand Up @@ -95,6 +95,15 @@ def get_service_response_typedef_recursive(servicetype):
# Return the list of sub-typedefs
return _get_subtypedefs_recursive(typedef, [])

def get_typedef_full_text(type):
""" Returns the full text (similar to `gendeps --cat`) for the specified message type """
# Get an instance of the service response class and get its typedef
try:
instance = ros_loader.get_message_instance(type)
return instance._full_text
except Exception:
return ""

def _get_typedef(instance):
""" Gets a typedef dict for the specified instance """
if instance is None or not hasattr(instance, "__slots__") or not hasattr(instance, "_slot_types"):
Expand Down
5 changes: 5 additions & 0 deletions rosapi/srv/TopicsAndRawTypes.srv
@@ -0,0 +1,5 @@

---
string[] topics
string[] types
string[] typedefs_full_text
Expand Up @@ -36,7 +36,7 @@
import fnmatch
from threading import Lock
from functools import partial
from rospy import loginfo
from rospy import loginfo, get_rostime
from rosbridge_library.capability import Capability
from rosbridge_library.internal.subscribers import manager
from rosbridge_library.internal.subscription_modifiers import MessageHandler
Expand Down Expand Up @@ -123,6 +123,9 @@ def subscribe(self, sid=None, msg_type=None, throttle_rate=0,

self.update_params()

if compression == "cbor-raw":
msg_type = "__AnyMsg"

# Subscribe with the manager. This will propagate any exceptions
manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)

Expand Down Expand Up @@ -187,6 +190,8 @@ def f(fieldname):
self.compression = "png"
if "cbor" in f("compression"):
self.compression = "cbor"
if "cbor-raw" in f("compression"):
self.compression = "cbor-raw"

with self.handler_lock:
self.handler = self.handler.set_throttle_rate(self.throttle_rate)
Expand Down Expand Up @@ -323,6 +328,14 @@ def publish(self, topic, message, fragment_size=None, compression="none"):
elif compression=="cbor":
outgoing_msg[u"msg"] = message.get_cbor_values()
outgoing_msg = bytearray(encode_cbor(outgoing_msg))
elif compression=="cbor-raw":
now = get_rostime()
outgoing_msg[u"msg"] = {
u"secs": now.secs,
u"nsecs": now.nsecs,
u"bytes": message._message._buff
}
outgoing_msg = bytearray(encode_cbor(outgoing_msg))
else:
outgoing_msg["msg"] = message.get_json_values()

Expand Down
16 changes: 11 additions & 5 deletions rosbridge_library/src/rosbridge_library/internal/subscribers.py
Expand Up @@ -38,6 +38,7 @@
from rosbridge_library.internal.topics import TopicNotEstablishedException
from rosbridge_library.internal.topics import TypeConflictException
from rosbridge_library.internal.outgoing_message import OutgoingMessage
from rospy.msg import AnyMsg

""" Manages and interfaces with ROS Subscriber objects. A single subscriber
is shared between multiple clients
Expand Down Expand Up @@ -79,12 +80,15 @@ def __init__(self, topic, msg_type=None):
if msg_type is None:
msg_type = topic_type

# Load the message class, propagating any exceptions from bad msg types
msg_class = ros_loader.get_message_class(msg_type)
if msg_type == "__AnyMsg":
msg_class = AnyMsg
else:
# Load the message class, propagating any exceptions from bad msg types
msg_class = ros_loader.get_message_class(msg_type)

# Make sure the specified msg type and established msg type are same
if topic_type is not None and topic_type != msg_class._type:
raise TypeConflictException(topic, topic_type, msg_class._type)
# Make sure the specified msg type and established msg type are same
if topic_type is not None and topic_type != msg_class._type:
raise TypeConflictException(topic, topic_type, msg_class._type)

# Create the subscriber and associated member variables
self.subscriptions = {}
Expand All @@ -110,6 +114,8 @@ def verify_type(self, msg_type):
this publisher
"""
if msg_type == "__AnyMsg":
return
if not ros_loader.get_message_class(msg_type) is self.msg_class:
raise TypeConflictException(self.topic,
self.msg_class._type, msg_type)
Expand Down
1 change: 1 addition & 0 deletions rosbridge_server/CMakeLists.txt
Expand Up @@ -27,4 +27,5 @@ install(FILES
if (CATKIN_ENABLE_TESTING)
find_package(rostest REQUIRED)
add_rostest(test/websocket/test_smoke.test)
add_rostest(test/websocket/test_cbor_raw.test)
endif()
76 changes: 76 additions & 0 deletions rosbridge_server/test/websocket/test_cbor_raw.py
@@ -0,0 +1,76 @@
#!/usr/bin/env python
import json
import io
import os
import rostest
import sys
import threading
import unittest

import rospy
from std_msgs.msg import String

from twisted.internet import reactor
from autobahn.twisted.websocket import (WebSocketClientProtocol,
WebSocketClientFactory)

from rosbridge_library.util.cbor import loads as decode_cbor

from twisted.python import log
log.startLogging(sys.stderr)

TOPIC = '/b_topic'
STRING = 'B' * 10000
WARMUP_DELAY = 1.0 # seconds
TIME_LIMIT = 5.0 # seconds


class TestWebsocketCborRaw(unittest.TestCase):
def test_cbor_raw(self):
test_client_received = []
class TestClientProtocol(WebSocketClientProtocol):
def onOpen(self):
self.sendMessage(json.dumps({
'op': 'subscribe',
'topic': TOPIC,
'compression': 'cbor-raw',
}))

def onMessage(self, payload, binary):
test_client_received.append(payload)

protocol = os.environ.get('PROTOCOL')
port = rospy.get_param('/rosbridge_websocket/actual_port')
url = protocol + '://127.0.0.1:' + str(port)

factory = WebSocketClientFactory(url)
factory.protocol = TestClientProtocol
reactor.connectTCP('127.0.0.1', port, factory)

pub = rospy.Publisher(TOPIC, String, queue_size=1)
def publish_timer():
rospy.sleep(WARMUP_DELAY)
pub.publish(String(STRING))
rospy.sleep(TIME_LIMIT)
reactor.stop()
reactor.callInThread(publish_timer)
reactor.run()

self.assertEqual(len(test_client_received), 1)
websocket_message = decode_cbor(test_client_received[0])
self.assertEqual(websocket_message['topic'], TOPIC)
buff = io.BytesIO()
String(STRING).serialize(buff)
self.assertEqual(websocket_message['msg']['bytes'], buff.getvalue())


PKG = 'rosbridge_server'
NAME = 'test_websocket_cbor_raw'

if __name__ == '__main__':
rospy.init_node(NAME)

while not rospy.is_shutdown() and not rospy.has_param('/rosbridge_websocket/actual_port'):
rospy.sleep(1.0)

rostest.rosrun(PKG, NAME, TestWebsocketCborRaw)
7 changes: 7 additions & 0 deletions rosbridge_server/test/websocket/test_cbor_raw.test
@@ -0,0 +1,7 @@
<launch>
<env name="PROTOCOL" value="ws" />
<node name="rosbridge_websocket" pkg="rosbridge_server" type="rosbridge_websocket.py">
<param name="port" value="0" />
</node>
<test test-name="test_cbor_raw" pkg="rosbridge_server" type="test_cbor_raw.py" />
</launch>

0 comments on commit dc7fcb2

Please sign in to comment.