Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix max_distributed_connections #9673

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 3 additions & 6 deletions dbms/programs/server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void TCPHandler::runImpl()
state.io.onFinish();
}
else if (state.io.pipeline.initialized())
processOrdinaryQueryWithProcessors(query_context->getSettingsRef().max_threads);
processOrdinaryQueryWithProcessors();
else
processOrdinaryQuery();

Expand Down Expand Up @@ -544,13 +544,10 @@ void TCPHandler::processOrdinaryQuery()
}


void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
void TCPHandler::processOrdinaryQueryWithProcessors()
{
auto & pipeline = state.io.pipeline;

/// Reduce the number of threads to recommended value.
num_threads = std::min(num_threads, pipeline.getNumThreads());

/// Send header-block, to allow client to prepare output format for data to send.
{
auto & header = pipeline.getHeader();
Expand Down Expand Up @@ -585,7 +582,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)

try
{
executor->execute(num_threads);
executor->execute(pipeline.getNumThreads());
azat marked this conversation as resolved.
Show resolved Hide resolved
}
catch (...)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/programs/server/TCPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class TCPHandler : public Poco::Net::TCPServerConnection
/// Process a request that does not require the receiving of data blocks from the client
void processOrdinaryQuery();

void processOrdinaryQueryWithProcessors(size_t num_threads);
void processOrdinaryQueryWithProcessors();

void processTablesStatusRequest();

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,6 @@ QueryPipeline InterpreterSelectQuery::executeWithProcessors()
{
QueryPipeline query_pipeline;
executeImpl(query_pipeline, input, std::move(input_pipe), query_pipeline);
query_pipeline.setMaxThreads(max_streams);
query_pipeline.addInterpreterContext(context);
query_pipeline.addStorageHolder(storage);
return query_pipeline;
Expand Down Expand Up @@ -1291,6 +1290,7 @@ void InterpreterSelectQuery::executeFetchColumns(
{
is_remote = true;
max_streams = settings.max_distributed_connections;
pipeline.setMaxThreads(max_streams);
}

UInt64 max_block_size = settings.max_block_size;
Expand All @@ -1315,6 +1315,7 @@ void InterpreterSelectQuery::executeFetchColumns(
{
max_block_size = std::max(UInt64(1), limit_length + limit_offset);
max_streams = 1;
pipeline.setMaxThreads(max_streams);
}

if (!max_block_size)
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Interpreters/InterpreterSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class InterpreterSelectQuery : public IInterpreter

ASTPtr getQuery() const { return query_ptr; }

size_t getMaxStreams() const { return max_streams; }

private:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
Expand Down Expand Up @@ -122,6 +124,9 @@ class InterpreterSelectQuery : public IInterpreter
BlockInputStreamPtr stream_with_non_joined_data;
bool union_stream = false;

/// Cache value of InterpreterSelectQuery::max_streams
size_t max_threads = 1;

BlockInputStreamPtr & firstStream() { return streams.at(0); }

template <typename Transform>
Expand All @@ -147,6 +152,10 @@ class InterpreterSelectQuery : public IInterpreter

bool hasDelayedStream() const { return stream_with_non_joined_data != nullptr; }
bool initialized() const { return !streams.empty(); }

/// Compatibility with QueryPipeline (Processors)
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
size_t getNumThreads() const { return max_threads; }
};

template <typename TPipeline>
Expand Down
18 changes: 14 additions & 4 deletions dbms/src/Interpreters/InterpreterSelectWithUnionQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
const Names & required_result_column_names)
: options(options_),
query_ptr(query_ptr_),
context(std::make_shared<Context>(context_))
context(std::make_shared<Context>(context_)),
max_streams(context->getSettingsRef().max_threads)
{
const auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>();

Expand Down Expand Up @@ -196,14 +197,23 @@ BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(Qu
parent_pipeline.addInterpreterContext(context);
}

/// Update max_streams due to:
/// - max_distributed_connections for Distributed() engine
/// - max_streams_to_max_threads_ratio
///
/// XXX: res.pipeline.getMaxThreads() cannot be used, since it is capped to the
/// number of streams, and there are no streams in the non-Processors case.
max_streams = (*std::min_element(nested_interpreters.begin(), nested_interpreters.end(), [](const auto &a, const auto &b)
{
return a->getMaxStreams() < b->getMaxStreams();
}))->getMaxStreams();

return nested_streams;
}


BlockIO InterpreterSelectWithUnionQuery::execute()
{
const Settings & settings = context->getSettingsRef();

BlockIO res;
BlockInputStreams nested_streams = executeWithMultipleStreams(res.pipeline);
BlockInputStreamPtr result_stream;
Expand All @@ -219,7 +229,7 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
}
else
{
result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, settings.max_threads);
result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, max_streams);
nested_streams.clear();
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/InterpreterSelectWithUnionQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class InterpreterSelectWithUnionQuery : public IInterpreter

Block result_header;

size_t max_streams = 1;

static Block getCommonHeaderForUnion(const Blocks & headers);
};

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ void executeQuery(

{
auto executor = pipeline.execute();
executor->execute(context.getSettingsRef().max_threads);
executor->execute(pipeline.getNumThreads());
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDistributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ Pipes StorageDistributed::read(
if (!smaller_cluster)
{
LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() <<
(has_sharding_key ? "" : "(no sharding key)") << ": "
(has_sharding_key ? "" : " (no sharding key)") << ": "
"Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - "
"the query will be sent to all shards of the cluster");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
0
0
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

# Runs a sleeping query over two remote addresses through the command-line client
# with max_distributed_connections=2 and max_threads=1, and aborts it if it does
# not finish within the timeout below.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Quoted so that a checkout path containing spaces or glob characters still resolves.
. "$CURDIR"/../shell_config.sh

opts=(
    --max_distributed_connections 2
    --max_threads 1
    --query "SELECT sleepEachRow(1) FROM remote('127.{2,3}', system.one)"
)
# 1.8 is less than 2 seconds, but long enough to cover possible load peaks
# "$@" is left to pass manual options (like --experimental_use_processors 0) during manual testing
# NOTE(review): $CLICKHOUSE_CLIENT is deliberately left unquoted - presumably
# shell_config.sh may define it as a command plus arguments; confirm before quoting.
timeout 1.8s $CLICKHOUSE_CLIENT "${opts[@]}" "$@"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
0
0
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env bash

# Runs a sleeping query over two remote addresses through the HTTP interface
# with max_distributed_connections=2 and max_threads=1, and aborts it if it does
# not finish within the timeout below.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Quoted so that a checkout path containing spaces or glob characters still resolves.
. "$CURDIR"/../shell_config.sh

query="SELECT sleepEachRow(1) FROM remote('127.{2,3}', system.one)"
# 1.8 is less than 2 seconds, but long enough to cover possible load peaks
# NOTE(review): ${CLICKHOUSE_CURL} is deliberately left unquoted - presumably
# shell_config.sh may define it as a command plus arguments; confirm before quoting.
timeout 1.8s ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_distributed_connections=2&max_threads=1" -d "$query"