Skip to content

Commit

Permalink
Added KeySampler and KeyVersionSampler tools as a first step towards …
Browse files Browse the repository at this point in the history
…replacing "entropy" tool. Added another argument to bulk fetch operations that specifies maxRecords so that server can fetch a subset of a partition.

src/java/voldemort/utils/KeySamplerCLI.java
- Samples keys from a cluster

src/java/voldemort/utils/KeyVersionSamplerCLI.java
- Given file that lists keys per store, samples versions from each "responsible node" for that key

src/java/voldemort/client/protocol/admin/AdminClient.java
- passed maxRecords through
- TODO for future clean up of some types

src/java/voldemort/client/protocol/pb/VAdminProto.java
- auto generated!

src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
- white space

src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java
src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java
src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java
- handle maxRecords

src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java
src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java
- handle maxRecords
- fixed usage of skipRecords

src/java/voldemort/utils/Entropy.java
- added maxRecords

src/proto/voldemort-admin.proto
- added mac_records to protobuff definition

test/unit/voldemort/client/AdminFetchTest.java
- added maxRecords field to test
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent e50ad0a commit 7669d86
Show file tree
Hide file tree
Showing 11 changed files with 376 additions and 214 deletions.
38 changes: 26 additions & 12 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -1432,7 +1432,8 @@ private void initiateFetchRequest(DataOutputStream outputStream,
boolean fetchValues,
boolean fetchMasterEntries,
Cluster initialCluster,
long skipRecords) throws IOException {
long skipRecords,
long maxRecords) throws IOException {
HashMap<Integer, List<Integer>> filteredReplicaToPartitionList = Maps.newHashMap();
if(fetchMasterEntries) {
if(!replicaToPartitionList.containsKey(0)) {
Expand All @@ -1447,7 +1448,8 @@ private void initiateFetchRequest(DataOutputStream outputStream,
.setFetchValues(fetchValues)
.addAllReplicaToPartition(ProtoUtils.encodePartitionTuple(filteredReplicaToPartitionList))
.setStore(storeName)
.setSkipRecords(skipRecords);
.setSkipRecords(skipRecords)
.setMaxRecords(maxRecords);

try {
if(filter != null) {
Expand Down Expand Up @@ -1574,14 +1576,16 @@ public Iterator<Pair<ByteArray, Versioned<byte[]>>> fetchEntries(int nodeId,
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries,
long skipRecords) {
long skipRecords,
long maxRecords) {
return fetchEntries(nodeId,
storeName,
helperOps.getReplicaToPartitionMap(nodeId, storeName, partitionList),
filter,
fetchMasterEntries,
null,
skipRecords);
skipRecords,
maxRecords);
}

/**
Expand All @@ -1603,9 +1607,13 @@ public Iterator<Pair<ByteArray, Versioned<byte[]>>> fetchEntries(int nodeId,
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries) {
return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0);
return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0, 0);
}

// TODO: " HashMap<Integer, List<Integer>> replicaToPartitionList," is a
// confusing/opaque argument. Can this be made a type, or even
// unrolled/simplified?

// TODO: The use of "Pair" in the return for a fundamental type is
// awkward. We should have a core KeyValue type that effectively wraps
// up a ByteArray and a Versioned<byte[]>.
Expand Down Expand Up @@ -1645,7 +1653,8 @@ public Iterator<Pair<ByteArray, Versioned<byte[]>>> fetchEntries(int nodeId,
VoldemortFilter filter,
boolean fetchMasterEntries,
Cluster initialCluster,
long skipRecords) {
long skipRecords,
long maxRecords) {

Node node = AdminClient.this.getAdminClientCluster().getNodeById(nodeId);
final SocketDestination destination = new SocketDestination(node.getHost(),
Expand All @@ -1663,7 +1672,8 @@ public Iterator<Pair<ByteArray, Versioned<byte[]>>> fetchEntries(int nodeId,
true,
fetchMasterEntries,
initialCluster,
skipRecords);
skipRecords,
maxRecords);
} catch(IOException e) {
helperOps.close(sands.getSocket());
socketPool.checkin(destination, sands);
Expand Down Expand Up @@ -1790,14 +1800,16 @@ public Iterator<ByteArray> fetchKeys(int nodeId,
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries,
long skipRecords) {
long skipRecords,
long maxRecords) {
return fetchKeys(nodeId,
storeName,
helperOps.getReplicaToPartitionMap(nodeId, storeName, partitionList),
filter,
fetchMasterEntries,
null,
skipRecords);
skipRecords,
maxRecords);
}

/**
Expand All @@ -1819,7 +1831,7 @@ public Iterator<ByteArray> fetchKeys(int nodeId,
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries) {
return fetchKeys(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0);
return fetchKeys(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0, 0);
}

/**
Expand All @@ -1843,7 +1855,8 @@ public Iterator<ByteArray> fetchKeys(int nodeId,
VoldemortFilter filter,
boolean fetchMasterEntries,
Cluster initialCluster,
long skipRecords) {
long skipRecords,
long maxRecords) {
Node node = AdminClient.this.getAdminClientCluster().getNodeById(nodeId);
final SocketDestination destination = new SocketDestination(node.getHost(),
node.getAdminPort(),
Expand All @@ -1860,7 +1873,8 @@ public Iterator<ByteArray> fetchKeys(int nodeId,
false,
fetchMasterEntries,
initialCluster,
skipRecords);
skipRecords,
maxRecords);
} catch(IOException e) {
helperOps.close(sands.getSocket());
socketPool.checkin(destination, sands);
Expand Down

0 comments on commit 7669d86

Please sign in to comment.