Skip to content

Commit

Permalink
Improve batch modifications on BDB-JE
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Jan 11, 2013
1 parent 903e749 commit ec3b37e
Show file tree
Hide file tree
Showing 23 changed files with 273 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,14 @@ public boolean isPartitionAware() {
public boolean isPartitionScanSupported() {
return false;
}

@Override
public boolean beginBatchModifications() {
return false;
}

@Override
public boolean endBatchModifications() {
return false;
}
}
23 changes: 23 additions & 0 deletions src/java/voldemort/server/VoldemortConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class VoldemortConfig implements Serializable {
private boolean bdbPrefixKeysWithPartitionId;
private boolean bdbLevelBasedEviction;
private boolean bdbProactiveBackgroundMigration;
private boolean bdbCheckpointerOffForBatchWrites;

private String mysqlUsername;
private String mysqlPassword;
Expand Down Expand Up @@ -266,6 +267,8 @@ public VoldemortConfig(Props props) {
this.bdbLevelBasedEviction = props.getBoolean("bdb.evict.by.level", false);
this.bdbProactiveBackgroundMigration = props.getBoolean("bdb.proactive.background.migration",
false);
this.bdbCheckpointerOffForBatchWrites = props.getBoolean("bdb.checkpointer.off.batch.writes",
false);

this.readOnlyBackups = props.getInt("readonly.backups", 1);
this.readOnlySearchStrategy = props.getString("readonly.search.strategy",
Expand Down Expand Up @@ -1121,6 +1124,26 @@ public void setBdbCheckpointBytes(long bdbCheckpointBytes) {
this.bdbCheckpointBytes = bdbCheckpointBytes;
}

/**
* BDB JE Checkpointer will be turned off during batch writes. This helps
* save redundant writing of index updates, as we do say large streaming
* updates
*
* <ul>
* <li>Property : "bdb.checkpointer.off.batch.writes"</li>
* <li>Default : false</li>
* </ul>
*
* @return
*/
public boolean getBdbCheckpointerOffForBatchWrites() {
return this.bdbCheckpointerOffForBatchWrites;
}

public void setBdbCheckpointerOffForBatchWrites(boolean bdbCheckpointerOffForBulkWrites) {
this.bdbCheckpointerOffForBatchWrites = bdbCheckpointerOffForBulkWrites;
}

/**
* BDB JE Checkpointer wakes up whenever this time period elapses
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
Expand Down Expand Up @@ -55,6 +56,8 @@ public class UpdatePartitionEntriesStreamRequestHandler implements StreamRequest

private final Logger logger = Logger.getLogger(getClass());

private AtomicBoolean isBatchWriteOff;

public UpdatePartitionEntriesStreamRequestHandler(UpdatePartitionEntriesRequest request,
ErrorCodeMapper errorCodeMapper,
VoldemortConfig voldemortConfig,
Expand All @@ -76,6 +79,17 @@ public UpdatePartitionEntriesStreamRequestHandler(UpdatePartitionEntriesRequest
} else {
this.streamStats = null;
}
storageEngine.beginBatchModifications();
isBatchWriteOff = new AtomicBoolean(false);
}

@Override
protected void finalize() {
// when the object is GCed, don't forget to end the batch-write mode.
// This is ugly. But the cleanest way to do this, given our network code
// does not guarantee that close() will always be called
if(!isBatchWriteOff.get())
storageEngine.endBatchModifications();
}

public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
Expand Down Expand Up @@ -179,11 +193,12 @@ public StreamRequestDirection getDirection() {

public void close(DataOutputStream outputStream) throws IOException {
ProtoUtils.writeMessage(outputStream, responseBuilder.build());
storageEngine.endBatchModifications();
isBatchWriteOff.compareAndSet(false, true);
}

public void handleError(DataOutputStream outputStream, VoldemortException e) throws IOException {
responseBuilder.setError(ProtoUtils.encodeError(errorCodeMapper, e));

if(logger.isEnabledFor(Level.ERROR))
logger.error("handleUpdatePartitionEntries failed for request(" + request + ")", e);
}
Expand Down
2 changes: 2 additions & 0 deletions src/java/voldemort/server/scheduler/DataCleanupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public DataCleanupJob(StorageEngine<K, V, T> store,

public void run() {
acquireCleanupPermit(progressThisRun);
store.beginBatchModifications();

ClosableIterator<Pair<K, Versioned<V>>> iterator = null;
try {
Expand Down Expand Up @@ -105,6 +106,7 @@ public void run() {
totalEntriesScanned += progressThisRun.get();
progressThisRun.set(0);
}
store.endBatchModifications();
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/java/voldemort/store/StorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,20 @@ public interface StorageEngine<K, V, T> extends Store<K, V, T> {
* otherwise
*/
public boolean isPartitionScanSupported();

/**
* A lot of storage engines support efficient methods for performing large
* number of writes (puts/deletes) against the data source. This method puts
* the storage engine in this batch write mode
*
* @return true if the storage engine took successful action to switch to
* 'batch-write' mode
*/
public boolean beginBatchModifications();

/**
*
* @return true if the storage engine successfully returned to normal mode
*/
public boolean endBatchModifications();
}
12 changes: 12 additions & 0 deletions src/java/voldemort/store/bdb/BdbRuntimeConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ public class BdbRuntimeConfig {
public static final LockMode DEFAULT_LOCK_MODE = LockMode.READ_UNCOMMITTED;
public static final boolean DEFAULT_EXPOSE_SPACE_UTIL = true;
public static final boolean DEFAULT_MINIMIZE_SCAN_IMPACT = false;
public static final boolean DEFAULT_TURNOFF_CHECKPOINTER_BATCH_WRITES = false;

private long statsCacheTtlMs = DEFAULT_STATS_CACHE_TTL_MS;
private LockMode lockMode = DEFAULT_LOCK_MODE;
private boolean exposeSpaceUtil = DEFAULT_EXPOSE_SPACE_UTIL;
private boolean minimizeScanImpact = DEFAULT_MINIMIZE_SCAN_IMPACT;
private boolean checkpointerOffForBatchWrites = DEFAULT_TURNOFF_CHECKPOINTER_BATCH_WRITES;

public BdbRuntimeConfig() {

Expand All @@ -32,6 +34,7 @@ public BdbRuntimeConfig(VoldemortConfig config) {
setStatsCacheTtlMs(config.getBdbStatsCacheTtlMs());
setExposeSpaceUtil(config.getBdbExposeSpaceUtilization());
setMinimizeScanImpact(config.getBdbMinimizeScanImpact());
setCheckpointerOffForBatchWrites(config.getBdbCheckpointerOffForBatchWrites());
}

public long getStatsCacheTtlMs() {
Expand Down Expand Up @@ -67,4 +70,13 @@ public boolean getMinimizeScanImpact() {
public void setMinimizeScanImpact(boolean minimizeScanImpact) {
this.minimizeScanImpact = minimizeScanImpact;
}

public boolean isCheckpointerOffForBatchWrites() {
return checkpointerOffForBatchWrites;
}

public void setCheckpointerOffForBatchWrites(boolean checkpointerOffForBulkWrites) {
this.checkpointerOffForBatchWrites = checkpointerOffForBulkWrites;
}

}
43 changes: 43 additions & 0 deletions src/java/voldemort/store/bdb/BdbStorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseStats;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentMutableConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
Expand All @@ -81,6 +83,8 @@ public class BdbStorageEngine implements StorageEngine<ByteArray, byte[], byte[]

protected final BdbEnvironmentStats bdbEnvironmentStats;
protected final boolean minimizeScanImpact;
protected final boolean checkpointerOffForBatchWrites;
private volatile int numOutstandingBatchWriteJobs = 0;

public BdbStorageEngine(String name,
Environment environment,
Expand All @@ -96,6 +100,7 @@ public BdbStorageEngine(String name,
config.getStatsCacheTtlMs(),
config.getExposeSpaceUtil());
this.minimizeScanImpact = config.getMinimizeScanImpact();
this.checkpointerOffForBatchWrites = config.isCheckpointerOffForBatchWrites();
}

public String getName() {
Expand Down Expand Up @@ -636,4 +641,42 @@ public void nativeBackup(File toDir,
AsyncOperationStatus status) {
new BdbNativeBackup(environment, verifyFiles, isIncremental).performBackup(toDir, status);
}

@Override
public boolean beginBatchModifications() {
if(checkpointerOffForBatchWrites) {
synchronized(this) {
numOutstandingBatchWriteJobs++;
// turn the checkpointer off for the first job
if(numOutstandingBatchWriteJobs == 1) {
logger.info("Turning checkpointer off for batch writes");
EnvironmentMutableConfig mConfig = environment.getMutableConfig();
mConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CHECKPOINTER,
Boolean.toString(false));
environment.setMutableConfig(mConfig);
return true;
}
}
}
return false;
}

@Override
public boolean endBatchModifications() {
if(checkpointerOffForBatchWrites) {
synchronized(this) {
numOutstandingBatchWriteJobs--;
// turn the checkpointer back on if the last job finishes
if(numOutstandingBatchWriteJobs == 0) {
logger.info("Turning checkpointer on");
EnvironmentMutableConfig mConfig = environment.getMutableConfig();
mConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CHECKPOINTER,
Boolean.toString(true));
environment.setMutableConfig(mConfig);
return true;
}
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,14 @@ public boolean isPartitionAware() {
public boolean isPartitionScanSupported() {
return false;
}

@Override
public boolean beginBatchModifications() {
return false;
}

@Override
public boolean endBatchModifications() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,4 +348,14 @@ public boolean isPartitionScanSupported() {
return false;
}

@Override
public boolean beginBatchModifications() {
return false;
}

@Override
public boolean endBatchModifications() {
return false;
}

}
10 changes: 10 additions & 0 deletions src/java/voldemort/store/memory/InMemoryStorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,14 @@ public boolean isPartitionAware() {
public boolean isPartitionScanSupported() {
return false;
}

@Override
public boolean beginBatchModifications() {
return false;
}

@Override
public boolean endBatchModifications() {
return false;
}
}
10 changes: 10 additions & 0 deletions src/java/voldemort/store/metadata/MetadataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -658,4 +658,14 @@ public boolean isPartitionAware() {
public boolean isPartitionScanSupported() {
return false;
}

@Override
public boolean beginBatchModifications() {
return false;
}

@Override
public boolean endBatchModifications() {
return false;
}
}
10 changes: 10 additions & 0 deletions src/java/voldemort/store/mysql/MysqlStorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -409,4 +409,14 @@ public boolean isPartitionScanSupported() {
// no reason why we cannot do this on MySQL. Will be added later
return false;
}

@Override
public boolean beginBatchModifications() {
return false;
}

@Override
public boolean endBatchModifications() {
return false;
}
}
10 changes: 10 additions & 0 deletions src/java/voldemort/store/readonly/ReadOnlyStorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -630,4 +630,14 @@ public boolean isPartitionScanSupported() {
// this should be easy to support, will be added later
return false;
}

@Override
public boolean beginBatchModifications() {
return false;
}

@Override
public boolean endBatchModifications() {
return false;
}
}
10 changes: 10 additions & 0 deletions src/java/voldemort/store/serialized/SerializingStorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,14 @@ public boolean isPartitionAware() {
public boolean isPartitionScanSupported() {
return storageEngine.isPartitionScanSupported();
}

@Override
public boolean beginBatchModifications() {
return false;
}

@Override
public boolean endBatchModifications() {
return false;
}
}
10 changes: 10 additions & 0 deletions src/java/voldemort/store/slop/SlopStorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,14 @@ public boolean isPartitionAware() {
public boolean isPartitionScanSupported() {
return slopEngine.isPartitionScanSupported();
}

@Override
public boolean beginBatchModifications() {
return slopEngine.beginBatchModifications();
}

@Override
public boolean endBatchModifications() {
return slopEngine.endBatchModifications();
}
}
8 changes: 6 additions & 2 deletions src/java/voldemort/store/stats/SimpleCounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ public class SimpleCounter {
*/
long totalEventValueLastInterval;

// We need additional tracking for the end of the second last or penultimate
// interval, since resetting the atomicLong counters would mean we might
// miss some event updates

/**
* Number of events that occurred in the second last interval
* Number of events that occurred in the second last interval.
*/
long numEventsLastLastInterval;

/**
* Sum of all the event values in the the second last interval
* Sum of all the event values in the the second last interval.
*/
long totalEventValueLastLastInterval;

Expand Down
Loading

0 comments on commit ec3b37e

Please sign in to comment.