Skip to content

How to implement new monitors

Simon edited this page Jan 18, 2024 · 2 revisions

How to Implement a new Monitor

Before you begin, you have to either create your own python package and define it in the Giskard config, or you have to create a new file in giskardpy/src/giskardpy/monitors.

There are two major kinds of monitors:

  • Expression monitors
  • Payload monitors

Expression monitors are more difficult to implement, but are turned into C++ code and compiled. This makes the evaluation time insignificant compared to payload monitors.
However, this limits what expression monitors can do. They are simple functions, that take the state of Giskard as input and return True or False.

Payload monitors, on the other hand, are able to execute any python code. This means they can have side effects. Expensive monitors can also be executed in a separate thread, as to not block the control loop while the monitors is executed.

Implementation - Payload Monitor

A minimal implementation for a payload monitor looks like this:

from typing import Optional

import numpy as np

from giskardpy import casadi_wrapper as cas
from giskardpy.monitors.payload_monitors import PayloadMonitor


class MyPayloadMonitor(PayloadMonitor):
    def __init__(self, *,
                 message: str,
                 name: Optional[str] = None,
                 start_condition: cas.Expression = cas.TrueSymbol):
        super().__init__(run_call_in_thread=False, name=name, stay_true=False,
                         start_condition=start_condition)
        self.message = message

    def __call__(self):
        self.state = np.random.randint(2)
        print(f'{self.message} - {self.state}')

You create a new class, inherit from PayloadMonitor and implement __init__ and __call__. You can add any parameters to the __init__ you like, as long as they are of a primitive python type or a ROS message. In this example, I've added an additional message parameter.
The __call__ function is executed, once the start_condition of the monitor becomes True for the first time. If run_call_in_thread is True, then a separate thread is started for it. In this function you can add any code. To change the state of your Monitor, you update the self.state parameters as you wish.

Implementation - Expression Monitor

The general idea for expression monitors is the same, but the key differences are that you inherit from ExpressionMonitor and that you don't implement a __call__ function. Instead, you define a function using the CasADi library as a symbolic expression, which evaluates to either 0 or 1. Check the symbolic expression tutorial for more infos

from typing import Optional

from giskardpy import casadi_wrapper as cas
from giskardpy.monitors.monitors import ExpressionMonitor
from giskardpy.symbol_manager import symbol_manager


class MyExpressionMonitor(ExpressionMonitor):
    def __init__(self, *,
                 sleep_time: float,
                 name: Optional[str] = None,
                 start_condition: cas.Expression = cas.TrueSymbol):
        super().__init__(name=name, stay_true=False, start_condition=start_condition, plot=True)
        self.expression = cas.if_greater(symbol_manager.time, sleep_time)

Usage of Custom Monitors

Here is a minimal example for how to call your new payload monitor. Generally, all monitors can be called with giskard.monitors.add_monitor() and all other add_* functions are only wrappers for this one. This function one required parameter: monitor_class. It is the name of the class of your monitor. You could use either 'MyPayloadMonitor' or MyPayloadMonitor.__name__, but the latter might be preferable because it allows for easier refactoring with an IDE. All other parameter for add_monitor have to be provided as keyword arguments and are passed onto the __init__ of your custom monitor.

import rospy

from giskardpy.python_interface.python_interface import GiskardWrapper
from my_giskard_config.my_monitors import MyPayloadMonitor

joint_goal1 = {'torso_lift_joint': 0.3}

rospy.init_node('custom_monitor', anonymous=True)
giskard = GiskardWrapper()

my_monitor = giskard.monitors.add_monitor(monitor_class=MyPayloadMonitor.__name__,
                                          message='muh')
giskard.motion_goals.add_joint_position(goal_state=joint_goal1)
giskard.monitors.add_end_motion(start_condition=my_monitor)
giskard.execute()