diff --git a/crossbeam-channel/src/select.rs b/crossbeam-channel/src/select.rs index ac9e408d3..80fb32a50 100644 --- a/crossbeam-channel/src/select.rs +++ b/crossbeam-channel/src/select.rs @@ -177,6 +177,7 @@ enum Timeout { fn run_select( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, + is_biased: bool, ) -> Option<(Token, usize, *const u8)> { if handles.is_empty() { // Wait until the timeout and return. @@ -193,8 +194,10 @@ fn run_select( } } - // Shuffle the operations for fairness. - utils::shuffle(handles); + if !is_biased { + // Shuffle the operations for fairness. + utils::shuffle(handles); + } // Create a token, which serves as a temporary variable that gets initialized in this function // and is later used by a call to `channel::read()` or `channel::write()` that completes the @@ -325,6 +328,7 @@ fn run_select( fn run_ready( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, + is_biased: bool, ) -> Option { if handles.is_empty() { // Wait until the timeout and return. @@ -341,8 +345,10 @@ fn run_ready( } } - // Shuffle the operations for fairness. - utils::shuffle(handles); + if !is_biased { + // Shuffle the operations for fairness. + utils::shuffle(handles); + } loop { let backoff = Backoff::new(); @@ -450,8 +456,9 @@ fn run_ready( #[inline] pub fn try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], + is_biased: bool, ) -> Result, TrySelectError> { - match run_select(handles, Timeout::Now) { + match run_select(handles, Timeout::Now, is_biased) { None => Err(TrySelectError), Some((token, index, ptr)) => Ok(SelectedOperation { token, @@ -467,12 +474,13 @@ pub fn try_select<'a>( #[inline] pub fn select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], + is_biased: bool, ) -> SelectedOperation<'a> { if handles.is_empty() { panic!("no operations have been added to `Select`"); } - let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap(); + let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap(); SelectedOperation { token, index, @@ -487,10 +495,11 @@ pub fn select<'a>( pub fn select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, + is_biased: bool, ) -> Result, SelectTimeoutError> { match Instant::now().checked_add(timeout) { - Some(deadline) => select_deadline(handles, deadline), - None => Ok(select(handles)), + Some(deadline) => select_deadline(handles, deadline, is_biased), + None => Ok(select(handles, is_biased)), } } @@ -499,8 +508,9 @@ pub fn select_timeout<'a>( pub(crate) fn select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, + is_biased: bool, ) -> Result, SelectTimeoutError> { - match run_select(handles, Timeout::At(deadline)) { + match run_select(handles, Timeout::At(deadline), is_biased) { None => Err(SelectTimeoutError), Some((token, index, ptr)) => Ok(SelectedOperation { token, @@ -764,7 +774,7 @@ impl<'a> Select<'a> { /// } /// ``` pub fn try_select(&mut self) -> Result, TrySelectError> { - try_select(&mut self.handles) + try_select(&mut self.handles, false) } /// Blocks until one of the operations becomes ready and selects it. @@ -815,7 +825,7 @@ impl<'a> Select<'a> { /// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371 /// ``` pub fn select(&mut self) -> SelectedOperation<'a> { - select(&mut self.handles) + select(&mut self.handles, false) } /// Blocks for a limited time until one of the operations becomes ready and selects it. @@ -869,7 +879,7 @@ impl<'a> Select<'a> { &mut self, timeout: Duration, ) -> Result, SelectTimeoutError> { - select_timeout(&mut self.handles, timeout) + select_timeout(&mut self.handles, timeout, false) } /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. @@ -925,7 +935,7 @@ impl<'a> Select<'a> { &mut self, deadline: Instant, ) -> Result, SelectTimeoutError> { - select_deadline(&mut self.handles, deadline) + select_deadline(&mut self.handles, deadline, false) } /// Attempts to find a ready operation without blocking. @@ -964,7 +974,7 @@ impl<'a> Select<'a> { /// } /// ``` pub fn try_ready(&mut self) -> Result { - match run_ready(&mut self.handles, Timeout::Now) { + match run_ready(&mut self.handles, Timeout::Now, false) { None => Err(TryReadyError), Some(index) => Ok(index), } @@ -1021,7 +1031,7 @@ impl<'a> Select<'a> { panic!("no operations have been added to `Select`"); } - run_ready(&mut self.handles, Timeout::Never).unwrap() + run_ready(&mut self.handles, Timeout::Never, false).unwrap() } /// Blocks for a limited time until one of the operations becomes ready. @@ -1122,7 +1132,7 @@ impl<'a> Select<'a> { /// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371 /// ``` pub fn ready_deadline(&mut self, deadline: Instant) -> Result { - match run_ready(&mut self.handles, Timeout::At(deadline)) { + match run_ready(&mut self.handles, Timeout::At(deadline), false) { None => Err(ReadyTimeoutError), Some(index) => Ok(index), } diff --git a/crossbeam-channel/src/select_macro.rs b/crossbeam-channel/src/select_macro.rs index 3b71e1e50..f7dfe8c9a 100644 --- a/crossbeam-channel/src/select_macro.rs +++ b/crossbeam-channel/src/select_macro.rs @@ -750,7 +750,7 @@ macro_rules! crossbeam_channel_internal { $cases:tt ) => {{ let _oper: $crate::SelectedOperation<'_> = { - let _oper = $crate::internal::select(&mut $sel); + let _oper = $crate::internal::select(&mut $sel, _IS_BIASED); // Erase the lifetime so that `sel` can be dropped early even without NLL. unsafe { ::std::mem::transmute(_oper) } @@ -772,7 +772,7 @@ macro_rules! crossbeam_channel_internal { $cases:tt ) => {{ let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = { - let _oper = $crate::internal::try_select(&mut $sel); + let _oper = $crate::internal::try_select(&mut $sel, _IS_BIASED); // Erase the lifetime so that `sel` can be dropped early even without NLL. unsafe { ::std::mem::transmute(_oper) } @@ -802,7 +802,7 @@ macro_rules! crossbeam_channel_internal { $cases:tt ) => {{ let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = { - let _oper = $crate::internal::select_timeout(&mut $sel, $timeout); + let _oper = $crate::internal::select_timeout(&mut $sel, $timeout, _IS_BIASED); // Erase the lifetime so that `sel` can be dropped early even without NLL. unsafe { ::std::mem::transmute(_oper) } @@ -985,7 +985,8 @@ macro_rules! crossbeam_channel_internal { /// /// This macro allows you to define a set of channel operations, wait until any one of them becomes /// ready, and finally execute it. If multiple operations are ready at the same time, a random one -/// among them is selected. +/// among them is selected (i.e. the unbiased selection). Use `select_biased!` for the biased +/// selection. /// /// It is also possible to define a `default` case that gets executed if none of the operations are /// ready, either right away or for a certain duration of time. @@ -1121,8 +1122,33 @@ macro_rules! crossbeam_channel_internal { #[macro_export] macro_rules! select { ($($tokens:tt)*) => { - $crate::crossbeam_channel_internal!( - $($tokens)* - ) + { + const _IS_BIASED: bool = false; + + $crate::crossbeam_channel_internal!( + $($tokens)* + ) + } + }; +} + +/// Selects from a set of channel operations. +/// +/// This macro allows you to define a list of channel operations, wait until any one of them +/// becomes ready, and finally execute it. If multiple operations are ready at the same time, the +/// operation nearest to the front of the list is always selected (i.e. the biased selection). Use +/// [`select!`] for the unbiased selection. +/// +/// Otherwise, this macro's functionality is identical to [`select!`]. Refer to it for the syntax. +#[macro_export] +macro_rules! select_biased { + ($($tokens:tt)*) => { + { + const _IS_BIASED: bool = true; + + $crate::crossbeam_channel_internal!( + $($tokens)* + ) + } }; } diff --git a/crossbeam-channel/tests/mpsc.rs b/crossbeam-channel/tests/mpsc.rs index 5c09ee88c..307e2f400 100644 --- a/crossbeam-channel/tests/mpsc.rs +++ b/crossbeam-channel/tests/mpsc.rs @@ -176,6 +176,8 @@ macro_rules! select { ( $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ ) => ({ + const _IS_BIASED: bool = false; + cc::crossbeam_channel_internal! { $( $meth(($rx).inner) -> res => { diff --git a/crossbeam-channel/tests/select_macro.rs b/crossbeam-channel/tests/select_macro.rs index c8b96c18d..618faf4e1 100644 --- a/crossbeam-channel/tests/select_macro.rs +++ b/crossbeam-channel/tests/select_macro.rs @@ -9,7 +9,7 @@ use std::ops::Deref; use std::thread; use std::time::{Duration, Instant}; -use crossbeam_channel::{after, bounded, never, select, tick, unbounded}; +use crossbeam_channel::{after, bounded, never, select, select_biased, tick, unbounded}; use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError}; use crossbeam_utils::thread::scope; @@ -943,6 +943,121 @@ fn fairness_send() { assert!(hits.iter().all(|x| *x >= COUNT / 4)); } +#[test] +fn unfairness() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = unbounded::<()>(); + let (s3, r3) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + s3.send(()).unwrap(); + + let mut hits = [0usize; 3]; + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => hits[2] += 1, + } + } + assert_eq!(hits, [COUNT, 0, 0]); + + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => hits[2] += 1, + } + } + assert_eq!(hits, [COUNT, COUNT, 0]); +} + +#[test] +fn unfairness_timeout() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = unbounded::<()>(); + let (s3, r3) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + s3.send(()).unwrap(); + + let mut hits = [0usize; 3]; + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => hits[2] += 1, + default(ms(1000)) => unreachable!(), + } + } + assert_eq!(hits, [COUNT, 0, 0]); + + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => hits[2] += 1, + default(ms(1000)) => unreachable!(), + } + } + assert_eq!(hits, [COUNT, COUNT, 0]); +} + +#[test] +fn unfairness_try() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = unbounded::<()>(); + let (s3, r3) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + s3.send(()).unwrap(); + + let mut hits = [0usize; 3]; + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => hits[2] += 1, + default() => unreachable!(), + } + } + assert_eq!(hits, [COUNT, 0, 0]); + + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => hits[2] += 1, + default() => unreachable!(), + } + } + assert_eq!(hits, [COUNT, COUNT, 0]); +} + #[allow(clippy::or_fun_call, clippy::unnecessary_literal_unwrap)] // This is intentional. #[test] fn references() {