From 2f09476cccaad3f431701b22b62dcb2cab01ded8 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Thu, 15 Sep 2022 19:30:47 +0200 Subject: [PATCH] Merge pull request #40901 from ClickHouse/backport/22.3/40732 Backport #40732 to 22.3: Fix memory leak while pushing to MVs w/o query context (from Kafka/...) --- src/Disks/IO/ThreadPoolReader.cpp | 18 ++++---- src/Disks/IO/ThreadPoolRemoteFSReader.cpp | 20 +++++---- src/IO/WriteBufferFromS3.cpp | 1 - src/Interpreters/ThreadStatusExt.cpp | 5 +++ .../Transforms/buildPushingToViewsChain.cpp | 45 ++++++++++--------- 5 files changed, 49 insertions(+), 40 deletions(-) diff --git a/src/Disks/IO/ThreadPoolReader.cpp b/src/Disks/IO/ThreadPoolReader.cpp index 21f73c3129e5..224b7d03b987 100644 --- a/src/Disks/IO/ThreadPoolReader.cpp +++ b/src/Disks/IO/ThreadPoolReader.cpp @@ -201,9 +201,9 @@ std::future ThreadPoolReader::submit(Request reques ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMiss); - ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup() - ? CurrentThread::get().getThreadGroup() - : MainThreadStatus::getInstance().getThreadGroup(); + ThreadGroupStatusPtr running_group; + if (CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()) + running_group = CurrentThread::get().getThreadGroup(); ContextPtr query_context; if (CurrentThread::isInitialized()) @@ -213,12 +213,17 @@ std::future ThreadPoolReader::submit(Request reques { ThreadStatus thread_status; - if (query_context) - thread_status.attachQueryContext(query_context); + SCOPE_EXIT({ + if (running_group) + thread_status.detachQuery(); + }); if (running_group) thread_status.attachQuery(running_group); + if (query_context) + thread_status.attachQueryContext(query_context); + setThreadName("ThreadPoolRead"); Stopwatch watch(CLOCK_MONOTONIC); @@ -253,9 +258,6 @@ std::future ThreadPoolReader::submit(Request reques ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); - if (running_group) - thread_status.detachQuery(); - return Result{ .size = bytes_read, .offset = request.ignore }; }); diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index bdb012a6376f..7652b4211aa0 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -42,9 +42,9 @@ ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queu std::future ThreadPoolRemoteFSReader::submit(Request request) { - ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup() - ? CurrentThread::get().getThreadGroup() - : MainThreadStatus::getInstance().getThreadGroup(); + ThreadGroupStatusPtr running_group; + if (CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()) + running_group = CurrentThread::get().getThreadGroup(); ContextPtr query_context; if (CurrentThread::isInitialized()) @@ -54,14 +54,19 @@ std::future ThreadPoolRemoteFSReader::submit(Reques { ThreadStatus thread_status; - /// Save query context if any, because cache implementation needs it. - if (query_context) - thread_status.attachQueryContext(query_context); + SCOPE_EXIT({ + if (running_group) + thread_status.detachQuery(); + }); /// To be able to pass ProfileEvents. if (running_group) thread_status.attachQuery(running_group); + /// Save query context if any, because cache implementation needs it. + if (query_context) + thread_status.attachQueryContext(query_context); + setThreadName("VFSRead"); CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; @@ -74,9 +79,6 @@ std::future ThreadPoolRemoteFSReader::submit(Reques ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, bytes_read); - if (running_group) - thread_status.detachQuery(); - return Result{ .size = bytes_read, .offset = offset }; }); diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index eda7bb6f8aed..de7130319a7f 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -86,7 +86,6 @@ void WriteBufferFromS3::nextImpl() temporary_buffer->write(working_buffer.begin(), offset()); ProfileEvents::increment(ProfileEvents::S3WriteBytes, offset()); - last_part_size += offset(); /// Data size exceeds singlepart upload threshold, need to use multipart upload. diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index 2ea371d3d033..1f2a8e4681a8 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -408,6 +408,11 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) query_context.reset(); thread_trace_context.trace_id = 0; thread_trace_context.span_id = 0; + + /// Avoid leaking of ThreadGroupStatus::finished_threads_counters_memory + /// (this is in case someone uses system thread but did not call getProfileEventsCountersAndMemoryForThreads()) + thread_group->getProfileEventsCountersAndMemoryForThreads(); + thread_group.reset(); thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery; diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index 3cb5b16a2d0d..0d47494d22be 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -237,29 +237,30 @@ Chain buildPushingToViewsChain( ASTPtr query; Chain out; - /// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or - /// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point on collecting metrics - std::unique_ptr view_thread_status_ptr = nullptr; - - ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup() - ? current_thread->getThreadGroup() - : MainThreadStatus::getInstance().getThreadGroup(); - if (running_group) + ThreadGroupStatusPtr running_group; + if (current_thread && current_thread->getThreadGroup()) + running_group = current_thread->getThreadGroup(); + else + running_group = std::make_shared(); + + /// We are creating a ThreadStatus per view to store its metrics individually + /// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls + /// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work, + /// and switch back to the original thread_status. + auto * original_thread = current_thread; + SCOPE_EXIT({ current_thread = original_thread; }); + + std::unique_ptr view_thread_status_ptr = std::make_unique(); + /// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one + /// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means + /// N times more interruptions + view_thread_status_ptr->disableProfiling(); + /// view_thread_status_ptr will be moved later (on and on), so need to capture raw pointer. + view_thread_status_ptr->deleter = [thread_status = view_thread_status_ptr.get(), running_group] { - /// We are creating a ThreadStatus per view to store its metrics individually - /// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls - /// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work, - /// and switch back to the original thread_status. - auto * original_thread = current_thread; - SCOPE_EXIT({ current_thread = original_thread; }); - - view_thread_status_ptr = std::make_unique(); - /// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one - /// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means - /// N times more interruptions - view_thread_status_ptr->disableProfiling(); - view_thread_status_ptr->attachQuery(running_group); - } + thread_status->detachQuery(); + }; + view_thread_status_ptr->attachQuery(running_group); auto runtime_stats = std::make_unique(); runtime_stats->target_name = database_table.getFullTableName();