From f141b0eb27550c4991b5eb504a66bf0c545cba5d Mon Sep 17 00:00:00 2001 From: Norio Nomura Date: Tue, 2 Jun 2015 23:36:40 +0900 Subject: [PATCH] Add waitForAllTaskTermination Fix segmentation fault on Carthage termination. https://github.com/Carthage/Carthage/pull/474#issuecomment-105467060 --- ReactiveTask/Task.swift | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index eb502fd..c60bcf8 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -43,6 +43,14 @@ public struct TaskDescription { self.environment = environment self.standardInput = standardInput } + + /// A GCD group which to wait completion + private static var group = dispatch_group_create() + + /// wait for all task termination + public static func waitForAllTaskTermination() { + dispatch_group_wait(TaskDescription.group, DISPATCH_TIME_FOREVER) + } } extension TaskDescription: Printable { @@ -63,6 +71,9 @@ private final class Pipe { /// A GCD queue upon which to deliver I/O callbacks. let queue: dispatch_queue_t + + /// A GCD group which to wait completion + let group: dispatch_group_t /// Creates an NSFileHandle corresponding to the `readFD`. The file handle /// will not automatically close the descriptor. @@ -77,20 +88,21 @@ private final class Pipe { } /// Initializes a pipe object using existing file descriptors. - init(readFD: Int32, writeFD: Int32, queue: dispatch_queue_t) { + init(readFD: Int32, writeFD: Int32, queue: dispatch_queue_t, group: dispatch_group_t) { precondition(readFD >= 0) precondition(writeFD >= 0) self.readFD = readFD self.writeFD = writeFD self.queue = queue + self.group = group } /// Instantiates a new descriptor pair. - class func create(queue: dispatch_queue_t) -> Result { + class func create(queue: dispatch_queue_t, _ group: dispatch_group_t) -> Result { var fildes: [Int32] = [ 0, 0 ] if pipe(&fildes) == 0 { - return .success(self(readFD: fildes[0], writeFD: fildes[1], queue: queue)) + return .success(self(readFD: fildes[0], writeFD: fildes[1], queue: queue, group: group)) } else { return .failure(.POSIXError(errno)) } @@ -109,6 +121,7 @@ private final class Pipe { /// anywhere else, as it may close unexpectedly. func transferReadsToProducer() -> SignalProducer { return SignalProducer { observer, disposable in + dispatch_group_enter(self.group) let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.readFD, self.queue) { error in if error == 0 { sendCompleted(observer) @@ -119,6 +132,7 @@ private final class Pipe { } close(self.readFD) + dispatch_group_leave(self.group) } dispatch_io_set_low_water(channel, 1) @@ -154,6 +168,7 @@ private final class Pipe { /// Returns a producer that will complete or error. func writeDataFromProducer(producer: SignalProducer) -> SignalProducer<(), ReactiveTaskError> { return SignalProducer { observer, disposable in + dispatch_group_enter(self.group) let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.writeFD, self.queue) { error in if error == 0 { sendCompleted(observer) @@ -164,6 +179,7 @@ private final class Pipe { } close(self.writeFD) + dispatch_group_leave(self.group) } producer.startWithSignal { signal, producerDisposable in @@ -363,6 +379,7 @@ public func ignoreTaskData(signal: Signal, Error>) -> Sig public func launchTask(taskDescription: TaskDescription) -> SignalProducer, ReactiveTaskError> { return SignalProducer { observer, disposable in let queue = dispatch_queue_create(taskDescription.description, DISPATCH_QUEUE_SERIAL) + let group = TaskDescription.group let task = NSTask() task.launchPath = taskDescription.launchPath @@ -379,7 +396,7 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer = .empty if let input = taskDescription.standardInput { - switch Pipe.create(queue) { + switch Pipe.create(queue, group) { case let .Success(pipe): task.standardInput = pipe.value.readHandle @@ -392,11 +409,11 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer flatMap(.Concat) { stdoutPipe -> SignalProducer, ReactiveTaskError> in let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe) - return SignalProducer(result: Pipe.create(queue)) + return SignalProducer(result: Pipe.create(queue, group)) |> flatMap(.Merge) { stderrPipe -> SignalProducer, ReactiveTaskError> in let stderrProducer = aggregateDataReadFromPipe(stderrPipe) @@ -449,6 +466,7 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer SignalProducer start(observer) } + dispatch_group_leave(group) } task.launch()