Skip to content

Commit

Permalink
exclude "message history" elements by default in chronicle reader
Browse files Browse the repository at this point in the history
  • Loading branch information
epickrram committed Aug 14, 2017
1 parent cf2482c commit 774075c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
24 changes: 20 additions & 4 deletions src/main/java/net/openhft/chronicle/queue/ChronicleReader.java
Expand Up @@ -39,15 +39,16 @@
final class ChronicleReader {
private static final long UNSET_VALUE = Long.MIN_VALUE;

private final List<Pattern> inclusionRegex = new ArrayList<>();
private final List<Pattern> exclusionRegex = new ArrayList<>();
private final Pauser pauser = Pauser.balanced();
private Path basePath;
private List<Pattern> inclusionRegex = new ArrayList<>();
private List<Pattern> exclusionRegex = new ArrayList<>();
private long startIndex = UNSET_VALUE;
private boolean tailInputSource = false;
private boolean excludeMessageHistory = true;
private long maxHistoryRecords = UNSET_VALUE;
private Consumer<String> messageSink;
private Function<ExcerptTailer, DocumentContext> pollMethod = ExcerptTailer::readingDocument;
private Pauser pauser = Pauser.balanced();

void execute() {
try {
Expand Down Expand Up @@ -89,7 +90,8 @@ void execute() {
text = serialisedMessage.toString();
}

applyFiltersAndLog(text, tailer.index());
applyFiltersAndLog(excludeMessageHistory ? stripMessageHistory(text) : text,
tailer.index());
}
}
} finally {
Expand Down Expand Up @@ -140,12 +142,26 @@ ChronicleReader historyRecords(final long maxHistoryRecords) {
return this;
}

ChronicleReader includeMessageHistory() {
excludeMessageHistory = false;
return this;
}

// visible for testing
ChronicleReader withDocumentPollMethod(final Function<ExcerptTailer, DocumentContext> pollMethod) {
this.pollMethod = pollMethod;
return this;
}

private String stripMessageHistory(final String text) {
if (!excludeMessageHistory || !text.startsWith("history: {\n")) {
return text;
}
final int messageHistoryClosingBracePosition = text.indexOf('}');

return text.substring(Math.min(text.length() - 1, messageHistoryClosingBracePosition + 2), text.length());
}

private boolean queueHasBeenModifiedSinceLastCheck(final long lastObservedTailIndex) {
return getCurrentTailIndex() != lastObservedTailIndex;
}
Expand Down
Expand Up @@ -4,6 +4,7 @@
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.MethodWriterBuilder;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
Expand Down Expand Up @@ -43,14 +44,31 @@ public class ChronicleReaderTest {
public void before() throws Exception {
dataDir = DirectoryUtils.tempDir(ChronicleReaderTest.class.getSimpleName()).toPath();
try (final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dataDir).testBlockSize().build()) {
final StringEvents events = queue.acquireAppender().methodWriterBuilder(StringEvents.class).build();
final ExcerptAppender excerptAppender = queue.acquireAppender();
final MethodWriterBuilder<StringEvents> methodWriterBuilder = excerptAppender.methodWriterBuilder(StringEvents.class);
methodWriterBuilder.recordHistory(true);
final StringEvents events = methodWriterBuilder.build();

for (int i = 0; i < 24; i++) {
events.say(i % 2 == 0 ? "hello" : "goodbye");
}
}
}

@Test
public void shouldExcludeMessageHistoryByDefault() throws Exception {
basicReader().execute();

assertThat(capturedOutput.stream().anyMatch(msg -> msg.contains("history:")), is(false));
}

@Test
public void shouldIncludeMessageHistoryWhenConfigured() throws Exception {
basicReader().includeMessageHistory().execute();

assertThat(capturedOutput.stream().anyMatch(msg -> msg.contains("history:")), is(true));
}

@Test(timeout = 5000)
public void readOnlyQueueTailerShouldObserveChangesAfterInitiallyObservedReadLimit() throws Exception {
DirectoryUtils.deleteDir(dataDir.toFile());
Expand Down

0 comments on commit 774075c

Please sign in to comment.