Skip to content

Commit

Permalink
Merge pull request #3026 from atn34/atn34/tsan
Browse files Browse the repository at this point in the history
Add USE_TSAN and fix some data races
  • Loading branch information
ajbeamon committed May 8, 2020
2 parents e2cf7af + 202129d commit 0be453d
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 62 deletions.
2 changes: 1 addition & 1 deletion bindings/c/fdb_c.cpp
Expand Up @@ -449,7 +449,7 @@ FDBFuture* fdb_transaction_get_range_impl(

/* _ITERATOR mode maps to one of the known streaming modes
depending on iteration */
static const int mode_bytes_array[] = {CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED, 256, 1000, 4096, 80000};
const int mode_bytes_array[] = { CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED, 256, 1000, 4096, 80000 };

/* The progression used for FDB_STREAMING_MODE_ITERATOR.
Goes from small -> medium -> large. Then 1.5 * previous until serial. */
Expand Down
11 changes: 11 additions & 0 deletions cmake/ConfigureCompiler.cmake
Expand Up @@ -8,6 +8,7 @@ env_set(ALLOC_INSTRUMENTATION OFF BOOL "Instrument alloc")
env_set(WITH_UNDODB OFF BOOL "Use rr or undodb")
env_set(USE_ASAN OFF BOOL "Compile with address sanitizer")
env_set(USE_UBSAN OFF BOOL "Compile with undefined behavior sanitizer")
env_set(USE_TSAN OFF BOOL "Compile with thread sanitizer")
env_set(FDB_RELEASE OFF BOOL "This is a building of a final release")
env_set(USE_CCACHE OFF BOOL "Use ccache for compilation if available")
env_set(RELATIVE_DEBUG_PATHS OFF BOOL "Use relative file paths in debug info")
Expand Down Expand Up @@ -81,6 +82,7 @@ include(CheckFunctionExists)
set(CMAKE_REQUIRED_INCLUDES stdlib.h malloc.h)
set(CMAKE_REQUIRED_LIBRARIES c)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_C_STANDARD 11)

if(WIN32)
# see: https://docs.microsoft.com/en-us/windows/desktop/WinProg/using-the-windows-headers
Expand Down Expand Up @@ -164,6 +166,15 @@ else()
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=undefined ${CMAKE_THREAD_LIBS_INIT}")
endif()

if(USE_TSAN)
add_compile_options(
-fsanitize=thread
-DUSE_SANITIZER)
set(CMAKE_MODULE_LINKER_FLAGS "${CMAKE_MODULE_LINKER_FLAGS} -fsanitize=thread")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -fsanitize=thread")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread ${CMAKE_THREAD_LIBS_INIT}")
endif()

if(PORTABLE_BINARY)
message(STATUS "Create a more portable binary")
set(CMAKE_MODULE_LINKER_FLAGS "-static-libstdc++ -static-libgcc ${CMAKE_MODULE_LINKER_FLAGS}")
Expand Down
4 changes: 2 additions & 2 deletions fdbrpc/AsyncFileEIO.actor.h
Expand Up @@ -415,7 +415,7 @@ class AsyncFileEIO : public IAsyncFile, public ReferenceCounted<AsyncFileEIO> {
return data.result.get();
}

static volatile int32_t want_poll;
static std::atomic<int32_t> want_poll;

ACTOR static void poll_eio() {
while (eio_poll() == -1)
Expand Down Expand Up @@ -445,7 +445,7 @@ class AsyncFileEIO : public IAsyncFile, public ReferenceCounted<AsyncFileEIO> {
};

#ifdef FILESYSTEM_IMPL
volatile int32_t AsyncFileEIO::want_poll = 0;
std::atomic<int32_t> AsyncFileEIO::want_poll = 0;
#endif

#include "flow/unactorcompiler.h"
Expand Down
34 changes: 9 additions & 25 deletions fdbrpc/libeio/eio.c
Expand Up @@ -366,19 +366,19 @@ tvdiff (struct timeval *tv1, struct timeval *tv2)
+ ((tv2->tv_usec - tv1->tv_usec) >> 10);
}

static unsigned int started, idle, wanted = 4;
static _Atomic(unsigned int) started, idle, wanted = 4;

static void (*want_poll_cb) (void);
static void (*done_poll_cb) (void);

static unsigned int max_poll_time; /* reslock */
static unsigned int max_poll_reqs; /* reslock */

static unsigned int nreqs; /* reqlock */
static unsigned int nready; /* reqlock */
static unsigned int npending; /* reqlock */
static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
static _Atomic(unsigned int) max_poll_time; /* reslock */
static _Atomic(unsigned int) max_poll_reqs; /* reslock */

static _Atomic(unsigned int) nreqs; /* reqlock */
static _Atomic(unsigned int) nready; /* reqlock */
static _Atomic(unsigned int) npending; /* reqlock */
static _Atomic(unsigned int) max_idle = 4; /* maximum number of threads that can idle indefinitely */
static _Atomic(unsigned int) idle_timeout = 10; /* number of seconds after which an idle threads exit */

static xmutex_t wrklock;
static xmutex_t reslock;
Expand Down Expand Up @@ -435,9 +435,7 @@ static unsigned int
etp_nreqs (void)
{
int retval;
if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
retval = nreqs;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
return retval;
}

Expand All @@ -446,9 +444,7 @@ etp_nready (void)
{
unsigned int retval;

if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
retval = nready;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

return retval;
}
Expand All @@ -458,9 +454,7 @@ etp_npending (void)
{
unsigned int retval;

if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
retval = npending;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

return retval;
}
Expand All @@ -470,9 +464,7 @@ etp_nthreads (void)
{
unsigned int retval;

if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
retval = started;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

return retval;
}
Expand Down Expand Up @@ -744,33 +736,25 @@ etp_submit (ETP_REQ *req)
static void ecb_cold
etp_set_max_poll_time (double nseconds)
{
if (WORDACCESS_UNSAFE) X_LOCK (reslock);
max_poll_time = nseconds * EIO_TICKS;
if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
}

static void ecb_cold
etp_set_max_poll_reqs (unsigned int maxreqs)
{
if (WORDACCESS_UNSAFE) X_LOCK (reslock);
max_poll_reqs = maxreqs;
if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
}

static void ecb_cold
etp_set_max_idle (unsigned int nthreads)
{
if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
max_idle = nthreads;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}

static void ecb_cold
etp_set_idle_timeout (unsigned int seconds)
{
if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
idle_timeout = seconds;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}

static void ecb_cold
Expand Down
12 changes: 0 additions & 12 deletions fdbrpc/libeio/xthread.h
@@ -1,18 +1,6 @@
#ifndef XTHREAD_H_
#define XTHREAD_H_

/* whether word reads are potentially non-atomic.
* this is conservative, likely most arches this runs
* on have atomic word read/writes.
*/
#ifndef WORDACCESS_UNSAFE
# if __i386 || __x86_64
# define WORDACCESS_UNSAFE 0
# else
# define WORDACCESS_UNSAFE 1
# endif
#endif

/////////////////////////////////////////////////////////////////////////////

#ifdef _WIN32
Expand Down
8 changes: 6 additions & 2 deletions flow/IThreadPool.h
Expand Up @@ -92,12 +92,16 @@ class ThreadReturnPromise : NonCopyable {
void send( T const& t ) { // Can be called safely from another thread. Call send or sendError at most once.
Promise<Void> signal;
tagAndForward( &promise, t, signal.getFuture() );
g_network->onMainThread( std::move(signal), incrementPriorityIfEven( g_network->getCurrentTask() ) );
g_network->onMainThread(std::move(signal), g_network->isOnMainThread()
? incrementPriorityIfEven(g_network->getCurrentTask())
: TaskPriority::DefaultOnMainThread);
}
void sendError( Error const& e ) { // Can be called safely from another thread. Call send or sendError at most once.
Promise<Void> signal;
tagAndForwardError( &promise, e, signal.getFuture() );
g_network->onMainThread( std::move(signal), incrementPriorityIfEven( g_network->getCurrentTask() ) );
g_network->onMainThread(std::move(signal), g_network->isOnMainThread()
? incrementPriorityIfEven(g_network->getCurrentTask())
: TaskPriority::DefaultOnMainThread);
}
private:
Promise<T> promise;
Expand Down
2 changes: 1 addition & 1 deletion flow/SignalSafeUnwind.cpp
Expand Up @@ -22,7 +22,7 @@

int64_t dl_iterate_phdr_calls = 0;

#ifdef __linux__
#if defined(__linux__) && !defined(USE_SANITIZER)

#include <link.h>
#include <mutex>
Expand Down
8 changes: 2 additions & 6 deletions flow/ThreadHelper.actor.h
Expand Up @@ -197,12 +197,8 @@ class ThreadSingleAssignmentVarBase {
};

void blockUntilReady() {
if(isReadyUnsafe()) {
ThreadSpinLockHolder holder(mutex);
ASSERT(isReadyUnsafe());
}
else {
BlockCallback cb( *this );
if (!isReady()) {
BlockCallback cb(*this);
}
}

Expand Down
21 changes: 9 additions & 12 deletions flow/ThreadPrimitives.h
Expand Up @@ -22,6 +22,8 @@
#define FLOW_THREADPRIMITIVES_H
#pragma once

#include <atomic>

#include "flow/Error.h"
#include "flow/Trace.h"

Expand All @@ -45,7 +47,7 @@
class ThreadSpinLock {
public:
// #ifdef _WIN32
ThreadSpinLock(bool initiallyLocked=false) : isLocked(initiallyLocked) {
ThreadSpinLock() {
#if VALGRIND
ANNOTATE_RWLOCK_CREATE(this);
#endif
Expand All @@ -56,31 +58,26 @@ class ThreadSpinLock {
#endif
}
void enter() {
while (interlockedCompareExchange(&isLocked, 1, 0) == 1)
_mm_pause();
while (isLocked.test_and_set(std::memory_order_acquire)) _mm_pause();
#if VALGRIND
ANNOTATE_RWLOCK_ACQUIRED(this, true);
#endif
}
void leave() {
#if defined(__linux__)
__sync_synchronize();
#endif
isLocked = 0;
#if defined(__linux__)
__sync_synchronize();
#endif
isLocked.clear(std::memory_order_release);
#if VALGRIND
ANNOTATE_RWLOCK_RELEASED(this, true);
#endif
}
void assertNotEntered() {
ASSERT( !isLocked );
ASSERT(!isLocked.test_and_set(std::memory_order_acquire));
isLocked.clear(std::memory_order_release);
}

private:
ThreadSpinLock(const ThreadSpinLock&);
void operator=(const ThreadSpinLock&);
volatile int32_t isLocked;
std::atomic_flag isLocked = ATOMIC_FLAG_INIT;
};

class ThreadSpinLockHolder {
Expand Down
2 changes: 1 addition & 1 deletion flow/stacktrace.amalgamation.cpp
Expand Up @@ -2926,7 +2926,7 @@ static class VDSOInitHelper {
/* Each function is empty and called (via a macro) only in debug mode.
The arguments are captured by dynamic tools at runtime. */

#if DYNAMIC_ANNOTATIONS_EXTERNAL_IMPL == 0 && !defined(__native_client__)
#if DYNAMIC_ANNOTATIONS_EXTERNAL_IMPL == 0 && !defined(__native_client__) && !__has_feature(thread_sanitizer)

#if __has_feature(memory_sanitizer)
#include <sanitizer/msan_interface.h>
Expand Down

0 comments on commit 0be453d

Please sign in to comment.