diff --git a/src/blk.rs b/src/blk.rs index cd7e097..06d618b 100644 --- a/src/blk.rs +++ b/src/blk.rs @@ -1,12 +1,7 @@ use std::os::raw::c_void; -use std::time::Duration; use ffi::*; -use {time_after_delay, Queue}; - -/// A type indicating whether a timed wait on a dispatch block returned due to a time out or not. -#[derive(Clone, Copy, Debug, PartialEq)] -pub struct WaitTimeout; +use {Queue, Timeout, WaitTimeout}; /// Creates, synchronously executes, and releases a dispatch block from the specified block and flags. pub fn perform(flags: dispatch_block_flags_t, closure: F) @@ -58,8 +53,8 @@ impl DispatchBlock { } /// Waits synchronously until execution of the specified dispatch block has completed or until the specified timeout has elapsed. - pub fn wait_timeout(&self, dur: Duration) -> Result<(), WaitTimeout> { - let when = time_after_delay(dur); + pub fn wait_timeout(&self, timeout: T) -> Result<(), WaitTimeout> { + let when = timeout.as_raw(); if unsafe { dispatch_block_wait(self.ptr, when) } == 0 { Ok(()) @@ -142,19 +137,13 @@ mod tests { assert!(!block.canceled()); - assert_eq!( - block.wait_timeout(Duration::from_millis(100)), - Err(WaitTimeout) - ); + assert_eq!(block.wait_timeout(100u32), Err(WaitTimeout)); block.cancel(); assert!(block.canceled()); - assert_eq!( - block.wait_timeout(Duration::from_millis(100)), - Err(WaitTimeout) - ); + assert_eq!(block.wait_timeout(100u32), Err(WaitTimeout)); assert!(!block.done()); } diff --git a/src/data.rs b/src/data.rs index 3bb9b60..8272bb2 100644 --- a/src/data.rs +++ b/src/data.rs @@ -8,11 +8,11 @@ use Queue; /// The destructor responsible for freeing the data when it is no longer needed. pub trait Destructor { + /// Extracts the raw `dispatch_block_t`. fn as_raw(self) -> dispatch_block_t; } impl Destructor for dispatch_block_t { - /// Extracts the raw destructor object. fn as_raw(self) -> dispatch_block_t { self } @@ -81,7 +81,7 @@ impl Data { Data { ptr } } - /// Extracts the raw dispatch data object. + /// Extracts the raw `dispatch_data_t`. pub fn as_raw(&self) -> dispatch_data_t { self.ptr } @@ -98,6 +98,7 @@ impl Data { unsafe { dispatch_data_get_size(self.ptr) } } + /// Returns `true` if the data has a length of 0. pub fn is_empty(&self) -> bool { self.ptr == unsafe { &_dispatch_data_empty as *const dispatch_object_s as dispatch_data_t } || self.len() == 0 diff --git a/src/io.rs b/src/io.rs index 14d2bbf..4828b88 100644 --- a/src/io.rs +++ b/src/io.rs @@ -27,8 +27,6 @@ impl ChannelType { } } -pub type CleanupHandler = fn(i32); - pub struct Channel { ptr: dispatch_io_t, } diff --git a/src/lib.rs b/src/lib.rs index 7347e1e..9a286ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,9 +73,12 @@ mod blk; mod data; #[cfg(target_os = "macos")] mod io; +#[cfg(target_os = "macos")] +mod sem; +mod time; #[cfg(target_os = "macos")] -pub use blk::{perform, DispatchBlock, WaitTimeout}; +pub use blk::{perform, DispatchBlock}; #[cfg(target_os = "macos")] pub use data::{ dispatch_data_destructor_default, dispatch_data_destructor_free, @@ -83,6 +86,9 @@ pub use data::{ }; #[cfg(target_os = "macos")] pub use ffi::{DISPATCH_IO_STOP, DISPATCH_IO_STRICT_INTERVAL}; +#[cfg(target_os = "macos")] +pub use sem::Semaphore; +pub use time::{after, at, now, Timeout, WaitTimeout}; /// The type of a dispatch queue. #[derive(Clone, Debug, Hash, PartialEq)] @@ -149,28 +155,6 @@ pub struct Queue { ptr: dispatch_queue_t, } -fn time_after_delay(delay: Duration) -> dispatch_time_t { - delay - .as_secs() - .checked_mul(1_000_000_000) - .and_then(|i| i.checked_add(delay.subsec_nanos() as u64)) - .and_then(|i| { - if i < (i64::max_value() as u64) { - Some(i as i64) - } else { - None - } - }) - .map_or(DISPATCH_TIME_FOREVER, |i| unsafe { - dispatch_time(DISPATCH_TIME_NOW, i) - }) -} - -/// Returns a `dispatch_time_t` corresponding to the wall time. -pub fn now() -> dispatch_time_t { - unsafe { dispatch_walltime(ptr::null(), 0) } -} - fn context_and_function(closure: F) -> (*mut c_void, dispatch_function_t) where F: FnOnce(), @@ -325,11 +309,12 @@ impl Queue { /// After the specified delay, submits a closure for asynchronous execution /// on self. - pub fn after(&self, delay: Duration, work: F) + pub fn after(&self, delay: T, work: F) where F: 'static + Send + FnOnce(), + T: Timeout, { - let when = time_after_delay(delay); + let when = delay.as_raw(); let (context, work) = context_and_function(work); unsafe { dispatch_after_f(when, self.ptr, context, work); @@ -581,8 +566,8 @@ impl Group { /// Waits for all tasks associated with self to complete within the /// specified duration. /// Returns true if the tasks completed or false if the timeout elapsed. - pub fn wait_timeout(&self, timeout: Duration) -> bool { - let when = time_after_delay(timeout); + pub fn wait_timeout(&self, timeout: T) -> bool { + let when = timeout.as_raw(); let result = unsafe { dispatch_group_wait(self.ptr, when) }; result == 0 } diff --git a/src/sem.rs b/src/sem.rs new file mode 100644 index 0000000..c06a2f2 --- /dev/null +++ b/src/sem.rs @@ -0,0 +1,97 @@ +use ffi::*; +use {Timeout, WaitTimeout}; + +/// A counting semaphore. +/// +/// Calls to `Semaphore::signal` must be balanced with calls to `Semaphore::wait`. +/// Attempting to dispose of a semaphore with a count lower than value causes an EXC_BAD_INSTRUCTION exception. +#[derive(Debug)] +pub struct Semaphore { + ptr: dispatch_semaphore_t, +} + +impl Semaphore { + /// Creates new counting semaphore with an initial value. + /// + /// Passing zero for the value is useful for + /// when two threads need to reconcile the completion of a particular event. + /// Passing a value greater than zero is useful for managing a finite pool of resources, + /// where the pool size is equal to the value. + pub fn new(n: u64) -> Self { + let ptr = unsafe { dispatch_semaphore_create(n as i64) }; + + Semaphore { ptr } + } + + /// Wait (decrement) for a semaphore. + /// + /// Decrement the counting semaphore. + pub fn wait(&self) -> Result<(), WaitTimeout> { + self.wait_timeout(DISPATCH_TIME_FOREVER) + } + + /// Wait (decrement) for a semaphoreor until the specified timeout has elapsed. + /// + /// Decrement the counting semaphore. + pub fn wait_timeout(&self, timeout: T) -> Result<(), WaitTimeout> { + let when = timeout.as_raw(); + + let n = unsafe { dispatch_semaphore_wait(self.ptr, when) }; + + if n == 0 { + Ok(()) + } else { + Err(WaitTimeout) + } + } + + /// Signal (increment) a semaphore. + /// + /// Increment the counting semaphore. + /// If the previous value was less than zero, this function wakes a waiting thread before returning. + /// + /// This function returns `true` if a thread is woken. Otherwise, `false` is returned. + pub fn signal(&self) -> bool { + unsafe { dispatch_semaphore_signal(self.ptr) != 0 } + } +} + +unsafe impl Sync for Semaphore {} +unsafe impl Send for Semaphore {} + +impl Clone for Semaphore { + fn clone(&self) -> Self { + unsafe { + dispatch_retain(self.ptr); + } + Semaphore { ptr: self.ptr } + } +} + +impl Drop for Semaphore { + fn drop(&mut self) { + unsafe { + dispatch_release(self.ptr); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_semaphore() { + let sem = Semaphore::new(1); + + assert!(sem.wait().is_ok()); + assert_eq!(sem.wait_timeout(0).unwrap_err(), WaitTimeout); + + assert!(!sem.signal()); + assert!(sem.wait_timeout(DISPATCH_TIME_FOREVER).is_ok()); + + // Calls to dispatch_semaphore_signal must be balanced with calls to wait(). + // Attempting to dispose of a semaphore with a count lower than value causes an EXC_BAD_INSTRUCTION exception. + assert!(!sem.signal()); + } +} diff --git a/src/time.rs b/src/time.rs new file mode 100644 index 0000000..09fcf5a --- /dev/null +++ b/src/time.rs @@ -0,0 +1,96 @@ +use std::ptr; +use std::time::{Duration, Instant, SystemTime, SystemTimeError, UNIX_EPOCH}; + +use libc::timespec; + +use ffi::*; + +/// A type indicating whether a timed wait on a dispatch object returned due to a time out or not. +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct WaitTimeout; + +/// When to timeout. +pub trait Timeout { + /// Extracts the raw `dispatch_time_t`. + fn as_raw(self) -> dispatch_time_t; +} + +impl Timeout for Option { + fn as_raw(self) -> dispatch_time_t { + if let Some(timeout) = self { + timeout.as_raw() + } else { + DISPATCH_TIME_NOW + } + } +} + +impl Timeout for i32 { + fn as_raw(self) -> dispatch_time_t { + Duration::from_millis(self as u64).as_raw() + } +} + +impl Timeout for u32 { + fn as_raw(self) -> dispatch_time_t { + Duration::from_millis(self as u64).as_raw() + } +} + +impl Timeout for Duration { + fn as_raw(self) -> dispatch_time_t { + after(self) + } +} + +impl Timeout for dispatch_time_t { + fn as_raw(self) -> dispatch_time_t { + self + } +} + +impl Timeout for Instant { + fn as_raw(self) -> dispatch_time_t { + self.duration_since(Instant::now()).as_raw() + } +} + +impl Timeout for SystemTime { + fn as_raw(self) -> dispatch_time_t { + self.duration_since(SystemTime::now()).unwrap().as_raw() + } +} + +/// Returns a `dispatch_time_t` corresponding to the given time. +pub fn after(delay: Duration) -> dispatch_time_t { + delay + .as_secs() + .checked_mul(1_000_000_000) + .and_then(|i| i.checked_add(delay.subsec_nanos() as u64)) + .and_then(|i| { + if i < (i64::max_value() as u64) { + Some(i as i64) + } else { + None + } + }) + .map_or(DISPATCH_TIME_FOREVER, |i| unsafe { + dispatch_time(DISPATCH_TIME_NOW, i) + }) +} + +/// Returns a `dispatch_time_t` corresponding to the wall time. +pub fn now() -> dispatch_time_t { + unsafe { dispatch_walltime(ptr::null(), 0) } +} + +/// Returns a `dispatch_time_t` corresponding to the given time. +pub fn at(tm: SystemTime) -> Result { + let dur = tm.duration_since(UNIX_EPOCH)?; + let ts = timespec { + tv_sec: dur.as_secs() as i64, + tv_nsec: dur.subsec_nanos() as i64, + }; + + Ok(unsafe { dispatch_walltime(&ts, 0) }) +}