In [None]:
import functools
import py_trees
import py_trees_ros
import py_trees.console as console
import rospy
import sys, os
import numpy as np
sys.path.append("..")
import std_msgs.msg as std_msgs

from copy import deepcopy
from context_based_gesture_operation.msg import Scene as SceneRos

from srcmodules.Scenes import Scene, SceneCoppeliaInterface
from srcmodules.Actions import Actions
from srcmodules.RobotActions import RobotActions,act

from py_trees_ros import subscribers
def shutdown(behaviour_tree):
    behaviour_tree.interrupt()

# Behavior tree tests

- Scene instance can be converted to ROS message object type (`s.to_ros(SceneRos())`)
- Behavior tree in this test is simple: I publish the scene. The class `UpdateScene` writes the scene data to the blackboard.


In [3]:
class PrinterSceneMessages(py_trees.behaviour.Behaviour):
    def __init__(self, name, topic_name="/printer/message", var1="red"):
        super(PrinterSceneMessages, self).__init__(name=name)
        self.topic_name = topic_name
        self.var1 = var1

    def setup(self, timeout):
        self.publisher = rospy.Publisher(self.topic_name, std_msgs.String, queue_size=10, latch=True)
        self.feedback_message = "setup"
        return True

    def update(self):
        self.logger.debug("%s.update()" % self.__class__.__name__)
        blackboard = py_trees.blackboard.Blackboard()
        self.publisher.publish(std_msgs.String(blackboard.scene.robot_attached_str))
        self.feedback_message = "flashing {0}".format(self.var1)
        return py_trees.common.Status.RUNNING

    def terminate(self, new_status):
        self.publisher.publish(std_msgs.String(""))
        self.feedback_message = "cleared"

In [4]:
class UpdateScene(subscribers.ToBlackboard):
    def __init__(self, name, threshold=30.0):
        super(UpdateScene, self).__init__(name=name, topic_name="/scene", topic_type=SceneRos,
        blackboard_variables={"scene": None}, clearing_policy=py_trees.common.ClearingPolicy.NEVER)
        
        self.blackboard = py_trees.blackboard.Blackboard()
        self.blackboard.scene = SceneRos()
        self.blackboard.some_var_1 = False
        
        self.threshold = threshold

    def update(self):
        self.logger.debug("%s.update()" % self.__class__.__name__)
        status = super(UpdateScene, self).update()
        if status != py_trees.common.Status.RUNNING:
            if self.blackboard.scene.robot_attached_str != "":
                self.blackboard.some_var_1 = True
                rospy.logwarn_throttle(60, "%s: gripper attached!" % self.name)
            else:
                self.blackboard.some_var_1 = False
                
            self.feedback_message = "idk"
        return status
class HaveIntent(subscribers.ToBlackboard):
    def __init__(self, name, threshold=30.0):
        super(HaveIntent)

In [5]:
def create_root():
    ''' BT
    Saves scene to bb, when message comes.
    If gripper attached -> prints something
    '''
    # behaviours
    root = py_trees.composites.Parallel("Tutorial")
    topics2bb = py_trees.composites.Sequence("Topics2BB")
    scene2bb = UpdateScene(name="Scene2BB",
                                      topic_name="/scene",
                                      threshold=30.0
                                      )
    priorities = py_trees.composites.Selector("Priorities")
    gripper_check = py_trees.meta.success_is_failure(py_trees.composites.Selector)(name="Gripper Attached Print")
    is_gripper_attached = py_trees.blackboard.CheckBlackboardVariable(
        name="Gripper attached?",
        variable_name='scene_eef_attached',
        expected_value=""
    )
    flash_led_strip = PrinterSceneMessages(
        name="printDumyTexts",
        var1="red")
    idle = py_trees.behaviours.Running(name="Idle")

    # tree
    root.add_children([topics2bb, priorities])
    topics2bb.add_child(scene2bb)
    priorities.add_children([gripper_check, idle])
    gripper_check.add_children([flash_led_strip,is_gripper_attached])
    return root

In [6]:
rospy.init_node("tree")

#lackboard.scene = SceneRos()

root = create_root()
behaviour_tree = py_trees_ros.trees.BehaviourTree(root)
rospy.on_shutdown(functools.partial(shutdown, behaviour_tree))

Unable to register with master node [http://localhost:11311]: master may not be running yet. Will keep trying.


In [7]:
if not behaviour_tree.setup(timeout=15):
    console.logerror("failed to setup the tree, aborting.")
    sys.exit(1)

In [8]:
behaviour_tree.tick_tock(500)

## Try custom tree build

In [1]:
import sys; sys.path.append("..")

In [2]:
import rospy
from srcmodules import BTreeLib
import py_trees_ros, functools

In [3]:
rospy.init_node("tree")

root = BTreeLib.create_tree()
behaviour_tree = py_trees_ros.trees.BehaviourTree(root)
rospy.on_shutdown(functools.partial(BTreeLib.shutdown, behaviour_tree))

if not behaviour_tree.setup(timeout=15):
    console.logerror("failed to setup the tree, aborting.")
    sys.exit(1)

In [None]:
while True:
    behaviour_tree.tick()
    input("=== Tick ===")

update scene: Status.SUCCESS
update gestures: Status.SUCCESS
generate intent: Status.SUCCESS
Execute TA Status.SUCCESS! move_up,


In [2]:
sys.path.append(os.path.abspath("./../../coppelia_sim_ros_interface/src"))
from coppelia_sim_ros_client import CoppeliaROSInterface

In [3]:
import rospy
rospy.init_node("main", anonymous=True)

In [4]:
cop = CoppeliaROSInterface()
from geometry_msgs.msg import Point, Pose, Quaternion

In [5]:
sci = SceneCoppeliaInterface(cop)

In [17]:
RobotActions.reset__not_general(sci, cop)

KeyboardInterrupt: 

In [8]:
pub = rospy.Publisher("/scene", SceneRos, queue_size=5)
pub.publish(SceneRos())