Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #2020 from akka/wip-3882-sharding-watch-after-reco…

…very-patriknw

=con #3882 Defer watch in ClusterSharding until after recovery
  • Loading branch information...
commit e3a7138991136bee960811425775d749e2ecbc20 2 parents 1e3f8b7 + 5d2761b
@patriknw patriknw authored
View
14 akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
@@ -1082,6 +1082,8 @@ object ShardCoordinator {
*/
private case class RebalanceDone(shard: ShardId, ok: Boolean)
+ private case object AfterRecover
+
/**
* INTERNAL API. Rebalancing process is performed by this actor.
* It sends [[BeginHandOff]] to all `ShardRegion` actors followed by
@@ -1141,6 +1143,9 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
val snapshotTask = context.system.scheduler.schedule(snapshotInterval, snapshotInterval, self, SnapshotTick)
+ // this will be stashed and received when the recovery is completed
+ self ! AfterRecover
+
override def postStop(): Unit = {
super.postStop()
rebalanceTask.cancel()
@@ -1152,13 +1157,10 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
context.watch(region)
persistentState = persistentState.updated(evt)
case ShardRegionProxyRegistered(proxy)
- context.watch(proxy)
persistentState = persistentState.updated(evt)
case ShardRegionTerminated(region)
- context.unwatch(region)
persistentState = persistentState.updated(evt)
case ShardRegionProxyTerminated(proxy)
- context.unwatch(proxy)
persistentState = persistentState.updated(evt)
case _: ShardHomeAllocated
persistentState = persistentState.updated(evt)
@@ -1168,8 +1170,6 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
case SnapshotOffer(_, state: State)
persistentState = state
- persistentState.regionProxies.foreach(context.watch)
- persistentState.regions.foreach { case (a, _) context.watch(a) }
}
override def receiveCommand: Receive = {
@@ -1252,6 +1252,10 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
case SaveSnapshotFailure(_, reason)
log.warning("Persistent snapshot failure: {}", reason.getMessage)
+ case AfterRecover
+ persistentState.regionProxies.foreach(context.watch)
+ persistentState.regions.foreach { case (a, _) context.watch(a) }
+
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.