Skip to content

Commit 79c09e4

Browse files
committed
pr/threadclean initial attempt to fix
fix posted Sjors/bitcoin#90 (comment) CI error attempting to fix (does not work because nothing prevents ~ProxyClient<Thread> object from being destoryed during m_disconnect_cb call if m_disconnect_cb is interrupted before successfully acquires Waiter::m_mutex, and mutex coudl be deleted while it is waiting or before it starts waiting): WARNING: ThreadSanitizer: data race (pid=8333) Write of size 8 at 0x721000003e90 by thread T11 (mutexes: write M0, write M1): #0 operator delete(void*, unsigned long) <null> (mptest+0x12b1dc) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #1 void std::__1::__libcpp_operator_delete[abi:ne200100]<std::__1::__list_node<std::__1::function<void ()>, void*>*, unsigned long>(std::__1::__list_node<std::__1::function<void ()>, void*>*, unsigned long) /usr/lib/llvm-20/bin/../include/c++/v1/__new/allocate.h:46:3 (mptest+0x235783) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #2 void std::__1::__libcpp_deallocate[abi:ne200100]<std::__1::__list_node<std::__1::function<void ()>, void*>>(std::__1::__type_identity<std::__1::__list_node<std::__1::function<void ()>, void*>>::type*, std::__1::__element_count, unsigned long) /usr/lib/llvm-20/bin/../include/c++/v1/__new/allocate.h:86:12 (mptest+0x235783) #3 std::__1::allocator<std::__1::__list_node<std::__1::function<void ()>, void*>>::deallocate[abi:ne200100](std::__1::__list_node<std::__1::function<void ()>, void*>*, unsigned long) /usr/lib/llvm-20/bin/../include/c++/v1/__memory/allocator.h:120:7 (mptest+0x235783) #4 std::__1::allocator_traits<std::__1::allocator<std::__1::__list_node<std::__1::function<void ()>, void*>>>::deallocate[abi:ne200100](std::__1::allocator<std::__1::__list_node<std::__1::function<void ()>, void*>>&, std::__1::__list_node<std::__1::function<void ()>, void*>*, unsigned long) /usr/lib/llvm-20/bin/../include/c++/v1/__memory/allocator_traits.h:302:9 (mptest+0x235783) #5 std::__1::__list_imp<std::__1::function<void ()>, std::__1::allocator<std::__1::function<void ()>>>::__delete_node[abi:ne200100](std::__1::__list_node<std::__1::function<void ()>, void*>*) /usr/lib/llvm-20/bin/../include/c++/v1/list:571:5 (mptest+0x235783) #6 std::__1::list<std::__1::function<void ()>, std::__1::allocator<std::__1::function<void ()>>>::erase(std::__1::__list_const_iterator<std::__1::function<void ()>, void*>) /usr/lib/llvm-20/bin/../include/c++/v1/list:1342:9 (mptest+0x235783) #7 mp::Connection::removeSyncCleanup(std::__1::__list_iterator<std::__1::function<void ()>, void*>) /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/./ipc/libmultiprocess/src/mp/proxy.cpp:160:24 (mptest+0x235783) 157 void Connection::removeSyncCleanup(CleanupIt it) 158 { 159 const Lock lock(m_loop->m_mutex); 160 m_sync_cleanup_fns.erase(it); 161 } #8 mp::ProxyClient<mp::Thread>::~ProxyClient() /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/./ipc/libmultiprocess/src/mp/proxy.cpp:339:31 (mptest+0x235783) 333 ProxyClient<Thread>::~ProxyClient() 334 { 335 // If thread is being destroyed before connection is destroyed, remove the 336 // cleanup callback that was registered to handle the connection being 337 // destroyed before the thread being destroyed. 338 if (m_disconnect_cb) { 339 m_context.connection->removeSyncCleanup(*m_disconnect_cb); 340 } 341 } #9 std::__1::pair<mp::Connection* const, mp::ProxyClient<mp::Thread>>::~pair() /usr/lib/llvm-20/bin/../include/c++/v1/__utility/pair.h:63:29 (mptest+0x1d2e8d) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #10 void std::__1::__destroy_at[abi:ne200100]<std::__1::pair<mp::Connection* const, mp::ProxyClient<mp::Thread>>, 0>(std::__1::pair<mp::Connection* const, mp::ProxyClient<mp::Thread>>*) /usr/lib/llvm-20/bin/../include/c++/v1/__memory/construct_at.h:66:11 (mptest+0x1d2e8d) #11 void std::__1::allocator_traits<std::__1::allocator<std::__1::__tree_node<std::__1::__value_type<mp::Connection*, mp::ProxyClient<mp::Thread>>, void*>>>::destroy[abi:ne200100]<std::__1::pair<mp::Connection* const, mp::ProxyClient<mp::Thread>>, void, 0>(std::__1::allocator<std::__1::__tree_node<std::__1::__value_type<mp::Connection*, mp::ProxyClient<mp::Thread>>, void*>>&, std::__1::pair<mp::Connection* const, mp::ProxyClient<mp::Thread>>*) /usr/lib/llvm-20/bin/../include/c++/v1/__memory/allocator_traits.h:329:5 (mptest+0x1d2e8d) #12 std::__1::__tree<std::__1::__value_type<mp::Connection*, mp::ProxyClient<mp::Thread>>, std::__1::__map_value_compare<mp::Connection*, std::__1::__value_type<mp::Connection*, mp::ProxyClient<mp::Thread>>, std::__1::less<mp::Connection*>, true>, std::__1::allocator<std::__1::__value_type<mp::Connection*, mp::ProxyClient<mp::Thread>>>>::erase(std::__1::__tree_const_iterator<std::__1::__value_type<mp::Connection*, mp::ProxyClient<mp::Thread>>, std::__1::__tree_node<std::__1::__value_type<mp::Connection*, mp::ProxyClient<mp::Thread>>, void*>*, long>) /usr/lib/llvm-20/bin/../include/c++/v1/__tree:2047:3 (mptest+0x1d2e8d) #13 unsigned long std::__1::__tree<std::__1::__value_type<mp::Connection*, mp::ProxyClient<mp::Thread>>, std::__1::__map_value_compare<mp::Connection*, std::__1::__value_type<mp::Connection*, mp::ProxyClient<mp::Thread>>, std::__1::less<mp::Connection*>, true>, std::__1::allocator<std::__1::__value_type<mp::Connection*, mp::ProxyClient<mp::Thread>>>>::__erase_unique<mp::Connection*>(mp::Connection* const&) /usr/lib/llvm-20/bin/../include/c++/v1/__tree:2067:3 (mptest+0x1d2e8d) #14 std::__1::map<mp::Connection*, mp::ProxyClient<mp::Thread>, std::__1::less<mp::Connection*>, std::__1::allocator<std::__1::pair<mp::Connection* const, mp::ProxyClient<mp::Thread>>>>::erase[abi:ne200100](mp::Connection* const&) /usr/lib/llvm-20/bin/../include/c++/v1/map:1320:79 (mptest+0x205841) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #15 std::__1::enable_if<std::is_same<decltype(mp::Accessor<mp::foo_fields::Context, 17>::get(fp1.call_context.getParams())), mp::Context::Reader>::value, kj::Promise<mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>::CallContext>>::type mp::PassField<mp::Accessor<mp::foo_fields::Context, 17>, mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>, mp::ServerCall, mp::TypeList<>>(mp::Priority<1>, mp::TypeList<>, mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>&, mp::ServerCall const&, mp::TypeList<>&&)::'lambda'()::operator()()::'lambda0'()::operator()() const /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/test/./ipc/libmultiprocess/include/mp/type-context.h:103:21 (mptest+0x205841) 103 KJ_DEFER(if (erase_thread) { 104 std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex); 105 // Call erase here with a Connection* argument instead 106 // of an iterator argument, because the `request_thread` 107 // iterator may be invalid if the connection is closed 108 // during this function call. More specifically, the 109 // iterator may be invalid because SetThread adds a 110 // cleanup callback to the Connection destructor that 111 // erases the thread from the map, and also because the 112 // ProxyServer<Thread> destructor calls 113 // request_threads.clear(). 114 request_threads.erase(server.m_context.connection); 115 }); #16 kj::_::Deferred<std::__1::enable_if<std::is_same<decltype(mp::Accessor<mp::foo_fields::Context, 17>::get(fp1.call_context.getParams())), mp::Context::Reader>::value, kj::Promise<mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>::CallContext>>::type mp::PassField<mp::Accessor<mp::foo_fields::Context, 17>, mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>, mp::ServerCall, mp::TypeList<>>(mp::Priority<1>, mp::TypeList<>, mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>&, mp::ServerCall const&, mp::TypeList<>&&)::'lambda'()::operator()()::'lambda0'()>::run() /ci_container_base/depends/x86_64-pc-linux-gnu/include/kj/common.h:2010:7 (mptest+0x205841) #17 kj::_::Deferred<std::__1::enable_if<std::is_same<decltype(mp::Accessor<mp::foo_fields::Context, 17>::get(fp1.call_context.getParams())), mp::Context::Reader>::value, kj::Promise<mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>::CallContext>>::type mp::PassField<mp::Accessor<mp::foo_fields::Context, 17>, mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>, mp::ServerCall, mp::TypeList<>>(mp::Priority<1>, mp::TypeList<>, mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>&, mp::ServerCall const&, mp::TypeList<>&&)::'lambda'()::operator()()::'lambda0'()>::~Deferred() /ci_container_base/depends/x86_64-pc-linux-gnu/include/kj/common.h:1999:5 (mptest+0x205841) #18 std::__1::enable_if<std::is_same<decltype(mp::Accessor<mp::foo_fields::Context, 17>::get(fp1.call_context.getParams())), mp::Context::Reader>::value, kj::Promise<mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>::CallContext>>::type mp::PassField<mp::Accessor<mp::foo_fields::Context, 17>, mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>, mp::ServerCall, mp::TypeList<>>(mp::Priority<1>, mp::TypeList<>, mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>&, mp::ServerCall const&, mp::TypeList<>&&)::'lambda'()::operator()() /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/test/./ipc/libmultiprocess/include/mp/type-context.h:117:17 (mptest+0x205841) #19 kj::Function<void ()>::Impl<std::__1::enable_if<std::is_same<decltype(mp::Accessor<mp::foo_fields::Context, 17>::get(fp1.call_context.getParams())), mp::Context::Reader>::value, kj::Promise<mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>::CallContext>>::type mp::PassField<mp::Accessor<mp::foo_fields::Context, 17>, mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>, mp::ServerCall, mp::TypeList<>>(mp::Priority<1>, mp::TypeList<>, mp::ServerInvokeContext<mp::ProxyServer<mp::test::messages::FooInterface>, capnp::CallContext<mp::test::messages::FooInterface::CallFnAsyncParams, mp::test::messages::FooInterface::CallFnAsyncResults>>&, mp::ServerCall const&, mp::TypeList<>&&)::'lambda'()>::operator()() /ci_container_base/depends/x86_64-pc-linux-gnu/include/kj/function.h:142:14 (mptest+0x20552f) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #20 kj::Function<void ()>::operator()() /ci_container_base/depends/x86_64-pc-linux-gnu/include/kj/function.h:119:12 (mptest+0x154867) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #21 void mp::Unlock<std::__1::unique_lock<std::__1::mutex>, kj::Function<void ()>&>(std::__1::unique_lock<std::__1::mutex>&, kj::Function<void ()>&) /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/test/./ipc/libmultiprocess/include/mp/util.h:198:5 (mptest+0x154867) 194 template <typename Lock, typename Callback> 195 void Unlock(Lock& lock, Callback&& callback) 196 { 197 const UnlockGuard<Lock> unlock(lock); 198 callback(); 199 } #22 void mp::Waiter::wait<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0::operator()() const::'lambda'()>(std::__1::unique_lock<std::__1::mutex>&, mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0::operator()() const::'lambda'())::'lambda'()::operator()() const /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/./ipc/libmultiprocess/include/mp/proxy-io.h:296:17 (mptest+0x237d69) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) 284 template <class Predicate> 285 void wait(std::unique_lock<std::mutex>& lock, Predicate pred) 286 { 287 m_cv.wait(lock, [&] { 288 // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid 289 // a lost-wakeup bug. A new m_fn and m_cv notification might be sent 290 // after the fn() call and before the lock.lock() call in this loop 291 // in the case where a capnp response is sent and a brand new 292 // request is immediately received. 293 while (m_fn) { 294 auto fn = std::move(*m_fn); 295 m_fn.reset(); 296 Unlock(lock, fn); 297 } 298 const bool done = pred(); 299 return done; 300 }); 301 } #23 void std::__1::condition_variable::wait<void mp::Waiter::wait<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0::operator()() const::'lambda'()>(std::__1::unique_lock<std::__1::mutex>&, mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0::operator()() const::'lambda'())::'lambda'()>(std::__1::unique_lock<std::__1::mutex>&, mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0::operator()() const::'lambda'()) /usr/lib/llvm-20/bin/../include/c++/v1/__condition_variable/condition_variable.h:146:11 (mptest+0x237d69) #24 void mp::Waiter::wait<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0::operator()() const::'lambda'()>(std::__1::unique_lock<std::__1::mutex>&, mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0::operator()() const::'lambda'()) /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/./ipc/libmultiprocess/include/mp/proxy-io.h:287:14 (mptest+0x237d69) #25 mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0::operator()() const /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/./ipc/libmultiprocess/src/mp/proxy.cpp:404:34 (mptest+0x237d69) 404 g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; }); #26 decltype(std::declval<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0>()()) std::__1::__invoke[abi:ne200100]<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0>(mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0&&) /usr/lib/llvm-20/bin/../include/c++/v1/__type_traits/invoke.h:179:25 (mptest+0x237d69) #27 void std::__1::__thread_execute[abi:ne200100]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0>&, std::__1::__tuple_indices<...>) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/thread.h:199:3 (mptest+0x237d69) #28 void* std::__1::__thread_proxy[abi:ne200100]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0>>(void*) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/thread.h:208:3 (mptest+0x237d69) Previous read of size 8 at 0x721000003e90 by thread T10: #0 std::__1::__function::__value_func<void ()>::operator()[abi:ne200100]() const /usr/lib/llvm-20/bin/../include/c++/v1/__functional/function.h:436:12 (mptest+0x238638) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #1 std::__1::function<void ()>::operator()() const /usr/lib/llvm-20/bin/../include/c++/v1/__functional/function.h:995:10 (mptest+0x238638) #2 void mp::Unlock<mp::Lock, std::__1::function<void ()>&>(mp::Lock&, std::__1::function<void ()>&) /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/./ipc/libmultiprocess/include/mp/util.h:198:5 (mptest+0x238638) 194 template <typename Lock, typename Callback> 195 void Unlock(Lock& lock, Callback&& callback) 196 { 197 const UnlockGuard<Lock> unlock(lock); 198 callback(); 199 } #3 mp::Connection::~Connection() /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/./ipc/libmultiprocess/src/mp/proxy.cpp:139:9 (mptest+0x232bcb) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) 135 Lock lock{m_loop->m_mutex}; 136 while (!m_sync_cleanup_fns.empty()) { 137 CleanupList fn; 138 fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin()); 139 Unlock(lock, fn.front()); 140 } #4 std::__1::default_delete<mp::Connection>::operator()[abi:ne200100](mp::Connection*) const /usr/lib/llvm-20/bin/../include/c++/v1/__memory/unique_ptr.h:78:5 (mptest+0x13742b) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #5 std::__1::unique_ptr<mp::Connection, std::__1::default_delete<mp::Connection>>::reset[abi:ne200100](mp::Connection*) /usr/lib/llvm-20/bin/../include/c++/v1/__memory/unique_ptr.h:300:7 (mptest+0x13742b) #6 mp::test::TestSetup::TestSetup(bool)::'lambda'()::operator()() const::'lambda0'()::operator()() const /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/test/./ipc/libmultiprocess/test/mp/test/test.cpp:79:71 (mptest+0x13742b) 79 server_connection->onDisconnect([&] { server_connection.reset(); }); #7 kj::_::Void kj::_::MaybeVoidCaller<kj::_::Void, kj::_::Void>::apply<mp::test::TestSetup::TestSetup(bool)::'lambda'()::operator()() const::'lambda0'()>(mp::test::TestSetup::TestSetup(bool)::'lambda'()::operator()() const::'lambda0'()&, kj::_::Void&&) /ci_container_base/depends/x86_64-pc-linux-gnu/include/kj/async-prelude.h:195:5 (mptest+0x13742b) #8 kj::_::TransformPromiseNode<kj::_::Void, kj::_::Void, mp::test::TestSetup::TestSetup(bool)::'lambda'()::operator()() const::'lambda0'(), kj::_::PropagateException>::getImpl(kj::_::ExceptionOrValue&) /ci_container_base/depends/x86_64-pc-linux-gnu/include/kj/async-inl.h:739:31 (mptest+0x13742b) #9 kj::_::TransformPromiseNodeBase::get(kj::_::ExceptionOrValue&) <null> (mptest+0x2df64c) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #10 mp::EventLoop::loop() /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/./ipc/libmultiprocess/src/mp/proxy.cpp:231:68 (mptest+0x234699) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) 231 const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope); #11 mp::test::TestSetup::TestSetup(bool)::'lambda'()::operator()() const /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/test/./ipc/libmultiprocess/test/mp/test/test.cpp:92:20 (mptest+0x133819) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #12 decltype(std::declval<mp::test::TestSetup::TestSetup(bool)::'lambda'()>()()) std::__1::__invoke[abi:ne200100]<mp::test::TestSetup::TestSetup(bool)::'lambda'()>(mp::test::TestSetup::TestSetup(bool)::'lambda'()&&) /usr/lib/llvm-20/bin/../include/c++/v1/__type_traits/invoke.h:179:25 (mptest+0x133119) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #13 void std::__1::__thread_execute[abi:ne200100]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::test::TestSetup::TestSetup(bool)::'lambda'()>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::test::TestSetup::TestSetup(bool)::'lambda'()>&, std::__1::__tuple_indices<...>) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/thread.h:199:3 (mptest+0x133119) #14 void* std::__1::__thread_proxy[abi:ne200100]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::test::TestSetup::TestSetup(bool)::'lambda'()>>(void*) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/thread.h:208:3 (mptest+0x133119) Mutex M0 (0x721c00003790) created at: #0 pthread_mutex_lock <null> (mptest+0xa844b) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #1 std::__1::mutex::lock() <null> (libc++.so.1.0.20+0x713cc) (BuildId: 30ef7da36db6fb0c014ee96603f7649f755cb793) Mutex M1 (0x7fbbe45fd4e0) created at: #0 pthread_mutex_lock <null> (mptest+0xa844b) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #1 std::__1::mutex::lock() <null> (libc++.so.1.0.20+0x713cc) (BuildId: 30ef7da36db6fb0c014ee96603f7649f755cb793) #2 mp::Connection::Connection(mp::EventLoop&, kj::Own<kj::AsyncIoStream, std::nullptr_t>&&, std::__1::function<capnp::Capability::Client (mp::Connection&)> const&) /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/test/./ipc/libmultiprocess/include/mp/proxy-io.h:330:11 (mptest+0x134cfb) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #3 std::__1::unique_ptr<mp::Connection, std::__1::default_delete<mp::Connection>> std::__1::make_unique[abi:ne200100]<mp::Connection, mp::EventLoop&, kj::Own<kj::AsyncIoStream, std::nullptr_t>, mp::test::TestSetup::TestSetup(bool)::'lambda'()::operator()() const::'lambda'(mp::Connection&), 0>(mp::EventLoop&, kj::Own<kj::AsyncIoStream, std::nullptr_t>&&, mp::test::TestSetup::TestSetup(bool)::'lambda'()::operator()() const::'lambda'(mp::Connection&)&&) /usr/lib/llvm-20/bin/../include/c++/v1/__memory/unique_ptr.h:767:30 (mptest+0x1333f9) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #4 mp::test::TestSetup::TestSetup(bool)::'lambda'()::operator()() const /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/test/./ipc/libmultiprocess/test/mp/test/test.cpp:70:19 (mptest+0x1333f9) #5 decltype(std::declval<mp::test::TestSetup::TestSetup(bool)::'lambda'()>()()) std::__1::__invoke[abi:ne200100]<mp::test::TestSetup::TestSetup(bool)::'lambda'()>(mp::test::TestSetup::TestSetup(bool)::'lambda'()&&) /usr/lib/llvm-20/bin/../include/c++/v1/__type_traits/invoke.h:179:25 (mptest+0x133119) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #6 void std::__1::__thread_execute[abi:ne200100]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::test::TestSetup::TestSetup(bool)::'lambda'()>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::test::TestSetup::TestSetup(bool)::'lambda'()>&, std::__1::__tuple_indices<...>) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/thread.h:199:3 (mptest+0x133119) #7 void* std::__1::__thread_proxy[abi:ne200100]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::test::TestSetup::TestSetup(bool)::'lambda'()>>(void*) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/thread.h:208:3 (mptest+0x133119) Thread T11 (tid=8378, running) created by thread T10 at: #0 pthread_create <null> (mptest+0xa673e) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #1 std::__1::__libcpp_thread_create[abi:ne200100](unsigned long*, void* (*)(void*), void*) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/support/pthread.h:182:10 (mptest+0x236338) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #2 std::__1::thread::thread<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0, 0>(mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0&&) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/thread.h:218:14 (mptest+0x236338) #3 mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>) /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/./ipc/libmultiprocess/src/mp/proxy.cpp:397:17 (mptest+0x236338) #4 mp::ThreadMap::Server::dispatchCallInternal(unsigned short, capnp::CallContext<capnp::AnyPointer, capnp::AnyPointer>) /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/include/mp/proxy.capnp.c++:602:9 (mptest+0x231af8) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #5 mp::ThreadMap::Server::dispatchCall(unsigned long, unsigned short, capnp::CallContext<capnp::AnyPointer, capnp::AnyPointer>) /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/include/mp/proxy.capnp.c++:591:14 (mptest+0x231af8) #6 virtual thunk to mp::ThreadMap::Server::dispatchCall(unsigned long, unsigned short, capnp::CallContext<capnp::AnyPointer, capnp::AnyPointer>) /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/include/mp/proxy.capnp.c++ (mptest+0x231af8) #7 capnp::LocalClient::callInternal(unsigned long, unsigned short, capnp::CallContextHook&) <null> (mptest+0x25031c) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #8 mp::EventLoop::loop() /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/./ipc/libmultiprocess/src/mp/proxy.cpp:231:68 (mptest+0x234699) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #9 mp::test::TestSetup::TestSetup(bool)::'lambda'()::operator()() const /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/test/./ipc/libmultiprocess/test/mp/test/test.cpp:92:20 (mptest+0x133819) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #10 decltype(std::declval<mp::test::TestSetup::TestSetup(bool)::'lambda'()>()()) std::__1::__invoke[abi:ne200100]<mp::test::TestSetup::TestSetup(bool)::'lambda'()>(mp::test::TestSetup::TestSetup(bool)::'lambda'()&&) /usr/lib/llvm-20/bin/../include/c++/v1/__type_traits/invoke.h:179:25 (mptest+0x133119) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #11 void std::__1::__thread_execute[abi:ne200100]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::test::TestSetup::TestSetup(bool)::'lambda'()>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::test::TestSetup::TestSetup(bool)::'lambda'()>&, std::__1::__tuple_indices<...>) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/thread.h:199:3 (mptest+0x133119) #12 void* std::__1::__thread_proxy[abi:ne200100]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, mp::test::TestSetup::TestSetup(bool)::'lambda'()>>(void*) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/thread.h:208:3 (mptest+0x133119) Thread T10 (tid=8377, running) created by main thread at: #0 pthread_create <null> (mptest+0xa673e) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #1 std::__1::__libcpp_thread_create[abi:ne200100](unsigned long*, void* (*)(void*), void*) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/support/pthread.h:182:10 (mptest+0x132c30) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #2 std::__1::thread::thread<mp::test::TestSetup::TestSetup(bool)::'lambda'(), 0>(mp::test::TestSetup::TestSetup(bool)::'lambda'()&&) /usr/lib/llvm-20/bin/../include/c++/v1/__thread/thread.h:218:14 (mptest+0x132c30) #3 mp::test::TestSetup::TestSetup(bool) /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/test/./ipc/libmultiprocess/test/mp/test/test.cpp:62:11 (mptest+0x12f8dd) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #4 mp::test::TestCase251::run() /ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/test/./ipc/libmultiprocess/test/mp/test/test.cpp:272:15 (mptest+0x12e95e) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) #5 kj::Maybe<kj::Exception> kj::runCatchingExceptions<kj::TestRunner::run()::'lambda'()>(kj::TestRunner::run()::'lambda'()&&) <null> (mptest+0x23e7c0) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) SUMMARY: ThreadSanitizer: data race (/ci_container_base/ci/scratch/build-x86_64-pc-linux-gnu/src/ipc/libmultiprocess/test/mptest+0x12b1dc) (BuildId: bcd28b84fed9c9590c0eb7740b07101619532417) in operator delete(void*, unsigned long)
1 parent a11e690 commit 79c09e4

File tree

3 files changed

+30
-7
lines changed

3 files changed

+30
-7
lines changed

include/mp/proxy-io.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
7878
//! Since this variable is accessed from multiple threads, accesses should
7979
//! be guarded with the associated Waiter::m_mutex.
8080
std::optional<CleanupIt> m_disconnect_cb;
81+
//! State shared with disconnect callback telling it if this ProxyClient is
82+
//! already destroyed and no longer should be accessed. This is also
83+
//! accessed from multiple threads and guarded with Waiter::m_mutex.
84+
std::shared_ptr<bool> m_destroyed{std::make_shared<bool>(false)};
8185
};
8286

8387
template <>
@@ -343,6 +347,7 @@ class Connection
343347
//! any new i/o.
344348
CleanupIt addSyncCleanup(std::function<void()> fn);
345349
void removeSyncCleanup(CleanupIt it);
350+
void removeSyncCleanup(CleanupIt it, const Lock& lock);
346351

347352
//! Add disconnect handler.
348353
template <typename F>

include/mp/util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ class MP_SCOPED_CAPABILITY Lock {
173173
~Lock() MP_RELEASE() = default;
174174
void unlock() MP_RELEASE() { m_lock.unlock(); }
175175
void lock() MP_ACQUIRE() { m_lock.lock(); }
176-
void assert_locked(Mutex& mutex) MP_ASSERT_CAPABILITY() MP_ASSERT_CAPABILITY(mutex)
176+
void assert_locked(Mutex& mutex) const MP_ASSERT_CAPABILITY() MP_ASSERT_CAPABILITY(mutex)
177177
{
178178
assert(m_lock.mutex() == &mutex.m_mutex);
179179
assert(m_lock);

src/mp/proxy.cpp

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,14 @@ Connection::~Connection()
134134
// callbacks. In the clean shutdown case both lists will be empty.
135135
Lock lock{m_loop->m_mutex};
136136
while (!m_sync_cleanup_fns.empty()) {
137-
CleanupList fn;
138-
fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin());
139-
Unlock(lock, fn.front());
137+
// Call the first function in the connection cleanup list. Before
138+
// calling it, move it into a temporary variable so outside classes
139+
// which registered for disconnect callbacks can check if the
140+
// disconnection function is null and know if it's about to be called.
141+
auto it{m_sync_cleanup_fns.begin()};
142+
std::function<void()> fn = std::move(*it);
143+
Unlock(lock, fn);
144+
m_sync_cleanup_fns.erase(it);
140145
}
141146
}
142147

@@ -157,6 +162,12 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
157162
void Connection::removeSyncCleanup(CleanupIt it)
158163
{
159164
const Lock lock(m_loop->m_mutex);
165+
removeSyncCleanup(it, lock);
166+
}
167+
168+
void Connection::removeSyncCleanup(CleanupIt it, const Lock& lock)
169+
{
170+
lock.assert_locked(m_loop->m_mutex);
160171
m_sync_cleanup_fns.erase(it);
161172
}
162173

@@ -313,7 +324,7 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
313324
thread = threads.emplace(
314325
std::piecewise_construct, std::forward_as_tuple(connection),
315326
std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
316-
thread->second.setDisconnectCallback([&threads, &mutex, thread] {
327+
thread->second.setDisconnectCallback([&threads, &mutex, thread, destroyed = thread->second.m_destroyed] {
317328
// Note: it is safe to use the `thread` iterator in this cleanup
318329
// function, because the iterator would only be invalid if the map entry
319330
// was removed, and if the map entry is removed the ProxyClient<Thread>
@@ -324,6 +335,7 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
324335
// try to unregister this callback after connection is destroyed.
325336
// Remove connection pointer about to be destroyed from the map
326337
const std::unique_lock<std::mutex> lock(mutex);
338+
if (*destroyed) return;
327339
thread->second.m_disconnect_cb.reset();
328340
threads.erase(thread);
329341
});
@@ -332,12 +344,18 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
332344

333345
ProxyClient<Thread>::~ProxyClient()
334346
{
347+
// Waiter::m_mutex is already held here so it is safe to access
348+
// m_disconnect_cb EventLoop::mutex needs to be locked to access
349+
// **m_disconnect_cb, which Connection destructor will set to null before it
350+
// invokes the callback.
351+
const Lock lock(m_context.loop->m_mutex);
335352
// If thread is being destroyed before connection is destroyed, remove the
336353
// cleanup callback that was registered to handle the connection being
337354
// destroyed before the thread being destroyed.
338-
if (m_disconnect_cb) {
339-
m_context.connection->removeSyncCleanup(*m_disconnect_cb);
355+
if (m_disconnect_cb && **m_disconnect_cb) {
356+
m_context.connection->removeSyncCleanup(*m_disconnect_cb, lock);
340357
}
358+
*m_destroyed = true;
341359
}
342360

343361
void ProxyClient<Thread>::setDisconnectCallback(const std::function<void()>& fn)

0 commit comments

Comments
 (0)