In [24]:
#!/usr/bin/env python
#
# License: BSD
#   https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################

"""
.. argparse::
   :module: py_trees.demos.action
   :func: command_line_argument_parser
   :prog: py-trees-demo-action-behaviour

.. image:: images/action.gif
"""

##############################################################################
# Imports
##############################################################################

import argparse
import atexit
import multiprocessing
import py_trees.common
import time

import py_trees.console as console

##############################################################################
# Classes
##############################################################################


def description():
    """
    Build the text that describes this demo.

    Returns:
        str: the bullet-point summary on its own when the console has no
        colour support; otherwise the same summary framed by a coloured
        banner titled "Action Behaviour".
    """
    summary = "".join([
        "Demonstrates the characteristics of a typical 'action' behaviour.\n",
        "\n",
        "* Mocks an external process and connects to it in the setup() method\n",
        "* Kickstarts new goals with the external process in the initialise() method\n",
        "* Monitors the ongoing goal status in the update() method\n",
        "* Determines RUNNING/SUCCESS pending feedback from the external process\n",
    ])

    # Without colour support the bare summary is all there is to return.
    if not py_trees.console.has_colours:
        return summary

    # 79-column rule of asterisks, used above and below the title and
    # once more after the summary.
    rule = console.green + "*" * 79 + "\n" + console.reset
    title = console.bold_white + "Action Behaviour".center(79) + "\n" + console.reset
    return "".join(["\n", rule, title, rule, "\n", summary, "\n", rule])


def epilog():
    """
    Produce the closing line handed to the argument parser as its epilog.

    Returns:
        str or None: a cyan-coloured tagline when the console supports
        colours, otherwise None.
    """
    if not py_trees.console.has_colours:
        return None
    tagline = "And his noodly appendage reached forth to tickle the blessed...\n"
    return console.cyan + tagline + console.reset


def command_line_argument_parser():
    """
    Create the argument parser for this demo.

    The parser defines no options of its own; it carries description() and
    epilog() as its help text, with the raw formatter so that their line
    breaks are kept as written.

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    parser_options = {
        "description": description(),
        "epilog": epilog(),
        "formatter_class": argparse.RawDescriptionHelpFormatter,
    }
    return argparse.ArgumentParser(**parser_options)


def planning(pipe_connection):
    """
    Stand-in for an external process that accepts long running planning jobs.

    Runs until interrupted. Any message arriving on the pipe restarts the
    job at zero; while a job is active its progress grows by 10 on every
    half-second cycle and is sent back as a one-element list. Once the
    progress reaches 100 the planner idles until the next message. A
    KeyboardInterrupt ends the loop quietly.

    Args:
        pipe_connection: the planner's end of the pipe; it is used through
            its poll(), recv() and send() methods.
    """
    busy = False
    progress = 0
    try:
        while True:
            # The content of the message is ignored: whatever arrives is
            # treated as a request to start a fresh job.
            if pipe_connection.poll():
                pipe_connection.recv()
                progress, busy = 0, True
            if busy:
                progress += 10
                pipe_connection.send([progress])
                # Hitting exactly 100 finishes the job and returns to idle.
                busy = progress != 100
            time.sleep(0.5)
    except KeyboardInterrupt:
        pass


class Action(py_trees.behaviour.Behaviour):
    """
    Connects to a subprocess to initiate a goal, and monitors the progress
    of that goal at each tick until the goal is completed, at which time
    the behaviour itself returns SUCCESS (the mocked planner in this demo
    never reports a failure).

    This is typical of a behaviour that is connected to an external process
    responsible for driving hardware, conducting a plan, or a long running
    processing pipeline (e.g. planning/vision).

    Key point - this behaviour itself should not be doing any work!
    """
    def __init__(self, name="Action"):
        """
        Default construction.

        Args:
            name (str): behaviour name, forwarded to the base class.
        """
        super(Action, self).__init__(name)
        # Filled in by setup(); declared up front so that setup() can tell
        # whether a planner process is already attached.
        self.parent_connection = None
        self.child_connection = None
        self.planning = None
        self.percentage_completion = 0
        self.logger.debug("%s.__init__()" % (self.__class__.__name__))
        print(self.__class__.__name__)

    def setup(self):
        """
        Create the pipe and start the mock planner process.

        Calling this again while the planner is still alive is a no-op, so
        re-running a notebook cell does not spawn (and orphan) additional
        processes. If the previous planner has died, a fresh pipe and
        process are created.
        """
        self.logger.debug("%s.setup()->connections to an external process" % (self.__class__.__name__))
        if self.planning is not None and self.planning.is_alive():
            return
        self.parent_connection, self.child_connection = multiprocessing.Pipe()
        self.planning = multiprocessing.Process(target=planning, args=(self.child_connection,))
        # The planner loops forever, so make sure it is stopped when the
        # interpreter exits.
        atexit.register(self.planning.terminate)
        self.planning.start()

    def initialise(self):
        """
        Send a new goal to the planner and reset the locally tracked progress.
        """
        self.logger.debug("%s.initialise()->sending new goal" % (self.__class__.__name__))
        # Discard progress reports left unread from a goal that was
        # interrupted; otherwise update() would attribute them to the new
        # goal (a stale 100 would mean an immediate, false SUCCESS).
        # A report the planner sends between this drain and its receipt of
        # the new goal can still slip through.
        while self.parent_connection.poll():
            self.parent_connection.recv()
        self.parent_connection.send(['new goal'])
        self.percentage_completion = 0

    def update(self):
        """
        Read at most one progress report from the planner and derive the
        behaviour's status from it.

        Returns:
            py_trees.common.Status: SUCCESS when the report just read is
            100, otherwise RUNNING (including when no report was waiting).
        """
        new_status = py_trees.common.Status.RUNNING
        # Non-blocking: a tick never waits on the planner.
        if self.parent_connection.poll():
            # Reports arrive as one-element lists.
            self.percentage_completion = self.parent_connection.recv().pop()
            if self.percentage_completion == 100:
                new_status = py_trees.common.Status.SUCCESS
        if new_status == py_trees.common.Status.SUCCESS:
            self.feedback_message = "Processing finished"
            self.logger.debug("%s.update()[%s->%s][%s]" % (self.__class__.__name__, self.status, new_status, self.feedback_message))
        else:
            self.feedback_message = "{0}%".format(self.percentage_completion)
            self.logger.debug("%s.update()[%s][%s]" % (self.__class__.__name__, self.status, self.feedback_message))
        return new_status

    def terminate(self, new_status):
        """
        Log the status transition; nothing is cleaned up here.

        The planner process is left running so that a later initialise()
        can hand it another goal; it is stopped by the atexit hook
        registered in setup().

        Args:
            new_status: the status the behaviour is switching to.
        """
        self.logger.debug("%s.terminate()[%s->%s]" % (self.__class__.__name__, self.status, new_status))


##############################################################################
# Main
##############################################################################

def main():
    """
    Entry point for the demo script.

    Parses the command line (so that ``-h`` shows the description), prints
    the description, switches py_trees logging to DEBUG, then sets up one
    Action and ticks it five times with a half-second pause after each
    tick. A KeyboardInterrupt during the ticking stops it quietly.
    """
    # parse_known_args() rather than parse_args(): hosts such as ipykernel
    # put their own options on sys.argv (e.g. ``-f <connection file>``),
    # which parse_args() rejects by exiting the interpreter.
    command_line_argument_parser().parse_known_args()

    print(description())

    py_trees.logging.level = py_trees.logging.Level.DEBUG

    action = Action()
    action.setup()
    try:
        for unused_i in range(0, 5):
            action.tick_once()
            time.sleep(0.5)
        print("\n")
    except KeyboardInterrupt:
        pass

In [25]:
# Switch py_trees logging to DEBUG so the logger.debug() calls made by the
# behaviour show up in the cell outputs.
py_trees.logging.level = py_trees.logging.Level.DEBUG

In [26]:
# Construct the behaviour; its __init__ logs one debug line and prints the
# class name.
action = Action()

[DEBUG] Action               : Action.__init__()
Action


In [27]:
# Create the pipe and launch the mock planner process the behaviour talks to.
action.setup()

[DEBUG] Action               : Action.setup()->connections to an external process


In [33]:
# Tick the behaviour 12 times, pausing half a second after each tick. The
# planner also advances 10% per half second, so the progress climbs towards
# 100% over the run. Interrupting the kernel stops the loop quietly.
try:
    for unused_i in range(0, 12):
        action.tick_once()
        time.sleep(0.5)
    print("\n")
except KeyboardInterrupt:
    pass

[DEBUG] Action               : Action.tick()
[DEBUG] Action               : Action.update()[Status.RUNNING][10%]
[DEBUG] Action               : Action.tick()
[DEBUG] Action               : Action.update()[Status.RUNNING][20%]
[DEBUG] Action               : Action.tick()
[DEBUG] Action               : Action.update()[Status.RUNNING][30%]
[DEBUG] Action               : Action.tick()
[DEBUG] Action               : Action.update()[Status.RUNNING][40%]
[DEBUG] Action               : Action.tick()
[DEBUG] Action               : Action.update()[Status.RUNNING][50%]
[DEBUG] Action               : Action.tick()
[DEBUG] Action               : Action.update()[Status.RUNNING][60%]
[DEBUG] Action               : Action.tick()
[DEBUG] Action               : Action.update()[Status.RUNNING][70%]
[DEBUG] Action               : Action.tick()
[DEBUG] Action               : Action.update()[Status.RUNNING][80%]
[DEBUG] Action               : Action.tick()
[DEBUG] Action               : Action.update()[Stat

In [5]:
import py_trees.console as console

In [6]:
# Scratch check of py_trees.console's error() helper; the recorded output
# shows it echoing the message text.
console.error("test")

test


In [10]:
# Print the demo description. The recorded output is the plain form, i.e. the
# branch description() takes when console.has_colours is false.
print(description())

Demonstrates the characteristics of a typical 'action' behaviour.

* Mocks an external process and connects to it in the setup() method
* Kickstarts new goals with the external process in the initialise() method
* Monitors the ongoing goal status in the update() method
* Determines RUNNING/SUCCESS pending feedback from the external process



In [12]:
# NOTE: parse_args() reads sys.argv, which under ipykernel carries
# `-f <kernel connection file>`. The parser defines no such option, so
# argparse reports "unrecognized arguments" and exits (the SystemExit
# recorded for this cell). In a notebook, pass an explicit list such as
# parse_args([]) or use parse_known_args() instead.
command_line_argument_parser().parse_args()

usage: ipykernel_launcher.py [-h]
ipykernel_launcher.py: error: unrecognized arguments: -f /home/hello/.local/share/jupyter/runtime/kernel-d28b04bd-e702-4ee6-9193-20a88bb3ebb5.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
