Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure keep alive is completed in time #4349

Merged: 9 commits were merged on Jul 17, 2024.
86 changes: 65 additions & 21 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ use std::time::Duration;

use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, info, warn};
use etcd_client::{Client, GetOptions, PutOptions};
use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions};
use snafu::{OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::time::timeout;

use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::error;
Expand Down Expand Up @@ -235,34 +236,32 @@ impl Election for EtcdElection {
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
loop {
let _ = keep_alive_interval.tick().await;
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;

if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
if res.ttl() > 0 {
// Only after a successful `keep_alive` is the leader considered official.
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.infancy.store(true, Ordering::Relaxed);

if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
}
} else {
// The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`.
match timeout(
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
self.keep_alive(&mut keeper, &mut receiver, leader),
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
)
.await
{
Ok(Ok(())) => {
// Do nothing
}
Ok(Err(err)) => {
error!(err; "Failed to keep alive");
break;
}
Err(_) => {
error!("Refresh lease timeout");
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
if self.is_leader.load(Ordering::Relaxed) {
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
{
error!("Failed to send leader change message, error: {e}");
error!(e; "Failed to send leader change message");
}
}

break;
}
}
Expand Down Expand Up @@ -297,3 +296,48 @@ impl Election for EtcdElection {
self.leader_watcher.subscribe()
}
}

impl EtcdElection {
async fn keep_alive(
&self,
keeper: &mut LeaseKeeper,
receiver: &mut LeaseKeepAliveStream,
leader: &LeaderKey,
) -> Result<()> {
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
if res.ttl() > 0 {
// Only after a successful `keep_alive` is the leader considered official.
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.infancy.store(true, Ordering::Relaxed);

if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
}
} else {
if self.is_leader.load(Ordering::Relaxed) {
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
}
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
return error::UnexpectedSnafu {
violated: "Failed to refresh the lease",
}
.fail();
}
}

Ok(())
}
}
Loading