Add restore setting "restore_broken_parts_as_detached" #53877

Merged
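
In short, this change adds two settings: `check_parts` for BACKUP (verify checksums and projection consistency of each part before writing it to the backup, on by default) and `restore_broken_parts_as_detached` for RESTORE (instead of failing on a broken part, copy it to the `detached` folder with a `broken-from-backup_` prefix, off by default). Below is a minimal usage sketch in the same shell style as the tests further down; the table name, backup disk and paths are illustrative, not taken from this PR.

# Back up a table; parts are checked before being written (check_parts = 1 is the default).
$CLICKHOUSE_CLIENT -q "BACKUP TABLE test.data TO Disk('backups', 'data_backup') SETTINGS check_parts = 1"

# Restore; a part that fails consistency checks is restored to detached/broken-from-backup_<part_name>
# instead of aborting the whole RESTORE.
$CLICKHOUSE_CLIENT -q "RESTORE TABLE test.data FROM Disk('backups', 'data_backup') SETTINGS restore_broken_parts_as_detached = 1"

# Broken parts can then be inspected and handled manually.
$CLICKHOUSE_CLIENT -q "SELECT name, reason FROM system.detached_parts WHERE table = 'data'"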
1 change: 1 addition & 0 deletions src/Backups/BackupSettings.cpp
@@ -31,6 +31,7 @@ namespace ErrorCodes
M(Bool, read_from_filesystem_cache) \
M(UInt64, shard_num) \
M(UInt64, replica_num) \
M(Bool, check_parts) \
M(Bool, internal) \
M(String, host_id) \
M(OptionalUUID, backup_uuid)
3 changes: 3 additions & 0 deletions src/Backups/BackupSettings.h
@@ -59,6 +59,9 @@ struct BackupSettings
/// Can only be used with BACKUP ON CLUSTER.
size_t replica_num = 0;

/// Check checksums of the data parts before writing them to a backup.
bool check_parts = true;

/// Internal, should not be specified by user.
/// Whether this backup is a part of a distributed backup created by BACKUP ON CLUSTER.
bool internal = false;
1 change: 1 addition & 0 deletions src/Backups/RestoreSettings.cpp
@@ -164,6 +164,7 @@ namespace
M(RestoreUDFCreationMode, create_function) \
M(Bool, allow_s3_native_copy) \
M(Bool, use_same_s3_credentials_for_base_backup) \
M(Bool, restore_broken_parts_as_detached) \
M(Bool, internal) \
M(String, host_id) \
M(OptionalString, storage_policy) \
4 changes: 4 additions & 0 deletions src/Backups/RestoreSettings.h
@@ -113,6 +113,10 @@ struct RestoreSettings
/// Whether base backup from S3 should inherit credentials from the RESTORE query.
bool use_same_s3_credentials_for_base_backup = false;

/// If true, RESTORE won't stop on broken parts; instead they will be restored as detached parts
/// to the `detached` folder with names starting with `broken-from-backup`.
bool restore_broken_parts_as_detached = false;

/// Internal, should not be specified by user.
bool internal = false;

7 changes: 7 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -1909,6 +1909,13 @@ void IMergeTreeDataPart::checkConsistency(bool /* require_part_metadata */) cons
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'checkConsistency' is not implemented for part with type {}", getType().toString());
}

void IMergeTreeDataPart::checkConsistencyWithProjections(bool require_part_metadata) const
{
checkConsistency(require_part_metadata);
for (const auto & [_, proj_part] : projection_parts)
proj_part->checkConsistency(require_part_metadata);
}

void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk()
{
calculateColumnsSizesOnDisk();
7 changes: 6 additions & 1 deletion src/Storages/MergeTree/IMergeTreeDataPart.h
@@ -489,6 +489,12 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar

void writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings);

/// Checks the consistency of this data part.
virtual void checkConsistency(bool require_part_metadata) const;

/// Checks the consistency of this data part and of its projections (if any).
void checkConsistencyWithProjections(bool require_part_metadata) const;

/// "delete-on-destroy.txt" is deprecated. It is no longer being created, only is removed.
/// TODO: remove this method after some time.
void removeDeleteOnDestroyMarker();
@@ -534,7 +540,6 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar

void removeIfNeeded();

virtual void checkConsistency(bool require_part_metadata) const;
void checkConsistencyBase() const;

/// Fill each_columns_size and total_size with sizes from columns files on
149 changes: 128 additions & 21 deletions src/Storages/MergeTree/MergeTreeData.cpp
@@ -5254,6 +5254,9 @@ MergeTreeData::PartsBackupEntries MergeTreeData::backupParts(
if (hold_table_lock && !table_lock)
table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);

if (backup_settings.check_parts)
part->checkConsistencyWithProjections(/* require_part_metadata= */ true);

BackupEntries backup_entries_from_part;
part->getDataPartStorage().backup(
part->checksums,
@@ -5314,8 +5317,8 @@ void MergeTreeData::restoreDataFromBackup(RestorerFromBackup & restorer, const S
class MergeTreeData::RestoredPartsHolder
{
public:
RestoredPartsHolder(const std::shared_ptr<MergeTreeData> & storage_, const BackupPtr & backup_, size_t num_parts_)
: storage(storage_), backup(backup_), num_parts(num_parts_)
RestoredPartsHolder(const std::shared_ptr<MergeTreeData> & storage_, const BackupPtr & backup_)
: storage(storage_), backup(backup_)
{
}

@@ -5328,6 +5331,13 @@ class MergeTreeData::RestoredPartsHolder
attachIfAllPartsRestored();
}

void increaseNumBrokenParts()
{
std::lock_guard lock{mutex};
++num_broken_parts;
attachIfAllPartsRestored();
}

void addPart(MutableDataPartPtr part)
{
std::lock_guard lock{mutex};
@@ -5347,7 +5357,7 @@ class MergeTreeData::RestoredPartsHolder
private:
void attachIfAllPartsRestored()
{
if (!num_parts || (parts.size() < num_parts))
if (!num_parts || (parts.size() + num_broken_parts < num_parts))
return;

/// Sort parts by min_block (because we need to preserve the order of parts).
@@ -5362,9 +5372,10 @@ class MergeTreeData::RestoredPartsHolder
num_parts = 0;
}

std::shared_ptr<MergeTreeData> storage;
BackupPtr backup;
const std::shared_ptr<MergeTreeData> storage;
const BackupPtr backup;
size_t num_parts = 0;
size_t num_broken_parts = 0;
MutableDataPartsVector parts;
std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
mutable std::mutex mutex;
@@ -5380,8 +5391,9 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
Strings part_names = backup->listFiles(data_path_in_backup);
boost::remove_erase(part_names, "mutations");

auto restored_parts_holder
= std::make_shared<RestoredPartsHolder>(std::static_pointer_cast<MergeTreeData>(shared_from_this()), backup, part_names.size());
bool restore_broken_parts_as_detached = restorer.getRestoreSettings().restore_broken_parts_as_detached;

auto restored_parts_holder = std::make_shared<RestoredPartsHolder>(std::static_pointer_cast<MergeTreeData>(shared_from_this()), backup);

fs::path data_path_in_backup_fs = data_path_in_backup;
size_t num_parts = 0;
@@ -5403,42 +5415,45 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
backup,
part_path_in_backup = data_path_in_backup_fs / part_name,
my_part_info = *part_info,
restore_broken_parts_as_detached,
restored_parts_holder]
{ storage->restorePartFromBackup(restored_parts_holder, my_part_info, part_path_in_backup); });
{ storage->restorePartFromBackup(restored_parts_holder, my_part_info, part_path_in_backup, restore_broken_parts_as_detached); });

++num_parts;
}

restored_parts_holder->setNumParts(num_parts);
}

void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup) const
void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup, bool detach_if_broken) const
{
String part_name = part_info.getPartNameAndCheckFormat(format_version);
auto backup = restored_parts_holder->getBackup();

/// Calculate the total size of the part.
UInt64 total_size_of_part = 0;
Strings filenames = backup->listFiles(part_path_in_backup, /* recursive= */ true);
fs::path part_path_in_backup_fs = part_path_in_backup;
for (const String & filename : filenames)
total_size_of_part += backup->getFileSize(part_path_in_backup_fs / filename);

std::shared_ptr<IReservation> reservation = getStoragePolicy()->reserveAndCheck(total_size_of_part);
auto disk = reservation->getDisk();

fs::path temp_dir = restored_parts_holder->getTemporaryDirectory(disk);
fs::path temp_part_dir = temp_dir / part_path_in_backup_fs.relative_path();
disk->createDirectories(temp_part_dir);

/// For example:
/// Calculate paths, for example:
/// part_name = 0_1_1_0
/// part_path_in_backup = /data/test/table/0_1_1_0
/// tmp_dir = tmp/1aaaaaa
/// tmp_part_dir = tmp/1aaaaaa/data/test/table/0_1_1_0
auto disk = reservation->getDisk();
fs::path temp_dir = restored_parts_holder->getTemporaryDirectory(disk);
fs::path temp_part_dir = temp_dir / part_path_in_backup_fs.relative_path();

/// Subdirectories in the part's directory; they are used to restore projections.
std::unordered_set<String> subdirs;

/// Copy files from the backup to the directory `tmp_part_dir`.
disk->createDirectories(temp_part_dir);

for (const String & filename : filenames)
{
/// We need to create subdirectories before copying the files. Subdirectories are used to represent projections.
@@ -5458,14 +5473,106 @@ void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> r
reservation->update(reservation->getSize() - file_size);
}

if (auto part = loadPartRestoredFromBackup(disk, temp_part_dir.parent_path(), part_name, detach_if_broken))
restored_parts_holder->addPart(part);
else
restored_parts_holder->increaseNumBrokenParts();
}

MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartRestoredFromBackup(const DiskPtr & disk, const String & temp_dir, const String & part_name, bool detach_if_broken) const
{
MutableDataPartPtr part;

auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
MergeTreeDataPartBuilder builder(*this, part_name, single_disk_volume, temp_part_dir.parent_path(), part_name);
builder.withPartFormatFromDisk();
auto part = std::move(builder).build();
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
part->loadColumnsChecksumsIndexes(false, true);

restored_parts_holder->addPart(part);
/// Load this part from the directory `tmp_part_dir`.
auto load_part = [&]
{
MergeTreeDataPartBuilder builder(*this, part_name, single_disk_volume, temp_dir, part_name);
builder.withPartFormatFromDisk();
part = std::move(builder).build();
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
part->loadColumnsChecksumsIndexes(/* require_columns_checksums= */ false, /* check_consistency= */ true);
};

/// Broken parts can appear in a backup sometimes.
auto mark_broken = [&](const std::exception_ptr error)
{
tryLogException(error, log,
fmt::format("Part {} will be restored as detached because it's broken. You need to resolve this manually", part_name));
if (!part)
{
/// Make a fake data part only to copy its files to /detached/.
part = MergeTreeDataPartBuilder{*this, part_name, single_disk_volume, temp_dir, part_name}
.withPartStorageType(MergeTreeDataPartStorageType::Full)
.withPartType(MergeTreeDataPartType::Wide)
.build();
}
part->renameToDetached("broken-from-backup");
};

/// Try to load this part multiple times.
auto backoff_ms = loading_parts_initial_backoff_ms;
for (size_t try_no = 0; try_no < loading_parts_max_tries; ++try_no)
{
std::exception_ptr error;
bool retryable = false;
try
{
load_part();
}
catch (const Exception & e)
{
error = std::current_exception();
retryable = isRetryableException(e);
}
catch (const Poco::Net::NetException &)
{
error = std::current_exception();
retryable = true;
}
catch (const Poco::TimeoutException &)
{
error = std::current_exception();
retryable = true;
}
catch (...)
{
error = std::current_exception();
}

if (!error)
return part;

if (!retryable && detach_if_broken)
{
mark_broken(error);
return nullptr;
}

if (!retryable)
{
LOG_ERROR(log,
"Failed to restore part {} because it's broken. You can skip broken parts while restoring by setting "
"'restore_broken_parts_as_detached = true'",
part_name);
}

if (!retryable || (try_no + 1 == loading_parts_max_tries))
{
if (Exception * e = exception_cast<Exception *>(error))
e->addMessage("while restoring part {} of table {}", part->name, getStorageID());
std::rethrow_exception(error);
}

tryLogException(error, log,
fmt::format("Failed to load part {} at try {} with a retryable error. Will retry in {} ms", part_name, try_no, backoff_ms));

std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
backoff_ms = std::min(backoff_ms * 2, loading_parts_max_backoff_ms);
}

UNREACHABLE();
}


3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.h
@@ -1357,7 +1357,8 @@ class MergeTreeData : public IStorage, public WithMutableContext

/// Restores the parts of this table from backup.
void restorePartsFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions);
void restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup) const;
void restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup, bool detach_if_broken) const;
MutableDataPartPtr loadPartRestoredFromBackup(const DiskPtr & disk, const String & temp_dir, const String & part_name, bool detach_if_broken) const;

/// Attaches restored parts to the storage.
virtual void attachRestoredParts(MutableDataPartsVector && parts) = 0;
6 changes: 4 additions & 2 deletions src/Storages/MergeTree/MergeTreePartInfo.h
@@ -163,7 +163,8 @@ struct DetachedPartInfo : public MergeTreePartInfo
"tmp-fetch",
"covered-by-broken",
"merge-not-byte-identical",
"mutate-not-byte-identical"
"mutate-not-byte-identical",
"broken-from-backup",
});

static constexpr auto DETACHED_REASONS_REMOVABLE_BY_TIMEOUT = std::to_array<std::string_view>({
@@ -175,7 +176,8 @@ struct DetachedPartInfo : public MergeTreePartInfo
"deleting",
"clone",
"merge-not-byte-identical",
"mutate-not-byte-identical"
"mutate-not-byte-identical",
"broken-from-backup",
});

/// NOTE: It may parse part info incorrectly.
11 changes: 9 additions & 2 deletions tests/config/config.d/backups.xml
@@ -1,6 +1,13 @@
<clickhouse>
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/var/lib/clickhouse/disks/backups/</path>
</backups>
</disks>
</storage_configuration>
<backups>
<allowed_disk>default</allowed_disk>
<allowed_path>/backups</allowed_path>
<allowed_disk>backups</allowed_disk>
</backups>
</clickhouse>
2 changes: 1 addition & 1 deletion tests/queries/0_stateless/02704_max_backup_bandwidth.sh
@@ -14,7 +14,7 @@ $CLICKHOUSE_CLIENT -nm -q "
$CLICKHOUSE_CLIENT -q "insert into data select * from numbers(1e6)"

query_id=$(random_str 10)
$CLICKHOUSE_CLIENT --query_id "$query_id" -q "backup table data to Disk('default', 'backups/$CLICKHOUSE_DATABASE/data/backup1')" --max_backup_bandwidth=1M > /dev/null
$CLICKHOUSE_CLIENT --query_id "$query_id" -q "backup table data to Disk('backups', '$CLICKHOUSE_DATABASE/data/backup1')" --max_backup_bandwidth=1M > /dev/null
$CLICKHOUSE_CLIENT -nm -q "
SYSTEM FLUSH LOGS;
SELECT
@@ -39,7 +39,7 @@ $CLICKHOUSE_CLIENT -q "BACKUP TABLE data TO S3($(s3_location inc_4_bad)) SETTING
$CLICKHOUSE_CLIENT -q "BACKUP TABLE data TO S3($(s3_location inc_5_bad), 'foo') SETTINGS base_backup=S3($(s3_location inc_1)), use_same_s3_credentials_for_base_backup=1" |& grep -o -m1 NUMBER_OF_ARGUMENTS_DOESNT_MATCH

echo 'use_same_s3_credentials_for_base_backup for Disk'
$CLICKHOUSE_CLIENT -q "BACKUP TABLE data TO Disk('default', '$CLICKHOUSE_DATABASE/backup_1') SETTINGS use_same_s3_credentials_for_base_backup=1" | cut -f2
$CLICKHOUSE_CLIENT -q "BACKUP TABLE data TO Disk('default', '$CLICKHOUSE_DATABASE/backup_2') SETTINGS use_same_s3_credentials_for_base_backup=1, base_backup=Disk('default', '$CLICKHOUSE_DATABASE/backup_1')" |& grep -o -m1 BAD_ARGUMENTS
$CLICKHOUSE_CLIENT -q "BACKUP TABLE data TO Disk('backups', '$CLICKHOUSE_DATABASE/backup_1') SETTINGS use_same_s3_credentials_for_base_backup=1" | cut -f2
$CLICKHOUSE_CLIENT -q "BACKUP TABLE data TO Disk('backups', '$CLICKHOUSE_DATABASE/backup_2') SETTINGS use_same_s3_credentials_for_base_backup=1, base_backup=Disk('backups', '$CLICKHOUSE_DATABASE/backup_1')" |& grep -o -m1 BAD_ARGUMENTS

exit 0
@@ -0,0 +1,5 @@
data.bin doesn't exist: while restoring part all_2_2_0
RESTORED
1
3
broken-from-backup_all_2_2_0 broken-from-backup
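
For context, the reference output above comes from the new stateless test, whose script is not included in this excerpt. A test producing output of this shape could look roughly like the sketch below; this is a hedged sketch only, and the table definition, the corrupted file path inside the backup, and the exact check queries are assumptions, not the actual test.

#!/usr/bin/env bash
# Sketch: back up a table with two parts, corrupt one part inside the backup,
# then restore with restore_broken_parts_as_detached = 1 and inspect the detached part.

$CLICKHOUSE_CLIENT -nm -q "
    DROP TABLE IF EXISTS data;
    CREATE TABLE data (x UInt64) ENGINE = MergeTree ORDER BY x;
    INSERT INTO data VALUES (1);
    INSERT INTO data VALUES (2), (3);
"

backup_name="Disk('backups', '$CLICKHOUSE_DATABASE/broken_part')"
$CLICKHOUSE_CLIENT -q "BACKUP TABLE data TO $backup_name" > /dev/null

# Corrupt the second part inside the backup by deleting data.bin (the on-disk layout shown here is illustrative).
rm "/var/lib/clickhouse/disks/backups/$CLICKHOUSE_DATABASE/broken_part/data/$CLICKHOUSE_DATABASE/data/all_2_2_0/data.bin"

$CLICKHOUSE_CLIENT -q "DROP TABLE data"
# The restore succeeds and reports the broken part instead of failing.
$CLICKHOUSE_CLIENT -q "RESTORE TABLE data FROM $backup_name SETTINGS restore_broken_parts_as_detached = 1" | cut -f2

# The healthy part is attached; the broken one ends up in detached/ with the broken-from-backup prefix.
$CLICKHOUSE_CLIENT -q "SELECT count() FROM data"
$CLICKHOUSE_CLIENT -q "SELECT name, reason FROM system.detached_parts WHERE database = currentDatabase() AND table = 'data'"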