Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented GCD timer #22

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "dispatch"
version = "0.2.0"
authors = ["Steven Sheldon"]
version = "0.2.2"
authors = ["Steven Sheldon", "Sunip K. Mukherjee"]
edition = "2018"

description = "Rust wrapper for Apple's Grand Central Dispatch."
Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,34 @@ assert!(nums == [2, 3]);
let nums = queue.map(nums, |x| x.to_string());
assert!(nums[0] == "2");
```

# Timer Events

GCD provides a timer facility that can be used to schedule blocks of code to
execute periodically, starting after a delay. The `TimerNode` type is a wrapper
around a dispatch source that can be used to schedule timer events.

`TimerNode` has the `schedule` method to schedule a timer event, the `update`
method to update the timer's interval, and the `cancel` method to cancel the
timer. Dropping a `TimerNode` will cancel the timer.

```rust
use dispatch::TimerNode;
use std::time::Duration;
use std::thread::sleep;
use std::sync::{Arc, Mutex};

let count = Arc::new(Mutex::new(0));
let count_clone = count.clone();
let mut timer = TimerNode::schedule(Duration::from_millis(10), Duration::from_secs(0), None, move || {
let mut count = count_clone.lock().unwrap();
*count += 1;
println!("Hello, counter! -> {}", *count);
}).unwrap();
sleep(Duration::from_millis(100));
timer.update(Duration::from_millis(20), Duration::from_secs(0), None); // change the time period
sleep(Duration::from_millis(100));
drop(timer); // cancel the timer
println!("Counter: {}", *count.lock().unwrap());
assert!(*count.lock().unwrap() >= 15);
```
2 changes: 1 addition & 1 deletion examples/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
extern crate dispatch;

use dispatch::{Queue, QueuePriority};
use std::io;
use std::process::exit;
use dispatch::{Queue, QueuePriority};

/// Prompts for a number and adds it to the given sum.
///
Expand Down
154 changes: 124 additions & 30 deletions src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
use std::os::raw::{c_char, c_long, c_ulong, c_void};

#[repr(C)]
pub struct dispatch_object_s { _private: [u8; 0] }
pub struct dispatch_object_s {
_private: [u8; 0],
}

// dispatch_block_t
pub type dispatch_function_t = extern fn(*mut c_void);
pub type dispatch_function_t = extern "C" fn(*mut c_void);
pub type dispatch_semaphore_t = *mut dispatch_object_s;
pub type dispatch_group_t = *mut dispatch_object_s;
pub type dispatch_object_t = *mut dispatch_object_s;
pub type dispatch_source_t = *mut dispatch_object_s;
pub type dispatch_source_type_t = *const dispatch_object_s;
pub type dispatch_once_t = c_long;
pub type dispatch_queue_t = *mut dispatch_object_s;
pub type dispatch_time_t = u64;
Expand All @@ -25,39 +29,88 @@ pub type dispatch_time_t = u64;
// dispatch_io_interval_flags_t
pub type dispatch_queue_attr_t = *const dispatch_object_s;

#[cfg_attr(any(target_os = "macos", target_os = "ios"),
link(name = "System", kind = "dylib"))]
#[cfg_attr(not(any(target_os = "macos", target_os = "ios")),
link(name = "dispatch", kind = "dylib"))]
extern {
#[cfg_attr(
any(target_os = "macos", target_os = "ios"),
link(name = "System", kind = "dylib")
)]
#[cfg_attr(
not(any(target_os = "macos", target_os = "ios")),
link(name = "dispatch", kind = "dylib")
)]
extern "C" {
static _dispatch_main_q: dispatch_object_s;
static _dispatch_queue_attr_concurrent: dispatch_object_s;
static _dispatch_source_type_data_add: dispatch_object_s;
static _dispatch_source_type_data_or: dispatch_object_s;
static _dispatch_source_type_mach_recv: dispatch_object_s;
static _dispatch_source_type_mach_send: dispatch_object_s;
static _dispatch_source_type_proc: dispatch_object_s;
static _dispatch_source_type_read: dispatch_object_s;
static _dispatch_source_type_signal: dispatch_object_s;
static _dispatch_source_type_timer: dispatch_object_s;
static _dispatch_source_type_vnode: dispatch_object_s;
static _dispatch_source_type_write: dispatch_object_s;

pub fn dispatch_get_global_queue(identifier: c_long, flags: c_ulong) -> dispatch_queue_t;
pub fn dispatch_queue_create(label: *const c_char, attr: dispatch_queue_attr_t) -> dispatch_queue_t;
pub fn dispatch_queue_create(
label: *const c_char,
attr: dispatch_queue_attr_t,
) -> dispatch_queue_t;
// dispatch_queue_attr_t dispatch_queue_attr_make_with_qos_class ( dispatch_queue_attr_t attr, dispatch_qos_class_t qos_class, int relative_priority );
pub fn dispatch_queue_get_label(queue: dispatch_queue_t) -> *const c_char;
pub fn dispatch_set_target_queue(object: dispatch_object_t, queue: dispatch_queue_t);
pub fn dispatch_main();

// void dispatch_async ( dispatch_queue_t queue, dispatch_block_t block );
pub fn dispatch_async_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t);
pub fn dispatch_async_f(
queue: dispatch_queue_t,
context: *mut c_void,
work: dispatch_function_t,
);
// void dispatch_sync ( dispatch_queue_t queue, dispatch_block_t block );
pub fn dispatch_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t);
pub fn dispatch_sync_f(
queue: dispatch_queue_t,
context: *mut c_void,
work: dispatch_function_t,
);
// void dispatch_after ( dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t block );
pub fn dispatch_after_f(when: dispatch_time_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t);
pub fn dispatch_after_f(
when: dispatch_time_t,
queue: dispatch_queue_t,
context: *mut c_void,
work: dispatch_function_t,
);
// void dispatch_apply ( size_t iterations, dispatch_queue_t queue, void (^block)(size_t) );
pub fn dispatch_apply_f(iterations: usize, queue: dispatch_queue_t, context: *mut c_void, work: extern fn(*mut c_void, usize));
pub fn dispatch_apply_f(
iterations: usize,
queue: dispatch_queue_t,
context: *mut c_void,
work: extern "C" fn(*mut c_void, usize),
);
// void dispatch_once ( dispatch_once_t *predicate, dispatch_block_t block );
pub fn dispatch_once_f(predicate: *mut dispatch_once_t, context: *mut c_void, function: dispatch_function_t);
pub fn dispatch_once_f(
predicate: *mut dispatch_once_t,
context: *mut c_void,
function: dispatch_function_t,
);

// void dispatch_group_async ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block );
pub fn dispatch_group_async_f(group: dispatch_group_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t);
pub fn dispatch_group_async_f(
group: dispatch_group_t,
queue: dispatch_queue_t,
context: *mut c_void,
work: dispatch_function_t,
);
pub fn dispatch_group_create() -> dispatch_group_t;
pub fn dispatch_group_enter(group: dispatch_group_t);
pub fn dispatch_group_leave(group: dispatch_group_t);
// void dispatch_group_notify ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block );
pub fn dispatch_group_notify_f(group: dispatch_group_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t);
pub fn dispatch_group_notify_f(
group: dispatch_group_t,
queue: dispatch_queue_t,
context: *mut c_void,
work: dispatch_function_t,
);
pub fn dispatch_group_wait(group: dispatch_group_t, timeout: dispatch_time_t) -> c_long;

pub fn dispatch_get_context(object: dispatch_object_t) -> *mut c_void;
Expand All @@ -70,26 +123,50 @@ extern {

pub fn dispatch_semaphore_create(value: c_long) -> dispatch_semaphore_t;
pub fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> c_long;
pub fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> c_long;
pub fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t)
-> c_long;

// void dispatch_barrier_async ( dispatch_queue_t queue, dispatch_block_t block );
pub fn dispatch_barrier_async_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t);
pub fn dispatch_barrier_async_f(
queue: dispatch_queue_t,
context: *mut c_void,
work: dispatch_function_t,
);
// void dispatch_barrier_sync ( dispatch_queue_t queue, dispatch_block_t block );
pub fn dispatch_barrier_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t);

// void dispatch_source_cancel ( dispatch_source_t source );
// dispatch_source_t dispatch_source_create ( dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t queue );
pub fn dispatch_barrier_sync_f(
queue: dispatch_queue_t,
context: *mut c_void,
work: dispatch_function_t,
);
pub fn dispatch_source_cancel(source: dispatch_source_t);
pub fn dispatch_source_create(
type_: dispatch_source_type_t,
handle: *const c_void,
mask: c_ulong,
queue: dispatch_queue_t,
) -> dispatch_object_t;
// unsigned long dispatch_source_get_data ( dispatch_source_t source );
// uintptr_t dispatch_source_get_handle ( dispatch_source_t source );
// unsigned long dispatch_source_get_mask ( dispatch_source_t source );
// void dispatch_source_merge_data ( dispatch_source_t source, unsigned long value );
// void dispatch_source_set_registration_handler ( dispatch_source_t source, dispatch_block_t handler );
// void dispatch_source_set_registration_handler_f ( dispatch_source_t source, dispatch_function_t handler );
// void dispatch_source_set_cancel_handler ( dispatch_source_t source, dispatch_block_t handler );
// void dispatch_source_set_cancel_handler_f ( dispatch_source_t source, dispatch_function_t handler );
pub fn dispatch_source_set_cancel_handler_f(
source: dispatch_source_t,
handler: dispatch_function_t,
);
// void dispatch_source_set_event_handler ( dispatch_source_t source, dispatch_block_t handler );
// void dispatch_source_set_event_handler_f ( dispatch_source_t source, dispatch_function_t handler );
// void dispatch_source_set_timer ( dispatch_source_t source, dispatch_time_t start, uint64_t interval, uint64_t leeway );
pub fn dispatch_source_set_event_handler_f(
source: dispatch_source_t,
handler: dispatch_function_t,
);
pub fn dispatch_source_set_timer(
source: dispatch_source_t,
start: dispatch_time_t,
interval: u64,
leeway: u64,
);
// long dispatch_source_testcancel ( dispatch_source_t source );

// void dispatch_read ( dispatch_fd_t fd, size_t length, dispatch_queue_t queue, void (^handler)(dispatch_data_t data, int error) );
Expand Down Expand Up @@ -136,14 +213,31 @@ pub fn dispatch_get_main_queue() -> dispatch_queue_t {
}

pub const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = 0 as dispatch_queue_attr_t;
pub static DISPATCH_QUEUE_CONCURRENT: &'static dispatch_object_s = unsafe { &_dispatch_queue_attr_concurrent };
pub static DISPATCH_QUEUE_CONCURRENT: &dispatch_object_s =
unsafe { &_dispatch_queue_attr_concurrent };

pub static DISPATCH_SOURCE_TYPE_DATA_ADD: &dispatch_object_s =
unsafe { &_dispatch_source_type_data_add };
pub static DISPATCH_SOURCE_TYPE_DATA_OR: &dispatch_object_s =
unsafe { &_dispatch_source_type_data_or };
pub static DISPATCH_SOURCE_TYPE_MACH_RECV: &dispatch_object_s =
unsafe { &_dispatch_source_type_mach_recv };
pub static DISPATCH_SOURCE_TYPE_MACH_SEND: &dispatch_object_s =
unsafe { &_dispatch_source_type_mach_send };
pub static DISPATCH_SOURCE_TYPE_PROC: &dispatch_object_s = unsafe { &_dispatch_source_type_proc };
pub static DISPATCH_SOURCE_TYPE_READ: &dispatch_object_s = unsafe { &_dispatch_source_type_read };
pub static DISPATCH_SOURCE_TYPE_SIGNAL: &dispatch_object_s =
unsafe { &_dispatch_source_type_signal };
pub static DISPATCH_SOURCE_TYPE_TIMER: &dispatch_object_s = unsafe { &_dispatch_source_type_timer };
pub static DISPATCH_SOURCE_TYPE_VNODE: &dispatch_object_s = unsafe { &_dispatch_source_type_vnode };
pub static DISPATCH_SOURCE_TYPE_WRITE: &dispatch_object_s = unsafe { &_dispatch_source_type_write };

pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2;
pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0;
pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2;
pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2;
pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0;
pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2;
pub const DISPATCH_QUEUE_PRIORITY_BACKGROUND: c_long = -1 << 15;

pub const DISPATCH_TIME_NOW: dispatch_time_t = 0;
pub const DISPATCH_TIME_NOW: dispatch_time_t = 0;
pub const DISPATCH_TIME_FOREVER: dispatch_time_t = !0;

#[cfg(test)]
Expand All @@ -155,7 +249,7 @@ mod tests {
use std::os::raw::c_void;
use std::ptr;

extern fn serial_queue_test_add(num: *mut c_void) {
extern "C" fn serial_queue_test_add(num: *mut c_void) {
unsafe {
*(num as *mut u32) = 1;
}
Expand Down
40 changes: 21 additions & 19 deletions src/group.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::time::Duration;

use crate::ffi::*;
use crate::{context_and_function, time_after_delay, WaitTimeout};
use crate::queue::Queue;
use crate::{context_and_function, time_after_delay, WaitTimeout};

/// A Grand Central Dispatch group.
///
Expand All @@ -18,7 +18,9 @@ impl Group {
/// Creates a new dispatch `Group`.
pub fn create() -> Group {
unsafe {
Group { ptr: dispatch_group_create() }
Group {
ptr: dispatch_group_create(),
}
}
}

Expand All @@ -32,7 +34,9 @@ impl Group {
/// Submits a closure asynchronously to the given `Queue` and associates it
/// with self.
pub fn exec_async<F>(&self, queue: &Queue, work: F)
where F: 'static + Send + FnOnce() {
where
F: 'static + Send + FnOnce(),
{
let (context, work) = context_and_function(work);
unsafe {
dispatch_group_async_f(self.ptr, queue.ptr, context, work);
Expand All @@ -43,7 +47,9 @@ impl Group {
/// associated with self have completed.
/// If self is empty, the closure is submitted immediately.
pub fn notify<F>(&self, queue: &Queue, work: F)
where F: 'static + Send + FnOnce() {
where
F: 'static + Send + FnOnce(),
{
let (context, work) = context_and_function(work);
unsafe {
dispatch_group_notify_f(self.ptr, queue.ptr, context, work);
Expand All @@ -52,9 +58,7 @@ impl Group {

/// Waits synchronously for all tasks associated with self to complete.
pub fn wait(&self) {
let result = unsafe {
dispatch_group_wait(self.ptr, DISPATCH_TIME_FOREVER)
};
let result = unsafe { dispatch_group_wait(self.ptr, DISPATCH_TIME_FOREVER) };
assert!(result == 0, "Dispatch group wait errored");
}

Expand All @@ -63,9 +67,7 @@ impl Group {
/// Returns true if the tasks completed or false if the timeout elapsed.
pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeout> {
let when = time_after_delay(timeout);
let result = unsafe {
dispatch_group_wait(self.ptr, when)
};
let result = unsafe { dispatch_group_wait(self.ptr, when) };
if result == 0 {
Ok(())
} else {
Expand All @@ -75,15 +77,13 @@ impl Group {

/// Returns whether self is currently empty.
pub fn is_empty(&self) -> bool {
let result = unsafe {
dispatch_group_wait(self.ptr, DISPATCH_TIME_NOW)
};
let result = unsafe { dispatch_group_wait(self.ptr, DISPATCH_TIME_NOW) };
result == 0
}
}

unsafe impl Sync for Group { }
unsafe impl Send for Group { }
unsafe impl Sync for Group {}
unsafe impl Send for Group {}

impl Clone for Group {
fn clone(&self) -> Self {
Expand Down Expand Up @@ -113,11 +113,13 @@ impl GroupGuard {
unsafe {
dispatch_group_enter(group.ptr);
}
GroupGuard { group: group.clone() }
GroupGuard {
group: group.clone(),
}
}

/// Drops self, leaving the `Group`.
pub fn leave(self) { }
pub fn leave(self) {}
}

impl Clone for GroupGuard {
Expand All @@ -136,9 +138,9 @@ impl Drop for GroupGuard {

#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use crate::{Queue, QueueAttribute};
use super::Group;
use crate::{Queue, QueueAttribute};
use std::sync::{Arc, Mutex};

#[test]
fn test_group() {
Expand Down
Loading