Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix bug during SharedMessageBuffer flushes

If a client was still on the past buffer, it would only receive the deltaSO from the currentBuffer.
Added a regression test as well.
  • Loading branch information...
commit 062702a40e3985750976146ccae74d5d9445e925 1 parent db29950
@dgomezferro authored
View
2  src/main/java/com/yahoo/omid/replication/SharedMessageBuffer.java
@@ -88,7 +88,7 @@ public ChannelBuffer flush(ChannelFuture future) {
readBuffer = readingBuffer.buffer;
readerIndex = 0;
readable = readBuffer.readableBytes();
- deltaSO = readBuffer.slice(readerIndex, readable);
+ deltaSO = ChannelBuffers.wrappedBuffer(deltaSO, readBuffer.slice(readerIndex, readable));
addFinishedWriteListener(future, readingBuffer);
readingBuffer.increaseReaders();
readerIndex += readable;
View
87 src/main/java/com/yahoo/omid/tso/messages/AbortedTransactionReport.java
@@ -28,36 +28,59 @@
*
*/
public class AbortedTransactionReport implements TSOMessage {
- /**
- * Starting timestamp
- */
- public long startTimestamp;
-
- public AbortedTransactionReport() {
- }
-
- public AbortedTransactionReport(long startTimestamp) {
- this.startTimestamp = startTimestamp;
- }
-
- @Override
- public String toString() {
- return "Aborted Transaction Report: T_s:" + startTimestamp;
- }
-
- @Override
- public void readObject(ChannelBuffer aInputStream) {
-
- startTimestamp = aInputStream.readLong();
- }
-
- @Override
- public void writeObject(DataOutputStream aOutputStream) throws IOException {
- aOutputStream.writeLong(startTimestamp);
- }
-
- @Override
- public void writeObject(ChannelBuffer buffer) {
- buffer.writeLong(startTimestamp);
- }
+ /**
+ * Starting timestamp
+ */
+ public long startTimestamp;
+
+ public AbortedTransactionReport() {
+ }
+
+ public AbortedTransactionReport(long startTimestamp) {
+ this.startTimestamp = startTimestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "Aborted Transaction Report: T_s:" + startTimestamp;
+ }
+
+ @Override
+ public void readObject(ChannelBuffer aInputStream) {
+
+ startTimestamp = aInputStream.readLong();
+ }
+
+ @Override
+ public void writeObject(DataOutputStream aOutputStream) throws IOException {
+ aOutputStream.writeLong(startTimestamp);
+ }
+
+ @Override
+ public void writeObject(ChannelBuffer buffer) {
+ buffer.writeLong(startTimestamp);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ AbortedTransactionReport other = (AbortedTransactionReport) obj;
+ if (startTimestamp != other.startTimestamp)
+ return false;
+ return true;
+ }
+
}
View
87 src/main/java/com/yahoo/omid/tso/messages/CleanedTransactionReport.java
@@ -28,36 +28,59 @@
*
*/
public class CleanedTransactionReport implements TSOMessage {
- /**
- * Starting timestamp
- */
- public long startTimestamp;
-
- public CleanedTransactionReport() {
- }
-
- public CleanedTransactionReport(long startTimestamp) {
- this.startTimestamp = startTimestamp;
- }
-
- @Override
- public String toString() {
- return "Cleaned up Transaction Report: T_s:" + startTimestamp;
- }
-
- @Override
- public void readObject(ChannelBuffer aInputStream) {
-
- startTimestamp = aInputStream.readLong();
- }
-
- @Override
- public void writeObject(DataOutputStream aOutputStream) throws IOException {
- aOutputStream.writeLong(startTimestamp);
- }
-
- @Override
- public void writeObject(ChannelBuffer buffer) {
- buffer.writeLong(startTimestamp);
- }
+ /**
+ * Starting timestamp
+ */
+ public long startTimestamp;
+
+ public CleanedTransactionReport() {
+ }
+
+ public CleanedTransactionReport(long startTimestamp) {
+ this.startTimestamp = startTimestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "Cleaned up Transaction Report: T_s:" + startTimestamp;
+ }
+
+ @Override
+ public void readObject(ChannelBuffer aInputStream) {
+
+ startTimestamp = aInputStream.readLong();
+ }
+
+ @Override
+ public void writeObject(DataOutputStream aOutputStream) throws IOException {
+ aOutputStream.writeLong(startTimestamp);
+ }
+
+ @Override
+ public void writeObject(ChannelBuffer buffer) {
+ buffer.writeLong(startTimestamp);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ CleanedTransactionReport other = (CleanedTransactionReport) obj;
+ if (startTimestamp != other.startTimestamp)
+ return false;
+ return true;
+ }
+
}
View
88 src/main/java/com/yahoo/omid/tso/messages/CommittedTransactionReport.java
@@ -28,35 +28,61 @@
*
*/
public class CommittedTransactionReport implements TSOMessage {
- /**
- * Starting timestamp
- */
- public long startTimestamp;
- public long commitTimestamp;
-
- public CommittedTransactionReport() {
- }
-
- public CommittedTransactionReport(long startTimestamp, long commitTimestamp) {
- this.startTimestamp = startTimestamp;
- this.commitTimestamp = commitTimestamp;
- }
-
- @Override
- public String toString() {
- return "Committed Transaction Report: T_s:" + startTimestamp + " T_c:" + commitTimestamp;
- }
-
- // (De)serialization handled on Zipper
- @Override
- public void readObject(ChannelBuffer aInputStream) {
- }
-
- @Override
- public void writeObject(DataOutputStream aOutputStream) throws IOException {
- }
-
- @Override
- public void writeObject(ChannelBuffer buffer) {
- }
+ /**
+ * Starting timestamp
+ */
+ public long startTimestamp;
+ public long commitTimestamp;
+
+ public CommittedTransactionReport() {
+ }
+
+ public CommittedTransactionReport(long startTimestamp, long commitTimestamp) {
+ this.startTimestamp = startTimestamp;
+ this.commitTimestamp = commitTimestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "Committed Transaction Report: T_s:" + startTimestamp + " T_c:" + commitTimestamp;
+ }
+
+ // (De)serialization handled on Zipper
+ @Override
+ public void readObject(ChannelBuffer aInputStream) {
+ }
+
+ @Override
+ public void writeObject(DataOutputStream aOutputStream) throws IOException {
+ }
+
+ @Override
+ public void writeObject(ChannelBuffer buffer) {
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (commitTimestamp ^ (commitTimestamp >>> 32));
+ result = prime * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ CommittedTransactionReport other = (CommittedTransactionReport) obj;
+ if (commitTimestamp != other.commitTimestamp)
+ return false;
+ if (startTimestamp != other.startTimestamp)
+ return false;
+ return true;
+ }
+
}
View
87 src/main/java/com/yahoo/omid/tso/messages/LargestDeletedTimestampReport.java
@@ -28,36 +28,59 @@
*
*/
public class LargestDeletedTimestampReport implements TSOMessage {
- /**
- * Starting timestamp
- */
- public long largestDeletedTimestamp;
-
- public LargestDeletedTimestampReport() {
- }
-
- public LargestDeletedTimestampReport(long largestDeletedTimestamp) {
- this.largestDeletedTimestamp = largestDeletedTimestamp;
- }
-
- @Override
- public String toString() {
- return "Largest Deleted Timestamp Report: T_s:" + largestDeletedTimestamp;
- }
-
- @Override
- public void readObject(ChannelBuffer aInputStream) {
-
- largestDeletedTimestamp = aInputStream.readLong();
- }
-
- @Override
- public void writeObject(DataOutputStream aOutputStream) throws IOException {
- aOutputStream.writeLong(largestDeletedTimestamp);
- }
-
- @Override
- public void writeObject(ChannelBuffer buffer) {
- buffer.writeLong(largestDeletedTimestamp);
- }
+ /**
+ * Starting timestamp
+ */
+ public long largestDeletedTimestamp;
+
+ public LargestDeletedTimestampReport() {
+ }
+
+ public LargestDeletedTimestampReport(long largestDeletedTimestamp) {
+ this.largestDeletedTimestamp = largestDeletedTimestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "Largest Deleted Timestamp Report: T_s:" + largestDeletedTimestamp;
+ }
+
+ @Override
+ public void readObject(ChannelBuffer aInputStream) {
+
+ largestDeletedTimestamp = aInputStream.readLong();
+ }
+
+ @Override
+ public void writeObject(DataOutputStream aOutputStream) throws IOException {
+ aOutputStream.writeLong(largestDeletedTimestamp);
+ }
+
+ @Override
+ public void writeObject(ChannelBuffer buffer) {
+ buffer.writeLong(largestDeletedTimestamp);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (largestDeletedTimestamp ^ (largestDeletedTimestamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ LargestDeletedTimestampReport other = (LargestDeletedTimestampReport) obj;
+ if (largestDeletedTimestamp != other.largestDeletedTimestamp)
+ return false;
+ return true;
+ }
+
}
View
131 src/test/java/com/yahoo/omid/replication/TestSharedMessageBuffer.java
@@ -1,5 +1,136 @@
package com.yahoo.omid.replication;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Random;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
+import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
+import org.jboss.netty.handler.codec.embedder.EncoderEmbedder;
+import org.junit.Test;
+
+import com.yahoo.omid.replication.SharedMessageBuffer.ReadingBuffer;
+import com.yahoo.omid.tso.TSOMessage;
+import com.yahoo.omid.tso.messages.AbortedTransactionReport;
+import com.yahoo.omid.tso.messages.CleanedTransactionReport;
+import com.yahoo.omid.tso.messages.CommittedTransactionReport;
+import com.yahoo.omid.tso.messages.LargestDeletedTimestampReport;
+import com.yahoo.omid.tso.serialization.TSODecoder;
+
public class TestSharedMessageBuffer {
+ @Test
+ public void testShortFlushes() {
+ final int ITERATIONS = 1000000;
+ SharedMessageBuffer smb = new SharedMessageBuffer();
+ final DecoderEmbedder<TSOMessage> decoder = new DecoderEmbedder<TSOMessage>(new TSODecoder(new Zipper()));
+ ChannelDownstreamHandler handler = new SimpleChannelDownstreamHandler();
+ EncoderEmbedder<ChannelBuffer> encoder = new EncoderEmbedder<ChannelBuffer>(handler);
+ ChannelHandlerContext ctx = encoder.getPipeline().getContext(handler);
+ Channel channel = ctx.getChannel();
+ ReadingBuffer rb = smb.getReadingBuffer(ctx);
+ rb.initializeIndexes();
+ Random rand = new Random();
+
+ Deque<TSOMessage> expectedMessages = new ArrayDeque<TSOMessage>();
+ int checked = 0;
+
+ // Write one message to the shared buffer and read it as a client
+ for (int i = 0; i < ITERATIONS; ++i) {
+ writeRandomMessage(smb, rand, expectedMessages);
+
+ ChannelFuture future = Channels.succeededFuture(channel);
+ ChannelBuffer buffer = rb.flush(future);
+ Channels.write(ctx, future, buffer);
+
+ forwardMessages(encoder, decoder);
+
+ checked += checkExpectedMessage(decoder, expectedMessages);
+ }
+
+ assertTrue("Some messages weren't consumed", expectedMessages.isEmpty());
+ assertEquals("Didn't check the generated number of messages", ITERATIONS, checked);
+ }
+
+ @Test
+ public void testLongFlushes() {
+ final int ITERATIONS = 1000000;
+ SharedMessageBuffer smb = new SharedMessageBuffer();
+ final DecoderEmbedder<TSOMessage> decoder = new DecoderEmbedder<TSOMessage>(new TSODecoder(new Zipper()));
+ ChannelDownstreamHandler handler = new SimpleChannelDownstreamHandler();
+ EncoderEmbedder<ChannelBuffer> encoder = new EncoderEmbedder<ChannelBuffer>(handler);
+ ChannelHandlerContext ctx = encoder.getPipeline().getContext(handler);
+ Channel channel = ctx.getChannel();
+ ReadingBuffer rb = smb.getReadingBuffer(ctx);
+ rb.initializeIndexes();
+ Random rand = new Random();
+
+ Deque<TSOMessage> expectedMessages = new ArrayDeque<TSOMessage>();
+
+ for (int i = 0; i < ITERATIONS; ++i) {
+ writeRandomMessage(smb, rand, expectedMessages);
+ }
+
+ // Flush the remaining messages
+ ChannelFuture future = Channels.succeededFuture(channel);
+ ChannelBuffer buffer = rb.flush(future);
+ Channels.write(ctx, future, buffer);
+
+ forwardMessages(encoder, decoder);
+
+ int checked = checkExpectedMessage(decoder, expectedMessages);
+
+ assertTrue("Some messages weren't consumed", expectedMessages.isEmpty());
+ assertEquals("Didn't check the generated number of messages", ITERATIONS, checked);
+ }
+
+ private int checkExpectedMessage(DecoderEmbedder<TSOMessage> decoder, Deque<TSOMessage> expectedMessages) {
+ int checked = 0;
+ while (!expectedMessages.isEmpty()) {
+ TSOMessage expected = expectedMessages.poll();
+ TSOMessage actual = decoder.poll();
+ assertEquals("Read message didn't correspond to written message", expected, actual);
+ checked++;
+ }
+ return checked;
+ }
+
+ private void writeRandomMessage(SharedMessageBuffer smb, Random rand, Deque<TSOMessage> expectedMessages) {
+ long firstTS = rand.nextLong();
+ long secondTS = rand.nextLong();
+ switch (rand.nextInt(4)) {
+ case 0:
+ smb.writeCommit(firstTS, secondTS);
+ expectedMessages.add(new CommittedTransactionReport(firstTS, secondTS));
+ break;
+ case 1:
+ smb.writeFullAbort(firstTS);
+ expectedMessages.add(new CleanedTransactionReport(firstTS));
+ break;
+ case 2:
+ smb.writeHalfAbort(firstTS);
+ expectedMessages.add(new AbortedTransactionReport(firstTS));
+ break;
+ case 3:
+ smb.writeLargestIncrease(firstTS);
+ expectedMessages.add(new LargestDeletedTimestampReport(firstTS));
+ break;
+ }
+ }
+
+ private void forwardMessages(EncoderEmbedder<ChannelBuffer> encoder, DecoderEmbedder<TSOMessage> decoder) {
+ ChannelBuffer buffer;
+ while ((buffer = encoder.poll()) != null) {
+ decoder.offer(buffer);
+ }
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.