From 0276a51c2009b5a86a753f3fbecc2c7e17bf4158 Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Fri, 25 Nov 2016 11:45:56 -0500 Subject: [PATCH] Extract pre-touch functionality. --- .../openhft/chronicle/queue/Pretoucher.java | 52 +++++++++++++++++++ .../single/SingleChronicleQueueExcerpts.java | 38 ++------------ .../chronicle/queue/WriteBytesTest.java | 2 +- 3 files changed, 57 insertions(+), 35 deletions(-) create mode 100755 src/main/java/net/openhft/chronicle/queue/Pretoucher.java diff --git a/src/main/java/net/openhft/chronicle/queue/Pretoucher.java b/src/main/java/net/openhft/chronicle/queue/Pretoucher.java new file mode 100755 index 0000000000..ea56ed3f41 --- /dev/null +++ b/src/main/java/net/openhft/chronicle/queue/Pretoucher.java @@ -0,0 +1,52 @@ +package net.openhft.chronicle.queue; + +import net.openhft.chronicle.bytes.MappedBytes; +import net.openhft.chronicle.core.Jvm; +import net.openhft.chronicle.core.OS; + +import java.util.function.LongSupplier; + +/** + * Created by peter on 25/11/2016. + */ +public class Pretoucher { + static final int HEAD_ROOM = 1 << 20; + private final LongSupplier posSupplier; + private long lastTouchedPage = 0, lastTouchedPos = 0; + + public Pretoucher(LongSupplier posSupplier) { + this.posSupplier = posSupplier; + } + + public void pretouch(MappedBytes bytes) { + long pos = posSupplier.getAsLong(); + if (lastTouchedPage > pos) { + lastTouchedPage = pos - pos % OS.pageSize(); + lastTouchedPos = pos; + String message = "Reset lastTouched to " + lastTouchedPage; + Jvm.debug().on(getClass(), message); + + } else { + long headroom = Math.max(HEAD_ROOM, (pos - lastTouchedPos) * 4); // for the next 4 ticks. + long last = pos + headroom; + Thread thread = Thread.currentThread(); + int count = 0, pretouch = 0; + for (; lastTouchedPage < last; lastTouchedPage += OS.pageSize()) { + if (thread.isInterrupted()) + break; + if (bytes.readVolatileLong(lastTouchedPage) == 0) + pretouch++; + count++; + } + if (pretouch < count) + Jvm.debug().on(getClass(), "pretouch for only " + pretouch + " or " + count); + + long pos2 = posSupplier.getAsLong(); + if (Jvm.isDebugEnabled(getClass())) { + String message = "Advanced " + (pos - lastTouchedPos) / 1024 + " KB between pretouch() and " + (pos2 - pos) / 1024 + " KB while mapping of " + headroom / 1024 + " KB."; + Jvm.debug().on(getClass(), message); + } + lastTouchedPos = pos; + } + } +} diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueExcerpts.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueExcerpts.java index 2f1a468fe5..34bde47f26 100755 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueExcerpts.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueExcerpts.java @@ -22,7 +22,6 @@ import net.openhft.chronicle.bytes.WriteBytesMarshallable; import net.openhft.chronicle.core.Jvm; import net.openhft.chronicle.core.Maths; -import net.openhft.chronicle.core.OS; import net.openhft.chronicle.core.annotation.UsedViaReflection; import net.openhft.chronicle.core.io.IORuntimeException; import net.openhft.chronicle.core.util.StringUtils; @@ -87,8 +86,7 @@ static class StoreAppender implements ExcerptAppender, ExcerptContext, InternalA private boolean lazyIndexing = false; private long lastPosition; private int lastCycle; - private long lastTouchedPage = -1; - private long lastTouchedPos = 0; + private Pretoucher pretoucher = null; private Padding padToCacheLines = Padding.SMART; StoreAppender(@NotNull SingleChronicleQueue queue) { @@ -148,36 +146,9 @@ private void close() { @Override public void pretouch() { setCycle(queue.cycle(), true); - long pos = store.writePosition(); - MappedBytes bytes = (MappedBytes) wire.bytes(); - - if (lastTouchedPage < 0) { - lastTouchedPage = pos - pos % OS.pageSize(); - lastTouchedPos = pos; - String message = "Reset lastTouched to " + lastTouchedPage; - Jvm.debug().on(getClass(), message); - } else { - long headroom = Math.max(HEAD_ROOM, (pos - lastTouchedPos) * 4); // for the next 4 ticks. - long last = pos + headroom; - Thread thread = Thread.currentThread(); - int count = 0, pretouch = 0; - for (; lastTouchedPage < last; lastTouchedPage += OS.pageSize()) { - if (thread.isInterrupted()) - break; - if (bytes.compareAndSwapInt(lastTouchedPage, 0, 0)) - pretouch++; - count++; - } - if (pretouch < count) - Jvm.debug().on(getClass(), "pretouch for only " + pretouch + " or " + count); - - long pos2 = store.writePosition(); - if (Jvm.isDebugEnabled(getClass())) { - String message = "Advanced " + (pos - lastTouchedPos) / 1024 + " KB between pretouch() and " + (pos2 - pos) / 1024 + " KB while mapping of " + headroom / 1024 + " KB."; - Jvm.debug().on(getClass(), message); - } - lastTouchedPos = pos; - } + if (pretoucher == null) + pretoucher = new Pretoucher(() -> this.store.writePosition()); + pretoucher.pretouch((MappedBytes) wire.bytes()); } @Override @@ -262,7 +233,6 @@ private void resetWires(SingleChronicleQueue queue) { private void resetPosition() throws UnrecoverableTimeoutException { try { - lastTouchedPage = -1; if (store == null || wire == null) return; diff --git a/src/test/java/net/openhft/chronicle/queue/WriteBytesTest.java b/src/test/java/net/openhft/chronicle/queue/WriteBytesTest.java index 9811f67b50..f4def5ddd3 100755 --- a/src/test/java/net/openhft/chronicle/queue/WriteBytesTest.java +++ b/src/test/java/net/openhft/chronicle/queue/WriteBytesTest.java @@ -71,7 +71,7 @@ public void testWriteBytes() { @Test public void testWriteBytesAndDump() { - String dir = OS.TARGET + "/WriteBytesTestAndDump"; + String dir = OS.TARGET + "/WriteBytesTestAndDump-" + System.nanoTime(); try (ChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir) .rollCycle(TEST4_DAILY) .build()) {