Skip to content

Commit

Permalink
Rewrite of InMemoryStorageEngine + config to control multiVersionPuts
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed May 13, 2013
1 parent b743678 commit b3147d3
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 114 deletions.
Expand Up @@ -241,7 +241,7 @@ public List<Versioned<byte[]>> multiVersionPut(ByteArray key,

synchronized(this.locks.lockFor(key.get())) {
valuesInStorage = this.get(key, null);
obsoleteVals = computeVersionsToStore(valuesInStorage, values);
obsoleteVals = resolveAndConstructVersionsToPersist(valuesInStorage, values);

try {
datastore.put(key.get(), assembleValues(valuesInStorage));
Expand Down
22 changes: 22 additions & 0 deletions src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -202,6 +202,7 @@ public class VoldemortConfig implements Serializable {

private long streamMaxReadBytesPerSec;
private long streamMaxWriteBytesPerSec;
private boolean multiVersionStreamingPutsEnabled;
private int gossipIntervalMs;

private String failureDetectorImplementation;
Expand Down Expand Up @@ -353,6 +354,8 @@ public VoldemortConfig(Props props) {
this.streamMaxReadBytesPerSec = props.getBytes("stream.read.byte.per.sec", 10 * 1000 * 1000);
this.streamMaxWriteBytesPerSec = props.getBytes("stream.write.byte.per.sec",
10 * 1000 * 1000);
this.multiVersionStreamingPutsEnabled = props.getBoolean("use.multi.version.streaming.puts",
true);

this.socketTimeoutMs = props.getInt("socket.timeout.ms", 5000);
this.socketBufferSize = (int) props.getBytes("socket.buffer.size", 64 * 1024);
Expand Down Expand Up @@ -1458,6 +1461,25 @@ public long getSlopMaxWriteBytesPerSec() {
return slopMaxWriteBytesPerSec;
}

/**
* If true, multiple successive versions of the same key will be atomically
* written to storage in a single operation. Currently not supported for
* MysqlStorageEngine.
*
* <ul>
* <li>Property : "use.multi.version.streaming.puts"</li>
* <li>Default : true</li>
* </ul>
*
* @param multiVersionStreamingPutsEnabled new value of the flag
*/
public void setMultiVersionStreamingPutsEnabled(boolean multiVersionStreamingPutsEnabled) {
this.multiVersionStreamingPutsEnabled = multiVersionStreamingPutsEnabled;
}

/**
* Returns the current value of the multi-version streaming puts flag.
*
* <ul>
* <li>Property : "use.multi.version.streaming.puts"</li>
* <li>Default : true</li>
* </ul>
*
* @return the value of the multiVersionStreamingPutsEnabled field
*/
public boolean getMultiVersionStreamingPutsEnabled() {
return this.multiVersionStreamingPutsEnabled;
}

/**
* Controls the rate at which the {@link StreamingSlopPusherJob} will send
* slop writes over the wire
Expand Down
Expand Up @@ -582,7 +582,8 @@ public StreamRequestHandler handleFetchPartitionEntries(VAdminProto.FetchPartiti
public StreamRequestHandler handleUpdatePartitionEntries(VAdminProto.UpdatePartitionEntriesRequest request) {
StorageEngine<ByteArray, byte[], byte[]> storageEngine = AdminServiceRequestHandler.getStorageEngine(storeRepository,
request.getStore());
if(storageEngine instanceof MysqlStorageEngine) {
if(!voldemortConfig.getMultiVersionStreamingPutsEnabled()
|| storageEngine instanceof MysqlStorageEngine) {
// TODO This check is ugly. Need some generic capability to check
// which storage engine supports which operations.
return new UpdatePartitionEntriesStreamRequestHandler(request,
Expand Down
Expand Up @@ -20,6 +20,7 @@
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.Utils;
import voldemort.versioning.Versioned;

/**
Expand Down Expand Up @@ -67,9 +68,12 @@ protected void finalize() {
}

/**
* Now, it could be that the stream broke off and has more pending versions.
* For now, we simply commit what we have to disk. A better design would
* rely on in-stream markers to do the flushing to storage.
* Persists the current set of versions buffered for the current key into
* storage, using the multiVersionPut api
*
* NOTE: Now, it could be that the stream broke off and has more pending
* versions. For now, we simply commit what we have to disk. A better design
* would rely on in-stream markers to do the flushing to storage.
*/
private void writeBufferedValsToStorage() {
long startNs = System.nanoTime();
Expand All @@ -78,7 +82,8 @@ private void writeBufferedValsToStorage() {
currBufferedVals);
currBufferedVals = new ArrayList<Versioned<byte[]>>(VALS_BUFFER_EXPECTED_SIZE);
if(streamStats != null) {
streamStats.reportStorageTime(Operation.UPDATE_ENTRIES, System.nanoTime() - startNs);
streamStats.reportStorageTime(Operation.UPDATE_ENTRIES,
Utils.elapsedTimeNs(startNs, System.nanoTime()));
streamStats.reportStreamingPut(Operation.UPDATE_ENTRIES);
}

Expand Down Expand Up @@ -127,8 +132,8 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
if(logger.isTraceEnabled())
logger.trace("Incomplete read for message size");
if(streamStats != null)
streamStats.reportNetworkTime(Operation.UPDATE_ENTRIES, System.nanoTime()
- startNs);
streamStats.reportNetworkTime(Operation.UPDATE_ENTRIES,
Utils.elapsedTimeNs(startNs, System.nanoTime()));
return StreamRequestHandlerState.INCOMPLETE_READ;
}

Expand All @@ -142,8 +147,8 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
if(logger.isTraceEnabled())
logger.trace("Message size -1, completed partition update");
if(streamStats != null)
streamStats.reportNetworkTime(Operation.UPDATE_ENTRIES, System.nanoTime()
- startNs);
streamStats.reportNetworkTime(Operation.UPDATE_ENTRIES,
Utils.elapsedTimeNs(startNs, System.nanoTime()));
return StreamRequestHandlerState.COMPLETE;
}

Expand All @@ -161,8 +166,8 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
return StreamRequestHandlerState.INCOMPLETE_READ;
} finally {
if(streamStats != null)
streamStats.reportNetworkTime(Operation.UPDATE_ENTRIES, System.nanoTime()
- startNs);
streamStats.reportNetworkTime(Operation.UPDATE_ENTRIES,
Utils.elapsedTimeNs(startNs, System.nanoTime()));
}

VAdminProto.UpdatePartitionEntriesRequest.Builder builder = VAdminProto.UpdatePartitionEntriesRequest.newBuilder();
Expand All @@ -175,8 +180,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
Versioned<byte[]> value = ProtoUtils.decodeVersioned(partitionEntry.getVersioned());

if(filter.accept(key, value)) {
// Check if the current key is the same as the one before.
if(currBufferedKey != null && !key.equals(currBufferedKey)) {
// write buffered values to storage
// if not, write buffered values for the previous key to storage
writeBufferedValsToStorage();
}
currBufferedKey = key;
Expand Down
7 changes: 4 additions & 3 deletions src/java/voldemort/store/AbstractStorageEngine.java
Expand Up @@ -74,8 +74,8 @@ public boolean endBatchModifications() {
* @param multiPutValues list of new versions being written to storage
* @return list of versions from multiPutVals that were rejected as obsolete
*/
protected List<Versioned<V>> computeVersionsToStore(List<Versioned<V>> valuesInStorage,
List<Versioned<V>> multiPutValues) {
protected List<Versioned<V>> resolveAndConstructVersionsToPersist(List<Versioned<V>> valuesInStorage,
List<Versioned<V>> multiPutValues) {
List<Versioned<V>> obsoleteVals = new ArrayList<Versioned<V>>(multiPutValues.size());
// Go over all the values and determine whether the version is
// acceptable
Expand All @@ -89,8 +89,9 @@ protected List<Versioned<V>> computeVersionsToStore(List<Versioned<V>> valuesInS
if(occurred == Occurred.BEFORE) {
obsolete = true;
break;
} else if(occurred == Occurred.AFTER)
} else if(occurred == Occurred.AFTER) {
iter.remove();
}
}
if(obsolete) {
// add to return value if obsolete
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/store/bdb/BdbStorageEngine.java
Expand Up @@ -693,7 +693,7 @@ public List<Versioned<byte[]>> multiVersionPut(ByteArray key,
valuesInStorage = new ArrayList<Versioned<byte[]>>(values.size());
}

obsoleteVals = computeVersionsToStore(valuesInStorage, values);
obsoleteVals = resolveAndConstructVersionsToPersist(valuesInStorage, values);
valueEntry.setData(StoreBinaryFormat.toByteArray(valuesInStorage));
status = getBdbDatabase().put(transaction, keyEntry, valueEntry);

Expand Down

0 comments on commit b3147d3

Please sign in to comment.