From eb069ab75d83fdf9a2d686804730eeb2c55cfa96 Mon Sep 17 00:00:00 2001
From: Novo
Date: Tue, 23 Sep 2025 12:40:04 +0200
Subject: [PATCH 1/2] Fix crash on simultaneous IPC calls using the same thread

This error occurs when non-libmultiprocess clients make simultaneous IPC
calls to the server using the same Server thread. The libmultiprocess C++
client ensures a 1-to-1 mapping of client-to-server threads, so simultaneous
calls using the same thread cannot be made.
---
 include/mp/proxy-io.h     | 5 +++--
 include/mp/type-context.h | 7 ++++++-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h
index a0bdf137..9be80625 100644
--- a/include/mp/proxy-io.h
+++ b/include/mp/proxy-io.h
@@ -281,12 +281,13 @@ struct Waiter
     Waiter() = default;
 
     template <typename Fn>
-    void post(Fn&& fn)
+    bool post(Fn&& fn)
     {
         const Lock lock(m_mutex);
-        assert(!m_fn);
+        if (m_fn) return false;
         m_fn = std::forward<Fn>(fn);
         m_cv.notify_all();
+        return true;
     }
 
     template <typename Fn>

diff --git a/include/mp/type-context.h b/include/mp/type-context.h
index 078e1ced..9bd939e4 100644
--- a/include/mp/type-context.h
+++ b/include/mp/type-context.h
@@ -152,7 +152,12 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
             const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
             server.m_context.loop->log() << "IPC server post request #" << req << " {"
                                          << thread.m_thread_context.thread_name << "}";
-            thread.m_thread_context.waiter->post(std::move(invoke));
+            if (!thread.m_thread_context.waiter->post(std::move(invoke))) {
+                server.m_context.loop->log()
+                    << "IPC server error request #" << req
+                    << " {" << thread.m_thread_context.thread_name << "}" << ", thread busy";
+                throw std::runtime_error("thread busy");
+            }
         } else {
             server.m_context.loop->log() << "IPC server error request #" << req
                                          << ", missing thread to execute request";

From 1238170f68e8fa7ae41c79465df5cdae34d568e9 Mon Sep 17 00:00:00 2001
From: Novo
Date: Fri, 26 Sep 2025 14:39:21 +0200
Subject: [PATCH 2/2] test: simultaneous IPC calls using same thread

Although the libmultiprocess client prevents this kind of situation from
occurring, it easily occurs in non-libmultiprocess clients.
---
 test/mp/test/test.cpp | 73 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp
index 225d00d5..ee030abd 100644
--- a/test/mp/test/test.cpp
+++ b/test/mp/test/test.cpp
@@ -5,9 +5,12 @@
 #include
 #include
+#include
 #include
 #include
+#include
 #include
+#include
 #include
 #include
 #include
@@ -15,11 +18,15 @@
 #include
 #include
 #include
+#include
 #include
+#include
 #include
 #include
 #include
+#include "mp/proxy.capnp.h"
 #include
+#include "mp/util.h"
 #include
 #include
 #include
@@ -297,5 +304,71 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
     signal.set_value();
 }
 
+KJ_TEST("Make simultaneous IPC callbacks with same request_thread and callback_thread")
+{
+    TestSetup setup;
+    ProxyClient<messages::FooInterface>* foo = setup.client.get();
+    std::promise<void> signal;
+
+    foo->initThreadMap();
+    // Use callFnAsync() to get the client to set up the request_thread
+    // that will be used for the test.
+    setup.server->m_impl->m_fn = [&] {};
+    foo->callFnAsync();
+    ThreadContext& tc{g_thread_context};
+    std::optional<Thread::Client> callback_thread, request_thread;
+    {
+        Lock lock(tc.waiter->m_mutex);
+        callback_thread = tc.callback_threads.at(foo->m_context.connection)->m_client;
+        request_thread = tc.request_threads.at(foo->m_context.connection)->m_client;
+    }
+
+    setup.server->m_impl->m_fn = [&] {
+        try
+        {
+            signal.get_future().get();
+        }
+        catch(const std::exception& e)
+        {
+            KJ_EXPECT(e.what() == std::string("Future already retrieved"));
+        }
+    };
+
+    auto client{foo->m_client};
+    bool caught_thread_busy = false;
+    // NOTE: '3' was chosen because it was the lowest number
+    // of simultaneous calls required to reliably catch a "thread busy" error.
+    std::atomic<size_t> running{3};
+    foo->m_context.loop->sync([&]
+    {
+        for (size_t i = 0; i < running; i++)
+        {
+            auto request{client.callFnAsyncRequest()};
+            auto context{request.initContext()};
+            context.setCallbackThread(*callback_thread);
+            context.setThread(*request_thread);
+            foo->m_context.loop->m_task_set->add(request.send().then(
+                [&](auto&& results) {
+                    running -= 1;
+                    tc.waiter->m_cv.notify_all();
+                },
+                [&](kj::Exception&& e) {
+                    KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
+                              "remote exception: std::exception: thread busy");
+                    caught_thread_busy = true;
+                    running -= 1;
+                    signal.set_value();
+                    tc.waiter->m_cv.notify_all();
+                }
+            ));
+        }
+    });
+    {
+        Lock lock(tc.waiter->m_mutex);
+        tc.waiter->wait(lock, [&running] { return running == 0; });
+    }
+    KJ_EXPECT(caught_thread_busy);
+}
+
 } // namespace test
 } // namespace mp
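
For context (not part of either commit), the following is a minimal, self-contained C++ sketch of the pattern the first commit adopts. MiniWaiter and every name in it are hypothetical stand-ins for the real Waiter in include/mp/proxy-io.h; the point mirrored from the patch is that post() reports failure when a function is already queued instead of asserting, so the caller can surface a "thread busy" error to the peer that made the second simultaneous call.

// Hypothetical stand-in for mp::Waiter: a single-slot "post a function to a
// thread" helper, shown only to illustrate the busy-check behavior.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>

struct MiniWaiter
{
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::optional<std::function<void()>> m_fn;

    // Returns false when another function is still pending, analogous to the
    // patched Waiter::post() returning false instead of hitting assert(!m_fn).
    template <typename Fn>
    bool post(Fn&& fn)
    {
        const std::lock_guard<std::mutex> lock{m_mutex};
        if (m_fn) return false; // thread busy: previous request not finished
        m_fn = std::forward<Fn>(fn);
        m_cv.notify_all();
        return true;
    }

    // Wait for a posted function, run it, then free the slot.
    void run_one()
    {
        std::unique_lock<std::mutex> lock{m_mutex};
        m_cv.wait(lock, [this] { return m_fn.has_value(); });
        auto fn{std::move(*m_fn)};
        lock.unlock();
        fn(); // the slot still counts as occupied while fn runs
        lock.lock();
        m_fn.reset();
    }
};

int main()
{
    MiniWaiter waiter;
    const bool first{waiter.post([] { std::cout << "first request ran\n"; })};
    // A second simultaneous post to the same, still-busy waiter is rejected;
    // the IPC server translates this into throw std::runtime_error("thread busy").
    const bool second{waiter.post([] { std::cout << "never runs\n"; })};
    std::cout << "first accepted: " << first << ", second accepted: " << second << '\n';
    std::thread worker{[&] { waiter.run_one(); }};
    worker.join();
}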