Fix some bugs I introduced and add more TODOs
Fixed an overflow in AdminClient.waitForCompletion that was introduced by
passing Long.MAX_VALUE for the duration.

Verify that the cluster is congruent with the store definition in
StoreRoutingPlan. This requires working around existing problems with how
system stores are handled (their store definition is hard-coded for two
zones). Left some TODOs about testing and fixing all of this.

Added TODOs about currentCluster vs interimCluster. The interfaces of
RebalanceController and RebalancePlan need to be tweaked to be consistent
with the recommended usage (i.e., deploying the interim cluster before
starting the rebalance).

Minor tweaks to tests based on above changes.
jayjwylie committed Jun 20, 2013
1 parent c794504 commit 81f0a16
Showing 11 changed files with 132 additions and 24 deletions.
9 changes: 6 additions & 3 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -673,7 +673,7 @@ public void stopAsyncRequest(int nodeId, int requestId) {
* @param nodeId Id of the node to poll
* @param requestId Id of the request to check
* @param maxWait Maximum time we'll keep checking a request until we
* give up
* give up. Pass in 0 or less to wait "forever".
* @param timeUnit Unit in which maxWait is expressed.
* @param higherStatus A higher level async operation object. If this
* waiting is being run another async operation this helps us
@@ -688,7 +688,10 @@ public String waitForCompletion(int nodeId,
TimeUnit timeUnit,
AsyncOperationStatus higherStatus) {
long delay = INITIAL_DELAY;
long waitUntil = System.currentTimeMillis() + timeUnit.toMillis(maxWait);
long waitUntil = Long.MAX_VALUE;
if(maxWait > 0) {
waitUntil = System.currentTimeMillis() + timeUnit.toMillis(maxWait);
}

String description = null;
String oldStatus = "";
@@ -765,7 +768,7 @@ public String waitForCompletion(int nodeId, int requestId, long maxWait, TimeUni
* maxWait time.
*/
public String waitForCompletion(int nodeId, int requestId) {
return waitForCompletion(nodeId, requestId, Long.MAX_VALUE, TimeUnit.SECONDS, null);
return waitForCompletion(nodeId, requestId, 0, TimeUnit.SECONDS, null);
}

/**
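For context, a minimal standalone sketch (not Voldemort code; the class and method names are invented for illustration) of the deadline arithmetic this hunk fixes. TimeUnit.toMillis() saturates at Long.MAX_VALUE, so the old default overload's maxWait of Long.MAX_VALUE seconds produced Long.MAX_VALUE milliseconds; adding the current time then wrapped negative, the computed waitUntil lay in the past, and the polling loop gave up immediately. Treating maxWait <= 0 as "wait forever" sidesteps the addition entirely.

import java.util.concurrent.TimeUnit;

public class WaitDeadlineSketch {

    // Mirrors the guarded computation: a non-positive maxWait means "wait forever".
    static long deadline(long maxWait, TimeUnit unit) {
        if (maxWait <= 0) {
            return Long.MAX_VALUE;
        }
        return System.currentTimeMillis() + unit.toMillis(maxWait);
    }

    public static void main(String[] args) {
        // The old behaviour: toMillis saturates, then the addition overflows to a negative value.
        long overflowed = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(Long.MAX_VALUE);
        System.out.println(overflowed);                      // negative: deadline is "in the past"
        System.out.println(deadline(0, TimeUnit.SECONDS));   // Long.MAX_VALUE: effectively forever
        System.out.println(deadline(30, TimeUnit.SECONDS));  // now + 30,000 ms
    }
}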
4 changes: 4 additions & 0 deletions src/java/voldemort/client/rebalance/RebalanceController.java
@@ -123,6 +123,10 @@ public RebalancePlan getPlan(Cluster finalCluster,
RebalanceUtils.validateClusterStores(finalCluster, finalStoreDefs);
RebalanceUtils.validateCurrentFinalCluster(currentCluster, finalCluster);

// TODO: (currentCluster vs interimCluster) Add more validation before
// constructing plan? Given that currentCluster was polled from prod
// cluster, should confirm that it is an "interim cluster" i.e., has
// same (superset?) of nodes as are in finalCluster.
String outputDir = null;
return new RebalancePlan(currentCluster,
currentStoreDefs,
4 changes: 4 additions & 0 deletions src/java/voldemort/client/rebalance/RebalancePlan.java
@@ -99,6 +99,10 @@ public RebalancePlan(final Cluster currentCluster,
this.batchSize = batchSize;
this.outputDir = outputDir;

// TODO: (currentCluster vs interimCluster) Instead of divining
// interimCluster from currentCluster and finalCluster, should we
// require that interimCluster be passed in?

// Derive the targetCluster from current & final cluster xml
RebalanceUtils.validateCurrentFinalCluster(this.currentCluster, this.finalCluster);
Cluster interimCluster = RebalanceUtils.getInterimCluster(this.currentCluster,
46 changes: 46 additions & 0 deletions src/java/voldemort/routing/StoreRoutingPlan.java
@@ -26,6 +26,7 @@
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.store.StoreDefinition;
import voldemort.store.system.SystemStoreConstants;
import voldemort.utils.ByteUtils;
import voldemort.utils.ClusterUtils;
import voldemort.utils.NodeUtils;
@@ -52,6 +53,8 @@ public class StoreRoutingPlan {
public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) {
this.cluster = cluster;
this.storeDefinition = storeDefinition;
verifyClusterStoreDefinition();

this.partitionIdToNodeIdMap = ClusterUtils.getCurrentPartitionMapping(cluster);
this.routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition,
cluster);
@@ -78,7 +81,50 @@ public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) {
}
}
}
}

/**
* Verify that cluster is congruent to store def wrt zones.
*/
private void verifyClusterStoreDefinition() {
if(SystemStoreConstants.isSystemStore(storeDefinition.getName())) {
// TODO: Once "todo" in StorageService.initSystemStores is complete,
// this early return can be removed and verification can be enabled
// for system stores.
return;
}

Set<Integer> clusterZoneIds = cluster.getZoneIds();

if(clusterZoneIds.size() > 1) { // Zoned
Map<Integer, Integer> zoneRepFactor = storeDefinition.getZoneReplicationFactor();
Set<Integer> storeDefZoneIds = zoneRepFactor.keySet();

if(!clusterZoneIds.equals(storeDefZoneIds)) {
throw new VoldemortException("Zone IDs in cluster (" + clusterZoneIds
+ ") are incongruent with zone IDs in store defs ("
+ storeDefZoneIds + ")");
}

for(int zoneId: clusterZoneIds) {
if(zoneRepFactor.get(zoneId) > cluster.getNumberOfNodesInZone(zoneId)) {
throw new VoldemortException("Not enough nodes ("
+ cluster.getNumberOfNodesInZone(zoneId)
+ ") in zone with id " + zoneId
+ " for replication factor of "
+ zoneRepFactor.get(zoneId) + ".");
}
}
} else { // Non-zoned

if(storeDefinition.getReplicationFactor() > cluster.getNumberOfNodes()) {
System.err.println(storeDefinition);
System.err.println(cluster);
throw new VoldemortException("Not enough nodes (" + cluster.getNumberOfNodes()
+ ") for replication factor of "
+ storeDefinition.getReplicationFactor() + ".");
}
}
}

public Cluster getCluster() {
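For reference, the invariant that the new verifyClusterStoreDefinition enforces, restated as a small standalone sketch over plain maps (this is not the Voldemort code; the class name and exception type here are illustrative): the store definition must name exactly the cluster's zone IDs, and every zone must contain at least as many nodes as that zone's replication factor.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class ZoneCongruenceSketch {

    // nodesPerZone: zone id -> number of nodes; zoneRepFactor: zone id -> replicas required.
    static void check(Map<Integer, Integer> nodesPerZone, Map<Integer, Integer> zoneRepFactor) {
        Set<Integer> clusterZoneIds = nodesPerZone.keySet();
        Set<Integer> storeDefZoneIds = zoneRepFactor.keySet();
        if (!clusterZoneIds.equals(storeDefZoneIds)) {
            throw new IllegalStateException("Zone IDs in cluster " + clusterZoneIds
                                            + " are incongruent with zone IDs in store def "
                                            + storeDefZoneIds);
        }
        for (int zoneId : clusterZoneIds) {
            if (zoneRepFactor.get(zoneId) > nodesPerZone.get(zoneId)) {
                throw new IllegalStateException("Not enough nodes in zone " + zoneId);
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, Integer> nodesPerZone = new HashMap<Integer, Integer>();
        nodesPerZone.put(0, 3);
        nodesPerZone.put(2, 3);

        Map<Integer, Integer> goodRepFactor = new HashMap<Integer, Integer>();
        goodRepFactor.put(0, 3);
        goodRepFactor.put(2, 3);
        check(nodesPerZone, goodRepFactor); // zones {0, 2}, 3 nodes each: passes

        Map<Integer, Integer> badRepFactor = new HashMap<Integer, Integer>();
        badRepFactor.put(0, 3);
        badRepFactor.put(1, 3); // zone 1 does not exist in the cluster
        check(nodesPerZone, badRepFactor); // throws IllegalStateException
    }
}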
3 changes: 2 additions & 1 deletion src/java/voldemort/server/storage/StorageService.java
@@ -249,7 +249,8 @@ private void initSystemStores() {
}

private void updateRepFactor(List<StoreDefinition> storesDefs) {
// need impl
// TODO: need implementation. Once implemented, see related todo in
// StoreRoutingPlan.verifyClusterStoreDefinition
}

@Override
2 changes: 2 additions & 0 deletions src/java/voldemort/store/system/SystemStoreConstants.java
@@ -37,6 +37,8 @@ public static enum SystemStoreName {
voldsys$_metadata_version_persistence;
}

// TODO: Verify that this hard coded system store works in three zones
// and/or extend it to have zone 2.
public static final String SYSTEM_STORE_SCHEMA = "<stores>"
+ " <store>"
+ " <name>voldsys$_client_registry</name>"
11 changes: 6 additions & 5 deletions src/java/voldemort/tools/PartitionBalance.java
@@ -35,8 +35,6 @@

import com.google.common.collect.Maps;

;

public class PartitionBalance {

/**
@@ -207,15 +205,16 @@ private Map<Integer, Integer> getNodeIdToNaryCount(Cluster cluster,
return nodeIdToNaryCount;
}

// TODO: (refactor) When/if "replica type" is exorcised from the code base,
// TODO: (replicaType) When replicaType is exorcized from the code base,
// this detailed dump method should be removed.
/**
* Dumps the partition IDs per node in terms of "replica type".
*
* @param cluster
* @param storeDefinition
* @return pretty printed string of detailed replica tyep dump.
* @return pretty printed string of detailed replica type dump.
*/
@Deprecated
private String dumpReplicaTypeDetails(Cluster cluster, StoreDefinition storeDefinition) {
StringBuilder sb = new StringBuilder();
Map<Integer, Set<Pair<Integer, Integer>>> nodeIdToAllPartitions = RebalanceUtils.getNodeIdToAllPartitions(cluster,
@@ -281,7 +280,9 @@ private String dumpZoneNAryDetails(StoreRoutingPlan storeRoutingPlan) {
List<Integer> naries = storeRoutingPlan.getZoneNAryPartitionIds(nodeId);
Map<Integer, List<Integer>> zoneNaryTypeToPartitionIds = new HashMap<Integer, List<Integer>>();
for(int nary: naries) {
int zoneReplicaType = storeRoutingPlan.getZoneNaryForNodesPartition(zoneId, nodeId, nary);
int zoneReplicaType = storeRoutingPlan.getZoneNaryForNodesPartition(zoneId,
nodeId,
nary);
if(!zoneNaryTypeToPartitionIds.containsKey(zoneReplicaType)) {
zoneNaryTypeToPartitionIds.put(zoneReplicaType, new ArrayList<Integer>());
}
34 changes: 32 additions & 2 deletions test/common/voldemort/ClusterTestUtils.java
@@ -110,21 +110,50 @@ public static List<StoreDefinition> getZZ322StoreDefs(String storageType) {
return storeDefs;
}

public static List<StoreDefinition> getZZ322StoreDefsWithNonContiguousZoneIds(String storageType) {

List<StoreDefinition> storeDefs = new LinkedList<StoreDefinition>();
HashMap<Integer, Integer> zoneRep322 = new HashMap<Integer, Integer>();
zoneRep322.put(0, 3);
zoneRep322.put(2, 3);
StoreDefinition storeDef322 = new StoreDefinitionBuilder().setName("ZZ322")
.setType(storageType)
.setRoutingPolicy(RoutingTier.CLIENT)
.setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY)
.setKeySerializer(new SerializerDefinition("string"))
.setValueSerializer(new SerializerDefinition("string"))
.setReplicationFactor(6)
.setZoneReplicationFactor(zoneRep322)
.setRequiredReads(2)
.setRequiredWrites(2)
.setZoneCountReads(0)
.setZoneCountWrites(0)
.build();
storeDefs.add(storeDef322);
return storeDefs;
}

/**
* Store defs for zoned clusters with 2 zones. Covers the three store
* definitions of interest: 3/2/2, 2/1/1, and 1/1/1.
*/
public static List<StoreDefinition> getZZStoreDefsInMemory() {
List<StoreDefinition> storeDefs = new LinkedList<StoreDefinition>();
storeDefs.addAll(ClusterTestUtils.getZZ111StoreDefs(InMemoryStorageConfiguration.TYPE_NAME));
storeDefs.addAll(getZZ111StoreDefs(InMemoryStorageConfiguration.TYPE_NAME));
storeDefs.addAll(getZZ211StoreDefs(InMemoryStorageConfiguration.TYPE_NAME));
storeDefs.addAll(getZZ322StoreDefs(InMemoryStorageConfiguration.TYPE_NAME));
return storeDefs;
}

public static List<StoreDefinition> getZZStoreDefsWithNonContiguousZoneIDsInMemory() {
List<StoreDefinition> storeDefs = new LinkedList<StoreDefinition>();
storeDefs.addAll(getZZ322StoreDefsWithNonContiguousZoneIds(InMemoryStorageConfiguration.TYPE_NAME));
return storeDefs;
}

public static List<StoreDefinition> getZZStoreDefsBDB() {
List<StoreDefinition> storeDefs = new LinkedList<StoreDefinition>();
storeDefs.addAll(ClusterTestUtils.getZZ111StoreDefs(BdbStorageConfiguration.TYPE_NAME));
storeDefs.addAll(getZZ111StoreDefs(BdbStorageConfiguration.TYPE_NAME));
storeDefs.addAll(getZZ211StoreDefs(BdbStorageConfiguration.TYPE_NAME));
storeDefs.addAll(getZZ322StoreDefs(BdbStorageConfiguration.TYPE_NAME));
return storeDefs;
@@ -511,6 +540,7 @@ public static Cluster getZZClusterWithNonContiguousZoneIDsButContiguousNodeIDs()
node.getHttpPort(),
node.getSocketPort(),
node.getAdminPort(),
node.getZoneId(),
node.getPartitionIds());
nodeList.add(newNode);
nodeId++;
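The new non-contiguous-zone store defs are meant to be paired with the matching cluster helper. A hedged usage sketch, mirroring the updated testNonContiguousZoneIds in PartitionBalanceTest further down in this commit:

import java.util.List;

import voldemort.ClusterTestUtils;
import voldemort.cluster.Cluster;
import voldemort.store.StoreDefinition;
import voldemort.tools.PartitionBalance;

public class NonContiguousZoneIdSketch {

    public static void main(String[] args) {
        // A cluster whose zone IDs skip a value, paired with store defs whose
        // zoneReplicationFactor keys skip the same value (zones 0 and 2 only).
        Cluster cluster = ClusterTestUtils.getZZClusterWithNonContiguousZoneIDsButContiguousNodeIDs();
        List<StoreDefinition> storeDefs = ClusterTestUtils.getZZStoreDefsWithNonContiguousZoneIDsInMemory();
        new PartitionBalance(cluster, storeDefs);
    }
}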
@@ -41,7 +41,6 @@
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import voldemort.ClusterTestUtils;
@@ -120,8 +119,13 @@ public AbstractZonedRebalanceTest(boolean useNio, boolean useDonorBased) {
super(useNio, useDonorBased);
}

@BeforeClass
public static void generalSetup() throws IOException {
@Before
public void setUp() throws IOException {
setUpRWStuff();
setupZZandZZZ();
}

public static void setupZZandZZZ() throws IOException {
zzCurrent = ClusterTestUtils.getZZCluster();
zzShuffle = ClusterTestUtils.getZZClusterWithSwappedPartitions();
zzClusterExpansionNN = ClusterTestUtils.getZZClusterWithNN();
@@ -145,8 +149,7 @@ public static void generalSetup() throws IOException {
zzzStoresXml = zzzfile.getAbsolutePath();
}

@Before
public void setUp() throws IOException {
public void setUpRWStuff() throws IOException {
// First without replication
HashMap<Integer, Integer> zrfRWStoreWithoutReplication = new HashMap<Integer, Integer>();
zrfRWStoreWithoutReplication.put(0, 1);
@@ -237,6 +240,14 @@ public void tearDown() {
socketStoreFactory = null;
}

// TODO: (currentCluster vs interimCluster) Ideally, we could go from
// cCluster to fCluster for zone expansion. Unfortunately, to start a
// VoldemortServer, you need a cluster xml that includes that server. For
// now, we assume interim cluster is deployed (i.e., cluster with empty
// nodes in new zones). Either, deploying interim cluster with empty nodes
// must be codified in run book and tested as a pre-condition or servers
// need to be able to start without a cluster xml that includes them.

// TODO: The tests based on this method are susceptible to TOCTOU
// BindException issue since findFreePorts is used to determine the ports
// for localhost:PORT of each node.
@@ -349,13 +360,22 @@ public void testClusterExpansionZZZ() throws Exception {

@Test(timeout = 600000)
public void testZoneExpansionZZ2ZZZ() throws Exception {
// TODO: see todo for method testZonedRebalance to understand why we
// cannot invoke the following:
/*-
testZonedRebalance("TestZoneExpansionZZ2ZZZ",
zzCurrent,
zzzZoneExpansionXXP,
zzStoresXml,
zzzStoresXml,
zzStores,
zzzStores);
*/
testZonedRebalance("TestZoneExpansionZZ2ZZZ",
zzeZoneExpansion,
zzzZoneExpansionXXP,
zzzStoresXml,
zzzStores);
}

@Test(timeout = 600000)
7 changes: 2 additions & 5 deletions test/unit/voldemort/tools/PartitionBalanceTest.java
@@ -60,9 +60,6 @@ public void testEmptyZoneThingsThatShouldWork() {

new PartitionBalance(ClusterTestUtils.getZEZCluster(),
ClusterTestUtils.getZZZStoreDefsInMemory());

new PartitionBalance(ClusterTestUtils.getZEZClusterWithOnlyOneNodeInNewZone(),
ClusterTestUtils.getZZZStoreDefsInMemory());
}

@Test
@@ -114,9 +111,9 @@ public void testClusterWithZoneThatCannotFullyReplicate() {
* to shrink zones.
*/
@Test
public void testNonContiguousZonesThatShouldWork() {
public void testNonContiguousZoneIds() {
new PartitionBalance(ClusterTestUtils.getZZClusterWithNonContiguousZoneIDsButContiguousNodeIDs(),
ClusterTestUtils.getZZStoreDefsInMemory());
ClusterTestUtils.getZZStoreDefsWithNonContiguousZoneIDsInMemory());
}

// TODO: Fix handling of node Ids so that they do not need to be contiguous.
6 changes: 3 additions & 3 deletions test/unit/voldemort/tools/RepartitionerTest.java
@@ -361,7 +361,7 @@ public void verifyGreedySwapsImproveBalance(Cluster currentCluster,
}

@Test
public void testRebalance() {
public void testShuffle() {
// Two zone cluster
Cluster currentCluster = ClusterTestUtils.getZZCluster();
List<StoreDefinition> storeDefs = ClusterTestUtils.getZZStoreDefsInMemory();
@@ -402,8 +402,8 @@ public void testClusterExpansion() {

@Test
public void testZoneExpansion() {
Cluster currentCluster = ClusterTestUtils.getZZCluster();
List<StoreDefinition> currentStoreDefs = ClusterTestUtils.getZZStoreDefsInMemory();
Cluster currentCluster = ClusterTestUtils.getZZECluster();
List<StoreDefinition> currentStoreDefs = ClusterTestUtils.getZZZStoreDefsInMemory();

Cluster targetCluster = ClusterTestUtils.getZZZClusterWithNNN();
List<StoreDefinition> targetStoreDefs = ClusterTestUtils.getZZZStoreDefsInMemory();
