raftstore: remove nested loop replica read queue (tikv#6396)
Signed-off-by: qupeng <qupeng@pingcap.com>
hicqu committed Jan 9, 2020
1 parent e5568d4 commit 4aecef1
Showing 4 changed files with 360 additions and 182 deletions.
1 change: 1 addition & 0 deletions src/raftstore/store/mod.rs
@@ -13,6 +13,7 @@ mod local_metrics;
mod metrics;
mod peer;
mod peer_storage;
mod read_queue;
mod region_snapshot;
mod snap;
mod worker;
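The new src/raftstore/store/read_queue.rs is listed among the four changed files, but its contents are not captured in this view. Below is a minimal, hypothetical sketch of its replica-side interface, inferred purely from the call sites in the peer.rs diff that follows; `Uuid`, `Timespec`, and the command callbacks are replaced with plain integers, so every name and detail beyond those call sites is an assumption rather than the actual implementation.

use std::collections::{HashMap, VecDeque};

const SHRINK_CACHE_CAPACITY: usize = 64;

pub struct ReadIndexRequest {
    pub id: u64,                  // a `Uuid` in the real code
    pub read_index: Option<u64>,
    pub renew_lease_time: u64,    // a `Timespec` in the real code
}

#[derive(Default)]
pub struct ReadIndexQueue {
    reads: VecDeque<ReadIndexRequest>,
    // How many requests at the front of `reads` already have a read index.
    ready_cnt: usize,
    // Total number of requests popped so far, so `contexts` offsets stay stable.
    handled_cnt: usize,
    // id -> absolute offset: acknowledging a state becomes an O(1) lookup
    // instead of a linear scan per state.
    contexts: HashMap<u64, usize>,
}

impl ReadIndexQueue {
    pub fn push_back(&mut self, read: ReadIndexRequest, is_leader: bool) {
        if !is_leader {
            // Followers can observe read states out of order, so remember
            // where each request sits.
            self.contexts.insert(read.id, self.handled_cnt + self.reads.len());
        }
        self.reads.push_back(read);
    }

    // Pops a request only once its read index is known; returning `None`
    // terminates the caller's `while let` loop.
    pub fn pop_front(&mut self) -> Option<ReadIndexRequest> {
        if self.ready_cnt == 0 {
            return None;
        }
        self.ready_cnt -= 1;
        self.handled_cnt += 1;
        self.reads.pop_front()
    }

    // Puts back a just-popped request that cannot be served safely yet.
    pub fn push_front(&mut self, read: ReadIndexRequest) {
        debug_assert!(read.read_index.is_some());
        self.reads.push_front(read);
        self.ready_cnt += 1;
        self.handled_cnt -= 1;
    }

    // Fills in read indexes for all acknowledged states in one batch. A
    // request queued before an acknowledged one may reuse that index, so a
    // single backward pass propagates the smallest index seen so far.
    pub fn advance_replica_reads(&mut self, states: impl IntoIterator<Item = (u64, u64)>) {
        let mut max_changed = 0;
        for (id, index) in states {
            if let Some(offset) = self.contexts.remove(&id) {
                if offset < self.handled_cnt {
                    continue; // the request was already handled and popped
                }
                let pos = offset - self.handled_cnt;
                let slot = self.reads[pos].read_index.get_or_insert(index);
                *slot = (*slot).min(index);
                max_changed = max_changed.max(pos + 1);
            }
        }
        let mut carry = u64::MAX;
        for pos in (0..max_changed).rev() {
            let read = &mut self.reads[pos];
            if let Some(i) = read.read_index {
                carry = carry.min(i);
            }
            let slot = read.read_index.get_or_insert(carry);
            *slot = (*slot).min(carry);
        }
        self.ready_cnt = self.ready_cnt.max(max_changed);
    }

    // Drops every pending request; with `Some(region_id)` the real code also
    // notifies each callback that the region was removed.
    pub fn clear_all(&mut self, _notify_removed: Option<u64>) {
        self.contexts.clear();
        self.reads.clear();
        self.ready_cnt = 0;
    }

    pub fn gc(&mut self) {
        if self.reads.capacity() > SHRINK_CACHE_CAPACITY
            && self.reads.len() < SHRINK_CACHE_CAPACITY
        {
            self.reads.shrink_to_fit();
        }
    }
}

The id-to-offset map plus the single backward pass is one plausible way to eliminate the per-state scan; the actual file may organize this differently.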
257 changes: 76 additions & 181 deletions src/raftstore/store/peer.rs
@@ -6,7 +6,7 @@ use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::{atomic, Arc};
use std::time::{Duration, Instant};
use std::{cmp, mem, u64};
use std::{cmp, mem, u64, usize};

use engine::rocks::{Snapshot, SyncSnapshot, WriteBatch, WriteOptions, DB};
use engine::{Engines, Peekable};
@@ -37,113 +37,23 @@ use crate::raftstore::store::fsm::{
};
use crate::raftstore::store::keys::{enc_end_key, enc_start_key};
use crate::raftstore::store::worker::{ReadDelegate, ReadProgress, RegionTask};
use crate::raftstore::store::{keys, Callback, Config, ReadResponse, RegionSnapshot};
use crate::raftstore::store::{Callback, Config, PdTask, ReadResponse, RegionSnapshot};
use crate::raftstore::{Error, Result};
use tikv_util::collections::HashMap;
use tikv_util::time::{duration_to_sec, monotonic_raw_now};
use tikv_util::worker::Scheduler;
use tikv_util::MustConsumeVec;

use super::cmd_resp;
use super::local_metrics::{RaftMessageMetrics, RaftReadyMetrics};
use super::metrics::*;
use super::peer_storage::{write_peer_state, ApplySnapResult, InvokeContext, PeerStorage};
use super::read_queue::{ReadIndexQueue, ReadIndexRequest};
use super::transport::Transport;
use super::util::{self, check_region_epoch, is_initial_msg, Lease, LeaseState};
use super::DestroyPeerJob;

const SHRINK_CACHE_CAPACITY: usize = 64;

struct ReadIndexRequest {
id: Uuid,
cmds: MustConsumeVec<(RaftCmdRequest, Callback)>,
renew_lease_time: Timespec,
read_index: Option<u64>,
}

impl ReadIndexRequest {
// Converts `self.id` into a byte slice, so that we can use it as the payload of a read-index request.
fn binary_id(&self) -> &[u8] {
self.id.as_bytes()
}

fn push_command(&mut self, req: RaftCmdRequest, cb: Callback) {
RAFT_READ_INDEX_PENDING_COUNT.inc();
self.cmds.push((req, cb));
}

fn with_command(
id: Uuid,
req: RaftCmdRequest,
cb: Callback,
renew_lease_time: Timespec,
) -> Self {
RAFT_READ_INDEX_PENDING_COUNT.inc();
let mut cmds = MustConsumeVec::with_capacity("callback of index read", 1);
cmds.push((req, cb));
ReadIndexRequest {
id,
cmds,
renew_lease_time,
read_index: None,
}
}
}

impl Drop for ReadIndexRequest {
fn drop(&mut self) {
let dur = (monotonic_raw_now() - self.renew_lease_time)
.to_std()
.unwrap();
RAFT_READ_INDEX_PENDING_DURATION.observe(duration_to_sec(dur));
}
}

#[derive(Default)]
struct ReadIndexQueue {
reads: VecDeque<ReadIndexRequest>,
ready_cnt: usize,
}

impl ReadIndexQueue {
fn clear_uncommitted(&mut self, term: u64) {
for mut read in self.reads.drain(self.ready_cnt..) {
RAFT_READ_INDEX_PENDING_COUNT.sub(read.cmds.len() as i64);
for (_, cb) in read.cmds.drain(..) {
apply::notify_stale_req(term, cb);
}
}
}

/// Updates the read index of the requests queued up to (and including) the one with the specified id.
fn advance(&mut self, id: &[u8], read_index: u64) {
if let Some(i) = self.reads.iter().position(|x| x.binary_id() == id) {
for pos in 0..=i {
let req = &mut self.reads[pos];
let index = req.read_index.get_or_insert(read_index);
if *index > read_index {
*index = read_index;
}
}
if self.ready_cnt < i + 1 {
self.ready_cnt = i + 1;
}
} else {
error!(
"cannot find corresponding read from pending reads";
"id"=>?id, "read-index" =>read_index,
);
}
}

fn gc(&mut self) {
if self.reads.capacity() > SHRINK_CACHE_CAPACITY && self.reads.len() < SHRINK_CACHE_CAPACITY
{
self.reads.shrink_to_fit();
}
}
}
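This `advance` is the nested loop of the commit title: `apply_reads` invoked it once per `ReadState`, and each invocation performs a linear `position` scan plus a fill pass over the queue, so acknowledging n read states against n pending reads costs O(n²) in the worst case. The replacement, `advance_replica_reads`, takes all states in one batch (see the sketch before this file's diff).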

/// The returned states of the peer after checking whether it is stale
#[derive(Debug, PartialEq, Eq)]
pub enum StaleState {
@@ -574,12 +484,7 @@ impl Peer {
}
}

for mut read in self.pending_reads.reads.drain(..) {
RAFT_READ_INDEX_PENDING_COUNT.sub(read.cmds.len() as i64);
for (_, cb) in read.cmds.drain(..) {
apply::notify_req_region_removed(region.get_id(), cb);
}
}
self.pending_reads.clear_all(Some(region.get_id()));

for proposal in self.apply_proposals.drain(..) {
apply::notify_req_region_removed(region.get_id(), proposal.cb);
@@ -1384,78 +1289,80 @@ impl Peer {
self.proposals.gc();
}

fn response_read<T, C>(
&self,
read: &mut ReadIndexRequest,
ctx: &mut PollContext<T, C>,
replica_read: bool,
) {
debug!(
"handle reads with a read index";
"request_id" => ?read.binary_id(),
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
);
RAFT_READ_INDEX_PENDING_COUNT.sub(read.cmds.len() as i64);
for (req, cb) in read.cmds.drain(..) {
if !replica_read {
cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
continue;
}
if req.get_header().get_replica_read() {
// We should check the epoch since the range could have changed.
cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
} else {
// The request could have been proposed when the peer was leader.
// TODO: figure out whether it's necessary to notify stale or not.
let term = self.term();
apply::notify_stale_req(term, cb);
}
}
}

/// Responds to ready read-index requests on the replica. The replica is not a leader here.
fn post_pending_read_index_on_replica<T, C>(&mut self, ctx: &mut PollContext<T, C>) {
if self.pending_reads.ready_cnt > 0 {
for _ in 0..self.pending_reads.ready_cnt {
let mut read = self.pending_reads.reads.pop_front().unwrap();
RAFT_READ_INDEX_PENDING_COUNT.sub(read.cmds.len() as i64);
assert!(read.read_index.is_some());

let is_read_index_request = read.cmds.len() == 1
&& read.cmds[0].0.get_requests().len() == 1
&& read.cmds[0].0.get_requests()[0].get_cmd_type() == CmdType::ReadIndex;

let term = self.term();
if is_read_index_request {
for (req, cb) in read.cmds.drain(..) {
cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
}
self.pending_reads.ready_cnt -= 1;
} else if self.ready_to_handle_unsafe_replica_read(read.read_index.unwrap()) {
for (req, cb) in read.cmds.drain(..) {
// We should check the epoch since the range could have changed
if req.get_header().get_replica_read() {
cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
} else {
apply::notify_stale_req(term, cb);
}
}
self.pending_reads.ready_cnt -= 1;
} else {
self.pending_reads.reads.push_front(read);
}
while let Some(mut read) = self.pending_reads.pop_front() {
assert!(read.read_index.is_some());
let is_read_index_request = read.cmds.len() == 1
&& read.cmds[0].0.get_requests().len() == 1
&& read.cmds[0].0.get_requests()[0].get_cmd_type() == CmdType::ReadIndex;

if is_read_index_request {
self.response_read(&mut read, ctx, false);
} else if self.ready_to_handle_unsafe_replica_read(read.read_index.unwrap()) {
self.response_read(&mut read, ctx, true);
} else {
// TODO: `ReadIndex` requests could be blocked.
self.pending_reads.push_front(read);
break;
}
}
}

fn apply_reads<T, C>(&mut self, ctx: &mut PollContext<T, C>, ready: &Ready) {
let mut propose_time = None;
let states = ready.read_states().iter().map(|state| {
let uuid = Uuid::from_slice(state.request_ctx.as_slice()).unwrap();
(uuid, state.index)
});

// The follower may have lost `ReadIndexResp`, so the pending_reads queue
// does not guarantee an order consistent with read_states. `advance` will
// update the `read_index` of the read requests issued before this
// successful `ready`.
if !self.is_leader() {
// NOTE: there could still be some read requests following the acknowledged
// ones; they will be cleared in `clear_uncommitted` later.
for state in &ready.read_states {
self.pending_reads
.advance(state.request_ctx.as_slice(), state.index);
}
// NOTE: there could still be some pending reads proposed by the peer when it was
// leader. They will be cleared in `clear_uncommitted` later in the function.
self.pending_reads.advance_replica_reads(states);
self.post_pending_read_index_on_replica(ctx);
} else if self.ready_to_handle_read() {
for state in &ready.read_states {
let mut read = self.pending_reads.reads.pop_front().unwrap();
assert_eq!(state.request_ctx.as_slice(), read.binary_id());
RAFT_READ_INDEX_PENDING_COUNT.sub(read.cmds.len() as i64);
debug!("handle reads with a read index";
"request_id" => ?read.binary_id(),
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
);
for (req, cb) in read.cmds.drain(..) {
cb.invoke_read(self.handle_read(ctx, req, true, Some(state.index)));
}
for (uuid, index) in states {
let mut read = self.pending_reads.advance_leader_read_and_pop(uuid, index);
propose_time = Some(read.renew_lease_time);
self.response_read(&mut read, ctx, false);
}
} else {
for state in &ready.read_states {
let read = &mut self.pending_reads.reads[self.pending_reads.ready_cnt];
assert_eq!(state.request_ctx.as_slice(), read.binary_id());
self.pending_reads.ready_cnt += 1;
read.read_index = Some(state.index);
propose_time = Some(read.renew_lease_time);
}
propose_time = self.pending_reads.advance_leader_reads(states);
}
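The leader-side counterparts invoked above also live in the unshown read_queue.rs. The following hypothetical sketch reuses the simplified types from the earlier one (plain integers instead of `Uuid` and `Timespec`) and mirrors the removed inline logic: on the leader, read states come back in FIFO order, so no id lookup is needed.

use std::collections::VecDeque;

pub struct ReadIndexRequest {
    pub id: u64,
    pub read_index: Option<u64>,
    pub renew_lease_time: u64,
}

#[derive(Default)]
pub struct ReadIndexQueue {
    reads: VecDeque<ReadIndexRequest>,
    ready_cnt: usize,
}

impl ReadIndexQueue {
    // Ready to serve reads: pop the front request, which must match the
    // acknowledged state, and record its read index.
    pub fn advance_leader_read_and_pop(&mut self, id: u64, index: u64) -> ReadIndexRequest {
        let mut read = self.reads.pop_front().expect("pending read must exist");
        assert_eq!(read.id, id);
        read.read_index = Some(index);
        read
    }

    // Not ready to serve reads yet: record the indexes without popping and
    // return the last renew-lease time, which the caller uses as the
    // `propose_time` for lease maintenance.
    pub fn advance_leader_reads(
        &mut self,
        states: impl IntoIterator<Item = (u64, u64)>,
    ) -> Option<u64> {
        let mut propose_time = None;
        for (id, index) in states {
            let read = &mut self.reads[self.ready_cnt];
            assert_eq!(read.id, id);
            read.read_index = Some(index);
            self.ready_cnt += 1;
            propose_time = Some(read.renew_lease_time);
        }
        propose_time
    }
}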

// Note that only after handling read_states can we identify what requests are
@@ -1507,21 +1414,9 @@
}
if !self.is_leader() {
self.post_pending_read_index_on_replica(ctx)
} else {
if self.pending_reads.ready_cnt > 0 && self.ready_to_handle_read() {
for _ in 0..self.pending_reads.ready_cnt {
let mut read = self.pending_reads.reads.pop_front().unwrap();
debug!("handle reads with a read index";
"request_id" => ?read.binary_id(),
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
);
RAFT_READ_INDEX_PENDING_COUNT.sub(read.cmds.len() as i64);
for (req, cb) in read.cmds.drain(..) {
cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
}
}
self.pending_reads.ready_cnt = 0;
} else if self.ready_to_handle_read() {
while let Some(mut read) = self.pending_reads.pop_front() {
self.response_read(&mut read, ctx, false);
}
}
self.pending_reads.gc();
@@ -1942,7 +1837,7 @@ impl Peer {
// before or after the previous read index, and the lease can be renewed when
// heartbeat responses are received.
LeaseState::Valid | LeaseState::Expired => {
if let Some(read) = self.pending_reads.reads.back_mut() {
if let Some(read) = self.pending_reads.back_mut() {
let max_lease = poll_ctx.cfg.raft_store_max_leader_lease();
if read.renew_lease_time + max_lease > renew_lease_time {
read.push_command(req, cb);
@@ -1978,11 +1873,6 @@

let id = Uuid::new_v4();
self.raft_group.read_index(id.as_bytes().to_vec());
debug!("request to get a read index";
"request_id" => ?id,
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
);

let pending_read_count = self.raft_group.raft.pending_read_count();
let ready_read_count = self.raft_group.raft.ready_read_count();
@@ -1996,8 +1886,16 @@
return false;
}

let read_proposal = ReadIndexRequest::with_command(id, req, cb, renew_lease_time);
self.pending_reads.reads.push_back(read_proposal);
let read = ReadIndexRequest::with_command(id, req, cb, renew_lease_time);
self.pending_reads.push_back(read, self.is_leader());

debug!(
"request to get a read index";
"request_id" => ?id,
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
"is_leader" => self.is_leader(),
);

// TimeoutNow has been sent out, so we need to propose explicitly to
// update leader lease.
@@ -2259,7 +2157,7 @@ impl Peer {
}

fn handle_read<T, C>(
&mut self,
&self,
ctx: &mut PollContext<T, C>,
req: RaftCmdRequest,
check_epoch: bool,
@@ -2282,10 +2180,7 @@

pub fn stop(&mut self) {
self.mut_store().cancel_applying_snap();
for mut read in self.pending_reads.reads.drain(..) {
RAFT_READ_INDEX_PENDING_COUNT.sub(read.cmds.len() as i64);
read.cmds.clear();
}
self.pending_reads.clear_all(None);
}
}

