Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow named tailers to be used concurrently to split work #964 #965

Closed
wants to merge 7 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,12 @@ public ExcerptTailer createTailer(String id) {
return storeTailer;
}

/**
* The last index read for a named tailer.
*
* @param id of the last index
* @return the LongValue reference to this value.
*/
@Override
@NotNull
public LongValue indexForId(@NotNull String id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class StoreTailer extends AbstractCloseable
private final StoreTailerContext context = new StoreTailerContext();
private final MoveToState moveToState = new MoveToState();
private final Finalizer finalizer;
long index; // index of the next read.
long index, indexChecker; // index of the next read.
long lastReadIndex; // index of the last read message
@Nullable
SingleChronicleQueueStore store;
Expand Down Expand Up @@ -187,6 +187,12 @@ public String toString() {
@NotNull
@Override
public DocumentContext readingDocument(final boolean includeMetaData) {
return indexValue == null
? readingDocumentUnnamed(includeMetaData)
: readingDocumentNamed(includeMetaData);
}

DocumentContext readingDocumentUnnamed(final boolean includeMetaData) {
DocumentContext documentContext = readingDocument0(includeMetaData);
// this check was added after a strange behaviour seen by one client. I should be impossible.
if (documentContext.wire() != null)
Expand All @@ -195,6 +201,25 @@ public DocumentContext readingDocument(final boolean includeMetaData) {
return documentContext;
Copy link
Contributor

@nicktindall nicktindall Dec 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to replicate this check in the named version? or can we remove it from unnamed? it makes it more confusing if they're inconsistent

}

DocumentContext readingDocumentNamed(final boolean includeMetaData) {
for (int i = 0; i < 100; i++) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under what conditions would this loop run to completion and throw the AssertionError?
Stopping after 100 iterations looks arbitrary. Is there something more robust we can do here?
It feels that the loop itself should retry indefinitely, and either succeed eventually, or internally detect any genuine error condition and throw from the body

this.indexChecker = indexValue.getVolatileValue();
if (this.index != indexChecker)
moveToIndex(this.indexChecker);

DocumentContext documentContext = readingDocument0(includeMetaData);

if (indexChecker != Long.MIN_VALUE) {
this.index = indexChecker;
if (context.isPresent() && !context.isMetaData())
incrementIndex();
return documentContext;
}
documentContext.close();
}
throw new AssertionError();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible in cases of high contention that you'd get to the end of the 100 iterations? Probably very unlikely but even if it's a remote possibility we should probably put a meaningful exception/message in here.

}

DocumentContext readingDocument0(final boolean includeMetaData) {
throwExceptionIfClosed();

Expand Down Expand Up @@ -293,12 +318,16 @@ private boolean next0(final boolean includeMetaData) throws StreamCorruptedExcep
} else {
if (!moveToIndexInternal(firstIndex))
return false;
// had to reset the index.
this.indexChecker = index();
}
break;

case NOT_REACHED_IN_CYCLE:
if (!moveToIndexInternal(index))
return false;
// had to reset the index.
this.indexChecker = index();
break;

case FOUND_IN_CYCLE: {
Expand Down Expand Up @@ -577,7 +606,7 @@ private long nextIndexWithinFoundCycle(final int nextCycle) {
*/
@Override
public long index() {
return indexValue == null ? this.index : indexValue.getValue();
return context.isPresent() || indexValue == null ? this.index : indexValue.getVolatileValue();
}

@Override
Expand All @@ -588,9 +617,18 @@ public int cycle() {
@Override
public boolean moveToIndex(final long index) {
throwExceptionIfClosed();
if (moveToIndex0(index)) {
if (indexValue != null)
indexValue.setOrderedValue(index);
return true;
}
return false;
}

boolean moveToIndex0(final long index) {
if (moveToState.canReuseLastIndexMove(index, state, direction, queue, privateWire())) {
return setAddress(true);

} else if (moveToState.indexIsCloseToAndAheadOfLastIndexMove(index, state, direction, queue)) {
final long knownIndex = moveToState.lastMovedToIndex;
final boolean found =
Expand Down Expand Up @@ -1083,10 +1121,11 @@ private boolean tryWindBack(final int cycle) {
}

void index0(final long index) {
if (indexValue == null)
if (indexValue == null) {
this.index = index;
else
indexValue.setValue(index);
} else if (!indexValue.compareAndSwapValue(this.indexChecker, index)) {
this.indexChecker = Long.MIN_VALUE; // invalid.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to give this a constant with a meaningful name? It's a bit hard to follow what the significance of Long.MIN_VALUE is.

}
}

// DON'T INLINE THIS METHOD, as it's used by enterprise chronicle queue
Expand Down Expand Up @@ -1327,7 +1366,7 @@ public void close() {
if (rollbackIfNeeded())
return;

if (isPresent() && !isMetaData())
if (isPresent() && !isMetaData() && indexValue == null)
incrementIndex();

super.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.bytes.ref.LongReference;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.IOTools;
import net.openhft.chronicle.core.time.SetTimeProvider;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import java.io.File;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class ConcurrentNamedTailersTest {
@Test
public void concurrentNamedTailers() {
File tmpDir = new File(OS.getTarget(), IOTools.tempName("concurrentNamedTailers"));

final SetTimeProvider timeProvider = new SetTimeProvider("2021/12/03T12:34:56").advanceMillis(1000);
final String tailerName = "named";
try (ChronicleQueue q = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().timeProvider(timeProvider).build();
final ExcerptAppender appender = q.acquireAppender();
final ExcerptTailer tailer0 = q.createTailer(tailerName);
final ExcerptTailer tailer1 = q.createTailer(tailerName);
final ExcerptTailer tailer2 = q.createTailer(tailerName)) {

final Tasker tasker = appender.methodWriter(Tasker.class);
for (int i = 0; i < 20; i++)
tasker.task(i);

assertEquals(0x0, tailer0.index());
assertEquals(0x0, tailer1.index());
assertEquals(0x0, tailer2.index());

try (DocumentContext dc0 = tailer0.readingDocument()) {
assertEquals(0x4a1400000000L, tailer0.index());

try (DocumentContext dc1 = tailer1.readingDocument()) {
assertEquals(0x4a1400000001L, tailer1.index());
assertEquals(0x4a1400000000L, tailer0.index());

try (DocumentContext dc2 = tailer2.readingDocument()) {
assertEquals(0x4a1400000002L, tailer2.index());
assertEquals(0x4a1400000001L, tailer1.index());
assertEquals(0x4a1400000000L, tailer0.index());
}
}
}

try (DocumentContext dc0 = tailer0.readingDocument()) {
assertEquals(0x4a1400000003L, tailer0.index());

assertTrue(tailer2.moveToIndex(0x4a140000000AL));

try (DocumentContext dc1 = tailer1.readingDocument()) {
assertEquals(0x4a140000000AL, tailer1.index());
assertEquals(0x4a1400000003L, tailer0.index());

try (DocumentContext dc2 = tailer2.readingDocument()) {
assertEquals(0x4a140000000BL, tailer2.index());
assertEquals(0x4a140000000AL, tailer1.index());
assertEquals(0x4a1400000003L, tailer0.index());
}
}
}

IOTools.deleteDirWithFiles(tmpDir);
}
}

@Test
public void raceConditions() throws IllegalAccessException {
File tmpDir = new File(OS.getTarget(), IOTools.tempName("raceConditions"));

final SetTimeProvider timeProvider = new SetTimeProvider("2021/12/03T12:34:56").advanceMillis(1000);
final String tailerName = "named";
try (ChronicleQueue q = SingleChronicleQueueBuilder.single(tmpDir).testBlockSize().timeProvider(timeProvider).build();
final ExcerptAppender appender = q.acquireAppender();
final ExcerptTailer tailer0 = q.createTailer(tailerName)) {

final Tasker tasker = appender.methodWriter(Tasker.class);
for (int i = 0; i < 20; i++)
tasker.task(i);

DummyLongReference indexValue = new DummyLongReference();
Jvm.getField(tailer0.getClass(), "indexValue")
.set(tailer0, indexValue);

indexValue.getValues.add(0x4a1100000000L);

assertEquals(0x4a1100000000L, tailer0.index());

assertEquals(0, indexValue.setValues.size());

indexValue.getValues.add(0x4a1100000000L);
indexValue.getValues.add(0x4a1200000000L);
// pretend another tailer came in
indexValue.getValues.add(0x4a1400000001L);
indexValue.getValues.add(0x4a1400000001L);
indexValue.getValues.add(0x4a1400000001L);


try (DocumentContext dc0 = tailer0.readingDocument()) {
assertEquals(0x4a1400000001L, tailer0.index());
}

// changed before read
indexValue.getValues.add(0x4a1400000003L);
indexValue.getValues.add(0x4a1400000003L);

// changed during read
indexValue.getValues.add(0x4a1400000005L);
// changed during read again
indexValue.getValues.add(0x4a1400000007L);
// stable
indexValue.getValues.add(0x4a1400000007L);
indexValue.getValues.add(0x4a1400000007L);

try (DocumentContext dc0 = tailer0.readingDocument()) {
assertEquals(0x4a1400000007L, tailer0.index());
}
assertEquals("[4a1400000002, 4a1400000003, 4a1400000007, 4a1400000007, 4a1400000008]",
indexValue.setValues.stream().map(Long::toHexString).collect(Collectors.toList()).toString());

IOTools.deleteDirWithFiles(tmpDir);
}
}

interface Tasker {
void task(int taskId);
}

static class DummyLongReference implements LongReference {
List<Long> getValues = new ArrayList<>();
List<Long> setValues = new ArrayList<>();

@Override
public void bytesStore(BytesStore bytesStore, long offset, long length) throws IllegalStateException, IllegalArgumentException, BufferOverflowException, BufferUnderflowException {
throw new AssertionError();
}

@Override
public @Nullable BytesStore bytesStore() {
throw new AssertionError();
}

@Override
public long offset() {
return 0;
}

@Override
public long maxSize() {
return 0;
}

@Override
public long getValue() throws IllegalStateException {
return getValues.remove(0);
}

@Override
public void setValue(long value) throws IllegalStateException {
setValues.add(value);
}

@Override
public long getVolatileValue() throws IllegalStateException {
return getValue();
}

@Override
public void setVolatileValue(long value) throws IllegalStateException {
setValue(value);
}

@Override
public void setOrderedValue(long value) throws IllegalStateException {
setValue(value);
}

@Override
public long addValue(long delta) throws IllegalStateException {
throw new AssertionError();
}

@Override
public long addAtomicValue(long delta) throws IllegalStateException {
throw new AssertionError();
}

@Override
public boolean compareAndSwapValue(long expected, long value) throws IllegalStateException {
if (getValue() == expected) {
setValue(value);
return true;
}
return false;
}
}
}