Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions ReactiveTask/Task.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public struct TaskDescription {
self.environment = environment
self.standardInput = standardInput
}

/// A GCD group which to wait completion
private static var group = dispatch_group_create()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be created within launchTask, since TaskDescriptions can be reused across multiple task invocations.


/// wait for all task termination
public static func waitForAllTaskTermination() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this automatically before notifying launchTask's observer of success/failure? In other words, we could automatically wait for termination before sending completed/error.

dispatch_group_wait(TaskDescription.group, DISPATCH_TIME_FOREVER)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dispatch_group_notify would be preferable, to avoid blocking.

}
}

extension TaskDescription: Printable {
Expand All @@ -63,6 +71,9 @@ private final class Pipe {

/// A GCD queue upon which to deliver I/O callbacks.
let queue: dispatch_queue_t

/// A GCD group which to wait completion
let group: dispatch_group_t

/// Creates an NSFileHandle corresponding to the `readFD`. The file handle
/// will not automatically close the descriptor.
Expand All @@ -77,20 +88,21 @@ private final class Pipe {
}

/// Initializes a pipe object using existing file descriptors.
init(readFD: Int32, writeFD: Int32, queue: dispatch_queue_t) {
init(readFD: Int32, writeFD: Int32, queue: dispatch_queue_t, group: dispatch_group_t) {
precondition(readFD >= 0)
precondition(writeFD >= 0)

self.readFD = readFD
self.writeFD = writeFD
self.queue = queue
self.group = group
}

/// Instantiates a new descriptor pair.
class func create(queue: dispatch_queue_t) -> Result<Pipe, ReactiveTaskError> {
class func create(queue: dispatch_queue_t, _ group: dispatch_group_t) -> Result<Pipe, ReactiveTaskError> {
var fildes: [Int32] = [ 0, 0 ]
if pipe(&fildes) == 0 {
return .success(self(readFD: fildes[0], writeFD: fildes[1], queue: queue))
return .success(self(readFD: fildes[0], writeFD: fildes[1], queue: queue, group: group))
} else {
return .failure(.POSIXError(errno))
}
Expand All @@ -109,6 +121,7 @@ private final class Pipe {
/// anywhere else, as it may close unexpectedly.
func transferReadsToProducer() -> SignalProducer<dispatch_data_t, ReactiveTaskError> {
return SignalProducer { observer, disposable in
dispatch_group_enter(self.group)
let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.readFD, self.queue) { error in
if error == 0 {
sendCompleted(observer)
Expand All @@ -117,6 +130,7 @@ private final class Pipe {
}

close(self.readFD)
dispatch_group_leave(self.group)
}

dispatch_io_set_low_water(channel, 1)
Expand Down Expand Up @@ -150,6 +164,7 @@ private final class Pipe {
/// Returns a producer that will complete or error.
func writeDataFromProducer(producer: SignalProducer<NSData, NoError>) -> SignalProducer<(), ReactiveTaskError> {
return SignalProducer { observer, disposable in
dispatch_group_enter(self.group)
let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.writeFD, self.queue) { error in
if error == 0 {
sendCompleted(observer)
Expand All @@ -158,6 +173,7 @@ private final class Pipe {
}

close(self.writeFD)
dispatch_group_leave(self.group)
}

producer.startWithSignal { signal, producerDisposable in
Expand Down Expand Up @@ -231,6 +247,7 @@ private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf<NSData
public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf<NSData>? = nil, standardError: SinkOf<NSData>? = nil) -> SignalProducer<NSData, ReactiveTaskError> {
return SignalProducer { observer, disposable in
let queue = dispatch_queue_create(taskDescription.description, DISPATCH_QUEUE_SERIAL)
let group = TaskDescription.group

let task = NSTask()
task.launchPath = taskDescription.launchPath
Expand All @@ -247,7 +264,7 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf<
var stdinProducer: SignalProducer<(), ReactiveTaskError> = .empty

if let input = taskDescription.standardInput {
switch Pipe.create(queue) {
switch Pipe.create(queue, group) {
case let .Success(pipe):
task.standardInput = pipe.value.readHandle

Expand All @@ -260,18 +277,21 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf<
}
}

SignalProducer(result: Pipe.create(queue))
SignalProducer(result: Pipe.create(queue, group))
|> flatMap(.Concat) { stdoutPipe -> SignalProducer<NSData, ReactiveTaskError> in
let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe, standardOutput)

return SignalProducer(result: Pipe.create(queue))
return SignalProducer(result: Pipe.create(queue, group))
|> flatMap(.Merge) { stderrPipe -> SignalProducer<NSData, ReactiveTaskError> in
let stderrProducer = aggregateDataReadFromPipe(stderrPipe, standardError)

let terminationStatusProducer = SignalProducer<Int32, NoError> { observer, disposable in
dispatch_group_enter(group)

task.terminationHandler = { task in
sendNext(observer, task.terminationStatus)
sendCompleted(observer)
dispatch_group_leave(group)
}

task.standardOutput = stdoutPipe.writeHandle
Expand Down