Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backup/Restore for KeeperMap tables #56460

Merged
merged 17 commits into from Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/Backups/BackupCoordinationKeeperMapTables.cpp
@@ -0,0 +1,23 @@
#include <Backups/BackupCoordinationKeeperMapTables.h>

namespace DB
{

/// Registers a KeeperMap table under the root path it uses in Keeper.
/// Only one entry is kept per root path: when the path is already known, the stored entry is
/// replaced only if the new `table_id` compares greater (plain string comparison) than the stored one.
void BackupCoordinationKeeperMapTables::addTable(const std::string & table_zookeeper_root_path, const std::string & table_id, const std::string & data_path_in_backup)
{
    auto [entry, inserted]
        = tables_with_info.try_emplace(table_zookeeper_root_path, KeeperMapTableInfo{table_id, data_path_in_backup});

    /// First table seen for this root path: the freshly inserted entry is final.
    if (inserted)
        return;

    /// The root path was registered before: the greater table id wins.
    if (table_id > entry->second.table_id)
        entry->second = KeeperMapTableInfo{table_id, data_path_in_backup};
}

std::string BackupCoordinationKeeperMapTables::getDataPath(const std::string & table_zookeeper_root_path) const
{
return tables_with_info.at(table_zookeeper_root_path).data_path_in_backup;
}

}
23 changes: 23 additions & 0 deletions src/Backups/BackupCoordinationKeeperMapTables.h
@@ -0,0 +1,23 @@
#pragma once

#include <unordered_map>
#include <string>

namespace DB
{

/// Registry of KeeperMap tables taking part in a backup, keyed by the root path of the table in Keeper.
/// The struct does no locking of its own.
struct BackupCoordinationKeeperMapTables
{
/// Registers a table under its root path. If the path is already registered, the stored entry is
/// replaced only when `table_id` compares greater than the stored table id.
void addTable(const std::string & table_zookeeper_root_path, const std::string & table_id, const std::string & data_path_in_backup);
/// Returns the data path in backup stored for the root path. The root path must have been added before.
std::string getDataPath(const std::string & table_zookeeper_root_path) const;

/// What is remembered for each root path.
struct KeeperMapTableInfo
{
std::string table_id;
std::string data_path_in_backup;
};
private:
/// One entry per root path, see addTable() for which table is kept.
std::unordered_map<std::string /* root zookeeper path */, KeeperMapTableInfo> tables_with_info;
};

}
12 changes: 12 additions & 0 deletions src/Backups/BackupCoordinationLocal.cpp
Expand Up @@ -97,6 +97,18 @@ Strings BackupCoordinationLocal::getReplicatedSQLObjectsDirs(const String & load
return replicated_sql_objects.getDirectories(loader_zk_path, object_type, "");
}

/// Records a KeeperMap table in the in-memory registry while holding `keeper_map_tables_mutex`.
void BackupCoordinationLocal::addKeeperMapTable(const String & table_zookeeper_root_path, const String & table_id, const String & data_path_in_backup)
{
    std::lock_guard guard{keeper_map_tables_mutex};
    keeper_map_tables.addTable(table_zookeeper_root_path, table_id, data_path_in_backup);
}

/// Looks up the data path in backup registered for the given root path while holding `keeper_map_tables_mutex`.
String BackupCoordinationLocal::getKeeperMapDataPath(const String & table_zookeeper_root_path) const
{
    std::lock_guard guard{keeper_map_tables_mutex};
    String data_path = keeper_map_tables.getDataPath(table_zookeeper_root_path);
    return data_path;
}


void BackupCoordinationLocal::addFileInfos(BackupFileInfos && file_infos_)
{
Expand Down
14 changes: 14 additions & 0 deletions src/Backups/BackupCoordinationLocal.h
Expand Up @@ -5,7 +5,9 @@
#include <Backups/BackupCoordinationReplicatedAccess.h>
#include <Backups/BackupCoordinationReplicatedSQLObjects.h>
#include <Backups/BackupCoordinationReplicatedTables.h>
#include <Backups/BackupCoordinationKeeperMapTables.h>
#include <base/defines.h>
#include <cstddef>
#include <mutex>
#include <unordered_set>

Expand Down Expand Up @@ -44,6 +46,9 @@ class BackupCoordinationLocal : public IBackupCoordination
void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) override;
Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const override;

void addKeeperMapTable(const String & table_zookeeper_root_path, const String & table_id, const String & data_path_in_backup) override;
String getKeeperMapDataPath(const String & table_zookeeper_root_path) const override;

void addFileInfos(BackupFileInfos && file_infos) override;
BackupFileInfos getFileInfos() const override;
BackupFileInfos getFileInfosForAllHosts() const override;
Expand All @@ -58,13 +63,22 @@ class BackupCoordinationLocal : public IBackupCoordination
BackupCoordinationReplicatedAccess TSA_GUARDED_BY(replicated_access_mutex) replicated_access;
BackupCoordinationReplicatedSQLObjects TSA_GUARDED_BY(replicated_sql_objects_mutex) replicated_sql_objects;
BackupCoordinationFileInfos TSA_GUARDED_BY(file_infos_mutex) file_infos;
BackupCoordinationKeeperMapTables keeper_map_tables TSA_GUARDED_BY(keeper_map_tables_mutex);
std::unordered_set<size_t> TSA_GUARDED_BY(writing_files_mutex) writing_files;

/// Table id and data path in backup of a KeeperMap table.
/// NOTE(review): this duplicates BackupCoordinationKeeperMapTables::KeeperMapTableInfo and nothing in the
/// visible part of this class refers to it - confirm whether it is still needed.
struct KeeperMapTableInfo
{
String table_id;
String data_path_in_backup;
};


mutable std::mutex replicated_tables_mutex;
mutable std::mutex replicated_access_mutex;
mutable std::mutex replicated_sql_objects_mutex;
mutable std::mutex file_infos_mutex;
mutable std::mutex writing_files_mutex;
mutable std::mutex keeper_map_tables_mutex;
};

}
72 changes: 72 additions & 0 deletions src/Backups/BackupCoordinationRemote.cpp
Expand Up @@ -230,6 +230,7 @@ void BackupCoordinationRemote::createRootNodes()
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/repl_data_paths", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/repl_access", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/repl_sql_objects", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/keeper_map_tables", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/file_infos", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/writing_files", "", zkutil::CreateMode::Persistent));
zk->tryMulti(ops, responses);
Expand Down Expand Up @@ -666,6 +667,77 @@ void BackupCoordinationRemote::prepareReplicatedSQLObjects() const
replicated_sql_objects->addDirectory(std::move(directory));
}

void BackupCoordinationRemote::addKeeperMapTable(const String & table_zookeeper_root_path, const String & table_id, const String & data_path_in_backup)
{
{
std::lock_guard lock{keeper_map_tables_mutex};
if (keeper_map_tables)
throw Exception(ErrorCodes::LOGICAL_ERROR, "addKeeperMapTable() must not be called after preparing");
}

auto holder = with_retries.createRetriesControlHolder("addKeeperMapTable");
holder.retries_ctl.retryLoop(
[&, &zk = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zk);
String path = zookeeper_path + "/keeper_map_tables/" + escapeForFileName(table_id);
zk->create(path, fmt::format("{}\n{}", table_zookeeper_root_path, data_path_in_backup), zkutil::CreateMode::Persistent);
});
}

/// Loads everything written by addKeeperMapTable() from Keeper into `keeper_map_tables`.
/// Does nothing if the information was already loaded.
/// Throws LOGICAL_ERROR if a listed node cannot be read or its value is not "<root path>\n<data path>".
void BackupCoordinationRemote::prepareKeeperMapTables() const
{
    if (keeper_map_tables)
        return;

    std::vector<std::pair<std::string, BackupCoordinationKeeperMapTables::KeeperMapTableInfo>> keeper_map_table_infos;
    auto holder = with_retries.createRetriesControlHolder("prepareKeeperMapTables");
    holder.retries_ctl.retryLoop(
        [&, &zk = holder.faulty_zookeeper]()
        {
            /// The lambda can run more than once, so drop whatever a previous attempt collected.
            keeper_map_table_infos.clear();

            with_retries.renewZooKeeper(zk);

            fs::path tables_path = fs::path(zookeeper_path) / "keeper_map_tables";

            auto tables = zk->getChildren(tables_path);
            keeper_map_table_infos.reserve(tables.size());

            /// Turn child names into full paths so that all nodes can be read with one multi-get.
            for (auto & table : tables)
                table = tables_path / table;

            auto tables_info = zk->get(tables);
            for (size_t i = 0; i < tables_info.size(); ++i)
            {
                const auto & table_info = tables_info[i];

                if (table_info.error != Coordination::Error::ZOK)
                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Path in Keeper {} is unexpectedly missing", tables[i]);

                std::vector<std::string> data;
                boost::split(data, table_info.data, [](char c) { return c == '\n'; });

                /// The value is written as "<root path>\n<data path>"; anything else would make the
                /// accesses below read past the end of `data` or silently lose a part of the value.
                if (data.size() != 2)
                    throw Exception(
                        ErrorCodes::LOGICAL_ERROR,
                        "Unexpected format of data in Keeper node {}: expected 2 lines, got {}",
                        tables[i],
                        data.size());

                /// NOTE(review): the node name was produced with escapeForFileName(table_id) and is used here
                /// as the table id without unescaping - confirm this is intended.
                keeper_map_table_infos.emplace_back(
                    std::move(data[0]),
                    BackupCoordinationKeeperMapTables::KeeperMapTableInfo{
                        .table_id = fs::path(tables[i]).filename(), .data_path_in_backup = std::move(data[1])});
            }
        });

    keeper_map_tables.emplace();
    for (const auto & [zk_root_path, table_info] : keeper_map_table_infos)
        keeper_map_tables->addTable(zk_root_path, table_info.table_id, table_info.data_path_in_backup);
}

/// Returns the data path in backup for the given root path.
/// The collected information is loaded from Keeper on first use; everything happens under `keeper_map_tables_mutex`.
String BackupCoordinationRemote::getKeeperMapDataPath(const String & table_zookeeper_root_path) const
{
    std::lock_guard guard{keeper_map_tables_mutex};
    prepareKeeperMapTables();
    const auto & prepared_tables = *keeper_map_tables;
    return prepared_tables.getDataPath(table_zookeeper_root_path);
}


void BackupCoordinationRemote::addFileInfos(BackupFileInfos && file_infos_)
{
{
Expand Down
14 changes: 14 additions & 0 deletions src/Backups/BackupCoordinationRemote.h
Expand Up @@ -5,6 +5,7 @@
#include <Backups/BackupCoordinationReplicatedAccess.h>
#include <Backups/BackupCoordinationReplicatedSQLObjects.h>
#include <Backups/BackupCoordinationReplicatedTables.h>
#include <Backups/BackupCoordinationKeeperMapTables.h>
#include <Backups/BackupCoordinationStageSync.h>
#include <Backups/WithRetries.h>

Expand Down Expand Up @@ -63,6 +64,9 @@ class BackupCoordinationRemote : public IBackupCoordination
void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) override;
Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const override;

void addKeeperMapTable(const String & table_zookeeper_root_path, const String & table_id, const String & data_path_in_backup) override;
String getKeeperMapDataPath(const String & table_zookeeper_root_path) const override;

void addFileInfos(BackupFileInfos && file_infos) override;
BackupFileInfos getFileInfos() const override;
BackupFileInfos getFileInfosForAllHosts() const override;
Expand All @@ -85,6 +89,7 @@ class BackupCoordinationRemote : public IBackupCoordination
void prepareReplicatedTables() const TSA_REQUIRES(replicated_tables_mutex);
void prepareReplicatedAccess() const TSA_REQUIRES(replicated_access_mutex);
void prepareReplicatedSQLObjects() const TSA_REQUIRES(replicated_sql_objects_mutex);
void prepareKeeperMapTables() const TSA_REQUIRES(keeper_map_tables_mutex);
void prepareFileInfos() const TSA_REQUIRES(file_infos_mutex);

const String root_zookeeper_path;
Expand All @@ -106,14 +111,23 @@ class BackupCoordinationRemote : public IBackupCoordination
mutable std::optional<BackupCoordinationReplicatedAccess> TSA_GUARDED_BY(replicated_access_mutex) replicated_access;
mutable std::optional<BackupCoordinationReplicatedSQLObjects> TSA_GUARDED_BY(replicated_sql_objects_mutex) replicated_sql_objects;
mutable std::optional<BackupCoordinationFileInfos> TSA_GUARDED_BY(file_infos_mutex) file_infos;
mutable std::optional<BackupCoordinationKeeperMapTables> keeper_map_tables TSA_GUARDED_BY(keeper_map_tables_mutex);
std::unordered_set<size_t> TSA_GUARDED_BY(writing_files_mutex) writing_files;

/// Table id and data path in backup of a KeeperMap table.
/// NOTE(review): this duplicates BackupCoordinationKeeperMapTables::KeeperMapTableInfo and nothing in the
/// visible part of this class refers to it - confirm whether it is still needed.
struct KeeperMapTableInfo
{
String table_id;
String data_path_in_backup;
};


mutable std::mutex zookeeper_mutex;
mutable std::mutex replicated_tables_mutex;
mutable std::mutex replicated_access_mutex;
mutable std::mutex replicated_sql_objects_mutex;
mutable std::mutex file_infos_mutex;
mutable std::mutex writing_files_mutex;
mutable std::mutex keeper_map_tables_mutex;
};

}
11 changes: 1 addition & 10 deletions src/Backups/BackupsWorker.cpp
Expand Up @@ -58,16 +58,7 @@ namespace

auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); };

BackupCoordinationRemote::BackupKeeperSettings keeper_settings
{
.keeper_max_retries = context->getSettingsRef().backup_restore_keeper_max_retries,
.keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_restore_keeper_retry_initial_backoff_ms,
.keeper_retry_max_backoff_ms = context->getSettingsRef().backup_restore_keeper_retry_max_backoff_ms,
.batch_size_for_keeper_multiread = context->getSettingsRef().backup_restore_batch_size_for_keeper_multiread,
.keeper_fault_injection_probability = context->getSettingsRef().backup_restore_keeper_fault_injection_probability,
.keeper_fault_injection_seed = context->getSettingsRef().backup_restore_keeper_fault_injection_seed,
.keeper_value_max_size = context->getSettingsRef().backup_restore_keeper_value_max_size,
};
BackupCoordinationRemote::BackupKeeperSettings keeper_settings = WithRetries::KeeperSettings::fromContext(context);

auto all_hosts = BackupSettings::Util::filterHostIDs(
backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
Expand Down
6 changes: 6 additions & 0 deletions src/Backups/IBackupCoordination.h
Expand Up @@ -56,6 +56,12 @@ class IBackupCoordination
/// Returns all mutations of a replicated table which are not finished for some data parts added by addReplicatedPartNames().
virtual std::vector<MutationInfo> getReplicatedMutations(const String & table_shared_id, const String & replica_name) const = 0;

/// Adds information about KeeperMap tables
virtual void addKeeperMapTable(const String & table_zookeeper_root_path, const String & table_id, const String & data_path_in_backup) = 0;

/// KeeperMap tables use shared storage without local data, so only one table should back up the data.
/// Returns the data path in backup registered for the given root path.
virtual String getKeeperMapDataPath(const String & table_zookeeper_root_path) const = 0;

/// Adds a data path in backup for a replicated table.
/// Multiple replicas of the replicated table call this function and then all the added paths can be returned by call of the function
/// getReplicatedDataPaths().
Expand Down
4 changes: 4 additions & 0 deletions src/Backups/IRestoreCoordination.h
Expand Up @@ -41,6 +41,10 @@ class IRestoreCoordination
/// The function returns false if user-defined functions at the specified zk path are already being restored by another replica.
virtual bool acquireReplicatedSQLObjects(const String & loader_zk_path, UserDefinedSQLObjectType object_type) = 0;

/// Sets that this table is going to restore data into Keeper for all KeeperMap tables defined on root_zk_path.
/// The function returns false if data for this specific root path is already being restored by another table.
virtual bool acquireInsertingDataForKeeperMap(const String & root_zk_path) = 0;

/// Generates a new UUID for a table. The same UUID must be used for a replicated table on each replica,
/// (because otherwise the macro "{uuid}" in the ZooKeeper path will not work correctly).
virtual void generateUUIDForTable(ASTCreateQuery & create_query) = 0;
Expand Down
6 changes: 6 additions & 0 deletions src/Backups/RestoreCoordinationLocal.cpp
Expand Up @@ -52,6 +52,12 @@ bool RestoreCoordinationLocal::acquireReplicatedSQLObjects(const String &, UserD
return true;
}

/// Marks the root path as being restored.
/// Returns true only for the first call with a given root path; later calls with the same path return false.
bool RestoreCoordinationLocal::acquireInsertingDataForKeeperMap(const String & root_zk_path)
{
    std::lock_guard guard{mutex};
    const bool newly_acquired = acquired_data_in_keeper_map_tables.insert(root_zk_path).second;
    return newly_acquired;
}

void RestoreCoordinationLocal::generateUUIDForTable(ASTCreateQuery & create_query)
{
String query_str = serializeAST(create_query);
Expand Down
5 changes: 5 additions & 0 deletions src/Backups/RestoreCoordinationLocal.h
Expand Up @@ -40,6 +40,10 @@ class RestoreCoordinationLocal : public IRestoreCoordination
/// The function returns false if user-defined functions at the specified zk path are already being restored by another replica.
bool acquireReplicatedSQLObjects(const String & loader_zk_path, UserDefinedSQLObjectType object_type) override;

/// Sets that this table is going to restore data into Keeper for all KeeperMap tables defined on root_zk_path.
/// The function returns false if data for this specific root path is already being restored by another table.
bool acquireInsertingDataForKeeperMap(const String & root_zk_path) override;

/// Generates a new UUID for a table. The same UUID must be used for a replicated table on each replica,
/// (because otherwise the macro "{uuid}" in the ZooKeeper path will not work correctly).
void generateUUIDForTable(ASTCreateQuery & create_query) override;
Expand All @@ -52,6 +56,7 @@ class RestoreCoordinationLocal : public IRestoreCoordination
std::set<std::pair<String /* database_zk_path */, String /* table_name */>> acquired_tables_in_replicated_databases;
std::unordered_set<String /* table_zk_path */> acquired_data_in_replicated_tables;
std::unordered_map<String, ASTCreateQuery::UUIDs> create_query_uuids;
std::unordered_set<String /* root_zk_path */> acquired_data_in_keeper_map_tables;

mutable std::mutex mutex;
};
Expand Down
40 changes: 40 additions & 0 deletions src/Backups/RestoreCoordinationRemote.cpp
Expand Up @@ -89,6 +89,7 @@ void RestoreCoordinationRemote::createRootNodes()
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/repl_tables_data_acquired", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/repl_access_storages_acquired", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/repl_sql_objects_acquired", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/keeper_map_tables", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/table_uuids", "", zkutil::CreateMode::Persistent));
zk->tryMulti(ops, responses);
});
Expand Down Expand Up @@ -234,6 +235,45 @@ bool RestoreCoordinationRemote::acquireReplicatedSQLObjects(const String & loade
return result;
}

bool RestoreCoordinationRemote::acquireInsertingDataForKeeperMap(const String & root_zk_path)
{
bool result = false;
auto holder = with_retries.createRetriesControlHolder("acquireInsertingDataForKeeperMap");
holder.retries_ctl.retryLoop(
[&, &zk = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zk);

fs::path base_path = fs::path(zookeeper_path) / "keeper_map_tables" / root_zk_path;
zk->createAncestors(base_path);
std::string restore_lock_path = base_path / "restore_lock";
result = zk->tryCreate(restore_lock_path, "restorelock", zkutil::CreateMode::Persistent) == Coordination::Error::ZOK;
antonio2368 marked this conversation as resolved.
Show resolved Hide resolved

if (result)
return;

/// there can be an edge case where a path contains `/restore_lock/ in the middle of it
antonio2368 marked this conversation as resolved.
Show resolved Hide resolved
/// to differentiate that case from lock we also set the data
for (size_t i = 0; i < 1000; ++i)
{
Coordination::Stat lock_stat;
auto data = zk->get(restore_lock_path, &lock_stat);
if (data == "restorelock")
return;

if (auto set_result = zk->trySet(restore_lock_path, "restorelock", lock_stat.version);
set_result == Coordination::Error::ZOK)
{
result = true;
return;
}
else if (set_result == Coordination::Error::ZNONODE)
throw zkutil::KeeperException::fromPath(set_result, restore_lock_path);
}
});
return result;
}

void RestoreCoordinationRemote::generateUUIDForTable(ASTCreateQuery & create_query)
{
String query_str = serializeAST(create_query);
Expand Down
4 changes: 4 additions & 0 deletions src/Backups/RestoreCoordinationRemote.h
Expand Up @@ -46,6 +46,10 @@ class RestoreCoordinationRemote : public IRestoreCoordination
/// The function returns false if user-defined functions at the specified zk path are already being restored by another replica.
bool acquireReplicatedSQLObjects(const String & loader_zk_path, UserDefinedSQLObjectType object_type) override;

/// Sets that this table is going to restore data into Keeper for all KeeperMap tables defined on root_zk_path.
/// The function returns false if data for this specific root path is already being restored by another table.
bool acquireInsertingDataForKeeperMap(const String & root_zk_path) override;

/// Generates a new UUID for a table. The same UUID must be used for a replicated table on each replica,
/// (because otherwise the macro "{uuid}" in the ZooKeeper path will not work correctly).
void generateUUIDForTable(ASTCreateQuery & create_query) override;
Expand Down