Skip to content
Browse files

Trigger finalizeHandoff() tasks via direct invocation upon claiming a…

… work unit rather than waiting for notification from ZooKeeper.
  • Loading branch information...
1 parent ad0a4a1 commit 0b9189abbca00db99b0031fe49abec06541cfafa C. Scott Andreas committed Nov 8, 2012
View
3 src/main/scala/com/boundary/ordasity/Cluster.scala
@@ -66,6 +66,7 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
var loadMap : Map[String, Double] = null
val workUnitsPeggedToMe = new NonBlockingHashSet[String]
val claimer = new Claimer(this)
+ val handoffResultsListener = new HandoffResultsListener(this, config)
var balancingPolicy = {
if (config.useSmartBalancing)
@@ -348,7 +349,6 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
val nodesChangedListener = new ClusterNodesChangedListener(this)
val verifyIntegrityListener = new VerifyIntegrityListener(this, config)
- val handoffResultsListener = new HandoffResultsListener(this, config)
val stringDeser = new StringDeserializer()
nodes = ZKMap.create(zk, "/%s/nodes".format(name),
@@ -458,6 +458,7 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
def shutdownWork(workUnit: String, doLog: Boolean = true, deleteZNode: Boolean = true) {
if (doLog) log.info("Shutting down %s: %s...", config.workUnitName, workUnit)
myWorkUnits.remove(workUnit)
+ claimedForHandoff.remove(workUnit)
val path = "/%s/claimed-%s/%s".format(name, config.workUnitShortName, workUnit)
if (deleteZNode) ZKUtils.delete(zk, path)
balancingPolicy.onShutdownWork(workUnit)
View
1 src/main/scala/com/boundary/ordasity/balancing/CountBalancingPolicy.scala
@@ -51,6 +51,7 @@ class CountBalancingPolicy(cluster: Cluster, config: ClusterConfig) extends Bala
if ((isFairGame(workUnit) && claimed < maxToClaim) || isPeggedToMe(workUnit)) {
if (config.useSoftHandoff && cluster.handoffRequests.contains(workUnit) && attemptToClaim(workUnit, true)) {
log.info("Accepted handoff of %s.", workUnit)
+ cluster.handoffResultsListener.finishHandoff(workUnit)
claimed += 1
} else if (!cluster.handoffRequests.contains(workUnit) && attemptToClaim(workUnit)) {
claimed += 1
View
6 src/main/scala/com/boundary/ordasity/balancing/MeteredBalancingPolicy.scala
@@ -64,8 +64,10 @@ class MeteredBalancingPolicy(cluster: Cluster, config: ClusterConfig)
val workUnit = unclaimed.poll()
if (config.useSoftHandoff && cluster.handoffRequests.contains(workUnit)
- && isFairGame(workUnit) && attemptToClaim(workUnit, claimForHandoff = true))
- log.info("Accepted handoff for %s.", workUnit)
+ && isFairGame(workUnit) && attemptToClaim(workUnit, claimForHandoff = true)) {
+ log.info("Accepted handoff for %s.", workUnit)
+ cluster.handoffResultsListener.finishHandoff(workUnit)
+ }
else if (isFairGame(workUnit))
attemptToClaim(workUnit)
View
39 src/main/scala/com/boundary/ordasity/listeners/HandoffResultsListener.scala
@@ -41,10 +41,7 @@ class HandoffResultsListener(cluster: Cluster, config: ClusterConfig)
def apply(workUnit: String) {
if (!cluster.initialized.get()) return
- if (iAcceptedHandoff(workUnit)) {
- finishHandoff(workUnit)
-
- } else if (iRequestedHandoff(workUnit)) {
+ if (iRequestedHandoff(workUnit)) {
log.info("Handoff of %s to %s completed. Shutting down %s in %s seconds.", workUnit,
cluster.getOrElse(cluster.handoffResults, workUnit, "(None)"), workUnit, config.handoffShutdownDelay)
ZKUtils.delete(cluster.zk, "/%s/handoff-requests/%s".format(cluster.name, workUnit))
@@ -53,16 +50,6 @@ class HandoffResultsListener(cluster: Cluster, config: ClusterConfig)
}
/**
- * Determines if this Ordasity node has accepted handoff of a work unit.
- * I have accepted handoff of this work unit if its "destination" is "me"
- * and it is in my set of active work units.
- */
- def iAcceptedHandoff(workUnit: String) : Boolean = {
- val destinationNode = cluster.getOrElse(cluster.handoffResults, workUnit, "")
- cluster.myWorkUnits.contains(workUnit) && cluster.isMe(destinationNode)
- }
-
- /**
* Determines if this Ordasity node requested handoff of a work unit to someone else.
* I have requested handoff of a work unit if it's currently a member of my active set
* and its destination node is another node in the cluster.
@@ -83,7 +70,7 @@ class HandoffResultsListener(cluster: Cluster, config: ClusterConfig)
def run() {
log.info("Shutting down %s following handoff to %s.",
workUnit, cluster.getOrElse(cluster.handoffResults, workUnit, "(None)"))
- cluster.shutdownWork(workUnit, false, true)
+ cluster.shutdownWork(workUnit, doLog = false, deleteZNode = true)
if (cluster.myWorkUnits.size() == 0 && cluster.state.get() == NodeState.Draining)
cluster.shutdown()
@@ -102,15 +89,21 @@ class HandoffResultsListener(cluster: Cluster, config: ClusterConfig)
val claimPostHandoffTask = new TimerTask {
def run() {
- val path = "/%s/claimed-%s/%s".format(cluster.name, config.workUnitShortName, workUnit)
- if (ZKUtils.createEphemeral(cluster.zk, path, cluster.myNodeID) || cluster.znodeIsMe(path)) {
- ZKUtils.delete(cluster.zk, "/" + cluster.name + "/handoff-result/" + workUnit)
- cluster.claimedForHandoff.remove(workUnit)
- log.warn("Handoff of %s to me complete. Peer has shut down work.", workUnit)
- } else {
- log.warn("Waiting to establish final ownership of %s following handoff...", workUnit)
- cluster.pool.get.schedule(this, retryTime, TimeUnit.MILLISECONDS)
+ try {
+ val path = "/%s/claimed-%s/%s".format(cluster.name, config.workUnitShortName, workUnit)
+ if (ZKUtils.createEphemeral(cluster.zk, path, cluster.myNodeID) || cluster.znodeIsMe(path)) {
+ ZKUtils.delete(cluster.zk, "/" + cluster.name + "/handoff-result/" + workUnit)
+ cluster.claimedForHandoff.remove(workUnit)
+ log.warn("Handoff of %s to me complete. Peer has shut down work.", workUnit)
+ } else {
+ log.warn("Waiting to establish final ownership of %s following handoff...", workUnit)
+ cluster.pool.get.schedule(this, retryTime, TimeUnit.MILLISECONDS)
+ }
+ } catch {
+ case e: Exception =>
+ log.error(e, "Error completing handoff of %s to me.", workUnit)
}
+
}
}
View
27 src/test/scala/com/boundary/ordasity/listeners/HandoffResultsListenerSpec.scala
@@ -42,24 +42,6 @@ class HandoffResultsListenerSpec extends Spec with Logging {
class `Handoff Results Listener` {
- @Test def `test 'i accepted handoff'` {
- val cluster = new Cluster(UUID.randomUUID().toString, null, config)
- val listener = new HandoffResultsListener(cluster, config)
-
- cluster.handoffResults = new HashMap[String, String]
- cluster.handoffResults.put("workUnit", "testNode")
- cluster.handoffResults.put("otherWorkUnit", "otherNode")
- cluster.handoffResults.put("edgeCase", "otherNode")
-
- cluster.myWorkUnits.add("workUnit")
- cluster.myWorkUnits.add("edgeCase")
-
- listener.iAcceptedHandoff("workUnit").must(be(true))
- listener.iAcceptedHandoff("otherWorkUnit").must(be(false))
- listener.iAcceptedHandoff("edgeCase").must(be(false))
- listener.iAcceptedHandoff("nothing").must(be(false))
- }
-
@Test def `test 'i requested handoff'` {
val cluster = new Cluster(UUID.randomUUID().toString, null, config)
val listener = new HandoffResultsListener(cluster, config)
@@ -206,6 +188,7 @@ class HandoffResultsListenerSpec extends Spec with Logging {
cluster.claimedForHandoff.contains(workUnit).must(be(false))
}
+ // TODO: Expand the scope of this test.
// The big kahuna for 'i accepted handoff'
@Test def `test apply for accepting handoff` {
val workUnit = "workUnit"
@@ -215,16 +198,17 @@ class HandoffResultsListenerSpec extends Spec with Logging {
cluster.watchesRegistered.set(true)
cluster.initialized.set(true)
cluster.handoffResults = new HashMap[String, String]
+ cluster.workUnitMap = new HashMap[String, String]
cluster.handoffResults.put(workUnit, "testNode")
cluster.myWorkUnits.add(workUnit)
- listener.iAcceptedHandoff(workUnit).must(be(true))
+ cluster.claimedForHandoff.add(workUnit)
+ cluster.handoffResultsListener.finishHandoff(workUnit)
val mockZK = mock[ZooKeeper]
val mockZKClient = mock[ZooKeeperClient]
mockZKClient.get().returns(mockZK)
cluster.zk = mockZKClient
- cluster.claimedForHandoff.add(workUnit)
cluster.workUnitMap = new HashMap[String, String]
cluster.workUnitMap.put(workUnit, "somewhereElse")
@@ -256,6 +240,9 @@ class HandoffResultsListenerSpec extends Spec with Logging {
val myWorkUnits = new NonBlockingHashSet[String]
myWorkUnits.add(workUnit)
+ val claimedForHandoff = new NonBlockingHashSet[String]
+ cluster.claimedForHandoff.returns(claimedForHandoff)
+
// Mocks
val mockZK = mock[ZooKeeper]
val mockZKClient = mock[ZooKeeperClient]

0 comments on commit 0b9189a

Please sign in to comment.
Something went wrong with that request. Please try again.