Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow named tailers to be used concurrently to split work #964 #965

Closed
wants to merge 7 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,12 @@ public ExcerptTailer createTailer(String id) {
return storeTailer;
}

/**
* The last index read for a named tailer.
*
* @param id of the last index
* @return the LongValue reference to this value.
*/
@Override
@NotNull
public LongValue indexForId(@NotNull String id) {
Expand Down
106 changes: 83 additions & 23 deletions src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
*/
class StoreTailer extends AbstractCloseable
implements ExcerptTailer, SourceContext, ExcerptContext {
public static final long INVALID_INDEX = Long.MIN_VALUE;
static final int INDEXING_LINEAR_SCAN_THRESHOLD = 70;
static final StringBuilderPool SBP = new StringBuilderPool();
static final EOFException EOF_EXCEPTION = new EOFException();
Expand All @@ -47,7 +48,7 @@ class StoreTailer extends AbstractCloseable
private final StoreTailerContext context = new StoreTailerContext();
private final MoveToState moveToState = new MoveToState();
private final Finalizer finalizer;
long index; // index of the next read.
long index, indexChecker; // index of the next read.
long lastReadIndex; // index of the last read message
@Nullable
SingleChronicleQueueStore store;
Expand All @@ -57,7 +58,7 @@ class StoreTailer extends AbstractCloseable
private boolean readAfterReplicaAcknowledged;
@NotNull
private TailerState state = UNINITIALISED;
private long indexAtCreation = Long.MIN_VALUE;
private long indexAtCreation = INVALID_INDEX;
private boolean readingDocumentFound = false;
private long address = NO_PAGE;
private boolean striding = false;
Expand Down Expand Up @@ -177,7 +178,7 @@ public int sourceId() {
@NotNull
@Override
public String toString() {
final long index = index();
final long index = this.index; // don't use index() as this confuses the debugger
return "StoreTailer{" +
"index sequence=" + queue.rollCycle().toSequenceNumber(index) +
", index cycle=" + queue.rollCycle().toCycle(index) +
Expand All @@ -187,6 +188,12 @@ public String toString() {
@NotNull
@Override
public DocumentContext readingDocument(final boolean includeMetaData) {
return indexValue == null
? readingDocumentUnnamed(includeMetaData)
: readingDocumentNamed(includeMetaData);
}

DocumentContext readingDocumentUnnamed(final boolean includeMetaData) {
DocumentContext documentContext = readingDocument0(includeMetaData);
// this check was added after a strange behaviour seen by one client. I should be impossible.
if (documentContext.wire() != null)
Expand All @@ -195,6 +202,27 @@ public DocumentContext readingDocument(final boolean includeMetaData) {
return documentContext;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to replicate this check in the named version? or can we remove it from unnamed? it makes it more confusing if they're inconsistent

}

DocumentContext readingDocumentNamed(final boolean includeMetaData) {
for (int i = 0; i < 1_000_000; i++) {
this.indexChecker = indexValue.getVolatileValue();
if (this.index != indexChecker)
moveToIndex(this.indexChecker);

DocumentContext documentContext = readingDocument0(includeMetaData);

if (indexChecker != INVALID_INDEX) {
this.index = indexChecker;
if (context.isPresent() && !context.isMetaData())
incrementIndex();
return documentContext;
}
documentContext.close();
if (i > 1000)
Thread.yield();
}
throw new AssertionError();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible in cases of high contention that you'd get to the end of the 100 iterations? Probably very unlikely but even if it's a remote possibility we should probably put a meaningful exception/message in here.

}

DocumentContext readingDocument0(final boolean includeMetaData) {
throwExceptionIfClosed();

Expand Down Expand Up @@ -293,12 +321,16 @@ private boolean next0(final boolean includeMetaData) throws StreamCorruptedExcep
} else {
if (!moveToIndexInternal(firstIndex))
return false;
// had to reset the index.
this.indexChecker = index();
}
break;

case NOT_REACHED_IN_CYCLE:
if (!moveToIndexInternal(index))
return false;
// had to reset the index.
this.indexChecker = index();
break;

case FOUND_IN_CYCLE: {
Expand Down Expand Up @@ -340,7 +372,7 @@ private boolean endOfCycle() {
final int currentCycle = queue.rollCycle().toCycle(oldIndex);
final long nextIndex = nextIndexWithNextAvailableCycle(currentCycle);

if (nextIndex != Long.MIN_VALUE) {
if (nextIndex != INVALID_INDEX) {
return nextEndOfCycle(queue.rollCycle().toCycle(nextIndex));
} else {
state = END_OF_CYCLE;
Expand Down Expand Up @@ -400,7 +432,7 @@ private boolean beyondStartOfCycleBackward() throws StreamCorruptedException {
final int cycle = queue.rollCycle().toCycle(index());
final long nextIndex = nextIndexWithNextAvailableCycle(cycle);

if (nextIndex != Long.MIN_VALUE) {
if (nextIndex != INVALID_INDEX) {
moveToIndexInternal(nextIndex);
state = FOUND_IN_CYCLE;
return true;
Expand All @@ -411,7 +443,7 @@ private boolean beyondStartOfCycleBackward() throws StreamCorruptedException {
}

private boolean nextCycleNotFound() {
if (index() == Long.MIN_VALUE) {
if (index() == INVALID_INDEX) {
if (this.store != null)
queue.closeStore(this.store);
this.store = null;
Expand Down Expand Up @@ -512,7 +544,7 @@ private long nextIndexWithNextAvailableCycle0(final int cycle) {
assert cycle != Integer.MIN_VALUE : "cycle == Integer.MIN_VALUE";

if (cycle > queue.lastCycle() || direction == TailerDirection.NONE) {
return Long.MIN_VALUE;
return INVALID_INDEX;
}

long nextIndex;
Expand All @@ -524,7 +556,7 @@ private long nextIndexWithNextAvailableCycle0(final int cycle) {
try {
final int nextCycle0 = queue.nextCycle(this.cycle, direction);
if (nextCycle0 == -1)
return Long.MIN_VALUE;
return INVALID_INDEX;

nextIndex = nextIndexWithinFoundCycle(nextCycle0);

Expand All @@ -534,7 +566,7 @@ private long nextIndexWithNextAvailableCycle0(final int cycle) {

if (Jvm.isResourceTracing()) {
final int nextIndexCycle = queue.rollCycle().toCycle(nextIndex);
if (nextIndex != Long.MIN_VALUE && nextIndexCycle - 1 != cycle) {
if (nextIndex != INVALID_INDEX && nextIndexCycle - 1 != cycle) {

/*
* lets say that you were using a roll cycle of TEST_SECONDLY
Expand Down Expand Up @@ -577,7 +609,7 @@ private long nextIndexWithinFoundCycle(final int nextCycle) {
*/
@Override
public long index() {
return indexValue == null ? this.index : indexValue.getValue();
return context.isPresent() || indexValue == null ? this.index : indexValue.getVolatileValue();
}

@Override
Expand All @@ -588,9 +620,18 @@ public int cycle() {
@Override
public boolean moveToIndex(final long index) {
throwExceptionIfClosed();
if (moveToIndex0(index)) {
if (indexValue != null)
indexValue.setOrderedValue(index);
return true;
}
return false;
}

boolean moveToIndex0(final long index) {
if (moveToState.canReuseLastIndexMove(index, state, direction, queue, privateWire())) {
return setAddress(true);

} else if (moveToState.indexIsCloseToAndAheadOfLastIndexMove(index, state, direction, queue)) {
final long knownIndex = moveToState.lastMovedToIndex;
final boolean found =
Expand Down Expand Up @@ -698,6 +739,9 @@ ScanResult moveToIndexResult(final long index) {

private ExcerptTailer doToStart() {
assert direction != BACKWARD;

this.indexChecker = INVALID_INDEX;

final int firstCycle = queue.firstCycle();
if (firstCycle == Integer.MAX_VALUE) {
state = UNINITIALISED;
Expand Down Expand Up @@ -752,7 +796,7 @@ private long approximateLastIndex() {
final int lastCycle = queue.lastCycle();
try {
if (lastCycle == Integer.MIN_VALUE)
return Long.MIN_VALUE;
return INVALID_INDEX;

return approximateLastCycle2(lastCycle);

Expand Down Expand Up @@ -901,6 +945,8 @@ public boolean striding() {

@NotNull
private ExcerptTailer optimizedToEnd() {
this.indexChecker = INVALID_INDEX;

final RollCycle rollCycle = queue.rollCycle();
final int lastCycle = queue.lastCycle();
try {
Expand Down Expand Up @@ -952,7 +998,7 @@ public ExcerptTailer originalToEnd() {

long index = approximateLastIndex();

if (index == Long.MIN_VALUE) {
if (index == INVALID_INDEX) {
if (state() == TailerState.CYCLE_NOT_FOUND)
state = UNINITIALISED;
return this;
Expand Down Expand Up @@ -1083,17 +1129,24 @@ private boolean tryWindBack(final int cycle) {
}

void index0(final long index) {
if (indexValue == null)
if (indexValue == null) {
this.index = index;
else
indexValue.setValue(index);
} else if (!indexValue.compareAndSwapValue(this.indexChecker, index)) {
this.indexChecker = Long.MIN_VALUE; // invalid.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to give this a constant with a meaningful name? It's a bit hard to follow what the significance of Long.MIN_VALUE is.

}
}

// DON'T INLINE THIS METHOD, as it's used by enterprise chronicle queue
void index(final long index) {
index0(index);
this.index = index;
if (indexValue != null) {
if (this.indexChecker == INVALID_INDEX) {
indexValue.setValue(index);
} else if (!indexValue.compareAndSwapValue(this.indexChecker, index))
this.indexChecker = INVALID_INDEX;
}

if (indexAtCreation == Long.MIN_VALUE) {
if (indexAtCreation == INVALID_INDEX) {
indexAtCreation = index;
}

Expand Down Expand Up @@ -1247,9 +1300,9 @@ public File currentFile() {
}

static final class MoveToState {
private long lastMovedToIndex = Long.MIN_VALUE;
private long lastMovedToIndex = INVALID_INDEX;
private TailerDirection directionAtLastMoveTo = TailerDirection.NONE;
private long readPositionAtLastMove = Long.MIN_VALUE;
private long readPositionAtLastMove = INVALID_INDEX;
private int indexMoveCount = 0;

void onSuccessfulLookup(final long movedToIndex,
Expand All @@ -1269,16 +1322,16 @@ void onSuccessfulScan(final long movedToIndex,
}

void reset() {
lastMovedToIndex = Long.MIN_VALUE;
lastMovedToIndex = INVALID_INDEX;
directionAtLastMoveTo = TailerDirection.NONE;
readPositionAtLastMove = Long.MIN_VALUE;
readPositionAtLastMove = INVALID_INDEX;
}

private boolean indexIsCloseToAndAheadOfLastIndexMove(final long index,
final TailerState state,
final TailerDirection direction,
final ChronicleQueue queue) {
return lastMovedToIndex != Long.MIN_VALUE &&
return lastMovedToIndex != INVALID_INDEX &&
index - lastMovedToIndex < INDEXING_LINEAR_SCAN_THRESHOLD &&
state == FOUND_IN_CYCLE &&
direction == directionAtLastMoveTo &&
Expand Down Expand Up @@ -1322,12 +1375,19 @@ public int sourceId() {
return StoreTailer.this.sourceId();
}

@Override
public void rollbackOnClose() {
if (indexValue != null)
throw new IllegalStateException("Can't roll back a named tailer");
super.rollbackOnClose();
}

@Override
public void close() {
if (rollbackIfNeeded())
return;

if (isPresent() && !isMetaData())
if (isPresent() && !isMetaData() && indexValue == null)
incrementIndex();

super.close();
Expand Down