From cff8e5ea4986ce49168a4e93e167197a1d3d209e Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Thu, 2 May 2024 00:31:31 -0400 Subject: [PATCH] fallback to blocking when waiting for block or message writes --- crossbeam-channel/src/flavors/array.rs | 4 +- crossbeam-channel/src/flavors/list.rs | 172 +++++++++++++++++++++---- 2 files changed, 150 insertions(+), 26 deletions(-) diff --git a/crossbeam-channel/src/flavors/array.rs b/crossbeam-channel/src/flavors/array.rs index 2f9c2f3e9..dafe0aaad 100644 --- a/crossbeam-channel/src/flavors/array.rs +++ b/crossbeam-channel/src/flavors/array.rs @@ -210,7 +210,7 @@ impl Channel { // The head was advanced but the stamp hasn't been updated yet, // meaning a receive is in-progress. Spin for a bit waiting for - // the receive to complete before falling back to parking. + // the receive to complete before falling back to blocking. if backoff.is_completed() { return Status::InProgress; } @@ -308,7 +308,7 @@ impl Channel { // The tail was advanced but the stamp hasn't been updated yet, // meaning a send is in-progress. Spin for a bit waiting for - // the send to complete before falling back to parking. + // the send to complete before falling back to blocking. if backoff.is_completed() { return Status::InProgress; } diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index 04aeb866a..b21b34224 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -4,7 +4,7 @@ use std::boxed::Box; use std::cell::UnsafeCell; use std::marker::PhantomData; use std::mem::MaybeUninit; -use std::ptr; +use std::ptr::{self, NonNull}; use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; use std::time::Instant; @@ -56,10 +56,50 @@ impl Slot { }; /// Waits until a message is written into the slot. - fn wait_write(&self) { - let backoff = Backoff::new(); - while self.state.load(Ordering::Acquire) & WRITE == 0 { - backoff.snooze(); + fn wait_write(&self, receivers: &SyncWaker, token: &mut Token) { + let mut state = receivers.start(); + + loop { + // Try reading the message several times. + let backoff = Backoff::new(); + loop { + if self.state.load(Ordering::Acquire) & WRITE != 0 { + return; + } + + if backoff.is_completed() { + break; + } else { + backoff.snooze(); + } + } + + // Prepare for blocking until a sender wakes us up. + Context::with(|cx| { + let oper = Operation::hook(token); + // Register to be notified after any message is sent because + // we're not sure which notification is ours and we don't to + // get stuck behind other receivers. + receivers.watch(oper, cx, &state); + + // Was the emssage just sent? + if self.state.load(Ordering::Acquire) & WRITE != 0 { + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(None); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + receivers.unwatch(oper); + } + Selected::Operation(_) => {} + } + + state.unpark(); + }); } } } @@ -85,14 +125,47 @@ impl Block { } /// Waits until the next pointer is set. - fn wait_next(&self) -> *mut Self { - let backoff = Backoff::new(); + fn wait_next(&self, receivers: &SyncWaker, token: &mut Token) -> *mut Self { + let mut state = receivers.start(); loop { - let next = self.next.load(Ordering::Acquire); - if !next.is_null() { - return next; + // Try reading the message several times. + let backoff = Backoff::new(); + loop { + if let Some(next) = NonNull::new(self.next.load(Ordering::Acquire)) { + return next.as_ptr(); + } + + if backoff.is_completed() { + break; + } else { + backoff.snooze(); + } } - backoff.snooze(); + + // Prepare for blocking until a sender wakes us up. + Context::with(|cx| { + let oper = Operation::hook(token); + // Register to be notified after any message is sent. + receivers.watch(oper, cx, &state); + + // Was the next pointer just written? + if !self.next.load(Ordering::Acquire).is_null() { + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(None); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + receivers.unwatch(oper); + } + Selected::Operation(_) => {} + } + + state.unpark(); + }); } } @@ -208,7 +281,7 @@ impl Channel { } /// Attempts to reserve a slot for sending a message. - fn start_send(&self, token: &mut Token) -> bool { + fn start_send(&self, token: &mut Token) -> Status { let backoff = Backoff::new(); let mut tail = self.tail.index.load(Ordering::Acquire); let mut block = self.tail.block.load(Ordering::Acquire); @@ -218,14 +291,19 @@ impl Channel { // Check if the channel is disconnected. if tail & MARK_BIT != 0 { token.list.block = ptr::null(); - return true; + return Status::Ready; } // Calculate the offset of the index into the block. let offset = (tail >> SHIFT) % LAP; // If we reached the end of the block, wait until the next one is installed. + // If we've been waiting for too long, fallback to blocking. if offset == BLOCK_CAP { + if backoff.is_completed() { + return Status::InProgress; + } + backoff.snooze(); tail = self.tail.index.load(Ordering::Acquire); block = self.tail.block.load(Ordering::Acquire); @@ -279,7 +357,7 @@ impl Channel { token.list.block = block as *const u8; token.list.offset = offset; - return true; + return Status::Ready; }, Err(t) => { tail = t; @@ -320,7 +398,7 @@ impl Channel { let offset = (head >> SHIFT) % LAP; // We reached the end of the block but the block is not installed yet, meaning - // the last send on previous block is still in progress. The send is likely to + // the last send on the previous block is still in progress. The send is likely to // be soon so we spin here before falling back to blocking. if offset == BLOCK_CAP { if backoff.is_completed() { @@ -382,7 +460,7 @@ impl Channel { Ok(_) => unsafe { // If we've reached the end of the block, move to the next one. if offset + 1 == BLOCK_CAP { - let next = (*block).wait_next(); + let next = (*block).wait_next(&self.receivers, token); let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT); if !(*next).next.load(Ordering::Relaxed).is_null() { next_index |= MARK_BIT; @@ -416,7 +494,7 @@ impl Channel { let block = token.list.block as *mut Block; let offset = token.list.offset; let slot = unsafe { (*block).slots.get_unchecked(offset) }; - slot.wait_write(); + slot.wait_write(&self.receivers, token); let msg = unsafe { slot.msg.get().read().assume_init() }; // Destroy the block if we've reached the end, or if another thread wanted to destroy but @@ -447,10 +525,55 @@ impl Channel { _deadline: Option, ) -> Result<(), SendTimeoutError> { let token = &mut Token::default(); - assert!(self.start_send(token)); - unsafe { - self.write(token, msg) - .map_err(SendTimeoutError::Disconnected) + + // It's possible that we can't proceed because of the sender that + // is supposed to install the next block lagging, so we might have to + // block for a message to be sent. + let mut state = self.receivers.start(); + let mut succeeded = false; + loop { + // Try sending a message several times. + let backoff = Backoff::new(); + loop { + if succeeded || self.start_send(token) == Status::Ready { + return unsafe { + self.write(token, msg) + .map_err(SendTimeoutError::Disconnected) + }; + } + + if backoff.is_completed() { + break; + } else { + backoff.snooze(); + } + } + + // Prepare for blocking until a sender wakes us up. + Context::with(|cx| { + let oper = Operation::hook(token); + // Register to be notified after any message is sent. + self.receivers.watch(oper, cx, &state); + + // Has the channel become ready just now? + if self.start_send(token) == Status::Ready { + succeeded = true; + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(None); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + self.receivers.unwatch(oper); + } + Selected::Operation(_) => {} + } + + state.unpark(); + }); } } @@ -610,6 +733,7 @@ impl Channel { /// /// This method should only be called when all receivers are dropped. fn discard_all_messages(&self) { + let token = &mut Token::default(); let backoff = Backoff::new(); let mut tail = self.tail.index.load(Ordering::Acquire); loop { @@ -651,10 +775,10 @@ impl Channel { if offset < BLOCK_CAP { // Drop the message in the slot. let slot = (*block).slots.get_unchecked(offset); - slot.wait_write(); + slot.wait_write(&self.receivers, token); (*slot.msg.get()).assume_init_drop(); } else { - (*block).wait_next(); + (*block).wait_next(&self.receivers, token); // Deallocate the block and move to the next one. let next = (*block).next.load(Ordering::Acquire); drop(Box::from_raw(block)); @@ -796,7 +920,7 @@ impl<'a, T> SelectHandle for Sender<'a, T> { } fn try_select(&self, token: &mut Token) -> bool { - self.0.start_send(token) + self.0.start_send(token) == Status::Ready } fn deadline(&self) -> Option {