Skip to content

Commit

Permalink
fix: dn doesn't have chance to send a heartbeat to the new leader (#2471
Browse files Browse the repository at this point in the history
)

* refactor: set meta leader lease secs to 3s

* fix: correct default heartbeat interval

* refactor: ask meta leader in parallel

* feat: configure heartbeat client timeout to 500ms

* fix: trigger to send heartbeat immediately after fail

* fix: fix clippy
  • Loading branch information
WenyXu committed Sep 26, 2023
1 parent 54e506a commit 230a302
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 29 deletions.
6 changes: 4 additions & 2 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ rpc_runtime_size = 8
require_lease_before_startup = false

[heartbeat]
# Interval for sending heartbeat messages to the Metasrv in milliseconds, 5000 by default.
interval_millis = 5000
# Interval for sending heartbeat messages to the Metasrv in milliseconds, 3000 by default.
interval_millis = 3000

# Metasrv client options.
[meta_client]
# Metasrv address list.
metasrv_addrs = ["127.0.0.1:3002"]
# Heartbeat timeout in milliseconds, 500 by default.
heartbeat_timeout_millis = 500
# Operation timeout in milliseconds, 3000 by default.
timeout_millis = 3000
# Connect server timeout in milliseconds, 5000 by default.
Expand Down
1 change: 1 addition & 0 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ mod tests {
connect_timeout_millis,
tcp_nodelay,
ddl_timeout_millis,
..
} = options.meta_client.unwrap();

assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
Expand Down
6 changes: 6 additions & 0 deletions src/common/meta/src/distributed_time_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,9 @@ pub const REGION_LEASE_SECS: u64 =
/// When creating table or region failover, a target node needs to be selected.
/// If the node's lease has expired, the `Selector` will not select it.
pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;

/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 3;

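/// The keep-alive interval of the metasrv leader, in seconds.
/// It is half of the lease (integer division, so `3 / 2` yields 1 second), which gives
/// the leader at least two opportunities to renew within a single lease.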
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
2 changes: 2 additions & 0 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ impl HeartbeatTask {
Ok(new_tx) => {
info!("Reconnected to metasrv");
tx = new_tx;
// Reset the sleep timer to "now" so that a heartbeat is sent immediately.
sleep.as_mut().reset(Instant::now());
}
Err(e) => {
error!(e;"Failed to reconnect to metasrv!");
Expand Down
11 changes: 10 additions & 1 deletion src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct MetaClientBuilder {
enable_ddl: bool,
channel_manager: Option<ChannelManager>,
ddl_channel_manager: Option<ChannelManager>,
heartbeat_channel_manager: Option<ChannelManager>,
}

impl MetaClientBuilder {
Expand Down Expand Up @@ -122,6 +123,13 @@ impl MetaClientBuilder {
}
}

/// Sets the [`ChannelManager`] stored in the builder's `heartbeat_channel_manager`
/// field and returns the updated builder.
pub fn heartbeat_channel_manager(mut self, channel_manager: ChannelManager) -> Self {
    self.heartbeat_channel_manager = Some(channel_manager);
    self
}

pub fn build(self) -> MetaClient {
let mut client = if let Some(mgr) = self.channel_manager {
MetaClient::with_channel_manager(self.id, mgr)
Expand All @@ -136,10 +144,11 @@ impl MetaClientBuilder {
let mgr = client.channel_manager.clone();

if self.enable_heartbeat {
let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
client.heartbeat = Some(HeartbeatClient::new(
self.id,
self.role,
mgr.clone(),
mgr,
DEFAULT_ASK_LEADER_MAX_RETRY,
));
}
Expand Down
54 changes: 36 additions & 18 deletions src/meta-client/src/client/ask_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
// limitations under the License.

use std::sync::{Arc, RwLock};
use std::time::Duration;

use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_telemetry::warn;
use rand::seq::SliceRandom;
use snafu::{OptionExt, ResultExt};
use tokio::time::timeout;
use tonic::transport::Channel;

use crate::client::Id;
Expand Down Expand Up @@ -73,29 +76,44 @@ impl AskLeader {
};
peers.shuffle(&mut rand::thread_rng());

let header = RequestHeader::new(self.id, self.role);
let mut leader = None;
let req = AskLeaderRequest {
header: Some(RequestHeader::new(self.id, self.role)),
};

let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());

for addr in &peers {
let req = AskLeaderRequest {
header: Some(header.clone()),
};
let mut client = self.create_asker(addr)?;
match client.ask_leader(req).await {
Ok(res) => {
let Some(endpoint) = res.into_inner().leader else {
warn!("No leader from: {addr}");
continue;
};
leader = Some(endpoint.addr);
break;
let tx_clone = tx.clone();
let req = req.clone();
let addr = addr.to_string();
tokio::spawn(async move {
match client.ask_leader(req).await {
Ok(res) => {
if let Some(endpoint) = res.into_inner().leader {
let _ = tx_clone.send(endpoint.addr).await;
} else {
warn!("No leader from: {addr}");
};
}
Err(status) => {
warn!("Failed to ask leader from: {addr}, {status}");
}
}
Err(status) => {
warn!("Failed to ask leader from: {addr}, {status}");
}
}
});
}

let leader = leader.context(error::NoLeaderSnafu)?;
let leader = timeout(
self.channel_manager
.config()
.timeout
.unwrap_or(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)),
rx.recv(),
)
.await
.context(error::AskLeaderTimeoutSnafu)?
.context(error::NoLeaderSnafu)?;

let mut leadership_group = self.leadership_group.write().unwrap();
leadership_group.leader = Some(leader.clone());

Expand Down
7 changes: 7 additions & 0 deletions src/meta-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ pub enum Error {
#[snafu(display("No leader, should ask leader first"))]
NoLeader { location: Location },

#[snafu(display("Ask leader timeout"))]
AskLeaderTimeout {
location: Location,
source: tokio::time::error::Elapsed,
},

#[snafu(display("Failed to create gRPC channel"))]
CreateChannel {
location: Location,
Expand Down Expand Up @@ -83,6 +89,7 @@ impl ErrorExt for Error {
Error::IllegalGrpcClientState { .. }
| Error::AskLeader { .. }
| Error::NoLeader { .. }
| Error::AskLeaderTimeout { .. }
| Error::NotStarted { .. }
| Error::SendHeartbeat { .. }
| Error::CreateHeartbeatStream { .. }
Expand Down
7 changes: 7 additions & 0 deletions src/meta-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ pub mod error;
/// Options of the Metasrv client.
/// NOTE(review): appears to correspond to the `[meta_client]` section of the
/// configuration file — confirm against the config loader.
pub struct MetaClientOptions {
/// Metasrv address list.
pub metasrv_addrs: Vec<String>,
/// Operation timeout in milliseconds.
pub timeout_millis: u64,
/// Heartbeat timeout in milliseconds.
/// When the field is missing during deserialization, the value of
/// `default_heartbeat_timeout_millis()` is used.
#[serde(default = "default_heartbeat_timeout_millis")]
pub heartbeat_timeout_millis: u64,
/// DDL timeout in milliseconds.
/// When the field is missing during deserialization, the value of
/// `default_ddl_timeout_millis()` is used.
/// NOTE(review): presumably applies to DDL requests only — confirm at the call site.
#[serde(default = "default_ddl_timeout_millis")]
pub ddl_timeout_millis: u64,
/// Connect server timeout in milliseconds.
pub connect_timeout_millis: u64,
/// NOTE(review): presumably toggles `TCP_NODELAY` on the underlying connections — confirm.
pub tcp_nodelay: bool,
}

/// Serde default for `MetaClientOptions::heartbeat_timeout_millis`: 500 milliseconds.
fn default_heartbeat_timeout_millis() -> u64 {
    const HEARTBEAT_TIMEOUT_MILLIS: u64 = 500;
    HEARTBEAT_TIMEOUT_MILLIS
}

/// Serde default for `MetaClientOptions::ddl_timeout_millis`: 10 000 milliseconds.
fn default_ddl_timeout_millis() -> u64 {
    const DDL_TIMEOUT_MILLIS: u64 = 10_000;
    DDL_TIMEOUT_MILLIS
}
Expand All @@ -37,6 +43,7 @@ impl Default for MetaClientOptions {
Self {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout_millis: 3_000u64,
heartbeat_timeout_millis: default_heartbeat_timeout_millis(),
ddl_timeout_millis: default_ddl_timeout_millis(),
connect_timeout_millis: 5_000u64,
tcp_nodelay: true,
Expand Down
3 changes: 0 additions & 3 deletions src/meta-srv/src/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ use tokio::sync::broadcast::Receiver;

use crate::error::Result;

pub const LEASE_SECS: i64 = 5;
// In a lease, there are two opportunities for renewal.
pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 / 2;
pub const ELECTION_KEY: &str = "__meta_srv_election";

#[derive(Debug, Clone)]
Expand Down
9 changes: 4 additions & 5 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, info, warn};
use etcd_client::Client;
use snafu::{OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;

use crate::election::{
Election, LeaderChangeMessage, ELECTION_KEY, KEEP_ALIVE_PERIOD_SECS, LEASE_SECS,
};
use crate::election::{Election, LeaderChangeMessage, ELECTION_KEY};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue};
Expand Down Expand Up @@ -114,7 +113,7 @@ impl Election for EtcdElection {
let mut lease_client = self.client.lease_client();
let mut election_client = self.client.election_client();
let res = lease_client
.grant(LEASE_SECS, None)
.grant(META_LEASE_SECS as i64, None)
.await
.context(error::EtcdFailedSnafu)?;
let lease_id = res.id();
Expand All @@ -140,7 +139,7 @@ impl Election for EtcdElection {
.context(error::EtcdFailedSnafu)?;

let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(KEEP_ALIVE_PERIOD_SECS));
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
loop {
let _ = keep_alive_interval.tick().await;
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
Expand Down

0 comments on commit 230a302

Please sign in to comment.