Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions src/loggers/groot2_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,15 @@ std::chrono::milliseconds Groot2Publisher::maxHeartbeatDelay() const

Groot2Publisher::~Groot2Publisher()
{
removeAllHooks();

// First, signal threads to stop
_p->active_server = false;

// Shutdown the ZMQ context to unblock any recv() calls immediately.
// This prevents waiting for the recv timeout (100ms) before threads can exit.
// Context shutdown will cause all blocking operations to return with ETERM error.
_p->context.shutdown();

// Now join the threads - they should exit quickly
if(_p->server_thread.joinable())
{
_p->server_thread.join();
Expand All @@ -196,6 +202,9 @@ Groot2Publisher::~Groot2Publisher()
_p->heartbeat_thread.join();
}

// Remove hooks after threads are stopped to avoid race conditions
removeAllHooks();

flush();

{
Expand Down Expand Up @@ -260,9 +269,17 @@ void Groot2Publisher::serverLoop()
while(_p->active_server)
{
zmq::multipart_t requestMsg;
if(!requestMsg.recv(socket) || requestMsg.size() == 0)
try
{
continue;
if(!requestMsg.recv(socket) || requestMsg.size() == 0)
{
continue;
}
}
catch(const zmq::error_t&)
{
// Context was terminated or socket error - exit the loop
break;
}
Comment on lines +272 to 283
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

ZMQ zmq::multipart_t::send throw exception ETERM shutdown pattern

💡 Result:

zmq::multipart_t::send() ultimately calls zmq_send() on the underlying socket. If your process is shutting down and the context is shut down / terminated while another thread is sending, libzmq makes (almost) all further socket operations fail with ETERM, which cppzmq surfaces as zmq::error_t. This is expected behavior: context shutdown/termination forces any blocking (and many non-blocking) socket ops to return immediately with ETERM. [1], [2]

Recommended shutdown pattern (avoid “spurious” exceptions, exit cleanly)

  1. Signal your worker threads to stop producing/sending (atomic flag, cancellation token, etc.).
  2. Interrupt any threads blocked in recv() / send() by calling ctx.shutdown() (maps to zmq_ctx_shutdown()), which causes blocked operations to return with ETERM. [1]
  3. In every send/recv loop, catch zmq::error_t and treat ETERM as a normal exit condition.
  4. Close sockets (often set linger = 0 first so close/term doesn’t hang waiting for pending sends). [3]
  5. Finally terminate the context (ctx.close() / zmq_ctx_term()), after sockets are closed. [2]

Minimal C++ example (cppzmq)

std::atomic_bool stopping{false};

void worker(zmq::context_t& ctx, zmq::socket_t& s) {
  s.set(zmq::sockopt::linger, 0); // optional but common for fast shutdown [3]

  while (!stopping.load(std::memory_order_relaxed)) {
    try {
      zmq::multipart_t mp;
      // ... fill mp ...
      mp.send(s);                 // may throw zmq::error_t(ETERM) during shutdown
    } catch (const zmq::error_t& e) {
      if (e.num() == ETERM) break; // context shutdown/term in progress -> exit loop [1][2]
      throw;
    }
  }

  s.close();
}

// elsewhere (shutdown path):
stopping = true;
ctx.shutdown();  // interrupts blocked ops with ETERM [1]
// join worker threads
ctx.close();     // terminates context; requires sockets closed; may block per linger [2][3]

Key point: if you call ctx.shutdown() / ctx.close() before (or concurrently with) stopping your senders, you should expect multipart_t::send() to throw ETERM—handle it as a normal shutdown condition rather than an error. [1], [4]

Sources:
[1] zmq_ctx_shutdown(3) behavior and ETERM semantics https://libzmq.readthedocs.io/en/latest/zmq_ctx_shutdown.html
[2] zmq_ctx_term(3) termination steps and blocking/linger considerations https://libzmq.readthedocs.io/en/latest/zmq_ctx_term.html
[3] ZMQ_LINGER option and effect on close/termination https://api.zeromq.org/4-2:zmq-setsockopt
[4] Catching ETERM during shutdown (discussion/example) https://stackoverflow.com/questions/8022391/zeromq-socket-recv-throws-context-was-terminated-exception-why-and-how-to


🏁 Script executed:

wc -l src/loggers/groot2_publisher.cpp

Repository: BehaviorTree/BehaviorTree.CPP

Length of output: 109


🏁 Script executed:

sed -n '260,295p' src/loggers/groot2_publisher.cpp

Repository: BehaviorTree/BehaviorTree.CPP

Length of output: 965


🏁 Script executed:

sed -n '500,530p' src/loggers/groot2_publisher.cpp

Repository: BehaviorTree/BehaviorTree.CPP

Length of output: 792


🏁 Script executed:

sed -n '250,270p' src/loggers/groot2_publisher.cpp

Repository: BehaviorTree/BehaviorTree.CPP

Length of output: 566


🏁 Script executed:

sed -n '240,265p' src/loggers/groot2_publisher.cpp

Repository: BehaviorTree/BehaviorTree.CPP

Length of output: 588


🏁 Script executed:

sed -n '255,265p' src/loggers/groot2_publisher.cpp

Repository: BehaviorTree/BehaviorTree.CPP

Length of output: 353


Guard sendErrorReply against ZMQ shutdown exceptions.

The recv() and reply_msg.send() calls in the main loop (lines 272–283 and 510–518) are wrapped in try/catch for zmq::error_t, but the sendErrorReply lambda (lines 256–262) sends without exception handling. During context shutdown, zmq::multipart_t::send() throws zmq::error_t (with ETERM), which will terminate the thread if uncaught. Wrap the error reply send similarly to maintain consistent shutdown behavior:

Suggested fix
  auto sendErrorReply = [&socket](const std::string& msg) {
    zmq::multipart_t error_msg;
    error_msg.addstr("error");
    error_msg.addstr(msg);
+   try
+   {
      error_msg.send(socket);
+   }
+   catch(const zmq::error_t&)
+   {
+     // Ignore errors during shutdown (e.g., ETERM)
+   }
  };

Also applies to: 510–518

🤖 Prompt for AI Agents
In `@src/loggers/groot2_publisher.cpp` around lines 272 - 283, The sendErrorReply
lambda currently calls reply_msg.send() without catching zmq::error_t, so during
ZMQ context shutdown an exception (ETERM) can escape and kill the thread; update
sendErrorReply to wrap the send call in a try/catch for zmq::error_t and
silently handle/return on ETERM (or log and return) to mirror the
recv()/reply_msg.send() handling in the main loop; apply the same pattern to the
similar send path around the code referenced at lines 510–518 so all
zmq::multipart_t::send() calls are guarded against ZMQ shutdown exceptions.


// this heartbeat will help establishing if Groot is connected or not
Expand Down Expand Up @@ -490,7 +507,15 @@ void Groot2Publisher::serverLoop()
}
}
// send the reply
reply_msg.send(socket);
try
{
reply_msg.send(socket);
}
catch(const zmq::error_t&)
{
// Context was terminated or socket error - exit the loop
break;
}
}
}

Expand Down
58 changes: 58 additions & 0 deletions tests/gtest_groot2_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ static const char* xml_text = R"(
</root>
)";

static const char* xml_text_sequence = R"(
<root BTCPP_format="4">
<BehaviorTree ID="MainTree">
<Sequence>
<AlwaysSuccess/>
<ThrowRuntimeError/>
</Sequence>
</BehaviorTree>
</root>
)";

void throwRuntimeError()
{
BT::BehaviorTreeFactory factory;
Expand Down Expand Up @@ -46,3 +57,50 @@ TEST(Groot2PublisherTest, EnsureNoInfiniteLoopOnThrow)
},
::testing::ExitedWithCode(EXIT_SUCCESS), ".*");
}

// Test that destructor completes quickly even after exception
// This test runs multiple times to catch race conditions
TEST(Groot2PublisherTest, DestructorCompletesAfterException)
{
for(int i = 0; i < 5; i++)
{
BT::BehaviorTreeFactory factory;
factory.registerSimpleAction("ThrowRuntimeError",
[](BT::TreeNode&) -> BT::NodeStatus {
throw BT::RuntimeError("Test exception");
});

auto tree = factory.createTreeFromText(xml_text);
BT::Groot2Publisher publisher(tree, 1667 + i * 2);
EXPECT_THROW(tree.tickExactlyOnce(), BT::RuntimeError);
}
}

// Test that destructor completes quickly when tree has multiple nodes
TEST(Groot2PublisherTest, DestructorCompletesWithMultipleNodes)
{
BT::BehaviorTreeFactory factory;
factory.registerSimpleAction("ThrowRuntimeError", [](BT::TreeNode&) -> BT::NodeStatus {
throw BT::RuntimeError("Test exception in sequence");
});

auto tree = factory.createTreeFromText(xml_text_sequence);
BT::Groot2Publisher publisher(tree, 1677);
EXPECT_THROW(tree.tickExactlyOnce(), BT::RuntimeError);
}

// Test rapid creation and destruction of publishers
TEST(Groot2PublisherTest, RapidCreateDestroy)
{
for(int i = 0; i < 3; i++)
{
BT::BehaviorTreeFactory factory;
factory.registerSimpleAction(
"ThrowRuntimeError",
[](BT::TreeNode&) -> BT::NodeStatus { throw BT::RuntimeError("Rapid test"); });

auto tree = factory.createTreeFromText(xml_text);
BT::Groot2Publisher publisher(tree, 1687 + i * 2);
EXPECT_THROW(tree.tickExactlyOnce(), BT::RuntimeError);
}
}
Loading