Skip to content

Commit

Permalink
add integrated test
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta committed Oct 10, 2018
1 parent e160a0c commit 592fcfa
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 22 deletions.
51 changes: 29 additions & 22 deletions src/raftstore/coprocessor/region_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,24 @@ use raftstore::store::msg::{SeekRegionCallback, SeekRegionFilter, SeekRegionResu
use raftstore::store::util;
use std::collections::BTreeMap;
use std::collections::Bound::{Excluded, Unbounded};
use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::sync::{mpsc, Arc, Mutex};
use std::usize;
use storage::engine::{RegionInfoProvider, Result as EngineResult};
use util::collections::HashMap;
use util::escape;
use util::worker::{Builder as WorkerBuilder, Runnable, Scheduler, Worker};

const CHANNEL_BUFFER_SIZE: usize = usize::MAX; // Unbounded

/// `RegionCollection` is used to collect all regions on this TiKV into a collection so that other
/// parts of TiKV can get region information from it. It registers several observers to raftstore,
/// which is named `EventSender`, and it simply send some specific types of events throw a channel.
/// parts of TiKV can get region information from it. It registers an observer to raftstore, which
/// is named `EventSender`, and it simply sends some specific types of events through a channel.
/// In the meantime, `RegionCollectionWorker` keeps fetching messages from the channel, and mutates
/// the collection according to the messages. When an accessor method of `RegionCollection` is
/// called, it also sends a message to `RegionCollectionWorker`, and the result will be sent
/// back as soon as it's finished.
/// In fact, the channel mentioned above is actually a `util::worker::Worker`.

/// `RaftStoreEvent` Represents events dispatched from raftstore coprocessor.
#[derive(Debug)]
Expand All @@ -48,15 +50,11 @@ enum RaftStoreEvent {
RoleChange { region: Region, role: StateRole },
}

impl Display for RaftStoreEvent {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
Debug::fmt(self, f)
}
}
type RegionsMap = HashMap<u64, (Region, StateRole)>;
type RegionRangesMap = BTreeMap<Vec<u8>, u64>;

/// `RegionCollection` has its own thread (namely RegionCollectionWorker). Queries and updates are
/// done by sending commands to the thread.
#[derive(Debug)]
enum RegionCollectionMsg {
RaftStoreEvent(RaftStoreEvent),
SeekRegion {
Expand All @@ -65,23 +63,19 @@ enum RegionCollectionMsg {
limit: u32,
callback: SeekRegionCallback,
},
}

// So we can derive `Debug` on `RegionCollectionMsg`
impl Debug for SeekRegionFilter {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
write!(f, "<filter>")
}
}
impl Debug for SeekRegionCallback {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
write!(f, "<callback>")
}
/// Get all contents from the collection. Only used for testing.
DebugDump(mpsc::Sender<(RegionsMap, RegionRangesMap)>),
}

impl Display for RegionCollectionMsg {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
Debug::fmt(self, f)
match self {
RegionCollectionMsg::RaftStoreEvent(e) => write!(f, "RaftStoreEvent({:?})", e),
RegionCollectionMsg::SeekRegion { from, limit, .. } => {
write!(f, "SeekRegion(from: {}, limit: {})", escape(from), limit)
}
RegionCollectionMsg::DebugDump(_) => write!(f, "DebugDump"),
}
}
}

Expand Down Expand Up @@ -313,6 +307,10 @@ impl Runnable<RegionCollectionMsg> for RegionCollectionWorker {
} => {
self.handle_seek_region(from, filter, limit, callback);
}
RegionCollectionMsg::DebugDump(tx) => {
tx.send((self.regions.clone(), self.region_ranges.clone()))
.unwrap();
}
}
}
}
Expand Down Expand Up @@ -353,6 +351,15 @@ impl RegionCollection {
pub fn stop(&self) {
self.worker.lock().unwrap().stop().unwrap().join().unwrap();
}

/// Get all content from the collection. Only used for testing.
///
/// Sends a `DebugDump` message to the worker thread and blocks until the
/// worker replies with a clone of both internal maps (regions by id, and
/// region ranges keyed by end key).
pub fn debug_dump(&self) -> (RegionsMap, RegionRangesMap) {
// One-shot rendezvous channel: the worker sends exactly one snapshot back.
let (tx, rx) = mpsc::channel();
self.scheduler
.schedule(RegionCollectionMsg::DebugDump(tx))
.unwrap();
// Blocks until the worker has processed the message; panics (unwrap) if the
// worker has already been stopped and the channel is disconnected.
rx.recv().unwrap()
}
}

impl RegionInfoProvider for RegionCollection {
Expand Down
1 change: 1 addition & 0 deletions tests/raftstore_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod test_merge;
mod test_multi;
mod test_prevote;
mod test_region_change_observer;
mod test_region_collections;
mod test_region_heartbeat;
mod test_service;
mod test_single;
Expand Down
186 changes: 186 additions & 0 deletions tests/raftstore_cases/test_region_collections.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use kvproto::metapb::Region;
use raft::StateRole;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use test_raftstore::{configure_for_merge, new_node_cluster, Cluster, NodeCluster};
use tikv::raftstore::coprocessor::RegionCollection;
use tikv::raftstore::store::keys::data_end_key;
use tikv::raftstore::store::util::{find_peer, new_peer};
use tikv::util::HandyRwLock;

fn dump(c: &RegionCollection) -> Vec<(Region, StateRole)> {
let (regions, region_ranges) = c.debug_dump();

assert_eq!(regions.len(), region_ranges.len());

let mut res = Vec::new();
for (end_key, id) in region_ranges {
let (ref region, role) = regions[&id];
assert_eq!(end_key, data_end_key(region.get_end_key()));
assert_eq!(id, region.get_id());
res.push((region.clone(), role));
}

res
}

/// Asserts that `regions`, in order, covers exactly the `(start_key, end_key)`
/// pairs given in `ranges`.
fn check_region_ranges(regions: &[(Region, StateRole)], ranges: &[(&[u8], &[u8])]) {
    // Lengths must match before pairwise comparison, otherwise zip would
    // silently truncate the longer side.
    assert_eq!(regions.len(), ranges.len());
    for ((region, _), (start_key, end_key)) in regions.iter().zip(ranges.iter()) {
        assert_eq!(region.get_start_key(), *start_key);
        assert_eq!(region.get_end_key(), *end_key);
    }
}

/// Drives a running cluster through split / merge / add-peer /
/// transfer-leader / remove-peer operations, checking after each step that
/// the `RegionCollection` `c` reflects the cluster's actual region layout.
///
/// Fix: the two `dump(&c)` calls are normalized to `dump(c)` — `c` is already
/// a `&RegionCollection`, so `&c` was a needless extra borrow (clippy
/// `needless_borrow`) and inconsistent with every other call site here.
fn test_region_collection_impl(cluster: &mut Cluster<NodeCluster>, c: &RegionCollection) {
    // Seed some data so the splits below have keys to split at.
    for i in 0..9 {
        let k = format!("k{}", i).into_bytes();
        let v = format!("v{}", i).into_bytes();
        cluster.must_put(&k, &v);
    }

    let pd_client = Arc::clone(&cluster.pd_client);

    // Initially a single region covers the whole key space ("" to "").
    let init_regions = dump(c);
    check_region_ranges(&init_regions, &[(&b""[..], &b""[..])]);
    assert_eq!(init_regions[0].0, cluster.get_region(b"k1"));

    // Split into five regions: (-inf,k1), [k1,k2), [k2,k3), [k3,k4), [k4,+inf).
    {
        let r1 = cluster.get_region(b"k1");
        cluster.must_split(&r1, b"k1");
        let r2 = cluster.get_region(b"k4");
        cluster.must_split(&r2, b"k4");
        let r3 = cluster.get_region(b"k2");
        cluster.must_split(&r3, b"k2");
        let r4 = cluster.get_region(b"k3");
        cluster.must_split(&r4, b"k3");
    }

    let split_regions = dump(c);
    check_region_ranges(
        &split_regions,
        &[
            (&b""[..], &b"k1"[..]),
            (b"k1", b"k2"),
            (b"k2", b"k3"),
            (b"k3", b"k4"),
            (b"k4", b""),
        ],
    );
    // The surviving original region must have had its epoch bumped by the splits.
    for (ref region, _) in &split_regions {
        if region.get_id() == init_regions[0].0.get_id() {
            assert_ne!(
                region.get_region_epoch(),
                init_regions[0].0.get_region_epoch()
            );
        }
    }

    // Merge from left to right.
    pd_client.must_merge(split_regions[1].0.get_id(), split_regions[2].0.get_id());
    let merge_regions = dump(c);
    check_region_ranges(
        &merge_regions,
        &[
            (&b""[..], &b"k1"[..]),
            (b"k1", b"k3"),
            (b"k3", b"k4"),
            (b"k4", b""),
        ],
    );

    // Merge from right to left.
    pd_client.must_merge(merge_regions[2].0.get_id(), merge_regions[1].0.get_id());
    let mut merge_regions_2 = dump(c);
    check_region_ranges(
        &merge_regions_2,
        &[(&b""[..], &b"k1"[..]), (b"k1", b"k4"), (b"k4", b"")],
    );

    // Add peer.
    let (region1, role1) = merge_regions_2.remove(1);
    assert_eq!(role1, StateRole::Leader);
    assert_eq!(region1.get_peers().len(), 1);
    assert_eq!(region1.get_peers()[0].get_store_id(), 1);

    pd_client.must_add_peer(region1.get_id(), new_peer(2, 100));
    let (region2, role2) = dump(c).remove(1);
    assert_eq!(role2, StateRole::Leader);
    assert_eq!(region2.get_peers().len(), 2);
    assert!(find_peer(&region2, 1).is_some());
    assert!(find_peer(&region2, 2).is_some());

    // Change leader.
    pd_client.transfer_leader(region2.get_id(), find_peer(&region2, 2).unwrap().clone());
    let mut region3 = Region::default();
    let mut role3 = StateRole::default();
    // The collection is updated asynchronously, so poll with a bounded retry
    // loop (up to ~2s) instead of relying on a single sleep.
    for _ in 0..100 {
        let r = dump(c).remove(1);
        region3 = r.0;
        role3 = r.1;
        if role3 == StateRole::Follower {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    assert_eq!(role3, StateRole::Follower);

    // Remove peer: once this node's local peer is destroyed, the region
    // disappears from the collection observed here.
    pd_client.must_remove_peer(region3.get_id(), find_peer(&region3, 1).unwrap().clone());
    let regions_after_removing = dump(c);
    check_region_ranges(
        &regions_after_removing,
        &[(&b""[..], &b"k1"[..]), (b"k4", b"")],
    );
}

// Integration test: spins up a 3-store node cluster and verifies that a
// `RegionCollection` attached to store 1 tracks region changes correctly.
#[test]
fn test_node_cluster_region_collection() {
    let mut cluster = new_node_cluster(1, 3);
    configure_for_merge(&mut cluster);

    let pd_client = Arc::clone(&cluster.pd_client);
    // Disable PD's default scheduling so only the operations issued by the
    // test mutate region layout.
    pd_client.disable_default_operator();

    // Create a RegionCollection on node 1.
    // The hook is registered before `run_conf_change` so it fires when the
    // coprocessor host for each node is created; the channel hands the
    // collection built inside the closure back to the test thread.
    let (tx, rx) = channel();
    cluster
        .sim
        .wl()
        .post_create_coprocessor_host(box move |id, host| {
            if id == 1 {
                let c = RegionCollection::new(host, id);
                tx.send(c).unwrap();
            }
        });
    cluster.run_conf_change();
    let c = rx.recv().unwrap();
    c.start();
    // We only created it on the node whose id == 1 so we shouldn't receive more than one item.
    assert!(rx.try_recv().is_err());

    test_region_collection_impl(&mut cluster, &c);

    // Shut the cluster down first so its observers stop producing events
    // before the collection's worker thread is stopped and joined.
    drop(cluster);
    c.stop();
}

0 comments on commit 592fcfa

Please sign in to comment.