Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Define custom lease name for sharding and singleton #32122

Merged
merged 3 commits into from Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -810,6 +810,10 @@ object ClusterShardingSettings {
* snapshot plugin is used. Note that this is not related to persistence used by the entity
* actors.
* @param tuningParameters additional tuning parameters, see descriptions in reference.conf
* @param leaseSettings LeaseSettings for acquiring before creating the shard.
* Note that if you define a custom lease name and have several sharding entity types each one must have a unique
* lease name. If the lease name is undefined it will be derived from ActorSystem name and shard name,
* but that may result in too long lease names.
*/
final class ClusterShardingSettings(
val numberOfShards: Int,
Expand Down Expand Up @@ -1011,6 +1015,11 @@ final class ClusterShardingSettings(
def withShardRegionQueryTimeout(duration: java.time.Duration): ClusterShardingSettings =
copy(shardRegionQueryTimeout = duration.asScala)

/**
* Note that if you define a custom lease name and have several sharding entity types each one must have a unique
* lease name. If the lease name is undefined it will be derived from ActorSystem name and shard name,
* but that may result in too long lease names.
*/
def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings))

/**
Expand Down
Expand Up @@ -95,7 +95,13 @@ object ClusterShardingSettings {

val lease = config.getString("use-lease") match {
case s if s.isEmpty => None
case other => Some(new LeaseUsageSettings(other, config.getDuration("lease-retry-interval").asScala))
case other =>
Some(
new LeaseUsageSettings(
other,
config.getDuration("lease-retry-interval").asScala,
leaseName = "" // intentionally not in config because would be high risk of not using unique names
))
}

new ClusterShardingSettings(
Expand Down Expand Up @@ -1113,6 +1119,10 @@ object ClusterShardingSettings {
* @param passivationStrategySettings settings for automatic passivation strategy, see descriptions in reference.conf
* @param tuningParameters additional tuning parameters, see descriptions in reference.conf
* @param shardRegionQueryTimeout the timeout for querying a shard region, see descriptions in reference.conf
* @param leaseSettings LeaseSettings for acquiring before creating the shard.
* Note that if you define a custom lease name and have several sharding entity types each one must have a unique
* lease name. If the lease name is undefined it will be derived from ActorSystem name and shard name,
* but that may result in too long lease names.
*/
final class ClusterShardingSettings(
val role: Option[String],
Expand Down Expand Up @@ -1311,6 +1321,11 @@ final class ClusterShardingSettings(
def withShardRegionQueryTimeout(duration: java.time.Duration): ClusterShardingSettings =
copy(shardRegionQueryTimeout = duration.asScala)

/**
* Note that if you define a custom lease name and have several sharding entity types each one must have a unique
* lease name. If the lease name is undefined it will be derived from ActorSystem name and shard name,
* but that may result in too long lease names.
*/
def withLeaseSettings(leaseSettings: LeaseUsageSettings): ClusterShardingSettings =
copy(leaseSettings = Some(leaseSettings))

Expand Down
Expand Up @@ -469,12 +469,13 @@ private[akka] class Shard(
context.system.scheduler.scheduleWithFixedDelay(interval, interval, self, PassivateIntervalTick)
}

private val lease = settings.leaseSettings.map(
ls =>
LeaseProvider(context.system).getLease(
s"${context.system.name}-shard-$typeName-$shardId",
ls.leaseImplementation,
Cluster(context.system).selfAddress.hostPort))
private val lease = settings.leaseSettings.map { ls =>
val leaseName =
if (ls.leaseName.isEmpty) s"${context.system.name}-shard-$typeName-$shardId"
else s"${ls.leaseName}-$shardId"
LeaseProvider(context.system)
.getLease(leaseName, ls.leaseImplementation, Cluster(context.system).selfAddress.hostPort)
}

private val leaseRetryInterval = settings.leaseSettings match {
case Some(l) => l.leaseRetryInterval
Expand Down
6 changes: 6 additions & 0 deletions akka-cluster-tools/src/main/resources/reference.conf
Expand Up @@ -89,6 +89,12 @@ akka.cluster.singleton {

# The interval between retries for acquiring the lease
lease-retry-interval = 5s

# Custom lease name. Note that if you have several singletons each one must have a unique
# lease name, which can be defined with the leaseSettings of ClusterSingletonSettings.
# If undefined it will be derived from ActorSystem name and singleton actor path,
# but that may result in too long lease names.
lease-name = ""
}
# //#singleton-config

Expand Down
Expand Up @@ -62,7 +62,12 @@ object ClusterSingletonManagerSettings {
val lease = config.getString("use-lease") match {
case s if s.isEmpty => None
case leaseConfigPath =>
Some(new LeaseUsageSettings(leaseConfigPath, config.getDuration("lease-retry-interval").asScala))
Some(
new LeaseUsageSettings(
leaseConfigPath,
config.getDuration("lease-retry-interval").asScala,
leaseName = "" // intentionally not in config because would be high risk of not using unique names
))
}
new ClusterSingletonManagerSettings(
singletonName = config.getString("singleton-name"),
Expand Down Expand Up @@ -112,7 +117,11 @@ object ClusterSingletonManagerSettings {
* over has started or the previous oldest member is removed from the cluster
* (+ `removalMargin`).
*
* @param leaseSettings LeaseSettings for acquiring before creating the singleton actor
* @param leaseSettings LeaseSettings for acquiring before creating the singleton actor.
* Note that if you define a custom lease name and have several singletons each
* one must have a unique lease name. If the lease name is undefined it will be
* derived from ActorSystem name and singleton actor path, but that may result in
* too long lease names.
*/
final class ClusterSingletonManagerSettings(
val singletonName: String,
Expand Down Expand Up @@ -143,6 +152,11 @@ final class ClusterSingletonManagerSettings(
def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings =
copy(handOverRetryInterval = retryInterval)

/**
* Note that if you define a custom lease name and have several singletons each one must have a unique
* lease name. If the lease name is undefined it will be derived from ActorSystem name and singleton
* actor path, but that may result in too long lease names.
*/
def withLeaseSettings(leaseSettings: LeaseUsageSettings): ClusterSingletonManagerSettings =
copy(leaseSettings = Some(leaseSettings))

Expand Down Expand Up @@ -496,14 +510,14 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
role.forall(cluster.selfRoles.contains),
s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")

private val singletonLeaseName = s"${context.system.name}-singleton-${self.path}"

override val log: MarkerLoggingAdapter = Logging.withMarker(context.system, this)

val lease: Option[Lease] = settings.leaseSettings.map(
settings =>
LeaseProvider(context.system)
.getLease(singletonLeaseName, settings.leaseImplementation, cluster.selfAddress.hostPort))
val lease: Option[Lease] = settings.leaseSettings.map { settings =>
val leaseName =
if (settings.leaseName.isEmpty) s"${context.system.name}-singleton-${self.path}"
else settings.leaseName
LeaseProvider(context.system).getLease(leaseName, settings.leaseImplementation, cluster.selfAddress.hostPort)
}
val leaseRetryInterval: FiniteDuration = settings.leaseSettings match {
case Some(s) => s.leaseRetryInterval
case None => 5.seconds // won't be used
Expand Down
Expand Up @@ -85,6 +85,11 @@ final class ClusterSingletonSettings(

def withBufferSize(bufferSize: Int): ClusterSingletonSettings = copy(bufferSize = bufferSize)

/**
* Note that if you define a custom lease name and have several singletons each one must have a unique
* lease name. If the lease name is undefined it will be derived from ActorSystem name and singleton
* actor path, but that may result in too long lease names.
*/
def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings))

private def copy(
Expand Down Expand Up @@ -239,7 +244,12 @@ object ClusterSingletonManagerSettings {
val lease = config.getString("use-lease") match {
case s if s.isEmpty => None
case leaseConfigPath =>
Some(new LeaseUsageSettings(leaseConfigPath, config.getDuration("lease-retry-interval").asScala))
Some(
new LeaseUsageSettings(
leaseConfigPath,
config.getDuration("lease-retry-interval").asScala,
leaseName = "" // intentionally not in config because would be high risk of not using unique names
))
}
new ClusterSingletonManagerSettings(
singletonName = config.getString("singleton-name"),
Expand Down Expand Up @@ -285,7 +295,11 @@ object ClusterSingletonManagerSettings {
* retried with this interval until the previous oldest confirms that the hand
* over has started or the previous oldest member is removed from the cluster
* (+ `removalMargin`).
* @param leaseSettings LeaseSettings for acquiring before creating the singleton actor
* @param leaseSettings LeaseSettings for acquiring before creating the singleton actor.
* Note that if you define a custom lease name and have several singletons each
* one must have a unique lease name. If the lease name is undefined it will be
* derived from ActorSystem name and singleton actor path, but that may result in
* too long lease names.
*/
final class ClusterSingletonManagerSettings(
val singletonName: String,
Expand Down Expand Up @@ -322,6 +336,11 @@ final class ClusterSingletonManagerSettings(
def withHandOverRetryInterval(retryInterval: java.time.Duration): ClusterSingletonManagerSettings =
withHandOverRetryInterval(retryInterval.asScala)

/**
* Note that if you define a custom lease name and have several singletons each one must have a unique
* lease name. If the lease name is undefined it will be derived from ActorSystem name and singleton
* actor path, but that may result in too long lease names.
*/
def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings))

private def copy(
Expand Down
Expand Up @@ -9,7 +9,18 @@ import scala.concurrent.duration.FiniteDuration
import akka.util.JavaDurationConverters._
import akka.util.PrettyDuration._

final class LeaseUsageSettings private[akka] (val leaseImplementation: String, val leaseRetryInterval: FiniteDuration) {
/**
* Note that if you define a custom lease name and have several Cluster Singletons or Cluster Sharding
* entity types each one must have a unique lease name. If the lease name is undefined it will be derived
* from ActorSystem name and other component names, but that may result in too long lease names.
*/
final class LeaseUsageSettings private[akka] (
val leaseImplementation: String,
val leaseRetryInterval: FiniteDuration,
val leaseName: String) {
def this(leaseImplementation: String, leaseRetryInterval: FiniteDuration) =
this(leaseImplementation, leaseRetryInterval, leaseName = "")

def getLeaseRetryInterval(): java.time.Duration = leaseRetryInterval.asJava

override def toString = s"LeaseUsageSettings($leaseImplementation, ${leaseRetryInterval.pretty})"
Expand Down