Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change ZooKeeper path for zero-copy locks for shared data #32061

Merged
merged 24 commits into from Jan 8, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
827fa51
Add test for freeze/unfreeze with S3 zero-copy
ianton-ru Nov 18, 2021
0f9038e
Zero-copy: move shared mark outside table node in ZooKeeper
ianton-ru Nov 23, 2021
d409ab0
Fix wait for freeze in tests
ianton-ru Nov 29, 2021
80ab73c
Fix Zero-Copy replication lost locks, fix remove used remote data in …
ianton-ru Dec 1, 2021
c8fe1dc
Add tests for new zero-copy schema
ianton-ru Dec 1, 2021
e0a16a4
Merge master
ianton-ru Dec 1, 2021
0e685c1
Fix types
ianton-ru Dec 1, 2021
98bae1b
Fix tests
ianton-ru Dec 2, 2021
f0b9a43
Use table UUID in zero-copy shared label in ZooKeeper
ianton-ru Dec 17, 2021
c724b07
Remove zero-copy version converter
ianton-ru Dec 20, 2021
f390111
Add zero-copy version converted script
ianton-ru Dec 20, 2021
0c0bf66
Merge master
ianton-ru Dec 21, 2021
33cbfc8
Move logic for replicated part to StorageReplicatedMergeTree class
ianton-ru Dec 21, 2021
e6fd4bf
Merge branch 'master' into MDB-15474
ianton-ru Dec 21, 2021
e88b97d
Fix typos
ianton-ru Dec 21, 2021
0465aef
Fixes by code review responces
ianton-ru Dec 27, 2021
2d87f0a
Fix debug build
ianton-ru Dec 28, 2021
7a3c874
Merge branch 'master' into ianton-ru-MDB-15474
alesapin Dec 29, 2021
8b331cd
Remove method from IStorage
alesapin Dec 29, 2021
cbdba89
Merge branch 'master' into ianton-ru-MDB-15474
alesapin Dec 30, 2021
92cb451
Merge branch 'master' into MDB-15474
ianton-ru Dec 30, 2021
91e1ac4
Tiny improvements
alesapin Dec 30, 2021
b426cc4
Merge remote-tracking branch 'ianton-ru/MDB-15474' into ianton-ru-MDB…
alesapin Dec 30, 2021
6fcd5a7
Merge branch 'master' into MDB-15474
mergify[bot] Jan 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Disks/DiskDecorator.h
Expand Up @@ -70,6 +70,20 @@ class DiskDecorator : public IDisk
void startup() override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;

std::unique_ptr<ReadBufferFromFileBase> readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const override { return delegate->readMetaFile(path, settings, size); }

std::unique_ptr<WriteBufferFromFileBase> writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode) override { return delegate->writeMetaFile(path, buf_size, mode); }

void removeMetaFileIfExists(const String & path) override { delegate->removeMetaFileIfExists(path); }

UInt32 getRefCount(const String & path) const override { return delegate->getRefCount(path); }

protected:
Executor & getExecutor() override;

Expand Down
24 changes: 24 additions & 0 deletions src/Disks/IDisk.cpp
Expand Up @@ -86,4 +86,28 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
return nullptr;
}

std::unique_ptr<ReadBufferFromFileBase> IDisk::readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const
{
LOG_TRACE(&Poco::Logger::get("IDisk"), "Read local metafile: {}", path);
return readFile(path, settings, size);
}

std::unique_ptr<WriteBufferFromFileBase> IDisk::writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode)
{
LOG_TRACE(&Poco::Logger::get("IDisk"), "Write local metafile: {}", path);
return writeFile(path, buf_size, mode);
}

void IDisk::removeMetaFileIfExists(const String & path)
{
LOG_TRACE(&Poco::Logger::get("IDisk"), "Remove local metafile: {}", path);
removeFileIfExists(path);
}

}
22 changes: 22 additions & 0 deletions src/Disks/IDisk.h
Expand Up @@ -247,6 +247,28 @@ class IDisk : public Space
/// Applies new settings for disk in runtime.
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr, const String &, const DisksMap &) {}

/// Open the local file for read and return ReadBufferFromFileBase object.
/// Overridden in IDiskRemote.
/// Used for work with custom metadata.
virtual std::unique_ptr<ReadBufferFromFileBase> readMetaFile(
const String & path,
const ReadSettings & settings = ReadSettings{},
std::optional<size_t> size = {}) const;

/// Open the local file for write and return WriteBufferFromFileBase object.
/// Overridden in IDiskRemote.
/// Used for work with custom metadata.
virtual std::unique_ptr<WriteBufferFromFileBase> writeMetaFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
WriteMode mode = WriteMode::Rewrite);

virtual void removeMetaFileIfExists(const String & path);

/// Return reference count for remote FS.
/// Overridden in IDiskRemote.
virtual UInt32 getRefCount(const String &) const { return 0; }

protected:
friend class DiskDecorator;

Expand Down
31 changes: 31 additions & 0 deletions src/Disks/IDiskRemote.cpp
Expand Up @@ -484,6 +484,7 @@ bool IDiskRemote::tryReserve(UInt64 bytes)

String IDiskRemote::getUniqueId(const String & path) const
{
LOG_TRACE(log, "Remote path: {}, Path: {}", remote_fs_root_path, path);
Metadata metadata(remote_fs_root_path, metadata_disk, path);
String id;
if (!metadata.remote_fs_objects.empty())
Expand All @@ -500,4 +501,34 @@ AsynchronousReaderPtr IDiskRemote::getThreadPoolReader()
return reader;
}

std::unique_ptr<ReadBufferFromFileBase> IDiskRemote::readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const
{
LOG_TRACE(log, "Read metafile: {}", path);
return metadata_disk->readFile(path, settings, size);
}

std::unique_ptr<WriteBufferFromFileBase> IDiskRemote::writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode)
{
LOG_TRACE(log, "Write metafile: {}", path);
return metadata_disk->writeFile(path, buf_size, mode);
}

void IDiskRemote::removeMetaFileIfExists(const String & path)
{
LOG_TRACE(log, "Remove metafile: {}", path);
return metadata_disk->removeFileIfExists(path);
}

UInt32 IDiskRemote::getRefCount(const String & path) const
{
auto meta = readMeta(path);
return meta.ref_count;
}

}
15 changes: 15 additions & 0 deletions src/Disks/IDiskRemote.h
Expand Up @@ -136,6 +136,21 @@ friend class DiskRemoteReservation;

static AsynchronousReaderPtr getThreadPoolReader();

virtual std::unique_ptr<ReadBufferFromFileBase> readMetaFile(
const String & path,
const ReadSettings & settings = ReadSettings{},
std::optional<size_t> size = {}) const override;

virtual std::unique_ptr<WriteBufferFromFileBase> writeMetaFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
WriteMode mode = WriteMode::Rewrite) override;

virtual void removeMetaFileIfExists(
const String & path) override;

UInt32 getRefCount(const String & path) const override;

protected:
Poco::Logger * log;
const String name;
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/IStorage.h
Expand Up @@ -114,6 +114,9 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/// The name of the table.
StorageID getStorageID() const;

/// Unique ID, synchronized between replicas for replicated storage
virtual String getTableUniqID() const { return ""; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove, only used for StorageReplicatedMergeTree.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


/// Returns true if the storage receives data from a remote server or servers.
virtual bool isRemote() const { return false; }

Expand All @@ -138,6 +141,9 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/// Returns true if the storage replicates SELECT, INSERT and ALTER commands among replicas.
virtual bool supportsReplication() const { return false; }

/// Returns replica name for replicated storage
virtual String getReplicaName() const { return ""; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used only in replicated storage. We don't need it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


/// Returns true if the storage supports parallel insert.
virtual bool supportsParallelInsert() const { return false; }

Expand Down
14 changes: 14 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Expand Up @@ -1153,6 +1153,14 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
storage.lockSharedData(*this);
}

void IMergeTreeDataPart::cleanupOldName(const String & old_part_name) const
{
if (name == old_part_name)
return;

storage.unlockSharedData(*this, old_part_name);
}

std::optional<bool> IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
{
/// NOTE: It's needed for zero-copy replication
Expand Down Expand Up @@ -1615,6 +1623,12 @@ String IMergeTreeDataPart::getUniqueId() const
}


UInt32 IMergeTreeDataPart::getRefCount() const
{
return volume->getDisk()->getRefCount(fs::path(getFullRelativePath()) / "checksums.txt");
alesapin marked this conversation as resolved.
Show resolved Hide resolved
}


String IMergeTreeDataPart::getZeroLevelPartBlockID() const
{
if (info.level != 0)
Expand Down
11 changes: 9 additions & 2 deletions src/Storages/MergeTree/IMergeTreeDataPart.h
Expand Up @@ -338,6 +338,9 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
/// Changes only relative_dir_name, you need to update other metadata (name, is_temp) explicitly
virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const;

/// Cleanup after change part
virtual void cleanupOldName(const String & old_part_name) const;

/// Makes clone of a part in detached/ directory via hard links
virtual void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const;

Expand Down Expand Up @@ -404,10 +407,14 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
/// part creation (using alter query with materialize_ttl setting).
bool checkAllTTLCalculated(const StorageMetadataPtr & metadata_snapshot) const;

/// Return some uniq string for file
/// Required for distinguish different copies of the same part on S3
/// Return some uniq string for file.
/// Required for distinguish different copies of the same part on remote FS.
String getUniqueId() const;

/// Return hardlink count for part.
/// Required for keep data on remote FS when part has shadow copies.
UInt32 getRefCount() const;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to clarify that it's not some general-purpose refcount. The name should be something like getNumberOfReferenecesForZeroCopyReplication.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to getNumberOfRefereneces


protected:

/// Total size of all columns, calculated once in calcuateColumnSizesOnDisk
Expand Down
40 changes: 31 additions & 9 deletions src/Storages/MergeTree/MergeTreeData.cpp
Expand Up @@ -62,6 +62,7 @@

#include <boost/range/adaptor/filtered.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/replace.hpp>

#include <base/insertAtEnd.h>
#include <base/scope_guard_safe.h>
Expand Down Expand Up @@ -2457,6 +2458,8 @@ bool MergeTreeData::renameTempPartAndReplace(
MergeTreePartInfo part_info = part->info;
String part_name;

String old_part_name = part->name;

if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
{
if (part->partition.value != existing_part_in_partition->partition.value)
Expand Down Expand Up @@ -2520,6 +2523,7 @@ bool MergeTreeData::renameTempPartAndReplace(
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
///
/// If out_transaction is null, we commit the part to the active set immediately, else add it to the transaction.

part->name = part_name;
part->info = part_info;
part->is_temp = false;
Expand Down Expand Up @@ -2568,6 +2572,8 @@ bool MergeTreeData::renameTempPartAndReplace(
out_covered_parts->emplace_back(std::move(covered_part));
}

part->cleanupOldName(old_part_name);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add something in changelog about it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


return true;
}

Expand Down Expand Up @@ -3906,8 +3912,8 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr

for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
disk->removeRecursive(fs::path(relative_data_path) / "detached" / new_name / "");
LOG_DEBUG(log, "Dropped detached part {}", old_name);
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
LOG_DEBUG(log, "Dropped detached part {}, keep shared data: {}", old_name, keep_shared);
old_name.clear();
}
}
Expand Down Expand Up @@ -5195,7 +5201,9 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(

LOG_DEBUG(log, "Freezing part {} snapshot will be placed at {}", part->name, backup_path);

part->volume->getDisk()->createDirectories(backup_path);
auto disk = part->volume->getDisk();

disk->createDirectories(backup_path);

String src_part_path = part->getFullRelativePath();
String backup_part_path = fs::path(backup_path) / relative_data_path / part->relative_path;
Expand All @@ -5206,16 +5214,18 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
src_part_path = fs::path(relative_data_path) / flushed_part_path / "";
}

localBackup(part->volume->getDisk(), src_part_path, backup_part_path);
localBackup(disk, src_part_path, backup_part_path);

freezeMetaData(disk, part, backup_part_path);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment -- "Store metadata for replicated tables and do nothing for ordinary merge tree"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


part->volume->getDisk()->removeFileIfExists(fs::path(backup_part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
disk->removeFileIfExists(fs::path(backup_part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);

part->is_frozen.store(true, std::memory_order_relaxed);
result.push_back(PartitionCommandResultInfo{
.partition_id = part->info.partition_id,
.part_name = part->name,
.backup_path = fs::path(part->volume->getDisk()->getPath()) / backup_path,
.part_backup_path = fs::path(part->volume->getDisk()->getPath()) / backup_part_path,
.backup_path = fs::path(disk->getPath()) / backup_path,
.part_backup_path = fs::path(disk->getPath()) / backup_part_path,
.backup_name = backup_name,
});
++parts_processed;
Expand All @@ -5225,6 +5235,11 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
return result;
}

void MergeTreeData::freezeMetaData(DiskPtr, DataPartPtr, String) const
{

}

PartitionCommandsResultInfo MergeTreeData::unfreezePartition(
const ASTPtr & partition,
const String & backup_name,
Expand All @@ -5242,6 +5257,13 @@ PartitionCommandsResultInfo MergeTreeData::unfreezeAll(
return unfreezePartitionsByMatcher([] (const String &) { return true; }, backup_name, local_context);
}

bool MergeTreeData::removeDetachedPart(DiskPtr disk, const String & path, const String &, bool)
{
disk->removeRecursive(path);

return false;
}

PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn matcher, const String & backup_name, ContextPtr)
{
auto backup_path = fs::path("shadow") / escapeForFileName(backup_name) / relative_data_path;
Expand Down Expand Up @@ -5270,7 +5292,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn

const auto & path = it->path();

disk->removeRecursive(path);
bool keep_shared = removeDetachedPart(disk, path, partition_directory, true);

result.push_back(PartitionCommandResultInfo{
.partition_id = partition_id,
Expand All @@ -5280,7 +5302,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn
.backup_name = backup_name,
});

LOG_DEBUG(log, "Unfreezed part by path {}", disk->getPath() + path);
LOG_DEBUG(log, "Unfreezed part by path {}, keep shared data: {}", disk->getPath() + path, keep_shared);
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.h
Expand Up @@ -873,10 +873,20 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// Overridden in StorageReplicatedMergeTree
virtual bool unlockSharedData(const IMergeTreeDataPart &) const { return true; }

/// Remove lock with old name for shared data part after rename
virtual bool unlockSharedData(const IMergeTreeDataPart &, const String &) const { return true; }
alesapin marked this conversation as resolved.
Show resolved Hide resolved

/// Fetch part only if some replica has it on shared storage like S3
/// Overridden in StorageReplicatedMergeTree
virtual bool tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return false; }

/// Check shared data usage on other replicas for detached/freezed part
/// Remove local files and remote files if needed
virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed);

/// Store some metadata for freezed part if needed
virtual void freezeMetaData(DiskPtr disk, DataPartPtr part, String backup_part_path) const;

/// Parts that currently submerging (merging to bigger parts) or emerging
/// (to be appeared after merging finished). These two variables have to be used
/// with `currently_submerging_emerging_mutex`.
Expand Down
6 changes: 4 additions & 2 deletions src/Storages/MergeTree/MergeTreeSettings.h
Expand Up @@ -125,8 +125,10 @@ struct Settings;
M(UInt64, concurrent_part_removal_threshold, 100, "Activate concurrent part removal (see 'max_part_removal_threads') only if the number of inactive data parts is at least this.", 0) \
M(String, storage_policy, "default", "Name of storage disk policy", 0) \
M(Bool, allow_nullable_key, false, "Allow Nullable types as primary keys.", 0) \
M(Bool, allow_remote_fs_zero_copy_replication, true, "Allow Zero-copy replication over remote fs", 0) \
M(Bool, remove_empty_parts, true, "Remove empty parts after they were pruned by TTL, mutation, or collapsing merge algorithm", 0) \
M(Bool, allow_remote_fs_zero_copy_replication, true, "Allow Zero-copy replication over remote fs.", 0) \
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for Zero-copy table-independet info.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
M(Bool, remove_empty_parts, true, "Remove empty parts after they were pruned by TTL, mutation, or collapsing merge algorithm.", 0) \
M(Bool, assign_part_uuids, false, "Generate UUIDs for parts. Before enabling check that all replicas support new format.", 0) \
M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited. This setting is the default that can be overridden by the query-level setting with the same name.", 0) \
M(UInt64, max_concurrent_queries, 0, "Max number of concurrently executed queries related to the MergeTree table (0 - disabled). Queries will still be limited by other max_concurrent_queries settings.", 0) \
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
Expand Up @@ -228,6 +228,8 @@ void ReplicatedMergeTreeSink::commitPart(

bool is_already_existing_part = false;

String old_part_name = part->name;

while (true)
{
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
Expand Down Expand Up @@ -508,6 +510,8 @@ void ReplicatedMergeTreeSink::commitPart(

waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_value);
}

part->cleanupOldName(old_part_name);
}

void ReplicatedMergeTreeSink::onStart()
Expand Down