Skip to content

Commit

Permalink
Wait for jobs to finish on exception (fixes rare segfaults) (#7350)
Browse files Browse the repository at this point in the history
  • Loading branch information
tavplubix authored and akuzm committed Oct 17, 2019
1 parent b62c9e7 commit 0046b9f
Show file tree
Hide file tree
Showing 29 changed files with 154 additions and 103 deletions.
21 changes: 15 additions & 6 deletions dbms/programs/benchmark/Benchmark.cpp
Expand Up @@ -274,15 +274,24 @@ class Benchmark : public Poco::Util::Application
pcg64 generator(randomSeed());
std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);

for (size_t i = 0; i < concurrency; ++i)
try
{
EntryPtrs connection_entries;
connection_entries.reserve(connections.size());
for (size_t i = 0; i < concurrency; ++i)
{
EntryPtrs connection_entries;
connection_entries.reserve(connections.size());

for (const auto & connection : connections)
connection_entries.emplace_back(std::make_shared<Entry>(connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
for (const auto & connection : connections)
connection_entries.emplace_back(std::make_shared<Entry>(
connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));

pool.schedule(std::bind(&Benchmark::thread, this, connection_entries));
pool.scheduleOrThrowOnError(std::bind(&Benchmark::thread, this, connection_entries));
}
}
catch (...)
{
pool.wait();
throw;
}

InterruptListener interrupt_listener;
Expand Down
4 changes: 2 additions & 2 deletions dbms/programs/copier/ClusterCopier.cpp
Expand Up @@ -895,7 +895,7 @@ class ClusterCopier
ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());

for (const TaskShardPtr & task_shard : task_table.all_shards)
thread_pool.schedule([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });

LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
thread_pool.wait();
Expand Down Expand Up @@ -2038,7 +2038,7 @@ class ClusterCopier
ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));

for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
thread_pool.schedule([=] { do_for_shard(shard_index); });
thread_pool.scheduleOrThrowOnError([=] { do_for_shard(shard_index); });

thread_pool.wait();
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/programs/server/TCPHandler.cpp
Expand Up @@ -565,7 +565,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
auto executor = pipeline.execute();
std::atomic_bool exception = false;

pool.schedule([&]()
pool.scheduleOrThrowOnError([&]()
{
/// ThreadStatus thread_status;

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Common/ThreadPool.cpp
Expand Up @@ -121,13 +121,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
}

template <typename Thread>
void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, int priority)
{
scheduleImpl<void>(std::move(job), priority, std::nullopt);
}

template <typename Thread>
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds)
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds) noexcept
{
return scheduleImpl<bool>(std::move(job), priority, wait_microseconds);
}
Expand Down
19 changes: 12 additions & 7 deletions dbms/src/Common/ThreadPool.h
Expand Up @@ -36,18 +36,23 @@ class ThreadPoolImpl
ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_);

/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
/// Also throws an exception if cannot create thread.
/// Priority: greater is higher.
void schedule(Job job, int priority = 0);
/// NOTE: Probably you should call wait() if exception was thrown. If some previously scheduled jobs are using some objects,
/// located on stack of current thread, the stack must not be unwinded until all jobs finished. However,
/// if ThreadPool is a local object, it will wait for all scheduled jobs in own destructor.
void scheduleOrThrowOnError(Job job, int priority = 0);

/// Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0);
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0) noexcept;

/// Wait for specified amount of time and schedule a job or throw an exception.
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or throw an exception.
void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0);

/// Wait for all currently active jobs to be done.
/// You may call schedule and wait many times in arbitary order.
/// You may call schedule and wait many times in arbitrary order.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
void wait();
Expand Down Expand Up @@ -140,7 +145,7 @@ class ThreadFromGlobalPool
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
: state(std::make_shared<Poco::Event>())
{
/// NOTE: If this will throw an exception, the descructor won't be called.
/// NOTE: If this will throw an exception, the destructor won't be called.
GlobalThreadPool::instance().scheduleOrThrow([
state = state,
func = std::forward<Function>(func),
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Common/tests/gtest_thread_pool_concurrent_wait.cpp
Expand Up @@ -21,14 +21,14 @@ TEST(ThreadPool, ConcurrentWait)
ThreadPool pool(num_threads);

for (size_t i = 0; i < num_jobs; ++i)
pool.schedule(worker);
pool.scheduleOrThrowOnError(worker);

constexpr size_t num_waiting_threads = 4;

ThreadPool waiting_pool(num_waiting_threads);

for (size_t i = 0; i < num_waiting_threads; ++i)
waiting_pool.schedule([&pool]{ pool.wait(); });
waiting_pool.scheduleOrThrowOnError([&pool] { pool.wait(); });

waiting_pool.wait();
}
10 changes: 5 additions & 5 deletions dbms/src/Common/tests/gtest_thread_pool_global_full.cpp
Expand Up @@ -30,11 +30,11 @@ TEST(ThreadPool, GlobalFull1)
ThreadPool pool(num_jobs);

for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
pool.scheduleOrThrowOnError(func);

for (size_t i = capacity; i < num_jobs; ++i)
{
EXPECT_THROW(pool.schedule(func), DB::Exception);
EXPECT_THROW(pool.scheduleOrThrowOnError(func), DB::Exception);
++counter;
}

Expand Down Expand Up @@ -67,10 +67,10 @@ TEST(ThreadPool, GlobalFull2)

ThreadPool pool(capacity, 0, capacity);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
pool.scheduleOrThrowOnError(func);

ThreadPool another_pool(1);
EXPECT_THROW(another_pool.schedule(func), DB::Exception);
EXPECT_THROW(another_pool.scheduleOrThrowOnError(func), DB::Exception);

++counter;

Expand All @@ -79,7 +79,7 @@ TEST(ThreadPool, GlobalFull2)
global_pool.wait();

for (size_t i = 0; i < capacity; ++i)
another_pool.schedule([&] { ++counter; });
another_pool.scheduleOrThrowOnError([&] { ++counter; });

another_pool.wait();
EXPECT_EQ(counter, capacity * 2 + 1);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Common/tests/gtest_thread_pool_limit.cpp
Expand Up @@ -14,7 +14,7 @@ int test()

std::atomic<int> counter{0};
for (size_t i = 0; i < 10; ++i)
pool.schedule([&]{ ++counter; });
pool.scheduleOrThrowOnError([&]{ ++counter; });
pool.wait();

return counter;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Common/tests/gtest_thread_pool_loop.cpp
Expand Up @@ -14,7 +14,7 @@ TEST(ThreadPool, Loop)
size_t threads = 16;
ThreadPool pool(threads);
for (size_t j = 0; j < threads; ++j)
pool.schedule([&]{ ++res; });
pool.scheduleOrThrowOnError([&] { ++res; });
pool.wait();
}

Expand Down
Expand Up @@ -9,12 +9,12 @@ bool check()
{
ThreadPool pool(10);

pool.schedule([]{ throw std::runtime_error("Hello, world!"); });
pool.scheduleOrThrowOnError([] { throw std::runtime_error("Hello, world!"); });

try
{
for (size_t i = 0; i < 100; ++i)
pool.schedule([]{}); /// An exception will be rethrown from this method.
pool.scheduleOrThrowOnError([] {}); /// An exception will be rethrown from this method.
}
catch (const std::runtime_error &)
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Common/tests/multi_version.cpp
Expand Up @@ -37,8 +37,8 @@ int main(int, char **)
ThreadPool tp(8);
for (size_t i = 0; i < n; ++i)
{
tp.schedule(std::bind(thread1, std::ref(x), std::ref(results[i])));
tp.schedule(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
tp.scheduleOrThrowOnError(std::bind(thread1, std::ref(x), std::ref(results[i])));
tp.scheduleOrThrowOnError(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
}
tp.wait();

Expand Down
24 changes: 12 additions & 12 deletions dbms/src/Common/tests/parallel_aggregation.cpp
Expand Up @@ -284,7 +284,7 @@ int main(int argc, char ** argv)
Stopwatch watch;

for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
Expand Down Expand Up @@ -338,7 +338,7 @@ int main(int argc, char ** argv)
Stopwatch watch;

for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate12,
pool.scheduleOrThrowOnError(std::bind(aggregate12,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
Expand Down Expand Up @@ -397,7 +397,7 @@ int main(int argc, char ** argv)
Stopwatch watch;

for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
Expand Down Expand Up @@ -473,7 +473,7 @@ int main(int argc, char ** argv)
Stopwatch watch;

for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate2,
pool.scheduleOrThrowOnError(std::bind(aggregate2,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
Expand All @@ -499,7 +499,7 @@ int main(int argc, char ** argv)
watch.restart();

for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
pool.schedule(std::bind(merge2,
pool.scheduleOrThrowOnError(std::bind(merge2,
maps.data(), num_threads, i));

pool.wait();
Expand Down Expand Up @@ -527,7 +527,7 @@ int main(int argc, char ** argv)
Stopwatch watch;

for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate22,
pool.scheduleOrThrowOnError(std::bind(aggregate22,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
Expand All @@ -553,7 +553,7 @@ int main(int argc, char ** argv)
watch.restart();

for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
pool.schedule(std::bind(merge2, maps.data(), num_threads, i));
pool.scheduleOrThrowOnError(std::bind(merge2, maps.data(), num_threads, i));

pool.wait();

Expand Down Expand Up @@ -592,7 +592,7 @@ int main(int argc, char ** argv)
Stopwatch watch;

for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate3,
pool.scheduleOrThrowOnError(std::bind(aggregate3,
std::ref(local_maps[i]),
std::ref(global_map),
std::ref(mutex),
Expand Down Expand Up @@ -658,7 +658,7 @@ int main(int argc, char ** argv)
Stopwatch watch;

for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate33,
pool.scheduleOrThrowOnError(std::bind(aggregate33,
std::ref(local_maps[i]),
std::ref(global_map),
std::ref(mutex),
Expand Down Expand Up @@ -727,7 +727,7 @@ int main(int argc, char ** argv)
Stopwatch watch;

for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate4,
pool.scheduleOrThrowOnError(std::bind(aggregate4,
std::ref(local_maps[i]),
std::ref(global_map),
mutexes.data(),
Expand Down Expand Up @@ -797,7 +797,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate5,
pool.scheduleOrThrowOnError(std::bind(aggregate5,
std::ref(local_maps[i]),
std::ref(global_map),
data.begin() + (data.size() * i) / num_threads,
Expand Down Expand Up @@ -860,7 +860,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Common/tests/parallel_aggregation2.cpp
Expand Up @@ -42,7 +42,7 @@ struct AggregateIndependent
auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
auto & map = *results[i];

pool.schedule([&, begin, end]()
pool.scheduleOrThrowOnError([&, begin, end]()
{
for (auto it = begin; it != end; ++it)
{
Expand Down Expand Up @@ -85,7 +85,7 @@ struct AggregateIndependentWithSequentialKeysOptimization
auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
auto & map = *results[i];

pool.schedule([&, begin, end]()
pool.scheduleOrThrowOnError([&, begin, end]()
{
typename Map::LookupResult place = nullptr;
Key prev_key {};
Expand Down Expand Up @@ -180,7 +180,7 @@ struct MergeParallelForTwoLevelTable
ThreadPool & pool)
{
for (size_t bucket = 0; bucket < Map::NUM_BUCKETS; ++bucket)
pool.schedule([&, bucket, num_maps]
pool.scheduleOrThrowOnError([&, bucket, num_maps]
{
std::vector<typename Map::Impl *> section(num_maps);
for (size_t i = 0; i < num_maps; ++i)
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Common/tests/thread_creation_latency.cpp
Expand Up @@ -66,7 +66,7 @@ int main(int argc, char ** argv)
test(n, "Create and destroy ThreadPool each iteration", []
{
ThreadPool tp(1);
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});

Expand All @@ -90,7 +90,7 @@ int main(int argc, char ** argv)

test(n, "Schedule job for Threadpool each iteration", [&tp]
{
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
}
Expand All @@ -100,7 +100,7 @@ int main(int argc, char ** argv)

test(n, "Schedule job for Threadpool with 128 threads each iteration", [&tp]
{
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/AsynchronousBlockInputStream.cpp
Expand Up @@ -36,7 +36,7 @@ void AsynchronousBlockInputStream::next()
{
ready.reset();

pool.schedule([this, thread_group = CurrentThread::getGroup()] ()
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup()]()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};

Expand Down

0 comments on commit 0046b9f

Please sign in to comment.