Skip to content

Commit

Permalink
update test to reflect actual behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
epickrram committed Jan 3, 2018
1 parent 5f7bb6d commit ee1e1be
Showing 1 changed file with 53 additions and 26 deletions.
Expand Up @@ -16,7 +16,7 @@
import java.io.File; import java.io.File;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
Expand All @@ -32,6 +32,7 @@


@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public final class DocumentOrderingTest { public final class DocumentOrderingTest {
private static final RollCycles ROLL_CYCLE = RollCycles.TEST_SECONDLY;
private final ExecutorService executorService = Executors.newCachedThreadPool(); private final ExecutorService executorService = Executors.newCachedThreadPool();
private final AtomicLong clock = new AtomicLong(System.currentTimeMillis()); private final AtomicLong clock = new AtomicLong(System.currentTimeMillis());
private final AtomicInteger counter = new AtomicInteger(0); private final AtomicInteger counter = new AtomicInteger(0);
Expand All @@ -48,9 +49,14 @@ public DocumentOrderingTest(final String testType, final boolean progressOnConte


@Ignore("WIP") @Ignore("WIP")
@Test @Test
public void multipleQueuedOpenDocumentsInPreviousCycleFileShouldRemainOrdered() throws Exception { public void queuedWriteInPreviousCycleShouldBeAppendedToNewCycle() throws Exception {
try (final SingleChronicleQueue queue = try (final SingleChronicleQueue queue =
builder(DirectoryUtils.tempDir("document-ordering"), 1_000L).build()) { builder(DirectoryUtils.tempDir("document-ordering"), 1_000L).build()) {
Assume.assumeFalse(
"ordering/atomicity is not guaranteed when using progressOnContention = true," +
"as multiple threads can be concurrently executing within a queue's " +
"document context when the queue head is contented",
progressOnContention);


final ExcerptAppender excerptAppender = queue.acquireAppender(); final ExcerptAppender excerptAppender = queue.acquireAppender();
// write initial document // write initial document
Expand All @@ -60,17 +66,21 @@ public void multipleQueuedOpenDocumentsInPreviousCycleFileShouldRemainOrdered()
final DocumentContext firstOpenDocument = excerptAppender.writingDocument(); final DocumentContext firstOpenDocument = excerptAppender.writingDocument();
firstOpenDocument.wire().getValueOut().int32(counter.getAndIncrement()); firstOpenDocument.wire().getValueOut().int32(counter.getAndIncrement());


final Future<Integer> secondDocumentInFirstCycle = executorService.submit(attemptToWriteDocument(queue)); // start another record in the first cycle file
// this will actually be written to the second cycle file, since it will wait for
// firstOpenDocument to be completed/timed out
final Future<RecordInfo> secondDocumentInFirstCycle = attemptToWriteDocument(queue);


// move time to beyond the next cycle // move time to beyond the next cycle
clock.addAndGet(TimeUnit.SECONDS.toMillis(2L)); clock.addAndGet(TimeUnit.SECONDS.toMillis(2L));


final Future<Integer> otherDocumentWriter = executorService.submit(attemptToWriteDocument(queue)); final Future<RecordInfo> otherDocumentWriter = attemptToWriteDocument(queue);


firstOpenDocument.close(); firstOpenDocument.close();


assertEquals(1, secondDocumentInFirstCycle.get(5L, TimeUnit.SECONDS).intValue()); // assert that queued record (secondDocumentInFirstCycle) is actually written to the second cycle file
assertEquals(2, otherDocumentWriter.get(5L, TimeUnit.SECONDS).intValue()); assertEquals(secondDocumentInFirstCycle.get(5L, TimeUnit.SECONDS).cycle,
otherDocumentWriter.get(5L, TimeUnit.SECONDS).cycle);


final ExcerptTailer tailer = queue.createTailer(); final ExcerptTailer tailer = queue.createTailer();
// discard first record // discard first record
Expand All @@ -93,17 +103,17 @@ public void shouldRecoverFromUnfinishedFirstMessageInPreviousQueue() throws Exce
progressOnContention(progressOnContention).build()) { progressOnContention(progressOnContention).build()) {


final ExcerptAppender excerptAppender = queue.acquireAppender(); final ExcerptAppender excerptAppender = queue.acquireAppender();
final Future<Integer> otherDocumentWriter; final Future<RecordInfo> otherDocumentWriter;
// begin a record in the first cycle file // begin a record in the first cycle file
final DocumentContext documentContext = excerptAppender.writingDocument(); final DocumentContext documentContext = excerptAppender.writingDocument();
documentContext.wire().getValueOut().int32(counter.getAndIncrement()); documentContext.wire().getValueOut().int32(counter.getAndIncrement());


// move time to beyond the next cycle // move time to beyond the next cycle
clock.addAndGet(TimeUnit.SECONDS.toMillis(2L)); clock.addAndGet(TimeUnit.SECONDS.toMillis(2L));


otherDocumentWriter = executorService.submit(attemptToWriteDocument(queue)); otherDocumentWriter = attemptToWriteDocument(queue);


assertEquals(1, otherDocumentWriter.get(5L, TimeUnit.SECONDS).intValue()); assertEquals(1, otherDocumentWriter.get(5L, TimeUnit.SECONDS).counterValue);


final ExcerptTailer tailer = queue.createTailer(); final ExcerptTailer tailer = queue.createTailer();
expectValue(1, tailer); expectValue(1, tailer);
Expand Down Expand Up @@ -132,19 +142,19 @@ public void multipleThreadsMustWaitUntilPreviousCycleFileIsCompleted() throws Ex
) { ) {


final ExcerptAppender excerptAppender = queue.acquireAppender(); final ExcerptAppender excerptAppender = queue.acquireAppender();
final Future<Integer> firstWriter; final Future<RecordInfo> firstWriter;
final Future<Integer> secondWriter; final Future<RecordInfo> secondWriter;
final Future<Integer> thirdWriter; final Future<RecordInfo> thirdWriter;
try (final DocumentContext documentContext = excerptAppender.writingDocument()) { try (final DocumentContext documentContext = excerptAppender.writingDocument()) {


// move time to beyond the next cycle // move time to beyond the next cycle
clock.addAndGet(TimeUnit.SECONDS.toMillis(2L)); clock.addAndGet(TimeUnit.SECONDS.toMillis(2L));
// add some jitter to allow threads to race // add some jitter to allow threads to race
firstWriter = executorService.submit(attemptToWriteDocument(queue2)); firstWriter = attemptToWriteDocument(queue2);
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L)); LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
secondWriter = executorService.submit(attemptToWriteDocument(queue3)); secondWriter = attemptToWriteDocument(queue3);
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L)); LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
thirdWriter = executorService.submit(attemptToWriteDocument(queue4)); thirdWriter = attemptToWriteDocument(queue4);


// stall this thread, other threads should not be able to advance, // stall this thread, other threads should not be able to advance,
// since this DocumentContext is still open // since this DocumentContext is still open
Expand All @@ -171,13 +181,13 @@ public void codeWithinPriorDocumentMustExecuteBeforeSubsequentDocumentWhenQueueI
builder(DirectoryUtils.tempDir("document-ordering"), 3_000L).build()) { builder(DirectoryUtils.tempDir("document-ordering"), 3_000L).build()) {


final ExcerptAppender excerptAppender = queue.acquireAppender(); final ExcerptAppender excerptAppender = queue.acquireAppender();
final Future<Integer> otherDocumentWriter; final Future<RecordInfo> otherDocumentWriter;
try (final DocumentContext documentContext = excerptAppender.writingDocument()) { try (final DocumentContext documentContext = excerptAppender.writingDocument()) {


// move time to beyond the next cycle // move time to beyond the next cycle
clock.addAndGet(TimeUnit.SECONDS.toMillis(2L)); clock.addAndGet(TimeUnit.SECONDS.toMillis(2L));


otherDocumentWriter = executorService.submit(attemptToWriteDocument(queue)); otherDocumentWriter = attemptToWriteDocument(queue);


// stall this thread, other thread should not be able to advance, // stall this thread, other thread should not be able to advance,
// since this DocumentContext is still open // since this DocumentContext is still open
Expand All @@ -186,7 +196,7 @@ public void codeWithinPriorDocumentMustExecuteBeforeSubsequentDocumentWhenQueueI
documentContext.wire().getValueOut().int32(counter.getAndIncrement()); documentContext.wire().getValueOut().int32(counter.getAndIncrement());
} }


assertEquals(1, otherDocumentWriter.get(5L, TimeUnit.SECONDS).intValue()); assertEquals(1, otherDocumentWriter.get(5L, TimeUnit.SECONDS).counterValue);


final ExcerptTailer tailer = queue.createTailer(); final ExcerptTailer tailer = queue.createTailer();
expectValue(0, tailer); expectValue(0, tailer);
Expand All @@ -201,13 +211,13 @@ public void codeWithinPriorDocumentMustExecuteBeforeSubsequentDocumentWhenQueueI


final ExcerptAppender excerptAppender = queue.acquireAppender(); final ExcerptAppender excerptAppender = queue.acquireAppender();
excerptAppender.writeDocument("foo", ValueOut::text); excerptAppender.writeDocument("foo", ValueOut::text);
final Future<Integer> otherDocumentWriter; final Future<RecordInfo> otherDocumentWriter;
try (final DocumentContext documentContext = excerptAppender.writingDocument()) { try (final DocumentContext documentContext = excerptAppender.writingDocument()) {


// move time to beyond the next cycle // move time to beyond the next cycle
clock.addAndGet(TimeUnit.SECONDS.toMillis(2L)); clock.addAndGet(TimeUnit.SECONDS.toMillis(2L));


otherDocumentWriter = executorService.submit(attemptToWriteDocument(queue)); otherDocumentWriter = attemptToWriteDocument(queue);


// stall this thread, other thread should not be able to advance, // stall this thread, other thread should not be able to advance,
// since this DocumentContext is still open // since this DocumentContext is still open
Expand All @@ -216,7 +226,7 @@ public void codeWithinPriorDocumentMustExecuteBeforeSubsequentDocumentWhenQueueI
documentContext.wire().getValueOut().int32(counter.getAndIncrement()); documentContext.wire().getValueOut().int32(counter.getAndIncrement());
} }


assertEquals(1, otherDocumentWriter.get(5L, TimeUnit.SECONDS).intValue()); assertEquals(1, otherDocumentWriter.get(5L, TimeUnit.SECONDS).counterValue);


final ExcerptTailer tailer = queue.createTailer(); final ExcerptTailer tailer = queue.createTailer();
final DocumentContext documentContext = tailer.readingDocument(); final DocumentContext documentContext = tailer.readingDocument();
Expand All @@ -239,21 +249,38 @@ private static void expectValue(final int expectedValue, final ExcerptTailer tai
} }
} }


private Callable<Integer> attemptToWriteDocument(final SingleChronicleQueue queue) { private Future<RecordInfo> attemptToWriteDocument(final SingleChronicleQueue queue) throws InterruptedException {
return () -> { final CountDownLatch startedLatch = new CountDownLatch(1);
final Future<RecordInfo> future = executorService.submit(() -> {
final int counterValue; final int counterValue;
DocumentContext outer;
startedLatch.countDown();
try (final DocumentContext documentContext = queue.acquireAppender().writingDocument()) { try (final DocumentContext documentContext = queue.acquireAppender().writingDocument()) {
counterValue = counter.getAndIncrement(); counterValue = counter.getAndIncrement();
documentContext.wire().getValueOut().int32(counterValue); documentContext.wire().getValueOut().int32(counterValue);
outer = documentContext;
} }
return counterValue; final long index = outer.index();
}; return new RecordInfo(counterValue, ROLL_CYCLE.toCycle(index));
});
assertTrue("Task did not start", startedLatch.await(1, TimeUnit.MINUTES));
return future;
} }


private SingleChronicleQueueBuilder builder(final File dir, final long timeoutMS) { private SingleChronicleQueueBuilder builder(final File dir, final long timeoutMS) {
return SingleChronicleQueueBuilder.binary(dir). return SingleChronicleQueueBuilder.binary(dir).
testBlockSize().rollCycle(RollCycles.TEST_SECONDLY). testBlockSize().rollCycle(ROLL_CYCLE).
progressOnContention(progressOnContention). progressOnContention(progressOnContention).
timeProvider(clock::get).timeoutMS(timeoutMS); timeProvider(clock::get).timeoutMS(timeoutMS);
} }

private static final class RecordInfo {
private final int counterValue;
private final int cycle;

RecordInfo(final int counterValue, final int cycle) {
this.counterValue = counterValue;
this.cycle = cycle;
}
}
} }

0 comments on commit ee1e1be

Please sign in to comment.