Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

skiplist: a new iteator which is sendable among threads #1091

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 250 additions & 0 deletions crossbeam-skiplist/src/base.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! A lock-free skip list. See [`SkipList`].

use alloc::alloc::{alloc, dealloc, handle_alloc_error, Layout};
use alloc::sync::Arc;
use core::borrow::Borrow;
use core::cmp;
use core::fmt;
Expand Down Expand Up @@ -2121,3 +2122,252 @@ fn below_upper_bound<T: Ord + ?Sized>(bound: &Bound<&T>, other: &T) -> bool {
Bound::Excluded(key) => other < key,
}
}

/// A reference-counted entry in a skip list.
pub struct RefCountedEntry<K, V> {
node: *const Node<K, V>,
}

unsafe impl<K, V> Send for RefCountedEntry<K, V> {}

impl<K, V> Drop for RefCountedEntry<K, V> {
fn drop(&mut self) {
let guard = &epoch::pin();
unsafe { (*self.node).decrement(guard) }
}
}

impl<K, V> RefCountedEntry<K, V> {
/// Tries to create a new `RefCountedEntry` by incrementing the reference count of
/// a node.
fn try_acquire(node: &Node<K, V>) -> Option<RefCountedEntry<K, V>> {
if unsafe { node.try_increment() } {
Some(RefCountedEntry {
node: node as *const _,
})
} else {
None
}
}

/// Returns a reference to the key.
pub fn key(&self) -> &K {
unsafe { &(*self.node).key }
}

/// Returns a reference to the value.
pub fn value(&self) -> &V {
unsafe { &(*self.node).value }
}
}

/// A lock-free skip list.
pub struct ConcurrentSkipList<K, V> {
/// The underlying skip list
list: Arc<SkipList<K, V>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reply. It's a bit late here, I will see it tomorrow.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have modified the code. PTAL again, thanks @taiki-e . If this is desired, I will add more tests.

}

impl<K, V> ConcurrentSkipList<K, V> {
/// Returns a new, empty skip list.
pub fn new(collector: Collector) -> Self {
Self {
list: Arc::new(SkipList::new(collector)),
}
}
}

impl<K, V> Deref for ConcurrentSkipList<K, V>
where
K: Ord + Send + 'static,
V: Send + 'static,
{
type Target = SkipList<K, V>;

fn deref(&self) -> &Self::Target {
&self.list
}
}

impl<K, V> Clone for ConcurrentSkipList<K, V>
where
K: Ord,
{
fn clone(&self) -> Self {
Self {
list: self.list.clone(),
}
}
}

impl<K, V> ConcurrentSkipList<K, V>
where
K: Ord,
{
/// Search the first node that we acquire successfully.
fn search_bound_for_node<Q>(
&self,
bound: Bound<&Q>,
upper_bound: bool,
guard: &Guard,
) -> Option<RefCountedEntry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
loop {
let node = self.list.search_bound(bound, upper_bound, guard)?;
if let Some(e) = RefCountedEntry::try_acquire(node) {
return Some(e);
}
}
}

/// Returns the successor of a node.
///
/// This will keep searching until a non-deleted node is found. If a deleted
/// node is reached then a search is performed using the given key.
fn next_node(
&self,
pred: &Tower<K, V>,
lower_bound: Bound<&K>,
guard: &Guard,
) -> Option<RefCountedEntry<K, V>> {
unsafe {
// Load the level 0 successor of the current node.
let mut curr = pred[0].load_consume(guard);

// If `curr` is marked, that means `pred` is removed and we have to use
// a key search.
if curr.tag() == 1 {
return self.search_bound_for_node(lower_bound, false, guard);
}

while let Some(c) = curr.as_ref() {
let succ = c.tower[0].load_consume(guard);

if succ.tag() == 1 {
if let Some(c) = self.list.help_unlink(&pred[0], c, succ, guard) {
// On success, continue searching through the current level.
curr = c;
continue;
} else {
// On failure, we cannot do anything reasonable to continue
// searching from the current position. Restart the search.
return self.search_bound_for_node(lower_bound, false, guard);
}
}

if let Some(e) = RefCountedEntry::try_acquire(c) {
return Some(e);
}

// acquire failed which means the node has been deleted
curr = succ;
}

None
}
}

/// Return a iterator of the skip list
pub fn iter(&self) -> IterRef<ConcurrentSkipList<K, V>, K, V> {
IterRef {
list: self.clone(),
cursor: None,
}
}
}

impl<K, V> AsRef<ConcurrentSkipList<K, V>> for ConcurrentSkipList<K, V>
where
K: Ord,
{
fn as_ref(&self) -> &ConcurrentSkipList<K, V> {
self
}
}

/// A iterator with a clone of the concurrent skip list
pub struct IterRef<T, K, V>
where
T: AsRef<ConcurrentSkipList<K, V>>,
{
list: T,
cursor: Option<RefCountedEntry<K, V>>,
}

impl<K, V, T: AsRef<ConcurrentSkipList<K, V>>> IterRef<T, K, V>
where
K: Ord,
{
/// Return whether the iterator is valid
pub fn valid(&self) -> bool {
self.cursor.is_some()
}

/// Returns a reference to the key.
pub fn key(&self) -> &K {
assert!(self.valid());
self.cursor.as_ref().unwrap().key()
}

/// Returns a reference to the value.
pub fn value(&self) -> &V {
assert!(self.valid());
self.cursor.as_ref().unwrap().value()
}

/// Move iterator to point to the next element
pub fn next(&mut self) {
assert!(self.valid());
let guard = &epoch::pin();
self.cursor = match &self.cursor {
Some(n) => self.list.as_ref().next_node(
unsafe { &(*n.node).tower },
Bound::Excluded(n.key()),
guard,
),
None => unreachable!(),
}
}

/// Move iterator to point to the previous element
pub fn prev(&mut self) {
assert!(self.valid());
let guard = &epoch::pin();
self.cursor = match &self.cursor {
Some(n) => {
self.list
.as_ref()
.search_bound_for_node(Bound::Excluded(n.key()), true, guard)
}
None => None,
};
}

/// Make iterator point to the element whose key is larger or equal to the target
pub fn seek(&mut self, target: &K) {
let guard = &epoch::pin();
self.cursor =
self.list
.as_ref()
.search_bound_for_node(Bound::Included(target), false, guard);
}

/// Make iterator point to the element whose key is less than the target
pub fn seek_for_prev(&mut self, target: &K) {
let guard = &epoch::pin();
self.cursor =
self.list
.as_ref()
.search_bound_for_node(Bound::Included(target), true, guard);
}

/// Make iterator point to the first element
pub fn seek_to_first(&mut self) {
let guard = &epoch::pin();
self.list.as_ref().list.check_guard(guard);
let pred = &self.list.as_ref().list.head;
self.cursor = self.list.as_ref().next_node(pred, Bound::Unbounded, guard);
}
}
35 changes: 35 additions & 0 deletions crossbeam-skiplist/tests/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::ops::Bound;
use std::sync::atomic::{AtomicUsize, Ordering};

use crossbeam_epoch as epoch;
use crossbeam_skiplist::base::ConcurrentSkipList;
use crossbeam_skiplist::{base, SkipList};

fn ref_entry<'a, K, V>(e: impl Into<Option<base::RefEntry<'a, K, V>>>) -> Entry<'a, K, V> {
Expand Down Expand Up @@ -104,6 +105,40 @@ fn remove() {
assert!(s.is_empty());
}

#[test]
fn remove2() {
let guard = &epoch::pin();
let insert = [0, 4, 2, 12, 8, 7, 11, 5];
let not_present = [1, 3, 6, 9, 10];
let remove = [2, 12, 8];
let remaining = [0, 4, 5, 7, 11];

let s = ConcurrentSkipList::new(epoch::default_collector().clone());

for &x in &insert {
s.insert(x, x * 10, guard).release(guard);
}
for x in &not_present {
assert!(s.remove(x, guard).is_none());
}
for x in &remove {
s.remove(x, guard).unwrap().release(guard);
}

let mut iter = s.iter();
let h = std::thread::spawn(move || {
let mut v = vec![];
iter.seek_to_first();
while iter.valid() {
v.push(*iter.key());
iter.next();
}
assert_eq!(v, remaining);
});

h.join().unwrap();
}

#[test]
fn entry() {
let guard = &epoch::pin();
Expand Down
Loading