Fix TSAN error in LifoEventSem shutdown
Summary:
TSAN report:

  ==================
  WARNING: ThreadSanitizer: data race (pid=3276373)
    Write of size 8 at 0x7ba000000330 by thread T10:
      #0 close at ??:?
      #1 facebook::logdevice::detail::SynchronizationFd::~SynchronizationFd() at ./logdevice/common/LifoEventSem.cpp:59
      #2 facebook::logdevice::detail::FdBaton<std::atomic>::~FdBaton() at ./logdevice/common/LifoEventSem.h:68
      #3 std::default_delete<facebook::logdevice::detail::FdBaton<std::atomic> >::operator()(facebook::logdevice::detail::FdBaton<std::atomic>*) const at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/unique_ptr.h:78
      #4 std::unique_ptr<facebook::logdevice::detail::FdBaton<std::atomic>, std::default_delete<facebook::logdevice::detail::FdBaton<std::atomic> > >::~unique_ptr() at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/unique_ptr.h:268
      #5 facebook::logdevice::detail::LifoFdBaton<std::atomic>::~LifoFdBaton() at ./logdevice/common/LifoEventSem.h:155
      #6 folly::detail::LifoSemNode<facebook::logdevice::detail::LifoFdBaton<std::atomic>, std::atomic>::destroy() at ./folly/synchronization/LifoSem.h:195
      #7 folly::detail::LifoSemNodeRecycler<facebook::logdevice::detail::LifoFdBaton<std::atomic>, std::atomic>::operator()(folly::detail::LifoSemNode<facebook::logdevice::detail::LifoFdBaton<std::atomic>, std::atomic>*) const at ./folly/synchronization/LifoSem.h:213
      #8 std::unique_ptr<folly::detail::LifoSemNode<facebook::logdevice::detail::LifoFdBaton<std::atomic>, std::atomic>, folly::detail::LifoSemNodeRecycler<facebook::logdevice::detail::LifoFdBaton<std::atomic>, std::atomic> >::~unique_ptr() at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/unique_ptr.h:268
      #9 facebook::logdevice::LifoEventSemImpl<std::atomic>::AsyncWaiter::~AsyncWaiter() at ./logdevice/common/LifoEventSem.h:380
      #10 facebook::logdevice::LifoEventSemImpl<std::atomic>::AsyncWaiter::~AsyncWaiter() at ./logdevice/common/LifoEventSem.h:358
      #11 folly::DelayedDestruction::onDelayedDestroy(bool) at ./folly/io/async/DelayedDestruction.h:115
      #12 folly::DelayedDestruction::destroy() at ./folly/io/async/DelayedDestruction.h:55
      #13 folly::DelayedDestruction::Destructor::operator()(folly::DelayedDestruction*) const at ./folly/io/async/DelayedDestruction.h:70
      #14 std::unique_ptr<facebook::logdevice::LifoEventSemImpl<std::atomic>::AsyncWaiter, folly::DelayedDestruction::Destructor>::reset(facebook::logdevice::LifoEventSemImpl<std::atomic>::AsyncWaiter*) at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/unique_ptr.h:376
      #15 facebook::logdevice::EventLoopTaskQueue::haveTasksEventHandler() at ./logdevice/common/EventLoopTaskQueue.cpp:113
      #16 facebook::logdevice::EventLoopTaskQueue::EventLoopTaskQueue(facebook::logdevice::EvBase&, unsigned long, std::array<unsigned int, 3ul> const&)::$_0::operator()() const at ./logdevice/common/EventLoopTaskQueue.cpp:36
      #17 void folly::detail::function::FunctionTraits<void ()>::callSmall<facebook::logdevice::EventLoopTaskQueue::EventLoopTaskQueue(facebook::logdevice::EvBase&, unsigned long, std::array<unsigned int, 3ul> const&)::$_0>(folly::detail::function::Data&) at ./folly/Function.h:361
      #18 folly::detail::function::FunctionTraits<void ()>::operator()() at ./folly/Function.h:377
      #19 facebook::logdevice::EventLegacy::evCallback(int, short, void*) at ./logdevice/common/libevent/EventLegacy.cpp:41
      #20 event_persist_closure at ./logdevice/external/libevent-2.1.3-alpha/event.c:1452
      #21 event_process_active_single_queue at ./logdevice/external/libevent-2.1.3-alpha/event.c:1508
      #22 event_process_active at ./logdevice/external/libevent-2.1.3-alpha/event.c:1596
      #23 ld_event_base_loop at ./logdevice/external/libevent-2.1.3-alpha/event.c:1819
      #24 facebook::logdevice::EvBaseLegacy::loop() at ./logdevice/common/libevent/EvBaseLegacy.cpp:58
      #25 facebook::logdevice::EvBase::loop() at ./logdevice/common/libevent/LibEventCompatibility.cpp:52
      #26 facebook::logdevice::EventLoop::run() at ./logdevice/common/EventLoop.cpp:140
      #27 facebook::logdevice::EventLoop::EventLoop(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, facebook::logdevice::ThreadID::Type, unsigned long, bool, std::array<unsigned int, 3ul> const&, facebook::logdevice::EvBase::EvBaseType)::$_0::operator()() const at ./logdevice/common/EventLoop.cpp:81
      #28 std::_Function_handler<void (), facebook::logdevice::EventLoop::EventLoop(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, facebook::logdevice::ThreadID::Type, unsigned long, bool, std::array<unsigned int, 3ul> const&, facebook::logdevice::EvBase::EvBaseType)::$_0>::_M_invoke(std::_Any_data const&) at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/std_function.h:316
      #29 std::function<void ()>::operator()() const at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/std_function.h:706
      #30 facebook::logdevice::thread_func(void*) at ./logdevice/common/PThread.cpp:9
      #31 __tsan_thread_start_func at tsan.c:?

    Previous read of size 8 at 0x7ba000000330 by main thread:
      #0 write at ??:?
      #1 facebook::logdevice::detail::SynchronizationFd::write() at ./logdevice/common/LifoEventSem.cpp:75
      #2 post at ./logdevice/common/LifoEventSem.h:72
      #3 facebook::logdevice::EventLoopTaskQueue::shutdown() at ./logdevice/common/EventLoopTaskQueue.cpp:68
      #4 facebook::logdevice::Processor::shutdown() at ./logdevice/common/Processor.cpp:585
      #5 facebook::logdevice::ClientImpl::~ClientImpl() at ./logdevice/lib/ClientImpl.cpp:314
      #6 void __gnu_cxx::new_allocator<facebook::logdevice::ClientImpl>::destroy<facebook::logdevice::ClientImpl>(facebook::logdevice::ClientImpl*) at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/ext/new_allocator.h:157
      #7 void std::allocator_traits<std::allocator<facebook::logdevice::ClientImpl> >::destroy<facebook::logdevice::ClientImpl>(std::allocator<facebook::logdevice::ClientImpl>&, facebook::logdevice::ClientImpl*) at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/alloc_traits.h:488
      #8 std::_Sp_counted_ptr_inplace<facebook::logdevice::ClientImpl, std::allocator<facebook::logdevice::ClientImpl>, (__gnu_cxx::_Lock_policy)2>::_M_dispose() at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/shared_ptr_base.h:537
      #9 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/shared_ptr_base.h:156
      #10 std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/shared_ptr_base.h:686
      #11 std::__shared_ptr<facebook::logdevice::Client, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/shared_ptr_base.h:1125
      #12 std::shared_ptr<facebook::logdevice::Client>::~shared_ptr() at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/shared_ptr.h:93
      #13 SequencerIntegrationTest_ExpandShrinkReactivationDelayTest_Test::TestBody() at ./logdevice/test/SequencerIntegrationTest.cpp:580
      #14 void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) at /home/engshare/third-party2/googletest/master/src/googletest/googletest/src/gtest.cc:2417
      #15 main at ./common/gtest/LightMain.cpp:19
      #16 ?? ??:0

During shutdown, a write() to the fd triggers the worker thread to wake up and destroy that same fd. The fd can therefore be destroyed before write() returns, i.e. while write() is still accessing it. This diff adds synchronization to ensure that close() on the fd starts only after write() has returned.
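As a minimal standalone sketch of that pattern (hypothetical FdGuard class for illustration only; the real change is in FdBaton below): post() tags the high bits of a packed atomic counter before writing to the fd and untags them once write() returns, and the destructor spins until no write is tagged as in flight before it calls close().

  #include <atomic>
  #include <cstdint>
  #include <thread>
  #include <unistd.h>

  class FdGuard {
   public:
    void post() {
      // Mark one pending event (low bits) and one in-flight write (high bits)
      // in a single atomic step, then do the fd write.
      bits_.fetch_add(1 + kWriteInProgress, std::memory_order_release);
      uint64_t one = 1;
      (void)::write(fd_, &one, sizeof(one)); // wakes up the waiting thread
      bits_.fetch_sub(kWriteInProgress, std::memory_order_release);
    }

    ~FdGuard() {
      // close() must not start until every in-flight write() has returned.
      while (bits_.load(std::memory_order_acquire) >= kWriteInProgress) {
        std::this_thread::yield();
      }
      ::close(fd_);
    }

   private:
    static constexpr uint64_t kWriteInProgress = 1ull << 32;
    std::atomic<uint64_t> bits_{0};
    int fd_ = -1; // e.g. an eventfd; setup omitted for brevity
  };

Because the fetch_sub runs only after write() returns, once the destructor observes the high bits at zero (acquire load pairing with the release fetch_sub), no thread can still be inside write() on this fd.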

#thanks Nick Sukhanov for figuring that out.

Reviewed By: mcrnic, tau0

Differential Revision: D18018223

fbshipit-source-id: ddbff95257dd39c835ad6e8733ba0bf2ed432fa0
al13n321 authored and facebook-github-bot committed Oct 25, 2019
1 parent b513b66 commit 729de55
Showing 2 changed files with 38 additions and 30 deletions.
65 changes: 38 additions & 27 deletions logdevice/common/LifoEventSem.h

@@ -51,9 +51,9 @@ class SynchronizationFd {
   pid_t pid_;
 };
 
-/// Works like a Baton, but performs its handoff via a user-visible
-/// eventfd or pipe. This allows the caller to wait for multiple events
-/// simultaneously using epoll, poll, select, or a wrapping library
+/// Works like a Baton, but is reusable and performs its handoff via a
+/// user-visible eventfd or pipe. This allows the caller to wait for multiple
+/// events simultaneously using epoll, poll, select, or a wrapping library
 /// like libevent.
 /// All bets are off if you do anything to fd besides adding it to an
 /// epoll wait set (don't read from, write to, or close it).
@@ -68,13 +68,14 @@ template <template <typename> class Atom>
 class FdBaton {
  public:
   FOLLY_ALWAYS_INLINE void post() {
-    event_count_.fetch_add(1, std::memory_order_release);
+    bits_.fetch_add(1 + kFDWriteInProgress, std::memory_order_release);
     fd_.write();
+    bits_.fetch_sub(kFDWriteInProgress, std::memory_order_release);
   }
   FOLLY_ALWAYS_INLINE bool try_wait() noexcept {
     auto res = fd_.read();
     if (res) {
-      event_count_.fetch_sub(1, std::memory_order_acquire);
+      bits_.fetch_sub(1, std::memory_order_acquire);
     }
     return res;
   }
@@ -109,14 +110,22 @@ class FdBaton {
 
   /// Introduces an acquire memory barrier to match a release one set by post
   FOLLY_ALWAYS_INLINE int get_event_count() noexcept {
-    return event_count_.load(std::memory_order_acquire);
+    return bits_.load(std::memory_order_acquire) & (kFDWriteInProgress - 1);
   }
 
   /// Waits for baton to become active but does not consume an event.
   FOLLY_ALWAYS_INLINE void wait_readable() noexcept {
     fd_.poll();
   }
 
+  ~FdBaton() {
+    // Wait for any in-flight fd_.write() calls to return, making it safe to
+    // destroy the fd.
+    while (bits_.load(std::memory_order_acquire) >= kFDWriteInProgress) {
+      std::this_thread::yield();
+    }
+  }
+
  private:
   template <typename Rep, typename Period>
   bool
@@ -134,8 +143,16 @@ class FdBaton {
     }
     return res;
   }
+
   SynchronizationFd fd_;
-  Atom<int> event_count_{0};
+
+  // The low 32 bits are event count, i.e. number of post() calls minus number
+  // of wait() calls done so far.
+  // The high 32 bits are number of writes to fd_
+  // that might be in progress right now; this is used for detecting when fd_ is
+  // safe to destroy during shutdown.
+  Atom<uint64_t> bits_{0};
+  static constexpr uint64_t kFDWriteInProgress = 1ul << 32;
 };
 
 /// This is a wrapper introduced for usage within LifoSem.
@@ -345,26 +362,20 @@ class LifoEventSemImpl
 
  private:
   ~AsyncWaiter() override {
-    // If we are not enqueued we consumed an event from queue but did not
-    // process it. If we weren't posted to because of shutdown we should
-    // return the event to semaphore as we will not process it.
-    if (!owner_.tryRemoveNode(*node_)) {
-      // If we are destroying just as we were dequeued from LifoSem but the
-      // post did not happen yet we need to wait for a post to happen
-      // otherwise we risk a post after we are already destroyed
-      node_->handoff().wait_readable();
-      // We need to call get_event_count in order to introduce acquire memory
-      // barrier as node_->isShutdownNotice() call is not safe without it.
-      if (FOLLY_UNLIKELY(node_->handoff().get_event_count() != 1)) {
-        // we just waited for fd to become readable so event count should be 1
-        // otherwise all bets are off
-        return;
-      }
-      // if it's not shutdown we got notified but we will not process the
-      // request so we need to return token to semaphore
-      if (!node_->isShutdownNotice()) {
-        owner_.post();
-      }
-    }
+    // If we were enqueued we can just return otherwise we consumed an event
+    // from semaphore but did not process it.
+    if (owner_.tryRemoveNode(*node_)) {
+      return;
+    }
+
+    // If we are destroying just as we were dequeued from LifoSem but the
+    // post did not happen yet we need to wait for a post to happen
+    // otherwise we risk a post after we are already destroyed.
+    node_->handoff().wait_readable();
+    // if it's not shutdown we got notified but we will not process the
+    // event so we need to return token to semaphore.
+    if (!node_->isShutdownNotice()) {
+      owner_.post();
+    }
   }
   typedef typename LifoEventSemImpl<Atom>::WaitResult WaitResult;
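To make the kFDWriteInProgress bit-packing above concrete, here is a small single-threaded walk-through of the arithmetic (a plain uint64_t standing in for Atom<uint64_t>; illustrative only, not part of the diff):

  #include <cassert>
  #include <cstdint>

  int main() {
    constexpr uint64_t kFDWriteInProgress = 1ul << 32;
    uint64_t bits = 0; // stands in for Atom<uint64_t> bits_{0}

    bits += 1 + kFDWriteInProgress;                 // post() enters
    assert((bits & (kFDWriteInProgress - 1)) == 1); // get_event_count() == 1
    assert(bits >= kFDWriteInProgress);             // ~FdBaton() would keep spinning

    bits -= kFDWriteInProgress;                     // fd_.write() returned
    assert(bits < kFDWriteInProgress);              // safe to destroy the fd now

    bits -= 1;                                      // try_wait() consumed the event
    assert(bits == 0);
    return 0;
  }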
3 changes: 0 additions & 3 deletions logdevice/common/test/LifoEventSemTest.cpp

@@ -21,9 +21,6 @@ using namespace folly;
 using namespace folly::test;
 using namespace facebook::logdevice;
 
-typedef LifoEventSemImpl<DeterministicAtomic> DLifoEventSem;
-typedef DeterministicSchedule DSched;
-
 namespace facebook { namespace logdevice { namespace detail {
 
 template <>
