diff --git a/contrib/ec2-testing/test/voldemort/utils/Ec2RebalanceTest.java b/contrib/ec2-testing/test/voldemort/utils/Ec2RebalanceTest.java index 0fd77e2e2f..ac51110dc5 100644 --- a/contrib/ec2-testing/test/voldemort/utils/Ec2RebalanceTest.java +++ b/contrib/ec2-testing/test/voldemort/utils/Ec2RebalanceTest.java @@ -53,6 +53,8 @@ import voldemort.store.socket.SocketStoreFactory; import voldemort.store.socket.clientrequest.ClientRequestExecutorPool; +// TODO: Drop this class (as well as all other ec2 tests) since the tests have +// not been run in over a year. /** */ public class Ec2RebalanceTest extends AbstractNonZonedRebalanceTest { @@ -128,7 +130,8 @@ public void ec2Cleanup() throws Exception { } } - @Override + // TODO: This is probably broken since it was removed from + // AbstractNonZonedRebalanceTest protected Cluster updateCluster(Cluster template) { List nodes = new ArrayList(); for(Map.Entry entry: nodeIdsInv.entrySet()) { diff --git a/test/common/voldemort/ClusterTestUtils.java b/test/common/voldemort/ClusterTestUtils.java index b565b0f8fc..6843bc0981 100644 --- a/test/common/voldemort/ClusterTestUtils.java +++ b/test/common/voldemort/ClusterTestUtils.java @@ -22,6 +22,10 @@ import java.util.List; import voldemort.client.RoutingTier; +import voldemort.client.rebalance.RebalanceBatchPlan; +import voldemort.client.rebalance.RebalanceController; +import voldemort.client.rebalance.RebalancePartitionsInfo; +import voldemort.client.rebalance.RebalancePlan; import voldemort.cluster.Cluster; import voldemort.cluster.Node; import voldemort.cluster.Zone; @@ -36,8 +40,6 @@ public class ClusterTestUtils { - // TODO: Move these storeDefs and cluster helper test methods into - // ClusterTestUtils. public static List getZZ111StoreDefs(String storageType) { List storeDefs = new LinkedList(); @@ -224,6 +226,24 @@ public static List getZZZStoreDefsBDB() { // partitions is also intentional (i.e., some servers having more // partitions, and some zones having contiguous runs of partitions). + // NOTE: We want nodes returned from various get??Cluster?? methods that + // have the same ID to have the same ports. These static variables are used + // for this purpose. + static final private int MAX_NODES_IN_TEST_CLUSTER = 12; + static private int clusterPorts[] = null; + + /** + * Possibly sets, via freeports, clusterPorts, then returns + * + * @return ports for use in the cluster. + */ + static private int[] getClusterPorts() { + if(clusterPorts == null) { + clusterPorts = ServerTestUtils.findFreePorts(MAX_NODES_IN_TEST_CLUSTER * 3); + } + return clusterPorts; + } + /** * The 'Z' and 'E' prefixes in these method names indicate zones with * partitions and zones without partitions. @@ -233,7 +253,10 @@ public static Cluster getZZCluster() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 3, 4, 5 } }; int partitionMap[][] = new int[][] { { 0, 6, 12, 16, 17 }, { 1, 7, 15 }, { 2, 8, 14 }, { 3, 9, 13 }, { 4, 10 }, { 5, 11 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZClusterWithExtraPartitions() { @@ -241,7 +264,10 @@ public static Cluster getZZClusterWithExtraPartitions() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 3, 4, 5 } }; int partitionMap[][] = new int[][] { { 0, 6, 12, 16, 17 }, { 1, 7, 15 }, { 2, 8, 14 }, { 3, 9, 13 }, { 4, 10 }, { 5, 11, 18 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZClusterWithSwappedPartitions() { @@ -249,7 +275,10 @@ public static Cluster getZZClusterWithSwappedPartitions() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 3, 4, 5 } }; int partitionMap[][] = new int[][] { { 0, 6, 16, 17 }, { 1, 7, 15 }, { 2, 8, 11, 14 }, { 3, 9, 13 }, { 4, 10 }, { 5, 12 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZZCluster() { @@ -257,15 +286,21 @@ public static Cluster getZZZCluster() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 3, 4, 5 }, { 6, 7, 8 } }; int partitionMap[][] = new int[][] { { 0, 9, 15, 16, 17 }, { 1, 10 }, { 2, 11 }, { 3, 12 }, { 4, 13 }, { 5, 14 }, { 6 }, { 7 }, { 8 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZZClusterWithSwappedPartitions() { int numberOfZones = 3; int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 3, 4, 5 }, { 6, 7, 8 } }; - int partitionMap[][] = new int[][] { { 0, 9, 17 }, { 1, 10 }, { 2, 11 }, { 3, 12 }, - { 4, 13 }, { 5, 14 }, { 6 }, { 7, 15 }, { 8, 16 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + int partitionMap[][] = new int[][] { { 0, 9, 17 }, { 1, 5, 10 }, { 2, 11 }, { 3, 12 }, + { 4, 13 }, { 14 }, { 6 }, { 7, 15 }, { 8, 16 } }; + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZECluster() { @@ -273,7 +308,10 @@ public static Cluster getZECluster() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 3, 4, 5 } }; int partitionMap[][] = new int[][] { { 0, 1, 6, 7, 12, 13, 16, 17 }, { 2, 3, 8, 9, 14, 15 }, { 4, 5, 10, 11 }, {}, {}, {} }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZEZCluster() { @@ -281,7 +319,10 @@ public static Cluster getZEZCluster() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 6, 7, 8 }, { 3, 4, 5 } }; int partitionMap[][] = new int[][] { { 0, 9, 6, 17 }, { 1, 10, 15 }, { 2, 11, 7 }, {}, {}, {}, { 3, 12, 16 }, { 4, 13, 8 }, { 5, 14 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZECluster() { @@ -289,7 +330,10 @@ public static Cluster getZZECluster() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 3, 4, 5 }, { 6, 7, 8 } }; int partitionMap[][] = new int[][] { { 0, 6, 12, 16, 17 }, { 1, 7, 15 }, { 2, 8, 14 }, { 3, 9, 13 }, { 4, 10 }, { 5, 11 }, {}, {}, {} }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } /** @@ -301,7 +345,10 @@ public static Cluster getZZClusterWithNN() { int nodesPerZone[][] = new int[][] { { 0, 1, 2, 6 }, { 3, 4, 5, 7 } }; int partitionMap[][] = new int[][] { { 0, 6, 12, 16, 17 }, { 1, 7, 15 }, { 2, 8, 14 }, {}, { 3, 9, 13 }, { 4, 10 }, { 5, 11 }, {} }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZClusterWithNNWithSwappedNodeIds() { @@ -309,7 +356,10 @@ public static Cluster getZZClusterWithNNWithSwappedNodeIds() { int nodesPerZone[][] = new int[][] { { 0, 1, 6, 2 }, { 3, 4, 5, 7 } }; int partitionMap[][] = new int[][] { { 0, 6, 12, 16, 17 }, { 1, 7, 15 }, { 2, 8, 14 }, {}, { 3, 9, 13 }, { 4, 10 }, { 5, 11 }, {} }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZZClusterWithNNN() { @@ -317,7 +367,10 @@ public static Cluster getZZZClusterWithNNN() { int nodesPerZone[][] = new int[][] { { 0, 1, 2, 9 }, { 3, 4, 5, 10 }, { 6, 7, 8, 11 } }; int partitionMap[][] = new int[][] { { 0, 9, 15, 16, 17 }, { 1, 10 }, { 2, 11 }, {}, { 3, 12 }, { 4, 13 }, { 5, 14 }, {}, { 6 }, { 7 }, { 8 }, {} }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZEZClusterWithXNN() { @@ -325,7 +378,10 @@ public static Cluster getZEZClusterWithXNN() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 6, 7, 8 }, { 3, 4, 5, 9 } }; int partitionMap[][] = new int[][] { { 0, 9, 6, 17 }, { 1, 10, 15 }, { 2, 11, 7 }, {}, {}, {}, { 3, 12, 16 }, { 4, 13, 8 }, { 5, 14 }, {} }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } /** @@ -338,7 +394,10 @@ public static Cluster getZZClusterWithPP() { int nodesPerZone[][] = new int[][] { { 0, 1, 2, 6 }, { 3, 4, 5, 7 } }; int partitionMap[][] = new int[][] { { 0, 6, 12, 16, 17 }, { 7, 13 }, { 8, 14 }, { 1, 2 }, { 9, 15 }, { 10 }, { 5, 11 }, { 3, 4 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZClusterWithPPWithSwappedNodeIds() { @@ -346,7 +405,10 @@ public static Cluster getZZClusterWithPPWithSwappedNodeIds() { int nodesPerZone[][] = new int[][] { { 0, 1, 6, 2 }, { 3, 4, 5, 7 } }; int partitionMap[][] = new int[][] { { 0, 6, 12, 16, 17 }, { 7, 13 }, { 8, 14 }, { 1, 2 }, { 9, 15 }, { 10 }, { 5, 11 }, { 3, 4 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZClusterWithPPWithTooManyNodes() { @@ -354,7 +416,10 @@ public static Cluster getZZClusterWithPPWithTooManyNodes() { int nodesPerZone[][] = new int[][] { { 0, 1, 6, 2 }, { 3, 4, 5, 7, 8 } }; int partitionMap[][] = new int[][] { { 0, 6, 12, 16, 17 }, { 7, 13 }, { 8, 14 }, { 1, 2 }, { 9, 15 }, { 10 }, { 5, 11 }, { 3 }, { 4 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZZClusterWithPPP() { @@ -362,7 +427,10 @@ public static Cluster getZZZClusterWithPPP() { int nodesPerZone[][] = new int[][] { { 0, 1, 2, 9 }, { 3, 4, 5, 10 }, { 6, 7, 8, 11 } }; int partitionMap[][] = new int[][] { { 0, 15, 16, 17 }, { 1, 10 }, { 11 }, { 2 }, { 12 }, { 4, 13 }, { 5, 14 }, { 3 }, { 6 }, { 7 }, { 8 }, { 9 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZEZClusterWithXPP() { @@ -370,7 +438,10 @@ public static Cluster getZEZClusterWithXPP() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 6, 7, 8 }, { 3, 4, 5, 9 } }; int partitionMap[][] = new int[][] { { 0, 17 }, { 1, 15 }, { 2, 11, 7 }, { 6 }, { 9 }, { 10 }, { 3, 12, 16 }, { 4, 8 }, { 5, 14 }, { 13 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZEClusterXXP() { @@ -378,7 +449,10 @@ public static Cluster getZZEClusterXXP() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 3, 4, 5 }, { 6, 7, 8 } }; int partitionMap[][] = new int[][] { { 16, 17 }, { 1, 13 }, { 8, 14 }, { 3, 15 }, { 4, 10 }, { 5, 11 }, { 0, 6 }, { 2, 12 }, { 7, 9 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } /** @@ -390,7 +464,10 @@ public static Cluster getZEZClusterWithOnlyOneNodeInNewZone() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 6 }, { 3, 4, 5 } }; int partitionMap[][] = new int[][] { { 0, 9, 6, 17 }, { 1, 10, 15 }, { 2, 11, 7 }, {}, { 3, 12, 16 }, { 4, 13, 8 }, { 5, 14 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } public static Cluster getZZZClusterWithOnlyOneNodeInNewZone() { @@ -398,7 +475,10 @@ public static Cluster getZZZClusterWithOnlyOneNodeInNewZone() { int nodesPerZone[][] = new int[][] { { 0, 1, 2 }, { 6 }, { 3, 4, 5 } }; int partitionMap[][] = new int[][] { { 0, 9, 6, 17 }, { 1, 10, 15 }, { 2, 11, 7 }, { 14 }, { 3, 12, 16 }, { 4, 13, 8 }, { 5 } }; - return ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, partitionMap); + return ServerTestUtils.getLocalZonedCluster(numberOfZones, + nodesPerZone, + partitionMap, + getClusterPorts()); } /** @@ -470,4 +550,103 @@ public static Cluster getZZClusterWithNonContiguousZoneIDsAndNonContiguousNodeID return new Cluster(cluster.getName(), nodeList, zones); } + /** + * Given the current and target cluster metadata, along with your store + * definition, return the batch plan. + * + * @param currentCluster Current cluster metadata + * @param targetCluster Target cluster metadata + * @param storeDef List of store definitions + * @return list of tasks for this batch plan + */ + public static List getBatchPlan(Cluster currentCluster, + Cluster targetCluster, + List storeDef) { + RebalanceBatchPlan rebalancePlan = new RebalanceBatchPlan(currentCluster, + targetCluster, + storeDef); + return rebalancePlan.getBatchPlan(); + } + + /** + * Constructs a plan to rebalance from current state (cCluster/cStores) to + * final state (fCluster/fStores). Uses default values for the planning. + * + * @param cCluster + * @param cStores + * @param fCluster + * @param fStores + * @return A complete RebalancePlan for the rebalance. + */ + public static RebalancePlan makePlan(Cluster cCluster, + List cStores, + Cluster fCluster, + List fStores) { + // Defaults for plans + int batchSize = RebalancePlan.BATCH_SIZE; + String outputDir = null; + + return new RebalancePlan(cCluster, cStores, fCluster, fStores, batchSize, outputDir); + } + + /** + * Helper class to hold a rebalance controller & plan for use in other + * tests. + * + */ + public static class RebalanceKit { + + public RebalanceController controller; + public RebalancePlan plan; + + public RebalanceKit(RebalanceController controller, RebalancePlan plan) { + this.controller = controller; + this.plan = plan; + } + + public void rebalance() { + this.controller.rebalance(this.plan); + } + } + + public static RebalanceKit getRebalanceKit(String bootstrapUrl, + int maxParallel, + int maxTries, + long timeout, + boolean stealerBased, + Cluster finalCluster) { + RebalanceController rebalanceController = new RebalanceController(bootstrapUrl, + maxParallel, + maxTries, + timeout, + stealerBased); + RebalancePlan rebalancePlan = rebalanceController.getPlan(finalCluster, + RebalancePlan.BATCH_SIZE); + + return new RebalanceKit(rebalanceController, rebalancePlan); + } + + public static RebalanceKit getRebalanceKit(String bootstrapUrl, + boolean stealerBased, + Cluster finalCluster) { + return getRebalanceKit(bootstrapUrl, + RebalanceController.MAX_PARALLEL_REBALANCING, + RebalanceController.MAX_TRIES_REBALANCING, + RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC, + stealerBased, + finalCluster); + } + + public static RebalanceKit getRebalanceKit(String bootstrapUrl, + int maxParallel, + boolean stealerBased, + Cluster finalCluster) { + return getRebalanceKit(bootstrapUrl, + maxParallel, + RebalanceController.MAX_TRIES_REBALANCING, + RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC, + stealerBased, + finalCluster); + } + } diff --git a/test/common/voldemort/ServerTestUtils.java b/test/common/voldemort/ServerTestUtils.java index ca173b4f97..b137371eb8 100644 --- a/test/common/voldemort/ServerTestUtils.java +++ b/test/common/voldemort/ServerTestUtils.java @@ -508,9 +508,11 @@ public static Cluster getLocalZonedCluster(int numberOfNodes, */ // TODO: Method should eventually accept a list of ZoneIds so that // non-contig zone Ids can be tested. + /*- public static Cluster getLocalZonedCluster(int numberOfZones, int[][] nodeIdsPerZone, int[][] partitionMap) { + if(numberOfZones < 1) { throw new VoldemortException("The number of zones must be positive (" + numberOfZones @@ -564,6 +566,65 @@ public static Cluster getLocalZonedCluster(int numberOfZones, } return new Cluster("cluster", nodes, zones); } + */ + + public static Cluster getLocalZonedCluster(int numberOfZones, + int[][] nodeIdsPerZone, + int[][] partitionMap, + int[] ports) { + + if(numberOfZones < 1) { + throw new VoldemortException("The number of zones must be positive (" + numberOfZones + + ")"); + } + if(nodeIdsPerZone.length != numberOfZones) { + throw new VoldemortException("Mismatch between numberOfZones (" + numberOfZones + + ") and size of nodesPerZone array (" + + nodeIdsPerZone.length + ")."); + } + + int numNodes = 0; + for(int nodeIdsInZone[]: nodeIdsPerZone) { + numNodes += nodeIdsInZone.length; + } + if(partitionMap.length != numNodes) { + throw new VoldemortException("Mismatch between numNodes (" + numNodes + + ") and size of partitionMap array (" + partitionMap + + ")."); + } + + // Generate nodes + List nodes = new ArrayList(); + int offset = 0; + for(int zoneId = 0; zoneId < numberOfZones; zoneId++) { + for(int nodeId: nodeIdsPerZone[zoneId]) { + List partitions = new ArrayList(partitionMap[nodeId].length); + for(int p: partitionMap[offset]) { + partitions.add(p); + } + nodes.add(new Node(nodeId, + "localhost", + ports[nodeId * 3], + ports[nodeId * 3 + 1], + ports[nodeId * 3 + 2], + zoneId, + partitions)); + offset++; + } + } + + List zones = Lists.newArrayList(); + for(int i = 0; i < numberOfZones; i++) { + LinkedList proximityList = Lists.newLinkedList(); + int zoneId = i + 1; + for(int j = 0; j < numberOfZones; j++) { + proximityList.add(zoneId % numberOfZones); + zoneId++; + } + zones.add(new Zone(i, proximityList)); + } + return new Cluster("cluster", nodes, zones); + } public static Node getLocalNode(int nodeId, List partitions) { int[] ports = findFreePorts(3); diff --git a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java index 106e220aa8..281bb4c6b0 100644 --- a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java @@ -43,6 +43,7 @@ import org.junit.Before; import org.junit.Test; +import voldemort.ClusterTestUtils; import voldemort.ServerTestUtils; import voldemort.TestUtils; import voldemort.client.ClientConfig; @@ -236,38 +237,27 @@ public void testRORWRebalance() throws Exception { storeDefFileWithoutReplication, serverList, configProps); - // Update the cluster information based on the node information - targetCluster = updateCluster(targetCluster); - // TODO: make helper method(s) (possibly at AbstractREbalanceTest - // level) that constructs appropriate controller & plan. There is a - // fair bit of cut-and-pasted coding among these tests... String bootstrapUrl = getBootstrapUrl(currentCluster, 0); - int maxParallel = RebalanceController.MAX_PARALLEL_REBALANCING; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + stealerBased, + targetCluster); + try { // Populate the two stores populateData(currentCluster, roStoreDefWithoutReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), true); populateData(currentCluster, rwStoreDefWithoutReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), false); - rebalanceAndCheck(rebalancePlan, rebalanceClient, Arrays.asList(1)); + rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(1)); checkConsistentMetadata(targetCluster, serverList); } finally { @@ -300,35 +290,26 @@ public void testRORWRebalanceWithReplication() throws Exception { storeDefFileWithReplication, serverList, configProps); - // Update the cluster information based on the node information - targetCluster = updateCluster(targetCluster); String bootstrapUrl = getBootstrapUrl(currentCluster, 0); - int maxParallel = RebalanceController.MAX_PARALLEL_REBALANCING; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + stealerBased, + targetCluster); try { // Populate the two stores populateData(currentCluster, roStoreDefWithReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), true); populateData(currentCluster, rwStoreDefWithReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), false); - rebalanceAndCheck(rebalancePlan, rebalanceClient, Arrays.asList(0, 1)); + rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(0, 1)); checkConsistentMetadata(targetCluster, serverList); } finally { @@ -372,29 +353,20 @@ public void testRORebalanceWithReplication() throws Exception { roStoreDefFileWithReplication, serverList, configProps); - // Update the cluster information based on the node information - targetCluster = updateCluster(targetCluster); String bootstrapUrl = getBootstrapUrl(currentCluster, 0); - int maxParallel = RebalanceController.MAX_PARALLEL_REBALANCING; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + stealerBased, + targetCluster); try { populateData(currentCluster, roStoreDefWithReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), true); - rebalanceAndCheck(rebalancePlan, rebalanceClient, Arrays.asList(0, 1)); + rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(0, 1)); checkConsistentMetadata(targetCluster, serverList); } finally { // stop servers @@ -422,29 +394,19 @@ public void testRWRebalanceWithReplication() throws Exception { serverList, null); - // Update the cluster information based on the node information - targetCluster = updateCluster(targetCluster); - String bootstrapUrl = getBootstrapUrl(currentCluster, 0); - int maxParallel = RebalanceController.MAX_PARALLEL_REBALANCING; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + stealerBased, + targetCluster); try { populateData(currentCluster, rwStoreDefWithReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), false); - rebalanceAndCheck(rebalancePlan, rebalanceClient, Arrays.asList(0, 1)); + rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(0, 1)); checkConsistentMetadata(targetCluster, serverList); } finally { @@ -476,42 +438,32 @@ public void testRebalanceCleanPrimary() throws Exception { rwStoreDefFileWithReplication, serverList, configProps); - // Update the cluster information based on the node information - targetCluster = updateCluster(targetCluster); String bootstrapUrl = getBootstrapUrl(currentCluster, 0); - int maxParallel = RebalanceController.MAX_PARALLEL_REBALANCING; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + stealerBased, + targetCluster); try { - populateData(currentCluster, - rwStoreDefWithReplication, - rebalanceClient.getAdminClient(), - false); + AdminClient adminClient = rebalanceKit.controller.getAdminClient(); + populateData(currentCluster, rwStoreDefWithReplication, adminClient, false); - AdminClient admin = rebalanceClient.getAdminClient(); // Figure out the positive keys to check - List positiveTestKeyList = sampleKeysFromPartition(admin, + List positiveTestKeyList = sampleKeysFromPartition(adminClient, 1, rwStoreDefWithReplication.getName(), Arrays.asList(1), 20); - rebalanceAndCheck(rebalancePlan, rebalanceClient, Arrays.asList(0, 1, 2)); + rebalanceAndCheck(rebalanceKit.plan, + rebalanceKit.controller, + Arrays.asList(0, 1, 2)); checkConsistentMetadata(targetCluster, serverList); // Do the cleanup operation for(int i = 0; i < 3; i++) { - admin.storeMntOps.repairJob(i); + adminClient.storeMntOps.repairJob(i); } // wait for the repairs to complete for(int i = 0; i < 3; i++) { @@ -519,7 +471,7 @@ public void testRebalanceCleanPrimary() throws Exception { } // do the positive tests - checkForKeyExistence(admin, + checkForKeyExistence(adminClient, 1, rwStoreDefWithReplication.getName(), positiveTestKeyList); @@ -554,42 +506,32 @@ public void testRebalanceCleanSecondary() throws Exception { rwStoreDefFileWithReplication, serverList, configProps); - // Update the cluster information based on the node information - targetCluster = updateCluster(targetCluster); String bootstrapUrl = getBootstrapUrl(currentCluster, 0); - int maxParallel = RebalanceController.MAX_PARALLEL_REBALANCING; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + stealerBased, + targetCluster); try { - populateData(currentCluster, - rwStoreDefWithReplication, - rebalanceClient.getAdminClient(), - false); + AdminClient adminClient = rebalanceKit.controller.getAdminClient(); + populateData(currentCluster, rwStoreDefWithReplication, adminClient, false); - AdminClient admin = rebalanceClient.getAdminClient(); // Figure out the positive and negative keys to check - List positiveTestKeyList = sampleKeysFromPartition(admin, + List positiveTestKeyList = sampleKeysFromPartition(adminClient, 0, rwStoreDefWithReplication.getName(), Arrays.asList(3), 20); - rebalanceAndCheck(rebalancePlan, rebalanceClient, Arrays.asList(0, 1, 2)); + rebalanceAndCheck(rebalanceKit.plan, + rebalanceKit.controller, + Arrays.asList(0, 1, 2)); checkConsistentMetadata(targetCluster, serverList); // Do the cleanup operation for(int i = 0; i < 3; i++) { - admin.storeMntOps.repairJob(i); + adminClient.storeMntOps.repairJob(i); } // wait for the repairs to complete for(int i = 0; i < 3; i++) { @@ -597,7 +539,7 @@ public void testRebalanceCleanSecondary() throws Exception { } // do the positive tests - checkForKeyExistence(admin, + checkForKeyExistence(adminClient, 0, rwStoreDefWithReplication.getName(), positiveTestKeyList); @@ -638,34 +580,27 @@ public void testRWRebalanceFourNodes() throws Exception { rwTwoStoreDefFileWithReplication, serverList, null); - // Update the cluster information based on the node information - targetCluster = updateCluster(targetCluster); String bootstrapUrl = getBootstrapUrl(currentCluster, 0); int maxParallel = 5; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = 100; - RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + maxParallel, + stealerBased, + targetCluster); try { populateData(currentCluster, rwStoreDefWithReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), false); populateData(currentCluster, rwStoreDefWithReplication2, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), false); - rebalanceAndCheck(rebalancePlan, rebalanceClient, serverList); + rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList); checkConsistentMetadata(targetCluster, serverList); } catch(Exception e) { @@ -707,34 +642,27 @@ public void testRWRebalanceSerial() throws Exception { rwTwoStoreDefFileWithReplication, serverList, serverProps); - // Update the cluster information based on the node information - targetCluster = updateCluster(targetCluster); String bootstrapUrl = getBootstrapUrl(currentCluster, 0); int maxParallel = 5; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = 100; - RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + maxParallel, + stealerBased, + targetCluster); try { populateData(currentCluster, rwStoreDefWithReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), false); populateData(currentCluster, rwStoreDefWithReplication2, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), false); - rebalanceAndCheck(rebalancePlan, rebalanceClient, serverList); + rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList); checkConsistentMetadata(targetCluster, serverList); } catch(Exception e) { @@ -768,37 +696,29 @@ public void testProxyGetDuringRebalancing() throws Exception { storeDefFileWithReplication, serverList, configProps); - final Cluster updatedTargetCluster = updateCluster(targetCluster); ExecutorService executors = Executors.newFixedThreadPool(2); final AtomicBoolean rebalancingComplete = new AtomicBoolean(false); final List exceptions = Collections.synchronizedList(new ArrayList()); - String bootstrapUrl = getBootstrapUrl(updatedCurrentCluster, 0); + String bootstrapUrl = getBootstrapUrl(currentCluster, 0); int maxParallel = 2; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; - // We are forced to use stealer based since RO does not support - // donor based rebalancing yet + // Must use stealer-based for RO! boolean stealerBased = true; - final RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - final RebalancePlan rebalancePlan = rebalanceClient.getPlan(updatedTargetCluster, - batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + maxParallel, + stealerBased, + targetCluster); // Populate the two stores populateData(updatedCurrentCluster, roStoreDefWithReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), true); populateData(updatedCurrentCluster, rwStoreDefWithReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), false); final SocketStoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(getBootstrapUrl(updatedCurrentCluster, @@ -873,11 +793,13 @@ public void run() { Thread.sleep(500); - rebalanceAndCheck(rebalancePlan, rebalanceClient, Arrays.asList(0, 1)); + rebalanceAndCheck(rebalanceKit.plan, + rebalanceKit.controller, + Arrays.asList(0, 1)); Thread.sleep(500); rebalancingComplete.set(true); - checkConsistentMetadata(updatedTargetCluster, serverList); + checkConsistentMetadata(targetCluster, serverList); } catch(Exception e) { exceptions.add(e); logger.error("Exception in rebalancing thread", e); @@ -935,7 +857,6 @@ public void testProxyPutDuringRebalancing() throws Exception { rwStoreDefFileWithReplication, serverList, configProps); - final Cluster updatedTargetCluster = updateCluster(targetCluster); ExecutorService executors = Executors.newFixedThreadPool(2); final AtomicBoolean rebalancingComplete = new AtomicBoolean(false); @@ -945,26 +866,20 @@ public void testProxyPutDuringRebalancing() throws Exception { // batches would mean the proxy bridges being torn down and // established multiple times and we cannot test against the source // cluster topology then. - String bootstrapUrl = getBootstrapUrl(updatedCurrentCluster, 0); + String bootstrapUrl = getBootstrapUrl(currentCluster, 0); int maxParallel = 2; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - final RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - final RebalancePlan rebalancePlan = rebalanceClient.getPlan(updatedTargetCluster, - batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + maxParallel, + stealerBased, + targetCluster); populateData(updatedCurrentCluster, rwStoreDefWithReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), false); - final AdminClient adminClient = rebalanceClient.getAdminClient(); + final AdminClient adminClient = rebalanceKit.controller.getAdminClient(); // the plan would cause these partitions to move // Partition : Donor -> Stealer // p2 (SEC) : 1 -> 0 @@ -1071,7 +986,7 @@ public void run() { @Override public void run() { try { - rebalanceClient.rebalance(rebalancePlan); + rebalanceKit.rebalance(); } catch(Exception e) { logger.error("Error in rebalancing... ", e); exceptions.add(e); @@ -1091,12 +1006,12 @@ public void run() { true); assertEquals("Not enough time to begin proxy writing", proxyWritesDone.get(), true); checkEntriesPostRebalance(updatedCurrentCluster, - updatedTargetCluster, + targetCluster, Lists.newArrayList(rwStoreDefWithReplication), Arrays.asList(0, 1, 2), baselineTuples, baselineVersions); - checkConsistentMetadata(updatedTargetCluster, serverList); + checkConsistentMetadata(targetCluster, serverList); // check No Exception if(exceptions.size() > 0) { @@ -1155,36 +1070,28 @@ public void testServerSideRouting() throws Exception { storeDefFileWithReplication, serverList, configProps); - final Cluster updatedTargetCluster = updateCluster(targetCluster); ExecutorService executors = Executors.newFixedThreadPool(2); final AtomicBoolean rebalancingToken = new AtomicBoolean(false); final List exceptions = Collections.synchronizedList(new ArrayList()); - // populate data now. - String bootstrapUrl = getBootstrapUrl(updatedCurrentCluster, 0); + String bootstrapUrl = getBootstrapUrl(currentCluster, 0); int maxParallel = 2; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - final RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - final RebalancePlan rebalancePlan = rebalanceClient.getPlan(updatedTargetCluster, - batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + maxParallel, + stealerBased, + targetCluster); // Populate the two stores populateData(updatedCurrentCluster, roStoreDefWithReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), true); populateData(updatedCurrentCluster, rwStoreDefWithReplication, - rebalanceClient.getAdminClient(), + rebalanceKit.controller.getAdminClient(), false); Node node = updatedCurrentCluster.getNodeById(1); @@ -1261,7 +1168,9 @@ public void run() { public void run() { try { Thread.sleep(500); - rebalanceAndCheck(rebalancePlan, rebalanceClient, Arrays.asList(0, 1)); + rebalanceAndCheck(rebalanceKit.plan, + rebalanceKit.controller, + Arrays.asList(0, 1)); Thread.sleep(500); rebalancingToken.set(true); diff --git a/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java index 3361cccfd6..c101a23226 100644 --- a/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java @@ -107,10 +107,6 @@ protected Cluster startServers(Cluster cluster, return cluster; } - protected Cluster updateCluster(Cluster template) { - return template; - } - protected Store getSocketStore(String storeName, String host, int port) { diff --git a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java index cb54c9f8f4..d8168c86e6 100644 --- a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java @@ -41,8 +41,10 @@ import org.apache.log4j.Logger; import org.junit.After; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import voldemort.ClusterTestUtils; import voldemort.ServerTestUtils; import voldemort.client.ClientConfig; import voldemort.client.DefaultStoreClient; @@ -92,6 +94,22 @@ public abstract class AbstractZonedRebalanceTest extends AbstractRebalanceTest { protected static String rwStoreDefFileWithReplication; protected static String rwTwoStoreDefFileWithReplication; + static Cluster zzCurrent; + static Cluster zzShuffle; + static Cluster zzClusterExpansionNN; + static Cluster zzClusterExpansionPP; + static String zzStoresXml; + static List zzStores; + + static Cluster zzzCurrent; + static Cluster zzzShuffle; + static Cluster zzzClusterExpansionNNN; + static Cluster zzzClusterExpansionPPP; + static Cluster zzeZoneExpansion; + static Cluster zzzZoneExpansionXXP; + static String zzzStoresXml; + static List zzzStores; + private List storeDefWithoutReplication; private List storeDefWithReplication; private StoreDefinition rwStoreDefWithoutReplication; @@ -102,6 +120,31 @@ public AbstractZonedRebalanceTest(boolean useNio, boolean useDonorBased) { super(useNio, useDonorBased); } + @BeforeClass + public static void generalSetup() throws IOException { + zzCurrent = ClusterTestUtils.getZZCluster(); + zzShuffle = ClusterTestUtils.getZZClusterWithSwappedPartitions(); + zzClusterExpansionNN = ClusterTestUtils.getZZClusterWithNN(); + zzClusterExpansionPP = ClusterTestUtils.getZZClusterWithPP(); + + zzStores = ClusterTestUtils.getZZStoreDefsBDB(); + File zzfile = File.createTempFile("zz-stores-", ".xml"); + FileUtils.writeStringToFile(zzfile, new StoreDefinitionsMapper().writeStoreList(zzStores)); + zzStoresXml = zzfile.getAbsolutePath(); + + zzzCurrent = ClusterTestUtils.getZZZCluster(); + zzzShuffle = ClusterTestUtils.getZZZClusterWithSwappedPartitions(); + zzzClusterExpansionNNN = ClusterTestUtils.getZZZClusterWithNNN(); + zzzClusterExpansionPPP = ClusterTestUtils.getZZZClusterWithPPP(); + zzeZoneExpansion = ClusterTestUtils.getZZECluster(); + zzzZoneExpansionXXP = ClusterTestUtils.getZZEClusterXXP(); + + zzzStores = ClusterTestUtils.getZZZStoreDefsBDB(); + File zzzfile = File.createTempFile("zzz-stores-", ".xml"); + FileUtils.writeStringToFile(zzzfile, new StoreDefinitionsMapper().writeStoreList(zzzStores)); + zzzStoresXml = zzzfile.getAbsolutePath(); + } + @Before public void setUp() throws IOException { // First without replication @@ -194,6 +237,104 @@ public void tearDown() { socketStoreFactory = null; } + // TODO: The tests based on this method are susceptible to TOCTOU + // BindException issue since findFreePorts is used to determine the ports + // for localhost:PORT of each node. + public void testZonedRebalance(String testTag, + Cluster cCluster, + Cluster fCluster, + String storesXml, + List storeDefs) throws Exception { + logger.info("Starting " + testTag); + // Hacky work around of TOCTOU bind Exception issues. Each test that + // invokes this method brings servers up & down on the same ports. + Thread.sleep(TimeUnit.SECONDS.toMillis(2)); + try { + Cluster currentCluster = cCluster; + Cluster targetCluster = fCluster; + + // start all the servers + List serverList = new ArrayList(currentCluster.getNodeIds()); + Map configProps = new HashMap(); + configProps.put("admin.max.threads", "5"); + currentCluster = startServers(currentCluster, storesXml, serverList, configProps); + + String bootstrapUrl = getBootstrapUrl(currentCluster, 0); + boolean stealerBased = !useDonorBased; + ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + stealerBased, + targetCluster); + + try { + for(StoreDefinition storeDef: storeDefs) { + populateData(currentCluster, storeDef); + } + + rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList); + + checkConsistentMetadata(targetCluster, serverList); + } finally { + // stop servers + stopServer(serverList); + } + } catch(AssertionError ae) { + logger.error("Assertion broken in " + testTag + " : ", ae); + throw ae; + } + } + + @Test(timeout = 600000) + public void testNoopZZ() throws Exception { + testZonedRebalance("TestNoopZZ", zzCurrent, zzCurrent, zzStoresXml, zzStores); + } + + @Test(timeout = 600000) + public void testShuffleZZ() throws Exception { + testZonedRebalance("TestShuffleZZ", zzCurrent, zzShuffle, zzStoresXml, zzStores); + } + + // TODO: Ideally, zzCurrent would be passed instead of zzClusterExpansionNN. + @Test(timeout = 600000) + public void testClusterExpansionZZ() throws Exception { + testZonedRebalance("TestClusterExpansionZZ", + zzClusterExpansionNN, + zzClusterExpansionPP, + zzStoresXml, + zzStores); + } + + @Test(timeout = 600000) + public void testNoopZZZ() throws Exception { + testZonedRebalance("TestNoopZZZ", zzzCurrent, zzzCurrent, zzzStoresXml, zzzStores); + } + + @Test(timeout = 600000) + public void testShuffleZZZ() throws Exception { + testZonedRebalance("TestShuffleZZZ", zzzCurrent, zzzShuffle, zzzStoresXml, zzzStores); + } + + // TODO: Ideally, zzzCurrent would be passed instead of + // zzzClusterExpansionNNN. + @Test(timeout = 600000) + public void testClusterExpansionZZZ() throws Exception { + testZonedRebalance("TestClusterExpansionZZZ", + zzzClusterExpansionNNN, + zzzClusterExpansionPPP, + zzzStoresXml, + zzzStores); + } + + // TODO: Pass in zzCurrent and zzzZoneExpansionXXP after atomic metadata + // update is in place. + @Test(timeout = 600000) + public void testZoneExpansionZZ2ZZZ() throws Exception { + testZonedRebalance("TestZoneExpansionZZ2ZZZ", + zzeZoneExpansion, + zzzZoneExpansionXXP, + zzzStoresXml, + zzzStores); + } + @Test(timeout = 600000) public void testRWRebalance() throws Exception { logger.info("Starting testRWRebalance"); @@ -216,28 +357,17 @@ public void testRWRebalance() throws Exception { storeDefFileWithoutReplication, serverList, configProps); - // Update the cluster information based on the node information - targetCluster = updateCluster(targetCluster); - // TODO: make helper method(s) (possibly at AbstractREbalanceTest - // level) that constructs appropriate controller & plan. String bootstrapUrl = getBootstrapUrl(currentCluster, 0); - int maxParallel = RebalanceController.MAX_PARALLEL_REBALANCING; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + stealerBased, + targetCluster); try { populateData(currentCluster, rwStoreDefWithoutReplication); - rebalanceAndCheck(rebalancePlan, rebalanceClient, Arrays.asList(1, 2)); + rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(1, 2)); checkConsistentMetadata(targetCluster, serverList); } finally { @@ -274,27 +404,20 @@ public void testRWRebalanceWithReplication(boolean serial) throws Exception { storeDefFileWithReplication, serverList, configProps); - // Update the cluster information based on the node information - targetCluster = updateCluster(targetCluster); String bootstrapUrl = getBootstrapUrl(currentCluster, 0); int maxParallel = 5; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = 100; - RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + maxParallel, + stealerBased, + targetCluster); try { populateData(currentCluster, rwStoreDefWithReplication); - rebalanceAndCheck(rebalancePlan, rebalanceClient, Arrays.asList(0, 1, 2, 3)); + rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(0, 1, 2, 3)); checkConsistentMetadata(targetCluster, serverList); } finally { @@ -356,26 +479,17 @@ public void testRebalanceCleanPrimarySecondary() throws Exception { rwStoreDefFileWithReplication, serverList, configProps); - // Update the cluster information based on the node information - targetCluster = updateCluster(targetCluster); String bootstrapUrl = getBootstrapUrl(currentCluster, 0); - int maxParallel = RebalanceController.MAX_PARALLEL_REBALANCING; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + stealerBased, + targetCluster); try { populateData(currentCluster, rwStoreDefWithReplication); - AdminClient admin = rebalanceClient.getAdminClient(); + AdminClient admin = rebalanceKit.controller.getAdminClient(); List p6KeySamples = sampleKeysFromPartition(admin, 1, @@ -403,7 +517,9 @@ public void testRebalanceCleanPrimarySecondary() throws Exception { Arrays.asList(7), 20); - rebalanceAndCheck(rebalancePlan, rebalanceClient, Arrays.asList(0, 1, 2, 3)); + rebalanceAndCheck(rebalanceKit.plan, + rebalanceKit.controller, + Arrays.asList(0, 1, 2, 3)); checkConsistentMetadata(targetCluster, serverList); @@ -452,12 +568,12 @@ public void testProxyGetDuringRebalancing() throws Exception { try { Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(4, 2, new int[] { 0, 0, 1, 1 }, new int[][] { { 0, 2, 4 }, { 6 }, { 1, 3, 5 }, { 7 } }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 3, - Lists.newArrayList(2)); - targetCluster = RebalanceUtils.createUpdatedCluster(targetCluster, - 1, - Lists.newArrayList(3)); + Cluster tmpTargetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 3, + Lists.newArrayList(2)); + final Cluster targetCluster = RebalanceUtils.createUpdatedCluster(tmpTargetCluster, + 1, + Lists.newArrayList(3)); final List serverList = Arrays.asList(0, 1, 2, 3); Map configProps = new HashMap(); @@ -466,8 +582,6 @@ public void testProxyGetDuringRebalancing() throws Exception { storeDefFileWithReplication, serverList, configProps); - // Update the cluster information based on the node information - final Cluster updatedTargetCluster = updateCluster(targetCluster); ExecutorService executors = Executors.newFixedThreadPool(2); final AtomicBoolean rebalancingComplete = new AtomicBoolean(false); @@ -475,17 +589,12 @@ public void testProxyGetDuringRebalancing() throws Exception { String bootstrapUrl = getBootstrapUrl(updatedCurrentCluster, 0); int maxParallel = 2; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; // Forced to use steal since RO does not support donor based. boolean stealerBased = true; - final RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - final RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + maxParallel, + stealerBased, + targetCluster); try { @@ -549,13 +658,13 @@ public void run() { try { Thread.sleep(500); - rebalanceAndCheck(rebalancePlan, - rebalanceClient, + rebalanceAndCheck(rebalanceKit.plan, + rebalanceKit.controller, Arrays.asList(0, 1, 2, 3)); Thread.sleep(500); rebalancingComplete.set(true); - checkConsistentMetadata(updatedTargetCluster, serverList); + checkConsistentMetadata(targetCluster, serverList); } catch(Exception e) { exceptions.add(e); @@ -625,8 +734,6 @@ public void testProxyPutDuringRebalancing() throws Exception { rwStoreDefFileWithReplication, serverList, configProps); - // Update the cluster information based on the node information - final Cluster updatedTargetCluster = updateCluster(targetCluster); ExecutorService executors = Executors.newFixedThreadPool(2); final AtomicBoolean rebalancingComplete = new AtomicBoolean(false); @@ -638,20 +745,14 @@ public void testProxyPutDuringRebalancing() throws Exception { // cluster topology then. String bootstrapUrl = getBootstrapUrl(updatedCurrentCluster, 0); int maxParallel = 2; - int maxTries = RebalanceController.MAX_TRIES_REBALANCING; - long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC; boolean stealerBased = !useDonorBased; - final RebalanceController rebalanceClient = new RebalanceController(bootstrapUrl, - maxParallel, - maxTries, - timeout, - stealerBased); - int batchSize = RebalancePlan.BATCH_SIZE; - final RebalancePlan rebalancePlan = rebalanceClient.getPlan(updatedTargetCluster, - batchSize); + final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, + maxParallel, + stealerBased, + targetCluster); populateData(currentCluster, rwStoreDefWithReplication); - final AdminClient adminClient = rebalanceClient.getAdminClient(); + final AdminClient adminClient = rebalanceKit.controller.getAdminClient(); // the plan would cause the following cross zone move Partition : // Donor -> Stealer p6 (PRI) : 1 -> 5 final List movingKeysList = sampleKeysFromPartition(adminClient, @@ -756,7 +857,7 @@ public void run() { @Override public void run() { try { - rebalanceClient.rebalance(rebalancePlan); + rebalanceKit.rebalance(); } catch(Exception e) { logger.error("Error in rebalancing... ", e); exceptions.add(e); @@ -776,12 +877,12 @@ public void run() { true); assertEquals("Not enough time to begin proxy writing", proxyWritesDone.get(), true); checkEntriesPostRebalance(updatedCurrentCluster, - updatedTargetCluster, + targetCluster, Lists.newArrayList(rwStoreDefWithReplication), Arrays.asList(0, 1, 2, 3, 4, 5), baselineTuples, baselineVersions); - checkConsistentMetadata(updatedTargetCluster, serverList); + checkConsistentMetadata(targetCluster, serverList); // check No Exception if(exceptions.size() > 0) { for(Exception e: exceptions) { @@ -819,7 +920,7 @@ public void run() { } } - protected void populateData(Cluster cluster, StoreDefinition storeDef) throws Exception { + private void populateData(Cluster cluster, StoreDefinition storeDef) throws Exception { // Create SocketStores for each Node first Map> storeMap = new HashMap>(); diff --git a/test/unit/voldemort/client/rebalance/RebalanceBatchPlanTest.java b/test/unit/voldemort/client/rebalance/NonZonedRebalanceBatchPlanTest.java similarity index 87% rename from test/unit/voldemort/client/rebalance/RebalanceBatchPlanTest.java rename to test/unit/voldemort/client/rebalance/NonZonedRebalanceBatchPlanTest.java index 5aeec4ee1a..b3736e5941 100644 --- a/test/unit/voldemort/client/rebalance/RebalanceBatchPlanTest.java +++ b/test/unit/voldemort/client/rebalance/NonZonedRebalanceBatchPlanTest.java @@ -35,6 +35,7 @@ import org.junit.Before; import org.junit.Test; +import voldemort.ClusterTestUtils; import voldemort.ServerTestUtils; import voldemort.VoldemortException; import voldemort.VoldemortTestConstants; @@ -47,16 +48,17 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; -// TODO: (offline) fix this as part of completing the RebalancePlan work -// TODO: This suite of tests is known to mostly fail. Sorry. -// TODO: This test needs to be mostly re-written. The planning algorithm has -// changed and this test focused on the implementation of the prior planning -// algorithm, rather than the features of a plan in general. -public class RebalanceBatchPlanTest { +/** + * Tests rebalance batch plan for non-zoned cluster. These tests existed before + * the RebalancePlan was re-written in April/May 2013. That is why these tests + * follow a different format than those in ZonedRebalanceBatchPlanTest. + */ +public class NonZonedRebalanceBatchPlanTest { private static String storeDefFile = "test/common/voldemort/config/stores.xml"; private Cluster currentCluster; private Cluster targetCluster; + private List storeDefList; private List storeDefList2; private List test211StoreDef; @@ -108,9 +110,9 @@ public void testShuffleNoop() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 1, 2, 3 }, { 4, 5, 6, 7, 0 } }); - List batchPlan = getBatchPlan(currentCluster, - targetCluster, - test211StoreDef); + List batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, + targetCluster, + test211StoreDef); assertTrue("Batch plan should be empty.", batchPlan.isEmpty()); } @@ -119,7 +121,7 @@ public void testShuffleNoop() { * Expand on to an empty server. */ @Test - public void testExpansion() { + public void testClusterExpansion() { int numServers = 3; int ports[] = ServerTestUtils.findFreePorts(3 * numServers); @@ -129,9 +131,9 @@ public void testExpansion() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 1, 2, 3 }, { 4, 5, 6, 7 }, { 0 } }); - List batchPlan = getBatchPlan(currentCluster, - targetCluster, - test211StoreDef); + List batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, + targetCluster, + test211StoreDef); // data should only move from node 0 to node 2 for node 2 to host // everything needed. no other movement should occur. assertEquals("There should be one move in this plan.", 1, batchPlan.size()); @@ -170,9 +172,9 @@ public void testDeleteLastNode() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 3, 6, 9, 12, 15 }, { 1, 4, 7, 10, 13, 16 }, { 2, 5, 8, 11, 14, 17 }, {} }); - List orderedRebalancePartitionInfoList = getBatchPlan(currentCluster, - targetCluster, - storeDefList2); + List orderedRebalancePartitionInfoList = ClusterTestUtils.getBatchPlan(currentCluster, + targetCluster, + storeDefList2); assertEquals("There should have exactly 1 rebalancing node", 1, getUniqueNodeCount(orderedRebalancePartitionInfoList, false)); @@ -215,9 +217,9 @@ public void testDeleteFirstNode() { { 0, 1, 5 }, { 2, 6 }, { 3, 7 } }); // PHASE 1 - move partition 0 off of node 0 to node 1 - List batchPlan = getBatchPlan(currentCluster, - targetCluster, - storeDefList2); + List batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, + targetCluster, + storeDefList2); assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); @@ -231,7 +233,7 @@ public void testDeleteFirstNode() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { {}, { 0, 1, 5 }, { 4, 2 }, { 3, 6, 7 } }); - batchPlan = getBatchPlan(currentCluster, targetCluster, storeDefList2); + batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, targetCluster, storeDefList2); assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); assertFalse("Batch plan for server 2 should not be empty.", @@ -261,9 +263,9 @@ public void testRebalanceDeletingMiddleNode() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, { 2, 1, 5 }, { 6 }, { 3, 7 } }); - List batchPlan = getBatchPlan(currentCluster, - targetCluster, - storeDefList2); + List batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, + targetCluster, + storeDefList2); assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); assertFalse("Batch plan for server 1 should not be empty.", @@ -287,7 +289,7 @@ public void testRebalanceDeletingMiddleNode() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, { 2, 1, 5 }, {}, { 6, 3, 7 } }); - batchPlan = getBatchPlan(currentCluster, targetCluster, storeDefList2); + batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, targetCluster, storeDefList2); assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); @@ -297,7 +299,7 @@ public void testRebalanceDeletingMiddleNode() { } @Test - public void testManyStoreExpansion() { + public void testManyStoreClusterExpansion() { int numServers = 4; int ports[] = ServerTestUtils.findFreePorts(3 * numServers); @@ -307,9 +309,9 @@ public void testManyStoreExpansion() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 2, 3 }, { 4, 6 }, { 7, 8, 9 }, { 1, 5 } }); - List batchPlan = getBatchPlan(currentCluster, - targetCluster, - storeDefList); + List batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, + targetCluster, + storeDefList); assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); assertFalse("Batch plan for server 3 should not be empty.", @@ -356,9 +358,9 @@ public void testRebalanceAllReplicasBeingMigrated() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 4 }, { 2, 3 }, { 1, 5 }, { 0 } }); - List orderedRebalancePartitionInfoList = getBatchPlan(currentCluster, - targetCluster, - storeDefList2); + List orderedRebalancePartitionInfoList = ClusterTestUtils.getBatchPlan(currentCluster, + targetCluster, + storeDefList2); assertEquals("There should have exactly 1 rebalancing node", 1, @@ -465,22 +467,4 @@ private void checkAllRebalanceInfoPresent(List toCheckR } } - /** - * Given the current and target cluster metadata, along with your store - * definition, return the batch plan. - * - * @param currentCluster Current cluster metadata - * @param targetCluster Target cluster metadata - * @param storeDef List of store definitions - * @return list of tasks - */ - private List getBatchPlan(Cluster currentCluster, - Cluster targetCluster, - List storeDef) { - RebalanceBatchPlan rebalancePlan = new RebalanceBatchPlan(currentCluster, - targetCluster, - storeDef); - return rebalancePlan.getBatchPlan(); - } - } \ No newline at end of file diff --git a/test/unit/voldemort/client/rebalance/RebalancePlanTest.java b/test/unit/voldemort/client/rebalance/RebalancePlanTest.java index 34c6c1765e..9564d976f7 100644 --- a/test/unit/voldemort/client/rebalance/RebalancePlanTest.java +++ b/test/unit/voldemort/client/rebalance/RebalancePlanTest.java @@ -26,16 +26,17 @@ import voldemort.ClusterTestUtils; import voldemort.cluster.Cluster; import voldemort.store.StoreDefinition; +import voldemort.utils.MoveMap; public class RebalancePlanTest { static Cluster zzCurrent; - static Cluster zzRebalance; + static Cluster zzShuffle; static Cluster zzClusterExpansion; static List zzStores; static Cluster zzzCurrent; - static Cluster zzzRebalance; + static Cluster zzzShuffle; static Cluster zzzClusterExpansion; static Cluster zzzZoneExpansion; static List zzzStores; @@ -43,42 +44,31 @@ public class RebalancePlanTest { @BeforeClass public static void setup() { zzCurrent = ClusterTestUtils.getZZCluster(); - zzRebalance = ClusterTestUtils.getZZClusterWithSwappedPartitions(); + zzShuffle = ClusterTestUtils.getZZClusterWithSwappedPartitions(); zzClusterExpansion = ClusterTestUtils.getZZClusterWithPP(); zzStores = ClusterTestUtils.getZZStoreDefsBDB(); zzzCurrent = ClusterTestUtils.getZZZCluster(); - zzzRebalance = ClusterTestUtils.getZZZClusterWithSwappedPartitions(); + zzzShuffle = ClusterTestUtils.getZZZClusterWithSwappedPartitions(); zzzClusterExpansion = ClusterTestUtils.getZZZClusterWithPPP(); zzzZoneExpansion = ClusterTestUtils.getZZEClusterXXP(); zzzStores = ClusterTestUtils.getZZZStoreDefsBDB(); } - RebalancePlan makePlan(Cluster cCluster, - List cStores, - Cluster fCluster, - List fStores) { - // Defaults for plans - int batchSize = RebalancePlan.BATCH_SIZE; - String outputDir = null; - - return new RebalancePlan(cCluster, cStores, fCluster, fStores, batchSize, outputDir); - } - @Test public void testNoop() { RebalancePlan rebalancePlan; // Two zones - rebalancePlan = makePlan(zzCurrent, zzStores, zzCurrent, zzStores); + rebalancePlan = ClusterTestUtils.makePlan(zzCurrent, zzStores, zzCurrent, zzStores); assertEquals(rebalancePlan.getPlan().size(), 0); assertEquals(rebalancePlan.getPrimariesMoved(), 0); assertEquals(rebalancePlan.getPartitionStoresMoved(), 0); assertEquals(rebalancePlan.getPartitionStoresMovedXZone(), 0); // Three zones - rebalancePlan = makePlan(zzzCurrent, zzzStores, zzzCurrent, zzzStores); + rebalancePlan = ClusterTestUtils.makePlan(zzzCurrent, zzzStores, zzzCurrent, zzzStores); assertEquals(rebalancePlan.getPlan().size(), 0); assertEquals(rebalancePlan.getPrimariesMoved(), 0); assertEquals(rebalancePlan.getPartitionStoresMoved(), 0); @@ -86,22 +76,39 @@ public void testNoop() { } @Test - public void testRebalance() { + public void testShuffle() { RebalancePlan rebalancePlan; // Two zones - rebalancePlan = makePlan(zzCurrent, zzStores, zzRebalance, zzStores); + rebalancePlan = ClusterTestUtils.makePlan(zzCurrent, zzStores, zzShuffle, zzStores); assertEquals(rebalancePlan.getPlan().size(), 1); assertTrue(rebalancePlan.getPrimariesMoved() > 0); assertTrue(rebalancePlan.getPartitionStoresMoved() > 0); assertEquals(rebalancePlan.getPartitionStoresMovedXZone(), 0); + MoveMap zoneMoves = rebalancePlan.getZoneMoveMap(); + assertTrue(zoneMoves.get(0, 0) > 0); + assertTrue(zoneMoves.get(0, 1) == 0); + assertTrue(zoneMoves.get(1, 0) == 0); + assertTrue(zoneMoves.get(1, 1) > 0); + // Three zones - rebalancePlan = makePlan(zzzCurrent, zzzStores, zzzRebalance, zzzStores); + rebalancePlan = ClusterTestUtils.makePlan(zzzCurrent, zzzStores, zzzShuffle, zzzStores); assertEquals(rebalancePlan.getPlan().size(), 1); assertTrue(rebalancePlan.getPrimariesMoved() > 0); assertTrue(rebalancePlan.getPartitionStoresMoved() > 0); assertEquals(rebalancePlan.getPartitionStoresMovedXZone(), 0); + + zoneMoves = rebalancePlan.getZoneMoveMap(); + assertTrue(zoneMoves.get(0, 0) > 0); + assertTrue(zoneMoves.get(0, 1) == 0); + assertTrue(zoneMoves.get(0, 2) == 0); + assertTrue(zoneMoves.get(1, 0) == 0); + assertTrue(zoneMoves.get(1, 1) > 0); + assertTrue(zoneMoves.get(1, 2) == 0); + assertTrue(zoneMoves.get(2, 0) == 0); + assertTrue(zoneMoves.get(2, 1) == 0); + assertTrue(zoneMoves.get(2, 2) > 0); } @Test @@ -109,28 +116,57 @@ public void testClusterExpansion() { RebalancePlan rebalancePlan; // Two zones - rebalancePlan = makePlan(zzCurrent, zzStores, zzClusterExpansion, zzStores); + rebalancePlan = ClusterTestUtils.makePlan(zzCurrent, zzStores, zzClusterExpansion, zzStores); assertEquals(rebalancePlan.getPlan().size(), 1); assertTrue(rebalancePlan.getPrimariesMoved() > 0); assertTrue(rebalancePlan.getPartitionStoresMoved() > 0); assertEquals(rebalancePlan.getPartitionStoresMovedXZone(), 0); + MoveMap zoneMoves = rebalancePlan.getZoneMoveMap(); + assertTrue(zoneMoves.get(0, 0) > 0); + assertTrue(zoneMoves.get(0, 1) == 0); + assertTrue(zoneMoves.get(1, 0) == 0); + assertTrue(zoneMoves.get(1, 1) > 0); + // Three zones - rebalancePlan = makePlan(zzzCurrent, zzzStores, zzzClusterExpansion, zzzStores); + rebalancePlan = ClusterTestUtils.makePlan(zzzCurrent, zzzStores, zzzClusterExpansion, zzzStores); assertEquals(rebalancePlan.getPlan().size(), 1); assertTrue(rebalancePlan.getPrimariesMoved() > 0); assertTrue(rebalancePlan.getPartitionStoresMoved() > 0); assertEquals(rebalancePlan.getPartitionStoresMovedXZone(), 0); + + zoneMoves = rebalancePlan.getZoneMoveMap(); + assertTrue(zoneMoves.get(0, 0) > 0); + assertTrue(zoneMoves.get(0, 1) == 0); + assertTrue(zoneMoves.get(0, 2) == 0); + assertTrue(zoneMoves.get(1, 0) == 0); + assertTrue(zoneMoves.get(1, 1) > 0); + assertTrue(zoneMoves.get(1, 2) == 0); + assertTrue(zoneMoves.get(2, 0) == 0); + assertTrue(zoneMoves.get(2, 1) == 0); + assertTrue(zoneMoves.get(2, 2) > 0); } @Test public void testZoneExpansion() { RebalancePlan rebalancePlan; - rebalancePlan = makePlan(zzCurrent, zzStores, zzzZoneExpansion, zzzStores); + rebalancePlan = ClusterTestUtils.makePlan(zzCurrent, zzStores, zzzZoneExpansion, zzzStores); assertEquals(rebalancePlan.getPlan().size(), 1); assertTrue(rebalancePlan.getPrimariesMoved() > 0); assertTrue(rebalancePlan.getPartitionStoresMoved() > 0); assertTrue(rebalancePlan.getPartitionStoresMovedXZone() > 0); + + // zone id 2 is the new zone. + MoveMap zoneMoves = rebalancePlan.getZoneMoveMap(); + assertTrue(zoneMoves.get(0, 0) > 0); + assertTrue(zoneMoves.get(0, 1) == 0); + assertTrue(zoneMoves.get(0, 2) > 0); + assertTrue(zoneMoves.get(1, 0) == 0); + assertTrue(zoneMoves.get(1, 1) > 0); + assertTrue(zoneMoves.get(1, 2) > 0); + assertTrue(zoneMoves.get(2, 0) == 0); + assertTrue(zoneMoves.get(2, 1) == 0); + assertTrue(zoneMoves.get(2, 2) == 0); } } diff --git a/test/unit/voldemort/client/rebalance/ZonedRebalanceBatchPlanTest.java b/test/unit/voldemort/client/rebalance/ZonedRebalanceBatchPlanTest.java new file mode 100644 index 0000000000..41d4b3d04f --- /dev/null +++ b/test/unit/voldemort/client/rebalance/ZonedRebalanceBatchPlanTest.java @@ -0,0 +1,116 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package voldemort.client.rebalance; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.junit.BeforeClass; +import org.junit.Test; + +import voldemort.ClusterTestUtils; +import voldemort.cluster.Cluster; +import voldemort.store.StoreDefinition; + +public class ZonedRebalanceBatchPlanTest { + + static Cluster zzCurrent; + static Cluster zzShuffle; + static Cluster zzClusterExpansionNN; + static Cluster zzClusterExpansionPP; + static List zzStores; + + static Cluster zzzCurrent; + static Cluster zzzShuffle; + static Cluster zzzClusterExpansionNNN; + static Cluster zzzClusterExpansionPPP; + static Cluster zzeZoneExpansion; + static Cluster zzzZoneExpansionXXP; + static List zzzStores; + + @BeforeClass + public static void setup() { + zzCurrent = ClusterTestUtils.getZZCluster(); + zzShuffle = ClusterTestUtils.getZZClusterWithSwappedPartitions(); + zzClusterExpansionNN = ClusterTestUtils.getZZClusterWithNN(); + zzClusterExpansionPP = ClusterTestUtils.getZZClusterWithPP(); + zzStores = ClusterTestUtils.getZZStoreDefsBDB(); + + zzzCurrent = ClusterTestUtils.getZZZCluster(); + + zzzShuffle = ClusterTestUtils.getZZZClusterWithSwappedPartitions(); + zzzClusterExpansionNNN = ClusterTestUtils.getZZZClusterWithNNN(); + zzzClusterExpansionPPP = ClusterTestUtils.getZZZClusterWithPPP(); + zzeZoneExpansion = ClusterTestUtils.getZZECluster(); + zzzZoneExpansionXXP = ClusterTestUtils.getZZEClusterXXP(); + zzzStores = ClusterTestUtils.getZZZStoreDefsBDB(); + } + + @Test + public void testNoop() { + List batchPlan; + + // Two zones + batchPlan = ClusterTestUtils.getBatchPlan(zzCurrent, zzCurrent, zzStores); + assertEquals(batchPlan.size(), 0); + + // Three zones + batchPlan = ClusterTestUtils.getBatchPlan(zzzCurrent, zzzCurrent, zzzStores); + assertEquals(batchPlan.size(), 0); + } + + @Test + public void testShuffle() { + List batchPlan; + + // Two zones + batchPlan = ClusterTestUtils.getBatchPlan(zzCurrent, zzShuffle, zzStores); + assertTrue(batchPlan.size() > 0); + + // Three zones + batchPlan = ClusterTestUtils.getBatchPlan(zzzCurrent, zzzShuffle, zzzStores); + assertTrue(batchPlan.size() > 0); + } + + @Test + public void testClusterExpansion() { + List batchPlan; + + // Two zones + batchPlan = ClusterTestUtils.getBatchPlan(zzClusterExpansionNN, + zzClusterExpansionPP, + zzStores); + assertTrue(batchPlan.size() > 0); + + // Three zones + batchPlan = ClusterTestUtils.getBatchPlan(zzzClusterExpansionNNN, + zzzClusterExpansionPPP, + zzzStores); + assertTrue(batchPlan.size() > 0); + } + + @Test + public void testZoneExpansion() { + List batchPlan; + + // Two-to-three zones + batchPlan = ClusterTestUtils.getBatchPlan(zzeZoneExpansion, zzzZoneExpansionXXP, zzzStores); + assertTrue(batchPlan.size() > 0); + } + +} diff --git a/test/unit/voldemort/utils/ClusterUtilsTest.java b/test/unit/voldemort/utils/ClusterUtilsTest.java index 146b0eed60..07748c1de3 100644 --- a/test/unit/voldemort/utils/ClusterUtilsTest.java +++ b/test/unit/voldemort/utils/ClusterUtilsTest.java @@ -36,7 +36,8 @@ public void testGetMapOfContiguousPartitionRunLengths() { { 3, 9, 13 }, { 4, 10 }, { 5, 11 } }; Cluster cluster = ServerTestUtils.getLocalZonedCluster(numberOfZones, nodesPerZone, - partitionMap); + partitionMap, + ServerTestUtils.findFreePorts(6 * 3)); Map iiMap; // Zone 0: // 0, 1, 2, 6, 7, 8, 12, 14, 15, 16, 17