Skip to content

Commit

Permalink
extract sem/time mod
Browse files Browse the repository at this point in the history
  • Loading branch information
flier committed Sep 3, 2018
1 parent 99fb734 commit ae3ee1d
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 47 deletions.
21 changes: 5 additions & 16 deletions src/blk.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use std::os::raw::c_void;
use std::time::Duration;

use ffi::*;
use {time_after_delay, Queue};

/// A type indicating whether a timed wait on a dispatch block returned due to a time out or not.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct WaitTimeout;
use {Queue, Timeout, WaitTimeout};

/// Creates, synchronously executes, and releases a dispatch block from the specified block and flags.
pub fn perform<F>(flags: dispatch_block_flags_t, closure: F)
Expand Down Expand Up @@ -58,8 +53,8 @@ impl DispatchBlock {
}

/// Waits synchronously until execution of the specified dispatch block has completed or until the specified timeout has elapsed.
pub fn wait_timeout(&self, dur: Duration) -> Result<(), WaitTimeout> {
let when = time_after_delay(dur);
pub fn wait_timeout<T: Timeout>(&self, timeout: T) -> Result<(), WaitTimeout> {
let when = timeout.as_raw();

if unsafe { dispatch_block_wait(self.ptr, when) } == 0 {
Ok(())
Expand Down Expand Up @@ -142,19 +137,13 @@ mod tests {

assert!(!block.canceled());

assert_eq!(
block.wait_timeout(Duration::from_millis(100)),
Err(WaitTimeout)
);
assert_eq!(block.wait_timeout(100u32), Err(WaitTimeout));

block.cancel();

assert!(block.canceled());

assert_eq!(
block.wait_timeout(Duration::from_millis(100)),
Err(WaitTimeout)
);
assert_eq!(block.wait_timeout(100u32), Err(WaitTimeout));

assert!(!block.done());
}
Expand Down
5 changes: 3 additions & 2 deletions src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use Queue;

/// The destructor responsible for freeing the data when it is no longer needed.
pub trait Destructor {
/// Extracts the raw `dispatch_block_t`.
fn as_raw(self) -> dispatch_block_t;
}

impl Destructor for dispatch_block_t {
/// Extracts the raw destructor object.
fn as_raw(self) -> dispatch_block_t {
self
}
Expand Down Expand Up @@ -81,7 +81,7 @@ impl Data {
Data { ptr }
}

/// Extracts the raw dispatch data object.
/// Extracts the raw `dispatch_data_t`.
pub fn as_raw(&self) -> dispatch_data_t {
self.ptr
}
Expand All @@ -98,6 +98,7 @@ impl Data {
unsafe { dispatch_data_get_size(self.ptr) }
}

/// Returns `true` if the data has a length of 0.
pub fn is_empty(&self) -> bool {
self.ptr == unsafe { &_dispatch_data_empty as *const dispatch_object_s as dispatch_data_t }
|| self.len() == 0
Expand Down
2 changes: 0 additions & 2 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ impl ChannelType {
}
}

pub type CleanupHandler = fn(i32);

pub struct Channel {
ptr: dispatch_io_t,
}
Expand Down
39 changes: 12 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,22 @@ mod blk;
mod data;
#[cfg(target_os = "macos")]
mod io;
#[cfg(target_os = "macos")]
mod sem;
mod time;

#[cfg(target_os = "macos")]
pub use blk::{perform, DispatchBlock, WaitTimeout};
pub use blk::{perform, DispatchBlock};
#[cfg(target_os = "macos")]
pub use data::{
dispatch_data_destructor_default, dispatch_data_destructor_free,
dispatch_data_destructor_munmap, Data, Destructor,
};
#[cfg(target_os = "macos")]
pub use ffi::{DISPATCH_IO_STOP, DISPATCH_IO_STRICT_INTERVAL};
#[cfg(target_os = "macos")]
pub use sem::Semaphore;
pub use time::{at, now, Timeout, WaitTimeout};

/// The type of a dispatch queue.
#[derive(Clone, Debug, Hash, PartialEq)]
Expand Down Expand Up @@ -149,28 +155,6 @@ pub struct Queue {
ptr: dispatch_queue_t,
}

fn time_after_delay(delay: Duration) -> dispatch_time_t {
delay
.as_secs()
.checked_mul(1_000_000_000)
.and_then(|i| i.checked_add(delay.subsec_nanos() as u64))
.and_then(|i| {
if i < (i64::max_value() as u64) {
Some(i as i64)
} else {
None
}
})
.map_or(DISPATCH_TIME_FOREVER, |i| unsafe {
dispatch_time(DISPATCH_TIME_NOW, i)
})
}

/// Returns a `dispatch_time_t` corresponding to the wall time.
pub fn now() -> dispatch_time_t {
unsafe { dispatch_walltime(ptr::null(), 0) }
}

fn context_and_function<F>(closure: F) -> (*mut c_void, dispatch_function_t)
where
F: FnOnce(),
Expand Down Expand Up @@ -325,11 +309,12 @@ impl Queue {

/// After the specified delay, submits a closure for asynchronous execution
/// on self.
pub fn after<F>(&self, delay: Duration, work: F)
pub fn after<F, T>(&self, delay: T, work: F)
where
F: 'static + Send + FnOnce(),
T: Timeout,
{
let when = time_after_delay(delay);
let when = delay.as_raw();
let (context, work) = context_and_function(work);
unsafe {
dispatch_after_f(when, self.ptr, context, work);
Expand Down Expand Up @@ -581,8 +566,8 @@ impl Group {
/// Waits for all tasks associated with self to complete within the
/// specified duration.
/// Returns true if the tasks completed or false if the timeout elapsed.
pub fn wait_timeout(&self, timeout: Duration) -> bool {
let when = time_after_delay(timeout);
pub fn wait_timeout<T: Timeout>(&self, timeout: T) -> bool {
let when = timeout.as_raw();
let result = unsafe { dispatch_group_wait(self.ptr, when) };
result == 0
}
Expand Down
97 changes: 97 additions & 0 deletions src/sem.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use ffi::*;
use {Timeout, WaitTimeout};

/// A counting semaphore.
///
/// Calls to `Semaphore::signal` must be balanced with calls to `Semaphore::wait`.
/// Attempting to dispose of a semaphore with a count lower than value causes an EXC_BAD_INSTRUCTION exception.
#[derive(Debug)]
pub struct Semaphore {
ptr: dispatch_semaphore_t,
}

impl Semaphore {
/// Creates new counting semaphore with an initial value.
///
/// Passing zero for the value is useful for
/// when two threads need to reconcile the completion of a particular event.
/// Passing a value greater than zero is useful for managing a finite pool of resources,
/// where the pool size is equal to the value.
pub fn new(n: u64) -> Self {
let ptr = unsafe { dispatch_semaphore_create(n as i64) };

Semaphore { ptr }
}

/// Wait (decrement) for a semaphore.
///
/// Decrement the counting semaphore.
pub fn wait(&self) -> Result<(), WaitTimeout> {
self.wait_timeout(DISPATCH_TIME_FOREVER)
}

/// Wait (decrement) for a semaphoreor until the specified timeout has elapsed.
///
/// Decrement the counting semaphore.
pub fn wait_timeout<T: Timeout>(&self, timeout: T) -> Result<(), WaitTimeout> {
let when = timeout.as_raw();

let n = unsafe { dispatch_semaphore_wait(self.ptr, when) };

if n == 0 {
Ok(())
} else {
Err(WaitTimeout)
}
}

/// Signal (increment) a semaphore.
///
/// Increment the counting semaphore.
/// If the previous value was less than zero, this function wakes a waiting thread before returning.
///
/// This function returns `true` if a thread is woken. Otherwise, `false` is returned.
pub fn signal(&self) -> bool {
unsafe { dispatch_semaphore_signal(self.ptr) != 0 }
}
}

unsafe impl Sync for Semaphore {}
unsafe impl Send for Semaphore {}

impl Clone for Semaphore {
fn clone(&self) -> Self {
unsafe {
dispatch_retain(self.ptr);
}
Semaphore { ptr: self.ptr }
}
}

impl Drop for Semaphore {
fn drop(&mut self) {
unsafe {
dispatch_release(self.ptr);
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_semaphore() {
let sem = Semaphore::new(1);

assert!(sem.wait().is_ok());
assert_eq!(sem.wait_timeout(0).unwrap_err(), WaitTimeout);

assert!(!sem.signal());
assert!(sem.wait_timeout(DISPATCH_TIME_FOREVER).is_ok());

// Calls to dispatch_semaphore_signal must be balanced with calls to wait().
// Attempting to dispose of a semaphore with a count lower than value causes an EXC_BAD_INSTRUCTION exception.
assert!(!sem.signal());
}
}
95 changes: 95 additions & 0 deletions src/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::ptr;
use std::time::{Duration, Instant, SystemTime, SystemTimeError, UNIX_EPOCH};

use libc::timespec;

use ffi::*;

/// A type indicating whether a timed wait on a dispatch object returned due to a time out or not.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct WaitTimeout;

/// When to timeout.
pub trait Timeout {
/// Extracts the raw `dispatch_time_t`.
fn as_raw(self) -> dispatch_time_t;
}

impl<T: Timeout> Timeout for Option<T> {
fn as_raw(self) -> dispatch_time_t {
if let Some(timeout) = self {
timeout.as_raw()
} else {
DISPATCH_TIME_NOW
}
}
}

impl Timeout for i32 {
fn as_raw(self) -> dispatch_time_t {
Duration::from_millis(self as u64).as_raw()
}
}

impl Timeout for u32 {
fn as_raw(self) -> dispatch_time_t {
Duration::from_millis(self as u64).as_raw()
}
}

impl Timeout for Duration {
fn as_raw(self) -> dispatch_time_t {
time_after_delay(self)
}
}

impl Timeout for dispatch_time_t {
fn as_raw(self) -> dispatch_time_t {
self
}
}

impl Timeout for Instant {
fn as_raw(self) -> dispatch_time_t {
self.duration_since(Instant::now()).as_raw()
}
}

impl Timeout for SystemTime {
fn as_raw(self) -> dispatch_time_t {
self.duration_since(SystemTime::now()).unwrap().as_raw()
}
}

fn time_after_delay(delay: Duration) -> dispatch_time_t {
delay
.as_secs()
.checked_mul(1_000_000_000)
.and_then(|i| i.checked_add(delay.subsec_nanos() as u64))
.and_then(|i| {
if i < (i64::max_value() as u64) {
Some(i as i64)
} else {
None
}
})
.map_or(DISPATCH_TIME_FOREVER, |i| unsafe {
dispatch_time(DISPATCH_TIME_NOW, i)
})
}

/// Returns a `dispatch_time_t` corresponding to the wall time.
pub fn now() -> dispatch_time_t {
unsafe { dispatch_walltime(ptr::null(), 0) }
}

/// Returns a `dispatch_time_t` corresponding to the given time.
pub fn at(tm: SystemTime) -> Result<dispatch_time_t, SystemTimeError> {
let dur = tm.duration_since(UNIX_EPOCH)?;
let ts = timespec {
tv_sec: dur.as_secs() as i64,
tv_nsec: dur.subsec_nanos() as i64,
};

Ok(unsafe { dispatch_walltime(&ts, 0) })
}

0 comments on commit ae3ee1d

Please sign in to comment.