Skip to content

Commit

Permalink
lock_manager: more metrics (tikv#6392) (tikv#6422)
Browse files Browse the repository at this point in the history
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 committed Jan 9, 2020
1 parent e5568d4 commit a06f9cd
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 28 deletions.
8 changes: 5 additions & 3 deletions src/server/lock_manager/deadlock.rs
Expand Up @@ -99,7 +99,7 @@ impl DetectTable {

/// Returns the key hash which causes deadlock.
pub fn detect(&mut self, txn_ts: u64, lock_ts: u64, lock_hash: u64) -> Option<u64> {
let _timer = DETECTOR_HISTOGRAM_VEC.detect.start_coarse_timer();
let _timer = DETECT_DURATION_HISTOGRAM.start_coarse_timer();
TASK_COUNTER_VEC.detect.inc();

self.now = Instant::now_coarse();
Expand Down Expand Up @@ -509,10 +509,12 @@ where
if self.inner.borrow().role != role {
match role {
Role::Leader => {
info!("became the leader of deadlock detector!"; "self_id" => self.store_id)
info!("became the leader of deadlock detector!"; "self_id" => self.store_id);
DETECTOR_LEADER_GAUGE.set(1);
}
Role::Follower => {
info!("changed from the leader of deadlock detector to follower!"; "self_id" => self.store_id)
info!("changed from the leader of deadlock detector to follower!"; "self_id" => self.store_id);
DETECTOR_LEADER_GAUGE.set(0);
}
}
}
Expand Down
30 changes: 20 additions & 10 deletions src/server/lock_manager/metrics.rs
Expand Up @@ -25,10 +25,10 @@ make_static_metric! {
},
}

pub struct DetectorHistogramVec: Histogram {
pub struct WaitTableStatusGauge: IntGauge {
"type" => {
monitor_membership_change,
detect,
locks,
txns,
},
}
}
Expand All @@ -51,15 +51,25 @@ lazy_static! {
pub static ref WAITER_LIFETIME_HISTOGRAM: Histogram = register_histogram!(
"tikv_lock_manager_waiter_lifetime_duration",
"Duration of waiters' lifetime in seconds",
exponential_buckets(0.0005, 2.0, 20).unwrap()
exponential_buckets(0.0005, 2.0, 20).unwrap() // 0.5ms ~ 524s
)
.unwrap();
pub static ref DETECT_DURATION_HISTOGRAM: Histogram = register_histogram!(
"tikv_lock_manager_detect_duration",
"Duration of handling detect requests",
exponential_buckets(0.0001, 2.0, 20).unwrap() // 0.1ms ~ 104s
)
.unwrap();
pub static ref WAIT_TABLE_STATUS_GAUGE: WaitTableStatusGauge = register_static_int_gauge_vec!(
WaitTableStatusGauge,
"tikv_lock_manager_wait_table_status",
"Status of the wait table",
&["type"]
)
.unwrap();
pub static ref DETECTOR_HISTOGRAM_VEC: DetectorHistogramVec = register_static_histogram_vec!(
DetectorHistogramVec,
"tikv_lock_manager_detector_histogram",
"Bucketed histogram of deadlock detector",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
pub static ref DETECTOR_LEADER_GAUGE: IntGauge = register_int_gauge!(
"tikv_lock_manager_detector_leader_heartbeat",
"Heartbeat of the leader of the deadlock detector"
)
.unwrap();
}
2 changes: 1 addition & 1 deletion src/server/lock_manager/mod.rs
Expand Up @@ -331,7 +331,7 @@ mod tests {
assert!(lock_mgr.has_waiter());
assert_elapsed(
|| expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
3000,
2900,
3200,
);
assert!(!lock_mgr.has_waiter());
Expand Down
40 changes: 26 additions & 14 deletions src/server/lock_manager/waiter_manager.rs
Expand Up @@ -282,18 +282,27 @@ impl WaitTable {

/// Returns the duplicated `Waiter` if there is.
fn add_waiter(&mut self, waiter: Waiter) -> Option<Waiter> {
let waiters = self.wait_table.entry(waiter.lock.hash).or_default();
let waiters = self.wait_table.entry(waiter.lock.hash).or_insert_with(|| {
WAIT_TABLE_STATUS_GAUGE.locks.inc();
Waiters::default()
});
let old_idx = waiters.iter().position(|w| w.start_ts == waiter.start_ts);
waiters.push(waiter);
let old = waiters.swap_remove(old_idx?);
self.waiter_count.fetch_sub(1, Ordering::SeqCst);
Some(old)
if let Some(old_idx) = old_idx {
let old = waiters.swap_remove(old_idx);
self.waiter_count.fetch_sub(1, Ordering::SeqCst);
Some(old)
} else {
WAIT_TABLE_STATUS_GAUGE.txns.inc();
None
}
// Here we don't increase waiter_count because it's already updated in LockManager::wait_for()
}

/// Removes all waiters waiting for the lock.
fn remove(&mut self, lock: Lock) {
self.wait_table.remove(&lock.hash);
WAIT_TABLE_STATUS_GAUGE.locks.dec();
}

fn remove_waiter(&mut self, lock: Lock, waiter_ts: u64) -> Option<Waiter> {
Expand All @@ -303,8 +312,9 @@ impl WaitTable {
.position(|waiter| waiter.start_ts == waiter_ts)?;
let waiter = waiters.swap_remove(idx);
self.waiter_count.fetch_sub(1, Ordering::SeqCst);
WAIT_TABLE_STATUS_GAUGE.txns.dec();
if waiters.is_empty() {
self.wait_table.remove(&lock.hash);
self.remove(lock);
}
Some(waiter)
}
Expand All @@ -323,6 +333,7 @@ impl WaitTable {
.0;
let oldest = waiters.swap_remove(oldest_idx);
self.waiter_count.fetch_sub(1, Ordering::SeqCst);
WAIT_TABLE_STATUS_GAUGE.txns.dec();
Some((oldest, waiters))
}

Expand Down Expand Up @@ -590,7 +601,7 @@ pub mod tests {
core.run(delay.map(|not_cancelled| assert!(not_cancelled)))
.unwrap()
},
100,
50,
200,
);

Expand All @@ -603,7 +614,7 @@ pub mod tests {
core.run(delay.map(|not_cancelled| assert!(not_cancelled)))
.unwrap()
},
30,
20,
100,
);

Expand All @@ -616,7 +627,7 @@ pub mod tests {
core.run(delay.map(|not_cancelled| assert!(not_cancelled)))
.unwrap()
},
100,
50,
200,
);

Expand Down Expand Up @@ -791,7 +802,7 @@ pub mod tests {
waiter.reset_timeout(Instant::now() + Duration::from_millis(100));
let (tx, rx) = mpsc::sync_channel(1);
let f = waiter.on_timeout(move || tx.send(1).unwrap());
assert_elapsed(|| core.run(f).unwrap(), 100, 200);
assert_elapsed(|| core.run(f).unwrap(), 50, 200);
rx.try_recv().unwrap();

// The timeout handler shouldn't be invoked after waiter has been notified.
Expand Down Expand Up @@ -889,6 +900,7 @@ pub mod tests {
waiter_count.fetch_add(1, Ordering::SeqCst);
wait_table.add_waiter(dummy_waiter(1, lock.ts, lock.hash));
assert_eq!(waiter_count.load(Ordering::SeqCst), 1);
// Remove the waiter.
wait_table.remove_waiter(lock, 1).unwrap();
assert_eq!(waiter_count.load(Ordering::SeqCst), 0);
// Removing a non-existed waiter shouldn't decrease waiter count.
Expand Down Expand Up @@ -960,7 +972,7 @@ pub mod tests {
scheduler.wait_for(waiter.start_ts, waiter.cb, waiter.pr, waiter.lock, 1000);
assert_elapsed(
|| expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
1000,
900,
1200,
);

Expand All @@ -969,7 +981,7 @@ pub mod tests {
scheduler.wait_for(waiter.start_ts, waiter.cb, waiter.pr, waiter.lock, 100);
assert_elapsed(
|| expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
100,
50,
300,
);

Expand All @@ -978,7 +990,7 @@ pub mod tests {
scheduler.wait_for(waiter.start_ts, waiter.cb, waiter.pr, waiter.lock, 3000);
assert_elapsed(
|| expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
1000,
900,
1200,
);

Expand Down Expand Up @@ -1056,7 +1068,7 @@ pub mod tests {
lock_info.set_lock_version(lock.ts - 1);
assert_elapsed(
|| expect_write_conflict(f.wait().unwrap(), waiter_ts, lock_info, commit_ts - 1),
wake_up_delay_duration,
wake_up_delay_duration - 50,
wake_up_delay_duration + 200,
);

Expand Down Expand Up @@ -1128,7 +1140,7 @@ pub mod tests {
// The new waiter will be wake up after timeout.
assert_elapsed(
|| expect_key_is_locked(f2.wait().unwrap().unwrap().pop().unwrap(), lock_info2),
1000,
900,
1200,
);

Expand Down

0 comments on commit a06f9cd

Please sign in to comment.