Introspecting running state machines #893

Closed
sillkjc opened this issue May 3, 2024 · 4 comments

@sillkjc

sillkjc commented May 3, 2024

Hi all,
We're currently launching our RAFCON state machines via the API in a ROS2 launch file, see:
#883 (comment)

We'd like to introspect state machine progress from within a ROS2 test: which states have completed, what their output data was, and whether the state machine has reached a given state. That way we can block test scripts while waiting for the SM to complete, then assert on the outputs of the states.
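The blocking part of this can be sketched without any particular introspection API, as a generic polling helper. This is only a minimal sketch: `wait_until` is a hypothetical name, not part of RAFCON or launch_testing, and the predicate passed to it would have to be built on whatever state information turns out to be accessible.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool],
               timeout: float = 30.0,
               interval: float = 0.1) -> bool:
    """Poll `predicate` until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    # One last check so a condition met during the final sleep is not missed.
    return predicate()
```

In a test this would be used as `assert wait_until(lambda: sm_has_finished(), timeout=60)`, where `sm_has_finished` is a placeholder for the actual check.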

What would be the best way to do this? Presumably the SM has some sort of introspection API, since the GUI itself can see the states' status.
Many thanks!

@sillkjc
Author

sillkjc commented May 3, 2024

Digging through the RAFCON tests I have found this:
https://github.com/DLR-RM/RAFCON/blob/ea89dbe5f9621e7a2cf406410fd02a5f8ff71e4b/tests/core/test_run_to_selected_state.py#L29C18-L29C35
I think this partially answers the question, but there is still the issue of how to get access to the state machine object from within the test:

  sm = storage.load_state_machine_from_path(sm_path)

We might be able to pass it through as an argument on the test function alongside the context:

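import launch_testing.actions
import rafcon.core.state_machine
from launch import LaunchContext, LaunchDescription, LaunchService
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration

def launch_setup(context: LaunchContext, *args, **kwargs):
    ...
    return [
        launch_descriptions,
        nodes,
    ]

def generate_test_description():
    declared_arguments = [
        DeclareLaunchArgument(name='sim',
                              description='Execute in simulation mode using Gazebo.',
                              default_value='true'),
    ]
    # The second tuple element is the test context: its keys become
    # arguments of the test functions.
    return (LaunchDescription(
        declared_arguments +
        [OpaqueFunction(function=launch_setup)] +
        [launch_testing.actions.ReadyToTest()]),
        {
        'sim': LaunchConfiguration('sim'),
        # Placeholder: this passes the class itself, not a loaded instance.
        'sm': rafcon.core.state_machine.StateMachine,
    })

    # Method of the test class:
    def test_run_sm_task(self, launch_service: LaunchService, sim: LaunchConfiguration, sm: rafcon.core.state_machine.StateMachine):
        ...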

Not sure if this is making things much harder than they need to be.

Thanks

@JohannesErnst
Collaborator

Hey @sillkjc, thanks for posting your issue!

I think what you are looking for is the execution history. In the RAFCON GUI, this history is used during runtime (or afterwards) to display all the state outputs, progress and data exchange. For this, the global config variable IN_MEMORY_EXECUTION_HISTORY_ENABLE must be set to true. This should be the default setting, though.

Following up on the link to the test script test_run_to_selected_state.py you posted above, I created a simple example of how to access these history items during runtime (for demonstration I just used the debug console here and set a breakpoint):

[Screenshot of the debug console showing the execution history items; image not available]

I just selected one of the return items of the execution history (_history_items[3]) to show that you can grab the actual logic output and data output here.
As a disclaimer: I didn't look in detail at how best to grab the input/output values here; maybe they are actually more accessible somewhere else in this execution_histories object. Nonetheless, this should work for your requested use case.
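The access pattern described here could be wrapped in a small helper along the following lines. This is a hedged sketch, not RAFCON's documented API: `execution_histories` and `_history_items` are taken from this comment, while the item attributes `state_reference`, `outcome` and `child_state_input_output_data` are assumptions about the history item classes and are therefore read defensively with `getattr`. `summarize_history` is a hypothetical name, and the stand-in objects only exist so that the sketch runs without RAFCON installed.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def summarize_history(state_machine: Any) -> List[Dict[str, Any]]:
    """Flatten all history items of a state machine into plain dicts."""
    rows = []
    for history in getattr(state_machine, "execution_histories", []):
        # `_history_items` is the private list inspected in the comment above.
        for index, item in enumerate(getattr(history, "_history_items", [])):
            state = getattr(item, "state_reference", None)  # assumed attribute
            rows.append({
                "index": index,
                "item_type": type(item).__name__,
                "state_name": getattr(state, "name", None),
                "outcome": getattr(item, "outcome", None),  # assumed attribute
                # Assumed attribute holding the state's input/output data.
                "data": getattr(item, "child_state_input_output_data", None),
            })
    return rows


# Stand-in objects (not real RAFCON classes) to exercise the helper.
fake_item = SimpleNamespace(
    state_reference=SimpleNamespace(name="demo_state"),
    outcome="success",
    child_state_input_output_data={"result": 42},
)
fake_sm = SimpleNamespace(
    execution_histories=[SimpleNamespace(_history_items=[fake_item])]
)
rows = summarize_history(fake_sm)
```

Because `_history_items` is private, it may change between RAFCON versions; missing attributes simply show up as `None` in the result rather than raising.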

Let me know if this is of help to you!

@sillkjc
Author

sillkjc commented May 6, 2024

Thanks Johannes, that is what I need!

I am still a bit stumped on how to gain access to the singleton object from the context of the ROS test. Since I am using multiprocessing to kick off the state machine in the launch (so the launch does not get blocked), the ROS test and the state machine do not share memory space (and hence the singletons are empty). I am exploring whether this can be done with threads instead of processes, or by passing the state_machine object into the test as mentioned above.

Update:
I've made a bit of progress by using threading.Thread instead of multiprocessing.Process. This exposes the singleton objects in the memory of the launch context's thread. By making the thread a global (so it doesn't die once the launch function ends), I am able to get access to everything. It's very ugly, but it works for now.

@JohannesErnst
Collaborator

That's great to hear!

As mentioned in the other issue, I am currently not experienced with the threading/multiprocessing approach for RAFCON. But it's good to know that at least this setup works for now.

I will close the issue, as the multiprocessing question is a separate problem. But of course you are very welcome to still post updates here, or to re-open it if there are still problems with the execution history.
