Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deque: add MPMC queue and new batched steal methods #290

Merged
merged 15 commits into from Jan 21, 2019
Merged

deque: add MPMC queue and new batched steal methods #290

merged 15 commits into from Jan 21, 2019

Conversation

ghost
Copy link

@ghost ghost commented Jan 17, 2019

This PR introduces a big set of changes:

  • Injector<T> is introduced, which is a MPMC FIFO queue. Every task scheduler (Rayon, Tokio, Go, TBB, Cilk, you name it) has an "injector" queue, through which tasks are spawned onto the thread pool. This is a special queue supports it batched steal operations.

  • fifo() and lifo() constructors are changed to Worker::new_fifo() and Worker::new_lifo(). Stealers are created using Worker::stealer().

  • steal_many() is renamed to steal_batch_and_pop().

  • There's a new batched steal method named steal_batch(). It moves only a batch of tasks into another Worker - it doesn't return an additional task.

  • Steal::Data is renamed to Steal::Success because steal_batch() returns Steal<()>, which means the success case doesn't have data.

  • Steal has FromIterator and or_else, which are useful combinators for implementing stealing strategies. There are a few other useful methods on it.

  • Steal<T> is now #[must_use].

As a simple demonstration, this is how one might efficiently implement a strategy for finding the next task to run. The strategy in this example is:

  1. Try popping one task from the local worker queue.
  2. Try stealing a batch of tasks from the global injector queue.
  3. Try stealing one task from another thread using the stealer list.
fn find_task<T>(
    local: &Worker<T>,
    global: &Injector<T>,
    stealers: &[Stealer<T>],
) -> Option<T> {
    // Pop a task from the local queue, if not empty.
    local.pop().or_else(|| {
        // Otherwise, we need to look for a task elsewhere.
        iter::repeat_with(|| {
            // Try stealing a batch of tasks from the global queue.
            global.steal_batch_and_pop(local)
                // Or try stealing a task from one of the other threads.
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        // Loop while no task was stolen and any steal operation needs to be retried.
        .find(|s| !s.is_retry())
        // Extract the stolen task, if there is one.
        .and_then(|s| s.success())
    })
}

The new crossbeam-deque interface looks like this:

struct Injector<T>;
struct Worker<T>;
struct Stealer<T>;

#[must_use]
enum Steal<T> {
    Empty,
    Success(T),
    Retry,
}

impl<T> Injector<T> {
    fn new() -> Injector<T>;

    fn is_empty(&self) -> bool;
    fn push(&self, task: T);

    fn steal(&self) -> Steal<T>;
    fn steal_batch(&self, dest: &Worker<T>) -> Steal<()>;
    fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>;
}

impl<T> Worker<T> {
    fn new_fifo() -> Worker<T>;
    fn new_lifo() -> Worker<T>;

    fn stealer(&self) -> Stealer<T>;
    fn is_empty(&self) -> bool;

    fn push(&self, task: T);
    fn pop(&self) -> Option<T>;
}

impl<T> Stealer<T> {
    fn is_empty(&self) -> bool;

    fn steal(&self) -> Steal<T>;
    fn steal_batch(&self, dest: &Worker<T>) -> Steal<()>;
    fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>;
}

impl<T> Steal<T> {
    fn is_empty(&self) -> bool;
    fn is_success(&self) -> bool;
    fn is_retry(&self) -> bool;

    fn success(self) -> Option<T>;
    fn or_else<F: FnOnce() -> Steal<T>>(self, f: F);
}

impl<T> FromIterator<Steal<T>> for Steal<T>;

Finally, a benchmark with Tokio updated to this branch and changed to use batched stealing from Injector:

 name                    before ns/iter  after ns/iter  diff ns/iter  diff %  speedup
 threadpool::spawn_many  3,055,261       2,767,283          -287,978  -9.43%   x 1.10
 threadpool::yield_many  10,258,470      10,092,862         -165,608  -1.61%   x 1.02

@ghost
Copy link
Author

ghost commented Jan 21, 2019

Let's merge so that we can start using this in Tokio.

bors r+

bors bot added a commit that referenced this pull request Jan 21, 2019
290: deque: add MPMC queue and new batched steal methods r=stjepang a=stjepang

This PR introduces a big set of changes:

* `Injector<T>` is introduced, which is a MPMC FIFO queue. Every task scheduler (Rayon, Tokio, Go, TBB, Cilk, you name it) has an "injector" queue, through which tasks are spawned onto the thread pool. This is a special queue supports it batched steal operations.

* `fifo()` and `lifo()` constructors are changed to `Worker::new_fifo()` and `Worker::new_lifo()`. Stealers are created using `Worker::stealer()`.

* `steal_many()` is renamed to `steal_batch_and_pop()`.

* There's a new batched steal method named `steal_batch()`. It moves only a batch of tasks into another `Worker` - it doesn't return an additional task.

* `Steal::Data` is renamed to `Steal::Success` because `steal_batch()` returns `Steal<()>`, which means the success case doesn't have data.

* `Steal` has `FromIterator` and `or_else`, which are useful combinators for implementing stealing strategies. There are a few other useful methods on it.

* `Steal<T>` is now `#[must_use]`.

As a simple demonstration, this is how one might efficiently implement a strategy for finding the next task to run. The strategy in this example is:

1. Try popping one task from the local worker queue.
2. Try stealing a batch of tasks from the global injector queue.
3. Try stealing one task from another thread using the stealer list.

```rust
fn find_task<T>(
    local: &Worker<T>,
    global: &Injector<T>,
    stealers: &[Stealer<T>],
) -> Option<T> {
    // Pop a task from the local queue, if not empty.
    local.pop().or_else(|| {
        // Otherwise, we need to look for a task elsewhere.
        iter::repeat_with(|| {
            // Try stealing a batch of tasks from the global queue.
            global.steal_batch_and_pop(local)
                // Or try stealing a task from one of the other threads.
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        // Loop while no task was stolen and any steal operation needs to be retried.
        .find(|s| !s.is_retry())
        // Extract the stolen task, if there is one.
        .and_then(|s| s.success())
    })
}
```

The new `crossbeam-deque` interface looks like this:

```rust
struct Injector<T>;
struct Worker<T>;
struct Stealer<T>;

#[must_use]
enum Steal<T> {
    Empty,
    Success(T),
    Retry,
}

impl<T> Injector<T> {
    fn new() -> Injector<T>;

    fn is_empty(&self) -> bool;
    fn push(&self, task: T);

    fn steal(&self) -> Steal<T>;
    fn steal_batch(&self, dest: &Worker<T>) -> Steal<()>;
    fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>;
}

impl<T> Worker<T> {
    fn new_fifo() -> Worker<T>;
    fn new_lifo() -> Worker<T>;

    fn stealer(&self) -> Stealer<T>;
    fn is_empty(&self) -> bool;

    fn push(&self, task: T);
    fn pop(&self) -> Option<T>;
}

impl<T> Stealer<T> {
    fn is_empty(&self) -> bool;

    fn steal(&self) -> Steal<T>;
    fn steal_batch(&self, dest: &Worker<T>) -> Steal<()>;
    fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>;
}

impl<T> Steal<T> {
    fn is_empty(&self) -> bool;
    fn is_success(&self) -> bool;
    fn is_retry(&self) -> bool;

    fn success(self) -> Option<T>;
    fn or_else<F: FnOnce() -> Steal<T>>(self, f: F);
}

impl<T> FromIterator<Steal<T>> for Steal<T>;
```

Finally, a benchmark with Tokio updated to this branch and changed to use batched stealing from `Injector`:

```
 name                    before ns/iter  after ns/iter  diff ns/iter  diff %  speedup
 threadpool::spawn_many  3,055,261       2,767,283          -287,978  -9.43%   x 1.10
 threadpool::yield_many  10,258,470      10,092,862         -165,608  -1.61%   x 1.02
```


Co-authored-by: Stjepan Glavina <stjepang@gmail.com>
@bors
Copy link
Contributor

bors bot commented Jan 21, 2019

Build succeeded

@bors bors bot merged commit ec7a260 into crossbeam-rs:master Jan 21, 2019
@ghost ghost deleted the deque-new-interface branch January 21, 2019 15:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

0 participants