Skip to content

Commit

Permalink
Add server settings "shutdown_wait_backups_and_restores" (default=true)
Browse files Browse the repository at this point in the history
to set whether shutdown should wait for running backups to finish or just cancel them.
  • Loading branch information
vitlibar committed Jan 18, 2024
1 parent c15edeb commit 4c6d3e7
Show file tree
Hide file tree
Showing 11 changed files with 314 additions and 32 deletions.
6 changes: 6 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1990,6 +1990,12 @@ try
else
LOG_INFO(log, "Closed all listening sockets.");

/// Wait for unfinished backups and restores.
/// This must be done after closing listening sockets (no more backups/restores) but before ProcessList::killAllQueries
/// (because killAllQueries() will cancel all running backups/restores).
if (server_settings.shutdown_wait_backups_and_restores)
global_context->waitAllBackupsAndRestores();

/// Killing remaining queries.
if (!server_settings.shutdown_wait_unfinished_queries)
global_context->getProcessList().killAllQueries();
Expand Down
154 changes: 127 additions & 27 deletions src/Backups/BackupsWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ namespace
return isFinishedSuccessfully(status) || isFailedOrCancelled(status);
}

bool isBackupStatus(BackupStatus status)
{
return (status == BackupStatus::CREATING_BACKUP) || (status == BackupStatus::BACKUP_CREATED) || (status == BackupStatus::BACKUP_FAILED) || (status == BackupStatus::BACKUP_CANCELLED);
}

BackupStatus getBackupStatusFromCurrentException()
{
if (getCurrentExceptionCode() == ErrorCodes::QUERY_WAS_CANCELLED)
Expand Down Expand Up @@ -370,14 +375,15 @@ class BackupsWorker::ThreadPools
};


BackupsWorker::BackupsWorker(ContextPtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_, bool test_inject_sleep_)
BackupsWorker::BackupsWorker(ContextMutablePtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_, bool test_inject_sleep_)
: thread_pools(std::make_unique<ThreadPools>(num_backup_threads, num_restore_threads))
, allow_concurrent_backups(allow_concurrent_backups_)
, allow_concurrent_restores(allow_concurrent_restores_)
, test_inject_sleep(test_inject_sleep_)
, log(&Poco::Logger::get("BackupsWorker"))
, backup_log(global_context->getBackupLog())
, process_list(global_context->getProcessList())
{
backup_log = global_context->getBackupLog();
}


Expand Down Expand Up @@ -434,7 +440,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context

try
{
addInfo(backup_id, backup_name_for_logging, base_backup_name, backup_settings.internal, BackupStatus::CREATING_BACKUP);
addInfo(backup_id, backup_name_for_logging, base_backup_name, backup_settings.internal, context->getProcessListElement(), BackupStatus::CREATING_BACKUP);

/// Prepare context to use.
ContextPtr context_in_use = context;
Expand Down Expand Up @@ -819,7 +825,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
if (restore_settings.base_backup_info)
base_backup_name = restore_settings.base_backup_info->toString();

addInfo(restore_id, backup_name_for_logging, base_backup_name, restore_settings.internal, BackupStatus::RESTORING);
addInfo(restore_id, backup_name_for_logging, base_backup_name, restore_settings.internal, context->getProcessListElement(), BackupStatus::RESTORING);

/// Prepare context to use.
ContextMutablePtr context_in_use = context;
Expand Down Expand Up @@ -1106,17 +1112,27 @@ void BackupsWorker::restoreTablesData(const OperationID & restore_id, BackupPtr
}


void BackupsWorker::addInfo(const OperationID & id, const String & name, const String & base_backup_name, bool internal, BackupStatus status)
void BackupsWorker::addInfo(const OperationID & id, const String & name, const String & base_backup_name, bool internal, QueryStatusPtr process_list_element, BackupStatus status)
{
BackupOperationInfo info;
ExtendedOperationInfo extended_info;
auto & info = extended_info.info;
info.id = id;
info.name = name;
info.base_backup_name = base_backup_name;
info.internal = internal;
info.status = status;
info.start_time = std::chrono::system_clock::now();

if (isFinalStatus(status))
bool is_final_status = isFinalStatus(status);

if (process_list_element)
{
info.profile_counters = process_list_element->getInfo(/* get_thread_list= */ false, /* get_profile_events= */ true, /* get_settings= */ false).profile_counters;
if (!is_final_status)
extended_info.process_list_element = process_list_element;
}

if (is_final_status)
info.end_time = info.start_time;

std::lock_guard lock{infos_mutex};
Expand All @@ -1125,15 +1141,15 @@ void BackupsWorker::addInfo(const OperationID & id, const String & name, const S
if (it != infos.end())
{
/// It's better not allow to overwrite the current status if it's in progress.
auto current_status = it->second.status;
auto current_status = it->second.info.status;
if (!isFinalStatus(current_status))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot start a backup or restore: ID {} is already in use", id);
}

if (backup_log)
backup_log->add(BackupLogElement{info});

infos[id] = std::move(info);
infos[id] = std::move(extended_info);

num_active_backups += getNumActiveBackupsChange(status);
num_active_restores += getNumActiveRestoresChange(status);
Expand All @@ -1152,13 +1168,21 @@ void BackupsWorker::setStatus(const String & id, BackupStatus status, bool throw
return;
}

auto & info = it->second;
auto old_status = info.status;
auto & extended_info = it->second;
auto & info = extended_info.info;

auto old_status = info.status;
info.status = status;
info.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(CurrentThread::getProfileEvents().getPartiallyAtomicSnapshot());
bool is_final_status = isFinalStatus(status);

if (extended_info.process_list_element)
{
info.profile_counters = extended_info.process_list_element->getInfo(/* get_thread_list= */ false, /* get_profile_events= */ true, /* get_settings= */ false).profile_counters;
if (is_final_status)
extended_info.process_list_element = nullptr;
}

if (isFinalStatus(status))
if (is_final_status)
info.end_time = std::chrono::system_clock::now();

if (isFailedOrCancelled(status))
Expand All @@ -1172,6 +1196,9 @@ void BackupsWorker::setStatus(const String & id, BackupStatus status, bool throw

num_active_backups += getNumActiveBackupsChange(status) - getNumActiveBackupsChange(old_status);
num_active_restores += getNumActiveRestoresChange(status) - getNumActiveRestoresChange(old_status);

if (status != old_status)
status_changed.notify_all();
}


Expand All @@ -1185,7 +1212,7 @@ void BackupsWorker::setNumFilesAndSize(const OperationID & id, size_t num_files,
if (it == infos.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown backup ID {}", id);

auto & info = it->second;
auto & info = it->second.info;
info.num_files = num_files;
info.total_size = total_size;
info.num_entries = num_entries;
Expand All @@ -1203,37 +1230,113 @@ void BackupsWorker::maybeSleepForTesting() const
}


void BackupsWorker::wait(const OperationID & id, bool rethrow_exception)
void BackupsWorker::wait(const OperationID & backup_or_restore_id, bool rethrow_exception)
{
std::unique_lock lock{infos_mutex};
status_changed.wait(lock, [&]
{
auto it = infos.find(id);
auto it = infos.find(backup_or_restore_id);
if (it == infos.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown backup ID {}", id);
const auto & info = it->second;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown backup ID {}", backup_or_restore_id);
const auto & info = it->second.info;
auto current_status = info.status;
if (rethrow_exception && isFailedOrCancelled(current_status))
std::rethrow_exception(info.exception);
return isFinalStatus(current_status);
if (isFinalStatus(current_status))
return true;
LOG_INFO(log, "Waiting {} {}", isBackupStatus(info.status) ? "backup" : "restore", info.name);
return false;
});
}

void BackupsWorker::waitAll()
{
std::vector<OperationID> current_operations;
{
std::lock_guard lock{infos_mutex};
for (const auto & [id, extended_info] : infos)
if (!isFinalStatus(extended_info.info.status))
current_operations.push_back(id);
}

if (current_operations.empty())
return;

LOG_INFO(log, "Waiting for running backups and restores to finish");

for (const auto & id : current_operations)
wait(id, /* rethrow_exception= */ false);

LOG_INFO(log, "Backups and restores finished");
}

void BackupsWorker::cancel(const BackupOperationID & backup_or_restore_id, bool wait_)
{
QueryStatusPtr process_list_element;
{
std::unique_lock lock{infos_mutex};
auto it = infos.find(backup_or_restore_id);
if (it == infos.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown backup ID {}", backup_or_restore_id);

const auto & extended_info = it->second;
const auto & info = extended_info.info;
if (isFinalStatus(info.status) || !extended_info.process_list_element)
return;

LOG_INFO(log, "Cancelling {} {}", isBackupStatus(info.status) ? "backup" : "restore", info.name);
process_list_element = extended_info.process_list_element;
}

process_list.sendCancelToQuery(process_list_element);

if (wait_)
wait(backup_or_restore_id, /* rethrow_exception= */ false);
}


void BackupsWorker::cancelAll(bool wait_)
{
std::vector<OperationID> current_operations;
{
std::lock_guard lock{infos_mutex};
for (const auto & [id, extended_info] : infos)
if (!isFinalStatus(extended_info.info.status))
current_operations.push_back(id);
}

if (current_operations.empty())
return;

LOG_INFO(log, "Cancelling running backups and restores");

for (const auto & id : current_operations)
cancel(id, /* wait= */ false);

if (wait_)
for (const auto & id : current_operations)
wait(id, /* rethrow_exception= */ false);

LOG_INFO(log, "Backups and restores finished or stopped");
}


BackupOperationInfo BackupsWorker::getInfo(const OperationID & id) const
{
std::lock_guard lock{infos_mutex};
auto it = infos.find(id);
if (it == infos.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown backup ID {}", id);
return it->second;
return it->second.info;
}

std::vector<BackupOperationInfo> BackupsWorker::getAllInfos() const
{
std::vector<BackupOperationInfo> res_infos;
std::lock_guard lock{infos_mutex};
for (const auto & info : infos | boost::adaptors::map_values)
for (const auto & extended_info : infos | boost::adaptors::map_values)
{
const auto & info = extended_info.info;
if (!info.internal)
res_infos.push_back(info);
}
Expand All @@ -1242,14 +1345,11 @@ std::vector<BackupOperationInfo> BackupsWorker::getAllInfos() const

void BackupsWorker::shutdown()
{
bool has_active_backups_and_restores = (num_active_backups || num_active_restores);
if (has_active_backups_and_restores)
LOG_INFO(log, "Waiting for {} backups and {} restores to be finished", num_active_backups, num_active_restores);
/// Cancel running backups and restores.
cancelAll(/* wait= */ true);

/// Wait for our thread pools (it must be done before destroying them).
thread_pools->wait();

if (has_active_backups_and_restores)
LOG_INFO(log, "All backup and restore tasks have finished");
}

}
37 changes: 32 additions & 5 deletions src/Backups/BackupsWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,22 @@ class ThreadGroup;
using ThreadGroupPtr = std::shared_ptr<ThreadGroup>;
class QueryStatus;
using QueryStatusPtr = std::shared_ptr<QueryStatus>;
class ProcessList;


/// Manager of backups and restores: executes backups and restores' threads in the background.
/// Keeps information about backups and restores started in this session.
class BackupsWorker
{
public:
BackupsWorker(ContextPtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_, bool test_inject_sleep_);
BackupsWorker(
ContextMutablePtr global_context,
size_t num_backup_threads,
size_t num_restore_threads,
bool allow_concurrent_backups_,
bool allow_concurrent_restores_,
bool test_inject_sleep_);

~BackupsWorker();

/// Waits until all tasks have been completed.
Expand All @@ -46,10 +54,20 @@ class BackupsWorker
/// Starts executing a BACKUP or RESTORE query. Returns ID of the operation.
BackupOperationID start(const ASTPtr & backup_or_restore_query, ContextMutablePtr context);

/// Waits until a BACKUP or RESTORE query started by start() is finished.
/// Waits until the specified backup or restore operation finishes or stops.
/// The function returns immediately if the operation is already finished.
void wait(const BackupOperationID & backup_or_restore_id, bool rethrow_exception = true);

/// Waits until all running backup and restore operations finish or stop.
void waitAll();

/// Cancels the specified backup or restore operation.
/// The function does nothing if this operation has already finished.
void cancel(const BackupOperationID & backup_or_restore_id, bool wait_ = true);

/// Cancels all running backup and restore operations.
void cancelAll(bool wait_ = true);

BackupOperationInfo getInfo(const BackupOperationID & id) const;
std::vector<BackupOperationInfo> getAllInfos() const;

Expand Down Expand Up @@ -90,7 +108,7 @@ class BackupsWorker
/// Run data restoring tasks which insert data to tables.
void restoreTablesData(const BackupOperationID & restore_id, BackupPtr backup, DataRestoreTasks && tasks, ThreadPool & thread_pool, QueryStatusPtr process_list_element);

void addInfo(const BackupOperationID & id, const String & name, const String & base_backup_name, bool internal, BackupStatus status);
void addInfo(const BackupOperationID & id, const String & name, const String & base_backup_name, bool internal, QueryStatusPtr process_list_element, BackupStatus status);
void setStatus(const BackupOperationID & id, BackupStatus status, bool throw_if_error = true);
void setStatusSafe(const String & id, BackupStatus status) { setStatus(id, status, false); }
void setNumFilesAndSize(const BackupOperationID & id, size_t num_files, UInt64 total_size, size_t num_entries,
Expand All @@ -111,12 +129,21 @@ class BackupsWorker

Poco::Logger * log;

std::unordered_map<BackupOperationID, BackupOperationInfo> infos;
std::shared_ptr<BackupLog> backup_log;
struct ExtendedOperationInfo
{
BackupOperationInfo info;
QueryStatusPtr process_list_element; /// to cancel this operation if we want to
};

std::unordered_map<BackupOperationID, ExtendedOperationInfo> infos;

std::condition_variable status_changed;
std::atomic<size_t> num_active_backups = 0;
std::atomic<size_t> num_active_restores = 0;
mutable std::mutex infos_mutex;

std::shared_ptr<BackupLog> backup_log;
ProcessList & process_list;
};

}
1 change: 1 addition & 0 deletions src/Core/ServerSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ namespace DB
M(UInt64, backup_threads, 16, "The maximum number of threads to execute BACKUP requests.", 0) \
M(UInt64, max_backup_bandwidth_for_server, 0, "The maximum read speed in bytes per second for all backups on server. Zero means unlimited.", 0) \
M(UInt64, restore_threads, 16, "The maximum number of threads to execute RESTORE requests.", 0) \
M(Bool, shutdown_wait_backups_and_restores, true, "If set to true ClickHouse will wait for running backups and restores to finish before shutdown.", 0) \
M(Int32, max_connections, 1024, "Max server connections.", 0) \
M(UInt32, asynchronous_metrics_update_period_s, 1, "Period in seconds for updating asynchronous metrics.", 0) \
M(UInt32, asynchronous_heavy_metrics_update_period_s, 120, "Period in seconds for updating heavy asynchronous metrics.", 0) \
Expand Down
6 changes: 6 additions & 0 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2547,6 +2547,12 @@ BackupsWorker & Context::getBackupsWorker() const
return *shared->backups_worker;
}

void Context::waitAllBackupsAndRestores() const
{
if (shared->backups_worker)
shared->backups_worker->waitAll();
}


void Context::setProgressCallback(ProgressCallback callback)
{
Expand Down

0 comments on commit 4c6d3e7

Please sign in to comment.