Skip to content


Terminate StreamRef on node failure, #25960
Browse files Browse the repository at this point in the history
* manage AddressTerminated subscription in FunctionRef
* implementation can be compared with akka/actor/dungeon/DeathWatch.scala
  • Loading branch information
patriknw committed Jan 7, 2019
1 parent c8e6500 commit 8f2baad
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 17 deletions.
Expand Up @@ -61,8 +61,15 @@ class FunctionRefSpec extends AkkaSpec with ImplicitSender {
s ! GetForwarder(testActor)
val f = expectMsgType[FunctionRef]
forwarder.isWatching(f) should ===(true)
s ! DropForwarder(f)
expectMsg(Forwarded(Terminated(f)(true, false), f))

// Upon receiving the Terminated message, unwatch() must be called from a
// safe context (i.e. normally from the parent Actor).
forwarder.isWatching(f) should ===(true)
forwarder.isWatching(f) should ===(false)

"terminate when their parent terminates" in {
Expand All @@ -87,7 +94,7 @@ class FunctionRefSpec extends AkkaSpec with ImplicitSender {
"not registered" must {
"not be found" in {
val provider = system.asInstanceOf[ExtendedActorSystem].provider
val ref = new FunctionRef(testActor.path / "blabla", provider, system.eventStream, (x, y) ())
val ref = new FunctionRef(testActor.path / "blabla", provider, system, (x, y) ())
EventFilter[SerializationCheckFailedException](start = "Failed to serialize and deserialize message of type", occurrences = 1) intercept {
// needs to be something that fails when the deserialized form is not a FunctionRef
// this relies upon serialize-messages during tests
Expand Down
6 changes: 5 additions & 1 deletion akka-actor/src/main/mima-filters/2.5.19.backwards.excludes
Expand Up @@ -4,4 +4,8 @@ ProblemFilters.exclude[MissingClassProblem]("

# #25960 AddressTerminated in FunctionRef
145 changes: 131 additions & 14 deletions akka-actor/src/main/scala/akka/actor/ActorRef.scala
Expand Up @@ -5,19 +5,22 @@

import scala.collection.immutable

import akka.dispatch._
import akka.dispatch.sysmsg._
import java.lang.{ IllegalStateException, UnsupportedOperationException }

import akka.serialization.{ JavaSerializer, Serialization }
import akka.event.{ EventStream, Logging, MarkerLoggingAdapter }

import scala.annotation.tailrec
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference

import scala.util.control.NonFatal

import akka.event.AddressTerminatedTopic
import akka.util.OptionVal

object ActorRef {

Expand Down Expand Up @@ -708,18 +711,24 @@ private[akka] class VirtualPathContainer(
private[akka] final class FunctionRef(
override val path: ActorPath,
override val provider: ActorRefProvider,
val eventStream: EventStream,
system: ActorSystem,
f: (ActorRef, Any) Unit) extends MinimalActorRef {

override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
f(sender, message)
message match {
case AddressTerminated(address) addressTerminated(address)
case _ f(sender, message)

override def sendSystemMessage(message: SystemMessage): Unit = {
message match {
case w: Watch addWatcher(w.watchee, w.watcher)
case u: Unwatch remWatcher(u.watchee, u.watcher)
case DeathWatchNotification(actorRef, _, _)
// Upon receiving the Terminated message, unwatch() must be called from a
// safe context (i.e. normally from the parent Actor).
// That is the reason for nor unwatching here.
this.!(Terminated(actorRef)(existenceConfirmed = true, addressTerminated = false))(actorRef)
case _ //ignore all other messages
Expand All @@ -728,10 +737,13 @@ private[akka] final class FunctionRef(
private[this] var watching = ActorCell.emptyActorRefSet
private[this] val _watchedBy = new AtomicReference[Set[ActorRef]](ActorCell.emptyActorRefSet)

override def isTerminated = _watchedBy.get() == null
override def isTerminated: Boolean = _watchedBy.get() == null

//noinspection EmptyCheck
protected def sendTerminated(): Unit = {
def unwatchWatched(watched: ActorRef): Unit =
watched.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(watched, this))

val watchedBy = _watchedBy.getAndSet(null)
if (watchedBy != null) {
if (watchedBy.nonEmpty) {
Expand All @@ -743,18 +755,39 @@ private[akka] final class FunctionRef(
watching = Set.empty

private def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit =
if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal)
watcher.asInstanceOf[InternalActorRef].sendSystemMessage(DeathWatchNotification(this, existenceConfirmed = true, addressTerminated = false))

private def unwatchWatched(watched: ActorRef): Unit =
watched.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(watched, this))
@tailrec private def addressTerminated(address: Address): Unit = {
// cleanup watchedBy since we know they are dead
_watchedBy.get() match {
case null // terminated
case watchedBy
val watchingBefore = watching
val watchByBefore = watchedBy
var watchedByAfter = watchedBy
for (a watchedBy; if a.path.address == address)
watchedByAfter -= a
if (_watchedBy.compareAndSet(watchedBy, watchedByAfter)) {
val watchingAfter = watching
val watchByAfter = watchedByOrEmpty
updateAddressTerminatedSubscription(OptionVal.None, watchingBefore, watchingAfter, watchByBefore, watchByAfter)
// send DeathWatchNotification to self for all matching subjects
for (a watching; if a.path.address == address) {
this.sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true))
} else
addressTerminated(address) // try again

override def stop(): Unit = sendTerminated()

@tailrec private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit =
@tailrec private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
_watchedBy.get() match {
case null
sendTerminated(ifLocal = true)(watcher)
Expand All @@ -765,15 +798,24 @@ private[akka] final class FunctionRef(
val watcherSelf = watcher == this

if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher))
if (!_watchedBy.compareAndSet(watchedBy, watchedBy + watcher))
if (!watchedBy.contains(watcher)) {
val watchingBefore = watching
val watchByBefore = watchedBy
if (_watchedBy.compareAndSet(watchedBy, watchedBy + watcher)) {
val watchingAfter = watching
val watchByAfter = watchedByOrEmpty
updateAddressTerminatedSubscription(OptionVal.Some(watcher), watchingBefore, watchingAfter,
watchByBefore, watchByAfter)
} else
addWatcher(watchee, watcher) // try again
} else if (!watcheeSelf && watcherSelf) {
publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered watch from $watcher to $watchee is illegal on FunctionRef"))
} else {
publish(Logging.Error(path.toString, classOf[FunctionRef], s"BUG: illegal Watch($watchee,$watcher) for $this"))

@tailrec private def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
_watchedBy.get() match {
Expand All @@ -783,9 +825,17 @@ private[akka] final class FunctionRef(
val watcherSelf = watcher == this

if (watcheeSelf && !watcherSelf) {
if (watchedBy.contains(watcher))
if (!_watchedBy.compareAndSet(watchedBy, watchedBy - watcher))
if (watchedBy.contains(watcher)) {
val watchingBefore = watching
val watchByBefore = watchedBy
if (_watchedBy.compareAndSet(watchedBy, watchedBy - watcher)) {
val watchingAfter = watching
val watchByAfter = watchedByOrEmpty
updateAddressTerminatedSubscription(OptionVal.Some(watcher), watchingBefore, watchingAfter,
watchByBefore, watchByAfter)
} else
remWatcher(watchee, watcher) // try again
} else if (!watcheeSelf && watcherSelf) {
publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered unwatch from $watcher to $watchee is illegal on FunctionRef"))
} else {
Expand All @@ -794,7 +844,7 @@ private[akka] final class FunctionRef(

private def publish(e: Logging.LogEvent): Unit = try eventStream.publish(e) catch { case NonFatal(_) }
private def publish(e: Logging.LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) }

* Have this FunctionRef watch the given Actor. This method must not be
Expand All @@ -805,7 +855,9 @@ private[akka] final class FunctionRef(
* safe context (i.e. normally from the parent Actor).
def watch(actorRef: ActorRef): Unit = {
watching += actorRef
maintainAddressTerminatedSubscription(actorRef) {
watching += actorRef
actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Watch(actorRef.asInstanceOf[InternalActorRef], this))

Expand All @@ -815,7 +867,9 @@ private[akka] final class FunctionRef(
* its parent Actor.
def unwatch(actorRef: ActorRef): Unit = {
watching -= actorRef
maintainAddressTerminatedSubscription(actorRef) {
watching -= actorRef
actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(actorRef.asInstanceOf[InternalActorRef], this))

Expand All @@ -825,4 +879,67 @@ private[akka] final class FunctionRef(
* only be called by its parent Actor.
def isWatching(actorRef: ActorRef): Boolean = watching.contains(actorRef)

private def watchedByOrEmpty: Set[ActorRef] =
_watchedBy.get match {
case null ActorCell.emptyActorRefSet // terminated
case watchedBy watchedBy

* Starts subscription to AddressTerminated if not already subscribing and the
* block adds a non-local ref to watching or watchedBy.
* Ends subscription to AddressTerminated if subscribing and the
* block removes the last non-local ref from watching and watchedBy.
private def maintainAddressTerminatedSubscription[T](change: ActorRef)(block: T): T = {
if (isNonLocal(change)) {
val watchingBefore = watching
val watchedByBefore = watchedByOrEmpty

val result = block

val watchingAfter = watching
val watchedByAfter = watchedByOrEmpty

updateAddressTerminatedSubscription(OptionVal.Some(change), watchingBefore, watchingAfter, watchedByBefore, watchedByAfter)

} else {

* Starts subscription to AddressTerminated if not already subscribing and the
* change in the before/after sets adds a non-local ref to watching or watchedBy.
* Ends subscription to AddressTerminated if subscribing and the
* change in the before/after sets removes the last non-local ref from watching and watchedBy.
private def updateAddressTerminatedSubscription(
change: OptionVal[ActorRef],
watchingBefore: Set[ActorRef],
watchingAfter: Set[ActorRef],
watchedByBefore: Set[ActorRef],
watchedByAfter: Set[ActorRef]): Unit = {

change match {
case OptionVal.Some(ref) if !isNonLocal(ref) // update not needed
case _
val had = (watchingBefore exists isNonLocal) || (watchedByBefore exists isNonLocal)
val has = (watchingAfter exists isNonLocal) || (watchedByAfter exists isNonLocal)
if (had && !has) unsubscribeAddressTerminated()
else if (!had && has) subscribeAddressTerminated()

private def isNonLocal(ref: ActorRef) = ref match {
case null true
case a: InternalActorRef if !a.isLocal true
case _ false

private def unsubscribeAddressTerminated(): Unit = AddressTerminatedTopic(system).unsubscribe(this)

private def subscribeAddressTerminated(): Unit = AddressTerminatedTopic(system).subscribe(this)
Expand Up @@ -64,7 +64,7 @@ private[akka] trait Children { this: ActorCell ⇒
val r = randomName(new java.lang.StringBuilder("$$"))
val n = if (name != "") s"$r-$name" else r
val childPath = new ChildActorPath(self.path, n, ActorCell.newUid())
val ref = new FunctionRef(childPath, provider, system.eventStream, f)
val ref = new FunctionRef(childPath, provider, system, f)

@tailrec def rec(): Unit = {
val old = functionRefs
Expand Down

0 comments on commit 8f2baad

Please sign in to comment.