diff --git a/crossbeam-channel/src/channel.rs b/crossbeam-channel/src/channel.rs index 61aefc2ce..0ebf872d2 100644 --- a/crossbeam-channel/src/channel.rs +++ b/crossbeam-channel/src/channel.rs @@ -1149,6 +1149,32 @@ impl Receiver { _ => false, } } + + /// Disconnects this from the channel. + /// + /// If this is the last one connected to the channel, sender start to fail and + /// the returned iterator can be used to drain any remaining sent messages. Otherwise, it + /// always returns [`None`]. + pub fn disconnect(self) -> DisconnectIter { + let was_last = unsafe { + match &self.flavor { + ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), + _ => { + panic!(); + } + } + }; + if let Some(true) = was_last { + DisconnectIter { + last_receiver: Some(self), + } + } else { + std::mem::forget(self); + DisconnectIter { + last_receiver: None, + } + } + } } impl Drop for Receiver { @@ -1156,12 +1182,14 @@ impl Drop for Receiver { unsafe { match &self.flavor { ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), - ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), + ReceiverFlavor::List(chan) => { + chan.release(|c| c.disconnect_receivers_and_discard_messages()) + } ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), - ReceiverFlavor::At(_) => {} - ReceiverFlavor::Tick(_) => {} - ReceiverFlavor::Never(_) => {} - } + ReceiverFlavor::At(_) => None, + ReceiverFlavor::Tick(_) => None, + ReceiverFlavor::Never(_) => None, + }; } } } @@ -1289,6 +1317,39 @@ pub struct TryIter<'a, T> { receiver: &'a Receiver, } +/// A non-blocking draining iterator over unreceived messages after channel termination. +/// +/// Each call to [`next`] returns a message if there is still more to be received. The iterator +/// never blocks waiting for the next message as the channel don't allow sending anymore. +/// +/// [`next`]: Iterator::next +pub struct DisconnectIter { + last_receiver: Option>, +} + +impl FusedIterator for DisconnectIter {} + +impl Iterator for DisconnectIter { + type Item = T; + + fn next(&mut self) -> Option { + self.last_receiver + .as_ref() + .and_then(|r| match r.try_recv() { + Ok(msg) => Some(msg), + Err(TryRecvError::Disconnected) => None, + Err(TryRecvError::Empty) => unreachable!(), + }) + } +} + +impl DisconnectIter { + /// Returns true if this was returned from the last receiver's disconnection. + pub fn is_last_receiver(&self) -> bool { + self.last_receiver.is_some() + } +} + impl Iterator for TryIter<'_, T> { type Item = T; @@ -1303,6 +1364,26 @@ impl fmt::Debug for TryIter<'_, T> { } } +impl fmt::Debug for DisconnectIter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("DisconnectIter { .. }") + } +} + +impl Drop for DisconnectIter { + fn drop(&mut self) { + if let Some(last_receiver) = self.last_receiver.take() { + match &last_receiver.flavor { + ReceiverFlavor::List(chan) => chan.discard_all_messages(), + _ => { + panic!(); + } + } + std::mem::forget(last_receiver); + } + } +} + /// A blocking iterator over messages in a channel. /// /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the diff --git a/crossbeam-channel/src/counter.rs b/crossbeam-channel/src/counter.rs index 2c27f7c6b..77397e561 100644 --- a/crossbeam-channel/src/counter.rs +++ b/crossbeam-channel/src/counter.rs @@ -118,14 +118,18 @@ impl Receiver { /// Releases the receiver reference. /// /// Function `disconnect` will be called if this is the last receiver reference. - pub(crate) unsafe fn release bool>(&self, disconnect: F) { + pub(crate) unsafe fn release T>(&self, disconnect: F) -> Option { + let mut from_callback = None; + if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { - disconnect(&self.counter().chan); + from_callback = Some(disconnect(&self.counter().chan)); if self.counter().destroy.swap(true, Ordering::AcqRel) { drop(Box::from_raw(self.counter)); } } + + from_callback } } diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index 6090b8d47..caa61dc47 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -549,10 +549,8 @@ impl Channel { /// Disconnects receivers. /// /// Returns `true` if this call disconnected the channel. - pub(crate) fn disconnect_receivers(&self) -> bool { - let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); - - if tail & MARK_BIT == 0 { + pub(crate) fn disconnect_receivers_and_discard_messages(&self) -> bool { + if self.disconnect_receivers() { // If receivers are dropped first, discard all messages to free // memory eagerly. self.discard_all_messages(); @@ -562,10 +560,15 @@ impl Channel { } } + pub(crate) fn disconnect_receivers(&self) -> bool { + let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); + tail & MARK_BIT == 0 + } + /// Discards all messages. /// /// This method should only be called when all receivers are dropped. - fn discard_all_messages(&self) { + pub(crate) fn discard_all_messages(&self) { let backoff = Backoff::new(); let mut tail = self.tail.index.load(Ordering::Acquire); loop { diff --git a/crossbeam-channel/src/lib.rs b/crossbeam-channel/src/lib.rs index cc1ef112f..80a6b836d 100644 --- a/crossbeam-channel/src/lib.rs +++ b/crossbeam-channel/src/lib.rs @@ -359,7 +359,7 @@ cfg_if! { pub use crate::channel::{after, at, never, tick}; pub use crate::channel::{bounded, unbounded}; - pub use crate::channel::{IntoIter, Iter, TryIter}; + pub use crate::channel::{IntoIter, Iter, TryIter, DisconnectIter}; pub use crate::channel::{Receiver, Sender}; pub use crate::select::{Select, SelectedOperation};