From d77f7e0a6a8eb6d3d7b5e1cdb7b70b081953fbab Mon Sep 17 00:00:00 2001 From: Matt Vollrath Date: Tue, 4 Dec 2018 23:35:02 -0500 Subject: [PATCH] CBOR encoding (#364) * Add CBOR encoding * Fix value extraction performance regression Extract message values once per message. * Fix typed array tags Was using big-endian tags and encoding little-endian. Always use little-endian for now since Intel is prevalent for desktop. Add some comments to this effect. * Update CBOR protocol documentation More information about draft typed arrays and when to use CBOR. * Fix 64-bit integer CBOR packing Use an actual 64-bit format. --- ROSBRIDGE_PROTOCOL.md | 19 ++- rosbridge_library/package.xml | 2 + .../capabilities/subscribe.py | 40 +++-- .../internal/cbor_conversion.py | 97 +++++++++++ .../internal/outgoing_message.py | 24 +++ .../rosbridge_library/internal/subscribers.py | 16 +- .../src/rosbridge_library/protocol.py | 2 + .../subscribers/test_multi_subscriber.py | 8 +- .../subscribers/test_subscriber_manager.py | 2 +- .../test/internal/test_cbor_conversion.py | 156 ++++++++++++++++++ .../test/internal/test_internal.test | 2 + .../test/internal/test_outgoing_message.py | 35 ++++ .../src/rosbridge_server/websocket_handler.py | 9 +- 13 files changed, 382 insertions(+), 30 deletions(-) create mode 100644 rosbridge_library/src/rosbridge_library/internal/cbor_conversion.py create mode 100644 rosbridge_library/src/rosbridge_library/internal/outgoing_message.py create mode 100755 rosbridge_library/test/internal/test_cbor_conversion.py create mode 100755 rosbridge_library/test/internal/test_outgoing_message.py diff --git a/ROSBRIDGE_PROTOCOL.md b/ROSBRIDGE_PROTOCOL.md index 3caff3f9b..6eaa4b913 100644 --- a/ROSBRIDGE_PROTOCOL.md +++ b/ROSBRIDGE_PROTOCOL.md @@ -141,7 +141,7 @@ concatenated, resulting in the JSON string of the original message. #### 3.1.2 PNG compression ( _png_ ) [experimental] -Some messages (such as point clouds) can be extremely large, and for efficiency +Some messages (such as images and maps) can be extremely large, and for efficiency reasons we may wish to transfer them as PNG-encoded bytes. The PNG opcode duplicates the fragmentation logic of the FRG opcode (and it is possible and reasonable to only have a single fragment), except that the data field consists @@ -168,6 +168,21 @@ the image. This string is now used as the data field. If fragmentation is necessary, then fragment the data and set the ID, num and total fields to the appropriate values in the fragments. Otherwise these fields can be left out. +#### 3.1.3 CBOR encoding ( _cbor_ ) + +[CBOR](https://tools.ietf.org/html/rfc7049) encoding is the fastest +compression method for messages containing large blobs of data, such as +byte arrays and numeric typed arrays. + +When CBOR compression is requested by a subscriber, a binary message will be +produced instead of a JSON string. Once decoded, the message will contain +a normal protocol message. + +The implementation uses [draft typed array tags] for efficient packing of +homogeneous arrays. At the moment, only little-endian packing is supported. + +[draft typed array tags]: https://tools.ietf.org/html/draft-ietf-cbor-array-tags-00 + ### 3.2 Status messages rosbridge sends status messages to the client relating to the successes and @@ -372,7 +387,7 @@ which to send messages. * **fragment_size** – the maximum size that a message can take before it is to be fragmented. * **compression** – an optional string to specify the compression scheme to be - used on messages. Valid values are "none" and "png" + used on messages. Valid values are "none", "png" and "cbor" If queue_length is specified, then messages are placed into the queue before being sent. Messages are sent from the head of the queue. If the queue gets diff --git a/rosbridge_library/package.xml b/rosbridge_library/package.xml index e779a58cf..1f4d78064 100644 --- a/rosbridge_library/package.xml +++ b/rosbridge_library/package.xml @@ -25,6 +25,7 @@ geometry_msgs message_generation python-bson + python-cbor rospy roscpp @@ -36,6 +37,7 @@ geometry_msgs message_runtime python-bson + python-cbor rostest actionlib_msgs diff --git a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py index 8cef70696..3c48f1749 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py @@ -30,6 +30,9 @@ # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +import sys +PYTHON2 = sys.version_info < (3, 0) + import fnmatch from threading import Lock from functools import partial @@ -37,15 +40,16 @@ from rosbridge_library.capability import Capability from rosbridge_library.internal.subscribers import manager from rosbridge_library.internal.subscription_modifiers import MessageHandler -from rosbridge_library.internal.pngcompression import encode +from rosbridge_library.internal.pngcompression import encode as encode_png +from cbor import dumps as encode_cbor try: - from ujson import dumps + from ujson import dumps as encode_json except ImportError: try: - from simplejson import dumps + from simplejson import dumps as encode_json except ImportError: - from json import dumps + from json import dumps as encode_json from rosbridge_library.util import string_types @@ -173,7 +177,12 @@ def f(fieldname): self.fragment_size = None else: self.fragment_size = min(frags) - self.compression = "png" if "png" in f("compression") else "none" + + self.compression = "none" + if "png" in f("compression"): + self.compression = "png" + if "cbor" in f("compression"): + self.compression = "cbor" with self.handler_lock: self.handler = self.handler.set_throttle_rate(self.throttle_rate) @@ -277,8 +286,7 @@ def publish(self, topic, message, fragment_size=None, compression="none"): Keyword arguments: topic -- the topic to publish the message on - message -- a dict of key-value pairs. Will be wrapped in a message with - opcode publish + message -- a ROS message wrapped by OutgoingMessage fragment_size -- (optional) fragment the serialized message into msgs with payloads not greater than this value compression -- (optional) compress the message. valid values are @@ -300,10 +308,20 @@ def publish(self, topic, message, fragment_size=None, compression="none"): else: self.protocol.log("debug", "No topic security glob, not checking topic publish.") - outgoing_msg = {"op": "publish", "topic": topic, "msg": message} - if compression == "png": - outgoing_msg_dumped = dumps(outgoing_msg) - outgoing_msg = {"op": "png", "data": encode(outgoing_msg_dumped)} + if PYTHON2: + topic = unicode(topic) + + outgoing_msg = {u"op": u"publish", u"topic": topic} + if compression=="png": + outgoing_msg["msg"] = message.get_json_values() + outgoing_msg_dumped = encode_json(outgoing_msg) + outgoing_msg = {"op": "png", "data": encode_png(outgoing_msg_dumped)} + elif compression=="cbor": + outgoing_msg[u"msg"] = message.get_cbor_values() + outgoing_msg = bytearray(encode_cbor(outgoing_msg)) + else: + outgoing_msg["msg"] = message.get_json_values() + self.protocol.send(outgoing_msg) def finish(self): diff --git a/rosbridge_library/src/rosbridge_library/internal/cbor_conversion.py b/rosbridge_library/src/rosbridge_library/internal/cbor_conversion.py new file mode 100644 index 000000000..38fe10613 --- /dev/null +++ b/rosbridge_library/src/rosbridge_library/internal/cbor_conversion.py @@ -0,0 +1,97 @@ +import struct +import sys + +PYTHON2 = sys.version_info < (3, 0) + +from cbor import Tag + + +LIST_TYPES = [list, tuple] +INT_TYPES = ['byte', 'char', 'int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64', 'uint64'] +FLOAT_TYPES = ['float32', 'float64'] +STRING_TYPES = ['string'] +BOOL_TYPES = ['bool'] +TIME_TYPES = ['time', 'duration'] +BOOL_ARRAY_TYPES = ['bool[]'] +BYTESTREAM_TYPES = ['uint8[]', 'char[]'] + +# Typed array tags according to +# Always encode to little-endian variant, for now. +TAGGED_ARRAY_FORMATS = { + 'uint16[]': (69, '<{}H'), + 'uint32[]': (70, '<{}I'), + 'uint64[]': (71, '<{}Q'), + 'byte[]': (72, '{}b'), + 'int8[]': (72, '{}b'), + 'int16[]': (77, '<{}h'), + 'int32[]': (78, '<{}i'), + 'int64[]': (79, '<{}q'), + 'float32[]': (85, '<{}f'), + 'float64[]': (86, '<{}d'), +} + + +def extract_cbor_values(msg): + """Extract a dictionary of CBOR-friendly values from a ROS message. + + Primitive values will be casted to specific Python primitives. + + Typed arrays will be tagged and packed into byte arrays. + """ + out = {} + for slot, slot_type in zip(msg.__slots__, msg._slot_types): + val = getattr(msg, slot) + + if PYTHON2: + slot = unicode(slot) + + # string + if slot_type in STRING_TYPES: + out[slot] = unicode(val) if PYTHON2 else str(val) + + # bool + elif slot_type in BOOL_TYPES: + out[slot] = bool(val) + + # integers + elif slot_type in INT_TYPES: + out[slot] = int(val) + + # floats + elif slot_type in FLOAT_TYPES: + out[slot] = float(val) + + # time/duration + elif slot_type in TIME_TYPES: + out[slot] = { + 'secs': int(val.secs), + 'nsecs': int(val.nsecs), + } + + # byte array + elif slot_type in BYTESTREAM_TYPES: + if PYTHON2: + out[slot] = bytes(bytearray(val)) + else: + out[slot] = bytes(val) + + # bool array + elif slot_type in BOOL_ARRAY_TYPES: + out[slot] = [bool(i) for i in val] + + # numeric arrays + elif slot_type in TAGGED_ARRAY_FORMATS: + tag, fmt = TAGGED_ARRAY_FORMATS[slot_type] + fmt_to_length = fmt.format(len(val)) + packed = struct.pack(fmt_to_length, *val) + out[slot] = Tag(tag=tag, value=packed) + + # array of messages + elif type(val) in LIST_TYPES: + out[slot] = [extract_cbor_values(i) for i in val] + + # message + else: + out[slot] = extract_cbor_values(val) + + return out diff --git a/rosbridge_library/src/rosbridge_library/internal/outgoing_message.py b/rosbridge_library/src/rosbridge_library/internal/outgoing_message.py new file mode 100644 index 000000000..12c18e59e --- /dev/null +++ b/rosbridge_library/src/rosbridge_library/internal/outgoing_message.py @@ -0,0 +1,24 @@ +from rosbridge_library.internal.message_conversion import extract_values as extract_json_values +from rosbridge_library.internal.cbor_conversion import extract_cbor_values + + +class OutgoingMessage: + """A message wrapper for caching encoding operations.""" + def __init__(self, message): + self._message = message + self._json_values = None + self._cbor_values = None + + @property + def message(self): + return self._message + + def get_json_values(self): + if self._json_values is None: + self._json_values = extract_json_values(self._message) + return self._json_values + + def get_cbor_values(self): + if self._cbor_values is None: + self._cbor_values = extract_cbor_values(self._message) + return self._cbor_values diff --git a/rosbridge_library/src/rosbridge_library/internal/subscribers.py b/rosbridge_library/src/rosbridge_library/internal/subscribers.py index 67f051f24..161c7e60f 100644 --- a/rosbridge_library/src/rosbridge_library/internal/subscribers.py +++ b/rosbridge_library/src/rosbridge_library/internal/subscribers.py @@ -37,6 +37,7 @@ from rosbridge_library.internal import ros_loader, message_conversion from rosbridge_library.internal.topics import TopicNotEstablishedException from rosbridge_library.internal.topics import TypeConflictException +from rosbridge_library.internal.outgoing_message import OutgoingMessage """ Manages and interfaces with ROS Subscriber objects. A single subscriber is shared between multiple clients @@ -149,22 +150,15 @@ def has_subscribers(self): def callback(self, msg, callbacks=None): """ Callback for incoming messages on the rospy.Subscriber - Converts the incoming msg to JSON, then passes the JSON to the - registered subscriber callbacks. + Passes the message to registered subscriber callbacks. Keyword Arguments: msg - the ROS message coming from the subscriber callbacks - subscriber callbacks to invoke """ - # Try to convert the msg to JSON - json = None - try: - json = message_conversion.extract_values(msg) - except Exception as exc: - logerr("Exception while converting messages in subscriber callback : %s", exc) - return - + outgoing = OutgoingMessage(msg) + # Get the callbacks to call if not callbacks: with self.lock: @@ -173,7 +167,7 @@ def callback(self, msg, callbacks=None): # Pass the JSON to each of the callbacks for callback in callbacks: try: - callback(json) + callback(outgoing) except Exception as exc: # Do nothing if one particular callback fails except log it logerr("Exception calling subscribe callback: %s", exc) diff --git a/rosbridge_library/src/rosbridge_library/protocol.py b/rosbridge_library/src/rosbridge_library/protocol.py index 00cf06c3d..c62cd7ca6 100644 --- a/rosbridge_library/src/rosbridge_library/protocol.py +++ b/rosbridge_library/src/rosbridge_library/protocol.py @@ -286,6 +286,8 @@ def serialize(self, msg, cid=None): Returns a JSON string representing the dictionary """ try: + if type(msg) == bytearray: + return msg if has_binary(msg) or self.bson_only_mode: return bson.BSON.encode(msg) else: diff --git a/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py b/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py index 8b52311a1..2bfd59d79 100755 --- a/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py +++ b/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py @@ -93,7 +93,7 @@ def test_subscribe_receive_json(self): received = {"msg": None} def cb(msg): - received["msg"] = msg + received["msg"] = msg.get_json_values() multi.subscribe(client, cb) sleep(0.5) @@ -114,7 +114,7 @@ def test_subscribe_receive_json_multiple(self): received = {"msgs": []} def cb(msg): - received["msgs"].append(msg["data"]) + received["msgs"].append(msg.get_json_values()["data"]) multi.subscribe(client, cb) sleep(0.5) @@ -167,10 +167,10 @@ def test_multiple_subscribers(self): received = {"msg1": None, "msg2": None} def cb1(msg): - received["msg1"] = msg + received["msg1"] = msg.get_json_values() def cb2(msg): - received["msg2"] = msg + received["msg2"] = msg.get_json_values() multi.subscribe(client1, cb1) multi.subscribe(client2, cb2) diff --git a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py index d7f966fc1..add2bf67b 100755 --- a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py +++ b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py @@ -175,7 +175,7 @@ def test_publisher_manager_publish(self): received = {"msg": None} def cb(msg): - received["msg"] = msg + received["msg"] = msg.get_json_values() manager.subscribe(client, topic, cb, msg_type) sleep(0.5) diff --git a/rosbridge_library/test/internal/test_cbor_conversion.py b/rosbridge_library/test/internal/test_cbor_conversion.py new file mode 100755 index 000000000..dd938ed8c --- /dev/null +++ b/rosbridge_library/test/internal/test_cbor_conversion.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python +import rostest +import sys +import unittest + +PYTHON2 = sys.version_info < (3, 0) + +import struct +from rosbridge_library.internal.cbor_conversion import extract_cbor_values, TAGGED_ARRAY_FORMATS +from cbor import Tag + +from std_msgs.msg import ( + Bool, String, + Byte, Char, + Int8, Int16, Int32, Int64, + UInt8, UInt16, UInt32, UInt64, + Float32, Float64, + ByteMultiArray, + Int8MultiArray, Int16MultiArray, Int32MultiArray, Int64MultiArray, + UInt8MultiArray, UInt16MultiArray, UInt32MultiArray, UInt64MultiArray, + Float32MultiArray, Float64MultiArray, + Time, Duration, + Empty, + MultiArrayLayout, MultiArrayDimension, +) + + +class TestCBORConversion(unittest.TestCase): + def test_string(self): + msg = String(data="foo") + extracted = extract_cbor_values(msg) + + self.assertEqual(extracted['data'], msg.data) + if PYTHON2: + self.assertEqual(type(extracted['data']), unicode) + else: + self.assertEqual(type(extracted['data']), str) + + def test_bool(self): + for val in [True, False]: + msg = Bool(data=val) + extracted = extract_cbor_values(msg) + + self.assertEqual(extracted['data'], msg.data, 'val={}'.format(val)) + self.assertEqual(type(extracted['data']), bool, 'val={}'.format(val)) + + def test_numbers(self): + for msg_type in [Int8, Int16, Int32, Int64]: + msg = msg_type(data=-5) + extracted = extract_cbor_values(msg) + + self.assertEqual(extracted['data'], msg.data, 'type={}'.format(msg_type)) + self.assertEqual(type(extracted['data']), int, 'type={}'.format(msg_type)) + + for msg_type in [UInt8, UInt16, UInt32, UInt64]: + msg = msg_type(data=5) + extracted = extract_cbor_values(msg) + + self.assertEqual(extracted['data'], msg.data, 'type={}'.format(msg_type)) + self.assertEqual(type(extracted['data']), int, 'type={}'.format(msg_type)) + + for msg_type in [Float32, Float64]: + msg = msg_type(data=2.3) + extracted = extract_cbor_values(msg) + + self.assertEqual(extracted['data'], msg.data, 'type={}'.format(msg_type)) + self.assertEqual(type(extracted['data']), float, 'type={}'.format(msg_type)) + + def test_time(self): + for msg_type in [Time, Duration]: + msg = msg_type() + extracted = extract_cbor_values(msg) + + self.assertEqual(extracted['data']['secs'], msg.data.secs, 'type={}'.format(msg_type)) + self.assertEqual(extracted['data']['nsecs'], msg.data.nsecs, 'type={}'.format(msg_type)) + self.assertEqual(type(extracted['data']['secs']), int, 'type={}'.format(msg_type)) + self.assertEqual(type(extracted['data']['nsecs']), int, 'type={}'.format(msg_type)) + + def test_byte_array(self): + msg = UInt8MultiArray(data=[0, 1, 2]) + extracted = extract_cbor_values(msg) + + data = extracted['data'] + self.assertEqual(type(data), bytes) + for i, val in enumerate(msg.data): + if PYTHON2: + self.assertEqual(ord(data[i]), val) + else: + self.assertEqual(data[i], val) + + def test_numeric_array(self): + for msg_type in [Int8MultiArray, Int16MultiArray, Int32MultiArray, Int64MultiArray, + UInt16MultiArray, UInt32MultiArray, UInt64MultiArray, + Float32MultiArray, Float64MultiArray]: + msg = msg_type(data=[0, 1, 2]) + extracted = extract_cbor_values(msg) + + tag = extracted['data'] + self.assertEqual(type(tag), Tag, 'type={}'.format(msg_type)) + self.assertEqual(type(tag.value), bytes, 'type={}'.format(msg_type)) + + # This is as consistent as the message defs.. + array_type = msg._slot_types[1] + + expected_tag = TAGGED_ARRAY_FORMATS[array_type][0] + self.assertEqual(tag.tag, expected_tag, 'type={}'.format(msg_type)) + + fmt = TAGGED_ARRAY_FORMATS[array_type][1] + fmt_to_length = fmt.format(len(msg.data)) + unpacked = list(struct.unpack(fmt_to_length, tag.value)) + + self.assertEqual(unpacked, msg.data, 'type={}'.format(msg_type)) + + def test_nested_messages(self): + msg = UInt8MultiArray(layout=MultiArrayLayout( + data_offset=5, + dim=[ + MultiArrayDimension( + label="foo", + size=4, + stride=4, + ), + MultiArrayDimension( + label="bar", + size=8, + stride=8, + ), + ] + )) + extracted = extract_cbor_values(msg) + + ex_layout = extracted['layout'] + self.assertEqual(type(ex_layout), dict) + self.assertEqual(ex_layout['data_offset'], msg.layout.data_offset) + self.assertEqual(len(ex_layout['dim']), len(msg.layout.dim)) + for i, val in enumerate(msg.layout.dim): + self.assertEqual(ex_layout['dim'][i]['label'], val.label) + self.assertEqual(ex_layout['dim'][i]['size'], val.size) + self.assertEqual(ex_layout['dim'][i]['stride'], val.stride) + + def test_unicode_keys(self): + msg = String(data="foo") + extracted = extract_cbor_values(msg) + + keys = extracted.keys() + for key in keys: + if PYTHON2: + self.assertEqual(type(key), unicode) + else: + self.assertEqual(type(key), str) + + +PKG = 'rosbridge_library' +NAME = 'test_cbor_conversion' +if __name__ == '__main__': + rostest.unitrun(PKG, NAME, TestCBORConversion) diff --git a/rosbridge_library/test/internal/test_internal.test b/rosbridge_library/test/internal/test_internal.test index 0486dcb60..62bbf3fa4 100644 --- a/rosbridge_library/test/internal/test_internal.test +++ b/rosbridge_library/test/internal/test_internal.test @@ -9,4 +9,6 @@ + + diff --git a/rosbridge_library/test/internal/test_outgoing_message.py b/rosbridge_library/test/internal/test_outgoing_message.py new file mode 100755 index 000000000..ff7a441a2 --- /dev/null +++ b/rosbridge_library/test/internal/test_outgoing_message.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +import rostest +import sys +import unittest + +from rosbridge_library.internal.outgoing_message import OutgoingMessage +from std_msgs.msg import String + + +class TestOutgoingMessage(unittest.TestCase): + def test_json_values(self): + msg = String(data="foo") + outgoing = OutgoingMessage(msg) + + result = outgoing.get_json_values() + self.assertEqual(result['data'], msg.data) + + again = outgoing.get_json_values() + self.assertTrue(result is again) + + def test_cbor_values(self): + msg = String(data="foo") + outgoing = OutgoingMessage(msg) + + result = outgoing.get_cbor_values() + self.assertEqual(result['data'], msg.data) + + again = outgoing.get_cbor_values() + self.assertTrue(result is again) + + +PKG = 'rosbridge_library' +NAME = 'test_outgoing_message' +if __name__ == '__main__': + rostest.unitrun(PKG, NAME, TestOutgoingMessage) diff --git a/rosbridge_server/src/rosbridge_server/websocket_handler.py b/rosbridge_server/src/rosbridge_server/websocket_handler.py index 58a666d32..e6e26e6fd 100755 --- a/rosbridge_server/src/rosbridge_server/websocket_handler.py +++ b/rosbridge_server/src/rosbridge_server/websocket_handler.py @@ -122,7 +122,14 @@ def on_close(self): rospy.loginfo("Client disconnected. %d clients total.", cls.clients_connected) def send_message(self, message): - binary = type(message)==bson.BSON + if type(message) == bson.BSON: + binary = True + elif type(message) == bytearray: + binary = True + message = bytes(message) + else: + binary = False + IOLoop.instance().add_callback(partial(self.write_message, message, binary)) def check_origin(self, origin):