Skip to content

Commit

Permalink
fallback to blocking when waiting for block or message writes
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheemdev committed May 2, 2024
1 parent 61013ca commit cff8e5e
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 26 deletions.
4 changes: 2 additions & 2 deletions crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl<T> Channel<T> {

// The head was advanced but the stamp hasn't been updated yet,
// meaning a receive is in-progress. Spin for a bit waiting for
// the receive to complete before falling back to parking.
// the receive to complete before falling back to blocking.
if backoff.is_completed() {
return Status::InProgress;
}
Expand Down Expand Up @@ -308,7 +308,7 @@ impl<T> Channel<T> {

// The tail was advanced but the stamp hasn't been updated yet,
// meaning a send is in-progress. Spin for a bit waiting for
// the send to complete before falling back to parking.
// the send to complete before falling back to blocking.
if backoff.is_completed() {
return Status::InProgress;
}
Expand Down
172 changes: 148 additions & 24 deletions crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::boxed::Box;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::ptr::{self, NonNull};
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

Expand Down Expand Up @@ -56,10 +56,50 @@ impl<T> Slot<T> {
};

/// Waits until a message is written into the slot.
fn wait_write(&self) {
let backoff = Backoff::new();
while self.state.load(Ordering::Acquire) & WRITE == 0 {
backoff.snooze();
fn wait_write(&self, receivers: &SyncWaker, token: &mut Token) {
let mut state = receivers.start();

loop {
// Try reading the message several times.
let backoff = Backoff::new();
loop {
if self.state.load(Ordering::Acquire) & WRITE != 0 {
return;
}

if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}

// Prepare for blocking until a sender wakes us up.
Context::with(|cx| {
let oper = Operation::hook(token);
// Register to be notified after any message is sent because
// we're not sure which notification is ours and we don't to
// get stuck behind other receivers.
receivers.watch(oper, cx, &state);

// Was the emssage just sent?
if self.state.load(Ordering::Acquire) & WRITE != 0 {
let _ = cx.try_select(Selected::Aborted);
}

// Block the current thread.
let sel = cx.wait_until(None);

match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted | Selected::Disconnected => {
receivers.unwatch(oper);
}
Selected::Operation(_) => {}
}

state.unpark();
});
}
}
}
Expand All @@ -85,14 +125,47 @@ impl<T> Block<T> {
}

/// Waits until the next pointer is set.
fn wait_next(&self) -> *mut Self {
let backoff = Backoff::new();
fn wait_next(&self, receivers: &SyncWaker, token: &mut Token) -> *mut Self {
let mut state = receivers.start();
loop {
let next = self.next.load(Ordering::Acquire);
if !next.is_null() {
return next;
// Try reading the message several times.
let backoff = Backoff::new();
loop {
if let Some(next) = NonNull::new(self.next.load(Ordering::Acquire)) {
return next.as_ptr();
}

if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}
backoff.snooze();

// Prepare for blocking until a sender wakes us up.
Context::with(|cx| {
let oper = Operation::hook(token);
// Register to be notified after any message is sent.
receivers.watch(oper, cx, &state);

// Was the next pointer just written?
if !self.next.load(Ordering::Acquire).is_null() {
let _ = cx.try_select(Selected::Aborted);
}

// Block the current thread.
let sel = cx.wait_until(None);

match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted | Selected::Disconnected => {
receivers.unwatch(oper);
}
Selected::Operation(_) => {}
}

state.unpark();
});
}
}

Expand Down Expand Up @@ -208,7 +281,7 @@ impl<T> Channel<T> {
}

/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) -> bool {
fn start_send(&self, token: &mut Token) -> Status {
let backoff = Backoff::new();
let mut tail = self.tail.index.load(Ordering::Acquire);
let mut block = self.tail.block.load(Ordering::Acquire);
Expand All @@ -218,14 +291,19 @@ impl<T> Channel<T> {
// Check if the channel is disconnected.
if tail & MARK_BIT != 0 {
token.list.block = ptr::null();
return true;
return Status::Ready;
}

// Calculate the offset of the index into the block.
let offset = (tail >> SHIFT) % LAP;

// If we reached the end of the block, wait until the next one is installed.
// If we've been waiting for too long, fallback to blocking.
if offset == BLOCK_CAP {
if backoff.is_completed() {
return Status::InProgress;
}

backoff.snooze();
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
Expand Down Expand Up @@ -279,7 +357,7 @@ impl<T> Channel<T> {

token.list.block = block as *const u8;
token.list.offset = offset;
return true;
return Status::Ready;
},
Err(t) => {
tail = t;
Expand Down Expand Up @@ -320,7 +398,7 @@ impl<T> Channel<T> {
let offset = (head >> SHIFT) % LAP;

// We reached the end of the block but the block is not installed yet, meaning
// the last send on previous block is still in progress. The send is likely to
// the last send on the previous block is still in progress. The send is likely to
// be soon so we spin here before falling back to blocking.
if offset == BLOCK_CAP {
if backoff.is_completed() {
Expand Down Expand Up @@ -382,7 +460,7 @@ impl<T> Channel<T> {
Ok(_) => unsafe {
// If we've reached the end of the block, move to the next one.
if offset + 1 == BLOCK_CAP {
let next = (*block).wait_next();
let next = (*block).wait_next(&self.receivers, token);
let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
if !(*next).next.load(Ordering::Relaxed).is_null() {
next_index |= MARK_BIT;
Expand Down Expand Up @@ -416,7 +494,7 @@ impl<T> Channel<T> {
let block = token.list.block as *mut Block<T>;
let offset = token.list.offset;
let slot = unsafe { (*block).slots.get_unchecked(offset) };
slot.wait_write();
slot.wait_write(&self.receivers, token);
let msg = unsafe { slot.msg.get().read().assume_init() };

// Destroy the block if we've reached the end, or if another thread wanted to destroy but
Expand Down Expand Up @@ -447,10 +525,55 @@ impl<T> Channel<T> {
_deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
assert!(self.start_send(token));
unsafe {
self.write(token, msg)
.map_err(SendTimeoutError::Disconnected)

// It's possible that we can't proceed because of the sender that
// is supposed to install the next block lagging, so we might have to
// block for a message to be sent.
let mut state = self.receivers.start();
let mut succeeded = false;
loop {
// Try sending a message several times.
let backoff = Backoff::new();
loop {
if succeeded || self.start_send(token) == Status::Ready {
return unsafe {
self.write(token, msg)
.map_err(SendTimeoutError::Disconnected)
};
}

if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}

// Prepare for blocking until a sender wakes us up.
Context::with(|cx| {
let oper = Operation::hook(token);
// Register to be notified after any message is sent.
self.receivers.watch(oper, cx, &state);

// Has the channel become ready just now?
if self.start_send(token) == Status::Ready {
succeeded = true;
let _ = cx.try_select(Selected::Aborted);
}

// Block the current thread.
let sel = cx.wait_until(None);

match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted | Selected::Disconnected => {
self.receivers.unwatch(oper);
}
Selected::Operation(_) => {}
}

state.unpark();
});
}
}

Expand Down Expand Up @@ -610,6 +733,7 @@ impl<T> Channel<T> {
///
/// This method should only be called when all receivers are dropped.
fn discard_all_messages(&self) {
let token = &mut Token::default();
let backoff = Backoff::new();
let mut tail = self.tail.index.load(Ordering::Acquire);
loop {
Expand Down Expand Up @@ -651,10 +775,10 @@ impl<T> Channel<T> {
if offset < BLOCK_CAP {
// Drop the message in the slot.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
slot.wait_write(&self.receivers, token);
(*slot.msg.get()).assume_init_drop();
} else {
(*block).wait_next();
(*block).wait_next(&self.receivers, token);
// Deallocate the block and move to the next one.
let next = (*block).next.load(Ordering::Acquire);
drop(Box::from_raw(block));
Expand Down Expand Up @@ -796,7 +920,7 @@ impl<'a, T> SelectHandle for Sender<'a, T> {
}

fn try_select(&self, token: &mut Token) -> bool {
self.0.start_send(token)
self.0.start_send(token) == Status::Ready
}

fn deadline(&self) -> Option<Instant> {
Expand Down

0 comments on commit cff8e5e

Please sign in to comment.