Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent tables creation in recoverLostReplica #59277

Merged
merged 9 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,11 @@ try
0, // We don't need any threads one all the parts will be deleted
server_settings.max_parts_cleaning_thread_pool_size);

getDatabaseReplicatedCreateTablesThreadPool().initialize(
server_settings.max_database_replicated_create_table_thread_pool_size,
0, // We don't need any threads once all the tables will be created
server_settings.max_database_replicated_create_table_thread_pool_size);

/// Initialize global local cache for remote filesystem.
if (config().has("local_cache_for_remote_fs"))
{
Expand Down
3 changes: 3 additions & 0 deletions src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@
M(MergeTreePartsCleanerThreads, "Number of threads in the MergeTree parts cleaner thread pool.") \
M(MergeTreePartsCleanerThreadsActive, "Number of threads in the MergeTree parts cleaner thread pool running a task.") \
M(MergeTreePartsCleanerThreadsScheduled, "Number of queued or active jobs in the MergeTree parts cleaner thread pool.") \
M(DatabaseReplicatedCreateTablesThreads, "Number of threads in the threadpool for table creation in DatabaseReplicated.") \
M(DatabaseReplicatedCreateTablesThreadsActive, "Number of active threads in the threadpool for table creation in DatabaseReplicated.") \
M(DatabaseReplicatedCreateTablesThreadsScheduled, "Number of queued or active jobs in the threadpool for table creation in DatabaseReplicated.") \
M(IDiskCopierThreads, "Number of threads for copying data between disks of different types.") \
M(IDiskCopierThreadsActive, "Number of threads for copying data between disks of different types running a task.") \
M(IDiskCopierThreadsScheduled, "Number of queued or active jobs for copying data between disks of different types.") \
Expand Down
1 change: 1 addition & 0 deletions src/Core/ServerSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ namespace DB
M(Bool, validate_tcp_client_information, false, "Validate client_information in the query packet over the native TCP protocol.", 0) \
M(Bool, storage_metadata_write_full_object_key, false, "Write disk metadata files with VERSION_FULL_OBJECT_KEY format", 0) \
M(UInt64, max_materialized_views_count_for_table, 0, "A limit on the number of materialized views attached to a table.", 0) \
M(UInt64, max_database_replicated_create_table_thread_pool_size, 0, "The number of threads to create tables during replica recovery in DatabaseReplicated. Zero means tables will be created sequentially.", 0) \
thevar1able marked this conversation as resolved.
Show resolved Hide resolved

/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp

Expand Down
65 changes: 46 additions & 19 deletions src/Databases/DatabaseReplicated.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/SharedThreadPools.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
Expand Down Expand Up @@ -1091,31 +1092,57 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
}

tables_dependencies.checkNoCyclicDependencies();
auto tables_to_create = tables_dependencies.getTablesSortedByDependency();

for (const auto & table_id : tables_to_create)
auto allow_concurrent_table_creation = getContext()->getServerSettings().max_database_replicated_create_table_thread_pool_size > 0;
auto tables_to_create_by_level = tables_dependencies.getTablesSortedByDependencyWithLevels();

auto create_tables_runner = threadPoolCallbackRunner<void>(getDatabaseReplicatedCreateTablesThreadPool().get(), "CreateTables");
std::vector<std::future<void>> create_table_futures;

for (const auto & [_, tables_to_create] : tables_to_create_by_level)
{
auto table_name = table_id.getTableName();
auto metadata_it = table_name_to_metadata.find(table_name);
if (metadata_it == table_name_to_metadata.end())
for (const auto & table_id : tables_to_create)
{
/// getTablesSortedByDependency() may return some not existing tables or tables from other databases
LOG_WARNING(log, "Got table name {} when resolving table dependencies, "
"but database {} does not have metadata for that table. Ignoring it", table_id.getNameForLogs(), getDatabaseName());
continue;
}
auto task = [&]()
{
auto table_name = table_id.getTableName();
auto metadata_it = table_name_to_metadata.find(table_name);
if (metadata_it == table_name_to_metadata.end())
{
/// getTablesSortedByDependency() may return some not existing tables or tables from other databases
LOG_WARNING(log, "Got table name {} when resolving table dependencies, "
"but database {} does not have metadata for that table. Ignoring it", table_id.getNameForLogs(), getDatabaseName());
return;
}

const auto & create_query_string = metadata_it->second;
if (isTableExist(table_name, getContext()))
{
assert(create_query_string == readMetadataFile(table_name) || getTableUUIDIfReplicated(create_query_string, getContext()) != UUIDHelpers::Nil);
continue;
const auto & create_query_string = metadata_it->second;
if (isTableExist(table_name, getContext()))
{
assert(create_query_string == readMetadataFile(table_name) || getTableUUIDIfReplicated(create_query_string, getContext()) != UUIDHelpers::Nil);
return;
}

auto query_ast = parseQueryFromMetadataInZooKeeper(table_name, create_query_string);
LOG_INFO(log, "Executing {}", serializeAST(*query_ast));
auto create_query_context = make_query_context();
InterpreterCreateQuery(query_ast, create_query_context).execute();
};

if (allow_concurrent_table_creation)
create_table_futures.push_back(create_tables_runner(task, Priority{0}));
else
task();
}

auto query_ast = parseQueryFromMetadataInZooKeeper(table_name, create_query_string);
LOG_INFO(log, "Executing {}", serializeAST(*query_ast));
auto create_query_context = make_query_context();
InterpreterCreateQuery(query_ast, create_query_context).execute();
/// First wait for all tasks to finish.
for (auto & future : create_table_futures)
future.wait();

/// Now rethrow the first exception if any.
for (auto & future : create_table_futures)
future.get();

create_table_futures.clear();
}
LOG_INFO(log, "All tables are created successfully");

Expand Down
11 changes: 11 additions & 0 deletions src/Databases/TablesDependencyGraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,17 @@ std::vector<StorageID> TablesDependencyGraph::getTablesSortedByDependency() cons
}


std::map<size_t, std::vector<StorageID>> TablesDependencyGraph::getTablesSortedByDependencyWithLevels() const
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not std::vector<std::vector<StorageID>>?

{
std::map<size_t, std::vector<StorageID>> tables_by_level;
for (const auto * node : getNodesSortedByLevel())
{
tables_by_level[node->level].emplace_back(node->storage_id);
}
return tables_by_level;
}


void TablesDependencyGraph::log() const
{
if (nodes.empty())
Expand Down
6 changes: 6 additions & 0 deletions src/Databases/TablesDependencyGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ class TablesDependencyGraph
/// tables which depend on the tables which depend on the tables without dependencies, and so on.
std::vector<StorageID> getTablesSortedByDependency() const;

/// Returns a map of lists of tables by the number of dependencies they have:
/// tables without dependencies first with level 0, then
/// tables with depend on the tables without dependencies with level 1, then
/// tables which depend on the tables which depend on the tables without dependencies with level 2, and so on.
std::map<size_t, std::vector<StorageID>> getTablesSortedByDependencyWithLevels() const;

/// Outputs information about this graph as a bunch of logging messages.
void log() const;

Expand Down
9 changes: 9 additions & 0 deletions src/IO/SharedThreadPools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ namespace CurrentMetrics
extern const Metric MergeTreeOutdatedPartsLoaderThreads;
extern const Metric MergeTreeOutdatedPartsLoaderThreadsActive;
extern const Metric MergeTreeOutdatedPartsLoaderThreadsScheduled;
extern const Metric DatabaseReplicatedCreateTablesThreads;
extern const Metric DatabaseReplicatedCreateTablesThreadsActive;
extern const Metric DatabaseReplicatedCreateTablesThreadsScheduled;
}

namespace DB
Expand Down Expand Up @@ -148,4 +151,10 @@ StaticThreadPool & getOutdatedPartsLoadingThreadPool()
return instance;
}

StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool()
{
static StaticThreadPool instance("CreateTablesThreadPool", CurrentMetrics::DatabaseReplicatedCreateTablesThreads, CurrentMetrics::DatabaseReplicatedCreateTablesThreadsActive, CurrentMetrics::DatabaseReplicatedCreateTablesThreadsScheduled);
return instance;
}

}
3 changes: 3 additions & 0 deletions src/IO/SharedThreadPools.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,7 @@ StaticThreadPool & getPartsCleaningThreadPool();
/// the number of threads by calling enableTurboMode() :-)
StaticThreadPool & getOutdatedPartsLoadingThreadPool();

/// ThreadPool used for creating tables in DatabaseReplicated.
StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool();

}
1 change: 1 addition & 0 deletions tests/config/config.d/database_replicated.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,5 @@
</remote_servers>

<_functional_tests_helper_database_replicated_replace_args_macros>1</_functional_tests_helper_database_replicated_replace_args_macros>
<max_database_replicated_create_table_thread_pool_size>50</max_database_replicated_create_table_thread_pool_size>
thevar1able marked this conversation as resolved.
Show resolved Hide resolved
</clickhouse>