raftstore: fix duplicate ctx of read index (tikv#5213)
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
5kbpers authored and YangKeao committed Aug 26, 2019
1 parent 506684a commit 2590d0d
Showing 2 changed files with 83 additions and 16 deletions.
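Background for the change: each peer used to derive its read index ctx from a local `id_allocator` counter that starts at zero, and the leader's raft read-only queue keys pending requests by that ctx. Two followers (or a restarted peer) could therefore hand the leader the same ctx, and one of the read index responses would be dropped. Generating a random UUID per request makes every ctx unique. The following is a minimal sketch of the collision, not TiKV code, assuming the uuid crate with the v4 feature; `PeerCtxCounter` and `new_ctx` are illustrative names:

use uuid::Uuid;

// Old scheme (removed by this commit): each peer counts from zero, so
// peer 2 and peer 3 both produce 1u64 as the ctx of their first request.
struct PeerCtxCounter(u64);

impl PeerCtxCounter {
    fn next_ctx(&mut self) -> Vec<u8> {
        self.0 += 1;
        self.0.to_ne_bytes().to_vec()
    }
}

// New scheme: a random v4 UUID per request, unique across peers.
fn new_ctx() -> Vec<u8> {
    Uuid::new_v4().as_bytes().to_vec()
}

fn main() {
    let (mut peer2, mut peer3) = (PeerCtxCounter(0), PeerCtxCounter(0));
    // Both followers forward the same ctx to the leader: a duplicate.
    assert_eq!(peer2.next_ctx(), peer3.next_ctx());
    // UUID-based contexts do not collide in practice.
    assert_ne!(new_ctx(), new_ctx());
}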
src/raftstore/store/peer.rs (7 additions, 16 deletions)

@@ -6,7 +6,7 @@ use std::collections::VecDeque;
 use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
 use std::sync::{atomic, Arc};
 use std::time::{Duration, Instant};
-use std::{cmp, mem, slice, u64};
+use std::{cmp, mem, u64};
 
 use engine::rocks::{Snapshot, SyncSnapshot, WriteBatch, WriteOptions, DB};
 use engine::{Engines, Peekable};
@@ -27,6 +27,7 @@ use raft::{
     NO_LIMIT,
 };
 use time::Timespec;
+use uuid::Uuid;
 
 use crate::raftstore::coprocessor::{CoprocessorHost, RegionChangeEvent};
 use crate::raftstore::store::fsm::store::PollContext;
@@ -55,7 +56,7 @@ use super::DestroyPeerJob;
 const SHRINK_CACHE_CAPACITY: usize = 64;
 
 struct ReadIndexRequest {
-    id: u64,
+    id: Uuid,
     cmds: MustConsumeVec<(RaftCmdRequest, Callback)>,
     renew_lease_time: Timespec,
     read_index: Option<u64>,
@@ -64,10 +65,7 @@ struct ReadIndexRequest {
 impl ReadIndexRequest {
     // Transmutes `self.id` to a 8 bytes slice, so that we can use the payload to do read index.
     fn binary_id(&self) -> &[u8] {
-        unsafe {
-            let id = &self.id as *const u64 as *const u8;
-            slice::from_raw_parts(id, 8)
-        }
+        self.id.as_bytes()
     }
 
     fn push_command(&mut self, req: RaftCmdRequest, cb: Callback) {
@@ -76,7 +74,7 @@ impl ReadIndexRequest {
     }
 
     fn with_command(
-        id: u64,
+        id: Uuid,
         req: RaftCmdRequest,
         cb: Callback,
         renew_lease_time: Timespec,
@@ -104,17 +102,11 @@ impl Drop for ReadIndexRequest {
 
 #[derive(Default)]
 struct ReadIndexQueue {
-    id_allocator: u64,
     reads: VecDeque<ReadIndexRequest>,
     ready_cnt: usize,
 }
 
 impl ReadIndexQueue {
-    fn next_id(&mut self) -> u64 {
-        self.id_allocator += 1;
-        self.id_allocator
-    }
-
     fn clear_uncommitted(&mut self, term: u64) {
         for mut read in self.reads.drain(self.ready_cnt..) {
             RAFT_READ_INDEX_PENDING_COUNT.sub(read.cmds.len() as i64);
@@ -1950,9 +1942,8 @@ impl Peer {
         let last_pending_read_count = self.raft_group.raft.pending_read_count();
         let last_ready_read_count = self.raft_group.raft.ready_read_count();
 
-        let id = self.pending_reads.next_id();
-        let ctx = id.to_ne_bytes();
-        self.raft_group.read_index(ctx.to_vec());
+        let id = Uuid::new_v4();
+        self.raft_group.read_index(id.as_bytes().to_vec());
 
         let pending_read_count = self.raft_group.raft.pending_read_count();
         let ready_read_count = self.raft_group.raft.ready_read_count();
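For reference, a self-contained sketch contrasting the two `binary_id` encodings above (the value 42 is illustrative): the old code reinterpreted the u64's memory as 8 native-endian bytes through `unsafe`, while `Uuid::as_bytes` exposes the UUID's 16 bytes safely:

use std::slice;
use uuid::Uuid;

fn main() {
    // Old: view the u64's memory as 8 raw bytes (native-endian, unsafe).
    let id: u64 = 42;
    let old = unsafe { slice::from_raw_parts(&id as *const u64 as *const u8, 8) };
    assert_eq!(old, &id.to_ne_bytes()[..]);

    // New: a UUID already stores its value as [u8; 16]; no unsafe needed.
    let new: &[u8] = Uuid::new_v4().as_bytes();
    assert_eq!(new.len(), 16);
}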
tests/failpoints/cases/test_replica_read.rs (76 additions, 0 deletions)

@@ -3,6 +3,10 @@
 use std::sync::Arc;
 
 use fail;
+use std::mem;
+use std::sync::atomic::AtomicBool;
+use std::sync::Mutex;
+use std::thread;
 use std::time::Duration;
 use test_raftstore::*;
 use tikv_util::HandyRwLock;
@@ -63,3 +67,75 @@ fn test_wait_for_apply_index() {
         Err(_) => panic!("follower read failed"),
     }
 }
+
+#[test]
+fn test_duplicate_read_index_ctx() {
+    let _guard = crate::setup();
+    // Initialize cluster
+    let mut cluster = new_node_cluster(0, 3);
+    configure_for_lease_read(&mut cluster, Some(50), Some(10_000));
+    cluster.cfg.raft_store.raft_heartbeat_ticks = 1;
+    let pd_client = Arc::clone(&cluster.pd_client);
+    pd_client.disable_default_operator();
+
+    // Set region and peers
+    let r1 = cluster.run_conf_change();
+    let p1 = new_peer(1, 1);
+    cluster.must_put(b"k0", b"v0");
+    let p2 = new_peer(2, 2);
+    cluster.pd_client.must_add_peer(r1, p2.clone());
+    let p3 = new_peer(3, 3);
+    cluster.pd_client.must_add_peer(r1, p3.clone());
+    must_get_equal(&cluster.get_engine(3), b"k0", b"v0");
+    let region = cluster.get_region(b"k0");
+    assert_eq!(cluster.leader_of_region(region.get_id()).unwrap(), p1);
+
+    // Delay all raft messages to peer 1.
+    let dropped_msgs = Arc::new(Mutex::new(Vec::new()));
+    let filter = Box::new(
+        RegionPacketFilter::new(region.get_id(), 1)
+            .direction(Direction::Recv)
+            .when(Arc::new(AtomicBool::new(true)))
+            .reserve_dropped(Arc::clone(&dropped_msgs)),
+    );
+    cluster.sim.wl().add_recv_filter(1, filter);
+
+    // send two read index requests to leader
+    let mut request = new_request(
+        region.get_id(),
+        region.get_region_epoch().clone(),
+        vec![new_read_index_cmd()],
+        true,
+    );
+    request.mut_header().set_peer(p2.clone());
+    let (cb2, rx2) = make_cb(&request);
+    // send to peer 2
+    cluster
+        .sim
+        .rl()
+        .async_command_on_node(2, request.clone(), cb2)
+        .unwrap();
+    thread::sleep(Duration::from_millis(200));
+
+    request.mut_header().set_peer(p3.clone());
+    let (cb3, rx3) = make_cb(&request);
+    // send to peer 3
+    cluster
+        .sim
+        .rl()
+        .async_command_on_node(3, request.clone(), cb3)
+        .unwrap();
+    thread::sleep(Duration::from_millis(200));
+
+    let router = cluster.sim.wl().get_router(1).unwrap();
+    fail::cfg("pause_on_peer_collect_message", "pause").unwrap();
+    cluster.sim.wl().clear_recv_filters(1);
+    for raft_msg in mem::replace(dropped_msgs.lock().unwrap().as_mut(), vec![]) {
+        router.send_raft_message(raft_msg).unwrap();
+    }
+    fail::cfg("pause_on_peer_collect_message", "off").unwrap();
+
+    // read index response must not be dropped
+    assert!(rx2.recv_timeout(Duration::from_millis(500)).is_ok());
+    assert!(rx3.recv_timeout(Duration::from_millis(500)).is_ok());
+}
