Skip to content

Commit

Permalink
Fixed error when BackgroundSchedulePool is initialized in context of …
Browse files Browse the repository at this point in the history
…a query #2482
  • Loading branch information
alexey-milovidov committed Aug 22, 2018
1 parent fc9d335 commit ca5b83a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 27 deletions.
1 change: 0 additions & 1 deletion dbms/src/Common/ThreadStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>;
class ThreadGroupStatus
{
public:

mutable std::shared_mutex mutex;

ProfileEvents::Counters performance_counters{VariableContext::Process};
Expand Down
23 changes: 15 additions & 8 deletions dbms/src/Core/BackgroundSchedulePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,6 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
{
LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads");

/// Put all threads of both thread pools to one thread group
/// The master thread exits immediately
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
CurrentThread::detachQuery();

threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
Expand Down Expand Up @@ -221,10 +215,23 @@ void BackgroundSchedulePool::threadFunction()
{
setThreadName("BackgrSchedPool");

{
std::lock_guard lock(delayed_tasks_mutex);

if (thread_group)
{
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
}
else
{
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
}
}

/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });

CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);

while (!shutdown)
Expand Down
42 changes: 24 additions & 18 deletions dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void BackgroundProcessingPoolTaskInfo::wake()
Poco::Timestamp current_time;

{
std::unique_lock<std::mutex> lock(pool.tasks_mutex);
std::unique_lock lock(pool.tasks_mutex);

auto next_time_to_execute = iterator->first;
auto this_task_handle = iterator->second;
Expand All @@ -58,12 +58,6 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
{
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with " << size << " threads");

/// Put all threads to one thread group
/// The master thread exits immediately
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
CurrentThread::detachQuery();

threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
Expand All @@ -77,7 +71,7 @@ BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Tas
Poco::Timestamp current_time;

{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
res->iterator = tasks.emplace(current_time, res);
}

Expand All @@ -93,11 +87,11 @@ void BackgroundProcessingPool::removeTask(const TaskHandle & task)

/// Wait for all executions of this task.
{
std::unique_lock<std::shared_mutex> wlock(task->rwlock);
std::unique_lock wlock(task->rwlock);
}

{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
tasks.erase(task->iterator);
}
}
Expand All @@ -122,10 +116,22 @@ void BackgroundProcessingPool::threadFunction()
{
setThreadName("BackgrProcPool");

/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
{
std::lock_guard lock(tasks_mutex);

if (thread_group)
{
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
}
else
{
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
}
}

SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);

pcg64 rng(randomSeed());
Expand All @@ -141,7 +147,7 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp min_time;

{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);

if (!tasks.empty())
{
Expand All @@ -162,7 +168,7 @@ void BackgroundProcessingPool::threadFunction()

if (!task)
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::duration<double>(sleep_seconds
+ std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
Expand All @@ -173,12 +179,12 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp current_time;
if (min_time > current_time)
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock, std::chrono::microseconds(
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, sleep_seconds_random_part * 1000000)(rng)));
}

std::shared_lock<std::shared_mutex> rlock(task->rwlock);
std::shared_lock rlock(task->rwlock);

if (task->removed)
continue;
Expand All @@ -202,7 +208,7 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + (done_work ? 0 : sleep_seconds * 1000000);

{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);

if (task->removed)
continue;
Expand Down

0 comments on commit ca5b83a

Please sign in to comment.