Skip to content

Commit

Permalink
Add 'proxy pause' between cluster change & rebalance
Browse files Browse the repository at this point in the history
The proxy pause is a window during which clients can pick up the new
cluster metadata, and servers can establish proxy bridges, before
servers start moving data around for rebalancing. This allows us to
observe the cost of proxying separate from rebalance and allows clients
to pick up new metadata before any data is moved.

Also added some more TODOs about documenting the various rebalanceState
methods...
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent d7f81d7 commit c9b370e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 14 deletions.
26 changes: 16 additions & 10 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -2392,29 +2392,35 @@ public Versioned<RebalancerState> getRemoteRebalancerState(int nodeId) {
value.getVersion());
}

// TODO: The "Order" column makes no sense to me. This is very, very
// hard to grok. I want to prefix each entry with in the order column
// with a number (e.g., '0: rebalance', '1: cluster->Swap', ...) but
// have no idea what the order is. Are they in order in the table? what
// do the arrows mean in the order column?

/**
* Used in rebalancing to indicate change in states. Groups the
* partition plans on the basis of stealer nodes and sends them over.
*
* The various combinations and their order of execution is given below
*
* <pre>
* | swapRO | changeClusterMetadata | changeRebalanceState | Order |
* | f | t | t | cluster -> rebalance |
* | f | f | t | rebalance |
* | t | t | f | cluster -> swap |
* | t | t | t | cluster -> swap -> rebalance |
* | swapRO | changeClusterMetadata | changeRebalanceState | Order |
* | f | t | t | cluster -> rebalance |
* | f | f | t | rebalance |
* | t | t | f | cluster -> swap |
* | t | t | t | cluster -> swap -> rebalance |
* </pre>
*
*
* Similarly for rollback:
*
* <pre>
* | swapRO | changeClusterMetadata | changeRebalanceState | Order |
* | f | t | t | remove from rebalance -> cluster |
* | f | f | t | remove from rebalance |
* | t | t | f | cluster -> swap |
* | t | t | t | remove from rebalance -> cluster -> swap |
* | swapRO | changeClusterMetadata | changeRebalanceState | Order |
* | f | t | t | remove from rebalance -> cluster |
* | f | f | t | remove from rebalance |
* | t | t | f | cluster -> swap |
* | t | t | t | remove from rebalance -> cluster -> swap |
* </pre>
*
*
Expand Down
24 changes: 23 additions & 1 deletion src/java/voldemort/client/rebalance/RebalanceController.java
Expand Up @@ -69,15 +69,18 @@ public class RebalanceController {
public final static int MAX_PARALLEL_REBALANCING = 1;
public final static int MAX_TRIES_REBALANCING = 2;
public final static boolean STEALER_BASED_REBALANCING = true;
public final static long PROXY_PAUSE_IN_SECONDS = TimeUnit.MINUTES.toSeconds(5);

private final int maxParallelRebalancing;
private final int maxTriesRebalancing;
private final boolean stealerBasedRebalancing;
private final long proxyPauseS;

public RebalanceController(String bootstrapUrl,
int maxParallelRebalancing,
int maxTriesRebalancing,
boolean stealerBased) {
boolean stealerBased,
long proxyPauseS) {
this.adminClient = new AdminClient(bootstrapUrl,
new AdminClientConfig(),
new ClientConfig());
Expand All @@ -88,6 +91,7 @@ public RebalanceController(String bootstrapUrl,
this.maxParallelRebalancing = maxParallelRebalancing;
this.maxTriesRebalancing = maxTriesRebalancing;
this.stealerBasedRebalancing = stealerBased;
this.proxyPauseS = proxyPauseS;
}

/**
Expand Down Expand Up @@ -353,6 +357,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {

// STEP 2 - Move RO data
if(hasReadOnlyStores) {
proxyPause();
executeSubBatch(batchCount,
batchCurrentCluster,
batchCurrentStoreDefs,
Expand All @@ -379,6 +384,9 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {

// STEP 4 - Move RW data
if(hasReadWriteStores) {
if(!hasReadOnlyStores) {
proxyPause();
}
executeSubBatch(batchCount,
batchCurrentCluster,
batchCurrentStoreDefs,
Expand All @@ -398,6 +406,20 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
}
}

/**
* Pause between cluster change in metadata and starting server rebalancing
* work.
*/
private void proxyPause() {
logger.info("Pausing after cluster state has changed to allow proxy bridges to be established. "
+ "Will start rebalancing work on servers in " + proxyPauseS + " seconds.");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(proxyPauseS));
} catch(InterruptedException e) {
logger.warn("Sleep interrupted in proxy pause.");
}
}

/**
*
* Perform a group of state change actions. Also any errors + rollback
Expand Down
7 changes: 7 additions & 0 deletions src/java/voldemort/server/VoldemortServer.java
Expand Up @@ -91,6 +91,13 @@ public VoldemortServer(VoldemortConfig config) {
this.services = createServices();
}

/**
* Constructor is used exclusively by tests. I.e., this is not a code path
* that is exercised in production.
*
* @param config
* @param cluster
*/
public VoldemortServer(VoldemortConfig config, Cluster cluster) {
super(ServiceType.VOLDEMORT);
this.voldemortConfig = config;
Expand Down
16 changes: 15 additions & 1 deletion src/java/voldemort/tools/RebalanceControllerCLI.java
Expand Up @@ -64,6 +64,13 @@ private static void setupParser() {
.withRequiredArg()
.ofType(Integer.class)
.describedAs("parallelism");
parser.accepts("proxy-pause",
"Time, in seconds, to pause between changing cluster metadata and starting rebalance tasks on server. [ Default:"
+ RebalanceController.PROXY_PAUSE_IN_SECONDS + " ]")
.withRequiredArg()
.ofType(Long.class)
.describedAs("proxy pause");

parser.accepts("final-cluster", "Path to target cluster xml")
.withRequiredArg()
.describedAs("cluster.xml");
Expand Down Expand Up @@ -96,6 +103,7 @@ private static void printUsage() {
help.append(" Optional:\n");
help.append(" --final-stores <storesXML> [ Needed for zone expansion ]\n");
help.append(" --parallelism <parallelism> [ Number of rebalancing tasks to run in parallel ]");
help.append(" --proxy-pause <proxyPause> [ Seconds to pause between cluster change and server-side rebalancing tasks ]");
help.append(" --tries <tries> [ Number of times to try starting an async rebalancing task on a node ");
help.append(" --output-dir [ Output directory in which plan is stored ]\n");
help.append(" --batch <batch> [ Number of primary partitions to move in each rebalancing batch. ]\n");
Expand Down Expand Up @@ -159,10 +167,16 @@ public static void main(String[] args) throws Exception {
tries = (Integer) options.valueOf("tries");
}

long proxyPauseS = RebalanceController.PROXY_PAUSE_IN_SECONDS;
if(options.has("proxy-pause")) {
proxyPauseS = (Long) options.valueOf("proxy-pause");
}

RebalanceController rebalanceController = new RebalanceController(bootstrapURL,
parallelism,
tries,
stealerBased);
stealerBased,
proxyPauseS);

Cluster currentCluster = rebalanceController.getCurrentCluster();
List<StoreDefinition> currentStoreDefs = rebalanceController.getCurrentStoreDefs();
Expand Down
11 changes: 9 additions & 2 deletions test/common/voldemort/ClusterTestUtils.java
Expand Up @@ -41,6 +41,8 @@

public class ClusterTestUtils {

public static long REBALANCE_CONTROLLER_TEST_PROXY_PAUSE_IN_SECONDS = 5;

public static List<StoreDefinition> getZZ111StoreDefs(String storageType) {

List<StoreDefinition> storeDefs = new LinkedList<StoreDefinition>();
Expand Down Expand Up @@ -664,11 +666,13 @@ public static RebalanceKit getRebalanceKit(String bootstrapUrl,
int maxParallel,
int maxTries,
boolean stealerBased,
long proxyPauseS,
Cluster finalCluster) {
RebalanceController rebalanceController = new RebalanceController(bootstrapUrl,
maxParallel,
maxTries,
stealerBased);
stealerBased,
proxyPauseS);
RebalancePlan rebalancePlan = rebalanceController.getPlan(finalCluster,
RebalancePlan.BATCH_SIZE);

Expand All @@ -682,6 +686,7 @@ public static RebalanceKit getRebalanceKit(String bootstrapUrl,
RebalanceController.MAX_PARALLEL_REBALANCING,
RebalanceController.MAX_TRIES_REBALANCING,
stealerBased,
REBALANCE_CONTROLLER_TEST_PROXY_PAUSE_IN_SECONDS,
finalCluster);
}

Expand All @@ -693,6 +698,7 @@ public static RebalanceKit getRebalanceKit(String bootstrapUrl,
maxParallel,
RebalanceController.MAX_TRIES_REBALANCING,
stealerBased,
REBALANCE_CONTROLLER_TEST_PROXY_PAUSE_IN_SECONDS,
finalCluster);
}

Expand All @@ -703,7 +709,8 @@ public static RebalanceKit getRebalanceKit(String bootstrapUrl,
RebalanceController rebalanceController = new RebalanceController(bootstrapUrl,
RebalanceController.MAX_PARALLEL_REBALANCING,
RebalanceController.MAX_TRIES_REBALANCING,
stealerBased);
stealerBased,
REBALANCE_CONTROLLER_TEST_PROXY_PAUSE_IN_SECONDS);
RebalancePlan rebalancePlan = rebalanceController.getPlan(finalCluster,
finalStoreDefs,
RebalancePlan.BATCH_SIZE);
Expand Down

0 comments on commit c9b370e

Please sign in to comment.