Skip to content
67 changes: 67 additions & 0 deletions src/roswire/proxy/launch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import xml.etree.ElementTree as ET

import attr
import yaml

from .rosparam import load_from_yaml_string as load_rosparam_from_string
from .config import ROSConfig, NodeConfig, Parameter
from .context import LaunchContext
from ..substitution import resolve as resolve_args
Expand All @@ -27,6 +29,12 @@
_TAG_TO_LOADER = {}


def _read_contents(tag: ET.Element) -> str:
"""Reads the text contents of an XML element."""
# FIXME add support for CDATA -- possibly via lxml or xml.dom?
return ''.join(t.text for t in tag if t.text)


def _parse_bool(attr: str, val: str) -> bool:
"""Parses a boolean value from an XML attribute."""
val = val.lower()
Expand Down Expand Up @@ -170,6 +178,65 @@ def _load_param_tag(self,

return ctx, cfg

@tag('rosparam', ['command', 'ns', 'file', 'param', 'subst_value'])
def _load_rosparam_tag(self,
ctx: LaunchContext,
cfg: ROSConfig,
tag: ET.Element
) -> Tuple[LaunchContext, ROSConfig]:
filename = self._read_optional(tag, 'file', ctx)
subst_value = self._read_optional_bool(tag, 'subst_value', ctx, False)
ns = self._read_optional(tag, 'ns', ctx) or ''
param = self._read_optional(tag, 'param', ctx) or ''
param = namespace_join(ns, param)
full_param = namespace_join(ctx.namespace, param)
value = _read_contents(tag)

cmd: str = self._read_optional(tag, 'command', ctx) or 'load'
if cmd not in ('load', 'delete', 'dump'):
m = f"<rosparam> unsupported 'command': {cmd}"
raise FailedToParseLaunchFile(m)

if cmd == 'load' and not filename:
m = "<rosparam> load command requires 'filename' attribute"
raise FailedToParseLaunchFile(m)

if cmd == 'load':
assert filename is not None # mypy can't work this out
if not self.__files.isfile(filename):
m = f"<rosparam> file does not exist: {filename}"
raise FailedToParseLaunchFile(m)

if cmd == 'delete' and filename is not None:
m = "<rosparam> command:delete does not support filename"
raise FailedToParseLaunchFile(m)

# handle load command
if cmd == 'load':
assert filename is not None # mypy can't work this out
yml_text = self.__files.read(filename)
if subst_value:
yml_text = self._resolve_args(yml_text, ctx)
logger.debug("parsing rosparam YAML:\n%s", yml_text)
data = load_rosparam_from_string(yml_text)
logger.debug("rosparam values: %s", data)
if not isinstance(data, dict) and not param:
m = "<rosparam> requires 'param' for non-dictionary values"
raise FailedToParseLaunchFile(m)
cfg = cfg.with_param(full_param, data)

# handle dump command
if cmd == 'dump':
m = "'dump' command is currently not supported in <rosparam>"
raise NotImplementedError(m)

# handle delete command
if cmd == 'delete':
m = "'delete' command is currently not supported in <rosparam>"
raise NotImplementedError(m)

return ctx, cfg

@tag('remap', ['from', 'to'])
def _load_remap_tag(self,
ctx: LaunchContext,
Expand Down
51 changes: 51 additions & 0 deletions src/roswire/proxy/launch/rosparam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
"""
This file provides utilities for interacting with rosparam.
"""
__all__ = ('load_from_yaml_string',)

from typing import Dict, Any
import math
import re

import yaml


class YAMLLoader(yaml.SafeLoader):
"""A custom YAML loader for rosparam files."""


def load_from_yaml_string(s: str) -> Dict[str, Any]:
"""Parses the contents of a rosparam file to a dictionary."""
return yaml.load(s, Loader=YAMLLoader) or {}


def __load_radians(loader: YAMLLoader, node: yaml.YAMLObject) -> float:
"""Safely converts rad(num) to a float value.

Note
----
This does not support evaluation of expressions.
"""
expr_s = loader.construct_scalar(node).strip()
if expr_s.startswith('rad('):
expr_s = expr_s[4:-1]

# TODO safely parse and evaluate expression
return float(expr_s)


def __load_degrees(loader: YAMLLoader, node: yaml.YAMLObject) -> float:
"""Safely converts deg(num) to a float value."""
expr_s = loader.construct_scalar(node).strip()
if expr_s.startswith('def('):
expr_s = expr_s[4:-1]
return float(expr_s) * math.pi / 180.0


YAMLLoader.add_constructor('!degrees', __load_degrees)
YAMLLoader.add_implicit_resolver(
'!degrees', re.compile('^deg\([^\)]*\)$'), first='deg(')
YAMLLoader.add_constructor('!radians', __load_radians)
YAMLLoader.add_implicit_resolver(
'!radians', re.compile('^rad\([^\)]*\)$'), first='rad(')