Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow KILL QUERY for backups #58804

Merged
merged 6 commits into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1990,6 +1990,12 @@ try
else
LOG_INFO(log, "Closed all listening sockets.");

/// Wait for unfinished backups and restores.
/// This must be done after closing listening sockets (no more backups/restores) but before ProcessList::killAllQueries
/// (because killAllQueries() will cancel all running backups/restores).
if (server_settings.shutdown_wait_backups_and_restores)
global_context->waitAllBackupsAndRestores();

/// Killing remaining queries.
if (!server_settings.shutdown_wait_unfinished_queries)
global_context->getProcessList().killAllQueries();
Expand Down
4 changes: 3 additions & 1 deletion src/Backups/BackupCoordinationRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ BackupCoordinationRemote::BackupCoordinationRemote(
const Strings & all_hosts_,
const String & current_host_,
bool plain_backup_,
bool is_internal_)
bool is_internal_,
QueryStatusPtr process_list_element_)
: root_zookeeper_path(root_zookeeper_path_)
, zookeeper_path(root_zookeeper_path_ + "/backup-" + backup_uuid_)
, keeper_settings(keeper_settings_)
Expand All @@ -177,6 +178,7 @@ BackupCoordinationRemote::BackupCoordinationRemote(
log,
get_zookeeper_,
keeper_settings,
process_list_element_,
[my_zookeeper_path = zookeeper_path, my_current_host = current_host, my_is_internal = is_internal]
(WithRetries::FaultyKeeper & zk)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Backups/BackupCoordinationRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class BackupCoordinationRemote : public IBackupCoordination
const Strings & all_hosts_,
const String & current_host_,
bool plain_backup_,
bool is_internal_);
bool is_internal_,
QueryStatusPtr process_list_element_);

~BackupCoordinationRemote() override;

Expand Down
31 changes: 28 additions & 3 deletions src/Backups/BackupEntriesCollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ BackupEntriesCollector::BackupEntriesCollector(
, backup_coordination(backup_coordination_)
, read_settings(read_settings_)
, context(context_)
, process_list_element(context->getProcessListElement())
, on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
, collect_metadata_timeout(context->getConfigRef().getUInt64(
"backups.collect_metadata_timeout", context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 600000)))
Expand Down Expand Up @@ -158,8 +159,9 @@ BackupEntries BackupEntriesCollector::run()
Strings BackupEntriesCollector::setStage(const String & new_stage, const String & message)
{
LOG_TRACE(log, "Setting stage: {}", new_stage);
current_stage = new_stage;
checkQueryNotCancelled();

current_stage = new_stage;
backup_coordination->setStage(new_stage, message);

if (new_stage == Stage::formatGatheringMetadata(0))
Expand All @@ -179,6 +181,12 @@ Strings BackupEntriesCollector::setStage(const String & new_stage, const String
}
}

void BackupEntriesCollector::checkQueryNotCancelled() const
{
if (process_list_element)
process_list_element->checkTimeLimit();
}

/// Calculates the root path for collecting backup entries,
/// it's either empty or has the format "shards/<shard_num>/replicas/<replica_num>/".
void BackupEntriesCollector::calculateRootPathInBackup()
Expand Down Expand Up @@ -413,6 +421,8 @@ void BackupEntriesCollector::gatherDatabaseMetadata(
bool all_tables,
const std::set<DatabaseAndTableName> & except_table_names)
{
checkQueryNotCancelled();

auto it = database_infos.find(database_name);
if (it == database_infos.end())
{
Expand Down Expand Up @@ -491,6 +501,8 @@ void BackupEntriesCollector::gatherDatabaseMetadata(

void BackupEntriesCollector::gatherTablesMetadata()
{
checkQueryNotCancelled();

table_infos.clear();
for (const auto & [database_name, database_info] : database_infos)
{
Expand Down Expand Up @@ -552,6 +564,8 @@ std::vector<std::pair<ASTPtr, StoragePtr>> BackupEntriesCollector::findTablesInD
const auto & database_info = database_infos.at(database_name);
const auto & database = database_info.database;

checkQueryNotCancelled();

auto filter_by_table_name = [my_database_info = &database_info](const String & table_name)
{
/// We skip inner tables of materialized views.
Expand Down Expand Up @@ -629,8 +643,12 @@ void BackupEntriesCollector::lockTablesForReading()
for (auto & [table_name, table_info] : table_infos)
{
auto storage = table_info.storage;
if (storage)
table_info.table_lock = storage->tryLockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
if (!storage)
continue;

checkQueryNotCancelled();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it should be done before checking if storage is a null pointer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm calling checkQueryNotCancelled() before doing anything time-consuming. Checking a pointer for null is not time-consuming.


table_info.table_lock = storage->tryLockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
}

std::erase_if(
Expand Down Expand Up @@ -734,6 +752,7 @@ void BackupEntriesCollector::makeBackupEntriesForDatabasesDefs()
continue; /// We store CREATE DATABASE queries only if there was BACKUP DATABASE specified.

LOG_TRACE(log, "Adding the definition of database {} to backup", backQuoteIfNeed(database_name));
checkQueryNotCancelled();

ASTPtr new_create_query = database_info.create_database_query;
adjustCreateQueryForBackup(new_create_query, context->getGlobalContext(), nullptr);
Expand All @@ -750,6 +769,7 @@ void BackupEntriesCollector::makeBackupEntriesForTablesDefs()
for (auto & [table_name, table_info] : table_infos)
{
LOG_TRACE(log, "Adding the definition of {} to backup", tableNameWithTypeToString(table_name.database, table_name.table, false));
checkQueryNotCancelled();

ASTPtr new_create_query = table_info.create_table_query;
adjustCreateQueryForBackup(new_create_query, context->getGlobalContext(), &table_info.replicated_table_shared_id);
Expand Down Expand Up @@ -802,6 +822,7 @@ void BackupEntriesCollector::makeBackupEntriesForTableData(const QualifiedTableN
}

LOG_TRACE(log, "Collecting data of {} for backup", tableNameWithTypeToString(table_name.database, table_name.table, false));
checkQueryNotCancelled();

try
{
Expand Down Expand Up @@ -861,13 +882,17 @@ void BackupEntriesCollector::addPostTask(std::function<void()> task)
void BackupEntriesCollector::runPostTasks()
{
LOG_TRACE(log, "Will run {} post tasks", post_tasks.size());

/// Post collecting tasks can add other post collecting tasks, our code is fine with that.
while (!post_tasks.empty())
{
checkQueryNotCancelled();

auto task = std::move(post_tasks.front());
post_tasks.pop();
std::move(task)();
}

LOG_TRACE(log, "All post tasks successfully executed");
}

Expand Down
7 changes: 7 additions & 0 deletions src/Backups/BackupEntriesCollector.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class IDatabase;
using DatabasePtr = std::shared_ptr<IDatabase>;
struct StorageID;
enum class AccessEntityType;
class QueryStatus;
using QueryStatusPtr = std::shared_ptr<QueryStatus>;


/// Collects backup entries for all databases and tables which should be put to a backup.
class BackupEntriesCollector : private boost::noncopyable
Expand Down Expand Up @@ -97,11 +100,15 @@ class BackupEntriesCollector : private boost::noncopyable

Strings setStage(const String & new_stage, const String & message = "");

/// Throws an exception if the BACKUP query was cancelled.
void checkQueryNotCancelled() const;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename to checkQueryIsCancelled

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed it to checkIsQueryCancelled()


const ASTBackupQuery::Elements backup_query_elements;
const BackupSettings backup_settings;
std::shared_ptr<IBackupCoordination> backup_coordination;
const ReadSettings read_settings;
ContextPtr context;
QueryStatusPtr process_list_element;

/// The time a BACKUP ON CLUSTER or RESTORE ON CLUSTER command will wait until all the nodes receive the BACKUP (or RESTORE) query and start working.
/// This setting is similar to `distributed_ddl_task_timeout`.
Expand Down
9 changes: 7 additions & 2 deletions src/Backups/BackupFileInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <Interpreters/ProcessList.h>

#include <base/hex.h>


Expand Down Expand Up @@ -203,7 +205,7 @@ BackupFileInfo buildFileInfoForBackupEntry(
return info;
}

BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, const ReadSettings & read_settings, ThreadPool & thread_pool)
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, const ReadSettings & read_settings, ThreadPool & thread_pool, QueryStatusPtr process_list_element)
{
BackupFileInfos infos;
infos.resize(backup_entries.size());
Expand All @@ -225,7 +227,7 @@ BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entr
++num_active_jobs;
}

auto job = [&mutex, &num_active_jobs, &event, &exception, &infos, &backup_entries, &read_settings, &base_backup, &thread_group, i, log]()
auto job = [&mutex, &num_active_jobs, &event, &exception, &infos, &backup_entries, &read_settings, &base_backup, &thread_group, &process_list_element, i, log]()
{
SCOPE_EXIT_SAFE({
std::lock_guard lock{mutex};
Expand All @@ -250,6 +252,9 @@ BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entr
return;
}

if (process_list_element)
process_list_element->checkTimeLimit();

infos[i] = buildFileInfoForBackupEntry(name, entry, base_backup, read_settings, log);
}
catch (...)
Expand Down
4 changes: 3 additions & 1 deletion src/Backups/BackupFileInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ using BackupPtr = std::shared_ptr<const IBackup>;
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;
using BackupEntries = std::vector<std::pair<String, BackupEntryPtr>>;
struct ReadSettings;
class QueryStatus;
using QueryStatusPtr = std::shared_ptr<QueryStatus>;


/// Information about a file stored in a backup.
Expand Down Expand Up @@ -78,6 +80,6 @@ using BackupFileInfos = std::vector<BackupFileInfo>;
BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const BackupEntryPtr & backup_entry, const BackupPtr & base_backup, const ReadSettings & read_settings, Poco::Logger * log);

/// Builds a vector of BackupFileInfos for specified backup entries.
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, const ReadSettings & read_settings, ThreadPool & thread_pool);
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, const ReadSettings & read_settings, ThreadPool & thread_pool, QueryStatusPtr process_list_element);

}
4 changes: 4 additions & 0 deletions src/Backups/BackupStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ std::string_view toString(BackupStatus backup_status)
return "BACKUP_CREATED";
case BackupStatus::BACKUP_FAILED:
return "BACKUP_FAILED";
case BackupStatus::BACKUP_CANCELLED:
return "BACKUP_CANCELLED";
case BackupStatus::RESTORING:
return "RESTORING";
case BackupStatus::RESTORED:
return "RESTORED";
case BackupStatus::RESTORE_FAILED:
return "RESTORE_FAILED";
case BackupStatus::RESTORE_CANCELLED:
return "RESTORE_CANCELLED";
default:
break;
}
Expand Down
4 changes: 4 additions & 0 deletions src/Backups/BackupStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ enum class BackupStatus
RESTORED,
RESTORE_FAILED,

/// Statuses used after a BACKUP or RESTORE operation was cancelled.
BACKUP_CANCELLED,
RESTORE_CANCELLED,

MAX,
};

Expand Down