Skip to content

Commit

Permalink
add test to document EOF behaviour on empty queue file
Browse files Browse the repository at this point in the history
  • Loading branch information
epickrram committed Nov 1, 2017
1 parent 11d2b52 commit dfe5d3f
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 5 deletions.
Expand Up @@ -106,11 +106,24 @@ static void writeEOFIfNeeded(@NotNull Path newFilePath, @NotNull WireType wireTy
if (Files.exists(queuePath) && hasQueueFiles(queuePath)) if (Files.exists(queuePath) && hasQueueFiles(queuePath))
getLastQueueFileButNotTheNew(queuePath, newFilePath).ifPresent(f -> getLastQueueFileButNotTheNew(queuePath, newFilePath).ifPresent(f ->
processQueueFile(f, wireType, blockSize, false, (w, qs) -> { processQueueFile(f, wireType, blockSize, false, (w, qs) -> {
long l = qs.writePosition();
Bytes<?> bytes = w.bytes(); Bytes<?> bytes = w.bytes();
long len = Wires.lengthOf(bytes.readVolatileInt(l)); long writePosition = qs.writePosition();
long eofOffset = l + len + 4L; if (writePosition == 0) {
if (0 == bytes.readVolatileInt(eofOffset)) { // nothing has been written to the file, let the StoreTailer fix it up
return null;
}

final int recordHeader = bytes.readVolatileInt(writePosition);
if (Wires.isNotComplete(recordHeader)) {
// can't determine the length, so don't try to write anything
return null;
}

long recordLength = Wires.lengthOf(recordHeader);
long eofOffset = writePosition + recordLength + 4L;

final int existingValue = bytes.readVolatileInt(eofOffset);
if (0 == existingValue) {
// no EOF found - write EOF // no EOF found - write EOF
try { try {
bytes.writePosition(eofOffset); bytes.writePosition(eofOffset);
Expand Down
Expand Up @@ -212,8 +212,9 @@ public WireStore writePosition(long position) {


assert writePosition.getVolatileValue() + mappedFile.chunkSize() > position; assert writePosition.getVolatileValue() + mappedFile.chunkSize() > position;
int header = mappedBytes.readVolatileInt(position); int header = mappedBytes.readVolatileInt(position);
if (Wires.isReadyData(header)) if (Wires.isReadyData(header)) {
writePosition.setMaxValue(position); writePosition.setMaxValue(position);
}
else else
throw new AssertionError(); throw new AssertionError();
return this; return this;
Expand Down
@@ -0,0 +1,85 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wires;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;

public final class EofMarkerOnEmptyQueueTest {
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();


@Test
public void shouldRecoverFromEmptyQueueOnRoll() throws Exception {
final AtomicLong clock = new AtomicLong(System.currentTimeMillis());
try (final SingleChronicleQueue queue =
SingleChronicleQueueBuilder.binary(tmpFolder.newFolder()).
rollCycle(RollCycles.TEST_SECONDLY).
timeProvider(clock::get).
timeoutMS(1_000).
testBlockSize().build()) {
final ExcerptAppender appender = queue.acquireAppender();
final DocumentContext context = appender.writingDocument();
// start to write a message, but don't close the context - simulates crashed writer
final long expectedEofMarkerPosition = context.wire().bytes().writePosition() - Wires.SPB_HEADER_SIZE;
context.wire().writeEventName("foo").int32(1);

final int startCycle = queue.cycle();

clock.addAndGet(TimeUnit.SECONDS.toMillis(1L));

final int nextCycle = queue.cycle();

// ensure that the cycle file will roll
assertThat(startCycle, is(not(nextCycle)));
Executors.newSingleThreadExecutor().submit(() -> {
try (final DocumentContext nextCtx = queue.acquireAppender().writingDocument()) {
nextCtx.wire().writeEventName("bar").int32(7);
}

}).get(1, TimeUnit.SECONDS);

final WireStore firstCycleStore = queue.storeForCycle(startCycle, 0, false);
final long firstCycleWritePosition = firstCycleStore.writePosition();
// assert that no write was completed
assertThat(firstCycleWritePosition, is(0L));

final ExcerptTailer tailer = queue.createTailer();

int recordCount = 0;
int lastItem = -1;
while (true) {
try (final DocumentContext readCtx = tailer.readingDocument()) {
if (!readCtx.isPresent()) {
break;
}

final StringBuilder name = new StringBuilder();
final ValueIn field = readCtx.wire().readEventName(name);
recordCount++;
lastItem = field.int32();
}
}

assertThat(firstCycleStore.bytes().readVolatileInt(expectedEofMarkerPosition),
is(Wires.END_OF_DATA));
assertThat(recordCount, is(1));
assertThat(lastItem, is(7));
}
}
}

0 comments on commit dfe5d3f

Please sign in to comment.