Skip to content

Commit

Permalink
I mangled the merge after a rebase. This commit manually fixes the er…
Browse files Browse the repository at this point in the history
…rors I introduced. Mea culpa.
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent aee0639 commit 857aa57
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 4 deletions.
Expand Up @@ -225,7 +225,6 @@ public synchronized ImmutableMap<String, Object> asMap() {
return builder.build();
}

<<<<<<< HEAD
// TODO: Remove this.
@Deprecated
public synchronized void setAttempt(int attempt) {
Expand Down Expand Up @@ -276,7 +275,6 @@ public synchronized int getPartitionStoreMoves() {
return count;
}


/**
* Returns the stores which have their partitions being added ( The stores
* with partitions being deleted are a sub-set )
Expand Down
5 changes: 3 additions & 2 deletions src/java/voldemort/utils/PartitionBalance.java
Expand Up @@ -24,6 +24,7 @@

import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.routing.StoreRoutingPlan;
import voldemort.store.StoreDefinition;

import com.google.common.collect.Maps;
Expand Down Expand Up @@ -68,7 +69,7 @@ public PartitionBalance(Cluster cluster, List<StoreDefinition> storeDefs) {
}

for(StoreDefinition storeDefinition: uniqueStores.keySet()) {
StoreInstance storeInstance = new StoreInstance(cluster, storeDefinition);
StoreRoutingPlan storeRoutingPlan = new StoreRoutingPlan(cluster, storeDefinition);

builder.append("\n");
builder.append("Store exemplar: " + storeDefinition.getName() + "\n");
Expand Down Expand Up @@ -142,7 +143,7 @@ public PartitionBalance(Cluster cluster, List<StoreDefinition> storeDefs) {
// determines the number of "zone primaries" each node hosts.
for(int partitionId = 0; partitionId < cluster.getNumberOfPartitions(); partitionId++) {
for(int zoneId: zoneIds) {
for(int nodeId: storeInstance.getReplicationNodeList(partitionId)) {
for(int nodeId: storeRoutingPlan.getReplicationNodeList(partitionId)) {
if(cluster.getNodeById(nodeId).getZoneId() == zoneId) {
nodeIdToZonePrimaryCount.put(nodeId,
nodeIdToZonePrimaryCount.get(nodeId) + 1);
Expand Down
111 changes: 111 additions & 0 deletions test/common/voldemort/ServerTestUtils.java
Expand Up @@ -60,6 +60,7 @@
import voldemort.server.protocol.RequestHandler;
import voldemort.server.protocol.RequestHandlerFactory;
import voldemort.server.protocol.SocketRequestHandlerFactory;
import voldemort.server.protocol.admin.AsyncOperationService;
import voldemort.server.socket.SocketService;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
Expand Down Expand Up @@ -425,6 +426,70 @@ public static Cluster getLocalCluster(int numberOfNodes,
}
}

public static Cluster getLocalZonedCluster(int numberOfNodes,
int numberOfZones,
int[] nodeToZoneMapping,
int[][] partitionMapping) {
return getLocalZonedCluster(numberOfNodes,
numberOfZones,
nodeToZoneMapping,
partitionMapping,
findFreePorts(3 * numberOfNodes));
}

/**
* Returns a cluster with <b>numberOfNodes</b> nodes in <b>numberOfZones</b>
* zones. It is important that <b>numberOfNodes</b> be divisible by
* <b>numberOfZones</b>
*
* @param numberOfNodes Number of nodes in the cluster
* @param partitionsPerNode Number of partitions in one node
* @param numberOfZones Number of zones
* @return Cluster
*/
public static Cluster getLocalZonedCluster(int numberOfNodes,
int numberOfZones,
int[] nodeToZoneMapping,
int[][] partitionMapping,
int[] ports) {

if(numberOfZones > 0 && numberOfNodes > 0 && numberOfNodes % numberOfZones != 0) {
throw new VoldemortException("The number of nodes (" + numberOfNodes
+ ") is not divisible by number of zones ("
+ numberOfZones + ")");
}

List<Node> nodes = new ArrayList<Node>();
for(int i = 0; i < numberOfNodes; i++) {

List<Integer> partitions = new ArrayList<Integer>(partitionMapping[i].length);
for(int p: partitionMapping[i]) {
partitions.add(p);
}

nodes.add(new Node(i,
"localhost",
ports[3 * i],
ports[3 * i + 1],
ports[3 * i + 2],
nodeToZoneMapping[i],
partitions));
}

// Generate zones
List<Zone> zones = Lists.newArrayList();
for(int i = 0; i < numberOfZones; i++) {
LinkedList<Integer> proximityList = Lists.newLinkedList();
int zoneId = i + 1;
for(int j = 0; j < numberOfZones - 1; j++) {
proximityList.add(zoneId % numberOfZones);
zoneId++;
}
zones.add(new Zone(i, proximityList));
}
return new Cluster("cluster", nodes, zones);
}

/**
* Returns a cluster with <b>numberOfZones</b> zones. The array
* <b>nodesPerZone<b> indicates how many nodes are in each of the zones. The
Expand All @@ -435,6 +500,7 @@ public static Cluster getLocalCluster(int numberOfNodes,
* @param nodesPerZone An array of size <b>numberOfZones<b> indicating the
* number of nodes in each zone.
* @param partitionMap An array of size total number of nodes (derived from
* @param partitionMap An array of size total number of nodes (derived from
* <b>nodesPerZone<b> that indicates the specific partitions on each
* node.
* @return
Expand Down Expand Up @@ -833,6 +899,51 @@ public static void waitForServerStart(SocketStoreFactory socketStoreFactory, Nod
throw new RuntimeException("Failed to connect with server:" + node);
}

/***
*
*
* NOTE: This relies on the current behavior of the AsyncOperationService to
* remove an operation if an explicit isComplete() is invoked. If/When that
* is changed, this method will always block upto timeoutMs & return
*
* @param server
* @param asyncOperationPattern substring to match with the operation
* description
* @param timeoutMs
* @return
*/
public static boolean waitForAsyncOperationOnServer(VoldemortServer server,
String asyncOperationPattern,
long timeoutMs) {
long endTimeMs = System.currentTimeMillis() + timeoutMs;
AsyncOperationService service = server.getAsyncRunner();
List<Integer> matchingOperationIds = null;
// wait till the atleast one matching operation shows up
while(System.currentTimeMillis() < endTimeMs) {
matchingOperationIds = service.getMatchingAsyncOperationList(asyncOperationPattern,
true);
if(matchingOperationIds.size() > 0) {
System.err.println(">>" + matchingOperationIds);
break;
}
}
// now wait for those operations to complete
while(System.currentTimeMillis() < endTimeMs) {
List<Integer> completedOps = new ArrayList<Integer>(matchingOperationIds.size());
for(Integer op: matchingOperationIds) {
if(service.isComplete(op)) {
System.err.println("Operation " + op + " is complete");
completedOps.add(op);
}
}
matchingOperationIds.removeAll(completedOps);
if(matchingOperationIds.size() == 0) {
return false;
}
}
return false;
}

protected static Cluster internalStartVoldemortCluster(int numServers,
VoldemortServer[] voldemortServers,
int[][] partitionMap,
Expand Down

0 comments on commit 857aa57

Please sign in to comment.