Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed the possibility of hanging queries when server is overloaded #6301

Merged
merged 3 commits into from Aug 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 32 additions & 2 deletions dbms/src/Common/ThreadPool.cpp
@@ -1,7 +1,6 @@
#include <Common/ThreadPool.h>
#include <Common/Exception.h>

#include <iostream>
#include <type_traits>


Expand Down Expand Up @@ -34,6 +33,28 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, size_t max_free_threa
{
}

template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
{
std::lock_guard lock(mutex);
max_threads = value;
}

template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
{
std::lock_guard lock(mutex);
max_free_threads = value;
}

template <typename Thread>
void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
{
std::lock_guard lock(mutex);
queue_size = value;
}


template <typename Thread>
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
Expand All @@ -59,7 +80,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti

auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };

if (wait_microseconds)
if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
return on_error();
Expand All @@ -83,6 +104,15 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
catch (...)
{
threads.pop_front();

/// Remove the job and return error to caller.
/// Note that if we have allocated at least one thread, we may continue
/// (one thread is enough to process all jobs).
/// But this condition indicate an error nevertheless and better to refuse.

jobs.pop();
--scheduled_jobs;
return on_error();
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Common/ThreadPool.h
Expand Up @@ -60,14 +60,18 @@ class ThreadPoolImpl
/// Returns number of running and scheduled jobs.
size_t active() const;

void setMaxThreads(size_t value);
void setMaxFreeThreads(size_t value);
void setQueueSize(size_t value);

private:
mutable std::mutex mutex;
std::condition_variable job_finished;
std::condition_variable new_job_or_shutdown;

const size_t max_threads;
const size_t max_free_threads;
const size_t queue_size;
size_t max_threads;
size_t max_free_threads;
size_t queue_size;

size_t scheduled_jobs = 0;
bool shutdown = false;
Expand Down
89 changes: 89 additions & 0 deletions dbms/src/Common/tests/gtest_thread_pool_global_full.cpp
@@ -0,0 +1,89 @@
#include <atomic>

#include <Common/ThreadPool.h>

#include <gtest/gtest.h>


/// Test what happens if local ThreadPool cannot create a ThreadFromGlobalPool.
/// There was a bug: if local ThreadPool cannot allocate even a single thread,
/// the job will be scheduled but never get executed.


TEST(ThreadPool, GlobalFull1)
{
GlobalThreadPool & global_pool = GlobalThreadPool::instance();

static constexpr size_t capacity = 5;

global_pool.setMaxThreads(capacity);
global_pool.setMaxFreeThreads(1);
global_pool.setQueueSize(capacity);
global_pool.wait();

std::atomic<size_t> counter = 0;
static constexpr size_t num_jobs = capacity + 1;

auto func = [&] { ++counter; while (counter != num_jobs) {} };

ThreadPool pool(num_jobs);

for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);

for (size_t i = capacity; i < num_jobs; ++i)
{
EXPECT_THROW(pool.schedule(func), DB::Exception);
++counter;
}

pool.wait();
EXPECT_EQ(counter, num_jobs);

global_pool.setMaxThreads(10000);
global_pool.setMaxFreeThreads(1000);
global_pool.setQueueSize(10000);
}


TEST(ThreadPool, GlobalFull2)
{
GlobalThreadPool & global_pool = GlobalThreadPool::instance();

static constexpr size_t capacity = 5;

global_pool.setMaxThreads(capacity);
global_pool.setMaxFreeThreads(1);
global_pool.setQueueSize(capacity);

/// ThreadFromGlobalPool from local thread pools from previous test case have exited
/// but their threads from global_pool may not have finished (they still have to exit).
/// If we will not wait here, we can get "Cannot schedule a task exception" earlier than we expect in this test.
global_pool.wait();

std::atomic<size_t> counter = 0;
auto func = [&] { ++counter; while (counter != capacity + 1) {} };

ThreadPool pool(capacity, 0, capacity);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);

ThreadPool another_pool(1);
EXPECT_THROW(another_pool.schedule(func), DB::Exception);

++counter;

pool.wait();

global_pool.wait();

for (size_t i = 0; i < capacity; ++i)
another_pool.schedule([&] { ++counter; });

another_pool.wait();
EXPECT_EQ(counter, capacity * 2 + 1);

global_pool.setMaxThreads(10000);
global_pool.setMaxFreeThreads(1000);
global_pool.setQueueSize(10000);
}