Skip to content

Commit

Permalink
Rewrite Condvar::wait_timeout and make it public
Browse files Browse the repository at this point in the history
**The implementation is a direct adaptation of libcxx's
condition_variable implementation.**

pthread_cond_timedwait uses the non-monotonic system clock. It's
possible to change the clock to a monotonic via pthread_cond_attr, but
this is incompatible with static initialization. To deal with this, we
calculate the timeout using the system clock, and maintain a separate
record of the start and end times with a monotonic clock to be used for
calculation of the return value.
  • Loading branch information
sfackler committed Jan 16, 2015
1 parent 3d5fbae commit 08f6380
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 97 deletions.
123 changes: 113 additions & 10 deletions src/libstd/sync/condvar.rs
Expand Up @@ -12,6 +12,7 @@ use prelude::v1::*;

use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use sync::poison::{self, LockResult};
use sys::time::SteadyTime;
use sys_common::condvar as sys;
use sys_common::mutex as sys_mutex;
use time::Duration;
Expand Down Expand Up @@ -153,20 +154,34 @@ impl Condvar {
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
// Note that this method is *not* public, and this is quite intentional
// because we're not quite sure about the semantics of relative vs absolute
// durations or how the timing guarantees play into what the system APIs
// provide. There are also additional concerns about the unix-specific
// implementation which may need to be addressed.
#[allow(dead_code)]
fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
#[unstable]
pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
-> LockResult<(MutexGuard<'a, T>, bool)> {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait_timeout(guard, dur)
}
}

/// Wait on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The semantics of this function are equivalent to `wait_timeout` except
/// that the implementation will repeatedly wait while the duration has not
/// passed and the provided function returns `false`.
#[unstable]
pub fn wait_timeout_with<'a, T, F>(&self,
guard: MutexGuard<'a, T>,
dur: Duration,
f: F)
-> LockResult<(MutexGuard<'a, T>, bool)>
where F: FnMut(LockResult<&mut T>) -> bool {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait_timeout_with(guard, dur, f)
}
}

/// Wake up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
Expand Down Expand Up @@ -220,9 +235,9 @@ impl StaticCondvar {
/// specified duration.
///
/// See `Condvar::wait_timeout`.
#[allow(dead_code)] // may want to stabilize this later, see wait_timeout above
fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
-> LockResult<(MutexGuard<'a, T>, bool)> {
#[unstable = "may be merged with Condvar in the future"]
pub fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
-> LockResult<(MutexGuard<'a, T>, bool)> {
let (poisoned, success) = unsafe {
let lock = mutex::guard_lock(&guard);
self.verify(lock);
Expand All @@ -236,6 +251,50 @@ impl StaticCondvar {
}
}

/// Wait on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The implementation will repeatedly wait while the duration has not
/// passed and the function returns `false`.
///
/// See `Condvar::wait_timeout_with`.
#[unstable = "may be merged with Condvar in the future"]
pub fn wait_timeout_with<'a, T, F>(&'static self,
guard: MutexGuard<'a, T>,
dur: Duration,
mut f: F)
-> LockResult<(MutexGuard<'a, T>, bool)>
where F: FnMut(LockResult<&mut T>) -> bool {
// This could be made more efficient by pushing the implementation into sys::condvar
let start = SteadyTime::now();
let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
while !f(guard_result
.as_mut()
.map(|g| &mut **g)
.map_err(|e| poison::new_poison_error(&mut **e.get_mut()))) {
let now = SteadyTime::now();
let consumed = &now - &start;
let guard = guard_result.unwrap_or_else(|e| e.into_inner());
let (new_guard_result, no_timeout) = match self.wait_timeout(guard, dur - consumed) {
Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout),
Err(err) => {
let (new_guard, no_timeout) = err.into_inner();
(Err(poison::new_poison_error(new_guard)), no_timeout)
}
};
guard_result = new_guard_result;
if !no_timeout {
let result = f(guard_result
.as_mut()
.map(|g| &mut **g)
.map_err(|e| poison::new_poison_error(&mut **e.get_mut())));
return poison::map_result(guard_result, |g| (g, result));
}
}

poison::map_result(guard_result, |g| (g, true))
}

/// Wake up one blocked thread on this condvar.
///
/// See `Condvar::notify_one`.
Expand Down Expand Up @@ -285,6 +344,7 @@ mod tests {
use super::{StaticCondvar, CONDVAR_INIT};
use sync::mpsc::channel;
use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use thread::Thread;
use time::Duration;

Expand Down Expand Up @@ -372,6 +432,49 @@ mod tests {
unsafe { C.destroy(); M.destroy(); }
}

#[test]
fn wait_timeout_with() {
static C: StaticCondvar = CONDVAR_INIT;
static M: StaticMutex = MUTEX_INIT;
static S: AtomicUsize = ATOMIC_USIZE_INIT;

let g = M.lock().unwrap();
let (g, success) = C.wait_timeout_with(g, Duration::nanoseconds(1000), |_| false).unwrap();
assert!(!success);

let (tx, rx) = channel();
let _t = Thread::scoped(move || {
rx.recv().unwrap();
let g = M.lock().unwrap();
S.store(1, Ordering::SeqCst);
C.notify_one();
drop(g);

rx.recv().unwrap();
let g = M.lock().unwrap();
S.store(2, Ordering::SeqCst);
C.notify_one();
drop(g);

rx.recv().unwrap();
let _g = M.lock().unwrap();
S.store(3, Ordering::SeqCst);
C.notify_one();
});

let mut state = 0;
let (_g, success) = C.wait_timeout_with(g, Duration::days(1), |_| {
assert_eq!(state, S.load(Ordering::SeqCst));
tx.send(()).unwrap();
state += 1;
match state {
1|2 => false,
_ => true,
}
}).unwrap();
assert!(success);
}

#[test]
#[should_fail]
fn two_mutexes() {
Expand Down
17 changes: 16 additions & 1 deletion src/libstd/sync/poison.rs
Expand Up @@ -99,8 +99,23 @@ impl<T> fmt::Show for PoisonError<T> {
impl<T> PoisonError<T> {
/// Consumes this error indicating that a lock is poisoned, returning the
/// underlying guard to allow access regardless.
#[stable]
#[deprecated="renamed to into_inner"]
pub fn into_guard(self) -> T { self.guard }

/// Consumes this error indicating that a lock is poisoned, returning the
/// underlying guard to allow access regardless.
#[unstable]
pub fn into_inner(self) -> T { self.guard }

/// Reaches into this error indicating that a lock is poisoned, returning a
/// reference to the underlying guard to allow access regardless.
#[unstable]
pub fn get_ref(&self) -> &T { &self.guard }

/// Reaches into this error indicating that a lock is poisoned, returning a
/// mutable reference to the underlying guard to allow access regardless.
#[unstable]
pub fn get_mut(&mut self) -> &mut T { &mut self.guard }
}

impl<T> FromError<PoisonError<T>> for TryLockError<T> {
Expand Down
56 changes: 36 additions & 20 deletions src/libstd/sys/unix/condvar.rs
Expand Up @@ -10,9 +10,12 @@

use cell::UnsafeCell;
use libc;
use std::option::Option::{Some, None};
use sys::mutex::{self, Mutex};
use sys::time;
use sys::sync as ffi;
use time::Duration;
use num::{Int, NumCast};

pub struct Condvar { inner: UnsafeCell<ffi::pthread_cond_t> }

Expand Down Expand Up @@ -46,33 +49,46 @@ impl Condvar {
debug_assert_eq!(r, 0);
}

// This implementation is modeled after libcxx's condition_variable
// https://github.com/llvm-mirror/libcxx/blob/release_35/src/condition_variable.cpp#L46
// https://github.com/llvm-mirror/libcxx/blob/release_35/include/__mutex_base#L367
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
assert!(dur >= Duration::nanoseconds(0));
if dur <= Duration::zero() {
return false;
}

// First, figure out what time it currently is
let mut tv = libc::timeval { tv_sec: 0, tv_usec: 0 };
let r = ffi::gettimeofday(&mut tv, 0 as *mut _);
// First, figure out what time it currently is, in both system and stable time.
// pthread_cond_timedwait uses system time, but we want to report timeout based on stable
// time.
let mut sys_now = libc::timeval { tv_sec: 0, tv_usec: 0 };
let stable_now = time::SteadyTime::now();
let r = ffi::gettimeofday(&mut sys_now, 0 as *mut _);
debug_assert_eq!(r, 0);

// Offset that time with the specified duration
let abs = Duration::seconds(tv.tv_sec as i64) +
Duration::microseconds(tv.tv_usec as i64) +
dur;
let ns = abs.num_nanoseconds().unwrap() as u64;
let timeout = libc::timespec {
tv_sec: (ns / 1000000000) as libc::time_t,
tv_nsec: (ns % 1000000000) as libc::c_long,
let seconds = NumCast::from(dur.num_seconds());
let timeout = match seconds.and_then(|s| sys_now.tv_sec.checked_add(s)) {
Some(sec) => {
libc::timespec {
tv_sec: sec,
tv_nsec: (dur - Duration::seconds(dur.num_seconds()))
.num_nanoseconds().unwrap() as libc::c_long,
}
}
None => {
libc::timespec {
tv_sec: Int::max_value(),
tv_nsec: 1_000_000_000 - 1,
}
}
};

// And wait!
let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex),
&timeout);
if r != 0 {
debug_assert_eq!(r as int, libc::ETIMEDOUT as int);
false
} else {
true
}
let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex), &timeout);
debug_assert!(r == libc::ETIMEDOUT || r == 0);

// ETIMEDOUT is not a totally reliable method of determining timeout due to clock shifts,
// so do the check ourselves
&time::SteadyTime::now() - &stable_now < dur
}

#[inline]
Expand Down
1 change: 1 addition & 0 deletions src/libstd/sys/unix/mod.rs
Expand Up @@ -52,6 +52,7 @@ pub mod sync;
pub mod tcp;
pub mod thread;
pub mod thread_local;
pub mod time;
pub mod timer;
pub mod tty;
pub mod udp;
Expand Down

12 comments on commit 08f6380

@bors
Copy link
Contributor

@bors bors commented on 08f6380 Jan 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saw approval from alexcrichton
at sfackler@08f6380

@bors
Copy link
Contributor

@bors bors commented on 08f6380 Jan 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging sfackler/rust/wait_timeout = 08f6380 into auto

@bors
Copy link
Contributor

@bors bors commented on 08f6380 Jan 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

status: {"merge_sha": "fb8d9f4ce24f577364402d120c0d5d734e0f90f3"}

@bors
Copy link
Contributor

@bors bors commented on 08f6380 Jan 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sfackler/rust/wait_timeout = 08f6380 merged ok, testing candidate = fb8d9f4

@bors
Copy link
Contributor

@bors bors commented on 08f6380 Jan 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saw approval from alexcrichton
at sfackler@08f6380

@bors
Copy link
Contributor

@bors bors commented on 08f6380 Jan 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging sfackler/rust/wait_timeout = 08f6380 into auto

@bors
Copy link
Contributor

@bors bors commented on 08f6380 Jan 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

status: {"merge_sha": "378fb5846d2d8dbc5ab24a5e92794c5c39d492dc"}

@bors
Copy link
Contributor

@bors bors commented on 08f6380 Jan 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sfackler/rust/wait_timeout = 08f6380 merged ok, testing candidate = 378fb58

@bors
Copy link
Contributor

@bors bors commented on 08f6380 Jan 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fast-forwarding master to auto = 378fb58

@bors
Copy link
Contributor

@bors bors commented on 08f6380 Jan 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fast-forwarding master to auto = 378fb58

Please sign in to comment.