Skip to content

Commit

Permalink
Initial work on RebalancePlanCLI.
Browse files Browse the repository at this point in the history
Extracted the planning logic out of RebalanceController. This will reduce cut-and-paste coding, clarify which arguments are needed to tune planning (versus execution of a plan), and isolate rebalance planning logic from execution logic. There are a ton of TODOs added in this commit. As the refactoring continues, and the controller is changed to use the new RebalancePlan, these TODOs will be addressed. This commit has a working RebalancePlanCLI that handles the use cases rebalance planning has historically handled (rebalance in place and cluster expansion). This commit does not address zone expanison. The output of plan statistics is much richer and includes detailed statistics on cross zone moves (from which zone to which zone) and nodes (how much in and how much out). All these statistics are at the partition-store level which should be more informative than prior statistics. Finally, these statistics include an estimation of storage overhead per-node which can be used to assess how much free disk space is required to execute a plan safely.

OrderedClusterTransition
- TODOs to deprecate this. This logic belongs in the plan, not the execution.

RebalanceClusterPlan
- TODOs for refactoring
- Has logic for doing things "the old way" and the "new way". The new way has simpler data structures nad better encapsulation.
- Many more getters for detailed plan stats

RebalanceNodePlan
- Minor TODOs and some renaming of varaibles

RebalancePartitionsInfo
- A ton of refactoring TODOs
- Fixed all move counting getters to count partition-stores

Node
- getStateString method to pretty print ID, host name and ports.

RepartitionCLI
- more argument handling

PartitionBalance
- getters for partition-store accounting of current cluster+store defs

RebalanceUtils
- Some methods to deprecate
- validation methods for cluster arguments

RebalancePlan
- New class to encapsulate all rebalancing planning

RebalanceTypedBatchPlan
RebalanceDonorBasedBatchPlan
RebalanceStealerBasedBatchPlan
- Sub-classes of RebalancePlan that are needed for converting static plan into executable plan

RebalancePlanCLI
- CLI for generating a plan

MoveMap & MoveMapTest
- 2-d map counter used in partitoin-store accounting
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent 0e17e99 commit aee0639
Show file tree
Hide file tree
Showing 15 changed files with 1,222 additions and 44 deletions.
Expand Up @@ -13,6 +13,9 @@

import com.google.common.collect.Lists;

// TODO: (refactor) Ordering of rebalancing tasks ought to be solely the
// province of RebalancePlan. Remove or deprecate this class whenever
// RebalancePlan has wholly replaced current planning code.
/**
* Ordered representation of a cluster transition that guarantees that primary
* partition movements will take place before replicas.
Expand All @@ -21,6 +24,9 @@
*/
public class OrderedClusterTransition {

// TODO: (refactor) What is the value of this idGen member? It is only used
// in print outs? If it has no value, then remove it. (Same with member
// 'id')
private static final AtomicInteger idGen = new AtomicInteger(0);
private final Cluster currentCluster;
private final Cluster targetCluster;
Expand Down
170 changes: 139 additions & 31 deletions src/java/voldemort/client/rebalance/RebalanceClusterPlan.java
Expand Up @@ -14,15 +14,26 @@
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.store.StoreDefinition;
import voldemort.utils.MoveMap;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.Utils;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

// TODO: (refactor) Rename RebalanceClusterPlan to RebalanceBatchPlan
// TODO: (refactor) Rename targetCluster -> finalCluster
// TODO: (refactor) Rename currentCluster -> targetCluster
// TODO: (refactor) Fix cluster nomenclature in general: make sure there are
// exactly three prefixes used to distinguish cluster xml: initial or current,
// target or spec or expanded, and final. 'target' is overloaded to mean
// spec/expanded or final depending on context.
// TODO: Remove stealerBased boolean argument from constructor. If a
// stealer-based or donor-based plan is needed, then either
// RebalanceStealerBasedBatchPlan or RebalanceDonorBasedBatchPlan should be
// constructed.
/**
* Compares the current cluster configuration with the target cluster
* configuration and generates a plan to move the partitions. The plan can be
Expand All @@ -39,6 +50,12 @@
*/
public class RebalanceClusterPlan {

private final Cluster currentCluster;
private final Cluster targetCluster;

protected final List<RebalancePartitionsInfo> batchPlan;

@Deprecated
private final Queue<RebalanceNodePlan> rebalanceTaskQueue;

/**
Expand Down Expand Up @@ -86,7 +103,11 @@ public RebalanceClusterPlan(final Cluster currentCluster,
final List<StoreDefinition> storeDefs,
final boolean enabledDeletePartition,
final boolean isStealerBased) {
this.rebalanceTaskQueue = new ConcurrentLinkedQueue<RebalanceNodePlan>();
this.currentCluster = currentCluster;
this.targetCluster = targetCluster;

this.batchPlan = Lists.newArrayList();

this.currentAllStoresNodeIdToAllPartitionTuples = Maps.newHashMap();
this.targetAllStoresNodeIdToAllPartitionTuples = Maps.newHashMap();

Expand All @@ -104,34 +125,129 @@ public RebalanceClusterPlan(final Cluster currentCluster,
+ ") not equal to Target cluster ("
+ targetCluster.getNumberOfNodes() + ") ]");

HashMultimap<Integer, RebalancePartitionsInfo> rebalancePartitionList = HashMultimap.create();
for(Node node: targetCluster.getNodes()) {
for(RebalancePartitionsInfo info: getRebalancePartitionsInfo(currentCluster,
targetCluster,
storeDefs,
node.getId(),
enabledDeletePartition)) {
if(isStealerBased) {
rebalancePartitionList.put(info.getStealerId(), info);
} else {
rebalancePartitionList.put(info.getDonorId(), info);
}
this.batchPlan.addAll(getRebalancePartitionsInfo(currentCluster,
targetCluster,
storeDefs,
node.getId(),
enabledDeletePartition));
}

prioritizeBatchPlan();

// TODO: (begin) Remove this redundant code once the
// getRebalancingTaskQueue method is actually removed from this class.
// This method must be retained until the rebalance controlelr is
// switched over to use RebalancePlan.
this.rebalanceTaskQueue = new ConcurrentLinkedQueue<RebalanceNodePlan>();
HashMap<Integer, List<RebalancePartitionsInfo>> nodeToBatchPlan = new HashMap<Integer, List<RebalancePartitionsInfo>>();
for(RebalancePartitionsInfo info: batchPlan) {
int nodeId = info.getDonorId();
if(isStealerBased) {
nodeId = info.getStealerId();
}
if(!nodeToBatchPlan.containsKey(nodeId)) {
nodeToBatchPlan.put(nodeId, new ArrayList<RebalancePartitionsInfo>());
}
nodeToBatchPlan.get(nodeId).add(info);
}

// Populate the rebalance task queue
for(int nodeId: rebalancePartitionList.keySet()) {
rebalanceTaskQueue.offer(new RebalanceNodePlan(nodeId,
Lists.newArrayList(rebalancePartitionList.get(nodeId)),
isStealerBased));
for(int nodeId: nodeToBatchPlan.keySet()) {
this.rebalanceTaskQueue.offer(new RebalanceNodePlan(nodeId,
Lists.newArrayList(nodeToBatchPlan.get(nodeId)),
isStealerBased));
}
// TODO: (end) Remove ...
}

public Cluster getCurrentCluster() {
return currentCluster;
}

public Cluster getTargetCluster() {
return targetCluster;
}

@Deprecated
public Queue<RebalanceNodePlan> getRebalancingTaskQueue() {
return rebalanceTaskQueue;
}

public MoveMap getZoneMoveMap() {
MoveMap moveMap = new MoveMap(targetCluster.getZoneIds());

for(RebalancePartitionsInfo info: batchPlan) {
int fromZoneId = targetCluster.getNodeById(info.getDonorId()).getZoneId();
int toZoneId = targetCluster.getNodeById(info.getStealerId()).getZoneId();
moveMap.add(fromZoneId, toZoneId, info.getPartitionStoreMoves());
}

return moveMap;
}

public MoveMap getNodeMoveMap() {
MoveMap moveMap = new MoveMap(targetCluster.getNodeIds());

for(RebalancePartitionsInfo info: batchPlan) {
moveMap.add(info.getDonorId(), info.getStealerId(), info.getPartitionStoreMoves());
}

return moveMap;
}

public int getCrossZonePartitionStoreMoves() {
int xzonePartitionStoreMoves = 0;
for(RebalancePartitionsInfo info: batchPlan) {
Node donorNode = targetCluster.getNodeById(info.getDonorId());
Node stealerNode = targetCluster.getNodeById(info.getStealerId());

if(donorNode.getZoneId() != stealerNode.getZoneId()) {
xzonePartitionStoreMoves += info.getPartitionStoreMoves();
}
}

return xzonePartitionStoreMoves;
}

/**
* Return the total number of partition-store moves
*
* @return Number of moves
*/
public int getPartitionStoreMoves() {
int partitionStoreMoves = 0;

for(RebalancePartitionsInfo info: batchPlan) {
partitionStoreMoves += info.getPartitionStoreMoves();
}

return partitionStoreMoves;
}

/**
* Prioritize the batchPlan such that primary partitions are ordered ahead
* or other nary partitions in the list.
*/
private void prioritizeBatchPlan() {
// TODO: Steal logic from "ORderedClusterTransition" and put it here.
// The RebalancePlan ought to specify the priority ordering and so place
// primary moves ahead of nary moves.

// TODO: Fix this to use zonePrimary rather than just primary. Need to
// rebase with master to pick up appropriate helper methods first
// though.

// NO-OP

/*-
* Start of code to implement the basics.
HashMap<Integer, List<RebalancePartitionsInfo>> naryToBatchPlan = new HashMap<Integer, List<RebalancePartitionsInfo>>();
for(RebalancePartitionsInfo info: batchPlan) {
}
List<RebalancePartitionsInfo> infos = Lists.newArrayList(batchPlan);
*/
}

/**
* Generate the list of partition movement based on 2 principles:
*
Expand Down Expand Up @@ -342,23 +458,15 @@ private boolean haveFinishedPartitions(Set<Pair<Integer, Integer>> set) {

@Override
public String toString() {
if(rebalanceTaskQueue.isEmpty()) {
return "No rebalancing required since rebalance task is empty";
if(batchPlan == null || batchPlan.isEmpty()) {
return "No rebalancing required since batch plan is empty";
}

StringBuilder builder = new StringBuilder();
builder.append("Cluster Rebalancing Plan : ").append(Utils.NEWLINE);
builder.append("Rebalancing Batch Plan : ").append(Utils.NEWLINE);

if(rebalanceTaskQueue == null || rebalanceTaskQueue.isEmpty()) {
return "";
}

for(RebalanceNodePlan nodePlan: rebalanceTaskQueue) {
builder.append((nodePlan.isNodeStealer() ? "Stealer " : "Donor ") + "Node "
+ nodePlan.getNodeId());
for(RebalancePartitionsInfo rebalancePartitionsInfo: nodePlan.getRebalanceTaskList()) {
builder.append(rebalancePartitionsInfo).append(Utils.NEWLINE);
}
for(RebalancePartitionsInfo rebalancePartitionsInfo: batchPlan) {
builder.append(rebalancePartitionsInfo).append(Utils.NEWLINE);
}

return builder.toString();
Expand Down
@@ -0,0 +1,50 @@
/*
* Copyright 2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.client.rebalance;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import voldemort.cluster.Cluster;
import voldemort.store.StoreDefinition;

import com.google.common.collect.Lists;

public class RebalanceDonorBasedBatchPlan extends RebalanceTypedBatchPlan {

public RebalanceDonorBasedBatchPlan(final Cluster currentCluster,
final Cluster targetCluster,
final List<StoreDefinition> storeDefs,
final boolean enabledDeletePartition) {
super(currentCluster, targetCluster, storeDefs, enabledDeletePartition);

HashMap<Integer, List<RebalancePartitionsInfo>> donorToBatchPlan = new HashMap<Integer, List<RebalancePartitionsInfo>>();
for(RebalancePartitionsInfo info: batchPlan) {
int donorId = info.getDonorId();
if(!donorToBatchPlan.containsKey(donorId)) {
donorToBatchPlan.put(donorId, new ArrayList<RebalancePartitionsInfo>());
}
donorToBatchPlan.get(donorId).add(info);
}

for(int donorId: donorToBatchPlan.keySet()) {
rebalanceTaskQueue.offer(new RebalanceNodePlan(donorId,
Lists.newArrayList(donorToBatchPlan.get(donorId)),
false));
}
}
}
12 changes: 7 additions & 5 deletions src/java/voldemort/client/rebalance/RebalanceNodePlan.java
Expand Up @@ -2,6 +2,8 @@

import java.util.List;

// TODO: (refactor) Rename this class. Maybe RebalanceNodeBatchPlan or some
// such. This method wraps up all tasks from a single batch for a specific node.
/**
* This class acts as a container for the rebalancing plans for one particular
* node ( either donor or stealer depending on flag ). Can be one of the
Expand All @@ -17,19 +19,19 @@
public class RebalanceNodePlan {

private final int nodeId;
private final boolean isNodeStealer;
private final boolean isStealer;
private final List<RebalancePartitionsInfo> rebalanceTaskList;

public RebalanceNodePlan(int nodeId,
List<RebalancePartitionsInfo> rebalanceTaskList,
boolean isNodeStealer) {
boolean isStealer) {
this.nodeId = nodeId;
this.rebalanceTaskList = rebalanceTaskList;
this.isNodeStealer = isNodeStealer;
this.isStealer = isStealer;
}

public boolean isNodeStealer() {
return this.isNodeStealer;
public boolean isStealer() {
return this.isStealer;
}

public int getNodeId() {
Expand Down

0 comments on commit aee0639

Please sign in to comment.