From 03c3ec61d0be971f66f0268f249e2481cab18742 Mon Sep 17 00:00:00 2001
From: Wallace
Date: Wed, 9 Sep 2020 23:26:35 +0800
Subject: [PATCH] raftstore: Remove future_poller pool and batch Ticks (#8457)

Signed-off-by: Little-Wallace
---
 components/raftstore/src/store/fsm/peer.rs  |  84 ++++++----
 components/raftstore/src/store/fsm/store.rs | 102 +++++++++++++-------
 components/raftstore/src/store/msg.rs       |  11 +++
 components/tikv_util/src/future.rs          |   2 +-
 tests/failpoints/cases/test_conf_change.rs  |   5 +-
 5 files changed, 111 insertions(+), 93 deletions(-)

diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs
index 62e291abd66..74a4eab2229 100644
--- a/components/raftstore/src/store/fsm/peer.rs
+++ b/components/raftstore/src/store/fsm/peer.rs
@@ -3,14 +3,13 @@
 use std::borrow::Cow;
 use std::collections::Bound::{Excluded, Included, Unbounded};
 use std::collections::VecDeque;
-use std::time::{Duration, Instant};
+use std::time::Instant;
 use std::{cmp, u64};
 
 use batch_system::{BasicMailbox, Fsm};
 use engine_traits::CF_RAFT;
 use engine_traits::{Engines, KvEngine, RaftEngine, WriteBatchExt};
 use error_code::ErrorCodeExt;
-use futures::compat::Future01CompatExt;
 use kvproto::errorpb;
 use kvproto::import_sstpb::SstMeta;
 use kvproto::metapb::{self, Region, RegionEpoch};
@@ -911,17 +910,18 @@ where
     }
 
     #[inline]
-    fn schedule_tick(&mut self, tick: PeerTicks, timeout: Duration) {
+    fn schedule_tick(&mut self, tick: PeerTicks) {
         if self.fsm.tick_registry.contains(tick) {
             return;
         }
-        if is_zero_duration(&timeout) {
+        let idx = tick.bits() as usize;
+        if is_zero_duration(&self.ctx.tick_batch[idx].wait_duration) {
             return;
         }
         trace!(
             "schedule tick";
             "tick" => ?tick,
-            "timeout" => ?timeout,
+            "timeout" => ?self.ctx.tick_batch[idx].wait_duration,
             "region_id" => self.region_id(),
             "peer_id" => self.fsm.peer_id(),
         );
@@ -942,43 +942,27 @@ where
             }
         };
         let peer_id = self.fsm.peer.peer_id();
-        let delay = self.ctx.timer.delay(timeout).compat();
-        let f = async move {
-            match delay.await {
-                Ok(_) => {
-                    fail_point!(
-                        "on_raft_log_gc_tick_1",
-                        peer_id == 1 && tick == PeerTicks::RAFT_LOG_GC,
-                        |_| unreachable!()
-                    );
-                    // This can happen only when the peer is about to be destroyed
-                    // or the node is shutting down. So it's OK to not to clean up
-                    // registry.
-                    if let Err(e) = mb.force_send(PeerMsg::Tick(tick)) {
-                        debug!(
-                            "failed to schedule peer tick";
-                            "region_id" => region_id,
-                            "peer_id" => peer_id,
-                            "tick" => ?tick,
-                            "err" => %e,
-                        );
-                    }
-                }
-                Err(e) => {
-                    panic!(
-                        "[region {}] {} tick {:?} is lost due to timeout error: {:?}",
-                        region_id, peer_id, tick, e
-                    );
-                }
+        let cb = Box::new(move || {
+            // This can happen only when the peer is about to be destroyed
+            // or the node is shutting down. So it's OK to not to clean up
+            // registry.
+            if let Err(e) = mb.force_send(PeerMsg::Tick(tick)) {
+                debug!(
+                    "failed to schedule peer tick";
+                    "region_id" => region_id,
+                    "peer_id" => peer_id,
+                    "tick" => ?tick,
+                    "err" => %e,
+                );
             }
-        };
-        self.ctx.poller_handle.spawn(f);
+        });
+        self.ctx.tick_batch[idx].ticks.push(cb);
     }
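Reviewer note on the hunk above: instead of spawning one delay future per peer tick onto the future-poller runtime, `schedule_tick` now boxes a callback that simply `force_send`s `PeerMsg::Tick` back to the peer's mailbox and queues it in the poll context's per-tick batch; the batch is flushed with a single timer per tick type (see the store.rs hunks below). A minimal sketch of that path, with an `mpsc::Sender` standing in for `BasicMailbox` (all names here are illustrative, not TiKV's API):

```rust
// Sketch of the new scheduling path: schedule_tick no longer arms a timer
// itself; it only queues a boxed callback that sends the tick message.
use std::sync::mpsc;

#[derive(Debug, PartialEq)]
enum PeerMsg {
    Tick(u64), // the PeerTicks bits, simplified to a u64
}

fn schedule_tick(
    ticks: &mut Vec<Box<dyn FnOnce() + Send>>,
    mailbox: mpsc::Sender<PeerMsg>,
    tick: u64,
) {
    ticks.push(Box::new(move || {
        // A send failure means the peer is being destroyed or the node is
        // shutting down, so dropping the tick is fine (as the patch notes).
        if mailbox.send(PeerMsg::Tick(tick)).is_err() {
            eprintln!("failed to schedule peer tick {}", tick);
        }
    }));
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut batch: Vec<Box<dyn FnOnce() + Send>> = Vec::new();
    schedule_tick(&mut batch, tx, 1);
    for cb in batch.drain(..) {
        cb(); // the poller's flush step fires every queued callback at once
    }
    assert_eq!(rx.recv().unwrap(), PeerMsg::Tick(1));
}
```

The intent appears to be that N idle regions now cost one timer per tick type per poll round instead of N spawned futures on a dedicated thread pool.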
     fn register_raft_base_tick(&mut self) {
         // If we register raft base tick failed, the whole raft can't run correctly,
         // TODO: shutdown the store?
-        self.schedule_tick(PeerTicks::RAFT, self.ctx.cfg.raft_base_tick_interval.0)
+        self.schedule_tick(PeerTicks::RAFT)
     }
 
     fn on_raft_base_tick(&mut self) {
@@ -2169,10 +2153,7 @@ where
     }
 
     fn register_merge_check_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::CHECK_MERGE,
-            self.ctx.cfg.merge_check_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::CHECK_MERGE)
     }
 
     /// Check if merge target region is staler than the local one in kv engine.
@@ -3131,10 +3112,7 @@ where
     }
 
     fn register_raft_gc_log_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::RAFT_LOG_GC,
-            self.ctx.cfg.raft_log_gc_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::RAFT_LOG_GC)
     }
 
     #[allow(clippy::if_same_then_else)]
@@ -3142,8 +3120,9 @@ where
         if !self.fsm.peer.get_store().is_cache_empty() || !self.ctx.cfg.hibernate_regions {
             self.register_raft_gc_log_tick();
         }
-        debug_assert!(!self.fsm.stopped);
+        fail_point!("on_raft_log_gc_tick_1", self.fsm.peer_id() == 1, |_| {});
         fail_point!("on_raft_gc_log_tick", |_| {});
+        debug_assert!(!self.fsm.stopped);
 
         // As leader, we would not keep caches for the peers that didn't response heartbeat in the
         // last few seconds. That happens probably because another TiKV is down. In this case if we
@@ -3253,10 +3232,7 @@ where
     }
 
     fn register_split_region_check_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::SPLIT_REGION_CHECK,
-            self.ctx.cfg.split_region_check_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::SPLIT_REGION_CHECK)
     }
 
     #[inline]
@@ -3494,10 +3470,7 @@ where
     }
 
     fn register_pd_heartbeat_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::PD_HEARTBEAT,
-            self.ctx.cfg.pd_heartbeat_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::PD_HEARTBEAT)
     }
 
     fn on_check_peer_stale_state_tick(&mut self) {
@@ -3595,10 +3568,7 @@ where
     }
 
     fn register_check_peer_stale_state_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::CHECK_PEER_STALE_STATE,
-            self.ctx.cfg.peer_stale_state_check_interval.0,
-        )
+        self.schedule_tick(PeerTicks::CHECK_PEER_STALE_STATE)
     }
 }
 
diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs
index 000d3193a03..9958fd1e29a 100644
--- a/components/raftstore/src/store/fsm/store.rs
+++ b/components/raftstore/src/store/fsm/store.rs
@@ -15,6 +15,7 @@ use engine_rocks::{PerfContext, PerfLevel};
 use engine_traits::{Engines, KvEngine, Mutable, WriteBatch, WriteBatchExt, WriteOptions};
 use engine_traits::{CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE};
 use futures::compat::Future01CompatExt;
+use futures::FutureExt;
 use kvproto::import_sstpb::SstMeta;
 use kvproto::metapb::{self, Region, RegionEpoch};
 use kvproto::pdpb::StoreStats;
@@ -24,7 +25,6 @@ use kvproto::replication_modepb::{ReplicationMode, ReplicationStatus};
 use protobuf::Message;
 use raft::{Ready, StateRole};
 use time::{self, Timespec};
-use tokio::runtime::{self, Handle, Runtime};
 
 use engine_rocks::CompactedEvent;
 use engine_traits::{RaftEngine, RaftLogBatch};
@@ -64,12 +64,14 @@ use crate::store::worker::{
     RaftlogGcRunner, RaftlogGcTask, ReadDelegate, RegionRunner, RegionTask, SplitCheckTask,
 };
 use crate::store::PdTask;
+use crate::store::PeerTicks;
 use crate::store::{
     util, Callback, CasualMessage, GlobalReplicationState, MergeResultKind, PeerMsg, RaftCommand,
     SignificantMsg, SnapManager, StoreMsg, StoreTick,
 };
 use crate::Result;
 use concurrency_manager::ConcurrencyManager;
+use tikv_util::future::poll_future_notify;
 
 type Key = Vec<u8>;
 
@@ -267,6 +269,21 @@ where
     }
 }
 
+#[derive(Default)]
+pub struct PeerTickBatch {
+    pub ticks: Vec<Box<dyn FnOnce() + Send>>,
+    pub wait_duration: Duration,
+}
+
+impl Clone for PeerTickBatch {
+    fn clone(&self) -> PeerTickBatch {
+        PeerTickBatch {
+            ticks: vec![],
+            wait_duration: self.wait_duration,
+        }
+    }
+}
+
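`Clone` is written by hand here because `Box<dyn FnOnce() + Send>` is not `Clone`, so `#[derive(Clone)]` would not compile; a clone copies only the interval and starts with an empty queue. That is exactly what `vec![PeerTickBatch::default(); 256]` in `build()` below relies on. A self-contained illustration (the `main` is demo-only):

```rust
// Why Clone is manual: queued callbacks are FnOnce and cannot be duplicated,
// so cloning a batch propagates the interval but never the pending ticks.
use std::time::Duration;

#[derive(Default)]
struct PeerTickBatch {
    ticks: Vec<Box<dyn FnOnce() + Send>>,
    wait_duration: Duration,
}

impl Clone for PeerTickBatch {
    fn clone(&self) -> PeerTickBatch {
        PeerTickBatch {
            ticks: vec![],
            wait_duration: self.wait_duration,
        }
    }
}

fn main() {
    // vec![x; n] clones its template, which is how build() creates the
    // 256-slot tick_batch table from PeerTickBatch::default().
    let table = vec![PeerTickBatch::default(); 4];
    assert!(table.iter().all(|b| b.ticks.is_empty()));

    let mut a = PeerTickBatch::default();
    a.wait_duration = Duration::from_secs(3);
    a.ticks.push(Box::new(|| println!("pending tick")));
    let b = a.clone();
    assert_eq!(b.wait_duration, Duration::from_secs(3));
    assert!(b.ticks.is_empty()); // queued callbacks are not carried over
}
```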
 pub struct PollContext<EK, ER, T>
 where
     EK: KvEngine,
@@ -295,7 +312,6 @@ where
     /// 1. lock the store_meta.
     /// 2. lock the pending_create_peers.
     pub pending_create_peers: Arc<Mutex<HashMap<u64, bool>>>,
-    pub poller_handle: Handle,
     pub raft_metrics: RaftMetrics,
     pub snap_mgr: SnapManager,
     pub applying_snap_count: Arc<AtomicUsize>,
@@ -316,6 +332,7 @@ where
     pub need_flush_trans: bool,
     pub current_time: Option<Timespec>,
     pub perf_context_statistics: PerfContextStatistics,
+    pub tick_batch: Vec<PeerTickBatch>,
     pub node_start_time: Option<TiInstant>,
 }
 
@@ -373,6 +390,21 @@ where
         }
         timeout
     }
+
+    pub fn update_ticks_timeout(&mut self) {
+        self.tick_batch[PeerTicks::RAFT.bits() as usize].wait_duration =
+            self.cfg.raft_base_tick_interval.0;
+        self.tick_batch[PeerTicks::RAFT_LOG_GC.bits() as usize].wait_duration =
+            self.cfg.raft_log_gc_tick_interval.0;
+        self.tick_batch[PeerTicks::PD_HEARTBEAT.bits() as usize].wait_duration =
+            self.cfg.pd_heartbeat_tick_interval.0;
+        self.tick_batch[PeerTicks::SPLIT_REGION_CHECK.bits() as usize].wait_duration =
+            self.cfg.split_region_check_tick_interval.0;
+        self.tick_batch[PeerTicks::CHECK_PEER_STALE_STATE.bits() as usize].wait_duration =
+            self.cfg.peer_stale_state_check_interval.0;
+        self.tick_batch[PeerTicks::CHECK_MERGE.bits() as usize].wait_duration =
+            self.cfg.merge_check_tick_interval.0;
+    }
 }
 
@@ -384,24 +416,16 @@ where
 impl<EK, ER, T> PollContext<EK, ER, T>
 where
     EK: KvEngine,
     ER: RaftEngine,
 {
     #[inline]
     fn schedule_store_tick(&self, tick: StoreTick, timeout: Duration) {
         if !is_zero_duration(&timeout) {
             let mb = self.router.control_mailbox();
-            let delay = self.timer.delay(timeout).compat();
-            let f = async move {
-                match delay.await {
-                    Ok(_) => {
-                        if let Err(e) = mb.force_send(StoreMsg::Tick(tick)) {
-                            info!(
-                                "failed to schedule store tick, are we shutting down?";
-                                "tick" => ?tick,
-                                "err" => ?e
-                            );
-                        }
-                    }
-                    Err(e) => {
-                        panic!("tick {:?} is lost due to timeout error: {:?}", tick, e);
-                    }
+            let delay = self.timer.delay(timeout).compat().map(move |_| {
+                if let Err(e) = mb.force_send(StoreMsg::Tick(tick)) {
+                    info!(
+                        "failed to schedule store tick, are we shutting down?";
+                        "tick" => ?tick,
+                        "err" => ?e
+                    );
                 }
-            };
-            self.poller_handle.spawn(f);
+            });
+            poll_future_notify(delay);
         }
     }
 
@@ -711,6 +735,27 @@ impl RaftPoller<EK, ER, T, C>
@@ -742,6 +787,7 @@ impl PollHandler<PeerFsm<EK, ER>, StoreFsm> for RaftPoller<EK, ER, T, C>
                 {}
             }
             self.poll_ctx.cfg = incoming.clone();
+            self.poll_ctx.update_ticks_timeout();
         }
     }
 
@@ -813,6 +859,7 @@ impl PollHandler<PeerFsm<EK, ER>, StoreFsm> for RaftPoller<EK, ER, T, C>
     fn end(&mut self, peers: &mut [Box<PeerFsm<EK, ER>>]) {
+        self.flush_ticks();
         if self.poll_ctx.has_ready {
             self.handle_raft_ready(peers);
         }
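The body of the `@@ -711,6 +735,27 @@` hunk (the `flush_ticks` helper that the `end()` hunk above calls) did not survive in this excerpt. Inferring from its call sites and the pieces this patch adds (`tick_batch`, `PeerTicks::get_all_ticks`, `poll_future_notify`), the batching step plausibly looks like the sketch below; std thread/sleep stand in for the futures timer, the tick constants are assumed values, and the real TiKV code may differ:

```rust
// Hypothetical flush step: after each poll round, drain every non-empty
// per-tick queue and arm a single timer that fires all of its callbacks.
use std::thread;
use std::time::Duration;

const RAFT: usize = 0b01; // stand-in for PeerTicks::RAFT.bits()
const RAFT_LOG_GC: usize = 0b10; // stand-in for PeerTicks::RAFT_LOG_GC.bits()

struct PeerTickBatch {
    ticks: Vec<Box<dyn FnOnce() + Send>>,
    wait_duration: Duration,
}

fn flush_ticks(tick_batch: &mut [PeerTickBatch]) {
    for &t in &[RAFT, RAFT_LOG_GC] {
        if tick_batch[t].ticks.is_empty() {
            continue; // no peer registered this tick type in this round
        }
        // Take ownership of this round's callbacks; next round starts empty.
        let ticks = std::mem::take(&mut tick_batch[t].ticks);
        let wait = tick_batch[t].wait_duration;
        thread::spawn(move || {
            thread::sleep(wait); // stand-in for poll_ctx.timer.delay(wait)
            for tick in ticks {
                tick(); // each callback force_sends PeerMsg::Tick to one peer
            }
        });
    }
}

fn main() {
    let mk = || PeerTickBatch {
        ticks: vec![],
        wait_duration: Duration::from_millis(20),
    };
    let mut tick_batch = vec![mk(), mk(), mk()]; // slots indexed by bits()
    tick_batch[RAFT].ticks.push(Box::new(|| println!("raft tick, region 1")));
    tick_batch[RAFT].ticks.push(Box::new(|| println!("raft tick, region 2")));
    flush_ticks(&mut tick_batch); // one timer wakes both regions
    thread::sleep(Duration::from_millis(60));
}
```

Indexing by `bits()` also explains the 256-slot table created in `build()` below: bitflag values are powers of two, so a table sized for the full `u8` range covers any single flag.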
@@ -847,7 +894,6 @@ pub struct RaftPollerBuilder<EK, ER, T, C> {
     pub importer: Arc<SSTImporter>,
     pub store_meta: Arc<Mutex<StoreMeta>>,
     pub pending_create_peers: Arc<Mutex<HashMap<u64, bool>>>,
-    poller_handle: Handle,
     snap_mgr: SnapManager,
     pub coprocessor_host: CoprocessorHost<EK>,
     trans: T,
@@ -1032,7 +1078,7 @@ where
     type Handler = RaftPoller<EK, ER, T, C>;
 
     fn build(&mut self) -> RaftPoller<EK, ER, T, C> {
-        let ctx = PollContext {
+        let mut ctx = PollContext {
             cfg: self.cfg.value().clone(),
             store: self.store.clone(),
             pd_scheduler: self.pd_scheduler.clone(),
@@ -1046,7 +1092,6 @@ where
             importer: self.importer.clone(),
             store_meta: self.store_meta.clone(),
             pending_create_peers: self.pending_create_peers.clone(),
-            poller_handle: self.poller_handle.clone(),
             raft_metrics: RaftMetrics::default(),
             snap_mgr: self.snap_mgr.clone(),
             applying_snap_count: self.applying_snap_count.clone(),
@@ -1067,8 +1112,10 @@ where
             need_flush_trans: false,
             current_time: None,
             perf_context_statistics: PerfContextStatistics::new(self.cfg.value().perf_level),
+            tick_batch: vec![PeerTickBatch::default(); 256],
             node_start_time: Some(TiInstant::now_coarse()),
         };
+        ctx.update_ticks_timeout();
         let tag = format!("[store {}]", ctx.store.get_id());
         RaftPoller {
             tag: tag.clone(),
@@ -1092,7 +1139,6 @@ struct Workers<EK: KvEngine, ER: RaftEngine> {
     raftlog_gc_worker: Worker<RaftlogGcTask<ER>>,
     region_worker: Worker<RegionTask<EK::Snapshot>>,
     coprocessor_host: CoprocessorHost<EK>,
-    future_poller: Runtime,
 }
 
 pub struct RaftBatchSystem<EK: KvEngine, ER: RaftEngine> {
@@ -1146,12 +1192,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
             cleanup_worker: Worker::new("cleanup-worker"),
             raftlog_gc_worker: Worker::new("raft-gc-worker"),
             coprocessor_host: coprocessor_host.clone(),
-            future_poller: runtime::Builder::new()
-                .threaded_scheduler()
-                .thread_name("future-poller")
-                .core_threads(cfg.value().future_poll_size)
-                .build()
-                .unwrap(),
         };
         let mut builder = RaftPollerBuilder {
             cfg,
@@ -1175,7 +1215,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
             store_meta,
             pending_create_peers: Arc::new(Mutex::new(HashMap::default())),
             applying_snap_count: Arc::new(AtomicUsize::new(0)),
-            poller_handle: workers.future_poller.handle().clone(),
         };
         let region_peers = builder.init()?;
         let engine = builder.engines.kv.clone();
@@ -1337,9 +1376,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
             }
         }
         workers.coprocessor_host.shutdown();
-        workers
-            .future_poller
-            .shutdown_timeout(Duration::from_nanos(0));
     }
 }
 
diff --git a/components/raftstore/src/store/msg.rs b/components/raftstore/src/store/msg.rs
index 143c14532b8..48f1165cd1b 100644
--- a/components/raftstore/src/store/msg.rs
+++ b/components/raftstore/src/store/msg.rs
@@ -143,6 +143,17 @@ impl PeerTicks {
             _ => unreachable!(),
         }
     }
+    pub fn get_all_ticks() -> &'static [PeerTicks] {
+        const TICKS: &[PeerTicks] = &[
+            PeerTicks::RAFT,
+            PeerTicks::RAFT_LOG_GC,
+            PeerTicks::SPLIT_REGION_CHECK,
+            PeerTicks::PD_HEARTBEAT,
+            PeerTicks::CHECK_MERGE,
+            PeerTicks::CHECK_PEER_STALE_STATE,
+        ];
+        TICKS
+    }
 }
 
 #[derive(Debug, Clone, Copy)]
diff --git a/components/tikv_util/src/future.rs b/components/tikv_util/src/future.rs
index 73ea0e6da21..fae8f47bbfa 100644
--- a/components/tikv_util/src/future.rs
+++ b/components/tikv_util/src/future.rs
@@ -70,7 +70,7 @@ where
 /// it will register the waker. When the event is ready, the waker will
 /// be notified, then the internal future is immediately polled in the
 /// thread calling `wake()`.
-pub fn poll_future_notify(f: impl Future<Output = ()> + Send + 'static) {
+pub fn poll_future_notify<F: Future<Output = ()> + Send + 'static>(f: F) {
     let f: BoxFuture<'static, ()> = Box::pin(f);
     let waker = Arc::new(BatchCommandsWaker(Mutex::new(Some(f))));
     waker.wake();
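On the `future.rs` change: the `impl Trait` argument becomes a named generic parameter `F` with the same bounds, so behavior is unchanged. Since raftstore now drives its store-tick timers through this helper, here is a simplified re-creation of the wake-then-poll-inline mechanism its doc comment describes, built on `futures` 0.3's `ArcWake` (`NotifyWaker` is an illustrative stand-in for tikv_util's `BatchCommandsWaker`):

```rust
// Minimal re-creation of the poll_future_notify idea: box the future, wake
// once, and poll it on the waking thread until it completes or re-registers.
use futures::future::BoxFuture;
use futures::task::{waker_ref, ArcWake};
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::Context;

struct NotifyWaker(Mutex<Option<BoxFuture<'static, ()>>>);

impl ArcWake for NotifyWaker {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let mut slot = arc_self.0.lock().unwrap();
        if let Some(mut fut) = slot.take() {
            let waker = waker_ref(arc_self);
            let mut cx = Context::from_waker(&waker);
            // Pending means the future stored this waker somewhere (e.g. in
            // a timer) and will be polled again on the next wake().
            if fut.as_mut().poll(&mut cx).is_pending() {
                *slot = Some(fut);
            }
        }
    }
}

fn poll_future_notify<F: Future<Output = ()> + Send + 'static>(f: F) {
    let f: BoxFuture<'static, ()> = Box::pin(f);
    let waker = Arc::new(NotifyWaker(Mutex::new(Some(f))));
    waker.wake();
}

fn main() {
    poll_future_notify(async { println!("polled inline, no runtime thread needed") });
}
```

This is why the dedicated future-poller tokio `Runtime` can be deleted: ready futures are polled on whichever thread wakes them (here, the timer thread), so no standing thread pool is required.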
diff --git a/tests/failpoints/cases/test_conf_change.rs b/tests/failpoints/cases/test_conf_change.rs
index 34353b09650..15c591997f4 100644
--- a/tests/failpoints/cases/test_conf_change.rs
+++ b/tests/failpoints/cases/test_conf_change.rs
@@ -143,6 +143,7 @@ fn test_write_after_destroy() {
 fn test_tick_after_destroy() {
     // 3 nodes cluster.
     let mut cluster = new_server_cluster(0, 3);
+    cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(50);
     let pd_client = cluster.pd_client.clone();
     // Disable default max peer count check.
@@ -159,7 +160,7 @@ fn test_tick_after_destroy() {
     must_get_equal(&engine_3, b"k1", b"v1");
 
     let tick_fp = "on_raft_log_gc_tick_1";
-    fail::cfg(tick_fp, "pause").unwrap();
+    fail::cfg(tick_fp, "return").unwrap();
     let poll_fp = "pause_on_peer_destroy_res";
     fail::cfg(poll_fp, "pause").unwrap();
@@ -175,8 +176,8 @@ fn test_tick_after_destroy() {
     cluster.clear_send_filters();
     cluster.must_put(b"k3", b"v3");
 
-    thread::sleep(cluster.cfg.raft_store.raft_log_gc_tick_interval.0);
     fail::remove(tick_fp);
+    thread::sleep(cluster.cfg.raft_store.raft_log_gc_tick_interval.0);
 
     thread::sleep(Duration::from_millis(100));
     fail::remove(poll_fp);
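On the test changes: the `on_raft_log_gc_tick_1` failpoint moved from the removed future-poller task into `on_raft_log_gc_tick` (peer.rs hunk above), which now runs on a raftstore poll thread, so `pause` would likely stall the whole poller; the test therefore switches to `return` to swallow the tick, and removes the failpoint before sleeping one gc-tick interval so a fresh tick can fire afterwards. A self-contained demo of the two actions being swapped (requires the `fail` crate with its `failpoints` feature enabled; all names here are illustrative):

```rust
// "pause" blocks the calling thread until the failpoint is reconfigured or
// removed; "return" takes the early-return closure without blocking.
use fail::fail_point;
use std::{thread, time::Duration};

fn gc_tick() -> u32 {
    // With "return" configured, this evaluates the closure and exits early,
    // mirroring fail_point!("on_raft_log_gc_tick_1", peer_id == 1, |_| {}).
    fail_point!("demo_tick", |_| 0);
    1 // normal path: the tick body actually ran
}

fn main() {
    fail::cfg("demo_tick", "return").unwrap();
    assert_eq!(gc_tick(), 0); // tick body skipped, caller not blocked

    fail::remove("demo_tick");
    assert_eq!(gc_tick(), 1);

    fail::cfg("demo_tick", "pause").unwrap();
    let h = thread::spawn(gc_tick); // this call now blocks...
    thread::sleep(Duration::from_millis(50));
    fail::remove("demo_tick"); // ...until the failpoint is removed
    assert_eq!(h.join().unwrap(), 1);
}
```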