Skip to content

Commit

Permalink
Switched to new RebalanceController based on RebalancePlan and droppe…
Browse files Browse the repository at this point in the history
…d tons of now deprecated code.

Notes:
- delete of partition-stores during rebalancing has been removed and is no longer an option.
- the old RebalanceCLI now only offers the entropy tool features. cannot be dropped until SREs are OK with KeySampler/KeyVersionFetcher tool chain for verifying rebalances.
- Dropped RebalanceClientConfig. Configuration parameters are now explicit in pertinent constructors.
- Tests are failing or disabled:
  - donor-based rebalancing is disabled in test suites. Does not currently work with new rebalancing.
  - RebalanceClusterPlanTest tests mostly fail at this time.
  - RebalanceTest.serverSideRouting[1] may timeout
  - ZonedRebalanceTest.testProxyGetDuringRebalancing[1] may timeout

OrderedClusterTransition
- many small changes to work with all other changes
- this class is to-be-deprecated regardless

RebalanceClusterPlan
- removed tons of deprecated code that permitted old controller to continue to work

RebalanceController
- Now uses RebalancePlan
- No longer offers delete-after-rebalancing feature
- Dropped tons of deprecated code...

RebalanceTypedBatchPlan
RebalanceDonorBasedBatchPlan
RebalanceStealerBasedBatchPlan
- no longer extends RebalanceClusterPlan
- the classes that make a generic plan executable as either stealer- or donor-based
- still need to be renamed so purpose of these key classes is more clear

RebalancePlanCLI
RebalanceControllerCLI
- moderate clean up

RebalanceUtils
- dropped unnecessary helper mehods

*Test
- Signficant clean up:
  - use Rebalance*BatchPlan and RebalancePlan
  - don't check keys deleted since no longer supported
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent 79ca1e0 commit 70f8259
Show file tree
Hide file tree
Showing 19 changed files with 188 additions and 1,749 deletions.
18 changes: 13 additions & 5 deletions src/java/voldemort/client/rebalance/OrderedClusterTransition.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class OrderedClusterTransition {
private static final AtomicInteger idGen = new AtomicInteger(0);
private final Cluster currentCluster;
private final Cluster targetCluster;
private final RebalanceClusterPlan rebalanceClusterPlan;
private final RebalanceTypedBatchPlan rebalanceClusterPlan;
private final List<RebalancePartitionsInfo> orderedRebalancePartitionsInfoList;
private final List<StoreDefinition> storeDefs;
private String printedContent;
Expand All @@ -40,7 +40,7 @@ public class OrderedClusterTransition {
public OrderedClusterTransition(final Cluster currentCluster,
final Cluster targetCluster,
List<StoreDefinition> storeDefs,
final RebalanceClusterPlan rebalanceClusterPlan) {
final RebalanceTypedBatchPlan rebalanceClusterPlan) {
this.id = idGen.incrementAndGet();
this.currentCluster = currentCluster;
this.targetCluster = targetCluster;
Expand All @@ -49,11 +49,17 @@ public OrderedClusterTransition(final Cluster currentCluster,
this.orderedRebalancePartitionsInfoList = orderedClusterPlan(rebalanceClusterPlan);
}

public OrderedClusterTransition(final RebalanceClusterPlan rebalanceClusterPlan) {
@Deprecated
public OrderedClusterTransition(final RebalanceTypedBatchPlan rebalanceClusterPlan) {
this.id = idGen.incrementAndGet();
/*-
this.currentCluster = rebalanceClusterPlan.getCurrentCluster();
this.targetCluster = rebalanceClusterPlan.getFinalCluster();
this.storeDefs = rebalanceClusterPlan.getStoreDefs();
*/
this.currentCluster = null;
this.targetCluster = null;
this.storeDefs = null;
this.rebalanceClusterPlan = rebalanceClusterPlan;
this.orderedRebalancePartitionsInfoList = orderedClusterPlan(rebalanceClusterPlan);
}
Expand Down Expand Up @@ -94,10 +100,12 @@ public String toString() {
getStoreDefs(),
keys))
.append(Utils.NEWLINE);
/*-
sb.append("- Partition distribution : ")
.append(Utils.NEWLINE)
.append(getRebalanceClusterPlan().printPartitionDistribution())
.append(Utils.NEWLINE);
*/
sb.append("- Ordered rebalance node plan : ")
.append(Utils.NEWLINE)
.append(printRebalanceNodePlan(getOrderedRebalancePartitionsInfoList()));
Expand All @@ -114,7 +122,7 @@ private String printRebalanceNodePlan(List<RebalancePartitionsInfo> rebalancePar
return builder.toString();
}

private RebalanceClusterPlan getRebalanceClusterPlan() {
private RebalanceTypedBatchPlan getRebalanceClusterPlan() {
return rebalanceClusterPlan;
}

Expand All @@ -125,7 +133,7 @@ private RebalanceClusterPlan getRebalanceClusterPlan() {
* @param rebalanceClusterPlan Rebalance cluster plan
* @return Returns a list of ordered rebalance node plan
*/
private List<RebalancePartitionsInfo> orderedClusterPlan(final RebalanceClusterPlan rebalanceClusterPlan) {
private List<RebalancePartitionsInfo> orderedClusterPlan(final RebalanceTypedBatchPlan rebalanceClusterPlan) {
Queue<RebalanceNodePlan> rebalancingTaskQueue = rebalanceClusterPlan.getRebalancingTaskQueue();

List<RebalancePartitionsInfo> clusterRebalancePartitionsInfos = Lists.newArrayList();
Expand Down
167 changes: 28 additions & 139 deletions src/java/voldemort/client/rebalance/RebalanceCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@

import com.google.common.base.Joiner;

// TODO: Drop this tool once SREs are comfortable with KeySampler and
// KeyVersionFetcher tool chain.
@Deprecated
public class RebalanceCLI {

private final static int SUCCESS_EXIT_CODE = 0;
Expand All @@ -46,33 +49,15 @@ public class RebalanceCLI {

public static void main(String[] args) throws Exception {
int exitCode = ERROR_EXIT_CODE;
RebalanceController rebalanceController = null;
try {
OptionParser parser = new OptionParser();
parser.accepts("help", "Print usage information");
parser.accepts("current-cluster", "Path to current cluster xml")
.withRequiredArg()
.describedAs("cluster.xml");
parser.accepts("target-cluster", "Path to target cluster xml")
.withRequiredArg()
.describedAs("cluster.xml");
parser.accepts("current-stores", "Path to store definition xml")
.withRequiredArg()
.describedAs("stores.xml");
parser.accepts("url", "Url to bootstrap from ").withRequiredArg().describedAs("url");
parser.accepts("parallelism",
"Number of rebalances to run in parallel [ Default:"
+ RebalanceClientConfig.MAX_PARALLEL_REBALANCING + " ]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("parallelism");
parser.accepts("tries",
"(1) Tries during rebalance [ Default: "
+ RebalanceClientConfig.MAX_TRIES_REBALANCING
+ " ] (2) Number of tries while generating new metadata")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("num-tries");
parser.accepts("entropy",
"True - if we want to run the entropy calculator. False - if we want to store keys")
.withRequiredArg()
Expand All @@ -83,34 +68,12 @@ public static void main(String[] args) throws Exception {
.withRequiredArg()
.ofType(String.class)
.describedAs("path");
parser.accepts("delete",
"Delete after rebalancing (Valid only for RW Stores) [ Default : false ] ");
parser.accepts("show-plan",
"Shows the rebalancing plan only without executing the rebalance");
parser.accepts("keys",
"The number of keys to use for entropy calculation [ Default : "
+ Entropy.DEFAULT_NUM_KEYS + " ]")
.withRequiredArg()
.ofType(Long.class)
.describedAs("num-keys");
parser.accepts("timeout",
"Time-out in seconds for rebalancing of a single task ( stealer - donor tuple ) [ Default : "
+ RebalanceClientConfig.REBALANCING_CLIENT_TIMEOUT_SEC + " ]")
.withRequiredArg()
.ofType(Long.class)
.describedAs("sec");
parser.accepts("batch",
"Number of primary partitions to move together [ Default : "
+ RebalanceClientConfig.PRIMARY_PARTITION_BATCH_SIZE + " ]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("num-primary-partitions");
parser.accepts("stealer-based",
"Run the rebalancing from the stealer node's perspective [ Default : "
+ RebalanceClientConfig.STEALER_BASED_REBALANCING + " ]")
.withRequiredArg()
.ofType(Boolean.class)
.describedAs("boolean");
parser.accepts("verbose-logging",
"Verbose logging such as keys found missing on specific nodes during post-rebalancing entropy verification");

Expand All @@ -121,122 +84,48 @@ public static void main(String[] args) throws Exception {
System.exit(HELP_EXIT_CODE);
}

boolean deleteAfterRebalancing = options.has("delete");
int parallelism = CmdUtils.valueOf(options,
"parallelism",
RebalanceClientConfig.MAX_PARALLEL_REBALANCING);
int maxTriesRebalancing = CmdUtils.valueOf(options,
"tries",
RebalanceClientConfig.MAX_TRIES_REBALANCING);
boolean enabledShowPlan = options.has("show-plan");
long rebalancingTimeoutSeconds = CmdUtils.valueOf(options,
"timeout",
RebalanceClientConfig.REBALANCING_CLIENT_TIMEOUT_SEC);
int primaryPartitionBatchSize = CmdUtils.valueOf(options,
"batch",
RebalanceClientConfig.PRIMARY_PARTITION_BATCH_SIZE);
boolean stealerBasedRebalancing = CmdUtils.valueOf(options,
"stealer-based",
RebalanceClientConfig.STEALER_BASED_REBALANCING);

RebalanceClientConfig config = new RebalanceClientConfig();
config.setMaxParallelRebalancing(parallelism);
config.setDeleteAfterRebalancingEnabled(deleteAfterRebalancing);
config.setEnableShowPlan(enabledShowPlan);
config.setMaxTriesRebalancing(maxTriesRebalancing);
config.setRebalancingClientTimeoutSeconds(rebalancingTimeoutSeconds);
config.setPrimaryPartitionBatchSize(primaryPartitionBatchSize);
config.setStealerBasedRebalancing(stealerBasedRebalancing);

if(options.has("output-dir")) {
config.setOutputDirectory((String) options.valueOf("output-dir"));
// Entropy tool

Set<String> missing = CmdUtils.missing(options,
"entropy",
"output-dir",
"current-cluster",
"current-stores");
if(missing.size() > 0) {
System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing));
printHelp(System.err, parser);
System.exit(ERROR_EXIT_CODE);
}

if(options.has("url")) {
// Old rebalancing controller

if(!options.has("target-cluster")) {
System.err.println("Missing required arguments: target-cluster");
printHelp(System.err, parser);
System.exit(ERROR_EXIT_CODE);
}

String targetClusterXML = (String) options.valueOf("target-cluster");
Cluster targetCluster = new ClusterMapper().readCluster(new File(targetClusterXML));

// Normal execution of rebalancing
String bootstrapURL = (String) options.valueOf("url");
rebalanceController = new RebalanceController(bootstrapURL, config);
rebalanceController.rebalance(targetCluster);

} else {
// Entropy tool
Set<String> missing = CmdUtils.missing(options,
"entropy",
"output-dir",
"current-cluster",
"current-stores");
if(missing.size() > 0) {
System.err.println("Missing required arguments: "
+ Joiner.on(", ").join(missing));
printHelp(System.err, parser);
System.exit(ERROR_EXIT_CODE);
}

String currentClusterXML = (String) options.valueOf("current-cluster");
String currentStoresXML = (String) options.valueOf("current-stores");

Cluster currentCluster = new ClusterMapper().readCluster(new File(currentClusterXML));
List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new File(currentStoresXML));

boolean entropy = (Boolean) options.valueOf("entropy");
boolean verbose = options.has("verbose-logging");
long numKeys = CmdUtils.valueOf(options, "keys", Entropy.DEFAULT_NUM_KEYS);
Entropy generator = new Entropy(-1, numKeys, verbose);
generator.generateEntropy(currentCluster,
storeDefs,
new File(config.getOutputDirectory()),
entropy);
}
String currentClusterXML = (String) options.valueOf("current-cluster");
String currentStoresXML = (String) options.valueOf("current-stores");

Cluster currentCluster = new ClusterMapper().readCluster(new File(currentClusterXML));
List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new File(currentStoresXML));
String outputDir = (String) options.valueOf("output-dir");

boolean entropy = (Boolean) options.valueOf("entropy");
boolean verbose = options.has("verbose-logging");
long numKeys = CmdUtils.valueOf(options, "keys", Entropy.DEFAULT_NUM_KEYS);
Entropy generator = new Entropy(-1, numKeys, verbose);
generator.generateEntropy(currentCluster, storeDefs, new File(outputDir), entropy);

if(logger.isInfoEnabled()) {
logger.info("Successfully terminated rebalance all tasks");
logger.info("Successfully completed entropy check.");
}
exitCode = SUCCESS_EXIT_CODE;

} catch(VoldemortException e) {
logger.error("Unsuccessfully terminated rebalance operation - " + e.getMessage(), e);
logger.error("Entropy check unsuccessfull- " + e.getMessage(), e);
} catch(Throwable e) {
logger.error(e.getMessage(), e);
} finally {
if(rebalanceController != null) {
try {
rebalanceController.stop();
} catch(Exception e) {}
}
}
System.exit(exitCode);
}

public static void printHelp(PrintStream stream, OptionParser parser) throws IOException {
stream.println("Commands supported");
stream.println("------------------");
stream.println();
stream.println("REBALANCE (RUN PROCESS)");
stream.println("a) --url <url> --target-cluster <path> [ Run the actual rebalancing process ] ");

stream.println();
stream.println("REBALANCE (GENERATE PLAN)");
stream.println("b) --current-cluster <path> --current-stores <path> --target-cluster <path>");
stream.println("\t (1) --delete [ Will delete the data after rebalancing ]");
stream.println("\t (2) --show-plan [ Will generate only the plan ]");
stream.println("\t (3) --output-dir [ Path to output dir where we store intermediate metadata ]");
stream.println("\t (4) --parallelism [ Number of parallel stealer - donor node tasks to run in parallel ] ");
stream.println("\t (5) --tries [ Number of times we try to move the data before declaring failure ]");
stream.println("\t (6) --timeout [ Timeout in seconds for one rebalancing task ( stealer - donor tuple ) ]");
stream.println("\t (7) --batch [ Number of primary partitions to move together ]");
stream.println("\t (8) --stealer-based false [ Run the rebalancing from the donor's perspective ]");

stream.println();
stream.println("ENTROPY");
stream.println("a) --current-cluster <path> --current-stores <path> --entropy <true / false> --output-dir <path> [ Runs the entropy calculator if "
Expand Down
Loading

0 comments on commit 70f8259

Please sign in to comment.