trace peers' availability info on leader side (tikv#13209)
ref tikv#12876

Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
ethercflow and ti-chi-bot committed Oct 12, 2022
1 parent 6061d42 commit 45afd61
Showing 9 changed files with 119 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

8 changes: 8 additions & 0 deletions components/raftstore/src/store/config.rs
@@ -303,6 +303,12 @@ pub struct Config {
pub max_snapshot_file_raw_size: ReadableSize,

pub unreachable_backoff: ReadableDuration,

#[doc(hidden)]
#[serde(skip_serializing)]
#[online_config(hidden)]
// Interval to check peers' availability info.
pub check_peers_availability_interval: ReadableDuration,
}

impl Default for Config {
@@ -407,6 +413,8 @@ impl Default for Config {
report_region_buckets_tick_interval: ReadableDuration::secs(10),
max_snapshot_file_raw_size: ReadableSize::mb(100),
unreachable_backoff: ReadableDuration::secs(10),
// TODO: make its value reasonable
check_peers_availability_interval: ReadableDuration::secs(30),
}
}
}
65 changes: 65 additions & 0 deletions components/raftstore/src/store/fsm/peer.rs
@@ -1223,6 +1223,7 @@ where
PeerTick::ReactivateMemoryLock => self.on_reactivate_memory_lock_tick(),
PeerTick::ReportBuckets => self.on_report_region_buckets_tick(),
PeerTick::CheckLongUncommitted => self.on_check_long_uncommitted_tick(),
PeerTick::CheckPeersAvailability => self.on_check_peers_availability(),
}
}

@@ -2638,6 +2639,42 @@ where
self.fsm.hibernate_state.count_vote(from.get_id());
}

fn on_availability_response(&mut self, from: &metapb::Peer, msg: &ExtraMessage) {
if !self.fsm.peer.is_leader() {
return;
}
if !msg.wait_data {
self.fsm
.peer
.wait_data_peers
.retain(|id| *id != from.get_id());
debug!(
"receive peer ready info";
"peer_id" => self.fsm.peer.peer.get_id(),
);
return;
}
self.register_check_peers_availability_tick();
}

fn on_availability_request(&mut self, from: &metapb::Peer) {
if self.fsm.peer.is_leader() {
return;
}
let mut resp = ExtraMessage::default();
resp.set_type(ExtraMessageType::MsgAvailabilityResponse);
resp.wait_data = self.fsm.peer.wait_data;
self.fsm
.peer
.send_extra_message(resp, &mut self.ctx.trans, from);
debug!(
"peer responses availability info to leader";
"region_id" => self.region().get_id(),
"peer_id" => self.fsm.peer.peer.get_id(),
"leader_id" => from.id,
);
}

fn on_extra_message(&mut self, mut msg: RaftMessage) {
match msg.get_extra_msg().get_type() {
ExtraMessageType::MsgRegionWakeUp | ExtraMessageType::MsgCheckStalePeer => {
@@ -2671,6 +2708,12 @@
ExtraMessageType::MsgRejectRaftLogCausedByMemoryUsage => {
unimplemented!()
}
ExtraMessageType::MsgAvailabilityRequest => {
self.on_availability_request(msg.get_from_peer());
}
ExtraMessageType::MsgAvailabilityResponse => {
self.on_availability_response(msg.get_from_peer(), msg.get_extra_msg());
}
}
}

@@ -3220,6 +3263,7 @@
);
} else {
self.fsm.peer.transfer_leader(&from);
self.fsm.peer.wait_data_peers.clear();
}
}
}
@@ -3693,6 +3737,7 @@
.peer
.peers_start_pending_time
.retain(|&(p, _)| p != peer_id);
self.fsm.peer.wait_data_peers.retain(|id| *id != peer_id);
}
self.fsm.peer.remove_peer_from_cache(peer_id);
// We only care about removing itself now.
@@ -5920,6 +5965,26 @@
self.schedule_tick(PeerTick::PdHeartbeat)
}

fn register_check_peers_availability_tick(&mut self) {
fail_point!("ignore schedule check peers availability tick", |_| {});
self.schedule_tick(PeerTick::CheckPeersAvailability)
}

fn on_check_peers_availability(&mut self) {
for peer_id in self.fsm.peer.wait_data_peers.iter() {
let peer = self.fsm.peer.get_peer_from_cache(*peer_id).unwrap();
let mut msg = ExtraMessage::default();
msg.set_type(ExtraMessageType::MsgAvailabilityRequest);
self.fsm
.peer
.send_extra_message(msg, &mut self.ctx.trans, &peer);
debug!(
"check peer availability";
"target peer id" => *peer_id,
);
}
}

fn on_check_peer_stale_state_tick(&mut self) {
if self.fsm.peer.pending_remove {
return;
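The fsm/peer.rs changes above form the leader side of the exchange: the `CheckPeersAvailability` tick sends a `MsgAvailabilityRequest` to every peer still listed in `wait_data_peers`, and each `MsgAvailabilityResponse` either prunes that list or keeps the tick registered. Below is a minimal, self-contained sketch of that bookkeeping; `LeaderTracker` and its methods are hypothetical stand-ins for the real `Peer`/`PeerFsm` state, not TiKV code.

```rust
// Sketch only: models the leader-side bookkeeping around `wait_data_peers`.
struct LeaderTracker {
    // Peers (by id) switched witness -> non-witness that still wait for data.
    wait_data_peers: Vec<u64>,
}

impl LeaderTracker {
    // On the CheckPeersAvailability tick: the peers to poll with
    // MsgAvailabilityRequest.
    fn peers_to_poll(&self) -> &[u64] {
        &self.wait_data_peers
    }

    // On MsgAvailabilityResponse: drop the peer once it no longer waits for
    // data; the returned bool says whether the tick should be re-registered.
    fn on_availability_response(&mut self, from: u64, wait_data: bool) -> bool {
        if !wait_data {
            self.wait_data_peers.retain(|id| *id != from);
            return false;
        }
        true
    }
}

fn main() {
    let mut t = LeaderTracker { wait_data_peers: vec![2, 3] };
    assert_eq!(t.peers_to_poll(), &[2u64, 3]);
    assert!(t.on_availability_response(3, true)); // peer 3 still catching up
    assert!(!t.on_availability_response(2, false)); // peer 2 is ready
    assert_eq!(t.peers_to_poll(), &[3u64]);
}
```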
2 changes: 2 additions & 0 deletions components/raftstore/src/store/fsm/store.rs
@@ -557,6 +557,8 @@ where
self.cfg.report_region_buckets_tick_interval.0;
self.tick_batch[PeerTick::CheckLongUncommitted as usize].wait_duration =
self.cfg.check_long_uncommitted_interval.0;
self.tick_batch[PeerTick::CheckPeersAvailability as usize].wait_duration =
self.cfg.check_peers_availability_interval.0;
}
}

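The store.rs hunk above wires the new `check_peers_availability_interval` into the per-tick wait-duration table, following the same pattern as the existing peer ticks. A minimal sketch of that pattern, with hypothetical `Tick`/`TickBatch` stand-ins for `PeerTick` and the poller's tick batches (the durations are placeholders, not the real config values):

```rust
use std::time::Duration;

// Sketch only: a per-tick wait-duration table indexed by `tick as usize`.
enum Tick {
    PdHeartbeat = 0,
    CheckPeersAvailability = 1,
}

#[derive(Default)]
struct TickBatch {
    wait_duration: Duration,
}

fn main() {
    // One slot per tick kind, mirroring `tick_batch[PeerTick::... as usize]`.
    let mut batches = vec![TickBatch::default(), TickBatch::default()];
    batches[Tick::PdHeartbeat as usize].wait_duration = Duration::from_secs(10);
    batches[Tick::CheckPeersAvailability as usize].wait_duration = Duration::from_secs(30);
    assert_eq!(
        batches[Tick::CheckPeersAvailability as usize].wait_duration,
        Duration::from_secs(30)
    );
}
```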
3 changes: 3 additions & 0 deletions components/raftstore/src/store/msg.rs
@@ -376,6 +376,7 @@ pub enum PeerTick {
ReactivateMemoryLock = 8,
ReportBuckets = 9,
CheckLongUncommitted = 10,
CheckPeersAvailability = 11,
}

impl PeerTick {
@@ -395,6 +396,7 @@ impl PeerTick {
PeerTick::ReactivateMemoryLock => "reactivate_memory_lock",
PeerTick::ReportBuckets => "report_buckets",
PeerTick::CheckLongUncommitted => "check_long_uncommitted",
PeerTick::CheckPeersAvailability => "check_peers_availability",
}
}

@@ -411,6 +413,7 @@
PeerTick::ReactivateMemoryLock,
PeerTick::ReportBuckets,
PeerTick::CheckLongUncommitted,
PeerTick::CheckPeersAvailability,
];
TICKS
}
37 changes: 37 additions & 0 deletions components/raftstore/src/store/peer.rs
@@ -929,6 +929,8 @@ where
peer_cache: RefCell<HashMap<u64, metapb::Peer>>,
/// Record the last instant of each peer's heartbeat response.
pub peer_heartbeats: HashMap<u64, Instant>,
/// Record the waiting data status of each follower or learner peer.
pub wait_data_peers: Vec<u64>,

proposals: ProposalQueue<Callback<EK::Snapshot>>,
leader_missing_time: Option<Instant>,
@@ -951,6 +953,13 @@
/// target peer.
/// - all read requests must be rejected.
pub pending_remove: bool,
/// Currently it's used to indicate whether the witness -> non-witness
/// conversion is complete. The conversion is considered complete only
/// once this peer has applied the data; PD can then treat the operation
/// as finished and continue scheduling other operators, which prevents
/// multiple witnesses from existing in the same time period.
pub wait_data: bool,

/// Force leader state is only used in online recovery when the majority of
/// peers are missing. In this state, it forces one peer to become leader
@@ -1157,6 +1166,7 @@
long_uncommitted_threshold: cfg.long_uncommitted_base_threshold.0,
peer_cache: RefCell::new(HashMap::default()),
peer_heartbeats: HashMap::default(),
wait_data_peers: Vec::default(),
peers_start_pending_time: vec![],
down_peer_ids: vec![],
size_diff_hint: 0,
@@ -1167,6 +1177,7 @@
compaction_declined_bytes: 0,
leader_unreachable: false,
pending_remove: false,
wait_data: false,
should_wake_up: false,
force_leader: None,
pending_merge_state: None,
@@ -2056,6 +2067,7 @@
if !self.is_leader() {
self.peer_heartbeats.clear();
self.peers_start_pending_time.clear();
self.wait_data_peers.clear();
return;
}

@@ -2616,6 +2628,7 @@
// Update apply index to `last_applying_idx`
self.read_progress
.update_applied(self.last_applying_idx, &ctx.coprocessor_host);
self.notify_leader_the_peer_is_available(ctx);
}
CheckApplyingSnapStatus::Idle => {
// FIXME: It's possible that the snapshot applying task is canceled.
@@ -2632,6 +2645,29 @@
true
}

fn notify_leader_the_peer_is_available<T: Transport>(
&mut self,
ctx: &mut PollContext<EK, ER, T>,
) {
if self.wait_data {
self.wait_data = false;
fail_point!("ignore notify leader the peer is available", |_| {});
let leader_id = self.leader_id();
let leader = self.get_peer_from_cache(leader_id);
if let Some(leader) = leader {
let mut msg = ExtraMessage::default();
msg.set_type(ExtraMessageType::MsgAvailabilityResponse);
msg.wait_data = false;
self.send_extra_message(msg, &mut ctx.trans, &leader);
info!(
"notify leader the leader is available";
"region id" => self.region().get_id(),
"peer id" => self.peer.id
);
}
}
}

pub fn handle_raft_ready_append<T: Transport>(
&mut self,
ctx: &mut PollContext<EK, ER, T>,
@@ -5422,6 +5458,7 @@
approximate_size: self.approximate_size,
approximate_keys: self.approximate_keys,
replication_status: self.region_replication_status(),
wait_data_peers: self.wait_data_peers.clone(),
});
if let Err(e) = ctx.pd_scheduler.schedule(task) {
error!(
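The store/peer.rs changes above cover the follower/learner side and the reporting path: a peer that was switched witness -> non-witness stays in `wait_data` until its snapshot is applied, then clears the flag and answers the leader with `MsgAvailabilityResponse`; the leader additionally exports `wait_data_peers` in its PD region heartbeat. A minimal sketch of the follower-side transition, with hypothetical `Msg`/`FollowerPeer` stand-ins rather than the real `ExtraMessage`/`Peer` types:

```rust
// Sketch only: what happens on the waiting peer once its snapshot is applied.
#[derive(Debug, PartialEq)]
enum Msg {
    AvailabilityResponse { wait_data: bool },
}

struct FollowerPeer {
    wait_data: bool,
    outbox: Vec<(u64, Msg)>, // (target peer id, message)
}

impl FollowerPeer {
    // Mirrors notify_leader_the_peer_is_available: clear the flag once and
    // tell the leader this peer no longer waits for data.
    fn on_snapshot_applied(&mut self, leader_id: u64) {
        if self.wait_data {
            self.wait_data = false;
            self.outbox
                .push((leader_id, Msg::AvailabilityResponse { wait_data: false }));
        }
    }
}

fn main() {
    let mut p = FollowerPeer { wait_data: true, outbox: vec![] };
    p.on_snapshot_applied(1);
    assert!(!p.wait_data);
    assert_eq!(
        p.outbox,
        vec![(1u64, Msg::AvailabilityResponse { wait_data: false })]
    );
}
```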
1 change: 1 addition & 0 deletions components/raftstore/src/store/worker/pd.rs
@@ -118,6 +118,7 @@ pub struct HeartbeatTask {
pub approximate_size: Option<u64>,
pub approximate_keys: Option<u64>,
pub replication_status: Option<RegionReplicationStatus>,
pub wait_data_peers: Vec<u64>,
}

/// Uses an asynchronous thread to tell PD something.
1 change: 1 addition & 0 deletions src/server/service/kv.rs
@@ -1375,6 +1375,7 @@ fn handle_batch_commands_request<
response_batch_commands_request(id, resp, tx.clone(), begin_instant, GrpcTypeKind::$metric_name, source);
})*
Some(batch_commands_request::request::Cmd::Import(_)) => unimplemented!(),
Some(batch_commands_request::request::Cmd::PrepareFlashbackToVersion(_)) => unimplemented!(),
}
}
}
1 change: 1 addition & 0 deletions tests/integrations/config/mod.rs
@@ -250,6 +250,7 @@ fn test_serde_custom_tikv_config() {
long_uncommitted_base_threshold: ReadableDuration::secs(1),
max_snapshot_file_raw_size: ReadableSize::gb(10),
unreachable_backoff: ReadableDuration::secs(111),
check_peers_availability_interval: ReadableDuration::secs(30),
};
value.pd = PdConfig::new(vec!["example.com:443".to_owned()]);
let titan_cf_config = TitanCfConfig {
