From 8f9825d252877413a4828c742f89b8ea67ce9f5e Mon Sep 17 00:00:00 2001 From: Syo Ikeda Date: Fri, 12 Jun 2015 12:19:21 +0900 Subject: [PATCH 1/2] ++RAC, ++Result --- Cartfile.resolved | 4 ++-- Carthage/Checkouts/ReactiveCocoa | 2 +- Carthage/Checkouts/Result | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cartfile.resolved b/Cartfile.resolved index 8c0b70d..8c27ff1 100644 --- a/Cartfile.resolved +++ b/Cartfile.resolved @@ -2,5 +2,5 @@ github "robrix/Box" "1.2.2" github "Quick/Nimble" "v0.4.2" github "Quick/Quick" "v0.3.1" github "jspahrsummers/xcconfigs" "0.8.1" -github "antitypical/Result" "0.4.3" -github "ReactiveCocoa/ReactiveCocoa" "ad1fd34561d3f2f42e0feb420ace5de321906b78" +github "antitypical/Result" "0.4.4" +github "ReactiveCocoa/ReactiveCocoa" "362c55c0ef976f7ceccc60e63c3f9001d3384748" diff --git a/Carthage/Checkouts/ReactiveCocoa b/Carthage/Checkouts/ReactiveCocoa index ad1fd34..362c55c 160000 --- a/Carthage/Checkouts/ReactiveCocoa +++ b/Carthage/Checkouts/ReactiveCocoa @@ -1 +1 @@ -Subproject commit ad1fd34561d3f2f42e0feb420ace5de321906b78 +Subproject commit 362c55c0ef976f7ceccc60e63c3f9001d3384748 diff --git a/Carthage/Checkouts/Result b/Carthage/Checkouts/Result index f9045c2..81b1896 160000 --- a/Carthage/Checkouts/Result +++ b/Carthage/Checkouts/Result @@ -1 +1 @@ -Subproject commit f9045c2a1fee1af321e29eea7c633be5a6a42532 +Subproject commit 81b189645babd30c6f70ed3f82a83988a467fb02 From 7d9cbab153e46e6216b7e06365f8dfabb7ff69c2 Mon Sep 17 00:00:00 2001 From: Syo Ikeda Date: Fri, 12 Jun 2015 12:20:06 +0900 Subject: [PATCH 2/2] Use Result conjunction operator in launchTask() This restores the structure replaced with flatMapping in c852a5c without use of zipWith(). --- ReactiveTask/Task.swift | 162 ++++++++++++++++++++-------------------- 1 file changed, 79 insertions(+), 83 deletions(-) diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index c60bcf8..3bf5522 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -409,100 +409,96 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer flatMap(.Concat) { stdoutPipe -> SignalProducer, ReactiveTaskError> in + SignalProducer(result: Pipe.create(queue, group) &&& Pipe.create(queue, group)) + |> flatMap(.Merge) { stdoutPipe, stderrPipe -> SignalProducer, ReactiveTaskError> in let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe) + let stderrProducer = aggregateDataReadFromPipe(stderrPipe) - return SignalProducer(result: Pipe.create(queue, group)) - |> flatMap(.Merge) { stderrPipe -> SignalProducer, ReactiveTaskError> in - let stderrProducer = aggregateDataReadFromPipe(stderrPipe) - - return SignalProducer { observer, disposable in - let (stdoutAggregated, stdoutAggregatedSink) = SignalProducer.buffer(1) - let (stderrAggregated, stderrAggregatedSink) = SignalProducer.buffer(1) - - stdoutProducer.startWithSignal { signal, signalDisposable in - disposable += signalDisposable - - signal.observe(next: { readData in - switch readData { - case let .Chunk(data): - sendNext(observer, .StandardOutput(data)) - - case let .Aggregated(data): - sendNext(stdoutAggregatedSink, data) - } - }, error: { error in - sendError(observer, error) - sendError(stdoutAggregatedSink, error) - }, completed: { - sendCompleted(stdoutAggregatedSink) - }, interrupted: { - sendInterrupted(stdoutAggregatedSink) - }) - } + return SignalProducer { observer, disposable in + let (stdoutAggregated, stdoutAggregatedSink) = SignalProducer.buffer(1) + let (stderrAggregated, stderrAggregatedSink) = SignalProducer.buffer(1) - stderrProducer.startWithSignal { signal, signalDisposable in - disposable += signalDisposable - - signal.observe(next: { readData in - switch readData { - case let .Chunk(data): - sendNext(observer, .StandardError(data)) - - case let .Aggregated(data): - sendNext(stderrAggregatedSink, data) - } - }, error: { error in - sendError(observer, error) - sendError(stderrAggregatedSink, error) - }, completed: { - sendCompleted(stderrAggregatedSink) - }, interrupted: { - sendInterrupted(stderrAggregatedSink) - }) - } + stdoutProducer.startWithSignal { signal, signalDisposable in + disposable += signalDisposable - task.standardOutput = stdoutPipe.writeHandle - task.standardError = stderrPipe.writeHandle - - dispatch_group_enter(group) - task.terminationHandler = { task in - let terminationStatus = task.terminationStatus - if terminationStatus == EXIT_SUCCESS { - // Wait for stderr to finish, then pass - // through stdout. - disposable += stderrAggregated - |> then(stdoutAggregated) - |> map { data in .Success(Box(data)) } - |> start(observer) - } else { - // Wait for stdout to finish, then pass - // through stderr. - disposable += stdoutAggregated - |> then(stderrAggregated) - |> flatMap(.Concat) { data in - let errorString = (data.length > 0 ? String(UTF8String: UnsafePointer(data.bytes)) : nil) - return SignalProducer(error: .ShellTaskFailed(exitCode: terminationStatus, standardError: errorString)) - } - |> start(observer) - } - dispatch_group_leave(group) + signal.observe(next: { readData in + switch readData { + case let .Chunk(data): + sendNext(observer, .StandardOutput(data)) + + case let .Aggregated(data): + sendNext(stdoutAggregatedSink, data) } + }, error: { error in + sendError(observer, error) + sendError(stdoutAggregatedSink, error) + }, completed: { + sendCompleted(stdoutAggregatedSink) + }, interrupted: { + sendInterrupted(stdoutAggregatedSink) + }) + } - task.launch() - close(stdoutPipe.writeFD) - close(stderrPipe.writeFD) + stderrProducer.startWithSignal { signal, signalDisposable in + disposable += signalDisposable - stdinProducer.startWithSignal { signal, signalDisposable in - disposable += signalDisposable - } + signal.observe(next: { readData in + switch readData { + case let .Chunk(data): + sendNext(observer, .StandardError(data)) - disposable.addDisposable { - task.terminate() + case let .Aggregated(data): + sendNext(stderrAggregatedSink, data) } + }, error: { error in + sendError(observer, error) + sendError(stderrAggregatedSink, error) + }, completed: { + sendCompleted(stderrAggregatedSink) + }, interrupted: { + sendInterrupted(stderrAggregatedSink) + }) + } + + task.standardOutput = stdoutPipe.writeHandle + task.standardError = stderrPipe.writeHandle + + dispatch_group_enter(group) + task.terminationHandler = { task in + let terminationStatus = task.terminationStatus + if terminationStatus == EXIT_SUCCESS { + // Wait for stderr to finish, then pass + // through stdout. + disposable += stderrAggregated + |> then(stdoutAggregated) + |> map { data in .Success(Box(data)) } + |> start(observer) + } else { + // Wait for stdout to finish, then pass + // through stderr. + disposable += stdoutAggregated + |> then(stderrAggregated) + |> flatMap(.Concat) { data in + let errorString = (data.length > 0 ? String(UTF8String: UnsafePointer(data.bytes)) : nil) + return SignalProducer(error: .ShellTaskFailed(exitCode: terminationStatus, standardError: errorString)) + } + |> start(observer) } + dispatch_group_leave(group) + } + + task.launch() + close(stdoutPipe.writeFD) + close(stderrPipe.writeFD) + + stdinProducer.startWithSignal { signal, signalDisposable in + disposable += signalDisposable } + + disposable.addDisposable { + task.terminate() + } + } } |> startWithSignal { signal, taskDisposable in disposable.addDisposable(taskDisposable)