Skip to content
Permalink
Browse files

Release leases if the cluster weight is 0 (#1048)

This allows applications to run in an Active-Passive mode. If the
current weight for a cluster is 0, leases are not acquired or released
if already acquired. This allows an application to only run leased
background tasks in the Active region.

This iteration requires misk users to provide their own implementation
for ClusterWeightProvider. SQ services will use a global ZK impl, which
we could consider contributing as well in the future.
  • Loading branch information...
ryanhall07 committed Jun 10, 2019
1 parent a8caecb commit 615cc7b4598a547c2a69f281d06a5bdd759086ed
@@ -2,6 +2,7 @@ package misk.clustering.zookeeper

import misk.clustering.NoMembersAvailableException
import misk.clustering.lease.Lease
import misk.clustering.weights.ClusterWeightProvider
import misk.logging.getLogger
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.KeeperException
@@ -37,6 +38,7 @@ internal class ZkLease(
ownerName: String,
private val manager: ZkLeaseManager,
private val leaseResourceName: String,
private val clusterWeight: ClusterWeightProvider,
override val name: String
) : Lease {

@@ -203,6 +205,7 @@ internal class ZkLease(
null
}
return desiredLeaseOwner?.name == clusterSnapshot.self.name
&& clusterWeight.get() > 0
}

/** @return true if the lease node exists in zk */
@@ -233,4 +236,4 @@ internal class ZkLease(
companion object {
private val log = getLogger<ZkLease>()
}
}
}
@@ -4,6 +4,7 @@ import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.AbstractExecutionThreadService
import misk.clustering.Cluster
import misk.clustering.lease.LeaseManager
import misk.clustering.weights.ClusterWeightProvider
import misk.config.AppName
import misk.logging.getLogger
import misk.tasks.RepeatedTaskQueue
@@ -31,7 +32,8 @@ internal class ZkLeaseManager @Inject internal constructor(
@AppName appName: String,
@ForZkLease private val taskQueue: RepeatedTaskQueue,
internal val cluster: Cluster,
@ForZkLease curator: CuratorFramework
@ForZkLease curator: CuratorFramework,
private val clusterWeight : ClusterWeightProvider
) : AbstractExecutionThreadService(), LeaseManager {
internal val leaseNamespace = "$SERVICES_NODE/${appName.asZkNamespace}/leases"
internal val client = lazy { curator.usingNamespace(leaseNamespace) }
@@ -107,7 +109,7 @@ internal class ZkLeaseManager @Inject internal constructor(

override fun requestLease(name: String): ZkLease {
return leases.computeIfAbsent(name) {
ZkLease(ownerName, this, "$leaseNamespace/$name", name)
ZkLease(ownerName, this, "$leaseNamespace/$name", clusterWeight, name)
}
}

@@ -6,6 +6,7 @@ import misk.MiskTestingServiceModule
import misk.clustering.Cluster
import misk.clustering.fake.FakeCluster
import misk.clustering.lease.Lease
import misk.clustering.weights.FakeClusterWeight
import misk.mockito.Mockito
import misk.testing.MiskTest
import misk.testing.MiskTestModule
@@ -34,6 +35,7 @@ internal class ZkLeaseTest {
@Inject lateinit var cluster: FakeCluster
@Inject lateinit var leaseManager: ZkLeaseManager
@Inject @ForZkLease lateinit var curator: CuratorFramework
@Inject lateinit var clusterWeight : FakeClusterWeight
lateinit var leaseNamespace: String
lateinit var leasePath: String

@@ -83,6 +85,18 @@ internal class ZkLeaseTest {
assertThat(curator.checkExists().forPath(leasePath.asZkPath)).isNull()
}

@Test fun releasesLeaseAfterClusterWeightChanges() {
cluster.resourceMapper.addMapping(leasePath, self)

val lease = leaseManager.requestLease(LEASE_NAME)
assertThat(lease.checkHeld()).isTrue()

clusterWeight.setClusterWeight(0)
leaseManager.checkAllLeases()

assertThat(lease.checkHeld()).isFalse()
}

@Test fun releasesAcquiredLeaseIfMappingChangesAwayFromSelf() {
cluster.resourceMapper.addMapping(leasePath, self)

@@ -3,6 +3,8 @@ package misk.clustering.zookeeper
import com.google.inject.Provides
import com.google.inject.Singleton
import misk.clustering.fake.FakeClusterModule
import misk.clustering.weights.FakeClusterWeight
import misk.clustering.weights.FakeClusterWeightModule
import misk.concurrent.ExplicitReleaseDelayQueue
import misk.config.AppName
import misk.inject.KAbstractModule
@@ -17,7 +19,7 @@ internal class ZkLeaseTestModule : KAbstractModule() {
install(FakeClusterModule())
install(ZkTestModule(ForZkLease::class))
install(ZkLeaseManagerModule())

install(FakeClusterWeightModule())
}

@Provides @ForZkLease @Singleton
@@ -0,0 +1,22 @@
package misk.clustering.weights

import misk.inject.KAbstractModule

/**
* A static [ClusterWeightProvider] that always returns 100
*/
class ActiveClusterWeight : ClusterWeightProvider {

override fun get(): Int {
return 100
}
}

/**
* Provides an [ActiveClusterWeight]
*/
class ActiveClusterWeightModule : KAbstractModule() {
override fun configure() {
bind<ClusterWeightProvider>().to<ActiveClusterWeight>()
}
}
@@ -0,0 +1,15 @@
package misk.clustering.weights

/**
* Provides the current weight assigned to the cluster.
*
* A weight value is between 0 and 100 to indicate how much traffic a cluster should handle.
* Typically an active-passive setup has 1 active cluster with 100 and 1 passive cluster with 0.
*
* If your application does not require dynamic cluster weights, you can use the static
* [ActiveClusterWeight].
*/
interface ClusterWeightProvider {

fun get(): Int
}
@@ -0,0 +1,30 @@
package misk.clustering.weights

import misk.inject.KAbstractModule

/**
* A [ClusterWeightProvider] for testing
*/
class FakeClusterWeight : ClusterWeightProvider {

private var weight = 100

override fun get(): Int {
return weight
}

fun setClusterWeight(weight : Int) {
this.weight = weight
}
}

/**
* Provides a [FakeClusterWeight] for testing
*/
class FakeClusterWeightModule : KAbstractModule() {
override fun configure() {
val fake = FakeClusterWeight()
bind<FakeClusterWeight>().toInstance(fake)
bind<ClusterWeightProvider>().toInstance(fake)
}
}

0 comments on commit 615cc7b

Please sign in to comment.
You can’t perform that action at this time.