diff --git a/src/main/java/net/openhft/chronicle/queue/PretouchHandler.java b/src/main/java/net/openhft/chronicle/queue/PretouchHandler.java index c71a1ad045..d94f80095f 100644 --- a/src/main/java/net/openhft/chronicle/queue/PretouchHandler.java +++ b/src/main/java/net/openhft/chronicle/queue/PretouchHandler.java @@ -2,18 +2,20 @@ import net.openhft.chronicle.core.threads.EventHandler; import net.openhft.chronicle.core.threads.HandlerPriority; +import net.openhft.chronicle.queue.impl.single.Pretoucher; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; import org.jetbrains.annotations.NotNull; public final class PretouchHandler implements EventHandler { - private ChronicleQueue queue; + private final Pretoucher pretoucher; - public PretouchHandler(ChronicleQueue queue) { - this.queue = queue; + public PretouchHandler(final SingleChronicleQueue queue) { + this.pretoucher = new Pretoucher(queue); } @Override public boolean action() { - queue.acquireAppender().pretouch(); + pretoucher.execute(); return false; } @@ -22,4 +24,4 @@ public boolean action() { public HandlerPriority priority() { return HandlerPriority.MONITOR; } -} +} \ No newline at end of file diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/Pretoucher.java b/src/main/java/net/openhft/chronicle/queue/impl/single/Pretoucher.java new file mode 100644 index 0000000000..1ca74ed6f7 --- /dev/null +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/Pretoucher.java @@ -0,0 +1,79 @@ +package net.openhft.chronicle.queue.impl.single; + +import net.openhft.chronicle.bytes.MappedBytes; +import net.openhft.chronicle.bytes.NewChunkListener; +import net.openhft.chronicle.queue.impl.WireStore; + +import java.util.function.IntConsumer; + +/** + * A class designed to be called from a long-lived thread. + * + * Upon invocation of the {@code execute()} method, this object will pre-touch pages in the supplied queue's underlying store file, + * attempting to keep ahead of any appenders to the queue. + * + * Resources held by this object will be released when the underlying queue is closed. + * + * Alternatively, the {@code shutdown()} method can be called to close the supplied queue and release any other resources. + * Invocation of the {@code execute()} method after {@code shutdown()} has been called with cause an {@code IllegalStateException} to be thrown. + */ +public final class Pretoucher { + private final SingleChronicleQueue queue; + private final NewChunkListener chunkListener; + private final IntConsumer cycleChangedListener; + private final PretoucherState pretoucherState; + private int currentCycle = Integer.MIN_VALUE; + private WireStore currentCycleWireStore; + private MappedBytes currentCycleMappedBytes; + + public Pretoucher(final SingleChronicleQueue queue) { + this(queue, null, c -> {}); + } + + // visible for testing + Pretoucher(final SingleChronicleQueue queue, final NewChunkListener chunkListener, + final IntConsumer cycleChangedListener) { + this.queue = queue; + this.chunkListener = chunkListener; + this.cycleChangedListener = cycleChangedListener; + queue.addCloseListener(this, Pretoucher::releaseResources); + pretoucherState = new PretoucherState(this::getStoreWritePosition); + } + + public void execute() { + assignCurrentCycle(); + pretoucherState.pretouch(currentCycleMappedBytes); + } + + public void shutdown() { + queue.close(); + } + + private void assignCurrentCycle() { + if (queue.cycle() != currentCycle) { + releaseResources(); + + currentCycleWireStore = queue.storeForCycle(queue.cycle(), queue.epoch(), true); + currentCycleMappedBytes = currentCycleWireStore.bytes(); + currentCycle = queue.cycle(); + if (chunkListener != null) { + currentCycleMappedBytes.setNewChunkListener(chunkListener); + } + + cycleChangedListener.accept(queue.cycle()); + } + } + + private long getStoreWritePosition() { + return currentCycleWireStore.writePosition(); + } + + private void releaseResources() { + if (currentCycleWireStore != null) { + queue.release(currentCycleWireStore); + } + if (currentCycleMappedBytes != null) { + currentCycleMappedBytes.close(); + } + } +} \ No newline at end of file diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/ChunkAllocationAfterGarbageCollectionTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/ChunkAllocationAfterGarbageCollectionTest.java deleted file mode 100644 index 5609218711..0000000000 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/ChunkAllocationAfterGarbageCollectionTest.java +++ /dev/null @@ -1,105 +0,0 @@ -package net.openhft.chronicle.queue.impl.single; - -import net.openhft.chronicle.bytes.NewChunkListener; -import net.openhft.chronicle.core.time.TimeProvider; -import net.openhft.chronicle.queue.DirectoryUtils; -import net.openhft.chronicle.queue.ExcerptAppender; -import net.openhft.chronicle.queue.RollCycles; -import net.openhft.chronicle.wire.DocumentContext; -import net.openhft.chronicle.wire.WireType; -import org.jetbrains.annotations.NotNull; -import org.junit.After; -import org.junit.Ignore; -import org.junit.Test; - -import java.io.File; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; -import java.util.stream.IntStream; - -import static java.util.stream.IntStream.range; - -public final class ChunkAllocationAfterGarbageCollectionTest { - private static final byte[] DATA = new byte[8192]; - - private final File path = DirectoryUtils.tempDir(ChunkAllocationAfterGarbageCollectionTest.class.getSimpleName()); - - @Ignore("demonstrates an issue") - @Test - public void pretoucherAppenderShouldNotResetToStartOfMappedFile() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - Executors.newSingleThreadExecutor().submit(() -> { - String lastAppenderHashCode = ""; - Thread.currentThread().setName("pre-toucher-thread"); - try (final SingleChronicleQueue queue = createQueue(path, System::currentTimeMillis)) { - - while (!Thread.currentThread().isInterrupted()) { - lastAppenderHashCode = preTouch(lastAppenderHashCode, queue, new LoggingNewChunkListener()); - LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10L)); - } - - } - }); - - try (final SingleChronicleQueue queue = createQueue(path, System::currentTimeMillis)) { - final ExcerptAppender appender = queue.acquireAppender(); - queue.storeForCycle(queue.cycle(), 0, true).bytes(). - setNewChunkListener(new LoggingNewChunkListener()); - appender.lazyIndexing(true); - - IntStream.range(0, 5_000).forEach(i -> { - range(0, 100).forEach(j -> { - try (final DocumentContext ctx = appender.writingDocument()) { - ctx.wire().write().bytes(DATA); - } - }); - - latch.countDown(); - LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L)); - - if (i % 30 == 0) { - System.out.println("GC"); - GcControls.waitForGcCycle(); - } - }); - } - } - - @NotNull - private String preTouch(final String lastAppenderHashCode, final SingleChronicleQueue queue, - final LoggingNewChunkListener chunkListener) { - final ExcerptAppender appender = queue.acquireAppender(); - queue.storeForCycle(queue.cycle(), 0, true).bytes(). - setNewChunkListener(chunkListener); - if (!lastAppenderHashCode.equals(Integer.toHexString(System.identityHashCode(appender)))) { - System.out.println("Appender changed"); - } - - appender.pretouch(); - return Integer.toHexString(System.identityHashCode(appender)); - } - - private static final class LoggingNewChunkListener implements NewChunkListener { - @Override - public void onNewChunk(final String filename, final int chunk, final long delayMicros) { - System.out.printf("%s chunk %d%n", Thread.currentThread().getName(), chunk); - } - } - - @After - public void deleteDir() throws Exception { - DirectoryUtils.deleteDir(path); - } - - private static SingleChronicleQueue createQueue(final File path, final TimeProvider timeProvider) { - return SingleChronicleQueueBuilder. - binary(path). - timeProvider(timeProvider). - rollCycle(RollCycles.DAILY). - testBlockSize(). - wireType(WireType.BINARY). - build(); - } -} \ No newline at end of file diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/PretoucherTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/PretoucherTest.java new file mode 100644 index 0000000000..51e92f902f --- /dev/null +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/PretoucherTest.java @@ -0,0 +1,73 @@ +package net.openhft.chronicle.queue.impl.single; + +import net.openhft.chronicle.bytes.NewChunkListener; +import net.openhft.chronicle.core.time.TimeProvider; +import net.openhft.chronicle.queue.DirectoryUtils; +import net.openhft.chronicle.queue.RollCycles; +import net.openhft.chronicle.wire.DocumentContext; +import net.openhft.chronicle.wire.WireType; +import org.junit.After; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.stream.IntStream.range; +import static net.openhft.chronicle.queue.DirectoryUtils.tempDir; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +public class PretoucherTest { + private final File path = tempDir(PretoucherTest.class.getSimpleName()); + private final AtomicLong clock = new AtomicLong(System.currentTimeMillis()); + private final List capturedCycles = new ArrayList<>(); + private final CapturingChunkListener chunkListener = new CapturingChunkListener(); + + @Test + public void shouldHandleCycleRoll() throws Exception { + try (final SingleChronicleQueue queue = createQueue(path, clock::get)) { + final Pretoucher pretoucher = new Pretoucher(queue, chunkListener, capturedCycles::add); + + range(0, 10).forEach(i -> { + try (final DocumentContext ctx = queue.acquireAppender().writingDocument()) { + ctx.wire().write().int32(i); + pretoucher.execute(); + ctx.wire().write().bytes(new byte[1024]); + } + pretoucher.execute(); + clock.addAndGet(TimeUnit.SECONDS.toMillis(5L)); + }); + + assertThat(capturedCycles.size(), is(10)); + assertThat(chunkListener.chunkMap.isEmpty(), is(false)); + } + } + + @After + public void deleteDir() throws Exception { + DirectoryUtils.deleteDir(path); + } + + private static final class CapturingChunkListener implements NewChunkListener { + private final TreeMap> chunkMap = new TreeMap<>(); + + @Override + public void onNewChunk(final String filename, final int chunk, final long delayMicros) { + chunkMap.computeIfAbsent(filename, f -> new ArrayList<>()).add(chunk); + } + } + + private static SingleChronicleQueue createQueue(final File path, final TimeProvider timeProvider) { + return SingleChronicleQueueBuilder. + binary(path). + timeProvider(timeProvider). + rollCycle(RollCycles.TEST_SECONDLY). + testBlockSize(). + wireType(WireType.BINARY). + build(); + } +} \ No newline at end of file