Permalink
Browse files

Merge pull request #16 from dgomezferro/replication

Reduce coupling around Shared Message Buffer
  • Loading branch information...
2 parents dfcc588 + 12d2337 commit f6f7b87fd389603a34bce90d8d1cb5ab6a45b153 @dgomezferro committed Nov 23, 2012
Showing with 1,202 additions and 1,046 deletions.
  1. +1 −9 pom.xml
  2. +10 −9 src/main/java/com/yahoo/omid/client/TSOClient.java
  3. +25 −24 src/main/java/com/yahoo/omid/{tso/TSOBuffer.java → replication/ReadersAwareBuffer.java}
  4. +188 −0 src/main/java/com/yahoo/omid/replication/SharedMessageBuffer.java
  5. +273 −0 src/main/java/com/yahoo/omid/replication/Zipper.java
  6. +86 −0 src/main/java/com/yahoo/omid/replication/ZipperState.java
  7. +9 −9 src/main/java/com/yahoo/omid/tso/BufferPool.java
  8. +1 −2 src/main/java/com/yahoo/omid/tso/RowKey.java
  9. +22 −17 src/main/java/com/yahoo/omid/tso/TSOHandler.java
  10. +8 −7 src/main/java/com/yahoo/omid/tso/TSOMessage.java
  11. +2 −2 src/main/java/com/yahoo/omid/tso/TSOMessageBuffer.java
  12. +1 −1 src/main/java/com/yahoo/omid/tso/TSOPipelineFactory.java
  13. +0 −337 src/main/java/com/yahoo/omid/tso/TSOSharedMessageBuffer.java
  14. +2 −5 src/main/java/com/yahoo/omid/tso/TSOState.java
  15. +7 −150 src/main/java/com/yahoo/omid/tso/ThroughputMonitor.java
  16. +1 −1 src/main/java/com/yahoo/omid/tso/messages/AbortRequest.java
  17. +53 −35 src/main/java/com/yahoo/omid/tso/messages/AbortedTransactionReport.java
  18. +86 −0 src/main/java/com/yahoo/omid/tso/messages/CleanedTransactionReport.java
  19. +1 −7 src/main/java/com/yahoo/omid/tso/messages/CommitQueryRequest.java
  20. +1 −2 src/main/java/com/yahoo/omid/tso/messages/CommitQueryResponse.java
  21. +1 −16 src/main/java/com/yahoo/omid/tso/messages/CommitRequest.java
  22. +1 −2 src/main/java/com/yahoo/omid/tso/messages/CommitResponse.java
  23. +54 −104 src/main/java/com/yahoo/omid/tso/messages/CommittedTransactionReport.java
  24. +16 −20 src/main/java/com/yahoo/omid/tso/messages/{FullAbortReport.java → FullAbortRequest.java}
  25. +54 −37 src/main/java/com/yahoo/omid/tso/messages/LargestDeletedTimestampReport.java
  26. +11 −15 src/main/java/com/yahoo/omid/tso/messages/TimestampRequest.java
  27. +2 −7 src/main/java/com/yahoo/omid/tso/messages/TimestampResponse.java
  28. +54 −189 src/main/java/com/yahoo/omid/tso/serialization/TSODecoder.java
  29. +5 −2 src/main/java/com/yahoo/omid/tso/serialization/TSOEncoder.java
  30. +3 −8 src/test/java/com/yahoo/omid/OmidTestBase.java
  31. +136 −0 src/test/java/com/yahoo/omid/replication/TestSharedMessageBuffer.java
  32. +56 −0 src/test/java/com/yahoo/omid/replication/TestZipper.java
  33. +4 −3 src/test/java/com/yahoo/omid/tso/TestBasicTransaction.java
  34. +17 −16 src/test/java/com/yahoo/omid/tso/TestClientHandler.java
  35. +9 −8 src/test/java/com/yahoo/omid/tso/TestCommitAbortedReport.java
  36. +2 −2 src/test/java/com/yahoo/omid/tso/TestCommitQuery.java
View
@@ -34,6 +34,7 @@
<configuration>
<argLine>-Xmx1G</argLine>
<forkMode>pertest</forkMode>
+ <argLine>-Djava.library.path=${basedir}/src/main/native</argLine>
</configuration>
</plugin>
<plugin>
@@ -77,15 +78,6 @@
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.7.1</version>
- <configuration>
- <!-- forkMode>always</forkMode -->
- <argLine>-Djava.library.path=${basedir}/src/main/native</argLine>
- </configuration>
- </plugin>
</plugins>
<pluginManagement>
<plugins>
@@ -49,17 +49,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.yahoo.omid.replication.Zipper;
+import com.yahoo.omid.replication.ZipperState;
import com.yahoo.omid.tso.Committed;
import com.yahoo.omid.tso.RowKey;
import com.yahoo.omid.tso.TSOMessage;
import com.yahoo.omid.tso.messages.AbortRequest;
import com.yahoo.omid.tso.messages.AbortedTransactionReport;
+import com.yahoo.omid.tso.messages.CleanedTransactionReport;
import com.yahoo.omid.tso.messages.CommitQueryRequest;
import com.yahoo.omid.tso.messages.CommitQueryResponse;
import com.yahoo.omid.tso.messages.CommitRequest;
import com.yahoo.omid.tso.messages.CommitResponse;
import com.yahoo.omid.tso.messages.CommittedTransactionReport;
-import com.yahoo.omid.tso.messages.FullAbortReport;
+import com.yahoo.omid.tso.messages.FullAbortRequest;
import com.yahoo.omid.tso.messages.LargestDeletedTimestampReport;
import com.yahoo.omid.tso.messages.TimestampRequest;
import com.yahoo.omid.tso.messages.TimestampResponse;
@@ -275,7 +278,7 @@ public void error(Exception e) {
public void execute(Channel channel) {
try {
- FullAbortReport far = new FullAbortReport();
+ FullAbortRequest far = new FullAbortRequest();
far.startTimestamp = transactionId;
ChannelFuture f = channel.write(far);
@@ -410,7 +413,7 @@ public void completeAbort(long transactionId, AbortCompleteCallback cb) throws I
@Override
synchronized
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
- e.getChannel().getPipeline().addFirst("decoder", new TSODecoder());
+ e.getChannel().getPipeline().addFirst("decoder", new TSODecoder(new Zipper()));
e.getChannel().getPipeline().addAfter("decoder", "encoder",
new TSOEncoder());
}
@@ -481,8 +484,6 @@ public boolean validRead(long transaction, long startTimestamp) throws IOExcepti
return transaction <= largestDeletedTimestamp;
if (transaction <= largestDeletedTimestamp)
return true;
-// System.out.format("Asking TSO... hasConnectionTimestamp: %s connectionTimestamp: %d transaction: %d startTimestamp: %d\n",
-// Boolean.valueOf(hasConnectionTimestamp).toString(), connectionTimestamp, transaction, startTimestamp);
askedTSO++;
SyncCommitQueryCallback cb = new SyncCommitQueryCallback();
isCommitted(startTimestamp, transaction, cb);
@@ -547,8 +548,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
} else if (msg instanceof CommittedTransactionReport) {
CommittedTransactionReport ctr = (CommittedTransactionReport) msg;
committed.commit(ctr.startTimestamp, ctr.commitTimestamp);
- } else if (msg instanceof FullAbortReport) {
- FullAbortReport r = (FullAbortReport) msg;
+ } else if (msg instanceof CleanedTransactionReport) {
+ CleanedTransactionReport r = (CleanedTransactionReport) msg;
aborted.remove(r.startTimestamp);
} else if (msg instanceof AbortedTransactionReport) {
AbortedTransactionReport r = (AbortedTransactionReport) msg;
@@ -557,6 +558,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
LargestDeletedTimestampReport r = (LargestDeletedTimestampReport) msg;
largestDeletedTimestamp = r.largestDeletedTimestamp;
committed.raiseLargestDeletedTransaction(r.largestDeletedTimestamp);
+ } else if (msg instanceof ZipperState) {
+ // ignore
} else {
LOG.error("Unknown message received " + msg);
}
@@ -634,8 +637,6 @@ public void bailout(Exception cause) {
}
protected void processMessage(TSOMessage msg) {
- // TODO Auto-generated method stub
-
}
}
@@ -14,48 +14,49 @@
* limitations under the License. See accompanying LICENSE file.
*/
-package com.yahoo.omid.tso;
-
-import java.util.TreeSet;
+package com.yahoo.omid.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
-public class TSOBuffer {
- private static final Log LOG = LogFactory.getLog(TSOBuffer.class);
+
+public class ReadersAwareBuffer {
+ private static final Log LOG = LogFactory.getLog(ReadersAwareBuffer.class);
private static final int CAPACITY = 1024*1024;
public ChannelBuffer buffer;
- public TreeSet<TSOSharedMessageBuffer.ReadingBuffer> readingBuffers = new TreeSet<TSOSharedMessageBuffer.ReadingBuffer>();
- private int pendingWrites = 0;
+ private int pendingReaders = 0;
+ private boolean scheduledForPool;
public static long nBuffers;
-
-
- public TSOBuffer() {
- this(true);
- }
-
- public TSOBuffer(boolean allocateBuffer) {
+
+ public ReadersAwareBuffer() {
nBuffers++;
LOG.warn("Allocated buffer");
- if (allocateBuffer)
- buffer = ChannelBuffers.directBuffer(CAPACITY);
+ buffer = ChannelBuffers.directBuffer(CAPACITY);
}
- public TSOBuffer reading(TSOSharedMessageBuffer.ReadingBuffer buf) {
- readingBuffers.add(buf);
- return this;
+ public synchronized void increaseReaders() {
+ pendingReaders++;
}
- public synchronized void incrementPending() {
- ++pendingWrites;
+ public synchronized void decreaseReaders() {
+ pendingReaders--;
}
-
- public synchronized boolean decrementPending() {
- return --pendingWrites == 0;
+
+ public synchronized boolean isReadyForPool() {
+ return scheduledForPool && pendingReaders == 0;
+ }
+
+ public synchronized void scheduleForPool() {
+ scheduledForPool = true;
+ }
+
+ public synchronized void reset() {
+ pendingReaders = 0;
+ scheduledForPool = false;
}
}
@@ -0,0 +1,188 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.replication;
+
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SharedMessageBuffer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SharedMessageBuffer.class);
+
+ private static final int MAX_MESSAGE_SIZE = 30;
+
+ ReadersAwareBuffer pastBuffer = new ReadersAwareBuffer();
+ ReadersAwareBuffer currentBuffer = new ReadersAwareBuffer();
+ ChannelBuffer writeBuffer = currentBuffer.buffer;
+ BlockingQueue<ReadersAwareBuffer> bufferPool = new LinkedBlockingQueue<ReadersAwareBuffer>();
+ Zipper zipper = new Zipper();
+ Set<ReadingBuffer> readingBuffers = new TreeSet<SharedMessageBuffer.ReadingBuffer>();
+
+ public class ReadingBuffer implements Comparable<ReadingBuffer> {
+ private ChannelBuffer readBuffer;
+ private int readerIndex = 0;
+ private ReadersAwareBuffer readingBuffer;
+ private ChannelHandlerContext ctx;
+ private Channel channel;
+
+ private ReadingBuffer(ChannelHandlerContext ctx) {
+ this.channel = ctx.getChannel();
+ this.ctx = ctx;
+ }
+
+ public void initializeIndexes() {
+ this.readingBuffer = currentBuffer;
+ this.readBuffer = readingBuffer.buffer;
+ this.readerIndex = readBuffer.writerIndex();
+ }
+
+ /**
+ * Computes and returns the deltaSO for the associated client.
+ *
+ * This function registers some callbacks in the future passed for cleanup purposes, so the ChannelFuture object
+ * must be notified after the communication is finished.
+ *
+ * @param future
+ * It registers some callbacks on it
+ * @return the deltaSO for the associated client
+ */
+ public ChannelBuffer flush(ChannelFuture future) {
+ int readable = readBuffer.readableBytes() - readerIndex;
+
+ if (readable == 0 && readingBuffer != pastBuffer) {
+ return ChannelBuffers.EMPTY_BUFFER;
+ }
+
+ ChannelBuffer deltaSO = readBuffer.slice(readerIndex, readable);
+ addFinishedWriteListener(future, readingBuffer);
+ readingBuffer.increaseReaders();
+ readerIndex += readable;
+ if (readingBuffer == pastBuffer) {
+ readingBuffer = currentBuffer;
+ readBuffer = readingBuffer.buffer;
+ readerIndex = 0;
+ readable = readBuffer.readableBytes();
+ deltaSO = ChannelBuffers.wrappedBuffer(deltaSO, readBuffer.slice(readerIndex, readable));
+ addFinishedWriteListener(future, readingBuffer);
+ readingBuffer.increaseReaders();
+ readerIndex += readable;
+ }
+
+ return deltaSO;
+ }
+
+ private void addFinishedWriteListener(ChannelFuture future, final ReadersAwareBuffer buffer) {
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ buffer.decreaseReaders();
+ if (buffer.isReadyForPool()) {
+ bufferPool.add(buffer);
+ }
+ }
+ });
+ }
+
+ public ZipperState getZipperState() {
+ return zipper.getZipperState();
+ }
+
+ @Override
+ public int compareTo(ReadingBuffer o) {
+ return this.channel.compareTo(o.channel);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof ReadingBuffer))
+ return false;
+ ReadingBuffer buf = (ReadingBuffer) obj;
+ return this.channel.equals(buf.channel);
+ }
+
+ }
+
+ public ReadingBuffer getReadingBuffer(ChannelHandlerContext ctx) {
+ ReadingBuffer rb = new ReadingBuffer(ctx);
+ readingBuffers.add(rb);
+ return rb;
+ }
+
+ public void removeReadingBuffer(ChannelHandlerContext ctx) {
+ readingBuffers.remove(new ReadingBuffer(ctx));
+ }
+
+ public void writeCommit(long startTimestamp, long commitTimestamp) {
+ checkBufferSpace();
+ zipper.encodeCommit(writeBuffer, startTimestamp, commitTimestamp);
+ }
+
+ public void writeHalfAbort(long startTimestamp) {
+ checkBufferSpace();
+ zipper.encodeHalfAbort(writeBuffer, startTimestamp);
+ }
+
+ public void writeFullAbort(long startTimestamp) {
+ checkBufferSpace();
+ zipper.encodeFullAbort(writeBuffer, startTimestamp);
+ }
+
+ public void writeLargestIncrease(long largestTimestamp) {
+ checkBufferSpace();
+ zipper.encodeLargestIncrease(writeBuffer, largestTimestamp);
+ }
+
+ private void checkBufferSpace() {
+ if (writeBuffer.writableBytes() < MAX_MESSAGE_SIZE) {
+ nextBuffer();
+ }
+ }
+
+ private void nextBuffer() {
+ LOG.debug("Switching buffers");
+
+ // mark past buffer as scheduled for pool when all pending operations finish
+ pastBuffer.scheduleForPool();
+
+ for (final ReadingBuffer rb : readingBuffers) {
+ if (rb.readingBuffer == pastBuffer) {
+ ChannelFuture future = Channels.future(rb.channel);
+ ChannelBuffer cb = rb.flush(future);
+ Channels.write(rb.ctx, future, cb);
+ }
+ }
+
+ pastBuffer = currentBuffer;
+ currentBuffer = bufferPool.poll();
+ if (currentBuffer == null) {
+ currentBuffer = new ReadersAwareBuffer();
+ }
+ writeBuffer = currentBuffer.buffer;
+ }
+}
Oops, something went wrong.

0 comments on commit f6f7b87

Please sign in to comment.