diff --git a/src/java/voldemort/client/rebalance/RebalanceBatchPlan.java b/src/java/voldemort/client/rebalance/RebalanceBatchPlan.java index 9aa883c665..3a0487dc36 100644 --- a/src/java/voldemort/client/rebalance/RebalanceBatchPlan.java +++ b/src/java/voldemort/client/rebalance/RebalanceBatchPlan.java @@ -31,10 +31,6 @@ import com.google.common.collect.Maps; -// TODO: (refactor) Fix cluster nomenclature in general: make sure there are -// exactly three prefixes used to distinguish cluster xml: initial or current, -// target or spec or expanded, and final. 'target' has historically been -// overloaded to mean spec/expanded or final depending on context. /** * Constructs a batch plan that goes from currentCluster to finalCluster. The * partition-stores included in the move are based on those listed in storeDefs. @@ -258,10 +254,10 @@ public List buildRebalancePartitionsInfos() { */ private List constructBatchPlan() { // Construct all store routing plans once. - HashMap targetStoreRoutingPlans = new HashMap(); + HashMap currentStoreRoutingPlans = new HashMap(); for(StoreDefinition storeDef: currentStoreDefs) { - targetStoreRoutingPlans.put(storeDef.getName(), new StoreRoutingPlan(currentCluster, - storeDef)); + currentStoreRoutingPlans.put(storeDef.getName(), new StoreRoutingPlan(currentCluster, + storeDef)); } HashMap finalStoreRoutingPlans = new HashMap(); for(StoreDefinition storeDef: finalStoreDefs) { @@ -277,7 +273,7 @@ private List constructBatchPlan() { // Consider all store definitions ... for(StoreDefinition storeDef: finalStoreDefs) { - StoreRoutingPlan targetSRP = targetStoreRoutingPlans.get(storeDef.getName()); + StoreRoutingPlan currentSRP = currentStoreRoutingPlans.get(storeDef.getName()); StoreRoutingPlan finalSRP = finalStoreRoutingPlans.get(storeDef.getName()); for(int stealerPartitionId: finalSRP.getZoneNAryPartitionIds(stealerNodeId)) { // ... and all nary partition-stores, @@ -285,12 +281,13 @@ private List constructBatchPlan() { // Optimization: Do not steal a partition-store you already // host! - if(targetSRP.getReplicationNodeList(stealerPartitionId).contains(stealerNodeId)) { + if(currentSRP.getReplicationNodeList(stealerPartitionId) + .contains(stealerNodeId)) { continue; } // Determine which node to steal from. - int donorNodeId = getDonorId(targetSRP, + int donorNodeId = getDonorId(currentSRP, finalSRP, stealerZoneId, stealerNodeId, @@ -298,7 +295,8 @@ private List constructBatchPlan() { // Add this specific partition-store steal to the overall // plan - int donorReplicaType = targetSRP.getReplicaType(donorNodeId, stealerPartitionId); + int donorReplicaType = currentSRP.getReplicaType(donorNodeId, + stealerPartitionId); rpiBuilder.addPartitionStoreMove(stealerNodeId, donorNodeId, storeDef.getName(), @@ -320,12 +318,12 @@ private List constructBatchPlan() { * Current policy: * * 1) If possible, a stealer node that is the zone n-ary in the finalCluster - * steals from the zone n-ary in the targetCluster in the same zone. + * steals from the zone n-ary in the currentCluster in the same zone. * * 2) If there are no partition-stores to steal in the same zone (i.e., this * is the "zone expansion" use case), then a different policy must be used. * The stealer node that is the zone n-ary in the finalCluster determines - * which pre-existing zone in the targetCluster hosts the primary partition + * which pre-existing zone in the currentCluster hosts the primary partition * id for the partition-store. The stealer then steals the zone n-ary from * that pre-existing zone.
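The renames in this file follow the three-stage nomenclature the deleted TODO asked for: "current" is the deployed topology, "interim" is the current topology plus empty new nodes/zones, and "final" is the desired end state. A minimal sketch of how the stages relate, using only calls that appear elsewhere in this patch (RebalanceUtils.getInterimCluster, ClusterMapper.readCluster); the class name, variable names, and file paths are illustrative, not part of the patch:

import java.io.File;

import voldemort.cluster.Cluster;
import voldemort.utils.RebalanceUtils;
import voldemort.xml.ClusterMapper;

public class ClusterStagesSketch {

    // current -> interim -> final: the interim cluster carries the final
    // cluster's nodes and zones but only the current cluster's partition
    // ownership, so newly added nodes start with empty partition lists.
    public static Cluster deriveInterim(String currentXmlPath, String finalXmlPath)
            throws Exception {
        Cluster currentCluster = new ClusterMapper().readCluster(new File(currentXmlPath));
        Cluster finalCluster = new ClusterMapper().readCluster(new File(finalXmlPath));
        return RebalanceUtils.getInterimCluster(currentCluster, finalCluster);
    }
}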
* @@ -352,14 +350,14 @@ private List constructBatchPlan() { * n-aries in the new zone steal from the single cross-zone stealer in the * zone. This would require apparatus in the RebalanceController to work. * - * @param targetSRP + * @param currentSRP * @param finalSRP * @param stealerZoneId * @param stealerNodeId * @param stealerPartitionId * @return the node id of the donor for this partition Id. */ - protected int getDonorId(StoreRoutingPlan targetSRP, + protected int getDonorId(StoreRoutingPlan currentSRP, StoreRoutingPlan finalSRP, int stealerZoneId, int stealerNodeId, @@ -369,16 +367,16 @@ protected int getDonorId(StoreRoutingPlan targetSRP, stealerPartitionId); int donorZoneId; - if(targetSRP.zoneNAryExists(stealerZoneId, stealerZoneNAry, stealerPartitionId)) { + if(currentSRP.zoneNAryExists(stealerZoneId, stealerZoneNAry, stealerPartitionId)) { // Steal from local n-ary (since one exists). donorZoneId = stealerZoneId; } else { // Steal from zone that hosts primary partition Id. - int targetMasterNodeId = targetSRP.getNodeIdForPartitionId(stealerPartitionId); - donorZoneId = currentCluster.getNodeById(targetMasterNodeId).getZoneId(); + int currentMasterNodeId = currentSRP.getNodeIdForPartitionId(stealerPartitionId); + donorZoneId = currentCluster.getNodeById(currentMasterNodeId).getZoneId(); } - return targetSRP.getNodeIdForZoneNary(donorZoneId, stealerZoneNAry, stealerPartitionId); + return currentSRP.getNodeIdForZoneNary(donorZoneId, stealerZoneNAry, stealerPartitionId); } diff --git a/src/java/voldemort/client/rebalance/RebalanceController.java b/src/java/voldemort/client/rebalance/RebalanceController.java index 97653c6e9e..b5cebb3570 100644 --- a/src/java/voldemort/client/rebalance/RebalanceController.java +++ b/src/java/voldemort/client/rebalance/RebalanceController.java @@ -125,12 +125,11 @@ public RebalancePlan getPlan(Cluster finalCluster, List finalStoreDefs, int batchSize) { RebalanceUtils.validateClusterStores(finalCluster, finalStoreDefs); - RebalanceUtils.validateCurrentFinalCluster(currentCluster, finalCluster); + // If an interim cluster is needed, then currentCluster should be an + // interim cluster! I.e., it should include new nodes/zones without any + // partitions assigned to them. + RebalanceUtils.validateInterimFinalCluster(currentCluster, finalCluster); - // TODO: (currentCluster vs interimCluster) Add more validation before - // constructing plan? Given that currentCluster was polled from prod - // cluster, should confirm that it is an "interim cluster" i.e., has - // same (superset?) of nodes as are in finalCluster. String outputDir = null; return new RebalancePlan(currentCluster, currentStoreDefs, diff --git a/src/java/voldemort/client/rebalance/RebalancePlan.java b/src/java/voldemort/client/rebalance/RebalancePlan.java index 920ea1ca67..089a08b7d4 100644 --- a/src/java/voldemort/client/rebalance/RebalancePlan.java +++ b/src/java/voldemort/client/rebalance/RebalancePlan.java @@ -79,7 +79,7 @@ public class RebalancePlan { * be used to transform deployed store definitions. In practice, this use * case has not been tested. * - * @param currentCluster current deployed cluster + * @param currentCluster current deployed cluster. 
* @param currentStoreDefs current deployed store defs * @param finalCluster desired deployed cluster * @param finalStoreDefs desired deployed store defs @@ -99,11 +99,7 @@ public RebalancePlan(final Cluster currentCluster, this.batchSize = batchSize; this.outputDir = outputDir; - // TODO: (currentCluster vs interimCluster) Instead of divining - // interimCluster from currentCluster and finalCluster, should we - // require that interimCluster be passed in? - - // Derive the targetCluster from current & final cluster xml + // Derive the interimCluster from current & final cluster xml RebalanceUtils.validateCurrentFinalCluster(this.currentCluster, this.finalCluster); Cluster interimCluster = RebalanceUtils.getInterimCluster(this.currentCluster, this.finalCluster); diff --git a/src/java/voldemort/server/VoldemortServer.java b/src/java/voldemort/server/VoldemortServer.java index 30278485e2..10a765e806 100644 --- a/src/java/voldemort/server/VoldemortServer.java +++ b/src/java/voldemort/server/VoldemortServer.java @@ -123,8 +123,25 @@ public AsyncOperationService getAsyncRunner() { /** * Compare the configured hostname with all the ip addresses and hostnames - * for the server node, and log a warning if there is a mismatch + * for the server node, and log a warning if there is a mismatch. + * */ + // TODO: VoldemortServer should throw exception if cluster xml, node id, and + // server's state are not all mutually consistent. + // + // "I attempted to do this in the past. In practice its hard since the + // hostname strings returned may not exactly match what's in cluster.xml + // (ela4-app0000.prod vs ela4-app0000.prod.linkedin.com). And for folks + // running with multiple interfaces and stuff in the open source world, not + // sure how it would fan out.. + // + // I am in favour of doing this though.. May be implement a server config, + // "strict.hostname.check.on.startup" which is false by default and true for + // our environments and our SRE makes sure there is an exact match?" -- + // VChandar + // + // "Strict host name doesn't work? We can always trim the rest before the comparison." 
+ // -- LGao private void checkHostName() { try { HashSet ipAddrList = new HashSet(); diff --git a/src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java b/src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java index 608aadd9a8..1a39a4b8c0 100644 --- a/src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java +++ b/src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java @@ -80,8 +80,8 @@ public class DonorBasedRebalanceAsyncOperation extends RebalanceAsyncOperation { private final StoreRepository storeRepository; private final AtomicBoolean running = new AtomicBoolean(true); - private final Cluster initialCluster; - private final Cluster targetCluster; + private final Cluster currentCluster; + private final Cluster finalCluster; private final boolean usePartitionScan; private final HashMultimap>>> storeToNodePartitionMapping; @@ -121,8 +121,8 @@ public DonorBasedRebalanceAsyncOperation(Rebalancer rebalancer, + " partition-stores."); this.storeRepository = storeRepository; this.stealInfos = stealInfos; - this.targetCluster = metadataStore.getCluster(); - this.initialCluster = stealInfos.get(0).getInitialCluster(); + this.finalCluster = metadataStore.getCluster(); + this.currentCluster = stealInfos.get(0).getInitialCluster(); this.usePartitionScan = usePartitionScan; this.partitionStoreCount = RebalanceUtils.countPartitionStores(stealInfos); @@ -275,7 +275,7 @@ public Thread newThread(Runnable r) { if(voldemortConfig.getRebalancingOptimization() && !storageEngine.isPartitionAware()) { for(Pair>> entry: stealerNodeToMappingTuples) { HashMap> optimizedReplicaToPartition = RebalanceUtils.getOptimizedReplicaToPartitionList(entry.getFirst(), - initialCluster, + currentCluster, storeDef, entry.getSecond()); @@ -329,7 +329,7 @@ private void fetchEntriesForStealers(StorageEngine st HashMap>>> nodeToQueue, String storeName) { int scanned = 0; - int[] fetched = new int[targetCluster.getNumberOfNodes()]; + int[] fetched = new int[finalCluster.getNumberOfNodes()]; long startTime = System.currentTimeMillis(); ClosableIterator keys = storageEngine.keys(); @@ -340,7 +340,7 @@ private void fetchEntriesForStealers(StorageEngine st scanned++; List nodeIds = StoreRoutingPlan.checkKeyBelongsToPartition(key.get(), optimizedStealerNodeToMappingTuples, - targetCluster, + finalCluster, storeDef); if(nodeIds.size() > 0) { @@ -368,7 +368,7 @@ private void fetchEntriesForStealersPartitionScan(StorageEngine>>> nodeToQueue, String storeName) { int scanned = 0; - int[] fetched = new int[targetCluster.getNumberOfNodes()]; + int[] fetched = new int[finalCluster.getNumberOfNodes()]; long startTime = System.currentTimeMillis(); // construct a set of all the partitions we will be fetching @@ -388,7 +388,7 @@ private void fetchEntriesForStealersPartitionScan(StorageEngine nodeIds = StoreRoutingPlan.checkKeyBelongsToPartition(key.get(), optimizedStealerNodeToMappingTuples, - targetCluster, + finalCluster, storeDef); if(nodeIds.size() > 0) { diff --git a/src/java/voldemort/tools/RebalanceControllerCLI.java b/src/java/voldemort/tools/RebalanceControllerCLI.java index f97bfac8d3..8b7783356a 100644 --- a/src/java/voldemort/tools/RebalanceControllerCLI.java +++ b/src/java/voldemort/tools/RebalanceControllerCLI.java @@ -71,11 +71,11 @@ private static void setupParser() { .ofType(Long.class) .describedAs("proxy pause"); - parser.accepts("final-cluster", "Path to target cluster xml") + parser.accepts("final-cluster", "Path to final 
cluster xml") .withRequiredArg() .describedAs("cluster.xml"); parser.accepts("final-stores", - "Path to target store definition xml. Needed for zone expansion.") + "Path to final store definition xml. Needed for zone expansion.") .withRequiredArg() .describedAs("stores.xml"); diff --git a/src/java/voldemort/tools/Repartitioner.java b/src/java/voldemort/tools/Repartitioner.java index 3b09d99080..79667d8efc 100644 --- a/src/java/voldemort/tools/Repartitioner.java +++ b/src/java/voldemort/tools/Repartitioner.java @@ -113,9 +113,9 @@ public class Repartitioner { * * @param currentCluster current cluster * @param currentStoreDefs current store defs - * @param targetCluster target cluster; needed for cluster or zone + * @param interimCluster interim cluster; needed for cluster or zone * expansion, otherwise pass in same as currentCluster. - * @param targetStoreDefs target store defs; needed for zone expansion, + * @param finalStoreDefs final store defs; needed for zone expansion, * otherwise pass in same as currentStores. * @param outputDir Directory in which to dump cluster xml and analysis * files. @@ -135,14 +135,14 @@ public class Repartitioner { * @param greedySwapMaxPartitionsPerNode * @param greedySwapMaxPartitionsPerZone * @param maxContiguousPartitionsPerZone - * @return Cluster that has had all specified balancing algorithms run - * against it. The number of zones and number of nodes will match - * that of the specified targetCluster. + * @return "final cluster" that has had all specified balancing algorithms + * run against it. The number of zones and number of nodes will + * match that of the specified "interim cluster". */ public static Cluster repartition(final Cluster currentCluster, final List currentStoreDefs, - final Cluster targetCluster, - final List targetStoreDefs, + final Cluster interimCluster, + final List finalStoreDefs, final String outputDir, final int attempts, final boolean disableNodeBalancing, @@ -161,12 +161,12 @@ public static Cluster repartition(final Cluster currentCluster, RebalanceUtils.currentClusterFileName, partitionBalance); - Cluster minCluster = targetCluster; + Cluster minCluster = interimCluster; double minUtility = Double.MAX_VALUE; for(int attempt = 0; attempt < attempts; attempt++) { - Cluster nextCluster = targetCluster; + Cluster nextCluster = interimCluster; if(maxContiguousPartitionsPerZone > 0) { nextCluster = repeatedlyBalanceContiguousPartitionsPerZone(nextCluster, @@ -181,25 +181,25 @@ public static Cluster repartition(final Cluster currentCluster, nextCluster = randomShufflePartitions(nextCluster, randomSwapAttempts, randomSwapSuccesses, - targetStoreDefs); + finalStoreDefs); } if(enableGreedySwaps) { nextCluster = greedyShufflePartitions(nextCluster, greedySwapAttempts, greedySwapMaxPartitionsPerNode, greedySwapMaxPartitionsPerZone, - new ArrayList(targetCluster.getZoneIds()), - targetStoreDefs); + new ArrayList(interimCluster.getZoneIds()), + finalStoreDefs); } RebalanceUtils.validateCurrentFinalCluster(currentCluster, nextCluster); System.out.println("-------------------------\n"); - partitionBalance = new PartitionBalance(nextCluster, targetStoreDefs); + partitionBalance = new PartitionBalance(nextCluster, finalStoreDefs); double currentUtility = partitionBalance.getUtility(); System.out.println("Optimization number " + attempt + ": " + currentUtility + " max/min ratio"); System.out.println("-------------------------\n"); - System.out.println(RebalanceUtils.analyzeInvalidMetadataRate(targetCluster, + 
System.out.println(RebalanceUtils.analyzeInvalidMetadataRate(interimCluster, currentStoreDefs, nextCluster, currentStoreDefs)); @@ -218,7 +218,7 @@ public static Cluster repartition(final Cluster currentCluster, System.out.println("\n=========================="); System.out.println("Final distribution"); - partitionBalance = new PartitionBalance(minCluster, targetStoreDefs); + partitionBalance = new PartitionBalance(minCluster, finalStoreDefs); System.out.println(partitionBalance); RebalanceUtils.dumpClusterToFile(outputDir, RebalanceUtils.finalClusterFileName, minCluster); @@ -233,16 +233,16 @@ public static Cluster repartition(final Cluster currentCluster, * have. The list of integers returned per zone is the same length as the * number of nodes in that zone. * - * @param targetCluster + * @param nextCluster * @param targetPartitionsPerZone * @return A map of zoneId to list of target number of partitions per node * within zone. */ - public static HashMap> getBalancedNumberOfPrimaryPartitionsPerNode(final Cluster targetCluster, + public static HashMap> getBalancedNumberOfPrimaryPartitionsPerNode(final Cluster nextCluster, Map targetPartitionsPerZone) { HashMap> numPartitionsPerNode = Maps.newHashMap(); - for(Integer zoneId: targetCluster.getZoneIds()) { - List partitionsOnNode = Utils.distributeEvenlyIntoList(targetCluster.getNumberOfNodesInZone(zoneId), + for(Integer zoneId: nextCluster.getZoneIds()) { + List partitionsOnNode = Utils.distributeEvenlyIntoList(nextCluster.getNumberOfNodesInZone(zoneId), targetPartitionsPerZone.get(zoneId)); numPartitionsPerNode.put(zoneId, partitionsOnNode); } @@ -254,23 +254,23 @@ public static HashMap> getBalancedNumberOfPrimaryPartitio * separates Nodes into donorNodes and stealerNodes based on whether the * node needs to donate or steal primary partitions. * - * @param targetCluster + * @param nextCluster * @param numPartitionsPerNodePerZone * @return a Pair. First element is donorNodes, second element is * stealerNodes. Each element in the pair is a HashMap of Node to * Integer where the integer value is the number of partitions to * store. */ - public static Pair, HashMap> getDonorsAndStealersForBalance(final Cluster targetCluster, + public static Pair, HashMap> getDonorsAndStealersForBalance(final Cluster nextCluster, Map> numPartitionsPerNodePerZone) { HashMap donorNodes = Maps.newHashMap(); HashMap stealerNodes = Maps.newHashMap(); HashMap numNodesAssignedInZone = Maps.newHashMap(); - for(Integer zoneId: targetCluster.getZoneIds()) { + for(Integer zoneId: nextCluster.getZoneIds()) { numNodesAssignedInZone.put(zoneId, 0); } - for(Node node: targetCluster.getNodes()) { + for(Node node: nextCluster.getNodes()) { int zoneId = node.getZoneId(); int offset = numNodesAssignedInZone.get(zoneId); @@ -300,7 +300,6 @@ public static Pair, HashMap> getDonorsAndS return new Pair, HashMap>(donorNodes, stealerNodes); } - // TODO: (refactor) rename targetCluster -> interimCluster /** * This method balances primary partitions among nodes within a zone, and * optionally primary partitions among zones. The balancing is done at the @@ -309,42 +308,40 @@ public static Pair, HashMap> getDonorsAndS * responsible for determining which partition-stores move where for a * specific repartitioning. * - * @param targetCluster + * @param nextCluster * @param balanceZones indicates whether or not number of primary partitions * per zone should be balanced. 
* @return */ - public static Cluster balancePrimaryPartitions(final Cluster targetCluster, boolean balanceZones) { + public static Cluster balancePrimaryPartitions(final Cluster nextCluster, boolean balanceZones) { System.out.println("Balance number of partitions across all nodes and zones."); Map targetPartitionsPerZone; if(balanceZones) { - targetPartitionsPerZone = Utils.distributeEvenlyIntoMap(targetCluster.getZoneIds(), - targetCluster.getNumberOfPartitions()); + targetPartitionsPerZone = Utils.distributeEvenlyIntoMap(nextCluster.getZoneIds(), + nextCluster.getNumberOfPartitions()); System.out.println("numPartitionsPerZone"); - for(int zoneId: targetCluster.getZoneIds()) { - System.out.println(zoneId + " : " - + targetCluster.getNumberOfPartitionsInZone(zoneId) + " -> " - + targetPartitionsPerZone.get(zoneId)); + for(int zoneId: nextCluster.getZoneIds()) { + System.out.println(zoneId + " : " + nextCluster.getNumberOfPartitionsInZone(zoneId) + + " -> " + targetPartitionsPerZone.get(zoneId)); } System.out.println("numNodesPerZone"); - for(int zoneId: targetCluster.getZoneIds()) { - System.out.println(zoneId + " : " + targetCluster.getNumberOfNodesInZone(zoneId)); + for(int zoneId: nextCluster.getZoneIds()) { + System.out.println(zoneId + " : " + nextCluster.getNumberOfNodesInZone(zoneId)); } } else { // Keep number of partitions per zone the same. targetPartitionsPerZone = new HashMap(); - for(int zoneId: targetCluster.getZoneIds()) { - targetPartitionsPerZone.put(zoneId, - targetCluster.getNumberOfPartitionsInZone(zoneId)); + for(int zoneId: nextCluster.getZoneIds()) { + targetPartitionsPerZone.put(zoneId, nextCluster.getNumberOfPartitionsInZone(zoneId)); } } - HashMap> numPartitionsPerNodeByZone = getBalancedNumberOfPrimaryPartitionsPerNode(targetCluster, + HashMap> numPartitionsPerNodeByZone = getBalancedNumberOfPrimaryPartitionsPerNode(nextCluster, targetPartitionsPerZone); - Pair, HashMap> donorsAndStealers = getDonorsAndStealersForBalance(targetCluster, + Pair, HashMap> donorsAndStealers = getDonorsAndStealersForBalance(nextCluster, numPartitionsPerNodeByZone); HashMap donorNodes = donorsAndStealers.getFirst(); List donorNodeKeys = new ArrayList(donorNodes.keySet()); @@ -370,7 +367,7 @@ public static Cluster balancePrimaryPartitions(final Cluster targetCluster, bool */ // Go over every stealerNode and steal partition Ids from donor nodes - Cluster returnCluster = ClusterUtils.copyCluster(targetCluster); + Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); Collections.shuffle(stealerNodeKeys, new Random(System.currentTimeMillis())); for(Node stealerNode: stealerNodeKeys) { @@ -434,7 +431,7 @@ public static Cluster balancePrimaryPartitions(final Cluster targetCluster, bool * @param maxContiguousPartitionsPerZone See RebalanceCLI. * @return */ - public static Cluster repeatedlyBalanceContiguousPartitionsPerZone(final Cluster targetCluster, + public static Cluster repeatedlyBalanceContiguousPartitionsPerZone(final Cluster nextCluster, final int maxContiguousPartitionsPerZone) { System.out.println("Looping to evenly balance partitions across zones while limiting contiguous partitions"); // This loop is hard to make definitive. I.e., there are corner cases @@ -443,17 +440,17 @@ public static Cluster repeatedlyBalanceContiguousPartitionsPerZone(final Cluster // Therefore, a constant number of loops are run. Note that once the // goal is reached, the loop becomes a no-op. 
int repeatContigBalance = 10; - Cluster nextCluster = targetCluster; + Cluster returnCluster = nextCluster; for(int i = 0; i < repeatContigBalance; i++) { - nextCluster = balanceContiguousPartitionsPerZone(nextCluster, - maxContiguousPartitionsPerZone); + returnCluster = balanceContiguousPartitionsPerZone(returnCluster, + maxContiguousPartitionsPerZone); - nextCluster = balancePrimaryPartitions(nextCluster, false); + returnCluster = balancePrimaryPartitions(returnCluster, false); System.out.println("Completed round of balancing contiguous partitions: round " + (i + 1) + " of " + repeatContigBalance); } - return nextCluster; + return returnCluster; } /** @@ -464,29 +461,28 @@ public static Cluster repeatedlyBalanceContiguousPartitionsPerZone(final Cluster * to some other random zone/node. There is some chance that such random * moves could result in contiguous partitions in other zones. * - * @param targetCluster Target cluster metadata + * @param nextCluster cluster metadata * @param maxContiguousPartitionsPerZone See RebalanceCLI. - * @return Return a pair of cluster metadata and number of primary - * partitions that have moved. + * @return Return updated cluster metadata. */ - public static Cluster balanceContiguousPartitionsPerZone(final Cluster targetCluster, + public static Cluster balanceContiguousPartitionsPerZone(final Cluster nextCluster, final int maxContiguousPartitionsPerZone) { System.out.println("Balance number of contiguous partitions within a zone."); System.out.println("numPartitionsPerZone"); - for(int zoneId: targetCluster.getZoneIds()) { - System.out.println(zoneId + " : " + targetCluster.getNumberOfPartitionsInZone(zoneId)); + for(int zoneId: nextCluster.getZoneIds()) { + System.out.println(zoneId + " : " + nextCluster.getNumberOfPartitionsInZone(zoneId)); } System.out.println("numNodesPerZone"); - for(int zoneId: targetCluster.getZoneIds()) { - System.out.println(zoneId + " : " + targetCluster.getNumberOfNodesInZone(zoneId)); + for(int zoneId: nextCluster.getZoneIds()) { + System.out.println(zoneId + " : " + nextCluster.getNumberOfNodesInZone(zoneId)); } // Break up contiguous partitions within each zone HashMap> partitionsToRemoveFromZone = Maps.newHashMap(); System.out.println("Contiguous partitions"); - for(Integer zoneId: targetCluster.getZoneIds()) { + for(Integer zoneId: nextCluster.getZoneIds()) { System.out.println("\tZone: " + zoneId); - Map partitionToRunLength = ClusterUtils.getMapOfContiguousPartitions(targetCluster, + Map partitionToRunLength = ClusterUtils.getMapOfContiguousPartitions(nextCluster, zoneId); List partitionsToRemoveFromThisZone = new ArrayList(); @@ -495,8 +491,7 @@ public static Cluster balanceContiguousPartitionsPerZone(final Cluster targetClu List contiguousPartitions = new ArrayList(entry.getValue()); for(int partitionId = entry.getKey(); partitionId < entry.getKey() + entry.getValue(); partitionId++) { - contiguousPartitions.add(partitionId - % targetCluster.getNumberOfPartitions()); + contiguousPartitions.add(partitionId % nextCluster.getNumberOfPartitions()); } System.out.println("Contiguous partitions: " + contiguousPartitions); partitionsToRemoveFromThisZone.addAll(Utils.removeItemsToSplitListEvenly(contiguousPartitions, @@ -508,7 +503,7 @@ public static Cluster balanceContiguousPartitionsPerZone(final Cluster targetClu System.out.println("\t\tPartitions to remove: " + partitionsToRemoveFromThisZone); } - Cluster returnCluster = ClusterUtils.copyCluster(targetCluster); + Cluster returnCluster = 
ClusterUtils.copyCluster(nextCluster); Random r = new Random(); for(int zoneId: returnCluster.getZoneIds()) { @@ -541,12 +536,12 @@ public static Cluster balanceContiguousPartitionsPerZone(final Cluster targetClu * * @return modified cluster metadata. */ - public static Cluster swapPartitions(final Cluster targetCluster, + public static Cluster swapPartitions(final Cluster nextCluster, final int nodeIdA, final int partitionIdA, final int nodeIdB, final int partitionIdB) { - Cluster returnCluster = ClusterUtils.copyCluster(targetCluster); + Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); // Swap partitions between nodes! returnCluster = RebalanceUtils.createUpdatedCluster(returnCluster, @@ -563,16 +558,15 @@ public static Cluster swapPartitions(final Cluster targetCluster, * Within a single zone, swaps one random partition on one random node with * another random partition on different random node. * - * @param targetCluster + * @param nextCluster * @param zoneId Zone ID within which to shuffle partitions * @return */ - public static Cluster swapRandomPartitionsWithinZone(final Cluster targetCluster, - final int zoneId) { - Cluster returnCluster = ClusterUtils.copyCluster(targetCluster); + public static Cluster swapRandomPartitionsWithinZone(final Cluster nextCluster, final int zoneId) { + Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); Random r = new Random(); - List nodeIdsInZone = new ArrayList(targetCluster.getNodeIdsInZone(zoneId)); + List nodeIdsInZone = new ArrayList(nextCluster.getNodeIdsInZone(zoneId)); if(nodeIdsInZone.size() == 0) { return returnCluster; @@ -586,7 +580,7 @@ public static Cluster swapRandomPartitionsWithinZone(final Cluster targetCluster List stealerPartitions = returnCluster.getNodeById(stealerNodeId) .getPartitionIds(); if(stealerPartitions.size() == 0) { - return targetCluster; + return nextCluster; } int stealerPartitionOffset = r.nextInt(stealerPartitions.size()); int stealerPartitionId = stealerPartitions.get(stealerPartitionOffset); @@ -617,18 +611,18 @@ public static Cluster swapRandomPartitionsWithinZone(final Cluster targetCluster /** * Randomly shuffle partitions between nodes within every zone. * - * @param targetCluster Target cluster object. + * @param nextCluster cluster object. * @param randomSwapAttempts See RebalanceCLI. * @param randomSwapSuccesses See RebalanceCLI. * @param storeDefs List of store definitions - * @return + * @return updated cluster */ - public static Cluster randomShufflePartitions(final Cluster targetCluster, + public static Cluster randomShufflePartitions(final Cluster nextCluster, final int randomSwapAttempts, final int randomSwapSuccesses, List storeDefs) { - List zoneIds = new ArrayList(targetCluster.getZoneIds()); - Cluster returnCluster = ClusterUtils.copyCluster(targetCluster); + List zoneIds = new ArrayList(nextCluster.getZoneIds()); + Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); double currentUtility = new PartitionBalance(returnCluster, storeDefs).getUtility(); @@ -663,21 +657,21 @@ public static Cluster randomShufflePartitions(final Cluster targetCluster, * Large values of the greedSwapMaxPartitions... arguments make this method * equivalent to comparing every possible swap. This may get very expensive. * - * @param targetCluster + * @param nextCluster * @param zoneId Zone ID within which to shuffle partitions * @param greedySwapMaxPartitionsPerNode See RebalanceCLI. * @param greedySwapMaxPartitionsPerZone See RebalanceCLI. 
* @param storeDefs * @return */ - public static Cluster swapGreedyRandomPartitions(final Cluster targetCluster, + public static Cluster swapGreedyRandomPartitions(final Cluster nextCluster, final List nodeIds, final int greedySwapMaxPartitionsPerNode, final int greedySwapMaxPartitionsPerZone, List storeDefs) { System.out.println("GreedyRandom : nodeIds:" + nodeIds); - Cluster returnCluster = ClusterUtils.copyCluster(targetCluster); + Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); double currentUtility = new PartitionBalance(returnCluster, storeDefs).getUtility(); int nodeIdA = -1; int nodeIdB = -1; @@ -745,7 +739,7 @@ public static Cluster swapGreedyRandomPartitions(final Cluster targetCluster, * * #zones X #nodes/zone X max partitions/node X max partitions/zone * - * @param targetCluster Target cluster object. + * @param nextCluster cluster object. * @param greedyAttempts See RebalanceCLI. * @param greedySwapMaxPartitionsPerNode See RebalanceCLI. * @param greedySwapMaxPartitionsPerZone See RebalanceCLI. @@ -754,17 +748,7 @@ public static Cluster swapGreedyRandomPartitions(final Cluster targetCluster, * @param storeDefs * @return */ - /** - * - * @param targetCluster - * @param greedyAttempts - * @param greedySwapMaxPartitionsPerNode - * @param greedySwapMaxPartitionsPerZone - * @param zoneIds - * @param storeDefs - * @return - */ - public static Cluster greedyShufflePartitions(final Cluster targetCluster, + public static Cluster greedyShufflePartitions(final Cluster nextCluster, final int greedyAttempts, final int greedySwapMaxPartitionsPerNode, final int greedySwapMaxPartitionsPerZone, @@ -773,9 +757,9 @@ public static Cluster greedyShufflePartitions(final Cluster targetCluster, final int specialZoneId = -1; if(zoneIds == null) { zoneIds = new ArrayList(); - zoneIds.add(specialZoneId); // Special value. + zoneIds.add(specialZoneId); } - Cluster returnCluster = ClusterUtils.copyCluster(targetCluster); + Cluster returnCluster = ClusterUtils.copyCluster(nextCluster); if(zoneIds.isEmpty()) { logger.warn("greedyShufflePartitions invoked with empty list of zone IDs."); return returnCluster; @@ -791,9 +775,9 @@ public static Cluster greedyShufflePartitions(final Cluster targetCluster, List nodeIds; if(zoneId == specialZoneId) { - nodeIds = new ArrayList(targetCluster.getNodeIds()); + nodeIds = new ArrayList(nextCluster.getNodeIds()); } else { - nodeIds = new ArrayList(targetCluster.getNodeIdsInZone(zoneId)); + nodeIds = new ArrayList(nextCluster.getNodeIdsInZone(zoneId)); } Cluster shuffleResults = swapGreedyRandomPartitions(returnCluster, diff --git a/src/java/voldemort/tools/RepartitionerCLI.java b/src/java/voldemort/tools/RepartitionerCLI.java index 4991e7e723..4c3902a6d0 100644 --- a/src/java/voldemort/tools/RepartitionerCLI.java +++ b/src/java/voldemort/tools/RepartitionerCLI.java @@ -49,15 +49,15 @@ private static void setupParser() { parser.accepts("current-cluster", "Path to current cluster xml") .withRequiredArg() .describedAs("cluster.xml"); - parser.accepts("target-cluster", "Path to target cluster xml") + parser.accepts("interim-cluster", "Path to interim cluster xml") .withRequiredArg() .describedAs("cluster.xml"); parser.accepts("current-stores", "Path to current store definition xml. Needed for cluster and zone expansion.") .withRequiredArg() .describedAs("stores.xml"); - parser.accepts("target-stores", - "Path to target store definition xml. Needed for zone expansion.") + parser.accepts("final-stores", + "Path to final store definition xml. 
Needed for zone expansion. Used with interim-cluster.") .withRequiredArg() .describedAs("stores.xml"); parser.accepts("attempts", @@ -128,8 +128,8 @@ private static void printUsage() { help.append(" --current-cluster \n"); help.append(" --current-stores \n"); help.append(" Optional:\n"); - help.append(" --target-cluster [ Needed for cluster or zone expansion ]\n"); - help.append(" --target-stores [ Needed for zone expansion ]\n"); + help.append(" --interim-cluster [ Needed for cluster or zone expansion ]\n"); + help.append(" --final-stores [ Needed for zone expansion ]\n"); help.append(" --output-dir [ Output directory is where we store the optimized cluster ]\n"); help.append(" --attempts [ Number of distinct cycles of repartitioning ]\n"); help.append(" --disable-node-balancing [ Do not balance number of primary partitions among nodes within each zone ] \n"); @@ -173,8 +173,8 @@ private static OptionSet getValidOptions(String[] args) { if(missing.size() > 0) { printUsageAndDie("Missing required arguments: " + Joiner.on(", ").join(missing)); } - if(options.has("target-stores") && !options.has("target-cluster")) { - printUsageAndDie("target-stores specified, but target-cluster not specified."); + if(options.has("final-stores") && !options.has("interim-cluster")) { + printUsageAndDie("final-stores specified, but interim-cluster not specified."); } return options; @@ -187,24 +187,24 @@ public static void main(String[] args) throws Exception { // Required args String currentClusterXML = (String) options.valueOf("current-cluster"); String currentStoresXML = (String) options.valueOf("current-stores"); - String targetClusterXML = new String(currentClusterXML); - if(options.has("target-cluster")) { - targetClusterXML = (String) options.valueOf("target-cluster"); + String interimClusterXML = new String(currentClusterXML); + if(options.has("interim-cluster")) { + interimClusterXML = (String) options.valueOf("interim-cluster"); } - String targetStoresXML = new String(currentStoresXML); - if(options.has("target-stores")) { - targetStoresXML = (String) options.valueOf("target-stores"); + String finalStoresXML = new String(currentStoresXML); + if(options.has("final-stores")) { + finalStoresXML = (String) options.valueOf("final-stores"); } Cluster currentCluster = new ClusterMapper().readCluster(new File(currentClusterXML)); List currentStoreDefs = new StoreDefinitionsMapper().readStoreList(new File(currentStoresXML)); RebalanceUtils.validateClusterStores(currentCluster, currentStoreDefs); - Cluster targetCluster = new ClusterMapper().readCluster(new File(targetClusterXML)); - List targetStoreDefs = new StoreDefinitionsMapper().readStoreList(new File(targetStoresXML)); - RebalanceUtils.validateClusterStores(targetCluster, targetStoreDefs); + Cluster interimCluster = new ClusterMapper().readCluster(new File(interimClusterXML)); + List finalStoreDefs = new StoreDefinitionsMapper().readStoreList(new File(finalStoresXML)); + RebalanceUtils.validateClusterStores(interimCluster, finalStoreDefs); - RebalanceUtils.validateCurrentInterimCluster(currentCluster, targetCluster); + RebalanceUtils.validateCurrentInterimCluster(currentCluster, interimCluster); // Optional administrivia args int attempts = CmdUtils.valueOf(options, @@ -258,8 +258,8 @@ public static void main(String[] args) throws Exception { Repartitioner.repartition(currentCluster, currentStoreDefs, - targetCluster, - targetStoreDefs, + interimCluster, + finalStoreDefs, outputDir, attempts, disableNodeBalancing, diff --git 
a/src/java/voldemort/utils/RebalanceUtils.java b/src/java/voldemort/utils/RebalanceUtils.java index 4e9af4cfb3..6217d243d6 100644 --- a/src/java/voldemort/utils/RebalanceUtils.java +++ b/src/java/voldemort/utils/RebalanceUtils.java @@ -274,9 +274,9 @@ public static void validateCurrentFinalCluster(final Cluster currentCluster, } /** - * A target cluster ought to be a super set of current cluster. I.e., it + * An interim cluster ought to be a superset of current cluster. I.e., it * ought to either be the same as current cluster (every partition is mapped - * to the same node of current & target), or it ought to have more nodes + * to the same node of current & interim), or it ought to have more nodes * (possibly in new zones) without partitions. * * @param currentCluster @@ -421,7 +421,6 @@ public static void validateClusterNodeState(final Cluster subsetCluster, } } - // TODO: Can getInterimCluster and getClusterWithNewNodes be merged? /** * Given the current cluster and final cluster, generates an interim cluster * with empty new nodes (and zones). * @@ -446,22 +445,16 @@ public static Cluster getInterimCluster(Cluster currentCluster, Cluster finalClu } /** - * Given the current cluster and a target cluster, generates a cluster with - * new nodes ( which in turn contain empty partition lists ) + * Given the current cluster and an interim cluster, generates a cluster + * with new nodes (which in turn contain empty partition lists). * * @param currentCluster Current cluster metadata - * @param targetCluster Target cluster metadata + * @param interimCluster Interim cluster metadata * @return Returns a new cluster which contains nodes of the current cluster * + new nodes */ - public static Cluster getClusterWithNewNodes(Cluster currentCluster, Cluster targetCluster) { - ArrayList newNodes = new ArrayList(); - for(Node node: targetCluster.getNodes()) { - if(!ClusterUtils.containsNode(currentCluster, node.getId())) { - newNodes.add(NodeUtils.updateNode(node, new ArrayList())); - } - } - return updateCluster(currentCluster, newNodes); + public static Cluster getClusterWithNewNodes(Cluster currentCluster, Cluster interimCluster) { + return getInterimCluster(currentCluster, interimCluster); } /** @@ -537,7 +530,7 @@ public static Cluster createUpdatedCluster(Cluster currentCluster, * always.
* * @param currentCluster The cluster definition of the existing cluster - * @param finalCluster The target cluster definition + * @param finalCluster The final cluster definition * @param stealNodeId Node id of the stealer node * @return Returns a list of primary partitions which this stealer node will * get @@ -566,28 +559,28 @@ public static List getStolenPrimaryPartitions(final Cluster currentClus * Find all [replica_type, partition] tuples to be stolen * * @param currentCluster Current cluster metadata - * @param targetCluster Target cluster metadata + * @param finalCluster Final cluster metadata * @param storeDef Store Definition * @return Map of stealer node id to sets of [ replica_type, partition ] * tuples */ public static Map>> getStolenPartitionTuples(final Cluster currentCluster, - final Cluster targetCluster, + final Cluster finalCluster, final StoreDefinition storeDef) { Map>> currentNodeIdToReplicas = getNodeIdToAllPartitions(currentCluster, storeDef, true); - Map>> targetNodeIdToReplicas = getNodeIdToAllPartitions(targetCluster, - storeDef, - true); + Map>> finalNodeIdToReplicas = getNodeIdToAllPartitions(finalCluster, + storeDef, + true); Map>> stealerNodeToStolenPartitionTuples = Maps.newHashMap(); - for(int stealerId: NodeUtils.getNodeIds(Lists.newArrayList(targetCluster.getNodes()))) { + for(int stealerId: NodeUtils.getNodeIds(Lists.newArrayList(finalCluster.getNodes()))) { Set> clusterStealerReplicas = currentNodeIdToReplicas.get(stealerId); - Set> targetStealerReplicas = targetNodeIdToReplicas.get(stealerId); + Set> finalStealerReplicas = finalNodeIdToReplicas.get(stealerId); Set> diff = Utils.getAddedInTarget(clusterStealerReplicas, - targetStealerReplicas); + finalStealerReplicas); if(diff != null && diff.size() > 0) { stealerNodeToStolenPartitionTuples.put(stealerId, diff); @@ -908,17 +901,17 @@ public static void dumpClusters(Cluster currentCluster, } /** - * Given the initial and final cluster dumps it into the output directory + * Given the current and final clusters, dumps them into the output directory * - * @param initialCluster Initial cluster metadata + * @param currentCluster Current cluster metadata * @param finalCluster Final cluster metadata * @param outputDir Output directory where to dump this file * @throws IOException */ - public static void dumpClusters(Cluster initialCluster, + public static void dumpClusters(Cluster currentCluster, Cluster finalCluster, String outputDirName) { - dumpClusters(initialCluster, finalCluster, outputDirName, ""); + dumpClusters(currentCluster, finalCluster, outputDirName, ""); } /** diff --git a/test/common/voldemort/ClusterTestUtils.java b/test/common/voldemort/ClusterTestUtils.java index 90a4857add..e966214647 100644 --- a/test/common/voldemort/ClusterTestUtils.java +++ b/test/common/voldemort/ClusterTestUtils.java @@ -584,30 +584,31 @@ public static Cluster getZZClusterWithNonContiguousZoneIDsAndNonContiguousNodeID } /** - * Given the current and target cluster metadata, along with your store + * Given the current and final cluster metadata, along with your store * definition, return the batch plan.
* * @param currentCluster Current cluster metadata - * @param targetCluster Target cluster metadata + * @param finalCluster Final cluster metadata * @param storeDef List of store definitions * @return list of tasks for this batch plan */ public static List getBatchPlan(Cluster currentCluster, - Cluster targetCluster, + Cluster finalCluster, List storeDef) { RebalanceBatchPlan rebalancePlan = new RebalanceBatchPlan(currentCluster, - targetCluster, + finalCluster, storeDef); return rebalancePlan.getBatchPlan(); } /** - * Given the current and target cluster metadata, along with your store + * Given the current and final cluster metadata, along with your store * definition, return the batch plan. * - * @param currentCluster Current cluster metadata - * @param targetCluster Target cluster metadata - * @param storeDef List of store definitions + * @param currentCluster + * @param currentStoreDefs + * @param finalCluster + * @param finalStoreDefs * @return list of tasks for this batch plan */ public static List getBatchPlan(Cluster currentCluster, diff --git a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java index f48a647545..546c638804 100644 --- a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java @@ -225,9 +225,9 @@ public void testRORWRebalance() throws Exception { Cluster currentCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0, 1, 2, 3, 4, 5, 6, 7, 8 }, {} }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 1, - Lists.newArrayList(2, 3)); + Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 1, + Lists.newArrayList(2, 3)); // start servers 0 , 1 only List serverList = Arrays.asList(0, 1); @@ -242,7 +242,7 @@ public void testRORWRebalance() throws Exception { boolean stealerBased = !useDonorBased; final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, stealerBased, - targetCluster); + finalCluster); try { @@ -259,7 +259,7 @@ public void testRORWRebalance() throws Exception { rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(1)); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); } finally { // stop servers stopServer(serverList); @@ -277,9 +277,9 @@ public void testRORWRebalanceWithReplication() throws Exception { Cluster currentCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0, 1, 2, 3, 4, 5, 6 }, { 7, 8 } }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 1, - Lists.newArrayList(2, 3)); + Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 1, + Lists.newArrayList(2, 3)); // start servers 0 , 1 only List serverList = Arrays.asList(0, 1); @@ -295,7 +295,7 @@ public void testRORWRebalanceWithReplication() throws Exception { boolean stealerBased = !useDonorBased; final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, stealerBased, - targetCluster); + finalCluster); try { // Populate the two stores @@ -311,7 +311,7 @@ public void testRORWRebalanceWithReplication() throws Exception { rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(0, 1)); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); } finally { // stop servers 
stopServer(serverList); @@ -329,9 +329,9 @@ public void testRORebalanceWithReplication() throws Exception { Cluster currentCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0, 1, 2, 3, 4, 5, 6 }, { 7, 8 } }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 1, - Lists.newArrayList(2, 3)); + Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 1, + Lists.newArrayList(2, 3)); // start servers 0 , 1 only List serverList = Arrays.asList(0, 1); @@ -358,7 +358,7 @@ public void testRORebalanceWithReplication() throws Exception { boolean stealerBased = !useDonorBased; final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, stealerBased, - targetCluster); + finalCluster); try { populateData(currentCluster, @@ -367,7 +367,7 @@ public void testRORebalanceWithReplication() throws Exception { true); rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(0, 1)); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); } finally { // stop servers stopServer(serverList); @@ -384,9 +384,9 @@ public void testRWRebalanceWithReplication() throws Exception { try { Cluster currentCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0, 1, 2, 3, 4, 5, 6 }, { 7, 8 } }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 1, - Lists.newArrayList(2, 3)); + Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 1, + Lists.newArrayList(2, 3)); // start servers 0 , 1 only List serverList = Arrays.asList(0, 1); currentCluster = startServers(currentCluster, @@ -398,7 +398,7 @@ public void testRWRebalanceWithReplication() throws Exception { boolean stealerBased = !useDonorBased; final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, stealerBased, - targetCluster); + finalCluster); try { populateData(currentCluster, @@ -408,7 +408,7 @@ public void testRWRebalanceWithReplication() throws Exception { rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(0, 1)); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); } finally { // stop servers stopServer(serverList); @@ -426,9 +426,9 @@ public void testRebalanceCleanPrimary() throws Exception { Cluster currentCluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 0 }, { 1, 3 }, { 2 } }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 2, - Lists.newArrayList(3)); + Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 2, + Lists.newArrayList(3)); // start servers 0 , 1, 2 Map configProps = new HashMap(); @@ -443,7 +443,7 @@ public void testRebalanceCleanPrimary() throws Exception { boolean stealerBased = !useDonorBased; final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, stealerBased, - targetCluster); + finalCluster); try { AdminClient adminClient = rebalanceKit.controller.getAdminClient(); @@ -459,7 +459,7 @@ public void testRebalanceCleanPrimary() throws Exception { rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(0, 1, 2)); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); // Do the cleanup operation for(int i = 0; i < 3; i++) { @@ -494,9 +494,9 @@ public void testRebalanceCleanSecondary() throws Exception { Cluster currentCluster = 
ServerTestUtils.getLocalCluster(3, new int[][] { { 0, 3 }, { 1 }, { 2 } }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 2, - Lists.newArrayList(3)); + Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 2, + Lists.newArrayList(3)); // start servers 0 , 1, 2 Map configProps = new HashMap(); @@ -511,7 +511,7 @@ public void testRebalanceCleanSecondary() throws Exception { boolean stealerBased = !useDonorBased; final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, stealerBased, - targetCluster); + finalCluster); try { AdminClient adminClient = rebalanceKit.controller.getAdminClient(); @@ -527,7 +527,7 @@ public void testRebalanceCleanSecondary() throws Exception { rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(0, 1, 2)); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); // Do the cleanup operation for(int i = 0; i < 3; i++) { @@ -571,7 +571,7 @@ public void testRWRebalanceFourNodes() throws Exception { ports[i * 3 + 2] = nodes.get(i).getAdminPort(); } - Cluster targetCluster = ServerTestUtils.getLocalCluster(4, ports, new int[][] { + Cluster finalCluster = ServerTestUtils.getLocalCluster(4, ports, new int[][] { { 0, 4, 7 }, { 2, 8 }, { 1, 6 }, { 3, 5, 9 } }); // start servers @@ -587,7 +587,7 @@ public void testRWRebalanceFourNodes() throws Exception { final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, maxParallel, stealerBased, - targetCluster); + finalCluster); try { populateData(currentCluster, @@ -602,7 +602,7 @@ public void testRWRebalanceFourNodes() throws Exception { rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); } catch(Exception e) { fail(e.getMessage()); } finally { @@ -631,7 +631,7 @@ public void testRWRebalanceSerial() throws Exception { ports[i * 3 + 2] = nodes.get(i).getAdminPort(); } - Cluster targetCluster = ServerTestUtils.getLocalCluster(4, ports, new int[][] { + Cluster finalCluster = ServerTestUtils.getLocalCluster(4, ports, new int[][] { { 0, 4, 7 }, { 2, 8 }, { 1, 6 }, { 3, 5, 9 } }); // start servers @@ -649,7 +649,7 @@ public void testRWRebalanceSerial() throws Exception { final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, maxParallel, stealerBased, - targetCluster); + finalCluster); try { populateData(currentCluster, @@ -664,7 +664,7 @@ public void testRWRebalanceSerial() throws Exception { rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); } catch(Exception e) { fail(e.getMessage()); } finally { @@ -684,10 +684,10 @@ public void testProxyGetDuringRebalancing() throws Exception { final Cluster currentCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0, 1, 2, 3, 4, 5, 6 }, { 7, 8 } }); - final Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 1, - Lists.newArrayList(2, - 3)); + final Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 1, + Lists.newArrayList(2, + 3)); // start servers 0 , 1 only final List serverList = Arrays.asList(0, 1); Map configProps = new HashMap(); @@ -708,7 +708,7 @@ public void testProxyGetDuringRebalancing() throws Exception { final 
ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, maxParallel, stealerBased, - targetCluster); + finalCluster); // Populate the two stores populateData(updatedCurrentCluster, @@ -799,7 +799,7 @@ public void run() { Thread.sleep(500); rebalancingComplete.set(true); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); } catch(Exception e) { exceptions.add(e); logger.error("Exception in rebalancing thread", e); @@ -839,9 +839,9 @@ public void testProxyPutDuringRebalancing() throws Exception { Cluster currentCluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 0 }, { 1, 3 }, { 2 } }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 2, - Lists.newArrayList(3)); + Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 2, + Lists.newArrayList(3)); // start servers 0,1,2 only final List serverList = Arrays.asList(0, 1, 2); @@ -866,7 +866,7 @@ public void testProxyPutDuringRebalancing() throws Exception { final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, maxParallel, stealerBased, - targetCluster); + finalCluster); populateData(updatedCurrentCluster, rwStoreDefWithReplication, @@ -1000,12 +1000,12 @@ public void run() { true); assertEquals("Not enough time to begin proxy writing", proxyWritesDone.get(), true); checkEntriesPostRebalance(updatedCurrentCluster, - targetCluster, + finalCluster, Lists.newArrayList(rwStoreDefWithReplication), Arrays.asList(0, 1, 2), baselineTuples, baselineVersions); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); // check No Exception if(exceptions.size() > 0) { @@ -1052,10 +1052,10 @@ public void testServerSideRouting() throws Exception { final Cluster currentCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0, 1, 2, 3, 4, 5, 6 }, { 7, 8 } }); - final Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 1, - Lists.newArrayList(2, - 3)); + final Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 1, + Lists.newArrayList(2, + 3)); final List serverList = Arrays.asList(0, 1); Map configProps = new HashMap(); @@ -1075,7 +1075,7 @@ public void testServerSideRouting() throws Exception { final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, maxParallel, stealerBased, - targetCluster); + finalCluster); // Populate the two stores populateData(updatedCurrentCluster, @@ -1168,7 +1168,7 @@ public void run() { Thread.sleep(500); rebalancingToken.set(true); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); } catch(Exception e) { exceptions.add(e); } finally { diff --git a/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java index c707e2094c..54b9060f1c 100644 --- a/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java @@ -143,9 +143,9 @@ protected Cluster getCurrentCluster(int nodeId) { } } - public void checkConsistentMetadata(Cluster targetCluster, List serverList) { + public void checkConsistentMetadata(Cluster finalCluster, List serverList) { for(int nodeId: serverList) { - assertEquals(targetCluster, getCurrentCluster(nodeId)); + assertEquals(finalCluster, getCurrentCluster(nodeId)); 
assertEquals(MetadataStore.VoldemortState.NORMAL_SERVER, getCurrentState(nodeId)); } } @@ -196,14 +196,14 @@ protected void rebalanceAndCheck(RebalancePlan rebalancePlan, * the rebalance. * * @param currentCluster - * @param targetCluster + * @param finalCluster * @param storeDefs * @param nodeCheckList * @param baselineTuples * @param baselineVersions */ protected void checkEntriesPostRebalance(Cluster currentCluster, - Cluster targetCluster, + Cluster finalCluster, List storeDefs, List nodeCheckList, HashMap baselineTuples, @@ -212,23 +212,23 @@ protected void checkEntriesPostRebalance(Cluster currentCluster, Map>> currentNodeToPartitionTuples = RebalanceUtils.getNodeIdToAllPartitions(currentCluster, storeDef, true); - Map>> targetNodeToPartitionTuples = RebalanceUtils.getNodeIdToAllPartitions(targetCluster, - storeDef, - true); + Map>> finalNodeToPartitionTuples = RebalanceUtils.getNodeIdToAllPartitions(finalCluster, + storeDef, + true); for(int nodeId: nodeCheckList) { Set> currentPartitionTuples = currentNodeToPartitionTuples.get(nodeId); - Set> targetPartitionTuples = targetNodeToPartitionTuples.get(nodeId); + Set> finalPartitionTuples = finalNodeToPartitionTuples.get(nodeId); HashMap> flattenedPresentTuples = RebalanceUtils.flattenPartitionTuples(Utils.getAddedInTarget(currentPartitionTuples, - targetPartitionTuples)); + finalPartitionTuples)); Store store = getSocketStore(storeDef.getName(), - targetCluster.getNodeById(nodeId) - .getHost(), - targetCluster.getNodeById(nodeId) - .getSocketPort()); - checkGetEntries(targetCluster.getNodeById(nodeId), - targetCluster, + finalCluster.getNodeById(nodeId) + .getHost(), + finalCluster.getNodeById(nodeId) + .getSocketPort()); + checkGetEntries(finalCluster.getNodeById(nodeId), + finalCluster, storeDef, store, flattenedPresentTuples, diff --git a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java index 612d281ea7..7c22b9dbcb 100644 --- a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java @@ -240,14 +240,6 @@ public void tearDown() { socketStoreFactory = null; } - // TODO: (currentCluster vs interimCluster) Ideally, we could go from - // cCluster to fCluster for zone expansion. Unfortunately, to start a - // VoldemortServer, you need a cluster xml that includes that server. For - // now, we assume interim cluster is deployed (i.e., cluster with empty - // nodes in new zones). Either, deploying interim cluster with empty nodes - // must be codified in run book and tested as a pre-condition or servers - // need to be able to start without a cluster xml that includes them. - // TODO: The tests based on this method are susceptible to TOCTOU // BindException issue since findFreePorts is used to determine the ports // for localhost:PORT of each node. 
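The zoned tests below all follow one shape: derive a finalCluster from the running currentCluster by reassigning a few partitions, drive the rebalance, and verify that every server converges on the final metadata. A condensed sketch of that recurring pattern, written as a helper that would live inside AbstractZonedRebalanceTest (which supplies rebalanceAndCheck and checkConsistentMetadata); the node id, partition ids, and stealerBased choice are illustrative assumptions, not code from this patch:

private void runSimpleExpansion(Cluster currentCluster,
                                String bootstrapUrl,
                                List<Integer> serverList) throws Exception {
    // Reassign partitions 2 and 6 to node 3; currentCluster plus this move
    // defines finalCluster, the same createUpdatedCluster idiom these tests use.
    Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster,
                                                               3,
                                                               Lists.newArrayList(2, 6));
    boolean stealerBased = true; // assumed here; the tests derive it from useDonorBased
    ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
                                                                                  stealerBased,
                                                                                  finalCluster);
    // Execute the plan, then confirm all servers report the final cluster xml.
    rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList);
    checkConsistentMetadata(finalCluster, serverList);
}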
@@ -360,17 +352,7 @@ public void testClusterExpansionZZZ() throws Exception { @Test(timeout = 600000) public void testZoneExpansionZZ2ZZZ() throws Exception { - // TODO: see todo for method testZonedRebalance to understand why we - // cannot invoke the following: - /*- - testZonedRebalance("TestZoneExpansionZZ2ZZZ", - zzCurrent, - zzzZoneExpansionXXP, - zzStoresXml, - zzzStoresXml, - zzStores, - zzzStores); - */ + // Pass in an interim cluster for currentCluster testZonedRebalance("TestZoneExpansionZZ2ZZZ", zzeZoneExpansion, zzzZoneExpansionXXP, @@ -385,12 +367,12 @@ public void testRWRebalance() throws Exception { Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(4, 2, new int[] { 0, 0, 1, 1 }, new int[][] { { 0, 2, 4, 6 }, {}, { 1, 3, 5, 7 }, {} }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 3, - Lists.newArrayList(2, 6)); - targetCluster = RebalanceUtils.createUpdatedCluster(targetCluster, - 1, - Lists.newArrayList(3, 7)); + Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 3, + Lists.newArrayList(2, 6)); + finalCluster = RebalanceUtils.createUpdatedCluster(finalCluster, + 1, + Lists.newArrayList(3, 7)); // start all the servers List serverList = Arrays.asList(0, 1, 2, 3); @@ -405,14 +387,14 @@ public void testRWRebalance() throws Exception { boolean stealerBased = !useDonorBased; ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, stealerBased, - targetCluster); + finalCluster); try { populateData(currentCluster, rwStoreDefWithoutReplication); rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(1, 2)); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); } finally { // stop servers stopServer(serverList); @@ -432,10 +414,10 @@ public void testRWRebalanceWithReplication(boolean serial) throws Exception { new int[][] { { 0, 2, 4 }, { 6 }, { 1, 3, 5 }, { 7 } }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 3, - Lists.newArrayList(2)); - targetCluster = RebalanceUtils.createUpdatedCluster(targetCluster, 1, Lists.newArrayList(3)); + Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 3, + Lists.newArrayList(2)); + finalCluster = RebalanceUtils.createUpdatedCluster(finalCluster, 1, Lists.newArrayList(3)); // start servers List serverList = Arrays.asList(0, 1, 2, 3); @@ -454,7 +436,7 @@ public void testRWRebalanceWithReplication(boolean serial) throws Exception { ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, maxParallel, stealerBased, - targetCluster); + finalCluster); try { @@ -462,7 +444,7 @@ public void testRWRebalanceWithReplication(boolean serial) throws Exception { rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(0, 1, 2, 3)); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); } finally { // stop servers stopServer(serverList); @@ -495,12 +477,12 @@ public void testRebalanceCleanPrimarySecondary() throws Exception { try { Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(6, 2, new int[] { 0, 0, 0, 1, 1, 1 }, new int[][] { { 0 }, { 1, 6 }, { 2 }, { 3 }, { 4, 7 }, { 5 } }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 2, - Lists.newArrayList(7)); - targetCluster = RebalanceUtils.createUpdatedCluster(targetCluster, - 5, - Lists.newArrayList(6)); + Cluster finalCluster 
= RebalanceUtils.createUpdatedCluster(currentCluster, + 2, + Lists.newArrayList(7)); + finalCluster = RebalanceUtils.createUpdatedCluster(finalCluster, + 5, + Lists.newArrayList(6)); /** * original server partition ownership @@ -508,7 +490,7 @@ public void testRebalanceCleanPrimarySecondary() throws Exception { * [s0 : p0,p3,p4,p5,p6,p7] [s1 : p1-p7] [s2 : p1,p2] [s3 : * p0,p1,p2,p3,p6,p7] [s4 : p1-p7] [s5 : p4,p5] * - * target server partition ownership + * final server partition ownership * * [s0 : p0,p2,p3,p4,p5,p6,p7] [s1 : p0,p1] [s2 : p1-p7] [s3 : * p0.p1,p2,p3,p5,p6,p7] [s4 : p0,p1,p2,p3,p4,p7] [s5 : p4,p5,p6] @@ -527,7 +509,7 @@ public void testRebalanceCleanPrimarySecondary() throws Exception { boolean stealerBased = !useDonorBased; ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, stealerBased, - targetCluster); + finalCluster); try { populateData(currentCluster, rwStoreDefWithReplication); @@ -564,7 +546,7 @@ public void testRebalanceCleanPrimarySecondary() throws Exception { rebalanceKit.controller, Arrays.asList(0, 1, 2, 3)); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); // Do the cleanup operation for(int i = 0; i < 6; i++) { @@ -611,12 +593,12 @@ public void testProxyGetDuringRebalancing() throws Exception { try { Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(4, 2, new int[] { 0, 0, 1, 1 }, new int[][] { { 0, 2, 4 }, { 6 }, { 1, 3, 5 }, { 7 } }); - Cluster tmpTargetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 3, - Lists.newArrayList(2)); - final Cluster targetCluster = RebalanceUtils.createUpdatedCluster(tmpTargetCluster, - 1, - Lists.newArrayList(3)); + Cluster tmpFinalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 3, + Lists.newArrayList(2)); + final Cluster finalCluster = RebalanceUtils.createUpdatedCluster(tmpFinalCluster, + 1, + Lists.newArrayList(3)); final List serverList = Arrays.asList(0, 1, 2, 3); Map configProps = new HashMap(); @@ -637,7 +619,7 @@ public void testProxyGetDuringRebalancing() throws Exception { final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, maxParallel, stealerBased, - targetCluster); + finalCluster); try { @@ -707,7 +689,7 @@ public void run() { Thread.sleep(500); rebalancingComplete.set(true); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); } catch(Exception e) { exceptions.add(e); @@ -750,12 +732,12 @@ public void testProxyPutDuringRebalancing() throws Exception { try { Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(6, 2, new int[] { 0, 0, 0, 1, 1, 1 }, new int[][] { { 0 }, { 1, 6 }, { 2 }, { 3 }, { 4, 7 }, { 5 } }); - Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster, - 2, - Lists.newArrayList(7)); - targetCluster = RebalanceUtils.createUpdatedCluster(targetCluster, - 5, - Lists.newArrayList(6)); + Cluster finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, + 2, + Lists.newArrayList(7)); + finalCluster = RebalanceUtils.createUpdatedCluster(finalCluster, + 5, + Lists.newArrayList(6)); /** * Original partition map @@ -764,7 +746,7 @@ public void testProxyPutDuringRebalancing() throws Exception { * * [s3 : p3] [s4 : p4, p7] [s5 : p5] * - * Target server partition ownership + * Final server partition ownership * * [s0 : p0] [s1 : p1] [s2 : p2, p7] * @@ -778,7 +760,7 @@ public void testProxyPutDuringRebalancing() throws Exception { * * 
[s3 : p0-3, p6-7] [s4 : p0-p7] [s5 : p4-5] * - * Target server n-ary partition ownership + * Final server n-ary partition ownership * * [s0 : p0, p2-7] [s1 : p0-1] [s2 : p1-p7] * @@ -807,7 +789,7 @@ public void testProxyPutDuringRebalancing() throws Exception { final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, maxParallel, stealerBased, - targetCluster); + finalCluster); populateData(currentCluster, rwStoreDefWithReplication); final AdminClient adminClient = rebalanceKit.controller.getAdminClient(); @@ -944,12 +926,12 @@ public void run() { true); assertEquals("Not enough time to begin proxy writing", proxyWritesDone.get(), true); checkEntriesPostRebalance(updatedCurrentCluster, - targetCluster, + finalCluster, Lists.newArrayList(rwStoreDefWithReplication), Arrays.asList(0, 1, 2, 3, 4, 5), baselineTuples, baselineVersions); - checkConsistentMetadata(targetCluster, serverList); + checkConsistentMetadata(finalCluster, serverList); // check No Exception if(exceptions.size() > 0) { for(Exception e: exceptions) { diff --git a/test/unit/voldemort/client/rebalance/AdminRebalanceTest.java b/test/unit/voldemort/client/rebalance/AdminRebalanceTest.java index 5f0bb48d0b..4db003ab3f 100644 --- a/test/unit/voldemort/client/rebalance/AdminRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AdminRebalanceTest.java @@ -95,8 +95,8 @@ public class AdminRebalanceTest { private StoreDefinition storeDef3; private StoreDefinition storeDef4; private VoldemortServer[] servers; - private Cluster cluster; - private Cluster targetCluster; + private Cluster currentCluster; + private Cluster finalCluster; private AdminClient adminClient; private List plans; @@ -135,22 +135,22 @@ public void startThreeNodeRW() throws IOException { int numServers = 3; servers = new VoldemortServer[numServers]; int partitionMap[][] = { { 0, 1, 2, 3 }, { 4, 5, 6, 7 }, {} }; - cluster = ServerTestUtils.startVoldemortCluster(numServers, - servers, - partitionMap, - socketStoreFactory, - useNio, - null, - tempStoreXml.getAbsolutePath(), - new Properties()); - - targetCluster = RebalanceUtils.createUpdatedCluster(cluster, 2, Lists.newArrayList(0)); - - RebalanceBatchPlan plan = new RebalanceBatchPlan(cluster, - targetCluster, + currentCluster = ServerTestUtils.startVoldemortCluster(numServers, + servers, + partitionMap, + socketStoreFactory, + useNio, + null, + tempStoreXml.getAbsolutePath(), + new Properties()); + + finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, 2, Lists.newArrayList(0)); + + RebalanceBatchPlan plan = new RebalanceBatchPlan(currentCluster, + finalCluster, Lists.newArrayList(storeDef1, storeDef2)); plans = Lists.newArrayList(plan.getBatchPlan()); - adminClient = ServerTestUtils.getAdminClient(cluster); + adminClient = ServerTestUtils.getAdminClient(currentCluster); } public void startFourNodeRW() throws IOException { @@ -176,21 +176,21 @@ public void startFourNodeRW() throws IOException { int numServers = 4; servers = new VoldemortServer[numServers]; int partitionMap[][] = { { 0, 1, 2, 3 }, { 4, 5, 6, 7 }, { 8, 9, 10, 11 }, {} }; - cluster = ServerTestUtils.startVoldemortCluster(numServers, - servers, - partitionMap, - socketStoreFactory, - useNio, - null, - tempStoreXml.getAbsolutePath(), - new Properties()); - - targetCluster = RebalanceUtils.createUpdatedCluster(cluster, 3, Lists.newArrayList(0)); - RebalanceBatchPlan plan = new RebalanceBatchPlan(cluster, - targetCluster, + currentCluster = ServerTestUtils.startVoldemortCluster(numServers, + 
servers, + partitionMap, + socketStoreFactory, + useNio, + null, + tempStoreXml.getAbsolutePath(), + new Properties()); + + finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, 3, Lists.newArrayList(0)); + RebalanceBatchPlan plan = new RebalanceBatchPlan(currentCluster, + finalCluster, Lists.newArrayList(storeDef1, storeDef2)); plans = Lists.newArrayList(plan.getBatchPlan()); - adminClient = ServerTestUtils.getAdminClient(cluster); + adminClient = ServerTestUtils.getAdminClient(currentCluster); } public void startFourNodeRO() throws IOException { @@ -226,22 +226,22 @@ public void startFourNodeRO() throws IOException { int numServers = 4; servers = new VoldemortServer[numServers]; int partitionMap[][] = { { 0, 1, 2, 3 }, { 4, 5, 6, 7 }, { 8, 9, 10, 11 }, {} }; - cluster = ServerTestUtils.startVoldemortCluster(numServers, - servers, - partitionMap, - socketStoreFactory, - useNio, - null, - tempStoreXml.getAbsolutePath(), - new Properties()); - - targetCluster = RebalanceUtils.createUpdatedCluster(cluster, 3, Lists.newArrayList(0)); - RebalanceBatchPlan plan = new RebalanceBatchPlan(cluster, - targetCluster, + currentCluster = ServerTestUtils.startVoldemortCluster(numServers, + servers, + partitionMap, + socketStoreFactory, + useNio, + null, + tempStoreXml.getAbsolutePath(), + new Properties()); + + finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, 3, Lists.newArrayList(0)); + RebalanceBatchPlan plan = new RebalanceBatchPlan(currentCluster, + finalCluster, Lists.newArrayList(storeDef1, storeDef2)); plans = Lists.newArrayList(plan.getBatchPlan()); - adminClient = ServerTestUtils.getAdminClient(cluster); + adminClient = ServerTestUtils.getAdminClient(currentCluster); } public void startFourNodeRORW() throws IOException { @@ -294,23 +294,23 @@ public void startFourNodeRORW() throws IOException { int numServers = 4; servers = new VoldemortServer[numServers]; int partitionMap[][] = { { 0, 1, 2, 3 }, { 4, 5, 6, 7 }, { 8, 9, 10, 11 }, {} }; - cluster = ServerTestUtils.startVoldemortCluster(numServers, - servers, - partitionMap, - socketStoreFactory, - useNio, - null, - tempStoreXml.getAbsolutePath(), - new Properties()); - - targetCluster = RebalanceUtils.createUpdatedCluster(cluster, 3, Lists.newArrayList(0)); + currentCluster = ServerTestUtils.startVoldemortCluster(numServers, + servers, + partitionMap, + socketStoreFactory, + useNio, + null, + tempStoreXml.getAbsolutePath(), + new Properties()); + + finalCluster = RebalanceUtils.createUpdatedCluster(currentCluster, 3, Lists.newArrayList(0)); // Make plan only with RO stores - RebalanceBatchPlan plan = new RebalanceBatchPlan(cluster, - targetCluster, + RebalanceBatchPlan plan = new RebalanceBatchPlan(currentCluster, + finalCluster, Lists.newArrayList(storeDef1, storeDef2)); plans = plan.getBatchPlan(); - adminClient = ServerTestUtils.getAdminClient(cluster); + adminClient = ServerTestUtils.getAdminClient(currentCluster); } @@ -359,11 +359,11 @@ public void testRebalanceNodeRW() throws IOException { HashMap entrySet = ServerTestUtils.createRandomKeyValuePairs(TEST_SIZE); SocketStoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(Lists.newArrayList("tcp://" - + cluster.getNodeById(0) - .getHost() + + currentCluster.getNodeById(0) + .getHost() + ":" - + cluster.getNodeById(0) - .getSocketPort()))); + + currentCluster.getNodeById(0) + .getSocketPort()))); StoreClient storeClient1 = factory.getStoreClient("test"), storeClient2 = factory.getStoreClient("test2"); List 
primaryPartitionsMoved = Lists.newArrayList(0); @@ -373,7 +373,7 @@ public void testRebalanceNodeRW() throws IOException { HashMap secondaryEntriesMoved = Maps.newHashMap(); RoutingStrategy strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef2, - cluster); + currentCluster); for(Entry entry: entrySet.entrySet()) { storeClient1.put(new String(entry.getKey().get()), new String(entry.getValue())); storeClient2.put(new String(entry.getKey().get()), new String(entry.getValue())); @@ -436,7 +436,7 @@ public void testRebalanceNodeRW() throws IOException { // Update the cluster metadata on all three nodes for(VoldemortServer server: servers) { - server.getMetadataStore().put(MetadataStore.CLUSTER_KEY, targetCluster); + server.getMetadataStore().put(MetadataStore.CLUSTER_KEY, finalCluster); } // Actually run it @@ -524,11 +524,11 @@ public void testRebalanceNodeRW2() throws IOException { HashMap entrySet = ServerTestUtils.createRandomKeyValuePairs(TEST_SIZE); SocketStoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(Lists.newArrayList("tcp://" - + cluster.getNodeById(0) - .getHost() + + currentCluster.getNodeById(0) + .getHost() + ":" - + cluster.getNodeById(0) - .getSocketPort()))); + + currentCluster.getNodeById(0) + .getSocketPort()))); StoreClient storeClient1 = factory.getStoreClient("test"), storeClient2 = factory.getStoreClient("test2"); List primaryPartitionsMoved = Lists.newArrayList(0); @@ -540,7 +540,7 @@ public void testRebalanceNodeRW2() throws IOException { HashMap tertiaryEntriesMoved = Maps.newHashMap(); RoutingStrategy strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef2, - cluster); + currentCluster); for(Entry entry: entrySet.entrySet()) { storeClient1.put(new String(entry.getKey().get()), new String(entry.getValue())); storeClient2.put(new String(entry.getKey().get()), new String(entry.getValue())); @@ -569,7 +569,7 @@ public void testRebalanceNodeRW2() throws IOException { // Update the cluster metadata on all three nodes for(VoldemortServer server: servers) { - server.getMetadataStore().put(MetadataStore.CLUSTER_KEY, targetCluster); + server.getMetadataStore().put(MetadataStore.CLUSTER_KEY, finalCluster); } // Actually run it @@ -805,10 +805,8 @@ public void testRebalanceNodeRO() throws IOException { .build())); try { - // TODO pass the target storedefs - // ATTENTION JAY - adminClient.rebalanceOps.rebalanceStateChange(cluster, - targetCluster, + adminClient.rebalanceOps.rebalanceStateChange(currentCluster, + finalCluster, servers[2].getMetadataStore() .getStoreDefList(), servers[2].getMetadataStore() @@ -827,13 +825,11 @@ public void testRebalanceNodeRO() throws IOException { // Test that all servers are still using the old cluster and have // swapped successfully - checkRO(cluster); + checkRO(currentCluster); // Test 2) All passes scenario - // TODO pass the target storedefs - // ATTENTION JAY - adminClient.rebalanceOps.rebalanceStateChange(cluster, - targetCluster, + adminClient.rebalanceOps.rebalanceStateChange(currentCluster, + finalCluster, servers[2].getMetadataStore() .getStoreDefList(), servers[2].getMetadataStore() @@ -845,7 +841,7 @@ public void testRebalanceNodeRO() throws IOException { true, true); - checkRO(targetCluster); + checkRO(finalCluster); // Test 3) Now try fetching files again even though they are // mmap-ed. Should fail... 
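The two checkRO calls in the hunk above pin down the contract of rebalanceStateChange for read-only stores: a failed or rolled-back state change must leave every server serving under the old cluster xml, while an all-passes run (including the RO swap) moves them to the new one. Restated as a short hedged sketch against the renamed fields:

    checkRO(currentCluster); // rollback path: servers still serve under the current cluster
    // ... rerun rebalanceStateChange with every node reachable ...
    checkRO(finalCluster);   // all-passes path: RO stores swapped, final cluster in effect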
@@ -926,13 +922,11 @@ public void testRebalanceNodeRORW() throws IOException, InterruptedException { .update(new RebalancePartitionsInfo(3, 0, new HashMap>>(), - cluster)); + currentCluster)); try { - // TODO pass the target storedefs - // ATTENTION JAY - adminClient.rebalanceOps.rebalanceStateChange(cluster, - targetCluster, + adminClient.rebalanceOps.rebalanceStateChange(currentCluster, + finalCluster, servers[2].getMetadataStore() .getStoreDefList(), servers[2].getMetadataStore() @@ -955,9 +949,9 @@ public void testRebalanceNodeRORW() throws IOException, InterruptedException { assertEquals(server.getMetadataStore().getServerStateUnlocked(), MetadataStore.VoldemortState.NORMAL_SERVER); } - assertEquals(server.getMetadataStore().getCluster(), cluster); + assertEquals(server.getMetadataStore().getCluster(), currentCluster); } - checkRO(cluster); + checkRO(currentCluster); // Clean-up everything cleanUpAllState(); @@ -983,10 +977,8 @@ public void testRebalanceNodeRORW() throws IOException, InterruptedException { .build())); try { - // // TODO pass the target storedefs - // ATTENTION JAY - adminClient.rebalanceOps.rebalanceStateChange(cluster, - targetCluster, + adminClient.rebalanceOps.rebalanceStateChange(currentCluster, + finalCluster, servers[2].getMetadataStore() .getStoreDefList(), servers[2].getMetadataStore() @@ -1007,9 +999,9 @@ public void testRebalanceNodeRORW() throws IOException, InterruptedException { new RebalancerState(new ArrayList())); assertEquals(server.getMetadataStore().getServerStateUnlocked(), MetadataStore.VoldemortState.NORMAL_SERVER); - assertEquals(server.getMetadataStore().getCluster(), cluster); + assertEquals(server.getMetadataStore().getCluster(), currentCluster); } - checkRO(cluster); + checkRO(currentCluster); // Clean-up everything cleanUpAllState(); @@ -1022,11 +1014,8 @@ public void testRebalanceNodeRORW() throws IOException, InterruptedException { storeDef4)); // Test 3) Everything should work - - // TODO pass the target storedefs - // ATTENTION JAY - adminClient.rebalanceOps.rebalanceStateChange(cluster, - targetCluster, + adminClient.rebalanceOps.rebalanceStateChange(currentCluster, + finalCluster, servers[2].getMetadataStore() .getStoreDefList(), servers[2].getMetadataStore() @@ -1047,10 +1036,10 @@ public void testRebalanceNodeRORW() throws IOException, InterruptedException { .getServerStateUnlocked(), MetadataStore.VoldemortState.REBALANCING_MASTER_SERVER); assertEquals(servers[plan.getStealerId()].getMetadataStore().getCluster(), - targetCluster); + finalCluster); } - List allNodes = Lists.newArrayList(NodeUtils.getNodeIds(Lists.newArrayList(cluster.getNodes()))); + List allNodes = Lists.newArrayList(NodeUtils.getNodeIds(Lists.newArrayList(currentCluster.getNodes()))); allNodes.removeAll(nodesChecked); // Check all other nodes @@ -1059,10 +1048,10 @@ public void testRebalanceNodeRORW() throws IOException, InterruptedException { new RebalancerState(new ArrayList())); assertEquals(servers[nodeId].getMetadataStore().getServerStateUnlocked(), MetadataStore.VoldemortState.NORMAL_SERVER); - assertEquals(servers[nodeId].getMetadataStore().getCluster(), targetCluster); + assertEquals(servers[nodeId].getMetadataStore().getCluster(), finalCluster); } - checkRO(targetCluster); + checkRO(finalCluster); } finally { shutDown(); } @@ -1101,10 +1090,8 @@ public void testRebalanceStateChange() throws IOException { startFourNodeRW(); // Test 1) Normal case where-in all are up - // TODO pass the target storedefs - // ATTENTION JAY - 
adminClient.rebalanceOps.rebalanceStateChange(cluster, - targetCluster, + adminClient.rebalanceOps.rebalanceStateChange(currentCluster, + finalCluster, servers[2].getMetadataStore() .getStoreDefList(), servers[2].getMetadataStore() @@ -1123,7 +1110,7 @@ public void testRebalanceStateChange() throws IOException { new RebalancerState(Lists.newArrayList(plan))); } - List allNodes = Lists.newArrayList(NodeUtils.getNodeIds(Lists.newArrayList(cluster.getNodes()))); + List allNodes = Lists.newArrayList(NodeUtils.getNodeIds(Lists.newArrayList(currentCluster.getNodes()))); allNodes.removeAll(nodesChecked); // Check all other nodes @@ -1142,13 +1129,11 @@ public void testRebalanceStateChange() throws IOException { .update(new RebalancePartitionsInfo(3, 0, new HashMap>>(), - cluster)); + currentCluster)); try { - // TODO pass the target storedefs - // ATTENTION JAY - adminClient.rebalanceOps.rebalanceStateChange(cluster, - targetCluster, + adminClient.rebalanceOps.rebalanceStateChange(currentCluster, + finalCluster, servers[2].getMetadataStore() .getStoreDefList(), servers[2].getMetadataStore() @@ -1179,10 +1164,8 @@ public void testRebalanceStateChange() throws IOException { servers[3] = null; try { - // TODO pass the target storedefs - // ATTENTION JAY - adminClient.rebalanceOps.rebalanceStateChange(cluster, - targetCluster, + adminClient.rebalanceOps.rebalanceStateChange(currentCluster, + finalCluster, servers[2].getMetadataStore() .getStoreDefList(), servers[2].getMetadataStore() @@ -1216,10 +1199,8 @@ public void testClusterAndRebalanceStateChange() throws IOException { startFourNodeRW(); // Test 1) Normal case where-in all are up - // TODO pass the target storedefs - // ATTENTION JAY - adminClient.rebalanceOps.rebalanceStateChange(cluster, - targetCluster, + adminClient.rebalanceOps.rebalanceStateChange(currentCluster, + finalCluster, servers[2].getMetadataStore() .getStoreDefList(), servers[2].getMetadataStore() @@ -1237,17 +1218,17 @@ public void testClusterAndRebalanceStateChange() throws IOException { assertEquals(servers[plan.getStealerId()].getMetadataStore().getRebalancerState(), new RebalancerState(Lists.newArrayList(plan))); assertEquals(servers[plan.getStealerId()].getMetadataStore().getCluster(), - targetCluster); + finalCluster); } - List allNodes = Lists.newArrayList(NodeUtils.getNodeIds(Lists.newArrayList(cluster.getNodes()))); + List allNodes = Lists.newArrayList(NodeUtils.getNodeIds(Lists.newArrayList(currentCluster.getNodes()))); allNodes.removeAll(nodesChecked); // Check all other nodes for(int nodeId: allNodes) { assertEquals(servers[nodeId].getMetadataStore().getRebalancerState(), new RebalancerState(new ArrayList())); - assertEquals(servers[nodeId].getMetadataStore().getCluster(), targetCluster); + assertEquals(servers[nodeId].getMetadataStore().getCluster(), finalCluster); } // Clean-up everything @@ -1260,13 +1241,11 @@ public void testClusterAndRebalanceStateChange() throws IOException { .update(new RebalancePartitionsInfo(3, 0, new HashMap>>(), - cluster)); + currentCluster)); try { - // TODO pass the target storedefs - // ATTENTION JAY - adminClient.rebalanceOps.rebalanceStateChange(cluster, - targetCluster, + adminClient.rebalanceOps.rebalanceStateChange(currentCluster, + finalCluster, servers[2].getMetadataStore() .getStoreDefList(), servers[2].getMetadataStore() @@ -1287,7 +1266,7 @@ public void testClusterAndRebalanceStateChange() throws IOException { assertEquals(server.getMetadataStore().getRebalancerState(), new RebalancerState(new ArrayList())); } - 
assertEquals(server.getMetadataStore().getCluster(), cluster); + assertEquals(server.getMetadataStore().getCluster(), currentCluster); } // Clean-up everything @@ -1298,10 +1277,8 @@ public void testClusterAndRebalanceStateChange() throws IOException { servers[3] = null; try { - // TODO pass the target storedefs - // ATTENTION JAY - adminClient.rebalanceOps.rebalanceStateChange(cluster, - targetCluster, + adminClient.rebalanceOps.rebalanceStateChange(currentCluster, + finalCluster, servers[2].getMetadataStore() .getStoreDefList(), servers[2].getMetadataStore() @@ -1321,7 +1298,7 @@ public void testClusterAndRebalanceStateChange() throws IOException { if(server != null) { assertEquals(server.getMetadataStore().getRebalancerState(), new RebalancerState(new ArrayList())); - assertEquals(server.getMetadataStore().getCluster(), cluster); + assertEquals(server.getMetadataStore().getCluster(), currentCluster); } } } finally { @@ -1334,7 +1311,7 @@ private void cleanUpAllState() { if(server != null) { // Put back the old cluster metadata - server.getMetadataStore().put(MetadataStore.CLUSTER_KEY, cluster); + server.getMetadataStore().put(MetadataStore.CLUSTER_KEY, currentCluster); // Clear all the rebalancing state server.getMetadataStore().cleanAllRebalancingState(); @@ -1343,7 +1320,7 @@ private void cleanUpAllState() { } private void buildROStore(StoreDefinition storeDef, int numChunks) throws IOException { - Map>> nodeIdToAllPartitions = RebalanceUtils.getNodeIdToAllPartitions(cluster, + Map>> nodeIdToAllPartitions = RebalanceUtils.getNodeIdToAllPartitions(currentCluster, storeDef, true); for(Entry>> entry: nodeIdToAllPartitions.entrySet()) { diff --git a/test/unit/voldemort/client/rebalance/NonZonedRebalanceBatchPlanTest.java b/test/unit/voldemort/client/rebalance/NonZonedRebalanceBatchPlanTest.java index b3736e5941..5bd1abfa41 100644 --- a/test/unit/voldemort/client/rebalance/NonZonedRebalanceBatchPlanTest.java +++ b/test/unit/voldemort/client/rebalance/NonZonedRebalanceBatchPlanTest.java @@ -57,7 +57,7 @@ public class NonZonedRebalanceBatchPlanTest { private static String storeDefFile = "test/common/voldemort/config/stores.xml"; private Cluster currentCluster; - private Cluster targetCluster; + private Cluster finalCluster; private List storeDefList; private List storeDefList2; @@ -88,10 +88,10 @@ public void setUp() { public void testInsufficientNodes() { currentCluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 0 }, { 1 }, { 2 } }); - targetCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 1 }, { 0 }, { 2 } }); + finalCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 1 }, { 0 }, { 2 } }); try { - new RebalanceBatchPlan(currentCluster, targetCluster, storeDefList); + new RebalanceBatchPlan(currentCluster, finalCluster, storeDefList); fail("Should have thrown an exception since the migration should result in decrease in replication factor"); } catch(VoldemortException e) {} @@ -107,11 +107,11 @@ public void testShuffleNoop() { currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 1, 2, 3 }, { 4, 5, 6, 7 } }); - targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { + finalCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 1, 2, 3 }, { 4, 5, 6, 7, 0 } }); List batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, - targetCluster, + finalCluster, test211StoreDef); assertTrue("Batch plan should be empty.", batchPlan.isEmpty()); @@ -128,11 +128,11 @@ public void 
testClusterExpansion() { currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 1, 2, 3 }, { 4, 5, 6, 7 }, {} }); - targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { + finalCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 1, 2, 3 }, { 4, 5, 6, 7 }, { 0 } }); List batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, - targetCluster, + finalCluster, test211StoreDef); // data should only move from node 0 to node 2 for node 2 to host // everything needed. no other movement should occur. @@ -169,11 +169,11 @@ public void testDeleteLastNode() { currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 3, 6, 9, 12, 15 }, { 1, 4, 7, 10, 13, 16 }, { 2, 5, 8, 11, 14, 17 }, { 0 } }); - targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { + finalCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 3, 6, 9, 12, 15 }, { 1, 4, 7, 10, 13, 16 }, { 2, 5, 8, 11, 14, 17 }, {} }); List orderedRebalancePartitionInfoList = ClusterTestUtils.getBatchPlan(currentCluster, - targetCluster, + finalCluster, storeDefList2); assertEquals("There should have exactly 1 rebalancing node", 1, @@ -213,12 +213,12 @@ public void testDeleteFirstNode() { currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, { 1, 5 }, { 2, 6 }, { 3, 7 } }); - targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 4 }, + finalCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 4 }, { 0, 1, 5 }, { 2, 6 }, { 3, 7 } }); // PHASE 1 - move partition 0 off of node 0 to node 1 List batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, - targetCluster, + finalCluster, storeDefList2); assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); @@ -230,10 +230,10 @@ public void testDeleteFirstNode() { currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 4 }, { 0, 1, 5 }, { 2 }, { 3, 6, 7 } }); - targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { {}, + finalCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { {}, { 0, 1, 5 }, { 4, 2 }, { 3, 6, 7 } }); - batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, targetCluster, storeDefList2); + batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, finalCluster, storeDefList2); assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); assertFalse("Batch plan for server 2 should not be empty.", @@ -260,11 +260,11 @@ public void testRebalanceDeletingMiddleNode() { currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, { 1, 5 }, { 2, 6 }, { 3, 7 } }); - targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, + finalCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, { 2, 1, 5 }, { 6 }, { 3, 7 } }); List batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, - targetCluster, + finalCluster, storeDefList2); assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); @@ -286,10 +286,10 @@ public void testRebalanceDeletingMiddleNode() { currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, { 2, 1, 5 }, { 6 }, { 3, 7 } }); - targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, + finalCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 
0, 4 }, { 2, 1, 5 }, {}, { 6, 3, 7 } }); - batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, targetCluster, storeDefList2); + batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, finalCluster, storeDefList2); assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); @@ -306,11 +306,11 @@ public void testManyStoreClusterExpansion() { currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 1, 2, 3 }, { 4, 5, 6 }, { 7, 8, 9 }, {} }); - targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { + finalCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 2, 3 }, { 4, 6 }, { 7, 8, 9 }, { 1, 5 } }); List batchPlan = ClusterTestUtils.getBatchPlan(currentCluster, - targetCluster, + finalCluster, storeDefList); assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); @@ -355,11 +355,11 @@ public void testRebalanceAllReplicasBeingMigrated() { currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, { 2, 3 }, { 1, 5 }, {} }); - targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 4 }, + finalCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 4 }, { 2, 3 }, { 1, 5 }, { 0 } }); List orderedRebalancePartitionInfoList = ClusterTestUtils.getBatchPlan(currentCluster, - targetCluster, + finalCluster, storeDefList2); assertEquals("There should have exactly 1 rebalancing node", diff --git a/test/unit/voldemort/client/rebalance/RebalanceMetadataConsistencyTest.java b/test/unit/voldemort/client/rebalance/RebalanceMetadataConsistencyTest.java index 44fe387282..486798061b 100644 --- a/test/unit/voldemort/client/rebalance/RebalanceMetadataConsistencyTest.java +++ b/test/unit/voldemort/client/rebalance/RebalanceMetadataConsistencyTest.java @@ -36,7 +36,7 @@ public class RebalanceMetadataConsistencyTest { private MetadataStore metadataStore; private Cluster currentCluster; - private Cluster targetCluster; + private Cluster finalCluster; protected static String testStoreNameRW = "test"; protected static String testStoreNameRW2 = "test2"; @@ -78,7 +78,7 @@ public void setUp() { currentCluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 0 }, { 1, 3 }, { 2 } }); - targetCluster = ServerTestUtils.getLocalCluster(3, + finalCluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 0 }, { 1 }, { 2 }, { 3 } }); rwStoreDefWithReplication = new StoreDefinitionBuilder().setName(testStoreNameRW) @@ -131,7 +131,7 @@ public void testThreading() { storeDef = rwStoreDefWithReplication; } else { - cluster = targetCluster; + cluster = finalCluster; storeDef = rwStoreDefWithReplication2; } @@ -190,7 +190,7 @@ public void run() { if(checkCluster.equals(currentCluster)) { Assert.assertEquals(checkstores.get(0), rwStoreDefWithReplication); } - if(checkCluster.equals(targetCluster)) { + if(checkCluster.equals(finalCluster)) { Assert.assertEquals(checkstores.get(0), rwStoreDefWithReplication2); } } diff --git a/test/unit/voldemort/client/rebalance/RebalancePlanTest.java b/test/unit/voldemort/client/rebalance/RebalancePlanTest.java index 9564d976f7..dc08159fb3 100644 --- a/test/unit/voldemort/client/rebalance/RebalancePlanTest.java +++ b/test/unit/voldemort/client/rebalance/RebalancePlanTest.java @@ -38,6 +38,7 @@ public class RebalancePlanTest { static Cluster zzzCurrent; static Cluster zzzShuffle; static Cluster zzzClusterExpansion; + static Cluster zzeZoneExpansion; static Cluster zzzZoneExpansion; static List zzzStores; @@ 
-52,6 +53,7 @@ public static void setup() { zzzShuffle = ClusterTestUtils.getZZZClusterWithSwappedPartitions(); zzzClusterExpansion = ClusterTestUtils.getZZZClusterWithPPP(); + zzeZoneExpansion = ClusterTestUtils.getZZECluster(); zzzZoneExpansion = ClusterTestUtils.getZZEClusterXXP(); zzzStores = ClusterTestUtils.getZZZStoreDefsBDB(); } @@ -129,7 +131,10 @@ public void testClusterExpansion() { assertTrue(zoneMoves.get(1, 1) > 0); // Three zones - rebalancePlan = ClusterTestUtils.makePlan(zzzCurrent, zzzStores, zzzClusterExpansion, zzzStores); + rebalancePlan = ClusterTestUtils.makePlan(zzzCurrent, + zzzStores, + zzzClusterExpansion, + zzzStores); assertEquals(rebalancePlan.getPlan().size(), 1); assertTrue(rebalancePlan.getPrimariesMoved() > 0); assertTrue(rebalancePlan.getPartitionStoresMoved() > 0); @@ -151,6 +156,7 @@ public void testClusterExpansion() { public void testZoneExpansion() { RebalancePlan rebalancePlan; + // This tests currentCluster to finalCluster rebalancePlan = ClusterTestUtils.makePlan(zzCurrent, zzStores, zzzZoneExpansion, zzzStores); assertEquals(rebalancePlan.getPlan().size(), 1); assertTrue(rebalancePlan.getPrimariesMoved() > 0); @@ -168,5 +174,27 @@ public void testZoneExpansion() { assertTrue(zoneMoves.get(2, 0) == 0); assertTrue(zoneMoves.get(2, 1) == 0); assertTrue(zoneMoves.get(2, 2) == 0); + + // This tests interimCluster to finalCluster + rebalancePlan = ClusterTestUtils.makePlan(zzeZoneExpansion, + zzzStores, + zzzZoneExpansion, + zzzStores); + assertEquals(rebalancePlan.getPlan().size(), 1); + assertTrue(rebalancePlan.getPrimariesMoved() > 0); + assertTrue(rebalancePlan.getPartitionStoresMoved() > 0); + assertTrue(rebalancePlan.getPartitionStoresMovedXZone() > 0); + + // zone id 2 is the new zone. + zoneMoves = rebalancePlan.getZoneMoveMap(); + assertTrue(zoneMoves.get(0, 0) > 0); + assertTrue(zoneMoves.get(0, 1) == 0); + assertTrue(zoneMoves.get(0, 2) > 0); + assertTrue(zoneMoves.get(1, 0) == 0); + assertTrue(zoneMoves.get(1, 1) > 0); + assertTrue(zoneMoves.get(1, 2) > 0); + assertTrue(zoneMoves.get(2, 0) == 0); + assertTrue(zoneMoves.get(2, 1) == 0); + assertTrue(zoneMoves.get(2, 2) == 0); } } diff --git a/test/unit/voldemort/tools/RepartitionerTest.java b/test/unit/voldemort/tools/RepartitionerTest.java index 4474a401c4..0b10ccae83 100644 --- a/test/unit/voldemort/tools/RepartitionerTest.java +++ b/test/unit/voldemort/tools/RepartitionerTest.java @@ -86,13 +86,13 @@ public boolean verifyZonesBalanced(Cluster cluster) { * * @param currentCluster * @param currentStores - * @param targetCluster - * @param targetStores + * @param interimCluster + * @param finalStores */ public void verifyBalanceZoneAndNode(Cluster currentCluster, List currentStores, - Cluster targetCluster, - List targetStores) { + Cluster interimCluster, + List finalStores) { // Confirm current cluster is imbalanced on all fronts: assertFalse(verifyNodesBalancedInEachZone(currentCluster)); assertFalse(verifyZonesBalanced(currentCluster)); @@ -103,8 +103,8 @@ public void verifyBalanceZoneAndNode(Cluster currentCluster, boolean disableZoneBalancing = false; Cluster repartitionedCluster = Repartitioner.repartition(currentCluster, currentStores, - targetCluster, - targetStores, + interimCluster, + finalStores, null, 1, disableNodeBalancing, @@ -128,13 +128,13 @@ public void verifyBalanceZoneAndNode(Cluster currentCluster, * * @param currentCluster * @param currentStores - * @param targetCluster - * @param targetStores + * @param interimCluster + * @param finalStores */ public void 
verifyBalanceNodesNotZones(Cluster currentCluster, List currentStores, - Cluster targetCluster, - List targetStores) { + Cluster interimCluster, + List finalStores) { // Confirm current cluster is imbalanced on all fronts: assertFalse(verifyNodesBalancedInEachZone(currentCluster)); @@ -147,8 +147,8 @@ public void verifyBalanceNodesNotZones(Cluster currentCluster, Cluster repartitionedCluster = Repartitioner.repartition(currentCluster, currentStores, - targetCluster, - targetStores, + interimCluster, + finalStores, null, 1, disableNodeBalancing, @@ -168,28 +168,28 @@ public void verifyBalanceNodesNotZones(Cluster currentCluster, /** * Verify the "no op" path through repartition method does not change the - * target cluster. + * interim cluster. * * @param currentCluster * @param currentStores - * @param targetCluster - * @param targetStores + * @param interimCluster + * @param finalStores */ public void verifyRepartitionNoop(Cluster currentCluster, List currentStores, - Cluster targetCluster, - List targetStores) { + Cluster interimCluster, + List finalStores) { // Confirm current cluster is imbalanced on all fronts: assertFalse(verifyNodesBalancedInEachZone(currentCluster)); assertFalse(verifyZonesBalanced(currentCluster)); - // Confirm noop rebalance has no effect on target cluster + // Confirm noop rebalance has no effect on interim cluster boolean disableNodeBalancing = true; boolean disableZoneBalancing = true; Cluster repartitionedCluster = Repartitioner.repartition(currentCluster, currentStores, - targetCluster, - targetStores, + interimCluster, + finalStores, null, 1, disableNodeBalancing, @@ -203,7 +203,7 @@ public void verifyRepartitionNoop(Cluster currentCluster, 0, 0, 0); - assertTrue(repartitionedCluster.equals(targetCluster)); + assertTrue(repartitionedCluster.equals(interimCluster)); } /** @@ -385,19 +385,19 @@ public void testShuffle() { public void testClusterExpansion() { // Two zone cluster Cluster currentCluster = ClusterTestUtils.getZZCluster(); - Cluster targetCluster = ClusterTestUtils.getZZClusterWithNN(); + Cluster interimCluster = ClusterTestUtils.getZZClusterWithNN(); List storeDefs = ClusterTestUtils.getZZStoreDefsInMemory(); - verifyBalanceZoneAndNode(currentCluster, storeDefs, targetCluster, storeDefs); - verifyBalanceNodesNotZones(currentCluster, storeDefs, targetCluster, storeDefs); - verifyRepartitionNoop(currentCluster, storeDefs, targetCluster, storeDefs); + verifyBalanceZoneAndNode(currentCluster, storeDefs, interimCluster, storeDefs); + verifyBalanceNodesNotZones(currentCluster, storeDefs, interimCluster, storeDefs); + verifyRepartitionNoop(currentCluster, storeDefs, interimCluster, storeDefs); // Three zone cluster currentCluster = ClusterTestUtils.getZZZCluster(); - targetCluster = ClusterTestUtils.getZZZClusterWithNNN(); + interimCluster = ClusterTestUtils.getZZZClusterWithNNN(); storeDefs = ClusterTestUtils.getZZZStoreDefsInMemory(); - verifyBalanceZoneAndNode(currentCluster, storeDefs, targetCluster, storeDefs); - verifyBalanceNodesNotZones(currentCluster, storeDefs, targetCluster, storeDefs); - verifyRepartitionNoop(currentCluster, storeDefs, targetCluster, storeDefs); + verifyBalanceZoneAndNode(currentCluster, storeDefs, interimCluster, storeDefs); + verifyBalanceNodesNotZones(currentCluster, storeDefs, interimCluster, storeDefs); + verifyRepartitionNoop(currentCluster, storeDefs, interimCluster, storeDefs); } @Test @@ -405,12 +405,12 @@ public void testZoneExpansion() { Cluster currentCluster = ClusterTestUtils.getZZECluster(); List 
currentStoreDefs = ClusterTestUtils.getZZZStoreDefsInMemory(); - Cluster targetCluster = ClusterTestUtils.getZZZClusterWithNNN(); - List targetStoreDefs = ClusterTestUtils.getZZZStoreDefsInMemory(); + Cluster interimCluster = ClusterTestUtils.getZZZClusterWithNNN(); + List finalStoreDefs = ClusterTestUtils.getZZZStoreDefsInMemory(); - verifyBalanceZoneAndNode(currentCluster, currentStoreDefs, targetCluster, targetStoreDefs); + verifyBalanceZoneAndNode(currentCluster, currentStoreDefs, interimCluster, finalStoreDefs); // verifyBalanceNodesNotZones does not make sense for zone expansion. - verifyRepartitionNoop(currentCluster, currentStoreDefs, targetCluster, targetStoreDefs); + verifyRepartitionNoop(currentCluster, currentStoreDefs, interimCluster, finalStoreDefs); } /** diff --git a/test/unit/voldemort/utils/RebalanceUtilsTest.java b/test/unit/voldemort/utils/RebalanceUtilsTest.java index 43d56d0222..7df269dfe2 100644 --- a/test/unit/voldemort/utils/RebalanceUtilsTest.java +++ b/test/unit/voldemort/utils/RebalanceUtilsTest.java @@ -40,11 +40,11 @@ public void testUpdateCluster() { Cluster currentCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0, 1, 2, 3, 4, 5, 6, 7, 8 }, {} }); - Cluster targetCluster = ServerTestUtils.getLocalCluster(2, new int[][] { + Cluster finalCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0, 1, 4, 5, 6, 7, 8 }, { 2, 3 } }); Cluster updatedCluster = RebalanceUtils.updateCluster(currentCluster, - new ArrayList(targetCluster.getNodes())); - assertEquals("updated cluster should match targetCluster", updatedCluster, targetCluster); + new ArrayList(finalCluster.getNodes())); + assertEquals("updated cluster should match finalCluster", updatedCluster, finalCluster); } @Test @@ -89,30 +89,30 @@ public void testGetClusterWithNewNodes() { } private void doClusterTransformationBase(Cluster currentC, - Cluster targetC, + Cluster interimC, Cluster finalC, boolean verify) { - Cluster derivedTarget1 = RebalanceUtils.getClusterWithNewNodes(currentC, targetC); + Cluster derivedInterim1 = RebalanceUtils.getClusterWithNewNodes(currentC, interimC); if(verify) - assertEquals(targetC, derivedTarget1); + assertEquals(interimC, derivedInterim1); - Cluster derivedTarget2 = RebalanceUtils.getInterimCluster(currentC, finalC); + Cluster derivedInterim2 = RebalanceUtils.getInterimCluster(currentC, finalC); if(verify) - assertEquals(targetC, derivedTarget2); + assertEquals(interimC, derivedInterim2); RebalanceUtils.validateCurrentFinalCluster(currentC, finalC); - RebalanceUtils.validateCurrentInterimCluster(currentC, targetC); - RebalanceUtils.validateInterimFinalCluster(targetC, finalC); + RebalanceUtils.validateCurrentInterimCluster(currentC, interimC); + RebalanceUtils.validateInterimFinalCluster(interimC, finalC); } - private void doClusterTransformation(Cluster currentC, Cluster targetC, Cluster finalC) { - doClusterTransformationBase(currentC, targetC, finalC, false); + private void doClusterTransformation(Cluster currentC, Cluster interimC, Cluster finalC) { + doClusterTransformationBase(currentC, interimC, finalC, false); } public void doClusterTransformationAndVerification(Cluster currentC, - Cluster targetC, + Cluster interimC, Cluster finalC) { - doClusterTransformationBase(currentC, targetC, finalC, true); + doClusterTransformationBase(currentC, interimC, finalC, true); } @Test @@ -147,22 +147,16 @@ public void testClusterTransformationAndVerification() { ClusterTestUtils.getZZZClusterWithNNN(), ClusterTestUtils.getZZZClusterWithPPP()); - // TODO: Fix this 
test to pass. This test currently fails because the - // method RebalanceUtils.getClusterWithNewNodes cannot handle a new zone - // coming into existence between currentCluster & targetCluster. - // Two- to Three-zone clusters: zone expansion - /*- doClusterTransformationAndVerification(ClusterTestUtils.getZZCluster(), ClusterTestUtils.getZZECluster(), ClusterTestUtils.getZZEClusterXXP()); - */ } @Test public void testClusterTransformationAndVerificationExceptions() { boolean excepted; - // Two-zone cluster: rebalance with extra partitions in target + // Two-zone cluster: rebalance with extra partitions in interim cluster excepted = false; try { doClusterTransformation(ClusterTestUtils.getZZCluster(), @@ -184,7 +178,7 @@ public void testClusterTransformationAndVerificationExceptions() { } assertTrue(excepted); - // Two-zone cluster: node ids swapped in target + // Two-zone cluster: node ids swapped in interim cluster excepted = false; try { doClusterTransformation(ClusterTestUtils.getZZCluster(),