Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make it possible to define appVersion later #31934

Merged
merged 4 commits into from May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 28 additions & 0 deletions akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala
Expand Up @@ -4,7 +4,10 @@

package akka.cluster.typed

import java.util.concurrent.CompletionStage

import scala.collection.immutable
import scala.concurrent.Future

import akka.actor.Address
import akka.actor.typed.{ ActorRef, ActorSystem, Extension, ExtensionId }
Expand All @@ -14,6 +17,7 @@ import akka.cluster._
import akka.cluster.ClusterEvent.{ ClusterDomainEvent, CurrentClusterState }
import akka.cluster.typed.internal.AdapterClusterImpl
import akka.japi.Util
import akka.util.Version

/**
* Messages for subscribing to changes in the cluster state
Expand Down Expand Up @@ -118,6 +122,30 @@ final case class JoinSeedNodes(seedNodes: immutable.Seq[Address]) extends Cluste

}

/**
* Scala API: If the `appVersion` is read from an external system (e.g. Kubernetes) it can be defined after
* system startup but before joining by completing the `appVersion` `Future`. In that case,
* `SetAppVersionLater` should be sent before [[Join]] or [[JoinSeedNodes]] It's fine to send
* `Join` or `JoinSeedNodes` immediately afterwards (before the `Future` is completed. The join will
* then wait for the `appVersion` to be completed.
*/
final case class SetAppVersionLater(appVersion: Future[Version]) extends ClusterCommand

object SetAppVersionLater {

/**
* Java API: If the `appVersion` is read from an external system (e.g. Kubernetes) it can be defined after
* system startup but before joining by completing the `appVersion` `CompletionStage`. In that case,
* `SetAppVersionLater` should be sent before [[Join]] or [[JoinSeedNodes]] It's fine to send
* `Join` or `JoinSeedNodes` immediately afterwards (before the `CompletionStage` is completed. The join will
* then wait for the `appVersion` to be completed.
*/
def create(appVersion: CompletionStage[Version]): SetAppVersionLater = {
import scala.compat.java8.FutureConverters._
SetAppVersionLater(appVersion.toScala)
}
}

/**
* Send command to issue state transition to LEAVING for the node specified by 'address'.
* The member will go through the status changes [[MemberStatus]] `Leaving` (not published to
Expand Down
Expand Up @@ -132,6 +132,10 @@ private[akka] object AdapterClusterImpl {
adaptedCluster.joinSeedNodes(addresses)
Behaviors.same

case SetAppVersionLater(version) =>
adaptedCluster.setAppVersionLater(version)
Behaviors.same

case PrepareForFullClusterShutdown =>
adaptedCluster.prepareForFullClusterShutdown()
Behaviors.same
Expand Down
38 changes: 37 additions & 1 deletion akka-cluster/src/main/scala/akka/cluster/Cluster.scala
Expand Up @@ -5,6 +5,7 @@
package akka.cluster

import java.io.Closeable
import java.util.concurrent.CompletionStage
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicBoolean

Expand All @@ -13,8 +14,11 @@ import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext }
import scala.concurrent.duration._
import scala.util.control.NonFatal

import scala.annotation.nowarn
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success

import com.typesafe.config.{ Config, ConfigFactory }

import akka.ConfigurationException
Expand All @@ -29,6 +33,7 @@ import akka.event.MarkerLoggingAdapter
import akka.japi.Util
import akka.pattern._
import akka.remote.{ UniqueAddress => _, _ }
import akka.util.Version

/**
* Cluster Extension Id and factory for creating Cluster extension.
Expand Down Expand Up @@ -359,6 +364,37 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
def joinSeedNodes(seedNodes: java.util.List[Address]): Unit =
joinSeedNodes(Util.immutableSeq(seedNodes))

/**
* Scala API: If the `appVersion` is read from an external system (e.g. Kubernetes) it can be defined after
* system startup but before joining by completing the `appVersion` `Future`. In that case, `setAppVersionLater`
* should be called before calling `join` or `joinSeedNodes`. It's fine to call `join` or `joinSeedNodes`
* immediately afterwards (before the `Future` is completed. The join will then wait for the `appVersion`
* to be completed.
*/
def setAppVersionLater(appVersion: Future[Version]): Unit = {
johanandren marked this conversation as resolved.
Show resolved Hide resolved
clusterCore ! ClusterUserAction.SetAppVersionLater
import system.dispatcher
appVersion.onComplete {
case Success(version) =>
clusterCore ! ClusterUserAction.SetAppVersion(version)
case Failure(exc) =>
logWarning("Later appVersion failed. Fallback to configured appVersion [{}]. {}", settings.AppVersion, exc)
clusterCore ! ClusterUserAction.SetAppVersion(settings.AppVersion)
}
}

/**
* Java API: If the `appVersion` is read from an external system (e.g. Kubernetes) it can be defined after
* system startup but before joining by completing the `appVersion` `CompletionStage`. In that case,
* `setAppVersionLater` should be called before calling `join` or `joinSeedNodes`. It's fine to call
* `join` or `joinSeedNodes` immediately afterwards (before the `CompletionStage` is completed. The join will
* then wait for the `appVersion` to be completed.
*/
def setAppVersionLater(appVersion: CompletionStage[Version]): Unit = {
import scala.compat.java8.FutureConverters._
setAppVersionLater(appVersion.toScala)
}

/**
* Send command to issue state transition to LEAVING for the node specified by 'address'.
* The member will go through the status changes [[MemberStatus]] `Leaving` (not published to
Expand Down
114 changes: 102 additions & 12 deletions akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
Expand Up @@ -9,8 +9,10 @@ import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NonFatal

import scala.annotation.nowarn
import scala.util.Failure
import scala.util.Success

import com.typesafe.config.Config

import akka.Done
Expand Down Expand Up @@ -65,6 +67,19 @@ private[cluster] object ClusterUserAction {
*/
@SerialVersionUID(1L)
case object PrepareForShutdown extends ClusterMessage

/**
* The `appVersion` is defined after system startup but before joining.
* The `appVersion` is defined via the `SetAppVersion` message.
* Subsequent `JoinTo` will be deferred until after `SetAppVersion` has been
* received.
*/
case object SetAppVersionLater

/**
* Command to set the `appVersion` after system startup but before joining.
*/
final case class SetAppVersion(appVersion: Version)
}

/**
Expand Down Expand Up @@ -383,6 +398,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
}
var exitingConfirmed = Set.empty[UniqueAddress]

var laterAppVersion: Option[Promise[Version]] = None

/**
* Looks up and returns the remote cluster command connection for the specific address.
*/
Expand Down Expand Up @@ -470,6 +487,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
case JoinSeedNodes(newSeedNodes) =>
resetJoinSeedNodesDeadline()
joinSeedNodes(newSeedNodes)
case ClusterUserAction.SetAppVersionLater =>
setAppVersionLater()
case ClusterUserAction.SetAppVersion(version) =>
setAppVersion(version)
case msg: SubscriptionMessage =>
publisher.forward(msg)
case Welcome(from, gossip) =>
Expand All @@ -493,6 +514,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
resetJoinSeedNodesDeadline()
becomeUninitialized()
joinSeedNodes(newSeedNodes)
case ClusterUserAction.SetAppVersionLater =>
setAppVersionLater()
case ClusterUserAction.SetAppVersion(version) =>
setAppVersion(version)
case msg: SubscriptionMessage => publisher.forward(msg)
case _: Tick =>
if (joinSeedNodesDeadline.exists(_.isOverdue()))
Expand All @@ -505,6 +530,20 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
}
}: Actor.Receive).orElse(receiveExitingCompleted)

private def setAppVersionLater(): Unit = {
laterAppVersion match {
case Some(_) => // already set, ignore duplicate
case None => laterAppVersion = Some(Promise())
}
}

private def setAppVersion(version: Version): Unit = {
laterAppVersion match {
case Some(promise) => promise.trySuccess(version)
case None => laterAppVersion = Some(Promise().success(version))
}
}

private def resetJoinSeedNodesDeadline(): Unit = {
joinSeedNodesDeadline = ShutdownAfterUnsuccessfulJoinSeedNodes match {
case d: FiniteDuration => Some(Deadline.now + d)
Expand Down Expand Up @@ -568,6 +607,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
logInfo("Trying to join [{}] when already part of a cluster, ignoring", address)
case JoinSeedNodes(nodes) =>
logInfo("Trying to join seed nodes [{}] when already part of a cluster, ignoring", nodes.mkString(", "))
case ClusterUserAction.SetAppVersionLater =>
logInfo("Trying to set appVersion later when already part of a cluster, ignoring")
case ClusterUserAction.SetAppVersion(version) =>
logInfo("Trying to set appVersion [{}] when already part of a cluster, ignoring", version)
case ExitingConfirmed(address) => receiveExitingConfirmed(address)
}: Actor.Receive).orElse(receiveExitingCompleted)

Expand Down Expand Up @@ -703,17 +746,44 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
// to support manual join when joining to seed nodes is stuck (no seed nodes available)
stopSeedNodeProcess()

if (address == selfAddress) {
becomeInitialized()
joining(selfUniqueAddress, cluster.selfRoles, cluster.settings.AppVersion)
} else {
val joinDeadline = RetryUnsuccessfulJoinAfter match {
case d: FiniteDuration => Some(Deadline.now + d)
case _ => None
val appVersionOpt = laterAppVersion match {
case None =>
logDebug("Using appVersion [{}] from config.", cluster.settings.AppVersion)
Some(cluster.settings.AppVersion)
case Some(promise) =>
promise.future.value match {
case Some(Success(version)) =>
logDebug("Using appVersion [{}] from completed setAppVersion Future.", version)
Some(version)
case Some(Failure(exc)) =>
logError("Can't join because later appVersion was completed with failure: {}", exc)
None
case None =>
logDebug("appVersion from setAppVersion Future is not completed yet. Will continue the join to " +
"[{}] when the appVersion Future has been completed.", address)
import akka.pattern.pipe
// easiest to just try again via JoinTo when the promise has been completed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug logging or something so it is possible to notice waiting for it is what is going on?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 added in 49b339a

val pipeMessage = promise.future.map(_ => ClusterUserAction.JoinTo(address)).recover {
case _ => ClusterUserAction.JoinTo(address)
}
pipe(pipeMessage).to(self)
None
}
}

appVersionOpt.foreach { appVersion =>
if (address == selfAddress) {
becomeInitialized()
joining(selfUniqueAddress, cluster.selfRoles, appVersion)
} else {
val joinDeadline = RetryUnsuccessfulJoinAfter match {
case d: FiniteDuration => Some(Deadline.now + d)
case _ => None
}
context.become(tryingToJoin(address, joinDeadline))
logDebug("Trying to join [{}]", address)
clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles, appVersion)
}
context.become(tryingToJoin(address, joinDeadline))
logDebug("Trying to join [{}]", address)
clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles, cluster.settings.AppVersion)
}
}
}
Expand All @@ -734,6 +804,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
* current gossip state, including the new joining member.
*/
def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion: Version): Unit = {
def isSelfAppVersionDefined = laterAppVersion match {
case None => true
case Some(promise) =>
promise.future.value match {
case None => false
case Some(v) => v.isSuccess
}
}

if (!preparingForShutdown) {
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (joiningNode.address.protocol != selfAddress.protocol)
Expand All @@ -751,6 +830,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
"Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.",
joiningNode,
selfStatus)
else if (laterAppVersion.nonEmpty && !isSelfAppVersionDefined)
logInfo(
"Trying to join [{}] but [{}] has not defined appVersion yet, ignoring. Try again later.",
joiningNode,
selfAddress)
else {
val localMembers = latestGossip.members

Expand Down Expand Up @@ -786,10 +870,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh

// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val selfAppVersion = laterAppVersion match {
case None => cluster.settings.AppVersion
case Some(promise) =>
// promise is known to be completed, checked above
promise.future.value.get.get
}
val newMembers = localMembers + Member(joiningNode, roles, appVersion) + Member(
selfUniqueAddress,
cluster.selfRoles,
cluster.settings.AppVersion)
selfAppVersion)
val newGossip = latestGossip.copy(members = newMembers)

updateLatestGossip(newGossip)
Expand Down
63 changes: 63 additions & 0 deletions akka-cluster/src/multi-jvm/scala/akka/cluster/AppVersionSpec.scala
@@ -0,0 +1,63 @@
/*
* Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.cluster

import scala.concurrent.Promise

import akka.remote.testkit.MultiNodeConfig
import akka.util.Version

object AppVersionMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")

commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}

class AppVersionMultiJvmNode1 extends AppVersionSpec
class AppVersionMultiJvmNode2 extends AppVersionSpec

abstract class AppVersionSpec extends MultiNodeClusterSpec(AppVersionMultiJvmSpec) {

import AppVersionMultiJvmSpec._

"Later appVersion" must {
"be used when joining" in {
val laterVersion = Promise[Version]()
cluster.setAppVersionLater(laterVersion.future)
// ok to try to join immediately
runOn(first) {
cluster.join(first)
// not joining until laterVersion has been completed
Thread.sleep(100)
cluster.selfMember.status should ===(MemberStatus.Removed)
laterVersion.trySuccess(Version("2"))
awaitAssert {
cluster.selfMember.status should ===(MemberStatus.Up)
cluster.selfMember.appVersion should ===(Version("2"))
}
}
enterBarrier("first-joined")

runOn(second) {
cluster.joinSeedNodes(List(address(first), address(second)))
// not joining until laterVersion has been completed
Thread.sleep(100)
cluster.selfMember.status should ===(MemberStatus.Removed)
laterVersion.trySuccess(Version("3"))
awaitAssert {
cluster.selfMember.status should ===(MemberStatus.Up)
cluster.selfMember.appVersion should ===(Version("3"))
}
}
enterBarrier("second-joined")

cluster.state.members.find(_.address == address(first)).get.appVersion should ===(Version("2"))
cluster.state.members.find(_.address == address(second)).get.appVersion should ===(Version("3"))

enterBarrier("after-1")
}
}
}