From 05554e65289951e95735258500a9eeed3ac74065 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Fri, 28 Dec 2018 10:50:09 +0100 Subject: [PATCH] Optimize unbounded channels --- crossbeam-channel/Cargo.toml | 4 - crossbeam-channel/src/flavors/list.rs | 532 ++++++++++++++------------ crossbeam-channel/src/lib.rs | 1 - 3 files changed, 282 insertions(+), 255 deletions(-) diff --git a/crossbeam-channel/Cargo.toml b/crossbeam-channel/Cargo.toml index 4297a3924..3e1d1a957 100644 --- a/crossbeam-channel/Cargo.toml +++ b/crossbeam-channel/Cargo.toml @@ -20,10 +20,6 @@ parking_lot = "0.7" rand = "0.6" smallvec = "0.6.2" -[dependencies.crossbeam-epoch] -version = "0.7" -path = "../crossbeam-epoch" - [dependencies.crossbeam-utils] version = "0.6.3" path = "../crossbeam-utils" diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index 95b701c93..1b87ff656 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -4,10 +4,9 @@ use std::cell::UnsafeCell; use std::marker::PhantomData; use std::mem::{self, ManuallyDrop}; use std::ptr; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; use std::time::Instant; -use crossbeam_epoch::{self as epoch, Atomic, Guard, Owned, Shared}; use crossbeam_utils::CachePadded; use context::Context; @@ -16,39 +15,46 @@ use select::{Operation, SelectHandle, Selected, Token}; use utils::Backoff; use waker::SyncWaker; -// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, reapply the +// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the // following changes by @kleimkuhler: // // 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100 // 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101 -/// The maximum number of messages a block can hold. -const BLOCK_CAP: usize = 32; +// Bits indicating the state of a slot: +// * If a message has been written into the slot, `WRITE` is set. +// * If a message has been read from the slot, `READ` is set. +// * If the block is being destroyed, `DESTROY` is set. +const WRITE: usize = 1; +const READ: usize = 2; +const DESTROY: usize = 4; + +// Each block covers one "lap" of indices. +const LAP: usize = 32; +// The maximum number of messages a block can hold. +const BLOCK_CAP: usize = LAP - 1; +// How many lower bits are reserved for metadata. +const SHIFT: usize = 2; +// Indicates that the block is not the last one. +const HAS_NEXT: usize = 1; +// Indicates that the channel is disconnected. +const DISCONNECTED: usize = 2; /// A slot in a block. struct Slot { /// The message. msg: UnsafeCell>, - /// Equals `true` if the message is ready for reading. - ready: AtomicBool, + /// The state of the slot. + state: AtomicUsize, } -/// The token type for the list flavor. -pub struct ListToken { - /// Slot to read from or write to. - slot: *const u8, - - /// Guard keeping alive the block that contains the slot. - guard: Option, -} - -impl Default for ListToken { - #[inline] - fn default() -> Self { - ListToken { - slot: ptr::null(), - guard: None, +impl Slot { + /// Waits until a message is written into the slot. + fn wait_write(&self) { + let mut backoff = Backoff::new(); + while self.state.load(Ordering::Acquire) & WRITE == 0 { + backoff.snooze(); } } } @@ -57,38 +63,79 @@ impl Default for ListToken { /// /// Each block in the list can hold up to `BLOCK_CAP` messages. struct Block { - /// The start index of this block. - /// - /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`. - start_index: usize, - /// The next block in the linked list. - next: Atomic>, + next: AtomicPtr>, /// Slots for messages. - slots: [UnsafeCell>; BLOCK_CAP], + slots: [Slot; BLOCK_CAP], } impl Block { - /// Creates an empty block that starts at `start_index`. - fn new(start_index: usize) -> Block { - Block { - start_index, - slots: unsafe { mem::zeroed() }, - next: Atomic::null(), + /// Creates an empty block. + fn new() -> Block { + unsafe { mem::zeroed() } + } + + /// Waits until the next pointer is set. + fn wait_next(&self) -> *mut Block { + let mut backoff = Backoff::new(); + loop { + let next = self.next.load(Ordering::Acquire); + if !next.is_null() { + return next; + } + backoff.snooze(); + } + } + + /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. + unsafe fn destroy(this: *mut Block, start: usize) { + // It is not necessary to set the `DESTROY bit in the last slot because that slot has begun + // destruction of the block. + for i in start..BLOCK_CAP - 1 { + let slot = (*this).slots.get_unchecked(i); + + // Mark the `DESTROY` bit if a thread is still using the slot. + if slot.state.load(Ordering::Acquire) & READ == 0 + && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 + { + // If a thread is still using the slot, it will continue destruction of the block. + return; + } } + + // No thread is using the block, now it is safe to destroy it. + drop(Box::from_raw(this)); } } -/// Position in the channel (index and block). -/// -/// This struct describes the current position of the head or the tail in a linked list. +/// A position in a channel. +#[derive(Debug)] struct Position { /// The index in the channel. index: AtomicUsize, /// The block in the linked list. - block: Atomic>, + block: AtomicPtr>, +} + +/// The token type for the list flavor. +pub struct ListToken { + /// The block of slots. + block: *const u8, + + /// The offset into the block. + offset: usize, +} + +impl Default for ListToken { + #[inline] + fn default() -> Self { + ListToken { + block: ptr::null(), + offset: 0, + } + } } /// Unbounded channel implemented as a linked list. @@ -105,39 +152,30 @@ pub struct Channel { /// The tail of the channel. tail: CachePadded>, - /// Equals `true` when the channel is disconnected. - is_disconnected: AtomicBool, - /// Receivers waiting while the channel is empty and not disconnected. receivers: SyncWaker, - /// Indicates that dropping a `Channel` may drop values of type `T`. + /// Indicates that dropping a `Channel` may drop messages of type `T`. _marker: PhantomData, } impl Channel { /// Creates a new unbounded channel. pub fn new() -> Self { - let channel = Channel { + let block = Box::into_raw(Box::new(Block::::new())); + + Channel { head: CachePadded::new(Position { + block: AtomicPtr::new(block), index: AtomicUsize::new(0), - block: Atomic::null(), }), tail: CachePadded::new(Position { + block: AtomicPtr::new(block), index: AtomicUsize::new(0), - block: Atomic::null(), }), - is_disconnected: AtomicBool::new(false), receivers: SyncWaker::new(), _marker: PhantomData, - }; - - // Allocate an empty block for the first batch of messages. - let block = unsafe { Owned::new(Block::new(0)).into_shared(epoch::unprotected()) }; - channel.head.block.store(block, Ordering::Relaxed); - channel.tail.block.store(block, Ordering::Relaxed); - - channel + } } /// Returns a receiver handle to the channel. @@ -152,94 +190,82 @@ impl Channel { /// Attempts to reserve a slot for sending a message. fn start_send(&self, token: &mut Token) -> bool { - // If the channel is disconnected, return early. - if self.is_disconnected() { - token.list.slot = ptr::null(); - return true; - } - - let guard = epoch::pin(); let mut backoff = Backoff::new(); + let mut tail = self.tail.index.load(Ordering::Acquire); + let mut next_block = None; loop { - // These two load operations don't have to be `SeqCst`. If they happen to retrieve - // stale values, the following CAS will fail or won't even be attempted. - let tail_ptr = self.tail.block.load(Ordering::Acquire, &guard); - let tail = unsafe { tail_ptr.deref() }; - let tail_index = self.tail.index.load(Ordering::Relaxed); - - // Calculate the index of the corresponding slot in the block. - let offset = tail_index.wrapping_sub(tail.start_index); - - // Advance the current index one slot forward. - let new_index = tail_index.wrapping_add(1); - - // A closure that installs a block following `tail` in case it hasn't been yet. - let install_next_block = || { - let current = tail - .next - .compare_and_set( - Shared::null(), - Owned::new(Block::new(tail.start_index.wrapping_add(BLOCK_CAP))), - Ordering::AcqRel, - &guard, - ).unwrap_or_else(|err| err.current); - - let _ = - self.tail - .block - .compare_and_set(tail_ptr, current, Ordering::Release, &guard); - }; - - // If `tail_index` is pointing into `tail`... - if offset < BLOCK_CAP { - // Try moving the tail index forward. - if self - .tail - .index - .compare_exchange_weak( - tail_index, - new_index, - Ordering::SeqCst, - Ordering::Relaxed, - ).is_ok() - { - // If this was the last slot in the block, install a new block. + // Check if the channel is disconnected. + if tail & DISCONNECTED != 0 { + token.list.block = ptr::null(); + return true; + } + + // Calculate the offset of the index into the block. + let offset = (tail >> SHIFT) % LAP; + + // If we reached the end of the block, wait until the next one is installed. + if offset == BLOCK_CAP { + backoff.snooze(); + tail = self.tail.index.load(Ordering::Acquire); + continue; + } + + let block = self.tail.block.load(Ordering::Acquire); + + // If we're going to have to install the next block, allocate it in advance in order to + // make the wait for other threads as short as possible. + if offset + 1 == BLOCK_CAP && next_block.is_none() { + next_block = Some(Box::new(Block::::new())); + } + + let new_tail = tail + (1 << SHIFT); + + // Try advancing the tail forward. + match self.tail.index + .compare_exchange_weak( + tail, + new_tail, + Ordering::SeqCst, + Ordering::Acquire, + ) + { + Ok(_) => unsafe { + // If we've reached the end of the block, install the next one. if offset + 1 == BLOCK_CAP { - install_next_block(); - } + let next_block = Box::into_raw(next_block.unwrap()); + let next_index = new_tail.wrapping_add(1 << SHIFT); - unsafe { - let slot = tail.slots.get_unchecked(offset).get(); - token.list.slot = slot as *const Slot as *const u8; + self.tail.block.store(next_block, Ordering::Release); + self.tail.index.store(next_index, Ordering::Release); + (*block).next.store(next_block, Ordering::Release); } - break; - } - backoff.spin(); - } else if offset == BLOCK_CAP { - // Help install the next block. - install_next_block(); + token.list.block = block as *const u8; + token.list.offset = offset; + return true; + } + Err(t) => { + tail = t; + backoff.spin(); + } } } - - token.list.guard = Some(guard); - true } /// Writes a message into the channel. pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { // If there is no slot, the channel is disconnected. - if token.list.slot.is_null() { + if token.list.block.is_null() { return Err(msg); } - let slot = &*(token.list.slot as *const Slot); - let _guard: Guard = token.list.guard.take().unwrap(); - // Write the message into the slot. - (*slot).msg.get().write(ManuallyDrop::new(msg)); - (*slot).ready.store(true, Ordering::Release); + let block = token.list.block as *mut Block; + let offset = token.list.offset; + let slot = (*block).slots.get_unchecked(offset); + slot.msg.get().write(ManuallyDrop::new(msg)); + slot.state.fetch_or(WRITE, Ordering::Release); // Wake a sleeping receiver. self.receivers.notify(); @@ -248,120 +274,104 @@ impl Channel { /// Attempts to reserve a slot for receiving a message. fn start_recv(&self, token: &mut Token) -> bool { - let guard = epoch::pin(); let mut backoff = Backoff::new(); + let mut head = self.head.index.load(Ordering::Acquire); loop { - // Loading the head block doesn't have to be a `SeqCst` operation. If we get a stale - // value, the following CAS will fail or not even be attempted. Loading the head index - // must be `SeqCst` because we need the up-to-date value when checking whether the - // channel is empty. - let head_ptr = self.head.block.load(Ordering::Acquire, &guard); - let head = unsafe { head_ptr.deref() }; - let head_index = self.head.index.load(Ordering::SeqCst); - - // Calculate the index of the corresponding slot in the block. - let offset = head_index.wrapping_sub(head.start_index); - - // Advance the current index one slot forward. - let new_index = head_index.wrapping_add(1); - - // A closure that installs a block following `head` in case it hasn't been yet. - let install_next_block = || { - let current = head - .next - .compare_and_set( - Shared::null(), - Owned::new(Block::new(head.start_index.wrapping_add(BLOCK_CAP))), - Ordering::AcqRel, - &guard, - ).unwrap_or_else(|err| err.current); - - let _ = - self.head - .block - .compare_and_set(head_ptr, current, Ordering::Release, &guard); - }; - - // If `head_index` is pointing into `head`... - if offset < BLOCK_CAP { - let slot = unsafe { &*head.slots.get_unchecked(offset).get() }; - - // If this slot does not contain a message... - if !slot.ready.load(Ordering::Relaxed) { - let tail_index = self.tail.index.load(Ordering::SeqCst); - - // If the tail equals the head, that means the channel is empty. - if tail_index == head_index { - // If the channel is disconnected... - if self.is_disconnected() { - // ...and still empty... - if self.tail.index.load(Ordering::SeqCst) == tail_index { - // ...then receive an error. - token.list.slot = ptr::null(); - return true; - } - } else { - // Otherwise, the receive operation is not ready. - return false; - } + // Calculate the offset of the index into the block. + let offset = (head >> SHIFT) % LAP; + + // If we reached the end of the block, wait until the next one is installed. + if offset == BLOCK_CAP { + backoff.snooze(); + head = self.head.index.load(Ordering::Acquire); + continue; + } + + let mut new_head = head + (1 << SHIFT); + + if new_head & HAS_NEXT == 0 { + atomic::fence(Ordering::SeqCst); + let tail = self.tail.index.load(Ordering::Relaxed); + + // If the tail equals the head, that means the channel is empty. + if head >> SHIFT == tail >> SHIFT { + // If the channel is disconnected... + if tail & DISCONNECTED != 0 { + // ...then receive an error. + token.list.block = ptr::null(); + return true; + } else { + // Otherwise, the receive operation is not ready. + return false; } } - // Try moving the head index forward. - if self - .head - .index - .compare_exchange_weak( - head_index, - new_index, - Ordering::SeqCst, - Ordering::Relaxed, - ).is_ok() - { - // If this was the last slot in the block, install a new block and destroy the - // old one. + // If head and tail are not in the same block, set `HAS_NEXT` in head. + if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { + new_head |= HAS_NEXT; + } + } + + let block = self.head.block.load(Ordering::Acquire); + + // Try moving the head index forward. + match self.head.index + .compare_exchange_weak( + head, + new_head, + Ordering::SeqCst, + Ordering::Acquire, + ) + { + Ok(_) => unsafe { + // If we've reached the end of the block, move to the next one. if offset + 1 == BLOCK_CAP { - install_next_block(); - unsafe { - guard.defer_destroy(head_ptr); + let next = (*block).wait_next(); + let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); + if !(*next).next.load(Ordering::Relaxed).is_null() { + next_index |= HAS_NEXT; } + + self.head.block.store(next, Ordering::Release); + self.head.index.store(next_index, Ordering::Release); } - token.list.slot = slot as *const Slot as *const u8; - break; + token.list.block = block as *const u8; + token.list.offset = offset; + return true; + } + Err(h) => { + backoff.spin(); + head = h; } - - backoff.spin(); - } else if offset == BLOCK_CAP { - // Help install the next block. - install_next_block(); } } - - token.list.guard = Some(guard); - true } /// Reads a message from the channel. pub unsafe fn read(&self, token: &mut Token) -> Result { - if token.list.slot.is_null() { + if token.list.block.is_null() { // The channel is disconnected. return Err(()); } - let slot = &*(token.list.slot as *const Slot); - let _guard: Guard = token.list.guard.take().unwrap(); - - // Wait until the message becomes ready. - let mut backoff = Backoff::new(); - while !slot.ready.load(Ordering::Acquire) { - backoff.snooze(); - } - // Read the message. + let block = token.list.block as *mut Block; + let offset = token.list.offset; + let slot = (*block).slots.get_unchecked(offset); + slot.wait_write(); let m = slot.msg.get().read(); let msg = ManuallyDrop::into_inner(m); + + // Destroy the block if we've reached the end, or if another thread wanted to destroy but + // couldn't because we were busy reading from the slot. + if offset + 1 == BLOCK_CAP { + Block::destroy(block, 0); + } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { + Block::destroy(block, offset + 1); + } + Ok(msg) } @@ -447,12 +457,35 @@ impl Channel { pub fn len(&self) -> usize { loop { // Load the tail index, then load the head index. - let tail_index = self.tail.index.load(Ordering::SeqCst); - let head_index = self.head.index.load(Ordering::SeqCst); + let mut tail = self.tail.index.load(Ordering::SeqCst); + let mut head = self.head.index.load(Ordering::SeqCst); // If the tail index didn't change, we've got consistent indices to work with. - if self.tail.index.load(Ordering::SeqCst) == tail_index { - return tail_index.wrapping_sub(head_index); + if self.tail.index.load(Ordering::SeqCst) == tail { + // Erase the lower bits. + tail &= !((1 << SHIFT) - 1); + head &= !((1 << SHIFT) - 1); + + // Rotate indices so that head falls into the first block. + let lap = (head >> SHIFT) / LAP; + tail = tail.wrapping_sub((lap * LAP) << SHIFT); + head = head.wrapping_sub((lap * LAP) << SHIFT); + + // Remove the lower bits. + tail >>= SHIFT; + head >>= SHIFT; + + // Fix up indices if they fall onto block ends. + if head == BLOCK_CAP { + head = 0; + tail -= LAP; + } + if tail == BLOCK_CAP { + tail += 1; + } + + // Return the difference minus the number of blocks between tail and head. + return tail - head - tail / LAP; } } } @@ -464,21 +497,23 @@ impl Channel { /// Disconnects the channel and wakes up all blocked receivers. pub fn disconnect(&self) { - if !self.is_disconnected.swap(true, Ordering::SeqCst) { + let tail = self.tail.index.fetch_or(DISCONNECTED, Ordering::SeqCst); + + if tail & DISCONNECTED == 0 { self.receivers.disconnect(); } } /// Returns `true` if the channel is disconnected. pub fn is_disconnected(&self) -> bool { - self.is_disconnected.load(Ordering::SeqCst) + self.tail.index.load(Ordering::SeqCst) & DISCONNECTED != 0 } /// Returns `true` if the channel is empty. pub fn is_empty(&self) -> bool { - let head_index = self.head.index.load(Ordering::SeqCst); - let tail_index = self.tail.index.load(Ordering::SeqCst); - head_index == tail_index + let head = self.head.index.load(Ordering::SeqCst); + let tail = self.tail.index.load(Ordering::SeqCst); + head >> SHIFT == tail >> SHIFT } /// Returns `true` if the channel is full. @@ -489,38 +524,35 @@ impl Channel { impl Drop for Channel { fn drop(&mut self) { - // Get the tail and head indices. - let tail_index = self.tail.index.load(Ordering::Relaxed); - let mut head_index = self.head.index.load(Ordering::Relaxed); + let mut head = self.head.index.load(Ordering::Relaxed); + let mut tail = self.tail.index.load(Ordering::Relaxed); + let mut block = self.head.block.load(Ordering::Relaxed); + + // Erase the lower bits. + head &= !((1 << SHIFT) - 1); + tail &= !((1 << SHIFT) - 1); unsafe { - let mut head_ptr = self - .head - .block - .load(Ordering::Relaxed, epoch::unprotected()); - - // Manually drop all messages between `head_index` and `tail_index` and destroy the - // heap-allocated nodes along the way. - while head_index != tail_index { - let head = head_ptr.deref(); - let offset = head_index.wrapping_sub(head.start_index); - - let slot = &mut *head.slots.get_unchecked(offset).get(); - ManuallyDrop::drop(&mut (*slot).msg.get().read()); - - if offset + 1 == BLOCK_CAP { - let next = head.next.load(Ordering::Relaxed, epoch::unprotected()); - drop(head_ptr.into_owned()); - head_ptr = next; + // Drop all messages between head and tail and deallocate the heap-allocated blocks. + while head != tail { + let offset = (head >> SHIFT) % LAP; + + if offset < BLOCK_CAP { + // Drop the message in the slot. + let slot = (*block).slots.get_unchecked(offset); + ManuallyDrop::drop(&mut *(*slot).msg.get()); + } else { + // Deallocate the block and move to the next one. + let next = (*block).next.load(Ordering::Relaxed); + drop(Box::from_raw(block)); + block = next; } - head_index = head_index.wrapping_add(1); + head = head.wrapping_add(1 << SHIFT); } - // If there is one last remaining block in the end, destroy it. - if !head_ptr.is_null() { - drop(head_ptr.into_owned()); - } + // Deallocate the last remaining block. + drop(Box::from_raw(block)); } } } diff --git a/crossbeam-channel/src/lib.rs b/crossbeam-channel/src/lib.rs index 9c202cb92..dab2384a7 100644 --- a/crossbeam-channel/src/lib.rs +++ b/crossbeam-channel/src/lib.rs @@ -347,7 +347,6 @@ #![warn(missing_docs)] #![warn(missing_debug_implementations)] -extern crate crossbeam_epoch; extern crate crossbeam_utils; extern crate parking_lot; extern crate rand;