Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions ReactiveTask/Task.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public struct TaskDescription {
self.environment = environment
self.standardInput = standardInput
}

/// A GCD group which to wait completion
private static var group = dispatch_group_create()

/// wait for all task termination
public static func waitForAllTaskTermination() {
dispatch_group_wait(TaskDescription.group, DISPATCH_TIME_FOREVER)
}
}

extension TaskDescription: Printable {
Expand All @@ -63,6 +71,9 @@ private final class Pipe {

/// A GCD queue upon which to deliver I/O callbacks.
let queue: dispatch_queue_t

/// A GCD group which to wait completion
let group: dispatch_group_t

/// Creates an NSFileHandle corresponding to the `readFD`. The file handle
/// will not automatically close the descriptor.
Expand All @@ -77,20 +88,21 @@ private final class Pipe {
}

/// Initializes a pipe object using existing file descriptors.
init(readFD: Int32, writeFD: Int32, queue: dispatch_queue_t) {
init(readFD: Int32, writeFD: Int32, queue: dispatch_queue_t, group: dispatch_group_t) {
precondition(readFD >= 0)
precondition(writeFD >= 0)

self.readFD = readFD
self.writeFD = writeFD
self.queue = queue
self.group = group
}

/// Instantiates a new descriptor pair.
class func create(queue: dispatch_queue_t) -> Result<Pipe, ReactiveTaskError> {
class func create(queue: dispatch_queue_t, _ group: dispatch_group_t) -> Result<Pipe, ReactiveTaskError> {
var fildes: [Int32] = [ 0, 0 ]
if pipe(&fildes) == 0 {
return .success(self(readFD: fildes[0], writeFD: fildes[1], queue: queue))
return .success(self(readFD: fildes[0], writeFD: fildes[1], queue: queue, group: group))
} else {
return .failure(.POSIXError(errno))
}
Expand All @@ -109,6 +121,7 @@ private final class Pipe {
/// anywhere else, as it may close unexpectedly.
func transferReadsToProducer() -> SignalProducer<dispatch_data_t, ReactiveTaskError> {
return SignalProducer { observer, disposable in
dispatch_group_enter(self.group)
let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.readFD, self.queue) { error in
if error == 0 {
sendCompleted(observer)
Expand All @@ -119,6 +132,7 @@ private final class Pipe {
}

close(self.readFD)
dispatch_group_leave(self.group)
}

dispatch_io_set_low_water(channel, 1)
Expand Down Expand Up @@ -154,6 +168,7 @@ private final class Pipe {
/// Returns a producer that will complete or error.
func writeDataFromProducer(producer: SignalProducer<NSData, NoError>) -> SignalProducer<(), ReactiveTaskError> {
return SignalProducer { observer, disposable in
dispatch_group_enter(self.group)
let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.writeFD, self.queue) { error in
if error == 0 {
sendCompleted(observer)
Expand All @@ -164,6 +179,7 @@ private final class Pipe {
}

close(self.writeFD)
dispatch_group_leave(self.group)
}

producer.startWithSignal { signal, producerDisposable in
Expand Down Expand Up @@ -363,6 +379,7 @@ public func ignoreTaskData<T, Error>(signal: Signal<TaskEvent<T>, Error>) -> Sig
public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> {
return SignalProducer { observer, disposable in
let queue = dispatch_queue_create(taskDescription.description, DISPATCH_QUEUE_SERIAL)
let group = TaskDescription.group

let task = NSTask()
task.launchPath = taskDescription.launchPath
Expand All @@ -379,7 +396,7 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskE
var stdinProducer: SignalProducer<(), ReactiveTaskError> = .empty

if let input = taskDescription.standardInput {
switch Pipe.create(queue) {
switch Pipe.create(queue, group) {
case let .Success(pipe):
task.standardInput = pipe.value.readHandle

Expand All @@ -392,11 +409,11 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskE
}
}

SignalProducer(result: Pipe.create(queue))
SignalProducer(result: Pipe.create(queue, group))
|> flatMap(.Concat) { stdoutPipe -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> in
let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe)

return SignalProducer(result: Pipe.create(queue))
return SignalProducer(result: Pipe.create(queue, group))
|> flatMap(.Merge) { stderrPipe -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> in
let stderrProducer = aggregateDataReadFromPipe(stderrPipe)

Expand Down Expand Up @@ -449,6 +466,7 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskE
task.standardOutput = stdoutPipe.writeHandle
task.standardError = stderrPipe.writeHandle

dispatch_group_enter(group)
task.terminationHandler = { task in
let terminationStatus = task.terminationStatus
if terminationStatus == EXIT_SUCCESS {
Expand All @@ -469,6 +487,7 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskE
}
|> start(observer)
}
dispatch_group_leave(group)
}

task.launch()
Expand Down