Skip to content

Commit

Permalink
Make KeyVersionFetcherCLI ZoneNAry aware
Browse files Browse the repository at this point in the history
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent 412088d commit ffc9b68
Showing 1 changed file with 68 additions and 16 deletions.
84 changes: 68 additions & 16 deletions src/java/voldemort/utils/KeyVersionFetcherCLI.java
Expand Up @@ -21,11 +21,14 @@
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -41,6 +44,7 @@
import org.apache.commons.codec.DecoderException;
import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.client.ClientConfig;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
Expand All @@ -52,7 +56,7 @@
/**
* The KeyVersionFetcherCLI is a rudimentary tool that outputs a sampling of
* existing keys from a cluster. For each store in the cluster, a distinct file
* of keys to sample is expected. And, for each of these, a distint file of
* of keys to sample is expected. And, for each of these, a distinct file of
* key-versions is generated.
*
*/
Expand Down Expand Up @@ -155,37 +159,85 @@ public void updateFetchProgress(String storeName) {
}
}

public class ZoneToNaryToString {

Map<Integer, Map<Integer, String>> zoneToNaryToString;

ZoneToNaryToString() {
zoneToNaryToString = new HashMap<Integer, Map<Integer, String>>();
}

public void addZoneNaryString(int zoneId, int zoneNAry, String string) {
if(!zoneToNaryToString.containsKey(zoneId)) {
zoneToNaryToString.put(zoneId, new HashMap<Integer, String>());
}
if(zoneToNaryToString.get(zoneId).containsKey(zoneNAry)) {
throw new VoldemortException("ZoneToNaryToString already contains zoneNary "
+ zoneNAry);
} else {
zoneToNaryToString.get(zoneId).put(zoneNAry, string);
}
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();

Set<Integer> sortedZoneIds = new TreeSet<Integer>(zoneToNaryToString.keySet());
for(int zoneId: sortedZoneIds) {
Set<Integer> sortedZoneNAries = new TreeSet<Integer>(zoneToNaryToString.get(zoneId)
.keySet());
for(int zoneNary: sortedZoneNAries) {
sb.append(zoneId)
.append(" : ")
.append(zoneNary)
.append(" : ")
.append(zoneToNaryToString.get(zoneId).get(zoneNary))
.append("\n");
}
}
return sb.toString();
}
}

public class FetchKeyVersionsTask implements Callable<String> {

private final StoreRoutingPlan storeInstance;
private final StoreRoutingPlan storeRoutingPlan;
private final byte[] key;

FetchKeyVersionsTask(StoreRoutingPlan storeInstance, byte[] key) {
this.storeInstance = storeInstance;
FetchKeyVersionsTask(StoreRoutingPlan storeRoutingPlan, byte[] key) {
this.storeRoutingPlan = storeRoutingPlan;
this.key = key;
}

@Override
public String call() throws Exception {
String storeName = storeInstance.getStoreDefinition().getName();
int masterPartitionId = storeInstance.getMasterPartitionId(key);
List<Integer> replicatingNodeIds = storeInstance.getReplicationNodeList(masterPartitionId);
String storeName = storeRoutingPlan.getStoreDefinition().getName();
int masterPartitionId = storeRoutingPlan.getMasterPartitionId(key);
List<Integer> replicatingNodeIds = storeRoutingPlan.getReplicationNodeList(masterPartitionId);

ZoneToNaryToString zoneToNaryToString = new ZoneToNaryToString();

int replicationOffset = 0;
StringBuilder sb = new StringBuilder();
for(int replicatingNodeId: replicatingNodeIds) {
List<Versioned<byte[]>> values = adminClient.storeOps.getNodeKey(storeName,
replicatingNodeId,
new ByteArray(key));
sb.append(replicationOffset + " : " + ByteUtils.toHexString(key) + "\t");
int zoneId = storeRoutingPlan.getCluster()
.getNodeById(replicatingNodeId)
.getZoneId();
int zoneNAry = storeRoutingPlan.getZoneNAry(zoneId, replicatingNodeId, key);

StringBuilder sb = new StringBuilder();
sb.append(ByteUtils.toHexString(key));
for(Versioned<byte[]> value: values) {
sb.append(value.getVersion().toString() + "\t");
sb.append(" : ").append(value.getVersion().toString());
}
sb.append("\n");
replicationOffset++;

zoneToNaryToString.addZoneNaryString(zoneId, zoneNAry, sb.toString());
}

updateFetchProgress(storeName);
return sb.toString();
return zoneToNaryToString.toString();
}
}

Expand All @@ -207,7 +259,7 @@ public boolean sampleStore(StoreDefinition storeDefinition) {
return true;
}

StoreRoutingPlan storeInstance = new StoreRoutingPlan(cluster, storeDefinition);
StoreRoutingPlan storeRoutingPlan = new StoreRoutingPlan(cluster, storeDefinition);
BufferedReader keyReader = null;
BufferedWriter kvWriter = null;
try {
Expand All @@ -224,7 +276,7 @@ public boolean sampleStore(StoreDefinition storeDefinition) {
break;
}
byte[] keyInBytes = ByteUtils.fromHexString(keyLine.trim());
FetchKeyVersionsTask kvFetcher = new FetchKeyVersionsTask(storeInstance,
FetchKeyVersionsTask kvFetcher = new FetchKeyVersionsTask(storeRoutingPlan,
keyInBytes);
Future<String> future = kvFetcherService.submit(kvFetcher);
futureKVs.add(future);
Expand Down

0 comments on commit ffc9b68

Please sign in to comment.