Skip to content

Commit

Permalink
Allow a blocked appender to recover if an appender left an incomplete…
Browse files Browse the repository at this point in the history
… message.
  • Loading branch information
peter-lawrey committed May 21, 2016
1 parent f84009b commit e05c4a1
Showing 1 changed file with 38 additions and 169 deletions.
Expand Up @@ -16,50 +16,44 @@


package net.openhft.chronicle.queue.impl.single; package net.openhft.chronicle.queue.impl.single;


import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.ref.BinaryLongArrayReference; import net.openhft.chronicle.bytes.ref.BinaryLongArrayReference;
import net.openhft.chronicle.bytes.ref.BinaryLongReference; import net.openhft.chronicle.bytes.ref.BinaryLongReference;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles; import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.RollingChronicleQueue;
import net.openhft.chronicle.wire.DocumentContext; import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.Wires;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;


import java.io.File; import java.io.File;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;


import static net.openhft.chronicle.queue.ChronicleQueueTestBase.getTmpDir; import static net.openhft.chronicle.queue.ChronicleQueueTestBase.getTmpDir;

import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;


public class NotCompleteTest { public class NotCompleteTest {


private static final int TIMES = 100;


/** /**
* tests that when flags are set to not complete we are able to recover * tests that when flags are set to not complete we are able to recover
*/ */
@Test @Test
public void testUsingANotCompleteQueue() throws TimeoutException, ExecutionException, public void testUsingANotCompleteQueue()
InterruptedException { throws TimeoutException, ExecutionException, InterruptedException {


BinaryLongReference.startCollecting(); BinaryLongReference.startCollecting();


File tmpDir = getTmpDir(); File tmpDir = getTmpDir();
try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir) try (final ChronicleQueue queue = binary(tmpDir)
.wireType(WireType.BINARY)
.rollCycle(RollCycles.TEST_DAILY) .rollCycle(RollCycles.TEST_DAILY)
.build()) { .build()) {


ExcerptAppender appender = queue.createAppender(); ExcerptAppender appender = queue.createAppender();


try (DocumentContext documentContext = appender.writingDocument()) { try (DocumentContext dc = appender.writingDocument()) {
documentContext.wire().write("some").text("data"); dc.wire().write("some").text("data");
} }


Thread.sleep(100); Thread.sleep(100);
Expand All @@ -70,36 +64,34 @@ public void testUsingANotCompleteQueue() throws TimeoutException, ExecutionExcep
// this is what will corrupt the queue // this is what will corrupt the queue
BinaryLongReference.forceAllToNotCompleteState(); BinaryLongReference.forceAllToNotCompleteState();


try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir) try (final ChronicleQueue queue = binary(tmpDir)
.wireType(WireType.BINARY)
.timeoutMS(500) .timeoutMS(500)
.build()) { .build()) {
System.out.println(queue.dump()); System.out.println(queue.dump());


ExcerptTailer tailer = queue.createTailer(); ExcerptTailer tailer = queue.createTailer();


try (DocumentContext documentContext = tailer.readingDocument()) { try (DocumentContext dc = tailer.readingDocument()) {
Assert.assertEquals("data", documentContext.wire().read(() -> "some").text()); assertEquals("data", dc.wire().read(() -> "some").text());
} }
} }
} }


@Test @Test
public void testUsingANotCompleteArrayQueue() throws TimeoutException, ExecutionException, public void testUsingANotCompleteArrayQueue()
InterruptedException { throws TimeoutException, ExecutionException, InterruptedException {


BinaryLongArrayReference.startCollecting(); BinaryLongArrayReference.startCollecting();


File tmpDir = getTmpDir(); File tmpDir = getTmpDir();
try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir) try (final ChronicleQueue queue = binary(tmpDir)
.wireType(WireType.BINARY)
.rollCycle(RollCycles.TEST_DAILY) .rollCycle(RollCycles.TEST_DAILY)
.build()) { .build()) {


ExcerptAppender appender = queue.createAppender(); ExcerptAppender appender = queue.createAppender();


try (DocumentContext documentContext = appender.writingDocument()) { try (DocumentContext dc = appender.writingDocument()) {
documentContext.wire().write("some").text("data"); dc.wire().write("some").text("data");
} }


Thread.sleep(100); Thread.sleep(100);
Expand All @@ -110,182 +102,59 @@ public void testUsingANotCompleteArrayQueue() throws TimeoutException, Execution
// this is what will corrupt the queue // this is what will corrupt the queue
BinaryLongArrayReference.forceAllToNotCompleteState(); BinaryLongArrayReference.forceAllToNotCompleteState();


try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir) try (final ChronicleQueue queue = binary(tmpDir)
.wireType(WireType.BINARY)
.timeoutMS(500) .timeoutMS(500)
.build()) { .build()) {
System.out.println(queue.dump()); System.out.println(queue.dump());


ExcerptTailer tailer = queue.createTailer(); ExcerptTailer tailer = queue.createTailer();


try (DocumentContext documentContext = tailer.readingDocument()) { try (DocumentContext dc = tailer.readingDocument()) {
Assert.assertEquals("data", documentContext.wire().read(() -> "some").text()); assertEquals("data", dc.wire().read(() -> "some").text());
} }
} }
} }



@Test @Test
public void testDocumentLeftInNotRead() throws TimeoutException, ExecutionException, public void testMessageLeftNotComplete()
InterruptedException { throws TimeoutException, ExecutionException, InterruptedException {



File tmpDir = getTmpDir(); File tmpDir = getTmpDir();
try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir) try (final ChronicleQueue queue = binary(tmpDir).rollCycle(RollCycles.TEST_DAILY).build()) {
.wireType(WireType.BINARY)
.build()) {

ExcerptAppender appender = queue.createAppender(); ExcerptAppender appender = queue.createAppender();


// start a message which was not completed.
DocumentContext dc = appender.writingDocument();
dc.wire().write("some").text("data");
// didn't call dc.close();


long start; System.out.println(queue.dump());
Bytes<?> bytes;

try (DocumentContext documentContext = appender.writingDocument()) {
bytes = documentContext.wire().bytes();
documentContext.wire().write("some").text("data");
}

// this should simulate another document being written by another process and left in
// a not ready state
bytes.writeInt(Wires.NOT_READY);
}


// this should write it should not time out !

try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir)
.wireType(WireType.BINARY)
.build()) {

ExcerptAppender appender = queue.createAppender();

// the above should time out and only the some-more more data should be written
try (DocumentContext documentContext = appender.writingDocument()) {
documentContext.wire().write("some-more").text("more-data");
}
} }


try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir) try (final ChronicleQueue queue = binary(tmpDir).build()) {
.wireType(WireType.BINARY)
.build()) {
ExcerptTailer tailer = queue.createTailer(); ExcerptTailer tailer = queue.createTailer();


try (DocumentContext documentContext = tailer.readingDocument()) { try (DocumentContext dc = tailer.readingDocument()) {
Assert.assertEquals("some", documentContext.wire().read(() -> "data").text()); assertFalse(dc.isPresent());
} }


}


}


@Test
public void testMessageOfUnknownLengthLeftNotComplete() throws TimeoutException, ExecutionException,
InterruptedException {


File tmpDir = getTmpDir();
try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir)
.wireType(WireType.BINARY)
.build()) {

ExcerptAppender appender = queue.createAppender();


Bytes<?> bytes;

try (DocumentContext documentContext = appender.writingDocument()) {
bytes = documentContext.wire().bytes();
documentContext.wire().write("some").text("data");
}

// this should simulate another document being written by another process and left in
// a not ready state
bytes.writeInt(Wires.NOT_READY);

System.out.println(queue.dump().toString());

} }



try (final ChronicleQueue queue = binary(tmpDir).timeoutMS(500).build()) {
// this should write it should not time out !

try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir)
.wireType(WireType.BINARY)
.build()) {

ExcerptAppender appender = queue.createAppender(); ExcerptAppender appender = queue.createAppender();


// the above should time out and only the some-more more data should be written try (DocumentContext dc = appender.writingDocument()) {
try (DocumentContext documentContext = appender.writingDocument()) { dc.wire().write("some").text("data");
documentContext.wire().write("some-more").text("more-data");
} }



System.out.println(queue.dump());
} }


try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir) try (final ChronicleQueue queue = binary(tmpDir).build()) {
.wireType(WireType.BINARY)
.build()) {
ExcerptTailer tailer = queue.createTailer(); ExcerptTailer tailer = queue.createTailer();


try (DocumentContext documentContext = tailer.readingDocument()) { try (DocumentContext dc = tailer.readingDocument()) {
Assert.assertEquals("some", documentContext.wire().read(() -> "data").text()); assertEquals("data", dc.wire().read(() -> "some").text());
} }


} }


}


@Test
public void testMessageOfKnownLengthLeftNotComplete() throws TimeoutException,
ExecutionException,
InterruptedException {


File tmpDir = getTmpDir();
try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir)
.wireType(WireType.BINARY)
.build()) {

ExcerptAppender appender = queue.createAppender();


long start;
Bytes<?> bytes;

try (DocumentContext documentContext = appender.writingDocument()) {
bytes = documentContext.wire().bytes();
start = bytes.writePosition() - 4;
documentContext.wire().write("some").text("data");
}

// leave the document in a not read state - not sure about the index ?
bytes.writePosition(start).writeInt(bytes.readInt(start) | Wires.NOT_READY);


System.out.println(queue.dump().toString());
}


try (final RollingChronicleQueue queue = new SingleChronicleQueueBuilder(tmpDir)
.wireType(WireType.BINARY)
.build()) {
ExcerptTailer tailer = queue.createTailer().lazyIndexing(true);

try (DocumentContext documentContext = tailer.readingDocument()) {
Assert.assertEquals("some", documentContext.wire().read(() -> "data").text());
}


}


} }
} }

0 comments on commit e05c4a1

Please sign in to comment.