Skip to content

Commit

Permalink
Addressed all code review comments for KeySampler and KeyVersionFetch…
Browse files Browse the repository at this point in the history
…er. Renamed many classes and methods related to FetchStreamRequestHandler.

- All sub-classes of FetchStreamRequestHandler have been renamed to have a more consistent nomenclature.
- Did some further refactoring in the FullScan* classes to move more work from leaf classes to FullScanFetchRequestHandler.java
- moved scan accounting to overall bae class
- Added getNodesPartitionIdForKey method to StoreInstance to help with some fetch logic
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent 96862f2 commit 1a0b8f7
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 192 deletions.
Expand Up @@ -547,29 +547,29 @@ public StreamRequestHandler handleFetchPartitionEntries(VAdminProto.FetchPartiti

if(fetchValues) {
if(storageEngine.isPartitionScanSupported() && !fetchOrphaned)
return new FetchPartitionEntriesStreamRequestHandler(request,
return new PartitionScanFetchEntriesRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader);
else
return new FetchEntriesStreamRequestHandler(request,
return new FullScanFetchEntriesRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader);
} else {
if(storageEngine.isPartitionScanSupported() && !fetchOrphaned)
return new FetchPartitionKeysStreamRequestHandler(request,
return new PartitionScanFetchKeysRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader);
else
return new FetchKeysStreamRequestHandler(request,
return new FullScanFetchKeysRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
Expand Down
Expand Up @@ -42,6 +42,7 @@
import voldemort.utils.ByteArray;
import voldemort.utils.EventThrottler;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.StoreInstance;
import voldemort.utils.Time;
import voldemort.xml.ClusterMapper;

Expand Down Expand Up @@ -85,10 +86,12 @@ public abstract class FetchStreamRequestHandler implements StreamRequestHandler

protected int nodeId;

protected StoreDefinition storeDef;
protected final StoreDefinition storeDef;

protected boolean fetchOrphaned;

protected final StoreInstance storeInstance;

protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
ErrorCodeMapper errorCodeMapper,
Expand Down Expand Up @@ -116,6 +119,8 @@ protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest req
} else {
this.initialCluster = metadataStore.getCluster();
}
this.storeInstance = new StoreInstance(this.initialCluster, this.storeDef);

this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec());
if(request.hasFilter()) {
this.filter = AdminServiceRequestHandler.getFilterFromRequest(request.getFilter(),
Expand Down Expand Up @@ -188,6 +193,19 @@ protected void progressInfoMessage(final String tag) {
}
}

/**
* Account for item being scanned.
*
* @param itemTag mad libs style string to insert into progress message.
*
*/
protected void accountForScanProgress(String itemTag) {
scanned++;
if(0 == scanned % STAT_RECORDS_INTERVAL) {
progressInfoMessage("Fetch " + itemTag + " (progress)");
}
}

/**
* Helper method to send message on outputStream and account for network
* time stats.
Expand Down
Expand Up @@ -31,7 +31,6 @@
import voldemort.store.stats.StreamingStats.Operation;
import voldemort.utils.ByteArray;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.StoreInstance;
import voldemort.versioning.Versioned;

import com.google.protobuf.Message;
Expand All @@ -43,14 +42,14 @@
* unwanted keys and then call storageEngine.get() for valid keys.
* <p>
*/
public class FetchEntriesStreamRequestHandler extends FetchItemsStreamRequestHandler {

public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
ErrorCodeMapper errorCodeMapper,
VoldemortConfig voldemortConfig,
StoreRepository storeRepository,
NetworkClassLoader networkClassLoader) {
public class FullScanFetchEntriesRequestHandler extends FullScanFetchStreamRequestHandler {

public FullScanFetchEntriesRequestHandler(FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
ErrorCodeMapper errorCodeMapper,
VoldemortConfig voldemortConfig,
StoreRepository storeRepository,
NetworkClassLoader networkClassLoader) {
super(request,
metadataStore,
errorCodeMapper,
Expand Down Expand Up @@ -82,25 +81,14 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// Cannot invoke 'throttler.maybeThrottle(key.length());' here since
// that would affect timing measurements of storage operations.

boolean entryAccepted = false;
if(!fetchOrphaned) {
if(keyIsNeeded(key.get())) {
entryAccepted = true;
}
} else {
if(!StoreInstance.checkKeyBelongsToNode(key.get(), nodeId, initialCluster, storeDef)) {
entryAccepted = true;
}
}

if(entryAccepted) {
if(isItemAccepted(key.get())) {
List<Versioned<byte[]>> values = storageEngine.get(key, null);
reportStorageOpTime(startNs);
throttler.maybeThrottle(key.length());
for(Versioned<byte[]> value: values) {

if(filter.accept(key, value)) {
keyFetched(key.get());
accountForFetchedKey(key.get());

VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder()
Expand All @@ -120,27 +108,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
throttler.maybeThrottle(key.length());
}

// log progress
scanned++;
if(0 == scanned % STAT_RECORDS_INTERVAL) {
progressInfoMessage("Fetch entries (progress)");
}

if(keyIterator.hasNext() && !fetchedEnough()) {
return StreamRequestHandlerState.WRITING;
} else {
logger.info("Finished fetch entries for store '" + storageEngine.getName()
+ "' with replica to partition mapping " + replicaToPartitionList);
progressInfoMessage("Fetch entries (end of scan)");
accountForScanProgress("entries");

return StreamRequestHandlerState.COMPLETE;
}
return determineRequestHandlerState("entries");
}

@Override
public final void close(DataOutputStream outputStream) throws IOException {
if(null != keyIterator)
keyIterator.close();
super.close(outputStream);
}
}
Expand Up @@ -30,22 +30,21 @@
import voldemort.store.stats.StreamingStats.Operation;
import voldemort.utils.ByteArray;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.StoreInstance;

import com.google.protobuf.Message;

/**
* Fetches keys by scanning entire storage engine in storage-order.
*
*/
public class FetchKeysStreamRequestHandler extends FetchItemsStreamRequestHandler {

public FetchKeysStreamRequestHandler(FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
ErrorCodeMapper errorCodeMapper,
VoldemortConfig voldemortConfig,
StoreRepository storeRepository,
NetworkClassLoader networkClassLoader) {
public class FullScanFetchKeysRequestHandler extends FullScanFetchStreamRequestHandler {

public FullScanFetchKeysRequestHandler(FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
ErrorCodeMapper errorCodeMapper,
VoldemortConfig voldemortConfig,
StoreRepository storeRepository,
NetworkClassLoader networkClassLoader) {
super(request,
metadataStore,
errorCodeMapper,
Expand Down Expand Up @@ -73,49 +72,20 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,

throttler.maybeThrottle(key.length());

boolean keyAccepted = false;
if(!fetchOrphaned) {
if(keyIsNeeded(key.get()) && filter.accept(key, null)) {
keyAccepted = true;
}

} else {
if(!StoreInstance.checkKeyBelongsToNode(key.get(), nodeId, initialCluster, storeDef)) {
keyAccepted = true;
}
}

if(keyAccepted) {
keyFetched(key.get());

VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
response.setKey(ProtoUtils.encodeBytes(key));
Message message = response.build();
if(isItemAccepted(key.get())) {
if(filter.accept(key, null)) {
accountForFetchedKey(key.get());

sendMessage(outputStream, message);
}
VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
response.setKey(ProtoUtils.encodeBytes(key));
Message message = response.build();

// log progress
scanned++;
if(0 == scanned % STAT_RECORDS_INTERVAL) {
progressInfoMessage("Fetch keys (progress)");
sendMessage(outputStream, message);
}
}

if(keyIterator.hasNext() && !fetchedEnough()) {
return StreamRequestHandlerState.WRITING;
} else {
logger.info("Finished fetch keys for store '" + storageEngine.getName()
+ "' with replica to partition mapping " + replicaToPartitionList);
progressInfoMessage("Fetch keys (end of scan)");
accountForScanProgress("keys");

return StreamRequestHandlerState.COMPLETE;
}
}

@Override
public final void close(DataOutputStream outputStream) throws IOException {
if(null != keyIterator)
keyIterator.close();
super.close(outputStream);
return determineRequestHandlerState("keys");
}
}

0 comments on commit 1a0b8f7

Please sign in to comment.