Skip to content

Commit

Permalink
Reduce coupling around SharedMessageBuffer
Browse files Browse the repository at this point in the history
Flush complexity is much lower as well
  • Loading branch information
dgomezferro committed May 30, 2012
1 parent 9baff1f commit 142b81d
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 143 deletions.
Expand Up @@ -16,47 +16,47 @@

package com.yahoo.omid.replication;

import java.util.TreeSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;


public class ReferenceCountedBuffer {
private static final Log LOG = LogFactory.getLog(ReferenceCountedBuffer.class);
public class ReadersAwareBuffer {
private static final Log LOG = LogFactory.getLog(ReadersAwareBuffer.class);

private static final int CAPACITY = 1024*1024;

public ChannelBuffer buffer;
public TreeSet<SharedMessageBuffer.ReadingBuffer> readingBuffers = new TreeSet<SharedMessageBuffer.ReadingBuffer>();
private int pendingWrites = 0;
private int pendingReaders = 0;
private boolean scheduledForPool;

public static long nBuffers;


public ReferenceCountedBuffer() {
this(true);
}

public ReferenceCountedBuffer(boolean allocateBuffer) {

public ReadersAwareBuffer() {
nBuffers++;
LOG.warn("Allocated buffer");
if (allocateBuffer)
buffer = ChannelBuffers.directBuffer(CAPACITY);
buffer = ChannelBuffers.directBuffer(CAPACITY);
}

public ReferenceCountedBuffer reading(SharedMessageBuffer.ReadingBuffer buf) {
readingBuffers.add(buf);
return this;
public synchronized void increaseReaders() {
pendingReaders++;
}

public synchronized void incrementPending() {
++pendingWrites;
public synchronized void decreaseReaders() {
pendingReaders--;
}

public synchronized boolean decrementPending() {
return --pendingWrites == 0;

public synchronized boolean isReadyForPool() {
return scheduledForPool && pendingReaders == 0;
}

public synchronized void scheduleForPool() {
scheduledForPool = true;
}

public synchronized void reset() {
pendingReaders = 0;
scheduledForPool = false;
}
}
158 changes: 65 additions & 93 deletions src/main/java/com/yahoo/omid/replication/SharedMessageBuffer.java
Expand Up @@ -16,102 +16,94 @@

package com.yahoo.omid.replication;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;

import com.yahoo.omid.tso.TSOMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SharedMessageBuffer {

private static final Log LOG = LogFactory.getLog(SharedMessageBuffer.class);
private static final Logger LOG = LoggerFactory.getLogger(SharedMessageBuffer.class);

ReferenceCountedBuffer pastBuffer = new ReferenceCountedBuffer();
ReferenceCountedBuffer currentBuffer = new ReferenceCountedBuffer();
ReadersAwareBuffer pastBuffer = new ReadersAwareBuffer();
ReadersAwareBuffer currentBuffer = new ReadersAwareBuffer();
ChannelBuffer writeBuffer = currentBuffer.buffer;
Deque<ReferenceCountedBuffer> futureBuffer = new ArrayDeque<ReferenceCountedBuffer>();
BlockingQueue<ReadersAwareBuffer> bufferPool = new LinkedBlockingQueue<ReadersAwareBuffer>();
Zipper zipper = new Zipper();
Set<ReadingBuffer> readingBuffers = new TreeSet<SharedMessageBuffer.ReadingBuffer>();

public class ReadingBuffer implements Comparable<ReadingBuffer> {
private ChannelBuffer readBuffer;
private int readerIndex = 0;
private ReferenceCountedBuffer readingBuffer;
private ReadersAwareBuffer readingBuffer;
private ChannelHandlerContext ctx;
private Channel channel;

public ReadingBuffer(Channel channel) {
this.channel = channel;
private ReadingBuffer(ChannelHandlerContext ctx) {
this.channel = ctx.getChannel();
this.ctx = ctx;
}

public void initializeIndexes() {
this.readingBuffer = currentBuffer.reading(this);
this.readingBuffer = currentBuffer;
this.readBuffer = readingBuffer.buffer;
this.readerIndex = readBuffer.writerIndex();
}

public void flush() {
flush(true, false);
}

private void flush(boolean deleteRef, final boolean clearPast) {
/**
* Computes and returns the deltaSO for the associated client.
*
* This function registers some callbacks in the future passed for cleanup purposes, so the ChannelFuture object
* must be notified after the communication is finished.
*
* @param future It registers some callbacks on it
* @return the deltaSO for the associated client
*/
public ChannelBuffer flush(ChannelFuture future) {
int readable = readBuffer.readableBytes() - readerIndex;

if (readable == 0 && readingBuffer != pastBuffer) {
if (wrap) {
Channels.write(channel, tBuffer);
}
return;
return ChannelBuffers.EMPTY_BUFFER;
}

ChannelBuffer temp;
if (wrap && readingBuffer != pastBuffer) {
temp = ChannelBuffers.wrappedBuffer(readBuffer.slice(readerIndex, readable), tBuffer);
} else {
temp = readBuffer.slice(readerIndex, readable);
}
ChannelFuture future = Channels.write(channel, temp);
ChannelBuffer deltaSO = readBuffer.slice(readerIndex, readable);
addFinishedWriteListener(future, readingBuffer);
readingBuffer.increaseReaders();
readerIndex += readable;
if (readingBuffer == pastBuffer) {
readingBuffer = currentBuffer.reading(this);
readingBuffer = currentBuffer;
readBuffer = readingBuffer.buffer;
readerIndex = 0;
readable = readBuffer.readableBytes();
if (wrap) {
temp = ChannelBuffers.wrappedBuffer(readBuffer.slice(readerIndex, readable), tBuffer);
} else {
temp = readBuffer.slice(readerIndex, readable);
}
Channels.write(channel, temp);
deltaSO = readBuffer.slice(readerIndex, readable);
addFinishedWriteListener(future, readingBuffer);
readingBuffer.increaseReaders();
readerIndex += readable;
if (deleteRef) {
pastBuffer.readingBuffers.remove(this);
}
pastBuffer.incrementPending();
final ReferenceCountedBuffer pendingBuffer = pastBuffer;
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (clearPast) {
pendingBuffer.readingBuffers.clear();
}
if (pendingBuffer.decrementPending() && pendingBuffer.readingBuffers.size() == 0) {
pendingBuffer.buffer.clear();
synchronized (futureBuffer) {
futureBuffer.add(pendingBuffer);
}
}
}
});
}

return deltaSO;
}

private void addFinishedWriteListener(ChannelFuture future, final ReadersAwareBuffer buffer) {
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
buffer.decreaseReaders();
if (buffer.isReadyForPool()) {
bufferPool.add(buffer);
}
}
});
}

public ZipperState getZipperState() {
Expand All @@ -133,18 +125,10 @@ public boolean equals(Object obj) {

}

private ChannelBuffer tBuffer;
private boolean wrap = false;

public void writeTimestamp(long timestamp) {
wrap = true;
tBuffer = ChannelBuffers.buffer(9);
tBuffer.writeByte(TSOMessage.TimestampResponse);
tBuffer.writeLong(timestamp);
}

public void rollBackTimestamp() {
wrap = false;
public ReadingBuffer getReadingBuffer(ChannelHandlerContext ctx) {
ReadingBuffer rb = new ReadingBuffer(ctx);
readingBuffers.add(rb);
return rb;
}

public void writeCommit(long startTimestamp, long commitTimestamp) {
Expand Down Expand Up @@ -177,35 +161,23 @@ public void writeLargestIncrease(long largestTimestamp) {

private void nextBuffer() {
LOG.debug("Switching buffers");
Iterator<ReadingBuffer> it = pastBuffer.readingBuffers.iterator();
boolean moreBuffers = it.hasNext();
while (moreBuffers) {
ReadingBuffer buf = it.next();
moreBuffers = it.hasNext();
buf.flush(false, !moreBuffers);
}

pastBuffer = currentBuffer;
currentBuffer = null;
synchronized (futureBuffer) {
if (!futureBuffer.isEmpty()) {
currentBuffer = futureBuffer.removeLast();
// mark past buffer as scheduled for pool when all pending operations finish
pastBuffer.scheduleForPool();

for (final ReadingBuffer rb : readingBuffers) {
if (rb.readingBuffer == pastBuffer) {
ChannelFuture future = Channels.future(rb.channel);
ChannelBuffer cb = rb.flush(future);
Channels.write(rb.ctx, future, cb);
}
}

pastBuffer = currentBuffer;
currentBuffer = bufferPool.poll();
if (currentBuffer == null) {
currentBuffer = new ReferenceCountedBuffer();
currentBuffer = new ReadersAwareBuffer();
}
writeBuffer = currentBuffer.buffer;
}

public void reset() {
if (pastBuffer != null) {
pastBuffer.readingBuffers.clear();
pastBuffer.buffer.clear();
}
if (currentBuffer != null) {
currentBuffer.readingBuffers.clear();
currentBuffer.buffer.clear();
}
}
}
18 changes: 9 additions & 9 deletions src/main/java/com/yahoo/omid/tso/BufferPool.java
Expand Up @@ -17,22 +17,22 @@
package com.yahoo.omid.tso;

import java.io.ByteArrayOutputStream;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BufferPool {

private static Deque<ByteArrayOutputStream> pool = new ArrayDeque<ByteArrayOutputStream>();
private static BlockingQueue<ByteArrayOutputStream> pool = new LinkedBlockingQueue<ByteArrayOutputStream>();

public static synchronized ByteArrayOutputStream getBuffer() {
if (pool.isEmpty()) {
return new ByteArrayOutputStream(1500);
}
return pool.pollLast();
public static ByteArrayOutputStream getBuffer() {
ByteArrayOutputStream baos = pool.poll();
if (baos != null)
return baos;
return new ByteArrayOutputStream(1500);
}


public static synchronized void pushBuffer(ByteArrayOutputStream buffer) {
public static void pushBuffer(ByteArrayOutputStream buffer) {
pool.add(buffer);
}
}
17 changes: 9 additions & 8 deletions src/main/java/com/yahoo/omid/tso/TSOHandler.java
Expand Up @@ -36,6 +36,7 @@
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
Expand All @@ -51,10 +52,9 @@
import com.yahoo.omid.tso.messages.CommitQueryResponse;
import com.yahoo.omid.tso.messages.CommitRequest;
import com.yahoo.omid.tso.messages.CommitResponse;
import com.yahoo.omid.tso.messages.CommittedTransactionReport;
import com.yahoo.omid.tso.messages.FullAbortRequest;
import com.yahoo.omid.tso.messages.LargestDeletedTimestampReport;
import com.yahoo.omid.tso.messages.TimestampRequest;
import com.yahoo.omid.tso.messages.TimestampResponse;
import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
import com.yahoo.omid.tso.persistence.LoggerException;
import com.yahoo.omid.tso.persistence.LoggerException.Code;
Expand Down Expand Up @@ -199,15 +199,14 @@ public void handle(TimestampRequest msg, ChannelHandlerContext ctx) {
}

ReadingBuffer buffer;
Channel channel = null;
Channel channel = ctx.getChannel();
boolean bootstrap = false;
synchronized (messageBuffersMap) {
buffer = messageBuffersMap.get(ctx.getChannel());
if (buffer == null) {
synchronized (sharedMsgBufLock) {
bootstrap = true;
channel = ctx.getChannel();
buffer = sharedState.sharedMessageBuffer.new ReadingBuffer(channel);
buffer = sharedState.sharedMessageBuffer.getReadingBuffer(ctx);
messageBuffersMap.put(channel, buffer);
channelGroup.add(channel);
LOG.warn("Channel connected: " + messageBuffersMap.size());
Expand All @@ -225,11 +224,13 @@ public void handle(TimestampRequest msg, ChannelHandlerContext ctx) {
channel.write(new AbortedTransactionReport(halfAborted.getStartTimestamp()));
}
}
ChannelBuffer cb;
ChannelFuture future = Channels.future(channel);
synchronized (sharedMsgBufLock) {
sharedState.sharedMessageBuffer.writeTimestamp(timestamp);
buffer.flush();
sharedState.sharedMessageBuffer.rollBackTimestamp();
cb = buffer.flush(future);
}
Channels.write(ctx, future, cb);
Channels.write(channel, new TimestampResponse(timestamp));
}

ChannelBuffer cb = ChannelBuffers.buffer(10);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/yahoo/omid/tso/ThroughputMonitor.java
Expand Up @@ -21,7 +21,7 @@

import com.yahoo.omid.client.TSOClient;
import com.yahoo.omid.client.TransactionalTable;
import com.yahoo.omid.replication.ReferenceCountedBuffer;
import com.yahoo.omid.replication.ReadersAwareBuffer;

/**
* Class for Throughput Monitoring
Expand Down Expand Up @@ -72,7 +72,7 @@ public void run() {
TSOPipelineFactory.bwhandler != null ? TSOPipelineFactory.bwhandler.getBytesSentPerSecond() / (double) (1024 * 1024) : 0,
state.largestDeletedTimestamp,
newQueries - oldQueries,
ReferenceCountedBuffer.nBuffers
ReadersAwareBuffer.nBuffers
)
);

Expand Down

0 comments on commit 142b81d

Please sign in to comment.