Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock zero copy parts more atomically #49211

Merged
merged 17 commits into from May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/Common/ZooKeeper/ZooKeeper.cpp
Expand Up @@ -342,6 +342,31 @@ void ZooKeeper::createAncestors(const std::string & path)
}
}

void ZooKeeper::checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests)
{
std::vector<std::string> paths_to_check;
size_t pos = 1;
while (true)
{
pos = path.find('/', pos);
if (pos == std::string::npos)
break;
paths_to_check.emplace_back(path.substr(0, pos));
++pos;
}

MultiExistsResponse response = exists(paths_to_check);

for (size_t i = 0; i < paths_to_check.size(); ++i)
{
if (response[i].error != Coordination::Error::ZOK)
{
/// Ephemeral nodes cannot have children
requests.emplace_back(makeCreateRequest(paths_to_check[i], "", CreateMode::Persistent));
}
}
}

Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t version)
{
auto future_result = asyncTryRemoveNoThrow(path, version);
Expand Down
4 changes: 2 additions & 2 deletions src/Common/ZooKeeper/ZooKeeper.h
Expand Up @@ -237,6 +237,8 @@ class ZooKeeper
/// Does not create the node itself.
void createAncestors(const std::string & path);

/// Checks which ancestors of `path` exist (every prefix ending before a '/', not the node itself)
/// and appends to `requests` a persistent-node create request with empty data for each ancestor
/// that was not reported as existing. Does not create anything by itself;
/// presumably the caller executes `requests` later (e.g. as part of a multi-op) - TODO confirm against callers.
void checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests);

/// Remove the node if the version matches. (if version == -1, remove any version).
void remove(const std::string & path, int32_t version = -1);

Expand Down Expand Up @@ -522,8 +524,6 @@ class ZooKeeper
void setServerCompletelyStarted();

private:
friend class EphemeralNodeHolder;

void init(ZooKeeperArgs args_);

/// The following methods don't throw any exceptions but return error codes.
Expand Down
4 changes: 4 additions & 0 deletions src/Common/ZooKeeper/ZooKeeperWithFaultInjection.h
Expand Up @@ -361,6 +361,10 @@ class ZooKeeperWithFaultInjection
return access("trySet", path, [&]() { return keeper->trySet(path, data, version, stat); });
}

/// Forwards to the wrapped keeper's checkExistsAndGetCreateAncestorsOps through `access`,
/// the same way the other operations of this class do.
/// Create requests for the missing ancestors of `path` are appended to `requests`.
void checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests)
{
    auto collect_ancestor_ops = [&]() { keeper->checkExistsAndGetCreateAncestorsOps(path, requests); };
    access("checkExistsAndGetCreateAncestorsOps", path, collect_ancestor_ops);
}

void handleEphemeralNodeExistenceNoFailureInjection(const std::string & path, const std::string & fast_delete_if_equal_value)
{
Expand Down
7 changes: 7 additions & 0 deletions src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
Expand Up @@ -578,6 +578,9 @@ void DataPartStorageOnDiskBase::remove(
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST)
{
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
/// We will never touch this part again, so unlocking it from zero-copy
if (!can_remove_description)
can_remove_description.emplace(can_remove_callback());
return;
}
throw;
Expand All @@ -588,6 +591,10 @@ void DataPartStorageOnDiskBase::remove(
{
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. "
"Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
/// We will never touch this part again, so unlocking it from zero-copy
if (!can_remove_description)
can_remove_description.emplace(can_remove_callback());

return;
}
throw;
Expand Down
7 changes: 5 additions & 2 deletions src/Storages/MergeTree/DataPartsExchange.cpp
Expand Up @@ -821,6 +821,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
const auto data_settings = data.getSettings();
MergeTreeData::DataPart::Checksums data_checksums;

zkutil::EphemeralNodeHolderPtr zero_copy_temporary_lock_holder;
if (to_remote_disk)
{
readStringBinary(part_id, in);
Expand All @@ -829,7 +830,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {} (with type {}).", part_name, part_id, disk->getName(), toString(disk->getDataSourceDescription().type));

LOG_DEBUG(log, "Downloading part {} unique id {} metadata onto disk {}.", part_name, part_id, disk->getName());
data.lockSharedDataTemporary(part_name, part_id, disk);
zero_copy_temporary_lock_holder = data.lockSharedDataTemporary(part_name, part_id, disk);
}
else
{
Expand Down Expand Up @@ -938,7 +939,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(

if (to_remote_disk)
{
data.lockSharedData(*new_data_part, /* replace_existing_lock = */ true, {});
LOG_DEBUG(log, "Download of part {} unique id {} metadata onto disk {} finished.", part_name, part_id, disk->getName());
}
else
Expand All @@ -948,6 +948,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
LOG_DEBUG(log, "Download of part {} onto disk {} finished.", part_name, disk->getName());
}

if (zero_copy_temporary_lock_holder)
zero_copy_temporary_lock_holder->setAlreadyRemoved();

return new_data_part;
}

Expand Down