Skip to content

Commit

Permalink
[Utility] Added thread safety to Process.
Browse files Browse the repository at this point in the history
  • Loading branch information
Steveybrown committed Feb 4, 2017
1 parent baa42a7 commit cb70454
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 31 deletions.
80 changes: 50 additions & 30 deletions Sources/Utility/Process.swift
Expand Up @@ -13,6 +13,7 @@ import class Foundation.ProcessInfo
import enum POSIX.SystemError
import libc
import Basic
import Dispatch

/// Process result data which is available after process termination.
public struct ProcessResult {
Expand Down Expand Up @@ -63,7 +64,7 @@ public struct ProcessResult {

/// Process allows spawning new subprocesses and working with them.
///
/// Note: This class is not thread safe.
/// Note: This class is thread safe.
public final class Process: ObjectIdentifierProtocol {

/// Typealias for process id type.
Expand All @@ -77,16 +78,26 @@ public final class Process: ObjectIdentifierProtocol {

/// The process id of the spawned process, available after the process is launched.
public private(set) var processID = ProcessID()

/// The result of the process execution. Available after process is terminated.
public private(set) var result: ProcessResult?


/// If the subprocess has launched.
// Note: This property is not protected by the serial queue because it is only mutated in `launch()`, which will be called only once.
public private(set) var launched = false


/// The result of the process execution. Available after process is terminated.
public var result: ProcessResult? {
get {
return self.serialQueue.sync {
self._result
}
}
}

/// If process was asked to redirect its output.
public let redirectOutput: Bool


/// The result of the process execution. Available after process is terminated.
private var _result: ProcessResult?

/// The thread to read the output from the process, if redirected.
private var readOutputThread: Thread? = nil

Expand All @@ -95,6 +106,9 @@ public final class Process: ObjectIdentifierProtocol {

/// The output read from the process, if redirected.
private var output: [Int8] = []

/// Queue to protect concurrent reads.
private let serialQueue = DispatchQueue(label: "org.swift.swiftpm.process")

/// Create a new process instance.
///
Expand All @@ -112,7 +126,7 @@ public final class Process: ObjectIdentifierProtocol {

/// Launch the subprocess.
public func launch() throws {
assert(!launched, "It is not allowed to launch the same process object again.")
precondition(!launched, "It is not allowed to launch the same process object again.")
launched = true

// Initialize the spawn attributes.
Expand Down Expand Up @@ -202,7 +216,6 @@ public final class Process: ObjectIdentifierProtocol {
var argv: [UnsafeMutablePointer<CChar>?] = arguments.map{ $0.withCString(strdup) }
argv.append(nil)
defer { for case let arg? in argv { free(arg) } }

var env: [UnsafeMutablePointer<CChar>?] = environment.map{ "\($0.0)=\($0.1)".withCString(strdup) }
env.append(nil)
defer { for case let arg? in env { free(arg) } }
Expand Down Expand Up @@ -233,27 +246,35 @@ public final class Process: ObjectIdentifierProtocol {
/// Blocks the calling process until the subprocess finishes execution.
@discardableResult
public func waitUntilExit() throws -> ProcessResult {
precondition(launched, "The process is not yet launched.")
// If we're reading output, make sure that is finished.
if let thread = readOutputThread {
assert(redirectOutput)
thread.join()
}
// Wait until process finishes execution.
var exitStatus: Int32 = 0
var result = waitpid(processID, &exitStatus, 0)
while (result == -1 && errno == EINTR) {
result = waitpid(processID, &exitStatus, 0)
}
if result == -1 {
throw SystemError.waitpid(errno)
return try serialQueue.sync {
precondition(launched, "The process is not yet launched.")

// If the process has already finsihed, return it.
if let existingResult = _result {
return existingResult
}

// If we're reading output, make sure that is finished.
if let thread = readOutputThread {
assert(redirectOutput)
thread.join()
}
// Wait until process finishes execution.
var exitStatus: Int32 = 0
var result = waitpid(processID, &exitStatus, 0)
while (result == -1 && errno == EINTR) {
result = waitpid(processID, &exitStatus, 0)
}
if result == -1 {
throw SystemError.waitpid(errno)
}

// Construct the result.
let outputResult = readOutputError.map(Result.init) ?? Result(output)
let executionResult = ProcessResult(exitStatus: exitStatus, output: outputResult)
self._result = executionResult
return executionResult
}

// Construct the result.
let outputResult = readOutputError.map(Result.init) ?? Result(output)
let executionResult = ProcessResult(exitStatus: exitStatus, output: outputResult)
self.result = executionResult
return executionResult
}

/// Reads the output from the passed fd and writes in the output variable
Expand Down Expand Up @@ -293,7 +314,6 @@ public final class Process: ObjectIdentifierProtocol {
/// Note: This will signal all processes in the process group.
public func signal(_ signal: Int32) {
assert(launched, "The process is not yet launched.")
assert(result == nil, "Trying to signal \(signal) but the process has already terminated. \(result.debugDescription)")
_ = libc.kill(-processID, signal)
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Utility/ProcessSet.swift
Expand Up @@ -119,7 +119,7 @@ public final class ProcessSet {
private func signalAll(_ signal: Int32) {
serialQueue.sync {
// Signal all active processes.
for process in self.processes where process.result == nil {
for process in self.processes {
process.signal(signal)
}
}
Expand Down
25 changes: 25 additions & 0 deletions Tests/UtilityTests/ProcessTests.swift
Expand Up @@ -100,10 +100,35 @@ class ProcessTests: XCTestCase {
XCTAssertFalse(try Process.running(ProcessID(child), orDefunct: true))
}
}

func testThreadSafetyOnWaitUntilExit() throws {
let process = Process(args: "echo", "hello")
try process.launch()

var result1: String = ""
var result2: String = ""

let t1 = Thread {
result1 = try! process.waitUntilExit().utf8Output()
}

let t2 = Thread {
result2 = try! process.waitUntilExit().utf8Output()
}

t1.start()
t2.start()
t1.join()
t2.join()

XCTAssertEqual(result1, "hello\n")
XCTAssertEqual(result2, "hello\n")
}

static var allTests = [
("testBasics", testBasics),
("testPopen", testPopen),
("testSignals", testSignals),
("testThreadSafetyOnWaitUntilExit", testThreadSafetyOnWaitUntilExit),
]
}

0 comments on commit cb70454

Please sign in to comment.