# Creating a custom Location Costmap

In this tutorial we will walk through the creation of a costmap through machine learning.
First we need to gather a lot of data. For that we will write a randomized experiment for grasping a couple of objects.
In the experiment the robot will try to grasp a randomized object using random poses and torso heights.

In [1]:
import itertools
import time
from typing import Optional, List, Tuple

import numpy as np
np.random.seed(69)
import sqlalchemy.orm
import tf
import tqdm

import pycram.orm.base
import pycram.task
from pycram.bullet_world import BulletWorld, Object as BulletWorldObject
from pycram.designators.action_designator import MoveTorsoAction, PickUpAction, NavigateAction, ParkArmsAction
from pycram.designators.object_designator import ObjectDesignatorDescription
import pycram.enums
from pycram.plan_failures import PlanFailure
from pycram.process_module import ProcessModule
ProcessModule.execution_delay = False
from pycram.process_module import simulated_robot
import sqlalchemy.orm
from pycram.resolver.location.jpt_location import JPTCostmapLocation
from pycram.orm.base import Position, RobotState
from pycram.orm.task import TaskTreeNode, Code
from pycram.orm.action_designator import PickUpAction as ORMPickUpAction
from pycram.orm.object_designator import ObjectDesignator
import sqlalchemy.sql
import pandas as pd


class GraspingExplorer:
    """Class to try randomized grasping plans."""

    world: Optional[BulletWorld]

    def __init__(self, robots: Optional[List[Tuple[str, str]]] = None, objects: Optional[List[Tuple[str, str]]] = None,
                 arms: Optional[List[str]] = None, grasps: Optional[List[str]] = None,
                 samples_per_scenario: int = 1000):
        """
        Create a GraspingExplorer.
        :param robots: The robots to use
        :param objects: The objects to try to grasp
        :param arms: The arms of the robot to use
        :param grasps: The grasp orientations to use
        :param samples_per_scenario: The number of tries per scenario.
        """
        # store exploration space
        if not robots:
            self.robots: List[Tuple[str, str]] = [("pr2", "pr2.urdf")]

        if not objects:
            self.objects: List[Tuple[str, str]] = [("cereal", "breakfast_cereal.stl"), ("bowl", "bowl.stl"),
                                                   ("milk", "milk.stl"),
                                                   ("spoon", "spoon.stl")]

        if not arms:
            self.arms: List[str] = ["left", "right"]

        if not grasps:
            self.grasps: List[str] = ["left", "right", "front", "top"]

        # store trials per scenario
        self.samples_per_scenario: int = samples_per_scenario

        # chain hyperparameters
        self.hyper_parameters = [self.robots, self.objects, self.arms, self.grasps]

        self.total_tries = 0
        self.total_failures = 0

    def perform(self, session: sqlalchemy.orm.Session):
        """
        Perform all experiments.
        :param session: The database-session to insert the samples in.
        """

        # create progress bar
        progress_bar = tqdm.tqdm(
            total=np.prod([len(p) for p in self.hyper_parameters]) * self.samples_per_scenario)

        self.world = BulletWorld("DIRECT")

        # for every robot
        for robot, robot_urdf in self.robots:

            # spawn it
            robot = BulletWorldObject(robot, "robot", robot_urdf)

            # for every obj
            for obj, obj_stl in self.objects:

                # spawn it
                bw_object = BulletWorldObject(obj, obj, obj_stl, [0, 0, 0.75], [0, 0, 0, 1])

                # create object designator
                object_designator = ObjectDesignatorDescription(names=[obj])

                # for every arm and grasp pose
                for arm, grasp in itertools.product(self.arms, self.grasps):
                    # sample positions in 2D
                    positions = np.random.uniform([-2, -2], [2, 2], (self.samples_per_scenario, 2))

                    # for every position
                    for position in positions:

                        # set z axis to 0
                        position = [*position, 0]

                        # calculate orientation for robot to face the object
                        angle = np.arctan2(position[1], position[0]) + np.pi
                        orientation = list(tf.transformations.quaternion_from_euler(0, 0, angle, axes="sxyz"))

                        # try to execute a grasping plan
                        with simulated_robot:

                            ParkArmsAction.Action(pycram.enums.Arms.BOTH).perform()
                            # navigate to sampled position
                            NavigateAction([(position, orientation)]).resolve().perform()

                            # move torso
                            height = np.random.uniform(0., 0.33, 1)[0]
                            MoveTorsoAction.Action(height).perform()

                            # try to pick it up
                            try:
                                PickUpAction(object_designator, [arm], [grasp]).resolve().perform()

                            # if it fails
                            except PlanFailure:

                                # update failure stats
                                self.total_failures += 1

                            # reset BulletWorld
                            self.world.reset_bullet_world()

                            # update progress bar
                            self.total_tries += 1

                            # insert into database
                            pycram.task.task_tree.insert(session, use_progress_bar=False)
                            pycram.task.reset_tree()

                            progress_bar.update()
                            progress_bar.set_postfix(success_rate=(self.total_tries - self.total_failures) /
                                                                  self.total_tries)

                bw_object.remove()
            robot.remove()

        self.world.exit()

Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link']


Next we have to establish a connection to a database and execute the experiment a couple of times. Note that the (few) number of samples we generate is only for demonstrations.
For robust and reliable machine learning millions of samples are required.

In [2]:
engine = sqlalchemy.create_engine("sqlite+pysqlite:///:memory:")
session = sqlalchemy.orm.Session(bind=engine)
pycram.orm.base.Base.metadata.create_all(bind=engine)
session.commit()

explorer = GraspingExplorer(samples_per_scenario=30)
explorer.perform(session)

100%|██████████| 960/960 [00:48<00:00, 18.01it/s, success_rate=0.0677]

Next we will create a dataframe from the just created database by joining the relevant information together using the pycram.orm package.
We will get the following columns:
    - status of the grasping task
    - type of the grasped object
    - arm used to grasp
    - grasp pose
    - torso height relative to the object
    - x and y coordinate of the robot relative to the object.

Keep in mind that filtering before joining is always advisable, hence the subquery.

In [3]:
robot_pos = sqlalchemy.orm.aliased(Position)
object_pos = sqlalchemy.orm.aliased(Position)

filtered_code = session.query(Code.id, Code.designator).filter(Code.designator != None).subquery("filtered_code")

query = session.query(TaskTreeNode.status, ObjectDesignator.type, ORMPickUpAction.arm, ORMPickUpAction.grasp,
                      sqlalchemy.label("relative torso height", object_pos.z - RobotState.torso_height),
                      sqlalchemy.label("x", robot_pos.x - object_pos.x),
                      sqlalchemy.label("y", robot_pos.y - object_pos.y)). \
    join(filtered_code, filtered_code.c.id == TaskTreeNode.id). \
    join(ORMPickUpAction, ORMPickUpAction.id == filtered_code.c.designator). \
    join(RobotState, RobotState.id == ORMPickUpAction.robot_state). \
    join(robot_pos, RobotState.position == robot_pos.id). \
    join(ObjectDesignator, ObjectDesignator.id == ORMPickUpAction.object). \
    join(object_pos, ObjectDesignator.position == object_pos.id)

df = pd.read_sql(query.statement, session.get_bind())
print(df)

        status    type    arm grasp  relative torso height         x         y
0       FAILED  cereal   left  left               0.696237 -0.815003  1.236271
1       FAILED  cereal   left  left               0.566798 -0.598990  1.157637
2       FAILED  cereal   left  left               0.708161  0.245396 -0.985644
3       FAILED  cereal   left  left               0.437907 -1.580092 -1.766157
4       FAILED  cereal   left  left               0.425738  0.693170  0.791319
..         ...     ...    ...   ...                    ...       ...       ...
955     FAILED   spoon  right   top               0.431723  1.278617 -0.343853
956     FAILED   spoon  right   top               0.527409  1.762970 -1.759426
957     FAILED   spoon  right   top               0.584864 -1.316506 -0.108327
958     FAILED   spoon  right   top               0.720372  0.740378 -1.658181
959  SUCCEEDED   spoon  right   top               0.551467 -0.785473 -0.139471

[960 rows x 7 columns]


Next we will define a joint probability distributions over these points using, for instance, a Joint Probability Tree.

In [4]:
import jpt

model = jpt.trees.JPT(variables=jpt.infer_from_dataframe(df, scale_numeric_types=False, precision=0.05),
                      min_samples_leaf=5, min_impurity_improvement=0.01)
model.fit(df)

<JPT #innernodes = 140, #leaves = 141 (281 total)>

Finally, we will insert our model in the existing JPT-Costmap wrapper, create a toy simulation and visualize our result.

In [None]:
ProcessModule.execution_delay = True

world = BulletWorld("GUI")
robot = BulletWorldObject("pr2", "robot", "pr2.urdf")

cereal = BulletWorldObject("milk", "milk", "milk.stl", position=[0, 0, 0.75])

cml = JPTCostmapLocation(cereal, reachable_for=robot, model=model)

with simulated_robot:
    for sample in iter(cml):

        ParkArmsAction.Action(pycram.enums.Arms.BOTH).perform()
        NavigateAction.Action(sample.pose).perform()
        MoveTorsoAction.Action(sample.torso_height).perform()
        try:
            PickUpAction.Action(
                ObjectDesignatorDescription(types=["milk"]).resolve(),
                arm=sample.reachable_arm, grasp=sample.grasp).perform()
            time.sleep(5)
        except pycram.plan_failures.PlanFailure as p:
            continue
        break
world.exit()

[Object(world=<pycram.bullet_world.BulletWorld object at 0x7f1de6c3da30>, name=floor, type=environment, color=[1, 1, 1, 1], id=1, path=/home/tom_sch/catkin_ws/src/pycram/src/pycram/../../resources/cached/plane.urdf, joints={}, links={}, attachments={}, cids={}, original_pose=[[0, 0, 0], [0, 0, 0, 1]], base_origin_shift=[ 0.     0.    10.001]), Object(world=<pycram.bullet_world.BulletWorld object at 0x7f1de6c3da30>, name=pr2, type=robot, color=[1, 1, 1, 1], id=2, path=/home/tom_sch/catkin_ws/src/pycram/src/pycram/../../resources/cached/pr2.urdf, joints={'base_footprint_joint': 0, 'base_bellow_joint': 1, 'base_laser_joint': 2, 'fl_caster_rotation_joint': 3, 'fl_caster_l_wheel_joint': 4, 'fl_caster_r_wheel_joint': 5, 'fr_caster_rotation_joint': 6, 'fr_caster_l_wheel_joint': 7, 'fr_caster_r_wheel_joint': 8, 'bl_caster_rotation_joint': 9, 'bl_caster_l_wheel_joint': 10, 'bl_caster_r_wheel_joint': 11, 'br_caster_rotation_joint': 12, 'br_caster_l_wheel_joint': 13, 'br_caster_r_wheel_joint': 14

Congratulations, you made the PR2 learn how to grasp objects.
Yeehaaw!🤠