In [1]:
import re
import json
import yaml
from pprint import pprint
from collections import namedtuple
from colorama import Fore, Style
import yaml
from jinja2 import Environment
from jinja2.loaders import FileSystemLoader
# import re
from pathlib import Path
from collections import namedtuple
# import logging
# from colorama import Fore
# from pprint import pprint
import struct
import enum

In [2]:
# Regular expressions for parsing
#
# All patterns are applied with .match(), i.e. anchored at the start of the
# (already stripped) line but not at its end.

# `package a.b.c;` -> group 1: dotted package name
package_re = re.compile(r'package\s+([\w.]+)\s*;')
# `import "some/path";` -> group 1: text between the double quotes
import_re = re.compile(r'import\s+"([^"]+)"\s*;')
# `message Name {` -> group 1: message name
message_start_re = re.compile(r'message\s+(\w+)\s*{')
# `enum Name {` -> group 1: enum name
enum_start_re = re.compile(r'enum\s+(\w+)\s*{')
# `type name[size] = default; // comment`
#   group 1: type (word characters and dots, so `pkg.Type` is accepted)
#   group 2: variable name
#   group 3: array size digits, None when absent
#   group 4: raw default text (digits, sign, exponent, `_`, `.`, `,`, brackets), None when absent
#   group 5: trailing comment text, None when absent
# NOTE: parse_proto_file() reads these groups by position, so whatever is bound
# to the name `field_re` must keep this 5-group layout.
field_re = re.compile(r'\s*([\w.]+)\s+(\w+)\[?(\d+)?\]?\s*=?\s*([\d\-\+eE_.,\[\]]+)?\s*;\s*(?:\/\/\s*(.*))?')
# `NAME = 3; // comment` -> group 1: name, group 2: unsigned decimal digits,
# group 3: trailing comment text (None when absent)
enum_value_re = re.compile(r'\s*(\w+)\s*=\s*(\d+)\s*;\s*(?://\s*(.*))?')
# A line that starts with `//` -> group 1: the comment text
comment_re = re.compile(r'^\s*//\s*(.*)')
# `[1, 2.5, -3e4]` -> group 1: everything between the brackets (None for `[]`)
array_re = re.compile(r'\s*\[([\d.,\-eE\s]+)?\]\s*')

In [3]:
# Small record types. NOTE(review): none of these three is referenced by the
# visible cells; field meanings below are taken from the field names only.
PrimType = namedtuple("PrimType","type size var")
Enum = namedtuple("Enum","name size values")
Constant = namedtuple("Constant","type var value")

# tmp_dir = pathlib.Path(__file__).resolve().parent/"templates"
# env = Environment(loader=FileSystemLoader(tmp_dir))
# A notebook has no __file__, so the template directory is resolved relative to
# the current working directory instead of the module location.
tmp_dir = Path("templates").resolve()
env = Environment(loader=FileSystemLoader(tmp_dir))
# Loaded eagerly: this cell needs templates/msg.cpp.jinja to exist when it runs.
tmpl = env.get_template("msg.cpp.jinja")

class Lang(enum.Enum):
    """Enumeration of languages.

    NOTE(review): not referenced by any visible cell; presumably selects the
    code-generation target (python vs. C/C++) -- confirm against the callers.
    """
    python = 1
    c = 2

In [16]:
# Description of one built-in type, in field order:
#   c       - C/C++ type name
#   py      - python type name
#   size    - number of bytes
#   fmt     - python struct pack/unpack format character
#   complex - True for user defined types, which are more involved
VarInfo = namedtuple("VarInfo", "c py size fmt complex")

# One row per built-in type: (message type name, c, py, size, fmt).
# None of the built-ins is "complex", so that flag is filled in below.
_STD_TYPE_ROWS = (
    ("uint8",  "uint8_t",  "int",   1, "B"),
    ("uint16", "uint16_t", "int",   2, "H"),
    ("uint32", "uint32_t", "int",   4, "I"),
    ("uint64", "uint64_t", "int",   8, "Q"),
    ("int8",   "int8_t",   "int",   1, "b"),
    ("int16",  "int16_t",  "int",   2, "h"),
    ("int32",  "int32_t",  "int",   4, "i"),
    ("int64",  "int64_t",  "int",   8, "q"),
    ("float",  "float",    "float", 4, "f"),
    ("double", "double",   "float", 8, "d"),
    ("bool",   "bool",     "bool",  1, "?"),
    # "char": VarInfo("bool", "bool", 1, "?", False),
    # "vec": VarInfo("vec_t", "vec_t", 12, "3f", True)
)

# Message type name -> VarInfo, in the same order as the rows above.
std_types = {
    type_name: VarInfo(c_name, py_name, n_bytes, fmt_char, False)
    for type_name, c_name, py_name, n_bytes, fmt_char in _STD_TYPE_ROWS
}

# Starts empty; nothing in the visible cells fills it.
extlibs = {}

# PRIMITIVE_TYPES = list(var_types.keys())

In [17]:
def read_file(file_path, encoding="utf-8"):
    """Return the whole content of a text file as one string.

    Args:
        file_path: Path of the file to read (anything ``open()`` accepts).
        encoding: Text encoding used to decode the file. Defaults to UTF-8 so
            the result does not depend on the platform's locale setting.

    Returns:
        str: The decoded file content.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the content is not valid in ``encoding``.
    """
    with open(file_path, 'r', encoding=encoding) as file:
        return file.read()
    
def parse_proto_file(file):
    """Parse the text of a message-definition file into a plain dictionary.

    The text is scanned line by line; each stripped line is tested, in order,
    as: package statement, import statement, message start, enum start,
    closing brace, comment line, and finally as a field (inside a message) or
    an enum value (inside an enum).

    Args:
        file: The *content* of the definition file as a single string (not a
            path and not a file object).

    Returns:
        dict with the keys
          "package":  str, from the `package` line ("" if there is none)
          "imports":  list of imported path strings, in order of appearance
          "messages": {message name: {"fields": [...], "comments": [...]}}
          "enums":    {enum name: {"values": [...], "comments": [...]}}
        Every field is a dict with "name", "type", "number", "array_size" and
        "comment", filled from groups 2, 1, 4, 3 and 5 of the module-level
        ``field_re``. Every enum value is a dict with "name", "number" (int)
        and "comment".

    Notes:
        Lines inside a message or enum body that match nothing are reported
        with ``print`` (using 1-based line numbers) and skipped; parsing
        continues. Comment lines are attached to the message or enum that is
        currently open; comment lines outside any body are dropped.
    """
    proto_dict = {
        "package": "",
        "imports": [],
        "messages": {},
        "enums": {}
    }

    current_message = None
    current_enum = None
    inside_message = False
    inside_enum = False

    # start=1 so that reported line numbers match what an editor displays
    for lineno, line in enumerate(file.split('\n'), start=1):
        line = line.strip()

        # Skip empty lines
        if not line:
            continue

        # Check for package
        package_match = package_re.match(line)
        if package_match:
            proto_dict["package"] = package_match.group(1)
            continue

        # Check for imports
        import_match = import_re.match(line)
        if import_match:
            proto_dict["imports"].append(import_match.group(1))
            continue

        # Check for message start
        message_match = message_start_re.match(line)
        if message_match:
            current_message = message_match.group(1)
            proto_dict["messages"][current_message] = {
                "fields": [],
                "comments": []
            }
            inside_message = True
            inside_enum = False
            continue

        # Check for enum start
        enum_match = enum_start_re.match(line)
        if enum_match:
            current_enum = enum_match.group(1)
            proto_dict["enums"][current_enum] = {
                "values": [],
                "comments": []
            }
            inside_enum = True
            inside_message = False
            continue

        # Check for message or enum end
        if line == '}' and (inside_message or inside_enum):
            if inside_message:
                inside_message = False
                current_message = None
            elif inside_enum:
                inside_enum = False
                current_enum = None
            continue

        # Check for comments
        comment_match = comment_re.match(line)
        if comment_match:
            if current_message:
                proto_dict["messages"][current_message]["comments"].append(comment_match.group(1))
            elif current_enum:
                proto_dict["enums"][current_enum]["comments"].append(comment_match.group(1))
            continue

        # inside_message and inside_enum are never both True (each start
        # marker clears the other flag), so the two cases are exclusive.
        if inside_message:
            # Check for fields inside a message
            field_match = field_re.match(line)
            if field_match:
                proto_dict["messages"][current_message]["fields"].append({
                    "name": field_match.group(2),
                    "type": field_match.group(1),
                    "number": field_match.group(4),
                    "array_size": field_match.group(3),
                    "comment": field_match.group(5) or ""
                })
            else:
                print(f"ERROR, invalid line[{lineno}]: {line}")
        elif inside_enum:
            # Check for enum values inside an enum
            enum_value_match = enum_value_re.match(line)
            if enum_value_match:
                proto_dict["enums"][current_enum]["values"].append({
                    "name": enum_value_match.group(1),
                    "number": int(enum_value_match.group(2)),
                    "comment": enum_value_match.group(3) or ""
                })
            else:
                # Report malformed enum lines the same way as malformed fields
                # instead of dropping them silently.
                print(f"ERROR, invalid line[{lineno}]: {line}")

    return proto_dict

In [18]:
# file_path = "geometry_msgs.proto"
# data = read_file(file_path)
# proto_dict = parse_proto_file(data)
# pprint(proto_dict)

In [8]:
# field_re = re.compile(r'\s*([\w.\[\]]+)\s+(\w+)\s*=?\s*(-?\d*((\.)|([eE]-?))?\d+)?\s*;\s*(?:\/\/\s*(.*))?')

In [9]:
# f = field_re.match("int32[33] bob = -1e3; // hello")
# for i in range(5):
#     print(f.group(i))

In [10]:

# array_re = re.compile(r'\s*\[([\d.,\-eE\s]+)?\]\s*')
# a = array_re.match(" [-.1, -2e-33, 3.44567 ] ")
# g = a.group(1)
# nums = g.split(',')
# for n in nums:
#     print(float(n))

In [11]:
class MsgParts:
    """
    Breaks a message format apart and stores the results so it can be
    converted into other languages. Supported languages:
    - python
    - C/C++

    Attributes are plain containers that the caller fills in; this class only
    stores them, exposes them as a dict (``get_info``) and pretty-prints them.
    """
    def __init__(self):
        # self.comments = []  # comments in body of message prototype
        self.fields = []    # variables in message
        self.imports = []  # included message headers/modules
        self.constants = [] # defines
        # self.c_funcs = []   # custom C functions
        # self.py_funcs = []  # custom Python functions
        self.enums = []     # enums
        self.msg_size = 0   # size of message in bytes
        self.name = None    # filename for naming the message
        self.id = 0         # message id number
        self.fmt = None     # struct format string
        # self.namespace = None # cpp namespace

    def get_info(self):
        """Return the stored parts as a dict.

        Note the key names differ from the attribute names: ``fields`` is
        exposed as "vars", ``imports`` as "includes" and ``id`` as "msgid".
        The lists are returned by reference, not copied.
        """
        info = {
            # "name": self.file.stem,
            "name": self.name,
            "vars": self.fields,
            "includes": self.imports,
            "msg_size": self.msg_size,
            "constants": self.constants,
            # "msg_size_type": "uint8_t",
            # "comments": comments,
            # "args": func_args,
            # "functions": msg_parts.c_funcs,
            "enums": self.enums,
            "msgid": self.id,
            "fmt": self.fmt
            # "license_notice": msg_parts.license_notice,
            # "namespace": msg_parts.namespace
        }
        return info

    def __repr__(self):
        return str(self)

    def __str__(self):
        """Return a colorized, human readable summary of the message parts."""
        ret = f"{Fore.YELLOW}------------------------------\n"
        ret += f"Name: {self.name}\n"
        ret += f"ID: {self.id}\n"
        ret += f"Size: {self.msg_size} bytes\n"
        ret += f"Fmt: {self.fmt}\n"
        # if self.namespace is not None:
        #     ret += f"Namespace: {self.namespace}\n"
        ret += f"------------------------------\n{Fore.RESET}"
        # ret += f"{Fore.CYAN}Comments:\n{Fore.RESET}"
        # ret += f"{Fore.GREEN}"
        # for c in self.comments:
        #     ret += f" {c}\n"
        # ret += f"{Fore.RESET}"

        ret += f"\n{Fore.CYAN}Constants:\n{Fore.RESET}"
        for c in self.constants:
            ret += f" {c}\n"

        ret += f"\n{Fore.CYAN}Fields:\n{Fore.RESET}"
        for f in self.fields:
            ret += f" {f}\n"

        ret += f"\n{Fore.CYAN}Includes:\n{Fore.RESET}"
        ret += f"{Fore.BLUE}"
        # The attribute is named `imports` (see __init__); there is no
        # `includes` attribute, only an "includes" key in get_info().
        for i in self.imports:
            ret += f" {i}\n"
        ret += f"{Fore.RESET}"

        ret += f"\n{Fore.CYAN}Enums:\n{Fore.RESET}"
        for f in self.enums:
            ret += f" {f}\n"

        ret += f"{Fore.CYAN}\nMessage Size:{Fore.RESET}\n"
        ret += f" {self.msg_size} bytes\n"
        ret += f" {self.fmt}\n"
        return ret

In [12]:
class Field:
    """One declaration (`[package.]type name[size] = default; // comment`).

    Args:
        package: Package prefix as captured from the source text (for example
            ``"kevin12."``); dots are removed. ``None`` means the built-in
            package and is stored as ``"std"``.
        dtype: Type name, e.g. ``"uint32"``, ``"char"`` or a user type.
        name: Variable name.
        array_size: Array length as given by the caller, or ``None`` for a
            scalar. Stored unchanged, except for a ``char`` with a default and
            no explicit size, where it is set to ``len(default)``.
        default: Raw default-value text, or ``None``.
        comments: Trailing comment text, or ``None``.

    Attributes:
        default: The converted default value: a list for numeric arrays, the
            raw text for ``char``, a Python ``int``/``float``/``bool`` for
            built-in scalars, and ``None`` when there is no default or the
            type is not a built-in one.

    Raises:
        ValueError: For a ``char`` with neither a size nor a default, for a
            boolean default other than true/false, or when a scalar default
            cannot be converted by ``int``/``float``.
    """

    def __init__(self, package, dtype, name, array_size, default, comments):

        # Converter from raw default text to a Python value, per built-in type.
        self.dtypes = {
            "uint8": int,
            "uint16": int,
            "uint32": int,
            "uint64": int,
            "int8": int,
            "int16": int,
            "int32": int,
            "int64": int,
            "float": float,
            "double": float,
            "char": str,
            "bool": self.get_bool,
        }

        if package is not None:
            package = package.replace(".","")
        else:
            package = "std"

        self.package = package
        self.dtype = dtype
        self.name = name
        self.array_size = array_size
        self.comments = comments

        if dtype == "char":
            if default is not None:
                # A string default is kept as raw text. An explicit size is
                # respected; otherwise the size is derived from the text.
                # NOTE(review): len() counts every character of the raw text,
                # including surrounding quotes if the caller passes them.
                if array_size is None:
                    self.array_size = len(default)
                self.default = default
            elif array_size is None:
                raise ValueError("char data type needs to define array size")
            else:
                self.default = None
        elif default is None:
            self.default = None
        elif array_size is not None:
            # .get(): a user defined array type has no converter; get_array
            # then returns None instead of this lookup raising KeyError.
            self.default = self.get_array(default, self.dtypes.get(dtype))
        elif dtype in self.dtypes:
            self.default = self.dtypes[dtype](default)
        else:
            self.default = None

    def get_bool(self, value):
        """Convert the text ``true``/``false`` (any case) to a bool.

        Raises:
            ValueError: If ``value`` is anything else.
        """
        value = value.lower()
        if value == "true": return True
        elif value == "false": return False
        raise ValueError(f"Invalid value: {value}")

    def get_array(self, string, atype):
        """Parse ``"[a, b, c]"`` into a list of ``atype`` values.

        Args:
            string: Raw default text.
            atype: ``int`` or ``float``; for any other value ``None`` is
                returned without looking at ``string``.

        Returns:
            list of converted numbers; ``[]`` when the text is not a bracketed
            list, the brackets are empty, or an element cannot be converted;
            ``None`` when ``atype`` is not ``int``/``float``.
        """
        if atype != float and atype != int:
            return
        a = array_re.match(string)
        if a is None or a.group(1) is None:
            return []
        try:
            return [atype(n) for n in a.group(1).split(',')]
        except ValueError:
            # Best effort: an unconvertible element yields an empty default.
            return []

    def __str__(self):
        """Return the declaration as a colorized one-line string."""
        if self.comments is None:
            comments = ""
        else:
            comments = f"{Fore.GREEN}// {self.comments}{Fore.RESET}"

        if self.default is None:
            default = ""
        else:
            default = f" = {Fore.LIGHTWHITE_EX}{self.default}{Fore.RESET}"

        if self.array_size is None:
            array = ""
        else:
            array = f"{Fore.YELLOW}[{self.array_size}]{Fore.RESET}"

        return f"{Fore.CYAN}{self.package}.{Fore.BLUE}{self.dtype}{Fore.RESET} {self.name}{array}{default}; {comments}"

In [11]:
# const_re = re.compile(r"constant\s+(\w+\.)?([\w]+)\s+(\w+)\[?(\d+)?\]?\s*=?\s*(.+)?\s*;\s*(?:\/\/\s*(.*))?")
# `constant <declaration>` -> group 1: everything after the keyword
const_re = re.compile(r"constant\s+(.+)")
# field_re = re.compile(r'\s*([\w.]+)\s+(\w+)\[?(\d+)?\]?\s*=?\s*([\d\-\+eE_.,\[\]]+)?\s*;\s*(?:\/\/\s*(.*))?')
# Declaration pattern with a separate, optional package prefix. Groups:
# 1 package (with trailing dot), 2 type, 3 name, 4 array size, 5 default text,
# 6 comment -- the positional argument order of Field(...).
# Deliberately NOT bound to the name `field_re`: parse_proto_file() reads the
# global `field_re` by position using a 5-group layout, so rebinding it to this
# 6-group pattern shifts every group and mislabels all parsed message fields.
const_field_re = re.compile(r"\s*(\w+\.)?([\w]+)\s+(\w+)\[?(\d+)?\]?\s*=?\s*(.+)?\s*;\s*(?:\/\/\s*(.*))?")

data = [
    "constant char bob[] = \"tom12 . 34/#@\"; // frame name in poland 12",
    "constant uint32 tom = 32;",
    "constant int8 bob; // this is a test",
    "constant kevin12.Vector3 package_tom = 3; // hi",
    "constant float sam[3] = [1.2,0.003,-4.5];",
    # "constant char bob;",
    "constant bool tom = false;",
]

for d in data:
    matches = const_re.match(d)
    if matches is None:
        continue
    decl_match = const_field_re.match(matches.group(1))
    if decl_match is None:
        # A `constant` line whose remainder is not a declaration
        # (e.g. missing the terminating `;`).
        print(f"ERROR, invalid constant: {d}")
        continue
    print(Field(*decl_match.groups()))

[36mstd.[34mchar[39m bob[33m[15][39m = [97m"tom12 . 34/#@"[39m; [32m// frame name in poland 12[39m
[36mstd.[34muint32[39m tom = [97m32[39m; 
[36mstd.[34mint8[39m bob; [32m// this is a test[39m
[36mkevin12.[34mVector3[39m package_tom; [32m// hi[39m
[36mstd.[34mfloat[39m sam[33m[3][39m = [97m[1.2, 0.003, -4.5][39m; 
[36mstd.[34mbool[39m tom = [97mFalse[39m; 


In [None]:
# Sample definition text used to exercise parse_proto_file(): a package line,
# three imports, one enum and two messages (scalar fields, an array field with
# a default, a char array and a package-qualified type).
inline = """
package kevin;

import "geometry.proto";
import "kevin";
import "how/now/brown/cow.proto";

// Power supply technology (chemistry) constants
enum BatteryTechnology {
  BATTERY_TECHNOLOGY_UNKNOWN = 0; // Unknown battery technology
  BATTERY_TECHNOLOGY_NIMH = 1;    // Nickel-Metal Hydride battery
  BATTERY_TECHNOLOGY_LION = 2;    // Lithium-ion battery
  BATTERY_TECHNOLOGY_LIPO = 3;    // Lithium Polymer battery
  BATTERY_TECHNOLOGY_LIFE = 4;    // Lithium Iron Phosphate battery
  BATTERY_TECHNOLOGY_NICD = 5;    // Nickel-Cadmium battery
  BATTERY_TECHNOLOGY_LIMN = 6;    // Lithium Manganese Dioxide battery
  BATTERY_TECHNOLOGY_TERNARY = 7; // Ternary Lithium battery
  BATTERY_TECHNOLOGY_VRLA = 8;    // Valve Regulated Lead-Acid battery
}

message Vector3 {
    float x;
    float y;
    float z;
}

message Timestamp {
  // Represents seconds of UTC time since Unix epoch
  uint64 seconds[2]=[1,2];
  float bob = 3.14;

  // Non-negative fractions of a second at nanosecond resolution. Negative
  // second values with fractions must still have non-negative nanos values
  // that count forward in time. Must be from 0 to 999,999,999
  // inclusive.
  uint32 nanos = 1_000_000;
  char frame_id[10];
  kevin.Vector3 tom;
}
"""

# parse_proto_file() takes the definition text itself, not a file path.
proto_dict = parse_proto_file(inline)
pprint(proto_dict)

{'enums': {'BatteryTechnology': {'comments': [],
                                 'values': [{'comment': 'Unknown battery '
                                                        'technology',
                                             'name': 'BATTERY_TECHNOLOGY_UNKNOWN',
                                             'number': 0},
                                            {'comment': 'Nickel-Metal Hydride '
                                                        'battery',
                                             'name': 'BATTERY_TECHNOLOGY_NIMH',
                                             'number': 1},
                                            {'comment': 'Lithium-ion battery',
                                             'name': 'BATTERY_TECHNOLOGY_LION',
                                             'number': 2},
                                            {'comment': 'Lithium Polymer '
                                                        'battery',
                  

In [None]:
# Render the C++ message template and show the generated source.
# NOTE(review): `info` is not defined in any visible cell, so on a fresh kernel
# this cell raises NameError. Presumably it is meant to be the dict returned by
# MsgParts.get_info() -- confirm and build it before this cell.
tmpl = env.get_template("msg.cpp.jinja")
content = tmpl.render(info)
print(content)