Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry disconnects and expired sessions when reading system.zookeeper #59388

Merged
merged 6 commits into from Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Backups/BackupEntriesCollector.h
Expand Up @@ -6,7 +6,7 @@
#include <Parsers/ASTBackupQuery.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/TableLockHolder.h>
#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <filesystem>
#include <queue>

Expand Down
2 changes: 1 addition & 1 deletion src/Backups/WithRetries.h
@@ -1,6 +1,6 @@
#pragma once

#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/ZooKeeper/Common.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>

Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/executeDDLQueryOnCluster.h
Expand Up @@ -5,7 +5,7 @@
#include <Processors/ISource.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>


namespace zkutil
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeSink.h
Expand Up @@ -3,7 +3,7 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <base/types.h>
#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>

Expand Down
39 changes: 36 additions & 3 deletions src/Storages/System/StorageSystemZooKeeper.cpp
Expand Up @@ -12,6 +12,8 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnConst.h>
Expand Down Expand Up @@ -426,7 +428,30 @@ void ReadFromSystemZooKeeper::applyFilters()

void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
{
zkutil::ZooKeeperPtr zookeeper = context->getZooKeeper();
QueryStatusPtr query_status = context->getProcessListElement();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the whole pack:

  • Use insert_keeper_max_retries and similar for settings. I hope we will unify it with backup_restore_keeper_max_retries and have a single common setting for retries at some point, so we shouldn't add a new one.
  • Use ZooKeeperWithFaultInjection and refresh it in the loop if necessary.


const auto & settings = context->getSettingsRef();
/// Use insert settings for now in order not to introduce new settings.
/// Hopefully insert settings will also be unified and replaced with some generic retry settings.
ZooKeeperRetriesInfo retries_seetings(
settings.insert_keeper_max_retries,
settings.insert_keeper_retry_initial_backoff_ms,
settings.insert_keeper_retry_max_backoff_ms);

ZooKeeperWithFaultInjection::Ptr zookeeper;
/// Handles reconnects when needed
auto get_zookeeper = [&] ()
{
if (!zookeeper || zookeeper->expired())
{
zookeeper = ZooKeeperWithFaultInjection::createInstance(
settings.insert_keeper_fault_injection_probability,
settings.insert_keeper_fault_injection_seed,
context->getZooKeeper(),
"", nullptr);
}
return zookeeper;
};

if (paths.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
Expand All @@ -448,6 +473,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
std::unordered_set<String> added;
while (!paths.empty())
{
if (query_status)
query_status->checkTimeLimit();

list_tasks.clear();
std::vector<String> paths_to_list;
while (!paths.empty() && static_cast<Int64>(list_tasks.size()) < max_inflight_requests)
Expand All @@ -470,7 +498,10 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
paths_to_list.emplace_back(task.path_corrected);
list_tasks.emplace_back(std::move(task));
}
auto list_responses = zookeeper->tryGetChildren(paths_to_list);

zkutil::ZooKeeper::MultiTryGetChildrenResponse list_responses;
ZooKeeperRetriesControl("", nullptr, retries_seetings, query_status).retryLoop(
[&]() { list_responses = get_zookeeper()->tryGetChildren(paths_to_list); });

struct GetTask
{
Expand Down Expand Up @@ -514,7 +545,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
}
}

auto get_responses = zookeeper->tryGet(paths_to_get);
zkutil::ZooKeeper::MultiTryGetResponse get_responses;
ZooKeeperRetriesControl("", nullptr, retries_seetings, query_status).retryLoop(
[&]() { get_responses = get_zookeeper()->tryGet(paths_to_get); });

for (size_t i = 0, size = get_tasks.size(); i < size; ++i)
{
Expand Down
@@ -0,0 +1,3 @@
/keeper api_version
/keeper feature_flags
1
22 changes: 22 additions & 0 deletions tests/queries/0_stateless/02975_system_zookeeper_retries.sql
@@ -0,0 +1,22 @@
-- Tags: zookeeper, no-parallel, no-fasttest

-- Read the children of /keeper through system.zookeeper while Keeper fault injection
-- is enabled (probability 0.3, seed 4) and the retry backoff is kept short
-- (1 ms initial, 20 ms max).
-- NOTE(review): presumably the fixed seed is there to make the injected failures
-- reproducible between runs -- confirm.
-- The log_comment tags this query so that the check below can find it in system.query_log.
SELECT path, name
FROM system.zookeeper
WHERE path = '/keeper'
ORDER BY path, name
SETTINGS
insert_keeper_retry_initial_backoff_ms = 1,
insert_keeper_retry_max_backoff_ms = 20,
insert_keeper_fault_injection_probability=0.3,
insert_keeper_fault_injection_seed=4,
log_comment='02975_system_zookeeper_retries';


-- Flush logs before reading system.query_log below.
SYSTEM FLUSH LOGS;

-- Check that there were zk session failures:
-- take the most recent finished query in the current database carrying the log_comment
-- above and verify that its ZooKeeperHardwareExceptions profile event is non-zero.
SELECT ProfileEvents['ZooKeeperHardwareExceptions'] > 0
FROM system.query_log
WHERE current_database = currentDatabase() AND type = 'QueryFinish' AND log_comment='02975_system_zookeeper_retries'
ORDER BY event_time_microseconds DESC
LIMIT 1;