# Creating a custom Location Costmap

In this tutorial we will walk through the creation of a costmap through probabilistic machine learning.

After this tutorial you will know:
- Why probabilities are a useful thing in robot planning
- How to craft probabilistic models for your plan

Lets start by installing the important package.

In [1]:
!pip install --upgrade probabilistic_model



Next, we need to gather a lot of data. For that we will write a randomized experiment for grasping a couple of objects.
In the experiment the robot will try to grasp a randomized object using random poses and torso heights.

In [2]:
from pycram.designators.actions.actions import ParkArmsActionPerformable, MoveTorsoActionPerformable
from tf import transformations
import itertools
from typing import Optional, List, Tuple
import numpy as np
import sqlalchemy.orm
import tqdm
import pycram.orm.base
import pycram.task
from pycram.bullet_world import BulletWorld, Object as BulletWorldObject
from pycram.designators.action_designator import PickUpAction, NavigateAction
from pycram.designators.object_designator import ObjectDesignatorDescription
import pycram.enums
from pycram.plan_failures import PlanFailure
from pycram.process_module import ProcessModule
from pycram.process_module import simulated_robot
import pycram.orm

import sqlalchemy.sql
from pycram.pose import Pose
from pycram.ros.viz_marker_publisher import VizMarkerPublisher
        

np.random.seed(420)

ProcessModule.execution_delay = False
pycram.orm.base.ProcessMetaData().description = "Tutorial for learning from experience in a Grasping action."


class GraspingExplorer:
    """Class to try randomized grasping plans."""

    world: Optional[BulletWorld]

    def __init__(self, robots: Optional[List[Tuple[str, str]]] = None, objects: Optional[List[Tuple[str, str]]] = None,
                 arms: Optional[List[str]] = None, grasps: Optional[List[str]] = None,
                 samples_per_scenario: int = 1000):
        """
        Create a GraspingExplorer.
        :param robots: The robots to use
        :param objects: The objects to try to grasp
        :param arms: The arms of the robot to use
        :param grasps: The grasp orientations to use
        :param samples_per_scenario: The number of tries per scenario.
        """
        # store exploration space
        if not robots:
            self.robots: List[Tuple[str, str]] = [("pr2", "pr2.urdf")]

        if not objects:
            self.objects: List[Tuple[str, pycram.enums.ObjectType, str]] = [("cereal", pycram.enums.ObjectType.BREAKFAST_CEREAL, "breakfast_cereal.stl"), 
                                                                            ("bowl", pycram.enums.ObjectType.BOWL, "bowl.stl"),
                                                                            ("milk", pycram.enums.ObjectType.MILK, "milk.stl"),
                                                                            ("spoon", pycram.enums.ObjectType.SPOON, "spoon.stl")]

        if not arms:
            self.arms: List[str] = ["left", "right"]

        if not grasps:
            self.grasps: List[str] = ["left", "right", "front", "top"]

        # store trials per scenario
        self.samples_per_scenario: int = samples_per_scenario

        # chain hyperparameters
        self.hyper_parameters = [self.robots, self.objects, self.arms, self.grasps]

        self.total_tries = 0
        self.total_failures = 0

    def perform(self, session: sqlalchemy.orm.Session):
        """
        Perform all experiments.
        :param session: The database-session to insert the samples in.
        """

        # create progress bar
        progress_bar = tqdm.tqdm(
            total=np.prod([len(p) for p in self.hyper_parameters]) * self.samples_per_scenario)

        self.world = BulletWorld("DIRECT")
        
        v = VizMarkerPublisher()


        # for every robot
        for robot, robot_urdf in self.robots:

            # spawn it
            robot = BulletWorldObject(robot, pycram.enums.ObjectType.ROBOT, robot_urdf)

            # for every obj
            for obj, obj_type, obj_stl in self.objects:

                # spawn it
                bw_object = BulletWorldObject(obj, obj_type, obj_stl, Pose([0, 0, 0.75], [0, 0, 0, 1]))

                # create object designator
                object_designator = ObjectDesignatorDescription(names=[obj])

                # for every arm and grasp pose
                for arm, grasp in itertools.product(self.arms, self.grasps):
                    # sample positions in 2D
                    positions = np.random.uniform([-2, -2], [2, 2], (self.samples_per_scenario, 2))

                    # for every position
                    for position in positions:

                        # set z axis to 0
                        position = [*position, 0]

                        # calculate orientation for robot to face the object
                        angle = np.arctan2(position[1], position[0]) + np.pi
                        orientation = list(transformations.quaternion_from_euler(0, 0, angle, axes="sxyz"))

                        # try to execute a grasping plan
                        with simulated_robot:

                            ParkArmsActionPerformable(pycram.enums.Arms.BOTH).perform()
                            # navigate to sampled position
                            NavigateAction([Pose(position, orientation)]).resolve().perform()

                            # move torso
                            height = np.random.uniform(0., 0.33, 1)[0]
                            MoveTorsoActionPerformable(height).perform()

                            # try to pick it up
                            try:
                                PickUpAction(object_designator, [arm], [grasp]).resolve().perform()

                            # if it fails
                            except PlanFailure:

                                # update failure stats
                                self.total_failures += 1

                            # reset BulletWorld
                            self.world.reset_bullet_world()

                            # update progress bar
                            self.total_tries += 1

                            # insert into database
                            pycram.task.task_tree.insert(session, use_progress_bar=False)
                            pycram.task.reset_tree()

                            progress_bar.update()
                            progress_bar.set_postfix(success_rate=(self.total_tries - self.total_failures) /
                                                                  self.total_tries)

                bw_object.remove()
            robot.remove()

        self.world.exit()

Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link']
[WARN] [1709821116.366007]: Could not import RoboKudo messages, RoboKudo interface could not be initialized


Next we have to establish a connection to a database and execute the experiment a couple of times. Note that the (few) number of samples we generate is only for demonstrations.
For robust and reliable machine learning millions of samples are required.

In [3]:
engine = sqlalchemy.create_engine("sqlite+pysqlite:///:memory:")
session = sqlalchemy.orm.Session(bind=engine)
pycram.orm.base.Base.metadata.create_all(bind=engine)
session.commit()

explorer = GraspingExplorer(samples_per_scenario=30)

Write the database to a file if wanted.

In [4]:
import os
explorer.perform(session)
con = engine.raw_connection()

with open(os.path.join(os.path. expanduser('~'), "Documents", "costmap.dump"), 'w') as f:
    for line in con.iterdump():
        f.write('%s\n' % line)

  0%|          | 0/960 [00:00<?, ?it/s]Unknown tag "material" in /robot[@name='plane']/link[@name='planeLink']/collision[1]
Unknown tag "contact" in /robot[@name='plane']/link[@name='planeLink']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link']
Unknown tag "material" in /robot[@name='plane']/link[@name='planeLink']/collision[1]
Unknown tag "contact" in /robot[@name='plane']/link[@name='planeLink']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name=

Read the database from a file instead of executing all the experiments.

In [5]:
import os

engine = sqlalchemy.create_engine("sqlite+pysqlite:///:memory:")
session = sqlalchemy.orm.Session(bind=engine)
# pycram.orm.base.Base.metadata.create_all(bind=engine)
session.commit()

with engine.connect() as con:
    with open(os.path.join(os.path. expanduser('~'), "Documents", "costmap.dump")) as file:
        raw_connection = engine.raw_connection()
        # raw_cursor = raw_connection()
        raw_connection.executescript(file.read())

Next we will define a joint probability distributions over the parameters used in the pick-up actions, for instance, a Joint Probability Tree.

In [6]:
from pycram.resolver.location.jpt_location import JPTCostmapLocation
model = JPTCostmapLocation.fit_from_database(session, success_only=True)

100%|██████████| 960/960 [01:38<00:00,  9.75it/s, success_rate=0.0854]


Finally, we will insert our model in the existing JPT-Costmap wrapper, create a toy simulation and visualize our result.

In [8]:
import random
from pycram.designators.actions.actions import NavigateActionPerformable, PickUpActionPerformable
from pycram.ros.viz_marker_publisher import VizMarkerPublisher
import time

ProcessModule.execution_delay = True

world = BulletWorld("DIRECT")
VizMarkerPublisher(interval=0.1)
robot = BulletWorldObject("pr2", pycram.enums.ObjectType.ROBOT, "pr2.urdf")

milk = BulletWorldObject("milk", pycram.enums.ObjectType.MILK, "milk.stl", pose=Pose([1, 0, 0.75], [0,0,0,1]))
random.seed(420)
np.random.seed(420)
cml = JPTCostmapLocation(milk, reachable_for=robot, model=model)

# cml.visualize()
with simulated_robot:
    for sample in iter(cml):
        ParkArmsActionPerformable(pycram.enums.Arms.BOTH).perform()
        NavigateActionPerformable(sample.pose).perform()
        MoveTorsoActionPerformable(sample.torso_height).perform()
        time.sleep(1)
        try:
            PickUpActionPerformable(ObjectDesignatorDescription(types=[milk.type]).resolve(), arm=sample.reachable_arm, grasp=sample.grasp).perform()
            break
        except pycram.plan_failures.PlanFailure as p:
            continue
time.sleep(5)
world.exit()

Unknown tag "material" in /robot[@name='plane']/link[@name='planeLink']/collision[1]
Unknown tag "contact" in /robot[@name='plane']/link[@name='planeLink']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link']
Unknown tag "material" in /robot[@name='plane']/link[@name='planeLink']/collision[1]
Unknown tag "contact" in /robot[@name='plane']/link[@name='planeLink']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link']


Congratulations, you made the PR2 learn how to grasp objects.
Yeehaaw!🤠

If you want to learn more about the usage and integration of probabilities into the plan language, check out this [package](https://probabilistic-model.readthedocs.io/en/latest/index.html).