Skip to content

Commit

Permalink
Merge pull request tikv#18 from SpadeA-Tang/test-fix
Browse files Browse the repository at this point in the history
Bug fix
  • Loading branch information
SpadeA-Tang committed Nov 22, 2022
2 parents e444bb4 + bdb9a13 commit c52be47
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 36 deletions.
9 changes: 1 addition & 8 deletions components/raftstore-v2/src/operation/command/admin/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,6 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
regions: Vec<Region>,
) {
fail_point!("on_split", self.peer().get_store_id() == 3, |_| {});

info!(
self.logger,
"on_ready_split_region";
"regions" => ?regions,
);

let derived = &regions[derived_index];
let derived_epoch = derived.get_region_epoch().clone();
let region_id = derived.get_id();
Expand Down Expand Up @@ -338,7 +331,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {

if self.is_leader() {
self.heartbeat_pd(store_ctx);

// Notify pd immediately to let it update the region meta.
info!(
self.logger,
Expand Down
67 changes: 39 additions & 28 deletions components/raftstore-v2/src/operation/query/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,41 +110,52 @@ where
req: RaftCmdRequest,
) -> std::result::Result<Option<RegionSnapshot<E::Snapshot, Prefix>>, RaftCmdResponse> {
match self.pre_propose_raft_command(&req) {
Ok(Some((mut delegate, policy))) => match policy {
RequestPolicy::ReadLocal => {
let region = Arc::clone(&delegate.region);
let snap = RegionSnapshot::from_snapshot(delegate.get_snapshot(&None), region);
// Ensures the snapshot is acquired before getting the time
atomic::fence(atomic::Ordering::Release);
let snapshot_ts = monotonic_raw_now();

if !delegate.is_in_leader_lease(snapshot_ts) {
return Ok(None);
Ok(Some((mut delegate, policy))) => {
let mut snap = match policy {
RequestPolicy::ReadLocal => {
let region = Arc::clone(&delegate.region);
let snap =
RegionSnapshot::from_snapshot(delegate.get_snapshot(&None), region);
// Ensures the snapshot is acquired before getting the time
atomic::fence(atomic::Ordering::Release);
let snapshot_ts = monotonic_raw_now();

if !delegate.is_in_leader_lease(snapshot_ts) {
return Ok(None);
}

TLS_LOCAL_READ_METRICS
.with(|m| m.borrow_mut().local_executed_requests.inc());

// Try renew lease in advance
self.maybe_renew_lease_in_advance(&delegate, &req, snapshot_ts);
snap
}
RequestPolicy::StaleRead => {
let read_ts = decode_u64(&mut req.get_header().get_flag_data()).unwrap();
delegate.check_stale_read_safe(read_ts)?;

TLS_LOCAL_READ_METRICS.with(|m| m.borrow_mut().local_executed_requests.inc());
let region = Arc::clone(&delegate.region);
let snap =
RegionSnapshot::from_snapshot(delegate.get_snapshot(&None), region);

// Try renew lease in advance
self.maybe_renew_lease_in_advance(&delegate, &req, snapshot_ts);
Ok(Some(snap))
}
RequestPolicy::StaleRead => {
let read_ts = decode_u64(&mut req.get_header().get_flag_data()).unwrap();
delegate.check_stale_read_safe(read_ts)?;
TLS_LOCAL_READ_METRICS
.with(|m| m.borrow_mut().local_executed_requests.inc());

let region = Arc::clone(&delegate.region);
let snap = RegionSnapshot::from_snapshot(delegate.get_snapshot(&None), region);
delegate.check_stale_read_safe(read_ts)?;

TLS_LOCAL_READ_METRICS.with(|m| m.borrow_mut().local_executed_requests.inc());
TLS_LOCAL_READ_METRICS
.with(|m| m.borrow_mut().local_executed_stale_read_requests.inc());
snap
}
_ => unreachable!(),
};

delegate.check_stale_read_safe(read_ts)?;
snap.txn_ext = Some(delegate.txn_ext.clone());
snap.bucket_meta = delegate.bucket_meta.clone();

TLS_LOCAL_READ_METRICS
.with(|m| m.borrow_mut().local_executed_stale_read_requests.inc());
Ok(Some(snap))
}
_ => unreachable!(),
},
Ok(Some(snap))
}
Ok(None) => Ok(None),
Err(e) => {
let mut response = cmd_resp::new_error(e);
Expand Down

0 comments on commit c52be47

Please sign in to comment.