Skip to content

Commit

Permalink
feat(MultiThreadedExecutor): Added ability to handle exceptions from …
Browse files Browse the repository at this point in the history
…threads

This commit adds external exception handling for the worker threads,
allowing application code to implement custom exception handling.

Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Apr 2, 2024
1 parent 7495ea6 commit bcc1169
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 20 deletions.
14 changes: 13 additions & 1 deletion rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,26 @@ class MultiThreadedExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::Executor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(std::function<void(const std::exception & e)> exception_handler);

RCLCPP_PUBLIC
size_t
get_number_of_threads();

protected:
RCLCPP_PUBLIC
void
run(size_t this_thread_number);
run(size_t this_thread_number, std::function<void(const std::exception & e)> exception_handler);

private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
Expand Down
57 changes: 38 additions & 19 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}

void
MultiThreadedExecutor::spin()
{
spin([](const std::exception & e) {throw e;});
}

void

MultiThreadedExecutor::spin(std::function<void(const std::exception & e)> exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
Expand All @@ -61,12 +68,12 @@ MultiThreadedExecutor::spin()
{
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id, exception_handler);
threads.emplace_back(func);
}
}

run(thread_id);
run(thread_id, exception_handler);
for (auto & thread : threads) {
thread.join();
}
Expand All @@ -79,28 +86,40 @@ MultiThreadedExecutor::get_number_of_threads()
}

void
MultiThreadedExecutor::run(size_t this_thread_number)
MultiThreadedExecutor::run(
size_t this_thread_number,
std::function<void(const std::exception & e)> exception_handler)
{
(void)this_thread_number;
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{
std::lock_guard wait_lock{wait_mutex_};
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
try {

(void)this_thread_number;
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{
std::lock_guard wait_lock{wait_mutex_};
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
}
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
if (yield_before_execute_) {
std::this_thread::yield();
}
}
if (yield_before_execute_) {
std::this_thread::yield();

execute_any_executable(any_exec);

// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();
}

execute_any_executable(any_exec);
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"Exception while spinning : " << e.what());

// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();
exception_handler(e);
}
}
44 changes: 44 additions & 0 deletions rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,48 @@ TEST_F(TestMultiThreadedExecutor, timer_over_take) {
auto timer = node->create_wall_timer(PERIOD_MS, timer_callback, cbg);
executor.add_node(node);
executor.spin();

}

TEST_F(TestMultiThreadedExecutor, catch_exception) {

rclcpp::executors::MultiThreadedExecutor executor;

std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("test_multi_threaded_executor_catch_exception");

const std::string test_reason = "test exception";

std::atomic_bool timer_executed_after_exception = false;

auto timer = node->create_wall_timer(
std::chrono::milliseconds(1), [test_reason, &timer_executed_after_exception, &executor]()
{
static size_t cnt = 0;
if (cnt == 0) {
cnt++;
throw std::runtime_error(test_reason);
}

timer_executed_after_exception = true;
executor.cancel();
});

std::atomic_bool caught_exception = false;

executor.add_node(node);
executor.spin(
[&caught_exception, &test_reason](const std::exception & e)
{
const std::runtime_error * runtime_error = dynamic_cast<const std::runtime_error *>(&e);
ASSERT_NE(runtime_error, nullptr);

ASSERT_EQ(runtime_error->what(), test_reason);

caught_exception = true;
}
);

ASSERT_TRUE(caught_exception);
ASSERT_TRUE(timer_executed_after_exception);
}

0 comments on commit bcc1169

Please sign in to comment.