Skip to content

Commit

Permalink
fix command-line handling
Browse files Browse the repository at this point in the history
  • Loading branch information
epickrram committed Jun 20, 2017
1 parent 6dd859f commit 4b92d80
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 26 deletions.
12 changes: 9 additions & 3 deletions src/main/java/net/openhft/chronicle/queue/ChronicleReader.java
Expand Up @@ -27,6 +27,7 @@


import java.nio.file.Path; import java.nio.file.Path;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern; import java.util.regex.Pattern;


final class ChronicleReader { final class ChronicleReader {
Expand All @@ -39,6 +40,7 @@ final class ChronicleReader {
private boolean tailInputSource = false; private boolean tailInputSource = false;
private long maxHistoryRecords = UNSET_VALUE; private long maxHistoryRecords = UNSET_VALUE;
private Consumer<String> messageSink; private Consumer<String> messageSink;
private Function<ExcerptTailer, DocumentContext> pollMethod = ExcerptTailer::readingDocument;


void execute() { void execute() {
final ChronicleQueue inputQueue = SingleChronicleQueueBuilder.binary(basePath.toFile()).build(); final ChronicleQueue inputQueue = SingleChronicleQueueBuilder.binary(basePath.toFile()).build();
Expand All @@ -50,7 +52,7 @@ void execute() {


//noinspection InfiniteLoopStatement //noinspection InfiniteLoopStatement
while (true) { while (true) {
try (DocumentContext dc = tailer.readingDocument()) { try (DocumentContext dc = pollMethod.apply(tailer)) {
if (!dc.isPresent()) { if (!dc.isPresent()) {
if (!tailInputSource) if (!tailInputSource)
break; break;
Expand Down Expand Up @@ -111,8 +113,6 @@ private void applyFiltersAndLog(final String text, final long index) {
} }
} }




ChronicleReader withMessageSink(final Consumer<String> messageSink) { ChronicleReader withMessageSink(final Consumer<String> messageSink) {
this.messageSink = messageSink; this.messageSink = messageSink;
return this; return this;
Expand Down Expand Up @@ -148,6 +148,12 @@ ChronicleReader historyRecords(final long maxHistoryRecords) {
return this; return this;
} }


// visible for testing
ChronicleReader withDocumentPollMethod(final Function<ExcerptTailer, DocumentContext> pollMethod) {
this.pollMethod = pollMethod;
return this;
}

private static boolean isSet(final long configValue) { private static boolean isSet(final long configValue) {
return configValue != UNSET_VALUE; return configValue != UNSET_VALUE;
} }
Expand Down
49 changes: 31 additions & 18 deletions src/main/java/net/openhft/chronicle/queue/ChronicleReaderMain.java
Expand Up @@ -22,6 +22,7 @@
import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException; import org.apache.commons.cli.ParseException;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
Expand All @@ -44,9 +45,10 @@ public static void main(@NotNull String[] args) throws IOException {
final CommandLine commandLine = parseCommandLine(args, options); final CommandLine commandLine = parseCommandLine(args, options);


final ChronicleReader chronicleReader = new ChronicleReader(). final ChronicleReader chronicleReader = new ChronicleReader().
withMessageSink(System.out::println).
withBasePath(Paths.get(commandLine.getOptionValue('d'))); withBasePath(Paths.get(commandLine.getOptionValue('d')));


configureReader(options, chronicleReader); configureReader(chronicleReader, commandLine);


chronicleReader.execute(); chronicleReader.execute();
} }
Expand All @@ -57,40 +59,51 @@ private static CommandLine parseCommandLine(final @NotNull String[] args, final
try { try {
commandLine = parser.parse(options, args); commandLine = parser.parse(options, args);
} catch (ParseException e) { } catch (ParseException e) {
new HelpFormatter().printUsage(new PrintWriter(System.out), 120, final PrintWriter writer = new PrintWriter(System.out);
new HelpFormatter().printUsage(writer, 180,
ChronicleReaderMain.class.getSimpleName(), options); ChronicleReaderMain.class.getSimpleName(), options);
writer.flush();
System.exit(1); System.exit(1);
} }
return commandLine; return commandLine;
} }


private static void configureReader(final Options options, final ChronicleReader chronicleReader) { private static void configureReader(final ChronicleReader chronicleReader, final CommandLine commandLine) {
if (options.hasOption("i")) { if (commandLine.hasOption('i')) {
chronicleReader.withInclusionRegex(options.getOption("i").getValue()); chronicleReader.withInclusionRegex(commandLine.getOptionValue('i'));
} }
if (options.hasOption("e")) { if (commandLine.hasOption('e')) {
chronicleReader.withExclusionRegex(options.getOption("e").getValue()); chronicleReader.withExclusionRegex(commandLine.getOptionValue('e'));
} }
if (options.hasOption("f")) { if (commandLine.hasOption('f')) {
chronicleReader.tail(); chronicleReader.tail();
} }
if (options.hasOption("m")) { if (commandLine.hasOption('m')) {
chronicleReader.historyRecords(Long.parseLong(options.getOption("m").getValue())); chronicleReader.historyRecords(Long.parseLong(commandLine.getOptionValue('m')));
} }
if (options.hasOption("n")) { if (commandLine.hasOption('n')) {
chronicleReader.withStartIndex(Long.decode(options.getOption("n").getValue())); chronicleReader.withStartIndex(Long.decode(commandLine.getOptionValue('n')));
} }
} }


@NotNull @NotNull
private static Options options() { private static Options options() {
final Options options = new Options(); final Options options = new Options();
options.addRequiredOption("d", "directory", true, "Directory containing chronicle queue files");
options.addOption("i", "include-regex", true, "Display records containing this regular expression"); addOption(options, "d", "directory", true, "Directory containing chronicle queue files", true);
options.addOption("e", "exclude-regex", true, "Do not display records containing this regular expression"); addOption(options, "i", "include-regex", true, "Display records containing this regular expression", false);
options.addOption("f", "follow", false, "Tail behaviour - wait for new records to arrive"); addOption(options, "e", "exclude-regex", true, "Do not display records containing this regular expression", false);
options.addOption("m", "max-history", true, "Show this many records from the end of the data set"); addOption(options, "f", "follow", false, "Tail behaviour - wait for new records to arrive", false);
options.addOption("n", "from-index", true, "Start reading from this index (e.g. 0x123ABE)"); addOption(options, "m", "max-history", true, "Show this many records from the end of the data set", false);
addOption(options, "n", "from-index", true, "Start reading from this index (e.g. 0x123ABE)", false);
return options; return options;
} }

private static void addOption(final Options options, final String opt, final String argName, final boolean hasArg,
final String description, final boolean isRequired) {
final Option option = new Option(opt, hasArg, description);
option.setArgName(argName);
option.setRequired(isRequired);
options.addOption(option);
}
} }
50 changes: 45 additions & 5 deletions src/test/java/net/openhft/chronicle/queue/ChronicleReaderTest.java
@@ -1,13 +1,14 @@
package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;


import net.openhft.chronicle.wire.DocumentContext;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;


import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;


import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -69,10 +70,49 @@ public void shouldFailIfSpecifiedIndexIsBeforeFirstIndex() throws Exception {
basicReader().withStartIndex(1L).execute(); basicReader().withStartIndex(1L).execute();
} }


@Ignore @Test
@Test(expected = IllegalArgumentException.class) public void shouldNotRewindPastStartOfQueueWhenDisplayingHistory() throws Exception {
public void shouldFailIfSpecifiedIndexIsAfterLastIndex() throws Exception { basicReader().historyRecords(Long.MAX_VALUE).execute();
basicReader().withStartIndex(Long.MAX_VALUE).execute();
assertThat(capturedOutput.stream().
filter(msg -> !msg.startsWith("0x")).count(), is(12L));
}

@Test
public void shouldContinueToPollQueueWhenTailModeIsEnabled() throws Exception {
final int expectedPollCountWhenDocumentIsEmpty = 3;
final FiniteDocumentPollMethod pollMethod = new FiniteDocumentPollMethod(expectedPollCountWhenDocumentIsEmpty);
try {
basicReader().withDocumentPollMethod(pollMethod).tail().execute();
} catch (ArithmeticException e) {
// expected
}

assertThat(pollMethod.invocationCount, is(expectedPollCountWhenDocumentIsEmpty));
}

private static final class FiniteDocumentPollMethod implements Function<ExcerptTailer, DocumentContext> {

private final int maxPollsReturningEmptyDocument;
private int invocationCount;

private FiniteDocumentPollMethod(final int maxPollsReturningEmptyDocument) {
this.maxPollsReturningEmptyDocument = maxPollsReturningEmptyDocument;
}

@Override
public DocumentContext apply(final ExcerptTailer excerptTailer) {
final DocumentContext documentContext = excerptTailer.readingDocument();

if (!documentContext.isPresent()) {
invocationCount++;
if (invocationCount >= maxPollsReturningEmptyDocument) {
throw new ArithmeticException("For testing purposes");
}
}

return documentContext;
}
} }


private ChronicleReader basicReader() { private ChronicleReader basicReader() {
Expand Down

0 comments on commit 4b92d80

Please sign in to comment.