Skip to content

Commit

Permalink
Merge pull request #23529 from akka/wip-23371-CrossDcHeartbeatSender-…
Browse files Browse the repository at this point in the history
…patriknw

Missing become after CurrentClusterState in CrossDcHeartbeatSender, #23371
  • Loading branch information
patriknw committed Aug 22, 2017
2 parents cff43a1 + 659b28e commit eefe647
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 30 deletions.
Expand Up @@ -58,8 +58,7 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
selfDataCenter,
crossDcFailureDetector,
crossDcSettings.NrOfMonitoringActors,
SortedSet.empty
)
SortedSet.empty)

// start periodic heartbeat to other nodes in cluster
val heartbeatTask = scheduler.schedule(
Expand Down Expand Up @@ -125,10 +124,11 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
val nodes = snapshot.members
val nrOfMonitoredNodes = crossDcSettings.NrOfMonitoringActors
dataCentersState = CrossDcHeartbeatingState.init(selfDataCenter, crossDcFailureDetector, nrOfMonitoredNodes, nodes)
becomeActiveIfResponsibleForHeartbeat()
}

def addMember(m: Member): Unit =
if (m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp) {
if (CrossDcHeartbeatingState.atLeastInUpState(m)) {
// since we only monitor nodes in Up or later states, due to the n-th oldest requirement
dataCentersState = dataCentersState.addMember(m)
if (verboseHeartbeat && m.dataCenter != selfDataCenter)
Expand Down Expand Up @@ -194,15 +194,15 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
@InternalApi
private[akka] object CrossDcHeartbeatSender {

// -- messages intended only for local messaging during testing --
// -- messages intended only for local messaging during testing --
sealed trait InspectionCommand extends NoSerializationVerificationNeeded
final case class ReportStatus()

sealed trait StatusReport extends NoSerializationVerificationNeeded
sealed trait MonitoringStateReport extends StatusReport
final case class MonitoringActive(state: CrossDcHeartbeatingState) extends MonitoringStateReport
final case class MonitoringDormant() extends MonitoringStateReport
// -- end of messages intended only for local messaging during testing --
// -- end of messages intended only for local messaging during testing --
}

/** INTERNAL API */
Expand All @@ -219,9 +219,7 @@ private[cluster] final case class CrossDcHeartbeatingState(
* Only the `nrOfMonitoredNodesPerDc`-oldest nodes in each DC fulfil this role.
*/
def shouldActivelyMonitorNodes(selfDc: ClusterSettings.DataCenter, selfAddress: UniqueAddress): Boolean = {
/** Since we need ordering of oldests guaranteed, we must only look at Up (or Leaving, Exiting...) nodes */
def atLeastInUpState(m: Member): Boolean =
m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.Joining
// Since we need ordering of oldests guaranteed, we must only look at Up (or Leaving, Exiting...) nodes

val selfDcNeighbours: SortedSet[Member] = state.getOrElse(selfDc, emptyMembersSortedSet)
val selfDcOldOnes = selfDcNeighbours.filter(atLeastInUpState).take(nrOfMonitoredNodesPerDc)
Expand All @@ -244,12 +242,12 @@ private[cluster] final case class CrossDcHeartbeatingState(
val updatedState = this.copy(state = state.updated(dc, updatedMembers))

// guarding against the case of two members having the same upNumber, in which case the activeReceivers
// which are based on the ageOrdering could actually have changed by adding a node. In practice this
// should happen rarely, since upNumbers are assigned sequentially, and we only ever compare nodes
// in the same DC. If it happens though, we need to remove the previously monitored node from the failure
// which are based on the ageOrdering could actually have changed by adding a node. In practice this
// should happen rarely, since upNumbers are assigned sequentially, and we only ever compare nodes
// in the same DC. If it happens though, we need to remove the previously monitored node from the failure
// detector, to prevent both a resource leak and that node actually appearing as unreachable in the gossip (!)
val stoppedMonitoringReceivers = updatedState.activeReceiversIn(dc) diff this.activeReceiversIn(dc)
stoppedMonitoringReceivers.foreach(m failureDetector.remove(m.address)) // at most one element difference
stoppedMonitoringReceivers.foreach(m failureDetector.remove(m.address)) // at most one element difference

updatedState
}
Expand All @@ -263,7 +261,7 @@ private[cluster] final case class CrossDcHeartbeatingState(
failureDetector.remove(m.address)
copy(state = state.updated(dc, updatedMembers))
case None
this // no change needed, was certainly not present (not even its DC was)
this // no change needed, was certainly not present (not even its DC was)
}
}

Expand All @@ -274,8 +272,7 @@ private[cluster] final case class CrossDcHeartbeatingState(

allOtherNodes.flatMap(
_.take(nrOfMonitoredNodesPerDc)
.map(_.uniqueAddress)(breakOut)
).toSet
.map(_.uniqueAddress)(breakOut)).toSet
}

/** Lists addresses in given DataCenter that this node should send heartbeats to */
Expand Down Expand Up @@ -310,6 +307,10 @@ private[cluster] object CrossDcHeartbeatingState {
/** Sorted by age */
private def emptyMembersSortedSet: SortedSet[Member] = SortedSet.empty[Member](Member.ageOrdering)

/**
 * Tells whether the given member has progressed to at least the `Up` status.
 *
 * The ordering of the oldest members has to be guaranteed, so only members that are
 * Up (or Leaving, Exiting...) are taken into account; a member whose status is still
 * `Joining` or `WeaklyUp` is rejected.
 *
 * @param m the member whose status is inspected
 * @return `false` if the member is `Joining` or `WeaklyUp`, `true` for any other status
 */
def atLeastInUpState(m: Member): Boolean =
  !(m.status == MemberStatus.Joining || m.status == MemberStatus.WeaklyUp)

def init(
selfDataCenter: DataCenter,
crossDcFailureDetector: FailureDetectorRegistry[Address],
Expand All @@ -321,7 +322,7 @@ private[cluster] object CrossDcHeartbeatingState {
nrOfMonitoredNodesPerDc,
state = {
// TODO unduplicate this with the logic in MembershipState.ageSortedTopOldestMembersPerDc
val groupedByDc = members.groupBy(_.dataCenter)
val groupedByDc = members.filter(atLeastInUpState).groupBy(_.dataCenter)

if (members.ordering == Member.ageOrdering) {
// we already have the right ordering
Expand Down
Expand Up @@ -16,7 +16,7 @@ import scala.concurrent.duration._

object MultiDcHeartbeatTakingOverSpecMultiJvmSpec extends MultiNodeConfig {
val first = role("first") // alpha
val second = role("second") // alpha
val second = role("second") // alpha
val third = role("third") // alpha

val fourth = role("fourth") // beta
Expand All @@ -40,15 +40,15 @@ object MultiDcHeartbeatTakingOverSpecMultiJvmSpec extends MultiNodeConfig {
"""
akka {
actor.provider = cluster
loggers = ["akka.testkit.TestEventListener"]
loglevel = INFO
remote.log-remote-lifecycle-events = off
cluster {
debug.verbose-heartbeat-logging = off
multi-data-center {
cross-data-center-connections = 2
}
Expand Down Expand Up @@ -136,7 +136,7 @@ abstract class MultiDcHeartbeatTakingOverSpec extends MultiNodeSpec(MultiDcHeart
"other node must become oldest when current DC-oldest Leaves" taggedAs LongRunningTest in {
val observer = TestProbe("alpha-observer-prime")

// we leave one of the current oldest nodes of the `alpha` DC,
// we leave one of the current oldest nodes of the `alpha` DC,
// since it has 3 members the "not yet oldest" one becomes oldest and should start monitoring across datacenter
val preLeaveOldestAlphaRole = expectedAlphaHeartbeaterRoles.head
val preLeaveOldestAlphaAddress = expectedAlphaHeartbeaterNodes.find(_.address.port.get == preLeaveOldestAlphaRole.port.get).get.address
Expand Down Expand Up @@ -182,13 +182,12 @@ abstract class MultiDcHeartbeatTakingOverSpec extends MultiNodeSpec(MultiDcHeart
*/
private def membersByAge(): immutable.SortedSet[Member] =
SortedSet.empty(Member.ageOrdering)
.union(cluster.state.members.filter(m m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.WeaklyUp))
.union(cluster.state.members.filter(m m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp))

/** INTERNAL API */
@InternalApi
private[cluster] def takeNOldestMembers(memberFilter: Member Boolean, n: Int): immutable.SortedSet[Member] =
membersByAge()
.filter(m m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp)
.filter(memberFilter)
.take(n)

Expand Down
Expand Up @@ -38,15 +38,15 @@ object MultiDcSunnyWeatherMultiJvmSpec extends MultiNodeConfig {
"""
akka {
actor.provider = cluster
loggers = ["akka.testkit.TestEventListener"]
loglevel = INFO
remote.log-remote-lifecycle-events = off
cluster {
debug.verbose-heartbeat-logging = off
multi-data-center {
cross-data-center-connections = 2
}
Expand Down Expand Up @@ -149,13 +149,12 @@ abstract class MultiDcSunnyWeatherSpec extends MultiNodeSpec(MultiDcSunnyWeather
*/
private def membersByAge(): immutable.SortedSet[Member] =
SortedSet.empty(Member.ageOrdering)
.union(cluster.state.members.filter(m m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.WeaklyUp))
.union(cluster.state.members.filter(m m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp))

/** INTERNAL API */
@InternalApi
private[cluster] def takeNOldestMembers(memberFilter: Member Boolean, n: Int): immutable.SortedSet[Member] =
membersByAge()
.filter(m m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp)
.filter(memberFilter)
.take(n)

Expand Down

0 comments on commit eefe647

Please sign in to comment.