# gRPC bridge generator for ROS

This notebook generates a gRPC server implementing the available ROS topics and services. Please see the [README](/README.md) for more detail.


### Steps:
 - [Snapshot ROS topics and serivces](#snapshot)
 - [Load snapshots](#load)
 - [Generate the .proto file](#proto)
 - [Generate the gRPC server](#server)

In [None]:
import rospy
import rosmsg
import rostopic
import rosservice
import os
import re
import io
import configparser
import doctest
from cookiecutter.main import cookiecutter
import collections

In [None]:
WORLD_NAME = get_param('~world_name', 'tomato_field')
MODEL_NAME_PREFIX = get_param('~model_name_prefix', 'tomato')

In [None]:
# Options

generator_pkg_dir = os.path.abspath('..')
default_pkg_dir = os.path.abspath(os.path.join(generator_pkg_dir, '..'))
TEMPLATE = os.path.abspath(os.path.join(generator_pkg_dir, './template'))

def get_param(name, default):
    value = rospy.get_param(name, default)
    print('param', name, value)
    if value == "":
        return default
    return value

PKG_NAME = get_param('pkg_name', 'grpc_bridge')
PKGS_ROOT = get_param('pkgs_root', default_pkg_dir)
PKG_PATH = os.path.join(PKGS_ROOT, PKG_NAME)
PKG_SRC_PATH = os.path.join(PKG_PATH, 'src')
PROTO_FILE = os.path.join(PKG_PATH, 'ros.proto')
SNAPSHOT_PATH = get_param('snapshot_path', os.path.join(PKG_PATH, 'snapshot.ini'))


KEEP_EXISTING_SNAPSHOT = get_param('keep_existing_snapshot', False) is True
if KEEP_EXISTING_SNAPSHOT and not os.path.exists(SNAPSHOT_PATH):
    raise OSError('Can\'t find snapshot file "{}"'.format(SNAPSHOT_PATH))

print("PKG_NAME =", PKG_NAME)
print("PKG_PATH =", PKG_PATH)
print("SNAPSHOT_PATH =", SNAPSHOT_PATH)
print("KEEP_EXISTING_SNAPSHOT =", KEEP_EXISTING_SNAPSHOT)
print("PROTO_FILE =", PROTO_FILE)
print("TEMPLATE =", TEMPLATE)

In [None]:
cookiecutter(TEMPLATE,
             output_dir=PKGS_ROOT, 
             overwrite_if_exists=True, 
             no_input=True,
             extra_context={'pkg_name': PKG_NAME})


# Utils

In [None]:
def write_file(path, content):
    folder = os.path.dirname(path)
    
    if not os.path.exists(folder):
        os.makedirs(folder)
    
    f = open(os.path.join(path), 'w+')
    f.write(content)
    f.close()
    print("file was written to {}".format(path))

scalar_ros2pb = {
    'bool': 'bool',
    'int8': 'int32',
    'uint8': 'uint32',
    'int16': 'int32',
    'uint16': 'uint32',
    'int32': 'int32',
    'uint32': 'uint32',
    'int64': 'int64',
    'uint64': 'uint64',
    'float32': 'float',
    'float64': 'double',
    'string': 'string',
    'time': 'Time',
    'duration': 'Duration',
    'char': 'uint32',
    'byte': 'int32',
}

# TODO remame strip_ros_array_notation
def strip_array_notation(name):
    """Remove array notation from ROS msg types
    
    >>> strip_array_notation('Foo[]')
    'Foo'
    >>> strip_array_notation('Foo[123]')
    'Foo'
    >>> strip_array_notation('Foo')
    'Foo'
    """
    return re.sub(r'\[\d*\]$', '', name)

def is_binary_ros_type(package, typename, is_array):
    """True if the type should be represented as bytes in protobuf
    >>> is_binary_ros_type(None, 'uint8', True)
    True
    >>> is_binary_ros_type('some_msgs', 'Foo', True)
    False
    >>> is_binary_ros_type(None, 'char', False)
    False
    """
    return (typename == 'uint8' or typename == 'char') and (package is None) and (is_array is True)

def type_ros2pb(ros_type):
    """Convert ROS msg type to protobuff type
    
    >>> type_ros2pb('byte')
    'int32'
    >>> type_ros2pb('std_msgs/Header')
    'std_msgs.Header'
    >>> type_ros2pb('string[]')
    'repeated string'
    >>> type_ros2pb('time')
    'Time'
    >>> type_ros2pb('string')
    'string'
    >>> type_ros2pb('uint32')
    'uint32'
    >>> type_ros2pb('uint8')
    'uint32'
    >>> type_ros2pb('uint8[]')
    'bytes'
    >>> type_ros2pb('char[3]')
    'bytes'
    """
    package, typename, is_array = parse_ros_type(ros_type)
    
    # use bytes for ROS uint8 and char arrays (same way as the python message generator) 
    if is_binary_ros_type(package, typename, is_array):
        return 'bytes'

    pb_type = typename if package is None else '{}.{}'.format(package, typename)

    if pb_type in scalar_ros2pb:
        pb_type = scalar_ros2pb[pb_type]

    if is_array:
        pb_type = 'repeated ' + pb_type
    
    return pb_type

def grpc_service_name(ros_name):
    """Convert ROS topic or service names to valid protobuff names (replace slash with underscore)
    
    >>> grpc_service_name('/rosout')
    'rosout'
    >>> grpc_service_name('/rosout_agg')
    'rosout_agg'
    >>> grpc_service_name('/turtle1/pose')
    'turtle1_pose'
    >>> grpc_service_name('/turtle1/color_sensor')
    'turtle1_color_sensor'
    """
    return ros_name.replace('/', '_')[1:]

def parse_ros_type(ros_type):
    """Convert a line in a ROS msg file to (package, typename, is_array)
    
    >>> parse_ros_type('rosgraph_msgs/Log')
    ('rosgraph_msgs', 'Log', False)
    >>> parse_ros_type('turtlesim/Pose')
    ('turtlesim', 'Pose', False)
    >>> parse_ros_type('uint32')
    (None, 'uint32', False)
    >>> parse_ros_type('time')
    (None, 'time', False)
    >>> parse_ros_type('rosgraph_msgs/Log[]')
    ('rosgraph_msgs', 'Log', True)
    """
    is_array = ros_type.endswith(']')
    ros_type = strip_array_notation(ros_type)
    
    if '/' in ros_type:
        package, typename = ros_type.split('/')
        return package, typename, is_array
    else:
        return None, ros_type, is_array

doctest.testmod()

<a id="snapshot"></a>
# Snapshot

In [None]:

if not KEEP_EXISTING_SNAPSHOT:
    snapshot = configparser.ConfigParser()
    # enable case-sensitive keys (so ROS msg types can be keys)
    snapshot.optionxform=str

    # list all topics, services, and their message definition in the snapshot
    snapshot['MESSAGE_DEFINITIONS'] = {}
    snapshot['TOPICS'] = {}
    snapshot['SERVICES'] = {}

    published, subscribed = rostopic.get_topic_list()
    published_topics = dict(map(lambda x: (x[0], x[1]), (published + subscribed))).items()
        
    for (message_name, ros_type) in published_topics:
        snapshot['TOPICS'][message_name] = ros_type
        snapshot['MESSAGE_DEFINITIONS'][ros_type] = rosmsg.get_msg_text(ros_type)

    services = rosservice.get_service_list()

    for service_name in services:
        try:
            ros_type = rosservice.get_service_type(service_name)
            snapshot['SERVICES'][service_name] = ros_type
            snapshot['MESSAGE_DEFINITIONS'][ros_type] = rosmsg.get_srv_text(ros_type)
        except rosservice.ROSServiceIOException as e:
            rospy.logerr('Can\'t read service "{}": {}'.format(service_name, e))

    # flatten nested message definitions   
    # TODO tests
    def flatten_types():
        changed = False
        for type_name in snapshot['MESSAGE_DEFINITIONS'].keys():
            fields = snapshot['MESSAGE_DEFINITIONS'][type_name].split('\n')
            top_level_fields = []
            subfields = []
            for field in fields:
                ros_type = strip_array_notation(field.split(' ')[0])
                package, typename, _ = parse_ros_type(ros_type)

                is_subfield = field.startswith('  ')

                # make sure that empty messages will be listed in the snapshot too
                if package and ros_type not in snapshot['MESSAGE_DEFINITIONS']:
                    snapshot['MESSAGE_DEFINITIONS'][ros_type] = ''

                if is_subfield:
                    subfields.append(field[2:])

                if subfields and (not is_subfield or field == fields[-1]): 
                    # empty the subfields buffer into a separate message definition
                    sub_type_name = top_level_fields[-1].split(' ')[0]
                    sub_type_name = strip_array_notation(sub_type_name)
                    snapshot['MESSAGE_DEFINITIONS'][sub_type_name] = '\n'.join(subfields)
                    changed = True
                    subfields = []
                
                if not is_subfield:
                    top_level_fields.append(field)

            snapshot['MESSAGE_DEFINITIONS'][type_name] = '\n'.join(top_level_fields)
        
        if (changed):
            flatten_types()

    flatten_types()

    # Order the content of each section alphabetically
    for section in snapshot._sections:
        snapshot._sections[section] = collections.OrderedDict(sorted(snapshot._sections[section].items(), key=lambda t: t[0]))

        
        
    # convert snapshot to string and save to file
    with io.StringIO() as ss:
        snapshot.write(ss)
        ss.seek(0) # rewind
        write_file(SNAPSHOT_PATH, ss.read())


<a id="load"></a>
# Load

In [None]:
class RosSnapshot:
    def __init__(self, path=None):
        self.config = configparser.ConfigParser()
        # enable case-sensitive keys (for the ROS types)
        self.config.optionxform=str
        if path:
            self.config.read(path)
    
    def get_message_definitions(self):
        return self.config["MESSAGE_DEFINITIONS"]
    
    def get_message_definition_packages(self):
        ros_types = self.get_message_definitions().keys()
        return set(map(lambda t: t.split('/')[0], ros_types))
    
    def get_topics(self):
        """
        Returns Map<topic, ros_type>
        """
        return self.config["TOPICS"]
    
    def get_services(self):
        """
        Returns Map<service, ros_type>
        """
        return self.config["SERVICES"]

    def get_sections(self, ros_type):
        """
        Returns the parts of a ROS message as a list. (one part for topics, two for services, three for actions)
        """
        # ROS has these two "complex primitives". Here, we return their fields too so the rest of the code can handle them as regural message types
        if ros_type == 'time':
            return ['uint32 secs\nuint32 nsecs']
        if ros_type == 'duration':
            return ['int32 secs\nint32 nsecs']

        if ros_type not in self.config["MESSAGE_DEFINITIONS"]:
            raise KeyError("Can't find message definition for '{}'".format(ros_type))

        return self.config["MESSAGE_DEFINITIONS"][ros_type].split('---')
    
    def _get_all_fields(self, ros_type, section=0):
        """
        Returns (ros_type, field_name)[]
        """
        sections = self.get_sections(ros_type)
        fields = sections[section].strip().split('\n')
        fields = filter(lambda f: f != '', fields)
        # skip constants
        fields = map(lambda f: f.split(' '), fields)
        return fields
    
    def get_fields(self, ros_type, section=0):
        """
        Returns (ros_type, field_name)[]
        """
        fields = self._get_all_fields(ros_type, section)
        fields = list(filter(lambda f: not '=' in f[1], fields))
        return fields
    
    def get_constants(self, ros_type, section=0):

        """
        Returns (ros_type, field_name)[]
        """
        fields = self._get_all_fields(ros_type, section)
        fields = list(filter(lambda f: '=' in f[1], fields))
        return fields

    def __str__(self):
        return '<RosSnapshot topics={} services={} message_definitions={}>'.format(
            len(self.get_topics()),
            len(self.get_services()),
            len(self.get_message_definitions()))
    

snap = RosSnapshot(SNAPSHOT_PATH)

In [None]:
# Example ROS snapshot for unit tesing
TEST_SNAPSHOT_FILE = """
[MESSAGE_DEFINITIONS]
msgs/Foo = byte BIM=1
	byte BAM=2
	byte BUM=4
    msgs2/Taz taz
    time stamp
	string name
	uint32[] numbers
    uint8[] image
	msgs/Bar[5] bar

msgs/Bar = uint8 number

msgs2/Taz = duration[] tazz

srvs/Empty = ---

srvs/Baz = string logger
	---
	string level


[TOPICS]
/foo = msgs/Foo

[SERVICES]
/baz = srvs/Baz
"""
def tsnap() -> RosSnapshot:
    """
    Returns a RosSnapshot instance filled with the test data
    """
    snap = RosSnapshot()
    snap.config.read_string(TEST_SNAPSHOT_FILE)
    return snap

<a id="proto"></a>
# Proto

In [None]:
topic_service_template = """
service {service_name} {{
    rpc Publish({pb_type}) returns (Empty);
    rpc Subscribe(Empty) returns (stream {pb_type});
}}

"""

srv_service_template = """
service {service_name} {{
    rpc Call({pb_type}Request) returns ({pb_type}Response);
}}

"""

header = '''
syntax = 'proto3';

package ros;

message Empty {}

message Time {
  uint32 secs = 1;
  uint32 nsecs = 2;
}

message Duration {
  int32 secs = 1;
  int32 nsecs = 2;
}

'''

In [None]:
def generate_pb_message_definition_for_package(snap: RosSnapshot, pkg):
    """Gerate the proto definitions of a ros package
    >>> print(generate_pb_message_definition_for_package(tsnap(), "msgs"))
    message msgs {
      message Bar {
        uint32 number = 1;
      }
      /**
       * BIM=1
       * BAM=2
       * BUM=4
       */
      message Foo {
        msgs2.Taz taz = 1;
        Time stamp = 2;
        string name = 3;
        repeated uint32 numbers = 4;
        bytes image = 5;
        repeated msgs.Bar bar = 6;
      }
    }
    <BLANKLINE>
    <BLANKLINE>

    >>> print(generate_pb_message_definition_for_package(tsnap(), "srvs"))
    message srvs {
      message BazRequest {
        string logger = 1;
      }
      message BazResponse {
        string level = 1;
      }
      message EmptyRequest {
      }
      message EmptyResponse {
      }
    }
    <BLANKLINE>
    <BLANKLINE>
    """
    proto = 'message %s {\n' % pkg
    for ros_type in sorted(snap.get_message_definitions().keys()):
        pkgname, msgname = ros_type.split('/')
        msgname = strip_array_notation(msgname)
        if pkg == pkgname:
            section_count = len(snap.get_sections(ros_type))
            is_msg = section_count == 1
            is_srv = section_count == 2
            for section in range(section_count):
                postfix = ''
                if is_srv:
                    postfix = 'Request' if section == 0 else 'Response'
                  

                fields = snap.get_fields(ros_type, section)
                constants = snap.get_constants(ros_type, section)
                
                if constants:
                    proto += '  /**\n'
                    for _, constant in constants:
                        proto += '   * {}\n'.format(constant)
                    proto += '   */\n'

                proto += '  message %s {\n' % (msgname + postfix)

                for key, (field_type, field_name) in enumerate(fields):
                    definition = '{} {} = {};'.format(
                        type_ros2pb(field_type), field_name, key+1)
                    proto += '    {}\n'.format(definition)
                proto += '  }\n'
    proto += '}\n\n'
    return proto

def generate_pb_message_all_definitions(snap: RosSnapshot):
    proto = ''
    for pkg in sorted(snap.get_message_definition_packages()):
        proto += generate_pb_message_definition_for_package(snap, pkg)
    return proto


def generate_proto_file(snap: RosSnapshot):
    print('Found {} messages'.format(len(snap.get_message_definitions())))
    
    content = header
    
    content += generate_pb_message_all_definitions(snap)
            
        
    for (topic, ros_type) in sorted(snap.get_topics().items()):
        content += topic_service_template.format(
            service_name=grpc_service_name(topic), 
            pb_type=type_ros2pb(ros_type))

    for (service, ros_type) in sorted(snap.get_services().items()):
        content += srv_service_template.format(
            service_name=grpc_service_name(service), 
            pb_type=type_ros2pb(ros_type))
            
    write_file(os.path.join(PROTO_FILE), content)



doctest.testmod()
generate_proto_file(snap)


<a id="server"></a>
# Server

In [None]:
def add_tab(lines, tabs=1):
    """ Insert tabs at the beginning of each line """
    return re.sub(r'^([^$])',  '    ' * tabs + '\\1', lines, flags=re.MULTILINE)

def generate_msg_copier(snap: RosSnapshot, ros_type, left='pb_msg', right='ros_msg', new_instance=False, section=0):
    """ Generate scripts for converison between protobuf and ROS messages
    >>> print(generate_msg_copier(tsnap(), 'msgs/Bar', 'pb_msg', 'ros_msg', True))
    pb_msg = ros_pb.msgs.Bar()
    pb_msg.number = ros_msg.number
    <BLANKLINE>

    >>> print(generate_msg_copier(tsnap(), 'msgs/Bar', 'ros_msg', 'pb_msg', True))
    ros_msg = roslib.message.get_message_class('msgs/Bar')()
    ros_msg.number = pb_msg.number
    <BLANKLINE>

    >>> print(generate_msg_copier(tsnap(), 'srvs/Baz', 'ros_msg','pb_msg', section=0))
    ros_msg.logger = pb_msg.logger
    <BLANKLINE>

    >>> print(generate_msg_copier(tsnap(), 'srvs/Baz', 'pb_msg','ros_msg', section=1))
    pb_msg.level = ros_msg.level
    <BLANKLINE>

    >>> print(generate_msg_copier(tsnap(), 'msgs/Foo', 'ros_msg','pb_msg', True))
    ros_msg = roslib.message.get_message_class('msgs/Foo')()
    for pb_msg_ in pb_msg.taz.tazz:
        ros_msg_ = ros_pb.duration()
        ros_msg_.secs = pb_msg_.secs
        ros_msg_.nsecs = pb_msg_.nsecs
        ros_msg.taz.tazz.append(ros_msg_)
    ros_msg.stamp.secs = pb_msg.stamp.secs
    ros_msg.stamp.nsecs = pb_msg.stamp.nsecs
    ros_msg.name = pb_msg.name
    for pb_msg_ in pb_msg.numbers:
        ros_msg.numbers.append(pb_msg_)
    ros_msg.image = pb_msg.image
    for pb_msg_ in pb_msg.bar:
        ros_msg_ = roslib.message.get_message_class('msgs/Bar')()
        ros_msg_.number = pb_msg_.number
        ros_msg.bar.append(ros_msg_)
    <BLANKLINE>

    >>> print(generate_msg_copier(tsnap(), 'msgs2/Taz', 'pb_msg','ros_msg', True))
    pb_msg = ros_pb.msgs2.Taz()
    for ros_msg_ in ros_msg.tazz:
        pb_msg_ = ros_pb.duration()
        pb_msg_.secs = ros_msg_.secs
        pb_msg_.nsecs = ros_msg_.nsecs
        pb_msg.tazz.append(pb_msg_)
    <BLANKLINE>
    """
    result = ''
    package, typename, _ = parse_ros_type(ros_type)
    if new_instance:
        if package:
            if left.startswith('ros_'):
                result += '{} = roslib.message.get_message_class(\'{}/{}\')()\n'.format(
                    left, package, typename)
            else:
                result += '{} = ros_pb.{}.{}()\n'.format(left, package, typename)
        else:
            result += '{} = ros_pb.{}()\n'.format(left, typename)

    for ros_fieldtype, fieldname in snap.get_fields(ros_type, section):
        package, typename, is_array = parse_ros_type(ros_fieldtype)
        is_binary = is_binary_ros_type(package, typename, is_array)
        ros_fieldtype = strip_array_notation(ros_fieldtype)
        is_complex = package is not None # TODO is_scalar
        is_time = package is None and (typename == 'time' or typename == 'duration')
        if is_array and not is_binary:
            sub_left = '{}_'.format(left.split('.')[0])
            sub_right = '{}_'.format(right.split('.')[0])
            result += 'for {sub_right} in {right}.{fieldname}:\n'.format(
                sub_right=sub_right, right=right, fieldname=fieldname)
            body = ''
            if is_complex or is_time:
                body += generate_msg_copier(
                    snap, ros_fieldtype, sub_left, sub_right, True)
                body += '{left}.{fieldname}.append({sub_left})\n'.format(
                    left=left, sub_left=sub_left, fieldname=fieldname)
            else:
                body += '{left}.{fieldname}.append({sub_right})\n'.format(
                    left=left, sub_right=sub_right, fieldname=fieldname)
            result += add_tab(body)
        elif is_complex or is_time:
            sub_left = '{}.{}'.format(left, fieldname)
            sub_right = '{}.{}'.format(right, fieldname)
            result += generate_msg_copier(snap, ros_fieldtype, sub_left, sub_right, False)
        else:
            result += '{left}.{fieldname} = {right}.{fieldname}\n'.format(
                left=left, right=right, fieldname=fieldname)

    return result
doctest.testmod()


In [None]:
frame_template = '''
#!/usr/bin/env python3

from concurrent import futures
import time
import math
import logging
import argparse
import sys
import threading
import time

import grpc

import rospy
import roslib.message

import ros_pb2 as ros_pb
import ros_pb2_grpc as ros_grpc


{classes}

def create_server():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
{add_servicers}
    return server


def run_server():
    address = rospy.get_param('address', '[::]:50051')
    rospy.init_node('grpc_server', anonymous=True)
    server = create_server()
    server.add_insecure_port(address)
    server.start()
    print("gRPC server is running at %s" % address )
    rospy.spin()

if __name__ == '__main__':
    run_server()

'''

add_servicer_template = 'ros_grpc.add_{servicer_class}_to_server({servicer_class}(), server)'

topic_class_template = '''
class {servicer_class}(ros_grpc.{servicer_class}):
    def __init__(self):
        self.pub = None
        self.Msg = roslib.message.get_message_class('{ros_type}')

    def Publish(self, pb_msg, context):
        if self.pub == None:
            self.pub = rospy.Publisher('{topic}', self.Msg, queue_size=10)

        ros_msg = self.Msg()
{copy_pb2ros}
        self.pub.publish(ros_msg)
        return ros_pb.Empty()

    def Subscribe(self, request, context):
        c = {{'unsubscribed': False}}
        ros_messages = []

        def callback(ros_msg):
            ros_messages.append(ros_msg)
        subscription = rospy.Subscriber('{topic}', self.Msg, callback)

        def on_rpc_done():
            c['unsubscribed'] = True
            print("Attempting to regain servicer thread...", c)
            subscription.unregister()

        context.add_callback(on_rpc_done)

        while not c['unsubscribed']:
            while ros_messages:
                ros_msg = ros_messages.pop(0)
{copy_ros2pb}
                yield pb_msg
            rospy.sleep(0.01)
'''

service_class_template = """
class {servicer_class}(ros_grpc.{servicer_class}):
    def Call(self, pb_msg, context):
        Srv = roslib.message.get_service_class('{ros_type}')
        rospy.wait_for_service('{service}')
        call = rospy.ServiceProxy('{service}', Srv)
        ros_msg = Srv._request_class()
{copy_pb2ros}
        ros_msg = call(ros_msg)
        {new_pb_response}
{copy_ros2pb}
        return pb_msg
"""

def generate_server(snap: RosSnapshot):
    add_servicers = []
    servicer_classes = []
    for topic, ros_type in sorted(snap.get_topics().items()):
        servicer_class = grpc_service_name(topic) + 'Servicer'
        add_servicers.append(add_servicer_template.format(
            servicer_class=servicer_class))
        copy_ros2pb = generate_msg_copier(snap, ros_type, 'pb_msg', 'ros_msg', new_instance=True)
        copy_pb2ros = generate_msg_copier(snap, ros_type, 'ros_msg', 'pb_msg')
        copy_ros2pb = add_tab(copy_ros2pb, 4)
        copy_pb2ros = add_tab(copy_pb2ros, 2)
        servicer_classes.append(topic_class_template.format(
            servicer_class=servicer_class, ros_type=ros_type, topic=topic, copy_ros2pb=copy_ros2pb, copy_pb2ros=copy_pb2ros))
    
    for service, ros_type in sorted(snap.get_services().items()):
        servicer_class = grpc_service_name(service) + 'Servicer'
        add_servicers.append(add_servicer_template.format(
            servicer_class=servicer_class))
        copy_ros2pb = generate_msg_copier(snap, ros_type, 'pb_msg', 'ros_msg', section=1)
        copy_pb2ros = generate_msg_copier(snap, ros_type, 'ros_msg', 'pb_msg', section=0)
        copy_ros2pb = add_tab(copy_ros2pb, 2)
        copy_pb2ros = add_tab(copy_pb2ros, 2)
        package, typename, _ = parse_ros_type(ros_type)
        new_pb_response = 'pb_msg = ros_pb.{}.{}Response()\n'.format(package, typename)
        servicer_classes.append(service_class_template.format(
            servicer_class=servicer_class, ros_type=ros_type, service=service, copy_ros2pb=copy_ros2pb, copy_pb2ros=copy_pb2ros, new_pb_response=new_pb_response))

    return frame_template.format(add_servicers=add_tab('\n'.join(add_servicers)), classes='\n'.join(servicer_classes))

write_file(os.path.join(PKG_SRC_PATH, '{}.py'.format(PKG_NAME)), 
           generate_server(snap))
print('grpc_server.py file generated')


In [None]:
!python3 -m grpc_tools.protoc \
    -I={os.path.relpath(PKG_PATH)} \
    --python_out={os.path.relpath(PKG_SRC_PATH)} \
    --grpc_python_out={os.path.relpath(PKG_SRC_PATH)} \
    {os.path.relpath(PROTO_FILE)}

In [None]:
# Check if the doctests are ok
doctest.testmod()