Skip to content
Permalink
Browse files

removed DependentService from misk-zookeeper

  • Loading branch information...
keeferrourke committed May 25, 2019
1 parent 3304c61 commit 16dfbfac90c572d1e660b89a4290584e4ad45773
@@ -1,8 +1,6 @@
package misk.zookeeper.testing

import com.google.common.util.concurrent.Service
import com.google.inject.Key
import misk.DependentService
import misk.ServiceModule
import misk.clustering.zookeeper.ZookeeperConfig
import misk.config.AppName
import misk.inject.KAbstractModule
@@ -31,7 +29,8 @@ class ZkTestModule(

install(ZookeeperModule(config, qualifier))

multibind<Service>().toInstance(StartZookeeperService(qualifier))
install(ServiceModule<StartZookeeperService>(qualifier))
bind(keyOf<StartZookeeperService>(qualifier)).toInstance(StartZookeeperService())
val curator = getProvider(keyOf<CuratorFramework>(qualifier))
bind(keyOf<ZkClientFactory>(qualifier)).toProvider(object : Provider<ZkClientFactory> {
@Inject @AppName private lateinit var app: String
@@ -44,12 +43,7 @@ class ZkTestModule(
/**
* The same zookeeper instance is used for all zookeeper bindings to speed up tests.
*/
private class StartZookeeperService constructor(
qualifier: KClass<out Annotation>?
) : CachedTestService(), DependentService {
override val consumedKeys: Set<Key<*>> = setOf()
override val producedKeys: Set<Key<*>> = setOf(keyOf<StartZookeeperService>(qualifier))

private class StartZookeeperService : CachedTestService() {
override fun actualStartup() {
sharedZookeeper.start()
}
@@ -1,15 +1,15 @@
package misk.clustering.zookeeper

import com.google.common.util.concurrent.Service
import com.google.inject.Key
import com.google.inject.Provides
import misk.ServiceModule
import misk.clustering.ClusterService
import misk.clustering.lease.LeaseManager
import misk.concurrent.ExecutorServiceModule
import misk.inject.KAbstractModule
import misk.inject.asSingleton
import misk.inject.toKey
import misk.tasks.RepeatedTaskQueue
import misk.zookeeper.CuratorFrameworkModule
import misk.zookeeper.ZkService
import java.time.Clock
import java.util.concurrent.ExecutorService
import javax.inject.Singleton
@@ -41,8 +41,10 @@ class ZkLeaseCommonModule(private val config: ZookeeperConfig) : KAbstractModule
*/
internal class ZkLeaseManagerModule : KAbstractModule() {
override fun configure() {
multibind<Service>().to<ZkLeaseManager>()
multibind<Service>().to(RepeatedTaskQueue::class.toKey(ForZkLease::class)).asSingleton()
install(ServiceModule<ZkLeaseManager>()
.dependsOn<ZkService>(ForZkLease::class)
.dependsOn<ClusterService>())
install(ServiceModule<RepeatedTaskQueue>(ForZkLease::class))
bind<LeaseManager>().to<ZkLeaseManager>()
}
}
@@ -2,17 +2,14 @@ package misk.clustering.zookeeper

import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.AbstractExecutionThreadService
import misk.DependentService
import misk.clustering.Cluster
import misk.clustering.lease.LeaseManager
import misk.config.AppName
import misk.inject.keyOf
import misk.logging.getLogger
import misk.tasks.RepeatedTaskQueue
import misk.tasks.Result
import misk.tasks.Status
import misk.zookeeper.SERVICES_NODE
import misk.zookeeper.ZkService
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.state.ConnectionStateListener
import java.time.Duration
@@ -35,10 +32,7 @@ internal class ZkLeaseManager @Inject internal constructor(
@ForZkLease private val taskQueue: RepeatedTaskQueue,
internal val cluster: Cluster,
@ForZkLease curator: CuratorFramework
) : AbstractExecutionThreadService(), LeaseManager, DependentService {
override val consumedKeys = setOf(keyOf<ZkService>(ForZkLease::class), keyOf<Cluster>())
override val producedKeys = setOf(ZkLeaseCommonModule.leaseManagerKey)

) : AbstractExecutionThreadService(), LeaseManager {
internal val leaseNamespace = "$SERVICES_NODE/${appName.asZkNamespace}/leases"
internal val client = lazy { curator.usingNamespace(leaseNamespace) }

@@ -1,6 +1,7 @@
package misk.zookeeper

import com.google.common.util.concurrent.Service
import misk.ServiceModule
import misk.clustering.zookeeper.ZookeeperConfig
import misk.inject.KAbstractModule
import misk.inject.asSingleton
@@ -32,14 +33,13 @@ class CuratorFrameworkModule(
) : KAbstractModule() {
override fun configure() {
val ensembleProvider = getProvider(keyOf<EnsembleProvider>(qualifier))
bind(keyOf<CuratorFramework>(qualifier)).toProvider(
CuratorFrameworkProvider(config, ensembleProvider)).asSingleton()
bind(keyOf<CuratorFramework>(qualifier))
.toProvider(CuratorFrameworkProvider(config, ensembleProvider))
.asSingleton()
val curator = getProvider(keyOf<CuratorFramework>(qualifier))
bind(keyOf<ZkService>(qualifier)).toProvider(object : Provider<ZkService> {
override fun get(): ZkService {
return ZkService(curator.get(), qualifier)
}
}).asSingleton()
multibind<Service>().to(keyOf<ZkService>(qualifier))
bind(keyOf<ZkService>(qualifier))
.toProvider(Provider<ZkService> { ZkService(curator.get()) })
.asSingleton()
install(ServiceModule<ZkService>(qualifier))
}
}
@@ -1,21 +1,12 @@
package misk.zookeeper

import com.google.common.util.concurrent.AbstractIdleService
import com.google.inject.Key
import misk.DependentService
import misk.inject.keyOf
import misk.logging.getLogger
import org.apache.curator.framework.CuratorFramework
import kotlin.reflect.KClass

class ZkService internal constructor(
private val curatorFramework: CuratorFramework,
qualifier: KClass<out Annotation>?
) : AbstractIdleService(), DependentService {

override val consumedKeys: Set<Key<*>> = setOf()
override val producedKeys: Set<Key<*>> = setOf(keyOf<ZkService>(qualifier))

private val curatorFramework: CuratorFramework
) : AbstractIdleService() {
override fun startUp() {
log.info { "starting connection to zookeeper" }
curatorFramework.start()
@@ -1,6 +1,7 @@
package misk.clustering.zookeeper

import com.google.inject.Provides
import com.google.inject.Singleton
import misk.clustering.fake.FakeClusterModule
import misk.concurrent.ExplicitReleaseDelayQueue
import misk.config.AppName
@@ -19,7 +20,7 @@ internal class ZkLeaseTestModule : KAbstractModule() {

}

@Provides @ForZkLease
@Provides @ForZkLease @Singleton
fun provideTaskQueue(
clock: Clock,
@ForZkLease delayQueue: ExplicitReleaseDelayQueue<DelayedTask>
@@ -0,0 +1,6 @@
package misk.clustering

import com.google.common.util.concurrent.Service

/** Marker interface for the toBeEnhanced that produces a [Cluster]. */
interface ClusterService : Service
@@ -1,9 +1,6 @@
package misk.clustering

import com.google.common.util.concurrent.AbstractExecutionThreadService
import com.google.inject.Key
import misk.DependentService
import misk.inject.keyOf
import misk.logging.getLogger
import org.eclipse.jetty.util.BlockingArrayQueue
import java.util.concurrent.atomic.AtomicBoolean
@@ -21,7 +18,7 @@ internal class DefaultCluster(
self: Cluster.Member,
private val newResourceMapperFn: (members: Set<Cluster.Member>) -> ClusterResourceMapper =
{ ClusterHashRing(it) }
) : AbstractExecutionThreadService(), Cluster, DependentService {
) : AbstractExecutionThreadService(), Cluster, ClusterService {
private val snapshotRef = AtomicReference<Cluster.Snapshot>(Cluster.Snapshot(
self = self,
selfReady = false,
@@ -32,8 +29,6 @@ internal class DefaultCluster(
private val actions = BlockingArrayQueue<(MutableSet<ClusterWatch>) -> Unit>()

override val snapshot: Cluster.Snapshot get() = snapshotRef.get()
override val consumedKeys: Set<Key<*>> = setOf()
override val producedKeys: Set<Key<*>> = setOf(keyOf<Cluster>())

/**
* Runs the internal event loop that handles requests to add watches or cluster changes. We use
@@ -1,7 +1,7 @@
package misk.clustering.fake

import misk.DependentService
import misk.clustering.Cluster
import misk.clustering.ClusterService
import misk.clustering.ClusterWatch
import misk.clustering.DefaultCluster
import misk.logging.getLogger
@@ -22,8 +22,7 @@ import javax.inject.Singleton
class FakeCluster internal constructor(
val resourceMapper: ExplicitClusterResourceMapper,
private val delegate: DefaultCluster
) : Cluster by delegate, DependentService by delegate {

) : ClusterService by delegate, Cluster by delegate {
constructor(resourceMapper: ExplicitClusterResourceMapper) :
this(resourceMapper, DefaultCluster(self) { resourceMapper })

@@ -1,13 +1,15 @@
package misk.clustering.fake

import com.google.common.util.concurrent.Service
import misk.ServiceModule
import misk.clustering.Cluster
import misk.clustering.ClusterService
import misk.inject.KAbstractModule

/** [FakeClusterModule] installs fake implementations of the clustering primitives for use in tests */
class FakeClusterModule : KAbstractModule() {
override fun configure() {
bind<Cluster>().to<FakeCluster>()
multibind<Service>().to<FakeCluster>()
bind<ClusterService>().to<FakeCluster>()
install(ServiceModule<ClusterService>())
}
}
@@ -1,7 +1,8 @@
package misk.clustering.kubernetes

import com.google.common.util.concurrent.Service
import misk.ServiceModule
import misk.clustering.Cluster
import misk.clustering.ClusterService
import misk.clustering.DefaultCluster
import misk.inject.KAbstractModule
import misk.inject.asSingleton
@@ -11,8 +12,10 @@ class KubernetesClusterModule(private val config: KubernetesConfig) : KAbstractM
override fun configure() {
bind<KubernetesConfig>().toInstance(config)
bind<Cluster>().to<DefaultCluster>()
bind<ClusterService>().to<DefaultCluster>()
bind<DefaultCluster>().toProvider(KubernetesClusterProvider::class.java).asSingleton()
multibind<Service>().to<KubernetesClusterWatcher>()
multibind<Service>().to<DefaultCluster>()
install(ServiceModule<KubernetesClusterWatcher>()
.dependsOn<ClusterService>())
install(ServiceModule<ClusterService>())
}
}
@@ -3,19 +3,16 @@ package misk.clustering.kubernetes
import com.google.common.base.Stopwatch
import com.google.common.reflect.TypeToken
import com.google.common.util.concurrent.AbstractIdleService
import com.google.inject.Key
import io.kubernetes.client.apis.CoreV1Api
import io.kubernetes.client.models.V1Pod
import io.kubernetes.client.util.Config
import io.kubernetes.client.util.Watch
import misk.DependentService
import misk.backoff.ExponentialBackoff
import misk.clustering.Cluster
import misk.clustering.DefaultCluster
import misk.clustering.kubernetes.KubernetesClusterWatcher.Companion.CHANGE_TYPE_ADDED
import misk.clustering.kubernetes.KubernetesClusterWatcher.Companion.CHANGE_TYPE_DELETED
import misk.clustering.kubernetes.KubernetesClusterWatcher.Companion.CHANGE_TYPE_MODIFIED
import misk.inject.keyOf
import misk.logging.getLogger
import java.time.Duration
import java.util.concurrent.TimeUnit
@@ -34,13 +31,10 @@ import kotlin.concurrent.thread
internal class KubernetesClusterWatcher @Inject internal constructor(
private val cluster: DefaultCluster,
private val config: KubernetesConfig
) : AbstractIdleService(), DependentService {
) : AbstractIdleService() {
private val running = AtomicBoolean(false)
private val watchFailedTimer = Stopwatch.createUnstarted()

override val consumedKeys: Set<Key<*>> = setOf(keyOf<Cluster>())
override val producedKeys: Set<Key<*>> = setOf()

override fun startUp() {
log.info { "starting k8s cluster watch" }
running.set(true)

0 comments on commit 16dfbfa

Please sign in to comment.
You can’t perform that action at this time.