Skip to content

[C++] Ensure ExecutorServicePtr is not destroyed while the timer object is alive#6226

Merged
merlimat merged 3 commits intoapache:masterfrom
merlimat:executor-ptr
Feb 7, 2020
Merged

[C++] Ensure ExecutorServicePtr is not destroyed while the timer object is alive#6226
merlimat merged 3 commits intoapache:masterfrom
merlimat:executor-ptr

Conversation

@merlimat
Copy link
Contributor

@merlimat merlimat commented Feb 5, 2020

Motivation

The ProducerStatsImpl and ConsumerStatsImpl objects are keeping a shared_ptr to a timer object. While that keeps the timer alive, it does not keep the required event loop object.

When the stats objects are destroyed, we cancel the timer, though that will use the already destroyed ExecutorService. In most cases this will not show any problem, though in others it will cause a segfault.

Valgrind output showing the issue:

==12549== Invalid read of size 4
==12549==    at 0x5966010: __pthread_mutex_unlock_full (pthread_mutex_unlock.c:100)
==12549==    by 0x5BFECD9: unlock (posix_mutex.hpp:58)
==12549==    by 0x5BFECD9: unlock (scoped_lock.hpp:72)
==12549==    by 0x5BFECD9: unsigned long boost::asio::detail::epoll_reactor::cancel_timer<boost::asio::time_traits<boost::posix_time::ptime> >(boost::asio::detail::timer_queue<boost::asio::time_traits<boost::posix_time::ptime> >&, boost::asio::detail::timer_queue<boost::asio::time_traits<boost::posix_time::ptime> >::per_timer_data&, unsigned long) (epoll_reactor.hpp:65)
==12549==    by 0x5D28808: cancel (deadline_timer_service.hpp:108)
==12549==    by 0x5D28808: cancel (deadline_timer_service.hpp:96)
==12549==    by 0x5D28808: cancel (basic_deadline_timer.hpp:216)
==12549==    by 0x5D28808: pulsar::ConsumerStatsImpl::~ConsumerStatsImpl() (ConsumerStatsImpl.cc:70)
==12549==    by 0x41F2F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:150)
==12549==    by 0x5C50211: ~__shared_count (shared_ptr_base.h:659)
==12549==    by 0x5C50211: ~__shared_ptr (shared_ptr_base.h:925)
==12549==    by 0x5C50211: ~shared_ptr (shared_ptr.h:93)
==12549==    by 0x5C50211: pulsar::ConsumerImpl::~ConsumerImpl() (ConsumerImpl.cc:95)
==12549==    by 0x41F2F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:150)
==12549==    by 0x4E39EC: ~__shared_count (shared_ptr_base.h:659)
==12549==    by 0x4E39EC: ~__shared_ptr (shared_ptr_base.h:925)
==12549==    by 0x4E39EC: ~shared_ptr (shared_ptr.h:93)
==12549==    by 0x4E39EC: BasicEndToEndTest_testReceiveAsyncFailedConsumer_Test::TestBody() (Consumer.h:35)
==12549==    by 0x541853: void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) (in /pulsar/pulsar-client-cpp/tests/main)
==12549==    by 0x53C484: void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) (in /pulsar/pulsar-client-cpp/tests/main)
==12549==    by 0x521911: testing::Test::Run() (in /pulsar/pulsar-client-cpp/tests/main)
==12549==    by 0x5221B9: testing::TestInfo::Run() (in /pulsar/pulsar-client-cpp/tests/main)
==12549==    by 0x5228A8: testing::TestCase::Run() (in /pulsar/pulsar-client-cpp/tests/main)
==12549==  Address 0x10837bb0 is 64 bytes inside a block of size 176 free'd
==12549==    at 0x4C2F24B: operator delete(void*) (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
==12549==    by 0x431509: boost::asio::detail::epoll_reactor::~epoll_reactor() (epoll_reactor.ipp:69)
==12549==    by 0x5CEF161: destroy (service_registry.ipp:101)
==12549==    by 0x5CEF161: ~service_registry (service_registry.ipp:45)
==12549==    by 0x5CEF161: ~io_service (io_service.ipp:53)
==12549==    by 0x5CEF161: pulsar::ExecutorService::~ExecutorService() (ExecutorService.cc:30)
==12549==    by 0x41F2F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:150)
==12549==    by 0x5C4FD2A: ~__shared_count (shared_ptr_base.h:659)
==12549==    by 0x5C4FD2A: ~__shared_ptr (shared_ptr_base.h:925)
==12549==    by 0x5C4FD2A: ~shared_ptr (shared_ptr.h:93)
==12549==    by 0x5C4FD2A: ~NegativeAcksTracker (NegativeAcksTracker.h:32)
==12549==    by 0x5C4FD2A: pulsar::ConsumerImpl::~ConsumerImpl() (ConsumerImpl.cc:95)
==12549==    by 0x41F2F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:150)
==12549==    by 0x4E39EC: ~__shared_count (shared_ptr_base.h:659)
==12549==    by 0x4E39EC: ~__shared_ptr (shared_ptr_base.h:925)
==12549==    by 0x4E39EC: ~shared_ptr (shared_ptr.h:93)
==12549==    by 0x4E39EC: BasicEndToEndTest_testReceiveAsyncFailedConsumer_Test::TestBody() (Consumer.h:35)
==12549==    by 0x541853: void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) (in /pulsar/pulsar-client-cpp/tests/main)
==12549==    by 0x53C484: void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) (in /pulsar/pulsar-client-cpp/tests/main)
==12549==    by 0x521911: testing::Test::Run() (in /pulsar/pulsar-client-cpp/tests/main)
==12549==    by 0x5221B9: testing::TestInfo::Run() (in /pulsar/pulsar-client-cpp/tests/main)
==12549==    by 0x5228A8: testing::TestCase::Run() (in /pulsar/pulsar-client-cpp/tests/main)
==12549==  Block was alloc'd at
==12549==    at 0x4C2E0EF: operator new(unsigned long) (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
==12549==    by 0x43763A: boost::asio::io_service::service* boost::asio::detail::service_registry::create<boost::asio::detail::epoll_reactor>(boost::asio::io_service&) (service_registry.hpp:81)
==12549==    by 0x432497: boost::asio::detail::service_registry::do_use_service(boost::asio::io_service::service::key const&, boost::asio::io_service::service* (*)(boost::asio::io_service&)) (service_registry.ipp:123)
==12549==    by 0x4325B6: use_service<boost::asio::detail::epoll_reactor> (service_registry.hpp:48)
==12549==    by 0x4325B6: use_service<boost::asio::detail::epoll_reactor> (io_service.hpp:33)
==12549==    by 0x4325B6: reactive_socket_service_base (reactive_socket_service_base.ipp:33)
==12549==    by 0x4325B6: reactive_socket_service (reactive_socket_service.hpp:77)
==12549==    by 0x4325B6: stream_socket_service (stream_socket_service.hpp:95)
==12549==    by 0x4325B6: boost::asio::io_service::service* boost::asio::detail::service_registry::create<boost::asio::stream_socket_service<boost::asio::ip::tcp> >(boost::asio::io_service&) (service_registry.hpp:81)
==12549==    by 0x5CF0E5D: do_use_service (service_registry.ipp:123)
==12549==    by 0x5CF0E5D: use_service<boost::asio::stream_socket_service<boost::asio::ip::tcp> > (service_registry.hpp:48)
==12549==    by 0x5CF0E5D: use_service<boost::asio::stream_socket_service<boost::asio::ip::tcp> > (io_service.hpp:33)
==12549==    by 0x5CF0E5D: basic_io_object (basic_io_object.hpp:183)
==12549==    by 0x5CF0E5D: basic_socket (basic_socket.hpp:71)
==12549==    by 0x5CF0E5D: basic_stream_socket (basic_stream_socket.hpp:73)
==12549==    by 0x5CF0E5D: pulsar::ExecutorService::createSocket() (ExecutorService.cc:36)
==12549==    by 0x5CB1794: pulsar::ClientConnection::ClientConnection(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::shared_ptr<pulsar::ExecutorService>, pulsar::ClientConfiguration const&, std::shared_ptr<pulsar::Authentication> const&) (ClientConnection.cc:169)
==12549==    by 0x5C1D3F0: pulsar::ConnectionPool::getConnectionAsync(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) (ConnectionPool.cc:83)
==12549==    by 0x5CA573F: pulsar::BinaryProtoLookupService::getPartitionMetadataAsync(std::shared_ptr<pulsar::TopicName> const&) (BinaryProtoLookupService.cc:72)
==12549==    by 0x5D00CA6: pulsar::ClientImpl::subscribeAsync(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, pulsar::ConsumerConfiguration const&, std::function<void (pulsar::Result, pulsar::Consumer)>) (ClientImpl.cc:350)
==12549==    by 0x5C985EC: pulsar::Client::subscribeAsync(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, pulsar::ConsumerConfiguration const&, std::function<void (pulsar::Result, pulsar::Consumer)>) (Client.cc:89)
==12549==    by 0x5C98803: pulsar::Client::subscribeAsync(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::function<void (pulsar::Result, pulsar::Consumer)>) (Client.cc:83)
==12549==    by 0x4E3852: BasicEndToEndTest_testReceiveAsyncFailedConsumer_Test::TestBody() (BasicEndToEndTest.cc:2848)
==12549==

Modifications

Keep a shared_ptr to ExecutorServicePtr to ensure the executor service is only destroyed after the timer is cancelled.

@merlimat merlimat added type/bug The PR fixed a bug or issue reported a bug component/c++ labels Feb 5, 2020
@merlimat merlimat added this to the 2.6.0 milestone Feb 5, 2020
@merlimat merlimat self-assigned this Feb 5, 2020
@merlimat merlimat merged commit d63b6c1 into apache:master Feb 7, 2020
aahmed-se pushed a commit to aahmed-se/pulsar that referenced this pull request Feb 11, 2020
…ct is alive (apache#6226)

* [C++] Ensure ExecutorServicePtr is not destroyed while the timer object is alive

* Fixed formatting

Co-authored-by: Sijie Guo <guosijie@gmail.com>
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
…ct is alive (apache#6226)

* [C++] Ensure ExecutorServicePtr is not destroyed while the timer object is alive

* Fixed formatting

Co-authored-by: Sijie Guo <guosijie@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

type/bug The PR fixed a bug or issue reported a bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants

Comments