Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ case let unknown:
exit(-1)
}

system.park()
try! system.park()
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ try! _file.append("service booted...\n")
let service = try XPCActorableService(system, XPCEchoService.init)

service.park()
system.park() // TODO: system park should invoke the service park, we only need to park once for XPC to kickoff dispatch_main
try! system.park() // TODO: system park should invoke the service park, we only need to park once for XPC to kickoff dispatch_main
// unreachable, park never exits
2 changes: 1 addition & 1 deletion Samples/Sources/SampleCRDTPlayground/CRDTPlayground.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct CRDTPlayground {
peer.tell("write-2-\(peer.path.name)")
}

first.park(atMost: time)
try! first.park(atMost: time)
}
}

Expand Down
5 changes: 3 additions & 2 deletions Samples/Sources/SampleCluster/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ system.cluster.events.subscribe(eventsListener) // <2>
// case announcement(String)
// case text(String, from: ActorRef<ChatMessage>)
// }

// tag::cluster-sample-actors-discover-and-chat[]
let chatter: ActorRef<String> = try system.spawn(
"chatter",
Expand All @@ -87,7 +88,7 @@ let chatter: ActorRef<String> = try system.spawn(
system.receptionist.register(chatter, with: "chat-room") // <1>

if system.cluster.uniqueNode.port == 7337 { // <2>
let greeter = try system.spawn(
try system.spawn(
"greeter",
of: Reception.Listing<ActorRef<String>>.self,
.setup { context in // <3>
Expand All @@ -105,4 +106,4 @@ if system.cluster.uniqueNode.port == 7337 { // <2>

// end::cluster-sample-actors-discover-and-chat[]

system.park(atMost: .seconds(6000))
try! system.park(atMost: .seconds(6000))
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ struct DiningPhilosophers {
let _: Philosopher.Ref = try system.spawn("Cory", Philosopher(left: fork3, right: fork4).behavior)
let _: Philosopher.Ref = try system.spawn("Norman", Philosopher(left: fork4, right: fork5).behavior)

system.park(atMost: time)
try system.park(atMost: time)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,6 @@ struct DistributedDiningPhilosophers {
_ = try systemC.spawn("Cory", Philosopher(left: fork3, right: fork4).behavior)
_ = try systemC.spawn("Norman", Philosopher(left: fork4, right: fork5).behavior)

systemA.park(atMost: time)
try systemA.park(atMost: time)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ struct ScoreGame {
// which other non participants may observe as well.
_ = try first.spawn("game-engine", self.game(with: players))

first.park(atMost: time)
try first.park(atMost: time)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,6 @@ struct DistributedDiningPhilosophers {
_ = try systemC.spawn("Cory") { Philosopher(context: $0, leftFork: fork3, rightFork: fork4) }
_ = try systemC.spawn("Norman") { Philosopher(context: $0, leftFork: fork4, rightFork: fork5) }

systemA.park(atMost: time)
try systemA.park(atMost: time)
}
}
2 changes: 1 addition & 1 deletion Samples/Sources/SampleMetrics/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,5 @@ for i in 1 ... 10 {

Thread.sleep(.seconds(100))

system.shutdown().wait()
try! system.shutdown().wait()
print("~~~~~~~~~~~~~~~ SHUTTING DOWN ~~~~~~~~~~~~~~~")
2 changes: 1 addition & 1 deletion Samples/Sources/XPCActorCaller/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ reply.withTimeout(after: .seconds(3))._onComplete {
system.log.info("Reply from service.greet(Capybara) = \($0)")
}

system.park()
try! system.park()

// end::xpc_example[]
#endif
57 changes: 38 additions & 19 deletions Sources/DistributedActors/ActorSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public final class ActorSystem {

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Shutdown
private var shutdownReceptacle = BlockingReceptacle<Void>()
private var shutdownReceptacle = BlockingReceptacle<Error?>()
private let shutdownLock = Lock()

/// Greater than 0 shutdown has been initiated / is in progress.
Expand Down Expand Up @@ -322,7 +322,7 @@ public final class ActorSystem {
/// This call is also offered to underlying transports which may have to perform the blocking wait themselves
/// (most notably, `ProcessIsolated` does so). Please refer to your configured transports documentation,
/// to learn about exact semantics of parking a system while using them.
public func park(atMost parkTimeout: TimeAmount? = nil) {
public func park(atMost parkTimeout: TimeAmount? = nil) throws {
let howLongParkingMsg = parkTimeout == nil ? "indefinitely" : "for \(parkTimeout!.prettyDescription)"
self.log.info("Parking actor system \(howLongParkingMsg)...")

Expand All @@ -332,9 +332,13 @@ public final class ActorSystem {
}

if let maxParkingTime = parkTimeout {
self.shutdownReceptacle.wait(atMost: maxParkingTime)
if let error = self.shutdownReceptacle.wait(atMost: maxParkingTime).flatMap({ $0 }) {
throw error
}
} else {
self.shutdownReceptacle.wait()
if let error = self.shutdownReceptacle.wait() {
throw error
}
}
}

Expand All @@ -345,49 +349,63 @@ public final class ActorSystem {
#endif

public struct Shutdown {
private let receptacle: BlockingReceptacle<Void>
private let receptacle: BlockingReceptacle<Error?>

init(receptacle: BlockingReceptacle<Void>) {
init(receptacle: BlockingReceptacle<Error?>) {
self.receptacle = receptacle
}

public func wait(atMost timeout: TimeAmount) throws {
guard self.receptacle.wait(atMost: timeout) != nil else {
throw TimeoutError(message: "Shutdown did not complete", timeout: timeout)
if let error = self.receptacle.wait(atMost: timeout).flatMap({ $0 }) {
throw error
}
}

public func wait() {
self.receptacle.wait()
public func wait() throws {
if let error = self.receptacle.wait() {
throw error
}
}
}

/// Forcefully stops this actor system and all actors that live within. This is an asynchronous operation
/// and will be executed on a separate thread.
/// Forcefully stops this actor system and all actors that live within it.
/// This is an asynchronous operation and will be executed on a separate thread.
///
/// You can use `shutdown().wait()` to synchronously await on the system's termination,
/// or provide a callback to be executed after the system has completed it's shutdown.
///
/// - Parameters:
/// - queue: allows configuring on which dispatch queue the shutdown operation will be finalized.
/// - afterShutdownCompleted: optional callback to be invoked when the system has completed shutting down.
/// Will be invoked on the passed in `queue` (which defaults to `DispatchQueue.global()`).
/// - Returns: A `Shutdown` value that can be waited upon until the system has completed the shutdown.
@discardableResult
public func shutdown() -> Shutdown {
public func shutdown(queue: DispatchQueue = DispatchQueue.global(), afterShutdownCompleted: @escaping (Error?) -> Void = { _ in () }) -> Shutdown {
guard self.shutdownFlag.add(1) == 0 else {
// shutdown already kicked off by someone else
return Shutdown(receptacle: self.shutdownReceptacle)
}

self.settings.plugins.stopAll(self)

DispatchQueue.global().async {
queue.async {
self.log.log(level: .debug, "Shutting down actor system [\(self.name)]. All actors will be stopped.", file: #file, function: #function, line: #line)
if let cluster = self._cluster {
let receptacle = BlockingReceptacle<Void>()
cluster.ref.tell(.command(.shutdown(receptacle))) // FIXME: should be shutdown
receptacle.wait(atMost: .milliseconds(300)) // FIXME: configure
cluster.ref.tell(.command(.shutdown(receptacle)))
receptacle.wait()
}
self.userProvider.stopAll()
self.systemProvider.stopAll()
self.dispatcher.shutdown()

try! self._eventLoopGroup.syncShutdownGracefully()
self._receptionistRef = self.deadLetters.adapted()
do {
try self._eventLoopGroup.syncShutdownGracefully()
self._receptionistRef = self.deadLetters.adapted()
} catch {
self.shutdownReceptacle.offerOnce(error)
afterShutdownCompleted(error)
}

/// Only once we've shutdown all dispatchers and loops, we clear cycles between the serialization and system,
/// as they should never be invoked anymore.
Expand All @@ -396,7 +414,8 @@ public final class ActorSystem {
self._cluster = nil
}

self.shutdownReceptacle.offerOnce(())
self.shutdownReceptacle.offerOnce(nil)
afterShutdownCompleted(nil)
}

return Shutdown(receptacle: self.shutdownReceptacle)
Expand Down
2 changes: 1 addition & 1 deletion Sources/DistributedActors/Refs.swift
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ public class Guardian {
system.log.error("\(message)", metadata: ["actor/path": "\(self.address.path)"])

_ = try! Thread {
system.shutdown().wait() // so we don't block anyone who sent us this signal (as we execute synchronously in the guardian)
try! system.shutdown().wait() // so we don't block anyone who sent us this signal (as we execute synchronously in the guardian)
print("Guardian shutdown of [\(system.name)] ActorSystem complete.")
}
#if os(iOS) || os(watchOS) || os(tvOS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private func setUp() {
}

private func tearDown() {
system.shutdown().wait()
try! system.shutdown().wait()
_system = nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private func setUp(and postSetUp: () -> Void = { () in () }) {
}

private func tearDown() {
system.shutdown().wait()
try! system.shutdown().wait()
_system = nil
}

Expand Down Expand Up @@ -255,6 +255,6 @@ private func bench_actors_ping_pong(numActors: Int) -> (Int) -> Void {
let time = SwiftBenchmarkTools.Timer().getTimeAsInt() - startNanoTime

pprint(" \(totalNumMessages) messages by \(numActors) actors took: \(time.milliseconds) ms (total: \(totalNumMessages / time.milliseconds * 1000) msg/s)")
system.shutdown().wait()
try! system.shutdown().wait()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ private func setUp(and postSetUp: () -> Void = { () in () }) {
}

private func tearDown() {
system.shutdown().wait()
try! system.shutdown().wait()
_system = nil
_pongNode?.shutdown().wait()
try! _pongNode?.shutdown().wait()
_pongNode = nil
}

Expand Down Expand Up @@ -243,6 +243,6 @@ private func bench_actors_remote_ping_pong(numActors: Int) -> (Int) -> Void {
let time = SwiftBenchmarkTools.Timer().getTimeAsInt() - startNanoTime

pprint(" \(totalNumMessages) messages by \(numActors) actors took: \(time.milliseconds) ms (total: \(totalNumMessages / time.milliseconds * 1000) msg/s)")
system.shutdown().wait()
try! system.shutdown().wait()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private func setUp(and postSetUp: () -> Void) {
}

private func tearDown() {
system.shutdown().wait()
try! system.shutdown().wait()
_system = nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private func setUp(and postSetUp: () -> Void = { () in () }) {
}

private func tearDown() {
system.shutdown().wait()
try! system.shutdown().wait()
_system = nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func bench_SpawnTopLevel(_ actorCount: Int) throws {
print("Spawned \(actorCount) top-level actors in \(String(format: "%.3f", seconds)) seconds. (\(perSecond) actors/s)")

let shutdownStart = timer.getTime()
system.shutdown().wait()
try! system.shutdown().wait()
let shutdownStop = timer.getTime()
let shutdownTime = timer.diffTimeInNanoSeconds(from: shutdownStart, to: shutdownStop)
let shutdownSeconds = (Double(shutdownTime) / 1_000_000_000)
Expand Down Expand Up @@ -99,7 +99,7 @@ func bench_SpawnChildren(_ actorCount: Int) throws {
print("Spawned \(actorCount) child actors in \(String(format: "%.3f", seconds)) seconds. (\(perSecond) actors/s)")

let shutdownStart = timer.getTime()
system.shutdown().wait()
try! system.shutdown().wait()
let shutdownStop = timer.getTime()
let shutdownTime = timer.diffTimeInNanoSeconds(from: shutdownStart, to: shutdownStop)
let shutdownSeconds = (Double(shutdownTime) / 1_000_000_000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private func setUp(and postSetUp: () -> Void = { () in () }) {
}

private func tearDown() {
system.shutdown().wait()
try! system.shutdown().wait()
_system = nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private func setUp(and postSetUp: () -> Void = { () in
}

private func tearDown() {
system.shutdown().wait()
try! system.shutdown().wait()
_system = nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
self.printAllCapturedLogs()
}

self._nodes.forEach { $0.shutdown().wait() }
self._nodes.forEach { try! $0.shutdown().wait() }

self._nodes = []
self._testKits = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ final class ActorSingletonPluginTests: ActorSystemXCTestCase {
}

defer {
system.shutdown().wait()
try! system.shutdown().wait()
}

let replyProbe = ActorTestKit(system).spawnTestProbe(expecting: String.self)
Expand All @@ -50,7 +50,7 @@ final class ActorSingletonPluginTests: ActorSystemXCTestCase {
}

defer {
system.shutdown().wait()
try! system.shutdown().wait()
}

let replyProbe = ActorTestKit(system).spawnTestProbe(expecting: String.self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ final class ActorTestKitTests: XCTestCase {
}

override func tearDown() {
self.system.shutdown().wait()
try! self.system.shutdown().wait()
}

// tag::test[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ClusteringDocExamples: XCTestCase {
}
// end::config_tls[]

system.shutdown().wait()
try! system.shutdown().wait()
}

func example_config_tls_passphrase() throws {
Expand All @@ -44,6 +44,6 @@ class ClusteringDocExamples: XCTestCase {
}
// end::config_tls_passphrase[]

system.shutdown().wait()
try! system.shutdown().wait()
}
}
Loading