Skip to content

Commit

Permalink
Recover special handling of ephemeral nodes in ZooKeeperWithFaultInje…
Browse files Browse the repository at this point in the history
…ction
  • Loading branch information
Algunenano committed Nov 20, 2023
1 parent 8217915 commit 04f966c
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 26 deletions.
139 changes: 126 additions & 13 deletions src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <base/defines.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>

namespace DB
Expand All @@ -17,6 +18,68 @@ ZooKeeperWithFaultInjection::ZooKeeperWithFaultInjection(
{
}

void ZooKeeperWithFaultInjection::resetKeeper()
{
/// When an error is injected, we need to reset keeper for several reasons
/// a) Avoid processing further requests in this keeper (in async code)
/// b) Simulate a fault as ZooKeeperImpl does, forcing a new session (which drops ephemeral nodes)
///
/// Ideally we would call `keeper->finalize("Fault injection");` to force the session reload.
/// The problem with that is that many operations currently aren't able to cope with keeper faults correctly,
/// so they would fail. While this is what happens in production, it's not what we want in the CI.
///
/// Until all the code can handle keeper session resets, we need to simulate it so the code that relies on its
/// behaviour keeps working. An example of such code is insert block ids: If keeper dies between the block id being
/// reserved (via ephemeral node) and the metadata being pushed, the reserved block id will be deleted automatically
/// in keeper (connection drop == delete all ephemeral nodes attached to that connection). This way retrying and
/// getting a new block id is ok. But without a connection reset (because ZooKeeperWithFaultInjection doesn't
/// enforce it yet), the old ephemeral nodes associated with "committing_blocks" will still be there and operations
/// such as block merges, mutations, etc., will think they are alive and wait for them to be ready (which will never
/// happen)
/// Our poor man session reload is to keep track of ephemeral nodes created by this Faulty keeper and delete
/// them manually when we force a fault. This is obviously limited as it will only apply for operations processed by
/// this instance, but let's trust more and more code can handle session reloads and we can eliminate the hack.
/// Until that time, the hack remains.
if (keeper)
{
for (const auto & path_created : session_ephemeral_nodes)
{
try
{
keeper->remove(path_created);
}
catch (const Coordination::Exception &)
{
if (logger)
LOG_TRACE(logger, "Failed to delete ephemeral node ({}) during fault cleanup", path_created);
}
}
}


keeper.reset();
}

void ZooKeeperWithFaultInjection::multiResponseSaveEphemeralNodePaths(
const Coordination::Requests & requests, const Coordination::Responses & responses)
{
if (responses.empty())
return;

chassert(requests.size() == responses.size());

for (size_t i = 0; i < requests.size(); i++)
{
const auto * create_req = dynamic_cast<const Coordination::CreateRequest *>(requests[i].get());
if (create_req && create_req->is_ephemeral)
{
const auto * create_resp = dynamic_cast<const Coordination::CreateResponse *>(responses.at(i).get());
chassert(create_resp);
session_ephemeral_nodes.emplace_back(create_resp->path_created);
}
}
}

void ZooKeeperWithFaultInjection::injectFailureBeforeOperationThrow(const char * func_name, const String & path)
{
if (unlikely(!keeper))
Expand All @@ -31,7 +94,7 @@ void ZooKeeperWithFaultInjection::injectFailureBeforeOperationThrow(const char *

if (unlikely(fault_policy) && fault_policy->beforeOperation())
{
keeper.reset();
resetKeeper();
if (logger)
LOG_TRACE(
logger,
Expand All @@ -49,7 +112,7 @@ void ZooKeeperWithFaultInjection::injectFailureAfterOperationThrow(const char *
{
if (unlikely(fault_policy) && fault_policy->afterOperation())
{
keeper.reset();
resetKeeper();
if (logger)
LOG_TRACE(
logger,
Expand Down Expand Up @@ -96,7 +159,7 @@ bool ZooKeeperWithFaultInjection::injectFailureBeforeOperationPromise(const char

if (unlikely(fault_policy) && fault_policy->beforeOperation())
{
keeper.reset();
resetKeeper();
if (logger)
LOG_TRACE(
logger, "ZooKeeperWithFaultInjection injected fault before operation: seed={} func={} path={}", seed, func_name, path);
Expand All @@ -112,7 +175,7 @@ bool ZooKeeperWithFaultInjection::injectFailureAfterOperationPromise(const char
{
if (unlikely(fault_policy) && fault_policy->afterOperation())
{
keeper.reset();
resetKeeper();
promise->set_exception(std::make_exception_ptr(
zkutil::KeeperException::fromMessage(RandomFaultInjection::error_after_op, RandomFaultInjection::msg_after_op)));
if (logger)
Expand Down Expand Up @@ -234,23 +297,52 @@ zkutil::ZooKeeper::MultiExistsResponse ZooKeeperWithFaultInjection::exists(const

std::string ZooKeeperWithFaultInjection::create(const std::string & path, const std::string & data, int32_t mode)
{
return executeWithFaultSync(__func__, path, [&]() { return keeper->create(path, data, mode); });
return executeWithFaultSync(
__func__,
path,
[&]()
{
auto path_created = keeper->create(path, data, mode);
if (unlikely(fault_policy) && (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral))
session_ephemeral_nodes.emplace_back(path_created);
return path_created;
});
}

Coordination::Error
ZooKeeperWithFaultInjection::tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
{
return executeWithFaultSync(__func__, path, [&]() { return keeper->tryCreate(path, data, mode, path_created); });
return executeWithFaultSync(
__func__,
path,
[&]()
{
Coordination::Error code = keeper->tryCreate(path, data, mode, path_created);
if (unlikely(fault_policy) && code == Coordination::Error::ZOK
&& (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral))
session_ephemeral_nodes.emplace_back(path_created);
return code;
});
}

Coordination::Error ZooKeeperWithFaultInjection::tryCreate(const std::string & path, const std::string & data, int32_t mode)
{
return executeWithFaultSync(__func__, path, [&]() { return keeper->tryCreate(path, data, mode); });
std::string path_created;
return tryCreate(path, data, mode, path_created);
}

Coordination::Responses ZooKeeperWithFaultInjection::multi(const Coordination::Requests & requests)
{
return executeWithFaultSync(__func__, !requests.empty() ? requests.front()->getPath() : "", [&]() { return keeper->multi(requests); });
return executeWithFaultSync(
__func__,
!requests.empty() ? requests.front()->getPath() : "",
[&]()
{
auto responses = keeper->multi(requests);
if (unlikely(fault_policy))
multiResponseSaveEphemeralNodePaths(requests, responses);
return responses;
});
}

void ZooKeeperWithFaultInjection::createIfNotExists(const std::string & path, const std::string & data)
Expand All @@ -260,6 +352,7 @@ void ZooKeeperWithFaultInjection::createIfNotExists(const std::string & path, co

void ZooKeeperWithFaultInjection::createOrUpdate(const std::string & path, const std::string & data, int32_t mode)
{
chassert(mode != zkutil::CreateMode::EphemeralSequential && mode != zkutil::CreateMode::Ephemeral);
return executeWithFaultSync(__func__, path, [&]() { return keeper->createOrUpdate(path, data, mode); });
}

Expand Down Expand Up @@ -325,7 +418,15 @@ void ZooKeeperWithFaultInjection::deleteEphemeralNodeIfContentMatches(
Coordination::Error ZooKeeperWithFaultInjection::tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses)
{
return executeWithFaultSync(
__func__, !requests.empty() ? requests.front()->getPath() : "", [&]() { return keeper->tryMulti(requests, responses); });
__func__,
!requests.empty() ? requests.front()->getPath() : "",
[&]()
{
auto code = keeper->tryMulti(requests, responses);
if (unlikely(fault_policy) && code == Coordination::Error::ZOK)
multiResponseSaveEphemeralNodePaths(requests, responses);
return code;
});
}

Coordination::Error
Expand Down Expand Up @@ -391,14 +492,26 @@ zkutil::ZooKeeper::FutureGet ZooKeeperWithFaultInjection::asyncTryGet(std::strin

zkutil::ZooKeeper::FutureMulti ZooKeeperWithFaultInjection::asyncTryMultiNoThrow(const Coordination::Requests & ops)
{
#ifndef NDEBUG
/// asyncTryMultiNoThrow is not setup to handle faults with ephemeral nodes
/// To do it we'd need to look at ops and save the indexes BEFORE the callback, as the ops are not
/// guaranteed to live until then
for (size_t i = 0; i < ops.size(); i++)
{
const auto * create_req = dynamic_cast<const Coordination::CreateRequest *>(ops[i].get());
if (create_req)
chassert(!create_req->is_ephemeral);
}
#endif

auto promise = std::make_shared<std::promise<Coordination::MultiResponse>>();
auto future = promise->get_future();
size_t request_size = ops.size();
String path = ops.empty() ? "" : ops.front()->getPath();

if (!keeper || (unlikely(fault_policy) && fault_policy->beforeOperation()))
{
keeper.reset();
resetKeeper();
if (logger)
LOG_TRACE(logger, "ZooKeeperWithFaultInjection injected fault before operation: seed={} func={} path={}", seed, __func__, path);
Coordination::MultiResponse errors;
Expand All @@ -416,7 +529,7 @@ zkutil::ZooKeeper::FutureMulti ZooKeeperWithFaultInjection::asyncTryMultiNoThrow
{
if (unlikely(fault_policy) && fault_policy->afterOperation())
{
keeper.reset();
resetKeeper();
if (logger)
LOG_TRACE(
logger, "ZooKeeperWithFaultInjection injected fault after operation: seed={} func={} path={}", seed, __func__, path);
Expand Down Expand Up @@ -472,7 +585,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemoveNoThr

if (!keeper || (unlikely(fault_policy) && fault_policy->beforeOperation()))
{
keeper.reset();
resetKeeper();
if (logger)
LOG_TRACE(logger, "ZooKeeperWithFaultInjection injected fault before operation: seed={} func={} path={}", seed, __func__, path);
Coordination::RemoveResponse r;
Expand All @@ -485,7 +598,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemoveNoThr
{
if (unlikely(fault_policy) && fault_policy->afterOperation())
{
keeper.reset();
resetKeeper();
if (logger)
LOG_TRACE(
logger, "ZooKeeperWithFaultInjection injected fault after operation: seed={} func={} path={}", seed, __func__, path);
Expand Down
5 changes: 5 additions & 0 deletions src/Common/ZooKeeper/ZooKeeperWithFaultInjection.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class ZooKeeperWithFaultInjection
Poco::Logger * logger = nullptr;
const UInt64 seed = 0;

std::vector<std::string> session_ephemeral_nodes;

template <typename Operation>
std::invoke_result_t<Operation> executeWithFaultSync(const char * func_name, const std::string & path, Operation);
void injectFailureBeforeOperationThrow(const char * func_name, const String & path);
Expand All @@ -73,6 +75,9 @@ class ZooKeeperWithFaultInjection
template <typename Promise>
bool injectFailureAfterOperationPromise(const char * func_name, Promise & promise, const String & path);

void resetKeeper();
void multiResponseSaveEphemeralNodePaths(const Coordination::Requests & requests, const Coordination::Responses & responses);

public:
using Ptr = std::shared_ptr<ZooKeeperWithFaultInjection>;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1935,7 +1935,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep

/// We need to check committing block numbers and new parts which could be committed.
/// Actually we don't need most of predicate logic here but it all the code related to committing blocks
/// and updatating queue state is implemented there.
/// and updating queue state is implemented there.
PartitionIdsHint partition_ids_hint;
for (const auto & candidate : candidates)
for (const auto & partitions : candidate->block_numbers)
Expand Down
30 changes: 18 additions & 12 deletions src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,16 +883,17 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::

fiu_do_on(FailPoints::replicated_merge_tree_commit_zk_fail_after_op, { zookeeper->forceFailureAfterOperation(); });

Coordination::Responses responses;
Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
if (multi_code == Coordination::Error::ZOK)
auto action_when_ok = [&]
{
auto sleep_before_commit_local_part_in_replicated_table_ms = storage.getSettings()->sleep_before_commit_local_part_in_replicated_table_ms;
if (sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds())
auto sleep_before_commit_ms = storage.getSettings()->sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds();
if (sleep_before_commit_ms)
{
LOG_INFO(log, "committing part {}, triggered sleep_before_commit_local_part_in_replicated_table_ms {}",
part->name, sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds());
sleepForMilliseconds(sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds());
LOG_DEBUG(
log,
"committing part {}, triggered sleep_before_commit_local_part_in_replicated_table_ms {}",
part->name,
sleep_before_commit_ms);
sleepForMilliseconds(sleep_before_commit_ms);
}

part->new_part_was_committed_to_zookeeper_after_rename_on_disk = true;
Expand All @@ -902,6 +903,13 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
/// Lock nodes have been already deleted, do not delete them in destructor
if (block_number_lock)
block_number_lock->assumeUnlocked();
};

Coordination::Responses responses;
Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
if (multi_code == Coordination::Error::ZOK)
{
action_when_ok();
}
else if (multi_code == Coordination::Error::ZNONODE && zkutil::getFailedOpIndex(multi_code, responses) == block_unlock_op_idx)
{
Expand Down Expand Up @@ -944,17 +952,15 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
if (node_exists)
{
LOG_TRACE(log, "Insert of part {} recovered from keeper successfully. It will be committed", part->name);
part->new_part_was_committed_to_zookeeper_after_rename_on_disk = true;
transaction.commit();
storage.merge_selecting_task->schedule();
action_when_ok();
}
else
{
LOG_TRACE(log, "Insert of part {} was not committed to keeper. Will try again with a new block", part->name);
rename_part_to_temporary();
retries_ctl.setUserError(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Insert of part {} failed when committing to keeper (Reason: {}",
"Insert of part {} failed when committing to keeper (Reason: {})",
part->name,
multi_code);
}
Expand Down

0 comments on commit 04f966c

Please sign in to comment.