gc_worker: report error when registering conflict lock observer (tikv#8209)

Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
youjiali1995 committed Jul 14, 2020
1 parent 3d6dbd7 commit 655c505
Showing 1 changed file with 57 additions and 35 deletions.
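The change, in brief: `LockCollectorRunner::start_collecting` previously ignored any request whose `max_ts` was not greater than the one already registered; after this commit a smaller `max_ts` is reported as an error (a conflicting observer is already registered at a greater `max_ts`), an equal `max_ts` remains a harmless no-op, and a greater `max_ts` restarts collection. A minimal sketch of those semantics, using hypothetical standalone types rather than TiKV's actual API:

use std::cmp::Ordering::*;

// Hypothetical stand-in for the runner state; TiKV's real code uses
// TimeStamp and atomics shared with the lock observer.
struct Collector {
    max_ts: u64, // max_ts of the currently registered observer (0 = none)
}

impl Collector {
    fn start_collecting(&mut self, max_ts: u64) -> Result<(), String> {
        match max_ts.cmp(&self.max_ts) {
            // A conflicting observer with a greater max_ts is already
            // registered: report an error instead of silently ignoring it.
            Less => Err(format!(
                "collecting locks with a greater max_ts: {}",
                self.max_ts
            )),
            // Stale duplicate of the current request: ignore it and keep
            // the locks collected so far.
            Equal => Ok(()),
            // A newer request: restart collection from scratch.
            Greater => {
                self.max_ts = max_ts;
                Ok(())
            }
        }
    }
}

fn main() {
    let mut c = Collector { max_ts: 100 };
    assert!(c.start_collecting(90).is_err()); // smaller max_ts: now an error
    assert!(c.start_collecting(100).is_ok()); // equal max_ts: no-op
    assert!(c.start_collecting(110).is_ok()); // greater max_ts: restart
}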
92 changes: 57 additions & 35 deletions src/server/gc_worker/applied_lock_collector.rs
@@ -1,6 +1,7 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
 use keys::origin_key;
+use std::cmp::Ordering::*;
 use std::fmt::{self, Debug, Display};
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
@@ -34,6 +35,20 @@ struct LockObserverState {
     is_clean: AtomicBool,
 }
 
+impl LockObserverState {
+    fn load_max_ts(&self) -> TimeStamp {
+        self.max_ts.load(Ordering::Acquire).into()
+    }
+
+    fn is_clean(&self) -> bool {
+        self.is_clean.load(Ordering::Acquire)
+    }
+
+    fn mark_dirty(&self) {
+        self.is_clean.store(false, Ordering::Release);
+    }
+}
+
 pub type Callback<T> = Box<dyn FnOnce(Result<T>) + Send>;
 
 enum LockCollectorTask {
@@ -117,34 +132,22 @@ impl LockObserver {
                 error!("lock observer failed to send locks because collector is stopped");
             }
             Err(ScheduleError::Full(_)) => {
-                self.mark_dirty();
+                self.state.mark_dirty();
                 warn!("cannot collect all applied lock because channel is full");
             }
         }
     }
-
-    fn mark_dirty(&self) {
-        self.state.is_clean.store(false, Ordering::Release);
-    }
-
-    fn is_clean(&self) -> bool {
-        self.state.is_clean.load(Ordering::Acquire)
-    }
-
-    fn load_max_ts(&self) -> TimeStamp {
-        self.state.max_ts.load(Ordering::Acquire).into()
-    }
 }
 
 impl Coprocessor for LockObserver {}
 
 impl QueryObserver for LockObserver {
     fn pre_apply_query(&self, _: &mut ObserverContext<'_>, requests: &[RaftRequest]) {
-        if !self.is_clean() {
+        if !self.state.is_clean() {
             return;
         }
 
-        let max_ts = self.load_max_ts();
+        let max_ts = self.state.load_max_ts();
         if max_ts.is_zero() {
             return;
         }
@@ -168,7 +171,7 @@ impl QueryObserver for LockObserver {
                         "value" => hex::encode_upper(put_request.get_value()),
                         "err" => ?e
                     );
-                    self.mark_dirty();
+                    self.state.mark_dirty();
                     return;
                 }
             };
@@ -195,11 +198,11 @@ impl ApplySnapshotObserver for LockObserver {
             return;
         }
 
-        if !self.is_clean() {
+        if !self.state.is_clean() {
             return;
         }
 
-        let max_ts = self.load_max_ts();
+        let max_ts = self.state.load_max_ts();
         if max_ts.is_zero() {
             return;
         }
@@ -225,7 +228,7 @@ impl ApplySnapshotObserver for LockObserver {
                         "cannot parse lock";
                         "err" => ?e
                     );
-                    self.mark_dirty()
+                    self.state.mark_dirty()
                 }
                 Ok(l) => self.send(l),
             }
@@ -234,7 +237,7 @@ impl ApplySnapshotObserver for LockObserver {
     fn pre_apply_sst(&self, _: &mut ObserverContext<'_>, cf: CfName, _path: &str) {
         if cf == CF_LOCK {
             error!("cannot collect all applied lock: snapshot of lock cf applied from sst file");
-            self.mark_dirty();
+            self.state.mark_dirty();
         }
     }
 }
@@ -267,20 +270,29 @@ impl LockCollectorRunner {
     }
 
     fn start_collecting(&mut self, max_ts: TimeStamp) -> Result<()> {
-        if self.observer_state.max_ts.load(Ordering::Acquire) >= max_ts.into_inner() {
-            // Stale request. Ignore it.
-            return Ok(());
-        }
-        info!("start collecting locks"; "max_ts" => max_ts);
-        self.collected_locks.clear();
-        // TODO: `is_clean` may be unexpectedly set to false here, if any error happens on a
-        // previous observing. It needs to be solved, although it's very unlikely to happen and
-        // doesn't affect correctness of data.
-        self.observer_state.is_clean.store(true, Ordering::Release);
-        self.observer_state
-            .max_ts
-            .store(max_ts.into_inner(), Ordering::Release);
-        Ok(())
+        let curr_max_ts: TimeStamp = self.observer_state.max_ts.load(Ordering::Acquire).into();
+        match max_ts.cmp(&self.observer_state.load_max_ts()) {
+            Less => Err(box_err!(
+                "collecting locks with a greater max_ts: {}",
+                curr_max_ts
+            )),
+            Equal => {
+                // Stale request. Ignore it.
+                Ok(())
+            }
+            Greater => {
+                info!("start collecting locks"; "max_ts" => max_ts);
+                self.collected_locks.clear();
+                // TODO: `is_clean` may be unexpectedly set to false here, if any error happens on a
+                // previous observing. It needs to be solved, although it's very unlikely to happen and
+                // doesn't affect correctness of data.
+                self.observer_state.is_clean.store(true, Ordering::Release);
+                self.observer_state
+                    .max_ts
+                    .store(max_ts.into_inner(), Ordering::Release);
+                Ok(())
+            }
+        }
     }
 
     fn get_collected_locks(&mut self, max_ts: TimeStamp) -> Result<(Vec<LockInfo>, bool)> {
@@ -532,9 +544,12 @@ mod tests {
         get_collected_locks(&c, 3).unwrap_err();
         get_collected_locks(&c, 4).unwrap();
         // Do not allow aborting previous observing with a smaller max_ts.
-        start_collecting(&c, 3).unwrap();
+        start_collecting(&c, 3).unwrap_err();
         get_collected_locks(&c, 3).unwrap_err();
         get_collected_locks(&c, 4).unwrap();
+        // Do not allow stopping observing with a different max_ts.
+        stop_collecting(&c, 3).unwrap_err();
+        stop_collecting(&c, 5).unwrap_err();
         stop_collecting(&c, 4).unwrap();
     }
 
@@ -587,6 +602,13 @@ mod tests {
             (expected_result.clone(), true)
         );
 
+        // Starting collecting with the same max_ts again shouldn't clean up the observer state.
+        start_collecting(&c, 100).unwrap();
+        assert_eq!(
+            get_collected_locks(&c, 100).unwrap(),
+            (expected_result.clone(), true)
+        );
+
         // Only locks with ts <= 100 will be collected.
         let req: Vec<_> = lock_kvs
             .iter()
@@ -673,7 +695,7 @@ mod tests {
             get_collected_locks(&c, 100).unwrap(),
             (expected_locks.clone(), true)
         );
-        start_collecting(&c, 90).unwrap();
+        start_collecting(&c, 90).unwrap_err();
         assert_eq!(
             get_collected_locks(&c, 100).unwrap(),
             (expected_locks, true)