From 6056405a7612147102aca2043dccead4412720a8 Mon Sep 17 00:00:00 2001 From: Peter Lawrwy Date: Fri, 3 Dec 2021 12:25:33 +0000 Subject: [PATCH 1/7] Allow named tailers to be used concurrently to split work #964 --- .../impl/single/SingleChronicleQueue.java | 6 + .../queue/impl/single/StoreTailer.java | 51 ++++- .../queue/ConcurrentNamedTailersTest.java | 211 ++++++++++++++++++ 3 files changed, 262 insertions(+), 6 deletions(-) create mode 100644 src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java index c7d9f46ac2..0058d7c48b 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java @@ -541,6 +541,12 @@ public ExcerptTailer createTailer(String id) { return storeTailer; } + /** + * The last index read for a named tailer. + * + * @param id of the last index + * @return the LongValue reference to this value. + */ @Override @NotNull public LongValue indexForId(@NotNull String id) { diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java index 3d247ee6c2..06b3ba3e8b 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java @@ -47,7 +47,7 @@ class StoreTailer extends AbstractCloseable private final StoreTailerContext context = new StoreTailerContext(); private final MoveToState moveToState = new MoveToState(); private final Finalizer finalizer; - long index; // index of the next read. + long index, indexChecker; // index of the next read. long lastReadIndex; // index of the last read message @Nullable SingleChronicleQueueStore store; @@ -187,6 +187,12 @@ public String toString() { @NotNull @Override public DocumentContext readingDocument(final boolean includeMetaData) { + return indexValue == null + ? readingDocumentUnnamed(includeMetaData) + : readingDocumentNamed(includeMetaData); + } + + DocumentContext readingDocumentUnnamed(final boolean includeMetaData) { DocumentContext documentContext = readingDocument0(includeMetaData); // this check was added after a strange behaviour seen by one client. I should be impossible. if (documentContext.wire() != null) @@ -195,6 +201,25 @@ public DocumentContext readingDocument(final boolean includeMetaData) { return documentContext; } + DocumentContext readingDocumentNamed(final boolean includeMetaData) { + for (int i = 0; i < 100; i++) { + this.indexChecker = indexValue.getVolatileValue(); + if (this.index != indexChecker) + moveToIndex(this.indexChecker); + + DocumentContext documentContext = readingDocument0(includeMetaData); + + if (indexChecker != Long.MIN_VALUE) { + this.index = indexChecker; + if (context.isPresent() && !context.isMetaData()) + incrementIndex(); + return documentContext; + } + documentContext.close(); + } + throw new AssertionError(); + } + DocumentContext readingDocument0(final boolean includeMetaData) { throwExceptionIfClosed(); @@ -293,12 +318,16 @@ private boolean next0(final boolean includeMetaData) throws StreamCorruptedExcep } else { if (!moveToIndexInternal(firstIndex)) return false; + // had to reset the index. + this.indexChecker = index(); } break; case NOT_REACHED_IN_CYCLE: if (!moveToIndexInternal(index)) return false; + // had to reset the index. + this.indexChecker = index(); break; case FOUND_IN_CYCLE: { @@ -577,7 +606,7 @@ private long nextIndexWithinFoundCycle(final int nextCycle) { */ @Override public long index() { - return indexValue == null ? this.index : indexValue.getValue(); + return context.isPresent() || indexValue == null ? this.index : indexValue.getVolatileValue(); } @Override @@ -588,9 +617,18 @@ public int cycle() { @Override public boolean moveToIndex(final long index) { throwExceptionIfClosed(); + if (moveToIndex0(index)) { + if (indexValue != null) + indexValue.setOrderedValue(index); + return true; + } + return false; + } + boolean moveToIndex0(final long index) { if (moveToState.canReuseLastIndexMove(index, state, direction, queue, privateWire())) { return setAddress(true); + } else if (moveToState.indexIsCloseToAndAheadOfLastIndexMove(index, state, direction, queue)) { final long knownIndex = moveToState.lastMovedToIndex; final boolean found = @@ -1083,10 +1121,11 @@ private boolean tryWindBack(final int cycle) { } void index0(final long index) { - if (indexValue == null) + if (indexValue == null) { this.index = index; - else - indexValue.setValue(index); + } else if (!indexValue.compareAndSwapValue(this.indexChecker, index)) { + this.indexChecker = Long.MIN_VALUE; // invalid. + } } // DON'T INLINE THIS METHOD, as it's used by enterprise chronicle queue @@ -1327,7 +1366,7 @@ public void close() { if (rollbackIfNeeded()) return; - if (isPresent() && !isMetaData()) + if (isPresent() && !isMetaData() && indexValue == null) incrementIndex(); super.close(); diff --git a/src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java b/src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java new file mode 100644 index 0000000000..25f86de357 --- /dev/null +++ b/src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java @@ -0,0 +1,211 @@ +package net.openhft.chronicle.queue; + +import net.openhft.chronicle.bytes.BytesStore; +import net.openhft.chronicle.bytes.ref.LongReference; +import net.openhft.chronicle.core.Jvm; +import net.openhft.chronicle.core.OS; +import net.openhft.chronicle.core.io.IOTools; +import net.openhft.chronicle.core.time.SetTimeProvider; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; +import net.openhft.chronicle.wire.DocumentContext; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import java.io.File; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ConcurrentNamedTailersTest { + @Test + public void concurrentNamedTailers() { + File tmpDir = new File(OS.getTarget(), IOTools.tempName("concurrentNamedTailers")); + + final SetTimeProvider timeProvider = new SetTimeProvider("2021/12/03T12:34:56").advanceMillis(1000); + final String tailerName = "named"; + try (ChronicleQueue q = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().timeProvider(timeProvider).build(); + final ExcerptAppender appender = q.acquireAppender(); + final ExcerptTailer tailer0 = q.createTailer(tailerName); + final ExcerptTailer tailer1 = q.createTailer(tailerName); + final ExcerptTailer tailer2 = q.createTailer(tailerName)) { + + final Tasker tasker = appender.methodWriter(Tasker.class); + for (int i = 0; i < 20; i++) + tasker.task(i); + + assertEquals(0x0, tailer0.index()); + assertEquals(0x0, tailer1.index()); + assertEquals(0x0, tailer2.index()); + + try (DocumentContext dc0 = tailer0.readingDocument()) { + assertEquals(0x4a1400000000L, tailer0.index()); + + try (DocumentContext dc1 = tailer1.readingDocument()) { + assertEquals(0x4a1400000001L, tailer1.index()); + assertEquals(0x4a1400000000L, tailer0.index()); + + try (DocumentContext dc2 = tailer2.readingDocument()) { + assertEquals(0x4a1400000002L, tailer2.index()); + assertEquals(0x4a1400000001L, tailer1.index()); + assertEquals(0x4a1400000000L, tailer0.index()); + } + } + } + + try (DocumentContext dc0 = tailer0.readingDocument()) { + assertEquals(0x4a1400000003L, tailer0.index()); + + assertTrue(tailer2.moveToIndex(0x4a140000000AL)); + + try (DocumentContext dc1 = tailer1.readingDocument()) { + assertEquals(0x4a140000000AL, tailer1.index()); + assertEquals(0x4a1400000003L, tailer0.index()); + + try (DocumentContext dc2 = tailer2.readingDocument()) { + assertEquals(0x4a140000000BL, tailer2.index()); + assertEquals(0x4a140000000AL, tailer1.index()); + assertEquals(0x4a1400000003L, tailer0.index()); + } + } + } + + IOTools.deleteDirWithFiles(tmpDir); + } + } + + @Test + public void raceConditions() throws IllegalAccessException { + File tmpDir = new File(OS.getTarget(), IOTools.tempName("raceConditions")); + + final SetTimeProvider timeProvider = new SetTimeProvider("2021/12/03T12:34:56").advanceMillis(1000); + final String tailerName = "named"; + try (ChronicleQueue q = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().timeProvider(timeProvider).build(); + final ExcerptAppender appender = q.acquireAppender(); + final ExcerptTailer tailer0 = q.createTailer(tailerName)) { + + final Tasker tasker = appender.methodWriter(Tasker.class); + for (int i = 0; i < 20; i++) + tasker.task(i); + + DummyLongReference indexValue = new DummyLongReference(); + Jvm.getField(tailer0.getClass(), "indexValue") + .set(tailer0, indexValue); + + indexValue.getValues.add(0x4a1100000000L); + + assertEquals(0x4a1100000000L, tailer0.index()); + + assertEquals(0, indexValue.setValues.size()); + + indexValue.getValues.add(0x4a1100000000L); + indexValue.getValues.add(0x4a1200000000L); + // pretend another tailer came in + indexValue.getValues.add(0x4a1400000001L); + indexValue.getValues.add(0x4a1400000001L); + indexValue.getValues.add(0x4a1400000001L); + + + try (DocumentContext dc0 = tailer0.readingDocument()) { + assertEquals(0x4a1400000001L, tailer0.index()); + } + + // changed before read + indexValue.getValues.add(0x4a1400000003L); + indexValue.getValues.add(0x4a1400000003L); + + // changed during read + indexValue.getValues.add(0x4a1400000005L); + // changed during read again + indexValue.getValues.add(0x4a1400000007L); + // stable + indexValue.getValues.add(0x4a1400000007L); + indexValue.getValues.add(0x4a1400000007L); + + try (DocumentContext dc0 = tailer0.readingDocument()) { + assertEquals(0x4a1400000007L, tailer0.index()); + } + assertEquals("[4a1400000002, 4a1400000003, 4a1400000007, 4a1400000007, 4a1400000008]", + indexValue.setValues.stream().map(Long::toHexString).collect(Collectors.toList()).toString()); + + IOTools.deleteDirWithFiles(tmpDir); + } + } + + interface Tasker { + void task(int taskId); + } + + static class DummyLongReference implements LongReference { + List getValues = new ArrayList<>(); + List setValues = new ArrayList<>(); + + @Override + public void bytesStore(BytesStore bytesStore, long offset, long length) throws IllegalStateException, IllegalArgumentException, BufferOverflowException, BufferUnderflowException { + throw new AssertionError(); + } + + @Override + public @Nullable BytesStore bytesStore() { + throw new AssertionError(); + } + + @Override + public long offset() { + return 0; + } + + @Override + public long maxSize() { + return 0; + } + + @Override + public long getValue() throws IllegalStateException { + return getValues.remove(0); + } + + @Override + public void setValue(long value) throws IllegalStateException { + setValues.add(value); + } + + @Override + public long getVolatileValue() throws IllegalStateException { + return getValue(); + } + + @Override + public void setVolatileValue(long value) throws IllegalStateException { + setValue(value); + } + + @Override + public void setOrderedValue(long value) throws IllegalStateException { + setValue(value); + } + + @Override + public long addValue(long delta) throws IllegalStateException { + throw new AssertionError(); + } + + @Override + public long addAtomicValue(long delta) throws IllegalStateException { + throw new AssertionError(); + } + + @Override + public boolean compareAndSwapValue(long expected, long value) throws IllegalStateException { + if (getValue() == expected) { + setValue(value); + return true; + } + return false; + } + } +} From 1f9bf9662acc2ff56f53f2f48a49176c01dd2d3a Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 6 Dec 2021 15:53:46 +1100 Subject: [PATCH 2/7] ConcurrentNamedTailersTest: Attempt at writing concurrent threads test --- .../queue/ConcurrentNamedTailersTest.java | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java b/src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java index 25f86de357..c7aa410a39 100644 --- a/src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java +++ b/src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java @@ -7,6 +7,8 @@ import net.openhft.chronicle.core.io.IOTools; import net.openhft.chronicle.core.time.SetTimeProvider; import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; +import net.openhft.chronicle.threads.Pauser; +import net.openhft.chronicle.threads.TimingPauser; import net.openhft.chronicle.wire.DocumentContext; import org.jetbrains.annotations.Nullable; import org.junit.Test; @@ -16,7 +18,10 @@ import java.nio.BufferUnderflowException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -136,6 +141,93 @@ public void raceConditions() throws IllegalAccessException { } } + @Test + public void concurrency() { + File tmpDir = new File(OS.getTarget(), IOTools.tempName("concurrency")); + + final String tailerName = "named"; + final int numberOfEntries = 1_000_000; + + // populate the queue + try (ChronicleQueue q = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().rollCycle(RollCycles.TEST_SECONDLY).build(); + final ExcerptAppender appender = q.acquireAppender()) { + for (int i = 0; i < numberOfEntries; i++) { + try (final DocumentContext documentContext = appender.writingDocument()) { + documentContext.wire().write("index").int64(i); + } + } + } + Jvm.startup().on(ConcurrentNamedTailersTest.class, "Wrote " + numberOfEntries + " entries"); + + final int numReaders = 6; + final AtomicLong counter = new AtomicLong(-1); + final CyclicBarrier cyclicBarrier = new CyclicBarrier(numReaders); + final ExecutorService executorService = Executors.newFixedThreadPool(numReaders); + final List> futures = IntStream.range(0, numReaders) + .mapToObj(i -> executorService.submit(new CompetingConsumer(tmpDir, tailerName, numberOfEntries, counter, cyclicBarrier))) + .collect(Collectors.toList()); + futures.forEach(future -> { + try { + future.get(); + } catch (Exception e) { + throw new AssertionError("Error in Consumer", e); + } + }); + + IOTools.deleteDirWithFiles(tmpDir); + } + + private static class CompetingConsumer implements Runnable { + + private static final int LOG_EVERY = 10_000; + + private final File queueDir; + private final String tailerName; + private final long highestIndex; + private final AtomicLong counter; + private final CyclicBarrier barrier; + + public CompetingConsumer(File queueDir, String tailerName, long numberOfEntries, AtomicLong counter, CyclicBarrier barrier) { + this.queueDir = queueDir; + this.tailerName = tailerName; + this.highestIndex = numberOfEntries - 1; + this.counter = counter; + this.barrier = barrier; + } + + @Override + public void run() { + try (ChronicleQueue q = SingleChronicleQueueBuilder.single(queueDir).testBlockSize().rollCycle(RollCycles.TEST_SECONDLY).build(); + final ExcerptTailer namedTailer = q.createTailer(tailerName)) { + + try { + Jvm.startup().on(ConcurrentNamedTailersTest.class, "Waiting at barrier"); + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new AssertionError("Failed waiting at barrier", e); + } + TimingPauser pauser = Pauser.balanced(); + while (counter.get() < highestIndex) { + try (final DocumentContext documentContext = namedTailer.readingDocument()) { + final long index = documentContext.wire().read("index").int64(); + if (index % LOG_EVERY == 0) { + Jvm.startup().on(ConcurrentNamedTailersTest.class, "Read index " + index); + } + while (!counter.compareAndSet(index - 1, index)) { + try { + pauser.pause(1, TimeUnit.SECONDS); + } catch (TimeoutException e) { + throw new AssertionError("Timed out trying to write " + index + " current value is " + counter.get()); + } + } + pauser.reset(); + } + } + } + } + } + + interface Tasker { void task(int taskId); } From edc54a4b7db41afeec87ce351458ffa7831566e1 Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Wed, 15 Dec 2021 11:22:18 +0000 Subject: [PATCH 3/7] Add more tests for named tailers #973 --- .../queue/MultipleNamedTailersTest.java | 44 +++++ .../queue/impl/single/MessageHistoryTest.java | 28 ++- .../queue/impl/single/RollingCycleTest.java | 26 ++- .../impl/single/SingleChronicleQueueTest.java | 179 +++++++++--------- 4 files changed, 180 insertions(+), 97 deletions(-) create mode 100644 src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java diff --git a/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java b/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java new file mode 100644 index 0000000000..898a263fff --- /dev/null +++ b/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java @@ -0,0 +1,44 @@ +package net.openhft.chronicle.queue; + +import net.openhft.chronicle.core.OS; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; +import net.openhft.chronicle.wire.DocumentContext; +import org.junit.Test; + +import java.io.File; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class MultipleNamedTailersTest { + @Test + public void multipleTailers() { + File tmpDir = new File(OS.getTarget(), "multipleTailers" + System.nanoTime()); + + try (ChronicleQueue q1 = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().rollCycle(RollCycles.TEST_SECONDLY).build(); + ExcerptAppender appender = q1.acquireAppender(); + ExcerptTailer tailer1 = q1.createTailer(); + ExcerptTailer namedTailer1 = q1.createTailer("named1"); + ChronicleQueue q2 = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().build(); + ExcerptTailer tailer2 = q2.createTailer(); + ExcerptTailer namedTailer2 = q2.createTailer("named2")) { + for (int i = 0; i < 1_000_000; i++) { + final String id0 = "" + i; + appender.writeText(id0); + final long index0 = appender.lastIndexAppended(); + check(tailer1, id0, index0); + check(namedTailer1, id0, index0); + check(tailer2, id0, index0); + check(namedTailer2, id0, index0); + } + } + } + + private void check(ExcerptTailer tailer1, String id0, long index0) { + try (DocumentContext dc = tailer1.readingDocument()) { + assertTrue(dc.isPresent()); + assertEquals(index0, tailer1.index()); + assertEquals(id0, dc.wire().getValueIn().text()); + } + } +} diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/MessageHistoryTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/MessageHistoryTest.java index def5841486..dc312b1fb6 100644 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/MessageHistoryTest.java +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/MessageHistoryTest.java @@ -11,13 +11,18 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.*; +@RunWith(Parameterized.class) public final class MessageHistoryTest extends ChronicleQueueTestBase { @Rule public final TestName testName = new TestName(); @@ -25,6 +30,19 @@ public final class MessageHistoryTest extends ChronicleQueueTestBase { private File inputQueueDir; private File middleQueueDir; private File outputQueueDir; + protected final boolean named; + + public MessageHistoryTest(boolean named) { + this.named = named; + } + + @Parameterized.Parameters(name = "named={0}") + public static Collection data() { + return Arrays.asList( + new Object[]{true}, + new Object[]{false} + ); + } @Before public void setUp() { @@ -42,7 +60,7 @@ public void shouldAccessMessageHistory() { final ChronicleQueue outputQueue = createQueue(outputQueueDir, 2)) { generateTestData(inputQueue, outputQueue); - final ExcerptTailer tailer = outputQueue.createTailer(); + final ExcerptTailer tailer = outputQueue.createTailer(named ? "named" : null); final ValidatingSecond validatingSecond = new ValidatingSecond(); final MethodReader validator = tailer.methodReader(validatingSecond); @@ -58,7 +76,7 @@ public void shouldAccessMessageHistoryWhenTailerIsMovedToEnd() { final ChronicleQueue outputQueue = createQueue(outputQueueDir, 2)) { generateTestData(inputQueue, outputQueue); - final ExcerptTailer tailer = outputQueue.createTailer(); + final ExcerptTailer tailer = outputQueue.createTailer(named ? "named" : null); tailer.direction(TailerDirection.BACKWARD).toEnd(); final ValidatingSecond validatingSecond = new ValidatingSecond(); @@ -76,10 +94,10 @@ public void chainedMessageHistory() { final ChronicleQueue outputQueue = createQueue(middleQueueDir, 2)) { generateTestData(inputQueue, middleQueue); - MethodReader reader = middleQueue.createTailer().methodReader(outputQueue.methodWriter(First.class)); + MethodReader reader = middleQueue.createTailer(named ? "named" : null).methodReader(outputQueue.methodWriter(First.class)); for (int i = 0; i < 3; i++) assertTrue(reader.readOne()); - MethodReader reader2 = outputQueue.createTailer().methodReader((First) this::say3); + MethodReader reader2 = outputQueue.createTailer(named ? "named2" : null).methodReader((First) this::say3); for (int i = 0; i < 3; i++) assertTrue(reader2.readOne()); } @@ -103,7 +121,7 @@ private void generateTestData(final ChronicleQueue inputQueue, final ChronicleQu new LoggingFirst(outputQueue.acquireAppender(). methodWriterBuilder(Second.class).build()); - final MethodReader reader = inputQueue.createTailer(). + final MethodReader reader = inputQueue.createTailer(named ? "named" : null). methodReaderBuilder().build(loggingFirst); assertTrue(reader.readOne()); diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/RollingCycleTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/RollingCycleTest.java index c983343b86..30d1a7d016 100644 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/RollingCycleTest.java +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/RollingCycleTest.java @@ -31,14 +31,32 @@ import org.jetbrains.annotations.Nullable; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.Collection; import java.util.Random; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +@RunWith(Parameterized.class) public class RollingCycleTest extends QueueTestCommon { + protected final boolean named; + + public RollingCycleTest(boolean named) { + this.named = named; + } + + @Parameterized.Parameters(name = "named={0}") + public static Collection data() { + return Arrays.asList( + new Object[]{true}, + new Object[]{false} + ); + } @BeforeClass public static void sync() { @@ -216,13 +234,13 @@ public void testRollCycle() { "00000210 fc cc 5e cc da ··^·· \n" + "...\n"; - // System.out.println("Wrote: " + numWritten + " messages"); + // System.out.println("Wrote: " + numWritten + " messages"); long numRead = 0; final TestBytesMarshallable reusableData = new TestBytesMarshallable(0); - final ExcerptTailer currentPosTailer = queue.createTailer() + final ExcerptTailer currentPosTailer = queue.createTailer(named ? "named" : null) .toStart(); - final ExcerptTailer endPosTailer = queue.createTailer().toEnd(); + final ExcerptTailer endPosTailer = queue.createTailer(named ? "named2" : null).toEnd(); while (currentPosTailer.index() < endPosTailer.index()) { try { assertTrue(currentPosTailer.readBytes(reusableData)); @@ -239,7 +257,7 @@ public void testRollCycle() { } assertFalse(currentPosTailer.readBytes(reusableData)); - // System.out.println("Wrote " + numWritten + " Read " + numRead); + // System.out.println("Wrote " + numWritten + " Read " + numRead); String dump = queue.dump(); assertTrue(dump.contains(expectedEagerFile1)); diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java index 13857905f5..cad0e9e927 100644 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java @@ -72,7 +72,7 @@ public class SingleChronicleQueueTest extends ChronicleQueueTestBase { private static final long TIMES = (4L << 20L); @NotNull protected final WireType wireType; - protected final boolean encryption; + protected final boolean named; // ************************************************************************* // @@ -80,19 +80,20 @@ public class SingleChronicleQueueTest extends ChronicleQueueTestBase { // // ************************************************************************* - public SingleChronicleQueueTest(@NotNull WireType wireType, boolean encryption) { + public SingleChronicleQueueTest(@NotNull WireType wireType, boolean named) { this.wireType = wireType; - this.encryption = encryption; + this.named = named; } - @Parameters(name = "wireType={0}, encrypted={1}") + @Parameters(name = "wireType={0}, named={1}") public static Collection data() { return Arrays.asList( // {WireType.TEXT}, + new Object[]{WireType.BINARY_LIGHT, true}, new Object[]{WireType.BINARY, false}, new Object[]{WireType.BINARY_LIGHT, false}, new Object[]{WireType.COMPRESSED_BINARY, false} - // {WireType.DELTA_BINARY} - // {WireType.FIELDLESS_BINARY} + // {WireType.DELTA_BINARY} + // {WireType.FIELDLESS_BINARY} ); } @@ -167,7 +168,7 @@ public void testTextReadWrite() { builder(tmpDir, wireType) .build()) { queue.acquireAppender().writeText("hello world"); - assertEquals("hello world", queue.createTailer().readText()); + assertEquals("hello world", queue.createTailer(named ? "named" : null).readText()); } } @@ -209,7 +210,7 @@ public void testRollbackOnAppend() { } - ExcerptTailer tailer = queue.createTailer(); + ExcerptTailer tailer = queue.createTailer(named ? "named" : null); try (DocumentContext dc = tailer.readingDocument()) { dc.wire().read("hello"); @@ -253,7 +254,7 @@ public void testWriteWithDocumentReadBytesDifferentThreads() throws InterruptedE new NamedThreadFactory("service2")); service2.scheduleAtFixedRate(() -> { Bytes b = Bytes.allocateElasticOnHeap(128); - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); tailer.readBytes(b); if (b.readRemaining() == 0) return; @@ -398,7 +399,7 @@ public void testReadWriteHourly() { try (final ChronicleQueue qTailer = builder(tmpDir, wireType).rollCycle(HOURLY).build()) { - try (DocumentContext documentContext2 = qTailer.createTailer().readingDocument()) { + try (DocumentContext documentContext2 = qTailer.createTailer(named ? "named" : null).readingDocument()) { String str = documentContext2.wire().read("somekey").text(); assertEquals("somevalue", str); } @@ -407,7 +408,7 @@ public void testReadWriteHourly() { @Test public void testMetaIndexTest() { - + assumeFalse(named); File tmpDir = getTmpDir(); try (final ChronicleQueue q = builder(tmpDir, wireType).rollCycle(HOURLY).build()) { { @@ -438,7 +439,7 @@ public void testMetaIndexTest() { } { - ExcerptTailer tailer = q.createTailer(); + ExcerptTailer tailer = q.createTailer(named ? "named" : null); try (DocumentContext documentContext2 = tailer.readingDocument()) { assertEquals(0, toSeq(q, documentContext2.index())); @@ -484,7 +485,7 @@ public void testMetaIndexTest() { } { - ExcerptTailer tailer = q.createTailer(); + ExcerptTailer tailer = q.createTailer(named ? "named" : null); try (DocumentContext documentContext2 = tailer.readingDocument()) { assertEquals(0, toSeq(q, documentContext2.index())); @@ -542,7 +543,7 @@ public void testLastWritten() throws InterruptedException { // read a message on the in queue and write it to the out queue { SCQMsg out = outQueue.acquireAppender().methodWriterBuilder(SCQMsg.class).get(); - MethodReader methodReader = inQueue.createTailer().methodReader((SCQMsg) out::msg); + MethodReader methodReader = inQueue.createTailer(named ? "named" : null).methodReader((SCQMsg) out::msg); // reads the somedata-0 methodReader.readOne(); @@ -573,7 +574,7 @@ public void testLastWritten() throws InterruptedException { // check that we are able to pick up from where we left off, in other words the next read should be somedata-2 { - ExcerptTailer excerptTailer = inQueue.createTailer().afterLastWritten(outQueue); + ExcerptTailer excerptTailer = inQueue.createTailer(named ? "named" : null).afterLastWritten(outQueue); MethodReader methodReader = excerptTailer.methodReader((SCQMsg) actualValue::set); methodReader.readOne(); @@ -612,7 +613,7 @@ public void shouldAllowDirectoryToBeDeletedWhenQueueIsClosed() throws IOExceptio try (final DocumentContext dc = queue.acquireAppender().writingDocument()) { dc.wire().write().text("foo"); } - try (final DocumentContext dc = queue.createTailer().readingDocument()) { + try (final DocumentContext dc = queue.createTailer(named ? "named" : null).readingDocument()) { assertEquals("foo", dc.wire().read().text()); } } @@ -639,7 +640,7 @@ public void testReadingLessBytesThanWritten() { appender.writeBytes(expected); } - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); // Sequential read for (int i = 0; i < 10; i++) { @@ -668,7 +669,7 @@ public void testAppendAndRead() { assertEquals(n, queue.rollCycle().toSequenceNumber(appender.lastIndexAppended())); } - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); // Sequential read for (int i = 0; i < 10; i++) { @@ -697,7 +698,7 @@ public void testReadAndAppend() throws InterruptedException { Thread t = new Thread(() -> { try { started.countDown(); - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); for (int i = 0; i < 2; ) { boolean read = tailer.readDocument(r -> { int result = r.read(TestKey.test).int32(); @@ -801,7 +802,7 @@ void doTestCheckIndex(@NotNull BiConsumer writeTo) { .build()) { final ExcerptAppender appender = queue.acquireAppender(); - ExcerptTailer tailer = queue.createTailer(); + ExcerptTailer tailer = queue.createTailer(named ? "named" : null); int cycle = appender.cycle(); for (int i = 0; i <= 5; i++) { final int n = i; @@ -848,9 +849,8 @@ public void testAppendAndReadWithRollingB() { --- !!not-ready-meta-data! #binary ... */ - assumeFalse(encryption); assumeFalse(wireType == WireType.DEFAULT_ZERO_BINARY); - final ExcerptTailer tailer = queue.createTailer().toStart(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null).toStart(); for (int i = 0; i < 6; i++) { final int n = i; boolean condition = tailer.readDocument(r -> assertEquals(n, @@ -880,7 +880,7 @@ public void testAppendAndReadAtIndex() { assertEquals(i, queue.rollCycle().toSequenceNumber(appender.lastIndexAppended())); } - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); for (int i = 0; i < 5; i++) { final long index = queue.rollCycle().toIndex(appender.cycle(), i); assertTrue(tailer.moveToIndex(index)); @@ -907,7 +907,7 @@ public void testSimpleWire() { StringBuilder first = new StringBuilder(); StringBuilder surname = new StringBuilder(); - final ExcerptTailer tailer = chronicle.createTailer(); + final ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); tailer.readDocument(wire -> wire.read("FirstName").text(first)); tailer.readDocument(wire -> wire.read("Surname").text(surname)); @@ -949,7 +949,7 @@ public void testReadingWritingMarshallableDocument() { dc.wire().write("myMarshable").typedMarshallable(myMarshable); } - ExcerptTailer tailer = chronicle.createTailer(); + ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); try (DocumentContext dc = tailer.readingDocument()) { @@ -960,6 +960,7 @@ public void testReadingWritingMarshallableDocument() { @Test public void testMetaData() { + assumeFalse(named); try (final ChronicleQueue chronicle = builder(getTmpDir(), this.wireType) .build()) { @@ -977,7 +978,7 @@ public void testMetaData() { dc.wire().write("FirstName").text("Steve"); } - final ExcerptTailer tailer = chronicle.createTailer(); + final ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); StringBuilder event = new StringBuilder(); while (true) { @@ -1032,7 +1033,7 @@ public void testReadingSecondDocumentNotExist() { dc.wire().write("FirstName").text("Quartilla"); } - final ExcerptTailer tailer = chronicle.createTailer(); + final ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); try (DocumentContext dc = tailer.readingDocument()) { String text = dc.wire().read("FirstName").text(); @@ -1068,7 +1069,7 @@ public void testDocumentIndexTest() { dc.wire().write("FirstName").text("Rob"); } - ExcerptTailer tailer = chronicle.createTailer(); + ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); try (DocumentContext dc = tailer.readingDocument()) { long index = dc.index(); @@ -1100,7 +1101,7 @@ public void testReadingSecondDocumentNotExistIncludingMeta() { dc.wire().write("FirstName").text("Quartilla"); } - final ExcerptTailer tailer = chronicle.createTailer(); + final ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); StringBuilder event = new StringBuilder(); while (true) { try (DocumentContext dc = tailer.readingDocument(true)) { @@ -1132,7 +1133,7 @@ public void testSimpleByteTest() { Bytes jobs = Bytes.allocateDirect("Jobs".getBytes()); appender.writeBytes(jobs); - final ExcerptTailer tailer = chronicle.createTailer(); + final ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); Bytes bytes = Bytes.elasticByteBuffer(); try { tailer.readBytes(bytes); @@ -1168,7 +1169,7 @@ public void testReadAtIndex() { final int cycle = queue.rollCycle().toCycle(lastIndex); assertEquals(queue.firstCycle(), cycle); assertEquals(queue.lastCycle(), cycle); - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); StringBuilder sb = new StringBuilder(); @@ -1206,7 +1207,7 @@ public void testReadAtIndex4MB() { final int cycle = queue.rollCycle().toCycle(lastIndex); - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); // QueueDumpMain.dump(file, new PrintWriter(System.out)); @@ -1267,7 +1268,7 @@ public void testHeaderIndexReadAtIndex() { appender.writeDocument(wire -> wire.write("key").text("value=" + j)); } - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); assertTrue(tailer.moveToIndex(queue.rollCycle().toIndex(cycle, 0))); StringBuilder sb = new StringBuilder(); @@ -1306,7 +1307,7 @@ public void shouldBeAbleToReadFromQueueWithNonZeroEpoch() { appender.writeDocument(wire -> wire.write("key").text("value=v")); assertEquals(0, appender.cycle()); - final ExcerptTailer excerptTailer = chronicle.createTailer().toStart(); + final ExcerptTailer excerptTailer = chronicle.createTailer(named ? "named" : null).toStart(); assertTrue(excerptTailer.readingDocument().isPresent()); } } @@ -1322,7 +1323,7 @@ public void shouldHandleLargeEpoch() { final ExcerptAppender appender = chronicle.acquireAppender(); appender.writeDocument(wire -> wire.write("key").text("value=v")); - final ExcerptTailer excerptTailer = chronicle.createTailer().toStart(); + final ExcerptTailer excerptTailer = chronicle.createTailer(named ? "named" : null).toStart(); assertTrue(excerptTailer.readingDocument().isPresent()); } } @@ -1336,7 +1337,7 @@ public void testNegativeEPOC() { final ExcerptAppender appender = chronicle.acquireAppender(); appender.writeDocument(wire -> wire.write("key").text("value=v")); - chronicle.createTailer() + chronicle.createTailer(named ? "named" : null) .readDocument(wire -> { assertEquals("value=v", wire.read("key").text()); }); @@ -1363,7 +1364,7 @@ public void testIndex() { } } - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); assertTrue(tailer.moveToIndex(queue.rollCycle().toIndex(cycle, 2))); StringBuilder sb = new StringBuilder(); @@ -1397,7 +1398,7 @@ public void testReadingDocument() { } } - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); final StringBuilder sb = Wires.acquireStringBuilder(); @@ -1464,7 +1465,7 @@ public void testReadingDocumentWithFirstAMove() { } } - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); assertTrue(tailer.moveToIndex(queue.rollCycle().toIndex(cycle, 2))); final StringBuilder sb = Wires.acquireStringBuilder(); @@ -1544,7 +1545,7 @@ private void doTestEpochMove(long epoch, RollCycle rollCycle) { } } - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); assertTrue(tailer.moveToIndex(queue.rollCycle().toIndex(cycle, 2))); final StringBuilder sb = Wires.acquireStringBuilder(); @@ -1587,7 +1588,7 @@ public void testAppendedBeforeToEnd() { ChronicleQueue chronicle2 = builder(dir, this.wireType) .rollCycle(RollCycles.TEST_SECONDLY) .build()) { - ExcerptTailer tailer = chronicle.createTailer(); + ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); ExcerptAppender append = chronicle2.acquireAppender(); append.writeDocument(w -> w.write("test").text("text")); @@ -1690,7 +1691,7 @@ public void testToEnd() throws InterruptedException { try (ChronicleQueue queue = builder(dir, wireType) .rollCycle(RollCycles.HOURLY) .build()) { - ExcerptTailer tailer = queue.createTailer(); + ExcerptTailer tailer = queue.createTailer(named ? "named" : null); // move to the end even though it doesn't exist yet. tailer.toEnd(); @@ -1727,7 +1728,7 @@ public void testToEnd2() { ExcerptAppender append = chronicle2.acquireAppender(); append.writeDocument(w -> w.write("test").text("before text")); - ExcerptTailer tailer = chronicle.createTailer(); + ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); // move to the end even though it doesn't exist yet. tailer.toEnd(); @@ -1750,7 +1751,7 @@ public void testToEndOnDeletedQueueFiles() throws IOException { ExcerptAppender append = q.acquireAppender(); append.writeDocument(w -> w.write("test").text("before text")); - ExcerptTailer tailer = q.createTailer(); + ExcerptTailer tailer = q.createTailer(named ? "named" : null); // move to the end even though it doesn't exist yet. tailer.toEnd(); @@ -1763,7 +1764,7 @@ public void testToEndOnDeletedQueueFiles() throws IOException { .forEach(path -> assertTrue(path.toFile().delete())); try (ChronicleQueue q2 = builder(dir, wireType).build()) { - tailer = q2.createTailer(); + tailer = q2.createTailer(named ? "named" : null); tailer.toEnd(); assertEquals(TailerState.UNINITIALISED, tailer.state()); append = q2.acquireAppender(); @@ -1793,10 +1794,10 @@ public void testReadWrite() { .text("text")); } - ExcerptTailer tailer = chronicle.createTailer(); - ExcerptTailer tailer2 = chronicle.createTailer(); - ExcerptTailer tailer3 = chronicle.createTailer(); - ExcerptTailer tailer4 = chronicle.createTailer(); + ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); + ExcerptTailer tailer2 = chronicle.createTailer(named ? "named" : null); + ExcerptTailer tailer3 = chronicle.createTailer(named ? "named" : null); + ExcerptTailer tailer4 = chronicle.createTailer(named ? "named" : null); for (int i = 0; i < runs; i++) { if (i % 10000 == 0) System.gc(); @@ -1817,7 +1818,7 @@ public void testReadingDocumentForEmptyQueue() { try (ChronicleQueue chronicle = builder(dir, this.wireType) .rollCycle(RollCycles.HOURLY) .build()) { - ExcerptTailer tailer = chronicle.createTailer(); + ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); // DocumentContext is empty as we have no queue and don't know what the wire type will be. try (DocumentContext dc = tailer.readingDocument()) { assertFalse(dc.isPresent()); @@ -1843,6 +1844,7 @@ public void testReadingDocumentForEmptyQueue() { @Test public void testMetaData6() { + assumeFalse(named); try (final ChronicleQueue chronicle = builder(getTmpDir(), this.wireType) .rollCycle(TEST2_DAILY) .timeProvider(new SetTimeProvider("2020/10/19T01:01:01")) @@ -1862,7 +1864,7 @@ public void testMetaData6() { dc.wire().write("FirstName").text("Steve"); } - final ExcerptTailer tailer = chronicle.createTailer(); + final ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); StringBuilder event = new StringBuilder(); while (true) { @@ -1965,7 +1967,7 @@ protected String expectedMetaDataTest2() { @Test(expected = IllegalArgumentException.class) public void dontPassQueueToReader() { try (ChronicleQueue queue = binary(getTmpDir()).build()) { - queue.createTailer().afterLastWritten(queue).methodReader(); + queue.createTailer(named ? "named" : null).afterLastWritten(queue).methodReader(); } } @@ -1975,7 +1977,7 @@ public void testToEndBeforeWrite() { .rollCycle(TEST2_DAILY) .build(); ExcerptAppender appender = chronicle.acquireAppender(); - ExcerptTailer tailer = chronicle.createTailer()) { + ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null)) { int entries = chronicle.rollCycle().defaultIndexSpacing() * 2 + 2; @@ -1995,7 +1997,7 @@ public void testSomeMessages() { .build()) { ExcerptAppender appender = chronicle.acquireAppender(); - ExcerptTailer tailer = chronicle.createTailer(); + ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); int entries = chronicle.rollCycle().defaultIndexSpacing() * 2 + 2; @@ -2040,7 +2042,7 @@ public void shouldReadBackwardFromEndOfQueueWhenDirectionIsSetAfterMoveToEnd() { final ExcerptAppender appender = queue.acquireAppender(); appender.writeDocument(w -> w.writeEventName("hello").text("world")); - final ExcerptTailer tailer = queue.createTailer(); + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); tailer.toEnd(); tailer.direction(TailerDirection.BACKWARD); @@ -2049,7 +2051,7 @@ public void shouldReadBackwardFromEndOfQueueWhenDirectionIsSetAfterMoveToEnd() { } void readForward(@NotNull ChronicleQueue chronicle, int entries) { - try (ExcerptTailer forwardTailer = chronicle.createTailer() + try (ExcerptTailer forwardTailer = chronicle.createTailer(named ? "named" : null) .direction(TailerDirection.FORWARD) .toStart()) { @@ -2071,7 +2073,7 @@ void readForward(@NotNull ChronicleQueue chronicle, int entries) { } void readBackward(@NotNull ChronicleQueue chronicle, int entries) { - ExcerptTailer backwardTailer = chronicle.createTailer() + ExcerptTailer backwardTailer = chronicle.createTailer(named ? "named" : null) .direction(TailerDirection.BACKWARD) .toEnd(); @@ -2114,7 +2116,7 @@ public void testOverreadForwardFromFutureCycleThenReadBackwardTailer() { // go to the cycle next to the one the write was made on forwardToFuture.set(true); - ExcerptTailer forwardTailer = chronicle.createTailer() + ExcerptTailer forwardTailer = chronicle.createTailer(named ? "named" : null) .direction(TailerDirection.FORWARD) .toStart(); @@ -2125,7 +2127,7 @@ public void testOverreadForwardFromFutureCycleThenReadBackwardTailer() { assertFalse(context.isPresent()); } - ExcerptTailer backwardTailer = chronicle.createTailer() + ExcerptTailer backwardTailer = chronicle.createTailer(named ? "named" : null) .direction(TailerDirection.BACKWARD) .toEnd(); @@ -2160,7 +2162,7 @@ public void testZeroLengthMessage() { appender.writeDocument(w -> { }); // System.out.println(chronicle.dump()); - ExcerptTailer tailer = chronicle.createTailer(); + ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); try (DocumentContext dc = tailer.readingDocument()) { assertFalse(dc.wire().hasMore()); } @@ -2182,7 +2184,7 @@ public void testMoveToWithAppender() { appender.writeDocument(w -> w.writeEventName("hello").text("world1")); appender.writeDocument(w -> w.writeEventName("hello").text("world2")); - ExcerptTailer tailer = chronicle.createTailer(); + ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); try (DocumentContext documentContext = tailer.readingDocument()) { sync.writeBytes(documentContext.index(), documentContext.wire().bytes()); @@ -2211,7 +2213,7 @@ public void testMapWrapper() { appender.writeDocument(w -> w.write().object(myMap)); - ExcerptTailer tailer = chronicle.createTailer(); + ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); try (DocumentContext documentContext = tailer.readingDocument()) { MapWrapper object = documentContext.wire().read().object(MapWrapper.class); @@ -2278,7 +2280,7 @@ public void testAppendedSkipToEndMultiThreaded() throws InterruptedException { latch.await(); - ExcerptTailer tailer = q.createTailer(); + ExcerptTailer tailer = q.createTailer(named ? "named" : null); for (int i = 0; i < size; i++) { try (DocumentContext dc = tailer.readingDocument(false)) { long index = dc.index(); @@ -2350,7 +2352,7 @@ public void testToEndPrevCycleEOF() { .timeProvider(clock::get) .build()) { - ExcerptTailer tailer = q.createTailer(); + ExcerptTailer tailer = q.createTailer(named ? "named" : null); assertEquals("first", tailer.readText()); assertNull(tailer.readText()); @@ -2363,7 +2365,7 @@ public void testToEndPrevCycleEOF() { .timeProvider(clock::get) .build()) { - ExcerptTailer tailer = q.createTailer().toEnd(); + ExcerptTailer tailer = q.createTailer(named ? "named" : null).toEnd(); try (DocumentContext documentContext = tailer.readingDocument()) { assertFalse(documentContext.isPresent()); @@ -2382,9 +2384,9 @@ public void testToEndPrevCycleEOF() { .timeProvider(clock::get) .build()) { - ExcerptTailer excerptTailerBeforeAppend = q.createTailer().toEnd(); + ExcerptTailer excerptTailerBeforeAppend = q.createTailer(named ? "named" : null).toEnd(); q.acquireAppender().writeText("more text"); - ExcerptTailer excerptTailerAfterAppend = q.createTailer().toEnd(); + ExcerptTailer excerptTailerAfterAppend = q.createTailer(named ? "named" : null).toEnd(); q.acquireAppender().writeText("even more text"); assertEquals("more text", excerptTailerBeforeAppend.readText()); @@ -2416,7 +2418,7 @@ public void shouldNotGenerateGarbageReadingDocumentAfterEndOfFile() { .timeProvider(clock::get) .build(); - ExcerptTailer tailer = q.createTailer()) { + ExcerptTailer tailer = q.createTailer(named ? "named" : null)) { assertEquals("first", tailer.readText()); GcControls.waitForGcCycle(); @@ -2440,7 +2442,7 @@ public void testTailerWhenCyclesWhereSkippedOnWrite() { try (final ChronicleQueue queue = binary(getTmpDir()) .rollCycle(RollCycles.TEST_SECONDLY).timeProvider(timeProvider) .build(); - final ExcerptTailer tailer = queue.createTailer()) { + final ExcerptTailer tailer = queue.createTailer(named ? "named" : null)) { try (final ExcerptAppender appender = queue.acquireAppender()) { final List stringsToPut = Arrays.asList("one", "two", "three"); @@ -2675,7 +2677,7 @@ public void testReadingWritingWhenNextCycleIsInSequence() { // read both messages try (ChronicleQueue queue = binary(dir) .rollCycle(rollCycle).timeProvider(timeProvider).build(); - ExcerptTailer tailer = queue.createTailer()) { + ExcerptTailer tailer = queue.createTailer(named ? "named" : null)) { assertEquals("first message", tailer.readText()); assertEquals("second message", tailer.readText()); } @@ -2707,7 +2709,7 @@ public void testReadingWritingWhenCycleIsSkipped() { // read both messages try (ChronicleQueue queue = binary(dir) .rollCycle(rollCycle).timeProvider(timeProvider).build(); - ExcerptTailer tailer = queue.createTailer()) { + ExcerptTailer tailer = queue.createTailer(named ? "named" : null)) { assertEquals("first message", tailer.readText()); assertEquals("second message", tailer.readText()); } @@ -2736,7 +2738,7 @@ public void testReadingWritingWhenCycleIsSkippedBackwards() { // read both messages try (ChronicleQueue queue = binary(dir).rollCycle(rollCycle).timeProvider(timeProvider).build(); - ExcerptTailer tailer = queue.createTailer()) { + ExcerptTailer tailer = queue.createTailer(named ? "named" : null)) { ExcerptTailer excerptTailer = tailer.direction(TailerDirection.BACKWARD).toEnd(); assertEquals("second message", excerptTailer.readText()); assertEquals("first message", excerptTailer.readText()); @@ -2760,8 +2762,8 @@ public void testReadWritingWithTimeProvider() { .build(); final ExcerptAppender appender2 = q2.acquireAppender(); - final ExcerptTailer tailer1 = q1.createTailer(); - final ExcerptTailer tailer2 = q2.createTailer()) { + final ExcerptTailer tailer1 = q1.createTailer(named ? "named" : null); + final ExcerptTailer tailer2 = q2.createTailer(named ? "named" : null)) { try (final DocumentContext dc = appender2.writingDocument()) { dc.wire().write().text("some data"); @@ -2849,7 +2851,7 @@ public void testLongLivingTailerAppenderReAcquiredEachSecond() { .testBlockSize() .timeProvider(timeProvider) .build(); - final ExcerptTailer tailer = queuet.createTailer()) { + final ExcerptTailer tailer = queuet.createTailer(named ? "named" : null)) { // The look up of the first and last cycle is cached at this point and won't be checked again for 1 millisecond to reduce overhead. Jvm.pause(1); @@ -2907,10 +2909,9 @@ public void testFromSizePrefixedBlobs() { String s = null; DocumentContext dc0; - try (DocumentContext dc = queue.createTailer().readingDocument()) { + try (DocumentContext dc = queue.createTailer(named ? "named" : null).readingDocument()) { s = Wires.fromSizePrefixedBlobs(dc); - if (!encryption) - assertTrue(s.contains("some: data")); + assertTrue(s.contains("some: data")); dc0 = dc; } @@ -2955,7 +2956,7 @@ public void testCopyQueue() { { try (final ChronicleQueue s = binary(source).build(); final ChronicleQueue t = binary(target).build(); - ExcerptTailer sourceTailer = s.createTailer(); + ExcerptTailer sourceTailer = s.createTailer(named ? "named" : null); ExcerptAppender appender = t.acquireAppender()) { for (; ; ) { @@ -3006,7 +3007,7 @@ public void testIncorrectExcerptTailerReadsAfterSwitchingTailerDirection() { } } - try (ExcerptTailer tailer = queue.createTailer()) { + try (ExcerptTailer tailer = queue.createTailer(named ? "named" : null)) { assertTrue(tailer.moveToIndex(startIndex)); @@ -3069,7 +3070,7 @@ public void checkReferenceCountingAndCheckFileDeletion() { documentContext1.wire().write().text("some text"); } - try (DocumentContext documentContext = queue.createTailer().readingDocument()) { + try (DocumentContext documentContext = queue.createTailer(named ? "named" : null).readingDocument()) { mappedFile = toMappedFile(documentContext); assertEquals("some text", documentContext.wire().read().text()); } @@ -3110,7 +3111,7 @@ public void checkReferenceCountingWhenRollingAndCheckFileDeletion() { mappedFile2 = toMappedFile(dc); } - try (ExcerptTailer tailer = queue.createTailer()) { + try (ExcerptTailer tailer = queue.createTailer(named ? "named" : null)) { try (DocumentContext documentContext = tailer.readingDocument()) { mappedFile3 = toMappedFile(documentContext); assertEquals("some text", documentContext.wire().read().text()); @@ -3169,7 +3170,7 @@ public void testWritingDocumentIsAtomic() { } long timeout = 20_000 + System.currentTimeMillis(); - ExcerptTailer tailer = queue.createTailer(); + ExcerptTailer tailer = queue.createTailer(named ? "named" : null); for (int expected = 0; expected < totalIterations; expected++) { for (; ; ) { if (System.currentTimeMillis() > timeout) @@ -3203,6 +3204,7 @@ public void shouldBeAbleToLoadQueueFromReadOnlyFiles() throws IOException { System.err.println("#460 Cannot test read only mode on windows"); return; } + assumeFalse(named); final File queueDir = getTmpDir(); try (final ChronicleQueue queue = builder(queueDir, wireType). @@ -3219,7 +3221,7 @@ public void shouldBeAbleToLoadQueueFromReadOnlyFiles() throws IOException { try (final ChronicleQueue queue = builder(queueDir, wireType). readOnly(true). testBlockSize().build()) { - assertTrue(queue.createTailer().readingDocument().isPresent()); + assertTrue(queue.createTailer(named ? "named" : null).readingDocument().isPresent()); } } @@ -3269,7 +3271,7 @@ public void writeBytesAndIndexFiveTimesWithOverwriteTest() { } } - try (ExcerptTailer tailer = sourceQueue.createTailer(); + try (ExcerptTailer tailer = sourceQueue.createTailer(named ? "named" : null); ChronicleQueue queue = builder(getTmpDir(), wireType).testBlockSize().build()) { @@ -3341,7 +3343,7 @@ public void writeBytesAndIndexFiveTimesTest() { } String before = sourceQueue.dump().replaceAll("(?m)^#.+$\\n", ""); - try (ExcerptTailer tailer = sourceQueue.createTailer(); + try (ExcerptTailer tailer = sourceQueue.createTailer(named ? "named" : null); ChronicleQueue queue = builder(getTmpDir(), wireType).testBlockSize().build()) { @@ -3393,7 +3395,7 @@ public void rollbackTest() { } try (final ChronicleQueue queue = builder(file, wireType).testBlockSize().build(); - ExcerptTailer tailer1 = queue.createTailer()) { + ExcerptTailer tailer1 = queue.createTailer(named ? "named" : null)) { StringBuilder sb = Wires.acquireStringBuilder(); try (DocumentContext documentContext = tailer1.readingDocument()) { @@ -3584,6 +3586,7 @@ public void close() { @Test public void testReadUsingReadOnly() { assumeFalse("Read-only mode is not supported on Windows", OS.isWindows()); + assumeFalse(named); File tmpDir = getTmpDir(); String expected = "hello world"; @@ -3600,7 +3603,7 @@ public void testReadUsingReadOnly() { .readOnly(true) .build()) { StringBuilder sb = new StringBuilder(); - try (DocumentContext dc = out.createTailer().readingDocument()) { + try (DocumentContext dc = out.createTailer(named ? "named" : null).readingDocument()) { dc.wire().getValueIn().text(sb); } From dcb8395cea84d786d9cd8d9a1b6ac1a48554284f Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Wed, 15 Dec 2021 11:31:44 +0000 Subject: [PATCH 4/7] Add more tests for named tailers #973 --- .../queue/MultipleNamedTailersTest.java | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java b/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java index 898a263fff..c7b4f1a837 100644 --- a/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java +++ b/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java @@ -1,29 +1,53 @@ package net.openhft.chronicle.queue; import net.openhft.chronicle.core.OS; +import net.openhft.chronicle.core.io.IOTools; import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; import net.openhft.chronicle.wire.DocumentContext; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; +import java.util.Arrays; +import java.util.Collection; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +@RunWith(Parameterized.class) public class MultipleNamedTailersTest { + private final boolean empty; + + public MultipleNamedTailersTest(boolean empty) { + this.empty = empty; + } + + @Parameterized.Parameters(name = "empty={0}") + public static Collection data() { + return Arrays.asList( + new Object[]{true}, + new Object[]{false} + ); + } + @Test public void multipleTailers() { File tmpDir = new File(OS.getTarget(), "multipleTailers" + System.nanoTime()); - try (ChronicleQueue q1 = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().rollCycle(RollCycles.TEST_SECONDLY).build(); + try (ChronicleQueue q1 = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize() +// .timeProvider(new SetTimeProvider("2021/12/15T11:22:33").advanceMicros(10)) + .rollCycle(RollCycles.TEST_SECONDLY).build(); ExcerptAppender appender = q1.acquireAppender(); ExcerptTailer tailer1 = q1.createTailer(); ExcerptTailer namedTailer1 = q1.createTailer("named1"); ChronicleQueue q2 = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().build(); ExcerptTailer tailer2 = q2.createTailer(); ExcerptTailer namedTailer2 = q2.createTailer("named2")) { + if (!empty) + appender.writeText("0"); for (int i = 0; i < 1_000_000; i++) { - final String id0 = "" + i; + final String id0 = "" + (i + (empty ? 0 : 1)); appender.writeText(id0); final long index0 = appender.lastIndexAppended(); check(tailer1, id0, index0); @@ -32,11 +56,13 @@ public void multipleTailers() { check(namedTailer2, id0, index0); } } + IOTools.deleteDirWithFiles(tmpDir); } private void check(ExcerptTailer tailer1, String id0, long index0) { try (DocumentContext dc = tailer1.readingDocument()) { assertTrue(dc.isPresent()); + assertEquals(Long.toHexString(index0), Long.toHexString(tailer1.index())); assertEquals(index0, tailer1.index()); assertEquals(id0, dc.wire().getValueIn().text()); } From 5e7e8e9d6253bf6e3bf7350c72cae1ccdec4141c Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Wed, 15 Dec 2021 16:06:56 +0000 Subject: [PATCH 5/7] Allow named tailers to be used concurrently to split work #964 --- .../queue/impl/single/StoreTailer.java | 59 +++++--- .../queue/ConcurrentNamedTailersTest.java | 22 +-- .../queue/MultipleNamedTailersTest.java | 57 +++++--- .../queue/impl/single/MessageHistoryTest.java | 3 + .../impl/single/SingleChronicleQueueTest.java | 131 +++++++++--------- 5 files changed, 159 insertions(+), 113 deletions(-) diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java index 06b3ba3e8b..d95e09d40a 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java @@ -37,6 +37,7 @@ */ class StoreTailer extends AbstractCloseable implements ExcerptTailer, SourceContext, ExcerptContext { + public static final long INVALID_INDEX = Long.MIN_VALUE; static final int INDEXING_LINEAR_SCAN_THRESHOLD = 70; static final StringBuilderPool SBP = new StringBuilderPool(); static final EOFException EOF_EXCEPTION = new EOFException(); @@ -57,7 +58,7 @@ class StoreTailer extends AbstractCloseable private boolean readAfterReplicaAcknowledged; @NotNull private TailerState state = UNINITIALISED; - private long indexAtCreation = Long.MIN_VALUE; + private long indexAtCreation = INVALID_INDEX; private boolean readingDocumentFound = false; private long address = NO_PAGE; private boolean striding = false; @@ -177,7 +178,7 @@ public int sourceId() { @NotNull @Override public String toString() { - final long index = index(); + final long index = this.index; // don't use index() as this confuses the debugger return "StoreTailer{" + "index sequence=" + queue.rollCycle().toSequenceNumber(index) + ", index cycle=" + queue.rollCycle().toCycle(index) + @@ -202,20 +203,22 @@ DocumentContext readingDocumentUnnamed(final boolean includeMetaData) { } DocumentContext readingDocumentNamed(final boolean includeMetaData) { - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1_000_000; i++) { this.indexChecker = indexValue.getVolatileValue(); if (this.index != indexChecker) moveToIndex(this.indexChecker); DocumentContext documentContext = readingDocument0(includeMetaData); - if (indexChecker != Long.MIN_VALUE) { + if (indexChecker != INVALID_INDEX) { this.index = indexChecker; if (context.isPresent() && !context.isMetaData()) incrementIndex(); return documentContext; } documentContext.close(); + if (i > 1000) + Thread.yield(); } throw new AssertionError(); } @@ -369,7 +372,7 @@ private boolean endOfCycle() { final int currentCycle = queue.rollCycle().toCycle(oldIndex); final long nextIndex = nextIndexWithNextAvailableCycle(currentCycle); - if (nextIndex != Long.MIN_VALUE) { + if (nextIndex != INVALID_INDEX) { return nextEndOfCycle(queue.rollCycle().toCycle(nextIndex)); } else { state = END_OF_CYCLE; @@ -429,7 +432,7 @@ private boolean beyondStartOfCycleBackward() throws StreamCorruptedException { final int cycle = queue.rollCycle().toCycle(index()); final long nextIndex = nextIndexWithNextAvailableCycle(cycle); - if (nextIndex != Long.MIN_VALUE) { + if (nextIndex != INVALID_INDEX) { moveToIndexInternal(nextIndex); state = FOUND_IN_CYCLE; return true; @@ -440,7 +443,7 @@ private boolean beyondStartOfCycleBackward() throws StreamCorruptedException { } private boolean nextCycleNotFound() { - if (index() == Long.MIN_VALUE) { + if (index() == INVALID_INDEX) { if (this.store != null) queue.closeStore(this.store); this.store = null; @@ -541,7 +544,7 @@ private long nextIndexWithNextAvailableCycle0(final int cycle) { assert cycle != Integer.MIN_VALUE : "cycle == Integer.MIN_VALUE"; if (cycle > queue.lastCycle() || direction == TailerDirection.NONE) { - return Long.MIN_VALUE; + return INVALID_INDEX; } long nextIndex; @@ -553,7 +556,7 @@ private long nextIndexWithNextAvailableCycle0(final int cycle) { try { final int nextCycle0 = queue.nextCycle(this.cycle, direction); if (nextCycle0 == -1) - return Long.MIN_VALUE; + return INVALID_INDEX; nextIndex = nextIndexWithinFoundCycle(nextCycle0); @@ -563,7 +566,7 @@ private long nextIndexWithNextAvailableCycle0(final int cycle) { if (Jvm.isResourceTracing()) { final int nextIndexCycle = queue.rollCycle().toCycle(nextIndex); - if (nextIndex != Long.MIN_VALUE && nextIndexCycle - 1 != cycle) { + if (nextIndex != INVALID_INDEX && nextIndexCycle - 1 != cycle) { /* * lets say that you were using a roll cycle of TEST_SECONDLY @@ -736,6 +739,9 @@ ScanResult moveToIndexResult(final long index) { private ExcerptTailer doToStart() { assert direction != BACKWARD; + + this.indexChecker = INVALID_INDEX; + final int firstCycle = queue.firstCycle(); if (firstCycle == Integer.MAX_VALUE) { state = UNINITIALISED; @@ -790,7 +796,7 @@ private long approximateLastIndex() { final int lastCycle = queue.lastCycle(); try { if (lastCycle == Integer.MIN_VALUE) - return Long.MIN_VALUE; + return INVALID_INDEX; return approximateLastCycle2(lastCycle); @@ -939,6 +945,8 @@ public boolean striding() { @NotNull private ExcerptTailer optimizedToEnd() { + this.indexChecker = INVALID_INDEX; + final RollCycle rollCycle = queue.rollCycle(); final int lastCycle = queue.lastCycle(); try { @@ -990,7 +998,7 @@ public ExcerptTailer originalToEnd() { long index = approximateLastIndex(); - if (index == Long.MIN_VALUE) { + if (index == INVALID_INDEX) { if (state() == TailerState.CYCLE_NOT_FOUND) state = UNINITIALISED; return this; @@ -1130,9 +1138,15 @@ void index0(final long index) { // DON'T INLINE THIS METHOD, as it's used by enterprise chronicle queue void index(final long index) { - index0(index); + this.index = index; + if (indexValue != null) { + if (this.indexChecker == INVALID_INDEX) { + indexValue.setValue(index); + } else if (!indexValue.compareAndSwapValue(this.indexChecker, index)) + this.indexChecker = INVALID_INDEX; + } - if (indexAtCreation == Long.MIN_VALUE) { + if (indexAtCreation == INVALID_INDEX) { indexAtCreation = index; } @@ -1286,9 +1300,9 @@ public File currentFile() { } static final class MoveToState { - private long lastMovedToIndex = Long.MIN_VALUE; + private long lastMovedToIndex = INVALID_INDEX; private TailerDirection directionAtLastMoveTo = TailerDirection.NONE; - private long readPositionAtLastMove = Long.MIN_VALUE; + private long readPositionAtLastMove = INVALID_INDEX; private int indexMoveCount = 0; void onSuccessfulLookup(final long movedToIndex, @@ -1308,16 +1322,16 @@ void onSuccessfulScan(final long movedToIndex, } void reset() { - lastMovedToIndex = Long.MIN_VALUE; + lastMovedToIndex = INVALID_INDEX; directionAtLastMoveTo = TailerDirection.NONE; - readPositionAtLastMove = Long.MIN_VALUE; + readPositionAtLastMove = INVALID_INDEX; } private boolean indexIsCloseToAndAheadOfLastIndexMove(final long index, final TailerState state, final TailerDirection direction, final ChronicleQueue queue) { - return lastMovedToIndex != Long.MIN_VALUE && + return lastMovedToIndex != INVALID_INDEX && index - lastMovedToIndex < INDEXING_LINEAR_SCAN_THRESHOLD && state == FOUND_IN_CYCLE && direction == directionAtLastMoveTo && @@ -1361,6 +1375,13 @@ public int sourceId() { return StoreTailer.this.sourceId(); } + @Override + public void rollbackOnClose() { + if (indexValue != null) + throw new IllegalStateException("Can't roll back a named tailer"); + super.rollbackOnClose(); + } + @Override public void close() { if (rollbackIfNeeded()) diff --git a/src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java b/src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java index c7aa410a39..c7f8771fad 100644 --- a/src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java +++ b/src/test/java/net/openhft/chronicle/queue/ConcurrentNamedTailersTest.java @@ -132,7 +132,7 @@ public void raceConditions() throws IllegalAccessException { indexValue.getValues.add(0x4a1400000007L); try (DocumentContext dc0 = tailer0.readingDocument()) { - assertEquals(0x4a1400000007L, tailer0.index()); + assertEquals(Long.toHexString(0x4a1400000007L), Long.toHexString(tailer0.index())); } assertEquals("[4a1400000002, 4a1400000003, 4a1400000007, 4a1400000007, 4a1400000008]", indexValue.setValues.stream().map(Long::toHexString).collect(Collectors.toList()).toString()); @@ -177,6 +177,10 @@ public void concurrency() { IOTools.deleteDirWithFiles(tmpDir); } + interface Tasker { + void task(int taskId); + } + private static class CompetingConsumer implements Runnable { private static final int LOG_EVERY = 10_000; @@ -208,8 +212,8 @@ public void run() { } TimingPauser pauser = Pauser.balanced(); while (counter.get() < highestIndex) { - try (final DocumentContext documentContext = namedTailer.readingDocument()) { - final long index = documentContext.wire().read("index").int64(); + try (final DocumentContext dc = namedTailer.readingDocument()) { + final long index = dc.wire().read("index").int64(); if (index % LOG_EVERY == 0) { Jvm.startup().on(ConcurrentNamedTailersTest.class, "Read index " + index); } @@ -217,7 +221,7 @@ public void run() { try { pauser.pause(1, TimeUnit.SECONDS); } catch (TimeoutException e) { - throw new AssertionError("Timed out trying to write " + index + " current value is " + counter.get()); + throw new AssertionError(Long.toHexString(dc.index()) + ": Timed out trying to read " + index + " current value is " + counter.get()); } } pauser.reset(); @@ -227,11 +231,6 @@ public void run() { } } - - interface Tasker { - void task(int taskId); - } - static class DummyLongReference implements LongReference { List getValues = new ArrayList<>(); List setValues = new ArrayList<>(); @@ -299,5 +298,10 @@ public boolean compareAndSwapValue(long expected, long value) throws IllegalStat } return false; } + + @Override + public String toString() { + return getClass().getName() + "@" + Integer.toHexString(hashCode()); + } } } diff --git a/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java b/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java index c7b4f1a837..2c2f143714 100644 --- a/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java +++ b/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java @@ -1,5 +1,6 @@ package net.openhft.chronicle.queue; +import net.openhft.chronicle.core.Jvm; import net.openhft.chronicle.core.OS; import net.openhft.chronicle.core.io.IOTools; import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; @@ -26,8 +27,8 @@ public MultipleNamedTailersTest(boolean empty) { @Parameterized.Parameters(name = "empty={0}") public static Collection data() { return Arrays.asList( - new Object[]{true}, - new Object[]{false} + new Object[]{false}, + new Object[]{true} ); } @@ -36,35 +37,45 @@ public void multipleTailers() { File tmpDir = new File(OS.getTarget(), "multipleTailers" + System.nanoTime()); try (ChronicleQueue q1 = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize() -// .timeProvider(new SetTimeProvider("2021/12/15T11:22:33").advanceMicros(10)) - .rollCycle(RollCycles.TEST_SECONDLY).build(); - ExcerptAppender appender = q1.acquireAppender(); - ExcerptTailer tailer1 = q1.createTailer(); - ExcerptTailer namedTailer1 = q1.createTailer("named1"); - ChronicleQueue q2 = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().build(); - ExcerptTailer tailer2 = q2.createTailer(); - ExcerptTailer namedTailer2 = q2.createTailer("named2")) { + .rollCycle(RollCycles.TEST_SECONDLY) + .build(); + ExcerptAppender appender = q1.acquireAppender()) { + if (!empty) appender.writeText("0"); - for (int i = 0; i < 1_000_000; i++) { - final String id0 = "" + (i + (empty ? 0 : 1)); - appender.writeText(id0); - final long index0 = appender.lastIndexAppended(); - check(tailer1, id0, index0); - check(namedTailer1, id0, index0); - check(tailer2, id0, index0); - check(namedTailer2, id0, index0); + + try (ExcerptTailer tailer1 = q1.createTailer(); + ExcerptTailer namedTailer1 = q1.createTailer("named1"); + ChronicleQueue q2 = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().build(); + ExcerptTailer tailer2 = q2.createTailer(); + ExcerptTailer namedTailer2 = q2.createTailer("named2")) { + for (int i = 0; i < 3_000; i++) { + final String id0 = "" + i; + if (i > 0 || empty) + appender.writeText(id0); + long index0 = appender.lastIndexAppended(); + if ((int) index0 == 0 && i > 0) + System.out.println("index: " + Long.toHexString(index0)); + check(tailer1, id0, index0); + check(namedTailer1, id0, index0); + check(tailer2, id0, index0); + check(namedTailer2, id0, index0); + Jvm.pause(1); + } } } IOTools.deleteDirWithFiles(tmpDir); } - private void check(ExcerptTailer tailer1, String id0, long index0) { - try (DocumentContext dc = tailer1.readingDocument()) { + private void check(ExcerptTailer tailer, String id0, long index0) { + try (DocumentContext dc = tailer.readingDocument()) { assertTrue(dc.isPresent()); - assertEquals(Long.toHexString(index0), Long.toHexString(tailer1.index())); - assertEquals(index0, tailer1.index()); - assertEquals(id0, dc.wire().getValueIn().text()); + final String text = dc.wire().getValueIn().text(); + assertEquals(id0, text); + assertEquals(Long.toHexString(index0), Long.toHexString(tailer.index())); + assertEquals(index0, tailer.index()); } + final long index2 = tailer.index(); + assertEquals(index0 + 1, index2); } } diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/MessageHistoryTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/MessageHistoryTest.java index dc312b1fb6..6bb19324d8 100644 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/MessageHistoryTest.java +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/MessageHistoryTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.*; +import static org.junit.Assume.assumeFalse; @RunWith(Parameterized.class) public final class MessageHistoryTest extends ChronicleQueueTestBase { @@ -72,6 +73,7 @@ public void shouldAccessMessageHistory() { @Test public void shouldAccessMessageHistoryWhenTailerIsMovedToEnd() { + assumeFalse(named); try (final ChronicleQueue inputQueue = createQueue(inputQueueDir, 1); final ChronicleQueue outputQueue = createQueue(outputQueueDir, 2)) { generateTestData(inputQueue, outputQueue); @@ -89,6 +91,7 @@ public void shouldAccessMessageHistoryWhenTailerIsMovedToEnd() { @Test public void chainedMessageHistory() { + assumeFalse(named); try (final ChronicleQueue inputQueue = createQueue(inputQueueDir, 1); final ChronicleQueue middleQueue = createQueue(middleQueueDir, 2); final ChronicleQueue outputQueue = createQueue(middleQueueDir, 2)) { diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java index cad0e9e927..b38d50d4cc 100644 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java @@ -195,6 +195,7 @@ private void testCleanupDir0() { @Test public void testRollbackOnAppend() { + assumeFalse(named); try (final ChronicleQueue queue = builder(getTmpDir(), wireType) .build()) { @@ -1193,15 +1194,15 @@ public void testReadAtIndex4MB() { .build()) { final ExcerptAppender appender = queue.acquireAppender(); - // System.out.print("Percent written="); + // System.out.print("Percent written="); for (long i = 0; i < TIMES; i++) { final long j = i; appender.writeDocument(wire -> wire.write("key").text("value=" + j)); - // if (i % (TIMES / 20) == 0) { - // System.out.println("" + (i * 100 / TIMES) + "%, "); - // } + // if (i % (TIMES / 20) == 0) { + // System.out.println("" + (i * 100 / TIMES) + "%, "); + // } } long lastIndex = appender.lastIndexAppended(); @@ -1209,7 +1210,7 @@ public void testReadAtIndex4MB() { final ExcerptTailer tailer = queue.createTailer(named ? "named" : null); - // QueueDumpMain.dump(file, new PrintWriter(System.out)); + // QueueDumpMain.dump(file, new PrintWriter(System.out)); StringBuilder sb = new StringBuilder(); @@ -1217,9 +1218,9 @@ public void testReadAtIndex4MB() { assertTrue(tailer.moveToIndex(queue.rollCycle().toIndex(cycle, i))); tailer.readDocument(wire -> wire.read("key").text(sb)); assertEquals("value=" + i, sb.toString()); - // if (i % (TIMES / 20) == 0) { - // System.out.println("Percent read= " + (i * 100 / TIMES) + "%"); - // } + // if (i % (TIMES / 20) == 0) { + // System.out.println("Percent read= " + (i * 100 / TIMES) + "%"); + // } } } } @@ -1764,7 +1765,7 @@ public void testToEndOnDeletedQueueFiles() throws IOException { .forEach(path -> assertTrue(path.toFile().delete())); try (ChronicleQueue q2 = builder(dir, wireType).build()) { - tailer = q2.createTailer(named ? "named" : null); + tailer = q2.createTailer(named ? "named2" : null); tailer.toEnd(); assertEquals(TailerState.UNINITIALISED, tailer.state()); append = q2.acquireAppender(); @@ -1794,10 +1795,10 @@ public void testReadWrite() { .text("text")); } - ExcerptTailer tailer = chronicle.createTailer(named ? "named" : null); - ExcerptTailer tailer2 = chronicle.createTailer(named ? "named" : null); - ExcerptTailer tailer3 = chronicle.createTailer(named ? "named" : null); - ExcerptTailer tailer4 = chronicle.createTailer(named ? "named" : null); + ExcerptTailer tailer = chronicle.createTailer(named ? "named1" : null); + ExcerptTailer tailer2 = chronicle.createTailer(named ? "named2" : null); + ExcerptTailer tailer3 = chronicle.createTailer(named ? "named3" : null); + ExcerptTailer tailer4 = chronicle.createTailer(named ? "named4" : null); for (int i = 0; i < runs; i++) { if (i % 10000 == 0) System.gc(); @@ -2006,7 +2007,7 @@ public void testSomeMessages() { appender.writeDocument(w -> w.writeEventName("hello").int64(finalI)); long seq = chronicle.rollCycle().toSequenceNumber(appender.lastIndexAppended()); assertEquals(i, seq); - // System.out.println(chronicle.dump()); + // System.out.println(chronicle.dump()); tailer.readDocument(w -> w.read().int64(finalI, (a, b) -> assertEquals((long) a, b))); } } @@ -2073,6 +2074,7 @@ void readForward(@NotNull ChronicleQueue chronicle, int entries) { } void readBackward(@NotNull ChronicleQueue chronicle, int entries) { + assumeFalse(named); ExcerptTailer backwardTailer = chronicle.createTailer(named ? "named" : null) .direction(TailerDirection.BACKWARD) .toEnd(); @@ -2146,7 +2148,7 @@ public void testLastIndexAppended() { appender.writeDocument(w -> w.writeEventName("hello").text("world0")); final long nextIndexToWrite = appender.lastIndexAppended() + 1; appender.writeDocument(w -> w.getValueOut().bytes(new byte[0])); - // System.out.println(chronicle.dump()); + // System.out.println(chronicle.dump()); assertEquals(nextIndexToWrite, appender.lastIndexAppended()); } @@ -2325,7 +2327,7 @@ public void testRandomConcurrentReadWrite() throws if (!executor.awaitTermination(10_000, TimeUnit.SECONDS)) executor.shutdownNow(); - // System.out.println(". " + i); + // System.out.println(". " + i); Jvm.pause(1000); } } @@ -2333,7 +2335,7 @@ public void testRandomConcurrentReadWrite() throws @Test public void testToEndPrevCycleEOF() { - final AtomicLong clock = new AtomicLong(System.currentTimeMillis()); + final AtomicLong clock = new AtomicLong(1639580990000L); File dir = getTmpDir(); try (ChronicleQueue q = builder(dir, wireType) .rollCycle(TEST_SECONDLY) @@ -2384,17 +2386,21 @@ public void testToEndPrevCycleEOF() { .timeProvider(clock::get) .build()) { - ExcerptTailer excerptTailerBeforeAppend = q.createTailer(named ? "named" : null).toEnd(); - q.acquireAppender().writeText("more text"); - ExcerptTailer excerptTailerAfterAppend = q.createTailer(named ? "named" : null).toEnd(); - q.acquireAppender().writeText("even more text"); + final ExcerptAppender appender = q.acquireAppender(); + appender.writeText("first text"); + ExcerptTailer excerptTailerBeforeAppend = q.createTailer(named ? "named" : null) + .toEnd(); + appender.writeText("more text"); + System.out.println(Long.toHexString(appender.lastIndexAppended())); + ExcerptTailer excerptTailerAfterAppend = q.createTailer(named ? "named2" : null) + .toEnd(); + appender.writeText("even more text"); + System.out.println(Long.toHexString(appender.lastIndexAppended())); assertEquals("more text", excerptTailerBeforeAppend.readText()); assertEquals("even more text", excerptTailerAfterAppend.readText()); assertEquals("even more text", excerptTailerBeforeAppend.readText()); } - AbstractCloseable.assertCloseablesClosed(); - } @Test @@ -2430,7 +2436,7 @@ public void shouldNotGenerateGarbageReadingDocumentAfterEndOfFile() { final long actualGcCycles = endCollectionCount - startCollectionCount; assertTrue(String.format("Too many GC cycles. Expected <= %d, but was %d", - maxAllowedGcCycles, actualGcCycles), + maxAllowedGcCycles, actualGcCycles), actualGcCycles <= maxAllowedGcCycles); } } @@ -2617,7 +2623,7 @@ public void testCountExceptsBetweenCycles() { long[] indexs = new long[10]; for (int i = 0; i < indexs.length; i++) { - // System.out.println("."); + // System.out.println("."); try (DocumentContext writingContext = appender.writingDocument()) { writingContext.wire().write().text("some-text-" + i); indexs[i] = writingContext.index(); @@ -2633,7 +2639,7 @@ else if ((i + 1) % 3 == 0) for (int lower = 0; lower < indexs.length; lower++) { for (int upper = lower; upper < indexs.length; upper++) { - // System.out.println("lower=" + lower + ",upper=" + upper); + // System.out.println("lower=" + lower + ",upper=" + upper); assertEquals(upper - lower, queue.countExcerpts(indexs[lower], indexs[upper])); } @@ -2736,6 +2742,7 @@ public void testReadingWritingWhenCycleIsSkippedBackwards() { queue.acquireAppender().writeText("second message"); } + assumeFalse(named); // read both messages try (ChronicleQueue queue = binary(dir).rollCycle(rollCycle).timeProvider(timeProvider).build(); ExcerptTailer tailer = queue.createTailer(named ? "named" : null)) { @@ -2762,8 +2769,8 @@ public void testReadWritingWithTimeProvider() { .build(); final ExcerptAppender appender2 = q2.acquireAppender(); - final ExcerptTailer tailer1 = q1.createTailer(named ? "named" : null); - final ExcerptTailer tailer2 = q2.createTailer(named ? "named" : null)) { + final ExcerptTailer tailer1 = q1.createTailer(named ? "named1" : null); + final ExcerptTailer tailer2 = q2.createTailer(named ? "named2" : null)) { try (final DocumentContext dc = appender2.writingDocument()) { dc.wire().write().text("some data"); @@ -2833,7 +2840,7 @@ public void testTailerSnappingRollWithNewAppender() throws InterruptedException, }); f1.get(10, TimeUnit.SECONDS); - // System.out.println(queue.dump().replaceAll("(?m)^#.+$\\n", "")); + // System.out.println(queue.dump().replaceAll("(?m)^#.+$\\n", "")); f2.get(10, TimeUnit.SECONDS); executorService.shutdownNow(); @@ -3377,7 +3384,7 @@ public void writeBytesAndIndexFiveTimesTest() { @Test public void rollbackTest() { - + assumeFalse(named); File file = getTmpDir(); try (final ChronicleQueue sourceQueue = builder(file, wireType). @@ -3548,38 +3555,6 @@ private boolean doMappedSegmentUnmappedRollTest(AtomicLong clock, StringBuilder } } - private static class MapWrapper extends SelfDescribingMarshallable { - final Map map = new HashMap<>(); - } - - static class MyMarshable extends SelfDescribingMarshallable implements Demarshallable { - @UsedViaReflection - String name; - - @UsedViaReflection - public MyMarshable(@NotNull WireIn wire) { - readMarshallable(wire); - } - - public MyMarshable() { - } - } - - private static class BytesWithIndex implements Closeable { - private BytesStore bytes; - private long index; - - public BytesWithIndex(Bytes bytes, long index) { - this.bytes = Bytes.allocateElasticDirect(bytes.readRemaining()).write(bytes); - this.index = index; - } - - @Override - public void close() { - bytes.releaseLast(); - } - } - /** * relates to https://github.com/OpenHFT/Chronicle-Queue/issues/699 */ @@ -3688,4 +3663,36 @@ public void shouldWaitForConditionWhenAcquiringAppender() throws TimeoutExceptio } } } + + private static class MapWrapper extends SelfDescribingMarshallable { + final Map map = new HashMap<>(); + } + + static class MyMarshable extends SelfDescribingMarshallable implements Demarshallable { + @UsedViaReflection + String name; + + @UsedViaReflection + public MyMarshable(@NotNull WireIn wire) { + readMarshallable(wire); + } + + public MyMarshable() { + } + } + + private static class BytesWithIndex implements Closeable { + private BytesStore bytes; + private long index; + + public BytesWithIndex(Bytes bytes, long index) { + this.bytes = Bytes.allocateElasticDirect(bytes.readRemaining()).write(bytes); + this.index = index; + } + + @Override + public void close() { + bytes.releaseLast(); + } + } } From 5ed9735c54bb295f380af899dc5ac17e80b3444e Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Thu, 16 Dec 2021 10:22:10 +0000 Subject: [PATCH 6/7] Roll named tailers correctly #964 --- .../queue/impl/single/StoreTailer.java | 12 +++++-- .../queue/MultipleNamedTailersTest.java | 36 +++++++++++++++++-- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java index d95e09d40a..6d55503d79 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java @@ -205,8 +205,10 @@ DocumentContext readingDocumentUnnamed(final boolean includeMetaData) { DocumentContext readingDocumentNamed(final boolean includeMetaData) { for (int i = 0; i < 1_000_000; i++) { this.indexChecker = indexValue.getVolatileValue(); - if (this.index != indexChecker) + if (this.index != indexChecker) { + System.out.println("move " + Long.toHexString(this.index) + " to " + Long.toHexString(indexChecker)); moveToIndex(this.indexChecker); + } DocumentContext documentContext = readingDocument0(includeMetaData); @@ -1142,8 +1144,12 @@ void index(final long index) { if (indexValue != null) { if (this.indexChecker == INVALID_INDEX) { indexValue.setValue(index); - } else if (!indexValue.compareAndSwapValue(this.indexChecker, index)) + } else if (indexValue.compareAndSwapValue(this.indexChecker, index)) { + this.indexChecker = index; + } else { this.indexChecker = INVALID_INDEX; + } + } if (indexAtCreation == INVALID_INDEX) { @@ -1389,6 +1395,8 @@ public void close() { if (isPresent() && !isMetaData() && indexValue == null) incrementIndex(); + else if (indexValue != null) + StoreTailer.this.index++; // it should be at this position if no other tailers read or appenders rolls. super.close(); if (direction == FORWARD) diff --git a/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java b/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java index 2c2f143714..dfda13b8ba 100644 --- a/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java +++ b/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java @@ -32,6 +32,37 @@ public static Collection data() { ); } + @Test + public void multipleSharedTailers() { + File tmpDir = new File(OS.getTarget(), "multipleTailers" + System.nanoTime()); + + try (ChronicleQueue q1 = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize() + .rollCycle(RollCycles.TEST_SECONDLY) + .build(); + ExcerptAppender appender = q1.acquireAppender()) { + + if (!empty) + appender.writeText("0"); + + try (ExcerptTailer namedTailer1 = q1.createTailer("named1"); + ChronicleQueue q2 = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().build(); + ExcerptTailer namedTailer2 = q2.createTailer("named2")) { + for (int i = 0; i < 3_000; i++) { + final String id0 = "" + i; + if (i > 0 || empty) + appender.writeText(id0); + long index0 = appender.lastIndexAppended(); + if (i % 2 == 0) + check(namedTailer1, id0, index0); + else + check(namedTailer2, id0, index0); + Jvm.pause(1); + } + } + } + IOTools.deleteDirWithFiles(tmpDir); + } + @Test public void multipleTailers() { File tmpDir = new File(OS.getTarget(), "multipleTailers" + System.nanoTime()); @@ -54,8 +85,8 @@ public void multipleTailers() { if (i > 0 || empty) appender.writeText(id0); long index0 = appender.lastIndexAppended(); - if ((int) index0 == 0 && i > 0) - System.out.println("index: " + Long.toHexString(index0)); +// if ((int) index0 == 0 && i > 0) +// System.out.println("index: " + Long.toHexString(index0)); check(tailer1, id0, index0); check(namedTailer1, id0, index0); check(tailer2, id0, index0); @@ -76,6 +107,7 @@ private void check(ExcerptTailer tailer, String id0, long index0) { assertEquals(index0, tailer.index()); } final long index2 = tailer.index(); +// System.out.println("Was "+Long.toHexString(index0)+" now "+Long.toHexString(index2)); assertEquals(index0 + 1, index2); } } From fff315687146c670152cea66c44baa22a4403865 Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Thu, 16 Dec 2021 10:23:51 +0000 Subject: [PATCH 7/7] Roll named tailers correctly #964 --- .../net/openhft/chronicle/queue/impl/single/StoreTailer.java | 4 +--- .../net/openhft/chronicle/queue/MultipleNamedTailersTest.java | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java index 6d55503d79..8dc9a92867 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java @@ -205,10 +205,8 @@ DocumentContext readingDocumentUnnamed(final boolean includeMetaData) { DocumentContext readingDocumentNamed(final boolean includeMetaData) { for (int i = 0; i < 1_000_000; i++) { this.indexChecker = indexValue.getVolatileValue(); - if (this.index != indexChecker) { - System.out.println("move " + Long.toHexString(this.index) + " to " + Long.toHexString(indexChecker)); + if (this.index != indexChecker) moveToIndex(this.indexChecker); - } DocumentContext documentContext = readingDocument0(includeMetaData); diff --git a/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java b/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java index dfda13b8ba..92900bb5b6 100644 --- a/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java +++ b/src/test/java/net/openhft/chronicle/queue/MultipleNamedTailersTest.java @@ -44,9 +44,9 @@ public void multipleSharedTailers() { if (!empty) appender.writeText("0"); - try (ExcerptTailer namedTailer1 = q1.createTailer("named1"); + try (ExcerptTailer namedTailer1 = q1.createTailer("named"); ChronicleQueue q2 = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().build(); - ExcerptTailer namedTailer2 = q2.createTailer("named2")) { + ExcerptTailer namedTailer2 = q2.createTailer("named")) { for (int i = 0; i < 3_000; i++) { final String id0 = "" + i; if (i > 0 || empty)