From 5108445e5d16eb34224b802771863c4f184ca899 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Thu, 24 Apr 2025 09:37:13 -0400 Subject: [PATCH 01/13] test: Add test coverage for client & server disconnections --- test/mp/test/test.cpp | 138 +++++++++++++++++++++++++++++++++--------- 1 file changed, 108 insertions(+), 30 deletions(-) diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index 807b8766..ee38b6f5 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -23,32 +23,82 @@ namespace mp { namespace test { +/** + * Test setup class creating a two way connection between a + * ProxyServer object and a ProxyClient. + * + * Provides client_disconnect and server_disconnect lambdas that can be used to + * trigger disconnects and test handling of broken and closed connections. + * + * Accepts a client_owns_connection option to test different ProxyClient + * destroy_connection values and control whether destroying the ProxyClient + * object destroys the client Connection object. Normally it makes sense for + * this to be true to simplify shutdown and avoid needing to call + * client_disconnect manually, but false allows testing more ProxyClient + * behavior and the "clientInvoke call made after disconnect" code path. + */ +class TestSetup +{ +public: + std::thread thread; + std::function server_disconnect; + std::function client_disconnect; + std::promise>> client_promise; + std::unique_ptr> client; + + TestSetup(bool client_owns_connection = true) + : thread{[&] { + EventLoop loop("mptest", [](bool raise, const std::string& log) { + std::cout << "LOG" << raise << ": " << log << "\n"; + if (raise) throw std::runtime_error(log); + }); + auto pipe = loop.m_io_context.provider->newTwoWayPipe(); + + auto server_connection = + std::make_unique(loop, kj::mv(pipe.ends[0]), [&](Connection& connection) { + auto server_proxy = kj::heap>( + std::make_shared(), connection); + return capnp::Capability::Client(kj::mv(server_proxy)); + }); + server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); }; + // Set handler to destroy the server when the client disconnects. This + // is ignored if server_disconnect() is called instead. + server_connection->onDisconnect([&] { server_connection.reset(); }); + + auto client_connection = std::make_unique(loop, kj::mv(pipe.ends[1])); + auto client_proxy = std::make_unique>( + client_connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs(), + client_connection.get(), /* destroy_connection= */ client_owns_connection); + if (client_owns_connection) { + client_connection.release(); + } else { + client_disconnect = [&] { loop.sync([&] { client_connection.reset(); }); }; + } + + client_promise.set_value(std::move(client_proxy)); + loop.loop(); + }} + { + client = client_promise.get_future().get(); + } + + ~TestSetup() + { + // Test that client cleanup_fns are executed. 
+ bool destroyed = false; + client->m_context.cleanup_fns.emplace_front([&destroyed] { destroyed = true; }); + client.reset(); + KJ_EXPECT(destroyed); + + thread.join(); + } +}; + KJ_TEST("Call FooInterface methods") { - std::promise>> foo_promise; - std::function disconnect_client; - std::thread thread([&]() { - EventLoop loop("mptest", [](bool raise, const std::string& log) { - std::cout << "LOG" << raise << ": " << log << "\n"; - }); - auto pipe = loop.m_io_context.provider->newTwoWayPipe(); - - auto connection_client = std::make_unique(loop, kj::mv(pipe.ends[0])); - auto foo_client = std::make_unique>( - connection_client->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs(), - connection_client.get(), /* destroy_connection= */ false); - foo_promise.set_value(std::move(foo_client)); - disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); }; - - auto connection_server = std::make_unique(loop, kj::mv(pipe.ends[1]), [&](Connection& connection) { - auto foo_server = kj::heap>(std::make_shared(), connection); - return capnp::Capability::Client(kj::mv(foo_server)); - }); - connection_server->onDisconnect([&] { connection_server.reset(); }); - loop.loop(); - }); - - auto foo = foo_promise.get_future().get(); + TestSetup setup; + ProxyClient* foo = setup.client.get(); + KJ_EXPECT(foo->add(1, 2) == 3); FooStruct in; @@ -129,14 +179,42 @@ KJ_TEST("Call FooInterface methods") KJ_EXPECT(mut.message == "init build pass call return read"); KJ_EXPECT(foo->passFn([]{ return 10; }) == 10); +} - disconnect_client(); - thread.join(); +KJ_TEST("Call IPC method after client connection is closed") +{ + TestSetup setup{/*client_owns_connection=*/false}; + ProxyClient* foo = setup.client.get(); + KJ_EXPECT(foo->add(1, 2) == 3); + setup.client_disconnect(); + + bool disconnected{false}; + try { + foo->add(1, 2); + } catch (const std::logic_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "clientInvoke call made after disconnect"); + disconnected = true; + } + KJ_EXPECT(disconnected); +} - bool destroyed = false; - foo->m_context.cleanup_fns.emplace_front([&destroyed]{ destroyed = true; }); - foo.reset(); - KJ_EXPECT(destroyed); +KJ_TEST("Calling IPC method after server connection is closed") +{ + TestSetup setup; + ProxyClient* foo = setup.client.get(); + KJ_EXPECT(foo->add(1, 2) == 3); + setup.server_disconnect(); + + bool disconnected{false}; + try { + foo->add(1, 2); + } catch (const std::runtime_error& e) { + std::string_view error{e.what()}; + KJ_EXPECT(error.starts_with("kj::Exception: ")); + KJ_EXPECT(error.find("disconnected: Peer disconnected.") != std::string_view::npos); + disconnected = true; + } + KJ_EXPECT(disconnected); } } // namespace test From f58c8d8ba2f0c73f6c6ec5b392eef2ff7cc053f4 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Sun, 9 Feb 2025 14:14:34 -0500 Subject: [PATCH 02/13] proxy-io.h: Add more detailed EventLoop comment --- include/mp/proxy-io.h | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 566e36b8..e11a6fd6 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -129,6 +129,28 @@ std::string LongThreadName(const char* exe_name); //! Event loop implementation. //! +//! Cap'n Proto threading model is very simple: all I/O operations are +//! asynchronous and must be performed on a single thread. This includes: +//! +//! - Code starting an asynchronous operation (calling a function that returns a +//! promise object) +//! 
- Code notifying that an asynchronous operation is complete (code using a +//! fulfiller object) +//! - Code handling a completed operation (code chaining or waiting for a promise) +//! +//! All of this code needs to access shared state, and there is no mutex that +//! can be acquired to lock this state because Cap'n Proto +//! assumes it will only be accessed from one thread. So all this code needs to +//! actually run on one thread, and the EventLoop::loop() method is the entry point for +//! this thread. ProxyClient and ProxyServer objects that use other threads and +//! need to perform I/O operations post to this thread using EventLoop::post() +//! and EventLoop::sync() methods. +//! +//! Specifically, because ProxyClient methods can be called from arbitrary +//! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient +//! methods use the EventLoop thread to send requests, and ProxyServer methods +//! use the thread to return results. +//! //! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ class EventLoop { From 9aaeec3678d3b5dad61d2142c37bac18aa6257e0 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Sun, 9 Feb 2025 14:14:34 -0500 Subject: [PATCH 03/13] proxy-io.h: Add EventLoopRef RAII class handle addClient/removeClient refcounting --- include/mp/proxy-io.h | 3 ++- include/mp/proxy.h | 21 +++++++++++++++++++++ include/mp/util.h | 15 +++++++++++++++ src/mp/proxy.cpp | 17 +++++++++++++++++ 4 files changed, 55 insertions(+), 1 deletion(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index e11a6fd6..74ecdc85 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -190,7 +190,8 @@ class EventLoop //! other IPC calls. void startAsyncThread(std::unique_lock& lock); - //! Add/remove remote client reference counts. + //! Add/remove remote client reference counts. Can use EventLoopRef + //! below to avoid calling these directly. void addClient(std::unique_lock& lock); bool removeClient(std::unique_lock& lock); //! Check if loop should exit. diff --git a/include/mp/proxy.h b/include/mp/proxy.h index d315fa14..0e894e5d 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -47,6 +48,26 @@ inline void CleanupRun(CleanupList& fns) { } } +//! Event loop smart pointer automatically calling addClient and removeClient. +//! If a lock pointer argument is passed, the specified lock will be used, +//! otherwise EventLoop::m_mutex will be locked when needed. +class EventLoopRef +{ +public: + explicit EventLoopRef(EventLoop& loop, std::unique_lock* lock = nullptr); + EventLoopRef(EventLoopRef&& other) noexcept : m_loop(other.m_loop) { other.m_loop = nullptr; } + EventLoopRef(const EventLoopRef&) = delete; + EventLoopRef& operator=(const EventLoopRef&) = delete; + EventLoopRef& operator=(EventLoopRef&&) = delete; + ~EventLoopRef() { reset(); } + EventLoop& operator*() const { assert(m_loop); return *m_loop; } + EventLoop* operator->() const { assert(m_loop); return m_loop; } + bool reset(); + + EventLoop* m_loop{nullptr}; + std::unique_lock* m_lock{nullptr}; +}; + //! Context data associated with proxy client and server classes. struct ProxyContext { diff --git a/include/mp/util.h b/include/mp/util.h index 45ce0aa7..974093b7 100644 --- a/include/mp/util.h +++ b/include/mp/util.h @@ -18,6 +18,7 @@ #include #include #include +#include #include namespace mp { @@ -130,6 +131,20 @@ const char* TypeName() return short_name ? short_name + 1 : display_name; } +//! 
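A rough usage sketch of the EventLoopRef class introduced above (illustrative only, not part of the patch; assumes an EventLoop named loop is already running on another thread):

    void useLoopSafely(EventLoop& loop)
    {
        // Taking a scoped reference bumps the loop's client count, so
        // EventLoop::loop() will not exit while `ref` is alive.
        EventLoopRef ref{loop};
        ref->sync([] { /* touch Cap'n Proto state on the event loop thread */ });
    } // ~EventLoopRef() drops the count and wakes the loop if it can now exit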
Convenient wrapper around std::variant +template +struct PtrOrValue { + std::variant data; + + template + PtrOrValue(T* ptr, Args&&... args) : data(ptr ? ptr : std::variant{std::in_place_type, std::forward(args)...}) {} + + T& operator*() { return data.index() ? std::get(data) : *std::get(data); } + T* operator->() { return &**this; } + T& operator*() const { return data.index() ? std::get(data) : *std::get(data); } + T* operator->() const { return &**this; } +}; + //! Analog to std::lock_guard that unlocks instead of locks. template struct UnlockGuard diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 091985db..00252580 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -48,6 +48,23 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception) m_loop.log() << "Uncaught exception in daemonized task."; } +EventLoopRef::EventLoopRef(EventLoop& loop, std::unique_lock* lock) : m_loop(&loop), m_lock(lock) +{ + auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}}; + m_loop->addClient(*loop_lock); +} + +bool EventLoopRef::reset() +{ + bool done = false; + if (auto* loop{m_loop}) { + m_loop = nullptr; + auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}}; + done = loop->removeClient(*loop_lock); + } + return done; +} + Connection::~Connection() { // Shut down RPC system first, since this will garbage collect Server From 315ff537fb6550468c3148e795dffb78313bd11f Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Sun, 9 Feb 2025 14:21:37 -0500 Subject: [PATCH 04/13] refactor: Add ProxyContext EventLoop* member This commit makes mechanical changes needed to simplify an upcoming commit which replaces EventLoop* with an EventLoopRef. This change also happens to be also useful on its own so clientInvoke can detect disconnections in a non-racy way (https://github.com/bitcoin-core/libmultiprocess/issues/123#issuecomment-2516088243) by seeing if the client Connection pointer is null while holding the event loop mutex. --- include/mp/proxy-io.h | 2 +- include/mp/proxy-types.h | 28 ++++++++++++++-------------- include/mp/proxy.h | 3 ++- include/mp/type-context.h | 8 ++++---- src/mp/proxy.cpp | 2 ++ 5 files changed, 23 insertions(+), 20 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 74ecdc85..58270aa5 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -446,7 +446,7 @@ ProxyClientBase::ProxyClientBase(typename Interface::Client cli Sub::destroy(*this); // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code - m_context.connection->m_loop.sync([&]() { + m_context.loop->sync([&]() { // Release client capability by move-assigning to temporary. { typename Interface::Client(std::move(m_client)); diff --git a/include/mp/proxy-types.h b/include/mp/proxy-types.h index 607aaccd..6f74bedc 100644 --- a/include/mp/proxy-types.h +++ b/include/mp/proxy-types.h @@ -568,7 +568,7 @@ template void clientDestroy(Client& client) { if (client.m_context.connection) { - client.m_context.connection->m_loop.log() << "IPC client destroy " << typeid(client).name(); + client.m_context.loop->log() << "IPC client destroy " << typeid(client).name(); } else { KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name()); } @@ -577,7 +577,7 @@ void clientDestroy(Client& client) template void serverDestroy(Server& server) { - server.m_context.connection->m_loop.log() << "IPC server destroy " << typeid(server).name(); + server.m_context.loop->log() << "IPC server destroy " << typeid(server).name(); } //! 
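A toy illustration of the PtrOrValue helper added to util.h above (hypothetical function, not from the patch): it lets a callee reuse a lock the caller already holds, or create and hold its own when none is passed, which is exactly how EventLoopRef uses it.

    void withMutexHeld(std::mutex& mutex, std::unique_lock<std::mutex>* held,
                       const std::function<void()>& fn)
    {
        // If `held` is non-null it is used as-is (the caller must already hold
        // it); otherwise a fresh std::unique_lock is constructed from `mutex`
        // and released when this function returns.
        PtrOrValue<std::unique_lock<std::mutex>> lock{held, mutex};
        fn();
    }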
Entry point called by generated client code that looks like: @@ -597,7 +597,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel } if (!g_thread_context.waiter) { assert(g_thread_context.thread_name.empty()); - g_thread_context.thread_name = ThreadName(proxy_client.m_context.connection->m_loop.m_exe_name); + g_thread_context.thread_name = ThreadName(proxy_client.m_context.loop->m_exe_name); // If next assert triggers, it means clientInvoke is being called from // the capnp event loop thread. This can happen when a ProxyServer // method implementation that runs synchronously on the event loop @@ -608,7 +608,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel // declaration so the server method runs in a dedicated thread. assert(!g_thread_context.loop_thread); g_thread_context.waiter = std::make_unique(); - proxy_client.m_context.connection->m_loop.logPlain() + proxy_client.m_context.loop->logPlain() << "{" << g_thread_context.thread_name << "} IPC client first request from current thread, constructing waiter"; } @@ -616,18 +616,18 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel std::exception_ptr exception; std::string kj_exception; bool done = false; - proxy_client.m_context.connection->m_loop.sync([&]() { + proxy_client.m_context.loop->sync([&]() { auto request = (proxy_client.m_client.*get_request)(nullptr); using Request = CapRequestTraits; using FieldList = typename ProxyClientMethodTraits::Fields; IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...); - proxy_client.m_context.connection->m_loop.logPlain() + proxy_client.m_context.loop->logPlain() << "{" << invoke_context.thread_context.thread_name << "} IPC client send " << TypeName() << " " << LogEscape(request.toString()); - proxy_client.m_context.connection->m_loop.m_task_set->add(request.send().then( + proxy_client.m_context.loop->m_task_set->add(request.send().then( [&](::capnp::Response&& response) { - proxy_client.m_context.connection->m_loop.logPlain() + proxy_client.m_context.loop->logPlain() << "{" << invoke_context.thread_context.thread_name << "} IPC client recv " << TypeName() << " " << LogEscape(response.toString()); try { @@ -642,7 +642,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel }, [&](const ::kj::Exception& e) { kj_exception = kj::str("kj::Exception: ", e).cStr(); - proxy_client.m_context.connection->m_loop.logPlain() + proxy_client.m_context.loop->logPlain() << "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception; const std::unique_lock lock(invoke_context.thread_context.waiter->m_mutex); done = true; @@ -653,7 +653,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel std::unique_lock lock(invoke_context.thread_context.waiter->m_mutex); invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; }); if (exception) std::rethrow_exception(exception); - if (!kj_exception.empty()) proxy_client.m_context.connection->m_loop.raise() << kj_exception; + if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception; } //! Invoke callable `fn()` that may return void. 
If it does return void, replace @@ -687,7 +687,7 @@ kj::Promise serverInvoke(Server& server, CallContext& call_context, Fn fn) using Results = typename decltype(call_context.getResults())::Builds; int req = ++server_reqs; - server.m_context.connection->m_loop.log() << "IPC server recv request #" << req << " " + server.m_context.loop->log() << "IPC server recv request #" << req << " " << TypeName() << " " << LogEscape(params.toString()); try { @@ -704,14 +704,14 @@ kj::Promise serverInvoke(Server& server, CallContext& call_context, Fn fn) return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); }, [&]() { return kj::Promise(kj::mv(call_context)); }) .then([&server, req](CallContext call_context) { - server.m_context.connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName() + server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName() << " " << LogEscape(call_context.getResults().toString()); }); } catch (const std::exception& e) { - server.m_context.connection->m_loop.log() << "IPC server unhandled exception: " << e.what(); + server.m_context.loop->log() << "IPC server unhandled exception: " << e.what(); throw; } catch (...) { - server.m_context.connection->m_loop.log() << "IPC server unhandled exception"; + server.m_context.loop->log() << "IPC server unhandled exception"; throw; } } diff --git a/include/mp/proxy.h b/include/mp/proxy.h index 0e894e5d..ab911c4a 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -72,9 +72,10 @@ class EventLoopRef struct ProxyContext { Connection* connection; + EventLoop* loop; CleanupList cleanup_fns; - ProxyContext(Connection* connection) : connection(connection) {} + ProxyContext(Connection* connection); }; //! Base class for generated ProxyClient classes that implement a C++ interface diff --git a/include/mp/type-context.h b/include/mp/type-context.h index cf040c7b..399f8985 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -132,13 +132,13 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& return; } KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { - server.m_context.connection->m_loop.sync([&] { + server.m_context.loop->sync([&] { auto fulfiller_dispose = kj::mv(fulfiller); fulfiller_dispose->fulfill(kj::mv(call_context)); }); })) { - server.m_context.connection->m_loop.sync([&]() { + server.m_context.loop->sync([&]() { auto fulfiller_dispose = kj::mv(fulfiller); fulfiller_dispose->reject(kj::mv(*exception)); }); @@ -156,11 +156,11 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // thread. 
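Relating this back to the race-free disconnect detection mentioned in the previous commit message, a simplified sketch (not literal patch code): m_context.connection is only consulted on the event loop thread, where the cleanup handler that clears it also runs, so the check cannot race.

    bool connectionAlive(ProxyContext& context)
    {
        bool alive{false};
        // sync() runs the lambda on the EventLoop thread.
        context.loop->sync([&] { alive = (context.connection != nullptr); });
        return alive;
    }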
KJ_IF_MAYBE (thread_server, perhaps) { const auto& thread = static_cast&>(*thread_server); - server.m_context.connection->m_loop.log() + server.m_context.loop->log() << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; thread.m_thread_context.waiter->post(std::move(invoke)); } else { - server.m_context.connection->m_loop.log() + server.m_context.loop->log() << "IPC server error request #" << req << ", missing thread to execute request"; throw std::runtime_error("invalid thread handle"); } diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 00252580..dab1c306 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -65,6 +65,8 @@ bool EventLoopRef::reset() return done; } +ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{&connection->m_loop} {} + Connection::~Connection() { // Shut down RPC system first, since this will garbage collect Server From 2b830e558e6153a3c0b4b3b5a69697dbace27101 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Sun, 9 Feb 2025 14:21:37 -0500 Subject: [PATCH 05/13] refactor: Use EventLoopRef instead of addClient/removeClient Use EventLoopRef to avoid reference counting bugs and be more exception safe --- include/mp/proxy-io.h | 36 +++++------------------------------- include/mp/proxy.h | 2 +- src/mp/proxy.cpp | 40 +++++++++++++++++++++++----------------- 3 files changed, 29 insertions(+), 49 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 58270aa5..5be834de 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -313,21 +313,13 @@ class Connection Connection(EventLoop& loop, kj::Own&& stream_) : m_loop(loop), m_stream(kj::mv(stream_)), m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()), - m_rpc_system(::capnp::makeRpcClient(m_network)) - { - std::unique_lock lock(m_loop.m_mutex); - m_loop.addClient(lock); - } + m_rpc_system(::capnp::makeRpcClient(m_network)) {} Connection(EventLoop& loop, kj::Own&& stream_, const std::function<::capnp::Capability::Client(Connection&)>& make_client) : m_loop(loop), m_stream(kj::mv(stream_)), m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()), - m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) - { - std::unique_lock lock(m_loop.m_mutex); - m_loop.addClient(lock); - } + m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {} //! Run cleanup functions. Must be called from the event loop thread. First //! calls synchronous cleanup functions while blocked (to free capnp @@ -356,12 +348,12 @@ class Connection // to the EventLoop TaskSet to avoid "Promise callback destroyed itself" // error in cases where f deletes this Connection object. 
m_on_disconnect.add(m_network.onDisconnect().then( - [f = std::forward(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); })); + [f = std::forward(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); })); } - EventLoop& m_loop; + EventLoopRef m_loop; kj::Own m_stream; - LoggingErrorHandler m_error_handler{m_loop}; + LoggingErrorHandler m_error_handler{*m_loop}; kj::TaskSet m_on_disconnect{m_error_handler}; ::capnp::TwoPartyVatNetwork m_network; std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system; @@ -404,21 +396,12 @@ ProxyClientBase::ProxyClientBase(typename Interface::Client cli : m_client(std::move(client)), m_context(connection) { - { - std::unique_lock lock(m_context.connection->m_loop.m_mutex); - m_context.connection->m_loop.addClient(lock); - } - // Handler for the connection getting destroyed before this client object. auto cleanup_it = m_context.connection->addSyncCleanup([this]() { // Release client capability by move-assigning to temporary. { typename Interface::Client(std::move(m_client)); } - { - std::unique_lock lock(m_context.connection->m_loop.m_mutex); - m_context.connection->m_loop.removeClient(lock); - } m_context.connection = nullptr; }); @@ -451,11 +434,6 @@ ProxyClientBase::ProxyClientBase(typename Interface::Client cli { typename Interface::Client(std::move(m_client)); } - { - std::unique_lock lock(m_context.connection->m_loop.m_mutex); - m_context.connection->m_loop.removeClient(lock); - } - if (destroy_connection) { delete m_context.connection; m_context.connection = nullptr; @@ -477,8 +455,6 @@ ProxyServerBase::ProxyServerBase(std::shared_ptr impl, Co : m_impl(std::move(impl)), m_context(&connection) { assert(m_impl); - std::unique_lock lock(m_context.connection->m_loop.m_mutex); - m_context.connection->m_loop.addClient(lock); } //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto @@ -512,8 +488,6 @@ ProxyServerBase::~ProxyServerBase() }); } assert(m_context.cleanup_fns.empty()); - std::unique_lock lock(m_context.connection->m_loop.m_mutex); - m_context.connection->m_loop.removeClient(lock); } //! 
If the capnp interface defined a special "destroy" method, as described the diff --git a/include/mp/proxy.h b/include/mp/proxy.h index ab911c4a..34efd369 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -72,7 +72,7 @@ class EventLoopRef struct ProxyContext { Connection* connection; - EventLoop* loop; + EventLoopRef loop; CleanupList cleanup_fns; ProxyContext(Connection* connection); diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index dab1c306..5fd654af 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -65,7 +65,7 @@ bool EventLoopRef::reset() return done; } -ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{&connection->m_loop} {} +ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {} Connection::~Connection() { @@ -122,18 +122,17 @@ Connection::~Connection() m_sync_cleanup_fns.pop_front(); } while (!m_async_cleanup_fns.empty()) { - const std::unique_lock lock(m_loop.m_mutex); - m_loop.m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front())); + const std::unique_lock lock(m_loop->m_mutex); + m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front())); m_async_cleanup_fns.pop_front(); } - std::unique_lock lock(m_loop.m_mutex); - m_loop.startAsyncThread(lock); - m_loop.removeClient(lock); + std::unique_lock lock(m_loop->m_mutex); + m_loop->startAsyncThread(lock); } CleanupIt Connection::addSyncCleanup(std::function fn) { - const std::unique_lock lock(m_loop.m_mutex); + const std::unique_lock lock(m_loop->m_mutex); // Add cleanup callbacks to the front of list, so sync cleanup functions run // in LIFO order. This is a good approach because sync cleanup functions are // added as client objects are created, and it is natural to clean up @@ -147,13 +146,13 @@ CleanupIt Connection::addSyncCleanup(std::function fn) void Connection::removeSyncCleanup(CleanupIt it) { - const std::unique_lock lock(m_loop.m_mutex); + const std::unique_lock lock(m_loop->m_mutex); m_sync_cleanup_fns.erase(it); } void Connection::addAsyncCleanup(std::function fn) { - const std::unique_lock lock(m_loop.m_mutex); + const std::unique_lock lock(m_loop->m_mutex); // Add async cleanup callbacks to the back of the list. Unlike the sync // cleanup list, this list order is more significant because it determines // the order server objects are destroyed when there is a sudden disconnect, @@ -244,7 +243,7 @@ void EventLoop::post(const std::function& fn) return; } std::unique_lock lock(m_mutex); - addClient(lock); + EventLoopRef ref(*this, &lock); m_cv.wait(lock, [this] { return m_post_fn == nullptr; }); m_post_fn = &fn; int post_fd{m_post_fd}; @@ -253,13 +252,13 @@ void EventLoop::post(const std::function& fn) KJ_SYSCALL(write(post_fd, &buffer, 1)); }); m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; }); - removeClient(lock); } void EventLoop::addClient(std::unique_lock& lock) { m_num_clients += 1; } bool EventLoop::removeClient(std::unique_lock& lock) { + assert(m_num_clients > 0); m_num_clients -= 1; if (done(lock)) { m_cv.notify_all(); @@ -267,6 +266,8 @@ bool EventLoop::removeClient(std::unique_lock& lock) lock.unlock(); char buffer = 0; KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon) + // Do not try to relock `lock` after writing, because the event loop + // could wake up and destroy itself and the mutex might no longer exist. 
return true; } return false; @@ -275,20 +276,25 @@ bool EventLoop::removeClient(std::unique_lock& lock) void EventLoop::startAsyncThread(std::unique_lock& lock) { if (m_async_thread.joinable()) { + // Notify to wake up the async thread if it is already running. m_cv.notify_all(); } else if (!m_async_fns.empty()) { m_async_thread = std::thread([this] { std::unique_lock lock(m_mutex); - while (true) { + while (!done(lock)) { if (!m_async_fns.empty()) { - addClient(lock); + EventLoopRef ref{*this, &lock}; const std::function fn = std::move(m_async_fns.front()); m_async_fns.pop_front(); Unlock(lock, fn); - if (removeClient(lock)) break; + // Reset ref and break if that returns true instead of + // passively letting ref go out of scope. This is important + // because the ref destructor would leave m_mutex unlocked + // when done() returns true, causing undefined behavior if + // the loop continued to execute. + if (ref.reset()) break; + // Continue without waiting in case there are more async_fns continue; - } else if (m_num_clients == 0) { - break; } m_cv.wait(lock); } @@ -394,7 +400,7 @@ kj::Promise ProxyServer::makeThread(MakeThreadContext context) const std::string from = context.getParams().getName(); std::promise thread_context; std::thread thread([&thread_context, from, this]() { - g_thread_context.thread_name = ThreadName(m_connection.m_loop.m_exe_name) + " (from " + from + ")"; + g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")"; g_thread_context.waiter = std::make_unique(); thread_context.set_value(&g_thread_context); std::unique_lock lock(g_thread_context.waiter->m_mutex); From f24894794adf946a27deae49910771a9af6ef0fb Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Sun, 9 Feb 2025 14:29:32 -0500 Subject: [PATCH 06/13] refactor: Drop addClient/removeClient methods Now that they are only called in one place by EventLoopRef class, they can be inlined. --- include/mp/proxy-io.h | 4 ---- include/mp/proxy.h | 2 +- src/mp/proxy.cpp | 36 ++++++++++++++---------------------- 3 files changed, 15 insertions(+), 27 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 5be834de..bd95ba78 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -190,10 +190,6 @@ class EventLoop //! other IPC calls. void startAsyncThread(std::unique_lock& lock); - //! Add/remove remote client reference counts. Can use EventLoopRef - //! below to avoid calling these directly. - void addClient(std::unique_lock& lock); - bool removeClient(std::unique_lock& lock); //! Check if loop should exit. bool done(std::unique_lock& lock) const; diff --git a/include/mp/proxy.h b/include/mp/proxy.h index 34efd369..9097c501 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -48,7 +48,7 @@ inline void CleanupRun(CleanupList& fns) { } } -//! Event loop smart pointer automatically calling addClient and removeClient. +//! Event loop smart pointer automatically managing m_num_clients. //! If a lock pointer argument is passed, the specified lock will be used, //! otherwise EventLoop::m_mutex will be locked when needed. 
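A condensed before/after sketch of what the EventLoopRef refactor buys (paraphrased, not literal patch code; assumes `lock` already holds loop.m_mutex):

    void runWithClientRef(EventLoop& loop, std::unique_lock<std::mutex>& lock,
                          const std::function<void()>& fn)
    {
        // Old pattern: loop.addClient(lock); fn(); loop.removeClient(lock);
        // left the count unbalanced if fn() threw between the two calls.
        EventLoopRef ref{loop, &lock};
        fn();
    } // reference dropped on every path, including exceptions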
class EventLoopRef diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 5fd654af..4a914326 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -51,7 +51,7 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception) EventLoopRef::EventLoopRef(EventLoop& loop, std::unique_lock* lock) : m_loop(&loop), m_lock(lock) { auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}}; - m_loop->addClient(*loop_lock); + m_loop->m_num_clients += 1; } bool EventLoopRef::reset() @@ -60,7 +60,18 @@ bool EventLoopRef::reset() if (auto* loop{m_loop}) { m_loop = nullptr; auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}}; - done = loop->removeClient(*loop_lock); + assert(loop->m_num_clients > 0); + loop->m_num_clients -= 1; + if (loop->done(*loop_lock)) { + done = true; + loop->m_cv.notify_all(); + int post_fd{loop->m_post_fd}; + loop_lock->unlock(); + char buffer = 0; + KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon) + // Do not try to relock `loop_lock` after writing, because the event loop + // could wake up and destroy itself and the mutex might no longer exist. + } } return done; } @@ -220,7 +231,7 @@ void EventLoop::loop() m_cv.notify_all(); } else if (done(lock)) { // Intentionally do not break if m_post_fn was set, even if done() - // would return true, to ensure that the removeClient write(post_fd) + // would return true, to ensure that the EventLoopRef write(post_fd) // call always succeeds and the loop does not exit between the time // that the done condition is set and the write call is made. break; @@ -254,25 +265,6 @@ void EventLoop::post(const std::function& fn) m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; }); } -void EventLoop::addClient(std::unique_lock& lock) { m_num_clients += 1; } - -bool EventLoop::removeClient(std::unique_lock& lock) -{ - assert(m_num_clients > 0); - m_num_clients -= 1; - if (done(lock)) { - m_cv.notify_all(); - int post_fd{m_post_fd}; - lock.unlock(); - char buffer = 0; - KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon) - // Do not try to relock `lock` after writing, because the event loop - // could wake up and destroy itself and the mutex might no longer exist. - return true; - } - return false; -} - void EventLoop::startAsyncThread(std::unique_lock& lock) { if (m_async_thread.joinable()) { From 52256e730f516af3d4ec5068c0157a183f7aee6e Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Tue, 4 Feb 2025 08:32:52 -0500 Subject: [PATCH 07/13] refactor: Remove DestructorCatcher and AsyncCallable Use kj::Function instead of std::function to avoid the need for AsyncCallable and DestructorCatcher classes, which were used to work around the requirement that std::function objects need to be copyable. kj::Function does not have this requirement. Change is from https://github.com/chaincodelabs/libmultiprocess/pull/144#discussion_r1941178169 Co-authored-by: Ryan Ofsky --- include/mp/proxy-io.h | 19 +++++++++---------- include/mp/type-context.h | 7 +++---- include/mp/util.h | 40 --------------------------------------- src/mp/proxy.cpp | 3 ++- 4 files changed, 14 insertions(+), 55 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index bd95ba78..6d1b77fd 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -14,9 +14,10 @@ #include #include -#include +#include #include #include +#include #include #include @@ -166,7 +167,7 @@ class EventLoop //! Run function on event loop thread. Does not return until function completes. //! 
Must be called while the loop() function is active. - void post(const std::function& fn); + void post(kj::Function fn); //! Wrapper around EventLoop::post that takes advantage of the //! fact that callable will not go out of scope to avoid requirement that it @@ -174,7 +175,7 @@ class EventLoop template void sync(Callable&& callable) { - post(std::ref(callable)); + post(std::forward(callable)); } //! Start asynchronous worker thread if necessary. This is only done if @@ -214,7 +215,7 @@ class EventLoop std::thread m_async_thread; //! Callback function to run on event loop thread during post() or sync() call. - const std::function* m_post_fn = nullptr; + kj::Function* m_post_fn = nullptr; //! Callback functions to run on async thread. CleanupList m_async_fns; @@ -282,11 +283,9 @@ struct Waiter // in the case where a capnp response is sent and a brand new // request is immediately received. while (m_fn) { - auto fn = std::move(m_fn); - m_fn = nullptr; - lock.unlock(); - fn(); - lock.lock(); + auto fn = std::move(*m_fn); + m_fn.reset(); + Unlock(lock, fn); } const bool done = pred(); return done; @@ -295,7 +294,7 @@ struct Waiter std::mutex m_mutex; std::condition_variable m_cv; - std::function m_fn; + std::optional> m_fn; }; //! Object holding network & rpc state associated with either an incoming server diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 399f8985..952734f3 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -64,8 +64,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& auto future = kj::newPromiseAndFulfiller(); auto& server = server_context.proxy_server; int req = server_context.req; - auto invoke = MakeAsyncCallable( - [fulfiller = kj::mv(future.fulfiller), + auto invoke = [fulfiller = kj::mv(future.fulfiller), call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable { const auto& params = call_context.getParams(); Context::Reader context_arg = Accessor::get(params); @@ -143,14 +142,14 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& fulfiller_dispose->reject(kj::mv(*exception)); }); } - }); + }; // Lookup Thread object specified by the client. The specified thread should // be a local Thread::Server object, but it needs to be looked up // asynchronously with getLocalServer(). auto thread_client = context_arg.getThread(); return server.m_context.connection->m_threads.getLocalServer(thread_client) - .then([&server, invoke, req](const kj::Maybe& perhaps) { + .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { // Assuming the thread object is found, pass it a pointer to the // `invoke` lambda above which will invoke the function on that // thread. diff --git a/include/mp/util.h b/include/mp/util.h index 974093b7..3f677cf6 100644 --- a/include/mp/util.h +++ b/include/mp/util.h @@ -161,46 +161,6 @@ void Unlock(Lock& lock, Callback&& callback) callback(); } -//! Needed for libc++/macOS compatibility. Lets code work with shared_ptr nothrow declaration -//! https://github.com/capnproto/capnproto/issues/553#issuecomment-328554603 -template -struct DestructorCatcher -{ - T value; - template - DestructorCatcher(Params&&... params) : value(kj::fwd(params)...) - { - } - ~DestructorCatcher() noexcept try { - } catch (const kj::Exception& e) { // NOLINT(bugprone-empty-catch) - } -}; - -//! Wrapper around callback function for compatibility with std::async. -//! -//! 
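For context on why switching to kj::Function makes these helpers unnecessary, a small standalone illustration (assumes <kj/function.h> and <memory> are included; not part of the patch):

    int demoMoveOnlyCallable()
    {
        // A move-only callable: it owns a std::unique_ptr.
        auto counter = [value = std::make_unique<int>(0)]() mutable { return ++*value; };

        // kj::Function accepts move-only callables directly.
        kj::Function<int()> fn{kj::mv(counter)};

        // std::function<int()> would not compile here because it requires a
        // copyable target, which is what forced the shared_ptr workaround
        // being deleted above.
        return fn(); // returns 1
    }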
std::async requires callbacks to be copyable and requires noexcept -//! destructors, but this doesn't work well with kj types which are generally -//! move-only and not noexcept. -template -struct AsyncCallable -{ - AsyncCallable(Callable&& callable) : m_callable(std::make_shared>(std::move(callable))) - { - } - AsyncCallable(const AsyncCallable&) = default; - AsyncCallable(AsyncCallable&&) = default; - ~AsyncCallable() noexcept = default; - ResultOf operator()() const { return (m_callable->value)(); } - mutable std::shared_ptr> m_callable; -}; - -//! Construct AsyncCallable object. -template -AsyncCallable> MakeAsyncCallable(Callable&& callable) -{ - return std::forward(callable); -} - //! Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}". std::string ThreadName(const char* exe_name); diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 4a914326..a5660d39 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -247,7 +248,7 @@ void EventLoop::loop() m_post_fd = -1; } -void EventLoop::post(const std::function& fn) +void EventLoop::post(kj::Function fn) { if (std::this_thread::get_id() == m_thread_id) { fn(); From 9b8ed3dc5f87718cfe862a8f448298bf0e268ef4 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Thu, 15 May 2025 13:28:16 -0400 Subject: [PATCH 08/13] refactor: Add clang thread safety annotations to EventLoop Add basic thread safety annotations to EventLoop. Use could be expanded and deepened but this puts basic functionality in place. Use of annotations was discussed in https://github.com/chaincodelabs/libmultiprocess/pull/129#discussion_r1928455500 --- include/mp/proxy-io.h | 12 ++++++------ include/mp/proxy.h | 4 ++-- include/mp/util.h | 40 +++++++++++++++++++++++++++++++++++++++ src/mp/proxy.cpp | 44 +++++++++++++++++++++---------------------- 4 files changed, 70 insertions(+), 30 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 6d1b77fd..806fd2ab 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -189,10 +189,10 @@ class EventLoop //! is important that ProxyServer::m_impl destructors do not run on the //! eventloop thread because they may need it to do I/O if they perform //! other IPC calls. - void startAsyncThread(std::unique_lock& lock); + void startAsyncThread() MP_REQUIRES(m_mutex); //! Check if loop should exit. - bool done(std::unique_lock& lock) const; + bool done() const MP_REQUIRES(m_mutex); Logger log() { @@ -215,10 +215,10 @@ class EventLoop std::thread m_async_thread; //! Callback function to run on event loop thread during post() or sync() call. - kj::Function* m_post_fn = nullptr; + kj::Function* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr; //! Callback functions to run on async thread. - CleanupList m_async_fns; + CleanupList m_async_fns MP_GUARDED_BY(m_mutex); //! Pipe read handle used to wake up the event loop thread. int m_wait_fd = -1; @@ -228,11 +228,11 @@ class EventLoop //! Number of clients holding references to ProxyServerBase objects that //! reference this event loop. - int m_num_clients = 0; + int m_num_clients MP_GUARDED_BY(m_mutex) = 0; //! Mutex and condition variable used to post tasks to event loop and async //! thread. - std::mutex m_mutex; + Mutex m_mutex; std::condition_variable m_cv; //! Capnp IO context. 
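To show how these annotations are meant to be consumed, a toy class (not part of the patch; Mutex and Lock are the annotated wrappers this patch adds to util.h below):

    class Counter
    {
    public:
        void increment()
        {
            const Lock lock(m_mutex); // scoped acquire, checked by clang
            m_value += 1;             // OK: m_mutex is held here
        }

        // Callers must already hold m_mutex or clang -Wthread-safety warns.
        int value() const MP_REQUIRES(m_mutex) { return m_value; }

    private:
        Mutex m_mutex;
        int m_value MP_GUARDED_BY(m_mutex) = 0;
    };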
diff --git a/include/mp/proxy.h b/include/mp/proxy.h index 9097c501..7c355755 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -54,7 +54,7 @@ inline void CleanupRun(CleanupList& fns) { class EventLoopRef { public: - explicit EventLoopRef(EventLoop& loop, std::unique_lock* lock = nullptr); + explicit EventLoopRef(EventLoop& loop, Lock* lock = nullptr); EventLoopRef(EventLoopRef&& other) noexcept : m_loop(other.m_loop) { other.m_loop = nullptr; } EventLoopRef(const EventLoopRef&) = delete; EventLoopRef& operator=(const EventLoopRef&) = delete; @@ -65,7 +65,7 @@ class EventLoopRef bool reset(); EventLoop* m_loop{nullptr}; - std::unique_lock* m_lock{nullptr}; + Lock* m_lock{nullptr}; }; //! Context data associated with proxy client and server classes. diff --git a/include/mp/util.h b/include/mp/util.h index 3f677cf6..8e182d21 100644 --- a/include/mp/util.h +++ b/include/mp/util.h @@ -6,6 +6,7 @@ #define MP_UTIL_H #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -145,6 +147,44 @@ struct PtrOrValue { T* operator->() const { return &**this; } }; +// Annotated mutex and lock class (https://clang.llvm.org/docs/ThreadSafetyAnalysis.html) +#if defined(__clang__) && (!defined(SWIG)) +#define MP_TSA(x) __attribute__((x)) +#else +#define MP_TSA(x) // no-op +#endif + +#define MP_CAPABILITY(x) MP_TSA(capability(x)) +#define MP_SCOPED_CAPABILITY MP_TSA(scoped_lockable) +#define MP_REQUIRES(x) MP_TSA(requires_capability(x)) +#define MP_ACQUIRE(...) MP_TSA(acquire_capability(__VA_ARGS__)) +#define MP_RELEASE(...) MP_TSA(release_capability(__VA_ARGS__)) +#define MP_ASSERT_CAPABILITY(x) MP_TSA(assert_capability(x)) +#define MP_GUARDED_BY(x) MP_TSA(guarded_by(x)) + +class MP_CAPABILITY("mutex") Mutex { +public: + void lock() MP_ACQUIRE() { m_mutex.lock(); } + void unlock() MP_RELEASE() { m_mutex.unlock(); } + + std::mutex m_mutex; +}; + +class MP_SCOPED_CAPABILITY Lock { +public: + explicit Lock(Mutex& m) MP_ACQUIRE(m) : m_lock(m.m_mutex) {} + ~Lock() MP_RELEASE() {} + void unlock() MP_RELEASE() { m_lock.unlock(); } + void lock() MP_ACQUIRE() { m_lock.lock(); } + void assert_locked(Mutex& mutex) MP_ASSERT_CAPABILITY() MP_ASSERT_CAPABILITY(mutex) + { + assert(m_lock.mutex() == &mutex.m_mutex); + assert(m_lock); + } + + std::unique_lock m_lock; +}; + //! Analog to std::lock_guard that unlocks instead of locks. 
template struct UnlockGuard diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index a5660d39..12b96463 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -49,9 +49,10 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception) m_loop.log() << "Uncaught exception in daemonized task."; } -EventLoopRef::EventLoopRef(EventLoop& loop, std::unique_lock* lock) : m_loop(&loop), m_lock(lock) +EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock) { auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}}; + loop_lock->assert_locked(m_loop->m_mutex); m_loop->m_num_clients += 1; } @@ -61,9 +62,10 @@ bool EventLoopRef::reset() if (auto* loop{m_loop}) { m_loop = nullptr; auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}}; + loop_lock->assert_locked(loop->m_mutex); assert(loop->m_num_clients > 0); loop->m_num_clients -= 1; - if (loop->done(*loop_lock)) { + if (loop->done()) { done = true; loop->m_cv.notify_all(); int post_fd{loop->m_post_fd}; @@ -134,17 +136,17 @@ Connection::~Connection() m_sync_cleanup_fns.pop_front(); } while (!m_async_cleanup_fns.empty()) { - const std::unique_lock lock(m_loop->m_mutex); + const Lock lock(m_loop->m_mutex); m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front())); m_async_cleanup_fns.pop_front(); } - std::unique_lock lock(m_loop->m_mutex); - m_loop->startAsyncThread(lock); + Lock lock(m_loop->m_mutex); + m_loop->startAsyncThread(); } CleanupIt Connection::addSyncCleanup(std::function fn) { - const std::unique_lock lock(m_loop->m_mutex); + const Lock lock(m_loop->m_mutex); // Add cleanup callbacks to the front of list, so sync cleanup functions run // in LIFO order. This is a good approach because sync cleanup functions are // added as client objects are created, and it is natural to clean up @@ -158,13 +160,13 @@ CleanupIt Connection::addSyncCleanup(std::function fn) void Connection::removeSyncCleanup(CleanupIt it) { - const std::unique_lock lock(m_loop->m_mutex); + const Lock lock(m_loop->m_mutex); m_sync_cleanup_fns.erase(it); } void Connection::addAsyncCleanup(std::function fn) { - const std::unique_lock lock(m_loop->m_mutex); + const Lock lock(m_loop->m_mutex); // Add async cleanup callbacks to the back of the list. 
Unlike the sync // cleanup list, this list order is more significant because it determines // the order server objects are destroyed when there is a sudden disconnect, @@ -200,7 +202,7 @@ EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context) EventLoop::~EventLoop() { if (m_async_thread.joinable()) m_async_thread.join(); - const std::lock_guard lock(m_mutex); + const Lock lock(m_mutex); KJ_ASSERT(m_post_fn == nullptr); KJ_ASSERT(m_async_fns.empty()); KJ_ASSERT(m_wait_fd == -1); @@ -225,12 +227,12 @@ void EventLoop::loop() for (;;) { const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope); if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly"); - std::unique_lock lock(m_mutex); + Lock lock(m_mutex); if (m_post_fn) { Unlock(lock, *m_post_fn); m_post_fn = nullptr; m_cv.notify_all(); - } else if (done(lock)) { + } else if (done()) { // Intentionally do not break if m_post_fn was set, even if done() // would return true, to ensure that the EventLoopRef write(post_fd) // call always succeeds and the loop does not exit between the time @@ -243,7 +245,7 @@ void EventLoop::loop() log() << "EventLoop::loop bye."; wait_stream = nullptr; KJ_SYSCALL(::close(post_fd)); - const std::unique_lock lock(m_mutex); + const Lock lock(m_mutex); m_wait_fd = -1; m_post_fd = -1; } @@ -254,27 +256,27 @@ void EventLoop::post(kj::Function fn) fn(); return; } - std::unique_lock lock(m_mutex); + Lock lock(m_mutex); EventLoopRef ref(*this, &lock); - m_cv.wait(lock, [this] { return m_post_fn == nullptr; }); + m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; }); m_post_fn = &fn; int post_fd{m_post_fd}; Unlock(lock, [&] { char buffer = 0; KJ_SYSCALL(write(post_fd, &buffer, 1)); }); - m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; }); + m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; }); } -void EventLoop::startAsyncThread(std::unique_lock& lock) +void EventLoop::startAsyncThread() { if (m_async_thread.joinable()) { // Notify to wake up the async thread if it is already running. m_cv.notify_all(); } else if (!m_async_fns.empty()) { m_async_thread = std::thread([this] { - std::unique_lock lock(m_mutex); - while (!done(lock)) { + Lock lock(m_mutex); + while (!done()) { if (!m_async_fns.empty()) { EventLoopRef ref{*this, &lock}; const std::function fn = std::move(m_async_fns.front()); @@ -289,17 +291,15 @@ void EventLoop::startAsyncThread(std::unique_lock& lock) // Continue without waiting in case there are more async_fns continue; } - m_cv.wait(lock); + m_cv.wait(lock.m_lock); } }); } } -bool EventLoop::done(std::unique_lock& lock) const +bool EventLoop::done() const { assert(m_num_clients >= 0); - assert(lock.owns_lock()); - assert(lock.mutex() == &m_mutex); return m_num_clients == 0 && m_async_fns.empty(); } From 56fff76f940b5be7dc41a2532195becf8b15230e Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Thu, 24 Apr 2025 08:18:30 -0400 Subject: [PATCH 09/13] Improve IPC client disconnected exceptions Improve clientInvoke exceptions so IPC clients can more reliably detect when they are calling a remote method after the connection is closed. Before this change different exceptions were thrown, which made this condition difficult to detect and handle. 
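For illustration, a caller-side sketch of how the new behavior can be used (hypothetical helper; Client stands for any generated ProxyClient type, and the messages match the updated tests below):

    template <typename Client>
    bool callAddOrDetectDisconnect(Client& foo)
    {
        try {
            foo.add(1, 2);
        } catch (const std::runtime_error& e) {
            // Both failure modes now surface as std::runtime_error with a
            // fixed, matchable message:
            //   "IPC client method called after disconnect."
            //   "IPC client method call interrupted by disconnect."
            if (std::string_view{e.what()}.find("disconnect") != std::string_view::npos) {
                return false; // peer is gone; caller can reconnect or shut down
            }
            throw; // unrelated error: propagate
        }
        return true;
    }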
--- include/mp/proxy-types.h | 23 +++++++++++++++++------ test/mp/test/test.cpp | 10 ++++------ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/include/mp/proxy-types.h b/include/mp/proxy-types.h index 6f74bedc..c2f09865 100644 --- a/include/mp/proxy-types.h +++ b/include/mp/proxy-types.h @@ -592,9 +592,6 @@ void serverDestroy(Server& server) template void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, FieldObjs&&... fields) { - if (!proxy_client.m_context.connection) { - throw std::logic_error("clientInvoke call made after disconnect"); - } if (!g_thread_context.waiter) { assert(g_thread_context.thread_name.empty()); g_thread_context.thread_name = ThreadName(proxy_client.m_context.loop->m_exe_name); @@ -616,7 +613,16 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel std::exception_ptr exception; std::string kj_exception; bool done = false; + const char* disconnected = nullptr; proxy_client.m_context.loop->sync([&]() { + if (!proxy_client.m_context.connection) { + const std::unique_lock lock(invoke_context.thread_context.waiter->m_mutex); + done = true; + disconnected = "IPC client method called after disconnect."; + invoke_context.thread_context.waiter->m_cv.notify_all(); + return; + } + auto request = (proxy_client.m_client.*get_request)(nullptr); using Request = CapRequestTraits; using FieldList = typename ProxyClientMethodTraits::Fields; @@ -641,9 +647,13 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel invoke_context.thread_context.waiter->m_cv.notify_all(); }, [&](const ::kj::Exception& e) { - kj_exception = kj::str("kj::Exception: ", e).cStr(); - proxy_client.m_context.loop->logPlain() - << "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception; + if (e.getType() == ::kj::Exception::Type::DISCONNECTED) { + disconnected = "IPC client method call interrupted by disconnect."; + } else { + kj_exception = kj::str("kj::Exception: ", e).cStr(); + proxy_client.m_context.loop->logPlain() + << "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception; + } const std::unique_lock lock(invoke_context.thread_context.waiter->m_mutex); done = true; invoke_context.thread_context.waiter->m_cv.notify_all(); @@ -654,6 +664,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; }); if (exception) std::rethrow_exception(exception); if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception; + if (disconnected) proxy_client.m_context.loop->raise() << disconnected; } //! Invoke callable `fn()` that may return void. If it does return void, replace diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index ee38b6f5..46bd4e93 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -35,7 +35,7 @@ namespace test { * object destroys the client Connection object. Normally it makes sense for * this to be true to simplify shutdown and avoid needing to call * client_disconnect manually, but false allows testing more ProxyClient - * behavior and the "clientInvoke call made after disconnect" code path. + * behavior and the "IPC client method called after disconnect" code path. 
*/ class TestSetup { @@ -191,8 +191,8 @@ KJ_TEST("Call IPC method after client connection is closed") bool disconnected{false}; try { foo->add(1, 2); - } catch (const std::logic_error& e) { - KJ_EXPECT(std::string_view{e.what()} == "clientInvoke call made after disconnect"); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method called after disconnect."); disconnected = true; } KJ_EXPECT(disconnected); @@ -209,9 +209,7 @@ KJ_TEST("Calling IPC method after server connection is closed") try { foo->add(1, 2); } catch (const std::runtime_error& e) { - std::string_view error{e.what()}; - KJ_EXPECT(error.starts_with("kj::Exception: ")); - KJ_EXPECT(error.find("disconnected: Peer disconnected.") != std::string_view::npos); + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); disconnected = true; } KJ_EXPECT(disconnected); From 616d9a75d20a3dbcfb02696ee64ef1761cee49ac Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Mon, 9 Jun 2025 10:05:49 -0400 Subject: [PATCH 10/13] doc: Document ProxyClientBase destroy_connection option Suggested by Sjors Provoost https://github.com/bitcoin-core/libmultiprocess/pull/160#discussion_r2135564583 --- include/mp/proxy.h | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/include/mp/proxy.h b/include/mp/proxy.h index 7c355755..08a40981 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -89,6 +89,15 @@ class ProxyClientBase : public Impl_ using Sub = ProxyClient; using Super = ProxyClientBase; + //! Construct libmultiprocess client object wrapping Cap'n Proto client + //! object with a reference to the associated mp::Connection object. + //! + //! The destroy_connection option determines whether destroying this client + //! object closes the connection. It is set to true for the + //! ProxyClient object returned by ConnectStream, to let IPC + //! clients close the connection by freeing the object. It is false for + //! other client objects so they can be destroyed without affecting the + //! connection. ProxyClientBase(typename Interface::Client client, Connection* connection, bool destroy_connection); ~ProxyClientBase() noexcept; From ea38392960e1edbdb423bebc5cc0a665fa83a253 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Tue, 10 Jun 2025 13:18:24 -0400 Subject: [PATCH 11/13] Prevent EventLoop async cleanup thread early exit during shutdown Antoine Poinsot reported a bug with details in https://github.com/bitcoin-core/libmultiprocess/issues/182#issuecomment-2960280611 where an IPC client rapidly connecting and disconnecting to the server in a loop could cause problems. One problem, fixed by this commit, was that if a server process is shutting down, the async cleanup thread in `EventLoop::startAsyncThread` responsible for destroying unused server object on unclean disconnects could detect the done() condition and decide to exit right before a new incoming connection is processed, and exit prematurely when there might be more cleanup work for it do. If this happens, process shutdown could hang waiting for cleanup work that will never be completed. This commit fixes that problem by changing the `EventLoop::startAsyncThread()` while condition to check whether the `EventLoop::loop() method has _exited_, instead of checking whether it is about to exit. Specifically the change makes the `m_async_fns` list into a `std::optional` value and sets it to `nullopt` when the `loop()` method exits. 
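A simplified sketch of the server lifecycle the commit message describes (assumed usage, connection handling elided), showing why the async cleanup thread must keep running until loop() itself exits:

    void runEventLoop()
    {
        EventLoop loop{"example-server", [](bool raise, const std::string& msg) {
            if (raise) throw std::runtime_error(msg);
        }};
        std::thread loop_thread{[&] { loop.loop(); }};

        // ... Connections are created and destroyed on the loop thread; unclean
        // disconnects queue ProxyServer cleanup onto the async worker thread ...

        // loop() returns only once there are no clients and no pending async
        // work; with this fix the async thread does not exit until loop() has
        // actually returned, so cleanup queued by a last-moment disconnect
        // still runs instead of stranding shutdown.
        loop_thread.join();
    }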
--- include/mp/proxy-io.h | 2 +- include/mp/proxy.h | 2 +- include/mp/util.h | 1 + src/mp/proxy.cpp | 47 +++++++++++++++++++++++++------------------ 4 files changed, 30 insertions(+), 22 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 806fd2ab..5f3d11e5 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -218,7 +218,7 @@ class EventLoop kj::Function* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr; //! Callback functions to run on async thread. - CleanupList m_async_fns MP_GUARDED_BY(m_mutex); + std::optional m_async_fns MP_GUARDED_BY(m_mutex); //! Pipe read handle used to wake up the event loop thread. int m_wait_fd = -1; diff --git a/include/mp/proxy.h b/include/mp/proxy.h index 08a40981..94c65820 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -62,7 +62,7 @@ class EventLoopRef ~EventLoopRef() { reset(); } EventLoop& operator*() const { assert(m_loop); return *m_loop; } EventLoop* operator->() const { assert(m_loop); return m_loop; } - bool reset(); + void reset(bool relock=false); EventLoop* m_loop{nullptr}; Lock* m_lock{nullptr}; diff --git a/include/mp/util.h b/include/mp/util.h index 8e182d21..22e8188e 100644 --- a/include/mp/util.h +++ b/include/mp/util.h @@ -161,6 +161,7 @@ struct PtrOrValue { #define MP_RELEASE(...) MP_TSA(release_capability(__VA_ARGS__)) #define MP_ASSERT_CAPABILITY(x) MP_TSA(assert_capability(x)) #define MP_GUARDED_BY(x) MP_TSA(guarded_by(x)) +#define MP_NO_TSA MP_TSA(no_thread_safety_analysis) class MP_CAPABILITY("mutex") Mutex { public: diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 12b96463..3b7fb2a9 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -56,9 +56,11 @@ EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock( m_loop->m_num_clients += 1; } -bool EventLoopRef::reset() +// Due to the conditionals in this function, MP_NO_TSA is required to avoid +// error "error: mutex 'loop_lock' is not held on every path through here +// [-Wthread-safety-analysis]" +void EventLoopRef::reset(bool relock) MP_NO_TSA { - bool done = false; if (auto* loop{m_loop}) { m_loop = nullptr; auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}}; @@ -66,17 +68,17 @@ bool EventLoopRef::reset() assert(loop->m_num_clients > 0); loop->m_num_clients -= 1; if (loop->done()) { - done = true; loop->m_cv.notify_all(); int post_fd{loop->m_post_fd}; loop_lock->unlock(); char buffer = 0; KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon) - // Do not try to relock `loop_lock` after writing, because the event loop - // could wake up and destroy itself and the mutex might no longer exist. + // By default, do not try to relock `loop_lock` after writing, + // because the event loop could wake up and destroy itself and the + // mutex might no longer exist. 
+            if (relock) loop_lock->lock();
         }
     }
-    return done;
 }
 
 ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}
@@ -137,7 +139,7 @@ Connection::~Connection()
     }
     while (!m_async_cleanup_fns.empty()) {
         const Lock lock(m_loop->m_mutex);
-        m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
+        m_loop->m_async_fns->emplace_back(std::move(m_async_cleanup_fns.front()));
         m_async_cleanup_fns.pop_front();
     }
     Lock lock(m_loop->m_mutex);
@@ -204,7 +206,7 @@ EventLoop::~EventLoop()
     if (m_async_thread.joinable()) m_async_thread.join();
     const Lock lock(m_mutex);
     KJ_ASSERT(m_post_fn == nullptr);
-    KJ_ASSERT(m_async_fns.empty());
+    KJ_ASSERT(!m_async_fns);
    KJ_ASSERT(m_wait_fd == -1);
    KJ_ASSERT(m_post_fd == -1);
    KJ_ASSERT(m_num_clients == 0);
@@ -220,6 +222,12 @@ void EventLoop::loop()
     g_thread_context.loop_thread = true;
     KJ_DEFER(g_thread_context.loop_thread = false);
 
+    {
+        const Lock lock(m_mutex);
+        assert(!m_async_fns);
+        m_async_fns.emplace();
+    }
+
     kj::Own<kj::AsyncIoStream> wait_stream{
         m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
     int post_fd{m_post_fd};
@@ -248,6 +256,8 @@ void EventLoop::loop()
     const Lock lock(m_mutex);
     m_wait_fd = -1;
     m_post_fd = -1;
+    m_async_fns.reset();
+    m_cv.notify_all();
 }
 
 void EventLoop::post(kj::Function<void()> fn)
@@ -270,24 +280,21 @@ void EventLoop::post(kj::Function<void()> fn)
 
 void EventLoop::startAsyncThread()
 {
+    assert (std::this_thread::get_id() == m_thread_id);
     if (m_async_thread.joinable()) {
         // Notify to wake up the async thread if it is already running.
         m_cv.notify_all();
-    } else if (!m_async_fns.empty()) {
+    } else if (!m_async_fns->empty()) {
         m_async_thread = std::thread([this] {
             Lock lock(m_mutex);
-            while (!done()) {
-                if (!m_async_fns.empty()) {
+            while (m_async_fns) {
+                if (!m_async_fns->empty()) {
                     EventLoopRef ref{*this, &lock};
-                    const std::function<void()> fn = std::move(m_async_fns.front());
-                    m_async_fns.pop_front();
+                    const std::function<void()> fn = std::move(m_async_fns->front());
+                    m_async_fns->pop_front();
                     Unlock(lock, fn);
-                    // Reset ref and break if that returns true instead of
-                    // passively letting ref go out of scope. This is important
-                    // because the ref destructor would leave m_mutex unlocked
-                    // when done() returns true, causing undefined behavior if
-                    // the loop continued to execute.
-                    if (ref.reset()) break;
+                    // Important to relock because of the wait() call below.
+                    ref.reset(/*relock=*/true);
                     // Continue without waiting in case there are more async_fns
                     continue;
                 }
@@ -300,7 +307,7 @@ void EventLoop::startAsyncThread()
 bool EventLoop::done() const
 {
     assert(m_num_clients >= 0);
-    return m_num_clients == 0 && m_async_fns.empty();
+    return m_num_clients == 0 && m_async_fns->empty();
 }
 
 std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)

From 949573da84112388f68d1893f55aae0ca7f12d0c Mon Sep 17 00:00:00 2001
From: Ryan Ofsky
Date: Fri, 13 Jun 2025 13:15:13 -0400
Subject: [PATCH 12/13] Prevent IPC server crash if disconnected during IPC
 call

Antoine Poinsot reported a bug with details in
https://github.com/bitcoin-core/libmultiprocess/issues/182#issuecomment-2960280611
where an IPC client rapidly connecting and disconnecting to the server in a
loop could cause problems. One problem this uncovered was the hang on shutdown
fixed by the previous commit.
Another problem it uncovered is that if a disconnect happened while the IPC
server was executing an IPC call, and the onDisconnect() handling code ran
before the IPC call finished, memory corruption would occur, usually leading to
a hang or crash. This happened because the ~ProxyServerBase() destructor was
written with the assumption that it would always be called either from the
~Connection() destructor, or before that point, so its m_context.connection
pointer would always be valid. Typically this was the case, but not if the
connection was closed during an active IPC call. A unit test to reproduce this
error is added in the subsequent commit.

The fix for the bug here is actually a code simplification. Previously the
~ProxyServerBase() destructor would append a std::function callback destroying
the ProxyServerBase::m_impl object to m_context.connection->m_async_cleanup_fns,
so the object destructor could be called asynchronously without blocking the
event loop. Now it just appends the same callback function to
m_context.loop->m_async_fns without going through the Connection object.

The new code is an improvement over the previous code in two other ways:

- It is a code simplification, since the previous code was needlessly
  complicated.

- In the case where there is no disconnect, and the remote ProxyClient is
  destroyed, and no "destroy" method is declared, this change causes the
  ProxyServer::m_impl object to be freed shortly after the client is freed,
  instead of being delayed until the connection is closed. (If a "destroy"
  method is declared, this change has no effect because in that case destroying
  the ProxyClient calls destroy and destroys ProxyServer::m_impl synchronously.)

The previous code was trying to make the ProxyServer cleanup with
Connection::m_async_cleanup_fns mirror the ProxyClient cleanup with
Connection::m_sync_cleanup_fns, but it was just fragile and unnecessarily
complicated. Some comments about the ProxyClient cleanup code are updated for
clarity here, but its behavior is not changed.
---
 include/mp/proxy-io.h | 27 ++++++++++++++++---------
 src/mp/proxy.cpp      | 25 +++++++++++--------------
 2 files changed, 29 insertions(+), 23 deletions(-)

diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h
index 5f3d11e5..9f3eed62 100644
--- a/include/mp/proxy-io.h
+++ b/include/mp/proxy-io.h
@@ -178,6 +178,10 @@ class EventLoop
         post(std::forward<Callable>(callable));
     }
 
+    //! Register cleanup function to run on asynchronous worker thread without
+    //! blocking the event loop thread.
+    void addAsyncCleanup(std::function<void()> fn);
+
     //! Start asynchronous worker thread if necessary. This is only done if
     //! there are ProxyServerBase::m_impl objects that need to be destroyed
     //! asynchronously, without tying up the event loop thread. This can happen
@@ -329,10 +333,6 @@ class Connection
     CleanupIt addSyncCleanup(std::function<void()> fn);
     void removeSyncCleanup(CleanupIt it);
 
-    //! Register asynchronous cleanup function to run on worker thread when
-    //! disconnect() is called.
-    void addAsyncCleanup(std::function<void()> fn);
-
     //! Add disconnect handler.
     template <typename F>
     void onDisconnect(F&& f)
@@ -361,11 +361,10 @@ class Connection
     //! ThreadMap.makeThread) used to service requests to clients.
     ::capnp::CapabilityServerSet<Thread> m_threads;
 
-    //! Cleanup functions to run if connection is broken unexpectedly.
-    //! Lists will be empty if all ProxyClient and ProxyServer objects are
-    //! destroyed cleanly before the connection is destroyed.
+    //! Cleanup functions to run if connection is broken unexpectedly. List
+    //! will be empty if all ProxyClient objects are destroyed cleanly before the
+    //! connection is destroyed.
     CleanupList m_sync_cleanup_fns;
-    CleanupList m_async_cleanup_fns;
 };
 
 //! Vat id for server side of connection. Required argument to RpcSystem::bootStrap()
@@ -454,6 +453,16 @@ ProxyServerBase::ProxyServerBase(std::shared_ptr impl, Co
 
 //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
 //! garbage collection code after there are no more references to this object.
+//! This will typically happen when the corresponding ProxyClient object on the
+//! other side of the connection is destroyed. It can also happen earlier if the
+//! connection is broken or destroyed. In the latter case this destructor will
+//! typically be called inside the m_rpc_system.reset() call in the ~Connection
+//! destructor while the Connection object still exists. However, because
+//! ProxyServer objects are refcounted, and the Connection object could be
+//! destroyed while asynchronous IPC calls are still in-flight, it's possible
+//! for this destructor to be called after the Connection object no longer
+//! exists, so it is NOT valid to dereference the m_context.connection pointer
+//! from this function.
 template <typename Interface, typename Impl>
 ProxyServerBase<Interface, Impl>::~ProxyServerBase()
 {
@@ -477,7 +486,7 @@ ProxyServerBase::~ProxyServerBase()
     // connection is broken). Probably some refactoring of the destructor
     // and invokeDestroy function is possible to make this cleaner and more
     // consistent.
-    m_context.connection->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
+    m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
         impl.reset();
         CleanupRun(fns);
     });
diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp
index 3b7fb2a9..0f5e566a 100644
--- a/src/mp/proxy.cpp
+++ b/src/mp/proxy.cpp
@@ -85,10 +85,13 @@ ProxyContext::ProxyContext(Connection* connection) : connection(connection), loo
 
 Connection::~Connection()
 {
-    // Shut down RPC system first, since this will garbage collect Server
-    // objects that were not freed before the connection was closed, some of
-    // which may call addAsyncCleanup and add more cleanup callbacks which can
-    // run below.
+    // Shut down RPC system first, since this will garbage collect any
+    // ProxyServer objects that were not freed before the connection was closed.
+    // Typically all ProxyServer objects associated with this connection will be
+    // freed before this call returns. However, that will not be the case if
+    // there are asynchronous IPC calls over this connection still currently
+    // executing. In that case, Cap'n Proto will destroy the ProxyServer objects
+    // after the calls finish.
     m_rpc_system.reset();
 
     // ProxyClient cleanup handlers are in sync list, and ProxyServer cleanup
@@ -137,13 +140,6 @@ Connection::~Connection()
         m_sync_cleanup_fns.front()();
         m_sync_cleanup_fns.pop_front();
     }
-    while (!m_async_cleanup_fns.empty()) {
-        const Lock lock(m_loop->m_mutex);
-        m_loop->m_async_fns->emplace_back(std::move(m_async_cleanup_fns.front()));
-        m_async_cleanup_fns.pop_front();
-    }
-    Lock lock(m_loop->m_mutex);
-    m_loop->startAsyncThread();
 }
 
 CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
@@ -166,9 +162,9 @@ void Connection::removeSyncCleanup(CleanupIt it)
     m_sync_cleanup_fns.erase(it);
 }
 
-void Connection::addAsyncCleanup(std::function<void()> fn)
+void EventLoop::addAsyncCleanup(std::function<void()> fn)
 {
-    const Lock lock(m_loop->m_mutex);
+    const Lock lock(m_mutex);
     // Add async cleanup callbacks to the back of the list. Unlike the sync
     // cleanup list, this list order is more significant because it determines
     // the order server objects are destroyed when there is a sudden disconnect,
@@ -185,7 +181,8 @@ void Connection::addAsyncCleanup(std::function<void()> fn)
     // process, otherwise shared pointer counts of the CWallet objects (which
     // inherit from Chain::Notification) will not be 1 when WalletLoader
     // destructor runs and it will wait forever for them to be released.
-    m_async_cleanup_fns.emplace(m_async_cleanup_fns.end(), std::move(fn));
+    m_async_fns->emplace_back(std::move(fn));
+    startAsyncThread();
 }
 
 EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)

From 84cf56a0b5f496b02e761dc8fb243cd6a69b3888 Mon Sep 17 00:00:00 2001
From: Ryan Ofsky
Date: Wed, 11 Jun 2025 15:15:52 -0400
Subject: [PATCH 13/13] test: Test disconnects during IPC calls

This change adds two unit tests to make sure shutdown happens cleanly if the
client disconnects while the server is processing an IPC call.

The first test checks a synchronous case which has always worked, where the
disconnect starts but is not processed by the event loop until after the IPC
call completes.

The second test checks an asynchronous case which was fixed by the previous
commit, where the disconnect is processed before the IPC call generates a
response. This test would usually crash or hang prior to the fix in the
previous commit. It would crash reliably with address sanitizer.
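
As background on how the second test keeps a call in flight: it blocks the
server-side callback on a promise until the test releases it. A minimal
standalone sketch of that handshake (hypothetical names, not the test code
itself, which appears below):

    #include <future>
    #include <iostream>
    #include <thread>

    int main()
    {
        std::promise<void> release;                 // held by the "test"
        std::future<void> released = release.get_future();

        std::thread server_call([&] {
            // ...a disconnect would be triggered here...
            released.wait();                        // keeps the "IPC call" in flight
            std::cout << "call body finished after release\n";
        });

        // ...the disconnect can be processed while the call is still blocked...
        release.set_value();                        // let the blocked call finish
        server_call.join();
        return 0;
    }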
---
 test/mp/test/foo.capnp |  2 ++
 test/mp/test/foo.h     |  4 +++
 test/mp/test/test.cpp  | 66 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 72 insertions(+)

diff --git a/test/mp/test/foo.capnp b/test/mp/test/foo.capnp
index 4a8af0bb..75e4617d 100644
--- a/test/mp/test/foo.capnp
+++ b/test/mp/test/foo.capnp
@@ -29,6 +29,8 @@ interface FooInterface $Proxy.wrap("mp::test::FooImplementation") {
     passMutable @14 (arg :FooMutable) -> (arg :FooMutable);
     passEnum @15 (arg :Int32) -> (result :Int32);
     passFn @16 (context :Proxy.Context, fn :FooFn) -> (result :Int32);
+    callFn @17 () -> ();
+    callFnAsync @18 (context :Proxy.Context) -> ();
 }
 
 interface FooCallback $Proxy.wrap("mp::test::FooCallback") {
diff --git a/test/mp/test/foo.h b/test/mp/test/foo.h
index fa4ae610..70bf4ff1 100644
--- a/test/mp/test/foo.h
+++ b/test/mp/test/foo.h
@@ -5,6 +5,7 @@
 #ifndef MP_TEST_FOO_H
 #define MP_TEST_FOO_H
 
+#include <cassert>
 #include <functional>
 #include <memory>
 #include <string>
@@ -78,6 +79,9 @@ class FooImplementation
     FooEnum passEnum(FooEnum foo) { return foo; }
     int passFn(std::function<int()> fn) { return fn(); }
     std::shared_ptr<FooCallback> m_callback;
+    void callFn() { assert(m_fn); m_fn(); }
+    void callFnAsync() { assert(m_fn); m_fn(); }
+    std::function<void()> m_fn;
 };
 
 } // namespace test
diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp
index 46bd4e93..37201a9a 100644
--- a/test/mp/test/test.cpp
+++ b/test/mp/test/test.cpp
@@ -45,6 +45,7 @@ class TestSetup
     std::function<void()> client_disconnect;
     std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
     std::unique_ptr<ProxyClient<messages::FooInterface>> client;
+    ProxyServer<messages::FooInterface>* server{nullptr};
 
     TestSetup(bool client_owns_connection = true)
         : thread{[&] {
@@ -58,6 +59,7 @@ class TestSetup
                 std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]), [&](Connection& connection) {
                     auto server_proxy = kj::heap<ProxyServer<messages::FooInterface>>(
                         std::make_shared<FooImplementation>(), connection);
+                    server = server_proxy;
                     return capnp::Capability::Client(kj::mv(server_proxy));
                 });
             server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); };
@@ -215,5 +217,69 @@ KJ_TEST("Calling IPC method after server connection is closed")
     KJ_EXPECT(disconnected);
 }
 
+KJ_TEST("Calling IPC method and disconnecting during the call")
+{
+    TestSetup setup{/*client_owns_connection=*/false};
+    ProxyClient<messages::FooInterface>* foo = setup.client.get();
+    KJ_EXPECT(foo->add(1, 2) == 3);
+
+    // Set m_fn to initiate a client disconnect when the server is in the middle
+    // of handling the callFn call, to make sure this case is handled cleanly.
+    setup.server->m_impl->m_fn = setup.client_disconnect;
+
+    bool disconnected{false};
+    try {
+        foo->callFn();
+    } catch (const std::runtime_error& e) {
+        KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
+        disconnected = true;
+    }
+    KJ_EXPECT(disconnected);
+}
+
+KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
+{
+    // This test is similar to the last test, except that instead of letting the
+    // IPC call return immediately after triggering a disconnect, it makes the
+    // call disconnect & wait, so the server is forced to deal with having a
+    // disconnection and a call in flight at the same time.
+    //
+    // The test uses callFnAsync() instead of callFn() to implement this. Both of
+    // these methods have the same implementation, but the callFnAsync() capnp
+    // method declaration takes an mp.Context argument so the method executes on
+    // an asynchronous thread instead of executing in the event loop thread, so
+    // it is able to block without deadlocking the event loop thread.
+    //
+    // This test adds important coverage because it causes the server Connection
+    // object to be destroyed before the ProxyServer object, which is not a
+    // condition that usually happens, because the m_rpc_system.reset() call in
+    // the ~Connection destructor would normally immediately free all remaining
+    // ProxyServer objects associated with the connection. Having an in-progress
+    // RPC call requires keeping the ProxyServer alive longer.
+
+    TestSetup setup{/*client_owns_connection=*/false};
+    ProxyClient<messages::FooInterface>* foo = setup.client.get();
+    KJ_EXPECT(foo->add(1, 2) == 3);
+
+    foo->initThreadMap();
+    std::promise<void> signal;
+    setup.server->m_impl->m_fn = [&] {
+        EventLoopRef loop{*setup.server->m_context.loop};
+        setup.client_disconnect();
+        signal.get_future().get();
+    };
+
+    bool disconnected{false};
+    try {
+        foo->callFnAsync();
+    } catch (const std::runtime_error& e) {
+        KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
+        disconnected = true;
+    }
+    KJ_EXPECT(disconnected);
+
+    signal.set_value();
+}
+
 } // namespace test
 } // namespace mp