Skip to content

Commit

Permalink
Merge pull request #49211 from ClickHouse/fix_zero_copy_not_atomic
Browse files Browse the repository at this point in the history
Lock zero copy parts more atomically
  • Loading branch information
alesapin committed May 3, 2023
2 parents d3c7054 + 6f3f202 commit 5504475
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 79 deletions.
25 changes: 25 additions & 0 deletions src/Common/ZooKeeper/ZooKeeper.cpp
Expand Up @@ -342,6 +342,31 @@ void ZooKeeper::createAncestors(const std::string & path)
}
}

void ZooKeeper::checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests)
{
std::vector<std::string> paths_to_check;
size_t pos = 1;
while (true)
{
pos = path.find('/', pos);
if (pos == std::string::npos)
break;
paths_to_check.emplace_back(path.substr(0, pos));
++pos;
}

MultiExistsResponse response = exists(paths_to_check);

for (size_t i = 0; i < paths_to_check.size(); ++i)
{
if (response[i].error != Coordination::Error::ZOK)
{
/// Ephemeral nodes cannot have children
requests.emplace_back(makeCreateRequest(paths_to_check[i], "", CreateMode::Persistent));
}
}
}

Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t version)
{
auto future_result = asyncTryRemoveNoThrow(path, version);
Expand Down
4 changes: 2 additions & 2 deletions src/Common/ZooKeeper/ZooKeeper.h
Expand Up @@ -237,6 +237,8 @@ class ZooKeeper
/// Does not create the node itself.
void createAncestors(const std::string & path);

void checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests);

/// Remove the node if the version matches. (if version == -1, remove any version).
void remove(const std::string & path, int32_t version = -1);

Expand Down Expand Up @@ -522,8 +524,6 @@ class ZooKeeper
void setServerCompletelyStarted();

private:
friend class EphemeralNodeHolder;

void init(ZooKeeperArgs args_);

/// The following methods don't any throw exceptions but return error codes.
Expand Down
4 changes: 4 additions & 0 deletions src/Common/ZooKeeper/ZooKeeperWithFaultInjection.h
Expand Up @@ -361,6 +361,10 @@ class ZooKeeperWithFaultInjection
return access("trySet", path, [&]() { return keeper->trySet(path, data, version, stat); });
}

void checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests)
{
return access("checkExistsAndGetCreateAncestorsOps", path, [&]() { return keeper->checkExistsAndGetCreateAncestorsOps(path, requests); });
}

void handleEphemeralNodeExistenceNoFailureInjection(const std::string & path, const std::string & fast_delete_if_equal_value)
{
Expand Down
7 changes: 7 additions & 0 deletions src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
Expand Up @@ -578,6 +578,9 @@ void DataPartStorageOnDiskBase::remove(
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST)
{
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
/// We will never touch this part again, so unlocking it from zero-copy
if (!can_remove_description)
can_remove_description.emplace(can_remove_callback());
return;
}
throw;
Expand All @@ -588,6 +591,10 @@ void DataPartStorageOnDiskBase::remove(
{
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. "
"Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
/// We will never touch this part again, so unlocking it from zero-copy
if (!can_remove_description)
can_remove_description.emplace(can_remove_callback());

return;
}
throw;
Expand Down
7 changes: 5 additions & 2 deletions src/Storages/MergeTree/DataPartsExchange.cpp
Expand Up @@ -821,6 +821,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
const auto data_settings = data.getSettings();
MergeTreeData::DataPart::Checksums data_checksums;

zkutil::EphemeralNodeHolderPtr zero_copy_temporary_lock_holder;
if (to_remote_disk)
{
readStringBinary(part_id, in);
Expand All @@ -829,7 +830,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {} (with type {}).", part_name, part_id, disk->getName(), toString(disk->getDataSourceDescription().type));

LOG_DEBUG(log, "Downloading part {} unique id {} metadata onto disk {}.", part_name, part_id, disk->getName());
data.lockSharedDataTemporary(part_name, part_id, disk);
zero_copy_temporary_lock_holder = data.lockSharedDataTemporary(part_name, part_id, disk);
}
else
{
Expand Down Expand Up @@ -938,7 +939,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(

if (to_remote_disk)
{
data.lockSharedData(*new_data_part, /* replace_existing_lock = */ true, {});
LOG_DEBUG(log, "Download of part {} unique id {} metadata onto disk {} finished.", part_name, part_id, disk->getName());
}
else
Expand All @@ -948,6 +948,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
LOG_DEBUG(log, "Download of part {} onto disk {} finished.", part_name, disk->getName());
}

if (zero_copy_temporary_lock_holder)
zero_copy_temporary_lock_holder->setAlreadyRemoved();

return new_data_part;
}

Expand Down

0 comments on commit 5504475

Please sign in to comment.