Skip to content

Commit

Permalink
Auto merge of #54174 - parched:park, r=alexcrichton
Browse files Browse the repository at this point in the history
Fix `thread` `park`/`unpark` synchronization

Previously the code below would not be guaranteed to exit when the
second unpark took the `return, // already unparked` path because there
was no write to synchronize with a read in `park`.

EDIT: doesn't actually require third thread
```
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{current, spawn, park};

static FLAG: AtomicBool = AtomicBool::new(false);

fn main() {
    let thread_0 = current();
    spawn(move || {
        thread_0.unpark();
        FLAG.store(true, Ordering::Relaxed);
        thread_0.unpark();
    });

    while !FLAG.load(Ordering::Relaxed) {
        park();
    }
}
```

I have some other ideas on how to improve the performance of `park` and `unpark` using fences, avoiding any atomic RMW when the state is already `NOTIFIED`, and also how to avoid calling `notify_one` without the mutex locked. But I need to write some micro benchmarks first, so I'll submit those changes at a later date if they prove to be faster.

Fixes #53366 I hope.
  • Loading branch information
bors committed Sep 19, 2018
2 parents 4f3ff5a + a3b8705 commit 20dc0c5
Showing 1 changed file with 26 additions and 18 deletions.
44 changes: 26 additions & 18 deletions src/libstd/thread/mod.rs
Expand Up @@ -800,7 +800,14 @@ pub fn park() {
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
thread.inner.state.store(EMPTY, SeqCst);
// We must read here, even though we know it will be `NOTIFIED`.
// This is because `unpark` may have been called again since we read
// `NOTIFIED` in the `compare_exchange` above. We must perform an
// acquire operation that synchronizes with that `unpark` to observe
// any writes it made before the call to unpark. To do that we must
// read from the write it made to `state`.
let old = thread.inner.state.swap(EMPTY, SeqCst);
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
} // should consume this notification, so prohibit spurious wakeups in next park.
Err(_) => panic!("inconsistent park state"),
Expand Down Expand Up @@ -889,7 +896,9 @@ pub fn park_timeout(dur: Duration) {
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
thread.inner.state.store(EMPTY, SeqCst);
// We must read again here, see `park`.
let old = thread.inner.state.swap(EMPTY, SeqCst);
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
} // should consume this notification, so prohibit spurious wakeups in next park.
Err(_) => panic!("inconsistent park_timeout state"),
Expand Down Expand Up @@ -1058,23 +1067,22 @@ impl Thread {
/// [park]: fn.park.html
#[stable(feature = "rust1", since = "1.0.0")]
pub fn unpark(&self) {
loop {
match self.inner.state.compare_exchange(EMPTY, NOTIFIED, SeqCst, SeqCst) {
Ok(_) => return, // no one was waiting
Err(NOTIFIED) => return, // already unparked
Err(PARKED) => {} // gotta go wake someone up
_ => panic!("inconsistent state in unpark"),
}

// Coordinate wakeup through the mutex and a condvar notification
let _lock = self.inner.lock.lock().unwrap();
match self.inner.state.compare_exchange(PARKED, NOTIFIED, SeqCst, SeqCst) {
Ok(_) => return self.inner.cvar.notify_one(),
Err(NOTIFIED) => return, // a different thread unparked
Err(EMPTY) => {} // parked thread went away, try again
_ => panic!("inconsistent state in unpark"),
}
// To ensure the unparked thread will observe any writes we made
// before this call, we must perform a release operation that `park`
// can synchronize with. To do that we must write `NOTIFIED` even if
// `state` is already `NOTIFIED`. That is why this must be a swap
// rather than a compare-and-swap that returns if it reads `NOTIFIED`
// on failure.
match self.inner.state.swap(NOTIFIED, SeqCst) {
EMPTY => return, // no one was waiting
NOTIFIED => return, // already unparked
PARKED => {} // gotta go wake someone up
_ => panic!("inconsistent state in unpark"),
}

// Coordinate wakeup through the mutex and a condvar notification
let _lock = self.inner.lock.lock().unwrap();
self.inner.cvar.notify_one()
}

/// Gets the thread's unique identifier.
Expand Down

0 comments on commit 20dc0c5

Please sign in to comment.