Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SYSTEM UNFREEZE for ordinary database #38262

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Parsers/ASTSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
if (!filesystem_cache_path.empty())
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_path;
}
else if (type == Type::UNFREEZE)
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(backup_name);
}
}


Expand Down
77 changes: 44 additions & 33 deletions src/Storages/Freeze.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ namespace DB
{
void FreezeMetaData::fill(const StorageReplicatedMergeTree & storage)
{
is_replicated = storage.supportsReplication();
is_remote = storage.isRemote();
replica_name = storage.getReplicaName();
zookeeper_name = storage.getZooKeeperName();
table_shared_id = storage.getTableSharedID();
Expand All @@ -26,11 +24,16 @@ void FreezeMetaData::save(DiskPtr data_disk, const String & path) const

writeIntText(version, buffer);
buffer.write("\n", 1);
writeBoolText(is_replicated, buffer);
buffer.write("\n", 1);
writeBoolText(is_remote, buffer);
buffer.write("\n", 1);
writeString(replica_name, buffer);
if (version == 1) {
/// is_replicated and is_remote are not used
bool is_replicated = true;
writeBoolText(is_replicated, buffer);
buffer.write("\n", 1);
bool is_remote = true;
writeBoolText(is_remote, buffer);
buffer.write("\n", 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to completely remove these flags without changing version. It will break compatibility with 22.6.1, but I think it's not a problem, because 22.6 was just released recently and I doubt that someone has already started to use 22.6.1 with this freeze-metadata-stuff in production. And previous serialization format was invalid anyway (I mean issues with escaping).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is, flags were introduced before this PR. See history of removed code for details

}
writeString(escapeForFileName(replica_name), buffer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also check that other strings cannot contain unexpected chars

buffer.write("\n", 1);
writeString(zookeeper_name, buffer);
buffer.write("\n", 1);
Expand All @@ -51,17 +54,23 @@ bool FreezeMetaData::load(DiskPtr data_disk, const String & path)
auto metadata_str = metadata_storage->readFileToString(file_path);
ReadBufferFromString buffer(metadata_str);
readIntText(version, buffer);
if (version != 1)
if (version < 1 or version > 2)
tavplubix marked this conversation as resolved.
Show resolved Hide resolved
{
LOG_ERROR(&Poco::Logger::get("FreezeMetaData"), "Unknown freezed metadata version: {}", version);
return false;
}
DB::assertChar('\n', buffer);
readBoolText(is_replicated, buffer);
DB::assertChar('\n', buffer);
readBoolText(is_remote, buffer);
DB::assertChar('\n', buffer);
readString(replica_name, buffer);
if (version == 1) {
/// is_replicated and is_remote are not used
bool is_replicated;
readBoolText(is_replicated, buffer);
DB::assertChar('\n', buffer);
bool is_remote;
readBoolText(is_remote, buffer);
DB::assertChar('\n', buffer);
}
auto unescaped_replica_name = unescapeForFileName(replica_name);
readString(unescaped_replica_name, buffer);
DB::assertChar('\n', buffer);
readString(zookeeper_name, buffer);
DB::assertChar('\n', buffer);
Expand Down Expand Up @@ -89,41 +98,46 @@ String FreezeMetaData::getFileName(const String & path)

BlockIO Unfreezer::unfreeze(const String & backup_name, ContextPtr local_context)
{
LOG_DEBUG(log, "Unfreezing backup {}", backup_name);
LOG_DEBUG(log, "Unfreezing backup {}", escapeForFileName(backup_name));
auto disks_map = local_context->getDisksMap();
Disks disks;
for (auto & [name, disk]: disks_map)
{
disks.push_back(disk);
}
auto backup_path = fs::path(backup_directory_prefix) / escapeForFileName(backup_name);
auto store_path = backup_path / "store";
auto store_paths = {backup_path / "store", backup_path / "data"};

PartitionCommandsResultInfo result_info;

for (const auto & disk: disks)
{
if (!disk->exists(store_path))
continue;
for (auto prefix_it = disk->iterateDirectory(store_path); prefix_it->isValid(); prefix_it->next())
for (auto store_path: store_paths)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for loop added

{
auto prefix_directory = store_path / prefix_it->name();
for (auto table_it = disk->iterateDirectory(prefix_directory); table_it->isValid(); table_it->next())
if (!disk->exists(store_path))
continue;
for (auto prefix_it = disk->iterateDirectory(store_path); prefix_it->isValid(); prefix_it->next())
{
auto table_directory = prefix_directory / table_it->name();
auto current_result_info = unfreezePartitionsFromTableDirectory([] (const String &) { return true; }, backup_name, {disk}, table_directory, local_context);
for (auto & command_result : current_result_info)
auto prefix_directory = store_path / prefix_it->name();
for (auto table_it = disk->iterateDirectory(prefix_directory); table_it->isValid(); table_it->next())
{
command_result.command_type = "SYSTEM UNFREEZE";
auto table_directory = prefix_directory / table_it->name();
auto current_result_info = unfreezePartitionsFromTableDirectory(
[](const String &) { return true; }, backup_name, {disk}, table_directory, local_context);
for (auto & command_result : current_result_info)
{
command_result.command_type = "SYSTEM UNFREEZE";
}
result_info.insert(
result_info.end(),
std::make_move_iterator(current_result_info.begin()),
std::make_move_iterator(current_result_info.end()));
}
result_info.insert(
result_info.end(),
std::make_move_iterator(current_result_info.begin()),
std::make_move_iterator(current_result_info.end()));
}
}
if (disk->exists(backup_path))
{
/// After unfreezing we need to clear revision.txt file and empty directories
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not clear why we cannot just remove everything recursively. Why do we need complex "unfreeze" logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// After unfreezing we need to clear revision.txt file and empty directories.
/// revision.txt file shouldn't be unfreezed, it should just be deleted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that clear?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I mean that FREEZE PARTITION only creates hardlinks. So UNFREEZE should only remove hardlinks. It's not clear why do we need some complex logic for "unfreezing", it was supposed to do just rm -rf. I understand that all this stuff is needed for "s3 zero copy replication", but I do not understand how "s3 zero copy replication" (and especially "unfreezing" with "s3 zero copy replication") work because of lack of comments. A comment that describes how all this stuff works is required.

disk->removeRecursive(backup_path);
}
}
Expand All @@ -143,11 +157,8 @@ bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const Strin
FreezeMetaData meta;
if (meta.load(disk, path))
{
if (meta.is_replicated)
{
FreezeMetaData::clean(disk, path);
return StorageReplicatedMergeTree::removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", local_context);
}
FreezeMetaData::clean(disk, path);
return StorageReplicatedMergeTree::removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", local_context);
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/Storages/Freeze.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ struct FreezeMetaData
static String getFileName(const String & path);

public:
int version = 1;
bool is_replicated{false};
bool is_remote{false};
int version = 2;
String replica_name;
String zookeeper_name;
String table_shared_id;
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1890,7 +1890,7 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirecory()

for (const auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_DEBUG(log, "Removed broken detached part {} due to a timeout for broken detached parts", old_name);
}

Expand Down Expand Up @@ -4575,7 +4575,7 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr

for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_DEBUG(log, "Dropped detached part {}, keep shared data: {}", old_name, keep_shared);
old_name.clear();
}
Expand Down Expand Up @@ -6172,7 +6172,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezeAll(
return unfreezePartitionsByMatcher([] (const String &) { return true; }, backup_name, local_context);
}

bool MergeTreeData::removeDetachedPart(DiskPtr disk, const String & path, const String &, bool)
bool MergeTreeData::removeDetachedPart(DiskPtr disk, const String & path, const String &)
{
disk->removeRecursive(path);

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ class MergeTreeData : public IStorage, public WithMutableContext

/// Check shared data usage on other replicas for detached/freezed part
/// Remove local files and remote files if needed
virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed);
virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name);

virtual String getTableSharedID() const { return ""; }

Expand Down
19 changes: 3 additions & 16 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8185,25 +8185,12 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
}
}

bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed)
bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String & path, const String & part_name)
{
if (disk->supportZeroCopyReplication())
{
if (is_freezed)
{
FreezeMetaData meta;
if (meta.load(disk, path))
{
FreezeMetaData::clean(disk, path);
return removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", getContext());
}
}
else
{
String table_id = getTableSharedID();

return removeSharedDetachedPart(disk, path, part_name, table_id, zookeeper_name, replica_name, zookeeper_path, getContext());
}
String table_id = getTableSharedID();
return removeSharedDetachedPart(disk, path, part_name, table_id, zookeeper_name, replica_name, zookeeper_path, getContext());
}

disk->removeRecursive(path);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});

bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed) override;
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;

/// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.
void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: no-replicated-database, no-parallel, no-ordinary-database
# Tags: no-replicated-database, no-parallel
# Tag no-replicated-database: Unsupported type of ALTER query

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
Expand Down