Skip to content

Commit

Permalink
Merge pull request #50424 from tonickkozlov/tonickkozlov/zk-session-l…
Browse files Browse the repository at this point in the history
…ifetime

Introduce fallback ZooKeeper sessions
  • Loading branch information
tavplubix committed Jul 31, 2023
2 parents 891ae75 + 5dfc305 commit 01f05e1
Show file tree
Hide file tree
Showing 15 changed files with 229 additions and 36 deletions.
Expand Up @@ -2288,6 +2288,8 @@ This section contains the following parameters:
- `session_timeout_ms` — Maximum timeout for the client session in milliseconds.
- `operation_timeout_ms` — Maximum timeout for one operation in milliseconds.
- `root` — The [znode](http://zookeeper.apache.org/doc/r3.5.5/zookeeperOver.html#Nodes+and+ephemeral+nodes) that is used as the root for znodes used by the ClickHouse server. Optional.
- `fallback_session_lifetime.min` - If the first zookeeper host resolved by zookeeper_load_balancing strategy is unavailable, limit the lifetime of a zookeeper session to the fallback node. This is done for load-balancing purposes to avoid excessive load on one of zookeeper hosts. This setting sets the minimal duration of the fallback session. Set in seconds. Optional. Default is 3 hours.
- `fallback_session_lifetime.max` - If the first zookeeper host resolved by zookeeper_load_balancing strategy is unavailable, limit the lifetime of a zookeeper session to the fallback node. This is done for load-balancing purposes to avoid excessive load on one of zookeeper hosts. This setting sets the maximum duration of the fallback session. Set in seconds. Optional. Default is 6 hours.
- `identity` — User and password, that can be required by ZooKeeper to give access to requested znodes. Optional.
- zookeeper_load_balancing - Specifies the algorithm of ZooKeeper node selection.
* random - randomly selects one of ZooKeeper nodes.
Expand Down
6 changes: 4 additions & 2 deletions src/Common/ZooKeeper/IKeeper.h
Expand Up @@ -492,8 +492,6 @@ class IKeeper
/// Useful to check owner of ephemeral node.
virtual int64_t getSessionID() const = 0;

virtual Poco::Net::SocketAddress getConnectedAddress() const = 0;

/// If the method will throw an exception, callbacks won't be called.
///
/// After the method is executed successfully, you must wait for callbacks
Expand Down Expand Up @@ -566,6 +564,10 @@ class IKeeper

virtual const DB::KeeperFeatureFlags * getKeeperFeatureFlags() const { return nullptr; }

/// A ZooKeeper session can have an optional deadline set on it.
/// After it has been reached, the session needs to be finalized.
virtual bool hasReachedDeadline() const = 0;

/// Expire session and finish all pending requests
virtual void finalize(const String & reason) = 0;
};
Expand Down
4 changes: 1 addition & 3 deletions src/Common/ZooKeeper/TestKeeper.h
Expand Up @@ -39,8 +39,8 @@ class TestKeeper final : public IKeeper
~TestKeeper() override;

bool isExpired() const override { return expired; }
bool hasReachedDeadline() const override { return false; }
int64_t getSessionID() const override { return 0; }
Poco::Net::SocketAddress getConnectedAddress() const override { return connected_zk_address; }


void create(
Expand Down Expand Up @@ -135,8 +135,6 @@ class TestKeeper final : public IKeeper

zkutil::ZooKeeperArgs args;

Poco::Net::SocketAddress connected_zk_address;

std::mutex push_request_mutex;
std::atomic<bool> expired{false};

Expand Down
26 changes: 6 additions & 20 deletions src/Common/ZooKeeper/ZooKeeper.cpp
Expand Up @@ -112,31 +112,17 @@ void ZooKeeper::init(ZooKeeperArgs args_)
throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZCONNECTIONLOSS);
}

impl = std::make_unique<Coordination::ZooKeeper>(nodes, args, zk_log);
impl = std::make_unique<Coordination::ZooKeeper>(nodes, args, zk_log, [this](size_t node_idx, const Coordination::ZooKeeper::Node & node)
{
connected_zk_host = node.address.host().toString();
connected_zk_port = node.address.port();
connected_zk_index = node_idx;
});

if (args.chroot.empty())
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
else
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);

Poco::Net::SocketAddress address = impl->getConnectedAddress();

connected_zk_host = address.host().toString();
connected_zk_port = address.port();

connected_zk_index = 0;

if (args.hosts.size() > 1)
{
for (size_t i = 0; i < args.hosts.size(); i++)
{
if (args.hosts[i] == address.toString())
{
connected_zk_index = i;
break;
}
}
}
}
else if (args.implementation == "testkeeper")
{
Expand Down
1 change: 1 addition & 0 deletions src/Common/ZooKeeper/ZooKeeper.h
Expand Up @@ -521,6 +521,7 @@ class ZooKeeper
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);

UInt32 getSessionUptime() const { return static_cast<UInt32>(session_uptime.elapsedSeconds()); }
bool hasReachedDeadline() const { return impl->hasReachedDeadline(); }

void setServerCompletelyStarted();

Expand Down
8 changes: 8 additions & 0 deletions src/Common/ZooKeeper/ZooKeeperArgs.cpp
Expand Up @@ -204,6 +204,14 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown load balancing: {}", load_balancing_str);
get_priority_load_balancing.load_balancing = *load_balancing;
}
else if (key == "fallback_session_lifetime")
{
fallback_session_lifetime = SessionLifetimeConfiguration
{
.min_sec = config.getUInt(config_name + "." + key + ".min"),
.max_sec = config.getUInt(config_name + "." + key + ".max"),
};
}
else
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::Error::ZBADARGUMENTS);
}
Expand Down
10 changes: 10 additions & 0 deletions src/Common/ZooKeeper/ZooKeeperArgs.h
Expand Up @@ -11,8 +11,17 @@ namespace Poco::Util
namespace zkutil
{

constexpr UInt32 ZK_MIN_FALLBACK_SESSION_DEADLINE_SEC = 3 * 60 * 60;
constexpr UInt32 ZK_MAX_FALLBACK_SESSION_DEADLINE_SEC = 6 * 60 * 60;

struct ZooKeeperArgs
{
struct SessionLifetimeConfiguration
{
UInt32 min_sec = ZK_MIN_FALLBACK_SESSION_DEADLINE_SEC;
UInt32 max_sec = ZK_MAX_FALLBACK_SESSION_DEADLINE_SEC;
bool operator == (const SessionLifetimeConfiguration &) const = default;
};
ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const String & config_name);

/// hosts_string -- comma separated [secure://]host:port list
Expand All @@ -36,6 +45,7 @@ struct ZooKeeperArgs
UInt64 send_sleep_ms = 0;
UInt64 recv_sleep_ms = 0;

SessionLifetimeConfiguration fallback_session_lifetime = {};
DB::GetPriorityForLoadBalancing get_priority_load_balancing;

private:
Expand Down
40 changes: 34 additions & 6 deletions src/Common/ZooKeeper/ZooKeeperImpl.cpp
Expand Up @@ -313,8 +313,8 @@ ZooKeeper::~ZooKeeper()
ZooKeeper::ZooKeeper(
const Nodes & nodes,
const zkutil::ZooKeeperArgs & args_,
std::shared_ptr<ZooKeeperLog> zk_log_)
: args(args_)
std::shared_ptr<ZooKeeperLog> zk_log_, std::optional<ConnectedCallback> && connected_callback_)
: args(args_), connected_callback(std::move(connected_callback_))
{
log = &Poco::Logger::get("ZooKeeperClient");
std::atomic_store(&zk_log, std::move(zk_log_));
Expand Down Expand Up @@ -395,8 +395,9 @@ void ZooKeeper::connect(
WriteBufferFromOwnString fail_reasons;
for (size_t try_no = 0; try_no < num_tries; ++try_no)
{
for (const auto & node : nodes)
for (size_t i = 0; i < nodes.size(); ++i)
{
const auto & node = nodes[i];
try
{
/// Reset the state of previous attempt.
Expand Down Expand Up @@ -443,9 +444,25 @@ void ZooKeeper::connect(
e.addMessage("while receiving handshake from ZooKeeper");
throw;
}

connected = true;
connected_zk_address = node.address;

if (connected_callback.has_value())
(*connected_callback)(i, node);

if (i != 0)
{
std::uniform_int_distribution<UInt32> fallback_session_lifetime_distribution
{
args.fallback_session_lifetime.min_sec,
args.fallback_session_lifetime.max_sec,
};
UInt32 session_lifetime_seconds = fallback_session_lifetime_distribution(thread_local_rng);
client_session_deadline = clock::now() + std::chrono::seconds(session_lifetime_seconds);

LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
" To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
node.address.toString(), i, session_lifetime_seconds);
}

break;
}
Expand All @@ -462,7 +479,6 @@ void ZooKeeper::connect(
if (!connected)
{
WriteBufferFromOwnString message;
connected_zk_address = Poco::Net::SocketAddress();

message << "All connection tries failed while connecting to ZooKeeper. nodes: ";
bool first = true;
Expand Down Expand Up @@ -1060,6 +1076,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
{
try
{
checkSessionDeadline();
info.time = clock::now();
if (zk_log)
{
Expand Down Expand Up @@ -1482,6 +1499,17 @@ void ZooKeeper::setupFaultDistributions()
inject_setup.test_and_set();
}

void ZooKeeper::checkSessionDeadline() const
{
if (unlikely(hasReachedDeadline()))
throw Exception(Error::ZSESSIONEXPIRED, "Session expired (force expiry client-side)");
}

bool ZooKeeper::hasReachedDeadline() const
{
return client_session_deadline.has_value() && clock::now() >= client_session_deadline.value();
}

void ZooKeeper::maybeInjectSendFault()
{
if (unlikely(inject_setup.test() && send_inject_fault && send_inject_fault.value()(thread_local_rng)))
Expand Down
15 changes: 11 additions & 4 deletions src/Common/ZooKeeper/ZooKeeperImpl.h
Expand Up @@ -107,6 +107,7 @@ class ZooKeeper final : public IKeeper
};

using Nodes = std::vector<Node>;
using ConnectedCallback = std::function<void(size_t, const Node&)>;

/** Connection to nodes is performed in order. If you want, shuffle them manually.
* Operation timeout couldn't be greater than session timeout.
Expand All @@ -115,19 +116,22 @@ class ZooKeeper final : public IKeeper
ZooKeeper(
const Nodes & nodes,
const zkutil::ZooKeeperArgs & args_,
std::shared_ptr<ZooKeeperLog> zk_log_);
std::shared_ptr<ZooKeeperLog> zk_log_,
std::optional<ConnectedCallback> && connected_callback_ = {});

~ZooKeeper() override;


/// If expired, you can only destroy the object. All other methods will throw exception.
bool isExpired() const override { return requests_queue.isFinished(); }

/// A ZooKeeper session can have an optional deadline set on it.
/// After it has been reached, the session needs to be finalized.
bool hasReachedDeadline() const override;

/// Useful to check owner of ephemeral node.
int64_t getSessionID() const override { return session_id; }

Poco::Net::SocketAddress getConnectedAddress() const override { return connected_zk_address; }

void executeGenericRequest(
const ZooKeeperRequestPtr & request,
ResponseCallback callback);
Expand Down Expand Up @@ -213,9 +217,9 @@ class ZooKeeper final : public IKeeper

private:
ACLs default_acls;
Poco::Net::SocketAddress connected_zk_address;

zkutil::ZooKeeperArgs args;
std::optional<ConnectedCallback> connected_callback = {};

/// Fault injection
void maybeInjectSendFault();
Expand Down Expand Up @@ -252,6 +256,7 @@ class ZooKeeper final : public IKeeper
clock::time_point time;
};

std::optional<clock::time_point> client_session_deadline {};
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;

RequestsQueue requests_queue{1024};
Expand Down Expand Up @@ -324,6 +329,8 @@ class ZooKeeper final : public IKeeper

void initFeatureFlags();

void checkSessionDeadline() const;

CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
std::shared_ptr<ZooKeeperLog> zk_log;

Expand Down
5 changes: 4 additions & 1 deletion src/Interpreters/Context.cpp
Expand Up @@ -2711,7 +2711,10 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
if (!shared->zookeeper)
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, zkutil::getZooKeeperConfigName(config), getZooKeeperLog());
else if (shared->zookeeper->expired())
else if (shared->zookeeper->hasReachedDeadline())
shared->zookeeper->finalize("ZooKeeper session has reached its deadline");

if (shared->zookeeper->expired())
{
Stopwatch watch;
LOG_DEBUG(shared->log, "Trying to establish a new connection with ZooKeeper");
Expand Down
1 change: 1 addition & 0 deletions tests/integration/parallel_skip.json
Expand Up @@ -48,6 +48,7 @@
"test_system_metrics/test.py::test_readonly_metrics",
"test_system_replicated_fetches/test.py::test_system_replicated_fetches",
"test_zookeeper_config_load_balancing/test.py::test_round_robin",
"test_zookeeper_fallback_session/test.py::test_fallback_session",

"test_global_overcommit_tracker/test.py::test_global_overcommit",

Expand Down
Empty file.
@@ -0,0 +1,23 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>

<replica>
<host>node2</host>
<port>9000</port>
</replica>

<replica>
<host>node3</host>
<port>9000</port>
</replica>

</shard>
</test_cluster>
</remote_servers>
</clickhouse>
@@ -0,0 +1,23 @@
<clickhouse>
<zookeeper>
<!--<zookeeper_load_balancing> random / in_order / nearest_hostname / first_or_random / round_robin </zookeeper_load_balancing>-->
<zookeeper_load_balancing>in_order</zookeeper_load_balancing>
<fallback_session_lifetime>
<min>2</min>
<max>4</max>
</fallback_session_lifetime>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>500</session_timeout_ms>
</zookeeper>
</clickhouse>

0 comments on commit 01f05e1

Please sign in to comment.