Skip to content

Commit

Permalink
Fix RebalanceUtils.getLatestCluster.
Browse files Browse the repository at this point in the history
RebalanceUtils
- changed getLatestCluster to examine contents of cluster.xml on different nodes rather than timestamps since timestamps are incomparable across servers.

AbstractZonedRebalanceTest
- added a specific rebalance test that goes from current -> shuffle and then shuffle -> current. This confirms that rebalance can be invoked repeated times (if need be).
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent 0c8f0fa commit 882c01f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 18 deletions.
26 changes: 8 additions & 18 deletions src/java/voldemort/utils/RebalanceUtils.java
Expand Up @@ -143,24 +143,15 @@ public static HashMap<Integer, List<Integer>> getOptimizedReplicaToPartitionList
public static Versioned<Cluster> getLatestCluster(List<Integer> requiredNodes,
AdminClient adminClient) {
Versioned<Cluster> latestCluster = new Versioned<Cluster>(adminClient.getAdminClientCluster());
ArrayList<Versioned<Cluster>> clusterList = new ArrayList<Versioned<Cluster>>();

clusterList.add(latestCluster);
for(Node node: adminClient.getAdminClientCluster().getNodes()) {
Cluster cluster = latestCluster.getValue();
for(Node node: cluster.getNodes()) {
try {
Versioned<Cluster> versionedCluster = adminClient.metadataMgmtOps.getRemoteCluster(node.getId());
VectorClock newClock = (VectorClock) versionedCluster.getVersion();
if(null != newClock && !clusterList.contains(versionedCluster)) {
// check no two clocks are concurrent.
checkNotConcurrent(clusterList, newClock);

// add to clock list
clusterList.add(versionedCluster);

// update latestClock
Occurred occurred = newClock.compare(latestCluster.getVersion());
if(Occurred.AFTER.equals(occurred))
latestCluster = versionedCluster;
Cluster nodesCluster = adminClient.metadataMgmtOps.getRemoteCluster(node.getId())
.getValue();
if(!nodesCluster.equals(cluster)) {
throw new VoldemortException("Cluster is in inconsistent state because cluster xml on node "
+ node.getId()
+ " does not match cluster xml of adminClient.");
}
} catch(Exception e) {
if(null != requiredNodes && requiredNodes.contains(node.getId()))
Expand All @@ -169,7 +160,6 @@ public static Versioned<Cluster> getLatestCluster(List<Integer> requiredNodes,
logger.info("Failed on node " + node.getId(), e);
}
}

return latestCluster;
}

Expand Down
Expand Up @@ -322,6 +322,52 @@ public void testShuffleZZ() throws Exception {
testZonedRebalance("TestShuffleZZ", zzCurrent, zzShuffle, zzStoresXml, zzStores);
}

@Test(timeout = 600000)
public void testShuffleZZAndShuffleAgain() throws Exception {

logger.info("Starting testShuffleZZAndShuffleAgain");
// Hacky work around of TOCTOU bind Exception issues. Each test that
// invokes this method brings servers up & down on the same ports. The
// OS seems to need a rest between subsequent tests...
Thread.sleep(TimeUnit.SECONDS.toMillis(2));

Cluster interimCluster = RebalanceUtils.getInterimCluster(zzCurrent, zzShuffle);

// start all the servers
List<Integer> serverList = new ArrayList<Integer>(interimCluster.getNodeIds());
Map<String, String> configProps = new HashMap<String, String>();
configProps.put("admin.max.threads", "5");
interimCluster = startServers(interimCluster, zzStoresXml, serverList, configProps);

// Populate cluster with data
for(StoreDefinition storeDef: zzStores) {
populateData(zzCurrent, storeDef);
}

String bootstrapUrl = getBootstrapUrl(interimCluster, 0);
boolean stealerBased = !useDonorBased;

// Shuffle cluster
ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
stealerBased,
zzShuffle,
zzStores);
rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList);
checkConsistentMetadata(zzShuffle, serverList);

// Now, go from shuffled state, back to the original to ocnfirm
// subsequent rebalances can be invoked.
rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
stealerBased,
zzCurrent,
zzStores);
rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList);
checkConsistentMetadata(zzCurrent, serverList);

// Done.
stopServer(serverList);
}

@Test(timeout = 600000)
public void testClusterExpansion() throws Exception {
testZonedRebalance("TestClusterExpansionZZ",
Expand Down

0 comments on commit 882c01f

Please sign in to comment.