Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Sources/DistributedActors/ActorShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ public final class ActorShell<Message: ActorMessage>: ActorContext<Message>, Abs
@usableFromInline
var behavior: Behavior<Message>

@usableFromInline
let _parent: AddressableActorRef

@usableFromInline
let _address: ActorAddress

let _props: Props
Expand Down Expand Up @@ -659,7 +661,7 @@ public final class ActorShell<Message: ActorMessage>: ActorContext<Message>, Abs
with terminationMessage: Message? = nil,
file: String = #file, line: UInt = #line
) -> Watchee where Watchee: DeathWatchable {
self.deathWatch.watch(watchee: watchee, with: terminationMessage, myself: self.myself, parent: self._parent, file: file, line: line)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • the parent is super unnecessary for ISSUING a watch;
    • we don't care about OUR parent, only if we are the parent of the being watched actor, as such we can pass myself (self in order to have _children to check them inside there)
  • it optionally necessary for HANDLING a watch.
    • it's not required, but we're being defensive now, even if someone sent us such system message we'd do the right thing

self.deathWatch.watch(watchee: watchee, with: terminationMessage, myself: self, file: file, line: line)
return watchee
}

Expand Down Expand Up @@ -780,7 +782,7 @@ extension ActorShell {
@inlinable internal func interpretSystemWatch(watcher: AddressableActorRef) {
if self.behavior.isStillAlive {
self.instrumentation.actorWatchReceived(watchee: self.address, watcher: watcher.address)
self.deathWatch.becomeWatchedBy(watcher: watcher, myself: self.myself)
self.deathWatch.becomeWatchedBy(watcher: watcher, myself: self.myself, parent: self._parent)
} else {
// so we are in the middle of terminating already anyway
watcher._sendSystemMessage(.terminated(ref: self.asAddressable, existenceConfirmed: true))
Expand Down
2 changes: 1 addition & 1 deletion Sources/DistributedActors/Behaviors.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ extension Behavior {
if signal is Signals.PostStop {
try postStop(context)
}
return .same // will be ignored
return .stop // will be ignored
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just cosmetic tbh

},
reason: .stopMyself
)
Expand Down
4 changes: 2 additions & 2 deletions Sources/DistributedActors/DeadLetters.swift
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ public final class DeadLetterOffice {
"Dead letter was not delivered \(recipientString)",
metadata: { () -> Logger.Metadata in
// TODO: more metadata (from Envelope) (e.g. sender)
if !recipientString.isEmpty {
metadata["deadLetter/recipient"] = "\(recipientString)"
if let recipient = deadLetter.recipient?.detailedDescription {
metadata["deadLetter/recipient"] = "\(recipient)"
}
metadata["deadLetter/location"] = "\(file):\(line)"
metadata["deadLetter/message"] = "\(deadLetter.message)"
Expand Down
43 changes: 26 additions & 17 deletions Sources/DistributedActors/DeathWatch.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,10 @@ internal struct DeathWatch<Message: ActorMessage> {
// MARK: perform watch/unwatch

/// Performed by the sending side of "watch", therefore the `watcher` should equal `context.myself`
public mutating func watch<Watchee>(
mutating func watch<Watchee>(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does not have to be public

watchee: Watchee,
with terminationMessage: Message?,
myself watcher: ActorRef<Message>,
parent: AddressableActorRef,
myself watcher: ActorShell<Message>,
file: String, line: UInt
) where Watchee: DeathWatchable {
traceLog_DeathWatch("issue watch: \(watchee) (from \(watcher) (myself))")
Expand All @@ -158,15 +157,6 @@ internal struct DeathWatch<Message: ActorMessage> {
return
}

guard addressableWatchee.address != parent.address else {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whooop! this is in the wrong function, moved to becomeWatchedBy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙀

// No need to store the parent in watchedBy, since we ALWAYS let the parent know about termination
// and do so by sending an ChildTerminated, which is a sub class of Terminated.
//
// What matters more is that the parent stores the child in its own `watching` -- since thanks to that
// it knows if it has to execute an DeathPact when the child terminates.
return
}

if self.isWatching(addressableWatchee.address) {
// While we bail out early here, we DO override whichever value was set as the customized termination message.
// This is to enable being able to keep updating the context associated with a watched actor, e.g. if how
Expand All @@ -175,13 +165,23 @@ internal struct DeathWatch<Message: ActorMessage> {

return
} else {
// not yet watching
addressableWatchee._sendSystemMessage(.watch(watchee: addressableWatchee, watcher: AddressableActorRef(watcher)), file: file, line: line)
// not yet watching, so let's add it:
self.watching[addressableWatchee] = OnTerminationMessage(customize: terminationMessage)

if !watcher.children.contains(identifiedBy: watchee.asAddressable.address) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we proactively avoid sending a message which would have been ignored anyway

// We ONLY send the watch message if it is NOT our own child;
//
// Because a child ALWAYS sends a .childTerminated to its parent on termination, so there is no need to watch it again,
// other than _us_ remembering that we issued such watch. A child can also never be remote, so the node deathwatcher does not matter either.
//
// A childTerminated is transformed into `Signals.ChildTerminated` which subclasses `Signals.Terminated`,
// so this way we achieve exactly one termination notification already.
addressableWatchee._sendSystemMessage(.watch(watchee: addressableWatchee, watcher: watcher.asAddressable), file: file, line: line)
}

// TODO: this is specific to the transport (!), if we only do XPC but not cluster, this does not make sense
if addressableWatchee.address.uniqueNode.node.protocol == "sact" { // FIXME: this is an ugly workaround; proper many transports support would be the right thing
self.subscribeNodeTerminatedEvents(myself: watcher, watchedAddress: addressableWatchee.address, file: file, line: line)
self.subscribeNodeTerminatedEvents(myself: watcher.myself, watchedAddress: addressableWatchee.address, file: file, line: line)
}
}
}
Expand Down Expand Up @@ -210,13 +210,22 @@ internal struct DeathWatch<Message: ActorMessage> {
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: react to watch or unwatch signals

public mutating func becomeWatchedBy(watcher: AddressableActorRef, myself: ActorRef<Message>) {
public mutating func becomeWatchedBy(watcher: AddressableActorRef, myself: ActorRef<Message>, parent: AddressableActorRef) {
guard watcher.address != myself.address else {
traceLog_DeathWatch("Attempted to watch 'myself' [\(myself)], which is a no-op, since such watch's terminated can never be observed. " +
"Likely a programming error where the wrong actor ref was passed to watch(), please check your code.")
return
}

guard watcher != parent else {
// This is fairly defensive, as the parent should already know to never send such message, but let's better be safe than sorry.
//
// no need to become watched by parent, we always notify our parent with `childTerminated` anyway already
// so if we added it also to `watchedBy` we would potentially send terminated twice: terminated and childTerminated,
// which is NOT good -- we only should notify it once, specifically with the childTerminated signal handled by the ActorShell itself.
return
}

traceLog_DeathWatch("Become watched by: \(watcher.address) inside: \(myself)")
self.watchedBy.insert(watcher)
}
Expand Down Expand Up @@ -283,7 +292,7 @@ internal struct DeathWatch<Message: ActorMessage> {
traceLog_DeathWatch("[\(myself)] notifyWatchers that we are terminating. Watchers: \(self.watchedBy)...")

for watcher in self.watchedBy {
traceLog_DeathWatch("[\(myself)] Notify \(watcher) that we died...")
traceLog_DeathWatch("[\(myself)] Notify \(watcher) that we died")
watcher._sendSystemMessage(.terminated(ref: AddressableActorRef(myself), existenceConfirmed: true), file: #file, line: #line)
}
}
Expand Down
18 changes: 17 additions & 1 deletion Sources/DistributedActorsTestKit/TestProbes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import XCTest
internal enum ActorTestProbeCommand<M> {
case watchCommand(who: AddressableActorRef, file: String, line: UInt)
case unwatchCommand(who: AddressableActorRef)
case forwardCommand(send: () -> Void)
case stopCommand

case realMessage(message: M)
Expand Down Expand Up @@ -113,13 +114,17 @@ public class ActorTestProbe<Message: ActorMessage> {

// probe commands:
case .watchCommand(let who, let file, let line):
cell.deathWatch.watch(watchee: who, with: nil, myself: context.myself, parent: cell._parent, file: file, line: line)
cell.deathWatch.watch(watchee: who, with: nil, myself: context._downcastUnsafe, file: file, line: line)
return .same

case .unwatchCommand(let who):
cell.deathWatch.unwatch(watchee: who, myself: context.myself)
return .same

case .forwardCommand(let send):
send()
return .same

case .stopCommand:
return .stop
}
Expand Down Expand Up @@ -656,6 +661,17 @@ extension ActorTestProbe {
return watchee
}

/// Instructs the probe to send a message on our behalf, this is useful to enforce ordering e.g. when the probe has to perform a watch,
/// followed by a message send and we want to ensure that the watch has been processed -- we can do so by forwarding a message through
/// the probe, which ensures ordering between the watch being sent and the message sent just now.
///
/// Without this it may happen that we asked the probe to watch an actor, and send a message to the actor directly,
/// and our direct message arrives first, before the watch at the destination, causing potentially confusing behavior
/// in some very ordering delicate testing scenarios.
public func forward<Message>(_ message: Message, to target: ActorRef<Message>, file: String = #file, line: UInt = #line) where Message: Codable {
self.internalRef.tell(ProbeCommands.forwardCommand(send: { () in target.tell(message, file: file, line: line) }))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really necessary here, since watching a dead actor works as well, but I wanted to make sure to see the exact ordering I'm testing.

}

/// Instructs this probe to unwatch the passed in reference.
///
/// Note that unwatch MIGHT when used with testProbes since the probe may have already stored
Expand Down
37 changes: 37 additions & 0 deletions Tests/DistributedActorsTests/DeathWatchTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,43 @@ final class DeathWatchTests: ActorSystemXCTestCase {

struct TakePoisonError: Error {}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Watching child actors

func test_ensureOnlySingleTerminatedSignal_emittedByWatchedChildDies() throws {
let p: ActorTestProbe<Signals.Terminated> = self.testKit.spawnTestProbe()
let pp: ActorTestProbe<String> = self.testKit.spawnTestProbe()

let spawnSomeStoppers = Behavior<String>.setup { context in
let one: ActorRef<String> = try context.spawnWatch(
"stopper",
.receiveMessage { _ in
.stop
}
)
one.tell("stop")

return .same
}.receiveSignal { _, signal in switch signal {
case let terminated as Signals.Terminated:
p.tell(terminated)
default:
() // ok
}
pp.tell("\(signal)")
return .same // ignore the child death, remain alive
}

let _: ActorRef<String> = try system.spawn("parent", spawnSomeStoppers)

let terminated = try p.expectMessage()
terminated.address.path.shouldEqual(try! ActorPath._user.appending("parent").appending("stopper"))
terminated.existenceConfirmed.shouldBeTrue()
terminated.nodeTerminated.shouldBeFalse()
terminated.shouldBe(Signals.ChildTerminated.self)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which would have potentially failed and may have received Terminated before (!)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(or rather, the expect no message would have gotten it)

try p.expectNoMessage(for: .milliseconds(200))
}

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Watching dead letters ref

Expand Down
9 changes: 6 additions & 3 deletions Tests/DistributedActorsTests/InterceptorTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ final class TerminatedInterceptor<Message: ActorMessage>: Interceptor<Message> {
switch signal {
case let terminated as Signals.Terminated:
self.probe.tell(terminated) // we forward all termination signals to someone
case is Signals.PostStop:
() // ok
default:
fatalError("Other signal: \(signal)")
()
}
return try target.interpretSignal(context: context, signal: signal)
Expand Down Expand Up @@ -119,7 +122,7 @@ final class InterceptorTests: ActorSystemXCTestCase {

let spyOnTerminationSignals: Interceptor<String> = TerminatedInterceptor(probe: p)

let spawnSomeStoppers: Behavior<String> = .setup { context in
let spawnSomeStoppers = Behavior<String>.setup { context in
let one: ActorRef<String> = try context.spawnWatch(
"stopperOne",
.receiveMessage { _ in
Expand Down Expand Up @@ -149,7 +152,7 @@ final class InterceptorTests: ActorSystemXCTestCase {
// any additional messages
let terminated = try p.expectMessage()
(terminated.address.name == "stopperOne" || terminated.address.name == "stopperTwo").shouldBeTrue()
try p.expectNoMessage(for: .milliseconds(100))
try p.expectNoMessage(for: .milliseconds(500))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be extra sure

}

class SignalToStringInterceptor<Message: ActorMessage>: Interceptor<Message> {
Expand All @@ -165,7 +168,7 @@ final class InterceptorTests: ActorSystemXCTestCase {
}
}

func test_interceptor_shouldRemainWHenReturningStoppingWithPostStop() throws {
func test_interceptor_shouldRemainWhenReturningStoppingWithPostStop() throws {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was a typo

let p: ActorTestProbe<String> = self.testKit.spawnTestProbe()

let behavior: Behavior<String> = .receiveMessage { _ in
Expand Down