Skip to content

Commit

Permalink
Extract pre-touch functionality.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Nov 25, 2016
1 parent 9f915bb commit 0276a51
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 35 deletions.
52 changes: 52 additions & 0 deletions src/main/java/net/openhft/chronicle/queue/Pretoucher.java
@@ -0,0 +1,52 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;

import java.util.function.LongSupplier;

/**
* Created by peter on 25/11/2016.
*/
public class Pretoucher {
static final int HEAD_ROOM = 1 << 20;
private final LongSupplier posSupplier;
private long lastTouchedPage = 0, lastTouchedPos = 0;

public Pretoucher(LongSupplier posSupplier) {
this.posSupplier = posSupplier;
}

public void pretouch(MappedBytes bytes) {
long pos = posSupplier.getAsLong();
if (lastTouchedPage > pos) {
lastTouchedPage = pos - pos % OS.pageSize();
lastTouchedPos = pos;
String message = "Reset lastTouched to " + lastTouchedPage;
Jvm.debug().on(getClass(), message);

} else {
long headroom = Math.max(HEAD_ROOM, (pos - lastTouchedPos) * 4); // for the next 4 ticks.
long last = pos + headroom;
Thread thread = Thread.currentThread();
int count = 0, pretouch = 0;
for (; lastTouchedPage < last; lastTouchedPage += OS.pageSize()) {
if (thread.isInterrupted())
break;
if (bytes.readVolatileLong(lastTouchedPage) == 0)
pretouch++;
count++;
}
if (pretouch < count)
Jvm.debug().on(getClass(), "pretouch for only " + pretouch + " or " + count);

long pos2 = posSupplier.getAsLong();
if (Jvm.isDebugEnabled(getClass())) {
String message = "Advanced " + (pos - lastTouchedPos) / 1024 + " KB between pretouch() and " + (pos2 - pos) / 1024 + " KB while mapping of " + headroom / 1024 + " KB.";
Jvm.debug().on(getClass(), message);
}
lastTouchedPos = pos;
}
}
}
Expand Up @@ -22,7 +22,6 @@
import net.openhft.chronicle.bytes.WriteBytesMarshallable; import net.openhft.chronicle.bytes.WriteBytesMarshallable;
import net.openhft.chronicle.core.Jvm; import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.Maths; import net.openhft.chronicle.core.Maths;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.annotation.UsedViaReflection; import net.openhft.chronicle.core.annotation.UsedViaReflection;
import net.openhft.chronicle.core.io.IORuntimeException; import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.util.StringUtils; import net.openhft.chronicle.core.util.StringUtils;
Expand Down Expand Up @@ -87,8 +86,7 @@ static class StoreAppender implements ExcerptAppender, ExcerptContext, InternalA
private boolean lazyIndexing = false; private boolean lazyIndexing = false;
private long lastPosition; private long lastPosition;
private int lastCycle; private int lastCycle;
private long lastTouchedPage = -1; private Pretoucher pretoucher = null;
private long lastTouchedPos = 0;
private Padding padToCacheLines = Padding.SMART; private Padding padToCacheLines = Padding.SMART;


StoreAppender(@NotNull SingleChronicleQueue queue) { StoreAppender(@NotNull SingleChronicleQueue queue) {
Expand Down Expand Up @@ -148,36 +146,9 @@ private void close() {
@Override @Override
public void pretouch() { public void pretouch() {
setCycle(queue.cycle(), true); setCycle(queue.cycle(), true);
long pos = store.writePosition(); if (pretoucher == null)
MappedBytes bytes = (MappedBytes) wire.bytes(); pretoucher = new Pretoucher(() -> this.store.writePosition());

pretoucher.pretouch((MappedBytes) wire.bytes());
if (lastTouchedPage < 0) {
lastTouchedPage = pos - pos % OS.pageSize();
lastTouchedPos = pos;
String message = "Reset lastTouched to " + lastTouchedPage;
Jvm.debug().on(getClass(), message);
} else {
long headroom = Math.max(HEAD_ROOM, (pos - lastTouchedPos) * 4); // for the next 4 ticks.
long last = pos + headroom;
Thread thread = Thread.currentThread();
int count = 0, pretouch = 0;
for (; lastTouchedPage < last; lastTouchedPage += OS.pageSize()) {
if (thread.isInterrupted())
break;
if (bytes.compareAndSwapInt(lastTouchedPage, 0, 0))
pretouch++;
count++;
}
if (pretouch < count)
Jvm.debug().on(getClass(), "pretouch for only " + pretouch + " or " + count);

long pos2 = store.writePosition();
if (Jvm.isDebugEnabled(getClass())) {
String message = "Advanced " + (pos - lastTouchedPos) / 1024 + " KB between pretouch() and " + (pos2 - pos) / 1024 + " KB while mapping of " + headroom / 1024 + " KB.";
Jvm.debug().on(getClass(), message);
}
lastTouchedPos = pos;
}
} }


@Override @Override
Expand Down Expand Up @@ -262,7 +233,6 @@ private void resetWires(SingleChronicleQueue queue) {


private void resetPosition() throws UnrecoverableTimeoutException { private void resetPosition() throws UnrecoverableTimeoutException {
try { try {
lastTouchedPage = -1;


if (store == null || wire == null) if (store == null || wire == null)
return; return;
Expand Down
Expand Up @@ -71,7 +71,7 @@ public void testWriteBytes() {


@Test @Test
public void testWriteBytesAndDump() { public void testWriteBytesAndDump() {
String dir = OS.TARGET + "/WriteBytesTestAndDump"; String dir = OS.TARGET + "/WriteBytesTestAndDump-" + System.nanoTime();
try (ChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir) try (ChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
.rollCycle(TEST4_DAILY) .rollCycle(TEST4_DAILY)
.build()) { .build()) {
Expand Down

0 comments on commit 0276a51

Please sign in to comment.