Add further test for #396
epickrram committed Jan 25, 2018
1 parent 2563c58 commit 0fcdabb
Showing 1 changed file with 57 additions and 32 deletions.
@@ -1,15 +1,18 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.core.util.ThrowingConsumer;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.DocumentContext;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -22,51 +25,73 @@ public class SingleChronicleQueueStoreTest {

@Rule
public TemporaryFolder tmpDir = new TemporaryFolder();
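// deterministic time source supplied to the queue builder via timeProvider(clock::get)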
private final AtomicLong clock = new AtomicLong(System.currentTimeMillis());

@Ignore
@Test
public void shouldNotPerformIndexingOnAppendWhenLazyIndexingIsEnabled() throws Exception {
runTest(queue -> {
final ExcerptAppender appender = queue.acquireAppender();
appender.lazyIndexing(true);
final long[] indices = writeMessagesStoreIndices(appender, queue.createTailer());
assertExcerptsAreIndexed(queue, indices, i -> false);
});
}

@Test
public void shouldPerformIndexing() throws Exception {
runTest(queue -> {
final ExcerptAppender appender = queue.acquireAppender();
appender.lazyIndexing(false);
final long[] indices = writeMessagesStoreIndices(appender, queue.createTailer());
assertExcerptsAreIndexed(queue, indices, i -> i % INDEX_SPACING == 0);
});
}

private void runTest(final ThrowingConsumer<SingleChronicleQueue, Exception> testMethod) throws Exception {
try (final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(tmpDir.newFolder()).
testBlockSize().timeProvider(clock::get).
rollCycle(ROLL_CYCLE).indexSpacing(INDEX_SPACING).
build()) {
testMethod.accept(queue);
}
}

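// Asserts that every excerpt can be located via moveToIndex0, and that a linear scan is only needed for excerpts the caller expects to be unindexed.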
private static void assertExcerptsAreIndexed(final SingleChronicleQueue queue, final long[] indices,
final Function<Integer, Boolean> shouldBeIndexed) throws Exception {
final Field field = SingleChronicleQueueStore.class.getDeclaredField("recovery");
field.setAccessible(true);
final SingleChronicleQueueStore wireStore = (SingleChronicleQueueStore)
queue.storeForCycle(queue.cycle(), 0L, true);
final TimedStoreRecovery recovery = (TimedStoreRecovery) field.get(wireStore);
final SCQIndexing indexing = wireStore.indexing;
for (int i = 0; i < RECORD_COUNT; i++) {
final int startLinearScanCount = indexing.linearScanCount;
final ScanResult scanResult = indexing.moveToIndex0(recovery, (SingleChronicleQueueExcerpts.StoreTailer) queue.createTailer(), indices[i]);
assertThat(scanResult, is(ScanResult.FOUND));

if (shouldBeIndexed.apply(i)) {
assertThat(indexing.linearScanCount, is(startLinearScanCount));
} else {
assertThat(indexing.linearScanCount, is(startLinearScanCount + 1));
}
}
}

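// Writes RECORD_COUNT messages with the appender, then reads them back with the tailer to capture each excerpt's index.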
private static long[] writeMessagesStoreIndices(final ExcerptAppender appender, final ExcerptTailer tailer) {
final long[] indices = new long[RECORD_COUNT];
for (int i = 0; i < RECORD_COUNT; i++) {
try (final DocumentContext ctx = appender.writingDocument()) {
ctx.wire().getValueOut().int32(i);
}
}

for (int i = 0; i < RECORD_COUNT; i++) {
try (final DocumentContext ctx = tailer.readingDocument()) {
assertThat("Expected record at index " + i, ctx.isPresent(), is(true));
indices[i] = tailer.index();
}
}
return indices;
}
}
