diff --git a/src/java/voldemort/utils/KeyVersionFetcherCLI.java b/src/java/voldemort/utils/KeyVersionFetcherCLI.java index 140e9ae520..914132e6ae 100644 --- a/src/java/voldemort/utils/KeyVersionFetcherCLI.java +++ b/src/java/voldemort/utils/KeyVersionFetcherCLI.java @@ -21,11 +21,14 @@ import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -41,6 +44,7 @@ import org.apache.commons.codec.DecoderException; import org.apache.log4j.Logger; +import voldemort.VoldemortException; import voldemort.client.ClientConfig; import voldemort.client.protocol.admin.AdminClient; import voldemort.client.protocol.admin.AdminClientConfig; @@ -52,7 +56,7 @@ /** * The KeyVersionFetcherCLI is a rudimentary tool that outputs a sampling of * existing keys from a cluster. For each store in the cluster, a distinct file - * of keys to sample is expected. And, for each of these, a distint file of + * of keys to sample is expected. And, for each of these, a distinct file of * key-versions is generated. * */ @@ -155,37 +159,85 @@ public void updateFetchProgress(String storeName) { } } + public class ZoneToNaryToString { + + Map> zoneToNaryToString; + + ZoneToNaryToString() { + zoneToNaryToString = new HashMap>(); + } + + public void addZoneNaryString(int zoneId, int zoneNAry, String string) { + if(!zoneToNaryToString.containsKey(zoneId)) { + zoneToNaryToString.put(zoneId, new HashMap()); + } + if(zoneToNaryToString.get(zoneId).containsKey(zoneNAry)) { + throw new VoldemortException("ZoneToNaryToString already contains zoneNary " + + zoneNAry); + } else { + zoneToNaryToString.get(zoneId).put(zoneNAry, string); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + Set sortedZoneIds = new TreeSet(zoneToNaryToString.keySet()); + for(int zoneId: sortedZoneIds) { + Set sortedZoneNAries = new TreeSet(zoneToNaryToString.get(zoneId) + .keySet()); + for(int zoneNary: sortedZoneNAries) { + sb.append(zoneId) + .append(" : ") + .append(zoneNary) + .append(" : ") + .append(zoneToNaryToString.get(zoneId).get(zoneNary)) + .append("\n"); + } + } + return sb.toString(); + } + } + public class FetchKeyVersionsTask implements Callable { - private final StoreRoutingPlan storeInstance; + private final StoreRoutingPlan storeRoutingPlan; private final byte[] key; - FetchKeyVersionsTask(StoreRoutingPlan storeInstance, byte[] key) { - this.storeInstance = storeInstance; + FetchKeyVersionsTask(StoreRoutingPlan storeRoutingPlan, byte[] key) { + this.storeRoutingPlan = storeRoutingPlan; this.key = key; } @Override public String call() throws Exception { - String storeName = storeInstance.getStoreDefinition().getName(); - int masterPartitionId = storeInstance.getMasterPartitionId(key); - List replicatingNodeIds = storeInstance.getReplicationNodeList(masterPartitionId); + String storeName = storeRoutingPlan.getStoreDefinition().getName(); + int masterPartitionId = storeRoutingPlan.getMasterPartitionId(key); + List replicatingNodeIds = storeRoutingPlan.getReplicationNodeList(masterPartitionId); + + ZoneToNaryToString zoneToNaryToString = new ZoneToNaryToString(); - int replicationOffset = 0; - StringBuilder sb = new StringBuilder(); for(int replicatingNodeId: replicatingNodeIds) { List> values = adminClient.storeOps.getNodeKey(storeName, replicatingNodeId, new ByteArray(key)); - sb.append(replicationOffset + " : " + ByteUtils.toHexString(key) + "\t"); + int zoneId = storeRoutingPlan.getCluster() + .getNodeById(replicatingNodeId) + .getZoneId(); + int zoneNAry = storeRoutingPlan.getZoneNAry(zoneId, replicatingNodeId, key); + + StringBuilder sb = new StringBuilder(); + sb.append(ByteUtils.toHexString(key)); for(Versioned value: values) { - sb.append(value.getVersion().toString() + "\t"); + sb.append(" : ").append(value.getVersion().toString()); } - sb.append("\n"); - replicationOffset++; + + zoneToNaryToString.addZoneNaryString(zoneId, zoneNAry, sb.toString()); } + updateFetchProgress(storeName); - return sb.toString(); + return zoneToNaryToString.toString(); } } @@ -207,7 +259,7 @@ public boolean sampleStore(StoreDefinition storeDefinition) { return true; } - StoreRoutingPlan storeInstance = new StoreRoutingPlan(cluster, storeDefinition); + StoreRoutingPlan storeRoutingPlan = new StoreRoutingPlan(cluster, storeDefinition); BufferedReader keyReader = null; BufferedWriter kvWriter = null; try { @@ -224,7 +276,7 @@ public boolean sampleStore(StoreDefinition storeDefinition) { break; } byte[] keyInBytes = ByteUtils.fromHexString(keyLine.trim()); - FetchKeyVersionsTask kvFetcher = new FetchKeyVersionsTask(storeInstance, + FetchKeyVersionsTask kvFetcher = new FetchKeyVersionsTask(storeRoutingPlan, keyInBytes); Future future = kvFetcherService.submit(kvFetcher); futureKVs.add(future);