# Hands on Object Relational Mapping in PyCram

This tutorial will walk through the serialization of a minimal plan in pycram.
First we will import sqlalchemy, create an in memory database and connect a session to it.

In [1]:
import sqlalchemy
import sqlalchemy.orm

engine = sqlalchemy.create_engine("sqlite+pysqlite:///:memory:", echo=False)
session = sqlalchemy.orm.Session(bind=engine)
session

<sqlalchemy.orm.session.Session at 0x7f9dd4933a00>

Next we create the database schema using the sqlalchemy functionality. For that we need to import the base class of pycram.orm.

In [2]:
import pycram.orm.base
import pycram.orm.task
import pycram.orm.object_designator
import pycram.orm.motion_designator
import pycram.orm.action_designator
pycram.orm.base.Base.metadata.create_all(engine)
session.commit()

Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']
Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link']


Next we will write a simple pick and place plan. We will construct a TaskTree around it such that we can serialize it later. As usual, we first create a world and then define the plan. After that we get and print the task tree.

In [3]:
from pycram.designators.action_designator import *
from pycram.designators.location_designator import *
from pycram.process_module import simulated_robot
from pycram.enums import Arms
from pycram.task import with_tree
import pycram.task
from pycram.bullet_world import BulletWorld, Object
from pycram.robot_descriptions.robot_description_handler import InitializedRobotDescription as robot_description
from pycram.designators.object_designator import *
import anytree

world = BulletWorld()
pr2 = Object("pr2", "robot", "pr2.urdf")
kitchen = Object("kitchen", "environment", "kitchen.urdf")
milk = Object("milk", "milk", "milk.stl", position=[1.3, 1, 0.9])
cereal = Object("cereal", "cereal", "breakfast_cereal.stl", position=[1.3, 0.7, 0.95])
milk_desig = ObjectDesignatorDescription(names=["milk"])
cereal_desig = ObjectDesignatorDescription(names=["cereal"])
robot_desig = ObjectDesignatorDescription(names=["pr2"]).resolve()
kitchen_desig = ObjectDesignatorDescription(names=["kitchen"])

@with_tree
def plan():
    with simulated_robot:
        ParkArmsAction.Action(Arms.BOTH).perform()
        MoveTorsoAction([0.3]).resolve().perform()
        pickup_pose = CostmapLocation(target=cereal_desig.resolve(), reachable_for=robot_desig).resolve()
        pickup_arm = pickup_pose.reachable_arms[0]
        NavigateAction(target_locations=[pickup_pose.pose]).resolve().perform()
        PickUpAction(object_designator_description=cereal_desig, arms=[pickup_arm], grasps=["front"]).resolve().perform()
        ParkArmsAction([Arms.BOTH]).resolve().perform()

        place_island = SemanticCostmapLocation("kitchen_island_surface", kitchen_desig.resolve(),
                                           cereal_desig.resolve()).resolve()

        place_stand = CostmapLocation(place_island.pose, reachable_for=robot_desig, reachable_arm=pickup_arm).resolve()

        NavigateAction(target_locations=[place_stand.pose]).resolve().perform()

        PlaceAction(cereal_desig, target_locations=[place_island.pose], arms=[pickup_arm]).resolve().perform()

        ParkArmsAction.Action(Arms.BOTH).perform()

plan()
world.exit()

task_tree = pycram.task.task_tree
print(anytree.RenderTree(task_tree))



no_operation()
└── plan()
    ├── perform(self = ParkArmsAction.Action(arm=<Arms.BOTH: 3>))
    ├── perform(self = MoveTorsoAction.Action(position=0.3))
    ├── perform(self = NavigateAction.Action(target_location=[[0.7199999690055847, 0.6800000071525574, 0.0], [0.0, 0.0, 0.017233690238890705, 0.9998514889326064]]))
    ├── perform(self = PickUpAction.Action(object_designator=ObjectDesignatorDescription.Object(name='cereal', type='cereal', pose=((1.3, 0.7, 0.95), (0.0, 0.0, 0.0, 1.0)), bullet_world_object=<pycram.bullet_world.Object object at 0x7f9d88ef1790>), arm='left', grasp='front'))
    ├── perform(self = ParkArmsAction.Action(arm=<Arms.BOTH: 3>))
    ├── perform(self = NavigateAction.Action(target_location=[[-1.9075000286102295, 0.7792000770568848, 0.0], [0.0, 0.0, 0.1643989830107149, 0.9863939245479174]]))
    ├── perform(self = PlaceAction.Action(object_designator=ObjectDesignatorDescription.Object(name='cereal', type='cereal', pose=((-1.739311695098877, 1.3202829360961914, 1.2

Next we serialize the task tree by just recursively inserting from its root.

In [4]:
task_tree.root.insert(session)

pycram.orm.task.TaskTreeNode(1, 1, 2023-04-28 09:50:27.528656, None, RUNNING, None)

Lastly we can look at various table to see how the structures got logged.
For example, we can get all the navigate actions that occurred.

In [7]:
navigations = session.query(pycram.orm.action_designator.NavigateAction).all()
print(*navigations, sep="\n")


pycram.orm.action_designator.NavigateAction(3, Navigate, 1, 1)
pycram.orm.action_designator.NavigateAction(6, Navigate, 3, 3)


Since inheritance is correctly mapped in the ORM package we can also get all actions that were executed with the correct classes in just one line.

In [6]:
actions = session.query(pycram.orm.action_designator.Action).all()
print(*actions, sep="\n")

pycram.orm.action_designator.ParkArmsAction(1, ParkArms, BOTH)
pycram.orm.action_designator.MoveTorsoAction(2, MoveTorso, 0.3)
pycram.orm.action_designator.NavigateAction(3, Navigate, 1, 1)
pycram.orm.action_designator.PickUpAction(4, PickUp, left, front, 1)
pycram.orm.action_designator.ParkArmsAction(5, ParkArms, BOTH)
pycram.orm.action_designator.NavigateAction(6, Navigate, 3, 3)
pycram.orm.action_designator.PlaceAction(7, Place, left, 5, 5, 2)
pycram.orm.action_designator.ParkArmsAction(8, ParkArms, BOTH)
pycram.orm.action_designator.ParkArmsAction(9, ParkArms, BOTH)


Of course all relational algebra operators, such as filtering and joining also work in pycram.orm queries. For example if we want all actions that have an object designator assigned in them, we can execute:

In [8]:
object_actions = session.query(pycram.orm.action_designator.PickUpAction,
                               pycram.orm.object_designator.ObjectDesignator).join(pycram.orm.object_designator.ObjectDesignator).all() + \
                 session.query(pycram.orm.action_designator.PlaceAction,
                               pycram.orm.object_designator.ObjectDesignator).join(pycram.orm.object_designator.ObjectDesignator).all()
print(*object_actions, sep="\n")

(pycram.orm.action_designator.PickUpAction(4, PickUp, left, front, 1), pycram.orm.object_designator.ObjectDesignator(1, Object, cereal, cereal, 2, 2))
(pycram.orm.action_designator.PlaceAction(7, Place, left, 5, 5, 2), pycram.orm.object_designator.ObjectDesignator(2, Object, cereal, cereal, 4, 4))


If we want to filter for all successful tasks we can just add the filter operator.

In [9]:
successful_tasks = session.query(pycram.orm.task.TaskTreeNode).filter(pycram.orm.task.TaskTreeNode.status == "SUCCEEDED")
print(*successful_tasks, sep="\n")

pycram.orm.task.TaskTreeNode(2, 2, 2023-04-28 09:50:36.756899, 2023-04-28 09:50:37.311820, SUCCEEDED, 1)
pycram.orm.task.TaskTreeNode(3, 3, 2023-04-28 09:50:36.756942, 2023-04-28 09:50:36.780745, SUCCEEDED, 2)
pycram.orm.task.TaskTreeNode(4, 4, 2023-04-28 09:50:36.780827, 2023-04-28 09:50:36.780937, SUCCEEDED, 2)
pycram.orm.task.TaskTreeNode(5, 5, 2023-04-28 09:50:37.078064, 2023-04-28 09:50:37.079345, SUCCEEDED, 2)
pycram.orm.task.TaskTreeNode(6, 6, 2023-04-28 09:50:37.079393, 2023-04-28 09:50:37.084248, SUCCEEDED, 2)
pycram.orm.task.TaskTreeNode(7, 7, 2023-04-28 09:50:37.084292, 2023-04-28 09:50:37.084869, SUCCEEDED, 2)
pycram.orm.task.TaskTreeNode(8, 8, 2023-04-28 09:50:37.305035, 2023-04-28 09:50:37.306429, SUCCEEDED, 2)
pycram.orm.task.TaskTreeNode(9, 9, 2023-04-28 09:50:37.306498, 2023-04-28 09:50:37.311035, SUCCEEDED, 2)
pycram.orm.task.TaskTreeNode(10, 10, 2023-04-28 09:50:37.311078, 2023-04-28 09:50:37.311482, SUCCEEDED, 2)
pycram.orm.task.TaskTreeNode(11, 11, 2023-04-28 09:50

As expected all but the root node succeeded, since the root node is still running.

Writing an extension to the ORM package is also done with ease. We need to create a class that can be inserted and its ORM equivalent, write a to_sql() and insert() method and then insert it somewhere.

In [10]:
import pycram.designators.action_designator


# define ORM class from pattern in every pycram.orm class
class ORMSaying(pycram.orm.action_designator.Action):
    __tablename__ = "Saying"
    id = sqlalchemy.Column(sqlalchemy.types.Integer, sqlalchemy.ForeignKey("Action.id"), primary_key=True)
    text = sqlalchemy.Column(sqlalchemy.types.String(255))

    __mapper_args__ = {
        "polymorphic_identity": __tablename__,
        "polymorphic_on": "dtype",
    }

    def __init__(self, text: str):
        super().__init__()
        self.text = text


# define brand new action designator
class Saying(pycram.designators.action_designator.ActionDesignatorDescription):

    def __init__(self, text):
        super().__init__()
        self.text = text

    def to_sql(self):
        return ORMSaying(self.text)

    def insert(self, session):
        action = self.to_sql()
        session.add(action)
        session.commit()
        return action


So we now got our new ActionDesignator called Saying and its ORM version. Since this class got created after all other classes got inserted into the database (in the beginning of the notebook) we have to insert it manually.

In [11]:
ORMSaying.metadata.create_all(bind=engine)

Now we can create and insert a Saying action.

In [12]:
# create a saying action and insert it
foo = Saying("Patchie, Patchie; Where is my Patchie?")
orm_foo = foo.insert(session)
orm_foo

__main__.ORMSaying(10, Saying, Patchie, Patchie; Where is my Patchie?)

It is notable that committing the object to the session fills its primary key. Hence, there is no worries about assigning unique IDs manually.
Finally, we can double-check that our object exists in the database.

In [13]:
session.query(ORMSaying).all()

[__main__.ORMSaying(10, Saying, Patchie, Patchie; Where is my Patchie?)]