Skip to content

Commit

Permalink
Merge branch 'global-thread-pool' of github.com:yandex/ClickHouse int…
Browse files Browse the repository at this point in the history
…o global-thread-pool
  • Loading branch information
alexey-milovidov committed Jan 14, 2019
2 parents aec5570 + 00a4b2c commit 35c35f1
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 111 deletions.
64 changes: 14 additions & 50 deletions dbms/src/Common/CurrentThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "CurrentThread.h"
#include <common/logger_useful.h>
#include <common/likely.h>
#include <Common/ThreadStatus.h>
#include <Common/TaskStatsInfoGetter.h>
#include <Interpreters/ProcessList.h>
Expand All @@ -10,11 +11,6 @@
#include <Poco/Logger.h>


#if defined(ARCADIA_ROOT)
# include <util/thread/singleton.h>
#endif


namespace DB
{

Expand All @@ -23,91 +19,59 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

// Smoker's implementation to avoid thread_local usage: error: undefined symbol: __cxa_thread_atexit
#if defined(ARCADIA_ROOT)
struct ThreadStatusPtrHolder : ThreadStatusPtr
{
ThreadStatusPtrHolder() { ThreadStatusPtr::operator=(ThreadStatus::create()); }
};
struct ThreadScopePtrHolder : CurrentThread::ThreadScopePtr
{
ThreadScopePtrHolder() { CurrentThread::ThreadScopePtr::operator=(std::make_shared<CurrentThread::ThreadScope>()); }
};
# define current_thread (*FastTlsSingleton<ThreadStatusPtrHolder>())
# define current_thread_scope (*FastTlsSingleton<ThreadScopePtrHolder>())
#else
/// Order of current_thread and current_thread_scope matters
thread_local ThreadStatusPtr _current_thread = ThreadStatus::create();
thread_local CurrentThread::ThreadScopePtr _current_thread_scope = std::make_shared<CurrentThread::ThreadScope>();
# define current_thread _current_thread
# define current_thread_scope _current_thread_scope
#endif

void CurrentThread::updatePerformanceCounters()
{
get()->updatePerformanceCounters();
get().updatePerformanceCounters();
}

ThreadStatusPtr CurrentThread::get()
ThreadStatus & CurrentThread::get()
{
#ifndef NDEBUG
if (!current_thread || current_thread.use_count() <= 0)
if (unlikely(!current_thread))
throw Exception("Thread #" + std::to_string(Poco::ThreadNumber::get()) + " status was not initialized", ErrorCodes::LOGICAL_ERROR);

if (Poco::ThreadNumber::get() != current_thread->thread_number)
throw Exception("Current thread has different thread number", ErrorCodes::LOGICAL_ERROR);
#endif

return current_thread;
}

CurrentThread::ThreadScopePtr CurrentThread::getScope()
{
return current_thread_scope;
return *current_thread;
}

ProfileEvents::Counters & CurrentThread::getProfileEvents()
{
return current_thread->performance_counters;
return get().performance_counters;
}

MemoryTracker & CurrentThread::getMemoryTracker()
{
return current_thread->memory_tracker;
return get().memory_tracker;
}

void CurrentThread::updateProgressIn(const Progress & value)
{
current_thread->progress_in.incrementPiecewiseAtomically(value);
get().progress_in.incrementPiecewiseAtomically(value);
}

void CurrentThread::updateProgressOut(const Progress & value)
{
current_thread->progress_out.incrementPiecewiseAtomically(value);
get().progress_out.incrementPiecewiseAtomically(value);
}

void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue)
{
get()->attachInternalTextLogsQueue(logs_queue);
get().attachInternalTextLogsQueue(logs_queue);
}

std::shared_ptr<InternalTextLogsQueue> CurrentThread::getInternalTextLogsQueue()
{
/// NOTE: this method could be called at early server startup stage
/// NOTE: this method could be called in ThreadStatus destructor, therefore we make use_count() check just in case

if (!current_thread || current_thread.use_count() <= 0)
if (!current_thread)
return nullptr;

if (current_thread->getCurrentState() == ThreadStatus::ThreadState::Died)
if (get().getCurrentState() == ThreadStatus::ThreadState::Died)
return nullptr;

return current_thread->getInternalTextLogsQueue();
return get().getInternalTextLogsQueue();
}

ThreadGroupStatusPtr CurrentThread::getGroup()
{
return get()->getThreadGroup();
return get().getThreadGroup();
}

}
21 changes: 1 addition & 20 deletions dbms/src/Common/CurrentThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class CurrentThread
{
public:
/// Handler to current thread
static ThreadStatusPtr get();
static ThreadStatus & get();

/// Group to which belongs current thread
static ThreadGroupStatusPtr getGroup();
Expand Down Expand Up @@ -85,25 +85,6 @@ class CurrentThread
bool log_peak_memory_usage_in_destructor = true;
};

/// Implicitly finalizes current thread in the destructor
class ThreadScope
{
public:
void (*deleter)() = nullptr;

ThreadScope() = default;
~ThreadScope()
{
if (deleter)
deleter();

/// std::terminate on exception: this is Ok.
}
};

using ThreadScopePtr = std::shared_ptr<ThreadScope>;
static ThreadScopePtr getScope();

private:
static void defaultThreadDeleter();
};
Expand Down
15 changes: 10 additions & 5 deletions dbms/src/Common/ThreadStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ namespace ErrorCodes
}


thread_local ThreadStatusPtr current_thread = nullptr;


TasksStatsCounters TasksStatsCounters::current()
{
TasksStatsCounters res;
CurrentThread::get()->taskstats_getter->getStat(res.stat, CurrentThread::get()->os_thread_id);
CurrentThread::get().taskstats_getter->getStat(res.stat, CurrentThread::get().os_thread_id);
return res;
}

Expand All @@ -39,17 +42,19 @@ ThreadStatus::ThreadStatus()
memory_tracker.setDescription("(for thread)");
log = &Poco::Logger::get("ThreadStatus");

current_thread = this;

/// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created
/// Otherwise it could lead to SIGSEGV due to current_thread dereferencing
}

ThreadStatusPtr ThreadStatus::create()
ThreadStatus::~ThreadStatus()
{
return ThreadStatusPtr(new ThreadStatus);
if (deleter)
deleter();
current_thread = nullptr;
}

ThreadStatus::~ThreadStatus() = default;

void ThreadStatus::initPerformanceCounters()
{
performance_counters_finalized = false;
Expand Down
22 changes: 13 additions & 9 deletions dbms/src/Common/ThreadStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <map>
#include <mutex>
#include <shared_mutex>
#include <functional>
#include <boost/noncopyable.hpp>


namespace Poco
Expand All @@ -23,7 +25,7 @@ namespace DB
class Context;
class QueryStatus;
class ThreadStatus;
using ThreadStatusPtr = std::shared_ptr<ThreadStatus>;
using ThreadStatusPtr = ThreadStatus*;
class QueryThreadLog;
struct TasksStatsCounters;
struct RUsageCounters;
Expand Down Expand Up @@ -67,14 +69,20 @@ class ThreadGroupStatus
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;


extern thread_local ThreadStatusPtr current_thread;

/** Encapsulates all per-thread info (ProfileEvents, MemoryTracker, query_id, query context, etc.).
* Used inside thread-local variable. See variables in CurrentThread.cpp
* The object must be created in thread function and destroyed in the same thread before the exit.
* It is accessed through thread-local pointer.
*
* This object should be used only via "CurrentThread", see CurrentThread.h
*/
class ThreadStatus : public std::enable_shared_from_this<ThreadStatus>
class ThreadStatus : public boost::noncopyable
{
public:
ThreadStatus();
~ThreadStatus();

/// Poco's thread number (the same number is used in logs)
UInt32 thread_number = 0;
/// Linux's PID (or TGID) (the same id is shown by ps util)
Expand All @@ -88,8 +96,8 @@ class ThreadStatus : public std::enable_shared_from_this<ThreadStatus>
Progress progress_in;
Progress progress_out;

public:
static ThreadStatusPtr create();
using Deleter = std::function<void()>;
Deleter deleter;

ThreadGroupStatusPtr getThreadGroup() const
{
Expand Down Expand Up @@ -136,11 +144,7 @@ class ThreadStatus : public std::enable_shared_from_this<ThreadStatus>
/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false);

~ThreadStatus();

protected:
ThreadStatus();

void initPerformanceCounters();

void logToQueryThreadLog(QueryThreadLog & thread_log);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ target_link_libraries (thread_pool_2 PRIVATE clickhouse_common_io)

add_executable (multi_version multi_version.cpp)
target_link_libraries (multi_version PRIVATE clickhouse_common_io)
add_check(multi_version)

add_executable (array_cache array_cache.cpp)
target_link_libraries (array_cache PRIVATE clickhouse_common_io)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Common/tests/thread_pool_2.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <atomic>
#include <iostream>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>


int main(int, char **)
Expand Down
33 changes: 16 additions & 17 deletions dbms/src/Interpreters/ThreadStatusExt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ String ThreadStatus::getQueryID()

void CurrentThread::defaultThreadDeleter()
{
ThreadStatus & thread = *CurrentThread::get();
ThreadStatus & thread = CurrentThread::get();
LOG_TRACE(thread.log, "Thread " << thread.thread_number << " exited");
thread.detachQuery(true, true);
}
Expand All @@ -51,8 +51,8 @@ void ThreadStatus::initializeQuery()
memory_tracker.setParent(&thread_group->memory_tracker);
thread_group->memory_tracker.setDescription("(for query)");

thread_group->master_thread = shared_from_this();
thread_group->thread_statuses.emplace(thread_number, shared_from_this());
thread_group->master_thread = this;
thread_group->thread_statuses.emplace(thread_number, this);

initPerformanceCounters();
thread_state = ThreadState::AttachedToQuery;
Expand Down Expand Up @@ -87,7 +87,7 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
if (!global_context)
global_context = thread_group->global_context;

if (!thread_group->thread_statuses.emplace(thread_number, shared_from_this()).second)
if (!thread_group->thread_statuses.emplace(thread_number, this).second)
throw Exception("Thread " + std::to_string(thread_number) + " is attached twice", ErrorCodes::LOGICAL_ERROR);
}

Expand Down Expand Up @@ -193,48 +193,47 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)

void CurrentThread::initializeQuery()
{
get()->initializeQuery();
getScope()->deleter = CurrentThread::defaultThreadDeleter;
get().initializeQuery();
get().deleter = CurrentThread::defaultThreadDeleter;
}

void CurrentThread::attachTo(const ThreadGroupStatusPtr & thread_group)
{
get()->attachQuery(thread_group, true);
getScope()->deleter = CurrentThread::defaultThreadDeleter;
get().attachQuery(thread_group, true);
get().deleter = CurrentThread::defaultThreadDeleter;
}

void CurrentThread::attachToIfDetached(const ThreadGroupStatusPtr & thread_group)
{
get()->attachQuery(thread_group, false);
getScope()->deleter = CurrentThread::defaultThreadDeleter;
get().attachQuery(thread_group, false);
get().deleter = CurrentThread::defaultThreadDeleter;
}

std::string CurrentThread::getCurrentQueryID()
{
if (!get() || get().use_count() <= 0)
if (!current_thread)
return {};

return get()->getQueryID();
return get().getQueryID();
}

void CurrentThread::attachQueryContext(Context & query_context)
{
return get()->attachQueryContext(query_context);
return get().attachQueryContext(query_context);
}

void CurrentThread::finalizePerformanceCounters()
{
get()->finalizePerformanceCounters();
get().finalizePerformanceCounters();
}

void CurrentThread::detachQuery()
{
get()->detachQuery(false);
get().detachQuery(false);
}

void CurrentThread::detachQueryIfNotDetached()
{
get()->detachQuery(true);
get().detachQuery(true);
}


Expand Down
1 change: 0 additions & 1 deletion libs/libcommon/src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ target_link_libraries (date_lut4 common ${PLATFORM_LIBS})
target_link_libraries (date_lut_default_timezone common ${PLATFORM_LIBS})
target_link_libraries (local_date_time_comparison common)
target_link_libraries (realloc-perf common)
add_check(multi_version)
add_check(local_date_time_comparison)

add_executable (unit_tests_libcommon gtest_json_test.cpp gtest_strong_typedef.cpp gtest_find_symbols.cpp)
Expand Down
2 changes: 1 addition & 1 deletion utils/iotest/iotest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include <Poco/Exception.h>
#include <Common/Exception.h>
#include <Common/randomSeed.h>
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <IO/BufferWithOwnMemory.h>
#include <cstdlib>
Expand Down

0 comments on commit 35c35f1

Please sign in to comment.