Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions src/task/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ lazy_static! {

// Create up to MAX_THREADS dynamic blocking task worker threads.
// Dynamic threads will terminate themselves if they don't
// receive any work after one second.
// receive any work after between one and ten seconds.
fn maybe_create_another_blocking_thread() {
// We use a `Relaxed` atomic operation because
// it's just a heuristic, and would not lose correctness
Expand All @@ -59,10 +59,19 @@ fn maybe_create_another_blocking_thread() {
return;
}

// We want to avoid having all threads terminate at
// exactly the same time, causing thundering herd
// effects. We want to stagger their destruction over
// 10 seconds or so to make the costs fade into
// background noise.
//
// Generate a simple random number of milliseconds
let rand_sleep_ms = u64::from(random(10_000));

thread::Builder::new()
.name("async-blocking-driver-dynamic".to_string())
.spawn(|| {
let wait_limit = Duration::from_secs(1);
.spawn(move || {
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);

DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
Expand Down Expand Up @@ -119,3 +128,30 @@ impl<R> fmt::Debug for JoinHandle<R> {
.finish()
}
}

/// Generates a random number in `0..n`.
fn random(n: u32) -> u32 {
use std::cell::Cell;
use std::num::Wrapping;

thread_local! {
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1406868647));
}

RNG.with(|rng| {
// This is the 32-bit variant of Xorshift.
//
// Source: https://en.wikipedia.org/wiki/Xorshift
let mut x = rng.get();
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
rng.set(x);

// This is a fast alternative to `x % n`.
//
// Author: Daniel Lemire
// Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
((x.0 as u64).wrapping_mul(n as u64) >> 32) as u32
})
}