Skip to content

Commit

Permalink
Rollup merge of rust-lang#102773 - joboet:apple_parker, r=thomcc
Browse files Browse the repository at this point in the history
Use semaphores for thread parking on Apple platforms

Currently we use a mutex-condvar pair for thread parking on Apple systems. Unfortunately, `pthread_cond_timedwait` uses the real-time clock for measuring time, which causes problems when the system time changes. The parking implementation in this PR uses a semaphore instead, which measures monotonic time by default, avoiding these issues. As a further benefit, this has the potential to improve performance a bit, since `unpark` does not need to wait for a lock to be released.

Since the Mach semaphores are poorly documented (I could not find availability or stability guarantees for instance), this uses a [dispatch semaphore](https://developer.apple.com/documentation/dispatch/dispatch_semaphore?language=objc) instead. While it adds a layer of indirection (it uses Mach semaphores internally), the overhead is probably negligible.

Tested on macOS 12.5.

r? ``````@thomcc``````
  • Loading branch information
Dylan-DPC committed Oct 15, 2022
2 parents 46244f3 + c320ab9 commit cbe5e7b
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 1 deletion.
131 changes: 131 additions & 0 deletions library/std/src/sys/unix/thread_parker/darwin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
//! Thread parking for Darwin-based systems.
//!
//! Darwin actually has futex syscalls (`__ulock_wait`/`__ulock_wake`), but they
//! cannot be used in `std` because they are non-public (their use will lead to
//! rejection from the App Store) and because they are only available starting
//! with macOS version 10.12, even though the minimum target version is 10.7.
//!
//! Therefore, we need to look for other synchronization primitives. Luckily, Darwin
//! supports semaphores, which allow us to implement the behaviour we need with
//! only one primitive (as opposed to a mutex-condvar pair). We use the semaphore
//! provided by libdispatch, as the underlying Mach semaphore is only dubiously
//! public.

use crate::pin::Pin;
use crate::sync::atomic::{
AtomicI8,
Ordering::{Acquire, Release},
};
use crate::time::Duration;

type dispatch_semaphore_t = *mut crate::ffi::c_void;
type dispatch_time_t = u64;

const DISPATCH_TIME_NOW: dispatch_time_t = 0;
const DISPATCH_TIME_FOREVER: dispatch_time_t = !0;

// Contained in libSystem.dylib, which is linked by default.
extern "C" {
fn dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t;
fn dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t;
fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize;
fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize;
fn dispatch_release(object: *mut crate::ffi::c_void);
}

const EMPTY: i8 = 0;
const NOTIFIED: i8 = 1;
const PARKED: i8 = -1;

pub struct Parker {
semaphore: dispatch_semaphore_t,
state: AtomicI8,
}

unsafe impl Sync for Parker {}
unsafe impl Send for Parker {}

impl Parker {
pub unsafe fn new(parker: *mut Parker) {
let semaphore = dispatch_semaphore_create(0);
assert!(
!semaphore.is_null(),
"failed to create dispatch semaphore for thread synchronization"
);
parker.write(Parker { semaphore, state: AtomicI8::new(EMPTY) })
}

// Does not need `Pin`, but other implementation do.
pub unsafe fn park(self: Pin<&Self>) {
// The semaphore counter must be zero at this point, because unparking
// threads will not actually increase it until we signalled that we
// are waiting.

// Change NOTIFIED to EMPTY and EMPTY to PARKED.
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
return;
}

// Another thread may increase the semaphore counter from this point on.
// If it is faster than us, we will decrement it again immediately below.
// If we are faster, we wait.

// Ensure that the semaphore counter has actually been decremented, even
// if the call timed out for some reason.
while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}

// At this point, the semaphore counter is zero again.

// We were definitely woken up, so we don't need to check the state.
// Still, we need to reset the state using a swap to observe the state
// change with acquire ordering.
self.state.swap(EMPTY, Acquire);
}

// Does not need `Pin`, but other implementation do.
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
return;
}

let nanos = dur.as_nanos().try_into().unwrap_or(i64::MAX);
let timeout = dispatch_time(DISPATCH_TIME_NOW, nanos);

let timeout = dispatch_semaphore_wait(self.semaphore, timeout) != 0;

let state = self.state.swap(EMPTY, Acquire);
if state == NOTIFIED && timeout {
// If the state was NOTIFIED but semaphore_wait returned without
// decrementing the count because of a timeout, it means another
// thread is about to call semaphore_signal. We must wait for that
// to happen to ensure the semaphore count is reset.
while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}
} else {
// Either a timeout occurred and we reset the state before any thread
// tried to wake us up, or we were woken up and reset the state,
// making sure to observe the state change with acquire ordering.
// Either way, the semaphore counter is now zero again.
}
}

// Does not need `Pin`, but other implementation do.
pub fn unpark(self: Pin<&Self>) {
let state = self.state.swap(NOTIFIED, Release);
if state == PARKED {
unsafe {
dispatch_semaphore_signal(self.semaphore);
}
}
}
}

impl Drop for Parker {
fn drop(&mut self) {
// SAFETY:
// We always ensure that the semaphore count is reset, so this will
// never cause an exception.
unsafe {
dispatch_release(self.semaphore);
}
}
}
13 changes: 12 additions & 1 deletion library/std/src/sys/unix/thread_parker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,18 @@
)))]

cfg_if::cfg_if! {
if #[cfg(target_os = "netbsd")] {
if #[cfg(all(
any(
target_os = "macos",
target_os = "ios",
target_os = "watchos",
target_os = "tvos",
),
not(miri),
))] {
mod darwin;
pub use darwin::Parker;
} else if #[cfg(target_os = "netbsd")] {
mod netbsd;
pub use netbsd::Parker;
} else {
Expand Down
22 changes: 22 additions & 0 deletions library/std/src/thread/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,28 @@ fn test_try_panic_any_message_unit_struct() {
}
}

#[test]
fn test_park_unpark_before() {
for _ in 0..10 {
thread::current().unpark();
thread::park();
}
}

#[test]
fn test_park_unpark_called_other_thread() {
for _ in 0..10 {
let th = thread::current();

let _guard = thread::spawn(move || {
super::sleep(Duration::from_millis(50));
th.unpark();
});

thread::park();
}
}

#[test]
fn test_park_timeout_unpark_before() {
for _ in 0..10 {
Expand Down

0 comments on commit cbe5e7b

Please sign in to comment.