From 0046b9f13730db5e8d2bb3d7ed3031c2b151fa28 Mon Sep 17 00:00:00 2001
From: tavplubix
Date: Thu, 17 Oct 2019 17:41:27 +0300
Subject: [PATCH] Wait for jobs to finish on exception (fixes rare segfaults) (#7350)

---
 dbms/programs/benchmark/Benchmark.cpp         | 21 +++++--
 dbms/programs/copier/ClusterCopier.cpp        |  4 +-
 dbms/programs/server/TCPHandler.cpp           |  2 +-
 dbms/src/Common/ThreadPool.cpp                |  4 +-
 dbms/src/Common/ThreadPool.h                  | 19 +++---
 .../gtest_thread_pool_concurrent_wait.cpp     |  4 +-
 .../tests/gtest_thread_pool_global_full.cpp   | 10 +--
 .../Common/tests/gtest_thread_pool_limit.cpp  |  2 +-
 .../Common/tests/gtest_thread_pool_loop.cpp   |  2 +-
 .../gtest_thread_pool_schedule_exception.cpp  |  4 +-
 dbms/src/Common/tests/multi_version.cpp       |  4 +-
 .../src/Common/tests/parallel_aggregation.cpp | 24 +++----
 .../Common/tests/parallel_aggregation2.cpp    |  6 +-
 .../Common/tests/thread_creation_latency.cpp  |  6 +-
 .../AsynchronousBlockInputStream.cpp          |  2 +-
 ...regatedMemoryEfficientBlockInputStream.cpp | 62 ++++++++++++-------
 .../PushingToViewsBlockOutputStream.cpp       |  2 +-
 dbms/src/Databases/DatabaseOrdinary.cpp       | 13 ++--
 dbms/src/IO/AsynchronousWriteBuffer.h         |  2 +-
 dbms/src/Interpreters/Aggregator.cpp          |  6 +-
 .../Interpreters/InterpreterSystemQuery.cpp   |  2 +-
 .../src/Interpreters/tests/internal_iotop.cpp |  2 +-
 .../Executors/ParallelPipelineExecutor.cpp    |  2 +-
 dbms/src/Processors/tests/processors_test.cpp |  2 +-
 .../DistributedBlockOutputStream.cpp          | 40 ++++++++----
 dbms/src/Storages/MergeTree/MergeTreeData.cpp |  4 +-
 dbms/src/Storages/StorageMergeTree.cpp        |  2 +-
 utils/iotest/iotest.cpp                       |  2 +-
 utils/iotest/iotest_aio.cpp                   |  2 +-
 29 files changed, 154 insertions(+), 103 deletions(-)

diff --git a/dbms/programs/benchmark/Benchmark.cpp b/dbms/programs/benchmark/Benchmark.cpp
index e685425eefc9..195f8c012700 100644
--- a/dbms/programs/benchmark/Benchmark.cpp
+++ b/dbms/programs/benchmark/Benchmark.cpp
@@ -274,15 +274,24 @@ class Benchmark : public Poco::Util::Application
         pcg64 generator(randomSeed());
         std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
 
-        for (size_t i = 0; i < concurrency; ++i)
+        try
         {
-            EntryPtrs connection_entries;
-            connection_entries.reserve(connections.size());
+            for (size_t i = 0; i < concurrency; ++i)
+            {
+                EntryPtrs connection_entries;
+                connection_entries.reserve(connections.size());
 
-            for (const auto & connection : connections)
-                connection_entries.emplace_back(std::make_shared<ConnectionPool::Entry>(connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
+                for (const auto & connection : connections)
+                    connection_entries.emplace_back(std::make_shared<ConnectionPool::Entry>(
+                        connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
 
-            pool.schedule(std::bind(&Benchmark::thread, this, connection_entries));
+                pool.scheduleOrThrowOnError(std::bind(&Benchmark::thread, this, connection_entries));
+            }
+        }
+        catch (...)
+        {
+            pool.wait();
+            throw;
+        }
 
         InterruptListener interrupt_listener;
diff --git a/dbms/programs/copier/ClusterCopier.cpp b/dbms/programs/copier/ClusterCopier.cpp
index 7ac03dfdd3bb..5fc1d76b5422 100644
--- a/dbms/programs/copier/ClusterCopier.cpp
+++ b/dbms/programs/copier/ClusterCopier.cpp
@@ -895,7 +895,7 @@ class ClusterCopier
         ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
 
         for (const TaskShardPtr & task_shard : task_table.all_shards)
-            thread_pool.schedule([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
+            thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
 
         LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
         thread_pool.wait();
@@ -2038,7 +2038,7 @@ class ClusterCopier
             ThreadPool thread_pool(std::min(num_shards, getNumberOfPhysicalCPUCores()));
 
             for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
-                thread_pool.schedule([=] { do_for_shard(shard_index); });
+                thread_pool.scheduleOrThrowOnError([=] { do_for_shard(shard_index); });
 
             thread_pool.wait();
         }
diff --git a/dbms/programs/server/TCPHandler.cpp b/dbms/programs/server/TCPHandler.cpp
index 58ef28d0c030..83749975f34f 100644
--- a/dbms/programs/server/TCPHandler.cpp
+++ b/dbms/programs/server/TCPHandler.cpp
@@ -565,7 +565,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
         auto executor = pipeline.execute();
         std::atomic_bool exception = false;
 
-        pool.schedule([&]()
+        pool.scheduleOrThrowOnError([&]()
         {
             /// ThreadStatus thread_status;
 
diff --git a/dbms/src/Common/ThreadPool.cpp b/dbms/src/Common/ThreadPool.cpp
index b88106fab731..c1cad465ed2d 100644
--- a/dbms/src/Common/ThreadPool.cpp
+++ b/dbms/src/Common/ThreadPool.cpp
@@ -121,13 +121,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
 }
 
 template <typename Thread>
-void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
+void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, int priority)
 {
     scheduleImpl<void>(std::move(job), priority, std::nullopt);
 }
 
 template <typename Thread>
-bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds)
+bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds) noexcept
 {
     return scheduleImpl<bool>(std::move(job), priority, wait_microseconds);
 }
diff --git a/dbms/src/Common/ThreadPool.h b/dbms/src/Common/ThreadPool.h
index 2ced4626a1be..1b3071f732cd 100644
--- a/dbms/src/Common/ThreadPool.h
+++ b/dbms/src/Common/ThreadPool.h
@@ -36,18 +36,23 @@ class ThreadPoolImpl
     ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_);
 
     /// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
-    /// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
+    /// If any thread has thrown an exception, the first exception will be rethrown from this method,
+    /// and the exception will be cleared.
+    /// Also throws an exception if a thread cannot be created.
     /// Priority: greater is higher.
-    void schedule(Job job, int priority = 0);
+    /// NOTE: You should probably call wait() if an exception was thrown, because if previously scheduled jobs use objects
+    /// located on the stack of the current thread, the stack must not be unwound until all jobs have finished. However,
+    /// if the ThreadPool is a local object, it will wait for all scheduled jobs in its own destructor.
+    void scheduleOrThrowOnError(Job job, int priority = 0);
 
-    /// Wait for specified amount of time and schedule a job or return false.
-    bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0);
+    /// Similar to scheduleOrThrowOnError(...). Wait for the specified amount of time and schedule a job, or return false.
+    bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0) noexcept;
 
-    /// Wait for specified amount of time and schedule a job or throw an exception.
+    /// Similar to scheduleOrThrowOnError(...). Wait for the specified amount of time and schedule a job, or throw an exception.
     void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0);
 
     /// Wait for all currently active jobs to be done.
-    /// You may call schedule and wait many times in arbitary order.
+    /// You may call schedule and wait many times in arbitrary order.
     /// If any thread was throw an exception, first exception will be rethrown from this method,
     /// and exception will be cleared.
     void wait();
@@ -140,7 +145,7 @@ class ThreadFromGlobalPool
     explicit ThreadFromGlobalPool(Function && func, Args &&... args)
         : state(std::make_shared<Poco::Event>())
     {
-        /// NOTE: If this will throw an exception, the descructor won't be called.
+        /// NOTE: If this will throw an exception, the destructor won't be called.
         GlobalThreadPool::instance().scheduleOrThrow([
             state = state,
            func = std::forward<Function>(func),
diff --git a/dbms/src/Common/tests/gtest_thread_pool_concurrent_wait.cpp b/dbms/src/Common/tests/gtest_thread_pool_concurrent_wait.cpp
index 213e70ce3dd5..f5f14739e396 100644
--- a/dbms/src/Common/tests/gtest_thread_pool_concurrent_wait.cpp
+++ b/dbms/src/Common/tests/gtest_thread_pool_concurrent_wait.cpp
@@ -21,14 +21,14 @@ TEST(ThreadPool, ConcurrentWait)
         ThreadPool pool(num_threads);
 
         for (size_t i = 0; i < num_jobs; ++i)
-            pool.schedule(worker);
+            pool.scheduleOrThrowOnError(worker);
 
         constexpr size_t num_waiting_threads = 4;
 
         ThreadPool waiting_pool(num_waiting_threads);
 
         for (size_t i = 0; i < num_waiting_threads; ++i)
-            waiting_pool.schedule([&pool]{ pool.wait(); });
+            waiting_pool.scheduleOrThrowOnError([&pool] { pool.wait(); });
 
         waiting_pool.wait();
     }
diff --git a/dbms/src/Common/tests/gtest_thread_pool_global_full.cpp b/dbms/src/Common/tests/gtest_thread_pool_global_full.cpp
index 597ed60baac7..583d43be1bbe 100644
--- a/dbms/src/Common/tests/gtest_thread_pool_global_full.cpp
+++ b/dbms/src/Common/tests/gtest_thread_pool_global_full.cpp
@@ -30,11 +30,11 @@ TEST(ThreadPool, GlobalFull1)
     ThreadPool pool(num_jobs);
 
     for (size_t i = 0; i < capacity; ++i)
-        pool.schedule(func);
+        pool.scheduleOrThrowOnError(func);
 
     for (size_t i = capacity; i < num_jobs; ++i)
     {
-        EXPECT_THROW(pool.schedule(func), DB::Exception);
+        EXPECT_THROW(pool.scheduleOrThrowOnError(func), DB::Exception);
         ++counter;
     }
 
@@ -67,10 +67,10 @@ TEST(ThreadPool, GlobalFull2)
     ThreadPool pool(capacity, 0, capacity);
     for (size_t i = 0; i < capacity; ++i)
-        pool.schedule(func);
+        pool.scheduleOrThrowOnError(func);
 
     ThreadPool another_pool(1);
-    EXPECT_THROW(another_pool.schedule(func), DB::Exception);
+    EXPECT_THROW(another_pool.scheduleOrThrowOnError(func), DB::Exception);
 
     ++counter;
 
     global_pool.wait();
 
     for (size_t i = 0; i < capacity; ++i)
-        another_pool.schedule([&] { ++counter; });
+        another_pool.scheduleOrThrowOnError([&] { ++counter; });
 
     another_pool.wait();
     EXPECT_EQ(counter, capacity * 2 + 1);
diff --git a/dbms/src/Common/tests/gtest_thread_pool_limit.cpp b/dbms/src/Common/tests/gtest_thread_pool_limit.cpp
index c18ff2e38ee2..bc67ffd0bc16 100644
--- a/dbms/src/Common/tests/gtest_thread_pool_limit.cpp
+++ b/dbms/src/Common/tests/gtest_thread_pool_limit.cpp
@@ -14,7 +14,7 @@ int test()
     std::atomic<int> counter{0};
     for (size_t i = 0; i < 10; ++i)
-        pool.schedule([&]{ ++counter; });
+        pool.scheduleOrThrowOnError([&]{ ++counter; });
     pool.wait();
     return counter;
diff --git a/dbms/src/Common/tests/gtest_thread_pool_loop.cpp b/dbms/src/Common/tests/gtest_thread_pool_loop.cpp
index 63d4114b867a..159150446527 100644
--- a/dbms/src/Common/tests/gtest_thread_pool_loop.cpp
+++ b/dbms/src/Common/tests/gtest_thread_pool_loop.cpp
@@ -14,7 +14,7 @@ TEST(ThreadPool, Loop)
         size_t threads = 16;
         ThreadPool pool(threads);
         for (size_t j = 0; j < threads; ++j)
-            pool.schedule([&]{ ++res; });
+            pool.scheduleOrThrowOnError([&] { ++res; });
         pool.wait();
     }
 
diff --git a/dbms/src/Common/tests/gtest_thread_pool_schedule_exception.cpp b/dbms/src/Common/tests/gtest_thread_pool_schedule_exception.cpp
index 52091a1ea7fb..373c9421e942 100644
--- a/dbms/src/Common/tests/gtest_thread_pool_schedule_exception.cpp
+++ b/dbms/src/Common/tests/gtest_thread_pool_schedule_exception.cpp
@@ -9,12 +9,12 @@ bool check()
 {
     ThreadPool pool(10);
 
-    pool.schedule([]{ throw std::runtime_error("Hello, world!"); });
+    pool.scheduleOrThrowOnError([] { throw std::runtime_error("Hello, world!"); });
 
     try
     {
         for (size_t i = 0; i < 100; ++i)
-            pool.schedule([]{}); /// An exception will be rethrown from this method.
+            pool.scheduleOrThrowOnError([] {}); /// An exception will be rethrown from this method.
     }
     catch (const std::runtime_error &)
     {
diff --git a/dbms/src/Common/tests/multi_version.cpp b/dbms/src/Common/tests/multi_version.cpp
index 0937e597e2da..a4645a16e6e3 100644
--- a/dbms/src/Common/tests/multi_version.cpp
+++ b/dbms/src/Common/tests/multi_version.cpp
@@ -37,8 +37,8 @@ int main(int, char **)
     ThreadPool tp(8);
     for (size_t i = 0; i < n; ++i)
     {
-        tp.schedule(std::bind(thread1, std::ref(x), std::ref(results[i])));
-        tp.schedule(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
+        tp.scheduleOrThrowOnError(std::bind(thread1, std::ref(x), std::ref(results[i])));
+        tp.scheduleOrThrowOnError(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
     }
     tp.wait();
diff --git a/dbms/src/Common/tests/parallel_aggregation.cpp b/dbms/src/Common/tests/parallel_aggregation.cpp
index 4b3cc3006d7d..7ecb054b481f 100644
--- a/dbms/src/Common/tests/parallel_aggregation.cpp
+++ b/dbms/src/Common/tests/parallel_aggregation.cpp
@@ -284,7 +284,7 @@ int main(int argc, char ** argv)
             Stopwatch watch;
 
             for (size_t i = 0; i < num_threads; ++i)
-                pool.schedule(std::bind(aggregate1,
+                pool.scheduleOrThrowOnError(std::bind(aggregate1,
                     std::ref(maps[i]),
                     data.begin() + (data.size() * i) / num_threads,
                     data.begin() + (data.size() * (i + 1)) / num_threads));
@@ -338,7 +338,7 @@ int main(int argc, char ** argv)
             Stopwatch watch;
 
             for (size_t i = 0; i < num_threads; ++i)
-                pool.schedule(std::bind(aggregate12,
+                pool.scheduleOrThrowOnError(std::bind(aggregate12,
                     std::ref(maps[i]),
                     data.begin() + (data.size() * i) / num_threads,
                     data.begin() + (data.size() * (i + 1)) / num_threads));
@@ -397,7 +397,7 @@ int main(int argc, char ** argv)
             Stopwatch watch;
 
             for (size_t i = 0; i < num_threads; ++i)
-                pool.schedule(std::bind(aggregate1,
+                pool.scheduleOrThrowOnError(std::bind(aggregate1,
                     std::ref(maps[i]),
                     data.begin() + (data.size() * i) / num_threads,
                     data.begin() + (data.size() * (i + 1)) / num_threads));
@@ -473,7 +473,7 @@ int main(int argc, char ** argv)
             Stopwatch watch;
 
             for (size_t i = 0; i < num_threads; ++i)
-                pool.schedule(std::bind(aggregate2,
+                pool.scheduleOrThrowOnError(std::bind(aggregate2,
                     std::ref(maps[i]),
                     data.begin() + (data.size() * i) / num_threads,
                     data.begin() + (data.size() * (i + 1)) / num_threads));
@@ -499,7 +499,7 @@ int main(int argc, char ** argv)
             watch.restart();
 
             for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
-                pool.schedule(std::bind(merge2,
+                pool.scheduleOrThrowOnError(std::bind(merge2,
                     maps.data(), num_threads, i));
 
             pool.wait();
@@ -527,7 +527,7 @@ int main(int argc, char ** argv)
             Stopwatch watch;
 
             for (size_t i = 0; i < num_threads; ++i)
-                pool.schedule(std::bind(aggregate22,
+                pool.scheduleOrThrowOnError(std::bind(aggregate22,
                     std::ref(maps[i]),
                     data.begin() + (data.size() * i) / num_threads,
                     data.begin() + (data.size() * (i + 1)) / num_threads));
@@ -553,7 +553,7 @@ int main(int argc, char ** argv)
             watch.restart();
 
             for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
-                pool.schedule(std::bind(merge2, maps.data(), num_threads, i));
+                pool.scheduleOrThrowOnError(std::bind(merge2, maps.data(), num_threads, i));
 
             pool.wait();
@@ -592,7 +592,7 @@ int main(int argc, char ** argv)
             Stopwatch watch;
 
             for (size_t i = 0; i < num_threads; ++i)
-                pool.schedule(std::bind(aggregate3,
+                pool.scheduleOrThrowOnError(std::bind(aggregate3,
                     std::ref(local_maps[i]),
                     std::ref(global_map),
                     std::ref(mutex),
@@ -658,7 +658,7 @@ int main(int argc, char ** argv)
             Stopwatch watch;
 
             for (size_t i = 0; i < num_threads; ++i)
-                pool.schedule(std::bind(aggregate33,
+                pool.scheduleOrThrowOnError(std::bind(aggregate33,
                     std::ref(local_maps[i]),
                     std::ref(global_map),
                     std::ref(mutex),
@@ -727,7 +727,7 @@ int main(int argc, char ** argv)
             Stopwatch watch;
 
             for (size_t i = 0; i < num_threads; ++i)
-                pool.schedule(std::bind(aggregate4,
+                pool.scheduleOrThrowOnError(std::bind(aggregate4,
                     std::ref(local_maps[i]),
                     std::ref(global_map),
                     mutexes.data(),
@@ -797,7 +797,7 @@ int main(int argc, char ** argv)
             Stopwatch watch;
 
             for (size_t i = 0; i < num_threads; ++i)
-                pool.schedule(std::bind(aggregate5,
+                pool.scheduleOrThrowOnError(std::bind(aggregate5,
                     std::ref(local_maps[i]),
                     std::ref(global_map),
                     data.begin() + (data.size() * i) / num_threads,
@@ -860,7 +860,7 @@ int main(int argc, char ** argv)
             Stopwatch watch;
 
             for (size_t i = 0; i < num_threads; ++i)
-                pool.schedule(std::bind(aggregate1,
+                pool.scheduleOrThrowOnError(std::bind(aggregate1,
                     std::ref(maps[i]),
                     data.begin() + (data.size() * i) / num_threads,
                     data.begin() + (data.size() * (i + 1)) / num_threads));
diff --git a/dbms/src/Common/tests/parallel_aggregation2.cpp b/dbms/src/Common/tests/parallel_aggregation2.cpp
index 7df230c56518..56eb34bbf0c7 100644
--- a/dbms/src/Common/tests/parallel_aggregation2.cpp
+++ b/dbms/src/Common/tests/parallel_aggregation2.cpp
@@ -42,7 +42,7 @@ struct AggregateIndependent
             auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
             auto & map = *results[i];
 
-            pool.schedule([&, begin, end]()
+            pool.scheduleOrThrowOnError([&, begin, end]()
             {
                 for (auto it = begin; it != end; ++it)
                 {
@@ -85,7 +85,7 @@ struct AggregateIndependentWithSequentialKeysOptimization
             auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
             auto & map = *results[i];
 
-            pool.schedule([&, begin, end]()
+            pool.scheduleOrThrowOnError([&, begin, end]()
            {
                 typename Map::LookupResult place = nullptr;
                 Key prev_key {};
@@ -180,7 +180,7 @@ struct MergeParallelForTwoLevelTable
         ThreadPool & pool)
     {
         for (size_t bucket = 0; bucket < Map::NUM_BUCKETS; ++bucket)
-            pool.schedule([&, bucket, num_maps]
+            pool.scheduleOrThrowOnError([&, bucket, num_maps]
             {
                 std::vector<typename Map::Impl *> section(num_maps);
                 for (size_t i = 0; i < num_maps; ++i)
diff --git a/dbms/src/Common/tests/thread_creation_latency.cpp b/dbms/src/Common/tests/thread_creation_latency.cpp
index 120f9bca2cd9..480199f211f3 100644
--- a/dbms/src/Common/tests/thread_creation_latency.cpp
+++ b/dbms/src/Common/tests/thread_creation_latency.cpp
@@ -66,7 +66,7 @@ int main(int argc, char ** argv)
     test(n, "Create and destroy ThreadPool each iteration", []
     {
         ThreadPool tp(1);
-        tp.schedule(f);
+        tp.scheduleOrThrowOnError(f);
         tp.wait();
     });
 
@@ -90,7 +90,7 @@ int main(int argc, char ** argv)
         test(n, "Schedule job for Threadpool each iteration", [&tp]
         {
-            tp.schedule(f);
+            tp.scheduleOrThrowOnError(f);
             tp.wait();
         });
     }
@@ -100,7 +100,7 @@ int main(int argc, char ** argv)
         test(n, "Schedule job for Threadpool with 128 threads each iteration", [&tp]
         {
-            tp.schedule(f);
+            tp.scheduleOrThrowOnError(f);
             tp.wait();
         });
     }
diff --git a/dbms/src/DataStreams/AsynchronousBlockInputStream.cpp b/dbms/src/DataStreams/AsynchronousBlockInputStream.cpp
index 417df4c53b19..2aa70abd6920 100644
--- a/dbms/src/DataStreams/AsynchronousBlockInputStream.cpp
+++ b/dbms/src/DataStreams/AsynchronousBlockInputStream.cpp
@@ -36,7 +36,7 @@ void AsynchronousBlockInputStream::next()
 {
     ready.reset();
 
-    pool.schedule([this, thread_group = CurrentThread::getGroup()] ()
+    pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup()]()
     {
         CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
 
diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp
index 96ea9112e1d2..cfbf50526483 100644
--- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp
+++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp
@@ -168,21 +168,28 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
     else
     {
         size_t num_children = children.size();
-        for (size_t i = 0; i < num_children; ++i)
+        try
         {
-            auto & child = children[i];
-
-            auto thread_group = CurrentThread::getGroup();
-            reading_pool->schedule([&child, thread_group]
+            for (size_t i = 0; i < num_children; ++i)
             {
-                setThreadName("MergeAggReadThr");
-                if (thread_group)
-                    CurrentThread::attachToIfDetached(thread_group);
-                CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
-                child->readPrefix();
-            });
-        }
+                auto & child = children[i];
+                auto thread_group = CurrentThread::getGroup();
+                reading_pool->scheduleOrThrowOnError([&child, thread_group]
+                {
+                    setThreadName("MergeAggReadThr");
+                    if (thread_group)
+                        CurrentThread::attachToIfDetached(thread_group);
+                    CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
+                    child->readPrefix();
+                });
+            }
+        }
+        catch (...)
+        {
+            reading_pool->wait();
+            throw;
+        }
 
         reading_pool->wait();
     }
@@ -194,7 +201,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
      */
 
     for (size_t i = 0; i < merging_threads; ++i)
-        pool.schedule([this, thread_group = CurrentThread::getGroup()] () { mergeThread(thread_group); });
+        pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup()]() { mergeThread(thread_group); });
 }
 
@@ -475,22 +482,29 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
     }
     else
     {
-        for (auto & input : inputs)
+        try
         {
-            if (need_that_input(input))
+            for (auto & input : inputs)
             {
-                auto thread_group = CurrentThread::getGroup();
-                reading_pool->schedule([&input, &read_from_input, thread_group]
+                if (need_that_input(input))
                 {
-                    setThreadName("MergeAggReadThr");
-                    if (thread_group)
-                        CurrentThread::attachToIfDetached(thread_group);
-                    CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
-                    read_from_input(input);
-                });
+                    auto thread_group = CurrentThread::getGroup();
+                    reading_pool->scheduleOrThrowOnError([&input, &read_from_input, thread_group]
+                    {
+                        setThreadName("MergeAggReadThr");
+                        if (thread_group)
+                            CurrentThread::attachToIfDetached(thread_group);
+                        CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
+                        read_from_input(input);
+                    });
+                }
             }
         }
-
+        catch (...)
+        {
+            reading_pool->wait();
+            throw;
+        }
         reading_pool->wait();
     }
diff --git a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
index 6c3012a481ed..84a5bd782939 100644
--- a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
+++ b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
@@ -129,7 +129,7 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
         for (size_t view_num = 0; view_num < views.size(); ++view_num)
         {
             auto thread_group = CurrentThread::getGroup();
-            pool.schedule([=]
+            pool.scheduleOrThrowOnError([=]
             {
                 setThreadName("PushingToViews");
                 if (thread_group)
diff --git a/dbms/src/Databases/DatabaseOrdinary.cpp b/dbms/src/Databases/DatabaseOrdinary.cpp
index 8dbeab29f41b..5987597e3e91 100644
--- a/dbms/src/Databases/DatabaseOrdinary.cpp
+++ b/dbms/src/Databases/DatabaseOrdinary.cpp
@@ -141,7 +141,7 @@ void DatabaseOrdinary::loadTables(
 
     for (const auto & file_name : file_names)
     {
-        pool.schedule([&]() { loadOneTable(file_name); });
+        pool.scheduleOrThrowOnError([&]() { loadOneTable(file_name); });
     }
 
     pool.wait();
@@ -174,11 +174,16 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
         }
     };
 
-    for (const auto & table : tables)
+    try
     {
-        thread_pool.schedule([&]() { startupOneTable(table.second); });
+        for (const auto & table : tables)
+            thread_pool.scheduleOrThrowOnError([&]() { startupOneTable(table.second); });
     }
+    catch (...)
+    {
+        thread_pool.wait();
+        throw;
+    }
-
     thread_pool.wait();
 }
diff --git a/dbms/src/IO/AsynchronousWriteBuffer.h b/dbms/src/IO/AsynchronousWriteBuffer.h
index c87777450e78..74b5804691ba 100644
--- a/dbms/src/IO/AsynchronousWriteBuffer.h
+++ b/dbms/src/IO/AsynchronousWriteBuffer.h
@@ -41,7 +41,7 @@ class AsynchronousWriteBuffer : public WriteBuffer
             swapBuffers();
 
         /// The data will be written in separate stream.
-        pool.schedule([this] { thread(); });
+        pool.scheduleOrThrowOnError([this] { thread(); });
     }
 
 public:
diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp
index 53c706bf85d6..fc358c221899 100644
--- a/dbms/src/Interpreters/Aggregator.cpp
+++ b/dbms/src/Interpreters/Aggregator.cpp
@@ -1158,7 +1158,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
             tasks[bucket] = std::packaged_task<Block()>(std::bind(converter, bucket, CurrentThread::getGroup()));
 
             if (thread_pool)
-                thread_pool->schedule([bucket, &tasks] { tasks[bucket](); });
+                thread_pool->scheduleOrThrowOnError([bucket, &tasks] { tasks[bucket](); });
             else
                 tasks[bucket]();
         }
@@ -1614,7 +1614,7 @@ class MergingAndConvertingBlockInputStream : public IBlockInputStream
             if (max_scheduled_bucket_num >= NUM_BUCKETS)
                 return;
 
-            parallel_merge_data->pool.schedule(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
+            parallel_merge_data->pool.scheduleOrThrowOnError(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
                 max_scheduled_bucket_num, CurrentThread::getGroup()));
         }
 
@@ -1968,7 +1968,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
             auto task = std::bind(merge_bucket, bucket, aggregates_pool, CurrentThread::getGroup());
 
             if (thread_pool)
-                thread_pool->schedule(task);
+                thread_pool->scheduleOrThrowOnError(task);
             else
                 task();
         }
diff --git a/dbms/src/Interpreters/InterpreterSystemQuery.cpp b/dbms/src/Interpreters/InterpreterSystemQuery.cpp
index d4cdf10fd632..6da0b9333ac3 100644
--- a/dbms/src/Interpreters/InterpreterSystemQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSystemQuery.cpp
@@ -327,7 +327,7 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)
 
     ThreadPool pool(std::min(size_t(getNumberOfPhysicalCPUCores()), replica_names.size()));
     for (auto & table : replica_names)
-        pool.schedule([&] () { tryRestartReplica(table.first, table.second, system_context); });
+        pool.scheduleOrThrowOnError([&]() { tryRestartReplica(table.first, table.second, system_context); });
     pool.wait();
 }
diff --git a/dbms/src/Interpreters/tests/internal_iotop.cpp b/dbms/src/Interpreters/tests/internal_iotop.cpp
index 3a171c0c8458..75086796c429 100644
--- a/dbms/src/Interpreters/tests/internal_iotop.cpp
+++ b/dbms/src/Interpreters/tests/internal_iotop.cpp
@@ -140,7 +140,7 @@ try
     size_t num_threads = 2;
     ThreadPool pool(num_threads);
     for (size_t i = 0; i < num_threads; ++i)
-        pool.schedule([i]() { do_io(i); });
+        pool.scheduleOrThrowOnError([i]() { do_io(i); });
     pool.wait();
 
     test_perf();
diff --git a/dbms/src/Processors/Executors/ParallelPipelineExecutor.cpp b/dbms/src/Processors/Executors/ParallelPipelineExecutor.cpp
index 7f0969e64519..cf963e45a4a0 100644
--- a/dbms/src/Processors/Executors/ParallelPipelineExecutor.cpp
+++ b/dbms/src/Processors/Executors/ParallelPipelineExecutor.cpp
@@ -85,7 +85,7 @@ namespace DB
 //            active_processors.insert(current_processor);
 //        }
 //
-//        pool.schedule([processor = current_processor, &watch, this]
+//        pool.scheduleOrThrowOnError([processor = current_processor, &watch, this]
 //        {
 //            processor->work();
 //            {
diff --git a/dbms/src/Processors/tests/processors_test.cpp b/dbms/src/Processors/tests/processors_test.cpp
index 519eb79e0179..3e2e6abd1da7 100644
--- a/dbms/src/Processors/tests/processors_test.cpp
+++ b/dbms/src/Processors/tests/processors_test.cpp
@@ -88,7 +88,7 @@ class SleepyNumbersSource : public IProcessor
     void schedule(EventCounter & watch) override
     {
         active = true;
-        pool.schedule([&watch, this]
+        pool.scheduleOrThrowOnError([&watch, this]
         {
             usleep(sleep_useconds);
             current_chunk = generate();
diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
index b1b63258f06d..5dce68ec381b 100644
--- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
+++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
@@ -339,11 +339,19 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
             per_shard_jobs[current_selector[i]].shard_current_block_permuation.push_back(i);
     }
 
-    /// Run jobs in parallel for each block and wait them
-    finished_jobs_count = 0;
-    for (size_t shard_index : ext::range(0, shards_info.size()))
-        for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
-            pool->schedule(runWritingJob(job, block));
+    try
+    {
+        /// Run jobs in parallel for each block and wait them
+        finished_jobs_count = 0;
+        for (size_t shard_index : ext::range(0, shards_info.size()))
+            for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
+                pool->scheduleOrThrowOnError(runWritingJob(job, block));
+    }
+    catch (...)
+    {
+        pool->wait();
+        throw;
+    }
 
     try
     {
@@ -373,17 +381,27 @@ void DistributedBlockOutputStream::writeSuffix()
     if (insert_sync && pool)
     {
         finished_jobs_count = 0;
-        for (auto & shard_jobs : per_shard_jobs)
-            for (JobReplica & job : shard_jobs.replicas_jobs)
+        try
+        {
+            for (auto & shard_jobs : per_shard_jobs)
             {
-                if (job.stream)
+                for (JobReplica & job : shard_jobs.replicas_jobs)
                 {
-                    pool->schedule([&job] ()
+                    if (job.stream)
                     {
-                        job.stream->writeSuffix();
-                    });
+                        pool->scheduleOrThrowOnError([&job]()
+                        {
+                            job.stream->writeSuffix();
+                        });
+                    }
                 }
             }
+        }
+        catch (...)
+        {
+            pool->wait();
+            throw;
+        }
 
         try
         {
diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
index d31fc5cf3adf..af985c02927f 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
@@ -802,7 +802,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
 
     for (size_t i = 0; i < part_names_with_disks.size(); ++i)
     {
-        pool.schedule([&, i]
+        pool.scheduleOrThrowOnError([&, i]
         {
             const auto & part_name = part_names_with_disks[i].first;
             const auto part_disk_ptr = part_names_with_disks[i].second;
@@ -1155,7 +1155,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
         /// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
         for (const DataPartPtr & part : parts_to_remove)
         {
-            pool.schedule([&]
+            pool.scheduleOrThrowOnError([&]
             {
                 LOG_DEBUG(log, "Removing part from filesystem " << part->name);
                 part->remove();
diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp
index daffbbd149f5..c752109e328d 100644
--- a/dbms/src/Storages/StorageMergeTree.cpp
+++ b/dbms/src/Storages/StorageMergeTree.cpp
@@ -216,7 +216,7 @@ std::vector StorageMergeTree::prepar
         };
 
         if (thread_pool)
-            thread_pool->schedule(job);
+            thread_pool->scheduleOrThrowOnError(job);
         else
             job();
     }
diff --git a/utils/iotest/iotest.cpp b/utils/iotest/iotest.cpp
index fac48aae00d1..8c17163778e3 100644
--- a/utils/iotest/iotest.cpp
+++ b/utils/iotest/iotest.cpp
@@ -150,7 +150,7 @@ int mainImpl(int argc, char ** argv)
     Stopwatch watch;
 
     for (size_t i = 0; i < threads; ++i)
-        pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, count));
+        pool.scheduleOrThrowOnError(std::bind(thread, fd, mode, min_offset, max_offset, block_size, count));
     pool.wait();
 
     fsync(fd);
diff --git a/utils/iotest/iotest_aio.cpp b/utils/iotest/iotest_aio.cpp
index 8fb7459fd3bd..038bc600ec62 100644
--- a/utils/iotest/iotest_aio.cpp
+++ b/utils/iotest/iotest_aio.cpp
@@ -175,7 +175,7 @@ int mainImpl(int argc, char ** argv)
     Stopwatch watch;
 
    for (size_t i = 0; i < threads_count; ++i)
-        pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count));
+        pool.scheduleOrThrowOnError(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count));
     pool.wait();
 
     watch.stop();
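
-- 
The pattern applied at every call site above is the same: if scheduling a job
throws (for example, the pool's queue is full or a new thread cannot be
created), the jobs that were already accepted may still hold references to
objects on the current stack frame, so the stack must not unwind until they
have finished. A minimal sketch of the idiom, assuming only the ThreadPool
interface from this patch (`Item`, `process`, and `runAll` are hypothetical
names used for illustration):

    #include <vector>
    #include <Common/ThreadPool.h>

    struct Item { int value = 0; };
    void process(Item & item) { item.value += 1; }   /// hypothetical per-item work

    void runAll(ThreadPool & pool, std::vector<Item> & items)
    {
        try
        {
            for (auto & item : items)
                /// May throw, e.g. when the queue is full or a thread cannot be created.
                pool.scheduleOrThrowOnError([&item] { process(item); });
        }
        catch (...)
        {
            /// Already-scheduled jobs still reference `items` through this stack frame,
            /// so wait for them before letting the exception unwind the stack.
            pool.wait();
            throw;
        }
        /// Normal path: wait() also rethrows the first exception thrown by a job, if any.
        pool.wait();
    }

If the ThreadPool is a local object, its destructor performs the same wait, which
is why only the call sites with pools that outlive the scope need the explicit
catch/wait/rethrow block.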