Skip to content

Commit

Permalink
Cleaned up CommandLogWriter. Added new profiling option.
Browse files Browse the repository at this point in the history
  • Loading branch information
apavlo committed May 15, 2012
1 parent bb9875a commit 80e5023
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 41 deletions.
5 changes: 2 additions & 3 deletions src/frontend/edu/brown/hstore/HStoreSite.java
Expand Up @@ -2050,9 +2050,8 @@ public void sendClientResponse(LocalTransaction ts, ClientResponseImpl cresponse
ts.getClientCallback(), ts.getClientCallback(),
ts.getInitiateTime(), ts.getInitiateTime(),
ts.getRestartCounter()); ts.getRestartCounter());
} else { // if (d) } else if (d) {
LOG.info(String.format("%s - Holding the ClientResponse until logged to disk", ts)); LOG.debug(String.format("%s - Holding the ClientResponse until logged to disk", ts));
//ts.markAsNotDeletable();
} }




Expand Down
14 changes: 12 additions & 2 deletions src/frontend/edu/brown/hstore/conf/HStoreConf.java
Expand Up @@ -323,11 +323,21 @@ public final class SiteConf extends Conf {
) )
public int exec_command_logging_group_commit_timeout; public int exec_command_logging_group_commit_timeout;


@ConfigProperty(
description="If enabled, then the CommandLogWriter will keep track of various internal " +
"profile statistics.",
defaultBoolean=false,
experimental=true
)
public boolean exec_command_logging_profile;

@ConfigProperty( @ConfigProperty(
description="Setting this configuration parameter to true allows clients to " + description="Setting this configuration parameter to true allows clients to " +
"issue ad hoc query requests use the @AdHoc sysproc.", "issue ad hoc query requests use the @AdHoc sysproc. This should be " +
"set to false if you are running benchmarking experiments because it " +
"will reduce the number of threads that are started per HStoreSite.",
defaultBoolean=true, defaultBoolean=true,
experimental=true experimental=false
) )
public boolean exec_adhoc_sql; public boolean exec_adhoc_sql;


Expand Down
88 changes: 58 additions & 30 deletions src/frontend/edu/brown/hstore/wal/CommandLogWriter.java
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Deflater;


import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.voltdb.ClientResponseImpl; import org.voltdb.ClientResponseImpl;
Expand All @@ -45,18 +44,17 @@
import org.voltdb.messaging.FastSerializer; import org.voltdb.messaging.FastSerializer;
import org.voltdb.utils.CompressionService; import org.voltdb.utils.CompressionService;
import org.voltdb.utils.DBBPool.BBContainer; import org.voltdb.utils.DBBPool.BBContainer;
import org.xerial.snappy.Snappy;


import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcCallback;


import edu.brown.catalog.CatalogUtil;
import edu.brown.hstore.HStoreSite; import edu.brown.hstore.HStoreSite;
import edu.brown.hstore.HStoreThreadManager; import edu.brown.hstore.HStoreThreadManager;
import edu.brown.hstore.conf.HStoreConf; import edu.brown.hstore.conf.HStoreConf;
import edu.brown.hstore.dtxn.LocalTransaction; import edu.brown.hstore.dtxn.LocalTransaction;
import edu.brown.hstore.interfaces.Shutdownable; import edu.brown.hstore.interfaces.Shutdownable;
import edu.brown.logging.LoggerUtil; import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean; import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.utils.ProfileMeasurement;


/** /**
* Transaction Command Log Writer * Transaction Command Log Writer
Expand Down Expand Up @@ -193,26 +191,30 @@ public void run() {
} }




final HStoreSite hstore_site; private final HStoreSite hstore_site;
final HStoreConf hstore_conf; private final HStoreConf hstore_conf;
final File outputFile; private final File outputFile;
final FileChannel fstream; private final FileChannel fstream;
final int group_commit_size; private final int group_commit_size;
final FastSerializer singletonSerializer; private final FastSerializer singletonSerializer;
final LogEntry singletonLogEntry; private final LogEntry singletonLogEntry;
final Deflater compresser = new Deflater(); private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
boolean stop = false;
AtomicBoolean flushInProgress = new AtomicBoolean(false);
private final Semaphore swapInProgress; private final Semaphore swapInProgress;
private final AtomicInteger flushReady; private final AtomicInteger flushReady;
private final WriterThread flushThread; private final WriterThread flushThread;
protected Exchanger<EntryBuffer[]> bufferExchange; private final Exchanger<EntryBuffer[]> bufferExchange;
private int commitBatchCounter = 0;
private boolean stop = false;


/** /**
* The log entry buffers (one per partition) * The log entry buffers (one per partition)
*/ */
EntryBuffer entries[]; private EntryBuffer entries[];
EntryBuffer entriesFlushing[]; private EntryBuffer entriesFlushing[];

private final ProfileMeasurement blockedTime;
private final ProfileMeasurement writingTime;
private final ProfileMeasurement networkTime;


/** /**
* Constructor * Constructor
Expand All @@ -225,13 +227,14 @@ public CommandLogWriter(HStoreSite hstore_site, File outputFile) {
this.outputFile = outputFile; this.outputFile = outputFile;
this.singletonSerializer = new FastSerializer(true, true); this.singletonSerializer = new FastSerializer(true, true);
this.group_commit_size = Math.max(1, hstore_conf.site.exec_command_logging_group_commit); //Group commit threshold, or 1 if group commit is turned off this.group_commit_size = Math.max(1, hstore_conf.site.exec_command_logging_group_commit); //Group commit threshold, or 1 if group commit is turned off


if (hstore_conf.site.exec_command_logging_group_commit > 0) { if (hstore_conf.site.exec_command_logging_group_commit > 0) {
this.swapInProgress = new Semaphore(group_commit_size, false); //False = not fair this.swapInProgress = new Semaphore(group_commit_size, false); //False = not fair
this.flushReady = new AtomicInteger(0); this.flushReady = new AtomicInteger(0);
this.bufferExchange = new Exchanger<EntryBuffer[]>(); this.bufferExchange = new Exchanger<EntryBuffer[]>();
// Make one entry buffer per partition SO THAT SYNCHRONIZATION ON EACH BUFFER IS NOT REQUIRED // Make one entry buffer per partition SO THAT SYNCHRONIZATION ON EACH BUFFER IS NOT REQUIRED
int num_partitions = hstore_site.getLocalPartitionIds().size();//CatalogUtil.getNumberOfPartitions(hstore_site.getDatabase()); int num_partitions = hstore_site.getLocalPartitionIds().size();//CatalogUtil.getNumberOfPartitions(hstore_site.getDatabase());
LOG.info("local partitions on " + hstore_site.getSiteName() + " : " + num_partitions);
this.entries = new EntryBuffer[num_partitions]; this.entries = new EntryBuffer[num_partitions];
this.entriesFlushing = new EntryBuffer[num_partitions]; this.entriesFlushing = new EntryBuffer[num_partitions];
for (int partition = 0; partition < num_partitions; partition++) { for (int partition = 0; partition < num_partitions; partition++) {
Expand Down Expand Up @@ -268,6 +271,17 @@ public CommandLogWriter(HStoreSite hstore_site, File outputFile) {
if (hstore_conf.site.exec_command_logging_group_commit > 0) { if (hstore_conf.site.exec_command_logging_group_commit > 0) {
this.flushThread.start(); this.flushThread.start();
} }

// Writer Profiling
if (hstore_conf.site.exec_command_logging_profile) {
this.writingTime = new ProfileMeasurement("WRITING");
this.blockedTime = new ProfileMeasurement("BLOCKED");
this.networkTime = new ProfileMeasurement("NETWORK");
} else {
this.writingTime = null;
this.blockedTime = null;
this.networkTime = null;
}
} }




Expand Down Expand Up @@ -315,8 +329,8 @@ public boolean writeHeader() {
} // FOR } // FOR


BBContainer b = this.singletonSerializer.getBBContainer(); BBContainer b = this.singletonSerializer.getBBContainer();
fstream.write(b.b.asReadOnlyBuffer()); this.fstream.write(b.b.asReadOnlyBuffer());
fstream.force(true); this.fstream.force(true);
} catch (Exception e) { } catch (Exception e) {
String message = "Failed to write log headers"; String message = "Failed to write log headers";
throw new ServerFaultException(message, e); throw new ServerFaultException(message, e);
Expand All @@ -330,17 +344,22 @@ public boolean writeHeader() {
* @param eb * @param eb
*/ */
public void groupCommit(EntryBuffer[] eb) { public void groupCommit(EntryBuffer[] eb) {
if (hstore_conf.site.exec_command_logging_profile) this.writingTime.start();
this.commitBatchCounter++;

//Write all to a single FastSerializer buffer //Write all to a single FastSerializer buffer
this.singletonSerializer.clear(); this.singletonSerializer.clear();
int txnCounter = 0;
for (int i = 0; i < eb.length; i++) { for (int i = 0; i < eb.length; i++) {
EntryBuffer buffer = eb[i]; EntryBuffer buffer = eb[i];
try { try {
assert(this.singletonSerializer != null); assert(this.singletonSerializer != null);
int start = buffer.getStart(); int start = buffer.getStart();
for (int j = 0; j < buffer.getSize(); j++) { for (int j = 0, size = buffer.getSize(); j < size; j++) {
WriterLogEntry entry = buffer.buffer[(start + j) % buffer.buffer.length]; WriterLogEntry entry = buffer.buffer[(start + j) % buffer.buffer.length];
this.singletonSerializer.writeObject(entry); this.singletonSerializer.writeObject(entry);
} txnCounter++;
} // FOR


} catch (Exception e) { } catch (Exception e) {
String message = "Failed to serialize buffer during group commit"; String message = "Failed to serialize buffer during group commit";
Expand All @@ -357,19 +376,24 @@ public void groupCommit(EntryBuffer[] eb) {
throw new RuntimeException("Failed to compress WAL buffer"); throw new RuntimeException("Failed to compress WAL buffer");
} }


if (debug.get()) LOG.info(String.format("Writing out %d bytes for %d txns [batchCtr=%d]",
compressed.limit(), txnCounter, this.commitBatchCounter));
try { try {
fstream.write(compressed); this.fstream.write(compressed);
fstream.force(true); this.fstream.force(true);
} catch (IOException ex) { } catch (IOException ex) {
String message = "Failed to group commit for buffer"; String message = "Failed to group commit for buffer";
throw new ServerFaultException(message, ex); throw new ServerFaultException(message, ex);
} finally {
if (hstore_conf.site.exec_command_logging_profile) this.writingTime.stop();
} }


//Send responses // Send responses
if (hstore_conf.site.exec_command_logging_profile) this.networkTime.start();
for (int i = 0; i < eb.length; i++) { for (int i = 0; i < eb.length; i++) {
EntryBuffer buffer = eb[i]; EntryBuffer buffer = eb[i];
int start = buffer.getStart(); int start = buffer.getStart();
for (int j = 0; j < buffer.getSize(); j++) { for (int j = 0, size = buffer.getSize(); j < size; j++) {
WriterLogEntry entry = buffer.buffer[(start + j) % buffer.buffer.length]; WriterLogEntry entry = buffer.buffer[(start + j) % buffer.buffer.length];
hstore_site.sendClientResponse(entry.cresponse, hstore_site.sendClientResponse(entry.cresponse,
entry.clientCallback, entry.clientCallback,
Expand All @@ -378,7 +402,8 @@ public void groupCommit(EntryBuffer[] eb) {


} }
buffer.flushCleanup(); buffer.flushCleanup();
} } // FOR
if (hstore_conf.site.exec_command_logging_profile) this.networkTime.stop();
} }


/** /**
Expand Down Expand Up @@ -420,16 +445,19 @@ public boolean appendToLog(final LocalTransaction ts, final ClientResponseImpl c
//because we have reached the threshold for acquiring slots AND the //because we have reached the threshold for acquiring slots AND the
//same number have finished filling their slots. No one will acquire new //same number have finished filling their slots. No one will acquire new
//slots until all buffers have been exchanged for clean ones. //slots until all buffers have been exchanged for clean ones.
if (hstore_conf.site.exec_command_logging_profile) this.blockedTime.start();
try { try {
this.flushInProgress.set(true); this.flushInProgress.set(true);
entries = bufferExchange.exchange(entries); this.entries = this.bufferExchange.exchange(entries);
//XXX: As soon as we have a new empty buffer, we can reset the count //XXX: As soon as we have a new empty buffer, we can reset the count
//and release the semaphore permits to continue. NOTE: THESE MUST GO //and release the semaphore permits to continue. NOTE: THESE MUST GO
//IN THE CORRECT ORDER FOR PROPER REASONING //IN THE CORRECT ORDER FOR PROPER REASONING
assert(flushReady.compareAndSet(hstore_conf.site.exec_command_logging_group_commit, 0) == true); //Sanity check assert(flushReady.compareAndSet(hstore_conf.site.exec_command_logging_group_commit, 0) == true); //Sanity check
swapInProgress.release(hstore_conf.site.exec_command_logging_group_commit); this.swapInProgress.release(hstore_conf.site.exec_command_logging_group_commit);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException("[WAL] Thread interrupted while waiting for WriterThread to finish writing"); throw new RuntimeException("[WAL] Thread interrupted while waiting for WriterThread to finish writing");
} finally {
if (hstore_conf.site.exec_command_logging_profile) this.blockedTime.stop();
} }
} }
// We always want to set this to false because our flush thread will be the // We always want to set this to false because our flush thread will be the
Expand All @@ -443,8 +471,8 @@ public boolean appendToLog(final LocalTransaction ts, final ClientResponseImpl c
this.singletonLogEntry.init(ts); this.singletonLogEntry.init(ts);
fs.writeObject(this.singletonLogEntry); fs.writeObject(this.singletonLogEntry);
BBContainer b = fs.getBBContainer(); BBContainer b = fs.getBBContainer();
fstream.write(b.b.asReadOnlyBuffer()); this.fstream.write(b.b.asReadOnlyBuffer());
fstream.force(true); this.fstream.force(true);
this.singletonLogEntry.finish(); this.singletonLogEntry.finish();
} catch (Exception e) { } catch (Exception e) {
String message = "Failed to write single log entry for " + ts.toString(); String message = "Failed to write single log entry for " + ts.toString();
Expand Down
12 changes: 6 additions & 6 deletions tests/frontend/edu/brown/hstore/wal/TestCommandLogger.java
Expand Up @@ -35,10 +35,7 @@
import org.voltdb.catalog.Procedure; import org.voltdb.catalog.Procedure;
import org.voltdb.catalog.Site; import org.voltdb.catalog.Site;


import com.google.protobuf.RpcCallback;

import edu.brown.BaseTestCase; import edu.brown.BaseTestCase;
import edu.brown.benchmark.tm1.procedures.InsertSubscriber;
import edu.brown.benchmark.tm1.procedures.UpdateLocation; import edu.brown.benchmark.tm1.procedures.UpdateLocation;
import edu.brown.benchmark.tm1.procedures.UpdateSubscriberData; import edu.brown.benchmark.tm1.procedures.UpdateSubscriberData;
import edu.brown.catalog.CatalogUtil; import edu.brown.catalog.CatalogUtil;
Expand All @@ -60,7 +57,12 @@ public class TestCommandLogger extends BaseTestCase {


static final AtomicLong TXN_ID = new AtomicLong(1000); static final AtomicLong TXN_ID = new AtomicLong(1000);
static final int BASE_PARTITION = 0; static final int BASE_PARTITION = 0;
static final Class<? extends VoltProcedure>[] TARGET_PROC = (Class<? extends VoltProcedure> []) new Class[2];
@SuppressWarnings("unchecked")
static final Class<? extends VoltProcedure>[] TARGET_PROC = (Class<? extends VoltProcedure>[])new Class<?>[]{
UpdateLocation.class,
UpdateSubscriberData.class
};
static final Object TARGET_PARAMS[][] = new Object[][]{{ 12345l, "ABCDEF"},{ 666l, 777l, 888l, 999l}}; static final Object TARGET_PARAMS[][] = new Object[][]{{ 12345l, "ABCDEF"},{ 666l, 777l, 888l, 999l}};


HStoreSite hstore_site; HStoreSite hstore_site;
Expand All @@ -71,8 +73,6 @@ public class TestCommandLogger extends BaseTestCase {
@Override @Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(ProjectType.TM1); super.setUp(ProjectType.TM1);
TARGET_PROC[0] = UpdateLocation.class;
TARGET_PROC[1] = UpdateSubscriberData.class;
this.catalog_proc = new Procedure[2]; this.catalog_proc = new Procedure[2];
this.catalog_proc[0] = this.getProcedure(TARGET_PROC[0]); this.catalog_proc[0] = this.getProcedure(TARGET_PROC[0]);
this.catalog_proc[1] = this.getProcedure(TARGET_PROC[1]); this.catalog_proc[1] = this.getProcedure(TARGET_PROC[1]);
Expand Down

0 comments on commit 80e5023

Please sign in to comment.