diff --git a/Sources/DistributedActors/ActorShell.swift b/Sources/DistributedActors/ActorShell.swift index 80f8e8a45..135520f6d 100644 --- a/Sources/DistributedActors/ActorShell.swift +++ b/Sources/DistributedActors/ActorShell.swift @@ -33,8 +33,10 @@ public final class ActorShell: ActorContext, Abs @usableFromInline var behavior: Behavior + @usableFromInline let _parent: AddressableActorRef + @usableFromInline let _address: ActorAddress let _props: Props @@ -659,7 +661,7 @@ public final class ActorShell: ActorContext, Abs with terminationMessage: Message? = nil, file: String = #file, line: UInt = #line ) -> Watchee where Watchee: DeathWatchable { - self.deathWatch.watch(watchee: watchee, with: terminationMessage, myself: self.myself, parent: self._parent, file: file, line: line) + self.deathWatch.watch(watchee: watchee, with: terminationMessage, myself: self, file: file, line: line) return watchee } @@ -780,7 +782,7 @@ extension ActorShell { @inlinable internal func interpretSystemWatch(watcher: AddressableActorRef) { if self.behavior.isStillAlive { self.instrumentation.actorWatchReceived(watchee: self.address, watcher: watcher.address) - self.deathWatch.becomeWatchedBy(watcher: watcher, myself: self.myself) + self.deathWatch.becomeWatchedBy(watcher: watcher, myself: self.myself, parent: self._parent) } else { // so we are in the middle of terminating already anyway watcher._sendSystemMessage(.terminated(ref: self.asAddressable, existenceConfirmed: true)) diff --git a/Sources/DistributedActors/Behaviors.swift b/Sources/DistributedActors/Behaviors.swift index 8c33aab72..5def94bbc 100644 --- a/Sources/DistributedActors/Behaviors.swift +++ b/Sources/DistributedActors/Behaviors.swift @@ -125,7 +125,7 @@ extension Behavior { if signal is Signals.PostStop { try postStop(context) } - return .same // will be ignored + return .stop // will be ignored }, reason: .stopMyself ) diff --git a/Sources/DistributedActors/DeadLetters.swift b/Sources/DistributedActors/DeadLetters.swift index 1059933f4..2faa2fd21 100644 --- a/Sources/DistributedActors/DeadLetters.swift +++ b/Sources/DistributedActors/DeadLetters.swift @@ -206,8 +206,8 @@ public final class DeadLetterOffice { "Dead letter was not delivered \(recipientString)", metadata: { () -> Logger.Metadata in // TODO: more metadata (from Envelope) (e.g. sender) - if !recipientString.isEmpty { - metadata["deadLetter/recipient"] = "\(recipientString)" + if let recipient = deadLetter.recipient?.detailedDescription { + metadata["deadLetter/recipient"] = "\(recipient)" } metadata["deadLetter/location"] = "\(file):\(line)" metadata["deadLetter/message"] = "\(deadLetter.message)" diff --git a/Sources/DistributedActors/DeathWatch.swift b/Sources/DistributedActors/DeathWatch.swift index b1669d9fb..4beb0878c 100644 --- a/Sources/DistributedActors/DeathWatch.swift +++ b/Sources/DistributedActors/DeathWatch.swift @@ -143,11 +143,10 @@ internal struct DeathWatch { // MARK: perform watch/unwatch /// Performed by the sending side of "watch", therefore the `watcher` should equal `context.myself` - public mutating func watch( + mutating func watch( watchee: Watchee, with terminationMessage: Message?, - myself watcher: ActorRef, - parent: AddressableActorRef, + myself watcher: ActorShell, file: String, line: UInt ) where Watchee: DeathWatchable { traceLog_DeathWatch("issue watch: \(watchee) (from \(watcher) (myself))") @@ -158,15 +157,6 @@ internal struct DeathWatch { return } - guard addressableWatchee.address != parent.address else { - // No need to store the parent in watchedBy, since we ALWAYS let the parent know about termination - // and do so by sending an ChildTerminated, which is a sub class of Terminated. - // - // What matters more is that the parent stores the child in its own `watching` -- since thanks to that - // it knows if it has to execute an DeathPact when the child terminates. - return - } - if self.isWatching(addressableWatchee.address) { // While we bail out early here, we DO override whichever value was set as the customized termination message. // This is to enable being able to keep updating the context associated with a watched actor, e.g. if how @@ -175,13 +165,23 @@ internal struct DeathWatch { return } else { - // not yet watching - addressableWatchee._sendSystemMessage(.watch(watchee: addressableWatchee, watcher: AddressableActorRef(watcher)), file: file, line: line) + // not yet watching, so let's add it: self.watching[addressableWatchee] = OnTerminationMessage(customize: terminationMessage) + if !watcher.children.contains(identifiedBy: watchee.asAddressable.address) { + // We ONLY send the watch message if it is NOT our own child; + // + // Because a child ALWAYS sends a .childTerminated to its parent on termination, so there is no need to watch it again, + // other than _us_ remembering that we issued such watch. A child can also never be remote, so the node deathwatcher does not matter either. + // + // A childTerminated is transformed into `Signals.ChildTerminated` which subclasses `Signals.Terminated`, + // so this way we achieve exactly one termination notification already. + addressableWatchee._sendSystemMessage(.watch(watchee: addressableWatchee, watcher: watcher.asAddressable), file: file, line: line) + } + // TODO: this is specific to the transport (!), if we only do XPC but not cluster, this does not make sense if addressableWatchee.address.uniqueNode.node.protocol == "sact" { // FIXME: this is an ugly workaround; proper many transports support would be the right thing - self.subscribeNodeTerminatedEvents(myself: watcher, watchedAddress: addressableWatchee.address, file: file, line: line) + self.subscribeNodeTerminatedEvents(myself: watcher.myself, watchedAddress: addressableWatchee.address, file: file, line: line) } } } @@ -210,13 +210,22 @@ internal struct DeathWatch { // ==== ------------------------------------------------------------------------------------------------------------ // MARK: react to watch or unwatch signals - public mutating func becomeWatchedBy(watcher: AddressableActorRef, myself: ActorRef) { + public mutating func becomeWatchedBy(watcher: AddressableActorRef, myself: ActorRef, parent: AddressableActorRef) { guard watcher.address != myself.address else { traceLog_DeathWatch("Attempted to watch 'myself' [\(myself)], which is a no-op, since such watch's terminated can never be observed. " + "Likely a programming error where the wrong actor ref was passed to watch(), please check your code.") return } + guard watcher != parent else { + // This is fairly defensive, as the parent should already know to never send such message, but let's better be safe than sorry. + // + // no need to become watched by parent, we always notify our parent with `childTerminated` anyway already + // so if we added it also to `watchedBy` we would potentially send terminated twice: terminated and childTerminated, + // which is NOT good -- we only should notify it once, specifically with the childTerminated signal handled by the ActorShell itself. + return + } + traceLog_DeathWatch("Become watched by: \(watcher.address) inside: \(myself)") self.watchedBy.insert(watcher) } @@ -283,7 +292,7 @@ internal struct DeathWatch { traceLog_DeathWatch("[\(myself)] notifyWatchers that we are terminating. Watchers: \(self.watchedBy)...") for watcher in self.watchedBy { - traceLog_DeathWatch("[\(myself)] Notify \(watcher) that we died...") + traceLog_DeathWatch("[\(myself)] Notify \(watcher) that we died") watcher._sendSystemMessage(.terminated(ref: AddressableActorRef(myself), existenceConfirmed: true), file: #file, line: #line) } } diff --git a/Sources/DistributedActorsTestKit/TestProbes.swift b/Sources/DistributedActorsTestKit/TestProbes.swift index c12ed82e1..8d62f9ef8 100644 --- a/Sources/DistributedActorsTestKit/TestProbes.swift +++ b/Sources/DistributedActorsTestKit/TestProbes.swift @@ -20,6 +20,7 @@ import XCTest internal enum ActorTestProbeCommand { case watchCommand(who: AddressableActorRef, file: String, line: UInt) case unwatchCommand(who: AddressableActorRef) + case forwardCommand(send: () -> Void) case stopCommand case realMessage(message: M) @@ -113,13 +114,17 @@ public class ActorTestProbe { // probe commands: case .watchCommand(let who, let file, let line): - cell.deathWatch.watch(watchee: who, with: nil, myself: context.myself, parent: cell._parent, file: file, line: line) + cell.deathWatch.watch(watchee: who, with: nil, myself: context._downcastUnsafe, file: file, line: line) return .same case .unwatchCommand(let who): cell.deathWatch.unwatch(watchee: who, myself: context.myself) return .same + case .forwardCommand(let send): + send() + return .same + case .stopCommand: return .stop } @@ -656,6 +661,17 @@ extension ActorTestProbe { return watchee } + /// Instructs the probe to send a message on our behalf, this is useful to enforce ordering e.g. when the probe has to perform a watch, + /// followed by a message send and we want to ensure that the watch has been processed -- we can do so by forwarding a message through + /// the probe, which ensures ordering between the watch being sent and the message sent just now. + /// + /// Without this it may happen that we asked the probe to watch an actor, and send a message to the actor directly, + /// and our direct message arrives first, before the watch at the destination, causing potentially confusing behavior + /// in some very ordering delicate testing scenarios. + public func forward(_ message: Message, to target: ActorRef, file: String = #file, line: UInt = #line) where Message: Codable { + self.internalRef.tell(ProbeCommands.forwardCommand(send: { () in target.tell(message, file: file, line: line) })) + } + /// Instructs this probe to unwatch the passed in reference. /// /// Note that unwatch MIGHT when used with testProbes since the probe may have already stored diff --git a/Tests/DistributedActorsTests/DeathWatchTests.swift b/Tests/DistributedActorsTests/DeathWatchTests.swift index 8f5c56e7b..089ca75e6 100644 --- a/Tests/DistributedActorsTests/DeathWatchTests.swift +++ b/Tests/DistributedActorsTests/DeathWatchTests.swift @@ -396,6 +396,43 @@ final class DeathWatchTests: ActorSystemXCTestCase { struct TakePoisonError: Error {} + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: Watching child actors + + func test_ensureOnlySingleTerminatedSignal_emittedByWatchedChildDies() throws { + let p: ActorTestProbe = self.testKit.spawnTestProbe() + let pp: ActorTestProbe = self.testKit.spawnTestProbe() + + let spawnSomeStoppers = Behavior.setup { context in + let one: ActorRef = try context.spawnWatch( + "stopper", + .receiveMessage { _ in + .stop + } + ) + one.tell("stop") + + return .same + }.receiveSignal { _, signal in switch signal { + case let terminated as Signals.Terminated: + p.tell(terminated) + default: + () // ok + } + pp.tell("\(signal)") + return .same // ignore the child death, remain alive + } + + let _: ActorRef = try system.spawn("parent", spawnSomeStoppers) + + let terminated = try p.expectMessage() + terminated.address.path.shouldEqual(try! ActorPath._user.appending("parent").appending("stopper")) + terminated.existenceConfirmed.shouldBeTrue() + terminated.nodeTerminated.shouldBeFalse() + terminated.shouldBe(Signals.ChildTerminated.self) + try p.expectNoMessage(for: .milliseconds(200)) + } + // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Watching dead letters ref diff --git a/Tests/DistributedActorsTests/InterceptorTests.swift b/Tests/DistributedActorsTests/InterceptorTests.swift index b0e676112..4683ec187 100644 --- a/Tests/DistributedActorsTests/InterceptorTests.swift +++ b/Tests/DistributedActorsTests/InterceptorTests.swift @@ -45,7 +45,10 @@ final class TerminatedInterceptor: Interceptor { switch signal { case let terminated as Signals.Terminated: self.probe.tell(terminated) // we forward all termination signals to someone + case is Signals.PostStop: + () // ok default: + fatalError("Other signal: \(signal)") () } return try target.interpretSignal(context: context, signal: signal) @@ -119,7 +122,7 @@ final class InterceptorTests: ActorSystemXCTestCase { let spyOnTerminationSignals: Interceptor = TerminatedInterceptor(probe: p) - let spawnSomeStoppers: Behavior = .setup { context in + let spawnSomeStoppers = Behavior.setup { context in let one: ActorRef = try context.spawnWatch( "stopperOne", .receiveMessage { _ in @@ -149,7 +152,7 @@ final class InterceptorTests: ActorSystemXCTestCase { // any additional messages let terminated = try p.expectMessage() (terminated.address.name == "stopperOne" || terminated.address.name == "stopperTwo").shouldBeTrue() - try p.expectNoMessage(for: .milliseconds(100)) + try p.expectNoMessage(for: .milliseconds(500)) } class SignalToStringInterceptor: Interceptor { @@ -165,7 +168,7 @@ final class InterceptorTests: ActorSystemXCTestCase { } } - func test_interceptor_shouldRemainWHenReturningStoppingWithPostStop() throws { + func test_interceptor_shouldRemainWhenReturningStoppingWithPostStop() throws { let p: ActorTestProbe = self.testKit.spawnTestProbe() let behavior: Behavior = .receiveMessage { _ in