Skip to content

Commit

Permalink
CBOR encoding (#364)
Browse files Browse the repository at this point in the history
* Add CBOR encoding

* Fix value extraction performance regression

Extract message values once per message.

* Fix typed array tags

Was using big-endian tags and encoding little-endian.

Always use little-endian for now since Intel is prevalent for desktop.

Add some comments to this effect.

* Update CBOR protocol documentation

More information about draft typed arrays and when to use CBOR.

* Fix 64-bit integer CBOR packing

Use an actual 64-bit format.
  • Loading branch information
mvollrath authored and jihoonl committed Dec 5, 2018
1 parent c86fe4b commit d77f7e0
Show file tree
Hide file tree
Showing 13 changed files with 382 additions and 30 deletions.
19 changes: 17 additions & 2 deletions ROSBRIDGE_PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ concatenated, resulting in the JSON string of the original message.

#### 3.1.2 PNG compression ( _png_ ) [experimental]

Some messages (such as point clouds) can be extremely large, and for efficiency
Some messages (such as images and maps) can be extremely large, and for efficiency
reasons we may wish to transfer them as PNG-encoded bytes. The PNG opcode
duplicates the fragmentation logic of the FRG opcode (and it is possible and
reasonable to only have a single fragment), except that the data field consists
Expand All @@ -168,6 +168,21 @@ the image. This string is now used as the data field. If fragmentation is
necessary, then fragment the data and set the ID, num and total fields to the
appropriate values in the fragments. Otherwise these fields can be left out.

#### 3.1.3 CBOR encoding ( _cbor_ )

[CBOR](https://tools.ietf.org/html/rfc7049) encoding is the fastest
compression method for messages containing large blobs of data, such as
byte arrays and numeric typed arrays.

When CBOR compression is requested by a subscriber, a binary message will be
produced instead of a JSON string. Once decoded, the message will contain
a normal protocol message.

The implementation uses [draft typed array tags] for efficient packing of
homogeneous arrays. At the moment, only little-endian packing is supported.

[draft typed array tags]: https://tools.ietf.org/html/draft-ietf-cbor-array-tags-00

### 3.2 Status messages

rosbridge sends status messages to the client relating to the successes and
Expand Down Expand Up @@ -372,7 +387,7 @@ which to send messages.
* **fragment_size** – the maximum size that a message can take before it is to
be fragmented.
* **compression** – an optional string to specify the compression scheme to be
used on messages. Valid values are "none" and "png"
used on messages. Valid values are "none", "png" and "cbor"

If queue_length is specified, then messages are placed into the queue before
being sent. Messages are sent from the head of the queue. If the queue gets
Expand Down
2 changes: 2 additions & 0 deletions rosbridge_library/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<build_depend>geometry_msgs</build_depend>
<build_depend>message_generation</build_depend>
<build_depend>python-bson</build_depend>
<build_depend>python-cbor</build_depend>

<exec_depend>rospy</exec_depend>
<exec_depend>roscpp</exec_depend>
Expand All @@ -36,6 +37,7 @@
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>message_runtime</exec_depend>
<exec_depend>python-bson</exec_depend>
<exec_depend>python-cbor</exec_depend>

<test_depend>rostest</test_depend>
<test_depend>actionlib_msgs</test_depend>
Expand Down
40 changes: 29 additions & 11 deletions rosbridge_library/src/rosbridge_library/capabilities/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,26 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import sys
PYTHON2 = sys.version_info < (3, 0)

import fnmatch
from threading import Lock
from functools import partial
from rospy import loginfo
from rosbridge_library.capability import Capability
from rosbridge_library.internal.subscribers import manager
from rosbridge_library.internal.subscription_modifiers import MessageHandler
from rosbridge_library.internal.pngcompression import encode
from rosbridge_library.internal.pngcompression import encode as encode_png
from cbor import dumps as encode_cbor

try:
from ujson import dumps
from ujson import dumps as encode_json
except ImportError:
try:
from simplejson import dumps
from simplejson import dumps as encode_json
except ImportError:
from json import dumps
from json import dumps as encode_json

from rosbridge_library.util import string_types

Expand Down Expand Up @@ -173,7 +177,12 @@ def f(fieldname):
self.fragment_size = None
else:
self.fragment_size = min(frags)
self.compression = "png" if "png" in f("compression") else "none"

self.compression = "none"
if "png" in f("compression"):
self.compression = "png"
if "cbor" in f("compression"):
self.compression = "cbor"

with self.handler_lock:
self.handler = self.handler.set_throttle_rate(self.throttle_rate)
Expand Down Expand Up @@ -277,8 +286,7 @@ def publish(self, topic, message, fragment_size=None, compression="none"):
Keyword arguments:
topic -- the topic to publish the message on
message -- a dict of key-value pairs. Will be wrapped in a message with
opcode publish
message -- a ROS message wrapped by OutgoingMessage
fragment_size -- (optional) fragment the serialized message into msgs
with payloads not greater than this value
compression -- (optional) compress the message. valid values are
Expand All @@ -300,10 +308,20 @@ def publish(self, topic, message, fragment_size=None, compression="none"):
else:
self.protocol.log("debug", "No topic security glob, not checking topic publish.")

outgoing_msg = {"op": "publish", "topic": topic, "msg": message}
if compression == "png":
outgoing_msg_dumped = dumps(outgoing_msg)
outgoing_msg = {"op": "png", "data": encode(outgoing_msg_dumped)}
if PYTHON2:
topic = unicode(topic)

outgoing_msg = {u"op": u"publish", u"topic": topic}
if compression=="png":
outgoing_msg["msg"] = message.get_json_values()
outgoing_msg_dumped = encode_json(outgoing_msg)
outgoing_msg = {"op": "png", "data": encode_png(outgoing_msg_dumped)}
elif compression=="cbor":
outgoing_msg[u"msg"] = message.get_cbor_values()
outgoing_msg = bytearray(encode_cbor(outgoing_msg))
else:
outgoing_msg["msg"] = message.get_json_values()

self.protocol.send(outgoing_msg)

def finish(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import struct
import sys

PYTHON2 = sys.version_info < (3, 0)

from cbor import Tag


LIST_TYPES = [list, tuple]
INT_TYPES = ['byte', 'char', 'int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64', 'uint64']
FLOAT_TYPES = ['float32', 'float64']
STRING_TYPES = ['string']
BOOL_TYPES = ['bool']
TIME_TYPES = ['time', 'duration']
BOOL_ARRAY_TYPES = ['bool[]']
BYTESTREAM_TYPES = ['uint8[]', 'char[]']

# Typed array tags according to <https://tools.ietf.org/html/draft-ietf-cbor-array-tags-00>
# Always encode to little-endian variant, for now.
TAGGED_ARRAY_FORMATS = {
'uint16[]': (69, '<{}H'),
'uint32[]': (70, '<{}I'),
'uint64[]': (71, '<{}Q'),
'byte[]': (72, '{}b'),
'int8[]': (72, '{}b'),
'int16[]': (77, '<{}h'),
'int32[]': (78, '<{}i'),
'int64[]': (79, '<{}q'),
'float32[]': (85, '<{}f'),
'float64[]': (86, '<{}d'),
}


def extract_cbor_values(msg):
"""Extract a dictionary of CBOR-friendly values from a ROS message.
Primitive values will be casted to specific Python primitives.
Typed arrays will be tagged and packed into byte arrays.
"""
out = {}
for slot, slot_type in zip(msg.__slots__, msg._slot_types):
val = getattr(msg, slot)

if PYTHON2:
slot = unicode(slot)

# string
if slot_type in STRING_TYPES:
out[slot] = unicode(val) if PYTHON2 else str(val)

# bool
elif slot_type in BOOL_TYPES:
out[slot] = bool(val)

# integers
elif slot_type in INT_TYPES:
out[slot] = int(val)

# floats
elif slot_type in FLOAT_TYPES:
out[slot] = float(val)

# time/duration
elif slot_type in TIME_TYPES:
out[slot] = {
'secs': int(val.secs),
'nsecs': int(val.nsecs),
}

# byte array
elif slot_type in BYTESTREAM_TYPES:
if PYTHON2:
out[slot] = bytes(bytearray(val))
else:
out[slot] = bytes(val)

# bool array
elif slot_type in BOOL_ARRAY_TYPES:
out[slot] = [bool(i) for i in val]

# numeric arrays
elif slot_type in TAGGED_ARRAY_FORMATS:
tag, fmt = TAGGED_ARRAY_FORMATS[slot_type]
fmt_to_length = fmt.format(len(val))
packed = struct.pack(fmt_to_length, *val)
out[slot] = Tag(tag=tag, value=packed)

# array of messages
elif type(val) in LIST_TYPES:
out[slot] = [extract_cbor_values(i) for i in val]

# message
else:
out[slot] = extract_cbor_values(val)

return out
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from rosbridge_library.internal.message_conversion import extract_values as extract_json_values
from rosbridge_library.internal.cbor_conversion import extract_cbor_values


class OutgoingMessage:
"""A message wrapper for caching encoding operations."""
def __init__(self, message):
self._message = message
self._json_values = None
self._cbor_values = None

@property
def message(self):
return self._message

def get_json_values(self):
if self._json_values is None:
self._json_values = extract_json_values(self._message)
return self._json_values

def get_cbor_values(self):
if self._cbor_values is None:
self._cbor_values = extract_cbor_values(self._message)
return self._cbor_values
16 changes: 5 additions & 11 deletions rosbridge_library/src/rosbridge_library/internal/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from rosbridge_library.internal import ros_loader, message_conversion
from rosbridge_library.internal.topics import TopicNotEstablishedException
from rosbridge_library.internal.topics import TypeConflictException
from rosbridge_library.internal.outgoing_message import OutgoingMessage

""" Manages and interfaces with ROS Subscriber objects. A single subscriber
is shared between multiple clients
Expand Down Expand Up @@ -149,22 +150,15 @@ def has_subscribers(self):
def callback(self, msg, callbacks=None):
""" Callback for incoming messages on the rospy.Subscriber
Converts the incoming msg to JSON, then passes the JSON to the
registered subscriber callbacks.
Passes the message to registered subscriber callbacks.
Keyword Arguments:
msg - the ROS message coming from the subscriber
callbacks - subscriber callbacks to invoke
"""
# Try to convert the msg to JSON
json = None
try:
json = message_conversion.extract_values(msg)
except Exception as exc:
logerr("Exception while converting messages in subscriber callback : %s", exc)
return

outgoing = OutgoingMessage(msg)

# Get the callbacks to call
if not callbacks:
with self.lock:
Expand All @@ -173,7 +167,7 @@ def callback(self, msg, callbacks=None):
# Pass the JSON to each of the callbacks
for callback in callbacks:
try:
callback(json)
callback(outgoing)
except Exception as exc:
# Do nothing if one particular callback fails except log it
logerr("Exception calling subscribe callback: %s", exc)
Expand Down
2 changes: 2 additions & 0 deletions rosbridge_library/src/rosbridge_library/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ def serialize(self, msg, cid=None):
Returns a JSON string representing the dictionary
"""
try:
if type(msg) == bytearray:
return msg
if has_binary(msg) or self.bson_only_mode:
return bson.BSON.encode(msg)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_subscribe_receive_json(self):
received = {"msg": None}

def cb(msg):
received["msg"] = msg
received["msg"] = msg.get_json_values()

multi.subscribe(client, cb)
sleep(0.5)
Expand All @@ -114,7 +114,7 @@ def test_subscribe_receive_json_multiple(self):
received = {"msgs": []}

def cb(msg):
received["msgs"].append(msg["data"])
received["msgs"].append(msg.get_json_values()["data"])

multi.subscribe(client, cb)
sleep(0.5)
Expand Down Expand Up @@ -167,10 +167,10 @@ def test_multiple_subscribers(self):
received = {"msg1": None, "msg2": None}

def cb1(msg):
received["msg1"] = msg
received["msg1"] = msg.get_json_values()

def cb2(msg):
received["msg2"] = msg
received["msg2"] = msg.get_json_values()

multi.subscribe(client1, cb1)
multi.subscribe(client2, cb2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def test_publisher_manager_publish(self):
received = {"msg": None}

def cb(msg):
received["msg"] = msg
received["msg"] = msg.get_json_values()

manager.subscribe(client, topic, cb, msg_type)
sleep(0.5)
Expand Down

0 comments on commit d77f7e0

Please sign in to comment.