Skip to content

Commit

Permalink
Merge pull request #18746 from akka/wip-18554-singleton-startup-patriknw
Browse files Browse the repository at this point in the history
=clu #18554 Make oldest assignment deterministic when joining
  • Loading branch information
patriknw committed Nov 6, 2015
2 parents b6ec600 + c7c187f commit 1e36e5e
Show file tree
Hide file tree
Showing 27 changed files with 162 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging {
* Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus]] `Up`.
*/
def receiveState(state: CurrentClusterState): Unit =
nodes = (state.members -- state.unreachable) collect {
nodes = (state.members diff state.unreachable) collect {
case m if m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp m.address
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ final case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Met
if (timestamp >= that.timestamp) this // that is older
else {
// equality is based on the name of the Metric and Set doesn't replace existing element
copy(metrics = that.metrics ++ metrics, timestamp = that.timestamp)
copy(metrics = that.metrics union metrics, timestamp = that.timestamp)
}
}

Expand All @@ -303,7 +303,7 @@ final case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Met
}
// Append metrics missing from either latest or current.
// Equality is based on the [[Metric.name]] and [[Set]] doesn't replace existing elements.
val merged = updated ++ latestNode.metrics ++ currentNode.metrics
val merged = updated union latestNode.metrics union currentNode.metrics
copy(metrics = merged, timestamp = latestNode.timestamp)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: SigarP
override def metrics(): Set[Metric] = {
// Must obtain cpuPerc in one shot. See https://github.com/akka/akka/issues/16121
val cpuPerc = sigar.getCpuPerc
super.metrics ++ Set(cpuCombined(cpuPerc), cpuStolen(cpuPerc)).flatten
super.metrics union Set(cpuCombined(cpuPerc), cpuStolen(cpuPerc)).flatten
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
rebalanceInProgress += shard
log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
state.regions.keySet ++ state.regionProxies)
state.regions.keySet union state.regionProxies)
.withDispatcher(context.props.dispatcher))
case None
log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ class ShardRegion(
val cluster = Cluster(context.system)

// sort by age, oldest first
val ageOrdering = Ordering.fromLessThan[Member] { (a, b) a.isOlderThan(b) }
val ageOrdering = Member.ageOrdering
var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)

var regions = Map.empty[ActorRef, Set[ShardId]]
Expand Down Expand Up @@ -318,14 +318,14 @@ class ShardRegion(
}

def receiveClusterState(state: CurrentClusterState): Unit = {
changeMembers(immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m
changeMembers(immutable.SortedSet.empty(ageOrdering) union state.members.filter(m
m.status == MemberStatus.Up && matchingRole(m)))
}

def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match {
case MemberUp(m)
if (matchingRole(m))
changeMembers(membersByAge + m)
changeMembers(membersByAge - m + m) // replace

case MemberRemoved(m, _)
if (m.uniqueAddress == cluster.selfUniqueAddress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac
def sendGetContacts(): Unit = {
val sendTo =
if (contacts.isEmpty) initialContactsSel
else if (contacts.size == 1) (initialContactsSel ++ contacts)
else if (contacts.size == 1) (initialContactsSel union contacts)
else contacts
if (log.isDebugEnabled)
log.debug(s"""Sending GetContacts to [${sendTo.mkString(",")}]""")
Expand Down Expand Up @@ -639,7 +639,7 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep
val slice = {
val first = nodes.from(a).tail.take(numberOfContacts)
if (first.size == numberOfContacts) first
else first ++ nodes.take(numberOfContacts - first.size)
else first union nodes.take(numberOfContacts - first.size)
}
val contacts = Contacts(slice.map(a self.path.toStringWithAddress(a))(collection.breakOut))
if (log.isDebugEnabled)
Expand All @@ -648,7 +648,7 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep
}

case state: CurrentClusterState
nodes = nodes.empty ++ state.members.collect { case m if m.status != MemberStatus.Joining && matchingRole(m) m.address }
nodes = nodes.empty union state.members.collect { case m if m.status != MemberStatus.Joining && matchingRole(m) m.address }
consistentHash = ConsistentHash(nodes, virtualNodesFactor)

case MemberUp(m)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ object ClusterSingletonManager {

val cluster = Cluster(context.system)
// sort by age, oldest first
val ageOrdering = Ordering.fromLessThan[Member] { (a, b) a.isOlderThan(b) }
val ageOrdering = Member.ageOrdering
var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)

var changes = Vector.empty[AnyRef]
Expand All @@ -251,7 +251,7 @@ object ClusterSingletonManager {
}

def handleInitial(state: CurrentClusterState): Unit = {
membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m
membersByAge = immutable.SortedSet.empty(ageOrdering) union state.members.filter(m
(m.status == MemberStatus.Up || m.status == MemberStatus.Leaving) && matchingRole(m))
val safeToBeOldest = !state.members.exists { m (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting) }
val initial = InitialOldestState(membersByAge.headOption.map(_.address), safeToBeOldest)
Expand All @@ -260,7 +260,10 @@ object ClusterSingletonManager {

def add(m: Member): Unit = {
if (matchingRole(m))
trackChange { () membersByAge += m }
trackChange { ()
membersByAge -= m // replace
membersByAge += m
}
}

def remove(m: Member): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste
val cluster = Cluster(context.system)
var singleton: Option[ActorRef] = None
// sort by age, oldest first
val ageOrdering = Ordering.fromLessThan[Member] {
(a, b) a.isOlderThan(b)
}
val ageOrdering = Member.ageOrdering
var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)

var buffer = new java.util.LinkedList[(Any, ActorRef)]
Expand Down Expand Up @@ -170,7 +168,7 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste
def handleInitial(state: CurrentClusterState): Unit = {
trackChange {
()
membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect {
membersByAge = immutable.SortedSet.empty(ageOrdering) union state.members.collect {
case m if m.status == MemberStatus.Up && matchingRole(m) m
}
}
Expand Down Expand Up @@ -203,8 +201,9 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste
*/
def add(m: Member): Unit = {
if (matchingRole(m))
trackChange {
() membersByAge += m
trackChange { ()
membersByAge -= m // replace
membersByAge += m
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,77 +46,6 @@ public void demo() {
//#create-singleton-proxy
}

static//documentation of how to keep track of the oldest member in user land
//#singleton-proxy
public class ConsumerProxy extends UntypedActor {

final Cluster cluster = Cluster.get(getContext().system());

final Comparator<Member> ageComparator = new Comparator<Member>() {
public int compare(Member a, Member b) {
if (a.isOlderThan(b))
return -1;
else if (b.isOlderThan(a))
return 1;
else
return 0;
}
};
final SortedSet<Member> membersByAge = new TreeSet<Member>(ageComparator);

final String role = "worker";

//subscribe to cluster changes
@Override
public void preStart() {
cluster.subscribe(getSelf(), MemberEvent.class);
}

//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}

@Override
public void onReceive(Object message) {
if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
List<Member> members = new ArrayList<Member>();
for (Member m : state.getMembers()) {
if (m.status().equals(MemberStatus.up()) && m.hasRole(role))
members.add(m);
}
membersByAge.clear();
membersByAge.addAll(members);

} else if (message instanceof MemberUp) {
Member m = ((MemberUp) message).member();
if (m.hasRole(role))
membersByAge.add(m);

} else if (message instanceof MemberRemoved) {
Member m = ((MemberUp) message).member();
if (m.hasRole(role))
membersByAge.remove(m);

} else if (message instanceof MemberEvent) {
// not interesting

} else if (!membersByAge.isEmpty()) {
currentMaster().tell(message, getSender());

}
}

ActorSelection currentMaster() {
return getContext().actorSelection(membersByAge.first().address() + "/user/singleton/statsService");
}

}

//#singleton-proxy

public static class End {
}

Expand Down
11 changes: 7 additions & 4 deletions akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
updateLatestGossip(newGossip)

logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
if (node != selfUniqueAddress)
if (node == selfUniqueAddress) {
if (localMembers.isEmpty)
leaderActions() // important for deterministic oldest when bootstrapping
} else
sender() ! Welcome(selfUniqueAddress, latestGossip)

publish(latestGossip)
Expand Down Expand Up @@ -923,11 +926,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// handle changes

// replace changed members
val newMembers = changedMembers ++ localMembers -- removedUnreachable
val newMembers = changedMembers union localMembers diff removedUnreachable

// removing REMOVED nodes from the `seen` table
val removed = removedUnreachable.map(_.uniqueAddress)
val newSeen = localSeen -- removed
val newSeen = localSeen diff removed
// removing REMOVED nodes from the `reachability` table
val newReachability = localOverview.reachability.remove(removed)
val newOverview = localOverview copy (seen = newSeen, reachability = newReachability)
Expand Down Expand Up @@ -985,7 +988,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with

if (changedMembers.nonEmpty) {
// replace changed members
val newMembers = changedMembers ++ localMembers
val newMembers = changedMembers union localMembers
val newGossip = localGossip.copy(members = newMembers)
updateLatestGossip(newGossip)

Expand Down
9 changes: 5 additions & 4 deletions akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,11 @@ object ClusterEvent {
private[cluster] def diffMemberEvents(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[MemberEvent] =
if (newGossip eq oldGossip) Nil
else {
val newMembers = newGossip.members -- oldGossip.members
val newMembers = newGossip.members diff oldGossip.members
val membersGroupedByAddress = List(newGossip.members, oldGossip.members).flatten.groupBy(_.uniqueAddress)
val changedMembers = membersGroupedByAddress collect {
case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status newMember
case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status || newMember.upNumber != oldMember.upNumber
newMember
}
val memberEvents = (newMembers ++ changedMembers) collect {
case m if m.status == WeaklyUp MemberWeaklyUp(m)
Expand All @@ -284,7 +285,7 @@ object ClusterEvent {
// no events for other transitions
}

val removedMembers = oldGossip.members -- newGossip.members
val removedMembers = oldGossip.members diff newGossip.members
val removedEvents = removedMembers.map(m MemberRemoved(m.copy(status = Removed), m.status))

(new VectorBuilder[MemberEvent]() ++= memberEvents ++= removedEvents).result()
Expand All @@ -304,7 +305,7 @@ object ClusterEvent {
*/
private[cluster] def diffRolesLeader(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): Set[RoleLeaderChanged] = {
for {
role (oldGossip.allRoles ++ newGossip.allRoles)
role (oldGossip.allRoles union newGossip.allRoles)
newLeader = newGossip.roleLeader(role, selfUniqueAddress)
if newLeader != oldGossip.roleLeader(role, selfUniqueAddress)
} yield RoleLeaderChanged(role, newLeader.map(_.address))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private[cluster] final case class ClusterHeartbeatSenderState(
oldReceiversNowUnreachable: Set[UniqueAddress],
failureDetector: FailureDetectorRegistry[Address]) {

val activeReceivers: Set[UniqueAddress] = ring.myReceivers ++ oldReceiversNowUnreachable
val activeReceivers: Set[UniqueAddress] = ring.myReceivers union oldReceiversNowUnreachable

def selfAddress = ring.selfAddress

Expand Down Expand Up @@ -212,7 +212,7 @@ private[cluster] final case class ClusterHeartbeatSenderState(

private def membershipChange(newRing: HeartbeatNodeRing): ClusterHeartbeatSenderState = {
val oldReceivers = ring.myReceivers
val removedReceivers = oldReceivers -- newRing.myReceivers
val removedReceivers = oldReceivers diff newRing.myReceivers
var adjustedOldReceiversNowUnreachable = oldReceiversNowUnreachable
removedReceivers foreach { a
if (failureDetector.isAvailable(a.address))
Expand Down Expand Up @@ -260,7 +260,7 @@ private[cluster] final case class HeartbeatNodeRing(
ha < hb || (ha == hb && Member.addressOrdering.compare(a.address, b.address) < 0)
}

immutable.SortedSet() ++ nodes
immutable.SortedSet() union nodes
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ final case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Met
if (timestamp >= that.timestamp) this // that is older
else {
// equality is based on the name of the Metric and Set doesn't replace existing element
copy(metrics = that.metrics ++ metrics, timestamp = that.timestamp)
copy(metrics = that.metrics union metrics, timestamp = that.timestamp)
}
}

Expand Down Expand Up @@ -742,7 +742,7 @@ class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef
}

override def metrics: Set[Metric] = {
super.metrics.filterNot(_.name == SystemLoadAverage) ++ Set(systemLoadAverage, cpuCombined).flatten
super.metrics.filterNot(_.name == SystemLoadAverage) union Set(systemLoadAverage, cpuCombined).flatten
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private[cluster] class ClusterRemoteWatcher(
case state: CurrentClusterState
clusterNodes = state.members.collect { case m if m.address != selfAddress m.address }
clusterNodes foreach takeOverResponsibility
unreachable --= clusterNodes
unreachable = unreachable diff clusterNodes
case MemberUp(m) memberUp(m)
case MemberWeaklyUp(m) memberUp(m)
case MemberRemoved(m, previousStatus) memberRemoved(m, previousStatus)
Expand Down
8 changes: 4 additions & 4 deletions akka-cluster/src/main/scala/akka/cluster/Gossip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ private[cluster] final case class Gossip(
throw new IllegalArgumentException(s"Live members must have status [${Removed}], " +
s"got [${members.filter(_.status == Removed)}]")

val inReachabilityButNotMember = overview.reachability.allObservers -- members.map(_.uniqueAddress)
val inReachabilityButNotMember = overview.reachability.allObservers diff members.map(_.uniqueAddress)
if (inReachabilityButNotMember.nonEmpty)
throw new IllegalArgumentException("Nodes not part of cluster in reachability table, got [%s]"
format inReachabilityButNotMember.mkString(", "))

val seenButNotMember = overview.seen -- members.map(_.uniqueAddress)
val seenButNotMember = overview.seen diff members.map(_.uniqueAddress)
if (seenButNotMember.nonEmpty)
throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]"
format seenButNotMember.mkString(", "))
Expand Down Expand Up @@ -129,7 +129,7 @@ private[cluster] final case class Gossip(
* Merges the seen table of two Gossip instances.
*/
def mergeSeen(that: Gossip): Gossip =
this copy (overview = overview copy (seen = overview.seen ++ that.overview.seen))
this copy (overview = overview copy (seen = overview.seen union that.overview.seen))

/**
* Merges two Gossip instances including membership tables, and the VectorClock histories.
Expand All @@ -141,7 +141,7 @@ private[cluster] final case class Gossip(
val mergedVClock = this.version merge that.version

// 2. merge members by selecting the single Member with highest MemberStatus out of the Member groups
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members)
val mergedMembers = Gossip.emptyMembers union Member.pickHighestPriority(this.members, that.members)

// 3. merge reachability table by picking records with highest version
val mergedReachability = this.overview.reachability.merge(mergedMembers.map(_.uniqueAddress),
Expand Down
Loading

0 comments on commit 1e36e5e

Please sign in to comment.