Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry disconnects and expired sessions when reading system.zookeeper #59388

Merged
merged 6 commits into from Feb 2, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 36 additions & 3 deletions src/Storages/System/StorageSystemZooKeeper.cpp
Expand Up @@ -424,9 +424,35 @@ void ReadFromSystemZooKeeper::applyFilters()
paths = extractPath(getFilterNodes().nodes, context, context->getSettingsRef().allow_unrestricted_reads_from_keeper);
}

/// Executes a request to Keeper and retries it in case of expired sessions and disconnects
template <typename Result, typename Operation>
static Result runWithReconnects(Operation && operation, ContextPtr context, QueryStatusPtr query_status)
{
constexpr int max_retries = 20; /// Limit retries by some reasonable number to avoid infinite loops
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't this using ZooKeeperRetriesControl? It's better to adapt that if we need to rather than re-implement retries

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok
Do you think we should move ZooKeeperRetriesControl from Storages/MergeTree to Common?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely. Ideally we should be using ZooKeeperRetriesControl + ZooKeeperWithFaultInjectionPtr everywhere so we handle both injections and retries.

for (int attempt = 0; ; ++attempt)
{
if (query_status)
query_status->checkTimeLimit();

zkutil::ZooKeeperPtr keeper = context->getZooKeeper();

try
{
return operation(keeper);
}
catch (const Coordination::Exception & e)
{
if (!Coordination::isHardwareError(e.code) ||
attempt >= max_retries ||
e.code == Coordination::Error::ZOPERATIONTIMEOUT)
throw;
}
}
}

void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
{
zkutil::ZooKeeperPtr zookeeper = context->getZooKeeper();
QueryStatusPtr query_status = context->getProcessListElement();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the whole pack:

  • Use insert_keeper_max_retries and similar for settings. I hope we will unify it with backup_restore_keeper_max_retries and have a single common setting for retries at some point, so we shouldn't add a new one.
  • Use ZooKeeperWithFaultInjection and refresh it in the loop if necessary.


if (paths.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
Expand All @@ -448,6 +474,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
std::unordered_set<String> added;
while (!paths.empty())
{
if (query_status)
query_status->checkTimeLimit();

list_tasks.clear();
std::vector<String> paths_to_list;
while (!paths.empty() && static_cast<Int64>(list_tasks.size()) < max_inflight_requests)
Expand All @@ -470,7 +499,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
paths_to_list.emplace_back(task.path_corrected);
list_tasks.emplace_back(std::move(task));
}
auto list_responses = zookeeper->tryGetChildren(paths_to_list);
auto list_responses = runWithReconnects<zkutil::ZooKeeper::MultiTryGetChildrenResponse>(
[&paths_to_list](zkutil::ZooKeeperPtr zookeeper) { return zookeeper->tryGetChildren(paths_to_list); },
context, query_status);

struct GetTask
{
Expand Down Expand Up @@ -514,7 +545,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
}
}

auto get_responses = zookeeper->tryGet(paths_to_get);
auto get_responses = runWithReconnects<zkutil::ZooKeeper::MultiTryGetResponse>(
[&paths_to_get](zkutil::ZooKeeperPtr zookeeper) { return zookeeper->tryGet(paths_to_get); },
context, query_status);

for (size_t i = 0, size = get_tasks.size(); i < size; ++i)
{
Expand Down