From 412088dd2ca5d437f4a7ab165824996df0525371 Mon Sep 17 00:00:00 2001 From: Jay J Wylie Date: Wed, 29 May 2013 08:23:26 -0700 Subject: [PATCH] Address minor review feedback Mostly, made variable names more clear AdminClient - cleaned up comments & TODO RebalanceController - fix subtle bug in when proxyPause invoked RepartitoinerTest - changed zone expansion test to cover two use cases: when invoked from RepartitoinerCLI with a current cluster versus with an interim cluster. --- .../client/protocol/admin/AdminClient.java | 19 ++- .../RebalanceBatchPlanProgressBar.java | 45 +++-- .../client/rebalance/RebalanceController.java | 19 +-- .../client/rebalance/task/RebalanceTask.java | 32 ++-- .../tools/RebalanceControllerCLI.java | 8 +- src/java/voldemort/tools/Repartitioner.java | 154 +++++++++--------- .../voldemort/tools/RepartitionerTest.java | 15 +- 7 files changed, 166 insertions(+), 126 deletions(-) diff --git a/src/java/voldemort/client/protocol/admin/AdminClient.java b/src/java/voldemort/client/protocol/admin/AdminClient.java index 3a79389da5..0bc325c516 100644 --- a/src/java/voldemort/client/protocol/admin/AdminClient.java +++ b/src/java/voldemort/client/protocol/admin/AdminClient.java @@ -2393,17 +2393,17 @@ public Versioned getRemoteRebalancerState(int nodeId) { value.getVersion()); } - // TODO: The "Order" column makes no sense to me. This is very, very - // hard to grok. I want to prefix each entry with in the order column - // with a number (e.g., '0: rebalance', '1: cluster->Swap', ...) but - // have no idea what the order is. Are they in order in the table? what - // do the arrows mean in the order column? - /** * Used in rebalancing to indicate change in states. Groups the * partition plans on the basis of stealer nodes and sends them over. * * The various combinations and their order of execution is given below + * where: + * * *
          * | swapRO | changeClusterMetadata | changeRebalanceState | Order                        |
@@ -2414,7 +2414,12 @@ public Versioned getRemoteRebalancerState(int nodeId) {
          * 
* * - * Similarly for rollback: + * Similarly for rollback, order means the following: + * * *
          * | swapRO | changeClusterMetadata | changeRebalanceState | Order                                    |
diff --git a/src/java/voldemort/client/rebalance/RebalanceBatchPlanProgressBar.java b/src/java/voldemort/client/rebalance/RebalanceBatchPlanProgressBar.java
index cf6f2c8fb9..9a18a44855 100644
--- a/src/java/voldemort/client/rebalance/RebalanceBatchPlanProgressBar.java
+++ b/src/java/voldemort/client/rebalance/RebalanceBatchPlanProgressBar.java
@@ -31,25 +31,33 @@ public class RebalanceBatchPlanProgressBar {
     private static final DecimalFormat decimalFormatter = new DecimalFormat("#.##");
 
     private final int batchId;
-    private final int taskCount;
-    private final int partitionStoreCount;
+    private final int totalTaskCount;
+    private final int totalPartitionStoreCount;
 
     private final long startTimeMs;
 
     private Set tasksInFlight;
-    private int numTasks;
-    private int numPartitionStores;
+    private int numTasksCompleted;
+    private int numPartitionStoresMigrated;
 
-    RebalanceBatchPlanProgressBar(int batchId, int taskCount, int partitionStoreCount) {
+    /**
+     * Construct a progress bar object to track rebalance tasks completed and
+     * partition-stores migrated.
+     * 
+     * @param batchId
+     * @param totalTaskCount
+     * @param totalPartitionStoreCount
+     */
+    RebalanceBatchPlanProgressBar(int batchId, int totalTaskCount, int totalPartitionStoreCount) {
         this.batchId = batchId;
-        this.taskCount = taskCount;
-        this.partitionStoreCount = partitionStoreCount;
+        this.totalTaskCount = totalTaskCount;
+        this.totalPartitionStoreCount = totalPartitionStoreCount;
 
         this.startTimeMs = System.currentTimeMillis();
 
         this.tasksInFlight = new HashSet();
-        this.numTasks = 0;
-        this.numPartitionStores = 0;
+        this.numTasksCompleted = 0;
+        this.numPartitionStoresMigrated = 0;
     }
 
     /**
@@ -65,17 +73,17 @@ synchronized public void beginTask(int taskId) {
 
     /**
      * Called whenever a rebalance task completes. This means one task is done
-     * and some number of partition stores have been moved.
+     * and some number of partition stores have been migrated.
      * 
      * @param taskId
-     * @param partitionStoreCount Number of partition stores moved by this
+     * @param totalPartitionStoreCount Number of partition stores moved by this
      *        completed task.
      */
-    synchronized public void completeTask(int taskId, int taskPartitionStores) {
+    synchronized public void completeTask(int taskId, int partitionStoresMigrated) {
         tasksInFlight.remove(taskId);
 
-        numTasks++;
-        numPartitionStores += taskPartitionStores;
+        numTasksCompleted++;
+        numPartitionStoresMigrated += partitionStoresMigrated;
 
         updateProgressBar();
     }
@@ -89,8 +97,8 @@ synchronized public void completeTask(int taskId, int taskPartitionStores) {
     synchronized public String getPrettyProgressBar() {
         StringBuilder sb = new StringBuilder();
 
-        double taskRate = numTasks / (double) taskCount;
-        double partitionStoreRate = numPartitionStores / (double) partitionStoreCount;
+        double taskRate = numTasksCompleted / (double) totalTaskCount;
+        double partitionStoreRate = numPartitionStoresMigrated / (double) totalPartitionStoreCount;
 
         long deltaTimeMs = System.currentTimeMillis() - startTimeMs;
         long taskTimeRemainingMs = Long.MAX_VALUE;
@@ -110,7 +118,8 @@ synchronized public String getPrettyProgressBar() {
           .append(".")
           .append(Utils.NEWLINE);
         // Tasks completed update
-        sb.append("\t" + numTasks + " out of " + taskCount + " rebalance tasks complete.")
+        sb.append("\t" + numTasksCompleted + " out of " + totalTaskCount
+                  + " rebalance tasks complete.")
           .append(Utils.NEWLINE)
           .append("\t")
           .append(decimalFormatter.format(taskRate * 100.0))
@@ -121,7 +130,7 @@ synchronized public String getPrettyProgressBar() {
           .append(" minutes) remaining.")
           .append(Utils.NEWLINE);
         // Partition-stores migrated update
-        sb.append("\t" + numPartitionStores + " out of " + partitionStoreCount
+        sb.append("\t" + numPartitionStoresMigrated + " out of " + totalPartitionStoreCount
                   + " partition-stores migrated.")
           .append(Utils.NEWLINE)
           .append("\t")
diff --git a/src/java/voldemort/client/rebalance/RebalanceController.java b/src/java/voldemort/client/rebalance/RebalanceController.java
index b5cebb3570..9620311acc 100644
--- a/src/java/voldemort/client/rebalance/RebalanceController.java
+++ b/src/java/voldemort/client/rebalance/RebalanceController.java
@@ -74,13 +74,13 @@ public class RebalanceController {
     private final int maxParallelRebalancing;
     private final int maxTriesRebalancing;
     private final boolean stealerBasedRebalancing;
-    private final long proxyPauseS;
+    private final long proxyPauseSec;
 
     public RebalanceController(String bootstrapUrl,
                                int maxParallelRebalancing,
                                int maxTriesRebalancing,
                                boolean stealerBased,
-                               long proxyPauseS) {
+                               long proxyPauseSec) {
         this.adminClient = new AdminClient(bootstrapUrl,
                                            new AdminClientConfig(),
                                            new ClientConfig());
@@ -91,7 +91,7 @@ public RebalanceController(String bootstrapUrl,
         this.maxParallelRebalancing = maxParallelRebalancing;
         this.maxTriesRebalancing = maxTriesRebalancing;
         this.stealerBasedRebalancing = stealerBased;
-        this.proxyPauseS = proxyPauseS;
+        this.proxyPauseSec = proxyPauseSec;
     }
 
     /**
@@ -354,7 +354,6 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {
 
             // STEP 2 - Move RO data
             if(hasReadOnlyStores) {
-                proxyPause();
                 RebalanceBatchPlanProgressBar progressBar = batchPlan.getProgressBar(batchId);
                 executeSubBatch(batchId,
                                 progressBar,
@@ -383,9 +382,7 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {
 
             // STEP 4 - Move RW data
             if(hasReadWriteStores) {
-                if(!hasReadOnlyStores) {
-                    proxyPause();
-                }
+                proxyPause();
                 RebalanceBatchPlanProgressBar progressBar = batchPlan.getProgressBar(batchId);
                 executeSubBatch(batchId,
                                 progressBar,
@@ -415,9 +412,9 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {
      */
     private void proxyPause() {
         logger.info("Pausing after cluster state has changed to allow proxy bridges to be established. "
-                    + "Will start rebalancing work on servers in " + proxyPauseS + " seconds.");
+                    + "Will start rebalancing work on servers in " + proxyPauseSec + " seconds.");
         try {
-            Thread.sleep(TimeUnit.SECONDS.toMillis(proxyPauseS));
+            Thread.sleep(TimeUnit.SECONDS.toMillis(proxyPauseSec));
         } catch(InterruptedException e) {
             logger.warn("Sleep interrupted in proxy pause.");
         }
@@ -442,7 +439,7 @@ private void proxyPause() {
      * 
      * Truth table, FTW!
      * 
-     * @param batchId Rebalancing task id
+     * @param batchId Rebalancing batch id
      * @param batchCurrentCluster Current cluster
      * @param batchFinalCluster Transition cluster to propagate
      * @param rebalancePartitionPlanList List of partition plan list
@@ -562,7 +559,7 @@ private void rebalanceStateChange(final int batchId,
      * | 7 | f | f | f | won't be triggered |
      * 
* - * @param batchId Rebalance task id + * @param batchId Rebalance batch id * @param batchRollbackCluster Cluster to rollback to if we have a problem * @param rebalancePartitionPlanList The list of rebalance partition plans * @param hasReadOnlyStores Are we rebalancing any read-only stores? diff --git a/src/java/voldemort/client/rebalance/task/RebalanceTask.java b/src/java/voldemort/client/rebalance/task/RebalanceTask.java index 8568a557a0..73830a723c 100644 --- a/src/java/voldemort/client/rebalance/task/RebalanceTask.java +++ b/src/java/voldemort/client/rebalance/task/RebalanceTask.java @@ -42,7 +42,8 @@ public abstract class RebalanceTask implements Runnable { protected final AtomicBoolean isComplete; protected final int partitionStoreCount; - protected long timeMs; + protected long permitAcquisitionTimeMs; + protected long taskCompletionTimeMs; protected final static int INVALID_REBALANCE_ID = -1; @@ -65,7 +66,8 @@ public RebalanceTask(final int batchId, this.isComplete = new AtomicBoolean(false); this.partitionStoreCount = RebalanceUtils.countPartitionStores(stealInfos); - this.timeMs = 0; + this.permitAcquisitionTimeMs = -1; + this.taskCompletionTimeMs = -1; taskLog(toString()); } @@ -113,7 +115,7 @@ protected void taskLog(String message) { * @param nodeId node ID for which donor permit is required */ protected void permitStart(int nodeId) { - timeMs = System.currentTimeMillis(); + permitAcquisitionTimeMs = System.currentTimeMillis(); taskLog("Acquiring donor permit for node " + nodeId + "."); } @@ -123,10 +125,13 @@ protected void permitStart(int nodeId) { * @param nodeId node ID for which donor permit is required */ protected void permitAcquired(int nodeId) { - long durationMs = System.currentTimeMillis() - timeMs; - timeMs = 0; - taskLog("Acquired donor permit for node " + nodeId + " in " - + TimeUnit.MILLISECONDS.toSeconds(durationMs) + " seconds."); + String durationString = ""; + if(permitAcquisitionTimeMs >= 0) { + long durationMs = System.currentTimeMillis() - permitAcquisitionTimeMs; + permitAcquisitionTimeMs = -1; + durationString = " in " + TimeUnit.MILLISECONDS.toSeconds(durationMs) + " seconds."; + } + taskLog("Acquired donor permit for node " + nodeId + durationString); } /** @@ -135,7 +140,7 @@ protected void permitAcquired(int nodeId) { * @param rebalanceAsyncId ID of the async rebalancing task */ protected void taskStart(int rebalanceAsyncId) { - timeMs = System.currentTimeMillis(); + taskCompletionTimeMs = System.currentTimeMillis(); taskLog("Starting rebalance of " + partitionStoreCount + " partition-stores for async operation id " + rebalanceAsyncId + "."); progressBar.beginTask(taskId); @@ -147,11 +152,14 @@ protected void taskStart(int rebalanceAsyncId) { * @param rebalanceAsyncId ID of the async rebalancing task */ protected void taskDone(int rebalanceAsyncId) { - long durationMs = System.currentTimeMillis() - timeMs; - timeMs = 0; + String durationString = ""; + if(taskCompletionTimeMs >= 0) { + long durationMs = System.currentTimeMillis() - taskCompletionTimeMs; + taskCompletionTimeMs = -1; + durationString = " in " + TimeUnit.MILLISECONDS.toSeconds(durationMs) + " seconds."; + } taskLog("Successfully finished rebalance of " + partitionStoreCount - + " for async operation id " + rebalanceAsyncId + " in " - + TimeUnit.MILLISECONDS.toSeconds(durationMs) + " seconds."); + + " for async operation id " + rebalanceAsyncId + durationString); progressBar.completeTask(taskId, partitionStoreCount); } diff --git a/src/java/voldemort/tools/RebalanceControllerCLI.java b/src/java/voldemort/tools/RebalanceControllerCLI.java index 8b7783356a..0ae8661d6d 100644 --- a/src/java/voldemort/tools/RebalanceControllerCLI.java +++ b/src/java/voldemort/tools/RebalanceControllerCLI.java @@ -103,7 +103,7 @@ private static void printUsage() { help.append(" Optional:\n"); help.append(" --final-stores [ Needed for zone expansion ]\n"); help.append(" --parallelism [ Number of rebalancing tasks to run in parallel ]"); - help.append(" --proxy-pause [ Seconds to pause between cluster change and server-side rebalancing tasks ]"); + help.append(" --proxy-pause [ Seconds to pause between cluster change and server-side rebalancing tasks ]"); help.append(" --tries [ Number of times to try starting an async rebalancing task on a node "); help.append(" --output-dir [ Output directory in which plan is stored ]\n"); help.append(" --batch [ Number of primary partitions to move in each rebalancing batch. ]\n"); @@ -167,16 +167,16 @@ public static void main(String[] args) throws Exception { tries = (Integer) options.valueOf("tries"); } - long proxyPauseS = RebalanceController.PROXY_PAUSE_IN_SECONDS; + long proxyPauseSec = RebalanceController.PROXY_PAUSE_IN_SECONDS; if(options.has("proxy-pause")) { - proxyPauseS = (Long) options.valueOf("proxy-pause"); + proxyPauseSec = (Long) options.valueOf("proxy-pause"); } RebalanceController rebalanceController = new RebalanceController(bootstrapURL, parallelism, tries, stealerBased, - proxyPauseS); + proxyPauseSec); Cluster currentCluster = rebalanceController.getCurrentCluster(); List currentStoreDefs = rebalanceController.getCurrentStoreDefs(); diff --git a/src/java/voldemort/tools/Repartitioner.java b/src/java/voldemort/tools/Repartitioner.java index 79667d8efc..6386a7e3d4 100644 --- a/src/java/voldemort/tools/Repartitioner.java +++ b/src/java/voldemort/tools/Repartitioner.java @@ -166,47 +166,48 @@ public static Cluster repartition(final Cluster currentCluster, double minUtility = Double.MAX_VALUE; for(int attempt = 0; attempt < attempts; attempt++) { - Cluster nextCluster = interimCluster; + Cluster nextCandidateCluster = interimCluster; if(maxContiguousPartitionsPerZone > 0) { - nextCluster = repeatedlyBalanceContiguousPartitionsPerZone(nextCluster, - maxContiguousPartitionsPerZone); + nextCandidateCluster = repeatedlyBalanceContiguousPartitionsPerZone(nextCandidateCluster, + maxContiguousPartitionsPerZone); } if(!disableNodeBalancing) { - nextCluster = balancePrimaryPartitions(nextCluster, !disableZoneBalancing); + nextCandidateCluster = balancePrimaryPartitions(nextCandidateCluster, + !disableZoneBalancing); } if(enableRandomSwaps) { - nextCluster = randomShufflePartitions(nextCluster, - randomSwapAttempts, - randomSwapSuccesses, - finalStoreDefs); + nextCandidateCluster = randomShufflePartitions(nextCandidateCluster, + randomSwapAttempts, + randomSwapSuccesses, + finalStoreDefs); } if(enableGreedySwaps) { - nextCluster = greedyShufflePartitions(nextCluster, - greedySwapAttempts, - greedySwapMaxPartitionsPerNode, - greedySwapMaxPartitionsPerZone, - new ArrayList(interimCluster.getZoneIds()), - finalStoreDefs); + nextCandidateCluster = greedyShufflePartitions(nextCandidateCluster, + greedySwapAttempts, + greedySwapMaxPartitionsPerNode, + greedySwapMaxPartitionsPerZone, + new ArrayList(interimCluster.getZoneIds()), + finalStoreDefs); } - RebalanceUtils.validateCurrentFinalCluster(currentCluster, nextCluster); + RebalanceUtils.validateCurrentFinalCluster(currentCluster, nextCandidateCluster); System.out.println("-------------------------\n"); - partitionBalance = new PartitionBalance(nextCluster, finalStoreDefs); + partitionBalance = new PartitionBalance(nextCandidateCluster, finalStoreDefs); double currentUtility = partitionBalance.getUtility(); System.out.println("Optimization number " + attempt + ": " + currentUtility + " max/min ratio"); System.out.println("-------------------------\n"); System.out.println(RebalanceUtils.analyzeInvalidMetadataRate(interimCluster, - currentStoreDefs, - nextCluster, - currentStoreDefs)); + finalStoreDefs, + nextCandidateCluster, + finalStoreDefs)); if(currentUtility <= minUtility) { minUtility = currentUtility; - minCluster = nextCluster; + minCluster = nextCandidateCluster; RebalanceUtils.dumpClusterToFile(outputDir, RebalanceUtils.finalClusterFileName + attempt, minCluster); @@ -233,16 +234,16 @@ public static Cluster repartition(final Cluster currentCluster, * have. The list of integers returned per zone is the same length as the * number of nodes in that zone. * - * @param nextCluster + * @param nextCandidateCluster * @param targetPartitionsPerZone * @return A map of zoneId to list of target number of partitions per node * within zone. */ - public static HashMap> getBalancedNumberOfPrimaryPartitionsPerNode(final Cluster nextCluster, + public static HashMap> getBalancedNumberOfPrimaryPartitionsPerNode(final Cluster nextCandidateCluster, Map targetPartitionsPerZone) { HashMap> numPartitionsPerNode = Maps.newHashMap(); - for(Integer zoneId: nextCluster.getZoneIds()) { - List partitionsOnNode = Utils.distributeEvenlyIntoList(nextCluster.getNumberOfNodesInZone(zoneId), + for(Integer zoneId: nextCandidateCluster.getZoneIds()) { + List partitionsOnNode = Utils.distributeEvenlyIntoList(nextCandidateCluster.getNumberOfNodesInZone(zoneId), targetPartitionsPerZone.get(zoneId)); numPartitionsPerNode.put(zoneId, partitionsOnNode); } @@ -254,23 +255,23 @@ public static HashMap> getBalancedNumberOfPrimaryPartitio * separates Nodes into donorNodes and stealerNodes based on whether the * node needs to donate or steal primary partitions. * - * @param nextCluster + * @param nextCandidateCluster * @param numPartitionsPerNodePerZone * @return a Pair. First element is donorNodes, second element is * stealerNodes. Each element in the pair is a HashMap of Node to * Integer where the integer value is the number of partitions to * store. */ - public static Pair, HashMap> getDonorsAndStealersForBalance(final Cluster nextCluster, + public static Pair, HashMap> getDonorsAndStealersForBalance(final Cluster nextCandidateCluster, Map> numPartitionsPerNodePerZone) { HashMap donorNodes = Maps.newHashMap(); HashMap stealerNodes = Maps.newHashMap(); HashMap numNodesAssignedInZone = Maps.newHashMap(); - for(Integer zoneId: nextCluster.getZoneIds()) { + for(Integer zoneId: nextCandidateCluster.getZoneIds()) { numNodesAssignedInZone.put(zoneId, 0); } - for(Node node: nextCluster.getNodes()) { + for(Node node: nextCandidateCluster.getNodes()) { int zoneId = node.getZoneId(); int offset = numNodesAssignedInZone.get(zoneId); @@ -308,40 +309,44 @@ public static Pair, HashMap> getDonorsAndS * responsible for determining which partition-stores move where for a * specific repartitioning. * - * @param nextCluster + * @param nextCandidateCluster * @param balanceZones indicates whether or not number of primary partitions * per zone should be balanced. * @return */ - public static Cluster balancePrimaryPartitions(final Cluster nextCluster, boolean balanceZones) { + public static Cluster balancePrimaryPartitions(final Cluster nextCandidateCluster, + boolean balanceZones) { System.out.println("Balance number of partitions across all nodes and zones."); Map targetPartitionsPerZone; if(balanceZones) { - targetPartitionsPerZone = Utils.distributeEvenlyIntoMap(nextCluster.getZoneIds(), - nextCluster.getNumberOfPartitions()); + targetPartitionsPerZone = Utils.distributeEvenlyIntoMap(nextCandidateCluster.getZoneIds(), + nextCandidateCluster.getNumberOfPartitions()); System.out.println("numPartitionsPerZone"); - for(int zoneId: nextCluster.getZoneIds()) { - System.out.println(zoneId + " : " + nextCluster.getNumberOfPartitionsInZone(zoneId) + for(int zoneId: nextCandidateCluster.getZoneIds()) { + System.out.println(zoneId + " : " + + nextCandidateCluster.getNumberOfPartitionsInZone(zoneId) + " -> " + targetPartitionsPerZone.get(zoneId)); } System.out.println("numNodesPerZone"); - for(int zoneId: nextCluster.getZoneIds()) { - System.out.println(zoneId + " : " + nextCluster.getNumberOfNodesInZone(zoneId)); + for(int zoneId: nextCandidateCluster.getZoneIds()) { + System.out.println(zoneId + " : " + + nextCandidateCluster.getNumberOfNodesInZone(zoneId)); } } else { // Keep number of partitions per zone the same. targetPartitionsPerZone = new HashMap(); - for(int zoneId: nextCluster.getZoneIds()) { - targetPartitionsPerZone.put(zoneId, nextCluster.getNumberOfPartitionsInZone(zoneId)); + for(int zoneId: nextCandidateCluster.getZoneIds()) { + targetPartitionsPerZone.put(zoneId, + nextCandidateCluster.getNumberOfPartitionsInZone(zoneId)); } } - HashMap> numPartitionsPerNodeByZone = getBalancedNumberOfPrimaryPartitionsPerNode(nextCluster, + HashMap> numPartitionsPerNodeByZone = getBalancedNumberOfPrimaryPartitionsPerNode(nextCandidateCluster, targetPartitionsPerZone); - Pair, HashMap> donorsAndStealers = getDonorsAndStealersForBalance(nextCluster, + Pair, HashMap> donorsAndStealers = getDonorsAndStealersForBalance(nextCandidateCluster, numPartitionsPerNodeByZone); HashMap donorNodes = donorsAndStealers.getFirst(); List donorNodeKeys = new ArrayList(donorNodes.keySet()); @@ -367,7 +372,7 @@ public static Cluster balancePrimaryPartitions(final Cluster nextCluster, boolea */ // Go over every stealerNode and steal partition Ids from donor nodes - Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); + Cluster returnCluster = ClusterUtils.copyCluster(nextCandidateCluster); Collections.shuffle(stealerNodeKeys, new Random(System.currentTimeMillis())); for(Node stealerNode: stealerNodeKeys) { @@ -427,11 +432,11 @@ public static Cluster balancePrimaryPartitions(final Cluster nextCluster, boolea * partition runs in another zone. Therefore, this overall process is * repeated multiple times. * - * @param nextCluster + * @param nextCandidateCluster * @param maxContiguousPartitionsPerZone See RebalanceCLI. * @return */ - public static Cluster repeatedlyBalanceContiguousPartitionsPerZone(final Cluster nextCluster, + public static Cluster repeatedlyBalanceContiguousPartitionsPerZone(final Cluster nextCandidateCluster, final int maxContiguousPartitionsPerZone) { System.out.println("Looping to evenly balance partitions across zones while limiting contiguous partitions"); // This loop is hard to make definitive. I.e., there are corner cases @@ -440,7 +445,7 @@ public static Cluster repeatedlyBalanceContiguousPartitionsPerZone(final Cluster // Therefore, a constant number of loops are run. Note that once the // goal is reached, the loop becomes a no-op. int repeatContigBalance = 10; - Cluster returnCluster = nextCluster; + Cluster returnCluster = nextCandidateCluster; for(int i = 0; i < repeatContigBalance; i++) { returnCluster = balanceContiguousPartitionsPerZone(returnCluster, maxContiguousPartitionsPerZone); @@ -461,28 +466,29 @@ public static Cluster repeatedlyBalanceContiguousPartitionsPerZone(final Cluster * to some other random zone/node. There is some chance that such random * moves could result in contiguous partitions in other zones. * - * @param nextCluster cluster metadata + * @param nextCandidateCluster cluster metadata * @param maxContiguousPartitionsPerZone See RebalanceCLI. * @return Return updated cluster metadata. */ - public static Cluster balanceContiguousPartitionsPerZone(final Cluster nextCluster, + public static Cluster balanceContiguousPartitionsPerZone(final Cluster nextCandidateCluster, final int maxContiguousPartitionsPerZone) { System.out.println("Balance number of contiguous partitions within a zone."); System.out.println("numPartitionsPerZone"); - for(int zoneId: nextCluster.getZoneIds()) { - System.out.println(zoneId + " : " + nextCluster.getNumberOfPartitionsInZone(zoneId)); + for(int zoneId: nextCandidateCluster.getZoneIds()) { + System.out.println(zoneId + " : " + + nextCandidateCluster.getNumberOfPartitionsInZone(zoneId)); } System.out.println("numNodesPerZone"); - for(int zoneId: nextCluster.getZoneIds()) { - System.out.println(zoneId + " : " + nextCluster.getNumberOfNodesInZone(zoneId)); + for(int zoneId: nextCandidateCluster.getZoneIds()) { + System.out.println(zoneId + " : " + nextCandidateCluster.getNumberOfNodesInZone(zoneId)); } // Break up contiguous partitions within each zone HashMap> partitionsToRemoveFromZone = Maps.newHashMap(); System.out.println("Contiguous partitions"); - for(Integer zoneId: nextCluster.getZoneIds()) { + for(Integer zoneId: nextCandidateCluster.getZoneIds()) { System.out.println("\tZone: " + zoneId); - Map partitionToRunLength = ClusterUtils.getMapOfContiguousPartitions(nextCluster, + Map partitionToRunLength = ClusterUtils.getMapOfContiguousPartitions(nextCandidateCluster, zoneId); List partitionsToRemoveFromThisZone = new ArrayList(); @@ -491,7 +497,8 @@ public static Cluster balanceContiguousPartitionsPerZone(final Cluster nextClust List contiguousPartitions = new ArrayList(entry.getValue()); for(int partitionId = entry.getKey(); partitionId < entry.getKey() + entry.getValue(); partitionId++) { - contiguousPartitions.add(partitionId % nextCluster.getNumberOfPartitions()); + contiguousPartitions.add(partitionId + % nextCandidateCluster.getNumberOfPartitions()); } System.out.println("Contiguous partitions: " + contiguousPartitions); partitionsToRemoveFromThisZone.addAll(Utils.removeItemsToSplitListEvenly(contiguousPartitions, @@ -503,7 +510,7 @@ public static Cluster balanceContiguousPartitionsPerZone(final Cluster nextClust System.out.println("\t\tPartitions to remove: " + partitionsToRemoveFromThisZone); } - Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); + Cluster returnCluster = ClusterUtils.copyCluster(nextCandidateCluster); Random r = new Random(); for(int zoneId: returnCluster.getZoneIds()) { @@ -536,12 +543,12 @@ public static Cluster balanceContiguousPartitionsPerZone(final Cluster nextClust * * @return modified cluster metadata. */ - public static Cluster swapPartitions(final Cluster nextCluster, + public static Cluster swapPartitions(final Cluster nextCandidateCluster, final int nodeIdA, final int partitionIdA, final int nodeIdB, final int partitionIdB) { - Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); + Cluster returnCluster = ClusterUtils.copyCluster(nextCandidateCluster); // Swap partitions between nodes! returnCluster = RebalanceUtils.createUpdatedCluster(returnCluster, @@ -558,15 +565,16 @@ public static Cluster swapPartitions(final Cluster nextCluster, * Within a single zone, swaps one random partition on one random node with * another random partition on different random node. * - * @param nextCluster + * @param nextCandidateCluster * @param zoneId Zone ID within which to shuffle partitions * @return */ - public static Cluster swapRandomPartitionsWithinZone(final Cluster nextCluster, final int zoneId) { - Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); + public static Cluster swapRandomPartitionsWithinZone(final Cluster nextCandidateCluster, + final int zoneId) { + Cluster returnCluster = ClusterUtils.copyCluster(nextCandidateCluster); Random r = new Random(); - List nodeIdsInZone = new ArrayList(nextCluster.getNodeIdsInZone(zoneId)); + List nodeIdsInZone = new ArrayList(nextCandidateCluster.getNodeIdsInZone(zoneId)); if(nodeIdsInZone.size() == 0) { return returnCluster; @@ -580,7 +588,7 @@ public static Cluster swapRandomPartitionsWithinZone(final Cluster nextCluster, List stealerPartitions = returnCluster.getNodeById(stealerNodeId) .getPartitionIds(); if(stealerPartitions.size() == 0) { - return nextCluster; + return nextCandidateCluster; } int stealerPartitionOffset = r.nextInt(stealerPartitions.size()); int stealerPartitionId = stealerPartitions.get(stealerPartitionOffset); @@ -611,18 +619,18 @@ public static Cluster swapRandomPartitionsWithinZone(final Cluster nextCluster, /** * Randomly shuffle partitions between nodes within every zone. * - * @param nextCluster cluster object. + * @param nextCandidateCluster cluster object. * @param randomSwapAttempts See RebalanceCLI. * @param randomSwapSuccesses See RebalanceCLI. * @param storeDefs List of store definitions * @return updated cluster */ - public static Cluster randomShufflePartitions(final Cluster nextCluster, + public static Cluster randomShufflePartitions(final Cluster nextCandidateCluster, final int randomSwapAttempts, final int randomSwapSuccesses, List storeDefs) { - List zoneIds = new ArrayList(nextCluster.getZoneIds()); - Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); + List zoneIds = new ArrayList(nextCandidateCluster.getZoneIds()); + Cluster returnCluster = ClusterUtils.copyCluster(nextCandidateCluster); double currentUtility = new PartitionBalance(returnCluster, storeDefs).getUtility(); @@ -657,21 +665,21 @@ public static Cluster randomShufflePartitions(final Cluster nextCluster, * Large values of the greedSwapMaxPartitions... arguments make this method * equivalent to comparing every possible swap. This may get very expensive. * - * @param nextCluster + * @param nextCandidateCluster * @param zoneId Zone ID within which to shuffle partitions * @param greedySwapMaxPartitionsPerNode See RebalanceCLI. * @param greedySwapMaxPartitionsPerZone See RebalanceCLI. * @param storeDefs * @return */ - public static Cluster swapGreedyRandomPartitions(final Cluster nextCluster, + public static Cluster swapGreedyRandomPartitions(final Cluster nextCandidateCluster, final List nodeIds, final int greedySwapMaxPartitionsPerNode, final int greedySwapMaxPartitionsPerZone, List storeDefs) { System.out.println("GreedyRandom : nodeIds:" + nodeIds); - Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); + Cluster returnCluster = ClusterUtils.copyCluster(nextCandidateCluster); double currentUtility = new PartitionBalance(returnCluster, storeDefs).getUtility(); int nodeIdA = -1; int nodeIdB = -1; @@ -739,7 +747,7 @@ public static Cluster swapGreedyRandomPartitions(final Cluster nextCluster, * * #zones X #nodes/zone X max partitions/node X max partitions/zone * - * @param nextCluster cluster object. + * @param nextCandidateCluster cluster object. * @param greedyAttempts See RebalanceCLI. * @param greedySwapMaxPartitionsPerNode See RebalanceCLI. * @param greedySwapMaxPartitionsPerZone See RebalanceCLI. @@ -748,7 +756,7 @@ public static Cluster swapGreedyRandomPartitions(final Cluster nextCluster, * @param storeDefs * @return */ - public static Cluster greedyShufflePartitions(final Cluster nextCluster, + public static Cluster greedyShufflePartitions(final Cluster nextCandidateCluster, final int greedyAttempts, final int greedySwapMaxPartitionsPerNode, final int greedySwapMaxPartitionsPerZone, @@ -759,7 +767,7 @@ public static Cluster greedyShufflePartitions(final Cluster nextCluster, zoneIds = new ArrayList(); zoneIds.add(specialZoneId); } - Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); + Cluster returnCluster = ClusterUtils.copyCluster(nextCandidateCluster); if(zoneIds.isEmpty()) { logger.warn("greedyShufflePartitions invoked with empty list of zone IDs."); return returnCluster; @@ -775,9 +783,9 @@ public static Cluster greedyShufflePartitions(final Cluster nextCluster, List nodeIds; if(zoneId == specialZoneId) { - nodeIds = new ArrayList(nextCluster.getNodeIds()); + nodeIds = new ArrayList(nextCandidateCluster.getNodeIds()); } else { - nodeIds = new ArrayList(nextCluster.getNodeIdsInZone(zoneId)); + nodeIds = new ArrayList(nextCandidateCluster.getNodeIdsInZone(zoneId)); } Cluster shuffleResults = swapGreedyRandomPartitions(returnCluster, diff --git a/test/unit/voldemort/tools/RepartitionerTest.java b/test/unit/voldemort/tools/RepartitionerTest.java index 0b10ccae83..1db03f8daa 100644 --- a/test/unit/voldemort/tools/RepartitionerTest.java +++ b/test/unit/voldemort/tools/RepartitionerTest.java @@ -401,7 +401,20 @@ public void testClusterExpansion() { } @Test - public void testZoneExpansion() { + public void testZoneExpansionAsRepartitionerCLI() { + Cluster currentCluster = ClusterTestUtils.getZZCluster(); + List currentStoreDefs = ClusterTestUtils.getZZStoreDefsInMemory(); + + Cluster interimCluster = ClusterTestUtils.getZZZClusterWithNNN(); + List finalStoreDefs = ClusterTestUtils.getZZZStoreDefsInMemory(); + + verifyBalanceZoneAndNode(currentCluster, currentStoreDefs, interimCluster, finalStoreDefs); + // verifyBalanceNodesNotZones does not make sense for zone expansion. + verifyRepartitionNoop(currentCluster, currentStoreDefs, interimCluster, finalStoreDefs); + } + + @Test + public void testZoneExpansionAsRebalanceControllerCLI() { Cluster currentCluster = ClusterTestUtils.getZZECluster(); List currentStoreDefs = ClusterTestUtils.getZZZStoreDefsInMemory();