Skip to content

Commit

Permalink
interim commit. Some work on taking logic of OrderedClusterTransition…
Browse files Browse the repository at this point in the history
… and moving it into ExecutableBatches. Logic is broken though, and, more importantly, unnecessary. Given proxy-gets server from local zone and there can only be nominal perf gain from ordering rebalancing tasks at node level, OrderedClusterTransition needs to be removed from teh code, not incorporated into new planning/execution code.
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent c1c367e commit d9e2623
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 20 deletions.
11 changes: 0 additions & 11 deletions src/java/voldemort/client/rebalance/RebalanceClusterPlan.java
Expand Up @@ -87,18 +87,7 @@ public RebalanceClusterPlan(final Cluster targetCluster,
RebalanceUtils.validateClusterStores(finalCluster, storeDefs);

this.batchPlan = batchPlan();
/*-
for(Node node: finalCluster.getNodes()) {
this.batchPlan.addAll(getRebalancePartitionsInfo(node.getId(), enabledDeletePartition));
}
*/

// TODO: Add some priority ordering somewhere in this class.
// prioritizeBatchPlan();

// TODO: Once plan-level optimization is complete, remove server side
// "optimization" that does not bother to steal partition-stores it
// already hosts. That code will be unnecessary.
}

public Cluster getCurrentCluster() {
Expand Down
7 changes: 7 additions & 0 deletions src/java/voldemort/client/rebalance/RebalanceController.java
Expand Up @@ -46,8 +46,15 @@

import com.google.common.collect.Lists;

// TODO: javadoc header needed
public class RebalanceController {

// TODO: Remove server side "optimization" that does not bother to steal
// partition-stores it already hosts. That code will be unnecessary. This
// also affects AdminClient that has an override of this option. Do not
// complete this work until the atomic metadata update is merged with this
// branch. Otherwise, there will be conflicts on .proto changes.

private static final Logger logger = Logger.getLogger(RebalanceController.class);

private static final DecimalFormat decimalFormatter = new DecimalFormat("#.##");
Expand Down
Expand Up @@ -27,7 +27,7 @@ public RebalanceDonorBasedBatchPlan(RebalanceClusterPlan rebalanceClusterPlan) {
super(rebalanceClusterPlan);

HashMap<Integer, List<RebalancePartitionsInfo>> donorToBatchPlan = new HashMap<Integer, List<RebalancePartitionsInfo>>();
for(RebalancePartitionsInfo info: batchPlan) {
for(RebalancePartitionsInfo info: rebalanceClusterPlan.getBatchPlan()) {
int donorId = info.getDonorId();
if(!donorToBatchPlan.containsKey(donorId)) {
donorToBatchPlan.put(donorId, new ArrayList<RebalancePartitionsInfo>());
Expand Down
52 changes: 47 additions & 5 deletions src/java/voldemort/client/rebalance/RebalancePartitionsInfo.java
Expand Up @@ -19,6 +19,7 @@
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -56,22 +57,29 @@ public class RebalancePartitionsInfo {

private final int stealerId;
private final int donorId;
private HashMap<String, List<Integer>> storeToPartitionIds;

// TODO: (refactor) Unclear what value of the inner Hashmap is. It maps
// "replica type" to lists of partition IDs. A list of partition IDs (per
// store) seems sufficient for all purposes. The replica type is a
// distraction.
@Deprecated
private HashMap<String, HashMap<Integer, List<Integer>>> storeToReplicaToAddPartitionList;
// TODO: (refactor) What value is maxReplica? Seems like it is used in loops
// internally. No idea why it is a member.
@Deprecated
private int maxReplica;
// TODO: (refactor) Does the initialCluster have to be a member? See if it
// can be removed. There is a getInitialCluster method that is called by
// others. Why do callers need the initial cluster from this particular
// class? At first glance, all usages of this method are awkward/unclean
// (i.e., seems like initialCluster could be found through other paths in
// all cases).
@Deprecated
private Cluster initialCluster;

// TODO: pre-"flatten" argument storeToReplicaToAddPArtitionList and then
// drop. Internally, we transform this struct into something much simpler.
/**
* Rebalance Partitions info maintains all information needed for
* rebalancing for a stealer-donor node tuple
Expand All @@ -95,15 +103,26 @@ public RebalancePartitionsInfo(int stealerNodeId,
Cluster initialCluster) {
this.stealerId = stealerNodeId;
this.donorId = donorId;
this.storeToReplicaToAddPartitionList = storeToReplicaToAddPartitionList;
this.maxReplica = 0;

// Find the max replica number
findMaxReplicaType(storeToReplicaToAddPartitionList);
setStoreToReplicaToAddPartitionList(storeToReplicaToAddPartitionList);

this.initialCluster = Utils.notNull(initialCluster);
}

private void flattenStoreToReplicaToAddPartitionListTOStoreToPartitionIds() {
this.storeToPartitionIds = new HashMap<String, List<Integer>>();
for(Entry<String, HashMap<Integer, List<Integer>>> entry: storeToReplicaToAddPartitionList.entrySet()) {
if(!this.storeToPartitionIds.containsKey(entry.getKey())) {
this.storeToPartitionIds.put(entry.getKey(), new LinkedList<Integer>());
}
List<Integer> storesPartitionIds = this.storeToPartitionIds.get(entry.getKey());
for(List<Integer> replicaTypesPartitionIds: entry.getValue().values()) {
storesPartitionIds.addAll(replicaTypesPartitionIds);
}
}
}

// TODO: Get rid of this.
@Deprecated
private void findMaxReplicaType(HashMap<String, HashMap<Integer, List<Integer>>> storeToReplicaToPartitionList) {
for(Entry<String, HashMap<Integer, List<Integer>>> entry: storeToReplicaToPartitionList.entrySet()) {
for(Entry<Integer, List<Integer>> replicaToPartitionList: entry.getValue().entrySet()) {
Expand Down Expand Up @@ -206,6 +225,8 @@ public synchronized int getStealerId() {
return stealerId;
}

// TODO: Get rid of this.
@Deprecated
public synchronized Cluster getInitialCluster() {
return initialCluster;
}
Expand Down Expand Up @@ -233,31 +254,52 @@ public synchronized int getPartitionStoreMoves() {
*
* @return Set of store names
*/
// TODO: Get rid of this.
@Deprecated
public synchronized Set<String> getUnbalancedStoreList() {
return storeToReplicaToAddPartitionList.keySet();
}

// TODO: Get rid of this.
@Deprecated
public synchronized HashMap<String, HashMap<Integer, List<Integer>>> getStoreToReplicaToAddPartitionList() {
return this.storeToReplicaToAddPartitionList;
}

// TODO: Get rid of this.
@Deprecated
public synchronized HashMap<Integer, List<Integer>> getReplicaToAddPartitionList(String storeName) {
return this.storeToReplicaToAddPartitionList.get(storeName);
}

// TODO: Get rid of this.
@Deprecated
public synchronized void setStoreToReplicaToAddPartitionList(HashMap<String, HashMap<Integer, List<Integer>>> storeToReplicaToAddPartitionList) {
this.storeToReplicaToAddPartitionList = storeToReplicaToAddPartitionList;
findMaxReplicaType(storeToReplicaToAddPartitionList);
flattenStoreToReplicaToAddPartitionListTOStoreToPartitionIds();
}

public synchronized void removeStore(String storeName) {
this.storeToReplicaToAddPartitionList.remove(storeName);
this.storeToPartitionIds.remove(storeName);
}

public synchronized List<Integer> getPartitionIds(String storeName) {
return this.storeToPartitionIds.get(storeName);
}

public synchronized void setPartitionIds(String storeName, List<Integer> partitionIds) {
this.storeToPartitionIds.put(storeName, partitionIds);
}

/**
* Gives the list of primary partitions being moved across all stores.
*
* @return List of primary partitions
*/
// TODO: Get rid of this.
@Deprecated
public synchronized List<Integer> getStealMasterPartitions() {
Iterator<HashMap<Integer, List<Integer>>> iter = storeToReplicaToAddPartitionList.values()
.iterator();
Expand Down
Expand Up @@ -27,7 +27,7 @@ public RebalanceStealerBasedBatchPlan(RebalanceClusterPlan rebalanceClusterPlan)
super(rebalanceClusterPlan);

HashMap<Integer, List<RebalancePartitionsInfo>> stealerToBatchPlan = new HashMap<Integer, List<RebalancePartitionsInfo>>();
for(RebalancePartitionsInfo info: batchPlan) {
for(RebalancePartitionsInfo info: rebalanceClusterPlan.getBatchPlan()) {
int stealerId = info.getStealerId();
if(!stealerToBatchPlan.containsKey(stealerId)) {
stealerToBatchPlan.put(stealerId, new ArrayList<RebalancePartitionsInfo>());
Expand Down
45 changes: 43 additions & 2 deletions src/java/voldemort/client/rebalance/RebalanceTypedBatchPlan.java
Expand Up @@ -15,20 +15,34 @@
*/
package voldemort.client.rebalance;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import voldemort.routing.StoreRoutingPlan;
import voldemort.store.StoreDefinition;
import voldemort.utils.Utils;

// TODO: Rename to ExecutableRebalanceBatch
public abstract class RebalanceTypedBatchPlan {

protected final List<RebalancePartitionsInfo> batchPlan;
protected final RebalanceClusterPlan rebalanceClusterPlan;
// Construct store routing plans so that task order can be optimized.
private final Map<String, StoreRoutingPlan> storeToRoutingPlan;

protected final Queue<RebalanceNodePlan> rebalanceTaskQueue;

public RebalanceTypedBatchPlan(final RebalanceClusterPlan rebalanceClusterPlan) {
this.batchPlan = rebalanceClusterPlan.getBatchPlan();
this.rebalanceClusterPlan = rebalanceClusterPlan;
this.storeToRoutingPlan = new HashMap<String, StoreRoutingPlan>();
for(StoreDefinition storeDef: rebalanceClusterPlan.getStoreDefs()) {
this.storeToRoutingPlan.put(storeDef.getName(),
new StoreRoutingPlan(rebalanceClusterPlan.getFinalCluster(),
storeDef));
}

// TODO: Why does this data structure need to be concurrent!? I have
// cut-and-paste this construction from prior code. But, if this needs
Expand All @@ -50,6 +64,33 @@ public Queue<RebalanceNodePlan> getRebalancingTaskQueue() {
return rebalanceTaskQueue;
}

// TODO: Change method name once types have better names.
// TODO: add javadoc
// Take an unorderd list of tasks and order them by zone n-ary for sake of
// prioritizing zone primaries ahead of zone secondaries ahead of ...
protected List<RebalancePartitionsInfo> sortTasks(int nodeId,
List<RebalancePartitionsInfo> tasks) {
int zoneId = rebalanceClusterPlan.getFinalCluster().getNodeById(nodeId).getZoneId();
for(RebalancePartitionsInfo task: tasks) {
Map<Integer, List<RebalancePartitionsInfo>> zoneNaryToTasks = new HashMap<Integer, List<RebalancePartitionsInfo>>();
for(String storeName: storeToRoutingPlan.keySet()) {
StoreRoutingPlan storeRoutingPlan = storeToRoutingPlan.get(storeName);
List<Integer> partitionIds = task.getPartitionIds(storeName);
for(Integer partitionId: partitionIds) {
int zoneNaryType = storeRoutingPlan.getZoneReplicaType(zoneId,
nodeId,
partitionId);
if(!zoneNaryToTasks.containsKey(zoneNaryType)) {
zoneNaryToTasks.put(zoneNaryType, new ArrayList<RebalancePartitionsInfo>());
}
List<RebalancePartitionsInfo> naryTasks = zoneNaryToTasks.get(zoneNaryType);
// naryTasks. .add(partitionId);
}
}
}
return null;
}

@Override
public String toString() {
if(rebalanceTaskQueue.isEmpty()) {
Expand Down

0 comments on commit d9e2623

Please sign in to comment.