Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 219 additions & 52 deletions ReactiveTask/Task.swift
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ private final class Pipe {
let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.readFD, self.queue) { error in
if error == 0 {
sendCompleted(observer)
} else if error == ECANCELED {
sendInterrupted(observer)
} else {
sendError(observer, .POSIXError(error))
}
Expand All @@ -125,7 +127,9 @@ private final class Pipe {
sendNext(observer, data)
}

if error != 0 {
if error == ECANCELED {
sendInterrupted(observer)
} else if error != 0 {
sendError(observer, .POSIXError(error))
}

Expand Down Expand Up @@ -153,6 +157,8 @@ private final class Pipe {
let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.writeFD, self.queue) { error in
if error == 0 {
sendCompleted(observer)
} else if error == ECANCELED {
sendInterrupted(observer)
} else {
sendError(observer, .POSIXError(error))
}
Expand All @@ -167,7 +173,9 @@ private final class Pipe {
let dispatchData = dispatch_data_create(data.bytes, data.length, self.queue, nil)

dispatch_io_write(channel, 0, dispatchData, self.queue) { (done, data, error) in
if error != 0 {
if error == ECANCELED {
sendInterrupted(observer)
} else if error != 0 {
sendError(observer, .POSIXError(error))
}
}
Expand All @@ -185,12 +193,34 @@ private final class Pipe {
}
}

/// Takes ownership of the read handle from the given pipe, then aggregates all
/// data into one `NSData` object, which is then sent upon the returned signal.
///
/// If `forwardingSink` is non-nil, each incremental piece of data will be sent
/// to it as data is received.
private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf<NSData>?) -> SignalProducer<NSData, ReactiveTaskError> {
/// Sent when reading from a pipe.
private enum ReadData {
/// A chunk of data, sent as soon as it is received.
case Chunk(NSData)

/// The aggregate of all data sent so far, sent right before completion.
///
/// No further chunks will occur after this has been sent.
case Aggregated(NSData)

/// Convenience constructor for a `Chunk` from `dispatch_data_t`.
static func chunk(data: dispatch_data_t) -> ReadData {
return .Chunk(data as! NSData)
}

/// Convenience constructor for an `Aggregated` from `dispatch_data_t`.
static func aggregated(data: dispatch_data_t?) -> ReadData {
if let data = data {
return .Aggregated(data as! NSData)
} else {
return .Aggregated(NSData())
}
}
}

/// Takes ownership of the read handle from the given pipe, then sends
/// `ReadData` values for all data read.
private func aggregateDataReadFromPipe(pipe: Pipe) -> SignalProducer<ReadData, ReactiveTaskError> {
let readProducer = pipe.transferReadsToProducer()

return SignalProducer { observer, disposable in
Expand All @@ -200,7 +230,8 @@ private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf<NSData
disposable.addDisposable(signalDisposable)

signal.observe(next: { data in
forwardingSink?.put(data as! NSData)
sendNext(observer, .chunk(data))

if let existingBuffer = buffer.value {
buffer.value = dispatch_data_create_concat(existingBuffer, data)
} else {
Expand All @@ -209,12 +240,7 @@ private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf<NSData
}, error: { error in
sendError(observer, error)
}, completed: {
if let existingBuffer = buffer.value {
sendNext(observer, existingBuffer as! NSData)
} else {
sendNext(observer, NSData())
}

sendNext(observer, .aggregated(buffer.value))
sendCompleted(observer)
}, interrupted: {
sendInterrupted(observer)
Expand All @@ -223,12 +249,118 @@ private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf<NSData
}
}

/// Represents events that can occur during the execution of a task that is
/// expected to terminate with a result of type T (upon success).
public enum TaskEvent<T> {
/// Some data arrived from the task on `stdout`.
case StandardOutput(NSData)

/// Some data arrived from the task on `stderr`.
case StandardError(NSData)

/// The task exited successfully (with status 0), and value T was produced
/// as a result.
case Success(Box<T>)

/// The resulting value, if the event is `Success`.
public var value: T? {
switch self {
case .StandardOutput, .StandardError:
return nil

case let .Success(box):
return box.value
}
}

/// Maps over the value embedded in a `Success` event.
public func map<U>(@noescape transform: T -> U) -> TaskEvent<U> {
switch self {
case let .StandardOutput(data):
return .StandardOutput(data)

case let .StandardError(data):
return .StandardError(data)

case let .Success(box):
return .Success(Box(transform(box.value)))
}
}

/// Convenience operator for mapping TaskEvents to SignalProducers.
public func producerMap<U, Error>(@noescape transform: T -> SignalProducer<U, Error>) -> SignalProducer<TaskEvent<U>, Error> {
switch self {
case let .StandardOutput(data):
return SignalProducer<TaskEvent<U>, Error>(value: .StandardOutput(data))

case let .StandardError(data):
return SignalProducer<TaskEvent<U>, Error>(value: .StandardError(data))

case let .Success(box):
return transform(box.value) |> ReactiveCocoa.map { .Success(Box($0)) }
}
}
}

public func == <T: Equatable>(lhs: TaskEvent<T>, rhs: TaskEvent<T>) -> Bool {
switch (lhs, rhs) {
case let (.StandardOutput(left), .StandardOutput(right)):
return left == right

case let (.StandardError(left), .StandardError(right)):
return left == right

case let (.Success(left), .Success(right)):
return left == right

default:
return false
}
}

extension TaskEvent: Printable {
public var description: String {
func dataDescription(data: NSData) -> String {
return NSString(data: data, encoding: NSUTF8StringEncoding).map { $0 as String } ?? data.description
}

switch self {
case let .StandardOutput(data):
return "stdout: " + dataDescription(data)

case let .StandardError(data):
return "stderr: " + dataDescription(data)

case let .Success(value):
return "success(\(value))"
}
}
}

/// Maps the values inside a stream of TaskEvents into new SignalProducers.
public func flatMapTaskEvents<T, U, Error>(strategy: FlattenStrategy, transform: T -> SignalProducer<U, Error>) -> SignalProducer<TaskEvent<T>, Error> -> SignalProducer<TaskEvent<U>, Error> {
return { producer in
return producer |> flatMap(strategy) { taskEvent in
return taskEvent.producerMap(transform)
}
}
}

/// Ignores incremental standard output and standard error data from the given
/// task, sending only a single value with the final, aggregated result.
public func ignoreTaskData<T, Error>(signal: Signal<TaskEvent<T>, Error>) -> Signal<T, Error> {
return signal
|> map { event in
return event.value.map { $0 }
}
|> ignoreNil
}

/// Launches a new shell task, using the parameters from `taskDescription`.
///
/// Returns a producer that will launch the task when started, then send one
/// `NSData` value (representing aggregated data from `stdout`) and complete
/// upon success.
public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf<NSData>? = nil, standardError: SinkOf<NSData>? = nil) -> SignalProducer<NSData, ReactiveTaskError> {
/// Returns a producer that will launch the task when started, then send
/// `TaskEvent`s as execution proceeds.
public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> {
return SignalProducer { observer, disposable in
let queue = dispatch_queue_create(taskDescription.description, DISPATCH_QUEUE_SERIAL)

Expand Down Expand Up @@ -261,61 +393,96 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf<
}

SignalProducer(result: Pipe.create(queue))
|> flatMap(.Concat) { stdoutPipe -> SignalProducer<NSData, ReactiveTaskError> in
let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe, standardOutput)
|> flatMap(.Concat) { stdoutPipe -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> in
let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe)

return SignalProducer(result: Pipe.create(queue))
|> flatMap(.Merge) { stderrPipe -> SignalProducer<NSData, ReactiveTaskError> in
let stderrProducer = aggregateDataReadFromPipe(stderrPipe, standardError)
|> flatMap(.Merge) { stderrPipe -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> in
let stderrProducer = aggregateDataReadFromPipe(stderrPipe)

return SignalProducer { observer, disposable in
let (stdoutAggregated, stdoutAggregatedSink) = SignalProducer<NSData, ReactiveTaskError>.buffer(1)
let (stderrAggregated, stderrAggregatedSink) = SignalProducer<NSData, ReactiveTaskError>.buffer(1)

stdoutProducer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable

signal.observe(next: { readData in
switch readData {
case let .Chunk(data):
sendNext(observer, .StandardOutput(data))

case let .Aggregated(data):
sendNext(stdoutAggregatedSink, data)
}
}, error: { error in
sendError(observer, error)
sendError(stdoutAggregatedSink, error)
}, completed: {
sendCompleted(stdoutAggregatedSink)
}, interrupted: {
sendInterrupted(stdoutAggregatedSink)
})
}

let terminationStatusProducer = SignalProducer<Int32, NoError> { observer, disposable in
task.terminationHandler = { task in
sendNext(observer, task.terminationStatus)
sendCompleted(observer)
stderrProducer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable

signal.observe(next: { readData in
switch readData {
case let .Chunk(data):
sendNext(observer, .StandardError(data))

case let .Aggregated(data):
sendNext(stderrAggregatedSink, data)
}
}, error: { error in
sendError(observer, error)
sendError(stderrAggregatedSink, error)
}, completed: {
sendCompleted(stderrAggregatedSink)
}, interrupted: {
sendInterrupted(stderrAggregatedSink)
})
}

task.standardOutput = stdoutPipe.writeHandle
task.standardError = stderrPipe.writeHandle

if disposable.disposed {
stdoutPipe.closePipe()
stderrPipe.closePipe()

// Clean up the stdin pipe in a roundabout way.
stdinProducer.startWithSignal { signal, signalDisposable in
signalDisposable.dispose()
task.terminationHandler = { task in
let terminationStatus = task.terminationStatus
if terminationStatus == EXIT_SUCCESS {
// Wait for stderr to finish, then pass
// through stdout.
disposable += stderrAggregated
|> then(stdoutAggregated)
|> map { data in .Success(Box(data)) }
|> start(observer)
} else {
// Wait for stdout to finish, then pass
// through stderr.
disposable += stdoutAggregated
|> then(stderrAggregated)
|> flatMap(.Concat) { data in
let errorString = (data.length > 0 ? String(UTF8String: UnsafePointer<CChar>(data.bytes)) : nil)
return SignalProducer(error: .ShellTaskFailed(exitCode: terminationStatus, standardError: errorString))
}
|> start(observer)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When terminationStatus != EXIT_SUCCESS, signal from stderrProducer does not wait signal from stdoutProducer. Signal from other hand does not be needed, but should be waited.
I guess this cause segmentation fault of Carthage/Carthage#497

changed comment position

}

return
}

task.launch()
close(stdoutPipe.writeFD)
close(stderrPipe.writeFD)

stdinProducer.startWithSignal { signal, signalDisposable in
disposable.addDisposable(signalDisposable)
disposable += signalDisposable
}

disposable.addDisposable {
task.terminate()
}
}

return
zip(
stdoutProducer,
stderrProducer,
terminationStatusProducer |> promoteErrors(ReactiveTaskError.self)
)
|> tryMap { stdoutData, stderrData, terminationStatus -> Result<NSData, ReactiveTaskError> in
if terminationStatus == EXIT_SUCCESS {
return .success(stdoutData)
} else {
let errorString = (stderrData.length > 0 ? String(UTF8String: UnsafePointer<CChar>(stderrData.bytes)) : nil)
return .failure(.ShellTaskFailed(exitCode: terminationStatus, standardError: errorString))
}
}
}
}
|> startWithSignal { signal, taskDisposable in
Expand Down
Loading