Skip to content

Commit

Permalink
Refactor StoreTailer.readDocument
Browse files Browse the repository at this point in the history
  • Loading branch information
tgd committed Mar 5, 2024
1 parent 9f2a524 commit 2f50ef6
Showing 1 changed file with 43 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.assertions.AssertUtil;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.bytes.MappedBytesStore;
Expand All @@ -42,6 +43,7 @@
import java.io.StreamCorruptedException;
import java.text.ParseException;

import static net.openhft.chronicle.assertions.AssertUtil.SKIP_ASSERTIONS;
import static net.openhft.chronicle.queue.TailerDirection.*;
import static net.openhft.chronicle.queue.TailerState.*;
import static net.openhft.chronicle.queue.impl.single.ScanResult.*;
Expand Down Expand Up @@ -202,11 +204,17 @@ public String toString() {
@Override
public DocumentContext readingDocument(final boolean includeMetaData) {
DocumentContext documentContext = readingDocument0(includeMetaData);
// this check was added after a strange behaviour seen by one client. It should be impossible.
checkReadRemainingPrecondition(documentContext);
return documentContext;
}

/**
* Added in response to client bug please see <a href="https://github.com/OpenHFT/Chronicle-Queue/issues/801">801</a>.
*/
private void checkReadRemainingPrecondition(DocumentContext documentContext) {
if (documentContext.wire() != null)
if (documentContext.wire().bytes().readRemaining() >= 1 << 30)
throw new AssertionError("readRemaining " + documentContext.wire().bytes().readRemaining());
return documentContext;
}

DocumentContext readingDocument0(final boolean includeMetaData) {
Expand All @@ -228,16 +236,14 @@ DocumentContext readingDocument0(final boolean includeMetaData) {
if (tryAgain)
next = next0(includeMetaData);

Wire wire = context.wire();
if (wire != null && context.present(next)) {
Bytes<?> bytes = wire.bytes();
context.setStart(bytes.readPosition() - 4);
readingDocumentFound = true;
this.lastReadIndex = this.index();
return context;
// An entry has been found, prepare the context and return it.
if (next && (context.wire() != null)) {
return prepareStoreTailerContext();
}

readingDocumentCycleNotFound(next);
if (state == CYCLE_NOT_FOUND) {
readingDocumentCycleNotFound(next);
}

} catch (StreamCorruptedException e) {
throw new IllegalStateException(e);
Expand All @@ -251,6 +257,16 @@ DocumentContext readingDocument0(final boolean includeMetaData) {
return INSTANCE;
}

/**
* A document has been found - prepare the tailer context for return to the caller.
*/
private StoreTailerContext prepareStoreTailerContext() {
context.init();
readingDocumentFound = true;
this.lastReadIndex = this.index();
return context;
}

private void readingDocumentDBUE(DecoratedBufferUnderflowException e) {
if (queue.isReadOnly()) {
Jvm.warn().on(StoreTailer.class,
Expand All @@ -262,12 +278,12 @@ private void readingDocumentDBUE(DecoratedBufferUnderflowException e) {
}

private void readingDocumentCycleNotFound(boolean next) {
RollCycle rollCycle = queue.rollCycle();
if (state == CYCLE_NOT_FOUND && direction == FORWARD) {
if (direction == FORWARD) {
RollCycle rollCycle = queue.rollCycle();
int firstCycle = queue.firstCycle();
if (rollCycle.toCycle(index()) < firstCycle)
toStart();
} else if (!next && state == CYCLE_NOT_FOUND && cycle != queue.cycle()) {
} else if (!next && cycle != queue.cycle()) {
// appenders have moved on, it's possible that linearScan is hitting EOF, which is ignored
// since we can't find an entry at current index, indicate that we're at the end of a cycle
state = TailerState.END_OF_CYCLE;
Expand Down Expand Up @@ -452,15 +468,15 @@ private boolean inACycle2(boolean includeMetaData, Wire wire, Bytes<?> bytes) th
bytes.readLimitToCapacity();

switch (wire.readDataHeader(includeMetaData)) {
case DATA:
context.metaData(false);
break;
case NONE:
// no more polling - appender will always write (or recover) EOF
return false;
case META_DATA:
context.metaData(true);
break;
case DATA:
context.metaData(false);
break;
case EOF:
throw EOF_EXCEPTION;
}
Expand Down Expand Up @@ -1420,6 +1436,9 @@ protected void finalize() throws Throwable {
}
}

/**
* Supports a view to a document somewhere in the queue.
*/
class StoreTailerContext extends BinaryReadDocumentContext {
StoreTailerContext() {
super(null);
Expand All @@ -1446,8 +1465,14 @@ public void close() {
super.close();
}

boolean present(final boolean present) {
return this.present = present;
/**
* Initialise the context with a new document ensuring that the start of this context points to the start of
* this document in the underlying buffer.
*/
public void init() {
present = true;
long bytesReadPosition = wire.bytes().readPosition();
setStart(bytesReadPosition - Wires.SPB_HEADER_SIZE);
}

public void wire(@Nullable final AbstractWire wire) {
Expand Down

0 comments on commit 2f50ef6

Please sign in to comment.