Enforcing ordering of Terminated wrt remote/local #2835 #997

Merged
merged 2 commits into from

5 participants

@drewhk
Owner

Due to the undefined ordering between Terminated messages it was possible for the RemoteDaemon to shut down and proceed with shutting down the Remoting before a Terminated was sent to remote watchers -- causing the message to be dropped.

I am not sure this is the best way to distinguish between local/remote ActorRefs, but the bug is confirmed to be fixed. Also, feedback on how this might affect cluster-deployed actors is welcome.

...or/src/main/scala/akka/actor/dungeon/DeathWatch.scala
((11 lines not shown))
}
+
+ val (remote, local) = watchedBy.partition(_.path.address.host.isDefined)
+
+ remote foreach sendTerminated
+ local foreach sendTerminated
@viktorklang Owner

Can we come up with a more efficient solution?

@drewhk Owner
drewhk added a note

Two loops is more efficient, but uglier as well
or we can keep the watchedBy collection sorted
or we can have two watchedBy collections.
or any more ideas you have

@viktorklang Owner

Since the above solution is O(2*N) anyway, there's no need for creating an intermediate collection.

def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit = 
  watcher match {
    case a: ActorRefScope if a.isLocal == ifLocal =>
      try watcher.tell(terminated, self) catch {
        case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(actor), "deathwatch"))
      }
    case _ => // N/A
  }

watchedBy foreach sendTerminated(ifLocal = false)
watchedBy foreach sendTerminated(ifLocal = true)
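
The two-pass idea above can be tried outside Akka with a minimal sketch. `Watcher` and `notificationOrder` are hypothetical stand-ins invented for illustration (they are not Akka types); the sketch only records the order in which watchers would be notified, with all non-local watchers first and no intermediate collection.

```scala
object TwoPassNotify {
  // Hypothetical stand-in for an ActorRef that knows its own scope (not an Akka type).
  final case class Watcher(name: String, isLocal: Boolean)

  // Returns the names in the order they would be notified:
  // every non-local watcher first, then every local one.
  def notificationOrder(watchedBy: Set[Watcher]): Vector[String] = {
    val sent = Vector.newBuilder[String]

    // Curried so that a partially applied call can be passed to foreach.
    def sendTerminated(ifLocal: Boolean)(watcher: Watcher): Unit =
      if (watcher.isLocal == ifLocal) sent += watcher.name

    watchedBy foreach sendTerminated(ifLocal = false) // first pass: remote watchers
    watchedBy foreach sendTerminated(ifLocal = true)  // second pass: local watchers
    sent.result()
  }
}
```

Both passes iterate the same set, so the work is still linear in the number of watchers; only the relative order of the remote group and the local group is fixed, not the order within each group.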
@drewhk Owner
drewhk added a note

That's the two loops approach, but much prettier than I thought. I like it.

@drewhk Owner
drewhk added a note

Is it safe to ignore ActorRefs without ActorRefScope?

@viktorklang Owner

@drewhk Yes, it should be: private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope ⇒

However, you can add ERROR logging there to make sure.

@patriknw Owner
patriknw added a note

I think that is good
do we have any ActorRef without ActorRefScope? should it be silently ignored?

@patriknw Owner
patriknw added a note

or just remove the case, if it should never happen

@drewhk Owner
drewhk added a note

How does logging work from here? I am inside DeathWatch (and ActorCell via self type)

@viktorklang Owner

Did you see my example code above?

@drewhk Owner
drewhk added a note

I saw publish, but I was not sure if it applies to my case.

@viktorklang Owner

Why wouldn't it?

protected final def publish(e: LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) ⇒ }
@patriknw Owner
patriknw added a note

I would let it blow up instead (remove the case _)

@viktorklang Owner

The irony is that it will blow up if the default case is removed, since the local-check is done in the guard. So if you remove the default, you have to make the guard into an if-expression in the closure body instead.
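
A small sketch of why the guard matters, using hypothetical types (`Ref`, `Scoped`) rather than the real `ActorRefScope`: when the guard is the only case, a value whose flag differs from `ifLocal` matches nothing and the match throws `scala.MatchError`; moving the check into the case body avoids that.

```scala
object GuardedMatch {
  // Hypothetical stand-ins, not Akka types.
  trait Ref { def isLocal: Boolean }
  final case class Scoped(isLocal: Boolean) extends Ref

  // Guard in the pattern, no default case: a Scoped ref with the "wrong"
  // flag falls through every case and a MatchError is thrown.
  def guardOnly(ifLocal: Boolean)(ref: Ref): String =
    ref match {
      case s: Scoped if s.isLocal == ifLocal => "sent"
    }

  // Check moved into the case body: every Scoped ref matches,
  // and the flag only decides what happens afterwards.
  def guardInBody(ifLocal: Boolean)(ref: Ref): String =
    ref match {
      case s: Scoped => if (s.isLocal == ifLocal) "sent" else "skipped"
    }
}
```

Calling `GuardedMatch.guardOnly(ifLocal = true)(GuardedMatch.Scoped(false))` throws, while the same arguments passed to `guardInBody` return `"skipped"`.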

@drewhk Owner
drewhk added a note

I noticed that already. I just added another case statement without the guard but with matching on ActorRefScope. Of course I can move the guard inside the case statement as well. Which one do you prefer?

@viktorklang Owner

perhaps just scrap the match altogether and just do:

if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal)
  watcher.tell(terminated, self)  
akka-actor/src/main/scala/akka/actor/Address.scala
@@ -19,6 +19,7 @@ import scala.collection.immutable
*/
@SerialVersionUID(1L)
final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) {
+ // Please note that local/non-local distinction must be preserved: host.isDefined == !isLocal
@patriknw Owner
patriknw added a note

perhaps add a

def isLocal: Boolean = host.isEmpty
@drewhk Owner
drewhk added a note

Do we need this comment anymore if we use ActorRefScope, which Viktor proposed?

@rkuhn Owner
rkuhn added a note

I am at least half-certain that Addresses may also be interrogated for local-ness, and there ActorRefScope does not help; I’d like to keep this comment; and we should also add an isLocal method which encodes this convention.

@drewhk Owner
drewhk added a note

But if the Address is the same as our listen address then isLocal should return true.

@rkuhn Owner
rkuhn added a note

no, Address has nothing to do with the remoting implementation: the address either specifies host/port, in which case it is non-local, or it does not. Address.isLocal only means that this Address cannot be migrated to other systems or nodes without additional information.

@drewhk Owner
drewhk added a note

Maybe we should use a different name for that function.
isPartial? migratable?
WDYT?

@rkuhn Owner
rkuhn added a note
@drewhk Owner
drewhk added a note

locallyDefined? totallyDefined?

@rkuhn Owner
rkuhn added a note

no, it’s really about the scope of the address, not unlike IPv6 addresses (host-only, link-local, global); it is basically the realm within which this address is supposed to be unique; hence I think there should be both

def hasLocalScope = host.isEmpty
def hasGlobalScope = host.isDefined
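
To make the convention concrete, here is a minimal sketch on a simplified address type. `SimpleAddress` is a hypothetical stand-in for `akka.actor.Address` (which has a private constructor and more behaviour), and the sample values are made up; only the two scope queries mirror the proposal above.

```scala
object AddressScope {
  // Hypothetical, simplified stand-in for akka.actor.Address.
  final case class SimpleAddress(
      protocol: String,
      system: String,
      host: Option[String],
      port: Option[Int]) {
    // No host: the address is only meaningful inside the system that created it.
    def hasLocalScope: Boolean = host.isEmpty
    // Host present: the address can be handed to other systems or nodes.
    def hasGlobalScope: Boolean = host.isDefined
  }

  // Illustrative values only.
  val local = SimpleAddress("akka", "sys", None, None)
  val global = SimpleAddress("akka", "sys", Some("example.org"), Some(2552))
}
```

Under this convention exactly one of the two queries is true for any address, which is the invariant `hasLocalScope == !hasGlobalScope`.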
@patriknw patriknw commented on the diff
...or/src/main/scala/akka/actor/dungeon/DeathWatch.scala
@@ -51,12 +51,14 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false)
try {
- watchedBy foreach {
- watcher ⇒
- try watcher.tell(terminated, self) catch {
@patriknw Owner
patriknw added a note

why do we have the try-catch around tell?

@drewhk Owner
drewhk added a note

My intuition would be that not all ActorRefs are actual actors, so we might get exceptions when invoking tell. But I am not a dungeon-dweller :)

@viktorklang Owner

I think the try-catch is an artifact from the past where tell was allowed to throw exceptions. However, if it does throw an exception, if we don't catch and continue, the system is broken.

@patriknw Owner
patriknw added a note

but in other places we rely on tell not throwing exceptions, to avoid sprinkling this all over the place

@viktorklang Owner

That's true, let's remove and pray :-)

@drewhk Owner
drewhk added a note

Could sendSystemMessage throw an exception? Because there is a similar catch construct just one method down.

@viktorklang Owner

Not likely, but let's open a ticket about fixing all such occurrences separately.

@rkuhn Owner
rkuhn added a note

+1

@patriknw
Owner

LGTM after some optimization

@drewhk
Owner

Updated according to review comments.

@viktorklang
Owner

+1!

@drewhk
Owner

Umm, looking again, I should probably add a comment why the ordering is important.

@akka-ci
Owner

Started jenkins job akka-pr-validator at https://jenkins.akka.io/job/akka-pr-validator/274/

@akka-ci
Owner

jenkins job akka-pr-validator: Success - https://jenkins.akka.io/job/akka-pr-validator/274/

@drewhk
Owner

Added comment describing why the specific ordering is needed, and squashed commits.

@viktorklang
Owner

:+1:

@drewhk
Owner

PLS REBUILD ALL

@drewhk
Owner

@rkuhn Can you look at this one before I merge?

@rkuhn
Owner

LGTM

@drewhk
Owner

Updated: added scope queries to Address

@rkuhn
Owner

thanks, merge!

@drewhk drewhk merged commit cdd86cb into akka:master
@akka-ci
Owner

Started jenkins job akka-pr-validator at https://jenkins.akka.io/job/akka-pr-validator/279/

@akka-ci
Owner

jenkins job akka-pr-validator: Failed - https://jenkins.akka.io/job/akka-pr-validator/279/

@drewhk
Owner
Commits on Jan 4, 2013
  1. Enforcing ordering of Terminated wrt remote/local #2835

    Endre Sándor Varga authored
  2. Added scope query methods to Address.

    Endre Sándor Varga authored
17 akka-actor/src/main/scala/akka/actor/Address.scala
@@ -19,11 +19,28 @@ import scala.collection.immutable
*/
@SerialVersionUID(1L)
final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) {
+ // Please note that local/non-local distinction must be preserved:
+ // host.isDefined == hasGlobalScope
+ // host.isEmpty == hasLocalScope
+ // hasLocalScope == !hasGlobalScope
def this(protocol: String, system: String) = this(protocol, system, None, None)
def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port))
/**
+ * Returns true if this Address is only defined locally. It is not safe to send locally scoped addresses to remote
+ * hosts. See also [[akka.actor.Address#hasGlobalScope]].
+ */
+ def hasLocalScope: Boolean = host.isEmpty
+
+ /**
+ * Returns true if this Address is usable globally. Unlike locally defined addresses ([[akka.actor.Address#hasLocalScope]])
+ * addresses of global scope are safe to sent to other hosts, as they globally and uniquely identify an addressable
+ * entity.
+ */
+ def hasGlobalScope: Boolean = host.isDefined
+
+ /**
* Returns the canonical String representation of this Address formatted as:
*
* <protocol>://<system>@<host>:<port>
26 akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala
@@ -4,7 +4,7 @@
package akka.actor.dungeon
-import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, AddressTerminated }
+import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, ActorCell, Actor, Address, AddressTerminated }
import akka.dispatch.{ Watch, Unwatch }
import akka.event.Logging.{ Warning, Error, Debug }
import scala.util.control.NonFatal
@@ -51,12 +51,24 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false)
try {
- watchedBy foreach {
- watcher ⇒
- try watcher.tell(terminated, self) catch {
- case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(actor), "deathwatch"))
- }
- }
+ def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit =
+ if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal) watcher.tell(terminated, self)
+
+ /*
+ * It is important to notify the remote watchers first, otherwise RemoteDaemon might shut down, causing
+ * the remoting to shut down as well. At this point Terminated messages to remote watchers are no longer
+ * deliverable.
+ *
+ * The problematic case is:
+ * 1. Terminated is sent to RemoteDaemon
+ * 1a. RemoteDaemon is fast enough to notify the terminator actor in RemoteActorRefProvider
+ * 1b. The terminator is fast enough to enqueue the shutdown command in the remoting
+ * 2. Only at this point is the Terminated (to be sent remotely) enqueued in the mailbox of remoting
+ *
+ * If the remote watchers are notified first, then the mailbox of the Remoting will guarantee the correct order.
+ */
+ watchedBy foreach sendTerminated(ifLocal = false)
+ watchedBy foreach sendTerminated(ifLocal = true)
} finally watchedBy = ActorCell.emptyActorRefSet
}
}
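
The problematic case described in the comment above can be modelled with a plain FIFO queue. This is only a sketch under strong assumptions: `Msg`, `Shutdown`, `RemoteTerminated` and both functions are hypothetical names, the queue stands in for the remoting mailbox, and the model assumes the worst case in which notifying the local RemoteDaemon immediately leads to the shutdown command being enqueued.

```scala
import scala.collection.immutable.Queue

object MailboxOrdering {
  // Hypothetical message types for the model (not Akka types).
  sealed trait Msg
  case object Shutdown extends Msg
  final case class RemoteTerminated(watcher: String) extends Msg

  // Worst-case contents of the remoting mailbox.
  // remoteFirst = true:  remote Terminated messages are enqueued before Shutdown.
  // remoteFirst = false: Shutdown is enqueued before any remote Terminated.
  def enqueueOrder(remoteFirst: Boolean, remoteWatchers: List[String]): Queue[Msg] = {
    val remote = remoteWatchers.foldLeft(Queue.empty[Msg])((q, w) => q :+ RemoteTerminated(w))
    if (remoteFirst) remote :+ Shutdown else Shutdown +: remote
  }

  // Messages that are still processed: everything ahead of Shutdown in FIFO order.
  def deliverable(mailbox: Queue[Msg]): List[Msg] =
    mailbox.takeWhile(_ != Shutdown).toList
}
```

With `remoteFirst = true` every `RemoteTerminated` sits ahead of `Shutdown` and is deliverable; with `remoteFirst = false` none is, which corresponds to the dropped message this change addresses.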
6 akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala
@@ -190,7 +190,11 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
private def finishTerminate() {
val a = actor
- // The following order is crucial for things to work properly. Only chnage this if you're very confident and lucky.
+ /* The following order is crucial for things to work properly. Only change this if you're very confident and lucky.
+ *
+ * Please note that if a parent is also a watcher then ChildTerminated and Terminated must be processed in this
+ * specific order.
+ */
try if (a ne null) a.postStop()
finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(ChildTerminated(self))