From 480ea2d005845ce2b38bb1500a9860e83395ce60 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Wed, 27 May 2015 22:07:41 -0700 Subject: [PATCH 1/6] Refactor incremental task data using a TaskEvent enum --- ReactiveTask/Task.swift | 208 +++++++++++++++++++++++-------- ReactiveTaskTests/TaskSpec.swift | 77 ++++++------ 2 files changed, 197 insertions(+), 88 deletions(-) diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index a7aaa07..b282b3f 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -185,12 +185,34 @@ private final class Pipe { } } -/// Takes ownership of the read handle from the given pipe, then aggregates all -/// data into one `NSData` object, which is then sent upon the returned signal. -/// -/// If `forwardingSink` is non-nil, each incremental piece of data will be sent -/// to it as data is received. -private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf?) -> SignalProducer { +/// Sent when reading from a pipe. +private enum ReadData { + /// A chunk of data, sent as soon as it is received. + case Chunk(NSData) + + /// The aggregate of all data sent so far, sent right before completion. + /// + /// No further chunks will occur after this has been sent. + case Aggregated(NSData) + + /// Convenience constructor for a `Chunk` from `dispatch_data_t`. + static func chunk(data: dispatch_data_t) -> ReadData { + return .Chunk(data as! NSData) + } + + /// Convenience constructor for an `Aggregated` from `dispatch_data_t`. + static func aggregated(data: dispatch_data_t?) -> ReadData { + if let data = data { + return .Aggregated(data as! NSData) + } else { + return .Aggregated(NSData()) + } + } +} + +/// Takes ownership of the read handle from the given pipe, then sends +/// `ReadData` values for all data read. +private func aggregateDataReadFromPipe(pipe: Pipe) -> SignalProducer { let readProducer = pipe.transferReadsToProducer() return SignalProducer { observer, disposable in @@ -200,7 +222,8 @@ private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf { + /// Some data arrived from the task on `stdout`. + case StandardOutput(NSData) + + /// Some data arrived from the task on `stderr`. + case StandardError(NSData) + + /// The task exited successfully (with status 0), and value T was produced + /// as a result. + /// + /// No further TaskEvents will occur after this has been sent. + case Success(Box) + + /// The resulting value, if the event is `Success`. + public var value: T? { + switch self { + case .StandardOutput, .StandardError: + return nil + + case let .Success(box): + return box.value + } + } +} + +public func == (lhs: TaskEvent, rhs: TaskEvent) -> Bool { + switch (lhs, rhs) { + case let (.StandardOutput(left), .StandardOutput(right)): + return left == right + + case let (.StandardError(left), .StandardError(right)): + return left == right + + case let (.Success(left), .Success(right)): + return left == right + + default: + return false + } +} + +extension TaskEvent: Printable { + public var description: String { + func dataDescription(data: NSData) -> String { + return NSString(data: data, encoding: NSUTF8StringEncoding).map { $0 as String } ?? data.description + } + + switch self { + case let .StandardOutput(data): + return "stdout: " + dataDescription(data) + + case let .StandardError(data): + return "stderr: " + dataDescription(data) + + case let .Success(value): + return "success(\(value))" + } + } +} + /// Launches a new shell task, using the parameters from `taskDescription`. /// -/// Returns a producer that will launch the task when started, then send one -/// `NSData` value (representing aggregated data from `stdout`) and complete -/// upon success. -public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf? = nil, standardError: SinkOf? = nil) -> SignalProducer { +/// Returns a producer that will launch the task when started, then send +/// `TaskEvent`s as execution proceeds. +public func launchTask(taskDescription: TaskDescription) -> SignalProducer, ReactiveTaskError> { return SignalProducer { observer, disposable in let queue = dispatch_queue_create(taskDescription.description, DISPATCH_QUEUE_SERIAL) @@ -261,32 +340,76 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< } SignalProducer(result: Pipe.create(queue)) - |> flatMap(.Concat) { stdoutPipe -> SignalProducer in - let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe, standardOutput) + |> flatMap(.Concat) { stdoutPipe -> SignalProducer, ReactiveTaskError> in + let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe) return SignalProducer(result: Pipe.create(queue)) - |> flatMap(.Merge) { stderrPipe -> SignalProducer in - let stderrProducer = aggregateDataReadFromPipe(stderrPipe, standardError) + |> flatMap(.Merge) { stderrPipe -> SignalProducer, ReactiveTaskError> in + let stderrProducer = aggregateDataReadFromPipe(stderrPipe) + + return SignalProducer { observer, disposable in + let (stdoutAggregated, stdoutAggregatedSink) = SignalProducer.buffer(1) + let (stderrAggregated, stderrAggregatedSink) = SignalProducer.buffer(1) + + stdoutProducer.startWithSignal { signal, signalDisposable in + disposable += signalDisposable + + signal.observe(next: { readData in + switch readData { + case let .Chunk(data): + sendNext(observer, .StandardOutput(data)) + + case let .Aggregated(data): + sendNext(stdoutAggregatedSink, data) + } + }, error: { error in + sendError(observer, error) + sendError(stdoutAggregatedSink, error) + }, completed: { + sendCompleted(stdoutAggregatedSink) + }, interrupted: { + sendInterrupted(stdoutAggregatedSink) + }) + } - let terminationStatusProducer = SignalProducer { observer, disposable in - task.terminationHandler = { task in - sendNext(observer, task.terminationStatus) - sendCompleted(observer) + stderrProducer.startWithSignal { signal, signalDisposable in + disposable += signalDisposable + + signal.observe(next: { readData in + switch readData { + case let .Chunk(data): + sendNext(observer, .StandardError(data)) + + case let .Aggregated(data): + sendNext(stderrAggregatedSink, data) + } + }, error: { error in + sendError(observer, error) + sendError(stderrAggregatedSink, error) + }, completed: { + sendCompleted(stderrAggregatedSink) + }, interrupted: { + sendInterrupted(stderrAggregatedSink) + }) } task.standardOutput = stdoutPipe.writeHandle task.standardError = stderrPipe.writeHandle - if disposable.disposed { - stdoutPipe.closePipe() - stderrPipe.closePipe() - - // Clean up the stdin pipe in a roundabout way. - stdinProducer.startWithSignal { signal, signalDisposable in - signalDisposable.dispose() + task.terminationHandler = { task in + let terminationStatus = task.terminationStatus + if terminationStatus == EXIT_SUCCESS { + disposable += stdoutAggregated + |> map { data in .Success(Box(data)) } + |> start(observer) + } else { + disposable += stderrAggregated + |> flatMap(.Concat) { data in + let errorString = (data.length > 0 ? String(UTF8String: UnsafePointer(data.bytes)) : nil) + return SignalProducer(error: .ShellTaskFailed(exitCode: terminationStatus, standardError: errorString)) + } + |> start(observer) } - - return } task.launch() @@ -294,28 +417,13 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< close(stderrPipe.writeFD) stdinProducer.startWithSignal { signal, signalDisposable in - disposable.addDisposable(signalDisposable) + disposable += signalDisposable } disposable.addDisposable { task.terminate() } } - - return - zip( - stdoutProducer, - stderrProducer, - terminationStatusProducer |> promoteErrors(ReactiveTaskError.self) - ) - |> tryMap { stdoutData, stderrData, terminationStatus -> Result in - if terminationStatus == EXIT_SUCCESS { - return .success(stdoutData) - } else { - let errorString = (stderrData.length > 0 ? String(UTF8String: UnsafePointer(stderrData.bytes)) : nil) - return .failure(.ShellTaskFailed(exitCode: terminationStatus, standardError: errorString)) - } - } } } |> startWithSignal { signal, taskDisposable in diff --git a/ReactiveTaskTests/TaskSpec.swift b/ReactiveTaskTests/TaskSpec.swift index fa665c3..407a105 100644 --- a/ReactiveTaskTests/TaskSpec.swift +++ b/ReactiveTaskTests/TaskSpec.swift @@ -15,60 +15,61 @@ import Result class TaskSpec: QuickSpec { override func spec() { - let standardOutput = MutableProperty(NSData()) - let standardError = MutableProperty(NSData()) - - beforeEach { - standardOutput.value = NSData() - standardError.value = NSData() - } - - func accumulatingSinkForProperty(property: MutableProperty) -> SinkOf { - let (signal, sink) = Signal.pipe() + it("should launch a task that writes to stdout") { + let result = launchTask(TaskDescription(launchPath: "/bin/echo", arguments: [ "foobar" ])) + |> reduce(NSMutableData()) { aggregated, event in + switch event { + case let .StandardOutput(data): + aggregated.appendData(data) - property <~ signal - |> scan(NSData()) { (accum, data) in - let buffer = accum.mutableCopy() as! NSMutableData - buffer.appendData(data) + default: + break + } - return buffer - } + return aggregated + } + |> single - return SinkOf { data in - sendNext(sink, data) + expect(result).notTo(beNil()) + if let data = result?.value { + expect(NSString(data: data, encoding: NSUTF8StringEncoding)).to(equal("foobar\n")) } } - it("should launch a task that writes to stdout") { - let desc = TaskDescription(launchPath: "/bin/echo", arguments: [ "foobar" ]) - let task = launchTask(desc, standardOutput: accumulatingSinkForProperty(standardOutput)) - expect(standardOutput.value).to(equal(NSData())) + it("should launch a task that writes to stderr") { + let result = launchTask(TaskDescription(launchPath: "/usr/bin/stat", arguments: [ "not-a-real-file" ])) + |> reduce(NSMutableData()) { aggregated, event in + switch event { + case let .StandardError(data): + aggregated.appendData(data) - let result = task |> wait - expect(result.value).notTo(beNil()) - expect(NSString(data: standardOutput.value, encoding: NSUTF8StringEncoding)).to(equal("foobar\n")) - } + default: + break + } - it("should launch a task that writes to stderr") { - let desc = TaskDescription(launchPath: "/usr/bin/stat", arguments: [ "not-a-real-file" ]) - let task = launchTask(desc, standardError: accumulatingSinkForProperty(standardError)) - expect(standardError.value).to(equal(NSData())) + return aggregated + } + |> single - let result = task |> wait - expect(result.value).to(beNil()) - expect(NSString(data: standardError.value, encoding: NSUTF8StringEncoding)).to(equal("stat: not-a-real-file: stat: No such file or directory\n")) + expect(result).notTo(beNil()) + if let data = result?.value { + expect(NSString(data: data, encoding: NSUTF8StringEncoding)).to(equal("stat: not-a-real-file: stat: No such file or directory\n")) + } } it("should launch a task with standard input") { let strings = [ "foo\n", "bar\n", "buzz\n", "fuzz\n" ] let data = strings.map { $0.dataUsingEncoding(NSUTF8StringEncoding)! } - let desc = TaskDescription(launchPath: "/usr/bin/sort", standardInput: SignalProducer(values: data)) - let task = launchTask(desc, standardOutput: accumulatingSinkForProperty(standardOutput)) + let result = launchTask(TaskDescription(launchPath: "/usr/bin/sort", standardInput: SignalProducer(values: data))) + |> map { event in event.value } + |> ignoreNil + |> single - let result = task |> wait - expect(result.value).notTo(beNil()) - expect(NSString(data: standardOutput.value, encoding: NSUTF8StringEncoding)).to(equal("bar\nbuzz\nfoo\nfuzz\n")) + expect(result).notTo(beNil()) + if let data = result?.value { + expect(NSString(data: data, encoding: NSUTF8StringEncoding)).to(equal("bar\nbuzz\nfoo\nfuzz\n")) + } } } } From c66e2848eeaa91a870c45dbec9b546a5673dbb74 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Wed, 27 May 2015 22:26:15 -0700 Subject: [PATCH 2/6] Add TaskEvent.map --- ReactiveTask/Task.swift | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index b282b3f..102e99d 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -266,6 +266,20 @@ public enum TaskEvent { return box.value } } + + /// Maps over the value embedded in a `Success` event. + public func map(transform: T -> U) -> TaskEvent { + switch self { + case let .StandardOutput(data): + return .StandardOutput(data) + + case let .StandardError(data): + return .StandardError(data) + + case let .Success(box): + return .Success(box.map(transform)) + } + } } public func == (lhs: TaskEvent, rhs: TaskEvent) -> Bool { From e268a730caf4ff220bf6bd79af2385370afd44ff Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Wed, 27 May 2015 22:29:38 -0700 Subject: [PATCH 3/6] Add ignoreTaskData operator --- ReactiveTask/Task.swift | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index 102e99d..154f44f 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -317,6 +317,16 @@ extension TaskEvent: Printable { } } +/// Ignores incremental standard output and standard error data from the given +/// task, sending only a single value with the final, aggregated result. +public func ignoreTaskData(signal: Signal, Error>) -> Signal { + return signal + |> map { event in + return event.value.map { $0 } + } + |> ignoreNil +} + /// Launches a new shell task, using the parameters from `taskDescription`. /// /// Returns a producer that will launch the task when started, then send From 4a3ccc6cb8d33d5986a67e4cee81a9e2cdfd2f5e Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Wed, 27 May 2015 23:25:48 -0700 Subject: [PATCH 4/6] Add conveniences for obtaining producers out of TaskEvents --- ReactiveTask/Task.swift | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index 154f44f..6be50f9 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -252,8 +252,6 @@ public enum TaskEvent { /// The task exited successfully (with status 0), and value T was produced /// as a result. - /// - /// No further TaskEvents will occur after this has been sent. case Success(Box) /// The resulting value, if the event is `Success`. @@ -268,7 +266,7 @@ public enum TaskEvent { } /// Maps over the value embedded in a `Success` event. - public func map(transform: T -> U) -> TaskEvent { + public func map(@noescape transform: T -> U) -> TaskEvent { switch self { case let .StandardOutput(data): return .StandardOutput(data) @@ -277,7 +275,21 @@ public enum TaskEvent { return .StandardError(data) case let .Success(box): - return .Success(box.map(transform)) + return .Success(Box(transform(box.value))) + } + } + + /// Convenience operator for mapping TaskEvents to SignalProducers. + public func producerMap(@noescape transform: T -> SignalProducer) -> SignalProducer, Error> { + switch self { + case let .StandardOutput(data): + return SignalProducer, Error>(value: .StandardOutput(data)) + + case let .StandardError(data): + return SignalProducer, Error>(value: .StandardError(data)) + + case let .Success(box): + return transform(box.value) |> ReactiveCocoa.map { .Success(Box($0)) } } } } @@ -317,6 +329,15 @@ extension TaskEvent: Printable { } } +/// Maps the values inside a stream of TaskEvents into new SignalProducers. +public func flatMapTaskEvents(strategy: FlattenStrategy, transform: T -> SignalProducer) -> SignalProducer, Error> -> SignalProducer, Error> { + return { producer in + return producer |> flatMap(strategy) { taskEvent in + return taskEvent.producerMap(transform) + } + } +} + /// Ignores incremental standard output and standard error data from the given /// task, sending only a single value with the final, aggregated result. public func ignoreTaskData(signal: Signal, Error>) -> Signal { From dd7a3ce6f72d61ab0486f1d10f72dc71470dd4c3 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Fri, 29 May 2015 21:18:32 -0700 Subject: [PATCH 5/6] Wait for both stderr and stdout, no matter what the status was See https://github.com/Carthage/Carthage/issues/497#issuecomment-106657735. --- ReactiveTask/Task.swift | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index 6be50f9..f72097f 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -444,11 +444,17 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer then(stdoutAggregated) |> map { data in .Success(Box(data)) } |> start(observer) } else { - disposable += stderrAggregated + // Wait for stdout to finish, then pass + // through stderr. + disposable += stdoutAggregated + |> then(stderrAggregated) |> flatMap(.Concat) { data in let errorString = (data.length > 0 ? String(UTF8String: UnsafePointer(data.bytes)) : nil) return SignalProducer(error: .ShellTaskFailed(exitCode: terminationStatus, standardError: errorString)) From 18cdc3fc44e65a1564bc3ff201d49230eae7a986 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Fri, 29 May 2015 21:30:34 -0700 Subject: [PATCH 6/6] Properly handle ECANCELED --- ReactiveTask/Task.swift | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index f72097f..eb502fd 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -112,6 +112,8 @@ private final class Pipe { let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.readFD, self.queue) { error in if error == 0 { sendCompleted(observer) + } else if error == ECANCELED { + sendInterrupted(observer) } else { sendError(observer, .POSIXError(error)) } @@ -125,7 +127,9 @@ private final class Pipe { sendNext(observer, data) } - if error != 0 { + if error == ECANCELED { + sendInterrupted(observer) + } else if error != 0 { sendError(observer, .POSIXError(error)) } @@ -153,6 +157,8 @@ private final class Pipe { let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.writeFD, self.queue) { error in if error == 0 { sendCompleted(observer) + } else if error == ECANCELED { + sendInterrupted(observer) } else { sendError(observer, .POSIXError(error)) } @@ -167,7 +173,9 @@ private final class Pipe { let dispatchData = dispatch_data_create(data.bytes, data.length, self.queue, nil) dispatch_io_write(channel, 0, dispatchData, self.queue) { (done, data, error) in - if error != 0 { + if error == ECANCELED { + sendInterrupted(observer) + } else if error != 0 { sendError(observer, .POSIXError(error)) } }