Permalink
Browse files

Make ClusterConfig compatible with newer Jackson.

Switch from @JsonProperty to @BeanProperty to make compatible w/ newer
versions of Jackson. Use setter names which match the names of the
underlying fields. Update example and README to match.
  • Loading branch information...
1 parent f58bb04 commit 1d35a2361f399d2af6b3fbc4fb64bb80be74deef Philip K. Warren committed Mar 5, 2014
View
15 Example.scala
@@ -17,29 +17,26 @@
import java.util.Random
import java.util.concurrent.CountDownLatch
import com.boundary.ordasity.{Cluster, ClusterConfig, SmartListener}
-import com.codahale.logula.Logging
import com.yammer.metrics.scala.Meter
import com.twitter.common.zookeeper.ZooKeeperClient
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit, ScheduledFuture}
import java.util.{HashMap, TimerTask}
-Logging.configure
-
val random = new Random()
val latch = new CountDownLatch(1)
val pool = new ScheduledThreadPoolExecutor(1)
val futures = new HashMap[String, ScheduledFuture[_]]
-val config = new ClusterConfig().
+val config = ClusterConfig.builder().
setHosts("localhost:2181").
- setAutoRebalance(true).
- setRebalanceInterval(15).
- useSmartBalancing(true).
+ setEnableAutoRebalance(true).
+ setAutoRebalanceInterval(15).
+ setUseSmartBalancing(true).
setDrainTime(3).
- setZKTimeout(3).
+ setZkTimeout(3).
setUseSoftHandoff(true).
- setNodeId(java.util.UUID.randomUUID().toString)
+ setNodeId(java.util.UUID.randomUUID().toString).build()
val listener = new SmartListener {
def onJoin(client: ZooKeeperClient) = {}
View
12 Readme.md
@@ -62,7 +62,7 @@ Let's get started with an example. Here's how to build a clustered service in 25
def onLeave() { }
}
- val config = new ClusterConfig().setHosts("localhost:2181")
+ val config = ClusterConfig.builder().setHosts("localhost:2181").build()
val cluster = new Cluster("ServiceName", listener, config)
cluster.join()
@@ -168,10 +168,10 @@ Ordasity supports automatic and manual rebalancing to even out the cluster's loa
To trigger a manual rebalance on all nodes, touch "/service-name/meta/rebalance" in Zookeeper. However, automatic rebalancing is preferred. To enable it, just turn it on in your cluster config:
```scala
- val config = new ClusterConfig().
+ val config = ClusterConfig.builder().
setHosts("localhost:2181").
setAutoRebalance(true).
- setRebalanceInterval(60 * 60) // One hour
+ setRebalanceInterval(60 * 60).build() // One hour
```
As a masterless service, the rebalance process is handled uncoordinated by the node itself. The rebalancing logic is very simple. If a node has more than its fair share of work when a rebalance is triggered, it will drain or release this work to other nodes in the cluster. As the cluster sees this work become available, lighter-loaded nodes will claim it (or receive handoff) and begin processing.
@@ -217,7 +217,7 @@ The *drainToCount* and *drainToLoad* strategies invoked by a rebalance will rele
Ordasity allows you to configure the period of time for a drain to complete:
```scala
- val config = new ClusterConfig().setHosts("localhost:2181").setDrainTime(60) // 60 Seconds
+ val config = ClusterConfig.builder().setHosts("localhost:2181").setDrainTime(60).build() // 60 Seconds
```
When a drain is initiated, Ordasity will pace the release of work units over the time specified. If 15 work units were to be released over a 60-second period, the library would release one every four seconds.
@@ -232,10 +232,10 @@ When Handoff is enabled, Ordasity will allow another node to begin processing fo
To enable it, just turn it on in your ClusterConfig:
```scala
- val clusterConfig = new ClusterConfig().
+ val clusterConfig = ClusterConfig.builder().
setHosts("localhost:2181").
setUseSoftHandoff(true).
- setHandoffShutdownDelay(10) // Seconds
+ setHandoffShutdownDelay(10).build() // Seconds
```
The handoff process is fairly straightforward. When a node has decided to release a work unit (either due to a rebalance or because it is being drained for shutdown), it creates an entry in Zookeeper at /service-name/handoff-requests. Following their count-based or load-based claiming policies, other nodes will claim the work being handed off by creating an entry at /service-name/handoff-results.
View
82 src/main/scala/com/boundary/ordasity/ClusterConfig.scala
@@ -17,78 +17,86 @@
package com.boundary.ordasity
import java.net.InetAddress
-import org.codehaus.jackson.annotate.JsonProperty
+import scala.reflect.BeanProperty
class ClusterConfig {
// Defaults
- @JsonProperty var hosts = ""
- @JsonProperty var enableAutoRebalance = true
- @JsonProperty var autoRebalanceInterval = 60
- @JsonProperty var drainTime = 60
- @JsonProperty var useSmartBalancing = false
- @JsonProperty var zkTimeout = 3000
- @JsonProperty var workUnitName = "work-units"
- @JsonProperty var workUnitShortName = "work"
- @JsonProperty var nodeId = InetAddress.getLocalHost.getHostName
- @JsonProperty var useSoftHandoff = false
- @JsonProperty var handoffShutdownDelay = 10
-
- def setHosts(to: String) : ClusterConfig = {
- hosts = to
+ @BeanProperty var hosts = ""
+ @BeanProperty var enableAutoRebalance = true
+ @BeanProperty var autoRebalanceInterval = 60
+ @BeanProperty var drainTime = 60
+ @BeanProperty var useSmartBalancing = false
+ @BeanProperty var zkTimeout = 3000
+ @BeanProperty var workUnitName = "work-units"
+ @BeanProperty var workUnitShortName = "work"
+ @BeanProperty var nodeId = InetAddress.getLocalHost.getHostName
+ @BeanProperty var useSoftHandoff = false
+ @BeanProperty var handoffShutdownDelay = 10
+
+}
+
+object ClusterConfig {
+ def builder() = new ClusterConfigBuilder(new ClusterConfig)
+}
+
+class ClusterConfigBuilder(config: ClusterConfig) {
+ def setHosts(hosts: String) : ClusterConfigBuilder = {
+ config.hosts = hosts
this
}
- def setAutoRebalance(to: Boolean) : ClusterConfig = {
- enableAutoRebalance = to
+ def setEnableAutoRebalance(enableAutoRebalance: Boolean) : ClusterConfigBuilder = {
+ config.enableAutoRebalance = enableAutoRebalance
this
}
- def setRebalanceInterval(to: Int) : ClusterConfig = {
- autoRebalanceInterval = to
+ def setAutoRebalanceInterval(autoRebalanceInterval: Int) : ClusterConfigBuilder = {
+ config.autoRebalanceInterval = autoRebalanceInterval
this
}
- def setZKTimeout(to: Int) : ClusterConfig = {
- zkTimeout = to
+ def setZkTimeout(zkTimeout: Int) : ClusterConfigBuilder = {
+ config.zkTimeout = zkTimeout
this
}
- def useSmartBalancing(to : Boolean) = setUseSmartBalancing(to)
-
- def setUseSmartBalancing(to: Boolean) : ClusterConfig = {
- useSmartBalancing = to
+ def setUseSmartBalancing(useSmartBalancing: Boolean) : ClusterConfigBuilder = {
+ config.useSmartBalancing = useSmartBalancing
this
}
- def setDrainTime(to: Int) : ClusterConfig = {
- drainTime = to
+ def setDrainTime(drainTime: Int) : ClusterConfigBuilder = {
+ config.drainTime = drainTime
this
}
- def setWorkUnitName(to: String) : ClusterConfig = {
- workUnitName = to
+ def setWorkUnitName(workUnitName: String) : ClusterConfigBuilder = {
+ config.workUnitName = workUnitName
this
}
- def setWorkUnitShortName(to: String) : ClusterConfig = {
- workUnitShortName = to
+ def setWorkUnitShortName(workUnitShortName: String) : ClusterConfigBuilder = {
+ config.workUnitShortName = workUnitShortName
this
}
- def setNodeId(to: String) : ClusterConfig = {
- nodeId = to
+ def setNodeId(nodeId: String) : ClusterConfigBuilder = {
+ config.nodeId = nodeId
this
}
- def setUseSoftHandoff(to: Boolean) : ClusterConfig = {
- useSoftHandoff = to
+ def setUseSoftHandoff(useSoftHandoff: Boolean) : ClusterConfigBuilder = {
+ config.useSoftHandoff = useSoftHandoff
this
}
- def setHandoffShutdownDelay(to: Int) : ClusterConfig = {
- handoffShutdownDelay = to
+ def setHandoffShutdownDelay(handoffShutdownDelay: Int) : ClusterConfigBuilder = {
+ config.handoffShutdownDelay = handoffShutdownDelay
this
}
+ def build() : ClusterConfig = {
+ config
+ }
}
View
44 src/main/scala/com/boundary/ordasity/examples/MyService.scala
@@ -1,44 +0,0 @@
-//
-// Copyright 2011-2012, Boundary
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-
-package com.boundary.ordasity
-
-import com.yammer.metrics.scala.Meter
-import com.twitter.common.zookeeper.ZooKeeperClient
-
-class MyService {
- val config = new ClusterConfig().
- setHosts("localhost:2181").
- setAutoRebalance(true).
- setRebalanceInterval(60).
- setUseSmartBalancing(true).
- setDrainTime(60).
- setZKTimeout(3000)
-
- val cluster = new Cluster("ServiceName", listener, config)
-
- val listener = new SmartListener {
- def onJoin(client: ZooKeeperClient) = null // You are *in*, baby.
-
- def startWork(workUnit: String, meter: Meter) = null // Do yer thang, mark dat meter.
-
- def shutdownWork(workUnit: String) = null // Stop doin' that thang
-
- def onLeave() = null
- }
-
- cluster.join()
-}
View
22 src/test/scala/com/boundary/ordasity/ClusterConfigSpec.scala
@@ -41,17 +41,17 @@ class ClusterConfigSpec extends Spec {
@Test def `test mutators` {
- new ClusterConfig().setHosts("foo").hosts.must(be("foo"))
- new ClusterConfig().setAutoRebalance(false).enableAutoRebalance.must(be(false))
- new ClusterConfig().setRebalanceInterval(10000).autoRebalanceInterval.must(be(10000))
- new ClusterConfig().setZKTimeout(333).zkTimeout.must(be(333))
- new ClusterConfig().setUseSmartBalancing(true).useSmartBalancing.must(be(true))
- new ClusterConfig().setDrainTime(100).drainTime.must(be(100))
- new ClusterConfig().setWorkUnitName("tacos").workUnitName.must(be("tacos"))
- new ClusterConfig().setWorkUnitShortName("taquitos").workUnitShortName.must(be("taquitos"))
- new ClusterConfig().setNodeId("skelter").nodeId.must(be("skelter"))
- new ClusterConfig().setUseSoftHandoff(true).useSoftHandoff.must(be(true))
- new ClusterConfig().setHandoffShutdownDelay(90).handoffShutdownDelay.must(be(90))
+ ClusterConfig.builder().setHosts("foo").build().hosts.must(be("foo"))
+ ClusterConfig.builder().setEnableAutoRebalance(false).build().enableAutoRebalance.must(be(false))
+ ClusterConfig.builder().setAutoRebalanceInterval(10000).build().autoRebalanceInterval.must(be(10000))
+ ClusterConfig.builder().setZkTimeout(333).build().zkTimeout.must(be(333))
+ ClusterConfig.builder().setUseSmartBalancing(true).build().useSmartBalancing.must(be(true))
+ ClusterConfig.builder().setDrainTime(100).build().drainTime.must(be(100))
+ ClusterConfig.builder().setWorkUnitName("tacos").build().workUnitName.must(be("tacos"))
+ ClusterConfig.builder().setWorkUnitShortName("taquitos").build().workUnitShortName.must(be("taquitos"))
+ ClusterConfig.builder().setNodeId("skelter").build().nodeId.must(be("skelter"))
+ ClusterConfig.builder().setUseSoftHandoff(true).build().useSoftHandoff.must(be(true))
+ ClusterConfig.builder().setHandoffShutdownDelay(90).build().handoffShutdownDelay.must(be(90))
}
}
}
View
6 src/test/scala/com/boundary/ordasity/ClusterSpec.scala
@@ -31,11 +31,11 @@ import com.boundary.logula.Logging
class ClusterSpec extends Spec with Logging {
val id = UUID.randomUUID().toString
- val config = new ClusterConfig().
+ val config = ClusterConfig.builder().
setNodeId("testNode").
- setRebalanceInterval(1).
+ setAutoRebalanceInterval(1).
setDrainTime(1).
- setHosts("no_existe:2181")
+ setHosts("no_existe:2181").build()
val mockClusterListener = mock[Listener]
val cluster = new Cluster(id, mockClusterListener, config)
View
6 src/test/scala/com/boundary/ordasity/balancing/BalancingPolicySpec.scala
@@ -35,12 +35,12 @@ class DummyBalancingPolicy(cluster: Cluster, config: ClusterConfig)
class BalancingPolicySpec extends Spec {
- val config = new ClusterConfig().
+ val config = ClusterConfig.builder().
setNodeId("testNode").
- setRebalanceInterval(1).
+ setAutoRebalanceInterval(1).
setDrainTime(1).
setHosts("no_existe:2181").
- setAutoRebalance(false)
+ setEnableAutoRebalance(false).build()
class `Base Balancing Policy Tests` {
View
6 src/test/scala/com/boundary/ordasity/balancing/CountBalancingPolicySpec.scala
@@ -26,12 +26,12 @@ import com.simple.simplespec.Spec
class CountBalancingPolicySpec extends Spec {
- val config = new ClusterConfig().
+ val config = ClusterConfig.builder().
setNodeId("testNode").
- setRebalanceInterval(1).
+ setAutoRebalanceInterval(1).
setDrainTime(1).
setHosts("no_existe:2181").
- setAutoRebalance(false)
+ setEnableAutoRebalance(false).build()
class `Count Balancing Policy` {
View
6 src/test/scala/com/boundary/ordasity/balancing/MeteredBalancingPolicySpec.scala
@@ -28,12 +28,12 @@ import com.simple.simplespec.Spec
class MeteredBalancingPolicySpec extends Spec {
- val config = new ClusterConfig().
+ val config = ClusterConfig.builder().
setNodeId("testNode").
- setRebalanceInterval(1).
+ setAutoRebalanceInterval(1).
setDrainTime(1).
setHosts("no_existe:2181").
- setAutoRebalance(false)
+ setEnableAutoRebalance(false).build()
class `Metered Balancing Policy` {
View
6 src/test/scala/com/boundary/ordasity/listeners/ClusterNodesChangedListenerSpec.scala
@@ -23,11 +23,11 @@ import com.simple.simplespec.Spec
class ClusterNodesChangedListenerSpec extends Spec {
- val config = new ClusterConfig().
+ val config = ClusterConfig.builder().
setNodeId("testNode").
- setRebalanceInterval(1).
+ setAutoRebalanceInterval(1).
setDrainTime(1).
- setHosts("no_existe:2181")
+ setHosts("no_existe:2181").build()
class `Cluster Nodes Changed Listener` {
View
6 src/test/scala/com/boundary/ordasity/listeners/HandoffResultsListenerSpec.scala
@@ -32,12 +32,12 @@ import com.simple.simplespec.Spec
class HandoffResultsListenerSpec extends Spec {
- val config = new ClusterConfig().
+ val config = ClusterConfig.builder().
setNodeId("testNode").
- setRebalanceInterval(1).
+ setAutoRebalanceInterval(1).
setDrainTime(1).
setHosts("no_existe:2181").
- setHandoffShutdownDelay(1)
+ setHandoffShutdownDelay(1).build()
class `Handoff Results Listener` {
View
6 src/test/scala/com/boundary/ordasity/listeners/VerifyIntegrityListenerSpec.scala
@@ -27,11 +27,11 @@ import com.simple.simplespec.Spec
class VerifyIntegrityListenerSpec extends Spec {
- val config = new ClusterConfig().
+ val config = ClusterConfig.builder().
setNodeId("testNode").
- setRebalanceInterval(1).
+ setAutoRebalanceInterval(1).
setDrainTime(1).
- setHosts("no_existe:2181")
+ setHosts("no_existe:2181").build()
class `Verify Integrity Listener` {

0 comments on commit 1d35a23

Please sign in to comment.