Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EMA based statistically adaptive thread pool design #108

Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 38 additions & 0 deletions benches/blocking.rs
@@ -0,0 +1,38 @@
#![feature(test)]

extern crate test;

use async_std::task;
use async_std::task::blocking::JoinHandle;
use futures::future::join_all;
use std::thread;
use std::time::Duration;
use test::Bencher;

// Benchmark for a 10K burst task spawn
#[bench]
fn blocking(b: &mut Bencher) {
b.iter(|| {
let handles = (0..10_000)
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
.map(|_| {
task::blocking::spawn(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
})
})
.collect::<Vec<JoinHandle<()>>>();

task::block_on(join_all(handles));
});
}

// Benchmark for a single blocking task spawn
#[bench]
fn blocking_single(b: &mut Bencher) {
b.iter(|| {
task::blocking::spawn(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
})
});
}
144 changes: 125 additions & 19 deletions src/task/blocking.rs
@@ -1,5 +1,6 @@
//! A thread pool for running blocking functions asynchronously.

use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
Expand All @@ -12,10 +13,29 @@ use lazy_static::lazy_static;
use crate::future::Future;
use crate::task::{Context, Poll};
use crate::utils::abort_on_panic;
use std::sync::{Arc, Mutex};
vertexclique marked this conversation as resolved.
Show resolved Hide resolved

// Low watermark value, defines the bare minimum of the pool.
// Spawns initial thread set.
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
const LOW_WATERMARK: u64 = 2;

// Pool managers interval time (milliseconds)
// This is the actual interval which makes adaptation calculation
const MANAGER_POLL_INTERVAL: u64 = 50;

// Frequency scale factor for thread adaptation calculation
const FREQUENCY_SCALE_FACTOR: u64 = 200;

// Frequency histogram's sliding window size
// Defines how many frequencies will be considered for adaptation.
const FREQUENCY_QUEUE_SIZE: usize = 10;

// Possible max threads (without OS contract)
const MAX_THREADS: u64 = 10_000;

static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
// Pool task frequency variable
// Holds scheduled tasks onto the thread pool for the calculation window
static FREQUENCY: AtomicU64 = AtomicU64::new(0);

struct Pool {
sender: Sender<async_task::Task<()>>,
Expand All @@ -24,7 +44,7 @@ struct Pool {

lazy_static! {
static ref POOL: Pool = {
for _ in 0..2 {
for _ in 0..LOW_WATERMARK {
thread::Builder::new()
.name("async-blocking-driver".to_string())
.spawn(|| abort_on_panic(|| {
Expand All @@ -35,6 +55,19 @@ lazy_static! {
.expect("cannot start a thread driving blocking tasks");
}

// Pool manager to check frequency of task rates
// and take action by scaling the pool accordingly.
thread::Builder::new()
.name("async-pool-manager".to_string())
.spawn(|| abort_on_panic(|| {
let poll_interval = Duration::from_millis(MANAGER_POLL_INTERVAL);
loop {
scale_pool();
thread::sleep(poll_interval);
}
}))
.expect("thread pool manager cannot be started");

// We want to use an unbuffered channel here to help
// us drive our dynamic control. In effect, the
// kernel's scheduler becomes the queue, reducing
Expand All @@ -45,20 +78,84 @@ lazy_static! {
let (sender, receiver) = bounded(0);
Pool { sender, receiver }
};

// Pool task frequency calculation variables
static ref FREQ_QUEUE: Arc<Mutex<VecDeque<u64>>> = {
Arc::new(Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE + 1)))
};

// Pool size variable
static ref POOL_SIZE: Arc<Mutex<u64>> = Arc::new(Mutex::new(LOW_WATERMARK));
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
}

// Create up to MAX_THREADS dynamic blocking task worker threads.
// Dynamic threads will terminate themselves if they don't
// receive any work after between one and ten seconds.
fn maybe_create_another_blocking_thread() {
// We use a `Relaxed` atomic operation because
// it's just a heuristic, and would not lose correctness
// even if it's random.
let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
if workers >= MAX_THREADS {
return;
// Gets the current pool size
// Used for pool size boundary checking in pool manager
fn get_current_pool_size() -> u64 {
let current_arc = POOL_SIZE.clone();
let current_pool_size = *current_arc.lock().unwrap();
LOW_WATERMARK.max(current_pool_size)
}

// Adaptive pool scaling function
//
// This allows to spawn new threads to make room for incoming task pressure.
// Works in the background detached from the pool system and scales up the pool based
// on the request rate.
//
// It uses frequency based calculation to define work. Utilizing average processing rate.
fn scale_pool() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In another context there was some discussion around adding tracing (using log's trace! macro) to async-std. This functions seems like a good place for that, too.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log::trace or the new tracing library? wondering about how much overhead each would add

Copy link
Contributor

@killercup killercup Aug 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just know that the idea was thrown around, and it was probably an opt-in feature, too. @stjepang or @yoshuawuyts would know more :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log now has kv support, which we're already using in other places too. tracing can pick these up too, so using log is probably the best choice here overall (:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should get the kv support in tracing-log soon-ish? However I don't believe that log supports the span-like macros, so async-std might need to have its own, internal span-like macro and dispatch conditionally to async-log or tracing.

I haven't considered how the differences between async-log's span and tracing's span can be bridged, but I think it's feasible if tracing's span!().enter() is used to mimic async-log's nested span block.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidbarsky we could use async-log::span for this, probably. Though there's still a few open issues on the repo, the interface wouldn't change.

From convos with Eliza during RustConf there's def a desire to bridge between the two approaches, and we mostly need to figure out how to go about it. async-rs/async-log#7 is what I've currently got (repo incoming soon), but need to check how well that works (:

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yoshuawuyts Yep! I think we're on the same page about wanting to bridge the two libraries.

I'm spitballing one mechanism on bridging that gap, which consists of using mutually exclusive feature flags in a build.rs to dispatch to either async-log or tracing. This would entail using an async-std-specific span macro until the gaps—if any exist—between async-std's span and tracing's span are closed. I'm personally excited to dig into rust-lang/log#353!

I'm sorry if what I communicated wasn't clear!

// Fetch current frequency, it does matter that operations are ordered in this approach.
let current_frequency = FREQUENCY.load(Ordering::SeqCst);
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
let freq_queue_arc = FREQ_QUEUE.clone();
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
let mut freq_queue = freq_queue_arc.lock().unwrap();

// Make it safe to start for calculations by adding initial frequency scale
if freq_queue.len() == 0 {
freq_queue.push_back(0);
}

// Calculate message rate at the given time window
// Factor is there for making adaptation linear.
// Exponential bursts may create idle threads which won't be utilized.
// They may actually cause slowdown.
let current_freq_factorized = current_frequency as f64 * FREQUENCY_SCALE_FACTOR as f64;
let frequency = (current_freq_factorized / MANAGER_POLL_INTERVAL as f64) as u64;

// Adapts the thread count of pool
//
// Sliding window of frequencies visited by the pool manager.
// Select the maximum from the window and check against the current task dispatch frequency.
// If current frequency is bigger, we will scale up.
if let Some(&max_measurement) = freq_queue.iter().max() {
if frequency > max_measurement {
// Don't spawn more than cores.
// Default behaviour of most of the linear adapting thread pools.
let scale_by = num_cpus::get().max(LOW_WATERMARK as usize) as u64;

// Pool size can't reach to max_threads anyway.
// Pool manager backpressures itself while visiting message rate frequencies.
// You will get an error before hitting to limits by OS.
if get_current_pool_size() < MAX_THREADS {
(0..scale_by).for_each(|_| {
create_blocking_thread();
});
}
}
}

// Add seen frequency data to the frequency histogram.
freq_queue.push_back(frequency);
if freq_queue.len() == FREQUENCY_QUEUE_SIZE + 1 {
freq_queue.pop_front();
}

FREQUENCY.store(0, Ordering::Release);
}

// Creates yet another thread to receive tasks.
// Dynamic threads will terminate themselves if they don't
// receive any work after between one and ten seconds.
fn create_blocking_thread() {
// We want to avoid having all threads terminate at
// exactly the same time, causing thundering herd
// effects. We want to stagger their destruction over
Expand All @@ -73,24 +170,33 @@ fn maybe_create_another_blocking_thread() {
.spawn(move || {
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);

DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
// Adjust the pool size counter before and after spawn
{
let current_arc = POOL_SIZE.clone();
*current_arc.lock().unwrap() += 1;
}
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
abort_on_panic(|| task.run());
}
DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
{
let current_arc = POOL_SIZE.clone();
*current_arc.lock().unwrap() -= 1;
}
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
})
.expect("cannot start a dynamic thread driving blocking tasks");
}

// Enqueues work, attempting to send to the threadpool in a
// nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work.
// nonblocking way and spinning up needed amount of threads
// based on the previous statistics without relying on
// if there is not a thread ready to accept the work or not.
fn schedule(t: async_task::Task<()>) {
// Add up for every incoming task schedule
FREQUENCY.fetch_add(1, Ordering::Acquire);

if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking. Try to spin up another thread and then
// retry sending while blocking.
maybe_create_another_blocking_thread();
// blocking.
POOL.sender.send(err.into_inner()).unwrap();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/task/mod.rs
Expand Up @@ -34,4 +34,4 @@ mod pool;
mod sleep;
mod task;

pub(crate) mod blocking;
pub mod blocking;
143 changes: 143 additions & 0 deletions tests/thread_pool.rs
@@ -0,0 +1,143 @@
use async_std::task;
use async_std::task::blocking::JoinHandle;
use futures::future::join_all;
use std::thread;
use std::time::Duration;
use std::time::Instant;

// Test for slow joins without task bursts during joins.
#[test]
#[ignore]
fn slow_join() {
let thread_join_time_max = 11_000;
let start = Instant::now();

// Send an initial batch of million bursts.
let handles = (0..1_000_000)
.map(|_| {
task::blocking::spawn(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
})
})
.collect::<Vec<JoinHandle<()>>>();

task::block_on(join_all(handles));

// Let them join to see how it behaves under different workloads.
let duration = Duration::from_millis(thread_join_time_max);
thread::sleep(duration);

// Spawn yet another batch of work on top of it
let handles = (0..10_000)
.map(|_| {
task::blocking::spawn(async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
})
})
.collect::<Vec<JoinHandle<()>>>();

task::block_on(join_all(handles));

// Slow joins shouldn't cause internal slow down
let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
println!("Slow task join. Monotonic exec time: {:?} ns", elapsed);

// Should be less than 25_000 ns
// Previous implementation is around this threshold.
assert_eq!(elapsed < 25_000, true);
}

// Test for slow joins with task burst.
#[test]
#[ignore]
fn slow_join_interrupted() {
let thread_join_time_max = 2_000;
let start = Instant::now();

// Send an initial batch of million bursts.
let handles = (0..1_000_000)
.map(|_| {
task::blocking::spawn(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
})
})
.collect::<Vec<JoinHandle<()>>>();

task::block_on(join_all(handles));

// Let them join to see how it behaves under different workloads.
// This time join under the time window.
let duration = Duration::from_millis(thread_join_time_max);
thread::sleep(duration);

// Spawn yet another batch of work on top of it
let handles = (0..10_000)
.map(|_| {
task::blocking::spawn(async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
})
})
.collect::<Vec<JoinHandle<()>>>();

task::block_on(join_all(handles));

// Slow joins shouldn't cause internal slow down
let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
println!("Slow task join. Monotonic exec time: {:?} ns", elapsed);

// Should be less than 25_000 ns
// Previous implementation is around this threshold.
assert_eq!(elapsed < 25_000, true);
}

// This test is expensive but it proves that longhauling tasks are working in adaptive thread pool.
// Thread pool which spawns on-demand will panic with this test.
#[test]
#[ignore]
fn longhauling_task_join() {
let thread_join_time_max = 11_000;
let start = Instant::now();

// First batch of overhauling tasks
let handles = (0..100_000)
.map(|_| {
task::blocking::spawn(async {
let duration = Duration::from_millis(1000);
thread::sleep(duration);
})
})
.collect::<Vec<JoinHandle<()>>>();

task::block_on(join_all(handles));

// Let them join to see how it behaves under different workloads.
let duration = Duration::from_millis(thread_join_time_max);
thread::sleep(duration);

// Send yet another medium sized batch to see how it scales.
let handles = (0..10_000)
.map(|_| {
task::blocking::spawn(async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
})
})
.collect::<Vec<JoinHandle<()>>>();

task::block_on(join_all(handles));

// Slow joins shouldn't cause internal slow down
let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
println!(
"Long-hauling task join. Monotonic exec time: {:?} ns",
elapsed
);

// Should be less than 200_000 ns
// Previous implementation will panic when this test is running.
assert_eq!(elapsed < 200_000, true);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these tests are generally non-reproducible, and it's unclear if they are valuable. they tend to fail on my fairly modern 2018 i7 mbp

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when I said concurrency tests before, I meant adding randomized delays to operations that involve cross-thread coordination via atomics (or higher level primitives that invoke them internally). These might be OK as a benchmark so you can compare results on your own machine before and after changes, but I don't think they are useful as tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please try running the tests with master's blocking.rs. They will panic. Also, please take a look to referenced issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They panic on this branch on a fairly modern machine, so I'm not sure they are useful on machines other than yours. In general, timing-based performance tests are an antipattern, as they have big reproducibility issues. If you want to do this kind of testing, it's better to use something like cachegrind to count CPU instructions executed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because they don't actually assert on anything now (which was what caused them to fail on my laptop), I think they are better as benchmarks, right? I don't think it makes sense to have tests that can't fail.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it makes sense to have tests that can't fail.

Please read my comments carefully. You can see what it tests in action:
https://asciinema.org/a/R6b3UcFo0xfNo1sCPOPJrB04T

when I said concurrency tests before, I meant adding randomized delays to operations that involve cross-thread coordination via atomics (or higher level primitives that invoke them internally).

Unexposed architecture shouldn't be tested with whitebox fashion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are the assertions that fail? The asciicinema is too long to watch.

Possible concurrent interleavings are exposed architecture, and this kind of test has resulted in a huge number of bugs being discovered in various lock-free projects I've been involved in. Concurrency is really hard to get right sometimes, and the original PR with more unserialized atomic accesses would have had bugs jump out pretty quickly with this approach.