Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SYSTEM UNFREEZE for ordinary database #38262

Merged
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ BlockIO InterpreterSystemQuery::execute()
{
getContext()->checkAccess(AccessType::SYSTEM_UNFREEZE);
/// The result contains information about deleted parts as a table. It is for compatibility with ALTER TABLE UNFREEZE query.
result = Unfreezer().unfreeze(query.backup_name, getContext());
result = Unfreezer(getContext()).systemUnfreeze(query.backup_name);
break;
}
default:
Expand Down
4 changes: 4 additions & 0 deletions src/Parsers/ASTSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
if (!filesystem_cache_path.empty())
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_path;
}
else if (type == Type::UNFREEZE)
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(backup_name);
}
}


Expand Down
123 changes: 85 additions & 38 deletions src/Storages/Freeze.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,29 @@
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>

/**
* When ClickHouse has frozen data on remote storage it required 'smart' data removing during UNFREEZE.
* For remote storage actually frozen not remote data but local metadata with referrers on remote data.
* So remote data can be referred from working and frozen data sets (or two frozen) at same time.
* In this case during UNFREEZE ClickHouse should remove only local metadata and keep remote data.
* But when data was already removed from working data set ClickHouse should remove remote data too.
* To detect is current data used or not in some other place ClickHouse uses
* - ref_count from metadata to check if data used in some other metadata on the same replica;
* - Keeper record to check if data used on other replica.
* StorageReplicatedMergeTree::removeSharedDetachedPart makes required checks, so here this method
* called for each frozen part.
*/

namespace DB
{

namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}

void FreezeMetaData::fill(const StorageReplicatedMergeTree & storage)
{
is_replicated = storage.supportsReplication();
is_remote = storage.isRemote();
replica_name = storage.getReplicaName();
zookeeper_name = storage.getZooKeeperName();
table_shared_id = storage.getTableSharedID();
Expand All @@ -26,11 +43,17 @@ void FreezeMetaData::save(DiskPtr data_disk, const String & path) const

writeIntText(version, buffer);
buffer.write("\n", 1);
writeBoolText(is_replicated, buffer);
buffer.write("\n", 1);
writeBoolText(is_remote, buffer);
buffer.write("\n", 1);
writeString(replica_name, buffer);
if (version == 1)
{
/// is_replicated and is_remote are not used
bool is_replicated = true;
writeBoolText(is_replicated, buffer);
buffer.write("\n", 1);
bool is_remote = true;
writeBoolText(is_remote, buffer);
buffer.write("\n", 1);
}
writeString(escapeForFileName(replica_name), buffer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also check that other strings cannot contain unexpected chars

buffer.write("\n", 1);
writeString(zookeeper_name, buffer);
buffer.write("\n", 1);
Expand All @@ -51,17 +74,25 @@ bool FreezeMetaData::load(DiskPtr data_disk, const String & path)
auto metadata_str = metadata_storage->readFileToString(file_path);
ReadBufferFromString buffer(metadata_str);
readIntText(version, buffer);
if (version != 1)
if (version < 1 or version > 2)
tavplubix marked this conversation as resolved.
Show resolved Hide resolved
{
LOG_ERROR(&Poco::Logger::get("FreezeMetaData"), "Unknown freezed metadata version: {}", version);
LOG_ERROR(&Poco::Logger::get("FreezeMetaData"), "Unknown frozen metadata version: {}", version);
return false;
}
DB::assertChar('\n', buffer);
readBoolText(is_replicated, buffer);
DB::assertChar('\n', buffer);
readBoolText(is_remote, buffer);
DB::assertChar('\n', buffer);
readString(replica_name, buffer);
if (version == 1)
{
/// is_replicated and is_remote are not used
bool is_replicated;
readBoolText(is_replicated, buffer);
DB::assertChar('\n', buffer);
bool is_remote;
readBoolText(is_remote, buffer);
DB::assertChar('\n', buffer);
}
std::string unescaped_replica_name;
readString(unescaped_replica_name, buffer);
replica_name = unescapeForFileName(unescaped_replica_name);
DB::assertChar('\n', buffer);
readString(zookeeper_name, buffer);
DB::assertChar('\n', buffer);
Expand All @@ -87,43 +118,62 @@ String FreezeMetaData::getFileName(const String & path)
return fs::path(path) / "frozen_metadata.txt";
}

BlockIO Unfreezer::unfreeze(const String & backup_name, ContextPtr local_context)
Unfreezer::Unfreezer(ContextPtr context) : local_context(context)
{
if (local_context->hasZooKeeper())
zookeeper = local_context->getZooKeeper();
}

BlockIO Unfreezer::systemUnfreeze(const String & backup_name)
{
LOG_DEBUG(log, "Unfreezing backup {}", backup_name);
LOG_DEBUG(log, "Unfreezing backup {}", escapeForFileName(backup_name));

const auto & config = local_context->getConfigRef();
static constexpr auto config_key = "enable_system_unfreeze";
if (!config.getBool(config_key, false))
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Support for SYSTEM UNFREEZE query is disabled. You can enable it via '{}' server setting", config_key);
}

auto disks_map = local_context->getDisksMap();
Disks disks;
for (auto & [name, disk]: disks_map)
{
disks.push_back(disk);
}
auto backup_path = fs::path(backup_directory_prefix) / escapeForFileName(backup_name);
auto store_path = backup_path / "store";
auto store_paths = {backup_path / "store", backup_path / "data"};

PartitionCommandsResultInfo result_info;

for (const auto & disk: disks)
{
if (!disk->exists(store_path))
continue;
for (auto prefix_it = disk->iterateDirectory(store_path); prefix_it->isValid(); prefix_it->next())
for (const auto& store_path: store_paths)
{
auto prefix_directory = store_path / prefix_it->name();
for (auto table_it = disk->iterateDirectory(prefix_directory); table_it->isValid(); table_it->next())
if (!disk->exists(store_path))
continue;
for (auto prefix_it = disk->iterateDirectory(store_path); prefix_it->isValid(); prefix_it->next())
{
auto table_directory = prefix_directory / table_it->name();
auto current_result_info = unfreezePartitionsFromTableDirectory([] (const String &) { return true; }, backup_name, {disk}, table_directory, local_context);
for (auto & command_result : current_result_info)
auto prefix_directory = store_path / prefix_it->name();
for (auto table_it = disk->iterateDirectory(prefix_directory); table_it->isValid(); table_it->next())
{
command_result.command_type = "SYSTEM UNFREEZE";
auto table_directory = prefix_directory / table_it->name();
auto current_result_info = unfreezePartitionsFromTableDirectory(
[](const String &) { return true; }, backup_name, {disk}, table_directory);
for (auto & command_result : current_result_info)
{
command_result.command_type = "SYSTEM UNFREEZE";
}
result_info.insert(
result_info.end(),
std::make_move_iterator(current_result_info.begin()),
std::make_move_iterator(current_result_info.end()));
}
result_info.insert(
result_info.end(),
std::make_move_iterator(current_result_info.begin()),
std::make_move_iterator(current_result_info.end()));
}
}
if (disk->exists(backup_path))
{
/// After unfreezing we need to clear revision.txt file and empty directories
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not clear why we cannot just remove everything recursively. Why do we need complex "unfreeze" logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// After unfreezing we need to clear revision.txt file and empty directories.
/// revision.txt file shouldn't be unfreezed, it should just be deleted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that clear?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I mean that FREEZE PARTITION only creates hardlinks. So UNFREEZE should only remove hardlinks. It's not clear why do we need some complex logic for "unfreezing", it was supposed to do just rm -rf. I understand that all this stuff is needed for "s3 zero copy replication", but I do not understand how "s3 zero copy replication" (and especially "unfreezing" with "s3 zero copy replication") work because of lack of comments. A comment that describes how all this stuff works is required.

disk->removeRecursive(backup_path);
}
}
Expand All @@ -136,18 +186,15 @@ BlockIO Unfreezer::unfreeze(const String & backup_name, ContextPtr local_context
return result;
}

bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context)
bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context, zkutil::ZooKeeperPtr zookeeper)
{
if (disk->supportZeroCopyReplication())
{
FreezeMetaData meta;
if (meta.load(disk, path))
{
if (meta.is_replicated)
{
FreezeMetaData::clean(disk, path);
return StorageReplicatedMergeTree::removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", local_context);
}
FreezeMetaData::clean(disk, path);
return StorageReplicatedMergeTree::removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", local_context, zookeeper);
}
}

Expand All @@ -156,7 +203,7 @@ bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const Strin
return false;
}

PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory, ContextPtr local_context)
PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory)
{
PartitionCommandsResultInfo result;

Expand All @@ -180,7 +227,7 @@ PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(Merg

const auto & path = it->path();

bool keep_shared = removeFreezedPart(disk, path, partition_directory, local_context);
bool keep_shared = removeFreezedPart(disk, path, partition_directory, local_context, zookeeper);

result.push_back(PartitionCommandResultInfo{
.partition_id = partition_id,
Expand Down
13 changes: 7 additions & 6 deletions src/Storages/Freeze.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ struct FreezeMetaData
static String getFileName(const String & path);

public:
int version = 1;
bool is_replicated{false};
bool is_remote{false};
int version = 2;
String replica_name;
String zookeeper_name;
String table_shared_id;
Expand All @@ -34,12 +32,15 @@ struct FreezeMetaData
class Unfreezer
{
public:
PartitionCommandsResultInfo unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory, ContextPtr local_context);
BlockIO unfreeze(const String & backup_name, ContextPtr local_context);
Unfreezer(ContextPtr context);
PartitionCommandsResultInfo unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory);
BlockIO systemUnfreeze(const String & backup_name);
private:
ContextPtr local_context;
zkutil::ZooKeeperPtr zookeeper;
Poco::Logger * log = &Poco::Logger::get("Unfreezer");
static constexpr std::string_view backup_directory_prefix = "shadow";
static bool removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context);
static bool removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context, zkutil::ZooKeeperPtr zookeeper);
};

}
8 changes: 4 additions & 4 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1989,7 +1989,7 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirecory()

for (const auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_DEBUG(log, "Removed broken detached part {} due to a timeout for broken detached parts", old_name);
}

Expand Down Expand Up @@ -4676,7 +4676,7 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr

for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_DEBUG(log, "Dropped detached part {}, keep shared data: {}", old_name, keep_shared);
old_name.clear();
}
Expand Down Expand Up @@ -6334,7 +6334,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezeAll(
return unfreezePartitionsByMatcher([] (const String &) { return true; }, backup_name, local_context);
}

bool MergeTreeData::removeDetachedPart(DiskPtr disk, const String & path, const String &, bool)
bool MergeTreeData::removeDetachedPart(DiskPtr disk, const String & path, const String &)
{
disk->removeRecursive(path);

Expand All @@ -6349,7 +6349,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn

auto disks = getStoragePolicy()->getDisks();

return Unfreezer().unfreezePartitionsFromTableDirectory(matcher, backup_name, disks, backup_path, local_context);
return Unfreezer(local_context).unfreezePartitionsFromTableDirectory(matcher, backup_name, disks, backup_path);
}

bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ class MergeTreeData : public IStorage, public WithMutableContext

/// Check shared data usage on other replicas for detached/freezed part
/// Remove local files and remote files if needed
virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed);
virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name);

virtual String getTableSharedID() const { return ""; }

Expand Down
22 changes: 4 additions & 18 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8164,25 +8164,12 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
}
}

bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed)
bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String & path, const String & part_name)
{
if (disk->supportZeroCopyReplication())
{
if (is_freezed)
{
FreezeMetaData meta;
if (meta.load(disk, path))
{
FreezeMetaData::clean(disk, path);
return removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", getContext());
}
}
else
{
String table_id = getTableSharedID();

return removeSharedDetachedPart(disk, path, part_name, table_id, zookeeper_name, replica_name, zookeeper_path, getContext());
}
String table_id = getTableSharedID();
return removeSharedDetachedPart(disk, path, part_name, table_id, zookeeper_name, replica_name, zookeeper_path, getContext(), current_zookeeper);
}

disk->removeRecursive(path);
Expand All @@ -8192,11 +8179,10 @@ bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String &


bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
const String &, const String & detached_replica_name, const String & detached_zookeeper_path, ContextPtr local_context)
const String &, const String & detached_replica_name, const String & detached_zookeeper_path, ContextPtr local_context, const zkutil::ZooKeeperPtr & zookeeper)
{
bool keep_shared = false;

zkutil::ZooKeeperPtr zookeeper = local_context->getZooKeeper();
NameSet files_not_to_remove;

fs::path checksums = fs::path(path) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
void checkBrokenDisks();

static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path, ContextPtr local_context);
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path, ContextPtr local_context, const zkutil::ZooKeeperPtr & zookeeper);

private:
std::atomic_bool are_restoring_replica {false};
Expand Down Expand Up @@ -827,7 +827,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});

bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed) override;
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;

/// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.
void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override;
Expand Down
4 changes: 4 additions & 0 deletions tests/config/config.d/system_unfreeze.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?xml version="1.0"?>
<clickhouse>
<enable_system_unfreeze>true</enable_system_unfreeze>
</clickhouse>
1 change: 1 addition & 0 deletions tests/config/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ln -sf $SRC_PATH/config.d/named_collection.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/ssl_certs.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/filesystem_cache_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/session_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/system_unfreeze.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/enable_zero_copy_replication.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/nlp.xml $DEST_SERVER_PATH/config.d/

Expand Down
17 changes: 1 addition & 16 deletions tests/integration/test_merge_tree_s3/configs/config.xml
Original file line number Diff line number Diff line change
@@ -1,19 +1,4 @@
<?xml version="1.0"?>
<clickhouse>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>

<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>

<max_concurrent_queries>500</max_concurrent_queries>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<enable_system_unfreeze>true</enable_system_unfreeze>
</clickhouse>