Skip to content

Commit

Permalink
Don't trust the cached last sequence header. Fixes #1538
Browse files Browse the repository at this point in the history
  • Loading branch information
nicktindall committed Mar 6, 2024
1 parent d932683 commit 0444c59
Show file tree
Hide file tree
Showing 10 changed files with 328 additions and 130 deletions.
23 changes: 19 additions & 4 deletions src/main/java/net/openhft/chronicle/queue/ExcerptTailer.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,12 @@ default boolean readAfterReplicaAcknowledged() {
* Calling this method may move ExcerptTailer to the specified cycle and release its store.
*
* @return the approximate number of excerpts in a cycle.
*
* @deprecated Use {@link #excerptsInCycle(int)} instead
*/
@Deprecated(/* To be removed in x.27 */)
default long approximateExcerptsInCycle(int cycle) {
try (ExcerptTailer tailer = queue().createTailer()) {
return tailer.approximateExcerptsInCycle(cycle);
}
return excerptsInCycle(cycle);
}

/**
Expand All @@ -246,10 +247,24 @@ default long approximateExcerptsInCycle(int cycle) {
* Calling this method may move ExcerptTailer to the specified cycle and release its store.
*
* @return the exact number of excerpts in a cycle.
*
* @deprecated Use {@link #excerptsInCycle(int)} instead
*/
@Deprecated(/* To be removed in x.27 */)
default long exactExcerptsInCycle(int cycle) {
return excerptsInCycle(cycle);
}

/**
* Return the exact number of excerpts in a cycle available for reading.
* <p>
* Calling this method may move ExcerptTailer to the specified cycle and release its store.
*
* @return the exact number of excerpts in a cycle.
*/
default long excerptsInCycle(int cycle) {
try (ExcerptTailer tailer = queue().createTailer()) {
return tailer.exactExcerptsInCycle(cycle);
return tailer.excerptsInCycle(cycle);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private QueueSystemProperties() {
* Default unset value: false
* Activation values : "", "yes", or "true"
*/
public static final boolean CHECK_INDEX = Jvm.getBoolean("queue.check.index");
public static boolean CHECK_INDEX = Jvm.getBoolean("queue.check.index");

/**
* Name of a system property used to specify the default roll cycle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public static long findWithinCycle(@NotNull Wire key,
try {
long lowSeqNum = 0;

long highSeqNum = tailer.approximateExcerptsInCycle(cycle) - 1;
long highSeqNum = tailer.excerptsInCycle(cycle) - 1;

// nothing to search
if (highSeqNum < lowSeqNum)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.io.EOFException;
import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -574,7 +575,7 @@ long sequenceForPosition(@NotNull ExcerptContext ec,
try {
return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive);
} catch (EOFException e) {
throw new IllegalStateException(e);
throw new UncheckedIOException(e);
}
}

Expand Down Expand Up @@ -705,12 +706,12 @@ public boolean indexable(long index) {
return (index & (indexSpacing - 1)) == 0;
}

public long lastSequenceNumber(@NotNull ExcerptContext ec, boolean approximate)
public long lastSequenceNumber(@NotNull ExcerptContext ec)
throws StreamCorruptedException {
throwExceptionIfClosed();

Sequence sequence1 = this.sequence;
if (approximate && sequence1 != null) {
if (sequence1 != null) {
for (int i = 0; i < 128; i++) {

long address = writePosition.getVolatileValue(0);
Expand All @@ -721,7 +722,12 @@ public long lastSequenceNumber(@NotNull ExcerptContext ec, boolean approximate)
continue;
if (sequence == Sequence.NOT_FOUND)
break;
return sequence;
try {
Wire wireForIndex = ec.wireForIndex();
return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true);
} catch (EOFException e) {
throw new UncheckedIOException(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,29 +613,29 @@ public int nextCycle(int cycle, @NotNull TailerDirection direction) throws Parse
* which is updated last during append operation so may be possible that a single entry is available for reading
* but not acknowledged by this method yet.
*
* @see ExcerptTailer#approximateExcerptsInCycle(int)
* @see ExcerptTailer#excerptsInCycle(int)
* @deprecated
*/
@Deprecated(/* To be removed in x.25 */)
public long approximateExcerptsInCycle(int cycle) {
throwExceptionIfClosed();
try (ExcerptTailer tailer = createTailer()) {
return tailer.approximateExcerptsInCycle(cycle);
return tailer.excerptsInCycle(cycle);
}
}

/**
* Returns an exact number of excerpts in a cycle available for reading. This may be a computationally
* expensive operation.
*
* @see ExcerptTailer#exactExcerptsInCycle(int)
* @see ExcerptTailer#excerptsInCycle(int)
* @deprecated
*/
@Deprecated(/* To be removed in x.25 */)
public long exactExcerptsInCycle(int cycle) {
throwExceptionIfClosed();
try (ExcerptTailer tailer = createTailer()) {
return tailer.exactExcerptsInCycle(cycle);
return tailer.excerptsInCycle(cycle);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public String dump() {
@NotNull
@Override
public String shortDump() {
return dump(WireType.BINARY_LIGHT,true);
return dump(WireType.BINARY_LIGHT, true);
}

@Override
Expand Down Expand Up @@ -315,20 +315,25 @@ public long sequenceForPosition(@NotNull final ExcerptContext ec, final long pos
return indexing.sequenceForPosition(ec, position, inclusive);
}

@Override
@Deprecated(/* To be removed in 5.25 */)
public long lastSequenceNumber(@NotNull ExcerptContext ec) throws StreamCorruptedException {
return approximateLastSequenceNumber(ec);
throwExceptionIfClosedInSetter();
return indexing.lastSequenceNumber(ec);
}

/**
* @deprecated Use {@link #lastSequenceNumber(ExcerptContext)} instead
*/
@Deprecated(/* To be removed in x.26 */)
public long approximateLastSequenceNumber(@NotNull ExcerptContext ec) throws StreamCorruptedException {
throwExceptionIfClosedInSetter();
return indexing.lastSequenceNumber(ec, true);
return lastSequenceNumber(ec);
}

/**
* @deprecated Use {@link #lastSequenceNumber(ExcerptContext)} instead
*/
@Deprecated(/* To be removed in x.26 */)
public long exactLastSequenceNumber(@NotNull ExcerptContext ec) throws StreamCorruptedException {
throwExceptionIfClosedInSetter();
return indexing.lastSequenceNumber(ec, false);
return lastSequenceNumber(ec);
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class StoreAppender extends AbstractCloseable
}
}
if (wire != null)
resetPosition(false);
resetPosition();
}
} finally {
writeLock.unlock();
Expand Down Expand Up @@ -328,7 +328,7 @@ private void setCycle2(final int cycle, final WireStoreSupplier.CreateStrategy c

wire.parent(this);
wire.pauser(queue.pauserSupplier.get());
resetPosition(false);
resetPosition();
queue.onRoll(cycle);
}

Expand Down Expand Up @@ -358,7 +358,7 @@ private Wire createWire(@NotNull final WireType wireType) {
* @return true if the header number is changed, otherwise false
* @throws UnrecoverableTimeoutException todo
*/
private boolean resetPosition(boolean exact) {
private boolean resetPosition() {
long originalHeaderNumber = wire.headerNumber();
try {
if (store == null || wire == null)
Expand All @@ -369,8 +369,7 @@ private boolean resetPosition(boolean exact) {
Bytes<?> bytes = wire.bytes();
assert !QueueSystemProperties.CHECK_INDEX || checkPositionOfHeader(bytes);

final long lastSequenceNumber = exact ? store.exactLastSequenceNumber(this)
: store.approximateLastSequenceNumber(this);
final long lastSequenceNumber = store.lastSequenceNumber(this);
wire.headerNumber(queue.rollCycle().toIndex(cycle, lastSequenceNumber + 1) - 1);

assert !QueueSystemProperties.CHECK_INDEX || wire.headerNumber() != -1 || checkIndex(wire.headerNumber(), positionOfHeader);
Expand Down Expand Up @@ -437,7 +436,7 @@ private StoreAppender.StoreAppenderContext prepareAndReturnWriteContext(boolean
rollCycleTo(cycle);

long safeLength = queue.overlapSize();
resetPosition(false);
resetPosition();
assert !QueueSystemProperties.CHECK_INDEX || checkWritePositionHeaderNumber();

// sets the writeLimit based on the safeLength
Expand Down Expand Up @@ -657,7 +656,7 @@ protected void writeBytesInternal(final long index, @NotNull final BytesStore by
rollCycleTo(cycle, this.cycle > cycle);

// in case our cached headerNumber is incorrect.
resetPosition(true);
resetPosition();

long headerNumber = wire.headerNumber();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,8 @@ public ExcerptTailer originalToEnd() {
break;

case FOUND:
LoopForward: while (originalToEndLoopCondition(approximateLastIndex, index)) {
LoopForward:
while (originalToEndLoopCondition(approximateLastIndex, index)) {
final ScanResult result = moveToIndexResult(++index);
switch (result) {
case NOT_REACHED:
Expand Down Expand Up @@ -1124,7 +1125,7 @@ private void windBackCycle(int cycle) {
}

private boolean tryWindBack(final int cycle) {
final long count = exactExcerptsInCycle(cycle);
final long count = excerptsInCycle(cycle);
if (count <= 0)
return false;
final RollCycle rollCycle = queue.rollCycle();
Expand Down Expand Up @@ -1212,22 +1213,10 @@ public boolean readAfterReplicaAcknowledged() {
}

@Override
public long approximateExcerptsInCycle(int cycle) {
public long excerptsInCycle(int cycle) {
throwExceptionIfClosed();
try {
return moveToCycle(cycle) ? store.approximateLastSequenceNumber(this) + 1 : -1;
} catch (StreamCorruptedException e) {
throw new IllegalStateException(e);
} finally {
releaseStore();
}
}

@Override
public long exactExcerptsInCycle(int cycle) {
throwExceptionIfClosed();
try {
return moveToCycle(cycle) ? store.exactLastSequenceNumber(this) + 1 : -1;
return moveToCycle(cycle) ? store.lastSequenceNumber(this) + 1 : -1;
} catch (StreamCorruptedException e) {
throw new IllegalStateException(e);
} finally {
Expand All @@ -1249,7 +1238,7 @@ public TailerState state() {
* Must not be null.
* @param index The index to read the history message in the {@code queue}.
* @return This ExcerptTailer instance.
* @throws IORuntimeException if the provided {@code queue} couldn't be wound to the last index.
* @throws IORuntimeException if the provided {@code queue} couldn't be wound to the last index.
* @throws NullPointerException if the provided {@code queue} is null.
*/
@NotNull
Expand Down
Loading

0 comments on commit 0444c59

Please sign in to comment.