In [1]:
import rospy
import rosmsg
import rostopic
import os
import re
from collections import defaultdict

In [2]:
OUT_DIR = os.path.abspath(os.path.join(os.path.abspath(''), 'generated'))
PROTO_FILE = os.path.join(OUT_DIR, 'ros.proto')

In [3]:
scalar_ros2pb = {
    'bool': 'bool',
    'int8': 'int32',
    'uint8': 'uint32',
    'int16': 'int32',
    'uint16': 'uint32',
    'int32': 'int32',
    'uint32': 'uint32',
    'int64': 'int64',
    'uint64': 'uint64',
    'float32': 'float',
    'float64': 'double',
    'string': 'string',
    'time': 'Time',
    'duration': 'Duration',
    # deprecated
    'char': 'uint32',
    'byte': 'int32',
}

def write_file(path, content):
    folder = os.path.dirname(path)
    
    if not os.path.exists(folder):
        os.makedirs(folder)
    
    f = open(os.path.join(path), 'w+')
    f.write(content)
    f.close()
    print("file was written to {}".format(path))

    # TODO remame strip_ros_array_notation
def strip_array_notation(name):
    """Remove array notation from ROS msg types
    
    >>> strip_array_notation('Foo[]')
    'Foo'
    >>> strip_array_notation('Foo[123]')
    'Foo'
    >>> strip_array_notation('Foo')
    'Foo'
    """
    return re.sub(r'\[\d*\]$', '', name)

def type_ros2pb(ros_type):
    """Convert ROS msg type to protobuff type
    
    >>> type_ros2pb('byte')
    'int32'
    >>> type_ros2pb('std_msgs/Header')
    'std_msgs.Header'
    >>> type_ros2pb('string[]')
    'repeated string'
    >>> type_ros2pb('time')
    'Time'
    >>> type_ros2pb('string')
    'string'
    >>> type_ros2pb('uint32')
    'uint32'
    """
    pb_type = strip_array_notation(ros_type)
    if pb_type in scalar_ros2pb:
        pb_type = scalar_ros2pb[pb_type]

    if ros_type.endswith(']'):
        pb_type = 'repeated ' + pb_type
    
    return pb_type.replace('/', '.')

def topic2service_name(topic):
    """Convert ROS topic names to valid protobuff names (replace slash with underscore)
    
    >>> topic2service_name('/rosout')
    'rosout'
    >>> topic2service_name('/rosout_agg')
    'rosout_agg'
    >>> topic2service_name('/turtle1/pose')
    'turtle1_pose'
    >>> topic2service_name('/turtle1/color_sensor')
    'turtle1_color_sensor'
    """
    return topic.replace('/', '_')[1:]

def parse_ros_type(ros_type):
    """Convert a line in a ROS msg file to (package, typename, is_array)
    
    >>> parse_ros_type('rosgraph_msgs/Log')
    ('rosgraph_msgs', 'Log', False)
    >>> parse_ros_type('turtlesim/Pose')
    ('turtlesim', 'Pose', False)
    >>> parse_ros_type('uint32')
    (None, 'uint32', False)
    >>> parse_ros_type('time')
    (None, 'time', False)
    >>> parse_ros_type('rosgraph_msgs/Log')
    ('rosgraph_msgs', 'Log', False)
    """
    is_array = ros_type.endswith(']')
    ros_type = strip_array_notation(ros_type)
    
    if '/' in ros_type:
        package, typename = ros_type.split('/')
        return package, typename, is_array
    else:
        return None, ros_type, is_array

import doctest
doctest.testmod()

TestResults(failed=0, attempted=18)

In [25]:
def snapshot_ros_messages():
    def add_ros_message(type_tree, message, msgtype):
        lines = message.splitlines()
        indent_size = 2

        def indent_level(line=''):
            return (len(line)-len(line.lstrip(' '))) / indent_size

        def read_fields():
            indent = indent_level(lines[0])
            prev_line = None
            fields = {}

            while len(lines) > 0:
                current_indent = indent_level(lines[0])
                # end of the definition of this message
                if current_indent == indent:
                    line = prev_line = lines.pop(0)
                    
                    # skip constants
                    if '=' in line:
                        continue

                    # convert the ros field definition and add it to the fields dict
                    ros_type, fieldname = line.lstrip(' ').split(' ')
                    fields[fieldname] = ros_type
                # the previous line has subfields
                elif current_indent > indent:
                    ros_type = prev_line.lstrip(' ').split(' ')[0]
                    package, msgtype = strip_array_notation(
                        ros_type).split('/')
                    type_tree[package][msgtype] = read_fields()
                # we're one indentation level back so this message ended
                else:
                    return fields

            return fields

        package, msgtype = msgtype.split('/')
        type_tree[package][msgtype] = read_fields()
    
    published, subscribed = rostopic.get_topic_list()
    published_topics = dict(map(lambda x: (x[0], x[1]), (published + subscribed))).items()
    def deep_dict(): return defaultdict(deep_dict)
    type_tree = deep_dict()

    for (_, ros_type) in published_topics:
        add_ros_message(type_tree, rosmsg.get_msg_text(ros_type), ros_type)
    
    return type_tree, published_topics
type_tree, published_topics = snapshot_ros_messages()

In [26]:
published, subscribed = rostopic.get_topic_list()

published_topics = dict(map(lambda x: (x[0], x[1]), (published + subscribed)))
published_topics.items()

dict_items([('/rosout', 'rosgraph_msgs/Log'), ('/rosout_agg', 'rosgraph_msgs/Log'), ('/turtle1/pose', 'turtlesim/Pose'), ('/turtle1/color_sensor', 'turtlesim/Color'), ('/turtle1/cmd_vel', 'geometry_msgs/Twist')])

In [27]:
rospy.get_published_topics()

[['/rosout', 'rosgraph_msgs/Log'],
 ['/rosout_agg', 'rosgraph_msgs/Log'],
 ['/turtle1/pose', 'turtlesim/Pose'],
 ['/turtle1/color_sensor', 'turtlesim/Color']]

In [28]:
rostopic.get_topic_list()

([('/rosout', 'rosgraph_msgs/Log', ['/turtlesim']),
  ('/rosout_agg', 'rosgraph_msgs/Log', ['/rosout']),
  ('/turtle1/pose', 'turtlesim/Pose', ['/turtlesim']),
  ('/turtle1/color_sensor', 'turtlesim/Color', ['/turtlesim'])],
 [('/rosout', 'rosgraph_msgs/Log', ['/rosout']),
  ('/turtle1/cmd_vel', 'geometry_msgs/Twist', ['/turtlesim'])])

## Python script templates

In [29]:
service_template = """
service {service_name} {{
    rpc Publish({pb_type}) returns (Empty);
    rpc Subscribe(Empty) returns (stream {pb_type});
}}

"""

header = '''
syntax = 'proto3';

package ros;

message Empty {}

message Time {
    uint32 secs = 1;
    uint32 nsecs = 2;
}

message Duration {
    int32 secs = 1;
    int32 nsecs = 2;
}

'''

In [30]:
def generate_pb_messages(message_tree):
    proto = ''
    for pkgname, pkg in message_tree.items():
        proto += 'message %s {\n' % pkgname

        for msgname, fields in pkg.items():
            proto += '  message %s {\n' % msgname
            for key, (fieldname, ros_type) in enumerate(fields.items()):
                definition = '{} {} = {};'.format(
                    type_ros2pb(ros_type), fieldname, key+1)
                proto += '    {}\n'.format(definition)

            proto += '  }\n'
        proto += '}\n\n'
    return proto

def generate_pb_service(topic, ros_type):
    return service_template.format(service_name=topic2service_name(topic), pb_type=type_ros2pb(ros_type))


def generate_proto_file():
    type_tree, published_topics = snapshot_ros_messages()

#     type_tree[None]['time'] = {
#         'secs': 'uint32',
#         'nsecs': 'uint32',
#     }

#     type_tree[None]['duration'] = {
#         'secs': 'int32',
#         'nsecs': 'int32',
#     }

    print('Found {} topics'.format(len(published_topics)))


    content = header
    for (topic, ros_type) in published_topics:
        content += generate_pb_service(topic, ros_type)
    content += generate_pb_messages(type_tree)
    write_file(os.path.join(PROTO_FILE), content)


    
    
generate_proto_file()

Found 5 topics
file was written to /home/azazdeaz/repos/grpc_bridge_test/src/grpc_api_generator/scripts/generated/ros.proto


In [31]:
frame_template = '''
from concurrent import futures
import time
import math
import logging
import argparse
import sys
import threading
import time

import grpc

import rospy
import roslib.message

import ros_pb2 as ros_pb
import ros_pb2_grpc as ros_grpc


{classes}

def create_server():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
{add_servicers}
    return server


def run_server(address):
    rospy.init_node('grpc_server', anonymous=True)
    server = create_server()
    server.add_insecure_port(address)
    server.start()
    print("gRPC server is running at %s" % address )
    rospy.spin()

def parse_args_and_run_server():
    run_server(parse_args())


def parse_args(args=None):
    parser = argparse.ArgumentParser(
        description='Generated ROS gRPC server')
    parser.add_argument(
        '-a', 
        '--address',
        help='host:port: host and port of the gRPC server to connect to', 
        default='[::]:50051')


    results = parser.parse_args(args)
    return results.address


if __name__ == '__main__':
    parse_args_and_run_server()

'''

add_servicer_template = 'ros_grpc.add_{servicer_class}_to_server({servicer_class}(), server)'

class_template = '''
class {servicer_class}(ros_grpc.{servicer_class}):
    def __init__(self):
        self.pub = None
        self.Msg = roslib.message.get_message_class('{ros_type}')

    def Publish(self, pb_msg, context):
        print('pb_msg', pb_msg, context)

        if self.pub == None:
            self.pub = rospy.Publisher('{topic}', self.Msg, queue_size=10)

        ros_msg = self.Msg()
{copy_pb2ros}
        print('publishing', ros_msg)
        self.pub.publish(ros_msg)
        return ros_pb.Empty()

    def Subscribe(self, request, context):
        c = {{'unsubscribed': False}}
        ros_messages = []

        def callback(ros_msg):
            print('ros in', ros_msg)
            ros_messages.append(ros_msg)
        subscription = rospy.Subscriber('{topic}', self.Msg, callback)

        def on_rpc_done():
            c['unsubscribed'] = True
            print("Attempting to regain servicer thread...", c)
            subscription.unregister()

        context.add_callback(on_rpc_done)

        print('sub', request, context)
        while not c['unsubscribed']:
            while ros_messages:
                ros_msg = ros_messages.pop(0)
{copy_ros2pb}
                yield pb_msg
            rospy.sleep(0.01)


'''

def add_tab(lines, tabs=1):
    return re.sub(r'^([^$])',  '    ' * tabs + '\\1', lines, flags=re.MULTILINE)


def generate_msg_copier(type_tree, ros_type, left='pb_msg', right='ros_msg', new_instance=True):
    result = ''
    package, typename, _ = parse_ros_type(ros_type)
    
    if new_instance:
        if package:
            if left.startswith('ros_'):
                result += '{} = roslib.message.get_message_class(\'{}/{}\')()\n'.format(
                    left, package, typename)
            else:
                result += '{} = ros_pb.{}.{}()\n'.format(left, package, typename)
        else:
            result += '{} = ros_pb.{}()\n'.format(left, typename)

    if type_tree[package][typename]:
        for fieldname, ros_type in type_tree[package][typename].items():
            package, typename, is_array = parse_ros_type(ros_type)

            is_complex = (
                type_tree[package] and type_tree[package][typename])

            if is_array:
                sub_left = '{}_'.format(left.split('.')[0])
                sub_right = '{}_'.format(right.split('.')[0])
                result += 'for {sub_right} in {right}.{fieldname}:\n'.format(
                    sub_right=sub_right, right=right, fieldname=fieldname)
                body = ''
                if is_complex:
                    body += generate_msg_copier(
                        type_tree, ros_type, sub_left, sub_right, True)
                    body += '{left}.{fieldname}.append({sub_left})\n'.format(
                        left=left, sub_left=sub_left, fieldname=fieldname)
                else:
                    body += '{left}.{fieldname}.append({sub_right})\n'.format(
                        left=left, sub_right=sub_right, fieldname=fieldname)
                result += add_tab(body)
            elif is_complex:
                sub_left = '{}.{}'.format(left, fieldname)
                sub_right = '{}.{}'.format(right, fieldname)
                result += generate_msg_copier(type_tree,
                                              ros_type, sub_left, sub_right, False)
            else:
                result += '{left}.{fieldname} = {right}.{fieldname}\n'.format(
                    left=left, right=right, fieldname=fieldname)

    return result


def generate_server(type_tree, topics):
    add_servicers = []
    servicer_classes = []
    for topic, ros_type in topics:
        servicer_class = topic2service_name(topic) + 'Servicer'
        add_servicers.append(add_servicer_template.format(
            servicer_class=servicer_class))
        copy_ros2pb = generate_msg_copier(
            type_tree, ros_type, 'pb_msg', 'ros_msg', new_instance=True)
        copy_pb2ros = generate_msg_copier(
            type_tree, ros_type, 'ros_msg', 'pb_msg', new_instance=False)
        copy_ros2pb = add_tab(copy_ros2pb, 4)
        copy_pb2ros = add_tab(copy_pb2ros, 2)
        servicer_classes.append(class_template.format(
            servicer_class=servicer_class, ros_type=ros_type, topic=topic, copy_ros2pb=copy_ros2pb, copy_pb2ros=copy_pb2ros))

    return frame_template.format(add_servicers=add_tab('\n'.join(add_servicers)), classes='\n'.join(servicer_classes))

write_file(os.path.join(OUT_DIR, '__init__.py'), "")
write_file(os.path.join(OUT_DIR, 'grpc_server.py'), 
           generate_server(type_tree, published_topics))
print('grpc_server.py file generated')

file was written to /home/azazdeaz/repos/grpc_bridge_test/src/grpc_api_generator/scripts/generated/__init__.py


ValueError: too many values to unpack (expected 2)

In [32]:
!python3 -m grpc_tools.protoc \
    -I={os.path.relpath(OUT_DIR)} \
    --python_out={os.path.relpath(OUT_DIR)} \
    --grpc_python_out={os.path.relpath(OUT_DIR)} \
    {os.path.relpath(PROTO_FILE)}

### All done! Run `rosrun ros_grpc_server_generator server.py` to kick in the gRPC server.