Skip to content

Commit

Permalink
Fix errors introduced during rebase process.
Browse files Browse the repository at this point in the history
Rebase with master was ugly. Picked up a bunch of AdminClient refactoring. These changes conflicted with many changes on the persistency-check branch. Had to fix the return type of AdminClient.queryKeys by hand (to iterator<QueryKeyResult>).

Added some TODO comments to AdminClient.HelperOperations methods that should move into ClusterInstance.

Also updated some copyright dates.
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent 37d3b76 commit 79de267
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 24 deletions.
10 changes: 5 additions & 5 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -1623,11 +1623,11 @@ private static void executeQueryKeys(final Integer nodeId,
listKeys.add(new ByteArray(serializer.toBytes(key)));
}
for(final String storeName: storeNames) {
final Iterator<QueryKeyResult> iterator = adminClient.queryKeys(nodeId.intValue(),
storeName,
listKeys.iterator());
List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId)
.getValue();
final Iterator<QueryKeyResult> iterator = adminClient.storeOps.queryKeys(nodeId.intValue(),
storeName,
listKeys.iterator());
List<StoreDefinition> storeDefinitionList = adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId)
.getValue();
StoreDefinition storeDefinition = null;
for(StoreDefinition storeDef: storeDefinitionList) {
if(storeDef.getName().equals(storeName))
Expand Down
38 changes: 25 additions & 13 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2009 LinkedIn, Inc
* Copyright 2008-2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down Expand Up @@ -76,6 +76,7 @@
import voldemort.store.readonly.ReadOnlyStorageFormat;
import voldemort.store.readonly.ReadOnlyStorageMetadata;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.store.routed.NodeValue;
import voldemort.store.slop.Slop;
import voldemort.store.slop.Slop.Operation;
import voldemort.store.socket.SocketDestination;
Expand All @@ -84,11 +85,14 @@
import voldemort.store.views.ViewStorageConfiguration;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.ClusterUtils;
import voldemort.utils.MetadataVersionStoreUtils;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.StoreDefinitionUtils;
import voldemort.utils.Utils;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
Expand Down Expand Up @@ -335,6 +339,7 @@ private void close(Socket socket) {
}
}

// TODO: Move this helper method to ClusterInstance?
/**
* For a particular node, finds out all the [replica, partition] tuples
* it needs to steal in order to be brought back to normal state
Expand All @@ -351,6 +356,7 @@ public Map<Integer, HashMap<Integer, List<Integer>>> getReplicationMapping(int r
return getReplicationMapping(restoringNode, cluster, storeDef, -1);
}

// TODO: Move this helper method to ClusterInstance?
/**
* For a particular node, finds out all the [replica, partition] tuples
* it needs to steal in order to be brought back to normal state
Expand Down Expand Up @@ -408,6 +414,7 @@ public Map<Integer, HashMap<Integer, List<Integer>>> getReplicationMapping(int r
return returnMap;
}

// TODO: Move this helper method to ClusterInstance?
/**
* For each partition that need to be restored, find a donor node that
* owns the partition AND has the same zone ID as requested. -1 means no
Expand All @@ -431,7 +438,7 @@ private void addDonorWithZonePreference(List<Integer> remainderPartitions,
int zoneId,
Cluster cluster,
StoreDefinition storeDef) {
Map<Integer, Integer> partitionToNodeId = RebalanceUtils.getCurrentPartitionMapping(cluster);
Map<Integer, Integer> partitionToNodeId = ClusterUtils.getCurrentPartitionMapping(cluster);
int nodeId = -1;
int replicaType = -1;
int partition = -1;
Expand Down Expand Up @@ -487,6 +494,12 @@ private VAdminProto.VoldemortFilter encodeFilter(VoldemortFilter filter) throws
.build();
}

// TODO: It is weird that a helper method invokes
// metadataMgmtOps.getRemoteStoreDefList. Refactor this method to split
// some of the functionality into ClusterInstance, and then move this
// method to metadataMgmtOps. Or, do the refactoring wrt ClusterInstance
// and change the method interface to require storeDef rather than
// storeName to avoid doing a metadata operation...
/**
* Converts list of partitions to map of replica type to partition list.
*
Expand All @@ -501,7 +514,8 @@ private HashMap<Integer, List<Integer>> getReplicaToPartitionMap(int nodeId,
List<StoreDefinition> allStoreDefs = metadataMgmtOps.getRemoteStoreDefList(nodeId)
.getValue();
allStoreDefs.addAll(SystemStoreConstants.getAllSystemStoreDefs());
StoreDefinition def = RebalanceUtils.getStoreDefinitionWithName(allStoreDefs, storeName);
StoreDefinition def = StoreDefinitionUtils.getStoreDefinitionWithName(allStoreDefs,
storeName);
HashMap<Integer, List<Integer>> replicaToPartitionList = Maps.newHashMap();
for(int replicaNum = 0; replicaNum < def.getReplicationFactor(); replicaNum++) {
replicaToPartitionList.put(replicaNum, partitions);
Expand Down Expand Up @@ -1871,7 +1885,7 @@ public ByteArray computeNext() {
}
}

// TODO: Move into streaming operations or some such.
// TODO: Move into StreamingStoreOperations or some such.
/**
* This method is a work-in-progress. Expectation is that updateEntries will
* eventually be used to process a stream of repairs.
Expand Down Expand Up @@ -2035,9 +2049,9 @@ public void updateEntries(int nodeId,
* @return An iterator which allows entries to be streamed as they're
* being iterated over.
*/
public Iterator<Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>>> queryKeys(int nodeId,
String storeName,
final Iterator<ByteArray> keys) {
public Iterator<QueryKeyResult> queryKeys(int nodeId,
String storeName,
final Iterator<ByteArray> keys) {

Node node = AdminClient.this.getAdminClientCluster().getNodeById(nodeId);
ClientConfig clientConfig = new ClientConfig();
Expand All @@ -2060,12 +2074,11 @@ public Iterator<Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>>> query
throw new VoldemortException(e);
}

return new AbstractIterator<Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>>>() {
return new AbstractIterator<QueryKeyResult>() {

@Override
public Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>> computeNext() {
public QueryKeyResult computeNext() {
ByteArray key;
Exception exception = null;
List<Versioned<byte[]>> value = null;
if(!keys.hasNext()) {
clientPool.close();
Expand All @@ -2075,10 +2088,10 @@ public Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>> computeNext() {
}
try {
value = store.get(key, null);
return new QueryKeyResult(key, value);
} catch(Exception e) {
exception = e;
return new QueryKeyResult(key, e);
}
return Pair.create(key, Pair.create(value, exception));
}
};
}
Expand Down Expand Up @@ -3101,4 +3114,3 @@ public void fetchPartitionFiles(int nodeId,
}
}
}

8 changes: 6 additions & 2 deletions src/java/voldemort/utils/ConsistencyCheck.java
Expand Up @@ -250,7 +250,7 @@ public static ConsistencyCheckStats doConsistencyCheck(String storeName,
Cluster cluster = adminClient.getAdminClientCluster();

/* find store */
Versioned<List<StoreDefinition>> storeDefinitions = adminClient.getRemoteStoreDefList(0);
Versioned<List<StoreDefinition>> storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0);
List<StoreDefinition> StoreDefitions = storeDefinitions.getValue();
StoreDefinition storeDefinition = null;
for(StoreDefinition def: StoreDefitions) {
Expand Down Expand Up @@ -315,7 +315,11 @@ public static ConsistencyCheckStats doConsistencyCheck(String storeName,
/* get entry Iterator from each node */
for(Integer nodeId: nodeIdList) {
Iterator<Pair<ByteArray, Versioned<byte[]>>> entries;
entries = adminClient.fetchEntries(nodeId, storeName, singlePartition, null, false);
entries = adminClient.bulkFetchOps.fetchEntries(nodeId,
storeName,
singlePartition,
null,
false);
nodeEntriesMap.put(new PrefixNode(urlId, cluster.getNodeById(nodeId)), entries);
}
replicationFactor += storeDefinition.getReplicationFactor();
Expand Down
8 changes: 4 additions & 4 deletions src/java/voldemort/utils/ConsistencyFix.java
Expand Up @@ -62,7 +62,7 @@ public ConsistencyFixContext(String url, String storeName) throws Exception {
System.out.println("Cluster determined to be: " + cluster.getName());

System.out.println("Determining store definition for store: " + storeName);
Versioned<List<StoreDefinition>> storeDefinitions = adminClient.getRemoteStoreDefList(0);
Versioned<List<StoreDefinition>> storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0);
List<StoreDefinition> storeDefs = storeDefinitions.getValue();
StoreDefinition storeDefinition = StoreDefinitionUtils.getStoreDefinitionWithName(storeDefs,
storeName);
Expand Down Expand Up @@ -282,9 +282,9 @@ private static ConsistencyFix.FixKeyResult doRead(final ConsistencyFixContext vI
}
for(int nodeId: nodeIdList) {
Iterator<QueryKeyResult> keyValues;
keyValues = vInstance.getAdminClient().queryKeys(nodeId,
vInstance.getStoreName(),
keys.iterator());
keyValues = vInstance.getAdminClient().storeOps.queryKeys(nodeId,
vInstance.getStoreName(),
keys.iterator());
nodeIdToKeyValues.put(nodeId, keyValues);
// TODO: Not sure if this is appropriate. Since the keyValues
// iterator is a google abstract iterator and inside computeNext()
Expand Down

0 comments on commit 79de267

Please sign in to comment.