Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SYSTEM UNFREEZE for ordinary database #38262

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ BlockIO InterpreterSystemQuery::execute()
{
getContext()->checkAccess(AccessType::SYSTEM_UNFREEZE);
/// The result contains information about deleted parts as a table. It is for compatibility with ALTER TABLE UNFREEZE query.
result = Unfreezer().unfreeze(query.backup_name, getContext());
result = Unfreezer(getContext()).unfreeze(query.backup_name);
break;
}
default:
Expand Down
4 changes: 4 additions & 0 deletions src/Parsers/ASTSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
if (!filesystem_cache_path.empty())
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_path;
}
else if (type == Type::UNFREEZE)
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(backup_name);
}
}


Expand Down
99 changes: 60 additions & 39 deletions src/Storages/Freeze.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@

namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_PARTITION_VALUE;
}

void FreezeMetaData::fill(const StorageReplicatedMergeTree & storage)
{
is_replicated = storage.supportsReplication();
is_remote = storage.isRemote();
replica_name = storage.getReplicaName();
zookeeper_name = storage.getZooKeeperName();
table_shared_id = storage.getTableSharedID();
Expand All @@ -26,11 +29,16 @@ void FreezeMetaData::save(DiskPtr data_disk, const String & path) const

writeIntText(version, buffer);
buffer.write("\n", 1);
writeBoolText(is_replicated, buffer);
buffer.write("\n", 1);
writeBoolText(is_remote, buffer);
buffer.write("\n", 1);
writeString(replica_name, buffer);
if (version == 1) {
/// is_replicated and is_remote are not used
bool is_replicated = true;
writeBoolText(is_replicated, buffer);
buffer.write("\n", 1);
bool is_remote = true;
writeBoolText(is_remote, buffer);
buffer.write("\n", 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to completely remove these flags without changing the version. It will break compatibility with 22.6.1, but I think that's not a problem, because 22.6 was released only recently and I doubt that anyone has already started to use 22.6.1 with this freeze-metadata stuff in production. And the previous serialization format was invalid anyway (I mean the issues with escaping).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is, the flags were introduced before this PR. See the history of the removed code for details.

}
writeString(escapeForFileName(replica_name), buffer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also check that other strings cannot contain unexpected chars

buffer.write("\n", 1);
writeString(zookeeper_name, buffer);
buffer.write("\n", 1);
Expand All @@ -51,17 +59,23 @@ bool FreezeMetaData::load(DiskPtr data_disk, const String & path)
auto metadata_str = metadata_storage->readFileToString(file_path);
ReadBufferFromString buffer(metadata_str);
readIntText(version, buffer);
if (version != 1)
if (version < 1 or version > 2)
tavplubix marked this conversation as resolved.
Show resolved Hide resolved
{
LOG_ERROR(&Poco::Logger::get("FreezeMetaData"), "Unknown freezed metadata version: {}", version);
return false;
}
DB::assertChar('\n', buffer);
readBoolText(is_replicated, buffer);
DB::assertChar('\n', buffer);
readBoolText(is_remote, buffer);
DB::assertChar('\n', buffer);
readString(replica_name, buffer);
if (version == 1) {
/// is_replicated and is_remote are not used
bool is_replicated;
readBoolText(is_replicated, buffer);
DB::assertChar('\n', buffer);
bool is_remote;
readBoolText(is_remote, buffer);
DB::assertChar('\n', buffer);
}
auto unescaped_replica_name = unescapeForFileName(replica_name);
readString(unescaped_replica_name, buffer);
DB::assertChar('\n', buffer);
readString(zookeeper_name, buffer);
DB::assertChar('\n', buffer);
Expand All @@ -87,43 +101,48 @@ String FreezeMetaData::getFileName(const String & path)
return fs::path(path) / "frozen_metadata.txt";
}

BlockIO Unfreezer::unfreeze(const String & backup_name, ContextPtr local_context)
BlockIO Unfreezer::unfreeze(const String & backup_name)
{
LOG_DEBUG(log, "Unfreezing backup {}", backup_name);
auto disks_map = local_context->getDisksMap();
LOG_DEBUG(log, "Unfreezing backup {}", escapeForFileName(backup_name));
auto disks_map = local_context_->getDisksMap();
Disks disks;
for (auto & [name, disk]: disks_map)
{
disks.push_back(disk);
}
auto backup_path = fs::path(backup_directory_prefix) / escapeForFileName(backup_name);
auto store_path = backup_path / "store";
auto store_paths = {backup_path / "store", backup_path / "data"};

PartitionCommandsResultInfo result_info;

for (const auto & disk: disks)
{
if (!disk->exists(store_path))
continue;
for (auto prefix_it = disk->iterateDirectory(store_path); prefix_it->isValid(); prefix_it->next())
for (auto store_path: store_paths)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only a for loop was added here.

{
auto prefix_directory = store_path / prefix_it->name();
for (auto table_it = disk->iterateDirectory(prefix_directory); table_it->isValid(); table_it->next())
if (!disk->exists(store_path))
continue;
for (auto prefix_it = disk->iterateDirectory(store_path); prefix_it->isValid(); prefix_it->next())
{
auto table_directory = prefix_directory / table_it->name();
auto current_result_info = unfreezePartitionsFromTableDirectory([] (const String &) { return true; }, backup_name, {disk}, table_directory, local_context);
for (auto & command_result : current_result_info)
auto prefix_directory = store_path / prefix_it->name();
for (auto table_it = disk->iterateDirectory(prefix_directory); table_it->isValid(); table_it->next())
{
command_result.command_type = "SYSTEM UNFREEZE";
auto table_directory = prefix_directory / table_it->name();
auto current_result_info = unfreezePartitionsFromTableDirectory(
[](const String &) { return true; }, backup_name, {disk}, table_directory, std::nullopt);
for (auto & command_result : current_result_info)
{
command_result.command_type = "SYSTEM UNFREEZE";
}
result_info.insert(
result_info.end(),
std::make_move_iterator(current_result_info.begin()),
std::make_move_iterator(current_result_info.end()));
}
result_info.insert(
result_info.end(),
std::make_move_iterator(current_result_info.begin()),
std::make_move_iterator(current_result_info.end()));
}
}
if (disk->exists(backup_path))
{
/// After unfreezing we need to clear revision.txt file and empty directories
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not clear why we cannot just remove everything recursively. Why do we need complex "unfreeze" logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// After unfreezing we need to clear revision.txt file and empty directories.
/// revision.txt file shouldn't be unfreezed, it should just be deleted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that clear?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I mean that FREEZE PARTITION only creates hardlinks, so UNFREEZE should only remove hardlinks. It's not clear why we need some complex logic for "unfreezing"; it was supposed to do just rm -rf. I understand that all this stuff is needed for "s3 zero copy replication", but I do not understand how "s3 zero copy replication" (and especially "unfreezing" with "s3 zero copy replication") works, because of the lack of comments. A comment that describes how all this stuff works is required.

disk->removeRecursive(backup_path);
}
}
Expand All @@ -136,18 +155,15 @@ BlockIO Unfreezer::unfreeze(const String & backup_name, ContextPtr local_context
return result;
}

bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context)
bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context, zkutil::ZooKeeperPtr zookeeper)
{
if (disk->supportZeroCopyReplication())
{
FreezeMetaData meta;
if (meta.load(disk, path))
{
if (meta.is_replicated)
{
FreezeMetaData::clean(disk, path);
return StorageReplicatedMergeTree::removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", local_context);
}
FreezeMetaData::clean(disk, path);
return StorageReplicatedMergeTree::removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", local_context, zookeeper);
}
}

Expand All @@ -156,7 +172,7 @@ bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const Strin
return false;
}

PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory, ContextPtr local_context)
PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory, std::optional<MergeTreeDataFormatVersion> format_version)
{
PartitionCommandsResultInfo result;

Expand All @@ -168,8 +184,13 @@ PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(Merg
for (auto it = disk->iterateDirectory(table_directory); it->isValid(); it->next())
{
const auto & partition_directory = it->name();

int count_underscores = std::count_if(partition_directory.begin(), partition_directory.end(), []( char c ){return c =='_';});
if ((format_version.has_value() && format_version == 0) || count_underscores == 4) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Number of underscores can be 4 with new syntax as well (if mutation version is not 0)

Copy link
Member

@tavplubix tavplubix Jul 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we don't need to parse partition_id at all if the matcher always returns true (when it is called from Unfreezer::unfreeze or a similar place). Currently it can parse some garbage, but it simply does not throw an exception in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand. Parsing partition_id is needed in order to output it. If I remove check with underscores,

  • ALTER TABLE UNFREEZE will delete data only if format_version is greater than zero.
  • SYSTEM UNFREEZE will delete any data regardless of format_version.

So not checking this leads to different behaviour of these queries. Am I right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed that SYSTEM UNFREEZE prints partition ids in the query result; I thought ids were needed only for filtering. So currently it prints invalid ids if the table was created with the old syntax. It's not clear how to fix it. I see the following options:

  • Do not return list of partitions for SYSTEM UNFREEZE
  • Autodetect format_version somehow for SYSTEM UNFREEZE (not sure if it's possible, but maybe checking some files inside frozen data part can help)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think not returning list is easier and for sure possible. But still what about deleting data? Different data will be deleted for alter and system queries if not doing autodetect

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because if format_version is 0, there will be an exception, so the partition will not be removed by the ALTER query. But since there is no exception when called from Unfreezer::unfreeze, the partition will be removed by the SYSTEM query.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok, SYSTEM UNFREEZE will simply remove everything, because it's impossible to distinguish partitions in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, it seems like throwing an exception may break some user queries since this logic was introduced for alter unfreeze. May it? Is it critical?

if (!matcher(partition_id))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather mark it as a TODO and/or create a new issue about this. If I touch this code, the only solution I see that doesn't break any query is to autodetect the format version. Otherwise, it may break queries and behave differently for ALTER and SYSTEM queries. I would create a separate issue for autodetection, since it is not just about the SYSTEM UNFREEZE feature: it is about fixing a small bug for both ALTER UNFREEZE and SYSTEM UNFREEZE, and that fix may introduce a bigger bug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw Exception(ErrorCodes::INVALID_PARTITION_VALUE, "Can not complete unfreeze query because part directory name is obsolete: " + partition_directory);
tavplubix marked this conversation as resolved.
Show resolved Hide resolved
}

/// Partition ID is prefix of part directory name: <partition id>_<rest of part directory name>
/// For format_version == 1, partition ID is prefix of part directory name: <partition id>_<rest of part directory name>
auto found = partition_directory.find('_');
if (found == std::string::npos)
continue;
Expand All @@ -180,7 +201,7 @@ PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(Merg

const auto & path = it->path();

bool keep_shared = removeFreezedPart(disk, path, partition_directory, local_context);
bool keep_shared = removeFreezedPart(disk, path, partition_directory, local_context_, zookeeper_);

result.push_back(PartitionCommandResultInfo{
.partition_id = partition_id,
Expand Down
13 changes: 7 additions & 6 deletions src/Storages/Freeze.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ struct FreezeMetaData
static String getFileName(const String & path);

public:
int version = 1;
bool is_replicated{false};
bool is_remote{false};
int version = 2;
String replica_name;
String zookeeper_name;
String table_shared_id;
Expand All @@ -34,12 +32,15 @@ struct FreezeMetaData
class Unfreezer
{
public:
PartitionCommandsResultInfo unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory, ContextPtr local_context);
BlockIO unfreeze(const String & backup_name, ContextPtr local_context);
Unfreezer(ContextPtr local_context) : local_context_(local_context) { zookeeper_ = local_context->getZooKeeper(); }
PartitionCommandsResultInfo unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory, std::optional<MergeTreeDataFormatVersion> format_version);
BlockIO unfreeze(const String & backup_name);
private:
ContextPtr local_context_;
zkutil::ZooKeeperPtr zookeeper_;
tavplubix marked this conversation as resolved.
Show resolved Hide resolved
Poco::Logger * log = &Poco::Logger::get("Unfreezer");
static constexpr std::string_view backup_directory_prefix = "shadow";
static bool removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context);
static bool removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context, zkutil::ZooKeeperPtr zookeeper);
};

}
8 changes: 4 additions & 4 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1890,7 +1890,7 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirecory()

for (const auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_DEBUG(log, "Removed broken detached part {} due to a timeout for broken detached parts", old_name);
}

Expand Down Expand Up @@ -4575,7 +4575,7 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr

for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_DEBUG(log, "Dropped detached part {}, keep shared data: {}", old_name, keep_shared);
old_name.clear();
}
Expand Down Expand Up @@ -6172,7 +6172,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezeAll(
return unfreezePartitionsByMatcher([] (const String &) { return true; }, backup_name, local_context);
}

bool MergeTreeData::removeDetachedPart(DiskPtr disk, const String & path, const String &, bool)
bool MergeTreeData::removeDetachedPart(DiskPtr disk, const String & path, const String &)
{
disk->removeRecursive(path);

Expand All @@ -6187,7 +6187,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn

auto disks = getStoragePolicy()->getDisks();

return Unfreezer().unfreezePartitionsFromTableDirectory(matcher, backup_name, disks, backup_path, local_context);
return Unfreezer(local_context).unfreezePartitionsFromTableDirectory(matcher, backup_name, disks, backup_path, format_version);
}

bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ class MergeTreeData : public IStorage, public WithMutableContext

/// Check shared data usage on other replicas for detached/freezed part
/// Remove local files and remote files if needed
virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed);
virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name);

virtual String getTableSharedID() const { return ""; }

Expand Down
22 changes: 4 additions & 18 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8185,25 +8185,12 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
}
}

bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed)
bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String & path, const String & part_name)
{
if (disk->supportZeroCopyReplication())
{
if (is_freezed)
{
FreezeMetaData meta;
if (meta.load(disk, path))
{
FreezeMetaData::clean(disk, path);
return removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", getContext());
}
}
else
{
String table_id = getTableSharedID();

return removeSharedDetachedPart(disk, path, part_name, table_id, zookeeper_name, replica_name, zookeeper_path, getContext());
}
String table_id = getTableSharedID();
return removeSharedDetachedPart(disk, path, part_name, table_id, zookeeper_name, replica_name, zookeeper_path, getContext(), current_zookeeper);
}

disk->removeRecursive(path);
Expand All @@ -8213,11 +8200,10 @@ bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String &


bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
const String &, const String & detached_replica_name, const String & detached_zookeeper_path, ContextPtr local_context)
const String &, const String & detached_replica_name, const String & detached_zookeeper_path, ContextPtr local_context, const zkutil::ZooKeeperPtr & zookeeper)
{
bool keep_shared = false;

zkutil::ZooKeeperPtr zookeeper = local_context->getZooKeeper();
NameSet files_not_to_remove;

fs::path checksums = fs::path(path) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
void checkBrokenDisks();

static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path, ContextPtr local_context);
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path, ContextPtr local_context, const zkutil::ZooKeeperPtr & zookeeper);

private:
std::atomic_bool are_restoring_replica {false};
Expand Down Expand Up @@ -813,7 +813,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});

bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed) override;
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;

/// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.
void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: no-replicated-database, no-parallel, no-ordinary-database
# Tags: no-replicated-database, no-parallel
# Tag no-replicated-database: Unsupported type of ALTER query

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
Expand Down