Browse files

Changed command logging to use only timeout-based group commits

  • Loading branch information...
1 parent c71fe7a commit d53afccc1705e90ea261be934ac487c79823bb30 @jdebrabant jdebrabant committed Jun 19, 2012
Showing with 128 additions and 84 deletions.
  1. +2 −0 log4j.properties
  2. +126 −84 src/frontend/edu/brown/hstore/wal/CommandLogWriter.java
View
2 log4j.properties
@@ -23,6 +23,8 @@ log4j.logger.edu.brown.hstore.handlers=INFO
log4j.logger.edu.brown.hstore.dispatchers=INFO
log4j.logger.edu.brown.catalog=INFO
log4j.logger.edu.brown.utils=INFO
+log4j.logger.edu.brown.hstore.wal=INFO
+
## VoltDB Stuff
log4j.logger.org.voltdb.VoltProcedure=INFO
View
210 src/frontend/edu/brown/hstore/wal/CommandLogWriter.java
@@ -61,11 +61,14 @@
* Transaction Command Log Writer
* @author mkirsch
* @author pavlo
+ * @author debrabant
*/
public class CommandLogWriter implements Shutdownable {
private static final Logger LOG = Logger.getLogger(CommandLogWriter.class);
+
private final static LoggerBoolean debug = new LoggerBoolean(LOG.isDebugEnabled());
- private final static LoggerBoolean trace = new LoggerBoolean(LOG.isTraceEnabled());
+ private final static LoggerBoolean trace = new LoggerBoolean(LOG.isTraceEnabled());
+
static {
LoggerUtil.attachObserver(LOG, debug, trace);
}
@@ -160,47 +163,65 @@ public void run() {
self.setName(HStoreThreadManager.getThreadName(hstore_site, HStoreConstants.THREAD_NAME_COMMANDLOGGER));
while (!stop) {
- try {
- entriesFlushing = bufferExchange.exchange(entriesFlushing, hstore_conf.site.exec_command_logging_group_commit_timeout, TimeUnit.MILLISECONDS);
- groupCommit(entriesFlushing); //Group commit is responsible for sending responses, and cleaning up the buffer before its next use
- flushInProgress.set(false);
- } catch (InterruptedException e) {
+ try
+ {
+ // This sync point is designed to timeout, at which point a flush will be initiated
+ entriesFlushing = bufferExchange.exchange(entriesFlushing,
+ hstore_conf.site.exec_command_logging_group_commit_timeout,
+ TimeUnit.MILLISECONDS);
+
+ }
+ catch (InterruptedException e)
+ {
throw new RuntimeException("WAL writer thread interrupted while waiting for a new buffer" + e.getStackTrace().toString());
- } catch (TimeoutException e) {
- //ON TIMEOUT, LOCK DOWN AND GROUP COMMIT NORMAL BUFFER
- // XXX: Need to have the ability to check how long it's been since we've flushed
- // the log to disk. If it's longer than our limit, then we'll want to do that now
- // That ensures that if there is only one client thread issuing txn requests (as is
- // often the case in testing), then it won't be blocked indefinitely.
- // Added a new HStoreConf parameter that defines the time in milliseconds
- // Use EstTime.currentTimeMillis() to get the current time
- flushInProgress.set(true);
- int permsFree = swapInProgress.drainPermits(); //Locks out other threads from starting the process
- if (permsFree == group_commit_size) {
- swapInProgress.release(group_commit_size);
- continue;
- }
- while (flushReady.get() < (group_commit_size - permsFree)) {} //Wait for the in progress slots to fill
- groupCommit(entries);
- //Release IN THE RIGHT ORDER
- assert(flushReady.compareAndSet((group_commit_size - permsFree), 0));
- swapInProgress.release(group_commit_size);
- flushInProgress.set(false);
+ }
+ // this will be thrown when the group commit buffer was not filled before the group_commit_timeout
+ catch (TimeoutException e)
+ {
+ LOG.info("Group commit timeout occurred, writing buffer to disk.");
+
+ try
+ {
+ flushInProgress.set(true);
+ swapBuffers.set(true);
+
+ //LOG.info("permit pool size: " + writingEntry.getQueueLength());
+
+ int free_permits = group_commit_size - writingEntry.drainPermits();
+
+ writingEntry.acquire(free_permits);
+
+ // SYNC POINT: a synchronization point between the thread filling the buffer and the writing thread where a full
+ // buffer is exchanged for an empty one and the full buffer is written out to disk.
+ entriesFlushing = bufferExchange.exchange(entriesFlushing);
+
+ writingEntry.release(group_commit_size);
+
+ groupCommit(entriesFlushing);
+ flushInProgress.set(false);
+
+ }
+ catch (InterruptedException ie)
+ {
+ throw new RuntimeException("WAL writer thread interrupted while waiting for a new buffer" + ie.getStackTrace().toString());
+ }
}
} // WHILE
}
}
-
private final HStoreSite hstore_site;
private final HStoreConf hstore_conf;
private final File outputFile;
private final FileChannel fstream;
private final int group_commit_size;
private final FastSerializer singletonSerializer;
private final LogEntry singletonLogEntry;
+ private final AtomicBoolean swapBuffers = new AtomicBoolean(false);
private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
- private final Semaphore swapInProgress;
+ private final AtomicBoolean swapInProgress = new AtomicBoolean(false);
+ private final Semaphore writingEntry;
+ //private final Semaphore swapInProgress;
private final AtomicInteger flushReady;
private final WriterThread flushThread;
private final Exchanger<EntryBuffer[]> bufferExchange;
@@ -228,12 +249,18 @@ public CommandLogWriter(HStoreSite hstore_site, File outputFile) {
this.outputFile = outputFile;
this.singletonSerializer = new FastSerializer(true, true);
this.group_commit_size = Math.max(1, hstore_conf.site.exec_command_logging_group_commit); //Group commit threshold, or 1 if group commit is turned off
-
-
+
+ LOG.info("group_commit_size: " + hstore_conf.site.exec_command_logging_group_commit);
+ LOG.info("group_commit_timeout: " + hstore_conf.site.exec_command_logging_group_commit_timeout);
+
+ // Configure group commit parameters
if (hstore_conf.site.exec_command_logging_group_commit > 0) {
- this.swapInProgress = new Semaphore(group_commit_size, false); //False = not fair
+
+ //this.swapInProgress = new Semaphore(group_commit_size, false); //False = not fair
+ this.writingEntry = new Semaphore(group_commit_size, false);
this.flushReady = new AtomicInteger(0);
this.bufferExchange = new Exchanger<EntryBuffer[]>();
+
// Make one entry buffer per partition SO THAT SYNCHRONIZATION ON EACH BUFFER IS NOT REQUIRED
int num_partitions = hstore_site.getLocalPartitionIds().size();//CatalogUtil.getNumberOfPartitions(hstore_site.getDatabase());
this.entries = new EntryBuffer[num_partitions];
@@ -247,14 +274,14 @@ public CommandLogWriter(HStoreSite hstore_site, File outputFile) {
this.flushThread = new WriterThread();
this.singletonLogEntry = null;
} else {
- this.swapInProgress = null;
+ this.writingEntry = null;
+ //this.swapInProgress = null;
this.flushReady = null;
this.bufferExchange = null;
this.flushThread = null;
this.singletonLogEntry = new LogEntry();
}
-
FileOutputStream f = null;
try {
this.outputFile.getParentFile().mkdirs();
@@ -300,7 +327,8 @@ public void finishAndPrepareShutdown() {
@Override
public void shutdown() {
- if (debug.get()) LOG.debug("Closing WAL file");
+ if (debug.get())
+ LOG.debug("Closing WAL file");
try {
this.fstream.close();
} catch (IOException ex) {
@@ -314,8 +342,8 @@ public void shutdown() {
public boolean isShuttingDown() {
return (this.stop);
}
-
- public ProfileMeasurement getLoggerWritingTime() {
+
+ public ProfileMeasurement getLoggerWritingTime() {
return this.writingTime;
}
@@ -327,7 +355,6 @@ public ProfileMeasurement getLoggerNetworkTime() {
return this.networkTime;
}
-
public boolean writeHeader() {
if (debug.get()) LOG.debug("Writing out WAL header");
assert(this.singletonSerializer != null);
@@ -360,7 +387,7 @@ public boolean writeHeader() {
public void groupCommit(EntryBuffer[] eb) {
if (hstore_conf.site.exec_command_logging_profile) this.writingTime.start();
this.commitBatchCounter++;
-
+
//Write all to a single FastSerializer buffer
this.singletonSerializer.clear();
int txnCounter = 0;
@@ -390,7 +417,7 @@ public void groupCommit(EntryBuffer[] eb) {
throw new RuntimeException("Failed to compress WAL buffer");
}
- if (debug.get()) LOG.info(String.format("Writing out %d bytes for %d txns [batchCtr=%d]",
+ LOG.info(String.format("Writing out %d bytes for %d txns [batchCtr=%d]",
compressed.limit(), txnCounter, this.commitBatchCounter));
try {
this.fstream.write(compressed);
@@ -402,8 +429,10 @@ public void groupCommit(EntryBuffer[] eb) {
if (hstore_conf.site.exec_command_logging_profile) this.writingTime.stop();
}
- // Send responses
- if (hstore_conf.site.exec_command_logging_profile) this.networkTime.start();
+ if (hstore_conf.site.exec_command_logging_profile)
+ this.networkTime.start();
+
+ // Send responses
for (int i = 0; i < eb.length; i++) {
EntryBuffer buffer = eb[i];
int start = buffer.getStart();
@@ -413,11 +442,15 @@ public void groupCommit(EntryBuffer[] eb) {
entry.clientCallback,
entry.initiateTime,
entry.restartCounter);
+
+ //LOG.info("Sending response for transaction");
}
buffer.flushCleanup();
} // FOR
- if (hstore_conf.site.exec_command_logging_profile) this.networkTime.stop();
+
+ if (hstore_conf.site.exec_command_logging_profile)
+ this.networkTime.stop();
}
/**
@@ -428,57 +461,63 @@ public void groupCommit(EntryBuffer[] eb) {
* @return
*/
public boolean appendToLog(final LocalTransaction ts, final ClientResponseImpl cresponse) {
- if (debug.get()) LOG.debug(ts + " - Writing out WAL entry for committed transaction");
-
+
boolean sendResponse = true;
- if (hstore_conf.site.exec_command_logging_group_commit > 0) { //GROUP COMMIT
+ if (hstore_conf.site.exec_command_logging_group_commit > 0) //GROUP COMMIT
+ {
int basePartition = ts.getBasePartition();
assert(hstore_site.isLocalPartition(basePartition));
basePartition = hstore_site.getLocalPartitionOffset(basePartition);
+ // get the buffer for the partition of the current transaction
EntryBuffer buffer = this.entries[basePartition];
assert(buffer != null) :
"Unexpected log entry buffer for partition " + basePartition;
-
- try {
- swapInProgress.acquire(1); //Will wait if the buffers are being swapped
- } catch (InterruptedException e1) {
- throw new RuntimeException("WAL thread interrupted while waiting for buffers to swap");
- }
-
- //This is guaranteed to be thread-safe because there is only one thread per partition
- LogEntry entry = buffer.next(ts, cresponse);
- assert(entry != null);
-
- int place = 1 + flushReady.getAndIncrement(); //See how quick we were to finish
-
- if (place == hstore_conf.site.exec_command_logging_group_commit) {
- //XXX: We were the last in the group to finish, so we will poke the writer.
- //We know that none of the buffers are currently being written to
- //because we have reached the threshold for acquiring slots AND the
- //same number have finished filling their slots. No one will acquire new
- //slots until all buffers have been exchanged for clean ones.
- if (hstore_conf.site.exec_command_logging_profile) this.blockedTime.start();
- try {
- this.flushInProgress.set(true);
- this.entries = this.bufferExchange.exchange(entries);
- //XXX: As soon as we have a new empty buffer, we can reset the count
- //and release the semaphore permits to continue. NOTE: THESE MUST GO
- //IN THE CORRECT ORDER FOR PROPER REASONING
- assert(flushReady.compareAndSet(hstore_conf.site.exec_command_logging_group_commit, 0) == true); //Sanity check
- this.swapInProgress.release(hstore_conf.site.exec_command_logging_group_commit);
- } catch (InterruptedException e) {
- throw new RuntimeException("[WAL] Thread interrupted while waiting for WriterThread to finish writing");
- } finally {
- if (hstore_conf.site.exec_command_logging_profile) this.blockedTime.stop();
- }
- }
+
+ try
+ {
+ // if a swap is not yet in progress, initiate a swap with the write thread
+ // this ensures exactly one thread initiates the swap with the writer thread
+ if(swapBuffers.compareAndSet(true, false))
+ {
+ //swapInProgress.set(true);
+
+ // SYNC POINT: Will synchronize with writing thread
+ this.entries = this.bufferExchange.exchange(entries);
+
+ //swapInProgress.set(false);
+ }
+
+ // acquire semaphore permit to write a transaction to the log buffer
+ // will wait if buffer is currently being swapped
+ writingEntry.acquire();
+
+ // create an entry for this transaction in the buffer for this partition
+ // NOTE: this is guaranteed to be thread-safe because there is only one thread per partition
+ LogEntry entry = buffer.next(ts, cresponse);
+ assert(entry != null);
+
+ writingEntry.release();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException("[WAL] Thread interrupted while waiting for WriterThread to finish writing");
+ }
+ finally
+ {
+ if (hstore_conf.site.exec_command_logging_profile)
+ this.blockedTime.stop();
+ }
+
// We always want to set this to false because our flush thread will be the
// one that actually sends out the network messages
sendResponse = false;
- } else { //NO GROUP COMMIT -- FINISH AND RETURN TRUE
- try {
+ }
+ else //NO GROUP COMMIT -- FINISH AND RETURN TRUE
+ {
+ try
+ {
FastSerializer fs = this.singletonSerializer;
assert(fs != null);
fs.clear();
@@ -488,13 +527,16 @@ public boolean appendToLog(final LocalTransaction ts, final ClientResponseImpl c
this.fstream.write(b.b.asReadOnlyBuffer());
this.fstream.force(true);
this.singletonLogEntry.finish();
- } catch (Exception e) {
+ }
+ catch (Exception e)
+ {
String message = "Failed to write single log entry for " + ts.toString();
throw new ServerFaultException(message, e, ts.getTransactionId());
}
}
return (sendResponse);
}
-
-}
+}
+
+

0 comments on commit d53afcc

Please sign in to comment.