Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap pthread functions in ThreadFuzzer #9635

Merged
merged 3 commits into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
148 changes: 131 additions & 17 deletions dbms/src/Common/ThreadFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@
#include <Common/ThreadFuzzer.h>


/// We will also wrap some thread synchronization functions to inject sleep/migration before or after.
#if OS_LINUX
#define FOR_EACH_WRAPPED_FUNCTION(M) \
M(int, pthread_mutex_lock, pthread_mutex_t * arg) \
M(int, pthread_mutex_unlock, pthread_mutex_t * arg)
#else
#define FOR_EACH_WRAPPED_FUNCTION(M)
#endif


namespace DB
{

Expand Down Expand Up @@ -48,41 +58,108 @@ static void initFromEnv(T & what, const char * name)
what = parse<T>(env);
}

template <typename T>
static void initFromEnv(std::atomic<T> & what, const char * name)
{
const char * env = getenv(name);
if (!env)
return;
what.store(parse<T>(env), std::memory_order_relaxed);
}


static std::atomic<int> num_cpus = 0;

#define DEFINE_WRAPPER_PARAMS(RET, NAME, ...) \
static std::atomic<double> NAME ## _before_yield_probability = 0; \
static std::atomic<double> NAME ## _before_migrate_probability = 0; \
static std::atomic<double> NAME ## _before_sleep_probability = 0; \
static std::atomic<double> NAME ## _before_sleep_time_us = 0; \
\
static std::atomic<double> NAME ## _after_yield_probability = 0; \
static std::atomic<double> NAME ## _after_migrate_probability = 0; \
static std::atomic<double> NAME ## _after_sleep_probability = 0; \
static std::atomic<double> NAME ## _after_sleep_time_us = 0; \

FOR_EACH_WRAPPED_FUNCTION(DEFINE_WRAPPER_PARAMS)

#undef DEFINE_WRAPPER_PARAMS


void ThreadFuzzer::initConfiguration()
{
#if OS_LINUX
num_cpus = get_nprocs();
num_cpus.store(get_nprocs(), std::memory_order_relaxed);
#else
(void)num_cpus;
#endif

initFromEnv(cpu_time_period_us, "THREAD_FUZZER_CPU_TIME_PERIOD_US");
if (!cpu_time_period_us)
return;
initFromEnv(yield_probability, "THREAD_FUZZER_YIELD_PROBABILITY");
initFromEnv(migrate_probability, "THREAD_FUZZER_MIGRATE_PROBABILITY");
initFromEnv(sleep_probability, "THREAD_FUZZER_SLEEP_PROBABILITY");
initFromEnv(chaos_sleep_time_us, "THREAD_FUZZER_SLEEP_TIME_US");
initFromEnv(sleep_time_us, "THREAD_FUZZER_SLEEP_TIME_US");

#define INIT_WRAPPER_PARAMS(RET, NAME, ...) \
initFromEnv(NAME ## _before_yield_probability, "THREAD_FUZZER_" #NAME "_BEFORE_YIELD_PROBABILITY"); \
initFromEnv(NAME ## _before_migrate_probability, "THREAD_FUZZER_" #NAME "_BEFORE_MIGRATE_PROBABILITY"); \
initFromEnv(NAME ## _before_sleep_probability, "THREAD_FUZZER_" #NAME "_BEFORE_SLEEP_PROBABILITY"); \
initFromEnv(NAME ## _before_sleep_time_us, "THREAD_FUZZER_" #NAME "_BEFORE_SLEEP_TIME_US"); \
\
initFromEnv(NAME ## _after_yield_probability, "THREAD_FUZZER_" #NAME "_AFTER_YIELD_PROBABILITY"); \
initFromEnv(NAME ## _after_migrate_probability, "THREAD_FUZZER_" #NAME "_AFTER_MIGRATE_PROBABILITY"); \
initFromEnv(NAME ## _after_sleep_probability, "THREAD_FUZZER_" #NAME "_AFTER_SLEEP_PROBABILITY"); \
initFromEnv(NAME ## _after_sleep_time_us, "THREAD_FUZZER_" #NAME "_AFTER_SLEEP_TIME_US"); \

FOR_EACH_WRAPPED_FUNCTION(INIT_WRAPPER_PARAMS)

#undef INIT_WRAPPER_PARAMS
}

void ThreadFuzzer::signalHandler(int)

bool ThreadFuzzer::isEffective() const
{
auto saved_errno = errno;
#define CHECK_WRAPPER_PARAMS(RET, NAME, ...) \
if (NAME ## _before_yield_probability.load(std::memory_order_relaxed)) return true; \
if (NAME ## _before_migrate_probability.load(std::memory_order_relaxed)) return true; \
if (NAME ## _before_sleep_probability.load(std::memory_order_relaxed)) return true; \
if (NAME ## _before_sleep_time_us.load(std::memory_order_relaxed)) return true; \
\
if (NAME ## _after_yield_probability.load(std::memory_order_relaxed)) return true; \
if (NAME ## _after_migrate_probability.load(std::memory_order_relaxed)) return true; \
if (NAME ## _after_sleep_probability.load(std::memory_order_relaxed)) return true; \
if (NAME ## _after_sleep_time_us.load(std::memory_order_relaxed)) return true; \

FOR_EACH_WRAPPED_FUNCTION(CHECK_WRAPPER_PARAMS)

#undef INIT_WRAPPER_PARAMS

return cpu_time_period_us != 0
&& (yield_probability > 0
|| migrate_probability > 0
|| (sleep_probability > 0 && sleep_time_us > 0));
}

auto & fuzzer = ThreadFuzzer::instance();

if (fuzzer.yield_probability > 0
&& std::bernoulli_distribution(fuzzer.yield_probability)(thread_local_rng))
static void injection(
double yield_probability,
double migrate_probability,
double sleep_probability,
double sleep_time_us [[maybe_unused]])
{
if (yield_probability > 0
&& std::bernoulli_distribution(yield_probability)(thread_local_rng))
{
sched_yield();
}

#if OS_LINUX
if (fuzzer.num_cpus > 0
&& fuzzer.migrate_probability > 0
&& std::bernoulli_distribution(fuzzer.migrate_probability)(thread_local_rng))
int num_cpus_loaded = num_cpus.load(std::memory_order_relaxed);
if (num_cpus_loaded > 0
&& migrate_probability > 0
&& std::bernoulli_distribution(migrate_probability)(thread_local_rng))
{
int migrate_to = std::uniform_int_distribution<>(0, fuzzer.num_cpus - 1)(thread_local_rng);
int migrate_to = std::uniform_int_distribution<>(0, num_cpus_loaded - 1)(thread_local_rng);

cpu_set_t set;
CPU_ZERO(&set);
Expand All @@ -92,12 +169,21 @@ void ThreadFuzzer::signalHandler(int)
}
#endif

if (fuzzer.sleep_probability > 0
&& fuzzer.chaos_sleep_time_us > 0
&& std::bernoulli_distribution(fuzzer.sleep_probability)(thread_local_rng))
if (sleep_probability > 0
&& sleep_time_us > 0
&& std::bernoulli_distribution(sleep_probability)(thread_local_rng))
{
sleepForNanoseconds(fuzzer.chaos_sleep_time_us * 1000);
sleepForNanoseconds(sleep_time_us * 1000);
}
}


void ThreadFuzzer::signalHandler(int)
{
auto saved_errno = errno;

auto & fuzzer = ThreadFuzzer::instance();
injection(fuzzer.yield_probability, fuzzer.migrate_probability, fuzzer.sleep_probability, fuzzer.sleep_time_us);

errno = saved_errno;
}
Expand Down Expand Up @@ -130,4 +216,32 @@ void ThreadFuzzer::setup()
}


/// We expect that for every function like pthread_mutex_lock there is the same function with two underscores prefix.
/// NOTE We cannot use dlsym(... RTLD_NEXT), because it will call pthread_mutex_lock and it will lead to infinite recursion.

#define MAKE_WRAPPER(RET, NAME, ...) \
extern "C" RET __ ## NAME(__VA_ARGS__); /* NOLINT */ \
extern "C" RET NAME(__VA_ARGS__) /* NOLINT */ \
{ \
injection( \
NAME ## _before_yield_probability.load(std::memory_order_relaxed), \
NAME ## _before_migrate_probability.load(std::memory_order_relaxed), \
NAME ## _before_sleep_probability.load(std::memory_order_relaxed), \
NAME ## _before_sleep_time_us.load(std::memory_order_relaxed)); \
\
auto && ret{__ ## NAME(arg)}; \
\
injection( \
NAME ## _after_yield_probability.load(std::memory_order_relaxed), \
NAME ## _after_migrate_probability.load(std::memory_order_relaxed), \
NAME ## _after_sleep_probability.load(std::memory_order_relaxed), \
NAME ## _after_sleep_time_us.load(std::memory_order_relaxed)); \
\
return ret; \
} \

FOR_EACH_WRAPPED_FUNCTION(MAKE_WRAPPER)

#undef MAKE_WRAPPER

}
21 changes: 10 additions & 11 deletions dbms/src/Common/ThreadFuzzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ namespace DB
* it is doable with wrapping these functions (todo).
* - we should also make the sleep time random.
* - sleep obviously helps, but the effect of yield and migration is unclear.
*
* In addition, we allow to inject glitches around thread synchronization functions.
* Example:
*
* THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_PROBABILITY=0.001
* THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_TIME_US=10000
* THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_PROBABILITY=0.001
* THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US=10000
*/
class ThreadFuzzer
{
Expand All @@ -45,23 +53,14 @@ class ThreadFuzzer
return res;
}

bool isEffective() const
{
return cpu_time_period_us != 0
&& (yield_probability > 0
|| migrate_probability > 0
|| (sleep_probability > 0 && chaos_sleep_time_us > 0));
}
bool isEffective() const;

private:
uint64_t cpu_time_period_us = 0;
double yield_probability = 0;
double migrate_probability = 0;
double sleep_probability = 0;
double chaos_sleep_time_us = 0;

int num_cpus = 0;

double sleep_time_us = 0;

ThreadFuzzer();

Expand Down