Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support faster send/recv by not notifying the other side #1039

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
148 changes: 139 additions & 9 deletions crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,31 @@ impl<T> Sender<T> {
/// ```
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.try_send(msg),
SenderFlavor::List(chan) => chan.try_send(msg),
SenderFlavor::Array(chan) => chan.try_send(msg, true),
SenderFlavor::List(chan) => chan.try_send(msg, true),
SenderFlavor::Zero(chan) => chan.try_send(msg),
}
}

/// Attempts to send a message into the channel without blocking and notifying receiver.
///
/// This method will either send a message into the channel immediately or return an error if
/// the channel is full or disconnected. The returned error contains the original message.
///
/// Omitting notification makes the send operation complete faster. However, this means
/// receiver won't be notified about the existence of new message immediately. This means the
/// sole use of this call for a given channel could cause receivers to be blocked indefinitely.
/// Thus, this call must be accompanied with some explicit mechanism to wake up the receiver
/// like mixed use of notifying send operations or receive operations with retry or
/// timeout/deadline.
///
/// If called on a zero-capacity channel, this method will send the message only if there
/// happens to be a receive operation on the other side of the channel at the same time. This
/// means this is equivalent to the [try_send](Sender::try_send) operation.
pub fn try_send_unnotified(&self, msg: T) -> Result<(), TrySendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.try_send(msg, false),
SenderFlavor::List(chan) => chan.try_send(msg, false),
SenderFlavor::Zero(chan) => chan.try_send(msg),
}
}
Expand Down Expand Up @@ -440,8 +463,54 @@ impl<T> Sender<T> {
/// ```
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.send(msg, None),
SenderFlavor::List(chan) => chan.send(msg, None),
SenderFlavor::Array(chan) => chan.send(msg, None, true),
SenderFlavor::List(chan) => chan.send(msg, None, true),
SenderFlavor::Zero(chan) => chan.send(msg, None),
}
.map_err(|err| match err {
SendTimeoutError::Disconnected(msg) => SendError(msg),
SendTimeoutError::Timeout(_) => unreachable!(),
})
}

/// Blocks the current thread until a message is sent without notifying receiver or the
/// channel is disconnected.
///
/// Omitting notification makes the send operation complete faster. However, this means
/// receiver won't be notified about the existence of new message immediately. This means the
/// sole use of this call for a given channel could cause receivers to be blocked indefinitely.
/// Thus, this call must be accompanied with some explicit mechanism to wake up the receiver
/// like mixed use of notifying send operations or receive operations with retry or
/// timeout/deadline.
///
/// If the channel is full and not disconnected, this call will block until the send operation
/// can proceed. If the channel becomes disconnected, this call will wake up and return an
/// error. The returned error contains the original message.
///
/// If called on a zero-capacity channel, this method will wait for a receive operation to
/// appear on the other side of the channel. This means this is equivalent to the
/// [send](Sender::send) operation.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_channel::bounded;
///
/// let (s, r) = bounded(1);
/// assert_eq!(s.send_unnotified(1), Ok(()));
///
/// thread::spawn(move || {
/// // This sleep is crucial; otherwise `.recv()` could block indefinitely!
/// thread::sleep(Duration::from_secs(1));
/// assert_eq!(r.recv(), Ok(1));
/// }).join().unwrap();
/// ```
pub fn send_unnotified(&self, msg: T) -> Result<(), SendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.send(msg, None, false),
SenderFlavor::List(chan) => chan.send(msg, None, false),
SenderFlavor::Zero(chan) => chan.send(msg, None),
}
.map_err(|err| match err {
Expand Down Expand Up @@ -489,7 +558,35 @@ impl<T> Sender<T> {
/// ```
pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
match Instant::now().checked_add(timeout) {
Some(deadline) => self.send_deadline(msg, deadline),
Some(deadline) => self.send_internal(msg, deadline, true),
None => self.send(msg).map_err(SendTimeoutError::from),
}
}

/// Waits for a message to be sent into the channel without notifying receiver, but only for a
/// limited time.
///
/// Omitting notification makes the send operation complete faster. However, this means
/// receiver won't be notified about the existence of new message immediately. This means the
/// sole use of this call for a given channel could cause receivers to be blocked indefinitely.
/// Thus, this call must be accompanied with some explicit mechanism to wake up the receiver
/// like mixed use of notifying send operations or receive operations with retry or
/// timeout/deadline.
///
/// If the channel is full and not disconnected, this call will block until the send operation
/// can proceed or the operation times out. If the channel becomes disconnected, this call will
/// wake up and return an error. The returned error contains the original message.
///
/// If called on a zero-capacity channel, this method will wait for a receive operation to
/// appear on the other side of the channel. This means this is equivalent to the
/// [send_timeout](Sender::send_timeout) operation.
pub fn send_timeout_unnotified(
&self,
msg: T,
timeout: Duration,
) -> Result<(), SendTimeoutError<T>> {
match Instant::now().checked_add(timeout) {
Some(deadline) => self.send_internal(msg, deadline, false),
None => self.send(msg).map_err(SendTimeoutError::from),
}
}
Expand Down Expand Up @@ -534,9 +631,42 @@ impl<T> Sender<T> {
/// );
/// ```
pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
self.send_internal(msg, deadline, true)
}

/// Waits for a message to be sent into the channel without notifying receiver, but only until a given deadline.
///
/// Omitting notification makes the send operation complete faster. However, this means
/// receiver won't be notified about the existence of new message immediately. This means the
/// sole use of this call for a given channel could cause receivers to be blocked indefinitely.
/// Thus, this call must be accompanied with some explicit mechanism to wake up the receiver
/// like mixed use of notifying send operations or receive operations with retry or
/// timeout/deadline.
///
/// If the channel is full and not disconnected, this call will block until the send operation
/// can proceed or the operation times out. If the channel becomes disconnected, this call will
/// wake up and return an error. The returned error contains the original message.
///
/// If called on a zero-capacity channel, this method will wait for a receive operation to
/// appear on the other side of the channel. This means this is equivalent to the
/// [send_deadline](Sender::send_deadline) operation.
pub fn send_deadline_unnotified(
&self,
msg: T,
deadline: Instant,
) -> Result<(), SendTimeoutError<T>> {
self.send_internal(msg, deadline, false)
}

fn send_internal(
&self,
msg: T,
deadline: Instant,
notify: bool,
) -> Result<(), SendTimeoutError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
SenderFlavor::Array(chan) => chan.send(msg, Some(deadline), notify),
SenderFlavor::List(chan) => chan.send(msg, Some(deadline), notify),
SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
}
}
Expand Down Expand Up @@ -1512,8 +1642,8 @@ impl<T> SelectHandle for Receiver<T> {
/// Writes a message into the channel.
pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
match &s.flavor {
SenderFlavor::Array(chan) => chan.write(token, msg),
SenderFlavor::List(chan) => chan.write(token, msg),
SenderFlavor::Array(chan) => chan.write(token, msg, true),
SenderFlavor::List(chan) => chan.write(token, msg, true),
SenderFlavor::Zero(chan) => chan.write(token, msg),
}
}
Expand Down
16 changes: 11 additions & 5 deletions crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl<T> Channel<T> {
}

/// Writes a message into the channel.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T, notify: bool) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.array.slot.is_null() {
return Err(msg);
Expand All @@ -223,7 +223,9 @@ impl<T> Channel<T> {
slot.stamp.store(token.array.stamp, Ordering::Release);

// Wake a sleeping receiver.
self.receivers.notify();
if notify {
self.receivers.notify();
}
Ok(())
}

Expand Down Expand Up @@ -319,10 +321,13 @@ impl<T> Channel<T> {
}

/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
pub(crate) fn try_send(&self, msg: T, notify: bool) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
if self.start_send(token) {
unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
unsafe {
self.write(token, msg, notify)
.map_err(TrySendError::Disconnected)
}
} else {
Err(TrySendError::Full(msg))
}
Expand All @@ -333,14 +338,15 @@ impl<T> Channel<T> {
&self,
msg: T,
deadline: Option<Instant>,
notify: bool,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
loop {
// Try sending a message several times.
let backoff = Backoff::new();
loop {
if self.start_send(token) {
let res = unsafe { self.write(token, msg) };
let res = unsafe { self.write(token, msg, notify) };
return res.map_err(SendTimeoutError::Disconnected);
}

Expand Down
13 changes: 8 additions & 5 deletions crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl<T> Channel<T> {
}

/// Writes a message into the channel.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T, notify: bool) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.list.block.is_null() {
return Err(msg);
Expand All @@ -292,7 +292,9 @@ impl<T> Channel<T> {
slot.state.fetch_or(WRITE, Ordering::Release);

// Wake a sleeping receiver.
self.receivers.notify();
if notify {
self.receivers.notify();
}
Ok(())
}

Expand Down Expand Up @@ -407,8 +409,8 @@ impl<T> Channel<T> {
}

/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
self.send(msg, None).map_err(|err| match err {
pub(crate) fn try_send(&self, msg: T, notify: bool) -> Result<(), TrySendError<T>> {
self.send(msg, None, notify).map_err(|err| match err {
SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
SendTimeoutError::Timeout(_) => unreachable!(),
})
Expand All @@ -419,11 +421,12 @@ impl<T> Channel<T> {
&self,
msg: T,
_deadline: Option<Instant>,
notify: bool,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
assert!(self.start_send(token));
unsafe {
self.write(token, msg)
self.write(token, msg, notify)
.map_err(SendTimeoutError::Disconnected)
}
}
Expand Down