# Creating a custom Location Costmap

In this tutorial we will walk through the creation of a costmap through machine learning.
First we need to gather a lot of data. For that we will write a randomized experiment for grasping a couple of objects.
In the experiment the robot will try to grasp a randomized object using random poses and torso heights.

In [1]:
!pip install --upgrade pyjpt

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [2]:
import itertools
import time
from typing import Optional, List, Tuple

import numpy as np

import sqlalchemy.orm
import tf
import tqdm

import pycram.orm.base
import pycram.task
from pycram.bullet_world import BulletWorld, Object as BulletWorldObject
from pycram.designators.action_designator import MoveTorsoAction, PickUpAction, NavigateAction, ParkArmsAction
from pycram.designators.object_designator import ObjectDesignatorDescription
import pycram.enums
from pycram.plan_failures import PlanFailure
from pycram.process_module import ProcessModule

from pycram.process_module import simulated_robot
import sqlalchemy.orm
from pycram.resolver.location.jpt_location import JPTCostmapLocation
import pycram.orm
from pycram.orm.base import Position, RobotState
from pycram.orm.task import TaskTreeNode, Code
from pycram.orm.action_designator import PickUpAction as ORMPickUpAction
from pycram.orm.object_designator import ObjectDesignator
import sqlalchemy.sql
import pandas as pd

from pycram.pose import Pose

np.random.seed(420)

ProcessModule.execution_delay = False
pycram.orm.base.MetaData().description = "Tutorial for learning from experience in a Grasping action."


class GraspingExplorer:
    """Class to try randomized grasping plans."""

    world: Optional[BulletWorld]

    def __init__(self, robots: Optional[List[Tuple[str, str]]] = None, objects: Optional[List[Tuple[str, str]]] = None,
                 arms: Optional[List[str]] = None, grasps: Optional[List[str]] = None,
                 samples_per_scenario: int = 1000):
        """
        Create a GraspingExplorer.
        :param robots: The robots to use
        :param objects: The objects to try to grasp
        :param arms: The arms of the robot to use
        :param grasps: The grasp orientations to use
        :param samples_per_scenario: The number of tries per scenario.
        """
        # store exploration space
        if not robots:
            self.robots: List[Tuple[str, str]] = [("pr2", "pr2.urdf")]

        if not objects:
            self.objects: List[Tuple[str, str]] = [("cereal", pycram.enums.ObjectType.BREAKFAST_CEREAL, "breakfast_cereal.stl"), 
                                                   ("bowl", pycram.enums.ObjectType.BOWL, "bowl.stl"),
                                                   ("milk", pycram.enums.ObjectType.MILK, "milk.stl"),
                                                   ("spoon", pycram.enums.ObjectType.SPOON, "spoon.stl")]

        if not arms:
            self.arms: List[str] = ["left", "right"]

        if not grasps:
            self.grasps: List[str] = ["left", "right", "front", "top"]

        # store trials per scenario
        self.samples_per_scenario: int = samples_per_scenario

        # chain hyperparameters
        self.hyper_parameters = [self.robots, self.objects, self.arms, self.grasps]

        self.total_tries = 0
        self.total_failures = 0

    def perform(self, session: sqlalchemy.orm.Session):
        """
        Perform all experiments.
        :param session: The database-session to insert the samples in.
        """

        # create progress bar
        progress_bar = tqdm.tqdm(
            total=np.prod([len(p) for p in self.hyper_parameters]) * self.samples_per_scenario)
        
        self.world = BulletWorld("DIRECT")

        # for every robot
        for robot, robot_urdf in self.robots:

            # spawn it
            robot = BulletWorldObject(robot, pycram.enums.ObjectType.ROBOT, robot_urdf)

            # for every obj
            for obj, obj_type, obj_stl in self.objects:

                # spawn it
                bw_object = BulletWorldObject(obj, obj_type, obj_stl, Pose([0, 0, 0.75], [0, 0, 0, 1]))

                # create object designator
                object_designator = ObjectDesignatorDescription(names=[obj])

                # for every arm and grasp pose
                for arm, grasp in itertools.product(self.arms, self.grasps):
                    # sample positions in 2D
                    positions = np.random.uniform([-2, -2], [2, 2], (self.samples_per_scenario, 2))

                    # for every position
                    for position in positions:

                        # set z axis to 0
                        position = [*position, 0]

                        # calculate orientation for robot to face the object
                        angle = np.arctan2(position[1], position[0]) + np.pi
                        orientation = list(tf.transformations.quaternion_from_euler(0, 0, angle, axes="sxyz"))

                        # try to execute a grasping plan
                        with simulated_robot:

                            ParkArmsAction.Action(pycram.enums.Arms.BOTH).perform()
                            # navigate to sampled position
                            NavigateAction([Pose(position, orientation)]).resolve().perform()

                            # move torso
                            height = np.random.uniform(0., 0.33, 1)[0]
                            MoveTorsoAction.Action(height).perform()

                            # try to pick it up
                            try:
                                PickUpAction(object_designator, [arm], [grasp]).resolve().perform()

                            # if it fails
                            except PlanFailure:

                                # update failure stats
                                self.total_failures += 1

                            # reset BulletWorld
                            self.world.reset_bullet_world()

                            # update progress bar
                            self.total_tries += 1

                            # insert into database
                            pycram.task.task_tree.insert(session, use_progress_bar=False)
                            pycram.task.reset_tree()

                            progress_bar.update()
                            progress_bar.set_postfix(success_rate=(self.total_tries - self.total_failures) /
                                                                  self.total_tries)

                bw_object.remove()
            robot.remove()

        self.world.exit()

Next we have to establish a connection to a database and execute the experiment a couple of times. Note that the (few) number of samples we generate is only for demonstrations.
For robust and reliable machine learning millions of samples are required.

In [3]:
engine = sqlalchemy.create_engine("sqlite+pysqlite:///:memory:")
session = sqlalchemy.orm.Session(bind=engine)
pycram.orm.base.Base.metadata.create_all(bind=engine)
session.commit()

explorer = GraspingExplorer(samples_per_scenario=30)
explorer.perform(session)


  0%|          | 0/960 [00:00<?, ?it/s]Unknown tag "material" in /robot[@name='plane']/link[@name='planeLink']/collision[1]
Unknown tag "contact" in /robot[@name='plane']/link[@name='planeLink']
Unknown tag "material" in /robot[@name='plane']/link[@name='planeLink']/collision[1]
Unknown tag "contact" in /robot[@name='plane']/link[@name='planeLink']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name=

Write the database to a file if wanted.

In [4]:
import os
con = engine.raw_connection()

with open(os.path.join(os.path. expanduser('~'), "Documents", "costmap.dump"), 'w') as f:
    for line in con.iterdump():
        f.write('%s\n' % line)

Read the database from a file instead of executing all the experiments.

In [5]:
import os

engine = sqlalchemy.create_engine("sqlite+pysqlite:///:memory:")
session = sqlalchemy.orm.Session(bind=engine)
# pycram.orm.base.Base.metadata.create_all(bind=engine)
session.commit()

with engine.connect() as con:
    with open(os.path.join(os.path. expanduser('~'), "Documents", "costmap.dump")) as file:
        raw_connection = engine.raw_connection()
        # raw_cursor = raw_connection()
        raw_connection.executescript(file.read())

Next we will create a dataframe from the just created database by joining the relevant information together using the pycram.orm package.
We will get the following columns:
    - status of the grasping task
    - type of the grasped object
    - arm used to grasp
    - grasp pose
    - torso height relative to the object
    - x and y coordinate of the robot relative to the object.

Keep in mind that filtering before joining is always advisable, hence the subquery.

In [6]:
robot_pos = sqlalchemy.orm.aliased(Position)
object_pos = sqlalchemy.orm.aliased(Position)

filtered_code = session.query(Code.id, Code.designator).filter(Code.designator != None).subquery("filtered_code")

query = session.query(TaskTreeNode.status, ObjectDesignator.type, ORMPickUpAction.arm, ORMPickUpAction.grasp,
                      sqlalchemy.label("relative torso height", object_pos.z - RobotState.torso_height),
                      sqlalchemy.label("x", robot_pos.x - object_pos.x),
                      sqlalchemy.label("y", robot_pos.y - object_pos.y)). \
    join(filtered_code, filtered_code.c.id == TaskTreeNode.id). \
    join(ORMPickUpAction, ORMPickUpAction.id == filtered_code.c.designator). \
    join(RobotState, RobotState.id == ORMPickUpAction.robot_state). \
    join(robot_pos, RobotState.position == robot_pos.id). \
    join(ObjectDesignator, ObjectDesignator.id == ORMPickUpAction.object). \
    join(object_pos, ObjectDesignator.position == object_pos.id)

df = pd.read_sql(query.statement, session.get_bind())
df["status"] = df["status"].apply(lambda x: str(x.name))
print(df)

        status                         type    arm grasp  \
0       FAILED  ObjectType.BREAKFAST_CEREAL   left  left   
1       FAILED  ObjectType.BREAKFAST_CEREAL   left  left   
2       FAILED  ObjectType.BREAKFAST_CEREAL   left  left   
3       FAILED  ObjectType.BREAKFAST_CEREAL   left  left   
4    SUCCEEDED  ObjectType.BREAKFAST_CEREAL   left  left   
..         ...                          ...    ...   ...   
955     FAILED             ObjectType.SPOON  right   top   
956     FAILED             ObjectType.SPOON  right   top   
957     FAILED             ObjectType.SPOON  right   top   
958     FAILED             ObjectType.SPOON  right   top   
959     FAILED             ObjectType.SPOON  right   top   

     relative torso height         x         y  
0                 0.480005 -0.737416 -0.187877  
1                 0.668903 -0.932071 -1.564287  
2                 0.724878  1.472666  0.518914  
3                 0.553403 -0.589925 -1.729850  
4                 0.690212  0.5054

Next we will define a joint probability distributions over these points using, for instance, a Joint Probability Tree.

In [7]:
import jpt

model = jpt.trees.JPT(variables=jpt.infer_from_dataframe(df, scale_numeric_types=False, precision=0.05),
                      min_samples_leaf=10, min_impurity_improvement=0.01)
model.fit(df)

<JPT #innernodes = 71, #leaves = 72 (143 total)>

Finally, we will insert our model in the existing JPT-Costmap wrapper, create a toy simulation and visualize our result.

In [8]:
ProcessModule.execution_delay = True

world = BulletWorld("GUI")
robot = BulletWorldObject("pr2", pycram.enums.ObjectType.ROBOT, "pr2.urdf")

cereal = BulletWorldObject("milk", pycram.enums.ObjectType.MILK, "milk.stl", pose=Pose([1, 0, 0.75], [0,0,0,1]))
np.random.seed(420)
cml = JPTCostmapLocation(cereal, reachable_for=robot, model=model)
cml.visualize()
with simulated_robot:
    for sample in iter(cml):
        ParkArmsAction.Action(pycram.enums.Arms.BOTH).perform()
        NavigateAction.Action(sample.pose).perform()
        MoveTorsoAction.Action(sample.torso_height).perform()
        try:
            PickUpAction.Action(
                ObjectDesignatorDescription(types=[pycram.enums.ObjectType.MILK]).resolve(),
                arm=sample.reachable_arm, grasp=sample.grasp).perform()
            # time.sleep(5)
        except pycram.plan_failures.PlanFailure as p:
            continue
        break
world.exit()

Unknown tag "material" in /robot[@name='plane']/link[@name='planeLink']/collision[1]
Unknown tag "contact" in /robot[@name='plane']/link[@name='planeLink']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link']
Unknown tag "material" in /robot[@name='plane']/link[@name='planeLink']/collision[1]
Unknown tag "contact" in /robot[@name='plane']/link[@name='planeLink']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link']


Congratulations, you made the PR2 learn how to grasp objects.
Yeehaaw!🤠