Navigation Menu

Skip to content

Commit

Permalink
Remove the unstable and deprecated mpsc_select
Browse files Browse the repository at this point in the history
This removes macro `select!` and `std::sync::mpsc::{Handle, Select}`,
which were all unstable and have been deprecated since 1.32.
  • Loading branch information
cuviper committed May 17, 2019
1 parent b982867 commit f950193
Show file tree
Hide file tree
Showing 10 changed files with 9 additions and 1,130 deletions.
55 changes: 0 additions & 55 deletions src/libstd/macros.rs
Expand Up @@ -357,61 +357,6 @@ macro_rules! dbg {
};
}

/// Selects the first successful receive event from a number of receivers.
///
/// This macro is used to wait for the first event to occur on a number of
/// receivers. It places no restrictions on the types of receivers given to
/// this macro, this can be viewed as a heterogeneous select.
///
/// # Examples
///
/// ```
/// #![feature(mpsc_select)]
///
/// use std::thread;
/// use std::sync::mpsc;
///
/// // two placeholder functions for now
/// fn long_running_thread() {}
/// fn calculate_the_answer() -> u32 { 42 }
///
/// let (tx1, rx1) = mpsc::channel();
/// let (tx2, rx2) = mpsc::channel();
///
/// thread::spawn(move|| { long_running_thread(); tx1.send(()).unwrap(); });
/// thread::spawn(move|| { tx2.send(calculate_the_answer()).unwrap(); });
///
/// select! {
/// _ = rx1.recv() => println!("the long running thread finished first"),
/// answer = rx2.recv() => {
/// println!("the answer was: {}", answer.unwrap());
/// }
/// }
/// # drop(rx1.recv());
/// # drop(rx2.recv());
/// ```
///
/// For more information about select, see the `std::sync::mpsc::Select` structure.
#[macro_export]
#[unstable(feature = "mpsc_select", issue = "27800")]
#[rustc_deprecated(since = "1.32.0",
reason = "channel selection will be removed in a future release")]
macro_rules! select {
(
$($name:pat = $rx:ident.$meth:ident() => $code:expr),+
) => ({
use $crate::sync::mpsc::Select;
let sel = Select::new();
$( let mut $rx = sel.handle(&$rx); )+
unsafe {
$( $rx.add(); )+
}
let ret = sel.wait();
$( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
{ unreachable!() }
})
}

#[cfg(test)]
macro_rules! assert_approx_eq {
($a:expr, $b:expr) => ({
Expand Down
85 changes: 2 additions & 83 deletions src/libstd/sync/mpsc/mod.rs
Expand Up @@ -116,7 +116,6 @@
//! ```

#![stable(feature = "rust1", since = "1.0.0")]
#![allow(deprecated)] // for mpsc_select

// A description of how Rust's channel implementation works
//
Expand Down Expand Up @@ -263,6 +262,8 @@
// believe that there is anything fundamental that needs to change about these
// channels, however, in order to support a more efficient select().
//
// FIXME: Select is now removed, so these factors are ready to be cleaned up!
//
// # Conclusion
//
// And now that you've seen all the races that I found and attempted to fix,
Expand All @@ -275,18 +276,8 @@ use crate::mem;
use crate::cell::UnsafeCell;
use crate::time::{Duration, Instant};

#[unstable(feature = "mpsc_select", issue = "27800")]
pub use self::select::{Select, Handle};
use self::select::StartResult;
use self::select::StartResult::*;
use self::blocking::SignalToken;

#[cfg(all(test, not(target_os = "emscripten")))]
mod select_tests;

mod blocking;
mod oneshot;
mod select;
mod shared;
mod stream;
mod sync;
Expand Down Expand Up @@ -1514,78 +1505,6 @@ impl<T> Receiver<T> {

}

impl<T> select::Packet for Receiver<T> {
fn can_recv(&self) -> bool {
loop {
let new_port = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match p.can_recv() {
Ok(ret) => return ret,
Err(upgrade) => upgrade,
}
}
Flavor::Stream(ref p) => {
match p.can_recv() {
Ok(ret) => return ret,
Err(upgrade) => upgrade,
}
}
Flavor::Shared(ref p) => return p.can_recv(),
Flavor::Sync(ref p) => return p.can_recv(),
};
unsafe {
mem::swap(self.inner_mut(),
new_port.inner_mut());
}
}
}

fn start_selection(&self, mut token: SignalToken) -> StartResult {
loop {
let (t, new_port) = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match p.start_selection(token) {
oneshot::SelSuccess => return Installed,
oneshot::SelCanceled => return Abort,
oneshot::SelUpgraded(t, rx) => (t, rx),
}
}
Flavor::Stream(ref p) => {
match p.start_selection(token) {
stream::SelSuccess => return Installed,
stream::SelCanceled => return Abort,
stream::SelUpgraded(t, rx) => (t, rx),
}
}
Flavor::Shared(ref p) => return p.start_selection(token),
Flavor::Sync(ref p) => return p.start_selection(token),
};
token = t;
unsafe {
mem::swap(self.inner_mut(), new_port.inner_mut());
}
}
}

fn abort_selection(&self) -> bool {
let mut was_upgrade = false;
loop {
let result = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => p.abort_selection(),
Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
Flavor::Sync(ref p) => return p.abort_selection(),
};
let new_port = match result { Ok(b) => return b, Err(p) => p };
was_upgrade = true;
unsafe {
mem::swap(self.inner_mut(),
new_port.inner_mut());
}
}
}
}

#[stable(feature = "rust1", since = "1.0.0")]
impl<'a, T> Iterator for Iter<'a, T> {
type Item = T;
Expand Down
72 changes: 0 additions & 72 deletions src/libstd/sync/mpsc/oneshot.rs
Expand Up @@ -24,7 +24,6 @@

pub use self::Failure::*;
pub use self::UpgradeResult::*;
pub use self::SelectionResult::*;
use self::MyUpgrade::*;

use crate::sync::mpsc::Receiver;
Expand Down Expand Up @@ -66,12 +65,6 @@ pub enum UpgradeResult {
UpWoke(SignalToken),
}

pub enum SelectionResult<T> {
SelCanceled,
SelUpgraded(SignalToken, Receiver<T>),
SelSuccess,
}

enum MyUpgrade<T> {
NothingSent,
SendUsed,
Expand Down Expand Up @@ -264,71 +257,6 @@ impl<T> Packet<T> {
// select implementation
////////////////////////////////////////////////////////////////////////////

// If Ok, the value is whether this port has data, if Err, then the upgraded
// port needs to be checked instead of this one.
pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
unsafe {
match self.state.load(Ordering::SeqCst) {
EMPTY => Ok(false), // Welp, we tried
DATA => Ok(true), // we have some un-acquired data
DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
DISCONNECTED => {
match ptr::replace(self.upgrade.get(), SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => Err(upgrade),

// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => { ptr::write(self.upgrade.get(), up); Ok(true) }
}
}
_ => unreachable!(), // we're the "one blocker"
}
}
}

// Attempts to start selection on this port. This can either succeed, fail
// because there is data, or fail because there is an upgrade pending.
pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
unsafe {
let ptr = token.cast_to_usize();
match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
EMPTY => SelSuccess,
DATA => {
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
DISCONNECTED if (*self.data.get()).is_some() => {
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
DISCONNECTED => {
match ptr::replace(self.upgrade.get(), SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => {
SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
}

// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => {
ptr::write(self.upgrade.get(), up);
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
}
}
_ => unreachable!(), // we're the "one blocker"
}
}
}

// Remove a previous selecting thread from this port. This ensures that the
// blocked thread will no longer be visible to any other threads.
//
Expand Down

0 comments on commit f950193

Please sign in to comment.