Skip to content

Commit

Permalink
Renamed RebalanceClusterPlan(Test).java to RebalanceBatchPlan(Test).j…
Browse files Browse the repository at this point in the history
…ava. Cleaned up TODOs and comments and variable names in RebalanceBatchPlan.
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent 1219576 commit 8121c78
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 97 deletions.
Expand Up @@ -31,27 +31,17 @@

import com.google.common.collect.Maps;

// TODO: (refactor) Rename RebalanceClusterPlan to RebalanceBatchPlan
// TODO: (refactor) Fix cluster nomenclature in general: make sure there are
// exactly three prefixes used to distinguish cluster xml: initial or current,
// target or spec or expanded, and final. 'target' has historically been
// overloaded to mean spec/expanded or final depending on context.
// TODO: (refactor) Fix this header comment after all the refactoring...
/**
* Compares the target cluster configuration with the final cluster
* configuration and generates a plan to move the partitions. The plan can be
* either generated from the perspective of the stealer or the donor. <br>
*
* The end result is one of the following -
*
* <li>A map of stealer node-ids to partitions desired to be stolen from various
* donor nodes
*
* <li>A map of donor node-ids to partitions which we need to donate to various
* stealer nodes
*
* Constructs a batch plan that goes from targetCluster to finalCluster. The
* partition-stores included in the move are based on those listed in storeDefs.
* This batch plan is execution-agnostic, i.e., a plan is generated and later
* stealer- versus donor-based execution of that plan is decided.
*/
public class RebalanceClusterPlan {
public class RebalanceBatchPlan {

private final Cluster targetCluster;
private final Cluster finalCluster;
Expand All @@ -72,9 +62,9 @@ public class RebalanceClusterPlan {
* @param isStealerBased Do we want to generate the final plan based on the
* stealer node or the donor node?
*/
public RebalanceClusterPlan(final Cluster targetCluster,
final Cluster finalCluster,
final List<StoreDefinition> storeDefs) {
public RebalanceBatchPlan(final Cluster targetCluster,
final Cluster finalCluster,
final List<StoreDefinition> storeDefs) {
this.targetCluster = targetCluster;
this.finalCluster = finalCluster;
this.storeDefs = storeDefs;
Expand Down Expand Up @@ -153,21 +143,25 @@ public int getPartitionStoreMoves() {
return partitionStoreMoves;
}

// TODO: Simplify / kill this complicated struct once
// RebalancePartitionsInfo is rationalized.
private class UnnecessarilyComplicatedDataStructure {
// TODO: (replicaType) As part of dropping replicaType and
// RebalancePartitionsInfo from code, simplify this object.
/**
* Gathers all of the state necessary to build a
* List<RebalancePartitionsInfo> which is effectively a (batch) plan.
*/
private class RebalancePartitionsInfoBuilder {

final HashMap<Pair<Integer, Integer>, HashMap<String, HashMap<Integer, List<Integer>>>> stealerDonorToStoreToStealPartition;

UnnecessarilyComplicatedDataStructure() {
RebalancePartitionsInfoBuilder() {
stealerDonorToStoreToStealPartition = Maps.newHashMap();
}

public void shovelCrapIn(int stealerNodeId,
int donorNodeId,
String storeName,
int donorReplicaType,
int partitionId) {
public void addPartitionStoreMove(int stealerNodeId,
int donorNodeId,
String storeName,
int donorReplicaType,
int partitionId) {
Pair<Integer, Integer> stealerDonor = new Pair<Integer, Integer>(stealerNodeId,
donorNodeId);
if(!stealerDonorToStoreToStealPartition.containsKey(stealerDonor)) {
Expand All @@ -189,7 +183,7 @@ public void shovelCrapIn(int stealerNodeId,
partitionIdList.add(partitionId);
}

public List<RebalancePartitionsInfo> shovelCrapOut() {
public List<RebalancePartitionsInfo> buildRebalancePartitionsInfos() {
final List<RebalancePartitionsInfo> result = new ArrayList<RebalancePartitionsInfo>();

for(Pair<Integer, Integer> stealerDonor: stealerDonorToStoreToStealPartition.keySet()) {
Expand All @@ -202,32 +196,27 @@ public List<RebalancePartitionsInfo> shovelCrapOut() {
}
}

// TODO: Revisit these "two principles". I am not sure about the second one.
// Either because I don't like delete being mixed with this code or because
* we probably want to copy from a to-be-deleted partition-store.
/*
* Generate the list of partition movement based on 2 principles:
*
* <ol> <li>The number of partitions don't change; they are only
* redistributed across nodes <li>A primary or replica partition that is
* going to be deleted is never used to copy from data from another stealer
* </ol>
*
* @param targetCluster Current cluster configuration
/**
* Determine the batch plan and return it. The batch plan has the following
* properties:
*
* @param finalCluster Target cluster configuration
* 1) A stealer node does not steal any partition-stores it already hosts.
*
* @param storeDefs List of store definitions
* 2) If possible, a stealer node that is the n-ary zone replica in the
* finalCluster steals from the n-ary zone replica in the targetCluster in
* the same zone.
*
* @param stealerNodeId Id of the stealer node
* 3) If there are no partition-stores to steal in the same zone (i.e., this
* is the "zone expansion" use case), then the stealer node that is the
* n-ary zone replica in the finalCluster determines which pre-existing zone
* in the targetCluster hosts the primary partitionId for the
* partition-store and steals the n-ary zone replica from that zone.
*
* @param enableDeletePartition To delete or not to delete?
*/
// TODO: more verbose javadoc based on above (possibly)
/**
* Determine the batch plan!
* In summary, this batch plan avoids all unnecessary cross zone moves,
* distributes cross zone moves into new zones evenly across existing zones,
* and copies replicaFactor partition-stores into any new zone.
*
* @return
* @return the batch plan
*/
private List<RebalancePartitionsInfo> batchPlan() {
// Construct all store routing plans once.
Expand All @@ -240,7 +229,7 @@ private List<RebalancePartitionsInfo> batchPlan() {
storeDef));
}

UnnecessarilyComplicatedDataStructure ucds = new UnnecessarilyComplicatedDataStructure();
RebalancePartitionsInfoBuilder rpiBuilder = new RebalancePartitionsInfoBuilder();
// For every node in the final cluster ...
for(Node stealerNode: finalCluster.getNodes()) {
int stealerZoneId = stealerNode.getZoneId();
Expand Down Expand Up @@ -280,16 +269,16 @@ private List<RebalancePartitionsInfo> batchPlan() {
stealerPartitionId);
int donorReplicaType = targetSRP.getReplicaType(donorNodeId, stealerPartitionId);

ucds.shovelCrapIn(stealerNodeId,
donorNodeId,
storeDef.getName(),
donorReplicaType,
stealerPartitionId);
rpiBuilder.addPartitionStoreMove(stealerNodeId,
donorNodeId,
storeDef.getName(),
donorReplicaType,
stealerPartitionId);
}
}
}

return ucds.shovelCrapOut();
return rpiBuilder.buildRebalancePartitionsInfos();
}

@Override
Expand Down
6 changes: 3 additions & 3 deletions src/java/voldemort/client/rebalance/RebalanceController.java
Expand Up @@ -199,11 +199,11 @@ private void executePlan(RebalancePlan rebalancePlan) {
int partitionStoreCount = 0;
long totalTimeMs = 0;

List<RebalanceClusterPlan> entirePlan = rebalancePlan.getPlan();
List<RebalanceBatchPlan> entirePlan = rebalancePlan.getPlan();
int numBatches = entirePlan.size();
int numPartitionStores = rebalancePlan.getPartitionStoresMoved();

for(RebalanceClusterPlan batchPlan: entirePlan) {
for(RebalanceBatchPlan batchPlan: entirePlan) {
logger.info("======== REBALANCING BATCH " + (batchCount + 1) + " ========");
RebalanceUtils.printLog(batchCount, logger, batchPlan.toString());

Expand Down Expand Up @@ -271,7 +271,7 @@ private void batchStatusLog(int batchCount,
}

// TODO: Add javadoc.
private void executeBatch(int batchCount, final RebalanceClusterPlan batchPlan) {
private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
final Cluster batchCurrentCluster = batchPlan.getCurrentCluster();
final Cluster batchFinalCluster = batchPlan.getFinalCluster();
final List<StoreDefinition> batchStoreDefs = batchPlan.getStoreDefs();
Expand Down
24 changes: 12 additions & 12 deletions src/java/voldemort/client/rebalance/RebalancePlan.java
Expand Up @@ -57,7 +57,7 @@ public class RebalancePlan {

// TODO: (refactor) Better name than targetCluster? -> interimCluster
private final Cluster targetCluster;
private List<RebalanceClusterPlan> batchPlans;
private List<RebalanceBatchPlan> batchPlans;

// Aggregate stats
private int numPrimaryPartitionMoves;
Expand Down Expand Up @@ -100,7 +100,7 @@ public RebalancePlan(final Cluster currentCluster,
finalStores));

// Initialize the plan
batchPlans = new ArrayList<RebalanceClusterPlan>();
batchPlans = new ArrayList<RebalanceBatchPlan>();

// Initialize aggregate statistics
numPrimaryPartitionMoves = 0;
Expand Down Expand Up @@ -186,15 +186,15 @@ private void plan() {
"batch-" + Integer.toString(batches) + ".");

// Generate a plan to compute the tasks
final RebalanceClusterPlan rebalanceClusterPlan = new RebalanceClusterPlan(batchTargetCluster,
batchFinalCluster,
finalStores);
batchPlans.add(rebalanceClusterPlan);
final RebalanceBatchPlan RebalanceBatchPlan = new RebalanceBatchPlan(batchTargetCluster,
batchFinalCluster,
finalStores);
batchPlans.add(RebalanceBatchPlan);

numXZonePartitionStoreMoves += rebalanceClusterPlan.getCrossZonePartitionStoreMoves();
numPartitionStoreMoves += rebalanceClusterPlan.getPartitionStoreMoves();
nodeMoveMap.add(rebalanceClusterPlan.getNodeMoveMap());
zoneMoveMap.add(rebalanceClusterPlan.getZoneMoveMap());
numXZonePartitionStoreMoves += RebalanceBatchPlan.getCrossZonePartitionStoreMoves();
numPartitionStoreMoves += RebalanceBatchPlan.getPartitionStoreMoves();
nodeMoveMap.add(RebalanceBatchPlan.getNodeMoveMap());
zoneMoveMap.add(RebalanceBatchPlan.getZoneMoveMap());

batches++;
batchTargetCluster = mapper.readCluster(new StringReader(mapper.writeCluster(batchFinalCluster)));
Expand Down Expand Up @@ -264,7 +264,7 @@ public List<StoreDefinition> getFinalStores() {
*
* @return The plan!
*/
public List<RebalanceClusterPlan> getPlan() {
public List<RebalanceBatchPlan> getPlan() {
return batchPlans;
}

Expand Down Expand Up @@ -292,7 +292,7 @@ public MoveMap getZoneMoveMap() {
public String toString() {
StringBuilder sb = new StringBuilder();
// Dump entire plan batch-by-batch, partition info-by-partition info...
for(RebalanceClusterPlan batchPlan: batchPlans) {
for(RebalanceBatchPlan batchPlan: batchPlans) {
sb.append(batchPlan).append(Utils.NEWLINE);
}
// Dump aggregate stats of the plan
Expand Down
28 changes: 12 additions & 16 deletions test/unit/voldemort/client/rebalance/AdminRebalanceTest.java
Expand Up @@ -146,10 +146,9 @@ public void startThreeNodeRW() throws IOException {

targetCluster = RebalanceUtils.createUpdatedCluster(cluster, 2, Lists.newArrayList(0));

RebalanceClusterPlan plan = new RebalanceClusterPlan(cluster,
targetCluster,
Lists.newArrayList(storeDef1,
storeDef2));
RebalanceBatchPlan plan = new RebalanceBatchPlan(cluster,
targetCluster,
Lists.newArrayList(storeDef1, storeDef2));
plans = Lists.newArrayList(plan.getBatchPlan());
adminClient = ServerTestUtils.getAdminClient(cluster);
}
Expand Down Expand Up @@ -187,10 +186,9 @@ public void startFourNodeRW() throws IOException {
new Properties());

targetCluster = RebalanceUtils.createUpdatedCluster(cluster, 3, Lists.newArrayList(0));
RebalanceClusterPlan plan = new RebalanceClusterPlan(cluster,
targetCluster,
Lists.newArrayList(storeDef1,
storeDef2));
RebalanceBatchPlan plan = new RebalanceBatchPlan(cluster,
targetCluster,
Lists.newArrayList(storeDef1, storeDef2));
plans = Lists.newArrayList(plan.getBatchPlan());
adminClient = ServerTestUtils.getAdminClient(cluster);
}
Expand Down Expand Up @@ -238,10 +236,9 @@ public void startFourNodeRO() throws IOException {
new Properties());

targetCluster = RebalanceUtils.createUpdatedCluster(cluster, 3, Lists.newArrayList(0));
RebalanceClusterPlan plan = new RebalanceClusterPlan(cluster,
targetCluster,
Lists.newArrayList(storeDef1,
storeDef2));
RebalanceBatchPlan plan = new RebalanceBatchPlan(cluster,
targetCluster,
Lists.newArrayList(storeDef1, storeDef2));

plans = Lists.newArrayList(plan.getBatchPlan());
adminClient = ServerTestUtils.getAdminClient(cluster);
Expand Down Expand Up @@ -308,10 +305,9 @@ public void startFourNodeRORW() throws IOException {

targetCluster = RebalanceUtils.createUpdatedCluster(cluster, 3, Lists.newArrayList(0));
// Make plan only with RO stores
RebalanceClusterPlan plan = new RebalanceClusterPlan(cluster,
targetCluster,
Lists.newArrayList(storeDef1,
storeDef2));
RebalanceBatchPlan plan = new RebalanceBatchPlan(cluster,
targetCluster,
Lists.newArrayList(storeDef1, storeDef2));
plans = plan.getBatchPlan();

adminClient = ServerTestUtils.getAdminClient(cluster);
Expand Down
Expand Up @@ -48,7 +48,7 @@
// TODO: This test needs to be mostly re-written. The planning algorithm has
// changed and this test focused on the implementation of the prior planning
// algorithm, rather than the features of a plan in general.
public class RebalanceClusterPlanTest {
public class RebalanceBatchPlanTest {

private static String storeDefFile = "test/common/voldemort/config/stores.xml";
private Cluster currentCluster;
Expand Down Expand Up @@ -77,7 +77,7 @@ public void testRebalancePlanInsufficientReplicas() {
targetCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 1 }, { 0 }, { 2 } });

try {
new RebalanceClusterPlan(currentCluster, targetCluster, storeDefList);
new RebalanceBatchPlan(currentCluster, targetCluster, storeDefList);
fail("Should have thrown an exception since the migration should result in decrease in replication factor");
} catch(VoldemortException e) {}

Expand Down Expand Up @@ -749,9 +749,9 @@ private void checkAllRebalanceInfoPresent(List<RebalancePartitionsInfo> toCheckR
private List<RebalancePartitionsInfo> getExecutableTasks(Cluster currentCluster,
Cluster targetCluster,
List<StoreDefinition> storeDef) {
RebalanceClusterPlan rebalancePlan = new RebalanceClusterPlan(currentCluster,
targetCluster,
storeDef);
RebalanceBatchPlan rebalancePlan = new RebalanceBatchPlan(currentCluster,
targetCluster,
storeDef);
return rebalancePlan.getBatchPlan();
}

Expand Down
10 changes: 5 additions & 5 deletions test/unit/voldemort/store/rebalancing/RedirectingStoreTest.java
Expand Up @@ -50,7 +50,7 @@
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.rebalance.RebalanceClusterPlan;
import voldemort.client.rebalance.RebalanceBatchPlan;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Cluster;
import voldemort.cluster.failuredetector.NoopFailureDetector;
Expand Down Expand Up @@ -197,10 +197,10 @@ public void setUp() throws IOException, InterruptedException {
}
assertTrue("Not enough secondary entries", primaryEntriesMoved.size() > 1);

RebalanceClusterPlan rebalanceClusterPlan = new RebalanceClusterPlan(currentCluster,
targetCluster,
Lists.newArrayList(storeDef));
List<RebalancePartitionsInfo> plans = Lists.newArrayList(rebalanceClusterPlan.getBatchPlan());
RebalanceBatchPlan RebalanceBatchPlan = new RebalanceBatchPlan(currentCluster,
targetCluster,
Lists.newArrayList(storeDef));
List<RebalancePartitionsInfo> plans = Lists.newArrayList(RebalanceBatchPlan.getBatchPlan());

// Set into rebalancing state
for(RebalancePartitionsInfo partitionPlan: plans) {
Expand Down

0 comments on commit 8121c78

Please sign in to comment.