diff --git a/src/java/voldemort/client/rebalance/RebalanceBatchPlan.java b/src/java/voldemort/client/rebalance/RebalanceBatchPlan.java index e985f07abd..70bb1453c7 100644 --- a/src/java/voldemort/client/rebalance/RebalanceBatchPlan.java +++ b/src/java/voldemort/client/rebalance/RebalanceBatchPlan.java @@ -72,7 +72,7 @@ public RebalanceBatchPlan(final Cluster targetCluster, RebalanceUtils.validateClusterStores(targetCluster, storeDefs); RebalanceUtils.validateClusterStores(finalCluster, storeDefs); - this.batchPlan = batchPlan(); + this.batchPlan = constructBatchPlan(); } @@ -202,23 +202,16 @@ public List buildRebalancePartitionsInfos() { * * 1) A stealer node does not steal any partition-stores it already hosts. * - * 2) If possible, a stealer node that is the n-ary zone replica in the - * finalCluster steals from the n-ary zone replica in the targetCluster in - * the same zone. + * 2) Use current policy to decide which node to steal from: see getDonorId + * method. * - * 3) If there are no partitoin-stores to steal in the same zone (i.e., this - * is the "zone expansion" use case), then the stealer node that is the - * n-ary zone replica in the finalCluster determines which pre-existing zone - * in the targetCluster hosts the primary partitionId for the - * partition-store and steals the n-ary zone replica from that zone. - * - * In summary, this batch plan avoids all unnecessary cross zone moves, + * Currently, this batch plan avoids all unnecessary cross zone moves, * distributes cross zone moves into new zones evenly across existing zones, * and copies replicaFactor partition-stores into any new zone. * * @return the batch plan */ - private List batchPlan() { + private List constructBatchPlan() { // Construct all store routing plans once. HashMap targetStoreRoutingPlans = new HashMap(); HashMap finalStoreRoutingPlans = new HashMap(); @@ -239,36 +232,26 @@ private List batchPlan() { for(StoreDefinition storeDef: storeDefs) { StoreRoutingPlan targetSRP = targetStoreRoutingPlans.get(storeDef.getName()); StoreRoutingPlan finalSRP = finalStoreRoutingPlans.get(storeDef.getName()); - for(int stealerPartitionId: finalSRP.getNaryPartitionIds(stealerNodeId)) { - // ... and all nary partition-stores - // steal what is needed! + for(int stealerPartitionId: finalSRP.getZoneNAryPartitionIds(stealerNodeId)) { + // ... and all nary partition-stores, + // now steal what is needed - // Do not steal a partition-store you host + // Optimization: Do not steal a partition-store you already + // host! if(targetSRP.getReplicationNodeList(stealerPartitionId).contains(stealerNodeId)) { continue; } - int stealerZoneReplicaType = finalSRP.getZoneReplicaType(stealerZoneId, - stealerNodeId, - stealerPartitionId); - - int donorZoneId; - if(targetSRP.hasZoneReplicaType(stealerZoneId, stealerPartitionId)) { - // Steal from local n-ary (since one exists). - donorZoneId = stealerZoneId; - } else { - // Steal from zone that hosts primary partition Id. - // TODO: Add option to steal from specific - // donorZoneId. - int targetMasterNodeId = targetSRP.getNodeIdForPartitionId(stealerPartitionId); - donorZoneId = targetCluster.getNodeById(targetMasterNodeId).getZoneId(); - } + // Determine which node to steal from. 
+ int donorNodeId = getDonorId(targetSRP, + finalSRP, + stealerZoneId, + stealerNodeId, + stealerPartitionId); - int donorNodeId = targetSRP.getZoneReplicaNodeId(donorZoneId, - stealerZoneReplicaType, - stealerPartitionId); + // Add this specific partition-store steal to the overall + // plan int donorReplicaType = targetSRP.getReplicaType(donorNodeId, stealerPartitionId); - rpiBuilder.addPartitionStoreMove(stealerNodeId, donorNodeId, storeDef.getName(), @@ -281,6 +264,77 @@ private List batchPlan() { return rpiBuilder.buildRebalancePartitionsInfos(); } + /** + * Decide which donor node to steal from. This is a policy implementation. + * I.e., in the future, additional policies could be considered. At that + * time, this method should be overridden in a sub-class, or a policy object + * ought to implement this algorithm. + * + * Current policy: + * + * 1) If possible, a stealer node that is the zone n-ary in the finalCluster + * steals from the zone n-ary in the targetCluster in the same zone. + * + * 2) If there are no partition-stores to steal in the same zone (i.e., this + * is the "zone expansion" use case), then a different policy must be used. + * The stealer node that is the zone n-ary in the finalCluster determines + * which pre-existing zone in the targetCluster hosts the primary partition + * id for the partition-store. The stealer then steals the zone n-ary from + * that pre-existing zone. + * + * This policy avoids unnecessary cross-zone moves and distributes the load + * of cross-zone moves approximately uniformly across pre-existing zones. + * + * Other policies to consider: + * + * - For zone expansion, steal all partition-stores from one specific + * pre-existing zone. + * + * - Replace heuristic to approximately uniformly distribute load among + * existing zones with something more concrete (i.e. track steals from each + * pre-existing zone and forcibly balance them). + * + * - Select a single donor for all replicas in a new zone. This will require + * donor-based rebalancing to be run (at least for this specific part of the + * plan). This would reduce the number of donor-side scans of data. (But + * still send replication factor copies over the WAN.) This would require + * apparatus in the RebalanceController to work. + * + * - Set up some sort of chain-replication in which a single stealer in the + * new zone steals some replica from a pre-existing zone, and then other + * n-aries in the new zone steal from the single cross-zone stealer in the + * zone. This would require apparatus in the RebalanceController to work. + * + * @param targetSRP + * @param finalSRP + * @param stealerZoneId + * @param stealerNodeId + * @param stealerPartitionId + * @return the node id of the donor for this partition Id. + */ + protected int getDonorId(StoreRoutingPlan targetSRP, + StoreRoutingPlan finalSRP, + int stealerZoneId, + int stealerNodeId, + int stealerPartitionId) { + + int donorZoneId; + if(targetSRP.zoneHasReplica(stealerZoneId, stealerPartitionId)) { + // Steal from local n-ary (since one exists). + donorZoneId = stealerZoneId; + } else { + // Steal from zone that hosts primary partition Id. 
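// (Illustrative example, hypothetical topology: expanding pre-existing zones
// {0, 1} into a new zone 2. A zone-2 stealer that should become some zone
// n-ary for a partition finds no zone-2 replica of it in targetCluster, so it
// falls through to this branch, looks up which pre-existing zone holds the
// partition's primary (zone 0 for some partitions, zone 1 for others), and
// steals its zone n-ary from that zone. Because primaries are spread over the
// pre-existing zones, the cross-zone copy load is spread over them too.)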
+ int targetMasterNodeId = targetSRP.getNodeIdForPartitionId(stealerPartitionId); + donorZoneId = targetCluster.getNodeById(targetMasterNodeId).getZoneId(); + } + + int stealerZoneNAry = finalSRP.getZoneNaryForNodesPartition(stealerZoneId, + stealerNodeId, + stealerPartitionId); + return targetSRP.getNodeIdForZoneNary(donorZoneId, stealerZoneNAry, stealerPartitionId); + + } + @Override public String toString() { if(batchPlan == null || batchPlan.isEmpty()) { diff --git a/src/java/voldemort/client/rebalance/RebalanceController.java b/src/java/voldemort/client/rebalance/RebalanceController.java index 7dafa74efe..6ae971f86c 100644 --- a/src/java/voldemort/client/rebalance/RebalanceController.java +++ b/src/java/voldemort/client/rebalance/RebalanceController.java @@ -184,7 +184,7 @@ private void validateCluster(Cluster finalCluster, List finalSt // Reset the cluster that the admin client points at adminClient.setAdminClientCluster(finalCluster); // Validate that all the nodes ( new + old ) are in normal state - RebalanceUtils.validateProdClusterStateIsNormal(finalCluster, adminClient); + RebalanceUtils.checkEachServerInNormalState(finalCluster, adminClient); // Verify all old RO stores exist at version 2 RebalanceUtils.validateReadOnlyStores(finalCluster, finalStores, adminClient); } @@ -643,11 +643,6 @@ private List executeTasks(final int taskId, HashMap> donorNodeBasedPartitionsInfo = RebalanceUtils.groupPartitionsInfoByNode(rebalancePartitionPlanList, false); for(Entry> entries: donorNodeBasedPartitionsInfo.entrySet()) { - // At some point, a 10 second sleep was added here to help with - // a race condition. Leaving this comment here in case, at some - // point in the future, we need to hack around some race - // condition: - // Thread.sleep(10000); DonorBasedRebalanceTask rebalanceTask = new DonorBasedRebalanceTask(taskId, entries.getValue(), rebalancingClientTimeoutSeconds, diff --git a/src/java/voldemort/client/rebalance/RebalancePlan.java b/src/java/voldemort/client/rebalance/RebalancePlan.java index 79ecf45615..8a44db5de7 100644 --- a/src/java/voldemort/client/rebalance/RebalancePlan.java +++ b/src/java/voldemort/client/rebalance/RebalancePlan.java @@ -127,7 +127,7 @@ public RebalancePlan(final Cluster currentCluster, /** * Create a plan. The plan consists of batches. Each batch involves the - * movement of nor more than batchSize primary partitions. The movement of a + * movement of no more than batchSize primary partitions. The movement of a * single primary partition may require migration of other n-ary replicas, * and potentially deletions. 
Migrating a primary or n-ary partition * requires migrating one partition-store for every store hosted at that @@ -138,9 +138,6 @@ private void plan() { // Mapping of stealer node to list of primary partitions being moved final TreeMultimap stealerToStolenPrimaryPartitions = TreeMultimap.create(); - // Used for creating clones - ClusterMapper mapper = new ClusterMapper(); - // Output initial and final cluster if(outputDir != null) RebalanceUtils.dumpClusters(targetCluster, finalCluster, outputDir); @@ -159,11 +156,11 @@ private void plan() { // Determine plan batch-by-batch int batches = 0; - Cluster batchTargetCluster = mapper.readCluster(new StringReader(mapper.writeCluster(targetCluster))); + Cluster batchTargetCluster = cloneCluster(targetCluster); while(!stealerToStolenPrimaryPartitions.isEmpty()) { // Generate a batch partitions to move - Cluster batchFinalCluster = mapper.readCluster(new StringReader(mapper.writeCluster(batchTargetCluster))); + Cluster batchFinalCluster = cloneCluster(batchTargetCluster); int partitions = 0; List> partitionsMoved = Lists.newArrayList(); for(Entry stealerToPartition: stealerToStolenPrimaryPartitions.entries()) { @@ -202,12 +199,24 @@ private void plan() { zoneMoveMap.add(RebalanceBatchPlan.getZoneMoveMap()); batches++; - batchTargetCluster = mapper.readCluster(new StringReader(mapper.writeCluster(batchFinalCluster))); + batchTargetCluster = cloneCluster(batchFinalCluster); } logger.info(this); } + /** + * In the absence of a Cluster.clone() operation, hack a clone method for + * local use. + * + * @param cluster + * @return clone of Cluster cluster. + */ + private Cluster cloneCluster(Cluster cluster) { + ClusterMapper mapper = new ClusterMapper(); + return mapper.readCluster(new StringReader(mapper.writeCluster(cluster))); + } + /** * Determines storage overhead and returns pretty printed summary. * diff --git a/src/java/voldemort/client/rebalance/task/DonorBasedRebalanceTask.java b/src/java/voldemort/client/rebalance/task/DonorBasedRebalanceTask.java index c24027f07d..02f2fbd795 100644 --- a/src/java/voldemort/client/rebalance/task/DonorBasedRebalanceTask.java +++ b/src/java/voldemort/client/rebalance/task/DonorBasedRebalanceTask.java @@ -33,6 +33,7 @@ public DonorBasedRebalanceTask(final int taskId, this.donorNodeId = stealInfos.get(0).getDonorId(); } + @Override public void run() { int rebalanceAsyncId = INVALID_REBALANCE_ID; diff --git a/src/java/voldemort/cluster/Cluster.java b/src/java/voldemort/cluster/Cluster.java index 4fcc84d4db..12d0718ddd 100644 --- a/src/java/voldemort/cluster/Cluster.java +++ b/src/java/voldemort/cluster/Cluster.java @@ -259,6 +259,10 @@ public String toString(boolean isDetailed) { return builder.toString(); } + // TODO: Add a .clone() implementation. See hacked method in + // RebalancePlan.cloneCluster for example of current approach to cloning + // (use ClusterMapper to serde via XML...) 
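// For reference, the current workaround looks roughly like this (see
// RebalancePlan.cloneCluster above):
//
//     ClusterMapper mapper = new ClusterMapper();
//     Cluster clone = mapper.readCluster(new StringReader(mapper.writeCluster(cluster)));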
+ @Override public boolean equals(Object second) { if(this == second) diff --git a/src/java/voldemort/routing/StoreRoutingPlan.java b/src/java/voldemort/routing/StoreRoutingPlan.java index efb9b7aa93..f166c49b2e 100644 --- a/src/java/voldemort/routing/StoreRoutingPlan.java +++ b/src/java/voldemort/routing/StoreRoutingPlan.java @@ -67,7 +67,7 @@ public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) { int naryNodeId = getNodeIdForPartitionId(naryPartitionId); nodeIdToNaryPartitionMap.get(naryNodeId).add(masterPartitionId); int naryZoneId = cluster.getNodeById(naryNodeId).getZoneId(); - if(getZoneReplicaType(naryZoneId, naryNodeId, naryPartitionId) == 0) { + if(getZoneNaryForNodesPartition(naryZoneId, naryNodeId, naryPartitionId) == 0) { nodeIdToZonePrimaryMap.get(naryNodeId).add(masterPartitionId); } } @@ -93,18 +93,23 @@ public List getReplicatingPartitionList(int masterPartitionId) { } /** + * Returns all (zone n-ary) partition IDs hosted on the node. * * @param nodeId - * @return all nary partition IDs hosted on the node. + * @return all zone n-ary partition IDs hosted on the node in an unordered + * list. */ - public List getNaryPartitionIds(int nodeId) { + public List getZoneNAryPartitionIds(int nodeId) { return nodeIdToNaryPartitionMap.get(nodeId); } /** + * Returns all zone-primary partition IDs on node. A zone-primary means zone + * n-ary==0. Zone-primary nodes are generally pseudo-masters in the zone and + * receive get traffic for some partition Id. * * @param nodeId - * @return all nary partition IDs hosted on the node. + * @return all primary partition IDs (zone n-ary == 0) hosted on the node. */ public List getZonePrimaryPartitionIds(int nodeId) { return nodeIdToZonePrimaryMap.get(nodeId); @@ -220,9 +225,10 @@ public List getReplicationNodeList(int partitionId) throws VoldemortExc * @param zoneId * @param nodeId * @param key - * @return + * @return zone n-ary level for key hosted on node id in zone id. */ - public int getZoneReplicaType(int zoneId, int nodeId, byte[] key) { + // TODO: add unit test. + public int getZoneNAry(int zoneId, int nodeId, byte[] key) { if(cluster.getNodeById(nodeId).getZoneId() != zoneId) { throw new VoldemortException("Node " + nodeId + " is not in zone " + zoneId + "! The node is in zone " @@ -230,19 +236,19 @@ public int getZoneReplicaType(int zoneId, int nodeId, byte[] key) { } List replicatingNodes = this.routingStrategy.routeRequest(key); - int zoneReplicaType = -1; + int zoneNAry = -1; for(Node node: replicatingNodes) { // bump up the replica number once you encounter a node in the given // zone if(node.getZoneId() == zoneId) { - zoneReplicaType++; + zoneNAry++; } // we are done when we find the given node if(node.getId() == nodeId) { - return zoneReplicaType; + return zoneNAry; } } - if(zoneReplicaType > -1) { + if(zoneNAry > -1) { throw new VoldemortException("Node " + nodeId + " not a replica for the key " + ByteUtils.toHexString(key) + " in given zone " + zoneId); } else { @@ -251,15 +257,19 @@ public int getZoneReplicaType(int zoneId, int nodeId, byte[] key) { } } - // TODO: After other rebalancing code is cleaned up, either document and add - // a test, or remove this method. (Unclear if this method is needed once we - // drop replicaType from some key code paths). - public boolean hasZoneReplicaType(int zoneId, int partitionId) { + /** + * checks if zone replicates partition Id. False should only be returned in + * zone expansion use cases. 
+ * + * @param zoneId + * @param partitionId + * @return true iff partitionId is replicated in zone id. + */ + // TODO: add unit test. + public boolean zoneHasReplica(int zoneId, int partitionId) { List replicatingNodeIds = getReplicationNodeList(partitionId); for(int replicatingNodeId: replicatingNodeIds) { Node replicatingNode = cluster.getNodeById(replicatingNodeId); - // bump up the replica number once you encounter a node in the given - // zone if(replicatingNode.getZoneId() == zoneId) { return true; } @@ -267,16 +277,18 @@ public boolean hasZoneReplicaType(int zoneId, int partitionId) { return false; } - // TODO: After other rebalancing code is cleaned up, either document and add - // a test, or remove this method. (Unclear if this method is needed once we - // drop replicaType from some key code paths). /** + * Determines the zone n-ary replica level of the specified partitionId on + * the node id in zone id. * * @param zoneId * @param nodeId * @param partitionId + * @return zone n-ary replica level of the partition id on the node id in + * the zone id (primary == 0, secondary == 1, ...) */ - public int getZoneReplicaType(int zoneId, int nodeId, int partitionId) { + // TODO: add unit test. + public int getZoneNaryForNodesPartition(int zoneId, int nodeId, int partitionId) { if(cluster.getNodeById(nodeId).getZoneId() != zoneId) { throw new VoldemortException("Node " + nodeId + " is not in zone " + zoneId + "! The node is in zone " @@ -284,19 +296,19 @@ public int getZoneReplicaType(int zoneId, int nodeId, int partitionId) { } List replicatingNodeIds = getReplicationNodeList(partitionId); - int zoneReplicaType = -1; + int zoneNAry = -1; for(int replicatingNodeId: replicatingNodeIds) { Node replicatingNode = cluster.getNodeById(replicatingNodeId); // bump up the replica number once you encounter a node in the given // zone if(replicatingNode.getZoneId() == zoneId) { - zoneReplicaType++; + zoneNAry++; } if(replicatingNode.getId() == nodeId) { - return zoneReplicaType; + return zoneNAry; } } - if(zoneReplicaType > 0) { + if(zoneNAry > 0) { throw new VoldemortException("Node " + nodeId + " not a replica for partition " + partitionId + " in given zone " + zoneId); } else { @@ -305,14 +317,15 @@ public int getZoneReplicaType(int zoneId, int nodeId, int partitionId) { } } - // TODO: After other rebalancing code is cleaned up, either document and add - // a test, or remove this method. (Unclear if this method is needed once we - // drop replicaType from some key code paths). /** + * Determines replicaType for partition id on node id. * * @param nodeId * @param partitionId + * @return replicaType of the partition Id on the given node id. */ + // TODO: (replicaType) drop method. + @Deprecated public int getReplicaType(int nodeId, int partitionId) { List replicatingNodeIds = getReplicationNodeList(partitionId); int replicaType = -1; @@ -337,68 +350,76 @@ public int getReplicaType(int nodeId, int partitionId) { * the node that contains the key as the nth replica in the given zone. * * @param zoneId - * @param zoneReplicaType + * @param zoneNary * @param key - * @return + * @return node id that hosts zone n-ary replica for the key */ - public int getZoneReplicaNode(int zoneId, int zoneReplicaType, byte[] key) { + // TODO: add unit test. 
+ public int getNodeIdForZoneNary(int zoneId, int zoneNary, byte[] key) { List replicatingNodes = this.routingStrategy.routeRequest(key); - int zoneReplicaTypeCounter = -1; + int zoneNAry = -1; for(Node node: replicatingNodes) { - // bump up the counter if we encounter a replica in the given zone + // bump up the counter if we encounter a replica in the given zone; + // return current node if counter now matches requested if(node.getZoneId() == zoneId) { - zoneReplicaTypeCounter++; - } - // when the counter matches up with the replicaNumber we need, we - // are done. - if(zoneReplicaTypeCounter == zoneReplicaType) { - return node.getId(); + zoneNAry++; + + if(zoneNAry == zoneNary) { + return node.getId(); + } } } - if(zoneReplicaTypeCounter == -1) { + if(zoneNAry == -1) { throw new VoldemortException("Could not find any replicas for the key " + ByteUtils.toHexString(key) + " in given zone " + zoneId); } else { - throw new VoldemortException("Could not find " + (zoneReplicaType + 1) + throw new VoldemortException("Could not find " + (zoneNary + 1) + " replicas for the key " + ByteUtils.toHexString(key) + " in given zone " + zoneId + ". Only found " - + (zoneReplicaTypeCounter + 1)); + + (zoneNAry + 1)); } } - // TODO: After other rebalancing code is cleaned up, either document and add - // a test, or remove this method. (Unclear if this method is needed once we - // drop replicaType from some key code paths). - public int getZoneReplicaNodeId(int zoneId, int zoneReplicaType, int partitionId) { + /** + * Determines which node hosts partition id with specified n-ary level in + * specified zone. + * + * @param zoneId + * @param zoneNary + * @param partitionId + * @return node ID that hosts zone n-ary replica of partition. + */ + // TODO: add unit test. + public int getNodeIdForZoneNary(int zoneId, int zoneNary, int partitionId) { List replicatingNodeIds = getReplicationNodeList(partitionId); - int zoneReplicaTypeCounter = -1; + int zoneNAry = -1; for(int replicatingNodeId: replicatingNodeIds) { Node replicatingNode = cluster.getNodeById(replicatingNodeId); // bump up the counter if we encounter a replica in the given zone if(replicatingNode.getZoneId() == zoneId) { - zoneReplicaTypeCounter++; + zoneNAry++; } // when the counter matches up with the replicaNumber we need, we // are done. - if(zoneReplicaTypeCounter == zoneReplicaType) { + if(zoneNAry == zoneNary) { return replicatingNode.getId(); } } - if(zoneReplicaTypeCounter == 0) { + if(zoneNAry == 0) { throw new VoldemortException("Could not find any replicas for the partition " + partitionId + " in given zone " + zoneId); } else { - throw new VoldemortException("Could not find " + zoneReplicaType + throw new VoldemortException("Could not find " + zoneNary + " replicas for the partition " + partitionId - + " in given zone " + zoneId + ". Only found " - + zoneReplicaTypeCounter); + + " in given zone " + zoneId + ". Only found " + zoneNAry); } } // TODO: (refactor) Move from static methods to non-static methods that use // this object's cluster and storeDefinition member for the various - // check*BelongsTo* methods. + // check*BelongsTo* methods. Also, tweak internal members to make these + // checks easier/faster. 
/** * Check that the key belongs to one of the partitions in the map of replica * type to partitions diff --git a/src/java/voldemort/server/rebalance/async/DonorBasedRebalancePusherSlave.java b/src/java/voldemort/server/rebalance/async/DonorBasedRebalancePusherSlave.java index 5f7595a1dd..7d8c9946c5 100644 --- a/src/java/voldemort/server/rebalance/async/DonorBasedRebalancePusherSlave.java +++ b/src/java/voldemort/server/rebalance/async/DonorBasedRebalancePusherSlave.java @@ -53,6 +53,7 @@ public DonorBasedRebalancePusherSlave(int nodeId, nodeIterator = new ResumableIterator>>(); } + @Override public void run() throws VoldemortException { boolean needWait = false; logger.info("DonorBasedRebalancePusherSlave begains to send partitions for store " @@ -79,13 +80,11 @@ public void run() throws VoldemortException { if(needWait) { try { - // sleep for 5 minutes if exception occur while communicate - // with remote node - logger.info("waiting for 5 minutes for the remote node to recover"); - // TODO: Is this sleep really needed? Why? - Thread.sleep(TimeUnit.MINUTES.toMillis(5)); + logger.info("waiting 30 seconds for the remote node to recover..."); + Thread.sleep(TimeUnit.SECONDS.toMillis(30)); needWait = false; } catch(InterruptedException e) { + logger.info("sleep interrupted while waiting for remote node to recover:" + e); // continue } } @@ -120,6 +119,7 @@ class ResumableIterator implements ClosableIterator> currentElem = null; private ArrayList>> tentativeList = Lists.newArrayList(); + @Override public void close() {} public void setRecoveryMode() { @@ -143,6 +143,7 @@ public void reset() { this.currentElem = null; } + @Override public boolean hasNext() { boolean hasNext = false; if(!done) { @@ -173,6 +174,7 @@ public boolean hasNext() { // return the element when one or more is available, blocked // otherwise + @Override public Pair> next() { if(done) { throw new NoSuchElementException(); @@ -227,6 +229,7 @@ private Pair> getNextElem() throws InterruptedExcep return retValue; } + @Override public void remove() { throw new VoldemortException("Remove not supported"); } diff --git a/src/java/voldemort/store/StorageEngine.java b/src/java/voldemort/store/StorageEngine.java index 765d34da55..d48b25f497 100644 --- a/src/java/voldemort/store/StorageEngine.java +++ b/src/java/voldemort/store/StorageEngine.java @@ -97,17 +97,17 @@ public interface StorageEngine extends Store { public void truncate(); /** - * Is the data persistence aware of partitions? In other words is the data - * internally stored on a per partition basis or together + * Are partitions persisted in distinct files? In other words is the data + * stored on disk on a per-partition basis? This is really for the read-only + * use case in which each partition is stored in a distinct file. * - * @return Boolean indicating if the data persistence is partition aware + * @return Boolean indicating if partitions are persisted in distinct files + * (read-only use case). */ public boolean isPartitionAware(); - // TODO: Does "isPartitionScanSupported() == true" imply - // "isPartitionAware() === true"? /** - * Does the storage engine support efficient scanning of a single partition + * Does the storage engine support efficient scanning of a single partition? * * @return true if the storage engine implements the capability. 
false * otherwise diff --git a/src/java/voldemort/store/bdb/PartitionPrefixedBdbStorageEngine.java b/src/java/voldemort/store/bdb/PartitionPrefixedBdbStorageEngine.java index 5a9f550705..c07a0c0217 100644 --- a/src/java/voldemort/store/bdb/PartitionPrefixedBdbStorageEngine.java +++ b/src/java/voldemort/store/bdb/PartitionPrefixedBdbStorageEngine.java @@ -285,11 +285,6 @@ private boolean fetchNextKey() { } } - @Override - public boolean isPartitionAware() { - return true; - } - @Override public boolean isPartitionScanSupported() { return true; diff --git a/src/java/voldemort/store/rebalancing/RedirectingStore.java b/src/java/voldemort/store/rebalancing/RedirectingStore.java index dcb4cbe084..14b0a10d92 100644 --- a/src/java/voldemort/store/rebalancing/RedirectingStore.java +++ b/src/java/voldemort/store/rebalancing/RedirectingStore.java @@ -482,12 +482,12 @@ private Integer getProxyNode(StoreRoutingPlan currentRoutingPlan, // Use the old store definition to get the routing object StoreRoutingPlan oldRoutingPlan = new StoreRoutingPlan(sourceCluster, sourceStoreDef); // Check the current node's relationship to the key. - int zoneReplicaType = currentRoutingPlan.getZoneReplicaType(zoneId, nodeId, key); + int zoneNAry = currentRoutingPlan.getZoneNAry(zoneId, nodeId, key); // Determine which node held the key with the same relationship in the // old cluster. That is your man! Integer redirectNodeId; try { - redirectNodeId = oldRoutingPlan.getZoneReplicaNode(zoneId, zoneReplicaType, key); + redirectNodeId = oldRoutingPlan.getNodeIdForZoneNary(zoneId, zoneNAry, key); } catch(VoldemortException ve) { /* * If the zone does not exist, as in the case of Zone Expansion, diff --git a/src/java/voldemort/tools/PartitionBalance.java b/src/java/voldemort/tools/PartitionBalance.java index f68194e0e1..5ccd9a93f1 100644 --- a/src/java/voldemort/tools/PartitionBalance.java +++ b/src/java/voldemort/tools/PartitionBalance.java @@ -201,7 +201,7 @@ private Map getNodeIdToNaryCount(Cluster cluster, Map nodeIdToNaryCount = Maps.newHashMap(); for(int nodeId: cluster.getNodeIds()) { - nodeIdToNaryCount.put(nodeId, storeRoutingPlan.getNaryPartitionIds(nodeId).size()); + nodeIdToNaryCount.put(nodeId, storeRoutingPlan.getZoneNAryPartitionIds(nodeId).size()); } return nodeIdToNaryCount; @@ -278,10 +278,10 @@ private String dumpZoneNAryDetails(StoreRoutingPlan storeRoutingPlan) { int zoneId = node.getZoneId(); int nodeId = node.getId(); sb.append("\tNode ID: " + nodeId + " in zone " + zoneId).append(Utils.NEWLINE); - List naries = storeRoutingPlan.getNaryPartitionIds(nodeId); + List naries = storeRoutingPlan.getZoneNAryPartitionIds(nodeId); Map> zoneNaryTypeToPartitionIds = new HashMap>(); for(int nary: naries) { - int zoneReplicaType = storeRoutingPlan.getZoneReplicaType(zoneId, nodeId, nary); + int zoneReplicaType = storeRoutingPlan.getZoneNaryForNodesPartition(zoneId, nodeId, nary); if(!zoneNaryTypeToPartitionIds.containsKey(zoneReplicaType)) { zoneNaryTypeToPartitionIds.put(zoneReplicaType, new ArrayList()); } diff --git a/src/java/voldemort/utils/MoveMap.java b/src/java/voldemort/utils/MoveMap.java index a5fc9b43a1..de39d9e5fd 100644 --- a/src/java/voldemort/utils/MoveMap.java +++ b/src/java/voldemort/utils/MoveMap.java @@ -32,6 +32,9 @@ */ public class MoveMap { + // TODO: (refactor) create a voldemort.utils.rebalance package and move this + // class (and others) to it. 
+ final TreeSet idKeySet; final Map> fromToMoves; diff --git a/src/java/voldemort/utils/RebalanceUtils.java b/src/java/voldemort/utils/RebalanceUtils.java index dd1e53277b..48c327a6dc 100644 --- a/src/java/voldemort/utils/RebalanceUtils.java +++ b/src/java/voldemort/utils/RebalanceUtils.java @@ -45,6 +45,7 @@ import voldemort.client.rebalance.RebalancePartitionsInfo; import voldemort.cluster.Cluster; import voldemort.cluster.Node; +import voldemort.cluster.Zone; import voldemort.routing.RoutingStrategy; import voldemort.routing.RoutingStrategyFactory; import voldemort.routing.StoreRoutingPlan; @@ -121,6 +122,10 @@ public static HashMap> getOptimizedReplicaToPartitionList } + // TODO: (refactor) Either move all methods that take an AdminClient + // somewhere else. Either (i) into a name space of AdminClient or (ii) + // separate utils class. Must wait until after all changes for abortable + // rebalance & atomic update of cluster/stores are merged to do this change. /** * Get the latest cluster from all available nodes in the cluster
* @@ -209,8 +214,8 @@ public static void assertSameDonor(List partitionInfos, * @param adminClient Admin client used to query * @throws VoldemortRebalancingException if any node is not in normal state */ - public static void validateProdClusterStateIsNormal(final Cluster cluster, - final AdminClient adminClient) { + public static void checkEachServerInNormalState(final Cluster cluster, + final AdminClient adminClient) { for(Node node: cluster.getNodes()) { Versioned versioned = adminClient.rebalanceOps.getRemoteServerState(node.getId()); @@ -244,6 +249,12 @@ public static void validateClusterStores(final Cluster cluster, return; } + // TODO: This method is biased towards the 3 currently supported use cases: + // shuffle, cluster expansion, and zone expansion. There are two other use + // cases we need to consider: cluster contraction (reducing # nodes in a + // zone) and zone contraction (reducing # of zones). We probably want to end + // up pass an enum into this method so we can do proper checks based on use + // case. /** * A final cluster ought to be a super set of current cluster. I.e., * existing node IDs ought to map to same server, but partition layout can @@ -356,7 +367,9 @@ public static void validateClusterPartitionState(final Cluster subsetCluster, * @param rhs */ public static void validateClusterZonesSame(final Cluster lhs, final Cluster rhs) { - if(lhs.getZones().equals(rhs.getZones())) + Set lhsSet = new HashSet(lhs.getZones()); + Set rhsSet = new HashSet(rhs.getZones()); + if(!lhsSet.equals(rhsSet)) throw new VoldemortException("Zones are not the same [ lhs cluster zones (" + lhs.getZones() + ") not equal to rhs cluster zones (" + rhs.getZones() + ") ]"); @@ -1202,7 +1215,7 @@ public static String analyzeInvalidMetadataRate(final Cluster currentCluster, for(int nodeId: currentCluster.getNodeIdsInZone(zoneId)) { for(int partitionId: targetSRP.getZonePrimaryPartitionIds(nodeId)) { zoneLocalPrimaries++; - if(!currentSRP.getNaryPartitionIds(nodeId).contains(partitionId)) { + if(!currentSRP.getZoneNAryPartitionIds(nodeId).contains(partitionId)) { invalidMetadata++; } } diff --git a/test/unit/voldemort/client/rebalance/RebalanceBatchPlanTest.java b/test/unit/voldemort/client/rebalance/RebalanceBatchPlanTest.java index eec113b94e..5aeec4ee1a 100644 --- a/test/unit/voldemort/client/rebalance/RebalanceBatchPlanTest.java +++ b/test/unit/voldemort/client/rebalance/RebalanceBatchPlanTest.java @@ -17,7 +17,9 @@ package voldemort.client.rebalance; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.File; @@ -45,6 +47,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; +// TODO: (offline) fix this as part of completing the RebalancePlan work // TODO: This suite of tests is known to mostly fail. Sorry. // TODO: This test needs to be mostly re-written. 
The planning algorithm has // changed and this test focused on the implementation of the prior planning @@ -56,12 +59,20 @@ public class RebalanceBatchPlanTest { private Cluster targetCluster; private List storeDefList; private List storeDefList2; + private List test211StoreDef; @Before public void setUp() { try { storeDefList = new StoreDefinitionsMapper().readStoreList(new FileReader(new File(storeDefFile))); storeDefList2 = new StoreDefinitionsMapper().readStoreList(new StringReader(VoldemortTestConstants.getSingleStore322Xml())); + test211StoreDef = Lists.newArrayList(ServerTestUtils.getStoreDef("test", + 2, + 1, + 1, + 1, + 1, + RoutingStrategyType.CONSISTENT_STRATEGY)); } catch(FileNotFoundException e) { throw new RuntimeException("Failed to find storeDefFile:" + storeDefFile, e); } @@ -72,7 +83,7 @@ public void setUp() { * replicas */ @Test - public void testRebalancePlanInsufficientReplicas() { + public void testInsufficientNodes() { currentCluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 0 }, { 1 }, { 2 } }); targetCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 1 }, { 0 }, { 2 } }); @@ -84,126 +95,72 @@ public void testRebalancePlanInsufficientReplicas() { } + /** + * confirm that a shuffle of a cluster of size 2 for a 211 store is a no op. + */ @Test - public void testRebalancePlanDelete() { + public void testShuffleNoop() { + int numServers = 2; + int ports[] = ServerTestUtils.findFreePorts(3 * numServers); + currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { + { 0, 1, 2, 3 }, { 4, 5, 6, 7 } }); + + targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { + { 1, 2, 3 }, { 4, 5, 6, 7, 0 } }); + + List batchPlan = getBatchPlan(currentCluster, + targetCluster, + test211StoreDef); + + assertTrue("Batch plan should be empty.", batchPlan.isEmpty()); + } + + /** + * Expand on to an empty server. + */ + @Test + public void testExpansion() { int numServers = 3; int ports[] = ServerTestUtils.findFreePorts(3 * numServers); - // CASE 1 + currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 1, 2, 3 }, { 4, 5, 6, 7 }, {} }); targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 1, 2, 3 }, { 4, 5, 6, 7 }, { 0 } }); - List orderedRebalancePartitionInfoList = getExecutableTasks(currentCluster, - targetCluster, - Lists.newArrayList(ServerTestUtils.getStoreDef("test", - 2, - 1, - 1, - 1, - 1, - RoutingStrategyType.CONSISTENT_STRATEGY))); - assertEquals("There should have exactly 2 rebalancing node", - 2, - getUniqueNodeCount(orderedRebalancePartitionInfoList, false)); - assertEquals("There should be exactly 2 rebalancing partition info", - 2, - orderedRebalancePartitionInfoList.size()); + List batchPlan = getBatchPlan(currentCluster, + targetCluster, + test211StoreDef); + // data should only move from node 0 to node 2 for node 2 to host + // everything needed. no other movement should occur. 
+ assertEquals("There should be one move in this plan.", 1, batchPlan.size()); + assertEquals("There should be exactly 1 rebalancing nodes", + 1, + getUniqueNodeCount(batchPlan, false)); assertEquals("Stealer 2 should have 1 entry", 1, - getStealerNodePartitionInfoCount(2, orderedRebalancePartitionInfoList)); + getStealerNodePartitionInfoCount(2, batchPlan)); // Partitions to move HashMap> partitionsToMove = Maps.newHashMap(); partitionsToMove.put(0, Lists.newArrayList(0)); - partitionsToMove.put(1, Lists.newArrayList(5, 4, 7, 6)); + partitionsToMove.put(1, Lists.newArrayList(4, 5, 6, 7)); HashMap>> storeToPartitionsToMove = Maps.newHashMap(); storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(2, - orderedRebalancePartitionInfoList), + checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(2, batchPlan), Arrays.asList(new RebalancePartitionsInfo(2, 0, storeToPartitionsToMove, currentCluster))); - - assertEquals("Stealer 0 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(0, orderedRebalancePartitionInfoList)); - partitionsToMove = Maps.newHashMap(); - partitionsToMove.put(1, Lists.newArrayList(0)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(0, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(0, - 1, - storeToPartitionsToMove, - currentCluster))); - - // CASE 2 - - currentCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0, 1, 2, 3 }, - { 4, 5, 6, 7 } }); - - targetCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 1, 2, 3 }, - { 4, 5, 6, 7, 0 } }); - - orderedRebalancePartitionInfoList = getExecutableTasks(currentCluster, - targetCluster, - Lists.newArrayList(ServerTestUtils.getStoreDef("test", - 2, - 1, - 1, - 1, - 1, - RoutingStrategyType.CONSISTENT_STRATEGY))); - assertEquals("There should have exactly 2 rebalancing node", - 2, - getUniqueNodeCount(orderedRebalancePartitionInfoList, false)); - assertEquals("There should be exactly 2 rebalance partition info", - 2, - orderedRebalancePartitionInfoList.size()); - assertEquals("Stealer 1 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList)); - - partitionsToMove = Maps.newHashMap(); - partitionsToMove.put(0, Lists.newArrayList(0)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(1, - 0, - storeToPartitionsToMove, - currentCluster))); - - assertEquals("Stealer 0 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(0, orderedRebalancePartitionInfoList)); - partitionsToMove = Maps.newHashMap(); - partitionsToMove.put(1, Lists.newArrayList(0)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(0, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(0, - 1, - storeToPartitionsToMove, - currentCluster))); - } /** * Tests the case where-in we delete all the partitions from the last node */ @Test - public void testRebalancePlanDeleteLastNode() { + public void testDeleteLastNode() { int numServers = 4; int ports[] = ServerTestUtils.findFreePorts(3 * numServers); @@ -213,9 +170,9 @@ 
public void testRebalancePlanDeleteLastNode() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 3, 6, 9, 12, 15 }, { 1, 4, 7, 10, 13, 16 }, { 2, 5, 8, 11, 14, 17 }, {} }); - List orderedRebalancePartitionInfoList = getExecutableTasks(currentCluster, - targetCluster, - storeDefList2); + List orderedRebalancePartitionInfoList = getBatchPlan(currentCluster, + targetCluster, + storeDefList2); assertEquals("There should have exactly 1 rebalancing node", 1, getUniqueNodeCount(orderedRebalancePartitionInfoList, false)); @@ -247,7 +204,7 @@ public void testRebalancePlanDeleteLastNode() { * Tests the scenario where-in we delete the first node */ @Test - public void testRebalancePlanDeleteFirstNode() { + public void testDeleteFirstNode() { int numServers = 4; int ports[] = ServerTestUtils.findFreePorts(3 * numServers); @@ -257,124 +214,39 @@ public void testRebalancePlanDeleteFirstNode() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 4 }, { 0, 1, 5 }, { 2, 6 }, { 3, 7 } }); - // PHASE 1 - List orderedRebalancePartitionInfoList = getExecutableTasks(currentCluster, - targetCluster, - storeDefList2); - assertEquals("There should have exactly 3 rebalancing node", - 3, - getUniqueNodeCount(orderedRebalancePartitionInfoList, false)); - assertEquals("There should be exactly 3 rebalancing partition info", - 3, - orderedRebalancePartitionInfoList.size()); - assertEquals("Stealer 1 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList)); - - HashMap> partitionsToMove = Maps.newHashMap(); - partitionsToMove.clear(); - partitionsToMove.put(0, Lists.newArrayList(0)); - partitionsToMove.put(1, Lists.newArrayList(7)); - partitionsToMove.put(2, Lists.newArrayList(6)); - HashMap>> storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(1, - 0, - storeToPartitionsToMove, - currentCluster))); - assertEquals("Stealer 2 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(2, orderedRebalancePartitionInfoList)); - partitionsToMove.clear(); - partitionsToMove.put(1, Lists.newArrayList(0)); - partitionsToMove.put(2, Lists.newArrayList(7)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(2, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(2, - 1, - storeToPartitionsToMove, - currentCluster))); - - assertEquals("Stealer 3 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(3, orderedRebalancePartitionInfoList)); - partitionsToMove.clear(); - partitionsToMove.put(2, Lists.newArrayList(0)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(3, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(3, - 2, - storeToPartitionsToMove, - currentCluster))); - - // PHASE 2 - currentCluster = ServerTestUtils.getLocalCluster(4, new int[][] { { 4 }, { 0, 1, 5 }, - { 2, 6 }, { 3, 7 } }); - - targetCluster = ServerTestUtils.getLocalCluster(4, new int[][] { {}, { 0, 1, 5 }, - { 4, 2, 6 }, { 3, 7 } }); - - orderedRebalancePartitionInfoList = getExecutableTasks(currentCluster, + // PHASE 1 - move partition 0 off of node 0 
to node 1 + List batchPlan = getBatchPlan(currentCluster, targetCluster, storeDefList2); - assertEquals("There should have exactly 3 rebalancing node", - 3, - getUniqueNodeCount(orderedRebalancePartitionInfoList, false)); - assertEquals("There should have exactly 3 rebalancing partition info", - 3, - orderedRebalancePartitionInfoList.size()); + assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); - assertEquals("Stealer 2 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(2, orderedRebalancePartitionInfoList)); + // Cannot do other tests because with partition 1 already on node 1, its + // unclear which partitions will actual move. - partitionsToMove.clear(); - partitionsToMove.put(0, Lists.newArrayList(4)); - partitionsToMove.put(1, Lists.newArrayList(3)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(2, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(2, - 0, - storeToPartitionsToMove, - currentCluster))); - assertEquals("Stealer 1 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList)); - partitionsToMove.clear(); - partitionsToMove.put(2, Lists.newArrayList(2)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(1, - 0, - storeToPartitionsToMove, - currentCluster))); + // PHASE 2 - Move partition 4 off of node 0 to node 2 + currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 4 }, + { 0, 1, 5 }, { 2 }, { 3, 6, 7 } }); - assertEquals("Stealer 3 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(3, orderedRebalancePartitionInfoList)); - partitionsToMove.clear(); - partitionsToMove.put(2, Lists.newArrayList(4)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(3, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(3, - 2, - storeToPartitionsToMove, - currentCluster))); + targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { {}, + { 0, 1, 5 }, { 4, 2 }, { 3, 6, 7 } }); + batchPlan = getBatchPlan(currentCluster, targetCluster, storeDefList2); + + assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); + assertFalse("Batch plan for server 2 should not be empty.", + getStealerNodePartitionInfoList(2, batchPlan).isEmpty()); + boolean hasTheMove = false; + // Confirm partition 4 is moved from server 0 to server 2 + for(RebalancePartitionsInfo info: getStealerNodePartitionInfoList(2, batchPlan)) { + assertTrue(info.getStealerId() == 2); + if(info.getDonorId() == 0) { + hasTheMove = true; + assertTrue(info.getPartitionStores().size() == 1); + assertTrue(info.getPartitionIds("test").contains(4)); + } + } + assertTrue(hasTheMove); } @Test @@ -382,134 +254,50 @@ public void testRebalanceDeletingMiddleNode() { int numServers = 4; int ports[] = ServerTestUtils.findFreePorts(3 * numServers); + // PHASE 1 - move partition 2 off of node 2 and onto node 1 currentCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, { 1, 5 }, { 2, 6 }, { 3, 7 } }); targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 
0, 4 }, { 2, 1, 5 }, { 6 }, { 3, 7 } }); - List orderedRebalancePartitionInfoList = getExecutableTasks(currentCluster, - targetCluster, - storeDefList2); - - assertEquals("There should have exactly 3 rebalancing node", - 3, - getUniqueNodeCount(orderedRebalancePartitionInfoList, false)); - - assertEquals("There should have exactly 3 rebalancing partition info", - 3, - orderedRebalancePartitionInfoList.size()); - - assertEquals("Stealer 1 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList)); - - HashMap> partitionsToMove = Maps.newHashMap(); - partitionsToMove.clear(); - partitionsToMove.put(0, Lists.newArrayList(2)); - HashMap>> storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(1, - 2, - storeToPartitionsToMove, - currentCluster))); - assertEquals("Stealer 0 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(0, orderedRebalancePartitionInfoList)); - partitionsToMove.clear(); - partitionsToMove.put(2, Lists.newArrayList(1)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(0, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(0, - 3, - storeToPartitionsToMove, - currentCluster))); - - assertEquals("Stealer 3 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(3, orderedRebalancePartitionInfoList)); - partitionsToMove.clear(); - partitionsToMove.put(1, Lists.newArrayList(1)); - partitionsToMove.put(2, Lists.newArrayList(0)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(3, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(3, - 2, - storeToPartitionsToMove, - currentCluster))); - - currentCluster = ServerTestUtils.getLocalCluster(4, new int[][] { { 0, 4 }, { 2, 1, 5 }, - { 6 }, { 3, 7 } }); - - targetCluster = ServerTestUtils.getLocalCluster(4, new int[][] { { 0, 4 }, { 2, 1, 5 }, {}, - { 6, 3, 7 } }); - - orderedRebalancePartitionInfoList = getExecutableTasks(currentCluster, + List batchPlan = getBatchPlan(currentCluster, targetCluster, storeDefList2); - assertEquals("There should have exactly 3 rebalancing node", - 3, - getUniqueNodeCount(orderedRebalancePartitionInfoList, false)); - assertEquals("There should have exactly 3 rebalancing partition info", - 3, - orderedRebalancePartitionInfoList.size()); + assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); + assertFalse("Batch plan for server 1 should not be empty.", + getStealerNodePartitionInfoList(1, batchPlan).isEmpty()); + boolean hasTheMove = false; + // Confirm partition 2 is moved from server 2 to server 1 + for(RebalancePartitionsInfo info: getStealerNodePartitionInfoList(1, batchPlan)) { + assertTrue(info.getStealerId() == 1); + if(info.getDonorId() == 2) { + hasTheMove = true; + assertTrue(info.getPartitionStores().size() == 1); + assertTrue(info.getPartitionIds("test").contains(2)); + } + } + assertTrue(hasTheMove); - assertEquals("Stealer 3 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(3, orderedRebalancePartitionInfoList)); + // PHASE 2 - move partition 6 off of node 2 and onto node 3 + currentCluster = 
ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, + { 2, 1, 5 }, { 6 }, { 3, 7 } }); - partitionsToMove.clear(); - partitionsToMove.put(0, Lists.newArrayList(6)); - partitionsToMove.put(1, Lists.newArrayList(5)); - partitionsToMove.put(2, Lists.newArrayList(4)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(3, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(3, - 2, - storeToPartitionsToMove, - currentCluster))); - assertEquals("Stealer 0 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(0, orderedRebalancePartitionInfoList)); - partitionsToMove.clear(); - partitionsToMove.put(1, Lists.newArrayList(6)); - partitionsToMove.put(2, Lists.newArrayList(5)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(0, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(0, - 3, - storeToPartitionsToMove, - currentCluster))); + targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 4 }, + { 2, 1, 5 }, {}, { 6, 3, 7 } }); - assertEquals("Stealer 1 should have 1 entry", - 1, - getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList)); - partitionsToMove.clear(); - partitionsToMove.put(2, Lists.newArrayList(6)); - storeToPartitionsToMove = Maps.newHashMap(); - storeToPartitionsToMove.put("test", partitionsToMove); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(1, - 0, - storeToPartitionsToMove, - currentCluster))); + batchPlan = getBatchPlan(currentCluster, targetCluster, storeDefList2); + + assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); + + // Cannot do other tests because with partition 7 already on node 3, its + // unclear which partitions will actual move when partitoin 6 also moves + // to node 3. 
} - @SuppressWarnings("unchecked") @Test - public void testRebalancePlanWithReplicationChanges() { + public void testManyStoreExpansion() { int numServers = 4; int ports[] = ServerTestUtils.findFreePorts(3 * numServers); @@ -519,100 +307,39 @@ public void testRebalancePlanWithReplicationChanges() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 0, 2, 3 }, { 4, 6 }, { 7, 8, 9 }, { 1, 5 } }); - List orderedRebalancePartitionInfoList = getExecutableTasks(currentCluster, - targetCluster, - storeDefList); - - assertEquals("There should have exactly 3 rebalancing node", - 3, - this.getUniqueNodeCount(orderedRebalancePartitionInfoList, false)); - assertEquals("There should have exactly 5 rebalancing partition info", - 5, - orderedRebalancePartitionInfoList.size()); - assertEquals("Stealer 3 should have 3 entry", - 3, - this.getStealerNodePartitionInfoCount(3, orderedRebalancePartitionInfoList)); - assertEquals("Stealer 0 should have 1 entry", - 1, - this.getStealerNodePartitionInfoCount(0, orderedRebalancePartitionInfoList)); - assertEquals("Stealer 1 should have 1 entry", - 1, - this.getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList)); - - HashMap>> storeToPartitionsToMove[] = new HashMap[5]; - - for(int numPlan = 0; numPlan < 5; numPlan++) { - storeToPartitionsToMove[numPlan] = new HashMap>>(); + List batchPlan = getBatchPlan(currentCluster, + targetCluster, + storeDefList); + + assertFalse("Batch plan should not be empty.", batchPlan.isEmpty()); + assertFalse("Batch plan for server 3 should not be empty.", + getStealerNodePartitionInfoList(3, batchPlan).isEmpty()); + + boolean hasTheMove = false; + // Confirm partition 1 is moved from server 0 to server 3 + for(RebalancePartitionsInfo info: getStealerNodePartitionInfoList(3, batchPlan)) { + assertTrue(info.getStealerId() == 3); + if(info.getDonorId() == 0) { + hasTheMove = true; + for(String storeName: info.getPartitionStores()) { + assertTrue(info.getPartitionIds(storeName).contains(1)); + } + } } - - for(StoreDefinition storeDef: storeDefList) { - if(storeDef.getReplicationFactor() == 2) { - - // All moves - HashMap> partitions = Maps.newHashMap(); - partitions.put(0, Lists.newArrayList(1)); - storeToPartitionsToMove[0].put(storeDef.getName(), partitions); - - partitions = Maps.newHashMap(); - partitions.put(0, Lists.newArrayList(5)); - partitions.put(1, Lists.newArrayList(0)); - storeToPartitionsToMove[1].put(storeDef.getName(), partitions); - - partitions = Maps.newHashMap(); - partitions.put(1, Lists.newArrayList(4)); - storeToPartitionsToMove[2].put(storeDef.getName(), partitions); - - partitions = Maps.newHashMap(); - partitions.put(1, Lists.newArrayList(1)); - storeToPartitionsToMove[3].put(storeDef.getName(), partitions); - - partitions = Maps.newHashMap(); - partitions.put(1, Lists.newArrayList(5)); - storeToPartitionsToMove[4].put(storeDef.getName(), partitions); - - } else if(storeDef.getReplicationFactor() == 1) { - - // All moves - HashMap> partitions = Maps.newHashMap(); - partitions.put(0, Lists.newArrayList(1)); - storeToPartitionsToMove[0].put(storeDef.getName(), partitions); - - partitions = Maps.newHashMap(); - partitions.put(0, Lists.newArrayList(5)); - storeToPartitionsToMove[1].put(storeDef.getName(), partitions); - - } else { - throw new VoldemortException("Change in store definitions file found"); + assertTrue(hasTheMove); + + hasTheMove = false; + // Confirm partition 5 is moved from server 1 to server 3 + for(RebalancePartitionsInfo info: 
getStealerNodePartitionInfoList(3, batchPlan)) { + assertTrue(info.getStealerId() == 3); + if(info.getDonorId() == 1) { + hasTheMove = true; + for(String storeName: info.getPartitionStores()) { + assertTrue(info.getPartitionIds(storeName).contains(5)); + } } } - - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(3, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(3, - 0, - storeToPartitionsToMove[0], - currentCluster), - new RebalancePartitionsInfo(3, - 1, - storeToPartitionsToMove[1], - currentCluster), - new RebalancePartitionsInfo(3, - 2, - storeToPartitionsToMove[2], - currentCluster))); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(0, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(0, - 1, - storeToPartitionsToMove[3], - currentCluster))); - checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1, - orderedRebalancePartitionInfoList), - Arrays.asList(new RebalancePartitionsInfo(1, - 2, - storeToPartitionsToMove[4], - currentCluster))); - + assertTrue(hasTheMove); } /** @@ -629,9 +356,9 @@ public void testRebalanceAllReplicasBeingMigrated() { targetCluster = ServerTestUtils.getLocalCluster(numServers, ports, new int[][] { { 4 }, { 2, 3 }, { 1, 5 }, { 0 } }); - List orderedRebalancePartitionInfoList = getExecutableTasks(currentCluster, - targetCluster, - storeDefList2); + List orderedRebalancePartitionInfoList = getBatchPlan(currentCluster, + targetCluster, + storeDefList2); assertEquals("There should have exactly 1 rebalancing node", 1, @@ -740,16 +467,16 @@ private void checkAllRebalanceInfoPresent(List toCheckR /** * Given the current and target cluster metadata, along with your store - * definition, return the executable tasks. + * definition, return the batch plan. 
* * @param currentCluster Current cluster metadata * @param targetCluster Target cluster metadata * @param storeDef List of store definitions * @return list of tasks */ - private List getExecutableTasks(Cluster currentCluster, - Cluster targetCluster, - List storeDef) { + private List getBatchPlan(Cluster currentCluster, + Cluster targetCluster, + List storeDef) { RebalanceBatchPlan rebalancePlan = new RebalanceBatchPlan(currentCluster, targetCluster, storeDef); diff --git a/test/unit/voldemort/client/rebalance/RebalancePlanTest.java b/test/unit/voldemort/client/rebalance/RebalancePlanTest.java index 10783d140f..34c6c1765e 100644 --- a/test/unit/voldemort/client/rebalance/RebalancePlanTest.java +++ b/test/unit/voldemort/client/rebalance/RebalancePlanTest.java @@ -60,7 +60,7 @@ RebalancePlan makePlan(Cluster cCluster, Cluster fCluster, List fStores) { // Defaults for plans - int batchSize = 1000; + int batchSize = RebalancePlan.BATCH_SIZE; String outputDir = null; return new RebalancePlan(cCluster, cStores, fCluster, fStores, batchSize, outputDir); diff --git a/test/unit/voldemort/routing/StoreRoutingPlanTest.java b/test/unit/voldemort/routing/StoreRoutingPlanTest.java index 2d35d99bc4..fe43291ccb 100644 --- a/test/unit/voldemort/routing/StoreRoutingPlanTest.java +++ b/test/unit/voldemort/routing/StoreRoutingPlanTest.java @@ -96,29 +96,29 @@ public void testZonedStoreRoutingPlan() { assertEquals("Zone replica type should be 1", 1, - zonedRoutingPlan.getZoneReplicaType(0, 0, samplePartitionKeysMap.get(6).get(0))); + zonedRoutingPlan.getZoneNAry(0, 0, samplePartitionKeysMap.get(6).get(0))); assertEquals("Zone replica type should be 0", 0, - zonedRoutingPlan.getZoneReplicaType(0, 1, samplePartitionKeysMap.get(6).get(0))); + zonedRoutingPlan.getZoneNAry(0, 1, samplePartitionKeysMap.get(6).get(0))); assertEquals("Zone replica type should be 1", 1, - zonedRoutingPlan.getZoneReplicaType(1, 3, samplePartitionKeysMap.get(7).get(0))); + zonedRoutingPlan.getZoneNAry(1, 3, samplePartitionKeysMap.get(7).get(0))); assertEquals("Zone replica type should be 0", 0, - zonedRoutingPlan.getZoneReplicaType(1, 4, samplePartitionKeysMap.get(7).get(0))); + zonedRoutingPlan.getZoneNAry(1, 4, samplePartitionKeysMap.get(7).get(0))); assertEquals("Replica owner should be 1", 1, - zonedRoutingPlan.getZoneReplicaNode(0, 1, samplePartitionKeysMap.get(2).get(0))); + zonedRoutingPlan.getNodeIdForZoneNary(0, 1, samplePartitionKeysMap.get(2).get(0))); assertEquals("Replica owner should be 1", 1, - zonedRoutingPlan.getZoneReplicaNode(0, 0, samplePartitionKeysMap.get(3).get(0))); + zonedRoutingPlan.getNodeIdForZoneNary(0, 0, samplePartitionKeysMap.get(3).get(0))); assertEquals("Replica owner should be 4", 4, - zonedRoutingPlan.getZoneReplicaNode(1, 1, samplePartitionKeysMap.get(1).get(0))); + zonedRoutingPlan.getNodeIdForZoneNary(1, 1, samplePartitionKeysMap.get(1).get(0))); assertEquals("Replica owner should be 3", 3, - zonedRoutingPlan.getZoneReplicaNode(1, 0, samplePartitionKeysMap.get(2).get(0))); + zonedRoutingPlan.getNodeIdForZoneNary(1, 0, samplePartitionKeysMap.get(2).get(0))); } @Test @@ -136,22 +136,22 @@ public void testNonZonedStoreRoutingPlan() { assertEquals("Zone replica type should be 1", 1, - nonZonedRoutingPlan.getZoneReplicaType(Zone.DEFAULT_ZONE_ID, + nonZonedRoutingPlan.getZoneNAry(Zone.DEFAULT_ZONE_ID, 2, samplePartitionKeysMap.get(1).get(0))); assertEquals("Zone replica type should be 0", 0, - nonZonedRoutingPlan.getZoneReplicaType(Zone.DEFAULT_ZONE_ID, + 
nonZonedRoutingPlan.getZoneNAry(Zone.DEFAULT_ZONE_ID, 1, samplePartitionKeysMap.get(3).get(0))); assertEquals("Replica owner should be 2", 2, - nonZonedRoutingPlan.getZoneReplicaNode(Zone.DEFAULT_ZONE_ID, + nonZonedRoutingPlan.getNodeIdForZoneNary(Zone.DEFAULT_ZONE_ID, 1, samplePartitionKeysMap.get(1).get(0))); assertEquals("Replica owner should be 1", 1, - nonZonedRoutingPlan.getZoneReplicaNode(Zone.DEFAULT_ZONE_ID, + nonZonedRoutingPlan.getNodeIdForZoneNary(Zone.DEFAULT_ZONE_ID, 0, samplePartitionKeysMap.get(3).get(0))); }
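As a follow-up to the "TODO: add unit test" markers on the renamed StoreRoutingPlan methods, a minimal partition-id based test could lean on the fact that getZoneNaryForNodesPartition and getNodeIdForZoneNary walk the same replication list and therefore invert each other. The sketch below uses hypothetical fixture names (zonedCluster, zonedRoutingPlan) in the spirit of the fixtures already present in StoreRoutingPlanTest:

    @Test
    public void testZoneNaryRoundTrip() {
        // Assumes a zoned cluster and a StoreRoutingPlan built from it.
        for(Node node: zonedCluster.getNodes()) {
            int zoneId = node.getZoneId();
            // Every partition for which this node hosts some zone n-ary...
            for(int partitionId: zonedRoutingPlan.getZoneNAryPartitionIds(node.getId())) {
                int zoneNary = zonedRoutingPlan.getZoneNaryForNodesPartition(zoneId,
                                                                             node.getId(),
                                                                             partitionId);
                // ...should resolve back to this node at that zone n-ary level.
                assertEquals(node.getId(),
                             zonedRoutingPlan.getNodeIdForZoneNary(zoneId, zoneNary, partitionId));
            }
        }
    }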