Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cartfile.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ github "robrix/Box" "1.2.2"
github "Quick/Nimble" "v0.4.2"
github "Quick/Quick" "v0.3.1"
github "jspahrsummers/xcconfigs" "0.8.1"
github "antitypical/Result" "0.4.3"
github "ReactiveCocoa/ReactiveCocoa" "ad1fd34561d3f2f42e0feb420ace5de321906b78"
github "antitypical/Result" "0.4.4"
github "ReactiveCocoa/ReactiveCocoa" "362c55c0ef976f7ceccc60e63c3f9001d3384748"
2 changes: 1 addition & 1 deletion Carthage/Checkouts/ReactiveCocoa
Submodule ReactiveCocoa updated 36 files
+118 −124 Documentation/FrameworkOverview.md
+1 −1 ReactiveCocoa/Objective-C/MKAnnotationView+RACSignalSupport.m
+1 −1 ReactiveCocoa/Objective-C/NSControl+RACTextSignalSupport.m
+1 −1 ReactiveCocoa/Objective-C/NSFileHandle+RACSupport.m
+2 −7 ReactiveCocoa/Objective-C/NSObject+RACDescription.h
+12 −12 ReactiveCocoa/Objective-C/NSObject+RACDescription.m
+3 −3 ReactiveCocoa/Objective-C/NSObject+RACLifting.m
+3 −3 ReactiveCocoa/Objective-C/NSObject+RACPropertySubscribing.h
+3 −3 ReactiveCocoa/Objective-C/NSObject+RACPropertySubscribing.m
+1 −1 ReactiveCocoa/Objective-C/NSObject+RACSelectorSignal.m
+1 −1 ReactiveCocoa/Objective-C/NSText+RACSignalSupport.m
+1 −1 ReactiveCocoa/Objective-C/RACCommand.m
+1 −1 ReactiveCocoa/Objective-C/RACEagerSequence.m
+1 −1 ReactiveCocoa/Objective-C/RACQueueScheduler+Subclass.h
+2 −2 ReactiveCocoa/Objective-C/RACQueueScheduler.m
+15 −0 ReactiveCocoa/Objective-C/RACSignal+Operations.h
+13 −2 ReactiveCocoa/Objective-C/RACSignal+Operations.m
+6 −6 ReactiveCocoa/Objective-C/RACStream.m
+1 −1 ReactiveCocoa/Objective-C/RACUnarySequence.m
+1 −1 ReactiveCocoa/Objective-C/UIActionSheet+RACSignalSupport.m
+2 −2 ReactiveCocoa/Objective-C/UIAlertView+RACSignalSupport.m
+1 −1 ReactiveCocoa/Objective-C/UICollectionReusableView+RACSignalSupport.m
+1 −1 ReactiveCocoa/Objective-C/UIControl+RACSignalSupport.m
+1 −1 ReactiveCocoa/Objective-C/UIGestureRecognizer+RACSignalSupport.m
+1 −1 ReactiveCocoa/Objective-C/UIImagePickerController+RACSignalSupport.m
+1 −1 ReactiveCocoa/Objective-C/UITableViewCell+RACSignalSupport.m
+1 −1 ReactiveCocoa/Objective-C/UITableViewHeaderFooterView+RACSignalSupport.m
+1 −1 ReactiveCocoa/Objective-C/UITextField+RACSignalSupport.m
+1 −1 ReactiveCocoa/Objective-C/UITextView+RACSignalSupport.m
+2 −6 ReactiveCocoa/Swift/Action.swift
+5 −5 ReactiveCocoa/Swift/DebugSinkOf.swift
+1 −2 ReactiveCocoa/Swift/ObjectiveCBridging.swift
+3 −6 ReactiveCocoa/Swift/Property.swift
+1 −2 ReactiveCocoa/Swift/Signal.swift
+15 −15 ReactiveCocoa/Swift/SignalProducer.swift
+59 −0 ReactiveCocoaTests/Objective-C/RACSignalSpec.m
2 changes: 1 addition & 1 deletion Carthage/Checkouts/Result
162 changes: 79 additions & 83 deletions ReactiveTask/Task.swift
Original file line number Diff line number Diff line change
Expand Up @@ -409,100 +409,96 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskE
}
}

SignalProducer(result: Pipe.create(queue, group))
|> flatMap(.Concat) { stdoutPipe -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> in
SignalProducer(result: Pipe.create(queue, group) &&& Pipe.create(queue, group))
|> flatMap(.Merge) { stdoutPipe, stderrPipe -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> in
let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe)
let stderrProducer = aggregateDataReadFromPipe(stderrPipe)

return SignalProducer(result: Pipe.create(queue, group))
|> flatMap(.Merge) { stderrPipe -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> in
let stderrProducer = aggregateDataReadFromPipe(stderrPipe)

return SignalProducer { observer, disposable in
let (stdoutAggregated, stdoutAggregatedSink) = SignalProducer<NSData, ReactiveTaskError>.buffer(1)
let (stderrAggregated, stderrAggregatedSink) = SignalProducer<NSData, ReactiveTaskError>.buffer(1)

stdoutProducer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable

signal.observe(next: { readData in
switch readData {
case let .Chunk(data):
sendNext(observer, .StandardOutput(data))

case let .Aggregated(data):
sendNext(stdoutAggregatedSink, data)
}
}, error: { error in
sendError(observer, error)
sendError(stdoutAggregatedSink, error)
}, completed: {
sendCompleted(stdoutAggregatedSink)
}, interrupted: {
sendInterrupted(stdoutAggregatedSink)
})
}
return SignalProducer { observer, disposable in
let (stdoutAggregated, stdoutAggregatedSink) = SignalProducer<NSData, ReactiveTaskError>.buffer(1)
let (stderrAggregated, stderrAggregatedSink) = SignalProducer<NSData, ReactiveTaskError>.buffer(1)

stderrProducer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable

signal.observe(next: { readData in
switch readData {
case let .Chunk(data):
sendNext(observer, .StandardError(data))

case let .Aggregated(data):
sendNext(stderrAggregatedSink, data)
}
}, error: { error in
sendError(observer, error)
sendError(stderrAggregatedSink, error)
}, completed: {
sendCompleted(stderrAggregatedSink)
}, interrupted: {
sendInterrupted(stderrAggregatedSink)
})
}
stdoutProducer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable

task.standardOutput = stdoutPipe.writeHandle
task.standardError = stderrPipe.writeHandle

dispatch_group_enter(group)
task.terminationHandler = { task in
let terminationStatus = task.terminationStatus
if terminationStatus == EXIT_SUCCESS {
// Wait for stderr to finish, then pass
// through stdout.
disposable += stderrAggregated
|> then(stdoutAggregated)
|> map { data in .Success(Box(data)) }
|> start(observer)
} else {
// Wait for stdout to finish, then pass
// through stderr.
disposable += stdoutAggregated
|> then(stderrAggregated)
|> flatMap(.Concat) { data in
let errorString = (data.length > 0 ? String(UTF8String: UnsafePointer<CChar>(data.bytes)) : nil)
return SignalProducer(error: .ShellTaskFailed(exitCode: terminationStatus, standardError: errorString))
}
|> start(observer)
}
dispatch_group_leave(group)
signal.observe(next: { readData in
switch readData {
case let .Chunk(data):
sendNext(observer, .StandardOutput(data))

case let .Aggregated(data):
sendNext(stdoutAggregatedSink, data)
}
}, error: { error in
sendError(observer, error)
sendError(stdoutAggregatedSink, error)
}, completed: {
sendCompleted(stdoutAggregatedSink)
}, interrupted: {
sendInterrupted(stdoutAggregatedSink)
})
}

task.launch()
close(stdoutPipe.writeFD)
close(stderrPipe.writeFD)
stderrProducer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable

stdinProducer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
}
signal.observe(next: { readData in
switch readData {
case let .Chunk(data):
sendNext(observer, .StandardError(data))

disposable.addDisposable {
task.terminate()
case let .Aggregated(data):
sendNext(stderrAggregatedSink, data)
}
}, error: { error in
sendError(observer, error)
sendError(stderrAggregatedSink, error)
}, completed: {
sendCompleted(stderrAggregatedSink)
}, interrupted: {
sendInterrupted(stderrAggregatedSink)
})
}

task.standardOutput = stdoutPipe.writeHandle
task.standardError = stderrPipe.writeHandle

dispatch_group_enter(group)
task.terminationHandler = { task in
let terminationStatus = task.terminationStatus
if terminationStatus == EXIT_SUCCESS {
// Wait for stderr to finish, then pass
// through stdout.
disposable += stderrAggregated
|> then(stdoutAggregated)
|> map { data in .Success(Box(data)) }
|> start(observer)
} else {
// Wait for stdout to finish, then pass
// through stderr.
disposable += stdoutAggregated
|> then(stderrAggregated)
|> flatMap(.Concat) { data in
let errorString = (data.length > 0 ? String(UTF8String: UnsafePointer<CChar>(data.bytes)) : nil)
return SignalProducer(error: .ShellTaskFailed(exitCode: terminationStatus, standardError: errorString))
}
|> start(observer)
}
dispatch_group_leave(group)
}

task.launch()
close(stdoutPipe.writeFD)
close(stderrPipe.writeFD)

stdinProducer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
}

disposable.addDisposable {
task.terminate()
}
}
}
|> startWithSignal { signal, taskDisposable in
disposable.addDisposable(taskDisposable)
Expand Down