Skip to content

Commit

Permalink
raftstore: Remove extra Snapshot type parameter from several types
Browse files Browse the repository at this point in the history
Signed-off-by: Brian Anderson <andersrb@gmail.com>
  • Loading branch information
brson committed Jul 24, 2020
1 parent 1388c7d commit 2646791
Show file tree
Hide file tree
Showing 23 changed files with 112 additions and 127 deletions.
4 changes: 2 additions & 2 deletions cmd/src/server.rs
Expand Up @@ -10,7 +10,7 @@

use crate::{setup::*, signal_handler};
use encryption::DataKeyManager;
use engine_rocks::{encryption::get_env, RocksEngine, RocksSnapshot};
use engine_rocks::{encryption::get_env, RocksEngine};
use engine_traits::{KvEngines, MetricsFlusher};
use fs2::FileExt;
use futures_cpupool::Builder;
Expand Down Expand Up @@ -112,7 +112,7 @@ struct TiKVServer {
cfg_controller: Option<ConfigController>,
security_mgr: Arc<SecurityManager>,
pd_client: Arc<RpcClient>,
router: RaftRouter<RocksEngine, RocksEngine, RocksSnapshot>,
router: RaftRouter<RocksEngine, RocksEngine>,
system: Option<RaftBatchSystem>,
resolver: resolve::PdStoreAddrResolver,
state: Arc<Mutex<GlobalReplicationState>>,
Expand Down
6 changes: 3 additions & 3 deletions components/raftstore/src/coprocessor/split_check/size.rs
Expand Up @@ -254,7 +254,7 @@ pub mod tests {
use engine_rocks::properties::RangePropertiesCollectorFactory;
use engine_rocks::raw::{ColumnFamilyOptions, DBOptions, Writable};
use engine_rocks::raw_util::{new_engine_opt, CFOptions};
use engine_rocks::{Compat, RocksEngine, RocksSnapshot};
use engine_rocks::{Compat, RocksEngine};
use engine_traits::CF_LOCK;
use engine_traits::{CfName, ALL_CFS, CF_DEFAULT, CF_WRITE, LARGE_CFS};
use kvproto::metapb::Peer;
Expand All @@ -275,7 +275,7 @@ pub mod tests {
use super::*;

fn must_split_at_impl(
rx: &mpsc::Receiver<(u64, CasualMessage<RocksEngine, RocksEngine, RocksSnapshot>)>,
rx: &mpsc::Receiver<(u64, CasualMessage<RocksEngine, RocksEngine>)>,
exp_region: &Region,
exp_split_keys: Vec<Vec<u8>>,
ignore_split_keys: bool,
Expand Down Expand Up @@ -307,7 +307,7 @@ pub mod tests {
}

pub fn must_split_at(
rx: &mpsc::Receiver<(u64, CasualMessage<RocksEngine, RocksEngine, RocksSnapshot>)>,
rx: &mpsc::Receiver<(u64, CasualMessage<RocksEngine, RocksEngine>)>,
exp_region: &Region,
exp_split_keys: Vec<Vec<u8>>,
) {
Expand Down
42 changes: 20 additions & 22 deletions components/raftstore/src/router.rs
Expand Up @@ -89,7 +89,7 @@ where
fn casual_send(
&self,
region_id: u64,
msg: CasualMessage<EK, RocksEngine, EK::Snapshot>,
msg: CasualMessage<EK, RocksEngine>,
) -> RaftStoreResult<()>;
}

Expand Down Expand Up @@ -125,26 +125,24 @@ where
fn casual_send(
&self,
_: u64,
_: CasualMessage<EK, RocksEngine, EK::Snapshot>,
_: CasualMessage<EK, RocksEngine>,
) -> RaftStoreResult<()> {
Ok(())
}
}

/// A router that routes messages to the raftstore
#[allow(clippy::type_complexity)] // FIXME: should be removable after the E::Snapshot
// typarams below are removed
pub struct ServerRaftStoreRouter<E>
pub struct ServerRaftStoreRouter<EK>
where
E: KvEngine,
EK: KvEngine,
{
router: RaftRouter<E, RocksEngine, E::Snapshot>,
local_reader: RefCell<LocalReader<RaftRouter<RocksEngine, RocksEngine, E::Snapshot>, E>>,
router: RaftRouter<EK, RocksEngine>,
local_reader: RefCell<LocalReader<RaftRouter<EK, RocksEngine>, EK>>,
}

impl<E> Clone for ServerRaftStoreRouter<E>
impl<EK> Clone for ServerRaftStoreRouter<EK>
where
E: KvEngine,
EK: KvEngine,
{
fn clone(&self) -> Self {
ServerRaftStoreRouter {
Expand All @@ -154,15 +152,15 @@ where
}
}

impl<E> ServerRaftStoreRouter<E>
impl<EK> ServerRaftStoreRouter<EK>
where
E: KvEngine,
EK: KvEngine,
{
/// Creates a new router.
pub fn new(
router: RaftRouter<E, RocksEngine, E::Snapshot>,
reader: LocalReader<RaftRouter<RocksEngine, RocksEngine, E::Snapshot>, E>,
) -> ServerRaftStoreRouter<E> {
router: RaftRouter<EK, RocksEngine>,
reader: LocalReader<RaftRouter<EK, RocksEngine>, EK>,
) -> ServerRaftStoreRouter<EK> {
let local_reader = RefCell::new(reader);
ServerRaftStoreRouter {
router,
Expand All @@ -188,9 +186,9 @@ pub fn handle_send_error<T>(region_id: u64, e: TrySendError<T>) -> RaftStoreErro
}
}

impl<E> RaftStoreRouter<E> for ServerRaftStoreRouter<E>
impl<EK> RaftStoreRouter<EK> for ServerRaftStoreRouter<EK>
where
E: KvEngine,
EK: KvEngine,
{
fn send_raft_msg(&self, msg: RaftMessage) -> RaftStoreResult<()> {
let region_id = msg.get_region_id();
Expand All @@ -199,7 +197,7 @@ where
.map_err(|e| handle_send_error(region_id, e))
}

fn send_command(&self, req: RaftCmdRequest, cb: Callback<E::Snapshot>) -> RaftStoreResult<()> {
fn send_command(&self, req: RaftCmdRequest, cb: Callback<EK::Snapshot>) -> RaftStoreResult<()> {
let cmd = RaftCommand::new(req, cb);
let region_id = cmd.request.get_header().get_region_id();
self.router
Expand All @@ -211,7 +209,7 @@ where
&self,
read_id: Option<ThreadReadId>,
req: RaftCmdRequest,
cb: Callback<E::Snapshot>,
cb: Callback<EK::Snapshot>,
) -> RaftStoreResult<()> {
let mut local_reader = self.local_reader.borrow_mut();
local_reader.read(read_id, req, cb);
Expand All @@ -227,7 +225,7 @@ where
&self,
req: RaftCmdRequest,
txn_extra: TxnExtra,
cb: Callback<E::Snapshot>,
cb: Callback<EK::Snapshot>,
) -> RaftStoreResult<()> {
let cmd = RaftCommand::with_txn_extra(req, cb, txn_extra);
let region_id = cmd.request.get_header().get_region_id();
Expand All @@ -239,7 +237,7 @@ where
fn significant_send(
&self,
region_id: u64,
msg: SignificantMsg<E::Snapshot>,
msg: SignificantMsg<EK::Snapshot>,
) -> RaftStoreResult<()> {
if let Err(SendError(msg)) = self
.router
Expand All @@ -256,7 +254,7 @@ where
fn casual_send(
&self,
region_id: u64,
msg: CasualMessage<E, RocksEngine, E::Snapshot>,
msg: CasualMessage<EK, RocksEngine>,
) -> RaftStoreResult<()> {
self.router
.send(region_id, PeerMsg::CasualMessage(msg))
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore/src/store/fsm/apply.rs
Expand Up @@ -305,7 +305,7 @@ pub enum Notifier<EK>
where
EK: KvEngine,
{
Router(RaftRouter<EK, RocksEngine, EK::Snapshot>),
Router(RaftRouter<EK, RocksEngine>),
#[cfg(test)]
Sender(Sender<PeerMsg<EK, RocksEngine, EK::Snapshot>>),
}
Expand Down
30 changes: 13 additions & 17 deletions components/raftstore/src/store/fsm/peer.rs
Expand Up @@ -84,11 +84,10 @@ pub enum GroupState {
Idle,
}

pub struct PeerFsm<EK, ER, S>
pub struct PeerFsm<EK, ER>
where
EK: KvEngine,
ER: KvEngine,
S: Snapshot,
{
pub peer: Peer<EK, ER>,
/// A registry for all scheduled ticks. This can avoid scheduling ticks twice accidentally.
Expand All @@ -105,8 +104,8 @@ where
stopped: bool,
has_ready: bool,
early_apply: bool,
mailbox: Option<BasicMailbox<PeerFsm<EK, ER, S>>>,
pub receiver: Receiver<PeerMsg<EK, ER, S>>,
mailbox: Option<BasicMailbox<PeerFsm<EK, ER>>>,
pub receiver: Receiver<PeerMsg<EK, ER, EK::Snapshot>>,
/// when snapshot is generating or sending, skip split check at most REGION_SPLIT_SKIT_MAX_COUNT times.
skip_split_count: usize,

Expand All @@ -125,11 +124,10 @@ where
txn_extra: TxnExtra,
}

impl<EK, ER, S> Drop for PeerFsm<EK, ER, S>
impl<EK, ER> Drop for PeerFsm<EK, ER>
where
EK: KvEngine,
ER: KvEngine,
S: Snapshot,
{
fn drop(&mut self) {
self.peer.stop();
Expand All @@ -152,14 +150,13 @@ where

pub type SenderFsmPair<EK, ER, S> = (
LooseBoundedSender<PeerMsg<EK, ER, S>>,
Box<PeerFsm<EK, ER, S>>,
Box<PeerFsm<EK, ER>>,
);

impl<EK, ER, S> PeerFsm<EK, ER, S>
impl<EK, ER> PeerFsm<EK, ER>
where
EK: KvEngine,
ER: KvEngine,
S: Snapshot,
{
// If we create the peer actively, like bootstrap/split/merge region, we should
// use this function to create the peer. The region must contain the peer info
Expand All @@ -170,7 +167,7 @@ where
sched: Scheduler<RegionTask<EK::Snapshot>>,
engines: KvEngines<EK, ER>,
region: &metapb::Region,
) -> Result<SenderFsmPair<EK, ER, S>> {
) -> Result<SenderFsmPair<EK, ER, EK::Snapshot>> {
let meta_peer = match util::find_peer(region, store_id) {
None => {
return Err(box_err!(
Expand Down Expand Up @@ -218,7 +215,7 @@ where
engines: KvEngines<EK, ER>,
region_id: u64,
peer: metapb::Peer,
) -> Result<SenderFsmPair<EK, ER, S>> {
) -> Result<SenderFsmPair<EK, ER, EK::Snapshot>> {
// We will remove tombstone key when apply snapshot
info!(
"replicate peer";
Expand Down Expand Up @@ -391,13 +388,12 @@ where
}
}

impl<EK, ER, S> Fsm for PeerFsm<EK, ER, S>
impl<EK, ER> Fsm for PeerFsm<EK, ER>
where
EK: KvEngine,
ER: KvEngine,
S: Snapshot,
{
type Message = PeerMsg<EK, ER, S>;
type Message = PeerMsg<EK, ER, EK::Snapshot>;

#[inline]
fn is_stopped(&self) -> bool {
Expand Down Expand Up @@ -429,7 +425,7 @@ where
EK: KvEngine,
ER: KvEngine,
{
fsm: &'a mut PeerFsm<EK, ER, EK::Snapshot>,
fsm: &'a mut PeerFsm<EK, ER>,
ctx: &'a mut PollContext<EK, ER, T, C>,
}

Expand All @@ -439,7 +435,7 @@ where
ER: KvEngine,
{
pub fn new(
fsm: &'a mut PeerFsm<EK, ER, EK::Snapshot>,
fsm: &'a mut PeerFsm<EK, ER>,
ctx: &'a mut PollContext<EK, ER, T, C>,
) -> PeerFsmDelegate<'a, EK, ER, T, C> {
PeerFsmDelegate { fsm, ctx }
Expand Down Expand Up @@ -515,7 +511,7 @@ where
}
}

fn on_casual_msg(&mut self, msg: CasualMessage<EK, ER, EK::Snapshot>) {
fn on_casual_msg(&mut self, msg: CasualMessage<EK, ER>) {
match msg {
CasualMessage::SplitRegion {
region_epoch,
Expand Down

0 comments on commit 2646791

Please sign in to comment.