Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separated pool for background moves #7670

Merged
merged 4 commits into from
Nov 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \
M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(SettingUInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.", 0) \
\
M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \
Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ struct ContextShared
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables.
std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
Expand Down Expand Up @@ -287,6 +288,7 @@ struct ContextShared
external_dictionaries_loader.reset();
external_models_loader.reset();
background_pool.reset();
background_move_pool.reset();
schedule_pool.reset();
ddl_worker.reset();

Expand Down Expand Up @@ -1489,6 +1491,14 @@ BackgroundProcessingPool & Context::getBackgroundPool()
return *shared->background_pool;
}

BackgroundProcessingPool & Context::getBackgroundMovePool()
{
auto lock = getLock();
if (!shared->background_move_pool)
shared->background_move_pool.emplace(settings.background_move_pool_size, "BackgroundMovePool", "BgMoveProcPool");
return *shared->background_move_pool;
}

BackgroundSchedulePool & Context::getSchedulePool()
{
auto lock = getLock();
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ class Context
void dropCaches() const;

BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & getBackgroundMovePool();
BackgroundSchedulePool & getSchedulePool();

void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ void BackgroundProcessingPoolTaskInfo::wake()
}


BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
BackgroundProcessingPool::BackgroundProcessingPool(int size_, const char * log_name, const char * thread_name_)
: size(size_)
, thread_name(thread_name_)
{
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with " << size << " threads");
logger = &Logger::get(log_name);
LOG_INFO(logger, "Create " << log_name << " with " << size << " threads");

threads.resize(size);
for (auto & thread : threads)
Expand Down Expand Up @@ -122,7 +125,7 @@ BackgroundProcessingPool::~BackgroundProcessingPool()

void BackgroundProcessingPool::threadFunction()
{
setThreadName("BackgrProcPool");
setThreadName(thread_name);

{
std::lock_guard lock(tasks_mutex);
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/MergeTree/BackgroundProcessingPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ class BackgroundProcessingPool
using TaskHandle = std::shared_ptr<TaskInfo>;


BackgroundProcessingPool(int size_);
BackgroundProcessingPool(int size_,
const char * log_name = "BackgroundProcessingPool",
const char * thread_name_ = "BackgrProcPool");

size_t getNumberOfThreads() const
{
Expand All @@ -67,6 +69,8 @@ class BackgroundProcessingPool
using Threads = std::vector<ThreadFromGlobalPool>;

const size_t size;
const char * thread_name;
Poco::Logger * logger;

Tasks tasks; /// Ordered in priority.
std::mutex tasks_mutex;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3486,6 +3486,11 @@ bool MergeTreeData::selectPartsAndMove()
return moveParts(std::move(moving_tagger));
}

bool MergeTreeData::areBackgroundMovesNeeded() const
{
return storage_policy->getVolumes().size() > 1;
}

bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, DiskSpace::SpacePtr space)
{
if (parts_mover.moves_blocker.isCancelled())
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,8 @@ class MergeTreeData : public IStorage
/// Selects parts for move and moves them, used in background process
bool selectPartsAndMove();

bool areBackgroundMovesNeeded() const;

private:
/// RAII Wrapper for atomic work with currently moving parts
/// Acuire them in constructor and remove them in destructor
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/StorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ void StorageMergeTree::startup()
/// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup.restart();
merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); });
moving_task_handle = global_context.getBackgroundPool().addTask([this] { return movePartsTask(); });
if (areBackgroundMovesNeeded())
moving_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); });
}


Expand All @@ -115,7 +116,7 @@ void StorageMergeTree::shutdown()
global_context.getBackgroundPool().removeTask(merging_mutating_task_handle);

if (moving_task_handle)
global_context.getBackgroundPool().removeTask(moving_task_handle);
global_context.getBackgroundMovePool().removeTask(moving_task_handle);
}


Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2878,7 +2878,8 @@ void StorageReplicatedMergeTree::startup()
data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler());

queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); });
move_parts_task_handle = global_context.getBackgroundPool().addTask([this] { return movePartsTask(); });
if (areBackgroundMovesNeeded())
move_parts_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); });

/// In this thread replica will be activated.
restarting_thread.start();
Expand All @@ -2902,7 +2903,7 @@ void StorageReplicatedMergeTree::shutdown()
queue_task_handle.reset();

if (move_parts_task_handle)
global_context.getBackgroundPool().removeTask(move_parts_task_handle);
global_context.getBackgroundMovePool().removeTask(move_parts_task_handle);
move_parts_task_handle.reset();

if (data_parts_exchange_endpoint_holder)
Expand Down