Skip to content

Commit

Permalink
Fix singleton issue when leaving several nodes, #27487 (#27488)
Browse files Browse the repository at this point in the history
* Fix singleton issue when leaving several nodes, #27487

* When several nodes leave at about the same time, the new singleton
  could be started before the previous one had been completely stopped.
* Found two possible ways this could happen.
  * Acting on MemberRemoved that is emitted when the self
    cluster node is shutting down.
  * The HandOverDone confirmation sent while in Younger state,
    when that node is itself Leaving and could therefore be seen as
    Exiting by a third node that is the next singleton.

* keep track of all previous oldest, not only the latest

* Option => List
* Otherwise in BecomingOldest it could transition to Oldest
  when the previous oldest was removed even though the previous-previous oldest wasn't removed yet.

* fix failure in ClusterSingletonRestart2Spec

* OldestChanged was not emitted when Exiting member was removed
* The initial membersByAge must also contain Leaving, Exiting members

(cherry picked from commit ee18856)
  • Loading branch information
patriknw committed Aug 19, 2019
1 parent c97e966 commit ddb0852
Show file tree
Hide file tree
Showing 3 changed files with 287 additions and 53 deletions.
@@ -1,3 +1,8 @@
# Protobuf 3
ProblemFilters.exclude[Problem]("akka.cluster.client.protobuf.msg.*")
ProblemFilters.exclude[Problem]("akka.cluster.pubsub.protobuf.msg.*")

# #27487 Singleton issue when several nodes leaving
ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager#Internal#OldestChangedBuffer*")
ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager#Internal#YoungerData.*")
ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager#Internal#BecomingOldestData.*")
Expand Up @@ -233,8 +233,8 @@ object ClusterSingletonManager {
case object End extends State

case object Uninitialized extends Data
final case class YoungerData(oldestOption: Option[UniqueAddress]) extends Data
final case class BecomingOldestData(previousOldestOption: Option[UniqueAddress]) extends Data
final case class YoungerData(oldest: List[UniqueAddress]) extends Data
final case class BecomingOldestData(previousOldest: List[UniqueAddress]) extends Data
final case class OldestData(singleton: Option[ActorRef]) extends Data
final case class WasOldestData(singleton: Option[ActorRef], newOldestOption: Option[UniqueAddress]) extends Data
final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data
Expand All @@ -259,7 +259,7 @@ object ClusterSingletonManager {
/**
* The first event, corresponding to CurrentClusterState.
*/
final case class InitialOldestState(oldest: Option[UniqueAddress], safeToBeOldest: Boolean)
final case class InitialOldestState(oldest: List[UniqueAddress], safeToBeOldest: Boolean)

final case class OldestChanged(oldest: Option[UniqueAddress])
}
Expand Down Expand Up @@ -324,19 +324,23 @@ object ClusterSingletonManager {
}

def handleInitial(state: CurrentClusterState): Unit = {
// all members except Joining and WeaklyUp
membersByAge = immutable.SortedSet
.empty(ageOrdering)
.union(state.members.filter(m => m.status == MemberStatus.Up && matchingRole(m)))
.union(state.members.filter(m => m.upNumber != Int.MaxValue && matchingRole(m)))

// If there is some removal in progress of an older node it's not safe to immediately become oldest,
// removal of younger nodes doesn't matter. Note that it can also be started via restart after
// ClusterSingletonManagerIsStuck.
val selfUpNumber = state.members
.collectFirst { case m if m.uniqueAddress == cluster.selfUniqueAddress => m.upNumber }
.getOrElse(Int.MaxValue)
val safeToBeOldest = !state.members.exists { m =>
m.upNumber <= selfUpNumber && matchingRole(m) && (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting || m.status == MemberStatus.Leaving)
val oldest = membersByAge.takeWhile(_.upNumber <= selfUpNumber)
val safeToBeOldest = !oldest.exists { m =>
m.status == MemberStatus.Down || m.status == MemberStatus.Exiting || m.status == MemberStatus.Leaving
}
val initial = InitialOldestState(membersByAge.headOption.map(_.uniqueAddress), safeToBeOldest)

val initial = InitialOldestState(oldest.toList.map(_.uniqueAddress), safeToBeOldest)
changes :+= initial
}

Expand Down Expand Up @@ -600,36 +604,40 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
getNextOldestChanged()
stay

case Event(InitialOldestState(oldestOption, safeToBeOldest), _) =>
case Event(InitialOldestState(oldest, safeToBeOldest), _) =>
oldestChangedReceived = true
if (oldestOption == selfUniqueAddressOption && safeToBeOldest)

if (oldest.headOption == selfUniqueAddressOption && safeToBeOldest)
// oldest immediately
tryGotoOldest()
else if (oldestOption == selfUniqueAddressOption)
goto(BecomingOldest).using(BecomingOldestData(None))
else if (oldest.headOption == selfUniqueAddressOption)
goto(BecomingOldest).using(BecomingOldestData(oldest.filterNot(_ == cluster.selfUniqueAddress)))
else
goto(Younger).using(YoungerData(oldestOption))
goto(Younger).using(YoungerData(oldest.filterNot(_ == cluster.selfUniqueAddress)))
}

when(Younger) {
case Event(OldestChanged(oldestOption), YoungerData(previousOldestOption)) =>
case Event(OldestChanged(oldestOption), YoungerData(previousOldest)) =>
oldestChangedReceived = true
if (oldestOption == selfUniqueAddressOption) {
logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldestOption.map(_.address))
previousOldestOption match {
case None => tryGotoOldest()
case Some(prev) if removed.contains(prev) => tryGotoOldest()
case Some(prev) =>
peer(prev.address) ! HandOverToMe
goto(BecomingOldest).using(BecomingOldestData(previousOldestOption))
logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldest.headOption.map(_.address))
if (previousOldest.forall(removed.contains))
tryGotoOldest()
else {
peer(previousOldest.head.address) ! HandOverToMe
goto(BecomingOldest).using(BecomingOldestData(previousOldest))
}
} else {
logInfo(
"Younger observed OldestChanged: [{} -> {}]",
previousOldestOption.map(_.address),
previousOldest.headOption.map(_.address),
oldestOption.map(_.address))
getNextOldestChanged()
stay.using(YoungerData(oldestOption))
val newPreviousOldest = oldestOption match {
case Some(oldest) if !previousOldest.contains(oldest) => oldest :: previousOldest
case _ => previousOldest
}
stay.using(YoungerData(newPreviousOldest))
}

case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
Expand All @@ -644,16 +652,23 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
scheduleDelayedMemberRemoved(m)
stay

case Event(DelayedMemberRemoved(m), YoungerData(Some(previousOldest))) if m.uniqueAddress == previousOldest =>
logInfo("Previous oldest removed [{}]", m.address)
case Event(DelayedMemberRemoved(m), YoungerData(previousOldest)) =>
if (!selfExited)
logInfo("Member removed [{}]", m.address)
addRemoved(m.uniqueAddress)
// transition when OldestChanged
stay.using(YoungerData(None))
stay.using(YoungerData(previousOldest.filterNot(_ == m.uniqueAddress)))

case Event(HandOverToMe, _) =>
// this node was probably quickly restarted with same hostname:port,
// confirm that the old singleton instance has been stopped
sender() ! HandOverDone
val selfStatus = cluster.selfMember.status
if (selfStatus == MemberStatus.Leaving || selfStatus == MemberStatus.Exiting)
logInfo("Ignoring HandOverToMe in Younger from [{}] because self is [{}].", sender().path.address, selfStatus)
else {
// this node was probably quickly restarted with same hostname:port,
// confirm that the old singleton instance has been stopped
sender() ! HandOverDone
}

stay
}

Expand All @@ -665,15 +680,21 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
cancelTimer(HandOverRetryTimer)
stay

case Event(HandOverDone, BecomingOldestData(Some(previousOldest))) =>
if (sender().path.address == previousOldest.address)
tryGotoOldest()
else {
logInfo(
"Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]",
sender().path.address,
previousOldest.address)
stay
case Event(HandOverDone, BecomingOldestData(previousOldest)) =>
previousOldest.headOption match {
case Some(oldest) =>
if (sender().path.address == oldest.address)
tryGotoOldest()
else {
logInfo(
"Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]",
sender().path.address,
oldest.address)
stay
}
case None =>
logInfo("Ignoring HandOverDone in BecomingOldest from [{}].", sender().path.address)
stay
}

case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
Expand All @@ -688,13 +709,20 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
scheduleDelayedMemberRemoved(m)
stay

case Event(DelayedMemberRemoved(m), BecomingOldestData(Some(previousOldest)))
if m.uniqueAddress == previousOldest =>
logInfo("Previous oldest [{}] removed", previousOldest.address)
case Event(DelayedMemberRemoved(m), BecomingOldestData(previousOldest)) =>
if (!selfExited)
logInfo("Member removed [{}], previous oldest [{}]", m.address, previousOldest.map(_.address).mkString(", "))
addRemoved(m.uniqueAddress)
tryGotoOldest()
if (cluster.isTerminated) {
// don't act on DelayedMemberRemoved (starting singleton) if this node is shutting itself down,
// just wait for self MemberRemoved
stay
} else if (previousOldest.contains(m.uniqueAddress) && previousOldest.forall(removed.contains))
tryGotoOldest()
else
stay.using(BecomingOldestData(previousOldest.filterNot(_ == m.uniqueAddress)))

case Event(TakeOverFromMe, BecomingOldestData(previousOldestOption)) =>
case Event(TakeOverFromMe, BecomingOldestData(previousOldest)) =>
val senderAddress = sender().path.address
// it would have been better to include the UniqueAddress in the TakeOverFromMe message,
// but can't change due to backwards compatibility
Expand All @@ -704,28 +732,29 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
logInfo("Ignoring TakeOver request from unknown node in BecomingOldest from [{}].", senderAddress)
stay
case Some(senderUniqueAddress) =>
previousOldestOption match {
case Some(previousOldest) =>
if (previousOldest == senderUniqueAddress) sender() ! HandOverToMe
previousOldest.headOption match {
case Some(oldest) =>
if (oldest == senderUniqueAddress)
sender() ! HandOverToMe
else
logInfo(
"Ignoring TakeOver request in BecomingOldest from [{}]. Expected previous oldest [{}]",
sender().path.address,
previousOldest.address)
oldest.address)
stay
case None =>
sender() ! HandOverToMe
stay.using(BecomingOldestData(Some(senderUniqueAddress)))
stay.using(BecomingOldestData(senderUniqueAddress :: previousOldest))
}
}

case Event(HandOverRetry(count), BecomingOldestData(previousOldestOption)) =>
case Event(HandOverRetry(count), BecomingOldestData(previousOldest)) =>
if (count <= maxHandOverRetries) {
logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldestOption.map(_.address))
previousOldestOption.foreach(node => peer(node.address) ! HandOverToMe)
logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldest.headOption.map(_.address))
previousOldest.headOption.foreach(node => peer(node.address) ! HandOverToMe)
startSingleTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval)
stay()
} else if (previousOldestOption.forall(removed.contains)) {
} else if (previousOldest.forall(removed.contains)) {
// can't send HandOverToMe, previousOldest unknown for new node (or restart)
// previous oldest might be down or removed, so no TakeOverFromMe message is received
logInfo("Timeout in BecomingOldest. Previous oldest unknown, removed and no TakeOver request.")
Expand All @@ -734,7 +763,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
stop()
else
throw new ClusterSingletonManagerIsStuck(
s"Becoming singleton oldest was stuck because previous oldest [$previousOldestOption] is unresponsive")
s"Becoming singleton oldest was stuck because previous oldest [${previousOldest.headOption}] is unresponsive")
}

def scheduleDelayedMemberRemoved(m: Member): Unit = {
Expand Down Expand Up @@ -964,7 +993,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
} else if (handOverTo.isEmpty)
goto(Younger).using(YoungerData(None))
goto(Younger).using(YoungerData(Nil))
else
goto(End).using(EndData)
}
Expand Down

0 comments on commit ddb0852

Please sign in to comment.