22
33use std:: fmt;
44use std:: pin:: Pin ;
5+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
56use std:: thread;
7+ use std:: time:: Duration ;
68
7- use crossbeam_channel:: { unbounded , Receiver , Sender } ;
9+ use crossbeam_channel:: { bounded , Receiver , Sender } ;
810use lazy_static:: lazy_static;
911
1012use crate :: future:: Future ;
1113use crate :: task:: { Context , Poll } ;
1214use crate :: utils:: abort_on_panic;
1315
16+ const MAX_THREADS : u64 = 10_000 ;
17+
18+ static DYNAMIC_THREAD_COUNT : AtomicU64 = AtomicU64 :: new ( 0 ) ;
19+
1420struct Pool {
1521 sender : Sender < async_task:: Task < ( ) > > ,
1622 receiver : Receiver < async_task:: Task < ( ) > > ,
@@ -29,11 +35,69 @@ lazy_static! {
2935 . expect( "cannot start a thread driving blocking tasks" ) ;
3036 }
3137
32- let ( sender, receiver) = unbounded( ) ;
38+ // We want to use an unbuffered channel here to help
39+ // us drive our dynamic control. In effect, the
40+ // kernel's scheduler becomes the queue, reducing
41+ // the number of buffers that work must flow through
42+ // before being acted on by a core. This helps keep
43+ // latency snappy in the overall async system by
44+ // reducing bufferbloat.
45+ let ( sender, receiver) = bounded( 0 ) ;
3346 Pool { sender, receiver }
3447 } ;
3548}
3649
50+ // Create up to MAX_THREADS dynamic blocking task worker threads.
51+ // Dynamic threads will terminate themselves if they don't
52+ // receive any work after one second.
53+ fn maybe_create_another_blocking_thread ( ) {
54+ // We use a `Relaxed` atomic operation because
55+ // it's just a heuristic, and would not lose correctness
56+ // even if it's random.
57+ let workers = DYNAMIC_THREAD_COUNT . load ( Ordering :: Relaxed ) ;
58+ if workers >= MAX_THREADS {
59+ return ;
60+ }
61+
62+ thread:: Builder :: new ( )
63+ . name ( "async-blocking-driver-dynamic" . to_string ( ) )
64+ . spawn ( || {
65+ let wait_limit = Duration :: from_secs ( 1 ) ;
66+
67+ DYNAMIC_THREAD_COUNT . fetch_add ( 1 , Ordering :: Relaxed ) ;
68+ while let Ok ( task) = POOL . receiver . recv_timeout ( wait_limit) {
69+ abort_on_panic ( || task. run ( ) ) ;
70+ }
71+ DYNAMIC_THREAD_COUNT . fetch_sub ( 1 , Ordering :: Relaxed ) ;
72+ } )
73+ . expect ( "cannot start a dynamic thread driving blocking tasks" ) ;
74+ }
75+
76+ // Enqueues work, attempting to send to the threadpool in a
77+ // nonblocking way and spinning up another worker thread if
78+ // there is not a thread ready to accept the work.
79+ fn schedule ( t : async_task:: Task < ( ) > ) {
80+ let first_try_result = POOL . sender . try_send ( t) ;
81+ match first_try_result {
82+ Ok ( ( ) ) => {
83+ // NICEEEE
84+ }
85+ Err ( crossbeam:: channel:: TrySendError :: Full ( t) ) => {
86+ // We were not able to send to the channel without
87+ // blocking. Try to spin up another thread and then
88+ // retry sending while blocking.
89+ maybe_create_another_blocking_thread ( ) ;
90+ POOL . sender . send ( t) . unwrap ( )
91+ }
92+ Err ( crossbeam:: channel:: TrySendError :: Disconnected ( _) ) => {
93+ panic ! (
94+ "unable to send to blocking threadpool \
95+ due to receiver disconnection"
96+ ) ;
97+ }
98+ }
99+ }
100+
37101/// Spawns a blocking task.
38102///
39103/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
42106 F : Future < Output = R > + Send + ' static ,
43107 R : Send + ' static ,
44108{
45- let schedule = |t| POOL . sender . send ( t) . unwrap ( ) ;
46109 let ( task, handle) = async_task:: spawn ( future, schedule, ( ) ) ;
47110 task. schedule ( ) ;
48111 JoinHandle ( handle)
0 commit comments