Skip to content

Commit

Permalink
Code optimisation to support Queue and encryption in Queue-Enterprise.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Feb 5, 2017
1 parent 2313b27 commit 50a99a5
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 168 deletions.
Expand Up @@ -15,8 +15,10 @@
class PretoucherState {
private static final int HEAD_ROOM = 1 << 20;
private final LongSupplier posSupplier;
private final int minHeadRoom;
private long lastTouchedPage = 0, lastTouchedPos = 0;
private int minHeadRoom;
private long lastTouchedPage = 0,
lastTouchedPos = 0,
lastPos = 0;
private int lastBytesHashcode = -1;
private long averageMove = 0;

Expand All @@ -40,37 +42,44 @@ public void pretouch(MappedBytes bytes) {
long pos = posSupplier.getAsLong();
// don't retain the bytes object when it is head so keep the hashCode instead.
// small risk of a duplicate hashCode.
int pageSize = OS.pageSize();
if (lastBytesHashcode != System.identityHashCode(bytes)) {
lastTouchedPage = pos - pos % OS.pageSize();
lastTouchedPage = pos - pos % pageSize;
lastTouchedPos = pos;
lastBytesHashcode = System.identityHashCode(bytes);
String message = getFile(bytes) + " - Reset pretoucher to pos " + pos + " as the underlying MappedBytes changed.";
debug(message);

} else {
averageMove = (pos - lastTouchedPage) / 4 + averageMove * 3 / 4;
averageMove = (pos - lastPos) / 4 + averageMove * 3 / 4;
long neededHeadRoom = Math.max(minHeadRoom, averageMove * 4); // for the next 4 ticks.
final long neededEnd = pos + neededHeadRoom;
if (lastTouchedPage < neededEnd) {
Thread thread = Thread.currentThread();
int count = 0, pretouch = 0;
for (; lastTouchedPage < neededEnd; lastTouchedPage += OS.pageSize()) {
for (; lastTouchedPage < neededEnd; lastTouchedPage += pageSize) {
if (thread.isInterrupted())
break;
if (touchPage(bytes, lastTouchedPage))
pretouch++;
count++;
}
if (pretouch < count)
debug("pretouch for only " + pretouch + " of " + count);
if (pretouch < count) {
minHeadRoom += 1 << 20;
debug("pretouch for only " + pretouch + " of " + count + " min: " + (minHeadRoom >> 20) + " MB.");
}

long pos2 = posSupplier.getAsLong();
if (Jvm.isDebugEnabled(getClass())) {
String message = getFile(bytes) + ": Advanced " + (pos - lastTouchedPos) / 1024 + " KB between pretouch() and " + (pos2 - pos) / 1024 + " KB while mapping of " + neededHeadRoom / 1024 + " KB.";
String message = getFile(bytes) + ": Advanced " + (pos - lastTouchedPos) / 1024 + " KB, " +
"avg " + averageMove / 1024 + " KB " +
"between pretouch() and " + (pos2 - pos) / 1024 + " KB " +
"while mapping of " + pretouch * pageSize / 1024 + " KB ";
debug(message);
}
lastTouchedPos = pos;
}
lastPos = pos;
}
}

Expand All @@ -79,6 +88,6 @@ protected void debug(String message) {
}

protected boolean touchPage(MappedBytes bytes, long offset) {
return bytes.readVolatileLong(offset) == 0;
return bytes.compareAndSwapLong(offset, 0L, 0L);
}
}
Expand Up @@ -78,7 +78,7 @@ public class SingleChronicleQueue implements RollingChronicleQueue {
private final boolean isBuffered;
@NotNull
private final WireType wireType;
private final long blockSize;
private final long blockSize, overlapSize;
@NotNull
private final Consumer<BytesRingBufferStats> onRingBufferStats;
private final EventLoop eventLoop;
Expand Down Expand Up @@ -107,6 +107,7 @@ protected SingleChronicleQueue(@NotNull final SingleChronicleQueueBuilder builde
path = builder.path();
wireType = builder.wireType();
blockSize = builder.blockSize();
overlapSize = builder.blockSize() / 4;
eventLoop = builder.eventLoop();
bufferCapacity = builder.bufferCapacity();
onRingBufferStats = builder.onRingBufferStats();
Expand Down Expand Up @@ -524,6 +525,10 @@ public long blockSize() {
return this.blockSize;
}

public long overlapSize() {
return this.overlapSize;
}

@NotNull
@Override
public WireType wireType() {
Expand Down
Expand Up @@ -270,50 +270,52 @@ public DocumentContext writingDocument(boolean metaData) throws UnrecoverableTim
if (this.cycle != cycle || wire == null)
rollCycleTo(cycle);

for (int i = 0; i <= 100; i++) {
int safeLength = (int) queue.overlapSize();
for (int i = 0; i < 128; i++) {
try {
assert wire != null;
long pos = store.writeHeader(wire, Wires.UNKNOWN_LENGTH, (int) (queue.blockSize() / 4), timeoutMS());
long pos = store.writeHeader(wire, Wires.UNKNOWN_LENGTH, safeLength, timeoutMS());
position(pos);
context.isClosed = false;
context.metaData = false;
context.wire = wire;
context.padToCacheAlign = padToCacheAlign() != Padding.NEVER;
break;
context.metaData(metaData);
ok = true;
return context;

} catch (EOFException theySeeMeRolling) {

assert !((AbstractWire) wire).isInsideHeader();
int qCycle = queue.cycle();
if (cycle < queue.cycle()) {
setCycle2(cycle = qCycle, true);
} else if (cycle == qCycle) {
// for the rare case where the qCycle has just changed in the last
// few milliseconds since
setCycle2(++cycle, true);
} else
throw new IllegalStateException("Found an EOF on the next cycle file," +
" this next file, should not have an EOF as its cycle " +
"number is greater than the current cycle (based on the " +
"current time), this should only happen " +
"if it " +
"was written by a different appender set with a different " +
"EPOC or different roll cycle." +
"All your appenders ( that write to a given directory ) " +
"should have the same EPOCH and roll cycle" +
" qCycle=" + qCycle + ", cycle=" + cycle);
cycle = handleRoll(cycle);
}
if (i == 100)
throw new IllegalStateException("Unable to roll to the current cycle");
}
ok = true;
throw new IllegalStateException("Unable to roll to the current cycle");

} finally {
if (!ok)
assert resetAppendingThread();
}
context.metaData(metaData);
return context;
}

private int handleRoll(int cycle) {
assert !((AbstractWire) wire).isInsideHeader();
int qCycle = queue.cycle();
if (cycle < queue.cycle()) {
setCycle2(cycle = qCycle, true);
} else if (cycle == qCycle) {
// for the rare case where the qCycle has just changed in the last
// few milliseconds since
setCycle2(++cycle, true);
} else
throw new IllegalStateException("Found an EOF on the next cycle file," +
" this next file, should not have an EOF as its cycle " +
"number is greater than the current cycle (based on the " +
"current time), this should only happen " +
"if it " +
"was written by a different appender set with a different " +
"EPOC or different roll cycle." +
"All your appenders ( that write to a given directory ) " +
"should have the same EPOCH and roll cycle" +
" qCycle=" + qCycle + ", cycle=" + cycle);
return cycle;
}

boolean checkWritePositionHeaderNumber() {
Expand Down Expand Up @@ -794,6 +796,20 @@ private static boolean isReadOnly(Bytes bytes) {
!((MappedBytes) bytes).mappedFile().file().canWrite();
}

public boolean readDocument(@NotNull ReadMarshallable reader) {
try (@NotNull DocumentContext dc = readingDocument(false)) {
if (!dc.isPresent())
return false;
reader.readMarshallable(dc.wire());
}
return true;
}

@NotNull
public DocumentContext readingDocument() {
return readingDocument(false);
}

private void close() {
context.wire(null);
Wire w0 = wireForIndex;
Expand Down Expand Up @@ -835,7 +851,19 @@ public String toString() {
@Override
public DocumentContext readingDocument(boolean includeMetaData) {
try {
if (context.present(next(includeMetaData))) {
boolean next = false, tryAgain = true;
if (state == FOUND_CYCLE) {
try {
next = inACycle(includeMetaData);
tryAgain = false;
} catch (EOFException eof) {
state = TailerState.END_OF_CYCLE;
}
}
if (tryAgain)
next = next0(includeMetaData);

if (context.present(next)) {
assert wire().startUse();
return context;
}
Expand All @@ -854,17 +882,6 @@ public DocumentContext readingDocument(boolean includeMetaData) {
return NoDocumentContext.INSTANCE;
}

private boolean next(boolean includeMetaData) throws UnrecoverableTimeoutException, StreamCorruptedException {
if (state == FOUND_CYCLE) {
try {
return inACycle(includeMetaData);
} catch (EOFException eof) {
state = TailerState.END_OF_CYCLE;
}
}
return next0(includeMetaData);
}

private boolean next0(boolean includeMetaData) throws UnrecoverableTimeoutException, StreamCorruptedException {
for (int i = 0; i < 1000; i++) {
switch (state) {
Expand Down
Expand Up @@ -25,6 +25,7 @@
import net.openhft.chronicle.core.values.LongValue;
import net.openhft.chronicle.wire.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.EOFException;
import java.util.concurrent.Callable;
Expand All @@ -40,13 +41,25 @@ public class TimedStoreRecovery extends AbstractMarshallable implements StoreRec

@UsedViaReflection
public TimedStoreRecovery(WireIn in) {
timeStamp = in.read(() -> "timeStamp").int64ForBinding(in.newLongReference());
timeStamp = in.read("timeStamp").int64ForBinding(in.newLongReference());
}

public TimedStoreRecovery(WireType wireType) {
timeStamp = wireType.newLongReference().get();
}

@Override
public long writeHeader(Wire wire,
int length,
int safeLength,
long timeoutMS,
@Nullable final LongValue lastPosition) throws EOFException, UnrecoverableTimeoutException {
try {
return wire.writeHeader(length, safeLength, timeoutMS, TimeUnit.MILLISECONDS, lastPosition);
} catch (TimeoutException e) {
return recoverAndWriteHeader(wire, length, timeoutMS, lastPosition);
}
}
@Override
public void writeMarshallable(@NotNull WireOut out) {
out.write("timeStamp").int64forBinding(0);
Expand Down

0 comments on commit 50a99a5

Please sign in to comment.