-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
=act #3623 unsubscribe when actors terminate, instead of isTerminated
#2011
=act #3623 unsubscribe when actors terminate, instead of isTerminated
#2011
Conversation
@@ -59,4 +61,13 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su | |||
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels")) | |||
} | |||
|
|||
def initTerminatedUnsubscriber(unsubscriber: ActorRef) { | |||
this.unsubscriber = Some(unsubscriber) | |||
// todo what do we do about already subscribed ones? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ideas:
- make subscribers set accessible here and add to unsubscriber? not sure if needed, since init is run so early - during
_start
- don't care?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about changing unsubscriber to an AtomicReference[Either[Seq[ActorRef], ActorRef]
which starts out at Left(Nil)
, and here we swap it out against Right(unsubscriber)
, registering all the early-comers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds neat, will do!
I had some unanswered questions during impl, brought them up in comments. |
Investigating - was green locally, will check why failed on jenkins. |
PLS REBUILD/pr-validator-per-commit@7da363c |
(kitty-note-to-self: ignore 35196996) |
PLS REBUILD/pr-validator-per-commit@5adef0d // not sure why akka-ci didn't comment on passed build, was green: https://jenkins.akka.io:8498/view/PR%20Validators/job/pr-validator-per-commit/1903/ |
(kitty-note-to-self: ignore 35197449) |
Feedback addressed with ktoso@4a12449 . |
Updated |
@@ -59,4 +64,34 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su | |||
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels")) | |||
} | |||
|
|||
@tailrec final def initTerminatedUnsubscriber(unsubscriber: ActorRef): Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method should be private[akka]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
LGTM after fixing the last nitpicks. Then awaiting team review (but this won’t make 2.3.0 anymore, slated for 2.3.1). |
val a1, a2 = TestProbe() | ||
val tm = new A | ||
|
||
val target = system.actorOf(Props(new Actor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a reason to use system
instead of sys
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should have been sys
, yes.
@@ -642,6 +643,10 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, | |||
guardian.stop() | |||
} | |||
|
|||
def startEventStreamTerminationWatcher(eventStream: EventStream) { | |||
systemActorOf(Props(classOf[EventStreamTerminatedUnsubscriber], eventStream, DebugEventStream), "eventStreamTerminatedUnsubscriber") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the name EventStreamTerminatedUnsubscriber
feels a bit clunky, would EventStreamUnsubscriber
be enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true, might have even given the wrong idea about when it does it's job.
EventStreamUnsubscriber
sounds good, changed!
} | ||
|
||
override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { | ||
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") | ||
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel)) | ||
registerWithUnsubscriber(subscriber) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we also unwatch when unsubscribe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, addressed in ktoso@8ead935#diff-a5669e25b3993c6bf5ae1e262bff557bR64
Over the weekend I worked on killing Addressed also similar issues that we had here, about when to stop watching the subscriber etc. I hope I didn't miss anything this time (hints from implementing EventStream certainly helped). If seems cool, I can make it part of this pr or make a separate one "just about ActorClassification". Q: Not sure if we can break |
Hmm, I have not fully reviewed the ActorClassification commit yet, but either way it would not be binary backwards compatible, so any such change cannot go into 2.3.1 but must wait for 2.4.0. It is probably best to create a separate pull request for that one. |
Roger that about separate PR. Squashed this one. |
|
||
def receive = { | ||
case Register(actor) ⇒ | ||
if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there will be a debug message from DeathWatch saying that the actor is watched by unsubscriber.
Isn't it enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That message depends on akka.actor.debug.lifecycle
, which should be orthogonal to the setting used here (akka.actor.debug.eventstream
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, but I meant they are all debug messages at the end, and for that purpose one is probably enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But not all debug messages are created equal: the Akka philosophy is to enable them separately because just switching them all on is rarely useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, i see the point, I guess more is better than less here
@ktoso I’ll get back to this next week after 2.3.0 is out. |
2.3.0, right - was expecting just that, thanks! :-) |
* INTERNAL API | ||
*/ | ||
private[akka] def unregisterIfNoMoreSubscribedChannels(unsubscribeHadDiff: Boolean, subscriber: ActorRef, channel: Class[_]) { | ||
if (!unsubscribeHadDiff || !hasSubscriptions(subscriber)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to have a second opinion about this hasSubscriptions
. Is it to costly? Is it worth it? If we skip it the actor ref would remain until the subscriber terminates. //cc @rkuhn @bantonsson @drewhk
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for keeping an eye out: you are right in that it has a potentially high cost. Not doing this means that we can leak state in the unsubscriber (if these actors do not terminate soon-ish); imagine a use-case where you keep starting actors which use the event bus only during initialization and which are long-running. Perhaps we need to do low-frequency pruning instead of doing it upon each unsubscription.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it would be a leak, but only some references to objects that already exists in the jvm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great that we re-reviewed it then. I'll work on an "pruning" approach.
What is the status here? Should we put the two related changes into one pull request and re-start reviewing? |
Yeah, I'd love to revive these. Let me bring them up to date with |
yes, sounds like a plan! |
if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates")) | ||
context watch actor | ||
|
||
case UnregisterIfNoMoreSubscribedChannels(actor) if eventStream.hasSubscriptions(actor) ⇒ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What kind of sorcery is this with an empty block? Should this guard be negated and moved to the case below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find those sometimes more readable than !event....
, but yeah could be made an if in the case
bellow... What do the others think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. Then maybe a comment like "// do nothing" after an arrow would be nice. Otherwise it may induce some head scratching after a month or two when someone gets back to this code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, the convention is to put a // do nothing
comment in the obvious place
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okey, done :)
* Moved to removing actors proactively when they are terminated instead of checking `isTerminated` during publish. * Subscribers which have registered before initializing the unsubscriber will be aggregated in a Seq until one is registered and then it will take responsibility of unregistering them on termination. * Initialization of the unsubscriber can only be run once - attempting to initialize the event stream with another unsubscriber will fail, and init will return false. * Assumed having an init (mutable) method on the `EventBus` is fine, as it has such methods already and @patriknw's comment in the task for this. * since we must check if the subscriber has any subscribed channels left we had to expose this detail from SubchannelClassification via `hasSubscriptions`. Increases cost of ubsubscribe(actor, channel) a bit. * Evacuated the expensive `hasSubscription` call out of eventstream's `unsubscribe` call, and instead making the Unsubscriber check this before it stops watching an actor. If in the mean time the same actor got subscribed, there will be a new Subscribe message emited - so we're good on that side. Also, if it would terminate before the unsubscriber gets the Register message it will call `watch(actor)` on a dead actor, which results in getting Terminated for it, thus we'll stop watching it from the Unsubscriber as expected. Final squash and small cleanup. Please review again;
Closing and will re-open as merged req to remove isTerminated from both evenbus + eventstream |
Moved to removing actors proactively when they are terminated instead of checking
isTerminated
during publish.Assumed having an
init
method on theEventBus
is fine, as it is mutable already anyway, as @patriknw points out in the task.Related question: should this change be documented in the docs too? I figured that it's more of an internals change so maybe not?