diff --git a/src/java/voldemort/client/protocol/admin/AdminClient.java b/src/java/voldemort/client/protocol/admin/AdminClient.java index 6d9d5a7c4a..95fc873fb0 100644 --- a/src/java/voldemort/client/protocol/admin/AdminClient.java +++ b/src/java/voldemort/client/protocol/admin/AdminClient.java @@ -2392,6 +2392,12 @@ public Versioned getRemoteRebalancerState(int nodeId) { value.getVersion()); } + // TODO: The "Order" column makes no sense to me. This is very, very + // hard to grok. I want to prefix each entry in the order column + // with a number (e.g., '0: rebalance', '1: cluster->Swap', ...) but + // have no idea what the order is. Are they in order in the table? What + // do the arrows mean in the order column? + /** * Used in rebalancing to indicate change in states. Groups the * partition plans on the basis of stealer nodes and sends them over. @@ -2399,22 +2405,22 @@ public Versioned getRemoteRebalancerState(int nodeId) { * The various combinations and their order of execution is given below * *
-         * | swapRO | changeClusterMetadata | changeRebalanceState | Order |
-         * | f | t | t | cluster -> rebalance | 
-         * | f | f | t | rebalance |
-         * | t | t | f | cluster -> swap |
-         * | t | t | t | cluster -> swap -> rebalance |
+         * | swapRO | changeClusterMetadata | changeRebalanceState | Order                        |
+         * |   f    |         t             |          t           | cluster -> rebalance         | 
+         * |   f    |         f             |          t           | rebalance                    |
+         * |   t    |         t             |          f           | cluster -> swap              |
+         * |   t    |         t             |          t           | cluster -> swap -> rebalance |
          * 
* * * Similarly for rollback: * *
-         * | swapRO | changeClusterMetadata | changeRebalanceState | Order |
-         * | f | t | t | remove from rebalance -> cluster  | 
-         * | f | f | t | remove from rebalance |
-         * | t | t | f | cluster -> swap |
-         * | t | t | t | remove from rebalance -> cluster -> swap  |
+         * | swapRO | changeClusterMetadata | changeRebalanceState | Order                                    |
+         * |   f    |         t             |          t           | remove from rebalance -> cluster         | 
+         * |   f    |         f             |          t           | remove from rebalance                    |
+         * |   t    |         t             |          f           | cluster -> swap                          |
+         * |   t    |         t             |          t           | remove from rebalance -> cluster -> swap |
          * 
* * diff --git a/src/java/voldemort/client/rebalance/RebalanceController.java b/src/java/voldemort/client/rebalance/RebalanceController.java index 227f2251bd..7f595a9d32 100644 --- a/src/java/voldemort/client/rebalance/RebalanceController.java +++ b/src/java/voldemort/client/rebalance/RebalanceController.java @@ -69,15 +69,18 @@ public class RebalanceController { public final static int MAX_PARALLEL_REBALANCING = 1; public final static int MAX_TRIES_REBALANCING = 2; public final static boolean STEALER_BASED_REBALANCING = true; + public final static long PROXY_PAUSE_IN_SECONDS = TimeUnit.MINUTES.toSeconds(5); private final int maxParallelRebalancing; private final int maxTriesRebalancing; private final boolean stealerBasedRebalancing; + private final long proxyPauseS; public RebalanceController(String bootstrapUrl, int maxParallelRebalancing, int maxTriesRebalancing, - boolean stealerBased) { + boolean stealerBased, + long proxyPauseS) { this.adminClient = new AdminClient(bootstrapUrl, new AdminClientConfig(), new ClientConfig()); @@ -88,6 +91,7 @@ public RebalanceController(String bootstrapUrl, this.maxParallelRebalancing = maxParallelRebalancing; this.maxTriesRebalancing = maxTriesRebalancing; this.stealerBasedRebalancing = stealerBased; + this.proxyPauseS = proxyPauseS; } /** @@ -353,6 +357,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) { // STEP 2 - Move RO data if(hasReadOnlyStores) { + proxyPause(); executeSubBatch(batchCount, batchCurrentCluster, batchCurrentStoreDefs, @@ -379,6 +384,9 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) { // STEP 4 - Move RW data if(hasReadWriteStores) { + if(!hasReadOnlyStores) { + proxyPause(); + } executeSubBatch(batchCount, batchCurrentCluster, batchCurrentStoreDefs, @@ -398,6 +406,24 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) { } } + /** + * Pause between cluster change in metadata and starting server rebalancing + * 
work. Sleeps for proxyPauseS seconds. If the sleep is interrupted, the + * pause ends early and the interrupt status of the thread is restored so + * that callers can still observe the interruption. + */ + private void proxyPause() { + logger.info("Pausing after cluster state has changed to allow proxy bridges to be established. " + + "Will start rebalancing work on servers in " + proxyPauseS + " seconds."); + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(proxyPauseS)); + } catch(InterruptedException e) { + logger.warn("Sleep interrupted in proxy pause."); + // Restore the interrupt status that Thread.sleep cleared. + Thread.currentThread().interrupt(); + } + } + /** * * Perform a group of state change actions. Also any errors + rollback diff --git a/src/java/voldemort/server/VoldemortServer.java b/src/java/voldemort/server/VoldemortServer.java index c7471d827e..30278485e2 100644 --- a/src/java/voldemort/server/VoldemortServer.java +++ b/src/java/voldemort/server/VoldemortServer.java @@ -91,6 +91,13 @@ public VoldemortServer(VoldemortConfig config) { this.services = createServices(); } + /** + * Constructor is used exclusively by tests. I.e., this is not a code path + * that is exercised in production. + * + * @param config + * @param cluster + */ public VoldemortServer(VoldemortConfig config, Cluster cluster) { super(ServiceType.VOLDEMORT); this.voldemortConfig = config; diff --git a/src/java/voldemort/tools/RebalanceControllerCLI.java b/src/java/voldemort/tools/RebalanceControllerCLI.java index b1b17573ff..f97bfac8d3 100644 --- a/src/java/voldemort/tools/RebalanceControllerCLI.java +++ b/src/java/voldemort/tools/RebalanceControllerCLI.java @@ -64,6 +64,13 @@ private static void setupParser() { .withRequiredArg() .ofType(Integer.class) .describedAs("parallelism"); + parser.accepts("proxy-pause", + "Time, in seconds, to pause between changing cluster metadata and starting rebalance tasks on server. 
[ Default:" + + RebalanceController.PROXY_PAUSE_IN_SECONDS + " ]") + .withRequiredArg() + .ofType(Long.class) + .describedAs("proxy pause"); + parser.accepts("final-cluster", "Path to target cluster xml") .withRequiredArg() .describedAs("cluster.xml"); @@ -96,6 +103,7 @@ private static void printUsage() { help.append(" Optional:\n"); help.append(" --final-stores [ Needed for zone expansion ]\n"); help.append(" --parallelism [ Number of rebalancing tasks to run in parallel ]"); + help.append(" --proxy-pause [ Seconds to pause between cluster change and server-side rebalancing tasks ]"); help.append(" --tries [ Number of times to try starting an async rebalancing task on a node "); help.append(" --output-dir [ Output directory in which plan is stored ]\n"); help.append(" --batch [ Number of primary partitions to move in each rebalancing batch. ]\n"); @@ -159,10 +167,16 @@ public static void main(String[] args) throws Exception { tries = (Integer) options.valueOf("tries"); } + long proxyPauseS = RebalanceController.PROXY_PAUSE_IN_SECONDS; + if(options.has("proxy-pause")) { + proxyPauseS = (Long) options.valueOf("proxy-pause"); + } + RebalanceController rebalanceController = new RebalanceController(bootstrapURL, parallelism, tries, - stealerBased); + stealerBased, + proxyPauseS); Cluster currentCluster = rebalanceController.getCurrentCluster(); List currentStoreDefs = rebalanceController.getCurrentStoreDefs(); diff --git a/test/common/voldemort/ClusterTestUtils.java b/test/common/voldemort/ClusterTestUtils.java index c4c4aff109..90a4857add 100644 --- a/test/common/voldemort/ClusterTestUtils.java +++ b/test/common/voldemort/ClusterTestUtils.java @@ -41,6 +41,8 @@ public class ClusterTestUtils { + public static long REBALANCE_CONTROLLER_TEST_PROXY_PAUSE_IN_SECONDS = 5; + public static List getZZ111StoreDefs(String storageType) { List storeDefs = new LinkedList(); @@ -664,11 +666,13 @@ public static RebalanceKit getRebalanceKit(String bootstrapUrl, int maxParallel, int 
maxTries, boolean stealerBased, + long proxyPauseS, Cluster finalCluster) { RebalanceController rebalanceController = new RebalanceController(bootstrapUrl, maxParallel, maxTries, - stealerBased); + stealerBased, + proxyPauseS); RebalancePlan rebalancePlan = rebalanceController.getPlan(finalCluster, RebalancePlan.BATCH_SIZE); @@ -682,6 +686,7 @@ public static RebalanceKit getRebalanceKit(String bootstrapUrl, RebalanceController.MAX_PARALLEL_REBALANCING, RebalanceController.MAX_TRIES_REBALANCING, stealerBased, + REBALANCE_CONTROLLER_TEST_PROXY_PAUSE_IN_SECONDS, finalCluster); } @@ -693,6 +698,7 @@ public static RebalanceKit getRebalanceKit(String bootstrapUrl, maxParallel, RebalanceController.MAX_TRIES_REBALANCING, stealerBased, + REBALANCE_CONTROLLER_TEST_PROXY_PAUSE_IN_SECONDS, finalCluster); } @@ -703,7 +709,8 @@ public static RebalanceKit getRebalanceKit(String bootstrapUrl, RebalanceController rebalanceController = new RebalanceController(bootstrapUrl, RebalanceController.MAX_PARALLEL_REBALANCING, RebalanceController.MAX_TRIES_REBALANCING, - stealerBased); + stealerBased, + REBALANCE_CONTROLLER_TEST_PROXY_PAUSE_IN_SECONDS); RebalancePlan rebalancePlan = rebalanceController.getPlan(finalCluster, finalStoreDefs, RebalancePlan.BATCH_SIZE);