From 96d1513dcf9c82e5278ad5ed7b36844f229739fa Mon Sep 17 00:00:00 2001 From: Jay J Wylie Date: Thu, 14 Mar 2013 10:36:14 -0700 Subject: [PATCH] Correctness fixes and significant refactoring of Fetch*StreamRequestHandlers. Expanded AdminFetchTest. Added more common helper methods to common base class of all fetchers FetchStreamRequestHandler. Added abstract base classes for partition-based fetching and non-partition-based fetching: - FetchPartitionStreamRequestHandler (partition-based) - FetchItemsStreamRequestHandler (non-partition-based) Refactored some code up to abstract base classes and made implementations as similar as possible (without heroic efforts) across all fetchers: - FetchEntriesStreamRequestHandler - FetchKeysStreamRequestHandler - FetchPartitionEntriesStreamRequestHandler - FetchPartitionKeysStreamRequestHandler Significant better test coverage in AdminFetchTest - tests fetching keys as well as fetching entries - tests partition-aware and non-partition-aware servers - tests per-partition limits on entries/keys fetched All of this clean up and additional testing lead to minor correctness fixes. Minor other clean ups of comments, override annotations, and fixes for KeySamplerCLI. --- .../client/protocol/admin/AdminClient.java | 5 +- .../client/protocol/pb/ProtoUtils.java | 6 + .../admin/AdminServiceRequestHandler.java | 1 + .../FetchEntriesStreamRequestHandler.java | 78 ++++----- .../admin/FetchItemsStreamRequestHandler.java | 157 +++++++++++++++++ .../admin/FetchKeysStreamRequestHandler.java | 74 +++----- ...hPartitionEntriesStreamRequestHandler.java | 89 +++------- ...etchPartitionKeysStreamRequestHandler.java | 91 +++------- .../FetchPartitionStreamRequestHandler.java | 123 +++++++++++++ .../admin/FetchStreamRequestHandler.java | 70 +++++++- src/java/voldemort/utils/KeySamplerCLI.java | 15 +- src/java/voldemort/utils/StoreInstance.java | 15 ++ .../unit/voldemort/client/AdminFetchTest.java | 165 +++++++++++++++++- 13 files changed, 641 insertions(+), 248 deletions(-) create mode 100644 src/java/voldemort/server/protocol/admin/FetchItemsStreamRequestHandler.java create mode 100644 src/java/voldemort/server/protocol/admin/FetchPartitionStreamRequestHandler.java diff --git a/src/java/voldemort/client/protocol/admin/AdminClient.java b/src/java/voldemort/client/protocol/admin/AdminClient.java index 3857b2a540..38691152bf 100644 --- a/src/java/voldemort/client/protocol/admin/AdminClient.java +++ b/src/java/voldemort/client/protocol/admin/AdminClient.java @@ -1614,9 +1614,10 @@ public Iterator>> fetchEntries(int nodeId, return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0); } - // TODO: " HashMap> replicaToPartitionList," is a + // TODO: "HashMap> replicaToPartitionList" is a // confusing/opaque argument. Can this be made a type, or even - // unrolled/simplified? + // unrolled/simplified? The replicaType is pretty much meaningless + // anyhow. // TODO: The use of "Pair" in the return for a fundamental type is // awkward. We should have a core KeyValue type that effectively wraps diff --git a/src/java/voldemort/client/protocol/pb/ProtoUtils.java b/src/java/voldemort/client/protocol/pb/ProtoUtils.java index efc97b8538..a7d81928d0 100644 --- a/src/java/voldemort/client/protocol/pb/ProtoUtils.java +++ b/src/java/voldemort/client/protocol/pb/ProtoUtils.java @@ -147,6 +147,12 @@ public static List encodePartitionTuple(HashMap> decodePartitionTuple(List partitionTuples) { HashMap> replicaToPartitionList = Maps.newHashMap(); for(PartitionTuple tuple: partitionTuples) { diff --git a/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java b/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java index 8a70b3c8b6..e976facd32 100644 --- a/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java @@ -1344,6 +1344,7 @@ public VAdminProto.AddStoreResponse handleAddStore(VAdminProto.AddStoreRequest r * returns * @return True if the buffer holds a complete request, false otherwise */ + @Override public boolean isCompleteRequest(ByteBuffer buffer) { DataInputStream inputStream = new DataInputStream(new ByteBufferBackedInputStream(buffer)); diff --git a/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java index 4a418d9b42..20e17635f9 100644 --- a/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java @@ -30,25 +30,20 @@ import voldemort.store.metadata.MetadataStore; import voldemort.store.stats.StreamingStats.Operation; import voldemort.utils.ByteArray; -import voldemort.utils.ClosableIterator; import voldemort.utils.NetworkClassLoader; import voldemort.utils.StoreInstance; -import voldemort.utils.Time; import voldemort.versioning.Versioned; import com.google.protobuf.Message; /** - * FetchEntries fetches and return key/value entry. + * Fetches entries by scanning entire storage engine in storage-order. *

* For performance reason use storageEngine.keys() iterator to filter out * unwanted keys and then call storageEngine.get() for valid keys. *

*/ - -public class FetchEntriesStreamRequestHandler extends FetchStreamRequestHandler { - - protected final ClosableIterator keyIterator; +public class FetchEntriesStreamRequestHandler extends FetchItemsStreamRequestHandler { public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request, MetadataStore metadataStore, @@ -63,7 +58,6 @@ public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request, storeRepository, networkClassLoader, Operation.FETCH_ENTRIES); - this.keyIterator = storageEngine.keys(); logger.info("Starting fetch entries for store '" + storageEngine.getName() + "' with replica to partition mapping " + replicaToPartitionList); } @@ -72,27 +66,28 @@ public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request, public StreamRequestHandlerState handleRequest(DataInputStream inputStream, DataOutputStream outputStream) throws IOException { - if(!keyIterator.hasNext()) + if(!keyIterator.hasNext()) { return StreamRequestHandlerState.COMPLETE; + } + + // NOTE: Storage time is accounted for somewhat incorrectly because + // .hasNext() is invoked at end of method for the common case. + // Since key reading (keyIterator.next()) is done separately from entry + // fetching (storageEngine.get()), must be careful about when to invoke + // reportStorageOpTime and when to invoke maybeThrottle(). long startNs = System.nanoTime(); ByteArray key = keyIterator.next(); - if(streamStats != null) - streamStats.reportStreamingScan(operation); - boolean entryAccepted = false; + // Cannot invoke 'throttler.maybeThrottle(key.length());' here since + // that would affect timing measurements of storage operations. + boolean entryAccepted = false; if(!fetchOrphaned) { - // normal code path - if(StoreInstance.checkKeyBelongsToPartition(nodeId, - key.get(), - replicaToPartitionList, - initialCluster, - storeDef)) { + if(keyIsNeeded(key.get())) { entryAccepted = true; } } else { - // orphaned fetches if(!StoreInstance.checkKeyBelongsToNode(key.get(), nodeId, initialCluster, storeDef)) { entryAccepted = true; } @@ -100,55 +95,44 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, if(entryAccepted) { List> values = storageEngine.get(key, null); - if(streamStats != null) - streamStats.reportStorageTime(operation, System.nanoTime() - startNs); + reportStorageOpTime(startNs); + throttler.maybeThrottle(key.length()); for(Versioned value: values) { - throttler.maybeThrottle(key.length()); + if(filter.accept(key, value)) { - fetched++; - if(streamStats != null) - streamStats.reportStreamingFetch(operation); - VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); + keyFetched(key.get()); + VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder() .setKey(ProtoUtils.encodeBytes(key)) .setVersioned(ProtoUtils.encodeVersioned(value)) .build(); response.setPartitionEntry(partitionEntry); - Message message = response.build(); - startNs = System.nanoTime(); - ProtoUtils.writeMessage(outputStream, message); - if(streamStats != null) - streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); + sendMessage(outputStream, message); throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value)); } } } else { - if(streamStats != null) - streamStats.reportStorageTime(operation, System.nanoTime() - startNs); + reportStorageOpTime(startNs); + throttler.maybeThrottle(key.length()); } // log progress - counter++; - - if(0 == counter % STAT_RECORDS_INTERVAL) { - long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND; - - logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched - + " entries for store '" + storageEngine.getName() - + "' replicaToPartitionList:" + replicaToPartitionList + " in " + totalTime - + " s"); + scanned++; + if(0 == scanned % STAT_RECORDS_INTERVAL) { + progressInfoMessage("Fetch entries (progress)"); } - // TODO: Add logic to FetchKeys and FetchEntries to count keys per - // partition correctly. - - if(keyIterator.hasNext() && counter < recordsPerPartition) + if(keyIterator.hasNext() && !fetchedEnough()) { return StreamRequestHandlerState.WRITING; - else { + } else { + logger.info("Finished fetch entries for store '" + storageEngine.getName() + + "' with replica to partition mapping " + replicaToPartitionList); + progressInfoMessage("Fetch entries (end of scan)"); + return StreamRequestHandlerState.COMPLETE; } } diff --git a/src/java/voldemort/server/protocol/admin/FetchItemsStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchItemsStreamRequestHandler.java new file mode 100644 index 0000000000..b57f80d5eb --- /dev/null +++ b/src/java/voldemort/server/protocol/admin/FetchItemsStreamRequestHandler.java @@ -0,0 +1,157 @@ +package voldemort.server.protocol.admin; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest; +import voldemort.server.StoreRepository; +import voldemort.server.VoldemortConfig; +import voldemort.store.ErrorCodeMapper; +import voldemort.store.metadata.MetadataStore; +import voldemort.store.stats.StreamingStats; +import voldemort.utils.ByteArray; +import voldemort.utils.ClosableIterator; +import voldemort.utils.NetworkClassLoader; +import voldemort.utils.StoreInstance; +import voldemort.utils.Utils; + +/** + * Base class for key/entry stream fetching handlers that do not rely on PID + * layout. + * + */ +public abstract class FetchItemsStreamRequestHandler extends FetchStreamRequestHandler { + + protected final ClosableIterator keyIterator; + + // PartitionId to count of fetches on that partition. + protected Map partitionFetches; + + public FetchItemsStreamRequestHandler(FetchPartitionEntriesRequest request, + MetadataStore metadataStore, + ErrorCodeMapper errorCodeMapper, + VoldemortConfig voldemortConfig, + StoreRepository storeRepository, + NetworkClassLoader networkClassLoader, + StreamingStats.Operation operation) { + super(request, + metadataStore, + errorCodeMapper, + voldemortConfig, + storeRepository, + networkClassLoader, + operation); + + this.keyIterator = storageEngine.keys(); + + this.partitionFetches = new HashMap(); + for(Integer replicaType: replicaToPartitionList.keySet()) { + if(replicaToPartitionList.get(replicaType) != null) { + for(Integer partitionId: replicaToPartitionList.get(replicaType)) { + this.partitionFetches.put(partitionId, new Long(0)); + } + } + } + } + + /** + * Given the key, figures out which partition on the local node hosts the + * key based on contents of the "replica to partition list" data structure. + * + * @param key + * @return + */ + private Integer getKeyPartitionId(byte[] key) { + StoreInstance storeInstance = new StoreInstance(initialCluster, storeDef); + Integer keyPartitionId = null; + for(Integer partitionId: storeInstance.getReplicationPartitionList(key)) { + for(Map.Entry> rtps: replicaToPartitionList.entrySet()) { + if(rtps.getValue().contains(partitionId)) { + keyPartitionId = partitionId; + break; + } + } + } + Utils.notNull(keyPartitionId); + return keyPartitionId; + } + + /** + * Determines if the key is needed. To be 'needed', a key must (i) belong to + * a partition being requested and (ii) be necessary to meet + * recordsPerPartition constraint, if any. + * + * @param nodeId + * @param key + * @param replicaToPartitionList + * @param cluster + * @param storeDef + * @return true iff key is needed. + */ + protected boolean keyIsNeeded(byte[] key) { + if(!StoreInstance.checkKeyBelongsToPartition(nodeId, + key, + replicaToPartitionList, + initialCluster, + storeDef)) { + return false; + } + + if(recordsPerPartition <= 0) { + return true; + } + + Integer keyPartitionId = getKeyPartitionId(key); + Long partitionFetch = partitionFetches.get(keyPartitionId); + Utils.notNull(partitionFetch); + + if(partitionFetch >= recordsPerPartition) { + return false; + } + + return true; + } + + /** + * Account for key being fetched. + * + * @param key + */ + protected void keyFetched(byte[] key) { + fetched++; + if(streamStats != null) { + streamStats.reportStreamingFetch(operation); + } + + if(recordsPerPartition <= 0) { + return; + } + + Integer keyPartitionId = getKeyPartitionId(key); + Long partitionFetch = partitionFetches.get(keyPartitionId); + Utils.notNull(partitionFetch); + + partitionFetches.put(keyPartitionId, partitionFetch + 1); + } + + /** + * True iff enough partitions have been fetched relative to + * recordsPerPartition value. + * + * @param partitionFetched Records fetched for current partition + * @return + */ + protected boolean fetchedEnough() { + if(recordsPerPartition <= 0) { + return false; + } + + for(Map.Entry partitionFetch: partitionFetches.entrySet()) { + if(partitionFetch.getValue() < recordsPerPartition) { + return false; + } + } + return true; + } +} diff --git a/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java index 407984b0d2..47ed06ea79 100644 --- a/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java @@ -29,15 +29,16 @@ import voldemort.store.metadata.MetadataStore; import voldemort.store.stats.StreamingStats.Operation; import voldemort.utils.ByteArray; -import voldemort.utils.ClosableIterator; import voldemort.utils.NetworkClassLoader; import voldemort.utils.StoreInstance; import com.google.protobuf.Message; -public class FetchKeysStreamRequestHandler extends FetchStreamRequestHandler { - - protected final ClosableIterator keyIterator; +/** + * Fetches keys by scanning entire storage engine in storage-order. + * + */ +public class FetchKeysStreamRequestHandler extends FetchItemsStreamRequestHandler { public FetchKeysStreamRequestHandler(FetchPartitionEntriesRequest request, MetadataStore metadataStore, @@ -52,7 +53,6 @@ public FetchKeysStreamRequestHandler(FetchPartitionEntriesRequest request, storeRepository, networkClassLoader, Operation.FETCH_KEYS); - this.keyIterator = storageEngine.keys(); logger.info("Starting fetch keys for store '" + storageEngine.getName() + "' with replica to partition mapping " + replicaToPartitionList); } @@ -61,24 +61,21 @@ public FetchKeysStreamRequestHandler(FetchPartitionEntriesRequest request, public StreamRequestHandlerState handleRequest(DataInputStream inputStream, DataOutputStream outputStream) throws IOException { - if(!keyIterator.hasNext()) + if(!keyIterator.hasNext()) { return StreamRequestHandlerState.COMPLETE; + } + // NOTE: Storage time is accounted for somewhat incorrectly because + // .hasNext() is invoked at end of method for the common case. long startNs = System.nanoTime(); ByteArray key = keyIterator.next(); - if(streamStats != null) { - streamStats.reportStorageTime(operation, System.nanoTime() - startNs); - streamStats.reportStreamingScan(operation); - } + reportStorageOpTime(startNs); throttler.maybeThrottle(key.length()); + boolean keyAccepted = false; if(!fetchOrphaned) { - if(StoreInstance.checkKeyBelongsToPartition(nodeId, - key.get(), - replicaToPartitionList, - initialCluster, - storeDef) && filter.accept(key, null)) { + if(keyIsNeeded(key.get()) && filter.accept(key, null)) { keyAccepted = true; } @@ -87,51 +84,30 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, keyAccepted = true; } } + if(keyAccepted) { + keyFetched(key.get()); + VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); response.setKey(ProtoUtils.encodeBytes(key)); - - fetched++; - if(streamStats != null) - streamStats.reportStreamingFetch(operation); Message message = response.build(); - startNs = System.nanoTime(); - ProtoUtils.writeMessage(outputStream, message); - if(streamStats != null) { - // TODO: The accounting for streaming reads should also - // move along with the next() call since we are indeed - // fetching from disk.. ---VChandar - - streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); - } + sendMessage(outputStream, message); } // log progress - counter++; - - if(0 == counter % STAT_RECORDS_INTERVAL) { - long totalTime = (System.currentTimeMillis() - startTime) / 1000; - - logger.info("Fetch keys scanned " + counter + " keys, fetched " + fetched - + " keys for store '" + storageEngine.getName() - + "' replicaToPartitionList:" + replicaToPartitionList + " in " + totalTime - + " s"); + scanned++; + if(0 == scanned % STAT_RECORDS_INTERVAL) { + progressInfoMessage("Fetch keys (progress)"); } - // TODO: make usage clearer. Rename maxRecords to recordsPerPartition. - // And, make recordsPerPartition <=0 mean 'get them all'. - - // TODO: Remove skipRecords from message and from code. - - // TODO: Make sure the distinction between FetchKeysStream and - // FetchPartitionKeysStream is clear. - - // TODO: Add logic to FetchKeys and FetchEntries to count keys per - // partition correctly. - if(keyIterator.hasNext() && (counter < recordsPerPartition)) + if(keyIterator.hasNext() && !fetchedEnough()) { return StreamRequestHandlerState.WRITING; - else { + } else { + logger.info("Finished fetch keys for store '" + storageEngine.getName() + + "' with replica to partition mapping " + replicaToPartitionList); + progressInfoMessage("Fetch keys (end of scan)"); + return StreamRequestHandlerState.COMPLETE; } } diff --git a/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java index d3419e3dde..1b9f642609 100644 --- a/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java @@ -19,10 +19,6 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import voldemort.client.protocol.pb.ProtoUtils; import voldemort.client.protocol.pb.VAdminProto; @@ -37,22 +33,18 @@ import voldemort.utils.NetworkClassLoader; import voldemort.utils.Pair; import voldemort.utils.StoreInstance; -import voldemort.utils.Time; import voldemort.versioning.Versioned; import com.google.protobuf.Message; /** - * Fetches the entries using an efficient partition scan + * Fetches entries using an efficient partition scan. Of course, only works if + * isPartitionScanSupported() is true for the storage engine to be scanned.. * */ -public class FetchPartitionEntriesStreamRequestHandler extends FetchStreamRequestHandler { +public class FetchPartitionEntriesStreamRequestHandler extends FetchPartitionStreamRequestHandler { - protected Set fetchedPartitions; protected ClosableIterator>> entriesPartitionIterator; - protected List replicaTypeList; - protected List partitionList; - protected Integer currentIndex; public FetchPartitionEntriesStreamRequestHandler(FetchPartitionEntriesRequest request, MetadataStore metadataStore, @@ -69,21 +61,8 @@ public FetchPartitionEntriesStreamRequestHandler(FetchPartitionEntriesRequest re Operation.FETCH_ENTRIES); logger.info("Starting fetch entries for store '" + storageEngine.getName() + "' with replica to partition mapping " + replicaToPartitionList); - fetchedPartitions = new HashSet(); - replicaTypeList = new ArrayList(); - partitionList = new ArrayList(); - currentIndex = 0; - entriesPartitionIterator = null; - // flatten the replicatype to partition map - for(Integer replicaType: replicaToPartitionList.keySet()) { - if(replicaToPartitionList.get(replicaType) != null) { - for(Integer partitionId: replicaToPartitionList.get(replicaType)) { - partitionList.add(partitionId); - replicaTypeList.add(replicaType); - } - } - } + entriesPartitionIterator = null; } @Override @@ -93,18 +72,16 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, // process the next partition if(entriesPartitionIterator == null) { - if(currentIndex == partitionList.size() || counter >= recordsPerPartition) { - // TODO: Make .info consistent - logger.info("Done fetching store " + storageEngine.getName() + " : " + counter - + " records processed."); + + if(currentIndex == partitionList.size()) { return StreamRequestHandlerState.COMPLETE; } - boolean found = false; // find the next partition to scan + boolean found = false; while(!found && (currentIndex < partitionList.size())) { - Integer currentPartition = partitionList.get(currentIndex); - Integer currentReplicaType = replicaTypeList.get(currentIndex); + currentPartition = new Integer(partitionList.get(currentIndex)); + currentReplicaType = new Integer(replicaTypeList.get(currentIndex)); // Check the current node contains the partition as the // requested replicatype @@ -114,11 +91,11 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, nodeId, initialCluster, storeDef)) { - fetchedPartitions.add(currentPartition); found = true; - logger.info("Fetching [partition: " + currentPartition + ", replica type:" - + currentReplicaType + "] for store " + storageEngine.getName()); + fetchedPartitions.add(currentPartition); + partitionFetched = 0; entriesPartitionIterator = storageEngine.entries(currentPartition); + statusInfoMessage("Starting fetch entries"); } currentIndex++; } @@ -126,24 +103,17 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, long startNs = System.nanoTime(); // do a check before reading in case partition has 0 elements if(entriesPartitionIterator.hasNext()) { - counter++; Pair> entry = entriesPartitionIterator.next(); - - // do the filtering - if(streamStats != null) { - streamStats.reportStorageTime(operation, System.nanoTime() - startNs); - streamStats.reportStreamingScan(operation); - } ByteArray key = entry.getFirst(); Versioned value = entry.getSecond(); + reportStorageOpTime(startNs); throttler.maybeThrottle(key.length()); + if(filter.accept(key, value)) { - fetched++; - if(streamStats != null) - streamStats.reportStreamingFetch(operation); - VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); + recordFetched(); + VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder() .setKey(ProtoUtils.encodeBytes(key)) .setVersioned(ProtoUtils.encodeVersioned(value)) @@ -151,31 +121,24 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, response.setPartitionEntry(partitionEntry); Message message = response.build(); - startNs = System.nanoTime(); - ProtoUtils.writeMessage(outputStream, message); - if(streamStats != null) - streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); + sendMessage(outputStream, message); + throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value)); } - // log progress - if(0 == counter % STAT_RECORDS_INTERVAL) { - long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND; - - logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched - + " entries for store '" + storageEngine.getName() - + "' replicaToPartitionList:" + replicaToPartitionList + " in " - + totalTime + " s"); + scanned++; + if(0 == scanned % STAT_RECORDS_INTERVAL) { + progressInfoMessage("Fetch entries (progress)"); } } - // TODO: Add logic to FetchKeys and FetchEntries to count keys per - // partition correctly. - - // reset the iterator if done with this partition - if(!entriesPartitionIterator.hasNext() || counter >= recordsPerPartition) { + if(!entriesPartitionIterator.hasNext() || fetchedEnough(partitionFetched)) { + // Finished current partition. Reset iterator. Info status. entriesPartitionIterator.close(); entriesPartitionIterator = null; + + statusInfoMessage("Finished fetch keys"); + progressInfoMessage("Fetch entries (end of partition)"); } } return StreamRequestHandlerState.WRITING; diff --git a/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java index 55b51b29a6..1f0069aa1d 100644 --- a/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java @@ -19,10 +19,6 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import voldemort.client.protocol.pb.ProtoUtils; import voldemort.client.protocol.pb.VAdminProto; @@ -36,21 +32,17 @@ import voldemort.utils.ClosableIterator; import voldemort.utils.NetworkClassLoader; import voldemort.utils.StoreInstance; -import voldemort.utils.Time; import com.google.protobuf.Message; /** - * Fetches the keys using an efficient partition scan + * Fetches keys using an efficient partition scan. Of course, only works if + * isPartitionScanSupported() is true for the storage engine to be scanned.. * */ -public class FetchPartitionKeysStreamRequestHandler extends FetchStreamRequestHandler { +public class FetchPartitionKeysStreamRequestHandler extends FetchPartitionStreamRequestHandler { protected ClosableIterator keysPartitionIterator; - protected Set fetchedPartitions; - protected List replicaTypeList; - protected List partitionList; - protected Integer currentIndex; public FetchPartitionKeysStreamRequestHandler(FetchPartitionEntriesRequest request, MetadataStore metadataStore, @@ -67,21 +59,8 @@ public FetchPartitionKeysStreamRequestHandler(FetchPartitionEntriesRequest reque Operation.FETCH_KEYS); logger.info("Starting fetch keys for store '" + storageEngine.getName() + "' with replica to partition mapping " + replicaToPartitionList); - fetchedPartitions = new HashSet(); - replicaTypeList = new ArrayList(); - partitionList = new ArrayList(); - currentIndex = 0; - keysPartitionIterator = null; - // flatten the replicatype to partition map - for(Integer replicaType: replicaToPartitionList.keySet()) { - if(replicaToPartitionList.get(replicaType) != null) { - for(Integer partitionId: replicaToPartitionList.get(replicaType)) { - partitionList.add(partitionId); - replicaTypeList.add(replicaType); - } - } - } + keysPartitionIterator = null; } @Override @@ -91,19 +70,16 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, // process the next partition if(keysPartitionIterator == null) { - if(currentIndex == partitionList.size() || counter >= recordsPerPartition) { - // TODO: Make all .info messages consistent. "Records fetched" - // instead of "Done fetching". - logger.info("Done fetching store " + storageEngine.getName() + " : " + counter - + " records processed."); + + if(currentIndex == partitionList.size()) { return StreamRequestHandlerState.COMPLETE; } + // find the next partition to scan and set currentIndex. boolean found = false; - // find the next partition to scan while(!found && (currentIndex < partitionList.size())) { - Integer currentPartition = partitionList.get(currentIndex); - Integer currentReplicaType = replicaTypeList.get(currentIndex); + currentPartition = partitionList.get(currentIndex); + currentReplicaType = replicaTypeList.get(currentIndex); // Check the current node contains the partition as the // requested replicatype @@ -113,11 +89,11 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, nodeId, initialCluster, storeDef)) { - fetchedPartitions.add(currentPartition); found = true; - logger.info("Fetching [partition: " + currentPartition + ", replica type:" - + currentReplicaType + "] for store " + storageEngine.getName()); + fetchedPartitions.add(currentPartition); + partitionFetched = 0; keysPartitionIterator = storageEngine.keys(currentPartition); + statusInfoMessage("Starting fetch keys"); } currentIndex++; } @@ -125,53 +101,34 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, long startNs = System.nanoTime(); // do a check before reading in case partition has 0 elements if(keysPartitionIterator.hasNext()) { - counter++; ByteArray key = keysPartitionIterator.next(); + reportStorageOpTime(startNs); - // do the filtering - if(streamStats != null) { - // TODO: The accounting for streaming reads should also - // move along with the next() call since we are indeed - // fetching from disk.. ---VChandar - streamStats.reportStorageTime(operation, System.nanoTime() - startNs); - streamStats.reportStreamingScan(operation); - } throttler.maybeThrottle(key.length()); + if(filter.accept(key, null)) { + recordFetched(); VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); response.setKey(ProtoUtils.encodeBytes(key)); - - fetched++; - if(streamStats != null) - streamStats.reportStreamingFetch(operation); Message message = response.build(); - startNs = System.nanoTime(); - ProtoUtils.writeMessage(outputStream, message); - if(streamStats != null) - streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); + sendMessage(outputStream, message); } - // log progress - if(0 == counter % STAT_RECORDS_INTERVAL) { - long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND; - - logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched - + " entries for store '" + storageEngine.getName() - + "' replicaToPartitionList:" + replicaToPartitionList + " in " - + totalTime + " s"); + scanned++; + if(0 == scanned % STAT_RECORDS_INTERVAL) { + progressInfoMessage("Fetch keys (progress)"); } } - // TODO: Add logic to FetchKeys and FetchEntries to count keys per - // partition correctly. - - // reset the iterator if done with this partition or fetched enough - // records - if(!keysPartitionIterator.hasNext() || (counter >= recordsPerPartition)) { + if(!keysPartitionIterator.hasNext() || fetchedEnough(partitionFetched)) { + // Finished current partition. Reset iterator. Info status. keysPartitionIterator.close(); keysPartitionIterator = null; + + statusInfoMessage("Finished fetch keys"); + progressInfoMessage("Fetch keys (end of partition)"); } } return StreamRequestHandlerState.WRITING; diff --git a/src/java/voldemort/server/protocol/admin/FetchPartitionStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchPartitionStreamRequestHandler.java new file mode 100644 index 0000000000..20ce143c6d --- /dev/null +++ b/src/java/voldemort/server/protocol/admin/FetchPartitionStreamRequestHandler.java @@ -0,0 +1,123 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package voldemort.server.protocol.admin; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest; +import voldemort.server.StoreRepository; +import voldemort.server.VoldemortConfig; +import voldemort.store.ErrorCodeMapper; +import voldemort.store.metadata.MetadataStore; +import voldemort.store.stats.StreamingStats; +import voldemort.utils.NetworkClassLoader; + +/** + * Base class for key/entry stream fetching handlers that use efficient + * partition scan (PID layout). Of course, only works if + * isPartitionScanSupported() is true for the storage engine to be scanned.. + * + */ +public abstract class FetchPartitionStreamRequestHandler extends FetchStreamRequestHandler { + + protected Set fetchedPartitions; + protected List replicaTypeList; + protected List partitionList; + + protected Integer currentIndex; + protected Integer currentPartition; + protected Integer currentReplicaType; + protected long partitionFetched; + + public FetchPartitionStreamRequestHandler(FetchPartitionEntriesRequest request, + MetadataStore metadataStore, + ErrorCodeMapper errorCodeMapper, + VoldemortConfig voldemortConfig, + StoreRepository storeRepository, + NetworkClassLoader networkClassLoader, + StreamingStats.Operation operation) { + super(request, + metadataStore, + errorCodeMapper, + voldemortConfig, + storeRepository, + networkClassLoader, + operation); + + fetchedPartitions = new HashSet(); + replicaTypeList = new ArrayList(); + partitionList = new ArrayList(); + currentIndex = 0; + + // flatten the replicatype to partition map + for(Integer replicaType: replicaToPartitionList.keySet()) { + if(replicaToPartitionList.get(replicaType) != null) { + for(Integer partitionId: replicaToPartitionList.get(replicaType)) { + partitionList.add(partitionId); + replicaTypeList.add(replicaType); + } + } + } + + currentPartition = null; + currentReplicaType = null; + partitionFetched = 0; + } + + /** + * Simple info message for status + * + * @param tag Message to print out at start of info message + * @param currentIndex current partition index + */ + protected void statusInfoMessage(final String tag) { + if(logger.isInfoEnabled()) { + logger.info(tag + " : [partition: " + currentPartition + ", replica type:" + + currentReplicaType + ", partitionFetched: " + partitionFetched + + "] for store " + storageEngine.getName()); + } + } + + /** + * True iff enough partitions have been fetched relative to + * recordsPerPartition value. + * + * @param partitionFetched Records fetched for current partition + * @return + */ + protected boolean fetchedEnough(long partitionFetched) { + if(recordsPerPartition <= 0) { + return false; + } + return (partitionFetched >= recordsPerPartition); + } + + /** + * Account for fetch. + * + * @param key + */ + protected void recordFetched() { + fetched++; + partitionFetched++; + if(streamStats != null) { + streamStats.reportStreamingFetch(operation); + } + } +} diff --git a/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java index 727289e0fd..abbda84372 100644 --- a/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java @@ -42,8 +42,15 @@ import voldemort.utils.ByteArray; import voldemort.utils.EventThrottler; import voldemort.utils.NetworkClassLoader; +import voldemort.utils.Time; import voldemort.xml.ClusterMapper; +import com.google.protobuf.Message; + +/** + * Base class for all key/entry stream fetching handlers. + * + */ public abstract class FetchStreamRequestHandler implements StreamRequestHandler { protected final VAdminProto.FetchPartitionEntriesRequest request; @@ -66,13 +73,13 @@ public abstract class FetchStreamRequestHandler implements StreamRequestHandler protected final StreamingStats.Operation operation; - protected long counter; + protected long scanned; // Read from disk. - protected long recordsPerPartition; + protected long fetched; // Returned to caller. - protected int fetched; + protected final long recordsPerPartition; - protected final long startTime; + protected final long startTimeMs; protected final Logger logger = Logger.getLogger(getClass()); @@ -117,12 +124,13 @@ protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest req } else { this.filter = new DefaultVoldemortFilter(); } - this.startTime = System.currentTimeMillis(); - this.counter = 0; + this.startTimeMs = System.currentTimeMillis(); + this.scanned = 0; - this.recordsPerPartition = 0; if(request.hasRecordsPerPartition() && request.getRecordsPerPartition() > 0) { this.recordsPerPartition = request.getRecordsPerPartition(); + } else { + this.recordsPerPartition = 0; } this.fetchOrphaned = request.hasFetchOrphaned() && request.getFetchOrphaned(); } @@ -144,9 +152,9 @@ public final StreamRequestDirection getDirection() { @Override public void close(DataOutputStream outputStream) throws IOException { - logger.info("Successfully scanned " + counter + " tuples, fetched " + fetched + logger.info("Successfully scanned " + scanned + " tuples, fetched " + fetched + " tuples for store '" + storageEngine.getName() + "' in " - + ((System.currentTimeMillis() - startTime) / 1000) + " s"); + + ((System.currentTimeMillis() - startTimeMs) / 1000) + " s"); ProtoUtils.writeEndOfStream(outputStream); } @@ -164,4 +172,48 @@ public final void handleError(DataOutputStream outputStream, VoldemortException e); } + /** + * Progress info message + * + * @param tag Message that precedes progress info. Indicate 'keys' or + * 'entries'. + */ + protected void progressInfoMessage(final String tag) { + if(logger.isInfoEnabled()) { + long totalTimeS = (System.currentTimeMillis() - startTimeMs) / Time.MS_PER_SECOND; + + logger.info(tag + " : scanned " + scanned + " and fetched " + fetched + " for store '" + + storageEngine.getName() + "' replicaToPartitionList:" + + replicaToPartitionList + " in " + totalTimeS + " s"); + } + } + + /** + * Helper method to send message on outputStream and account for network + * time stats. + * + * @param outputStream + * @param message + * @throws IOException + */ + protected void sendMessage(DataOutputStream outputStream, Message message) throws IOException { + long startNs = System.nanoTime(); + ProtoUtils.writeMessage(outputStream, message); + if(streamStats != null) { + streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); + } + } + + /** + * Helper method to track storage operations & time via StreamingStats. + * + * @param startNs + */ + protected void reportStorageOpTime(long startNs) { + if(streamStats != null) { + streamStats.reportStreamingScan(operation); + streamStats.reportStorageTime(operation, System.nanoTime() - startNs); + } + } + } diff --git a/src/java/voldemort/utils/KeySamplerCLI.java b/src/java/voldemort/utils/KeySamplerCLI.java index e63d6a5caa..d6df8bc027 100644 --- a/src/java/voldemort/utils/KeySamplerCLI.java +++ b/src/java/voldemort/utils/KeySamplerCLI.java @@ -100,12 +100,11 @@ public boolean sampleStores() { public static class NodeSampleResult { public final boolean success; - public final String keyString; + public final String keysString; - NodeSampleResult(boolean success, String keyString) { + NodeSampleResult(boolean success, String keysString) { this.success = success; - // TODO: keysString versus keyString - this.keyString = keyString; + this.keysString = keysString; } } @@ -124,7 +123,7 @@ public NodeSampleResult call() throws Exception { boolean success = false; String storeName = storeDefinition.getName(); - StringBuilder hexKeyStrings = new StringBuilder(); + StringBuilder hexKeysString = new StringBuilder(); // TODO: Change this from a loop to flat st the list of partitoinIds // are sent to server. @@ -167,7 +166,7 @@ public NodeSampleResult call() throws Exception { while(fetchIterator.hasNext()) { ByteArray key = fetchIterator.next(); String hexKeyString = ByteUtils.toHexString(key.get()); - hexKeyStrings.append(hexKeyString + "\n"); + hexKeysString.append(hexKeyString + "\n"); keyCount++; } if(keyCount < recordsPerPartition) { @@ -196,7 +195,7 @@ public NodeSampleResult call() throws Exception { break; } } - return new NodeSampleResult(success, hexKeyStrings.toString()); + return new NodeSampleResult(success, hexKeysString.toString()); } } @@ -233,7 +232,7 @@ public boolean sampleStore(StoreDefinition storeDefinition) { try { NodeSampleResult nodeSampleResult = future.get(); if(nodeSampleResult.success) { - keyWriter.write(nodeSampleResult.keyString); + keyWriter.write(nodeSampleResult.keysString); } else { success = false; logger.error("Sampling on node " + node.getHost() + " of store " diff --git a/src/java/voldemort/utils/StoreInstance.java b/src/java/voldemort/utils/StoreInstance.java index aa7ac936b4..614b5e6f44 100644 --- a/src/java/voldemort/utils/StoreInstance.java +++ b/src/java/voldemort/utils/StoreInstance.java @@ -57,11 +57,26 @@ public StoreDefinition getStoreDefinition() { return storeDefinition; } + /** + * + * @param partitionId + * @return List of partition IDs that replicate the partition ID. + */ public List getReplicationPartitionList(int partitionId) { return new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, cluster) .getReplicatingPartitionList(partitionId); } + /** + * + * @param key + * @return List of partition IDs that replicate the partition ID. + */ + public List getReplicationPartitionList(final byte[] key) { + return new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, cluster) + .getReplicatingPartitionList(getMasterPartitionId(key)); + } + public int getMasterPartitionId(final byte[] key) { return new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, cluster) .getMasterPartition(key); diff --git a/test/unit/voldemort/client/AdminFetchTest.java b/test/unit/voldemort/client/AdminFetchTest.java index cf1b224acc..e0d1ccabcb 100644 --- a/test/unit/voldemort/client/AdminFetchTest.java +++ b/test/unit/voldemort/client/AdminFetchTest.java @@ -65,14 +65,22 @@ public class AdminFetchTest { private HashMap> partitionToKeysMap; private final boolean useNio; + private final Properties properties; - public AdminFetchTest(boolean useNio) { + public AdminFetchTest(boolean useNio, boolean usePIDScan) { this.useNio = useNio; + properties = new Properties(); + if(usePIDScan) { + properties.put("bdb.prefix.keys.with.partitionid", "true"); + } else { + properties.put("bdb.prefix.keys.with.partitionid", "false"); + } } @Parameters public static Collection configs() { - return Arrays.asList(new Object[][] { { true }, { false } }); + return Arrays.asList(new Object[][] { { true, true }, { true, false }, { false, true }, + { false, false } }); } @Before @@ -88,6 +96,7 @@ public void setUp() throws IOException { final int numServers = 2; servers = new VoldemortServer[numServers]; int partitionMap[][] = { { 0, 1, 2, 3 }, { 4, 5, 6, 7 } }; + cluster = ServerTestUtils.startVoldemortCluster(numServers, servers, partitionMap, @@ -95,7 +104,7 @@ public void setUp() throws IOException { this.useNio, null, storesXmlfile, - new Properties()); + properties); List storeDefs = new StoreDefinitionsMapper().readStoreList(new File(storesXmlfile)); @@ -189,6 +198,36 @@ public void testFetchPartitionSecondaryEntries() { assertEquals("Remainder in partition 6" + partition6Keys, 0, partition6Keys.size()); } + @Test + public void testFetchPartitionPrimaryTwoEntries() { + HashMap> replicaToPartitionList = new HashMap>(); + replicaToPartitionList.put(0, Arrays.asList(0, 3)); + Iterator>> entriesItr = adminClient.bulkFetchOps.fetchEntries(0, + testStoreName, + replicaToPartitionList, + null, + false, + cluster, + 2); + Set fetchedKeys = getEntries(entriesItr); + + Set partition0Keys = new HashSet(partitionToKeysMap.get(0)); + int numPartition0Keys = partition0Keys.size(); + partition0Keys.removeAll(fetchedKeys); + assertEquals("Remainder in partition 0 should be two less.", + numPartition0Keys - 2, + partition0Keys.size()); + + Set partition3Keys = new HashSet(partitionToKeysMap.get(3)); + int numPartition3Keys = partition3Keys.size(); + partition3Keys.removeAll(fetchedKeys); + assertEquals("Remainder in partition 3 should be two less.", + numPartition3Keys - 2, + partition3Keys.size()); + + assertEquals("Total of four entries fetched.", 4, fetchedKeys.size()); + } + @Test public void testFetchNonExistentEntriesPrimary() { HashMap> replicaToPartitionList = new HashMap>(); @@ -224,4 +263,124 @@ public void testFetchNonExistentEntriesSecondary() { // 0 as primary assertEquals("Obtained something:" + fetchedKeys, 0, fetchedKeys.size()); } + + private Set getKeys(Iterator itr) { + HashSet keySet = new HashSet(); + while(itr.hasNext()) { + keySet.add(new String(itr.next().get())); + } + return keySet; + } + + @Test + public void testFetchPartitionPrimaryKeys() { + HashMap> replicaToPartitionList = new HashMap>(); + replicaToPartitionList.put(0, Arrays.asList(0, 3)); + Iterator keysItr = adminClient.bulkFetchOps.fetchKeys(0, + testStoreName, + replicaToPartitionList, + null, + false, + cluster, + 0); + // gather all the keys obtained + Set fetchedKeys = getKeys(keysItr); + // make sure it fetched all the keys from the partitions requested + Set partition0Keys = new HashSet(partitionToKeysMap.get(0)); + Set partition3Keys = new HashSet(partitionToKeysMap.get(3)); + + partition0Keys.removeAll(fetchedKeys); + partition3Keys.removeAll(fetchedKeys); + assertEquals("Remainder in partition 0" + partition0Keys, 0, partition0Keys.size()); + assertEquals("Remainder in partition 3" + partition3Keys, 0, partition3Keys.size()); + } + + @Test + public void testFetchPartitionSecondaryKeys() { + HashMap> replicaToPartitionList = new HashMap>(); + replicaToPartitionList.put(1, Arrays.asList(4, 6)); + Iterator keysItr = adminClient.bulkFetchOps.fetchKeys(0, + testStoreName, + replicaToPartitionList, + null, + false, + cluster, + 0); + // gather all the keys obtained + Set fetchedKeys = getKeys(keysItr); + // make sure it fetched all the keys from the partitions requested + Set partition4Keys = new HashSet(partitionToKeysMap.get(4)); + Set partition6Keys = new HashSet(partitionToKeysMap.get(6)); + + partition4Keys.removeAll(fetchedKeys); + partition6Keys.removeAll(fetchedKeys); + assertEquals("Remainder in partition 4" + partition4Keys, 0, partition4Keys.size()); + assertEquals("Remainder in partition 6" + partition6Keys, 0, partition6Keys.size()); + } + + @Test + public void testFetchPartitionPrimaryTwoKeys() { + HashMap> replicaToPartitionList = new HashMap>(); + replicaToPartitionList.put(0, Arrays.asList(0, 3)); + Iterator keysItr = adminClient.bulkFetchOps.fetchKeys(0, + testStoreName, + replicaToPartitionList, + null, + false, + cluster, + 2); + Set fetchedKeys = getKeys(keysItr); + + Set partition0Keys = new HashSet(partitionToKeysMap.get(0)); + int numPartition0Keys = partition0Keys.size(); + partition0Keys.removeAll(fetchedKeys); + assertEquals("Remainder in partition 0 should be two less.", + numPartition0Keys - 2, + partition0Keys.size()); + + Set partition3Keys = new HashSet(partitionToKeysMap.get(3)); + int numPartition3Keys = partition3Keys.size(); + partition3Keys.removeAll(fetchedKeys); + assertEquals("Remainder in partition 3 should be two less.", + numPartition3Keys - 2, + partition3Keys.size()); + + assertEquals("Total of four keys fetched.", 4, fetchedKeys.size()); + } + + @Test + public void testFetchNonExistentKeysPrimary() { + HashMap> replicaToPartitionList = new HashMap>(); + replicaToPartitionList.put(0, Arrays.asList(5, 7)); + Iterator keysItr = adminClient.bulkFetchOps.fetchKeys(0, + testStoreName, + replicaToPartitionList, + null, + false, + cluster, + 0); + // gather all the keys obtained + Set fetchedKeys = getKeys(keysItr); + // make sure it fetched nothing since these partitions belong to server + // 1 + assertEquals("Obtained something:" + fetchedKeys, 0, fetchedKeys.size()); + } + + @Test + public void testFetchNonExistentKeysSecondary() { + HashMap> replicaToPartitionList = new HashMap>(); + replicaToPartitionList.put(1, Arrays.asList(1, 2)); + Iterator keysItr = adminClient.bulkFetchOps.fetchKeys(0, + testStoreName, + replicaToPartitionList, + null, + false, + cluster, + 0); + // gather all the keys obtained + Set fetchedKeys = getKeys(keysItr); + // make sure it fetched nothing since these partitions belong to server + // 0 as primary + assertEquals("Obtained something:" + fetchedKeys, 0, fetchedKeys.size()); + } }