Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use one ThreadGroup while pushing to materialized views (and some refactoring for ThreadGroup) #48543

Merged
merged 3 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Common/CurrentThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTe
}


ThreadGroupStatusPtr CurrentThread::getGroup()
ThreadGroupPtr CurrentThread::getGroup()
{
if (unlikely(!current_thread))
return nullptr;
Expand Down
6 changes: 3 additions & 3 deletions src/Common/CurrentThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class CurrentThread
static ThreadStatus & get();

/// Group to which belongs current thread
static ThreadGroupStatusPtr getGroup();
static ThreadGroupPtr getGroup();

/// A logs queue used by TCPHandler to pass logs to a client
static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
Expand Down Expand Up @@ -69,9 +69,9 @@ class CurrentThread

/// You must call one of these methods when create a query child thread:
/// Add current thread to a group associated with the thread group
static void attachToGroup(const ThreadGroupStatusPtr & thread_group);
static void attachToGroup(const ThreadGroupPtr & thread_group);
/// Is useful for a ThreadPool tasks
static void attachToGroupIfDetached(const ThreadGroupStatusPtr & thread_group);
static void attachToGroupIfDetached(const ThreadGroupPtr & thread_group);

/// Non-master threads call this method in destructor automatically
static void detachFromGroupIfNotDetached();
Expand Down
6 changes: 3 additions & 3 deletions src/Common/ThreadStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static thread_local ThreadStack alt_stack;
static thread_local bool has_alt_stack = false;
#endif

ThreadGroupStatus::ThreadGroupStatus()
ThreadGroup::ThreadGroup()
: master_thread_id(CurrentThread::get().thread_id)
{}

Expand Down Expand Up @@ -119,7 +119,7 @@ ThreadStatus::ThreadStatus()
#endif
}

ThreadGroupStatusPtr ThreadStatus::getThreadGroup() const
ThreadGroupPtr ThreadStatus::getThreadGroup() const
{
return thread_group;
}
Expand All @@ -139,7 +139,7 @@ ContextPtr ThreadStatus::getGlobalContext() const
return global_context.lock();
}

void ThreadGroupStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, LogsLevel logs_level)
void ThreadGroup::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, LogsLevel logs_level)
{
std::lock_guard lock(mutex);
shared_data.logs_queue_ptr = logs_queue;
Expand Down
40 changes: 27 additions & 13 deletions src/Common/ThreadStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class TaskStatsInfoGetter;
class InternalTextLogsQueue;
struct ViewRuntimeData;
class QueryViewsLog;
class ThreadGroupSwitcher;
using InternalTextLogsQueuePtr = std::shared_ptr<InternalTextLogsQueue>;
using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>;

Expand All @@ -58,15 +57,15 @@ using ThreadStatusPtr = ThreadStatus *;
* Create via CurrentThread::initializeQuery (for queries) or directly (for various background tasks).
* Use via CurrentThread::getGroup.
*/
class ThreadGroupStatus;
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
class ThreadGroup;
using ThreadGroupPtr = std::shared_ptr<ThreadGroup>;

class ThreadGroupStatus
class ThreadGroup
{
public:
ThreadGroupStatus();
ThreadGroup();
using FatalErrorCallback = std::function<void()>;
ThreadGroupStatus(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
ThreadGroup(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});

/// The first thread created this thread group
const UInt64 master_thread_id;
Expand Down Expand Up @@ -104,9 +103,9 @@ class ThreadGroupStatus
void attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue);

/// When new query starts, new thread group is created for it, current thread becomes master thread of the query
static ThreadGroupStatusPtr createForQuery(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
static ThreadGroupPtr createForQuery(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});

static ThreadGroupStatusPtr createForBackgroundProcess(ContextPtr storage_context);
static ThreadGroupPtr createForBackgroundProcess(ContextPtr storage_context);

std::vector<UInt64> getInvolvedThreadIds() const;
void linkThread(UInt64 thread_it);
Expand All @@ -120,6 +119,21 @@ class ThreadGroupStatus
std::unordered_set<UInt64> thread_ids;
};

/**
* Since merge is executed with multiple threads, this class
* switches the parent MemoryTracker as part of the thread group to account all the memory used.
*/
class ThreadGroupSwitcher : private boost::noncopyable
{
public:
explicit ThreadGroupSwitcher(ThreadGroupPtr thread_group);
~ThreadGroupSwitcher();

private:
ThreadGroupPtr prev_thread_group;
};


/**
* We use **constinit** here to tell the compiler the current_thread variable is initialized.
* If we didn't help the compiler, then it would most likely add a check before every use of the variable to initialize it if needed.
Expand Down Expand Up @@ -163,7 +177,7 @@ class ThreadStatus : public boost::noncopyable

private:
/// Group of threads, to which this thread attached
ThreadGroupStatusPtr thread_group;
ThreadGroupPtr thread_group;

/// Is set once
ContextWeakPtr global_context;
Expand All @@ -174,7 +188,7 @@ class ThreadStatus : public boost::noncopyable
using FatalErrorCallback = std::function<void()>;
FatalErrorCallback fatal_error_callback;

ThreadGroupStatus::SharedData local_data;
ThreadGroup::SharedData local_data;

bool performance_counters_finalized = false;

Expand Down Expand Up @@ -215,7 +229,7 @@ class ThreadStatus : public boost::noncopyable
ThreadStatus();
~ThreadStatus();

ThreadGroupStatusPtr getThreadGroup() const;
ThreadGroupPtr getThreadGroup() const;

const String & getQueryId() const;

Expand All @@ -239,7 +253,7 @@ class ThreadStatus : public boost::noncopyable
void setInternalThread();

/// Attaches slave thread to existing thread group
void attachToGroup(const ThreadGroupStatusPtr & thread_group_, bool check_detached = true);
void attachToGroup(const ThreadGroupPtr & thread_group_, bool check_detached = true);

/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
void detachFromGroup();
Expand Down Expand Up @@ -287,7 +301,7 @@ class ThreadStatus : public boost::noncopyable

void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database);

void attachToGroupImpl(const ThreadGroupStatusPtr & thread_group_);
void attachToGroupImpl(const ThreadGroupPtr & thread_group_);
};

/**
Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2315,7 +2315,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(

std::atomic<UInt32> next_bucket_to_merge = 0;

auto converter = [&](size_t thread_id, ThreadGroupStatusPtr thread_group)
auto converter = [&](size_t thread_id, ThreadGroupPtr thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
Expand Down Expand Up @@ -3043,7 +3043,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari

LOG_TRACE(log, "Merging partially aggregated two-level data.");

auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupStatusPtr thread_group)
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupPtr thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/ExternalLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ class ExternalLoader::LoadingDispatcher : private boost::noncopyable
}

/// Does the loading, possibly in the separate thread.
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async, ThreadGroupStatusPtr thread_group = {})
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async, ThreadGroupPtr thread_group = {})
{
SCOPE_EXIT_SAFE(
if (thread_group)
Expand Down
24 changes: 21 additions & 3 deletions src/Interpreters/InterpreterInsertQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <Storages/StorageMaterializedView.h>
#include <Storages/WindowView/StorageWindowView.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/ThreadStatus.h>
#include <Common/checkStackSize.h>


Expand Down Expand Up @@ -233,15 +234,22 @@ Chain InterpreterInsertQuery::buildChain(
ThreadStatusesHolderPtr thread_status_holder,
std::atomic_uint64_t * elapsed_counter_ms)
{
ThreadGroupPtr running_group;
if (current_thread)
running_group = current_thread->getThreadGroup();
if (!running_group)
running_group = std::make_shared<ThreadGroup>(getContext());

auto sample = getSampleBlock(columns, table, metadata_snapshot);
return buildChainImpl(table, metadata_snapshot, sample, thread_status_holder, elapsed_counter_ms);
return buildChainImpl(table, metadata_snapshot, sample, thread_status_holder, running_group, elapsed_counter_ms);
}

Chain InterpreterInsertQuery::buildChainImpl(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms)
{
ThreadStatus * thread_status = current_thread;
Expand Down Expand Up @@ -273,7 +281,9 @@ Chain InterpreterInsertQuery::buildChainImpl(
}
else
{
out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, query_ptr, no_destination, thread_status_holder, elapsed_counter_ms);
out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr,
query_ptr, no_destination,
thread_status_holder, running_group, elapsed_counter_ms);
}

/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.
Expand Down Expand Up @@ -461,9 +471,17 @@ BlockIO InterpreterInsertQuery::execute()
pipeline = interpreter_watch.buildQueryPipeline();
}

ThreadGroupPtr running_group;
if (current_thread)
running_group = current_thread->getThreadGroup();
if (!running_group)
running_group = std::make_shared<ThreadGroup>(getContext());
for (size_t i = 0; i < out_streams_size; ++i)
{
auto out = buildChainImpl(table, metadata_snapshot, query_sample_block, nullptr, nullptr);
auto out = buildChainImpl(table, metadata_snapshot, query_sample_block,
/* thread_status_holder= */ nullptr,
running_group,
/* elapsed_counter_ms= */ nullptr);
out_chains.emplace_back(std::move(out));
}
}
Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/InterpreterInsertQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class InterpreterInsertQuery : public IInterpreter, WithContext
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms);
};

Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/ProcessList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ QueryStatus::QueryStatus(
const String & query_,
const ClientInfo & client_info_,
QueryPriorities::Handle && priority_handle_,
ThreadGroupStatusPtr && thread_group_,
ThreadGroupPtr && thread_group_,
IAST::QueryKind query_kind_,
UInt64 watch_start_nanoseconds)
: WithContext(context_)
Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/ProcessList.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class QueryStatus : public WithContext
ClientInfo client_info;

/// Info about all threads involved in query execution
ThreadGroupStatusPtr thread_group;
ThreadGroupPtr thread_group;

Stopwatch watch;

Expand Down Expand Up @@ -162,7 +162,7 @@ class QueryStatus : public WithContext
const String & query_,
const ClientInfo & client_info_,
QueryPriorities::Handle && priority_handle_,
ThreadGroupStatusPtr && thread_group_,
ThreadGroupPtr && thread_group_,
IAST::QueryKind query_kind_,
UInt64 watch_start_nanoseconds);

Expand Down