
!act,doc #3893 Removed isTerminated checks from ActorClassification #2037

Conversation

@ktoso ktoso commented Feb 24, 2014

Instead of isTerminated we now use death watch on subscribers in ActorClassification.

This PR is a follow-up to #2011 (comment), where isTerminated has been replaced with death-watching actors.

It was suggested this be pulled in for 2.4, since it breaks API (ActorClassification now requires an ActorSystem, and the system actor ("unwatcher") must be stopped when done using such an event bus).
Or it can be rewritten to use an init-method-based approach instead.
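As a rough illustration of the death-watch idea (a toy model in plain Scala, not Akka's actual API; names like ToyBus, register and terminated are mine): the unsubscriber watches each registered subscriber and prunes it from the bus when notified of termination, instead of the bus polling isTerminated:

```scala
import scala.collection.mutable

// Toy stand-in for an event bus; subscriber "refs" are just names here.
final class ToyBus {
  private val subs = mutable.Set.empty[String]
  def subscribe(s: String): Unit    = subs += s
  def unsubscribe(s: String): Unit  = subs -= s
  def isSubscribed(s: String): Boolean = subs(s)
}

// Toy stand-in for the unsubscriber actor: register/unregister mimic
// context.watch/unwatch, terminated mimics receiving Terminated(ref).
final class ToyUnsubscriber(bus: ToyBus) {
  private val watched = mutable.Set.empty[String]
  def register(s: String): Unit   = watched += s
  def unregister(s: String): Unit = watched -= s
  def terminated(s: String): Unit =
    if (watched.remove(s)) bus.unsubscribe(s)  // purge dead subscriber
}
```

In the real PR the unsubscriber is a system actor and the bus sends it register/unregister messages; the toy only shows the shape of the interaction.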

@ktoso ktoso added the tested label Feb 25, 2014
@@ -314,9 +335,10 @@ trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒
val removed = v - monitor
if (removed eq raw) false
else if (removed.isEmpty) {
if (!mappings.remove(monitored, v)) dissociate(monitored, monitor) else true
unregisterFromUnsubscriber(monitor)
mappings.remove(monitored, v) || dissociate(monitored, monitor)
Contributor
Unfortunately this is not free from races: the removal does happen after the insertion, but there is no ordering relationship between the corresponding messages to the unsubscriber, meaning it could receive them out of order.

One fix would be to make sure to only send one notification in any case and keep track of “odd” requests in the unsubscriber to correct the ordering after the fact.
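The sequence-number idea could look roughly like this (a toy sketch in plain Scala; the class and method names are mine, not the PR's): the unsubscriber applies a register/unregister event only if its seqNr is newer than the last one seen for that subscriber, so a reordered pair cannot resurrect a stale registration:

```scala
import scala.collection.mutable

// Toy sketch of ordering correction via sequence numbers.
final class SeqNrUnsubscriber {
  private val lastSeen = mutable.Map.empty[String, Long] // subscriber -> last seqNr
  private val watched  = mutable.Set.empty[String]

  def register(sub: String, seqNr: Long): Unit =
    if (newer(sub, seqNr)) watched += sub

  def unregister(sub: String, seqNr: Long): Unit =
    if (newer(sub, seqNr)) watched -= sub

  def isWatching(sub: String): Boolean = watched(sub)

  // Accept only strictly newer events; stale out-of-order ones are dropped.
  private def newer(sub: String, seqNr: Long): Boolean =
    if (seqNr > lastSeen.getOrElse(sub, -1L)) { lastSeen(sub) = seqNr; true }
    else false
}
```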

Member Author

Yeah, I see, hm... Embarrassed to have pull-requested another race - will think about it, thanks a lot for checking!

A quick idea I just had was to keep some "subscription identifier", to guarantee we unsubscribe exactly the subscription we expected to unsubscribe (in the unsubscriber)... Will think about this in more detail - boarding a plane right now.

// I'm running around airports today and showing a friend around kraków, will give this some thought, but won't be today -- sorry

Contributor

No worries, this stuff is not trivial! (Otherwise you wouldn’t have this ticket to work on right now ;-) )

@ktoso ktoso added tested and removed tested labels Mar 1, 2014
private val empty = TreeSet.empty[ActorRef]
private val mappings = new ConcurrentHashMap[ActorRef, TreeSet[ActorRef]](mapSize)

private[akka] lazy val unsubscriber = ActorClassificationUnsubscriber(system).newUnsubscriber(this)
Member

Extension not needed here; since we are in the Akka code base, simply cast it to ExtendedActorSystem to create the system actor.

Member Author

Extension removed; had to cast to ActorSystemImpl for now, as systemActorOf will only be exposed once #2052 is merged :)

@patriknw
Member

patriknw commented Mar 4, 2014

LGTM, after some minor adjustments

@ktoso ktoso added reviewed and removed tested labels Mar 4, 2014
@ktoso
Member Author

ktoso commented Mar 4, 2014

#2052 was merged, so I updated this PR with casting to ExtendedActorSystem instead of ActorSystemImpl. Ready to roll :shipit:

@ktoso ktoso added tested and removed tested labels Mar 4, 2014
else {
val added = v + monitor
if (!mappings.replace(monitored, v, added)) associate(monitored, monitor)
else if (monitored.isTerminated) !dissociate(monitored, monitor) else true
else {
registerWithUnsubscriber(monitor)
Member

there is a race here: the subscriptionActionSeqNr is not used/updated together with the CAS update of mappings

Member Author

Argh! This is getting really complicated... will try to sketch another impl like we discussed.

@ktoso ktoso removed the tested label Mar 9, 2014
@ktoso
Member Author

ktoso commented Mar 9, 2014

I've had an idea to work around the problems we found using a versioned container that is compare-and-swapped.

The last commit contains a rewrite so we don't do two separate atomic operations (which was the cause of the race we found before). We now CAS on a "versioned container", which is safe because there can be no race between bumping the seqNumber and putting entries into mappings. Since this means we no longer use ConcurrentHashMap, I checked the performance implications:
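The "versioned container" idea can be sketched roughly like this (plain Scala, names illustrative rather than the PR's actual code): the mappings and the sequence number live in one immutable value held in an AtomicReference, so bumping the seqNr and updating the mappings happen in a single CAS and can never race against each other:

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// One immutable value holding both the version and the subscriptions.
final case class Registry(seqNr: Long, mappings: Map[String, Set[String]])

final class VersionedBus {
  private val state = new AtomicReference(Registry(0L, Map.empty[String, Set[String]]))

  // Returns the seqNr under which this subscription was recorded, so the
  // matching message to the unsubscriber carries a consistent ordering.
  @tailrec
  def associate(monitored: String, monitor: String): Long = {
    val cur     = state.get
    val current = cur.mappings.getOrElse(monitored, Set.empty[String])
    val next    = Registry(cur.seqNr + 1, cur.mappings.updated(monitored, current + monitor))
    if (state.compareAndSet(cur, next)) next.seqNr
    else associate(monitored, monitor) // lost the race, retry with fresh state
  }

  def subscribers(monitored: String): Set[String] =
    state.get.mappings.getOrElse(monitored, Set.empty[String])
}
```

The retry loop is the usual lock-free pattern: readers pay nothing extra, but every writer copies the immutable map, which is where the throughput drop below comes from.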

I've "stress" tested it a bit, and the performance loss is quite big. Test case is this naive thing:

"subscribe / unsubscribe as many times as possible" in {
    val s1, a1 = createSubscriber(system.deadLetters)
    val bus = createNewEventBus()
    val stop = System.currentTimeMillis() + 5.seconds.toMillis

    def test(): Long = {
      var count = 0L
      while (System.currentTimeMillis < stop) {
        var i = 100
        while (i > 0) {
          bus.subscribe(s1, a1)
          bus.unsubscribe(s1, a1)
          count += 1
          i -= 1
        }
      }
      info(s"${Thread.currentThread.getName} -- subscribe/unsubscribe called: $count times")
      count
    }

    val run = new Runnable {
      def run() = test()
    }
    val t1, t2, t3 = new Thread(run)

    t1.start()
    t2.start()
    t3.start()

    t1.join()
    t2.join()
    t3.join()

    bus.subscribe(s1, a1) should be(true)

    disposeSubscriber(system, s1)
    disposeSubscriber(system, a1)
  }

Using only ConcurrentHashMap and relying on isTerminated (before this PR) we get:

[info]   + Thread-14 -- subscribe/unsubscribe called: 4223300 times
[info]   + Thread-13 -- subscribe/unsubscribe called: 4395200 times
[info]   + Thread-12 -- subscribe/unsubscribe called: 4221200 times 
// ~1688480 ops/s

Using the last commit in this PR I'm getting:

[info]   + Thread-10 -- subscribe/unsubscribe called: 22100 times
[info]   + Thread-11 -- subscribe/unsubscribe called: 22400 times
[info]   + Thread-9 -- subscribe/unsubscribe called: 22000 times
// 8800 ops/s

The loss is huge, I guess... On the other hand, I would assume fewer subscribe calls than publish calls, but unsubscribing everyone during a system shutdown could perhaps be a problem due to this perf loss.

Looking forward to your opinions, but I guess that's not fast enough. Which leaves me with the idea of "the unsubscriber actor looks at the insides of the event bus, and updates its watched list". There I'll have to keep the list of subscribers in the actor, because I don't want to reach directly into death watch's list, and I may need to unwatch actors that have unsubscribed. So it'll be more space usage - in the unsubscriber, to be precise.

This seemingly simple issue is getting very tricky indeed 💭 !

@patriknw
Member

have you checked if the performance drop is due to contention? what is the difference with 1 thread?

what does the gc pressure look like? the unsubscriber actor might fall behind, which you don't measure? what results do you see when you remove the send to the unsubscriber?

how complicated is the background pruning approach?

@ktoso ktoso added the tested label Mar 10, 2014
@ktoso
Member Author

ktoso commented Mar 11, 2014

Very good hints, thanks a lot @patriknw!
After poking around it for a while I think I found some interesting things.

I don't think it's thread contention. Same perf on 1 thread (20k - 30k).

The biggest fail I found was that the messages sent to the unsubscriber were being serialized - because system.settings.SerializeAllMessages is set to true in tests. I noticed this when commenting out the unsubscriber ! ... lines, which bumped perf from ~30k back to millions/s:

Disclaimer: I've run these many times and pasted "representative" examples. Only the order of magnitude matters anyway.

cas, initial
[info]   + Thread-17 -- subscribe/unsubscribe called: 33,200 times
[info]   + Thread-16 -- subscribe/unsubscribe called: 33,500 times
[info]   + Thread-15 -- subscribe/unsubscribe called: 33,400 times

cas, commented out message sending
[info]   + Thread-33 -- subscribe/unsubscribe called: 2,660,800 times
[info]   + Thread-34 -- subscribe/unsubscribe called: 2,578,000 times
[info]   + Thread-35 -- subscribe/unsubscribe called: 2,632,600 times

Which got me thinking, and then I found that all msgs were being serialized (the setting above). Serialization still happens on the sending thread, so we're blocking far longer during the ! than expected.

Since having this setting on seems to be suggested only for tests: http://doc.akka.io/docs/akka/snapshot/java/serialization.html#verification I've then run the "naive perf test" again with serialization disabled: akka.actor.serialize-messages = off, getting numbers around these:

cas, no serialization of msgs
[info]   + Thread-14 -- subscribe/unsubscribe called: 986,800 times
[info]   + Thread-12 -- subscribe/unsubscribe called: 983,100 times
[info]   + Thread-13 -- subscribe/unsubscribe called: 978,000 times

Then I noticed ("wat" moment) that I was still emitting debug events from the bus when running the above measurements - rookie error... Without debug events and with serialization still off, I get:

cas, no serialization, no debug events - so, actual "prod" situation
[info]   + Thread-45 -- subscribe/unsubscribe called: 1,335,000 times
[info]   + Thread-43 -- subscribe/unsubscribe called: 1,346,100 times
[info]   + Thread-44 -- subscribe/unsubscribe called: 1,349,600 times

So 539,840 ops/s instead of the previous impl's ~1,688,480 ops/s (where op = subscribe | unsubscribe).

The original (with isTerminated) impl is able to do around:

original impl
[info]   + Thread-30 -- subscribe/unsubscribe called: 4,466,900 times
[info]   + Thread-29 -- subscribe/unsubscribe called: 4,531,800 times
[info]   + Thread-28 -- subscribe/unsubscribe called: 4,731,700 times

Since serializing all messages is not encouraged in prod anyway, and the same goes for debug-logging the event bus, I'd say this impl is much better off than it looked initially. If we think that's reasonable perf to go in, I'll merge in the minor details (unsubscriber can be a val instead of a lazy val, etc.) - if not, there's still the "pruning" idea to go (which, after thinking a lot about it, starts to feel like a "cop-out" option, hm...).

TODO: 1) I did not measure whether the unsubscriber is falling behind; 2) if we exposed an internal API for the unsubscriber, we could spare one message send per "unsubscribe because actor terminated", but I believe we agreed that's a minor thing for the time being.

@patriknw
Member

That's great @ktoso. Yeah, serialization is only enabled in tests to verify correctness and should be off for this kind of test. We have all made this mistake.

Instead of isTerminated we now use death watch on subscribers.

! Breaking change - ActorClassification based event buses now require
  an actor system. Previously no actors were involved, but now someone
  has to `watch` the subscribers. The unsubscriber is a system actor,
  and won't be stopped automagically if a bus stops being used (hard to
  determine what "stops being used" means)
* Replaced isTerminated checks with watching actors
* Backing structure for ActorClassification swapped from
  ConcurrentHashMap to immutable.Map with CAS operations on it. This is
  required to avoid races and guarantee register/unregister ordering
  (messages sent with proper sequence numbers) to the unsubscriber.
  Performance tested it and still above 1.3 million subscribe+unsubscribe
  ops per second (mac i7, retina), whereas the CHM version was
  4 million - but that one could only work in the presence of
  isTerminated - so we pay the price here for removing it.
* `ActorClassification` starts the unsubscriber instance by itself;
  the unsubscriber is a system actor, and can be stopped via
  `ActorClassification#shutdown`
* Will unregister from the unsubscriber when no more subscriptions for a
  given subscriber are left in this bus.
* Added missing "Java API: " for some types
* Updated docs to point out the automatic subscriber purging (on terminated)
@ktoso
Member Author

ktoso commented Apr 16, 2014

Closing and will reopen as merged pull req together with: #2011 as requested by @patriknw

Labels
needs-attention: Indicates a PR validation failure (set by CI infrastructure)
reviewed: Ready to merge from review perspective, but awaiting some additional action (e.g. order of merges)