diff --git a/minimal-nodes-stubs/Makefile b/minimal-nodes-stubs/Makefile index 71b7f4a..025f35f 100644 --- a/minimal-nodes-stubs/Makefile +++ b/minimal-nodes-stubs/Makefile @@ -2,17 +2,20 @@ build: make -C dummy_image_source build make -C dummy_image_filter build + make -C dummy_simulator build make -C random_agent build build-no-cache: make -C dummy_image_source build-no-cache make -C dummy_image_filter build-no-cache + make -C dummy_simulator build-no-cache make -C random_agent build-no-cache push: make -C dummy_image_source push make -C dummy_image_filter push + make -C dummy_simulator push make -C random_agent push test-all_connected: diff --git a/minimal-nodes-stubs/dummy_image_filter/Dockerfile b/minimal-nodes-stubs/dummy_image_filter/Dockerfile index d53db7a..78d787c 100644 --- a/minimal-nodes-stubs/dummy_image_filter/Dockerfile +++ b/minimal-nodes-stubs/dummy_image_filter/Dockerfile @@ -1,4 +1,5 @@ FROM python:3.7 +WORKDIR /project COPY requirements.txt requirements.txt RUN pip install -r requirements.txt diff --git a/minimal-nodes-stubs/dummy_image_source/Dockerfile b/minimal-nodes-stubs/dummy_image_source/Dockerfile index 0c4249f..daf8f03 100644 --- a/minimal-nodes-stubs/dummy_image_source/Dockerfile +++ b/minimal-nodes-stubs/dummy_image_source/Dockerfile @@ -1,4 +1,5 @@ FROM python:3.7 +WORKDIR /project COPY requirements.txt requirements.txt RUN pip install -r requirements.txt diff --git a/minimal-nodes-stubs/dummy_simulator/.dockerignore b/minimal-nodes-stubs/dummy_simulator/.dockerignore new file mode 100644 index 0000000..163f080 --- /dev/null +++ b/minimal-nodes-stubs/dummy_simulator/.dockerignore @@ -0,0 +1,3 @@ +** +!*.py +!requirements.txt diff --git a/minimal-nodes-stubs/dummy_simulator/Dockerfile b/minimal-nodes-stubs/dummy_simulator/Dockerfile new file mode 100644 index 0000000..f41903e --- /dev/null +++ b/minimal-nodes-stubs/dummy_simulator/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.7 +WORKDIR /project + +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt + + + +COPY . . + +ENTRYPOINT ["python3", "dummy_simulator.py"] diff --git a/minimal-nodes-stubs/dummy_simulator/Makefile b/minimal-nodes-stubs/dummy_simulator/Makefile new file mode 100644 index 0000000..6a0f973 --- /dev/null +++ b/minimal-nodes-stubs/dummy_simulator/Makefile @@ -0,0 +1,20 @@ +repo=aidonode-dummy_simulator +# repo=$(shell basename -s .git `git config --get remote.origin.url`) +branch=$(shell git rev-parse --abbrev-ref HEAD) +tag=duckietown/$(repo):$(branch) + +build: + docker build -t $(tag) . + +build-no-cache: + docker build -t $(tag) --no-cache . + +push: build + docker push $(tag) + +test-data1-direct: + ./dummy_simulator.py < test_data/in1.json > test_data/out1.json + +test-data1-docker: + docker run -i $(tag) < test_data/in1.json > test_data/out1.json + diff --git a/minimal-nodes-stubs/dummy_simulator/dummy_simulator.py b/minimal-nodes-stubs/dummy_simulator/dummy_simulator.py new file mode 100755 index 0000000..86085e2 --- /dev/null +++ b/minimal-nodes-stubs/dummy_simulator/dummy_simulator.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +from dataclasses import dataclass + +import numpy as np + +from aido_node_wrapper import wrap_direct, Context +from aido_schemas import JPGImage, Duckiebot1Observations, Duckiebot1Commands +from aido_schemas.protocol_simulator import SetMap, SpawnRobot, RobotName, StateDump, Step, RobotInterfaceDescription, \ + RobotObservations, RobotState, protocol_simulator, SetRobotCommands, RobotPerformance, Metric, PerformanceMetrics + + +# Specialize to our datatype +@dataclass +class MySetRobotCommands(SetRobotCommands): + robot_name: RobotName + t_effective: float + commands: Duckiebot1Commands + + +@dataclass +class MyRobotObservations(RobotObservations): + observations: Duckiebot1Observations + + +class DummySimulator: + """ A dummy simulator implementation. """ + current_time: float + robot_name: str + + def init(self, context: Context): + context.log('init()') + + def on_received_seed(self, context, data: int): + context.log(f'seed({data})') + + def on_received_clear(self, context): + context.log(f'clear()') + + def on_received_set_map(self, context, data: SetMap): + context.log(f'set_map({data})') + # TODO: load map + + def on_received_spawn_robot(self, data: SpawnRobot): + self.robot_name = data.robot_name + # TODO: set pose of robot + + def on_received_get_robot_interface_description(self, context, data: RobotName): + rid = RobotInterfaceDescription(robot_name=data, observations=Duckiebot1Observations, + commands=Duckiebot1Commands) + context.write('robot_interface_description', rid) + + def on_received_get_robot_performance(self, context, data: RobotName): + context.log(f'get_robot_interface_description()') + metrics = {} + metrics['reward'] = Metric(higher_is_better=True, cumulative_value=self.current_time, + description="Dummy reward equal to survival time.") + pm = PerformanceMetrics(metrics) + rid = RobotPerformance(robot_name=data, t_effective=self.current_time, performance=pm) + context.write('robot_performance', rid) + + def on_received_start_episode(self, context): + context.log(f'start_episode()') + self.current_time = 0 + + def on_received_step(self, context, data: Step): + context.log(f'step({data})') + self.current_time = data.until + + def on_received_set_robot_commands(self, context, data: MySetRobotCommands): + context.log(f'set_robot_commands({data})') + + def on_received_get_robot_observations(self, context, data: RobotName): + context.log(f'get_robot_observation({data!r})') + camera = get_random_image(shape=(200, 300)) + obs = Duckiebot1Observations(camera) + ro = MyRobotObservations(obs) + context.write('robot_observations', ro, with_schema=True) + + def on_received_get_robot_state(self, context, data: RobotName): + context.log(f'get_robot_state({data!r})') + rs = RobotState(robot_name=data, t_effective=self.current_time, state=None) + context.write('robot_state', rs) + + def on_received_dump_state(self, context): + context.log(f'dump_state()') + context.write('dump_state', StateDump(None)) + + +def get_random_image(shape): + H, W = shape + values = (128 + np.random.randn(H, W, 3) * 60).astype('uint8') + jpg_data = bgr2jpg(values) + image = JPGImage(jpg_data) + return image + + +# noinspection PyUnresolvedReferences +def bgr2jpg(image_cv) -> bytes: + import cv2 + compress = cv2.imencode('.jpg', image_cv)[1] + jpg_data = np.array(compress).tostring() + return jpg_data + + +def main(): + node = DummySimulator() + protocol = protocol_simulator + protocol.inputs['set_robot_commands'] = MySetRobotCommands + protocol.outputs['robot_observations'] = MyRobotObservations + wrap_direct(node=node, protocol=protocol) + + +if __name__ == '__main__': + main() diff --git a/minimal-nodes-stubs/dummy_simulator/requirements.txt b/minimal-nodes-stubs/dummy_simulator/requirements.txt new file mode 100644 index 0000000..c949043 --- /dev/null +++ b/minimal-nodes-stubs/dummy_simulator/requirements.txt @@ -0,0 +1,7 @@ +-e git://github.com/duckietown/aido-protocols.git@v4-devel3#egg=aido-protocols +-e git://github.com/AndreaCensi/zuper_utils.git@v4-devel#egg=zuper_utils + +numpy==1.16.2 + + +opencv-python==4.0.0.21 diff --git a/minimal-nodes-stubs/dummy_simulator/test_data/in1.json b/minimal-nodes-stubs/dummy_simulator/test_data/in1.json new file mode 100644 index 0000000..70a8caf --- /dev/null +++ b/minimal-nodes-stubs/dummy_simulator/test_data/in1.json @@ -0,0 +1,4 @@ +{"compat": ["aido2"], "topic": "seed", "data": 12} +{"compat": ["aido2"], "topic": "clear"} +{"compat": ["aido2"], "topic": "set_map", "data": {"map_data": "TBD"}} +{"compat": ["aido2"], "topic": "spawn_robot", "data": {"robot_name": "ego", "configuration": {"pose": null, "velocity": null }}} diff --git a/minimal-nodes-stubs/random_agent/Dockerfile b/minimal-nodes-stubs/random_agent/Dockerfile index 17c6dbf..8cb815b 100644 --- a/minimal-nodes-stubs/random_agent/Dockerfile +++ b/minimal-nodes-stubs/random_agent/Dockerfile @@ -1,4 +1,5 @@ FROM python:3.7 +WORKDIR /project COPY requirements.txt requirements.txt RUN pip install -r requirements.txt diff --git a/minimal-nodes-stubs/random_agent/random_agent.py b/minimal-nodes-stubs/random_agent/random_agent.py index 17880fb..b9daa8a 100755 --- a/minimal-nodes-stubs/random_agent/random_agent.py +++ b/minimal-nodes-stubs/random_agent/random_agent.py @@ -1,8 +1,10 @@ #!/usr/bin/env python3 + + import numpy as np from aido_node_wrapper import wrap_direct, Context -from aido_schemas import PWMCommands, protocol_agent_jpg_pwm, EpisodeStart +from aido_schemas import EpisodeStart, protocol_agent_duckiebot1, PWMCommands, Duckiebot1Commands, LEDSCommands, RGB class RandomAgent: @@ -13,12 +15,15 @@ def init(self, context: Context): def on_received_episode_start(self, context: Context, data: EpisodeStart): context.log(f'Starting episode "{data.episode_name}".') - def on_received_camera_image(self, context: Context): + def on_received_observations(self, context: Context): pwm_left = np.random.uniform(0.0, 1.0) pwm_right = np.random.uniform(0.0, 1.0) - commands = PWMCommands(motor_left=pwm_left, motor_right=pwm_right) - context.write('pwm_commands', commands) + grey = RGB(0, 0, 0) + led_commands = LEDSCommands(grey, grey, grey, grey, grey) + pwm_commands = PWMCommands(motor_left=pwm_left, motor_right=pwm_right) + commands = Duckiebot1Commands(pwm_commands, led_commands) + context.write('commands', commands) def finish(self, context: Context): context.log('finish()') @@ -26,7 +31,7 @@ def finish(self, context: Context): def main(): node = RandomAgent() - protocol = protocol_agent_jpg_pwm + protocol = protocol_agent_duckiebot1 wrap_direct(node=node, protocol=protocol) diff --git a/src/aido_node_wrapper/__init__.py b/src/aido_node_wrapper/__init__.py index 88cd447..bbbe0e3 100644 --- a/src/aido_node_wrapper/__init__.py +++ b/src/aido_node_wrapper/__init__.py @@ -1 +1,11 @@ +import logging + +from aido_nodes import logger as aido_nodes_logger + +logger = aido_nodes_logger.getChild('wrapper') + +logger_interaction = logger.getChild("interaction") + +logger_interaction.setLevel(logging.CRITICAL) + from .wrapper import * diff --git a/src/aido_node_wrapper/constants.py b/src/aido_node_wrapper/constants.py index e69de29..8c8f7e1 100644 --- a/src/aido_node_wrapper/constants.py +++ b/src/aido_node_wrapper/constants.py @@ -0,0 +1,13 @@ + + +ENV_NAME = 'AIDONODE_NAME' +ENV_DATA_IN = 'AIDONODE_DATA_IN' +ENV_DATA_OUT = 'AIDONODE_DATA_OUT' +ENV_META_IN = 'AIDONODE_META_IN' +ENV_META_OUT = 'AIDONODE_META_OUT' +ENV_TRANSLATE = 'AIDONODE_TRANSLATE' +ENV_ENCODING = 'AIDONODE_ENCODING' +ENV_ENCODING_JSON = 'json' +ENV_ENCODING_CBOR = 'cbor' +ENV_ENCODING_VALID = [ENV_ENCODING_JSON, ENV_ENCODING_CBOR] +KNOWN = [ENV_DATA_IN, ENV_DATA_OUT, ENV_META_IN, ENV_META_OUT, ENV_NAME, ENV_TRANSLATE, ENV_ENCODING] diff --git a/src/aido_node_wrapper/identify.py b/src/aido_node_wrapper/identify.py index d7e25a6..cb2ac49 100644 --- a/src/aido_node_wrapper/identify.py +++ b/src/aido_node_wrapper/identify.py @@ -57,8 +57,7 @@ def describe_bd(nd: BuildDescription): def describe_cd(nd: ConfigDescription): s = [] for f in dataclasses.fields(nd.config): - - # for k, v in nd.config.__annotations__.items(): + # for k, v in nd.config.__annotations__.items(): s.append('%20s: %s = %s' % (f.name, f.type, f.default)) if not s: return 'No configuration switches available.' @@ -97,15 +96,6 @@ class NodeInfo: def identify_command(command) -> NodeInfo: - # "describe_config": type(None), - # - # "set_config": SetConfig, - # - # "describe_protocol": type(None), - # - # "describe_node": type(None), - # - # "describe_build": type(None), d = [{'topic': 'wrapper.describe_protocol'}, {'topic': 'wrapper.describe_config'}, {'topic': 'wrapper.describe_node'}, @@ -116,10 +106,9 @@ def identify_command(command) -> NodeInfo: p['compat'] = ['aido2'] to_send += (json.dumps(p) + '\n').encode('utf-8') cp = subprocess.run(command, input=to_send, capture_output=True) - # f = open('/dev/stderr', 'wb') s = cp.stderr.decode('utf-8') - # f.write(cp.stderr) - sys.stderr.write(indent(s.strip(), '|', ' stderr: |') +'\n\n') + + sys.stderr.write(indent(s.strip(), '|', ' stderr: |') + '\n\n') # noinspection PyTypeChecker f = BufferedReader(BytesIO(cp.stdout)) stream = read_cbor_or_json_objects(f) diff --git a/src/aido_node_wrapper/meta_protocol.py b/src/aido_node_wrapper/meta_protocol.py index 01859bd..78e4cc6 100644 --- a/src/aido_node_wrapper/meta_protocol.py +++ b/src/aido_node_wrapper/meta_protocol.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Any, List +from typing import * from aido_nodes import InteractionProtocol @@ -31,6 +31,29 @@ class ProtocolDescription: data: InteractionProtocol meta: InteractionProtocol +@dataclass +class CommsHealth: + # ignored because not compatible + ignored: Dict[str, int] + # unexpected topics + unexpected: Dict[str, int] + # malformed data + malformed: Dict[str, int] + # if we are completely lost + unrecoverable_protocol_error: bool + + +@dataclass +class NodeHealth: + # there is a critical error that makes it useless to continue + critical: bool + # severe problem but we can continue + severe: bool + # a minor problem to report + minor: bool + + details: str + LogEntry = str diff --git a/src/aido_node_wrapper/wrapper.py b/src/aido_node_wrapper/wrapper.py index d17712e..7bef176 100644 --- a/src/aido_node_wrapper/wrapper.py +++ b/src/aido_node_wrapper/wrapper.py @@ -8,6 +8,7 @@ import time from abc import ABCMeta, abstractmethod from dataclasses import dataclass +from io import BufferedReader from typing import List, Optional, Iterator, Dict, Tuple, Any import cbor2 @@ -15,27 +16,29 @@ from aido_node_wrapper.meta_protocol import basic_protocol, SetConfig, ProtocolDescription, ConfigDescription, \ BuildDescription, NodeDescription -from aido_nodes import InteractionProtocol, InputReceived, OutputProduced, Unexpected, LanguageChecker, logger +from aido_nodes import InteractionProtocol, InputReceived, OutputProduced, Unexpected, LanguageChecker from aido_nodes.structures import TimingInfo, local_time, TimeSpec, timestamp_from_seconds, DecodingError, \ - ExternalProtocolViolation, NotConforming + ExternalProtocolViolation, NotConforming, ExternalTimeout from compmake.utils import make_sure_dir_exists from contracts.utils import format_obs, check_isinstance -from zuper_json import read_cbor_or_json_objects from zuper_json.ipce import object_to_ipce, ipce_to_object +from zuper_json.json2cbor import read_next_either_json_or_cbor from zuper_json.json_utils import encode_bytes_before_json_serialization, decode_bytes_before_json_deserialization +from . import logger, logger_interaction def wrap_direct(node, protocol, args: Optional[List[str]] = None): if args is None: args = sys.argv[1:] + check_implementation(node, protocol) run_loop(node, protocol, args) class Context(metaclass=ABCMeta): @abstractmethod - def write(self, topic: str, data: Any, timing: TimingInfo = None): + def write(self, topic: str, data: Any, timing: TimingInfo = None, with_schema: bool = False): pass @abstractmethod @@ -73,6 +76,8 @@ def write(self, topic, data, timing=None, with_schema=False): msg = f'Output channel "{topic}" not found in protocol; know {sorted(self.protocol.outputs)}.' raise Exception(msg) + # logger.info(f'Writing output "{topic}".') + klass = self.protocol.outputs[topic] if isinstance(klass, type): check_isinstance(data, klass) @@ -82,35 +87,37 @@ def write(self, topic, data, timing=None, with_schema=False): if isinstance(res, Unexpected): msg = f'Unexpected output {topic}: {res}' logger.error(msg) - else: - klass = self.protocol.outputs[topic] + return - if isinstance(data, dict): - data = ipce_to_object(data, {}, {}, expect_type=klass) + klass = self.protocol.outputs[topic] - if timing is None: - timing = self.last_timing + if isinstance(data, dict): + data = ipce_to_object(data, {}, {}, expect_type=klass) - s = time.time() - hostname = socket.gethostname() - if timing.received is None: - # XXX - time1 = timestamp_from_seconds(s) - else: - time1 = timing.received.time - processed = TimeSpec(time=time1, - time2=timestamp_from_seconds(s), - frame='epoch', - clock=hostname) - timing.processed[self.node_name] = processed - # timing = TimingInfo(acquired=acquired, processed=processed) - m = {} - m[FIELD_COMPAT] = [CUR_PROTOCOL] - m[FIELD_TOPIC] = self.tout.get(topic, topic) - m[FIELD_DATA] = object_to_ipce(data, {}, with_schema=with_schema) - timing.received = None - m[FIELD_TIMING] = object_to_ipce(timing, {}, with_schema=False) - self._write_raw(m) + if timing is None: + timing = self.last_timing + + s = time.time() + hostname = socket.gethostname() + if timing.received is None: + # XXX + time1 = timestamp_from_seconds(s) + else: + time1 = timing.received.time + processed = TimeSpec(time=time1, + time2=timestamp_from_seconds(s), + frame='epoch', + clock=hostname) + timing.processed[self.node_name] = processed + # timing = TimingInfo(acquired=acquired, processed=processed) + m = {} + m[FIELD_COMPAT] = [CUR_PROTOCOL] + m[FIELD_TOPIC] = self.tout.get(topic, topic) + m[FIELD_DATA] = object_to_ipce(data, {}, with_schema=with_schema) + timing.received = None + m[FIELD_TIMING] = object_to_ipce(timing, {}, with_schema=False) + self._write_raw(m) + logger_interaction.debug(f'Written output "{topic}".') def _write_raw(self, json_data): if self.binary_out: @@ -119,7 +126,6 @@ def _write_raw(self, json_data): self.of.flush() else: json_data = encode_bytes_before_json_serialization(json_data) - j = json.dumps(json_data) + '\n' j = j.encode('utf-8') self.of.write(j) @@ -129,6 +135,22 @@ def log(self, s): prefix = f'{self.hostname}:{self.node_name}: ' logger.info(prefix + s) + def info(self, s): + prefix = f'{self.hostname}:{self.node_name}: ' + logger.info(prefix + s) + + def debug(self, s): + prefix = f'{self.hostname}:{self.node_name}: ' + logger.debug(prefix + s) + + def warning(self, s): + prefix = f'{self.hostname}:{self.node_name}: ' + logger.warning(prefix + s) + + def error(self, s): + prefix = f'{self.hostname}:{self.node_name}: ' + logger.error(prefix + s) + def get_translation_table(t: str) -> Tuple[Dict[str, str], Dict[str, str]]: tout = {} @@ -144,24 +166,16 @@ def get_translation_table(t: str) -> Tuple[Dict[str, str], Dict[str, str]]: return tin, tout -ENV_NAME = 'AIDONODE_NAME' -ENV_DATA_IN = 'AIDONODE_DATA_IN' -ENV_DATA_OUT = 'AIDONODE_DATA_OUT' -ENV_META_IN = 'AIDONODE_META_IN' -ENV_META_OUT = 'AIDONODE_META_OUT' -ENV_TRANSLATE = 'AIDONODE_TRANSLATE' -ENV_ENCODING = 'AIDONODE_ENCODING' -ENV_ENCODING_JSON = 'json' -ENV_ENCODING_CBOR = 'cbor' -ENV_ENCODING_VALID = [ENV_ENCODING_JSON, ENV_ENCODING_CBOR] -KNOWN = [ENV_DATA_IN, ENV_DATA_OUT, ENV_META_IN, ENV_META_OUT, ENV_NAME, ENV_TRANSLATE, ENV_ENCODING] - - def check_variables(): for k, v in os.environ.items(): if k.startswith('AIDO') and k not in KNOWN: - msg = f'I do not variable "{k}" set in environment with value "{v}".' - logger.error(msg) + msg = f'I do not expect variable "{k}" set in environment with value "{v}".' + msg += ' I expect: %s' % ", ".join(KNOWN) + logger.warn(msg) + + +from .constants import ENV_ENCODING_VALID, ENV_DATA_IN, KNOWN, ENV_DATA_OUT, ENV_TRANSLATE, ENV_ENCODING, \ + ENV_ENCODING_CBOR, ENV_ENCODING_JSON, ENV_META_OUT, ENV_META_IN, ENV_NAME def run_loop(node, protocol: InteractionProtocol, args: Optional[List[str]] = None): @@ -223,51 +237,53 @@ def run_loop(node, protocol: InteractionProtocol, args: Optional[List[str]] = No import traceback -def open_for_read(fin, timeout=15): +def open_for_read(fin, timeout=None): t0 = time.time() # first open reader file in case somebody is waiting for it while not os.path.exists(fin): - if time.time() - t0 > timeout: + delta = time.time() - t0 + if timeout is not None and (delta > timeout): msg = f'The file {fin} was not created before {timeout} seconds. I give up.' raise EnvironmentError(msg) - logger.info(f'waiting for file {fin} to be created') + logger_interaction.info(f'waiting for file {fin} to be created') time.sleep(1) - logger.info(f'Opening input {fin}') - fi = open(fin, 'rb') + logger_interaction.info(f'Opening input {fin}') + fi = open(fin, 'rb', buffering=0) + fi = BufferedReader(fi, buffer_size=1) return fi def open_for_write(fout): if fout == '/dev/stdout': - return open('/dev/stdout', 'wb') + return open('/dev/stdout', 'wb', buffering=0) else: wants_fifo = fout.startswith('fifo:') fout = fout.replace('fifo:', '') - logger.info(f'Opening output file {fout} (wants fifo: {wants_fifo})') + logger_interaction.info(f'Opening output file {fout} (wants fifo: {wants_fifo})') if not os.path.exists(fout): if wants_fifo: make_sure_dir_exists(fout) os.mkfifo(fout) - logger.info('Fifo created.') + logger_interaction.info('Fifo created.') else: is_fifo = stat.S_ISFIFO(os.stat(fout).st_mode) if wants_fifo and not is_fifo: - logger.info(f'Recreating {fout} as a fifo.') + logger_interaction.info(f'Recreating {fout} as a fifo.') os.unlink(fout) os.mkfifo(fout) if wants_fifo: - logger.info('Fifo detected. Opening will block until a reader appears.') + logger_interaction.info('Fifo detected. Opening will block until a reader appears.') make_sure_dir_exists(fout) - fo = open(fout, 'wb') + fo = open(fout, 'wb', buffering=0) if wants_fifo: - logger.info('Reader has connected to my fifo') + logger_interaction.info('Reader has connected to my fifo') return fo @@ -341,25 +357,34 @@ def on_received_describe_build(self, context): wrapper = Wrapper() - for stream, parsed in inputs([fi, mi]): + waiting_for = 'Expecting control message or one of: %s' % pc.get_expected_events() - topic = parsed[FIELD_TOPIC] - topic = tin.get(topic, topic) - parsed[FIELD_TOPIC] = topic - logger.info(f'received {topic}') - if topic.startswith('wrapper.'): - parsed[FIELD_TOPIC] = topic.replace('wrapper.', '') - handle_message_node(parsed, basic_protocol, pc2, wrapper, context_meta) - else: - if not initialized: - call_if_fun_exists(node, 'init', context=context_data) - initialized = True - handle_message_node(parsed, protocol, pc, node, context_data) + try: + for stream, parsed in inputs([fi, mi], waiting_for=waiting_for): + + topic = parsed[FIELD_TOPIC] + topic = tin.get(topic, topic) + parsed[FIELD_TOPIC] = topic + logger_interaction.info(f'Received message of topic "{topic}".') + if topic.startswith('wrapper.'): + parsed[FIELD_TOPIC] = topic.replace('wrapper.', '') + handle_message_node(parsed, basic_protocol, pc2, wrapper, context_meta) + else: + if not initialized: + call_if_fun_exists(node, 'init', context=context_data) + initialized = True + handle_message_node(parsed, protocol, pc, node, context_data) + except StopIteration: + pass + except ExternalTimeout as e: + msg = 'Could not receive any other messages.' + msg += '\n Expecting one of: %s' % pc.get_expected_events() + raise ExternalTimeout(msg) from e res = pc.finish() if isinstance(res, Unexpected): msg = f'Protocol did not finish: {res}' - logger.error(msg) + logger_interaction.error(msg) if initialized: call_if_fun_exists(node, 'finish', context=context_data) @@ -379,6 +404,11 @@ def handle_message_node(parsed, protocol, pc: LanguageChecker, agent, context): ob = ipce_to_object(data, {}, {}, expect_type=klass) except BaseException as e: msg = f'Cannot deserialize object for topic "{topic}" expecting {klass}.' + try: + parsed = json.dumps(parsed, indent=2) + except: + parsed = str(parsed) + msg += '\n\n' + contracts.indent(parsed, '|', 'parsed: |') raise DecodingError(msg) from e if FIELD_TIMING in parsed: @@ -411,12 +441,11 @@ def handle_message_node(parsed, protocol, pc: LanguageChecker, agent, context): import select - -def read_exactly_one(f): - for a in read_cbor_or_json_objects(f): - return a - else: - raise StopIteration() +# def read_exactly_one(f): +# for a in read_cbor_or_json_objects(f): +# return a +# else: +# raise StopIteration() CUR_PROTOCOL = 'aido2' @@ -426,17 +455,22 @@ def read_exactly_one(f): FIELD_TOPIC = 'topic' -def inputs(fs, timeout=1.0, give_up=15) -> Iterator[Dict]: +def inputs(fs, give_up: Optional[float] = None, waiting_for: str = None) -> Iterator[Dict]: last = time.time() + intermediate_timeout = 3.0 while True: - readyr, readyw, readyx = select.select(fs, [], fs, timeout) + readyr, readyw, readyx = select.select(fs, [], fs, intermediate_timeout) if readyr: for fi in readyr: try: - parsed = read_exactly_one(fi) + parsed = read_next_either_json_or_cbor(fi, waiting_for=waiting_for) except StopIteration: - logger.info(f'EOF') - break + return + + if not isinstance(parsed, dict): + msg = f'Expected a dictionary, obtained {parsed!r}' + logger.error(msg) + continue if not FIELD_DATA in parsed: parsed[FIELD_DATA] = None @@ -465,14 +499,17 @@ def inputs(fs, timeout=1.0, give_up=15) -> Iterator[Dict]: break elif readyx: - logger.warning('Exceptional condition on input channel.') + logger.warning('Exceptional condition on input channel %s' % readyx) else: delta = time.time() - last - if delta > give_up: - msg = f'I am giving up after {delta} seconds.' - raise ExternalProtocolViolation(msg) + if give_up is not None and (delta > give_up): + msg = f'I am giving up after %.1f seconds.' % delta + raise ExternalTimeout(msg) else: - logger.warning(f'Input channel not ready after {delta} seconds. Will re-try') + msg = f'Input channel not ready after %.1f seconds. Will re-try.' % delta + if waiting_for: + msg += '\n' + waiting_for + logger.warning(msg) import contracts @@ -493,9 +530,17 @@ def call_if_fun_exists(ob, fname, **kwargs): def check_implementation(node, protocol: InteractionProtocol): + logger.info('checking implementation') for n in protocol.inputs: expect_fn = f'on_received_{n}' if not hasattr(node, expect_fn): msg = f'Missing function {expect_fn}' - msg += f'\nI know {sorted(node.__dict__)}' + msg += f'\nI know {sorted(type(node).__dict__)}' raise NotConforming(msg) + + for x in type(node).__dict__: + if x.startswith('on_received_'): + input_name = x.replace('on_received_', '') + if input_name not in protocol.inputs: + msg = f'The node has function "{x}" but there is no input "{input_name}".' + raise NotConforming(msg) diff --git a/src/aido_node_wrapper/wrapper_outside.py b/src/aido_node_wrapper/wrapper_outside.py new file mode 100644 index 0000000..36ad919 --- /dev/null +++ b/src/aido_node_wrapper/wrapper_outside.py @@ -0,0 +1,139 @@ +import json +import os +import time +from dataclasses import dataclass +from io import BufferedReader +from typing import * + +from aido_nodes import InteractionProtocol +from zuper_json.ipce import object_to_ipce, ipce_to_object +from zuper_json.json2cbor import read_next_either_json_or_cbor +from . import logger, ProtocolDescription, basic_protocol, encode_bytes_before_json_serialization, logger_interaction + + +def wait_for_creation(fn): + while not os.path.exists(fn): + msg = 'waiting for creation of %s' % fn + logger.info(msg) + time.sleep(1) + + +X = TypeVar('X') + + +@dataclass +class MsgReceived(Generic[X]): + topic: str + data: X + + +from .constants import ENV_ENCODING, ENV_ENCODING_JSON, ENV_ENCODING_CBOR + + +def should_use_binary_encoding(): + encoding = os.environ.get(ENV_ENCODING, ENV_ENCODING_JSON) + binary_out = ENV_ENCODING_CBOR == encoding + return binary_out + + +import cbor2 as cbor + + +class ComponentInterface(object): + + def __init__(self, fnin, fnout, expect_protocol: InteractionProtocol, nickname: str): + self.nickname = nickname + self._cc = None + os.mkfifo(fnin) + self.fpin = open(fnin, 'wb', buffering=0) + wait_for_creation(fnout) + self.fnout = fnout + f = open(fnout, 'rb', buffering=0) + # noinspection PyTypeChecker + self.fpout = BufferedReader(f, buffer_size=1) + self.nreceived = 0 + self.expect_protocol = expect_protocol + self.node_protocol = None + self.node_protocol = self._get_node_protocol() + + + def cc(self, f): + """ CC-s everything that is read or written to this file. """ + self._cc = f + + def _get_node_protocol(self) -> InteractionProtocol: + self.write('wrapper.describe_protocol') + ob: MsgReceived[ProtocolDescription] = self.read_one(expect_topic='protocol_description') + return ob.data.data + + def write(self, topic, data=None, with_schema=False): + data = object_to_ipce(data, {}, with_schema=with_schema) + msg = {'compat': ['aido2'], 'topic': topic, 'data': data} + + j = self._serialize(msg) + + self.fpin.write(j) + self.fpin.flush() + + if self._cc: + self._cc.write(j) + self._cc.flush() + + logger_interaction.info(f'Written to topic "{topic}" >> {self.nickname}.') + + def _serialize(self, msg) -> bytes: + if should_use_binary_encoding(): + j = cbor.dumps(msg) + return j + else: + msg = encode_bytes_before_json_serialization(msg) + j = (json.dumps(msg) + '\n').encode('utf-8') + return j + + def read_one(self, expect_topic=None, timeout=None) -> MsgReceived: + try: + if expect_topic: + waiting_for = f'Expecting topic "{expect_topic}" << {self.nickname}.' + else: + waiting_for = None + msg = read_next_either_json_or_cbor(self.fpout, timeout=timeout, waiting_for=waiting_for) + if self._cc: + msg_b = self._serialize(msg) + self._cc.write(msg_b) + self._cc.flush() + + topic = msg['topic'] + if expect_topic: + if topic != expect_topic: + msg = f'I expected topic "{expect_topic}" but received "{topic}".' + raise Exception(msg) # XXX + if topic in basic_protocol.outputs: + klass = basic_protocol.outputs[topic] + else: + if self.node_protocol: + if topic not in self.node_protocol.outputs: + msg = f'Cannot find topic "{topic}" in outputs of detected node protocol.' + msg += '\nI know: %s' % sorted(self.node_protocol.outputs) + raise Exception(msg) # XXX + else: + klass = self.node_protocol.outputs[topic] + else: + if not topic in self.expect_protocol.outputs: + msg = f'Cannot find topic "{topic}".' + raise Exception(msg) # XXX + else: + klass = self.expect_protocol.outputs[topic] + data = ipce_to_object(msg['data'], {}, expect_type=klass) + self.nreceived += 1 + return MsgReceived[klass](topic, data) + + except StopIteration as e: + msg = 'EOF detected on %s after %d messages.' % (self.fnout, self.nreceived) + if expect_topic: + msg += f' Expected topic "{expect_topic}".' + raise StopIteration(msg) from e + except TimeoutError as e: + msg = 'Timeout detected on %s after %d messages.' % (self.fnout, self.nreceived) + if expect_topic: + msg += f' Expected topic "{expect_topic}".' + raise TimeoutError(msg) from e diff --git a/src/aido_nodes/__init__.py b/src/aido_nodes/__init__.py index 571feb6..20c1bde 100644 --- a/src/aido_nodes/__init__.py +++ b/src/aido_nodes/__init__.py @@ -2,6 +2,8 @@ from .col_logging import logger +logger.info('Using aido_nodes version %s' % __version__) + from .language import * from .language_parse import * diff --git a/src/aido_nodes/structures.py b/src/aido_nodes/structures.py index f76b732..0b49682 100644 --- a/src/aido_nodes/structures.py +++ b/src/aido_nodes/structures.py @@ -19,6 +19,7 @@ 'TimingInfo', 'EnvironmentError', 'NotConforming', + 'ExternalTimeout', ] @@ -33,6 +34,8 @@ class ProtocolViolation(AIDONodesException): class ExternalProtocolViolation(ProtocolViolation): pass +class ExternalTimeout(ExternalProtocolViolation): + pass class InternalProtocolViolation(ProtocolViolation): pass diff --git a/src/aido_schemas/protocol_simulator.py b/src/aido_schemas/protocol_simulator.py new file mode 100644 index 0000000..613c4e1 --- /dev/null +++ b/src/aido_schemas/protocol_simulator.py @@ -0,0 +1,220 @@ +from dataclasses import dataclass +from typing import * + +import numpy as np + +from aido_nodes.language import InteractionProtocol + +RobotName = str + + +@dataclass +class SetMap: + map_data: Dict[str, Any] + + +@dataclass +class RobotConfiguration: + pose: np.ndarray + velocity: np.ndarray + + +@dataclass +class SpawnRobot: + robot_name: RobotName + configuration: RobotConfiguration + + +@dataclass +class SetRobotCommands: + robot_name: RobotName + t_effective: float + commands: Any + + +@dataclass +class GetRobotObservations: + robot_name: RobotName + t_effective: float + + +@dataclass +class GetRobotState: + robot_name: RobotName + t_effective: float + + +@dataclass +class RobotObservations: + robot_name: RobotName + t_effective: float + observations: Any + + +@dataclass +class RobotState: + robot_name: RobotName + t_effective: float + state: Any + + +@dataclass +class SimulationState: + """ + Returns the simulation state. + + done: Whether the simulation should be terminated. + done_why: Human-readable short message. + done_code: Short string to use as code for statistics. + """ + done: bool + done_why: Optional[str] + done_code: Optional[str] + + +@dataclass +class Metric: + higher_is_better: bool + cumulative_value: float + description: str + + +@dataclass +class PerformanceMetrics: + """ + Performance metrics for an agent. + + By convention there will be one called "reward" for RL tasks. + + Note that the values are *cumulative* to make it possible to have + a sampling-invariant behavior. + """ + metrics: Dict[str, Metric] + + +@dataclass +class RobotPerformance: + robot_name: RobotName + t_effective: float + performance: PerformanceMetrics + + +@dataclass +class RobotInterfaceDescription: + robot_name: RobotName + observations: type + commands: type + + +@dataclass +class Step: + until: float + + +@dataclass +class DumpState: + t_effective: float + + +@dataclass +class StateDump: + state: Any + + +protocol_simulator = InteractionProtocol( + description="""\ + +Interface to be implemented by a simulator. + +Logical API: + +`simulator.clear(reset_info)` + +Resets the simulation data. Need to re-transmit map and robot poses. + +`simulator.set_map(map)` + +Sets the map to use. + +`simulator.spawn_robot(name, configuration)` + +Adds a robot to the simulation of the given name. + +`simulator.start_episode` + +`simulator.step(until: timestamp)` + +Steps the simulation until the given timestamp. + + +`simulator.set_robot_commands(t: timestamp, commands)` + +Steps the simulation until the given timestamp. + +`simulator.get_robot_observations(name, t: timestamps)` + +Asks for the dump of a robot state. + +`simulator.dump_robot_state(name)` + +Asks for the dump of a robot state. + + +`seed(int)` + +Sets seed for random process. + + + """, + language="""\ + + in:seed? ; + ( + in:clear ; + in:set_map ; + (in:spawn_robot)*; + + (in:get_robot_interface_description; out:robot_interface_description)*; + + in:start_episode; + + ( + in:step | + in:set_robot_commands | + (in:get_robot_observations ; out:robot_observations) | + (in:get_robot_performance ; out:robot_performance) | + (in:get_robot_state ; out:robot_state) | + (in:dump_state ; out:state_dump) + )* + )* +""", + inputs={ + # Seed random number generator + "seed": int, + "clear": type(None), + + "set_map": SetMap, + "spawn_robot": SpawnRobot, + "get_robot_interface_description": RobotName, + "get_robot_performance": RobotName, + + "start_episode": type(None), + + # Step physics + "step": Step, + + "set_robot_commands": SetRobotCommands, + "get_robot_observations": GetRobotObservations, + "get_robot_state": GetRobotObservations, + + # Dump state information + "dump_state": DumpState, + }, + outputs={ + "state_dump": StateDump, + "robot_observations": RobotObservations, + "robot_state": RobotState, + "robot_performance": RobotPerformance, + "robot_interface_description": RobotInterfaceDescription, + }, +) diff --git a/src/aido_schemas/protocols.py b/src/aido_schemas/protocols.py index 8c32b77..0370dfc 100644 --- a/src/aido_schemas/protocols.py +++ b/src/aido_schemas/protocols.py @@ -1,18 +1,41 @@ from aido_nodes import InteractionProtocol -from aido_schemas import JPGImage, PWMCommands, EpisodeStart +from aido_schemas import JPGImage, PWMCommands, Duckiebot1Observations, Duckiebot1Commands +from typing import * +from dataclasses import dataclass -protocol_agent_jpg_pwm = InteractionProtocol( +@dataclass +class EpisodeStart: + """ Marker for the start of an episode. """ + episode_name: str + + + +protocol_agent = InteractionProtocol( + description=""" + +Receives a DistortedImage, to which it replies with PWMCommands. + + """, + inputs={"observations": Any, + "episode_start": EpisodeStart}, + outputs={"commands": Any}, + language=""" + (in:episode_start ; (in:observations ; out:commands)*)* + """ +) + + +protocol_agent_duckiebot1 = InteractionProtocol( description=""" -Receives a DistortedImage, to which -it replies with PWMCommands. +Receives a DistortedImage, to which it replies with PWMCommands. """, - inputs={"camera_image": JPGImage, + inputs={"observations": Duckiebot1Observations, "episode_start": EpisodeStart}, - outputs={"pwm_commands": PWMCommands}, + outputs={"commands": Duckiebot1Commands}, language=""" - (in:episode_start ; (in:camera_image ; out:pwm_commands)*)* + (in:episode_start ; (in:observations ; out:commands)*)* """ ) diff --git a/src/aido_schemas/protocols_drafts.py b/src/aido_schemas/protocols_drafts.py deleted file mode 100644 index ff0f1e8..0000000 --- a/src/aido_schemas/protocols_drafts.py +++ /dev/null @@ -1,133 +0,0 @@ -from aido_nodes.language import InteractionProtocol -from aido_schemas import JPGImage, PWMCommands, Seed, RequestRender, EpisodeStart -from typing import * -from dataclasses import dataclass - -renderer_protocol = InteractionProtocol( - description="", - inputs={ - "map_set": ..., - # Seed random number generator - "seed": Seed, - # Reset request - "reset": type(None), - # Render request - produces image - "render_image": RequestRender, - }, - outputs={"image": JPGImage}, - language="", -) - -DuckietownMap = Any -RobotName = str -Pose = Any # TODO - - -@dataclass -class SetRobotPose: - name: RobotName - pose: Pose - - -@dataclass -class Result: - """ Result of an operation. """ - success: bool - message: Optional[str] - - -simulator_protocol = InteractionProtocol( - description="""\ - -Interface to be implemented by a simulator. - -Logical API: - - -`simulator.set_map(map)` - -Sets the map to use. - -`simulator.spawn_robot(name)` - -Adds a robot to the simulation of the given name. - -`simulator.set_robot_pose(name, pose)` - -Sets a robot's pose. - -`simulator.set_robot_dynamics(name, params)` - -`simulator.set_robot_extrinsics(name, params)` - -`simulator.set_robot_intrinsics(name, params)` - -`simulator.step(until: timestamp)` - -Steps the simulation until the given timestamp. - -`simulator.clear(t0: timestamp)` - -Resets the simulation data. Need to re-transmit map and robot poses. - -`simulator.set_robot_commands(t: timestamp, commands)` - -Steps the simulation until the given timestamp. - -`simulator.get_robot_observations(name, t: timestamps)` - -Asks for the dump of a robot state. - -`simulator.dump_robot_state(name)` - -Asks for the dump of a robot state. - - -`seed(int)` - -Sets seed for random process. - - - """, - language="""\ - in:seed? ; - in:next_episode ; out:episode_start ; - ( - (in:set_commands ; out:commands_ack) | - (in:step_physics ; out:step_physics_ack) | - (in:dump_state ; out:state_dump) | - (in:render ; out:image) | - (in:next_episode ; out:episode_start) - )* -""", - inputs={ - # Seed random number generator - "seed": int, - "set_map": DuckietownMap, - "spawn_robot": SpawnRobot, - "set_robot_pose": ..., - "step": ..., - "clear": ..., - "set_robot_commands": PWMCommands, - - # Reset request - "reset": type(None), - # Render request - produces image - "render_image": type(None), - # Step physics - "step_physics": float, - # Dump state information - "dump_state": float, - }, - outputs={ - "seed_ack": Result, - "set_map_ack": Result, - "spawn_robot_ack": type(None), - - "image": JPGImage, - "state_dump": Any, - "reset_ack": type(None), - "step_physics_ack": type(None), - "episode_start": EpisodeStart, - }, -) diff --git a/src/aido_schemas/schemas.py b/src/aido_schemas/schemas.py index 895c112..8ff777d 100644 --- a/src/aido_schemas/schemas.py +++ b/src/aido_schemas/schemas.py @@ -1,60 +1,59 @@ -from dataclasses import dataclass, field +from dataclasses import dataclass -def doc(s): - return field(metadata=dict(doc=s)) +# +# def doc(s): +# return field(metadata=dict(doc=s)) + @dataclass class PWMCommands: """ - PWM commands are floats between 0 and 1. + PWM commands are floats between -1 and 1. """ motor_left: float motor_right: float -@dataclass -class WheelsCmd: - """ Kinematic wheels commands. Radiants per second. """ - vel_left: float - vel_right: float + +# @dataclass +# class WheelsCmd: +# """ Kinematic wheels commands. Radiants per second. """ +# vel_left: float +# vel_right: float @dataclass class JPGImage: - """ An image in JPG format. """ - jpg_data: bytes = doc("Bytes of a JPG file") + An image in JPG format. + :param jpg_data: Bytes of a JPG file + """ + jpg_data: bytes -@dataclass -class EpisodeStart: - """ Marker for the start of an episode. """ - episode_name: str @dataclass -class Timestamp: - pass +class Duckiebot1Observations: + camera: JPGImage @dataclass -class Seed: - """ Used for describing an RNG seed. """ - seed: int - +class RGB: + r: float + g: float + b: float @dataclass -class StateDump: - pass +class LEDSCommands: + center: RGB + front_left: RGB + front_right: RGB + back_left: RGB + back_right: RGB @dataclass -class StepPhysicsACK: - pass - - - -class RequestRender: - pass - - +class Duckiebot1Commands: + wheels: PWMCommands + LEDS: LEDSCommands