Skip to content

Commit 8c647cf

Browse files
author
Stjepan Glavina
authored
Merge pull request #32 from async-rs/tyler_randomized_spindown
Stagger background thread spin-down to avoid a thundering herd
2 parents 487811e + 10146e3 commit 8c647cf

File tree

1 file changed

+39
-3
lines changed

1 file changed

+39
-3
lines changed

src/task/blocking.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ lazy_static! {
4949

5050
// Create up to MAX_THREADS dynamic blocking task worker threads.
5151
// Dynamic threads will terminate themselves if they don't
52-
// receive any work after one second.
52+
// receive any work after between one and ten seconds.
5353
fn maybe_create_another_blocking_thread() {
5454
// We use a `Relaxed` atomic operation because
5555
// it's just a heuristic, and would not lose correctness
@@ -59,10 +59,19 @@ fn maybe_create_another_blocking_thread() {
5959
return;
6060
}
6161

62+
// We want to avoid having all threads terminate at
63+
// exactly the same time, causing thundering herd
64+
// effects. We want to stagger their destruction over
65+
// 10 seconds or so to make the costs fade into
66+
// background noise.
67+
//
68+
// Generate a simple random number of milliseconds
69+
let rand_sleep_ms = u64::from(random(10_000));
70+
6271
thread::Builder::new()
6372
.name("async-blocking-driver-dynamic".to_string())
64-
.spawn(|| {
65-
let wait_limit = Duration::from_secs(1);
73+
.spawn(move || {
74+
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
6675

6776
DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
6877
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
@@ -119,3 +128,30 @@ impl<R> fmt::Debug for JoinHandle<R> {
119128
.finish()
120129
}
121130
}
131+
132+
/// Generates a random number in `0..n`.
133+
fn random(n: u32) -> u32 {
134+
use std::cell::Cell;
135+
use std::num::Wrapping;
136+
137+
thread_local! {
138+
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1406868647));
139+
}
140+
141+
RNG.with(|rng| {
142+
// This is the 32-bit variant of Xorshift.
143+
//
144+
// Source: https://en.wikipedia.org/wiki/Xorshift
145+
let mut x = rng.get();
146+
x ^= x << 13;
147+
x ^= x >> 17;
148+
x ^= x << 5;
149+
rng.set(x);
150+
151+
// This is a fast alternative to `x % n`.
152+
//
153+
// Author: Daniel Lemire
154+
// Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
155+
((x.0 as u64).wrapping_mul(n as u64) >> 32) as u32
156+
})
157+
}

0 commit comments

Comments
 (0)