Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make system.replicas parallel #43998

Merged
merged 6 commits into from Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Core/Settings.h
Expand Up @@ -155,6 +155,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \
\
M(UInt64, system_replicas_fetch_threads, 16, "The maximum number of threads to fetch data for system.replicas table.", 0) \
evillique marked this conversation as resolved.
Show resolved Hide resolved
\
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
\
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \
Expand Down
30 changes: 24 additions & 6 deletions src/Storages/System/StorageSystemReplicas.cpp
Expand Up @@ -151,14 +151,32 @@ Pipe StorageSystemReplicas::read(

MutableColumns res_columns = storage_snapshot->metadata->getSampleBlock().cloneEmptyColumns();

for (size_t i = 0, size = col_database->size(); i < size; ++i)
auto settings = context->getSettingsRef();
size_t thread_pool_size = settings.system_replicas_fetch_threads;

if (settings.max_threads != 0)
thread_pool_size = std::min(thread_pool_size, static_cast<size_t>(settings.max_threads));

ThreadPool thread_pool(thread_pool_size);

size_t tables_size = col_database->size();
std::vector<StorageReplicatedMergeTree::Status> statuses(tables_size);

for (size_t i = 0; i < tables_size; ++i)
{
StorageReplicatedMergeTree::Status status;
dynamic_cast<StorageReplicatedMergeTree &>(
*replicated_tables
[(*col_database)[i].safeGet<const String &>()]
[(*col_table)[i].safeGet<const String &>()]).getStatus(status, with_zk_fields);
thread_pool.scheduleOrThrowOnError([i, &statuses, &replicated_tables, &col_database, &col_table, &with_zk_fields]
evillique marked this conversation as resolved.
Show resolved Hide resolved
evillique marked this conversation as resolved.
Show resolved Hide resolved
{
dynamic_cast<StorageReplicatedMergeTree &>(
*replicated_tables
[(*col_database)[i].safeGet<const String &>()]
[(*col_table)[i].safeGet<const String &>()]).getStatus(statuses[i], with_zk_fields);
});
}

thread_pool.wait();

for (const auto & status: statuses)
{
size_t col_num = 3;
res_columns[col_num++]->insert(status.is_leader);
res_columns[col_num++]->insert(status.can_become_leader);
Expand Down