Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

skiplist: a new iteator which is sendable among threads #1091

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 269 additions & 0 deletions crossbeam-skiplist/src/base.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! A lock-free skip list. See [`SkipList`].

use alloc::alloc::{alloc, dealloc, handle_alloc_error, Layout};
use alloc::sync::Arc;
use core::borrow::Borrow;
use core::cmp;
use core::fmt;
Expand Down Expand Up @@ -515,6 +516,14 @@ where
}
}

/// Returns an iterator over all entries in the skip list who owns a reference to the skiplist.
pub fn owned_iter(self: &Arc<Self>) -> OwnedIter<K, V> {
OwnedIter {
list: self.clone(),
cursor: None,
}
}

/// Returns an iterator over a subset of entries in the skip list.
pub fn range<'a: 'g, 'g, Q, R>(
&'a self,
Expand Down Expand Up @@ -670,6 +679,72 @@ where
}
}

/// Returns the successor of a node.
///
/// This will keep searching until a non-deleted node is found. If a deleted
/// node is reached then a search is performed using the given key.
fn next_node_acquire(
&self,
pred: &Tower<K, V>,
lower_bound: Bound<&K>,
guard: &Guard,
) -> Option<OwnedEntry<K, V>> {
unsafe {
// Load the level 0 successor of the current node.
let mut curr = pred[0].load_consume(guard);

// If `curr` is marked, that means `pred` is removed and we have to use
// a key search.
if curr.tag() == 1 {
return self.search_bound_for_node_acquire(lower_bound, false, guard);
}

while let Some(c) = curr.as_ref() {
let succ = c.tower[0].load_consume(guard);

if succ.tag() == 1 {
if let Some(c) = self.help_unlink(&pred[0], c, succ, guard) {
// On success, continue searching through the current level.
curr = c;
continue;
} else {
// On failure, we cannot do anything reasonable to continue
// searching from the current position. Restart the search.
return self.search_bound_for_node_acquire(lower_bound, false, guard);
}
}

if let Some(e) = OwnedEntry::try_acquire(c) {
return Some(e);
}

// acquire failed which means the node has been deleted
curr = succ;
}

None
}
}

/// Search the first node that we acquire successfully.
fn search_bound_for_node_acquire<Q>(
&self,
bound: Bound<&Q>,
upper_bound: bool,
guard: &Guard,
) -> Option<OwnedEntry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
loop {
let node = self.search_bound(bound, upper_bound, guard)?;
if let Some(e) = OwnedEntry::try_acquire(node) {
return Some(e);
}
}
}

/// Searches for first/last node that is greater/less/equal to a key in the skip list.
///
/// If `upper_bound == true`: the last node less than (or equal to) the key.
Expand Down Expand Up @@ -2121,3 +2196,197 @@ fn below_upper_bound<T: Ord + ?Sized>(bound: &Bound<&T>, other: &T) -> bool {
Bound::Excluded(key) => other < key,
}
}

/// An entry where the node is ref counted in a skip list.
///
/// You *must* call `release` to free this type, otherwise the node will be
/// leaked. This is because releasing the entry requires a `Guard`.
struct OwnedEntry<K, V> {
node: *const Node<K, V>,
released: bool,
}

impl<K, V> OwnedEntry<K, V> {
/// Tries to create a new `RefCountedEntry` by incrementing the reference count of
/// a node.
fn try_acquire(node: &Node<K, V>) -> Option<OwnedEntry<K, V>> {
if unsafe { node.try_increment() } {
Some(OwnedEntry {
node: node as *const _,
released: false,
})
} else {
None
}
}

/// Returns a reference to the key.
fn key(&self) -> &K {
unsafe { &(*self.node).key }
}

/// Returns a reference to the value.
fn value(&self) -> &V {
unsafe { &(*self.node).value }
}

/// Releases the reference on the entry.
fn release(mut self, guard: &Guard) {
self.released = true;
unsafe { (*self.node).decrement(guard) }
}
}

impl<K, V> fmt::Debug for OwnedEntry<K, V>
where
K: fmt::Debug,
V: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("OwnedEntry")
.field(self.key())
.field(self.value())
.finish()
}
}

unsafe impl<K, V> Send for OwnedEntry<K, V> {}

impl<K, V> Drop for OwnedEntry<K, V> {
fn drop(&mut self) {
assert!(self.released);
}
}

/// A iterator with a clone of the concurrent skip list
pub struct OwnedIter<K, V> {
list: Arc<SkipList<K, V>>,
cursor: Option<OwnedEntry<K, V>>,
}

impl<K, V> fmt::Debug for OwnedIter<K, V>
where
K: fmt::Debug,
V: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d = f.debug_struct("OwnedIter");
match &self.cursor {
None => d.field("cursor", &None::<(&K, &V)>),
Some(e) => d.field("cursor", &(e.key(), e.value())),
};
d.finish()
}
}

impl<K, V> Drop for OwnedIter<K, V> {
fn drop(&mut self) {
if let Some(cursor) = self.cursor.take() {
let guard = &epoch::pin();
cursor.release(guard);
}
}
}

impl<K, V> OwnedIter<K, V>
where
K: Ord,
{
/// Return whether the iterator is valid
pub fn valid(&self) -> bool {
self.cursor.is_some()
}

/// Returns a reference to the key.
pub fn key(&self) -> &K {
assert!(self.valid());
self.cursor.as_ref().unwrap().key()
}

/// Returns a reference to the value.
pub fn value(&self) -> &V {
assert!(self.valid());
self.cursor.as_ref().unwrap().value()
}

/// Move iterator to point to the next element
pub fn next(&mut self, guard: &Guard) {
assert!(self.valid());
self.list.as_ref().check_guard(guard);
self.cursor = match self.cursor.take() {
Some(n) => {
let next_node = self.list.as_ref().next_node_acquire(
unsafe { &(*n.node).tower },
Bound::Excluded(n.key()),
guard,
);
n.release(guard);
next_node
}
None => unreachable!(),
}
}

/// Move iterator to point to the previous element
pub fn prev(&mut self, guard: &Guard) {
assert!(self.valid());
self.list.as_ref().check_guard(guard);
self.cursor = match self.cursor.take() {
Some(n) => {
let next_node = self.list.as_ref().search_bound_for_node_acquire(
Bound::Excluded(n.key()),
true,
guard,
);
n.release(guard);
next_node
}
None => None,
};
}

/// Make iterator point to the element whose key is larger or equal to the target
pub fn seek<Q>(&mut self, target: &Q, guard: &Guard)
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
self.list.as_ref().check_guard(guard);
if let Some(n) = self.cursor.take() {
n.release(guard);
}
self.cursor =
self.list
.as_ref()
.search_bound_for_node_acquire(Bound::Included(target), false, guard);
}

/// Make iterator point to the element whose key is less than the target
pub fn seek_for_prev<Q>(&mut self, target: &Q, guard: &Guard)
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
self.list.as_ref().check_guard(guard);
if let Some(n) = self.cursor.take() {
n.release(guard);
}
self.cursor =
self.list
.as_ref()
.search_bound_for_node_acquire(Bound::Included(target), true, guard);
}

/// Make iterator point to the first element
pub fn seek_to_first(&mut self, guard: &Guard) {
self.list.as_ref().check_guard(guard);
if let Some(n) = self.cursor.take() {
n.release(guard);
}
let pred = &self.list.as_ref().head;
self.cursor = self
.list
.as_ref()
.next_node_acquire(pred, Bound::Unbounded, guard);
}
}
36 changes: 36 additions & 0 deletions crossbeam-skiplist/tests/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::ops::Bound;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crossbeam_epoch as epoch;
use crossbeam_skiplist::{base, SkipList};
Expand Down Expand Up @@ -104,6 +105,41 @@ fn remove() {
assert!(s.is_empty());
}

#[test]
fn remove2() {
let guard = &epoch::pin();
let insert = [0, 4, 2, 12, 8, 7, 11, 5];
let not_present = [1, 3, 6, 9, 10];
let remove = [2, 12, 8];
let remaining = [0, 4, 5, 7, 11];

let s = Arc::new(SkipList::new(epoch::default_collector().clone()));

for &x in &insert {
s.insert(x, x * 10, guard).release(guard);
}
for x in &not_present {
assert!(s.remove(x, guard).is_none());
}
for x in &remove {
s.remove(x, guard).unwrap().release(guard);
}

let mut iter = s.owned_iter();
let h = std::thread::spawn(move || {
let mut v = vec![];
let guard = &epoch::pin();
iter.seek_to_first(guard);
while iter.valid() {
v.push(*iter.key());
iter.next(guard);
}
assert_eq!(v, remaining);
});

h.join().unwrap();
}

#[test]
fn entry() {
let guard = &epoch::pin();
Expand Down
Loading