Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lock_manager: more metrics (#6392) #6422

Merged
merged 1 commit into from Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/server/lock_manager/deadlock.rs
Expand Up @@ -99,7 +99,7 @@ impl DetectTable {

/// Returns the key hash which causes deadlock.
pub fn detect(&mut self, txn_ts: u64, lock_ts: u64, lock_hash: u64) -> Option<u64> {
let _timer = DETECTOR_HISTOGRAM_VEC.detect.start_coarse_timer();
let _timer = DETECT_DURATION_HISTOGRAM.start_coarse_timer();
TASK_COUNTER_VEC.detect.inc();

self.now = Instant::now_coarse();
Expand Down Expand Up @@ -509,10 +509,12 @@ where
if self.inner.borrow().role != role {
match role {
Role::Leader => {
info!("became the leader of deadlock detector!"; "self_id" => self.store_id)
info!("became the leader of deadlock detector!"; "self_id" => self.store_id);
DETECTOR_LEADER_GAUGE.set(1);
}
Role::Follower => {
info!("changed from the leader of deadlock detector to follower!"; "self_id" => self.store_id)
info!("changed from the leader of deadlock detector to follower!"; "self_id" => self.store_id);
DETECTOR_LEADER_GAUGE.set(0);
}
}
}
Expand Down
30 changes: 20 additions & 10 deletions src/server/lock_manager/metrics.rs
Expand Up @@ -25,10 +25,10 @@ make_static_metric! {
},
}

pub struct DetectorHistogramVec: Histogram {
pub struct WaitTableStatusGauge: IntGauge {
"type" => {
monitor_membership_change,
detect,
locks,
txns,
},
}
}
Expand All @@ -51,15 +51,25 @@ lazy_static! {
pub static ref WAITER_LIFETIME_HISTOGRAM: Histogram = register_histogram!(
"tikv_lock_manager_waiter_lifetime_duration",
"Duration of waiters' lifetime in seconds",
exponential_buckets(0.0005, 2.0, 20).unwrap()
exponential_buckets(0.0005, 2.0, 20).unwrap() // 0.5ms ~ 524s
)
.unwrap();
pub static ref DETECT_DURATION_HISTOGRAM: Histogram = register_histogram!(
"tikv_lock_manager_detect_duration",
"Duration of handling detect requests",
exponential_buckets(0.0001, 2.0, 20).unwrap() // 0.1ms ~ 104s
)
.unwrap();
pub static ref WAIT_TABLE_STATUS_GAUGE: WaitTableStatusGauge = register_static_int_gauge_vec!(
WaitTableStatusGauge,
"tikv_lock_manager_wait_table_status",
"Status of the wait table",
&["type"]
)
.unwrap();
pub static ref DETECTOR_HISTOGRAM_VEC: DetectorHistogramVec = register_static_histogram_vec!(
DetectorHistogramVec,
"tikv_lock_manager_detector_histogram",
"Bucketed histogram of deadlock detector",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
pub static ref DETECTOR_LEADER_GAUGE: IntGauge = register_int_gauge!(
"tikv_lock_manager_detector_leader_heartbeat",
"Heartbeat of the leader of the deadlock detector"
)
.unwrap();
}
2 changes: 1 addition & 1 deletion src/server/lock_manager/mod.rs
Expand Up @@ -331,7 +331,7 @@ mod tests {
assert!(lock_mgr.has_waiter());
assert_elapsed(
|| expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
3000,
2900,
3200,
);
assert!(!lock_mgr.has_waiter());
Expand Down
40 changes: 26 additions & 14 deletions src/server/lock_manager/waiter_manager.rs
Expand Up @@ -282,18 +282,27 @@ impl WaitTable {

/// Returns the duplicated `Waiter` if there is.
fn add_waiter(&mut self, waiter: Waiter) -> Option<Waiter> {
let waiters = self.wait_table.entry(waiter.lock.hash).or_default();
let waiters = self.wait_table.entry(waiter.lock.hash).or_insert_with(|| {
WAIT_TABLE_STATUS_GAUGE.locks.inc();
Waiters::default()
});
let old_idx = waiters.iter().position(|w| w.start_ts == waiter.start_ts);
waiters.push(waiter);
let old = waiters.swap_remove(old_idx?);
self.waiter_count.fetch_sub(1, Ordering::SeqCst);
Some(old)
if let Some(old_idx) = old_idx {
let old = waiters.swap_remove(old_idx);
self.waiter_count.fetch_sub(1, Ordering::SeqCst);
Some(old)
} else {
WAIT_TABLE_STATUS_GAUGE.txns.inc();
None
}
// Here we don't increase waiter_count because it's already updated in LockManager::wait_for()
}

/// Removes all waiters waiting for the lock.
fn remove(&mut self, lock: Lock) {
self.wait_table.remove(&lock.hash);
WAIT_TABLE_STATUS_GAUGE.locks.dec();
}

fn remove_waiter(&mut self, lock: Lock, waiter_ts: u64) -> Option<Waiter> {
Expand All @@ -303,8 +312,9 @@ impl WaitTable {
.position(|waiter| waiter.start_ts == waiter_ts)?;
let waiter = waiters.swap_remove(idx);
self.waiter_count.fetch_sub(1, Ordering::SeqCst);
WAIT_TABLE_STATUS_GAUGE.txns.dec();
if waiters.is_empty() {
self.wait_table.remove(&lock.hash);
self.remove(lock);
}
Some(waiter)
}
Expand All @@ -323,6 +333,7 @@ impl WaitTable {
.0;
let oldest = waiters.swap_remove(oldest_idx);
self.waiter_count.fetch_sub(1, Ordering::SeqCst);
WAIT_TABLE_STATUS_GAUGE.txns.dec();
Some((oldest, waiters))
}

Expand Down Expand Up @@ -590,7 +601,7 @@ pub mod tests {
core.run(delay.map(|not_cancelled| assert!(not_cancelled)))
.unwrap()
},
100,
50,
200,
);

Expand All @@ -603,7 +614,7 @@ pub mod tests {
core.run(delay.map(|not_cancelled| assert!(not_cancelled)))
.unwrap()
},
30,
20,
100,
);

Expand All @@ -616,7 +627,7 @@ pub mod tests {
core.run(delay.map(|not_cancelled| assert!(not_cancelled)))
.unwrap()
},
100,
50,
200,
);

Expand Down Expand Up @@ -791,7 +802,7 @@ pub mod tests {
waiter.reset_timeout(Instant::now() + Duration::from_millis(100));
let (tx, rx) = mpsc::sync_channel(1);
let f = waiter.on_timeout(move || tx.send(1).unwrap());
assert_elapsed(|| core.run(f).unwrap(), 100, 200);
assert_elapsed(|| core.run(f).unwrap(), 50, 200);
rx.try_recv().unwrap();

// The timeout handler shouldn't be invoked after waiter has been notified.
Expand Down Expand Up @@ -889,6 +900,7 @@ pub mod tests {
waiter_count.fetch_add(1, Ordering::SeqCst);
wait_table.add_waiter(dummy_waiter(1, lock.ts, lock.hash));
assert_eq!(waiter_count.load(Ordering::SeqCst), 1);
// Remove the waiter.
wait_table.remove_waiter(lock, 1).unwrap();
assert_eq!(waiter_count.load(Ordering::SeqCst), 0);
// Removing a non-existed waiter shouldn't decrease waiter count.
Expand Down Expand Up @@ -960,7 +972,7 @@ pub mod tests {
scheduler.wait_for(waiter.start_ts, waiter.cb, waiter.pr, waiter.lock, 1000);
assert_elapsed(
|| expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
1000,
900,
1200,
);

Expand All @@ -969,7 +981,7 @@ pub mod tests {
scheduler.wait_for(waiter.start_ts, waiter.cb, waiter.pr, waiter.lock, 100);
assert_elapsed(
|| expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
100,
50,
300,
);

Expand All @@ -978,7 +990,7 @@ pub mod tests {
scheduler.wait_for(waiter.start_ts, waiter.cb, waiter.pr, waiter.lock, 3000);
assert_elapsed(
|| expect_key_is_locked(f.wait().unwrap().unwrap().pop().unwrap(), lock_info),
1000,
900,
1200,
);

Expand Down Expand Up @@ -1056,7 +1068,7 @@ pub mod tests {
lock_info.set_lock_version(lock.ts - 1);
assert_elapsed(
|| expect_write_conflict(f.wait().unwrap(), waiter_ts, lock_info, commit_ts - 1),
wake_up_delay_duration,
wake_up_delay_duration - 50,
wake_up_delay_duration + 200,
);

Expand Down Expand Up @@ -1128,7 +1140,7 @@ pub mod tests {
// The new waiter will be wake up after timeout.
assert_elapsed(
|| expect_key_is_locked(f2.wait().unwrap().unwrap().pop().unwrap(), lock_info2),
1000,
900,
1200,
);

Expand Down