diff --git a/src/roswire/proxy/launch/__init__.py b/src/roswire/proxy/launch/__init__.py index ad0758ff..0870df73 100644 --- a/src/roswire/proxy/launch/__init__.py +++ b/src/roswire/proxy/launch/__init__.py @@ -11,7 +11,9 @@ import xml.etree.ElementTree as ET import attr +import yaml +from .rosparam import load_from_yaml_string as load_rosparam_from_string from .config import ROSConfig, NodeConfig, Parameter from .context import LaunchContext from ..substitution import resolve as resolve_args @@ -27,6 +29,12 @@ _TAG_TO_LOADER = {} +def _read_contents(tag: ET.Element) -> str: + """Reads the text contents of an XML element.""" + # FIXME add support for CDATA -- possibly via lxml or xml.dom? + return ''.join(t.text for t in tag if t.text) + + def _parse_bool(attr: str, val: str) -> bool: """Parses a boolean value from an XML attribute.""" val = val.lower() @@ -170,6 +178,65 @@ def _load_param_tag(self, return ctx, cfg + @tag('rosparam', ['command', 'ns', 'file', 'param', 'subst_value']) + def _load_rosparam_tag(self, + ctx: LaunchContext, + cfg: ROSConfig, + tag: ET.Element + ) -> Tuple[LaunchContext, ROSConfig]: + filename = self._read_optional(tag, 'file', ctx) + subst_value = self._read_optional_bool(tag, 'subst_value', ctx, False) + ns = self._read_optional(tag, 'ns', ctx) or '' + param = self._read_optional(tag, 'param', ctx) or '' + param = namespace_join(ns, param) + full_param = namespace_join(ctx.namespace, param) + value = _read_contents(tag) + + cmd: str = self._read_optional(tag, 'command', ctx) or 'load' + if cmd not in ('load', 'delete', 'dump'): + m = f" unsupported 'command': {cmd}" + raise FailedToParseLaunchFile(m) + + if cmd == 'load' and not filename: + m = " load command requires 'filename' attribute" + raise FailedToParseLaunchFile(m) + + if cmd == 'load': + assert filename is not None # mypy can't work this out + if not self.__files.isfile(filename): + m = f" file does not exist: {filename}" + raise FailedToParseLaunchFile(m) + + if cmd == 'delete' and filename is not None: + m = " command:delete does not support filename" + raise FailedToParseLaunchFile(m) + + # handle load command + if cmd == 'load': + assert filename is not None # mypy can't work this out + yml_text = self.__files.read(filename) + if subst_value: + yml_text = self._resolve_args(yml_text, ctx) + logger.debug("parsing rosparam YAML:\n%s", yml_text) + data = load_rosparam_from_string(yml_text) + logger.debug("rosparam values: %s", data) + if not isinstance(data, dict) and not param: + m = " requires 'param' for non-dictionary values" + raise FailedToParseLaunchFile(m) + cfg = cfg.with_param(full_param, data) + + # handle dump command + if cmd == 'dump': + m = "'dump' command is currently not supported in " + raise NotImplementedError(m) + + # handle delete command + if cmd == 'delete': + m = "'delete' command is currently not supported in " + raise NotImplementedError(m) + + return ctx, cfg + @tag('remap', ['from', 'to']) def _load_remap_tag(self, ctx: LaunchContext, diff --git a/src/roswire/proxy/launch/rosparam.py b/src/roswire/proxy/launch/rosparam.py new file mode 100644 index 00000000..09b59325 --- /dev/null +++ b/src/roswire/proxy/launch/rosparam.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +""" +This file provides utilities for interacting with rosparam. +""" +__all__ = ('load_from_yaml_string',) + +from typing import Dict, Any +import math +import re + +import yaml + + +class YAMLLoader(yaml.SafeLoader): + """A custom YAML loader for rosparam files.""" + + +def load_from_yaml_string(s: str) -> Dict[str, Any]: + """Parses the contents of a rosparam file to a dictionary.""" + return yaml.load(s, Loader=YAMLLoader) or {} + + +def __load_radians(loader: YAMLLoader, node: yaml.YAMLObject) -> float: + """Safely converts rad(num) to a float value. + + Note + ---- + This does not support evaluation of expressions. + """ + expr_s = loader.construct_scalar(node).strip() + if expr_s.startswith('rad('): + expr_s = expr_s[4:-1] + + # TODO safely parse and evaluate expression + return float(expr_s) + + +def __load_degrees(loader: YAMLLoader, node: yaml.YAMLObject) -> float: + """Safely converts deg(num) to a float value.""" + expr_s = loader.construct_scalar(node).strip() + if expr_s.startswith('def('): + expr_s = expr_s[4:-1] + return float(expr_s) * math.pi / 180.0 + + +YAMLLoader.add_constructor('!degrees', __load_degrees) +YAMLLoader.add_implicit_resolver( + '!degrees', re.compile('^deg\([^\)]*\)$'), first='deg(') +YAMLLoader.add_constructor('!radians', __load_radians) +YAMLLoader.add_implicit_resolver( + '!radians', re.compile('^rad\([^\)]*\)$'), first='rad(')