Skip to content

Commit

Permalink
lease settings support for typed cluster singleton (#30209)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhxiaogg committed May 19, 2021
1 parent 18e7881 commit a19e73a
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ object ClusterShardingSettings {
classicSettings.coordinatorSingletonSettings.singletonName,
classicSettings.coordinatorSingletonSettings.role,
classicSettings.coordinatorSingletonSettings.removalMargin,
classicSettings.coordinatorSingletonSettings.handOverRetryInterval),
classicSettings.coordinatorSingletonSettings.handOverRetryInterval,
classicSettings.coordinatorSingletonSettings.leaseSettings),
leaseSettings = classicSettings.leaseSettings)
}

Expand Down Expand Up @@ -99,7 +100,8 @@ object ClusterShardingSettings {
settings.coordinatorSingletonSettings.singletonName,
settings.coordinatorSingletonSettings.role,
settings.coordinatorSingletonSettings.removalMargin,
settings.coordinatorSingletonSettings.handOverRetryInterval),
settings.coordinatorSingletonSettings.handOverRetryInterval,
settings.coordinatorSingletonSettings.leaseSettings),
leaseSettings = settings.leaseSettings)

}
Expand Down Expand Up @@ -449,6 +451,8 @@ final class ClusterShardingSettings(
def withShardRegionQueryTimeout(duration: java.time.Duration): ClusterShardingSettings =
copy(shardRegionQueryTimeout = duration.asScala)

def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings))

/**
* The `role` of the `ClusterSingletonManagerSettings` is not used. The `role` of the
* coordinator singleton will be the same as the `role` of `ClusterShardingSettings`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@

package akka.cluster.typed

import scala.concurrent.duration._
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.duration.{ Duration, FiniteDuration, _ }

import com.typesafe.config.Config

import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
import akka.actor.typed.ExtensionSetup
import akka.actor.typed._
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.singleton.{
ClusterSingletonProxySettings,
ClusterSingletonManagerSettings => ClassicClusterSingletonManagerSettings
}
import akka.cluster.typed.internal.AdaptedClusterSingletonImpl
import akka.coordination.lease.LeaseUsageSettings
import akka.util.JavaDurationConverters._
import com.typesafe.config.Config

object ClusterSingletonSettings {
def apply(system: ActorSystem[_]): ClusterSingletonSettings =
Expand All @@ -40,7 +38,8 @@ object ClusterSingletonSettings {
proxySettings.singletonIdentificationInterval,
mgrSettings.removalMargin,
mgrSettings.handOverRetryInterval,
proxySettings.bufferSize)
proxySettings.bufferSize,
mgrSettings.leaseSettings)
}
}

Expand All @@ -50,7 +49,19 @@ final class ClusterSingletonSettings(
val singletonIdentificationInterval: FiniteDuration,
val removalMargin: FiniteDuration,
val handOverRetryInterval: FiniteDuration,
val bufferSize: Int) {
val bufferSize: Int,
val leaseSettings: Option[LeaseUsageSettings]) {

// bin compat for 2.6.14
@deprecated("Use constructor with leaseSettings", "2.6.15")
def this(
role: Option[String],
dataCenter: Option[DataCenter],
singletonIdentificationInterval: FiniteDuration,
removalMargin: FiniteDuration,
handOverRetryInterval: FiniteDuration,
bufferSize: Int) =
this(role, dataCenter, singletonIdentificationInterval, removalMargin, handOverRetryInterval, bufferSize, None)

def withRole(role: String): ClusterSingletonSettings = copy(role = Some(role))

Expand All @@ -61,37 +72,43 @@ final class ClusterSingletonSettings(
def withNoDataCenter(): ClusterSingletonSettings = copy(dataCenter = None)

def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonSettings = copy(removalMargin = removalMargin)

def withRemovalMargin(removalMargin: java.time.Duration): ClusterSingletonSettings =
withRemovalMargin(removalMargin.asScala)

def withHandoverRetryInterval(handOverRetryInterval: FiniteDuration): ClusterSingletonSettings =
copy(handOverRetryInterval = handOverRetryInterval)

def withHandoverRetryInterval(handOverRetryInterval: java.time.Duration): ClusterSingletonSettings =
withHandoverRetryInterval(handOverRetryInterval.asScala)

def withBufferSize(bufferSize: Int): ClusterSingletonSettings = copy(bufferSize = bufferSize)

def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings))

private def copy(
role: Option[String] = role,
dataCenter: Option[DataCenter] = dataCenter,
singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval,
removalMargin: FiniteDuration = removalMargin,
handOverRetryInterval: FiniteDuration = handOverRetryInterval,
bufferSize: Int = bufferSize) =
bufferSize: Int = bufferSize,
leaseSettings: Option[LeaseUsageSettings] = leaseSettings) =
new ClusterSingletonSettings(
role,
dataCenter,
singletonIdentificationInterval,
removalMargin,
handOverRetryInterval,
bufferSize)
bufferSize,
leaseSettings)

/**
* INTERNAL API:
*/
@InternalApi
private[akka] def toManagerSettings(singletonName: String): ClassicClusterSingletonManagerSettings =
new ClassicClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
new ClassicClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval, leaseSettings)

/**
* INTERNAL API:
Expand All @@ -112,7 +129,7 @@ final class ClusterSingletonSettings(
}

override def toString =
s"ClusterSingletonSettings($role, $dataCenter, $singletonIdentificationInterval, $removalMargin, $handOverRetryInterval, $bufferSize)"
s"ClusterSingletonSettings($role, $dataCenter, $singletonIdentificationInterval, $removalMargin, $handOverRetryInterval, $bufferSize, $leaseSettings)"
}

object ClusterSingleton extends ExtensionId[ClusterSingleton] {
Expand Down Expand Up @@ -217,12 +234,19 @@ object ClusterSingletonManagerSettings {
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.singleton`.
*/
def apply(config: Config): ClusterSingletonManagerSettings =
def apply(config: Config): ClusterSingletonManagerSettings = {
val lease = config.getString("use-lease") match {
case s if s.isEmpty => None
case leaseConfigPath =>
Some(new LeaseUsageSettings(leaseConfigPath, config.getDuration("lease-retry-interval").asScala))
}
new ClusterSingletonManagerSettings(
singletonName = config.getString("singleton-name"),
role = roleOption(config.getString("role")),
removalMargin = Duration.Zero, // defaults to ClusterSettings.DownRemovalMargin
handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis)
handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis,
lease)
}

/**
* Java API: Create settings from the default configuration
Expand All @@ -245,30 +269,38 @@ object ClusterSingletonManagerSettings {
}

/**
* @param singletonName The actor name of the child singleton actor.
*
* @param role Singleton among the nodes tagged with specified role.
* If the role is not specified it's a singleton among all nodes in
* the cluster.
*
* @param removalMargin Margin until the singleton instance that belonged to
* a downed/removed partition is created in surviving partition. The purpose of
* this margin is that in case of a network partition the singleton actors
* in the non-surviving partitions must be stopped before corresponding actors
* are started somewhere else. This is especially important for persistent
* actors.
*
* @param singletonName The actor name of the child singleton actor.
* @param role Singleton among the nodes tagged with specified role.
* If the role is not specified it's a singleton among all nodes in
* the cluster.
* @param removalMargin Margin until the singleton instance that belonged to
* a downed/removed partition is created in surviving partition. The purpose of
* this margin is that in case of a network partition the singleton actors
* in the non-surviving partitions must be stopped before corresponding actors
* are started somewhere else. This is especially important for persistent
* actors.
* @param handOverRetryInterval When a node is becoming oldest it sends hand-over
* request to previous oldest, that might be leaving the cluster. This is
* retried with this interval until the previous oldest confirms that the hand
* over has started or the previous oldest member is removed from the cluster
* (+ `removalMargin`).
* request to previous oldest, that might be leaving the cluster. This is
* retried with this interval until the previous oldest confirms that the hand
* over has started or the previous oldest member is removed from the cluster
* (+ `removalMargin`).
* @param leaseSettings LeaseSettings for acquiring before creating the singleton actor
*/
final class ClusterSingletonManagerSettings(
val singletonName: String,
val role: Option[String],
val removalMargin: FiniteDuration,
val handOverRetryInterval: FiniteDuration) {
val handOverRetryInterval: FiniteDuration,
val leaseSettings: Option[LeaseUsageSettings]) {

// bin compat for 2.6.14
@deprecated("Use constructor with leaseSettings", "2.6.15")
def this(
singletonName: String,
role: Option[String],
removalMargin: FiniteDuration,
handOverRetryInterval: FiniteDuration) =
this(singletonName, role, removalMargin, handOverRetryInterval, None)

def withSingletonName(name: String): ClusterSingletonManagerSettings = copy(singletonName = name)

Expand All @@ -279,20 +311,25 @@ final class ClusterSingletonManagerSettings(

def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings =
copy(removalMargin = removalMargin)

def withRemovalMargin(removalMargin: java.time.Duration): ClusterSingletonManagerSettings =
withRemovalMargin(removalMargin.asScala)

def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings =
copy(handOverRetryInterval = retryInterval)

def withHandOverRetryInterval(retryInterval: java.time.Duration): ClusterSingletonManagerSettings =
withHandOverRetryInterval(retryInterval.asScala)

def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings))

private def copy(
singletonName: String = singletonName,
role: Option[String] = role,
removalMargin: FiniteDuration = removalMargin,
handOverRetryInterval: FiniteDuration = handOverRetryInterval): ClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
handOverRetryInterval: FiniteDuration = handOverRetryInterval,
leaseSettings: Option[LeaseUsageSettings] = leaseSettings): ClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval, leaseSettings)
}

object ClusterSingletonSetup {
Expand Down

0 comments on commit a19e73a

Please sign in to comment.