diff --git a/src/raftstore/coprocessor/mod.rs b/src/raftstore/coprocessor/mod.rs
index 03801fd86d9..3cab481b677 100644
--- a/src/raftstore/coprocessor/mod.rs
+++ b/src/raftstore/coprocessor/mod.rs
@@ -22,12 +22,14 @@
 pub mod config;
 pub mod dispatcher;
 mod error;
 mod metrics;
+pub mod region_collection;
 mod split_check;
 pub mod split_observer;
 pub use self::config::Config;
 pub use self::dispatcher::{CoprocessorHost, Registry};
 pub use self::error::{Error, Result};
+pub use self::region_collection::RegionCollection;
 pub use self::split_check::{
     HalfCheckObserver, Host as SplitCheckerHost, KeysCheckObserver, SizeCheckObserver,
     TableCheckObserver,
diff --git a/src/raftstore/coprocessor/region_collection.rs b/src/raftstore/coprocessor/region_collection.rs
new file mode 100644
index 00000000000..96e36d0d0ff
--- /dev/null
+++ b/src/raftstore/coprocessor/region_collection.rs
@@ -0,0 +1,685 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{
+    Coprocessor, CoprocessorHost, ObserverContext, RegionChangeEvent, RegionChangeObserver,
+    RoleObserver,
+};
+use kvproto::metapb::{Peer, Region};
+use raft::StateRole;
+use raftstore::store::keys::{data_end_key, data_key, origin_key, DATA_MAX_KEY};
+use raftstore::store::msg::{SeekRegionCallback, SeekRegionFilter, SeekRegionResult};
+use raftstore::store::util;
+use std::collections::BTreeMap;
+use std::collections::Bound::{Excluded, Unbounded};
+use std::fmt::{Display, Formatter, Result as FmtResult};
+use std::sync::{mpsc, Arc, Mutex};
+use std::usize;
+use storage::engine::{RegionInfoProvider, Result as EngineResult};
+use util::collections::HashMap;
+use util::escape;
+use util::worker::{Builder as WorkerBuilder, Runnable, Scheduler, Worker};
+
+const CHANNEL_BUFFER_SIZE: usize = usize::MAX; // Unbounded
+
+/// `RegionCollection` collects all regions on this TiKV instance into a collection so that other
+/// parts of TiKV can get region information from it. It registers an observer, named
+/// `EventSender`, to raftstore, which simply sends some specific types of events through a
+/// channel. In the meantime, `RegionCollectionWorker` keeps fetching messages from the channel
+/// and mutates the collection according to the messages. When an accessor method of
+/// `RegionCollection` is called, it also simply sends a message to `RegionCollectionWorker`, and
+/// the result will be sent back as soon as it's finished.
+/// In fact, the channel mentioned above is actually a `util::worker::Worker`.
+
+/// `RaftStoreEvent` represents events dispatched from the raftstore coprocessor.
+#[derive(Debug)]
+enum RaftStoreEvent {
+    CreateRegion { region: Region },
+    UpdateRegion { region: Region },
+    DestroyRegion { region: Region },
+    RoleChange { region: Region, role: StateRole },
+}
+
+type RegionsMap = HashMap<u64, (Region, StateRole)>;
+type RegionRangesMap = BTreeMap<Vec<u8>, u64>;
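+
+// NOTE: an illustrative usage sketch only (not part of this module's API surface). The names
+// `host` and `store_id` below are placeholders standing for a `CoprocessorHost` and the id of
+// the local store. The intended lifecycle, pieced together from the methods defined below, is
+// roughly:
+//
+//     let collection = RegionCollection::new(&mut host, store_id);
+//     collection.start();   // must be started before raftstore starts
+//     // ... raftstore runs and events flow into `RegionCollectionWorker` ...
+//     let res = collection.seek_region(
+//         b"",                                      // seek from the very first region
+//         box |_, role| role == StateRole::Leader,  // only regions whose local peer is leader
+//         32,                                       // give up after scanning 32 regions
+//     );
+//     collection.stop();    // must be stopped after raftstore stops
+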
+/// `RegionCollection` has its own thread (namely `RegionCollectionWorker`). Queries and updates
+/// are done by sending commands to the thread.
+enum RegionCollectionMsg {
+    RaftStoreEvent(RaftStoreEvent),
+    SeekRegion {
+        from: Vec<u8>,
+        filter: SeekRegionFilter,
+        limit: u32,
+        callback: SeekRegionCallback,
+    },
+    /// Get all contents from the collection. Only used for testing.
+    DebugDump(mpsc::Sender<(RegionsMap, RegionRangesMap)>),
+}
+
+impl Display for RegionCollectionMsg {
+    fn fmt(&self, f: &mut Formatter) -> FmtResult {
+        match self {
+            RegionCollectionMsg::RaftStoreEvent(e) => write!(f, "RaftStoreEvent({:?})", e),
+            RegionCollectionMsg::SeekRegion { from, limit, .. } => {
+                write!(f, "SeekRegion(from: {}, limit: {})", escape(from), limit)
+            }
+            RegionCollectionMsg::DebugDump(_) => write!(f, "DebugDump"),
+        }
+    }
+}
+
+/// `EventSender` implements the observer traits. It simply sends the events we are interested in
+/// through the `scheduler`.
+#[derive(Clone)]
+struct EventSender {
+    scheduler: Scheduler<RegionCollectionMsg>,
+}
+
+impl Coprocessor for EventSender {}
+
+impl RegionChangeObserver for EventSender {
+    fn on_region_changed(&self, context: &mut ObserverContext, event: RegionChangeEvent) {
+        let region = context.region().clone();
+        let event = match event {
+            RegionChangeEvent::Create => RaftStoreEvent::CreateRegion { region },
+            RegionChangeEvent::Update => RaftStoreEvent::UpdateRegion { region },
+            RegionChangeEvent::Destroy => RaftStoreEvent::DestroyRegion { region },
+        };
+        self.scheduler
+            .schedule(RegionCollectionMsg::RaftStoreEvent(event))
+            .unwrap();
+    }
+}
+
+impl RoleObserver for EventSender {
+    fn on_role_change(&self, context: &mut ObserverContext, role: StateRole) {
+        let region = context.region().clone();
+        let event = RaftStoreEvent::RoleChange { region, role };
+        self.scheduler
+            .schedule(RegionCollectionMsg::RaftStoreEvent(event))
+            .unwrap();
+    }
+}
+
+/// Create an `EventSender` and register it to the given coprocessor host.
+fn register_raftstore_event_sender(
+    host: &mut CoprocessorHost,
+    scheduler: Scheduler<RegionCollectionMsg>,
+) {
+    let event_sender = EventSender { scheduler };
+
+    host.registry
+        .register_role_observer(1, box event_sender.clone());
+    host.registry
+        .register_region_change_observer(1, box event_sender.clone());
+}
+
+/// `RegionCollectionWorker` is the underlying runner of `RegionCollection`. It listens to events
+/// sent by the `EventSender` and maintains the collection of all regions. The role of each region
+/// is also tracked.
+struct RegionCollectionWorker {
+    self_store_id: u64,
+    // region_id -> (Region, StateRole)
+    regions: HashMap<u64, (Region, StateRole)>,
+    // prefixed end_key ('z' + end_key) -> region_id
+    region_ranges: BTreeMap<Vec<u8>, u64>,
+}
+
+impl RegionCollectionWorker {
+    fn new(self_store_id: u64) -> Self {
+        Self {
+            self_store_id,
+            regions: HashMap::default(),
+            region_ranges: BTreeMap::default(),
+        }
+    }
+
+    fn handle_create_region(&mut self, region: Region) {
+        if self.regions.get(&region.get_id()).is_some() {
+            warn!(
+                "region_collection: trying to create new region {} but it already exists. \
+                 Updating it instead.",
+                region.get_id(),
+            );
+            self.handle_update_region(region);
+            return;
+        }
+
+        self.region_ranges
+            .insert(data_end_key(region.get_end_key()), region.get_id());
+        // TODO: Should we set it to follower?
+        self.regions
+            .insert(region.get_id(), (region, StateRole::Follower));
+    }
+
+    fn handle_update_region(&mut self, region: Region) {
+        let mut is_new_region = true;
+        if let Some((ref mut old_region, _)) = self.regions.get_mut(&region.get_id()) {
+            is_new_region = false;
+            assert_eq!(old_region.get_id(), region.get_id());
+
+            // If the end_key changed, the old item in `region_ranges` should be removed.
+            // However, it shouldn't be removed if it was already updated by another region. In
+            // that case, the entry at `old_end_key` maps to a different region's id and must be
+            // kept.
+            if old_region.get_end_key() != region.get_end_key() {
+                // The region's end_key has changed.
+                // Remove the old entry in `self.region_ranges` if it hasn't been updated by
+                // other items in `regions`.
+                let old_end_key = data_end_key(old_region.get_end_key());
+                if let Some(old_id) = self.region_ranges.get(&old_end_key).cloned() {
+                    // If the ids don't match, we shouldn't remove the entry because it was
+                    // updated by another region.
+                    if old_id == region.get_id() {
+                        self.region_ranges.remove(&old_end_key);
+                    }
+                }
+            }
+
+            // If the region already exists, update it and keep the original role.
+            *old_region = region.clone();
+        }
+
+        if is_new_region {
+            warn!(
+                "region_collection: trying to update region {} but it doesn't exist.",
+                region.get_id()
+            );
+            // If it's a new region, set it to follower state.
+            // TODO: Should we set it to follower?
+            self.regions
+                .insert(region.get_id(), (region.clone(), StateRole::Follower));
+        }
+
+        // If the end_key changed or the region didn't exist previously, insert a new item;
+        // otherwise, update the old item. All regions in `self.regions` have unique end_keys,
+        // so the entries won't conflict with each other.
+        self.region_ranges
+            .insert(data_end_key(region.get_end_key()), region.get_id());
+    }
+
+    fn handle_destroy_region(&mut self, region: Region) {
+        if let Some((removed_region, _)) = self.regions.remove(&region.get_id()) {
+            assert_eq!(removed_region.get_id(), region.get_id());
+            let end_key = data_end_key(removed_region.get_end_key());
+
+            // The entry may have been updated by another region.
+            if let Some(id) = self.region_ranges.get(&end_key).cloned() {
+                if id == region.get_id() {
+                    self.region_ranges.remove(&end_key);
+                }
+            }
+        } else {
+            warn!(
+                "region_collection: destroying region {} but it doesn't exist",
+                region.get_id()
+            )
+        }
+    }
+
+    fn handle_role_change(&mut self, region: Region, new_role: StateRole) {
+        let region_id = region.get_id();
+        if self.regions.get(&region_id).is_none() {
+            warn!(
+                "region_collection: role change on region {} but the region doesn't exist. \
+                 Creating it.",
+                region_id
+            );
+            self.handle_create_region(region);
+        }
+
+        let role = &mut self.regions.get_mut(&region_id).unwrap().1;
+        *role = new_role;
+    }
+
+    fn handle_seek_region(
+        &self,
+        from_key: Vec<u8>,
+        filter: SeekRegionFilter,
+        mut limit: u32,
+        callback: SeekRegionCallback,
+    ) {
+        assert!(limit > 0);
+
+        let from_key = data_key(&from_key);
+        for (end_key, region_id) in self.region_ranges.range((Excluded(from_key), Unbounded)) {
+            let (region, role) = &self.regions[region_id];
+            if filter(region, *role) {
+                callback(SeekRegionResult::Found {
+                    local_peer: util::find_peer(region, self.self_store_id)
+                        .cloned()
+                        .unwrap_or_else(Peer::default),
+                    region: region.clone(),
+                });
+                return;
+            }
+
+            limit -= 1;
+            if limit == 0 {
+                // `origin_key` does not handle `DATA_MAX_KEY`, but in that case we can return
+                // `Ended` rather than `LimitExceeded`.
+                if end_key.as_slice() >= DATA_MAX_KEY {
+                    break;
+                }
+
+                callback(SeekRegionResult::LimitExceeded {
+                    next_key: origin_key(end_key).to_vec(),
+                });
+                return;
+            }
+        }
+        callback(SeekRegionResult::Ended);
+    }
+
+    fn handle_raftstore_event(&mut self, event: RaftStoreEvent) {
+        match event {
+            RaftStoreEvent::CreateRegion { region } => {
+                self.handle_create_region(region);
+            }
+            RaftStoreEvent::UpdateRegion { region } => {
+                self.handle_update_region(region);
+            }
+            RaftStoreEvent::DestroyRegion { region } => {
+                self.handle_destroy_region(region);
+            }
+            RaftStoreEvent::RoleChange { region, role } => {
+                self.handle_role_change(region, role);
+            }
+        }
+    }
+}
+
+impl Runnable<RegionCollectionMsg> for RegionCollectionWorker {
+    fn run(&mut self, task: RegionCollectionMsg) {
+        match task {
+            RegionCollectionMsg::RaftStoreEvent(event) => {
+                self.handle_raftstore_event(event);
+            }
+            RegionCollectionMsg::SeekRegion {
+                from,
+                filter,
+                limit,
+                callback,
+            } => {
+                self.handle_seek_region(from, filter, limit, callback);
+            }
+            RegionCollectionMsg::DebugDump(tx) => {
+                tx.send((self.regions.clone(), self.region_ranges.clone()))
+                    .unwrap();
+            }
+        }
+    }
+}
+
+/// `RegionCollection` keeps all region information separately from raftstore itself.
+#[derive(Clone)]
+pub struct RegionCollection {
+    self_store_id: u64,
+    worker: Arc<Mutex<Worker<RegionCollectionMsg>>>,
+    scheduler: Scheduler<RegionCollectionMsg>,
+}
+
+impl RegionCollection {
+    /// Create a new `RegionCollection` and register it to `host`.
+    /// `RegionCollection` doesn't need to be, and should not be, created more than once. If it's
+    /// needed in different places, just clone it; all clones share the same contents.
+    pub fn new(host: &mut CoprocessorHost, self_store_id: u64) -> Self {
+        let worker = WorkerBuilder::new("region-collection-worker")
+            .pending_capacity(CHANNEL_BUFFER_SIZE)
+            .create();
+        let scheduler = worker.scheduler();
+
+        register_raftstore_event_sender(host, scheduler.clone());
+
+        let scheduler = worker.scheduler();
+        Self {
+            self_store_id,
+            worker: Arc::new(Mutex::new(worker)),
+            scheduler,
+        }
+    }
+
+    /// Start the `RegionCollection`. It should be started before raftstore.
+    pub fn start(&self) {
+        self.worker
+            .lock()
+            .unwrap()
+            .start(RegionCollectionWorker::new(self.self_store_id))
+            .unwrap();
+    }
+
+    /// Stop the `RegionCollection`. It should be stopped after raftstore.
+    pub fn stop(&self) {
+        self.worker.lock().unwrap().stop().unwrap().join().unwrap();
+    }
+
+    /// Get all content from the collection. Only used for testing.
+ pub fn debug_dump(&self) -> (RegionsMap, RegionRangesMap) { + let (tx, rx) = mpsc::channel(); + self.scheduler + .schedule(RegionCollectionMsg::DebugDump(tx)) + .unwrap(); + rx.recv().unwrap() + } +} + +impl RegionInfoProvider for RegionCollection { + fn seek_region( + &self, + from: &[u8], + filter: SeekRegionFilter, + limit: u32, + ) -> EngineResult { + let (tx, rx) = mpsc::channel(); + let msg = RegionCollectionMsg::SeekRegion { + from: from.to_vec(), + filter, + limit, + callback: box move |res| { + tx.send(res).unwrap_or_else(|e| { + panic!( + "region collection failed to send result back to caller: {:?}", + e + ) + }) + }, + }; + self.scheduler + .schedule(msg) + .map_err(|e| box_err!("failed to send request to region collection: {:?}", e)) + .and_then(|_| { + rx.recv().map_err(|e| { + box_err!( + "failed to receive seek region result from region collection: {:?}", + e + ) + }) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn new_region(id: u64, start_key: &[u8], end_key: &[u8]) -> Region { + let mut region = Region::default(); + region.set_id(id); + region.set_start_key(start_key.to_vec()); + region.set_end_key(end_key.to_vec()); + region + } + + fn check_collection(c: &RegionCollectionWorker, regions: &[(Region, StateRole)]) { + let region_ranges: Vec<_> = regions + .iter() + .map(|(r, _)| (data_end_key(r.get_end_key()), r.get_id())) + .collect(); + + let mut is_regions_equal = c.regions.len() == regions.len(); + + if is_regions_equal { + for (expect_region, expect_role) in regions { + is_regions_equal = is_regions_equal + && c.regions + .get(&expect_region.get_id()) + .map_or(false, |(region, role)| { + expect_region == region && expect_role == role + }); + + if !is_regions_equal { + break; + } + } + } + if !is_regions_equal { + panic!("regions: expect {:?}, but got {:?}", regions, c.regions); + } + + let mut is_ranges_equal = c.region_ranges.len() == region_ranges.len(); + is_ranges_equal = is_ranges_equal + && c.region_ranges.iter().zip(region_ranges.iter()).all( + |((actual_key, actual_id), (expect_key, expect_id))| { + actual_key == expect_key && actual_id == expect_id + }, + ); + if !is_ranges_equal { + panic!( + "region_ranges: expect {:?}, but got {:?}", + region_ranges, c.region_ranges + ); + } + } + + /// Add a set of regions to an empty collection and check if it's successfully loaded. 
+    fn must_load_regions(c: &mut RegionCollectionWorker, regions: &[Region]) {
+        assert!(c.regions.is_empty());
+        assert!(c.region_ranges.is_empty());
+
+        for region in regions {
+            must_create_region(c, &region);
+        }
+
+        let expected_regions: Vec<_> = regions
+            .iter()
+            .map(|r| (r.clone(), StateRole::Follower))
+            .collect();
+        check_collection(&c, &expected_regions);
+    }
+
+    fn must_create_region(c: &mut RegionCollectionWorker, region: &Region) {
+        assert!(c.regions.get(&region.get_id()).is_none());
+
+        c.handle_create_region(region.clone());
+
+        assert_eq!(&c.regions[&region.get_id()].0, region);
+        assert_eq!(
+            c.region_ranges[&data_end_key(region.get_end_key())],
+            region.get_id()
+        );
+    }
+
+    fn must_update_region(c: &mut RegionCollectionWorker, region: &Region) {
+        assert!(c.regions.get(&region.get_id()).is_some());
+        let old_end_key = c.regions[&region.get_id()].0.get_end_key().to_vec();
+
+        c.handle_update_region(region.clone());
+
+        assert_eq!(&c.regions[&region.get_id()].0, region);
+        assert_eq!(
+            c.region_ranges[&data_end_key(region.get_end_key())],
+            region.get_id()
+        );
+        // If the end_key was updated, the entry at `old_end_key` must no longer map to
+        // `region.id`. It may still exist if it is now owned by another region.
+        if old_end_key.as_slice() != region.get_end_key() {
+            assert!(
+                c.region_ranges
+                    .get(&data_end_key(&old_end_key))
+                    .map_or(true, |id| *id != region.get_id())
+            );
+        }
+    }
+
+    fn must_destroy_region(c: &mut RegionCollectionWorker, id: u64) {
+        let end_key = c.regions[&id].0.get_end_key().to_vec();
+
+        c.handle_destroy_region(new_region(id, b"", b""));
+
+        assert!(c.regions.get(&id).is_none());
+        // After destroying, the entry at `end_key` must no longer map to `id`. It may still
+        // exist if it is now owned by another region.
+        assert!(
+            c.region_ranges
+                .get(&data_end_key(&end_key))
+                .map_or(true, |r| *r != id)
+        );
+    }
+
+    fn must_change_role(c: &mut RegionCollectionWorker, region: &Region, role: StateRole) {
+        assert!(c.regions.get(&region.get_id()).is_some());
+
+        c.handle_role_change(region.clone(), role);
+
+        assert_eq!(c.regions[&region.get_id()].1, role);
+    }
+
+    #[test]
+    fn test_basic_updating() {
+        let mut c = RegionCollectionWorker::new(0);
+        let init_regions = &[
+            new_region(1, b"", b"k1"),
+            new_region(2, b"k1", b"k9"),
+            new_region(3, b"k9", b""),
+        ];
+
+        must_load_regions(&mut c, init_regions);
+
+        // end_key changed
+        must_update_region(&mut c, &new_region(2, b"k2", b"k8"));
+        // end_key changed (previous end_key is empty)
+        must_update_region(&mut c, &new_region(3, b"k9", b"k99"));
+        // end_key not changed
+        must_update_region(&mut c, &new_region(1, b"k0", b"k1"));
+        check_collection(
+            &c,
+            &[
+                (new_region(1, b"k0", b"k1"), StateRole::Follower),
+                (new_region(2, b"k2", b"k8"), StateRole::Follower),
+                (new_region(3, b"k9", b"k99"), StateRole::Follower),
+            ],
+        );
+
+        must_change_role(&mut c, &new_region(1, b"k0", b"k1"), StateRole::Candidate);
+        must_create_region(&mut c, &new_region(5, b"k99", b""));
+        must_change_role(&mut c, &new_region(2, b"k2", b"k8"), StateRole::Leader);
+        must_update_region(&mut c, &new_region(2, b"k3", b"k7"));
+        must_create_region(&mut c, &new_region(4, b"k1", b"k3"));
+        check_collection(
+            &c,
+            &[
+                (new_region(1, b"k0", b"k1"), StateRole::Candidate),
+                (new_region(4, b"k1", b"k3"), StateRole::Follower),
+                (new_region(2, b"k3", b"k7"), StateRole::Leader),
+                (new_region(3, b"k9", b"k99"), StateRole::Follower),
+                (new_region(5, b"k99", b""), StateRole::Follower),
+            ],
+        );
+
+        must_destroy_region(&mut c, 4);
+        must_destroy_region(&mut c, 3);
+        check_collection(
+            &c,
+            &[
+                (new_region(1, b"k0", b"k1"), StateRole::Candidate),
+                (new_region(2, b"k3", b"k7"), StateRole::Leader),
+                (new_region(5, b"k99", b""), StateRole::Follower),
+            ],
+        );
+    }
+
+    /// Simulates splitting a region into 3 regions, where the region with the old id becomes the
+    /// `derive_index`-th of them. The events are triggered in the order indicated by `seq`.
+    /// This checks that the collection ends up correct regardless of the order in which the
+    /// events happen. Values in `seq` and of `derive_index` start from 1.
+ fn test_split_impl(derive_index: usize, seq: &[usize]) { + let mut c = RegionCollectionWorker::new(0); + let init_regions = &[ + new_region(1, b"", b"k1"), + new_region(2, b"k1", b"k9"), + new_region(3, b"k9", b""), + ]; + must_load_regions(&mut c, init_regions); + + let mut final_regions = vec![ + new_region(1, b"", b"k1"), + new_region(4, b"k1", b"k3"), + new_region(5, b"k3", b"k6"), + new_region(6, b"k6", b"k9"), + new_region(3, b"k9", b""), + ]; + // `derive_index` starts from 1 + final_regions[derive_index].set_id(2); + + for idx in seq { + if *idx == derive_index { + must_update_region(&mut c, &final_regions[*idx]); + } else { + must_create_region(&mut c, &final_regions[*idx]); + } + } + + let final_regions = final_regions + .into_iter() + .map(|r| (r, StateRole::Follower)) + .collect::>(); + check_collection(&c, &final_regions); + } + + #[test] + fn test_split() { + let indices = &[1, 2, 3]; + let orders = &[ + &[1, 2, 3], + &[1, 3, 2], + &[2, 1, 3], + &[2, 3, 1], + &[3, 1, 2], + &[3, 2, 1], + ]; + + for index in indices { + for order in orders { + test_split_impl(*index, *order); + } + } + } + + fn test_merge_impl(to_left: bool, update_first: bool) { + let mut c = RegionCollectionWorker::new(0); + let init_regions = &[ + new_region(1, b"", b"k1"), + new_region(2, b"k1", b"k2"), + new_region(3, b"k2", b"k3"), + new_region(4, b"k3", b""), + ]; + must_load_regions(&mut c, init_regions); + + let (mut updating_region, destroying_region_id) = if to_left { + (init_regions[1].clone(), init_regions[2].get_id()) + } else { + (init_regions[2].clone(), init_regions[1].get_id()) + }; + updating_region.set_start_key(b"k1".to_vec()); + updating_region.set_end_key(b"k3".to_vec()); + + if update_first { + must_update_region(&mut c, &updating_region); + must_destroy_region(&mut c, destroying_region_id); + } else { + must_destroy_region(&mut c, destroying_region_id); + must_update_region(&mut c, &updating_region); + } + + let final_regions = &[ + (new_region(1, b"", b"k1"), StateRole::Follower), + (updating_region, StateRole::Follower), + (new_region(4, b"k3", b""), StateRole::Follower), + ]; + check_collection(&c, final_regions); + } + + #[test] + fn test_merge() { + test_merge_impl(false, false); + test_merge_impl(false, true); + test_merge_impl(true, false); + test_merge_impl(true, true); + } +} diff --git a/src/raftstore/store/fsm/store.rs b/src/raftstore/store/fsm/store.rs index bc96b3f08d3..02f27c32cae 100644 --- a/src/raftstore/store/fsm/store.rs +++ b/src/raftstore/store/fsm/store.rs @@ -957,7 +957,7 @@ impl Store { let from_key = data_key(from_key); for (end_key, region_id) in self.region_ranges.range((Excluded(from_key), Unbounded)) { let peer = &self.region_peers[region_id]; - if filter(peer) { + if filter(peer.region(), peer.raft_group.raft.state) { callback(SeekRegionResult::Found { local_peer: peer.peer.clone(), region: peer.region().clone(), diff --git a/src/raftstore/store/msg.rs b/src/raftstore/store/msg.rs index 5fdfbd65210..692750e45df 100644 --- a/src/raftstore/store/msg.rs +++ b/src/raftstore/store/msg.rs @@ -22,12 +22,12 @@ use kvproto::pdpb::CheckPolicy; use kvproto::raft_cmdpb::{RaftCmdRequest, RaftCmdResponse}; use kvproto::raft_serverpb::RaftMessage; -use raft::SnapshotStatus; +use raft::{SnapshotStatus, StateRole}; use raftstore::store::util::KeysInfoFormatter; use util::escape; use util::rocksdb::CompactedEvent; -use super::{Peer, RegionSnapshot}; +use super::RegionSnapshot; #[derive(Debug, Clone)] pub struct ReadResponse { @@ -56,7 +56,7 @@ pub type ReadCallback = Box; 
pub type WriteCallback = Box; pub type SeekRegionCallback = Box; -pub type SeekRegionFilter = Box bool + Send>; +pub type SeekRegionFilter = Box bool + Send>; /// Variants of callbacks for `Msg`. /// - `Read`: a callbak for read only requests including `StatusRequest`, diff --git a/tests/raftstore_cases/mod.rs b/tests/raftstore_cases/mod.rs index 2377a978ab1..a8a10e767d3 100644 --- a/tests/raftstore_cases/mod.rs +++ b/tests/raftstore_cases/mod.rs @@ -22,6 +22,7 @@ mod test_merge; mod test_multi; mod test_prevote; mod test_region_change_observer; +mod test_region_collections; mod test_region_heartbeat; mod test_service; mod test_single; diff --git a/tests/raftstore_cases/test_region_collections.rs b/tests/raftstore_cases/test_region_collections.rs new file mode 100644 index 00000000000..8df6df28c4b --- /dev/null +++ b/tests/raftstore_cases/test_region_collections.rs @@ -0,0 +1,200 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use kvproto::metapb::Region; +use raft::StateRole; +use std::sync::mpsc::channel; +use std::sync::Arc; +use std::thread; +use std::time::Duration; +use test_raftstore::{configure_for_merge, new_node_cluster, Cluster, NodeCluster}; +use tikv::raftstore::coprocessor::RegionCollection; +use tikv::raftstore::store::keys::data_end_key; +use tikv::raftstore::store::util::{find_peer, new_peer}; +use tikv::util::HandyRwLock; + +fn dump(c: &RegionCollection) -> Vec<(Region, StateRole)> { + let (regions, region_ranges) = c.debug_dump(); + + assert_eq!(regions.len(), region_ranges.len()); + + let mut res = Vec::new(); + for (end_key, id) in region_ranges { + let (ref region, role) = regions[&id]; + assert_eq!(end_key, data_end_key(region.get_end_key())); + assert_eq!(id, region.get_id()); + res.push((region.clone(), role)); + } + + res +} + +fn check_region_ranges(regions: &[(Region, StateRole)], ranges: &[(&[u8], &[u8])]) { + assert_eq!(regions.len(), ranges.len()); + regions + .iter() + .zip(ranges.iter()) + .for_each(|((r, _), (start_key, end_key))| { + assert_eq!(r.get_start_key(), *start_key); + assert_eq!(r.get_end_key(), *end_key); + }) +} + +fn test_region_collection_impl(cluster: &mut Cluster, c: &RegionCollection) { + for i in 0..9 { + let k = format!("k{}", i).into_bytes(); + let v = format!("v{}", i).into_bytes(); + cluster.must_put(&k, &v); + } + + let pd_client = Arc::clone(&cluster.pd_client); + + let init_regions = dump(c); + check_region_ranges(&init_regions, &[(&b""[..], &b""[..])]); + assert_eq!(init_regions[0].0, cluster.get_region(b"k1")); + + // Split + { + let r1 = cluster.get_region(b"k1"); + cluster.must_split(&r1, b"k1"); + let r2 = cluster.get_region(b"k4"); + cluster.must_split(&r2, b"k4"); + let r3 = cluster.get_region(b"k2"); + cluster.must_split(&r3, b"k2"); + let r4 = cluster.get_region(b"k3"); + cluster.must_split(&r4, b"k3"); + } + + let split_regions = dump(c); + check_region_ranges( + &split_regions, + &[ + (&b""[..], &b"k1"[..]), + (b"k1", b"k2"), + (b"k2", b"k3"), + (b"k3", b"k4"), + (b"k4", b""), + ], + ); + for (ref region, _) in &split_regions { + if region.get_id() 
 == init_regions[0].0.get_id() {
+            assert_ne!(
+                region.get_region_epoch(),
+                init_regions[0].0.get_region_epoch()
+            );
+        }
+    }
+
+    // Merge from left to right
+    pd_client.must_merge(split_regions[1].0.get_id(), split_regions[2].0.get_id());
+    let merge_regions = dump(&c);
+    check_region_ranges(
+        &merge_regions,
+        &[
+            (&b""[..], &b"k1"[..]),
+            (b"k1", b"k3"),
+            (b"k3", b"k4"),
+            (b"k4", b""),
+        ],
+    );
+
+    // Merge from right to left
+    pd_client.must_merge(merge_regions[2].0.get_id(), merge_regions[1].0.get_id());
+    let mut merge_regions_2 = dump(&c);
+    check_region_ranges(
+        &merge_regions_2,
+        &[(&b""[..], &b"k1"[..]), (b"k1", b"k4"), (b"k4", b"")],
+    );
+
+    // Add peer
+    let (region1, role1) = merge_regions_2.remove(1);
+    assert_eq!(role1, StateRole::Leader);
+    assert_eq!(region1.get_peers().len(), 1);
+    assert_eq!(region1.get_peers()[0].get_store_id(), 1);
+
+    pd_client.must_add_peer(region1.get_id(), new_peer(2, 100));
+    let (region2, role2) = dump(c).remove(1);
+    assert_eq!(role2, StateRole::Leader);
+    assert_eq!(region2.get_peers().len(), 2);
+    assert!(find_peer(&region2, 1).is_some());
+    assert!(find_peer(&region2, 2).is_some());
+
+    // Change leader
+    pd_client.transfer_leader(region2.get_id(), find_peer(&region2, 2).unwrap().clone());
+    let mut region3 = Region::default();
+    let mut role3 = StateRole::default();
+    // Wait for the leader transfer to finish
+    for _ in 0..100 {
+        let r = dump(c).remove(1);
+        region3 = r.0;
+        role3 = r.1;
+        if role3 == StateRole::Follower {
+            break;
+        }
+        thread::sleep(Duration::from_millis(20));
+    }
+    assert_eq!(role3, StateRole::Follower);
+
+    // Remove peer
+    check_region_ranges(
+        &dump(c),
+        &[(&b""[..], &b"k1"[..]), (b"k1", b"k4"), (b"k4", b"")],
+    );
+
+    pd_client.must_remove_peer(region3.get_id(), find_peer(&region3, 1).unwrap().clone());
+
+    let mut regions_after_removing = Vec::new();
+    // The region collection may lag behind raftstore a little, so retry for a while.
+    for _ in 0..100 {
+        regions_after_removing = dump(c);
+        if regions_after_removing.len() == 2 {
+            break;
+        }
+        thread::sleep(Duration::from_millis(20));
+    }
+    check_region_ranges(
+        &regions_after_removing,
+        &[(&b""[..], &b"k1"[..]), (b"k4", b"")],
+    );
+}
+
+#[test]
+fn test_node_cluster_region_collection() {
+    let mut cluster = new_node_cluster(1, 3);
+    configure_for_merge(&mut cluster);
+
+    let pd_client = Arc::clone(&cluster.pd_client);
+    pd_client.disable_default_operator();
+
+    // Create a RegionCollection on node 1
+    let (tx, rx) = channel();
+    cluster
+        .sim
+        .wl()
+        .post_create_coprocessor_host(box move |id, host| {
+            if id == 1 {
+                let c = RegionCollection::new(host, id);
+                tx.send(c).unwrap();
+            }
+        });
+    cluster.run_conf_change();
+    let c = rx.recv().unwrap();
+    c.start();
+    // We only created it on the node whose id == 1, so we shouldn't receive more than one item.
+    assert!(rx.try_recv().is_err());
+
+    test_region_collection_impl(&mut cluster, &c);
+
+    drop(cluster);
+    c.stop();
+}
diff --git a/tests/storage_cases/test_seek_region.rs b/tests/storage_cases/test_seek_region.rs
index caf356965f2..9a371f32d13 100644
--- a/tests/storage_cases/test_seek_region.rs
+++ b/tests/storage_cases/test_seek_region.rs
@@ -11,20 +11,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use std::sync::mpsc::channel; use std::thread; use std::time::Duration; use test_raftstore::*; +use tikv::raftstore::coprocessor::RegionCollection; use tikv::raftstore::store::SeekRegionResult; use tikv::storage::engine::RegionInfoProvider; +use tikv::util::collections::HashMap; use tikv::util::HandyRwLock; -#[test] -fn test_seek_region() { - // Prepare - let mut cluster = new_server_cluster(0, 3); - cluster.run(); - +fn test_seek_region_impl( + mut cluster: Cluster, + region_info_providers: HashMap, +) { for i in 0..15 { let i = i + b'0'; let key = vec![b'k', i]; @@ -73,13 +74,13 @@ fn test_seek_region() { thread::sleep(Duration::from_secs(2)); for node_id in cluster.get_node_ids() { - let engine = cluster.sim.rl().storages[&node_id].clone(); + let engine = ®ion_info_providers[&node_id]; // Test traverse all regions let mut sought_regions = Vec::new(); let mut key = b"".to_vec(); loop { - let res = engine.seek_region(&key, box |_| true, 100).unwrap(); + let res = engine.seek_region(&key, box |_, _| true, 100).unwrap(); match res { SeekRegionResult::Found { local_peer, region } => { assert_eq!(local_peer.get_store_id(), node_id); @@ -97,7 +98,7 @@ fn test_seek_region() { assert_eq!(sought_regions, regions); // Test end_key is exclusive - let res = engine.seek_region(b"k1", box |_| true, 100).unwrap(); + let res = engine.seek_region(b"k1", box |_, _| true, 100).unwrap(); match res { SeekRegionResult::Found { local_peer, region } => { assert_eq!(local_peer.get_store_id(), node_id); @@ -108,7 +109,7 @@ fn test_seek_region() { // Test exactly reaches limit let res = engine - .seek_region(b"", box |p| p.region().get_end_key() == b"k9", 5) + .seek_region(b"", box |r, _| r.get_end_key() == b"k9", 5) .unwrap(); match res { SeekRegionResult::Found { local_peer, region } => { @@ -120,7 +121,7 @@ fn test_seek_region() { // Test exactly exceeds limit let res = engine - .seek_region(b"", box |p| p.region().get_end_key() == b"k9", 4) + .seek_region(b"", box |r, _| r.get_end_key() == b"k9", 4) .unwrap(); match res { SeekRegionResult::LimitExceeded { next_key } => { @@ -130,7 +131,7 @@ fn test_seek_region() { } // Test seek to the end - let res = engine.seek_region(b"", box |_| false, 100).unwrap(); + let res = engine.seek_region(b"", box |_, _| false, 100).unwrap(); match res { SeekRegionResult::Ended => {} r => panic!("expect getting Ended, but got {:?}", r), @@ -138,7 +139,7 @@ fn test_seek_region() { // Test exactly to the end let res = engine - .seek_region(b"", box |p| p.region().get_end_key().is_empty(), 6) + .seek_region(b"", box |r, _| r.get_end_key().is_empty(), 6) .unwrap(); match res { SeekRegionResult::Found { local_peer, region } => { @@ -149,7 +150,7 @@ fn test_seek_region() { } // Test limit exactly reaches end - let res = engine.seek_region(b"", box |_| false, 6).unwrap(); + let res = engine.seek_region(b"", box |_, _| false, 6).unwrap(); match res { SeekRegionResult::Ended => {} r => panic!("expect getting Ended, but got {:?}", r), @@ -157,7 +158,7 @@ fn test_seek_region() { // Test seek from non-starting key let res = engine - .seek_region(b"k6\xff\xff\xff\xff\xff", box |_| true, 1) + .seek_region(b"k6\xff\xff\xff\xff\xff", box |_, _| true, 1) .unwrap(); match res { SeekRegionResult::Found { local_peer, region } => { @@ -167,7 +168,7 @@ fn test_seek_region() { r => panic!("expect getting a region, but got {:?}", r), } let res = engine - .seek_region(b"\xff\xff\xff\xff\xff\xff\xff\xff", box |_| true, 1) + .seek_region(b"\xff\xff\xff\xff\xff\xff\xff\xff", box |_, _| true, 1) .unwrap(); 
match res { SeekRegionResult::Found { local_peer, region } => { @@ -178,3 +179,41 @@ fn test_seek_region() { } } } + +#[test] +fn test_raftkv_seek_region() { + let mut cluster = new_server_cluster(0, 3); + cluster.run(); + + let mut region_info_providers = HashMap::default(); + for node_id in cluster.get_node_ids() { + region_info_providers.insert(node_id, cluster.sim.rl().storages[&node_id].clone()); + } + + test_seek_region_impl(cluster, region_info_providers); +} + +#[test] +fn test_region_collection_seek_region() { + let mut cluster = new_node_cluster(0, 3); + + let (tx, rx) = channel(); + cluster + .sim + .wl() + .post_create_coprocessor_host(box move |id, host| { + let p = RegionCollection::new(host, id); + p.start(); + tx.send((id, p)).unwrap() + }); + + cluster.run(); + let region_info_providers: HashMap<_, _> = rx.try_iter().collect(); + assert_eq!(region_info_providers.len(), 3); + + test_seek_region_impl(cluster, region_info_providers.clone()); + + for (_, p) in region_info_providers { + p.stop(); + } +}
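
Reviewer note: for readers not familiar with the `seek_region` contract exercised above, here is
a minimal caller-side sketch of how a consumer might traverse regions through
`RegionInfoProvider`. It is illustrative only; `collection` is a placeholder for a started
`RegionCollection` (or any other `RegionInfoProvider`), and error handling is elided:

    // Visit every region on this store whose local peer is currently the leader.
    let mut key = b"".to_vec();
    loop {
        match collection
            .seek_region(&key, box |_, role| role == StateRole::Leader, 32)
            .unwrap()
        {
            SeekRegionResult::Found { region, .. } => {
                // ... use `region` here ...
                if region.get_end_key().is_empty() {
                    break; // the last region has an empty end_key
                }
                key = region.get_end_key().to_vec();
            }
            SeekRegionResult::LimitExceeded { next_key } => key = next_key,
            SeekRegionResult::Ended => break,
        }
    }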