Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/Disks/IO/ThreadPoolReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques

ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMiss);

ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
ThreadGroupStatusPtr running_group;
if (CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup())
running_group = CurrentThread::get().getThreadGroup();

ContextPtr query_context;
if (CurrentThread::isInitialized())
Expand All @@ -213,12 +213,17 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
{
ThreadStatus thread_status;

if (query_context)
thread_status.attachQueryContext(query_context);
SCOPE_EXIT({
if (running_group)
thread_status.detachQuery();
});

if (running_group)
thread_status.attachQuery(running_group);

if (query_context)
thread_status.attachQueryContext(query_context);

setThreadName("ThreadPoolRead");

Stopwatch watch(CLOCK_MONOTONIC);
Expand Down Expand Up @@ -253,9 +258,6 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());

if (running_group)
thread_status.detachQuery();

return Result{ .size = bytes_read, .offset = request.ignore };
});

Expand Down
20 changes: 11 additions & 9 deletions src/Disks/IO/ThreadPoolRemoteFSReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queu

std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Request request)
{
ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
ThreadGroupStatusPtr running_group;
if (CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup())
running_group = CurrentThread::get().getThreadGroup();

ContextPtr query_context;
if (CurrentThread::isInitialized())
Expand All @@ -54,14 +54,19 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
{
ThreadStatus thread_status;

/// Save query context if any, because cache implementation needs it.
if (query_context)
thread_status.attachQueryContext(query_context);
SCOPE_EXIT({
if (running_group)
thread_status.detachQuery();
});

/// To be able to pass ProfileEvents.
if (running_group)
thread_status.attachQuery(running_group);

/// Save query context if any, because cache implementation needs it.
if (query_context)
thread_status.attachQueryContext(query_context);

setThreadName("VFSRead");

CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
Expand All @@ -74,9 +79,6 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, bytes_read);

if (running_group)
thread_status.detachQuery();

return Result{ .size = bytes_read, .offset = offset };
});

Expand Down
1 change: 0 additions & 1 deletion src/IO/WriteBufferFromS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ void WriteBufferFromS3::nextImpl()
temporary_buffer->write(working_buffer.begin(), offset());

ProfileEvents::increment(ProfileEvents::S3WriteBytes, offset());

last_part_size += offset();

/// Data size exceeds singlepart upload threshold, need to use multipart upload.
Expand Down
5 changes: 5 additions & 0 deletions src/Interpreters/ThreadStatusExt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
query_context.reset();
thread_trace_context.trace_id = 0;
thread_trace_context.span_id = 0;

/// Avoid leaking of ThreadGroupStatus::finished_threads_counters_memory
/// (this is in case someone uses system thread but did not call getProfileEventsCountersAndMemoryForThreads())
thread_group->getProfileEventsCountersAndMemoryForThreads();

thread_group.reset();

thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;
Expand Down
45 changes: 23 additions & 22 deletions src/Processors/Transforms/buildPushingToViewsChain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,29 +237,30 @@ Chain buildPushingToViewsChain(
ASTPtr query;
Chain out;

/// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or
/// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point on collecting metrics
std::unique_ptr<ThreadStatus> view_thread_status_ptr = nullptr;

ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup()
? current_thread->getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
if (running_group)
ThreadGroupStatusPtr running_group;
if (current_thread && current_thread->getThreadGroup())
running_group = current_thread->getThreadGroup();
else
running_group = std::make_shared<ThreadGroupStatus>();

/// We are creating a ThreadStatus per view to store its metrics individually
/// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
/// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
/// and switch back to the original thread_status.
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });

std::unique_ptr<ThreadStatus> view_thread_status_ptr = std::make_unique<ThreadStatus>();
/// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
/// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
/// N times more interruptions
view_thread_status_ptr->disableProfiling();
/// view_thread_status_ptr will be moved later (on and on), so need to capture raw pointer.
view_thread_status_ptr->deleter = [thread_status = view_thread_status_ptr.get(), running_group]
{
/// We are creating a ThreadStatus per view to store its metrics individually
/// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
/// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
/// and switch back to the original thread_status.
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });

view_thread_status_ptr = std::make_unique<ThreadStatus>();
/// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
/// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
/// N times more interruptions
view_thread_status_ptr->disableProfiling();
view_thread_status_ptr->attachQuery(running_group);
}
thread_status->detachQuery();
};
view_thread_status_ptr->attachQuery(running_group);

auto runtime_stats = std::make_unique<QueryViewsLogElement::ViewRuntimeStats>();
runtime_stats->target_name = database_table.getFullTableName();
Expand Down