Skip to content

Commit

Permalink
Define BackupReferenceEntry
Browse files Browse the repository at this point in the history
  • Loading branch information
antonio2368 committed Nov 13, 2023
1 parent 5948020 commit 6443211
Show file tree
Hide file tree
Showing 19 changed files with 170 additions and 51 deletions.
8 changes: 8 additions & 0 deletions src/Backups/BackupCoordinationFileInfos.cpp
Expand Up @@ -84,6 +84,10 @@ void BackupCoordinationFileInfos::prepare() const
for (size_t i = 0; i != file_infos_for_all_hosts.size(); ++i)
{
auto & info = *(file_infos_for_all_hosts[i]);

if (!info.reference_target.empty())
continue;

info.data_file_name = info.file_name;
info.data_file_index = i;
info.base_size = 0; /// Base backup must not be used while creating a plain backup.
Expand All @@ -101,6 +105,10 @@ void BackupCoordinationFileInfos::prepare() const
for (size_t i = 0; i != file_infos_for_all_hosts.size(); ++i)
{
auto & info = *(file_infos_for_all_hosts[i]);

if (!info.reference_target.empty())
continue;

if (info.size == info.base_size)
{
/// A file is either empty or can be taken from the base backup as a whole.
Expand Down
2 changes: 2 additions & 0 deletions src/Backups/BackupCoordinationRemote.cpp
Expand Up @@ -116,6 +116,7 @@ namespace
writeBinary(info.base_size, out);
writeBinary(info.base_checksum, out);
writeBinary(info.encrypted_by_disk, out);
writeBinary(info.reference_target, out);
/// We don't store `info.data_file_name` and `info.data_file_index` because they're determined automatically
/// after reading file infos for all the hosts (see the class BackupCoordinationFileInfos).
}
Expand All @@ -138,6 +139,7 @@ namespace
readBinary(info.base_size, in);
readBinary(info.base_checksum, in);
readBinary(info.encrypted_by_disk, in);
readBinary(info.reference_target, in);
}
return res;
}
Expand Down
47 changes: 47 additions & 0 deletions src/Backups/BackupEntryReference.cpp
@@ -0,0 +1,47 @@
#include <Backups/BackupEntryReference.h>

namespace DB
{

namespace ErrorCodes
{

extern const int NOT_IMPLEMENTED;

}

/// Creates an entry that points at another backup entry.
/// `reference_target_` is accepted by value and moved into the member.
BackupEntryReference::BackupEntryReference(std::string reference_target_) : reference_target(std::move(reference_target_))
{
}

/// A BackupEntryReference is a reference by definition, so this is unconditionally true.
bool BackupEntryReference::isReference() const
{
    return true;
}

/// Returns a copy of the target name passed to the constructor.
String BackupEntryReference::getReferenceTarget() const
{
    return reference_target;
}

/// Reference entries always report a size of zero.
UInt64 BackupEntryReference::getSize() const
{
    return 0;
}

/// Not supported for reference entries: always throws NOT_IMPLEMENTED.
/// The read settings argument is ignored.
UInt128 BackupEntryReference::getChecksum(const ReadSettings & /*read_settings*/) const
{
    throw Exception(
        ErrorCodes::NOT_IMPLEMENTED,
        "Checksum not implemented for reference backup entries");
}

/// Not supported for reference entries: always throws NOT_IMPLEMENTED.
/// The read settings argument is ignored.
std::unique_ptr<SeekableReadBuffer> BackupEntryReference::getReadBuffer(const ReadSettings & /*read_settings*/) const
{
    throw Exception(
        ErrorCodes::NOT_IMPLEMENTED,
        "Reading not implemented for reference backup entries");
}

/// Not supported for reference entries: always throws NOT_IMPLEMENTED.
DataSourceDescription BackupEntryReference::getDataSourceDescription() const
{
    throw Exception(
        ErrorCodes::NOT_IMPLEMENTED,
        "Data source description not implemented for reference backup entries");
}

}
25 changes: 25 additions & 0 deletions src/Backups/BackupEntryReference.h
@@ -0,0 +1,25 @@
#pragma once

#include <Backups/IBackupEntry.h>

namespace DB
{

/// Represents a reference to another backup entry.
class BackupEntryReference : public IBackupEntry
{
public:
    /// `reference_target_` is the name of the backup entry this one refers to.
    explicit BackupEntryReference(std::string reference_target_);

    /// Reference-related part of the interface:
    /// isReference() is always true, getReferenceTarget() returns the stored target.
    bool isReference() const override;
    String getReferenceTarget() const override;

    /// Data-related part of the interface:
    /// getSize() is always 0; the remaining three throw NOT_IMPLEMENTED.
    UInt64 getSize() const override;
    UInt128 getChecksum(const ReadSettings & read_settings) const override;
    std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const override;
    DataSourceDescription getDataSourceDescription() const override;

private:
    /// Name of the referenced backup entry, as given to the constructor.
    String reference_target;
};

}
10 changes: 10 additions & 0 deletions src/Backups/BackupFileInfo.cpp
Expand Up @@ -89,6 +89,8 @@ String BackupFileInfo::describe() const
result += fmt::format("data_file_name: {};\n", data_file_name);
result += fmt::format("data_file_index: {};\n", data_file_index);
result += fmt::format("encrypted_by_disk: {};\n", encrypted_by_disk);
if (!reference_target.empty())
result += fmt::format("reference_target: {};\n", reference_target);
return result;
}

Expand All @@ -104,6 +106,14 @@ BackupFileInfo buildFileInfoForBackupEntry(

BackupFileInfo info;
info.file_name = adjusted_path;

/// If it's a "reference" just set the target to a concrete file
if (backup_entry->isReference())
{
info.reference_target = removeLeadingSlash(backup_entry->getReferenceTarget());
return info;
}

info.size = backup_entry->getSize();
info.encrypted_by_disk = backup_entry->isEncryptedByDisk();

Expand Down
3 changes: 3 additions & 0 deletions src/Backups/BackupFileInfo.h
Expand Up @@ -39,6 +39,9 @@ struct BackupFileInfo
/// Whether this file is encrypted by an encrypted disk.
bool encrypted_by_disk = false;

/// Set if this file is just a reference to another file
String reference_target;

struct LessByFileName
{
bool operator()(const BackupFileInfo & lhs, const BackupFileInfo & rhs) const { return (lhs.file_name < rhs.file_name); }
Expand Down
27 changes: 26 additions & 1 deletion src/Backups/BackupImpl.cpp
Expand Up @@ -362,10 +362,10 @@ void BackupImpl::writeBackupMetadata()
*out << "<file>";

*out << "<name>" << xml << info.file_name << "</name>";
*out << "<size>" << info.size << "</size>";

if (info.size)
{
*out << "<size>" << info.size << "</size>";
*out << "<checksum>" << hexChecksum(info.checksum) << "</checksum>";
if (info.base_size)
{
Expand All @@ -381,6 +381,10 @@ void BackupImpl::writeBackupMetadata()
if (info.encrypted_by_disk)
*out << "<encrypted_by_disk>true</encrypted_by_disk>";
}
else if (!info.reference_target.empty())
*out << "<reference_target>" << xml << info.reference_target << "</reference_target>";
else
*out << "<size>" << info.size << "</size>";

total_size += info.size;
bool has_entry = !deduplicate_files || (info.size && (info.size != info.base_size) && (info.data_file_name.empty() || (info.data_file_name == info.file_name)));
Expand Down Expand Up @@ -452,13 +456,22 @@ void BackupImpl::readBackupMetadata()
size_of_entries = 0;

const auto * contents = config_root->getNodeByPath("contents");
std::vector<std::pair<String /*source*/, String /*target*/>> reference_files;
for (const Poco::XML::Node * child = contents->firstChild(); child; child = child->nextSibling())
{
if (child->nodeName() == "file")
{
const Poco::XML::Node * file_config = child;
BackupFileInfo info;
info.file_name = getString(file_config, "name");

info.reference_target = getString(file_config, "reference_target", "");
if (!info.reference_target.empty())
{
reference_files.emplace_back(std::move(info.file_name), std::move(info.reference_target));
continue;
}

info.size = getUInt64(file_config, "size");
if (info.size)
{
Expand Down Expand Up @@ -508,6 +521,14 @@ void BackupImpl::readBackupMetadata()
}
}

for (auto & [source_file, target_file] : reference_files)
{
auto it = file_names.find(target_file);
if (it == file_names.end())
throw Exception(ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup entry {} referenced by {} not found", target_file, source_file);
file_names.emplace(std::move(source_file), it->second);
}

uncompressed_size = size_of_entries + str.size();
compressed_size = uncompressed_size;
if (!use_archive)
Expand Down Expand Up @@ -873,6 +894,10 @@ size_t BackupImpl::copyFileToDisk(const SizeAndChecksum & size_and_checksum,

void BackupImpl::writeFile(const BackupFileInfo & info, BackupEntryPtr entry)
{
/// we don't write anything for reference files
if (entry->isReference())
return;

if (open_mode != OpenMode::WRITE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup is not opened for writing");

Expand Down
7 changes: 4 additions & 3 deletions src/Backups/BackupImpl.h
Expand Up @@ -72,14 +72,11 @@ class BackupImpl : public IBackup
Strings listFiles(const String & directory, bool recursive) const override;
bool hasFiles(const String & directory) const override;
bool fileExists(const String & file_name) const override;
bool fileExists(const SizeAndChecksum & size_and_checksum) const override;
UInt64 getFileSize(const String & file_name) const override;
UInt128 getFileChecksum(const String & file_name) const override;
SizeAndChecksum getFileSizeAndChecksum(const String & file_name) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const SizeAndChecksum & size_and_checksum) const override;
size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path, WriteMode write_mode) const override;
size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path, WriteMode write_mode) const override;
void writeFile(const BackupFileInfo & info, BackupEntryPtr entry) override;
void finalizeWriting() override;
bool supportsWritingInMultipleThreads() const override { return !use_archive; }
Expand Down Expand Up @@ -114,6 +111,10 @@ class BackupImpl : public IBackup

std::unique_ptr<SeekableReadBuffer> readFileImpl(const SizeAndChecksum & size_and_checksum, bool read_encrypted) const;

bool fileExists(const SizeAndChecksum & size_and_checksum) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const SizeAndChecksum & size_and_checksum) const override;
size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path, WriteMode write_mode) const override;

BackupInfo backup_info;
const String backup_name_for_logging;
const bool use_archive;
Expand Down
2 changes: 2 additions & 0 deletions src/Backups/IBackupEntriesLazyBatch.cpp
Expand Up @@ -27,6 +27,8 @@ class IBackupEntriesLazyBatch::BackupEntryFromBatch : public IBackupEntry
bool isFromImmutableFile() const override { return getInternalBackupEntry()->isFromImmutableFile(); }
String getFilePath() const override { return getInternalBackupEntry()->getFilePath(); }
DiskPtr getDisk() const override { return getInternalBackupEntry()->getDisk(); }
/// Forwards the "is this a reference?" query to the entry returned by getInternalBackupEntry().
bool isReference() const override { return getInternalBackupEntry()->isReference(); }
/// Forwards to the entry returned by getInternalBackupEntry() and returns its reference target.
String getReferenceTarget() const override { return getInternalBackupEntry()->getReferenceTarget(); }

private:
BackupEntryPtr getInternalBackupEntry() const
Expand Down
12 changes: 12 additions & 0 deletions src/Backups/IBackupEntry.h
Expand Up @@ -9,6 +9,12 @@

namespace DB
{

namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}

class SeekableReadBuffer;

/// A backup entry represents some data which should be written to the backup or has been read from the backup.
Expand Down Expand Up @@ -39,6 +45,12 @@ class IBackupEntry
virtual String getFilePath() const { return ""; }
virtual DiskPtr getDisk() const { return nullptr; }

/// Tells whether this entry is a reference to another entry.
/// The default implementation returns false; subclasses that are references override it.
virtual bool isReference() const
{
    return false;
}
/// Returns the target of a reference entry.
/// The default implementation throws NOT_IMPLEMENTED; subclasses that are references override it.
virtual String getReferenceTarget() const
{
    throw DB::Exception(
        ErrorCodes::NOT_IMPLEMENTED,
        "getReferenceTarget not implemented for the backup entry");
}

virtual DataSourceDescription getDataSourceDescription() const = 0;
};

Expand Down
2 changes: 1 addition & 1 deletion src/Backups/IRestoreCoordination.h
Expand Up @@ -43,7 +43,7 @@ class IRestoreCoordination

/// Sets that this table is going to restore data into Keeper for all KeeperMap tables defined on root_zk_path.
/// The function returns false if data for this specific root path is already being restored by another table.
virtual bool acquireInsertingDataForKeeperMap(const String & root_zk_path) = 0;
virtual bool acquireInsertingDataForKeeperMap(const String & root_zk_path, const String & table_unique_id) = 0;

/// Generates a new UUID for a table. The same UUID must be used for a replicated table on each replica,
/// (because otherwise the macro "{uuid}" in the ZooKeeper path will not work correctly).
Expand Down
2 changes: 1 addition & 1 deletion src/Backups/RestoreCoordinationLocal.cpp
Expand Up @@ -52,7 +52,7 @@ bool RestoreCoordinationLocal::acquireReplicatedSQLObjects(const String &, UserD
return true;
}

bool RestoreCoordinationLocal::acquireInsertingDataForKeeperMap(const String & root_zk_path)
bool RestoreCoordinationLocal::acquireInsertingDataForKeeperMap(const String & root_zk_path, const String & /*table_unique_id*/)
{
std::lock_guard lock{mutex};
return acquired_data_in_keeper_map_tables.emplace(root_zk_path).second;
Expand Down
2 changes: 1 addition & 1 deletion src/Backups/RestoreCoordinationLocal.h
Expand Up @@ -42,7 +42,7 @@ class RestoreCoordinationLocal : public IRestoreCoordination

/// Sets that this table is going to restore data into Keeper for all KeeperMap tables defined on root_zk_path.
/// The function returns false if data for this specific root path is already being restored by another table.
bool acquireInsertingDataForKeeperMap(const String & root_zk_path) override;
bool acquireInsertingDataForKeeperMap(const String & root_zk_path, const String & table_unique_id) override;

/// Generates a new UUID for a table. The same UUID must be used for a replicated table on each replica,
/// (because otherwise the macro "{uuid}" in the ZooKeeper path will not work correctly).
Expand Down
37 changes: 13 additions & 24 deletions src/Backups/RestoreCoordinationRemote.cpp
Expand Up @@ -235,9 +235,9 @@ bool RestoreCoordinationRemote::acquireReplicatedSQLObjects(const String & loade
return result;
}

bool RestoreCoordinationRemote::acquireInsertingDataForKeeperMap(const String & root_zk_path)
bool RestoreCoordinationRemote::acquireInsertingDataForKeeperMap(const String & root_zk_path, const String & table_unique_id)
{
bool result = false;
bool lock_acquired = false;
auto holder = with_retries.createRetriesControlHolder("acquireInsertingDataForKeeperMap");
holder.retries_ctl.retryLoop(
[&, &zk = holder.faulty_zookeeper]()
Expand All @@ -246,33 +246,22 @@ bool RestoreCoordinationRemote::acquireInsertingDataForKeeperMap(const String &

/// we need to remove leading '/' from root_zk_path
auto normalized_root_zk_path = std::string_view{root_zk_path}.substr(1);
std::string restore_lock_path = fs::path(zookeeper_path) / "keeper_map_tables" / normalized_root_zk_path / "restore_lock";
std::string restore_lock_path = fs::path(zookeeper_path) / "keeper_map_tables" / escapeForFileName(normalized_root_zk_path);
zk->createAncestors(restore_lock_path);
result = zk->tryCreate(restore_lock_path, "restorelock", zkutil::CreateMode::Persistent) == Coordination::Error::ZOK;

if (result)
return;
auto code = zk->tryCreate(restore_lock_path, table_unique_id, zkutil::CreateMode::Persistent);

/// there can be an edge case where a path contains `/restore_lock/ in the middle of it
/// to differentiate that case from lock we also set the data
for (size_t i = 0; i < 1000; ++i)
if (code == Coordination::Error::ZOK)
{
Coordination::Stat lock_stat;
auto data = zk->get(restore_lock_path, &lock_stat);
if (data == "restorelock")
return;

if (auto set_result = zk->trySet(restore_lock_path, "restorelock", lock_stat.version);
set_result == Coordination::Error::ZOK)
{
result = true;
return;
}
else if (set_result == Coordination::Error::ZNONODE)
throw zkutil::KeeperException::fromPath(set_result, restore_lock_path);
lock_acquired = true;
return;
}

if (code == Coordination::Error::ZNODEEXISTS)
{
    /// The lock node already exists: the lock counts as ours only if the data stored in it
    /// equals our own table_unique_id (the value written by tryCreate above).
    lock_acquired = table_unique_id == zk->get(restore_lock_path);
}
else
{
    /// Any other Keeper error is unexpected. `fromPath` only constructs the exception object,
    /// so it has to be thrown explicitly; otherwise the error would be silently discarded
    /// and the function would just report that the lock was not acquired.
    throw zkutil::KeeperException::fromPath(code, restore_lock_path);
}
});
return result;
return lock_acquired;
}

void RestoreCoordinationRemote::generateUUIDForTable(ASTCreateQuery & create_query)
Expand Down
2 changes: 1 addition & 1 deletion src/Backups/RestoreCoordinationRemote.h
Expand Up @@ -48,7 +48,7 @@ class RestoreCoordinationRemote : public IRestoreCoordination

/// Sets that this table is going to restore data into Keeper for all KeeperMap tables defined on root_zk_path.
/// The function returns false if data for this specific root path is already being restored by another table.
bool acquireInsertingDataForKeeperMap(const String & root_zk_path) override;
bool acquireInsertingDataForKeeperMap(const String & root_zk_path, const String & table_unique_id) override;

/// Generates a new UUID for a table. The same UUID must be used for a replicated table on each replica,
/// (because otherwise the macro "{uuid}" in the ZooKeeper path will not work correctly).
Expand Down
5 changes: 5 additions & 0 deletions src/Common/escapeForFileName.cpp
Expand Up @@ -6,6 +6,11 @@ namespace DB
{

std::string escapeForFileName(const std::string & s)
{
return escapeForFileName(std::string_view{s});
}

std::string escapeForFileName(std::string_view s)
{
std::string res;
const char * pos = s.data();
Expand Down
1 change: 1 addition & 0 deletions src/Common/escapeForFileName.h
Expand Up @@ -11,6 +11,7 @@ namespace DB
*/

std::string escapeForFileName(const std::string & s);
std::string escapeForFileName(std::string_view s);
std::string unescapeForFileName(const std::string & s);

}

0 comments on commit 6443211

Please sign in to comment.