Permalink
Browse files

Improve client bootstrapping

  • Loading branch information...
1 parent 6488a53 commit a098132e4ccc68235b557572a504ac69a6df3ef8 @dgomezferro committed May 2, 2012
@@ -101,6 +101,21 @@
private ExecutorService executor;
+ public volatile static float tsreqTime;
+ public volatile static int tsreqTot;
+ public volatile static float fabTime;
+ public volatile static int fabTot;
+ public volatile static float cqTime;
+ public volatile static int cqTot;
+ public volatile static float flushTime;
+ public volatile static int flushTot;
+ public volatile static float replyTime;
+ public volatile static int replyTot;
+
+ public volatile static float messageTime;
+
+ public volatile static int messageTot;
+
/**
* Constructor
* @param channelGroup
@@ -152,17 +167,26 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) thr
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
Object msg = e.getMessage();
+ long before = System.nanoTime();
if (msg instanceof TimestampRequest) {
handle((TimestampRequest) msg, ctx);
+ tsreqTot++;
+ tsreqTime += (System.nanoTime() - before) / (float) 1000000;
return;
} else if (msg instanceof CommitRequest) {
handle((CommitRequest) msg, ctx);
+ messageTot++;
+ messageTime += (System.nanoTime() - before) / (float) 1000000;
return;
} else if (msg instanceof FullAbortReport) {
handle((FullAbortReport) msg, ctx);
+ fabTot++;
+ fabTime += (System.nanoTime() - before) / (float) 1000000;
return;
} else if (msg instanceof CommitQueryRequest) {
handle((CommitQueryRequest) msg, ctx);
+ cqTot++;
+ cqTime += (System.nanoTime() - before) / (float) 1000000;
return;
}
}
@@ -208,10 +232,6 @@ public void handle(TimestampRequest msg, ChannelHandlerContext ctx) {
synchronized (sharedMsgBufLock) {
bootstrap = true;
channel = ctx.getChannel();
- channel.write(new CommittedTransactionReport(sharedState.latestStartTimestamp.get(), sharedState.latestCommitTimestamp.get()));
- channel.write(new AbortedTransactionReport(sharedState.latestHalfAbortTimestamp.get()));
- channel.write(new FullAbortReport(sharedState.latestFullAbortTimestamp.get()));
- channel.write(new LargestDeletedTimestampReport(sharedState.largestDeletedTimestamp.get()));
buffer = sharedState.sharedMessageBuffer.new ReadingBuffer(channel);
messageBuffersMap.put(channel, buffer);
channelGroup.add(channel);
@@ -222,6 +242,15 @@ public void handle(TimestampRequest msg, ChannelHandlerContext ctx) {
}
}
if (bootstrap) {
+ synchronized (sharedState) {
+ synchronized (sharedMsgBufLock) {
+ channel.write(new CommittedTransactionReport(sharedState.latestStartTimestamp, sharedState.latestCommitTimestamp));
+ channel.write(new AbortedTransactionReport(sharedState.latestHalfAbortTimestamp));
+ channel.write(new FullAbortReport(sharedState.latestFullAbortTimestamp));
+ channel.write(new LargestDeletedTimestampReport(sharedState.largestDeletedTimestamp));
+ buffer.initializeIndexes();
+ }
+ }
for (AbortedTransaction halfAborted : sharedState.hashmap.halfAborted) {
channel.write(new AbortedTransactionReport(halfAborted.getStartTimestamp()));
}
@@ -256,13 +285,18 @@ public void run() {
}
};
+ public volatile static long abSnapTot;
+
+ public volatile static float abSnapTime;
+
public void createAbortedSnapshot() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream toWAL = new DataOutputStream(baos);
long snapshot = sharedState.hashmap.getAndIncrementAbortedSnapshot();
+ long before = System.nanoTime();
try {
toWAL.writeByte(LoggerProtocol.SNAPSHOT);
toWAL.writeLong(snapshot);
@@ -279,7 +313,12 @@ public void createAbortedSnapshot() {
}
sharedState.addRecord(baos.toByteArray(), noCallback, null);
+ abSnapTot++;
+ abSnapTime += (System.nanoTime() - before) / (float) 1000000;
}
+
+ public volatile static float uncommittedTime = 0;
+ public volatile static long uncommittedTotal = 0;
/**
* Handle the CommitRequest message
@@ -293,7 +332,7 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
if (msg.startTimestamp < timestampOracle.first()) {
reply.committed = false;
LOG.warn("Aborting transaction after restarting TSO");
- } else if (msg.startTimestamp < sharedState.largestDeletedTimestamp.get()) {
+ } else if (msg.startTimestamp < sharedState.largestDeletedTimestamp) {
// Too old
reply.committed = false;//set as abort
LOG.warn("Too old starttimestamp: ST "+ msg.startTimestamp +" MAX " + sharedState.largestDeletedTimestamp);
@@ -305,7 +344,7 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
if (value != 0 && value > msg.startTimestamp) {
reply.committed = false;//set as abort
break;
- } else if (value == 0 && sharedState.largestDeletedTimestamp.get() > msg.startTimestamp) {
+ } else if (value == 0 && sharedState.largestDeletedTimestamp > msg.startTimestamp) {
//then it could have been committed after start timestamp but deleted by recycling
LOG.warn("Old transaction {Start timestamp " + msg.startTimestamp + "} {Largest deleted timestamp " + sharedState.largestDeletedTimestamp + "}");
reply.committed = false;//set as abort
@@ -329,33 +368,36 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
toWAL.writeLong(msg.startTimestamp);
toWAL.writeLong(commitTimestamp);
- long oldLargestDeletedTimestamp = sharedState.largestDeletedTimestamp.get();
+ long oldLargestDeletedTimestamp = sharedState.largestDeletedTimestamp;
for (RowKey r: msg.rows) {
- sharedState.largestDeletedTimestamp.set(sharedState.hashmap.put(r.getRow(),
+ sharedState.largestDeletedTimestamp = sharedState.hashmap.put(r.getRow(),
r.getTable(),
commitTimestamp,
r.hashCode(),
- oldLargestDeletedTimestamp));
+ oldLargestDeletedTimestamp);
}
sharedState.processCommit(msg.startTimestamp, commitTimestamp);
- if (sharedState.largestDeletedTimestamp.get() > oldLargestDeletedTimestamp) {
+ if (sharedState.largestDeletedTimestamp > oldLargestDeletedTimestamp) {
toWAL.writeByte(LoggerProtocol.LARGESTDELETEDTIMESTAMP);
- toWAL.writeLong(sharedState.largestDeletedTimestamp.get());
- Set<Long> toAbort = sharedState.uncommited.raiseLargestDeletedTransaction(sharedState.largestDeletedTimestamp.get());
+ toWAL.writeLong(sharedState.largestDeletedTimestamp);
+ uncommittedTotal++;
+ long before = System.nanoTime();
+ Set<Long> toAbort = sharedState.uncommited.raiseLargestDeletedTransaction(sharedState.largestDeletedTimestamp);
+ uncommittedTime += ((System.nanoTime() - before)) / (float) 1000000;
synchronized (sharedMsgBufLock) {
for (Long id : toAbort) {
sharedState.hashmap.setHalfAborted(id);
queueHalfAbort(id);
}
- queueLargestIncrease(sharedState.largestDeletedTimestamp.get());
+ queueLargestIncrease(sharedState.largestDeletedTimestamp);
}
}
- if (sharedState.largestDeletedTimestamp.get() > sharedState.previousLargestDeletedTimestamp.get() + TSOState.MAX_COMMITS * 2) {
+ if (sharedState.largestDeletedTimestamp > sharedState.previousLargestDeletedTimestamp + TSOState.MAX_ITEMS) {
executor.submit(createAbortedSnaphostTask);
// schedule snapshot
- sharedState.previousLargestDeletedTimestamp.set(sharedState.largestDeletedTimestamp.get());
+ sharedState.previousLargestDeletedTimestamp = sharedState.largestDeletedTimestamp;
}
synchronized (sharedMsgBufLock) {
queueCommit(msg.startTimestamp, commitTimestamp);
@@ -398,11 +440,14 @@ public void addRecordComplete(int rc, Object ctx) {
} else {
synchronized (callbackLock) {
+ long before = System.nanoTime();
@SuppressWarnings("unchecked")
ArrayList<ChannelandMessage> theBatch = (ArrayList<ChannelandMessage>) ctx;
for (ChannelandMessage cam : theBatch) {
Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
}
+ replyTot++;
+ replyTime += (System.nanoTime() - before) / (float) 1000000;
}
}
@@ -449,6 +494,7 @@ else if (sharedState.uncommited.isUncommited(msg.queryTimestamp))
}
public void flush() {
+ long before = System.nanoTime();
synchronized (sharedState) {
if(LOG.isTraceEnabled()){
LOG.trace("Adding record, size: " + sharedState.baos.size());
@@ -477,6 +523,8 @@ public void addRecordComplete(int rc, Object ctx) {
flushFuture = scheduledExecutor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
}
}
+ flushTot++;
+ flushTime += (System.nanoTime() - before) / (float) 1000000;
}
public class FlushThread implements Runnable {
@@ -24,6 +24,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
@@ -35,6 +36,7 @@
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
+import org.jboss.netty.util.ObjectSizeEstimator;
import com.yahoo.omid.tso.persistence.BookKeeperStateBuilder;
import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
@@ -111,7 +113,12 @@ public void run() {
int maxThreads = 5;
// Memory limitation: 1MB by channel, 1GB global, 100 ms of timeout
ThreadPoolExecutor pipelineExecutor = new OrderedMemoryAwareThreadPoolExecutor(maxThreads, 1048576, 1073741824,
- 100, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory());
+ 100, TimeUnit.MILLISECONDS, new ObjectSizeEstimator() {
+ @Override
+ public int estimateSize(Object o) {
+ return 1000;
+ }
+ }, Executors.defaultThreadFactory());
// This is the only object of timestamp oracle
// TODO: make it singleton
@@ -141,11 +148,21 @@ public void addRecordComplete(int rc, Object ctx) {
bootstrap.setPipelineFactory(new TSOPipelineFactory(pipelineExecutor, handler));
bootstrap.setOption("tcpNoDelay", false);
+ //setting buffer size can improve I/O
+ bootstrap.setOption("child.sendBufferSize", 1048576);
+ bootstrap.setOption("child.receiveBufferSize", 1048576);
+ // better to have an receive buffer predictor
+ bootstrap.setOption("receiveBufferSizePredictorFactory",
+ new AdaptiveReceiveBufferSizePredictorFactory());
+ //if the server is sending 1000 messages per sec, optimum write buffer water marks will
+ //prevent unnecessary throttling, Check NioSocketChannelConfig doc
+ bootstrap.setOption("writeBufferLowWaterMark", 32 * 1024);
+ bootstrap.setOption("writeBufferHighWaterMark", 64 * 1024);
+
bootstrap.setOption("child.tcpNoDelay", false);
bootstrap.setOption("child.keepAlive", true);
bootstrap.setOption("child.reuseAddress", true);
bootstrap.setOption("child.connectTimeoutMillis", 60000);
- bootstrap.setOption("readWriteFair", true);
// *** Start the Netty running ***
@@ -65,9 +65,12 @@
public ReadingBuffer(Channel channel) {
this.channel = channel;
- this.readingBuffer = currentBuffer.reading(this);
- this.readBuffer = readingBuffer.buffer;
- this.readerIndex = readBuffer.writerIndex();
+ }
+
+ public void initializeIndexes() {
+ this.readingBuffer = currentBuffer.reading(this);
+ this.readBuffer = readingBuffer.buffer;
+ this.readerIndex = readBuffer.writerIndex();
}
public void flush() {
@@ -175,8 +178,8 @@ public void writeCommit(long startTimestamp, long commitTimestamp) {
++_Coms;
++_Writes;
int readBefore = writeBuffer.readableBytes();
- long startDiff = startTimestamp - state.latestStartTimestamp.get();
- long commitDiff = commitTimestamp - state.latestCommitTimestamp.get();
+ long startDiff = startTimestamp - state.latestStartTimestamp;
+ long commitDiff = commitTimestamp - state.latestCommitTimestamp;
if (commitDiff == 1 && startDiff >= -32 && startDiff <= 31) {
++_1B;
startDiff &= 0x3f;
@@ -230,8 +233,8 @@ public void writeCommit(long startTimestamp, long commitTimestamp) {
_Avg2 += (written - _Avg2) / _Writes;
_Avg += (written - _Avg) / _Coms;
- state.latestStartTimestamp.set(startTimestamp);
- state.latestCommitTimestamp.set(commitTimestamp);
+ state.latestStartTimestamp = startTimestamp;
+ state.latestCommitTimestamp = commitTimestamp;
}
public void writeHalfAbort(long startTimestamp) {
@@ -240,7 +243,7 @@ public void writeHalfAbort(long startTimestamp) {
}
++_Writes;
int readBefore = writeBuffer.readableBytes();
- long diff = startTimestamp - state.latestHalfAbortTimestamp.get();
+ long diff = startTimestamp - state.latestHalfAbortTimestamp;
if (diff >= -16 && diff <= 15) {
writeBuffer.writeByte((byte)((diff & 0x1f) | (0x40)));
} else if (diff >= Byte.MIN_VALUE && diff <= Byte.MAX_VALUE) {
@@ -252,7 +255,7 @@ public void writeHalfAbort(long startTimestamp) {
}
++_ha;
- state.latestHalfAbortTimestamp.set(startTimestamp);
+ state.latestHalfAbortTimestamp = startTimestamp;
int written = writeBuffer.readableBytes() - readBefore;
_Avg2 += (written - _Avg2) / _Writes;
}
@@ -263,7 +266,7 @@ public void writeFullAbort(long startTimestamp) {
}
++_Writes;
int readBefore = writeBuffer.readableBytes();
- long diff = startTimestamp - state.latestFullAbortTimestamp.get();
+ long diff = startTimestamp - state.latestFullAbortTimestamp;
if (diff >= -16 && diff <= 15) {
writeBuffer.writeByte((byte)((diff & 0x1f) | (0x60)));
} else if (diff >= Byte.MIN_VALUE && diff <= Byte.MAX_VALUE) {
@@ -275,7 +278,7 @@ public void writeFullAbort(long startTimestamp) {
}
++_fa;
- state.latestFullAbortTimestamp.set(startTimestamp);
+ state.latestFullAbortTimestamp = startTimestamp;
int written = writeBuffer.readableBytes() - readBefore;
_Avg2 += (written - _Avg2) / _Writes;
}
@@ -103,12 +103,12 @@ protected TimestampOracle getSO(){
/**
* Largest Deleted Timestamp
*/
- public AtomicLong largestDeletedTimestamp = new AtomicLong();
- public AtomicLong previousLargestDeletedTimestamp = new AtomicLong();
- public AtomicLong latestCommitTimestamp = new AtomicLong();
- public AtomicLong latestStartTimestamp = new AtomicLong();
- public AtomicLong latestHalfAbortTimestamp = new AtomicLong();
- public AtomicLong latestFullAbortTimestamp = new AtomicLong();
+ public long largestDeletedTimestamp = 0;
+ public long previousLargestDeletedTimestamp = 0;
+ public long latestCommitTimestamp = 0;
+ public long latestStartTimestamp = 0;
+ public long latestHalfAbortTimestamp = 0;
+ public long latestFullAbortTimestamp = 0;
public TSOSharedMessageBuffer sharedMessageBuffer = new TSOSharedMessageBuffer(this);
@@ -126,7 +126,7 @@ protected TimestampOracle getSO(){
* @param startTimestamp
*/
protected synchronized void processCommit(long startTimestamp, long commitTimestamp){
- largestDeletedTimestamp.set(hashmap.setCommitted(startTimestamp, commitTimestamp, largestDeletedTimestamp.get()));
+ largestDeletedTimestamp = hashmap.setCommitted(startTimestamp, commitTimestamp, largestDeletedTimestamp);
}
/**
@@ -135,7 +135,7 @@ protected synchronized void processCommit(long startTimestamp, long commitTimest
* @param largestDeletedTimestamp
*/
protected synchronized void processLargestDeletedTimestamp(long largestDeletedTimestamp){
- this.largestDeletedTimestamp.set(Math.max(largestDeletedTimestamp, this.largestDeletedTimestamp.get()));
+ this.largestDeletedTimestamp = Math.max(largestDeletedTimestamp, this.largestDeletedTimestamp);
}
/**
@@ -191,8 +191,8 @@ void stop(){
public TSOState(StateLogger logger, TimestampOracle timestampOracle) {
this.timestampOracle = timestampOracle;
- this.previousLargestDeletedTimestamp.set(this.timestampOracle.get());
- this.largestDeletedTimestamp.set(this.previousLargestDeletedTimestamp.get());
+ this.previousLargestDeletedTimestamp = this.timestampOracle.get();
+ this.largestDeletedTimestamp = this.previousLargestDeletedTimestamp;
this.uncommited = new Uncommited(timestampOracle.first());
this.logger = logger;
}
Oops, something went wrong.

0 comments on commit a098132

Please sign in to comment.