Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add USE_TSAN and fix some data races #3026

Merged
merged 7 commits into from May 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/c/fdb_c.cpp
Expand Up @@ -449,7 +449,7 @@ FDBFuture* fdb_transaction_get_range_impl(

/* _ITERATOR mode maps to one of the known streaming modes
depending on iteration */
static const int mode_bytes_array[] = {CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED, 256, 1000, 4096, 80000};
const int mode_bytes_array[] = { CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED, 256, 1000, 4096, 80000 };

/* The progression used for FDB_STREAMING_MODE_ITERATOR.
Goes from small -> medium -> large. Then 1.5 * previous until serial. */
Expand Down
11 changes: 11 additions & 0 deletions cmake/ConfigureCompiler.cmake
Expand Up @@ -7,6 +7,7 @@ env_set(ALLOC_INSTRUMENTATION OFF BOOL "Instrument alloc")
env_set(WITH_UNDODB OFF BOOL "Use rr or undodb")
env_set(USE_ASAN OFF BOOL "Compile with address sanitizer")
env_set(USE_UBSAN OFF BOOL "Compile with undefined behavior sanitizer")
env_set(USE_TSAN OFF BOOL "Compile with thread sanitizer")
env_set(FDB_RELEASE OFF BOOL "This is a building of a final release")
env_set(USE_CCACHE OFF BOOL "Use ccache for compilation if available")
env_set(RELATIVE_DEBUG_PATHS OFF BOOL "Use relative file paths in debug info")
Expand Down Expand Up @@ -80,6 +81,7 @@ include(CheckFunctionExists)
set(CMAKE_REQUIRED_INCLUDES stdlib.h malloc.h)
set(CMAKE_REQUIRED_LIBRARIES c)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_C_STANDARD 11)

if(WIN32)
# see: https://docs.microsoft.com/en-us/windows/desktop/WinProg/using-the-windows-headers
Expand Down Expand Up @@ -163,6 +165,15 @@ else()
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=undefined ${CMAKE_THREAD_LIBS_INIT}")
endif()

if(USE_TSAN)
add_compile_options(
-fsanitize=thread
-DUSE_SANITIZER)
set(CMAKE_MODULE_LINKER_FLAGS "${CMAKE_MODULE_LINKER_FLAGS} -fsanitize=thread")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -fsanitize=thread")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread ${CMAKE_THREAD_LIBS_INIT}")
endif()

if(PORTABLE_BINARY)
message(STATUS "Create a more portable binary")
set(CMAKE_MODULE_LINKER_FLAGS "-static-libstdc++ -static-libgcc ${CMAKE_MODULE_LINKER_FLAGS}")
Expand Down
4 changes: 2 additions & 2 deletions fdbrpc/AsyncFileEIO.actor.h
Expand Up @@ -415,7 +415,7 @@ class AsyncFileEIO : public IAsyncFile, public ReferenceCounted<AsyncFileEIO> {
return data.result.get();
}

static volatile int32_t want_poll;
static std::atomic<int32_t> want_poll;

ACTOR static void poll_eio() {
while (eio_poll() == -1)
Expand Down Expand Up @@ -445,7 +445,7 @@ class AsyncFileEIO : public IAsyncFile, public ReferenceCounted<AsyncFileEIO> {
};

#ifdef FILESYSTEM_IMPL
volatile int32_t AsyncFileEIO::want_poll = 0;
std::atomic<int32_t> AsyncFileEIO::want_poll = 0;
#endif

#include "flow/unactorcompiler.h"
Expand Down
34 changes: 9 additions & 25 deletions fdbrpc/libeio/eio.c
Expand Up @@ -364,19 +364,19 @@ tvdiff (struct timeval *tv1, struct timeval *tv2)
+ ((tv2->tv_usec - tv1->tv_usec) >> 10);
}

static unsigned int started, idle, wanted = 4;
static _Atomic(unsigned int) started, idle, wanted = 4;

static void (*want_poll_cb) (void);
static void (*done_poll_cb) (void);

static unsigned int max_poll_time; /* reslock */
static unsigned int max_poll_reqs; /* reslock */

static unsigned int nreqs; /* reqlock */
static unsigned int nready; /* reqlock */
static unsigned int npending; /* reqlock */
static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
static _Atomic(unsigned int) max_poll_time; /* reslock */
static _Atomic(unsigned int) max_poll_reqs; /* reslock */

static _Atomic(unsigned int) nreqs; /* reqlock */
static _Atomic(unsigned int) nready; /* reqlock */
static _Atomic(unsigned int) npending; /* reqlock */
static _Atomic(unsigned int) max_idle = 4; /* maximum number of threads that can idle indefinitely */
static _Atomic(unsigned int) idle_timeout = 10; /* number of seconds after which an idle threads exit */

static xmutex_t wrklock;
static xmutex_t reslock;
Expand Down Expand Up @@ -433,9 +433,7 @@ static unsigned int
etp_nreqs (void)
{
int retval;
if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
retval = nreqs;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
return retval;
}

Expand All @@ -444,9 +442,7 @@ etp_nready (void)
{
unsigned int retval;

if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
retval = nready;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

return retval;
}
Expand All @@ -456,9 +452,7 @@ etp_npending (void)
{
unsigned int retval;

if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
retval = npending;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

return retval;
}
Expand All @@ -468,9 +462,7 @@ etp_nthreads (void)
{
unsigned int retval;

if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
retval = started;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

return retval;
}
Expand Down Expand Up @@ -742,33 +734,25 @@ etp_submit (ETP_REQ *req)
static void ecb_cold
etp_set_max_poll_time (double nseconds)
{
if (WORDACCESS_UNSAFE) X_LOCK (reslock);
max_poll_time = nseconds * EIO_TICKS;
if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
}

static void ecb_cold
etp_set_max_poll_reqs (unsigned int maxreqs)
{
if (WORDACCESS_UNSAFE) X_LOCK (reslock);
max_poll_reqs = maxreqs;
if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
}

static void ecb_cold
etp_set_max_idle (unsigned int nthreads)
{
if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
max_idle = nthreads;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}

static void ecb_cold
etp_set_idle_timeout (unsigned int seconds)
{
if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
idle_timeout = seconds;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}

static void ecb_cold
Expand Down
12 changes: 0 additions & 12 deletions fdbrpc/libeio/xthread.h
@@ -1,18 +1,6 @@
#ifndef XTHREAD_H_
#define XTHREAD_H_

/* whether word reads are potentially non-atomic.
* this is conservative, likely most arches this runs
* on have atomic word read/writes.
*/
#ifndef WORDACCESS_UNSAFE
# if __i386 || __x86_64
# define WORDACCESS_UNSAFE 0
# else
# define WORDACCESS_UNSAFE 1
# endif
#endif

/////////////////////////////////////////////////////////////////////////////

#ifdef _WIN32
Expand Down
8 changes: 6 additions & 2 deletions flow/IThreadPool.h
Expand Up @@ -92,12 +92,16 @@ class ThreadReturnPromise : NonCopyable {
void send( T const& t ) { // Can be called safely from another thread. Call send or sendError at most once.
Promise<Void> signal;
tagAndForward( &promise, t, signal.getFuture() );
g_network->onMainThread( std::move(signal), incrementPriorityIfEven( g_network->getCurrentTask() ) );
g_network->onMainThread(std::move(signal), g_network->isOnMainThread()
? incrementPriorityIfEven(g_network->getCurrentTask())
: TaskPriority::DefaultOnMainThread);
}
void sendError( Error const& e ) { // Can be called safely from another thread. Call send or sendError at most once.
Promise<Void> signal;
tagAndForwardError( &promise, e, signal.getFuture() );
g_network->onMainThread( std::move(signal), incrementPriorityIfEven( g_network->getCurrentTask() ) );
g_network->onMainThread(std::move(signal), g_network->isOnMainThread()
? incrementPriorityIfEven(g_network->getCurrentTask())
: TaskPriority::DefaultOnMainThread);
}
private:
Promise<T> promise;
Expand Down
2 changes: 1 addition & 1 deletion flow/SignalSafeUnwind.cpp
Expand Up @@ -22,7 +22,7 @@

int64_t dl_iterate_phdr_calls = 0;

#ifdef __linux__
#if defined(__linux__) && !defined(USE_SANITIZER)

#include <link.h>
#include <mutex>
Expand Down
8 changes: 2 additions & 6 deletions flow/ThreadHelper.actor.h
Expand Up @@ -197,12 +197,8 @@ class ThreadSingleAssignmentVarBase {
};

void blockUntilReady() {
if(isReadyUnsafe()) {
ThreadSpinLockHolder holder(mutex);
ASSERT(isReadyUnsafe());
}
else {
BlockCallback cb( *this );
if (!isReady()) {
BlockCallback cb(*this);
}
}

Expand Down
21 changes: 9 additions & 12 deletions flow/ThreadPrimitives.h
Expand Up @@ -22,6 +22,8 @@
#define FLOW_THREADPRIMITIVES_H
#pragma once

#include <atomic>

#include "flow/Error.h"
#include "flow/Trace.h"

Expand All @@ -45,7 +47,7 @@
class ThreadSpinLock {
public:
// #ifdef _WIN32
ThreadSpinLock(bool initiallyLocked=false) : isLocked(initiallyLocked) {
ThreadSpinLock() {
#if VALGRIND
ANNOTATE_RWLOCK_CREATE(this);
#endif
Expand All @@ -56,31 +58,26 @@ class ThreadSpinLock {
#endif
}
void enter() {
while (interlockedCompareExchange(&isLocked, 1, 0) == 1)
_mm_pause();
while (isLocked.test_and_set(std::memory_order_acquire)) _mm_pause();
#if VALGRIND
ANNOTATE_RWLOCK_ACQUIRED(this, true);
#endif
}
void leave() {
#if defined(__linux__)
__sync_synchronize();
#endif
isLocked = 0;
#if defined(__linux__)
__sync_synchronize();
#endif
isLocked.clear(std::memory_order_release);
#if VALGRIND
ANNOTATE_RWLOCK_RELEASED(this, true);
#endif
}
void assertNotEntered() {
ASSERT( !isLocked );
ASSERT(!isLocked.test_and_set(std::memory_order_acquire));
isLocked.clear(std::memory_order_release);
}

private:
ThreadSpinLock(const ThreadSpinLock&);
void operator=(const ThreadSpinLock&);
volatile int32_t isLocked;
std::atomic_flag isLocked = ATOMIC_FLAG_INIT;
};

class ThreadSpinLockHolder {
Expand Down
2 changes: 1 addition & 1 deletion flow/stacktrace.amalgamation.cpp
Expand Up @@ -2926,7 +2926,7 @@ static class VDSOInitHelper {
/* Each function is empty and called (via a macro) only in debug mode.
The arguments are captured by dynamic tools at runtime. */

#if DYNAMIC_ANNOTATIONS_EXTERNAL_IMPL == 0 && !defined(__native_client__)
#if DYNAMIC_ANNOTATIONS_EXTERNAL_IMPL == 0 && !defined(__native_client__) && !__has_feature(thread_sanitizer)

#if __has_feature(memory_sanitizer)
#include <sanitizer/msan_interface.h>
Expand Down