Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #47161 to 22.3: Fix possible deadlock in QueryStatus #47355

Merged
merged 2 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 43 additions & 7 deletions src/Interpreters/ProcessList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,11 @@ QueryStatus::QueryStatus(

QueryStatus::~QueryStatus()
{
assert(executors.empty());
#if !defined(NDEBUG)
/// Check that all executors were invalidated.
for (const auto & e : executors)
assert(!e->executor);
#endif

if (auto * memory_tracker = getMemoryTracker())
{
Expand All @@ -350,15 +354,45 @@ QueryStatus::~QueryStatus()
}
}

/// Forwards the cancellation to the held executor, if the pointer is still set.
/// The holder's mutex stays locked for the whole call, so remove() cannot reset
/// the pointer between the null check and the call to the executor.
void QueryStatus::ExecutorHolder::cancel()
{
    std::lock_guard guard(mutex);

    /// Already invalidated by remove(): nothing to cancel.
    if (!executor)
        return;

    executor->cancel();
}

void QueryStatus::ExecutorHolder::remove()
{
std::lock_guard lock(mutex);
executor = nullptr;
}

CancellationCode QueryStatus::cancelQuery(bool)
{
if (is_killed.load())
return CancellationCode::CancelSent;

is_killed.store(true);

std::lock_guard lock(executors_mutex);
for (auto * e : executors)
std::vector<ExecutorHolderPtr> executors_snapshot;

{
/// Create a snapshot of executors under a mutex.
std::lock_guard lock(executors_mutex);
executors_snapshot = executors;
}

/// We should call cancel() for each executor with unlocked executors_mutex, because
/// cancel() can try to lock some internal mutex that is already locked by query executing
/// thread, and query executing thread can call removePipelineExecutor and lock executors_mutex,
/// which will lead to deadlock.
/// Note that the size and the content of executors cannot be changed while
/// executors_mutex is unlocked, because:
/// 1) We don't allow adding new executors while cancelling query in addPipelineExecutor
/// 2) We don't actually remove executor holder from executors in removePipelineExecutor,
/// just mark that executor is invalid.
/// So, it's ok to use a snapshot created above under a mutex: it won't differ from the actual executors.
for (const auto & e : executors_snapshot)
e->cancel();

return CancellationCode::CancelSent;
Expand All @@ -367,15 +401,17 @@ CancellationCode QueryStatus::cancelQuery(bool)
/// Registers the pipeline executor `e` in `executors` while holding executors_mutex.
/// NOTE(review): this text is a rendered diff hunk without +/- markers. The first
/// assert/push_back pair (raw pointer) looks like the removed pre-change side and the
/// find_if/make_shared pair like the added side - confirm against the real file.
/// NOTE(review): the comment in cancelQuery() states that new executors are not added
/// while a query is being cancelled, but no is_killed check is visible in this
/// function - confirm where that guarantee is enforced.
void QueryStatus::addPipelineExecutor(PipelineExecutor * e)
{
std::lock_guard lock(executors_mutex);
/// Debug-only check that `e` has not been registered before (raw-pointer form).
assert(std::find(executors.begin(), executors.end(), e) == executors.end());
executors.push_back(e);
/// Debug-only check that no existing holder already points at `e` (holder form).
assert(std::find_if(executors.begin(), executors.end(), [e](const ExecutorHolderPtr & x){ return x->executor == e; }) == executors.end());
/// Wrap the raw pointer into a shared holder and append it.
executors.push_back(std::make_shared<ExecutorHolder>(e));
}

/// Unregisters the pipeline executor `e` while holding executors_mutex.
/// NOTE(review): this text is a rendered diff hunk without +/- markers. The
/// std::find/std::erase_if pair (raw pointer) looks like the removed pre-change side
/// and the find_if/remove() sequence like the added side - confirm against the real file.
void QueryStatus::removePipelineExecutor(PipelineExecutor * e)
{
std::lock_guard lock(executors_mutex);
assert(std::find(executors.begin(), executors.end(), e) != executors.end());
std::erase_if(executors, [e](PipelineExecutor * x) { return x == e; });
/// Find the holder whose executor pointer is still `e`.
auto it = std::find_if(executors.begin(), executors.end(), [e](const ExecutorHolderPtr & x){ return x->executor == e; });
/// NOTE(review): assert is compiled out when NDEBUG is defined, so the dereference
/// below relies on `e` always having been registered - confirm.
assert(it != executors.end());
/// Invalidate the executor pointer inside the holder, but keep the holder in `executors`
/// (to avoid a race with cancelQuery).
(*it)->remove();
}

bool QueryStatus::checkTimeLimit()
Expand Down
16 changes: 15 additions & 1 deletion src/Interpreters/ProcessList.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,22 @@ class QueryStatus : public WithContext

mutable std::mutex executors_mutex;

/// Wraps a raw PipelineExecutor pointer together with its own mutex, so that the
/// executor can be cancelled or invalidated under that mutex instead of under
/// executors_mutex.
struct ExecutorHolder
{
    /// explicit: a raw PipelineExecutor pointer must not convert to a holder implicitly.
    explicit ExecutorHolder(PipelineExecutor * e) : executor(e) {}

    /// Calls cancel() on the executor unless the pointer has already been reset.
    void cancel();

    /// Resets the executor pointer to nullptr; the holder object itself stays alive.
    void remove();

    /// The wrapped executor; nullptr once remove() has been called.
    /// NOTE(review): presumably non-owning (nothing here deletes it) - confirm.
    PipelineExecutor * executor;
    /// Guards `executor` inside cancel() and remove().
    std::mutex mutex;
};

using ExecutorHolderPtr = std::shared_ptr<ExecutorHolder>;

/// Array of PipelineExecutors to be cancelled when a cancelQuery is received
std::vector<PipelineExecutor *> executors;
std::vector<ExecutorHolderPtr> executors;

enum QueryStreamsStatus
{
Expand Down
Empty file.
25 changes: 25 additions & 0 deletions tests/queries/0_stateless/02585_query_status_deadlock.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env bash
# Starts a long-running query in the background under a known query id, waits until
# its select statement shows up in system.query_log, then kills it synchronously.
# NOTE(review): presumably the regression test for the QueryStatus deadlock named in
# the file name; a deadlock would then show up as the final KILL never returning - confirm.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Query id built from the test database name, used below to find and kill the query.
QUERY_ID="${CLICKHOUSE_DATABASE}_test_02585_query_to_kill_id_1"

# Run the two statements in a background client; all output is discarded.
$CLICKHOUSE_CLIENT --query_id="$QUERY_ID" -n -q "
create temporary table tmp as select * from numbers(500000000);
select * from remote('127.0.0.2', 'system.numbers_mt') where number in (select * from tmp);" &> /dev/null &

# NOTE(review): logs are flushed only once, before polling starts, and the loop below
# has no timeout - presumably later entries reach query_log via periodic flushing; confirm.
$CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"

# Poll once per second until the select statement of this query id is logged.
while true
do
res=$($CLICKHOUSE_CLIENT -q "select query, event_time from system.query_log where query_id = '$QUERY_ID' and query like 'select%' limit 1")
if [ -n "$res" ]; then
break
fi
sleep 1
done

# Kill the query by id; the client's output is discarded.
$CLICKHOUSE_CLIENT -q "kill query where query_id = '$QUERY_ID' sync" &> /dev/null