raftstore: Remove future_poller pool and batch Ticks (tikv#8457)
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Little-Wallace committed Sep 9, 2020
1 parent 5496d07 commit 03c3ec6
Showing 5 changed files with 111 additions and 93 deletions.
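
Editorial note: this commit removes the dedicated future_poller pool. Instead of spawning one delayed future per peer per tick, `schedule_tick` now pushes a callback into a per-tick-kind batch on the poll context (`self.ctx.tick_batch[idx]`), and the poller fires each batch once per interval. Only the peer.rs side of the change is shown below; a minimal sketch of the batch structure it reads from, with names inferred from these hunks rather than copied from the commit's other files, might look like this:

```rust
use std::time::Duration;

/// Sketch of the per-tick batch that `schedule_tick` writes into
/// (`self.ctx.tick_batch[idx]`). Struct and field names are inferred from
/// this diff; the real definition lives in one of the other changed files.
pub struct PeerTickBatch {
    /// Callbacks collected during the current poll round; each one sends a
    /// `PeerMsg::Tick` back to its peer's mailbox when the batch fires.
    pub ticks: Vec<Box<dyn FnOnce() + Send>>,
    /// How long the poller waits before firing the batch; a zero duration
    /// means the tick is disabled (see the `is_zero_duration` check below).
    pub wait_duration: Duration,
}

impl Default for PeerTickBatch {
    fn default() -> Self {
        PeerTickBatch {
            ticks: Vec::new(),
            wait_duration: Duration::from_millis(0),
        }
    }
}
```
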
84 changes: 27 additions & 57 deletions components/raftstore/src/store/fsm/peer.rs
@@ -3,14 +3,13 @@
 use std::borrow::Cow;
 use std::collections::Bound::{Excluded, Included, Unbounded};
 use std::collections::VecDeque;
-use std::time::{Duration, Instant};
+use std::time::Instant;
 use std::{cmp, u64};
 
 use batch_system::{BasicMailbox, Fsm};
 use engine_traits::CF_RAFT;
 use engine_traits::{Engines, KvEngine, RaftEngine, WriteBatchExt};
 use error_code::ErrorCodeExt;
-use futures::compat::Future01CompatExt;
 use kvproto::errorpb;
 use kvproto::import_sstpb::SstMeta;
 use kvproto::metapb::{self, Region, RegionEpoch};
@@ -911,17 +910,18 @@ where
     }
 
     #[inline]
-    fn schedule_tick(&mut self, tick: PeerTicks, timeout: Duration) {
+    fn schedule_tick(&mut self, tick: PeerTicks) {
         if self.fsm.tick_registry.contains(tick) {
             return;
         }
-        if is_zero_duration(&timeout) {
+        let idx = tick.bits() as usize;
+        if is_zero_duration(&self.ctx.tick_batch[idx].wait_duration) {
             return;
         }
         trace!(
             "schedule tick";
             "tick" => ?tick,
-            "timeout" => ?timeout,
+            "timeout" => ?self.ctx.tick_batch[idx].wait_duration,
             "region_id" => self.region_id(),
             "peer_id" => self.fsm.peer_id(),
         );
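
Editorial note: the new `let idx = tick.bits() as usize;` works because `PeerTicks` is a bitflags set, so each tick kind carries a distinct power-of-two value that doubles as a sparse but unique slot index into `ctx.tick_batch`. An illustrative stand-in (the flag values here are examples, not necessarily the crate's actual constants):

```rust
use bitflags::bitflags;

bitflags! {
    /// Illustrative stand-in for raftstore's PeerTicks; the exact bit values
    /// in TiKV may differ, but each tick kind is a single distinct bit.
    pub struct PeerTicks: u8 {
        const RAFT                   = 0b0000_0001;
        const RAFT_LOG_GC            = 0b0000_0010;
        const SPLIT_REGION_CHECK     = 0b0000_0100;
        const PD_HEARTBEAT           = 0b0000_1000;
        const CHECK_MERGE            = 0b0001_0000;
        const CHECK_PEER_STALE_STATE = 0b0010_0000;
    }
}

fn main() {
    // Because every tick is one bit, `bits()` yields a unique slot per tick,
    // so `tick_batch` can be a plain Vec indexed by it (sparse but simple).
    assert_eq!(PeerTicks::RAFT.bits() as usize, 1);
    assert_eq!(PeerTicks::RAFT_LOG_GC.bits() as usize, 2);
    assert_eq!(PeerTicks::CHECK_MERGE.bits() as usize, 16);
}
```
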
@@ -942,43 +942,27 @@
             }
         };
         let peer_id = self.fsm.peer.peer_id();
-        let delay = self.ctx.timer.delay(timeout).compat();
-        let f = async move {
-            match delay.await {
-                Ok(_) => {
-                    fail_point!(
-                        "on_raft_log_gc_tick_1",
-                        peer_id == 1 && tick == PeerTicks::RAFT_LOG_GC,
-                        |_| unreachable!()
-                    );
-                    // This can happen only when the peer is about to be destroyed
-                    // or the node is shutting down. So it's OK to not to clean up
-                    // registry.
-                    if let Err(e) = mb.force_send(PeerMsg::Tick(tick)) {
-                        debug!(
-                            "failed to schedule peer tick";
-                            "region_id" => region_id,
-                            "peer_id" => peer_id,
-                            "tick" => ?tick,
-                            "err" => %e,
-                        );
-                    }
-                }
-                Err(e) => {
-                    panic!(
-                        "[region {}] {} tick {:?} is lost due to timeout error: {:?}",
-                        region_id, peer_id, tick, e
-                    );
-                }
-            }
-        };
-        self.ctx.poller_handle.spawn(f);
+        let cb = Box::new(move || {
+            // This can happen only when the peer is about to be destroyed
+            // or the node is shutting down. So it's OK to not to clean up
+            // registry.
+            if let Err(e) = mb.force_send(PeerMsg::Tick(tick)) {
+                debug!(
+                    "failed to schedule peer tick";
+                    "region_id" => region_id,
+                    "peer_id" => peer_id,
+                    "tick" => ?tick,
+                    "err" => %e,
+                );
+            }
+        });
+        self.ctx.tick_batch[idx].ticks.push(cb);
     }
 
     fn register_raft_base_tick(&mut self) {
        // If we register raft base tick failed, the whole raft can't run correctly,
        // TODO: shutdown the store?
-        self.schedule_tick(PeerTicks::RAFT, self.ctx.cfg.raft_base_tick_interval.0)
+        self.schedule_tick(PeerTicks::RAFT)
     }
 
     fn on_raft_base_tick(&mut self) {
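
Editorial note: the flush side is not in this file. The poller is expected to drain each non-empty `tick_batch[idx]` at the end of a poll round and fire all of its callbacks after `wait_duration`, so one timer serves every peer that registered that tick in the round. A rough, self-contained sketch of that idea, using a plain thread and sleep purely for illustration (TiKV's poller would use its own timer/executor, not raw threads):

```rust
use std::thread;
use std::time::Duration;

type TickCallback = Box<dyn FnOnce() + Send>;

/// One slot per tick kind, mirroring the `tick_batch` the diff writes into.
struct TickSlot {
    wait_duration: Duration,
    ticks: Vec<TickCallback>,
}

/// Hypothetical end-of-poll flush: one delayed task per tick kind, firing all
/// callbacks collected this round, instead of one spawned future per peer.
fn flush_ticks(slots: &mut [TickSlot]) {
    for slot in slots.iter_mut() {
        if slot.ticks.is_empty() {
            continue;
        }
        let delay = slot.wait_duration;
        let callbacks = std::mem::take(&mut slot.ticks);
        thread::spawn(move || {
            thread::sleep(delay);
            for cb in callbacks {
                cb();
            }
        });
    }
}
```

The point of the batching is that N per-peer timer futures collapse into at most one delayed task per tick kind per poll round, which is what lets the future_poller pool be removed.
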
@@ -2169,10 +2153,7 @@ where
     }
 
     fn register_merge_check_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::CHECK_MERGE,
-            self.ctx.cfg.merge_check_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::CHECK_MERGE)
     }
 
     /// Check if merge target region is staler than the local one in kv engine.
@@ -3131,19 +3112,17 @@ where
     }
 
     fn register_raft_gc_log_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::RAFT_LOG_GC,
-            self.ctx.cfg.raft_log_gc_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::RAFT_LOG_GC)
     }
 
     #[allow(clippy::if_same_then_else)]
     fn on_raft_gc_log_tick(&mut self, force_compact: bool) {
         if !self.fsm.peer.get_store().is_cache_empty() || !self.ctx.cfg.hibernate_regions {
             self.register_raft_gc_log_tick();
         }
-        debug_assert!(!self.fsm.stopped);
+        fail_point!("on_raft_log_gc_tick_1", self.fsm.peer_id() == 1, |_| {});
         fail_point!("on_raft_gc_log_tick", |_| {});
+        debug_assert!(!self.fsm.stopped);
 
         // As leader, we would not keep caches for the peers that didn't response heartbeat in the
         // last few seconds. That happens probably because another TiKV is down. In this case if we
@@ -3253,10 +3232,7 @@
     }
 
     fn register_split_region_check_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::SPLIT_REGION_CHECK,
-            self.ctx.cfg.split_region_check_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::SPLIT_REGION_CHECK)
     }
 
     #[inline]
@@ -3494,10 +3470,7 @@ where
     }
 
     fn register_pd_heartbeat_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::PD_HEARTBEAT,
-            self.ctx.cfg.pd_heartbeat_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::PD_HEARTBEAT)
     }
 
     fn on_check_peer_stale_state_tick(&mut self) {
@@ -3595,10 +3568,7 @@ where
     }
 
     fn register_check_peer_stale_state_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::CHECK_PEER_STALE_STATE,
-            self.ctx.cfg.peer_stale_state_check_interval.0,
-        )
+        self.schedule_tick(PeerTicks::CHECK_PEER_STALE_STATE)
     }
 }

