
Initial release of Ordasity 0.1.8

commit 7d49623376e669b527926a6702a7d68e2f1c5d65 0 parents
7  .gitignore

.DS_Store
target/
build/
*.class
*.ipr
*.iml
*.iws
49  Example.scala

import java.util.Random
import java.util.concurrent.CountDownLatch
import com.boundary.ordasity.{Cluster, ClusterConfig, SmartListener}
import com.codahale.logula.Logging
import com.yammer.metrics.Meter
import com.twitter.zookeeper.ZooKeeperClient
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit, ScheduledFuture}
import java.util.{HashMap, TimerTask}

Logging.configure

val random = new Random()
val latch = new CountDownLatch(1)
val pool = new ScheduledThreadPoolExecutor(1)

val futures = new HashMap[String, ScheduledFuture[_]]

val config = new ClusterConfig("localhost:2181").
  setAutoRebalance(true).
  setRebalanceInterval(15).
  useSmartBalancing(true).
  setDrainTime(3).
  setZKTimeout(3).
  setUseSoftHandoff(true).
  setNodeId(java.util.UUID.randomUUID().toString)

val listener = new SmartListener {
  def onJoin(client: ZooKeeperClient) = {}
  def onLeave() = {}

  // Do yer thang, mark dat meter.
  def startWork(workUnit: String, meter: Meter) = {
    val task = new TimerTask {
      def run() = meter.mark(random.nextInt(1000))
    }
    val future = pool.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS)
    futures.put(workUnit, future)
  }

  // Stop doin' that thang.
  def shutdownWork(workUnit: String) {
    futures.get(workUnit).cancel(true)
  }
}

val cluster = new Cluster("example_service", listener, config)

cluster.join()
latch.await()
224  Readme.md

# Ordasity

## Table of Contents
1. Overview, Use Cases, and Features
2. A Clustered Service in 30 Seconds
3. In Action at Boundary
4. Distribution / Coordination Strategy
5. Rebalancing
6. Draining and Handoff
7. Wrapping Up


## Building Stateful Clustered Services on the JVM

Ordasity is a library designed to make building and deploying reliable clustered services on the JVM as straightforward as possible. It's written in Scala and uses Zookeeper for coordination.

Ordasity's simplicity and flexibility allow us to quickly write, deploy, and (most importantly) operate distributed systems on the JVM without duplicating distributed "glue" code or revisiting complex reasoning about distribution strategies.

---

### Primary Use Cases

Ordasity is designed to spread persistent or long-lived workloads across several machines. It's a toolkit for building systems which can be described in terms of individual nodes serving a partition or shard of a cluster's total load.

---

### Features
- Cluster membership (joining / leaving / mutual awareness)
- Work claiming and distribution
- Load-based workload balancing
- Count-based workload balancing
- Automatic periodic rebalancing
- Graceful cluster exiting ("draining")
- Graceful handoff of work units between nodes
- Pegging of work units to a specific node

---

### A Clustered Service in 30 Seconds

Let's get started with an example. Here's how to build a clustered service in 25 lines of code with Ordasity:

    import com.yammer.metrics.Meter
    import com.twitter.zookeeper.ZooKeeperClient
    import com.boundary.ordasity.{Cluster, ClusterConfig, SmartListener}

    class MyService {
      val config = new ClusterConfig("localhost:2181")

      val listener = new SmartListener {

        // Called after successfully joining the cluster.
        def onJoin(client: ZooKeeperClient) { }

        // Do yer thang, mark that meter.
        def startWork(workUnit: String, meter: Meter) { }

        // Stop doin' that thang.
        def shutdownWork(workUnit: String) { }

        // Called after leaving the cluster.
        def onLeave() { }
      }

      val cluster = new Cluster("ServiceName", listener, config)

      cluster.join()
    }

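If count-based balancing is all you need, the same service can register a plain ClusterListener instead, whose startWork callback takes no meter. This is a sketch only; the callback signatures are inferred from how Cluster.scala invokes the listener, so check the Listener traits in the source for the exact definitions:

    import com.boundary.ordasity.{Cluster, ClusterConfig, ClusterListener}
    import com.twitter.zookeeper.ZooKeeperClient

    // Sketch: count-based listener (no meter handed to startWork).
    val simpleListener = new ClusterListener {
      def onJoin(client: ZooKeeperClient) { }
      def startWork(workUnit: String) { }
      def shutdownWork(workUnit: String) { }
      def onLeave() { }
    }

    new Cluster("ServiceName", simpleListener, new ClusterConfig("localhost:2181")).join()
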
---

### In Action at Boundary

At Boundary, the library holds together our pubsub and event stream processing systems. It's a critical part of ensuring that at any moment, we're consuming and aggregating data from our network of collectors at one tier, and processing this data at hundreds of megabits a second in another. Ordasity also helps keep track of the mappings between these services, wiring everything together for us behind the scenes.

Ordasity's distribution enables us to spread the work of our pubsub aggregation and event stream processing systems across any number of nodes. Automatic load balancing keeps the cluster's workload evenly distributed, with nodes handing off work to others as workload changes. Graceful draining and handoff allow us to iterate rapidly on these systems, continuously deploying updates without disrupting operation of the cluster. Ordasity's membership and work claiming approach ensures transparent failover within a couple of seconds if a node becomes unavailable due to a network partition or system failure.

---

### Distribution / Coordination Strategy

Ordasity's architecture is masterless, relying on Zookeeper only for coordination between individual nodes. The service is designed around the principle that many nodes acting together under a common set of rules can cooperatively form a self-organizing, self-regulating system.

Ordasity supports two work claiming strategies: "simple" (count-based) and "smart" (load-based).

#### Count-Based Distribution
The count-based distribution strategy is simple. When in effect, each node in the cluster will attempt to claim its fair share of available work units according to the following formula:

      val maxToClaim = {
        if (allWorkUnits.size <= 1) allWorkUnits.size
        else (allWorkUnits.size / nodeCount.toDouble).ceil
      }

If zero or one work units are present, the node will attempt to claim up to one work unit. Otherwise, the node will attempt to claim up to the number of work units divided by the number of active nodes.

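For instance, with 10 work units spread across 3 active nodes (purely illustrative numbers), each node will attempt to claim up to ceil(10 / 3) = 4 work units:

    // Illustrative numbers only.
    val workUnitCount = 10
    val nodeCount = 3
    val maxToClaim = (workUnitCount / nodeCount.toDouble).ceil  // 4.0
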
#### Load-Based Distribution
Ordasity's load-based distribution strategy assumes that all work units are not equal. It's unlikely that balancing simply by count will result in an even load distribution -- some nodes would probably end up much busier than others. The load-based strategy is smarter. It divides up work based on the amount of actual "work" done.

##### Meters Measure Load
When you enable smart balancing and initialize Ordasity with a SmartListener, you get back a "meter" to mark when work occurs. Here's a simple, contrived example:

    val listener = new SmartListener {
      ...
      def startWork(workUnit: String, meter: Meter) = {

        val somethingOrOther = new Runnable() {
          def run() {
            while (true) {
              val processingAmount = process(workUnit)
              meter.mark(processingAmount)
              Thread.sleep(100)
            }
          }
        }

        new Thread(somethingOrOther).start()
      }

      ...
    }

Ordasity uses this meter to determine how much "work" each work unit in the cluster represents. If the application were a database or frontend to a data service, you might mark the meter each time a query is performed. In a messaging system, you'd mark it each time a message is sent or received. In an event stream processing system, you'd mark it each time an event is processed. You get the idea.

*(Bonus: each of these meters exposes its metrics via JMX, providing you and your operations team with insight into what's happening when your service is in production.)*

##### Knowing the Load Lets Us Balance
Ordasity checks the meters once per minute (configurable) and updates this information in Zookeeper. This "load map" records the actual load represented by each work unit. All nodes watch the cluster's "load map" and are notified via Zookeeper's Atomic Broadcast mechanism when it changes. Each node in the cluster will attempt to claim its fair share of available work units according to the following formula:

    def evenDistribution() : Double = {
      loadMap.values.sum / activeNodeSize().toDouble
    }

As the number of nodes or the load of individual work units changes, each node's idea of an "even distribution" changes as well. Using this "even distribution" value, each node will choose to claim additional work, or, in the event of a rebalance, drain its workload to other nodes if it's processing more than its fair share.

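Put roughly, each node's claim-or-drain decision boils down to the following sketch (simplified from the claimByLoad and smartRebalance logic in Cluster.scala further down; not the exact code):

    // Simplified sketch of the per-node decision; see Cluster.scala for the real thing.
    while (myLoad() <= evenDistribution() && !unclaimed.isEmpty)
      attemptToClaim(unclaimed.poll())            // claim until we reach our fair share

    if (myLoad() > evenDistribution())            // on rebalance, shed the excess
      drainToLoad(evenDistribution().longValue)
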
---

### Rebalancing

Ordasity supports automatic and manual rebalancing to even out the cluster's load distribution as workloads change.

To trigger a manual rebalance on all nodes, touch "/service-name/meta/rebalance" in Zookeeper.
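One way to do that from a script is with the plain ZooKeeper client (a sketch; "service-name" and the connect string are placeholders, and any update to the znode's data will fire the watch):

    import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

    // "Touch" the rebalance znode by rewriting its (empty) data.
    val zk = new ZooKeeper("localhost:2181", 3000, new Watcher {
      def process(event: WatchedEvent) { }
    })
    zk.setData("/service-name/meta/rebalance", Array[Byte](), -1) // -1 matches any version
    zk.close()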

However, automatic rebalancing is preferred. To enable it, just turn it on in your cluster config:

    val config = new ClusterConfig("localhost:2181").
      setAutoRebalance(true).
      setRebalanceInterval(60 * 60) // One hour

As Ordasity is masterless, the rebalance process is handled by each node independently, without coordination. The rebalancing logic is very simple: if a node has more than its fair share of work when a rebalance is triggered, it will drain or release this work to other nodes in the cluster. As the cluster sees this work become available, lighter-loaded nodes will claim it (or receive handoff) and begin processing.

If you're using **count-based distribution**, it looks like this:

    def simpleRebalance() {
      val target = fairShare()

      if (myWorkUnits.size > target) {
        log.info("Simple Rebalance triggered. Load: %s. Target: %s.",  myWorkUnits.size, target)
        drainToCount(target)
      }
    }

If you're using **load-based distribution**, it looks like this:

    def smartRebalance() {
      val target = evenDistribution()

      if (myLoad() > target) {
        log.info("Smart Rebalance triggered. Load: %s. Target: %s", myLoad(), target)
        drainToLoad(target.longValue)
      }
    }

---

### Draining and Handoff

To avoid dumping a bucket of work on an already-loaded cluster at once, Ordasity supports "draining." Draining is a process by which a node can gradually release work to other nodes in the cluster. In addition to draining, Ordasity also supports graceful handoff, allowing for a period of overlap during which a new node can begin serving a work unit before the previous owner shuts it down.

#### Draining

Ordasity's work claiming strategies (count-based and load-based) have internal counterparts for releasing work: *drainToLoad* and *drainToCount*.

The *drainToCount* and *drainToLoad* strategies invoked by a rebalance will release work units until the node's load is just greater than its fair share. That is to say, each node is "generous" in that it will strive to maintain slightly greater than a mathematically even distribution of work to guard against a scenario where work units are caught in a cycle of being claimed, released, and reclaimed continually. (Similarly, both claiming strategies will attempt to claim one unit beyond their fair share to avoid a scenario in which a work unit is claimed by no one.)

Ordasity allows you to configure the period of time for a drain to complete:

    val config = new ClusterConfig("localhost:2181").setDrainTime(60) // 60 Seconds

When a drain is initiated, Ordasity will pace the release of work units over the time specified. If 15 work units were to be released over a 60-second period, the library would release one every four seconds.

Whether you're using count-based or load-based distribution, the drain process is the same. Ordasity makes a list of work units to unclaim, then paces their release over the configured drain time, as sketched below.

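The pacing interval is just the drain time divided by the number of work units being released (a rough sketch of the arithmetic; the real computation lives in Cluster.scala's drain methods):

    // Illustrative: 15 work units over a 60-second drain => one release every 4 seconds.
    val drainTimeSeconds = 60
    val workUnitsToRelease = 15
    val drainIntervalMs = ((drainTimeSeconds.toDouble / workUnitsToRelease) * 1000).toInt  // 4000
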
Draining is especially useful for scheduled maintenance and deploys. Ordasity exposes a "shutdown" method via JMX. When invoked, the node will set its status to "Draining," cease claiming new work, and release all existing work to other nodes in the cluster over the configured interval before exiting the cluster.

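For example, the shutdown can be invoked remotely over JMX. The MBean is registered as "&lt;service-name&gt;:name=Cluster" (see Cluster.scala); the host, port, and service name below are assumptions for this sketch, and remote JMX must be enabled on the target JVM:

    import javax.management.ObjectName
    import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

    // Assumes the node was started with remote JMX enabled on port 8855.
    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://some-node:8855/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)
    connector.getMBeanServerConnection.invoke(
      new ObjectName("example_service:name=Cluster"), "shutdown", null, null)
    connector.close()
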
#### Handoff
When handoff is enabled, Ordasity will allow another node to begin processing for a work unit before the former owner shuts it down. This eliminates the very brief gap between one node releasing and another node claiming a work unit. Handoff ensures that at any point, a work unit is being served.

To enable it, just turn it on in your ClusterConfig:

    val clusterConfig = new ClusterConfig("localhost:2181").
      setUseSoftHandoff(true).
      setHandoffShutdownDelay(10) // Seconds

The handoff process is fairly straightforward. When a node has decided to release a work unit (either due to a rebalance or because it is being drained for shutdown), it creates an entry in Zookeeper at /service-name/handoff-requests. Following their count-based or load-based claiming policies, other nodes will claim the work being handed off by creating an entry at /service-name/handoff-results.

When a node has successfully accepted handoff by creating this entry, the new owner will begin work. The successful "handoff-results" entry signals to the original owner that handoff has occurred and that it is free to cease processing after a configurable overlap (default: 10 seconds). After this time, Ordasity will call the "shutdownWork" method on your listener.

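To make the sequence concrete, the flow for a single work unit looks roughly like this (paths follow the naming above; "node-a" and "node-b" are placeholders):

1. node-a decides to release "unit-1" and creates /service-name/handoff-requests/unit-1.
2. node-b claims the handoff by creating /service-name/handoff-results/unit-1 and begins processing immediately.
3. After the configured overlap (setHandoffShutdownDelay), node-a's "shutdownWork" is called and its claim is released, leaving node-b as the sole owner.
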
---

### Wrapping Up

So, that's Ordasity! We hope you enjoy using it to build reliable distributed services quickly.

#### Questions
If you have any questions, please feel free to shoot us an e-mail or get in touch on Twitter.

#### Bug Reports and Contributions
Think you've found a bug? Sorry about that. Please open an issue on GitHub and we'll check it out as soon as possible.

Want to contribute to Ordasity? Awesome! Fork the repo, make your changes, and issue a pull request. Please make an effort to keep commits small, clean, and confined to specific changes. If you'd like to propose a new feature, give us a heads-up by getting in touch beforehand. We'd like to talk with you.
187  pom.xml
@@ -0,0 +1,187 @@
  1
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3
+    <modelVersion>4.0.0</modelVersion>
  4
+
  5
+    <groupId>com.boundary</groupId>
  6
+    <artifactId>ordasity</artifactId>
  7
+    <version>0.1.8</version>
  8
+
  9
+    <name>Ordasity</name>
  10
+    <url>http://www.boundary.com</url>
  11
+    <packaging>jar</packaging>
  12
+
  13
+    <properties>
  14
+        <scala.version>2.8.1</scala.version>
  15
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  16
+    </properties>
  17
+
  18
+    <dependencies>
  19
+
  20
+        <dependency>
  21
+            <groupId>com.twitter</groupId>
  22
+            <artifactId>zookeeper-client</artifactId>
  23
+            <version>2.0.0-p01</version>
  24
+        </dependency>
  25
+
  26
+        <dependency>
  27
+            <groupId>com.codahale</groupId>
  28
+            <artifactId>logula_2.8.1</artifactId>
  29
+            <version>2.1.2</version>
  30
+        </dependency>
  31
+
  32
+        <dependency>
  33
+            <groupId>com.boundary</groupId>
  34
+            <artifactId>overlock_2.8.1</artifactId>
  35
+            <version>0.5.1</version>
  36
+        </dependency>
  37
+
  38
+        <dependency>
  39
+            <groupId>com.codahale</groupId>
  40
+            <artifactId>jerkson_${scala.version}</artifactId>
  41
+            <version>0.4.0</version>
  42
+        </dependency>
  43
+
  44
+        <dependency>
  45
+            <groupId>org.codehaus.jackson</groupId>
  46
+            <artifactId>jackson-core-asl</artifactId>
  47
+            <version>1.8.4</version>
  48
+        </dependency>
  49
+
  50
+        <dependency>
  51
+            <groupId>org.codehaus.jackson</groupId>
  52
+            <artifactId>jackson-mapper-asl</artifactId>
  53
+            <version>1.8.4</version>
  54
+        </dependency>
  55
+
  56
+        <dependency>
  57
+            <groupId>org.apache.zookeeper</groupId>
  58
+            <artifactId>zookeeper</artifactId>
  59
+            <version>3.3.2</version>
  60
+        </dependency>
  61
+    </dependencies>
  62
+
  63
+    <build>
  64
+        <sourceDirectory>src/main/scala</sourceDirectory>
  65
+        <testSourceDirectory>src/test/scala</testSourceDirectory>
  66
+
  67
+        <plugins>
  68
+            <plugin>
  69
+                <groupId>org.scala-tools</groupId>
  70
+                <artifactId>maven-scala-plugin</artifactId>
  71
+                <version>2.15.2</version>
  72
+                <executions>
  73
+                    <execution>
  74
+                        <goals>
  75
+                            <goal>compile</goal>
  76
+                            <goal>testCompile</goal>
  77
+                        </goals>
  78
+                    </execution>
  79
+                </executions>
  80
+                <configuration>
  81
+                    <args>
  82
+                        <arg>-optimise</arg>
  83
+                        <arg>-unchecked</arg>
  84
+                        <arg>-deprecation</arg>
  85
+                    </args>
  86
+                    <charset>UTF-8</charset>
  87
+                    <scalaVersion>${scala.version}</scalaVersion>
  88
+                </configuration>
  89
+            </plugin>
  90
+            <plugin>
  91
+                <groupId>org.apache.maven.plugins</groupId>
  92
+                <artifactId>maven-compiler-plugin</artifactId>
  93
+                <version>2.3.2</version>
  94
+                <configuration>
  95
+                    <source>1.6</source>
  96
+                    <target>1.6</target>
  97
+                </configuration>
  98
+            </plugin>
  99
+
  100
+            <plugin>
  101
+                <groupId>org.apache.maven.plugins</groupId>
  102
+                <artifactId>maven-shade-plugin</artifactId>
  103
+                <version>1.4</version>
  104
+                <executions>
  105
+                    <execution>
  106
+                        <phase>package</phase>
  107
+                        <goals>
  108
+                            <goal>shade</goal>
  109
+                        </goals>
  110
+                        <configuration>
  111
+                            <artifactSet>
  112
+                                <excludes>
  113
+                                    <exclude>org.scala-tools.testing:specs_2.8.0</exclude>
  114
+                                    <exclude>com.twitter:json_2.8.0</exclude>
  115
+                                    <exclude>junit:junit</exclude>
  116
+                                    <exclude>org.scala-lang:scala-library</exclude>
  117
+                                </excludes>
  118
+                            </artifactSet>
  119
+                        </configuration>
  120
+                    </execution>
  121
+                </executions>
  122
+            </plugin>
  123
+
  124
+            <plugin>
  125
+                <groupId>org.apache.maven.plugins</groupId>
  126
+                <artifactId>maven-source-plugin</artifactId>
  127
+                <version>2.1.2</version>
  128
+                <executions>
  129
+                    <execution>
  130
+                        <id>attach-sources</id>
  131
+                        <phase>package</phase>
  132
+                        <goals>
  133
+                            <goal>jar-no-fork</goal>
  134
+                        </goals>
  135
+                    </execution>
  136
+                </executions>
  137
+            </plugin>
  138
+        </plugins>
  139
+    </build>
  140
+
  141
+    <repositories>
  142
+        <repository>
  143
+            <id>central</id> 
  144
+            <name>Maven repository</name> 
  145
+            <url>http://repo1.maven.org/maven2</url> 
  146
+        </repository>
  147
+        <repository>
  148
+            <id>scala-tools.org</id>
  149
+            <name>Scala-tools Maven2 Repository</name>
  150
+            <url>http://scala-tools.org/repo-releases</url>
  151
+        </repository>
  152
+        <repository>
  153
+            <id>apache</id>
  154
+            <name>apache</name>
  155
+            <url>http://people.apache.org/repo/m2-ibiblio-rsync-repository/</url>
  156
+        </repository>
  157
+        <repository>
  158
+            <id>testingscalatoolsorg</id>
  159
+            <name>testing.scala-tools.org</name>
  160
+            <url>http://scala-tools.org/repo-releases/testing/</url>
  161
+        </repository>
  162
+        <repository>
  163
+            <id>downloadjavanet</id>
  164
+            <name>download.java.net</name>
  165
+            <url>http://download.java.net/maven/2/</url>
  166
+        </repository>
  167
+        <repository>
  168
+            <id>coda</id>
  169
+            <name>Coda</name>
  170
+            <url>http://repo.codahale.com/</url>
  171
+        </repository>
  172
+        <repository>
  173
+            <id>boundary-public</id>
  174
+            <name>Boundary Public</name>
  175
+            <url>http://maven.boundary.com/artifactory/repo</url>
  176
+        </repository>
  177
+    </repositories>
  178
+
  179
+    <pluginRepositories>
  180
+        <pluginRepository>
  181
+            <id>scala-tools.org</id>
  182
+            <name>Scala-tools Maven2 Repository</name>
  183
+            <url>http://scala-tools.org/repo-releases</url>
  184
+        </pluginRepository>
  185
+    </pluginRepositories>
  186
+
  187
+</project>
706  src/main/scala/com/boundary/ordasity/Cluster.scala
@@ -0,0 +1,706 @@
  1
+package com.boundary.ordasity
  2
+
  3
+import com.codahale.jerkson.Json._
  4
+import com.codahale.logula.Logging
  5
+import com.twitter.zookeeper.ZooKeeperClient
  6
+import com.yammer.metrics.{Meter, Instrumented}
  7
+
  8
+import java.nio.charset.Charset
  9
+import overlock.atomicmap.AtomicMap
  10
+import scala.collection.JavaConversions._
  11
+import org.cliffc.high_scale_lib.NonBlockingHashSet
  12
+import java.util.concurrent.atomic.AtomicReference
  13
+import java.util.concurrent.{TimeUnit, ScheduledFuture, ScheduledThreadPoolExecutor}
  14
+import java.util.{HashSet, LinkedList, TimerTask, Set => JSet, Collection => JCollection}
  15
+
  16
+import javax.management.ObjectName
  17
+import java.lang.management.ManagementFactory
  18
+
  19
+trait ClusterMBean {
  20
+  def join() : String
  21
+  def shutdown()
  22
+}
  23
+
  24
+object NodeState extends Enumeration {
  25
+  type NodeState = Value
  26
+  val Fresh, Started, Draining, Shutdown = Value
  27
+}
  28
+
  29
+class Cluster(name: String, listener: Listener, config: ClusterConfig) extends ClusterMBean with Logging with Instrumented {
  30
+  val myNodeID = config.nodeId
  31
+
  32
+  ManagementFactory.getPlatformMBeanServer.registerMBean(this, new ObjectName(name + ":" + "name=Cluster"))
  33
+
  34
+  // Cluster, node, and work unit state
  35
+  private val nodes = AtomicMap.atomicNBHM[String, String]
  36
+  private val meters = AtomicMap.atomicNBHM[String, Meter]
  37
+  private val myWorkUnits = new NonBlockingHashSet[String]
  38
+  private val allWorkUnits = AtomicMap.atomicNBHM[String, String]
  39
+  private val workUnitMap = AtomicMap.atomicNBHM[String, String]
  40
+  private val handoffRequests = new HashSet[String]
  41
+  private val handoffResults = AtomicMap.atomicNBHM[String, String]
  42
+  private val loadMap = AtomicMap.atomicNBHM[String, Double]
  43
+  private val workUnitsPeggedToMe = new NonBlockingHashSet[String]
  44
+
  45
+  // Scheduled executions
  46
+  private val pool = new ScheduledThreadPoolExecutor(1)
  47
+  private var loadFuture : Option[ScheduledFuture[_]] = None
  48
+  private var autoRebalanceFuture : Option[ScheduledFuture[_]] = None
  49
+
  50
+  // Metrics
  51
+  val listGauge = metrics.gauge[String]("my_" + config.workUnitShortName) { myWorkUnits.mkString(", ") }
  52
+  val countGauge = metrics.gauge[Int]("my_" + config.workUnitShortName + "_count") { myWorkUnits.size }
  53
+  val loadGauge = metrics.gauge[Double]("my_load") { myLoad() }
  54
+
  55
+  private val state = new AtomicReference[NodeState.Value](NodeState.Fresh)
  56
+
  57
+  var zk : ZooKeeperClient = null
  58
+
  59
+  /**
  60
+   * Joins the cluster, claims work, and begins operation.
  61
+   */
  62
+  def join() : String = {
  63
+    state.get() match {
  64
+      case NodeState.Fresh    => zk = new ZooKeeperClient(config.hosts, config.zkTimeout, "/", onConnect(_))
  65
+      case NodeState.Shutdown => zk = new ZooKeeperClient(config.hosts, config.zkTimeout, "/", onConnect(_))
  66
+      case NodeState.Draining => log.warn("'join' called while draining; ignoring.")
  67
+      case NodeState.Started  => log.warn("'join' called after started; ignoring.")
  68
+    }
  69
+
  70
+    state.get().toString
  71
+  }
  72
+
  73
+  /**
  74
+   * Drains all work claimed by this node over the time period provided in the config
  75
+   * (default: 60 seconds), prevents it from claiming new work, and exits the cluster.
  76
+   */
  77
+  def shutdown() {
  78
+    if (state.get() == NodeState.Shutdown) return
  79
+
  80
+    if (loadFuture.isDefined) loadFuture.get.cancel(true)
  81
+    if (autoRebalanceFuture.isDefined) autoRebalanceFuture.get.cancel(true)
  82
+    log.info("Shutdown initiated; beginning drain...")
  83
+    setState(NodeState.Draining)
  84
+    drainToCount(0, true)
  85
+  }
  86
+
  87
+  def forceShutdown() {
  88
+    if (loadFuture.isDefined) loadFuture.get.cancel(true)
  89
+    if (autoRebalanceFuture.isDefined) autoRebalanceFuture.get.cancel(true)
  90
+    log.warn("Forcible shutdown initiated due to connection loss...")
  91
+    myWorkUnits.map(w => shutdownWork(w, true, false))
  92
+    myWorkUnits.clear()
  93
+    listener.onLeave()
  94
+  }
  95
+
  96
+  /**
  97
+   * Finalizes the shutdown sequence. Called once the drain operation completes.
  98
+   */
  99
+  private def completeShutdown() {
  100
+    setState(NodeState.Shutdown)
  101
+    try {
  102
+      zk.close()
  103
+    } catch {
  104
+      case e: Exception => log.warn(e, "Zookeeper reported exception on shutdown.")
  105
+    }
  106
+    listener.onLeave()
  107
+    state.set(NodeState.Fresh)
  108
+  }
  109
+
  110
+  /**
  111
+   * Primary callback which is triggered upon successful Zookeeper connection.
  112
+   */
  113
+  private def onConnect(client: ZooKeeperClient) {
  114
+    zk = client
  115
+
  116
+    if (state.get() != NodeState.Fresh)
  117
+      ensureCleanStartup()
  118
+
  119
+    log.info("Connected to Zookeeper (ID: %s).", myNodeID)
  120
+    zk.createPath(name + "/nodes")
  121
+    zk.createPath(config.workUnitName)
  122
+    zk.createPath(name + "/meta/rebalance")
  123
+    zk.createPath(name + "/meta/workload")
  124
+    zk.createPath(name + "/claimed-" + config.workUnitShortName)
  125
+    zk.createPath(name + "/handoff-requests")
  126
+    zk.createPath(name + "/handoff-result")
  127
+    joinCluster()
  128
+
  129
+    listener.onJoin(zk)
  130
+
  131
+    registerWatchers()
  132
+
  133
+    setState(NodeState.Started)
  134
+    claimWork()
  135
+    verifyIntegrity()
  136
+
  137
+    if (config.useSmartBalancing && listener.isInstanceOf[SmartListener])
  138
+      scheduleLoadTicks()
  139
+
  140
+    if (config.enableAutoRebalance)
  141
+      scheduleRebalancing()
  142
+  }
  143
+
  144
+  def ensureCleanStartup() {
  145
+    forceShutdown()
  146
+    nodes.clear()
  147
+    meters.clear()
  148
+    myWorkUnits.clear()
  149
+    allWorkUnits.clear()
  150
+    workUnitMap.clear()
  151
+    handoffRequests.clear()
  152
+    handoffResults.clear()
  153
+    loadMap.clear()
  154
+    workUnitsPeggedToMe.clear()
  155
+    state.set(NodeState.Fresh)
  156
+  }
  157
+
  158
+  /**
  159
+   * Schedules auto-rebalancing if auto-rebalancing is enabled. The task is
  160
+   * scheduled to run every 60 seconds by default, or according to the config.
  161
+   */
  162
+  private def scheduleRebalancing() {
  163
+    val runRebalance = new Runnable {
  164
+      def run() = rebalance()
  165
+    }
  166
+
  167
+    autoRebalanceFuture = Some(pool.scheduleAtFixedRate(runRebalance, config.autoRebalanceInterval,
  168
+      config.autoRebalanceInterval, TimeUnit.SECONDS))
  169
+  }
  170
+
  171
+  /**
  172
+   * Once a minute, pass off information about the amount of load generated per
  173
+   * work unit off to Zookeeper for use in the claiming and rebalancing process.
  174
+   */
  175
+  private def scheduleLoadTicks() {
  176
+    val sendLoadToZookeeper = new Runnable {
  177
+      def run() {
  178
+        meters.foreach { case(workUnit, meter) =>
  179
+          ZKUtils.setOrCreate(zk, name + "/meta/workload/" + workUnit, meter.oneMinuteRate.toString)
  180
+        }
  181
+        ZKUtils.setOrCreate(zk, name + "/nodes/" + myNodeID, state.toString)
  182
+
  183
+        if (config.useSmartBalancing)
  184
+          log.info("My load: %s", myLoad())
  185
+        else
  186
+          log.info("My load: %s", myWorkUnits.size)
  187
+      }
  188
+    }
  189
+
  190
+    if (config.useSmartBalancing && listener.isInstanceOf[SmartListener])
  191
+      loadFuture = Some(pool.scheduleAtFixedRate(sendLoadToZookeeper, 0, 1, TimeUnit.MINUTES))
  192
+  }
  193
+
  194
+  /**
  195
+   * Registers this node with Zookeeper on startup, retrying until it succeeds.
  196
+   * This retry logic is important in that a node which restarts before Zookeeper
  197
+   * detects the previous disconnect could prohibit the node from properly launching.
  198
+   */
  199
+  private def joinCluster() {
  200
+    while (true) {
  201
+      if (ZKUtils.createEphemeral(zk, name + "/nodes/" + myNodeID, NodeState.Fresh.toString)) {
  202
+        return
  203
+      } else {
  204
+        log.warn("Unable to register with Zookeeper on launch. " +
  205
+          "Is %s already running on this host? Retrying in 1 second...", name)
  206
+        Thread.sleep(1000)
  207
+      }
  208
+    }
  209
+  }
  210
+
  211
+  /**
  212
+   * Registers each of the watchers that we're interested in in Zookeeper, and callbacks.
  213
+   * This includes watchers for changes to cluster topology (/nodes), work units
  214
+   * (/work-units), and claimed work (/<service-name>/claimed-work). We also register
  215
+   * watchers for calls to "/meta/rebalance", and if smart balancing is enabled, we'll
  216
+   * watch "<service-name>/meta/workload" for changes to the cluster's workload.
  217
+   */
  218
+  private def registerWatchers() {
  219
+    zk.watchChildrenWithData[String](name + "/nodes", nodes, bytesToString(_), { data: String =>
  220
+      log.info("Nodes: %s".format(nodes.mkString(", ")))
  221
+      claimWork()
  222
+      verifyIntegrity()
  223
+    })
  224
+
  225
+    zk.watchChildrenWithData[String](config.workUnitName,
  226
+        allWorkUnits, bytesToString(_), { data: String =>
  227
+      log.debug(config.workUnitName.capitalize + " IDs: %s".format(allWorkUnits.keys.mkString(", ")))
  228
+      claimWork()
  229
+      verifyIntegrity()
  230
+    })
  231
+
  232
+    zk.watchChildrenWithData[String](name + "/claimed-" + config.workUnitShortName,
  233
+        workUnitMap, bytesToString(_), { data: String =>
  234
+      log.debug(config.workUnitName.capitalize + " / Node Mapping changed: %s", workUnitMap)
  235
+      claimWork()
  236
+      verifyIntegrity()
  237
+    })
  238
+
  239
+    if (config.useSoftHandoff) {
  240
+      // Watch handoff requests.
  241
+      zk.watchChildren(name + "/handoff-requests", { (newWorkUnits: Seq[String]) =>
  242
+        refreshSet(handoffRequests, newWorkUnits)
  243
+        log.debug("Handoff requests changed: %s".format(handoffRequests.mkString(", ")))
  244
+        verifyIntegrity()
  245
+        claimWork()
  246
+      })
  247
+
  248
+      // Watch handoff results.
  249
+      zk.watchChildrenWithData[String](name + "/handoff-result",
  250
+        handoffResults, bytesToString(_), { workUnit: String =>
  251
+
  252
+        // If I am the node which accepted this handoff, finish the job.
  253
+        val destinationNode = handoffResults.get(workUnit).getOrElse("")
  254
+        if (myWorkUnits.contains(workUnit) && myNodeID.equals(destinationNode))
  255
+          finishHandoff(workUnit)
  256
+
  257
+        // If I'm the node that requested to hand off this work unit to another node, shut it down after <config> seconds.
  258
+        else if (myWorkUnits.contains(workUnit) && !destinationNode.equals("") && !myNodeID.equals(destinationNode)) {
  259
+          log.info("Handoff of %s to %s completed. Shutting down %s in %s seconds.",
  260
+            workUnit, handoffResults.get(workUnit).getOrElse("(None)"), workUnit, config.handoffShutdownDelay)
  261
+          ZKUtils.delete(zk, name + "/handoff-requests/" + workUnit)
  262
+
  263
+          val runnable = new Runnable {
  264
+            def run() {
  265
+              log.info("Shutting down %s following handoff to %s.",
  266
+                workUnit, handoffResults.get(workUnit).getOrElse("(None)"))
  267
+              shutdownWork(workUnit, false, true)
  268
+              if (myWorkUnits.size() == 0 && state.get() == NodeState.Draining)
  269
+                shutdown()
  270
+            }
  271
+          };
  272
+
  273
+          pool.schedule(runnable, config.handoffShutdownDelay, TimeUnit.SECONDS).asInstanceOf[Unit]
  274
+        }
  275
+      })
  276
+    }
  277
+
  278
+    // Watch for rebalance requests.
  279
+    zk.watchNode(name + "/meta/rebalance", rebalance(_))
  280
+
  281
+    // If smart balancing is enabled, watch for changes to the cluster's workload.
  282
+    if (config.useSmartBalancing && listener.isInstanceOf[SmartListener])
  283
+      zk.watchChildrenWithData[Double](name + "/meta/workload", loadMap, bytesToDouble(_))
  284
+  }
  285
+
  286
+  /**
  287
+   * Triggers a work-claiming cycle. If smart balancing is enabled, claim work based
  288
+   * on node and cluster load. If simple balancing is in effect, claim by count.
  289
+   */
  290
+  private def claimWork() {
  291
+    if (state.get != NodeState.Started) return
  292
+
  293
+    if (config.useSmartBalancing && listener.isInstanceOf[SmartListener])
  294
+      claimByLoad()
  295
+    else
  296
+      claimByCount()
  297
+  }
  298
+
  299
+  /**
  300
+    * Begins by claiming all work units that are pegged to this node.
  301
+    * Then, continues to claim work from the available pool until we've claimed
  302
+   * equal to or slightly more than the total desired load.
  303
+   */
  304
+  private def claimByLoad() {
  305
+    allWorkUnits.synchronized {
  306
+
  307
+      val peggedCheck = new LinkedList[String](allWorkUnits.keys.toSet -- workUnitMap.keys.toSet --
  308
+        myWorkUnits ++ handoffRequests -- handoffResults.keys)
  309
+      for (workUnit <- peggedCheck)
  310
+        if (isPeggedToMe(workUnit))
  311
+          claimWorkPeggedToMe(workUnit)
  312
+
  313
+      val unclaimed = new LinkedList[String](peggedCheck -- myWorkUnits)
  314
+
  315
+      while (myLoad() <= evenDistribution && !unclaimed.isEmpty) {
  316
+        val workUnit = unclaimed.poll()
  317
+
  318
+        if (config.useSoftHandoff && handoffRequests.contains(workUnit)
  319
+            && isFairGame(workUnit) && attemptToClaim(workUnit, true))
  320
+          log.info("Accepted handoff for %s.", workUnit)
  321
+        else if (isFairGame(workUnit))
  322
+          attemptToClaim(workUnit)
  323
+      }
  324
+    }
  325
+  }
  326
+
  327
+  /**
  328
+    * Claims work in Zookeeper. This method will attempt to divide work about the cluster
  329
+    * by claiming up to ((<x> Work Unit Count / <y> Nodes) + 1) work units. While
  330
+    * this doesn't necessarily represent an even load distribution based on work unit load,
  331
+    * it should result in a relatively even "work unit count" per node. This randomly-distributed
  332
+    * amount is in addition to any work units which are pegged to this node.
  333
+   */
  334
+  private def claimByCount() {
  335
+    var claimed = myWorkUnits.size
  336
+    val nodeCount = activeNodeSize()
  337
+
  338
+    allWorkUnits.synchronized {
  339
+      val maxToClaim = {
  340
+        if (allWorkUnits.size <= 1) allWorkUnits.size
  341
+        else (allWorkUnits.size / nodeCount.toDouble).ceil
  342
+      }
  343
+
  344
+      log.debug("%s Nodes: %s. %s: %s.", name, nodeCount, config.workUnitName.capitalize, allWorkUnits.size)
  345
+      log.debug("Claiming %s pegged to me, and up to %s more.", config.workUnitName, maxToClaim)
  346
+
  347
+      val unclaimed = allWorkUnits.keys.toSet -- workUnitMap.keys.toSet ++ handoffRequests -- handoffResults.keys
  348
+      log.debug("Handoff requests: %s, Handoff Results: %s, Unclaimed: %s",
  349
+        handoffRequests.mkString(", "), handoffResults.mkString(", "), unclaimed.mkString(", "))
  350
+
  351
+      for (workUnit <- unclaimed) {
  352
+        if ((isFairGame(workUnit) && claimed < maxToClaim) || isPeggedToMe(workUnit)) {
  353
+
  354
+          if (config.useSoftHandoff && handoffRequests.contains(workUnit) && attemptToClaim(workUnit, true)) {
  355
+            log.info("Accepted handoff of %s.", workUnit)
  356
+            claimed += 1
  357
+          } else if (!handoffRequests.contains(workUnit) && attemptToClaim(workUnit)) {
  358
+            claimed += 1
  359
+          }
  360
+        }
  361
+      }
  362
+    }
  363
+  }
  364
+
  365
+  def finishHandoff(workUnit: String) {
  366
+    log.info("Handoff of %s to me acknowledged. Deleting claim ZNode for %s and waiting for " +
  367
+      "%s to shutdown work.", workUnit, workUnit, workUnitMap.get(workUnit).getOrElse("(None)"))
  368
+
  369
+    val claimPostHandoffTask = new TimerTask {
  370
+      def run() {
  371
+        if (ZKUtils.createEphemeral(zk,
  372
+          name + "/claimed-" + config.workUnitShortName + "/" + workUnit, myNodeID)) {
  373
+          ZKUtils.delete(zk, name + "/handoff-result/" + workUnit)
  374
+          log.warn("Handoff of %s to me complete. Peer has shut down work.", workUnit)
  375
+        } else {
  376
+          log.warn("Waiting to establish final ownership of %s following handoff...", workUnit)
  377
+          pool.schedule(this, 2, TimeUnit.SECONDS)
  378
+        }
  379
+      }
  380
+    }
  381
+
  382
+    pool.schedule(claimPostHandoffTask, config.handoffShutdownDelay, TimeUnit.SECONDS)
  383
+  }
  384
+
  385
+  def attemptToClaim(workUnit: String, claimForHandoff: Boolean = false) : Boolean = {
  386
+    val path = {
  387
+      if (claimForHandoff) name + "/handoff-result/" + workUnit
  388
+      else name + "/claimed-" + config.workUnitShortName + "/" + workUnit
  389
+    }
  390
+
  391
+    val created = ZKUtils.createEphemeral(zk, path, myNodeID)
  392
+
  393
+    if (created) {
  394
+      startWork(workUnit)
  395
+      true
  396
+    } else if (isPeggedToMe(workUnit)) {
  397
+      claimWorkPeggedToMe(workUnit)
  398
+      true
  399
+    } else {
  400
+      false
  401
+    }
  402
+  }
  403
+
  404
+  /**
  405
+    * Requests that another node take over for a work unit by creating a ZNode
  406
+    * at handoff-requests. This will trigger a claim cycle and adoption.
  407
+   */
  408
+  def requestHandoff(workUnit: String) {
  409
+    log.info("Requesting handoff for %s.", workUnit)
  410
+    ZKUtils.createEphemeral(zk, name + "/handoff-requests/" + workUnit)
  411
+  }
  412
+
  413
+
  414
+  /**
  415
+    * Determines whether or not a given work unit is designated "claimable" by this node.
  416
+    * If the ZNode for this work unit is empty, or contains JSON mapping this node to that
  417
+    * work unit, it's considered "claimable."
  418
+   */
  419
+  private def isFairGame(workUnit: String) : Boolean = {
  420
+    val workUnitData = allWorkUnits.get(workUnit)
  421
+    if (workUnitData.isEmpty || workUnitData.get.equals(""))
  422
+      return true
  423
+
  424
+    val mapping = parse[Map[String, String]](workUnitData.get)
  425
+    val pegged = mapping.get(name)
  426
+    if (pegged.isDefined) log.debug("Pegged status for %s: %s.", workUnit, pegged.get)
  427
+    (pegged.isEmpty || (pegged.isDefined && pegged.get.equals(myNodeID)) ||
  428
+      (pegged.isDefined && pegged.get.equals("")))
  429
+  }
  430
+
  431
+  /**
  432
+   * Determines whether or not a given work unit is pegged to this instance.
  433
+   */
  434
+  private def isPeggedToMe(workUnitId: String) : Boolean = {
  435
+    val zkWorkData = allWorkUnits.get(workUnitId).get
  436
+    if (zkWorkData.isEmpty) {
  437
+      workUnitsPeggedToMe.remove(workUnitId)
  438
+      return false
  439
+    }
  440
+
  441
+    val mapping = parse[Map[String, String]](zkWorkData)
  442
+    val pegged = mapping.get(name)
  443
+    val isPegged = (pegged.isDefined && (pegged.get.equals(myNodeID)))
  444
+
  445
+    if (isPegged) workUnitsPeggedToMe.add(workUnitId)
  446
+    else workUnitsPeggedToMe.remove(workUnitId)
  447
+
  448
+    isPegged
  449
+  }
  450
+
  451
+  /**
  452
+   * Verifies that all nodes are hooked up properly. Shuts down any work units
  453
+   * which have been removed from the cluster or have been assigned to another node.
  454
+   */
  455
+  private def verifyIntegrity() {
  456
+    val noLongerActive = myWorkUnits -- allWorkUnits.keys.toSet
  457
+    for (workUnit <- noLongerActive)
  458
+      shutdownWork(workUnit)
  459
+
  460
+    // Check the status of pegged work units to ensure that this node is not serving
  461
+    // a work unit that is pegged to another node in the cluster.
  462
+    myWorkUnits.map { workUnit =>
  463
+      if (!isFairGame(workUnit) && !isPeggedToMe(workUnit)) {
  464
+        log.info("Discovered I'm serving a work unit that's now " +
  465
+          "pegged to someone else. Shutting down %s", workUnit)
  466
+        shutdownWork(workUnit)
  467
+
  468
+      } else if (workUnitMap.contains(workUnit) && !workUnitMap.get(workUnit).get.equals(myNodeID) &&
  469
+          !handoffResults.get(workUnit).getOrElse("None").equals(myNodeID)) {
  470
+        log.info("Discovered I'm serving a work unit that's now " +
  471
+          "served by %s. Shutting down %s", workUnitMap.get(workUnit).get, workUnit)
  472
+        shutdownWork(workUnit, true, false)
  473
+      }
  474
+    }
  475
+  }
  476
+
  477
+  /**
  478
+   * Claims a work unit pegged to this node, waiting for the ZNode to become available
  479
+   * (i.e., deleted by the node which previously owned it).
  480
+   */
  481
+  private def claimWorkPeggedToMe(workUnit: String) {
  482
+    while (true) {
  483
+      if (ZKUtils.createEphemeral(zk,
  484
+          name + "/claimed-" + config.workUnitShortName + "/" + workUnit, myNodeID)) {
  485
+        startWork(workUnit)
  486
+        return
  487
+      } else {
  488
+        log.warn("Attempting to establish ownership of %s. Retrying in one second...", workUnit)
  489
+        Thread.sleep(1000)
  490
+      }
  491
+    }
  492
+  }
  493
+
  494
+  /**
  495
+   * Starts up a work unit that this node has claimed.
  496
+   * If "smart rebalancing" is enabled, hand the listener a meter to mark load.
  497
+   * Otherwise, just call "startWork" on the listener and let the client have at it.
  498
+   */
  499
+  private def startWork(workUnit: String) {
  500
+    log.info("Successfully claimed %s: %s. Starting...", config.workUnitName, workUnit)
  501
+    myWorkUnits.add(workUnit)
  502
+
  503
+    if (listener.isInstanceOf[SmartListener]) {
  504
+      val meter = metrics.meter(workUnit, "processing")
  505
+      meters.put(workUnit, meter)
  506
+      listener.asInstanceOf[SmartListener].startWork(workUnit, meter)
  507
+    } else {
  508
+      listener.asInstanceOf[ClusterListener].startWork(workUnit)
  509
+    }
  510
+  }
  511
+
  512
+  /**
  513
+   * Shuts down a work unit by removing the claim in ZK and calling the listener.
  514
+   */
  515
+  private def shutdownWork(workUnit: String, doLog: Boolean = true, deleteZNode: Boolean = true) {
  516
+    if (doLog) log.info("Shutting down %s: %s...", config.workUnitName, workUnit)
  517
+    myWorkUnits.remove(workUnit)
  518
+    if (deleteZNode) ZKUtils.delete(zk, name + "/claimed-" + config.workUnitShortName + "/" + workUnit)
  519
+    meters.remove(workUnit)
  520
+    listener.shutdownWork(workUnit)
  521
+  }
  522
+
  523
+  /**
  524
+   * Drains excess load on this node down to a fraction distributed across the cluster.
  525
+   * The target load is set to (clusterLoad / # nodes).
  526
+   */
  527
+  private def drainToLoad(targetLoad: Long, time: Int = config.drainTime, useHandoff: Boolean = config.useSoftHandoff) {
  528
+    var currentLoad = myLoad()
  529
+    val drainList = new LinkedList[String]
  530
+    val eligibleToDrop = new LinkedList[String](myWorkUnits -- workUnitsPeggedToMe)
  531
+
  532
+    while (currentLoad > targetLoad && !eligibleToDrop.isEmpty) {
  533
+      val workUnit = eligibleToDrop.poll()
  534
+      val workUnitLoad : Double = loadMap.get(workUnit).getOrElse(0)
  535
+
  536
+      if (workUnitLoad > 0 && currentLoad - workUnitLoad > targetLoad) {
  537
+        drainList.add(workUnit)
  538
+        currentLoad -= workUnitLoad
  539
+      }
  540
+    }
  541
+    val drainInterval = ((config.drainTime.toDouble / drainList.size) * 1000).intValue()
  542
+
  543
+    val drainTask = new TimerTask {
  544
+
  545
+      def run() {
  546
+        if (drainList.isEmpty || myLoad <= evenDistribution)
  547
+          return
  548
+        else if (useHandoff)
  549
+          requestHandoff(drainList.poll)
  550
+        else
  551
+          shutdownWork(drainList.poll)
  552
+
  553
+        pool.schedule(this, drainInterval, TimeUnit.MILLISECONDS)
  554
+      }
  555
+    }
  556
+
  557
+    if (!drainList.isEmpty) {
  558
+      log.info("Releasing work units over %s seconds. Current load: %s. Target: %s. " +
  559
+        "Releasing: %s", time, currentLoad, targetLoad, drainList.mkString(", "))
  560
+      pool.schedule(drainTask, 0, TimeUnit.SECONDS)
  561
+    }
  562
+  }
  563
+
  564
+  /**
  565
+   * Drains this node's share of the cluster workload down to a specific number
  566
+   * of work units over a period of time specified in the configuration, with soft handoff if enabled.
  567
+   */
  568
+  def drainToCount(targetCount: Int, doShutdown: Boolean = false, useHandoff: Boolean = config.useSoftHandoff) {
  569
+    val msg = if (useHandoff) " with handoff" else ""
  570
+    log.info("Draining %s%s. Target count: %s, Current: %s", config.workUnitName, msg, targetCount, myWorkUnits.size)
  571
+    if (targetCount >= myWorkUnits.size)
  572
+      return
  573
+
  574
+    val amountToDrain = myWorkUnits.size - targetCount
  575
+
  576
+    val msgPrefix = if (useHandoff) "Requesting handoff for " else "Shutting down "
  577
+    log.info("%s %s of %s %s over %s seconds",
  578
+      msgPrefix, amountToDrain, myWorkUnits.size, config.workUnitName, config.drainTime)
  579
+
  580
+    // Build a list of work units to hand off.
  581
+    val toHandOff = new LinkedList[String]
  582
+    val wuList = myWorkUnits.toList
  583
+    for (i <- (0 to amountToDrain - 1))
  584
+      if (wuList.size - 1 >= i) toHandOff.add(wuList(i))
  585
+
  586
+    val drainInterval = ((config.drainTime.toDouble / toHandOff.size) * 1000).intValue()
  587
+
  588
+    val handoffTask = new TimerTask {
  589
+      def run() {
  590
+        if (toHandOff.isEmpty) {
  591
+          if (targetCount == 0 && doShutdown) completeShutdown()
  592
+          return
  593
+        } else {
  594
+          val workUnit = toHandOff.poll()
  595
+          if (useHandoff && !isPeggedToMe(workUnit)) requestHandoff(workUnit)
  596
+          else shutdownWork(workUnit)
  597
+        }
  598
+        pool.schedule(this, drainInterval, TimeUnit.MILLISECONDS)
  599
+      }
  600
+    }
  601
+
  602
+    log.info("Releasing %s / %s work units over %s seconds: %s",
  603
+      amountToDrain, myWorkUnits.size, config.drainTime, toHandOff.mkString(", "))
  604
+
  605
+    if (!myWorkUnits.isEmpty)
  606
+      pool.schedule(handoffTask, 0, TimeUnit.SECONDS)
  607
+  }
  608
+
  609
+
  610
+  /**
  611
+   * Initiates a cluster rebalance. If smart balancing is enabled, the target load
  612
+   * is set to (total cluster load / node count), where "load" is determined by the
  613
+   * sum of all work unit meters in the cluster. If smart balancing is disabled,
  614
+   * the target load is set to (# of work items / node count).
  615
+   */
  616
+  def rebalance(data: Option[Array[Byte]] = null) {
  617
+    if (state.get() == NodeState.Fresh) return
  618
+
  619
+    if (config.useSmartBalancing && listener.isInstanceOf[SmartListener])
  620
+      smartRebalance()
  621
+    else
  622
+      simpleRebalance()
  623
+  }
  624
+
  625
+  /**
  626
+   * Performs a "smart rebalance." The target load is set to (cluster load / node count),
  627
+   * where "load" is determined by the sum of all work unit meters in the cluster.
  628
+   */
  629
+  private def smartRebalance() {
  630
+    val target = evenDistribution()
  631
+    if (myLoad() > target) {
  632
+      log.info("Smart Rebalance triggered. Load: %s. Target: %s", myLoad(), target)
  633
+      drainToLoad(target.longValue)
  634
+    }
  635
+  }
  636
+
  637
+  /**
  638
+   * Performs a simple rebalance. Target load is set to (# of work items / node count).
  639
+   */
  640
+  private def simpleRebalance(data: Option[Array[Byte]] = null) {
  641
+    val target = fairShare()
  642
+
  643
+    if (myWorkUnits.size > target) {
  644
+      log.info("Simple Rebalance triggered. My Share: %s. Target: %s.",  myWorkUnits.size, target)
  645
+      drainToCount(target)
  646
+    }
  647
+  }
  648
+
  649
+  /**
  650
+   * Determines the current load on this instance when smart rebalancing is enabled.
  651
+   * This load is determined by the sum of all of this node's meters' one minute rate.
  652
+   */
  653
+  private def myLoad() : Double = {
  654
+    var load = 0d
  655
+    log.debug(loadMap.toString)
  656
+    log.debug(myWorkUnits.toString)