Skip to content

Commit

Permalink
add multi-producer multi-consumer bounded queue to use for sleeper list
Browse files Browse the repository at this point in the history
  • Loading branch information
toffaletti authored and brson committed Oct 26, 2013
1 parent bf0e6eb commit 5876e21
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 56 deletions.
3 changes: 3 additions & 0 deletions src/libstd/rt/mod.rs
Expand Up @@ -139,6 +139,9 @@ mod message_queue;
/// A mostly lock-free multi-producer, single consumer queue.
mod mpsc_queue;

/// A lock-free multi-producer, multi-consumer bounded queue.
mod mpmc_bounded_queue;

/// A parallel data structure for tracking sleeping schedulers.
mod sleeper_list;

Expand Down
199 changes: 199 additions & 0 deletions src/libstd/rt/mpmc_bounded_queue.rs
@@ -0,0 +1,199 @@
/* Multi-producer/multi-consumer bounded queue
* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and documentation are
* those of the authors and should not be interpreted as representing official
* policies, either expressed or implied, of Dmitry Vyukov.
*/

use unstable::sync::UnsafeArc;
use unstable::atomics::{AtomicUint,Relaxed,Release,Acquire};
use option::*;
use vec;
use clone::Clone;
use kinds::Send;
use num::{Exponential,Algebraic,Round};

struct Node<T> {
sequence: AtomicUint,
value: Option<T>,
}

struct State<T> {
buffer: ~[Node<T>],
mask: uint,
enqueue_pos: AtomicUint,
dequeue_pos: AtomicUint,
}

struct Queue<T> {
priv state: UnsafeArc<State<T>>,
}

impl<T: Send> State<T> {
fn with_capacity(capacity: uint) -> State<T> {
let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
// use next power of 2 as capacity
2f64.pow(&((capacity as f64).log2().floor()+1f64)) as uint
} else {
capacity
};
let buffer = do vec::from_fn(capacity) |i:uint| {
Node{sequence:AtomicUint::new(i),value:None}
};
State{
buffer: buffer,
mask: capacity-1,
enqueue_pos: AtomicUint::new(0),
dequeue_pos: AtomicUint::new(0),
}
}

fn push(&mut self, value: T) -> bool {
let mask = self.mask;
let mut pos = self.enqueue_pos.load(Relaxed);
loop {
let node = &mut self.buffer[pos & mask];
let seq = node.sequence.load(Acquire);
let diff: int = seq as int - pos as int;

if diff == 0 {
let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
if enqueue_pos == pos {
node.value = Some(value);
node.sequence.store(pos+1, Release);
break
} else {
pos = enqueue_pos;
}
} else if (diff < 0) {
return false
} else {
pos = self.enqueue_pos.load(Relaxed);
}
}
true
}

fn pop(&mut self) -> Option<T> {
let mask = self.mask;
let mut pos = self.dequeue_pos.load(Relaxed);
loop {
let node = &mut self.buffer[pos & mask];
let seq = node.sequence.load(Acquire);
let diff: int = seq as int - (pos + 1) as int;
if diff == 0 {
let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
if dequeue_pos == pos {
let value = node.value.take();
node.sequence.store(pos + mask + 1, Release);
return value
} else {
pos = dequeue_pos;
}
} else if diff < 0 {
return None
} else {
pos = self.dequeue_pos.load(Relaxed);
}
}
}
}

impl<T: Send> Queue<T> {
pub fn with_capacity(capacity: uint) -> Queue<T> {
Queue{
state: UnsafeArc::new(State::with_capacity(capacity))
}
}

pub fn push(&mut self, value: T) -> bool {
unsafe { (*self.state.get()).push(value) }
}

pub fn pop(&mut self) -> Option<T> {
unsafe { (*self.state.get()).pop() }
}
}

impl<T: Send> Clone for Queue<T> {
fn clone(&self) -> Queue<T> {
Queue {
state: self.state.clone()
}
}
}

#[cfg(test)]
mod tests {
use prelude::*;
use option::*;
use task;
use comm;
use super::Queue;

#[test]
fn test() {
let nthreads = 8u;
let nmsgs = 1000u;
let mut q = Queue::with_capacity(nthreads*nmsgs);
assert_eq!(None, q.pop());

for _ in range(0, nthreads) {
let (port, chan) = comm::stream();
chan.send(q.clone());
do task::spawn_sched(task::SingleThreaded) {
let mut q = port.recv();
for i in range(0, nmsgs) {
assert!(q.push(i));
}
}
}

let mut completion_ports = ~[];
for _ in range(0, nthreads) {
let (completion_port, completion_chan) = comm::stream();
completion_ports.push(completion_port);
let (port, chan) = comm::stream();
chan.send(q.clone());
do task::spawn_sched(task::SingleThreaded) {
let mut q = port.recv();
let mut i = 0u;
loop {
match q.pop() {
None => {},
Some(_) => {
i += 1;
if i == nmsgs { break }
}
}
}
completion_chan.send(i);
}
}

for completion_port in completion_ports.iter() {
assert_eq!(nmsgs, completion_port.recv());
}
}
}
65 changes: 9 additions & 56 deletions src/libstd/rt/sleeper_list.rs
Expand Up @@ -11,84 +11,37 @@
//! Maintains a shared list of sleeping schedulers. Schedulers
//! use this to wake each other up.

use container::Container;
use vec::OwnedVector;
use option::{Option, Some, None};
use cell::Cell;
use unstable::sync::{UnsafeArc, LittleLock};
use rt::sched::SchedHandle;
use rt::mpmc_bounded_queue::Queue;
use option::*;
use clone::Clone;

pub struct SleeperList {
priv state: UnsafeArc<State>
}

struct State {
count: uint,
stack: ~[SchedHandle],
lock: LittleLock
priv q: Queue<SchedHandle>,
}

impl SleeperList {
pub fn new() -> SleeperList {
SleeperList {
state: UnsafeArc::new(State {
count: 0,
stack: ~[],
lock: LittleLock::new()
})
}
SleeperList{q: Queue::with_capacity(8*1024)}
}

pub fn push(&mut self, handle: SchedHandle) {
let handle = Cell::new(handle);
unsafe {
let state = self.state.get();
do (*state).lock.lock {
(*state).count += 1;
(*state).stack.push(handle.take());
}
}
pub fn push(&mut self, value: SchedHandle) {
assert!(self.q.push(value))
}

pub fn pop(&mut self) -> Option<SchedHandle> {
unsafe {
let state = self.state.get();
do (*state).lock.lock {
if !(*state).stack.is_empty() {
(*state).count -= 1;
Some((*state).stack.pop())
} else {
None
}
}
}
self.q.pop()
}

/// A pop that may sometimes miss enqueued elements, but is much faster
/// to give up without doing any synchronization
pub fn casual_pop(&mut self) -> Option<SchedHandle> {
unsafe {
let state = self.state.get();
// NB: Unsynchronized check
if (*state).count == 0 { return None; }
do (*state).lock.lock {
if !(*state).stack.is_empty() {
// NB: count is also protected by the lock
(*state).count -= 1;
Some((*state).stack.pop())
} else {
None
}
}
}
self.q.pop()
}
}

impl Clone for SleeperList {
fn clone(&self) -> SleeperList {
SleeperList {
state: self.state.clone()
q: self.q.clone()
}
}
}

0 comments on commit 5876e21

Please sign in to comment.