Commit

Merge tikv#13541
Signed-off-by: JmPotato <ghzpotato@gmail.com>
JmPotato committed Oct 11, 2022
2 parents 5f3e24f + c22d186 commit e679a3f
Showing 10 changed files with 531 additions and 192 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

1 change: 1 addition & 0 deletions Cargo.toml
@@ -216,6 +216,7 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "6599eb9dca74229
# After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`.
[patch.'https://github.com/pingcap/kvproto']
# kvproto = { git = "https://github.com/your_github_id/kvproto", branch="your_branch" }
kvproto = { git = "https://github.com/JmPotato/kvproto", branch = "flashback_2pc" }

[workspace]
# See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md
151 changes: 147 additions & 4 deletions components/raftstore/src/store/fsm/apply.rs
@@ -87,10 +87,9 @@ use crate::{
msg::{Callback, ErrorCallback, PeerMsg, ReadResponse, SignificantMsg},
peer::Peer,
peer_storage::{write_initial_apply_state, write_peer_state},
util,
util::{
admin_cmd_epoch_lookup, check_region_epoch, compare_region_epoch, ChangePeerI,
ConfChangeKind, KeysInfoFormatter, LatencyInspector,
self, admin_cmd_epoch_lookup, check_flashback_state, check_region_epoch,
compare_region_epoch, ChangePeerI, ConfChangeKind, KeysInfoFormatter, LatencyInspector,
},
Config, RegionSnapshot, RegionTask, WriteCallback,
},
@@ -277,6 +276,9 @@ pub enum ExecResult<S> {
TransferLeader {
term: u64,
},
SetFlashbackState {
region: Region,
},
}

/// The possible returned value when applying logs.
@@ -1342,6 +1344,12 @@ where
"peer_id" => self.id(),
"err" => ?e
),
Error::FlashbackInProgress(..) => debug!(
"flashback is in process";
"region_id" => self.region_id(),
"peer_id" => self.id(),
"err" => ?e
),
_ => error!(?e;
"execute raft command";
"region_id" => self.region_id(),
@@ -1368,6 +1376,7 @@
ExecResult::CommitMerge { ref region, .. } => (Some(region.clone()), None),
ExecResult::RollbackMerge { ref region, .. } => (Some(region.clone()), None),
ExecResult::IngestSst { ref ssts } => (None, Some(ssts.clone())),
ExecResult::SetFlashbackState { region } => (Some(region.clone()), None),
_ => (None, None),
},
_ => (None, None),
@@ -1432,6 +1441,9 @@ where
self.region = region.clone();
self.is_merging = false;
}
ExecResult::SetFlashbackState { ref region } => {
self.region = region.clone();
}
}
}
if let Some(epoch) = origin_epoch {
@@ -1510,6 +1522,7 @@ where
let include_region =
req.get_header().get_region_epoch().get_version() >= self.last_merge_version;
check_region_epoch(req, &self.region, include_region)?;
check_flashback_state(req, &self.region)?;
if req.has_admin_request() {
self.exec_admin_cmd(ctx, req)
} else {
@@ -1548,6 +1561,9 @@ where
AdminCmdType::PrepareMerge => self.exec_prepare_merge(ctx, request),
AdminCmdType::CommitMerge => self.exec_commit_merge(ctx, request),
AdminCmdType::RollbackMerge => self.exec_rollback_merge(ctx, request),
AdminCmdType::PrepareFlashback | AdminCmdType::FinishFlashback => {
self.set_flashback_state_persist(ctx, request)
}
AdminCmdType::InvalidAdmin => Err(box_err!("unsupported admin command type")),
}?;
response.set_cmd_type(cmd_type);
@@ -2792,6 +2808,41 @@ where
))
}

fn set_flashback_state_persist(
&self,
ctx: &mut ApplyContext<EK>,
req: &AdminRequest,
) -> Result<(AdminResponse, ApplyResult<EK::Snapshot>)> {
let region_id = self.region_id();
let region_state_key = keys::region_state_key(region_id);
let mut old_state = match ctx
.engine
.get_msg_cf::<RegionLocalState>(CF_RAFT, &region_state_key)
{
Ok(Some(s)) => s,
_ => {
return Err(box_err!("failed to get region state of {}", region_id));
}
};
let is_in_flashback = req.get_cmd_type() == AdminCmdType::PrepareFlashback;
old_state.mut_region().set_is_in_flashback(is_in_flashback);
let mut region = self.region.clone();
region.set_is_in_flashback(is_in_flashback);
ctx.kv_wb_mut()
.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &old_state)
.unwrap_or_else(|e| {
error!(
"{} failed to change flashback state to {:?} for region {}: {:?}",
self.tag, req, region_id, e
)
});

Ok((
AdminResponse::default(),
ApplyResult::Res(ExecResult::SetFlashbackState { region }),
))
}

fn exec_compact_log(
&mut self,
req: &AdminRequest,
@@ -4439,7 +4490,7 @@ mod tests {

use engine_panic::PanicEngine;
use engine_test::kv::{new_engine, KvTestEngine, KvTestSnapshot};
use engine_traits::{Peekable as PeekableTrait, WriteBatchExt};
use engine_traits::{Peekable as PeekableTrait, SyncMutable, WriteBatchExt};
use kvproto::{
kvrpcpb::ApiVersion,
metapb::{self, RegionEpoch},
@@ -4454,6 +4505,7 @@
store::{new_learner_peer, new_peer},
worker::dummy_scheduler,
};
use txn_types::WriteBatchFlags;
use uuid::Uuid;

use super::*;
@@ -5110,6 +5162,7 @@ mod tests {
true
}
AdminCmdType::BatchSplit => true,
AdminCmdType::PrepareFlashback | AdminCmdType::FinishFlashback => true,
_ => false,
}
}
@@ -6516,4 +6569,94 @@ mod tests {
});
res.unwrap_err();
}

#[test]
fn flashback_need_to_be_applied() {
let (_path, engine) = create_tmp_engine("flashback_need_to_be_applied");
let (_, importer) = create_tmp_importer("flashback_need_to_be_applied");
let mut host = CoprocessorHost::<KvTestEngine>::default();
host.registry
.register_query_observer(1, BoxQueryObserver::new(ApplyObserver::default()));

let (tx, rx) = mpsc::channel();
let (region_scheduler, _) = dummy_scheduler();
let sender = Box::new(TestNotifier { tx });
let cfg = Arc::new(VersionTrack::new(Config::default()));
let (router, mut system) = create_apply_batch_system(&cfg.value());
let pending_create_peers = Arc::new(Mutex::new(HashMap::default()));
let builder = super::Builder::<KvTestEngine> {
tag: "flashback_need_to_be_applied".to_owned(),
cfg,
sender,
region_scheduler,
coprocessor_host: host,
importer,
engine: engine.clone(),
router: router.clone(),
store_id: 1,
pending_create_peers,
};
system.spawn("flashback_need_to_be_applied".to_owned(), builder);

let peer_id = 3;
let mut reg = Registration {
id: peer_id,
..Default::default()
};
reg.region.set_id(1);
reg.region.mut_peers().push(new_peer(2, 3));
reg.region.mut_region_epoch().set_conf_ver(1);
reg.region.mut_region_epoch().set_version(3);
reg.region.set_is_in_flashback(true);
router.schedule_task(1, Msg::Registration(reg));

let (capture_tx, capture_rx) = mpsc::channel();
let mut region_state = RegionLocalState::default();
region_state.mut_region().set_is_in_flashback(false);
let region_state_key = keys::region_state_key(1);
engine
.put_msg_cf(CF_RAFT, &region_state_key, &region_state)
.unwrap();
        // Check that a non-flashback request is rejected.
let mut cmd = AdminRequest::default();
cmd.set_cmd_type(AdminCmdType::TransferLeader);
let mut flashback_req = EntryBuilder::new(1, 1).epoch(1, 3);
flashback_req.req.set_admin_request(cmd.clone());
router.schedule_task(
1,
Msg::apply(apply(
peer_id,
1,
1,
vec![flashback_req.build()],
vec![cb(1, 1, capture_tx.clone())],
)),
);
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(resp.get_header().get_error().has_flashback_in_progress());
        // Check that a flashback request is accepted.
cmd.set_cmd_type(AdminCmdType::PrepareFlashback);
region_state.mut_region().set_is_in_flashback(false);
let mut flashback_req = EntryBuilder::new(2, 2).epoch(1, 3);
flashback_req.req.set_admin_request(cmd.clone());
flashback_req
.req
.mut_header()
.set_flags(WriteBatchFlags::FLASHBACK.bits());
router.schedule_task(
1,
Msg::apply(apply(
peer_id,
1,
2,
vec![flashback_req.build()],
vec![cb(2, 2, capture_tx)],
)),
);
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(!resp.get_header().has_error(), "{:?}", resp);

rx.recv_timeout(Duration::from_millis(500)).unwrap();
system.shutdown();
}
}
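
The check_flashback_state call added to the apply path above enforces a simple rule, which the flashback_need_to_be_applied test exercises: once a region's is_in_flashback flag is set, only requests that carry the FLASHBACK write-batch flag in their header are allowed through, and everything else fails with FlashbackInProgress. A minimal, self-contained sketch of that rule, using illustrative stand-in types rather than the real kvproto/raftstore definitions:

/// Illustrative stand-in for WriteBatchFlags::FLASHBACK (not the real bit value).
const FLASHBACK_FLAG: u64 = 0b1;

struct Region {
    id: u64,
    is_in_flashback: bool,
}

struct RaftCmdRequest {
    header_flags: u64,
}

#[derive(Debug)]
enum Error {
    FlashbackInProgress(u64),
}

/// While a region is in the flashback state, reject any request whose header
/// does not carry the FLASHBACK flag.
fn check_flashback_state(req: &RaftCmdRequest, region: &Region) -> Result<(), Error> {
    if region.is_in_flashback && req.header_flags & FLASHBACK_FLAG == 0 {
        return Err(Error::FlashbackInProgress(region.id));
    }
    Ok(())
}

fn main() {
    let region = Region { id: 1, is_in_flashback: true };
    // An ordinary request (e.g. TransferLeader) is rejected with FlashbackInProgress...
    assert!(check_flashback_state(&RaftCmdRequest { header_flags: 0 }, &region).is_err());
    // ...while a request that sets the FLASHBACK flag is allowed through.
    assert!(check_flashback_state(&RaftCmdRequest { header_flags: FLASHBACK_FLAG }, &region).is_ok());
}
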
65 changes: 18 additions & 47 deletions components/raftstore/src/store/fsm/peer.rs
@@ -82,11 +82,10 @@ use crate::{
metrics::*,
msg::{Callback, ExtCallback, InspectedRaftMessage},
peer::{
ConsistencyState, FlashbackState, ForceLeaderState, Peer, PersistSnapshotResult,
SnapshotRecoveryState, SnapshotRecoveryWaitApplySyncer, StaleState,
UnsafeRecoveryExecutePlanSyncer, UnsafeRecoveryFillOutReportSyncer,
UnsafeRecoveryForceLeaderSyncer, UnsafeRecoveryState, UnsafeRecoveryWaitApplySyncer,
TRANSFER_LEADER_COMMAND_REPLY_CTX,
ConsistencyState, ForceLeaderState, Peer, PersistSnapshotResult, SnapshotRecoveryState,
SnapshotRecoveryWaitApplySyncer, StaleState, UnsafeRecoveryExecutePlanSyncer,
UnsafeRecoveryFillOutReportSyncer, UnsafeRecoveryForceLeaderSyncer,
UnsafeRecoveryState, UnsafeRecoveryWaitApplySyncer, TRANSFER_LEADER_COMMAND_REPLY_CTX,
},
region_meta::RegionMeta,
transport::Transport,
@@ -989,40 +988,8 @@ where

// Check if the region is in the flashback state.
fn on_check_flashback(&mut self, ch: Sender<bool>) {
// TODO: check the persistent state first.
ch.send(self.fsm.peer.flashback_state.is_some()).unwrap();
}

    // Handles the PrepareFlashback message to stop the scheduling and RW tasks.
    // Once called, it waits for the channel's notification in FlashbackState to
    // finish. We place a flag in the request, which is checked when
    // pre_propose_raft_command is called, so the tasks are stopped by applying
    // the flashback-only command. For RW local reads, we also set the leader
    // lease to None to ensure that local reads are not executed.
fn on_prepare_flashback(&mut self, ch: Sender<bool>) {
info!(
"prepare flashback";
"region_id" => self.region().get_id(),
"peer_id" => self.fsm.peer.peer_id(),
);
if self.fsm.peer.flashback_state.is_some() {
ch.send(false).unwrap();
return;
}
self.fsm.peer.flashback_state = Some(FlashbackState::new(ch));
// Let the leader lease to None to ensure that local reads are not executed.
self.fsm.peer.leader_lease_mut().expire_remote_lease();
self.fsm.peer.maybe_finish_flashback_wait_apply();
}

fn on_finish_flashback(&mut self) {
info!(
"finish flashback";
"region_id" => self.region().get_id(),
"peer_id" => self.fsm.peer.peer_id(),
);
self.fsm.peer.flashback_state.take();
// TODO: maybe should check the persistent state also.
ch.send(self.fsm.peer.is_in_flashback).unwrap();
}

fn on_check_pending_admin(&mut self, ch: UnboundedSender<CheckAdminResponse>) {
@@ -1469,14 +1436,12 @@ where
SignificantMsg::UnsafeRecoveryFillOutReport(syncer) => {
self.on_unsafe_recovery_fill_out_report(syncer)
}
SignificantMsg::CheckFlashback(ch) => self.on_check_flashback(ch),
SignificantMsg::PrepareFlashback(ch) => self.on_prepare_flashback(ch),
SignificantMsg::FinishFlashback => self.on_finish_flashback(),
// for snapshot recovery (safe recovery)
SignificantMsg::SnapshotRecoveryWaitApply(syncer) => {
self.on_snapshot_recovery_wait_apply(syncer)
}
SignificantMsg::CheckPendingAdmin(ch) => self.on_check_pending_admin(ch),
SignificantMsg::CheckFlashback(ch) => self.on_check_flashback(ch),
}
}

@@ -2314,10 +2279,6 @@ where
if self.fsm.peer.unsafe_recovery_state.is_some() {
self.check_unsafe_recovery_state();
}
// TODO: combine recovery state and flashback state as a wait apply queue.
if self.fsm.peer.flashback_state.is_some() {
self.fsm.peer.maybe_finish_flashback_wait_apply();
}

if self.fsm.peer.snapshot_recovery_state.is_some() {
self.fsm
@@ -4792,6 +4753,9 @@ where
}
ExecResult::IngestSst { ssts } => self.on_ingest_sst_result(ssts),
ExecResult::TransferLeader { term } => self.on_transfer_leader(term),
ExecResult::SetFlashbackState { region } => {
self.on_flashback_memory_set(region.get_is_in_flashback())
}
}
}

@@ -4899,7 +4863,7 @@ where
let region_id = self.region_id();
// When in the flashback state, we should not allow any other request to be
// proposed.
if self.fsm.peer.flashback_state.is_some() {
if self.fsm.peer.is_in_flashback {
self.ctx.raft_metrics.invalid_proposal.flashback.inc();
let flags = WriteBatchFlags::from_bits_truncate(msg.get_header().get_flags());
if !flags.contains(WriteBatchFlags::FLASHBACK) {
@@ -6134,6 +6098,13 @@ where
self.fsm.has_ready = true;
}

fn on_flashback_memory_set(&mut self, is_in_flashback: bool) {
        // Update the in-memory flashback state.
        self.fsm.peer.is_in_flashback = is_in_flashback;
        // Set the leader lease to None to ensure that local reads are not executed.
        self.fsm.peer.leader_lease_mut().expire_remote_lease();
}

/// Verify and store the hash to state. return true means the hash has been
/// stored successfully.
// TODO: Consider context in the function.
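
Taken together, the peer.rs changes above replace the old in-memory FlashbackState with a single persisted is_in_flashback flag: the apply worker writes it into RegionLocalState, reports it back through ExecResult::SetFlashbackState, and on_flashback_memory_set mirrors it in the peer and expires the remote lease so local reads cannot bypass the check. A rough sketch of that flow with simplified stand-in types (not the real raftstore structs):

// Stand-ins for the persisted region state and the in-memory peer state.
struct RegionLocalState { is_in_flashback: bool }
struct Peer { is_in_flashback: bool, remote_lease_valid: bool }

enum AdminCmdType { PrepareFlashback, FinishFlashback }

// Apply side: persist the flag and hand the new value back to the peer
// (modelled here as a plain return value instead of ExecResult::SetFlashbackState).
fn apply_flashback_cmd(cmd: AdminCmdType, state: &mut RegionLocalState) -> bool {
    let is_in_flashback = matches!(cmd, AdminCmdType::PrepareFlashback);
    state.is_in_flashback = is_in_flashback;
    is_in_flashback
}

// Peer side: mirror the flag in memory and drop the remote lease so that
// local reads also go through the flashback check.
fn on_flashback_memory_set(peer: &mut Peer, is_in_flashback: bool) {
    peer.is_in_flashback = is_in_flashback;
    peer.remote_lease_valid = false;
}

fn main() {
    let mut state = RegionLocalState { is_in_flashback: false };
    let mut peer = Peer { is_in_flashback: false, remote_lease_valid: true };

    let flag = apply_flashback_cmd(AdminCmdType::PrepareFlashback, &mut state);
    on_flashback_memory_set(&mut peer, flag);
    assert!(state.is_in_flashback && peer.is_in_flashback && !peer.remote_lease_valid);

    let flag = apply_flashback_cmd(AdminCmdType::FinishFlashback, &mut state);
    on_flashback_memory_set(&mut peer, flag);
    assert!(!state.is_in_flashback && !peer.is_in_flashback);
}
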
7 changes: 1 addition & 6 deletions components/raftstore/src/store/msg.rs
@@ -513,14 +513,9 @@ where
UnsafeRecoveryWaitApply(UnsafeRecoveryWaitApplySyncer),
UnsafeRecoveryFillOutReport(UnsafeRecoveryFillOutReportSyncer),
SnapshotRecoveryWaitApply(SnapshotRecoveryWaitApplySyncer),
CheckPendingAdmin(UnboundedSender<CheckAdminResponse>),
// Check if the region is in the flashback state.
CheckFlashback(Sender<bool>),
// Prepare the region for the later flashback,
// i.e, put the region in the flashback state.
PrepareFlashback(Sender<bool>),
// Finish the flashback, recover the region from the flashback state.
FinishFlashback,
CheckPendingAdmin(UnboundedSender<CheckAdminResponse>),
}

/// Message that will be sent to a peer.
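
With PrepareFlashback and FinishFlashback dropped from SignificantMsg (they are now ordinary admin commands handled by the apply worker), CheckFlashback(Sender<bool>) remains as a plain query: the handler replies with the peer's in-memory flag over the channel. A small self-contained sketch of that request/response shape using std::sync::mpsc and stand-in types (not the real router API):

use std::sync::mpsc::{channel, Sender};

// Stand-in for the CheckFlashback variant kept in SignificantMsg.
enum SignificantMsg {
    CheckFlashback(Sender<bool>),
}

struct PeerFsm {
    is_in_flashback: bool,
}

impl PeerFsm {
    // Mirrors on_check_flashback: reply with the in-memory flashback flag.
    fn handle(&self, msg: SignificantMsg) {
        match msg {
            SignificantMsg::CheckFlashback(ch) => ch.send(self.is_in_flashback).unwrap(),
        }
    }
}

fn main() {
    let peer = PeerFsm { is_in_flashback: true };
    let (tx, rx) = channel();
    // The caller sends the query and blocks on the reply channel.
    peer.handle(SignificantMsg::CheckFlashback(tx));
    assert!(rx.recv().unwrap());
}
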
