diff --git a/src/server/lock_manager/deadlock.rs b/src/server/lock_manager/deadlock.rs
index 24c0b8c92e6..d9cd468a99b 100644
--- a/src/server/lock_manager/deadlock.rs
+++ b/src/server/lock_manager/deadlock.rs
@@ -99,7 +99,7 @@ impl DetectTable {
     /// Returns the key hash which causes deadlock.
     pub fn detect(&mut self, txn_ts: u64, lock_ts: u64, lock_hash: u64) -> Option<u64> {
-        let _timer = DETECTOR_HISTOGRAM_VEC.detect.start_coarse_timer();
+        let _timer = DETECT_DURATION_HISTOGRAM.start_coarse_timer();
         TASK_COUNTER_VEC.detect.inc();
 
         self.now = Instant::now_coarse();
@@ -509,10 +509,12 @@ where
         if self.inner.borrow().role != role {
             match role {
                 Role::Leader => {
-                    info!("became the leader of deadlock detector!"; "self_id" => self.store_id)
+                    info!("became the leader of deadlock detector!"; "self_id" => self.store_id);
+                    DETECTOR_LEADER_GAUGE.set(1);
                 }
                 Role::Follower => {
-                    info!("changed from the leader of deadlock detector to follower!"; "self_id" => self.store_id)
+                    info!("changed from the leader of deadlock detector to follower!"; "self_id" => self.store_id);
+                    DETECTOR_LEADER_GAUGE.set(0);
                 }
             }
         }
diff --git a/src/server/lock_manager/metrics.rs b/src/server/lock_manager/metrics.rs
index 7af235945f0..34df503aab8 100644
--- a/src/server/lock_manager/metrics.rs
+++ b/src/server/lock_manager/metrics.rs
@@ -25,10 +25,10 @@ make_static_metric! {
         },
     }
 
-    pub struct DetectorHistogramVec: Histogram {
+    pub struct WaitTableStatusGauge: IntGauge {
         "type" => {
-            monitor_membership_change,
-            detect,
+            locks,
+            txns,
         },
     }
 }
@@ -51,15 +51,25 @@ lazy_static! {
     pub static ref WAITER_LIFETIME_HISTOGRAM: Histogram = register_histogram!(
         "tikv_lock_manager_waiter_lifetime_duration",
         "Duration of waiters' lifetime in seconds",
-        exponential_buckets(0.0005, 2.0, 20).unwrap()
+        exponential_buckets(0.0005, 2.0, 20).unwrap() // 0.5ms ~ 524s
+    )
+    .unwrap();
+    pub static ref DETECT_DURATION_HISTOGRAM: Histogram = register_histogram!(
+        "tikv_lock_manager_detect_duration",
+        "Duration of handling detect requests",
+        exponential_buckets(0.0001, 2.0, 20).unwrap() // 0.1ms ~ 104s
+    )
+    .unwrap();
+    pub static ref WAIT_TABLE_STATUS_GAUGE: WaitTableStatusGauge = register_static_int_gauge_vec!(
+        WaitTableStatusGauge,
+        "tikv_lock_manager_wait_table_status",
+        "Status of the wait table",
+        &["type"]
     )
     .unwrap();
-    pub static ref DETECTOR_HISTOGRAM_VEC: DetectorHistogramVec = register_static_histogram_vec!(
-        DetectorHistogramVec,
-        "tikv_lock_manager_detector_histogram",
-        "Bucketed histogram of deadlock detector",
-        &["type"],
-        exponential_buckets(0.0005, 2.0, 20).unwrap()
+    pub static ref DETECTOR_LEADER_GAUGE: IntGauge = register_int_gauge!(
+        "tikv_lock_manager_detector_leader_heartbeat",
+        "Heartbeat of the leader of the deadlock detector"
     )
     .unwrap();
 }
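Note on the metrics change above: the static gauge vec behaves like a plain `IntGaugeVec` whose "type" label values are pre-bound, so the hot path skips the label lookup. A minimal sketch of the equivalent behavior using only the `prometheus` crate (the static-metric macros are replaced by explicit `with_label_values` calls here, which is an assumption for illustration, not the TiKV code):

```rust
use prometheus::{register_int_gauge_vec, IntGaugeVec};

fn main() {
    // Plain gauge vec standing in for WAIT_TABLE_STATUS_GAUGE.
    let status: IntGaugeVec = register_int_gauge_vec!(
        "tikv_lock_manager_wait_table_status",
        "Status of the wait table",
        &["type"]
    )
    .unwrap();
    let locks = status.with_label_values(&["locks"]);
    let txns = status.with_label_values(&["txns"]);

    locks.inc(); // a key hash gained its first waiter
    txns.inc();  // a transaction started waiting
    assert_eq!(locks.get(), 1);
    assert_eq!(txns.get(), 1);
}
```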
diff --git a/src/server/lock_manager/mod.rs b/src/server/lock_manager/mod.rs
index b1e8c792aea..d4b24793af6 100644
--- a/src/server/lock_manager/mod.rs
+++ b/src/server/lock_manager/mod.rs
@@ -331,7 +331,7 @@ mod tests {
         assert!(lock_mgr.has_waiter());
         assert_elapsed(
             || expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
-            3000,
+            2900,
             3200,
         );
         assert!(!lock_mgr.has_waiter());
diff --git a/src/server/lock_manager/waiter_manager.rs b/src/server/lock_manager/waiter_manager.rs
index e4c44a3552e..0a25aca9854 100644
--- a/src/server/lock_manager/waiter_manager.rs
+++ b/src/server/lock_manager/waiter_manager.rs
@@ -282,18 +282,27 @@ impl WaitTable {
     /// Returns the duplicated `Waiter` if there is.
     fn add_waiter(&mut self, waiter: Waiter) -> Option<Waiter> {
-        let waiters = self.wait_table.entry(waiter.lock.hash).or_default();
+        let waiters = self.wait_table.entry(waiter.lock.hash).or_insert_with(|| {
+            WAIT_TABLE_STATUS_GAUGE.locks.inc();
+            Waiters::default()
+        });
         let old_idx = waiters.iter().position(|w| w.start_ts == waiter.start_ts);
         waiters.push(waiter);
-        let old = waiters.swap_remove(old_idx?);
-        self.waiter_count.fetch_sub(1, Ordering::SeqCst);
-        Some(old)
+        if let Some(old_idx) = old_idx {
+            let old = waiters.swap_remove(old_idx);
+            self.waiter_count.fetch_sub(1, Ordering::SeqCst);
+            Some(old)
+        } else {
+            WAIT_TABLE_STATUS_GAUGE.txns.inc();
+            None
+        }
         // Here we don't increase waiter_count because it's already updated in LockManager::wait_for()
     }
 
     /// Removes all waiters waiting for the lock.
     fn remove(&mut self, lock: Lock) {
         self.wait_table.remove(&lock.hash);
+        WAIT_TABLE_STATUS_GAUGE.locks.dec();
     }
 
     fn remove_waiter(&mut self, lock: Lock, waiter_ts: u64) -> Option<Waiter> {
@@ -303,8 +312,9 @@ impl WaitTable {
             .position(|waiter| waiter.start_ts == waiter_ts)?;
         let waiter = waiters.swap_remove(idx);
         self.waiter_count.fetch_sub(1, Ordering::SeqCst);
+        WAIT_TABLE_STATUS_GAUGE.txns.dec();
         if waiters.is_empty() {
-            self.wait_table.remove(&lock.hash);
+            self.remove(lock);
         }
         Some(waiter)
     }
@@ -323,6 +333,7 @@ impl WaitTable {
             .0;
         let oldest = waiters.swap_remove(oldest_idx);
         self.waiter_count.fetch_sub(1, Ordering::SeqCst);
+        WAIT_TABLE_STATUS_GAUGE.txns.dec();
         Some((oldest, waiters))
     }
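The gauge accounting in these hunks maintains two invariants: `locks` counts distinct key hashes with at least one waiter (incremented on first insert, decremented when the last waiter leaves), and `txns` counts waiters in total (not incremented for a duplicate `start_ts`, since the old waiter is replaced). A self-contained model of the same bookkeeping, with plain `i64` fields and a `HashMap` standing in for the TiKV types (all names here are illustrative):

```rust
use std::collections::HashMap;

#[derive(Default)]
struct WaitTableModel {
    table: HashMap<u64, Vec<u64>>, // key hash -> start_ts of waiting txns
    locks: i64,                    // mirrors WAIT_TABLE_STATUS_GAUGE.locks
    txns: i64,                     // mirrors WAIT_TABLE_STATUS_GAUGE.txns
}

impl WaitTableModel {
    fn add_waiter(&mut self, hash: u64, start_ts: u64) {
        let waiters = self.table.entry(hash).or_default();
        if waiters.is_empty() {
            self.locks += 1; // first waiter on this key hash
        }
        if !waiters.contains(&start_ts) {
            self.txns += 1; // a duplicate replaces the old waiter instead
        }
        waiters.retain(|&ts| ts != start_ts);
        waiters.push(start_ts);
    }

    fn remove_waiter(&mut self, hash: u64, start_ts: u64) {
        if let Some(waiters) = self.table.get_mut(&hash) {
            if let Some(idx) = waiters.iter().position(|&ts| ts == start_ts) {
                waiters.swap_remove(idx);
                self.txns -= 1;
                if waiters.is_empty() {
                    self.table.remove(&hash);
                    self.locks -= 1; // last waiter gone, drop the entry
                }
            }
        }
    }
}

fn main() {
    let mut t = WaitTableModel::default();
    t.add_waiter(7, 1);
    t.add_waiter(7, 2);
    assert_eq!((t.locks, t.txns), (1, 2));
    t.remove_waiter(7, 1);
    t.remove_waiter(7, 2);
    assert_eq!((t.locks, t.txns), (0, 0));
}
```

Routing `remove_waiter` through `self.remove(lock)` in the patch keeps the `locks` decrement in one place; that is why the test hunks that follow exercise both removal paths.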
@@ -590,7 +601,7 @@ pub mod tests {
             core.run(delay.map(|not_cancelled| assert!(not_cancelled)))
                 .unwrap()
         },
-        100,
+        50,
         200,
     );
 
@@ -603,7 +614,7 @@ pub mod tests {
             core.run(delay.map(|not_cancelled| assert!(not_cancelled)))
                 .unwrap()
         },
-        30,
+        20,
         100,
     );
 
@@ -616,7 +627,7 @@ pub mod tests {
             core.run(delay.map(|not_cancelled| assert!(not_cancelled)))
                 .unwrap()
         },
-        100,
+        50,
         200,
     );
 
@@ -791,7 +802,7 @@ pub mod tests {
         waiter.reset_timeout(Instant::now() + Duration::from_millis(100));
         let (tx, rx) = mpsc::sync_channel(1);
         let f = waiter.on_timeout(move || tx.send(1).unwrap());
-        assert_elapsed(|| core.run(f).unwrap(), 100, 200);
+        assert_elapsed(|| core.run(f).unwrap(), 50, 200);
         rx.try_recv().unwrap();
 
         // The timeout handler shouldn't be invoked after waiter has been notified.
@@ -889,6 +900,7 @@ pub mod tests {
         waiter_count.fetch_add(1, Ordering::SeqCst);
         wait_table.add_waiter(dummy_waiter(1, lock.ts, lock.hash));
         assert_eq!(waiter_count.load(Ordering::SeqCst), 1);
+        // Remove the waiter.
         wait_table.remove_waiter(lock, 1).unwrap();
         assert_eq!(waiter_count.load(Ordering::SeqCst), 0);
         // Removing a non-existed waiter shouldn't decrease waiter count.
@@ -960,7 +972,7 @@ pub mod tests {
         scheduler.wait_for(waiter.start_ts, waiter.cb, waiter.pr, waiter.lock, 1000);
         assert_elapsed(
             || expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
-            1000,
+            900,
             1200,
         );
 
@@ -969,7 +981,7 @@ pub mod tests {
         scheduler.wait_for(waiter.start_ts, waiter.cb, waiter.pr, waiter.lock, 100);
         assert_elapsed(
             || expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
-            100,
+            50,
             300,
         );
 
@@ -978,7 +990,7 @@ pub mod tests {
         scheduler.wait_for(waiter.start_ts, waiter.cb, waiter.pr, waiter.lock, 3000);
         assert_elapsed(
             || expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
-            1000,
+            900,
             1200,
         );
 
@@ -1056,7 +1068,7 @@ pub mod tests {
         lock_info.set_lock_version(lock.ts - 1);
         assert_elapsed(
             || expect_write_conflict(f.wait().unwrap(), waiter_ts, lock_info, commit_ts - 1),
-            wake_up_delay_duration,
+            wake_up_delay_duration - 50,
             wake_up_delay_duration + 200,
         );
 
@@ -1128,7 +1140,7 @@ pub mod tests {
         // The new waiter will be wake up after timeout.
         assert_elapsed(
             || expect_key_is_locked(f2.wait().unwrap().unwrap().pop().unwrap(), lock_info2),
-            1000,
+            900,
             1200,
         );
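All the test hunks above loosen only the lower bound passed to `assert_elapsed` (e.g. 1000 → 900, 100 → 50), so a timer that fires marginally early on a coarse clock no longer fails the suite, while the upper bound still catches missed wakeups. The real helper lives in the test module; a sketch of what such a helper plausibly looks like (this version is an assumption, not the TiKV implementation):

```rust
use std::time::Instant;

// Asserts that `f` completes within [min_ms, max_ms]. The caller sets
// min_ms slightly below the nominal timeout to tolerate early firing.
fn assert_elapsed<F: FnOnce()>(f: F, min_ms: u64, max_ms: u64) {
    let start = Instant::now();
    f();
    let elapsed = start.elapsed().as_millis() as u64;
    assert!(
        (min_ms..=max_ms).contains(&elapsed),
        "elapsed {}ms not in [{}, {}]",
        elapsed,
        min_ms,
        max_ms
    );
}

fn main() {
    // For a nominal 1000ms wait, accept anything in [900, 1200].
    let wait = || std::thread::sleep(std::time::Duration::from_millis(1000));
    assert_elapsed(wait, 900, 1200);
}
```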