From 2589191da1cdc95061716e68ac9b44765174247b Mon Sep 17 00:00:00 2001 From: Jerry Shea Date: Mon, 27 Nov 2017 10:19:43 +1100 Subject: [PATCH] ChronicleHistoryReader: allow ignore of first n samples, allow summary output, allow output every time window --- .../queue/ChronicleHistoryReaderMain.java | 8 ++ .../queue/reader/ChronicleHistoryReader.java | 132 ++++++++++++++---- .../reader/ChronicleHistoryReaderTest.java | 3 +- 3 files changed, 112 insertions(+), 31 deletions(-) diff --git a/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java b/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java index e3b277bfcc..23138aa526 100755 --- a/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java +++ b/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java @@ -55,9 +55,14 @@ protected void setup(CommandLine commandLine, ChronicleHistoryReader chronicleHi withMessageSink(System.out::println). withProgress(commandLine.hasOption('p')). withHistosByMethod(commandLine.hasOption('m')). + withSummaryOutput(commandLine.hasOption('u')). withBasePath(Paths.get(commandLine.getOptionValue('d'))); if (commandLine.hasOption('t')) chronicleHistoryReader.withTimeUnit(TimeUnit.valueOf(commandLine.getOptionValue('t'))); + if (commandLine.hasOption('i')) + chronicleHistoryReader.withIgnore(Long.parseLong(commandLine.getOptionValue('i'))); + if (commandLine.hasOption('w')) + chronicleHistoryReader.withMeasurementWindow(Long.parseLong(commandLine.getOptionValue('w'))); } @NotNull @@ -96,8 +101,11 @@ protected Options options() { ChronicleReaderMain.addOption(options, "d", "directory", true, "Directory containing chronicle queue files", true); ChronicleReaderMain.addOption(options, "h", "help-message", false, "Print this help and exit", false); ChronicleReaderMain.addOption(options, "t", "time unit", true, "Time unit. Default nanos", false); + ChronicleReaderMain.addOption(options, "i", "ignore", true, "How many to ignore from start", false); + ChronicleReaderMain.addOption(options, "w", "window", true, "Window duration in time unit", false); options.addOption(new Option("p", false, "Show progress")); options.addOption(new Option("m", false, "By method")); + options.addOption(new Option("u", false, "Summary output")); return options; } } \ No newline at end of file diff --git a/src/main/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReader.java b/src/main/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReader.java index 73ec4b1d26..b4e172de13 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReader.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReader.java @@ -16,6 +16,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Collectors; /** * Created by Jerry Shea on 29/09/17. @@ -28,6 +29,13 @@ public class ChronicleHistoryReader { private TimeUnit timeUnit = TimeUnit.NANOSECONDS; protected boolean histosByMethod = false; protected Map histos = new LinkedHashMap<>(); + private long ignore = 0; + private long counter = 0; + private long measurementWindowNanos = 0; + private long firstTimeStampNanos = 0; + private long lastWindowCount = 0; + private boolean summaryOutput = false; + private int lastHistosSize = 0; public ChronicleHistoryReader withMessageSink(final Consumer messageSink) { this.messageSink = messageSink; @@ -54,6 +62,21 @@ public ChronicleHistoryReader withHistosByMethod(boolean b) { return this; } + public ChronicleHistoryReader withIgnore(long ignore) { + this.ignore = ignore; + return this; + } + + public ChronicleHistoryReader withMeasurementWindow(long measurementWindow) { + this.measurementWindowNanos = timeUnit.toNanos(measurementWindow); + return this; + } + + public ChronicleHistoryReader withSummaryOutput(boolean b) { + this.summaryOutput = b; + return this; + } + @NotNull private SingleChronicleQueue createQueue() { if (!Files.exists(basePath)) { @@ -67,7 +90,8 @@ private SingleChronicleQueue createQueue() { public void execute() { readChronicle(); - printPercentilesSummary(); + if (measurementWindowNanos == 0) + outputData(); } public Map readChronicle() { @@ -77,10 +101,9 @@ public Map readChronicle() { final MethodReader mr = new MethodReader(tailer, true, parselet, null, parselet); MessageHistory.set(new VanillaMessageHistory()); - int counter = 0; while (! Thread.currentThread().isInterrupted() && mr.readOne()) { ++counter; - if (this.progress && counter % 1_000_000 == 0) { + if (this.progress && counter % 1_000_000L == 0) { System.out.println("Progress: " + counter); } } @@ -88,7 +111,14 @@ public Map readChronicle() { return histos; } - public void printPercentilesSummary() { + public void outputData() { + if (summaryOutput) + printSummary(); + else + printPercentilesSummary(); + } + + private void printPercentilesSummary() { // we should also consider the case where >1 output messages are from 1 incoming if (histos.size() == 0) { @@ -111,6 +141,23 @@ public void printPercentilesSummary() { messageSink.accept("worst: " + percentiles(-1)); } + private void printSummary() { + if (histos.size() > lastHistosSize) { + messageSink.accept("relative_ts," + String.join(",", histos.keySet())); + lastHistosSize = histos.size(); + } + long tsSinceStart = (lastWindowCount * measurementWindowNanos) - firstTimeStampNanos; + messageSink.accept( + Long.toString(timeUnit.convert(tsSinceStart, TimeUnit.NANOSECONDS)) + "," + + histos.values().stream(). + map(h -> Long.toString(timeUnit.convert((long) last(h.getPercentiles()), TimeUnit.NANOSECONDS))). + collect(Collectors.joining(","))); + } + + private double last(double[] percentiles) { + return percentiles[percentiles.length - 1]; + } + private String count() { final StringBuilder sb = new StringBuilder(" "); histos.forEach((id, histogram) -> sb.append(String.format("%12d ", histogram.totalCount()))); @@ -136,38 +183,65 @@ private String percentiles(final int index) { protected WireParselet parselet() { return (methodName, v, $) -> { v.skipValue(); - CharSequence extraHistoId = histosByMethod ? ("_"+methodName) : ""; + if (counter < ignore) + return; final MessageHistory history = MessageHistory.get(); - long lastTime = 0; - // if the tailer has recordHistory(true) then the MessageHistory will be - // written with a single timing and nothing else. This is then carried through - int firstWriteOffset = history.timings() - (history.sources() * 2); - if (! (firstWriteOffset == 0 || firstWriteOffset == 1)) - // don't know how this can happen, but there is at least one CQ that exhibits it + if (history == null) return; - for (int sourceIndex=0; sourceIndex histogram()); - long receivedByThisComponent = history.timing((2 * sourceIndex) + firstWriteOffset); - long processedByThisComponent = history.timing((2 * sourceIndex) + firstWriteOffset + 1); - histo.sample(processedByThisComponent - receivedByThisComponent); - if (lastTime == 0 && firstWriteOffset > 0) { - Histogram histo1 = histos.computeIfAbsent("startTo" + histoId, s -> histogram()); - histo1.sample(receivedByThisComponent - history.timing(0)); - } else if (lastTime != 0) { - Histogram histo1 = histos.computeIfAbsent(Integer.toString(history.sourceId(sourceIndex-1)) + "to" + histoId, s -> histogram()); - // here we are comparing System.nanoTime across processes. YMMV - histo1.sample(receivedByThisComponent - lastTime); + + processMessage(methodName, history); + + if (history.timings() > 0) { + long firstTiming = history.timing(0); + if (measurementWindowNanos > 0) { + long windowCount = firstTiming / measurementWindowNanos; + if (windowCount > lastWindowCount) { + windowPassed(); + lastWindowCount = windowCount; + } + if (firstTimeStampNanos == 0) + firstTimeStampNanos = firstTiming; } - lastTime = processedByThisComponent; - } - if (history.sources() > 1) { - Histogram histoE2E = histos.computeIfAbsent("endToEnd", s -> histogram()); - histoE2E.sample(history.timing(history.timings() - 1) - history.timing(0)); } }; } + protected void processMessage(CharSequence methodName, MessageHistory history) { + CharSequence extraHistoId = histosByMethod ? ("_"+methodName) : ""; + long lastTime = 0; + // if the tailer has recordHistory(true) then the MessageHistory will be + // written with a single timing and nothing else. This is then carried through + int firstWriteOffset = history.timings() - (history.sources() * 2); + if (! (firstWriteOffset == 0 || firstWriteOffset == 1)) + // don't know how this can happen, but there is at least one CQ that exhibits it + return; + for (int sourceIndex=0; sourceIndex histogram()); + long receivedByThisComponent = history.timing((2 * sourceIndex) + firstWriteOffset); + long processedByThisComponent = history.timing((2 * sourceIndex) + firstWriteOffset + 1); + histo.sample(processedByThisComponent - receivedByThisComponent); + if (lastTime == 0 && firstWriteOffset > 0) { + Histogram histo1 = histos.computeIfAbsent("startTo" + histoId, s -> histogram()); + histo1.sample(receivedByThisComponent - history.timing(0)); + } else if (lastTime != 0) { + Histogram histo1 = histos.computeIfAbsent(Integer.toString(history.sourceId(sourceIndex-1)) + "to" + histoId, s -> histogram()); + // here we are comparing System.nanoTime across processes. YMMV + histo1.sample(receivedByThisComponent - lastTime); + } + lastTime = processedByThisComponent; + } + if (history.sources() > 1) { + Histogram histoE2E = histos.computeIfAbsent("endToEnd", s -> histogram()); + histoE2E.sample(history.timing(history.timings() - 1) - history.timing(0)); + } + } + + protected void windowPassed() { + outputData(); + histos.values().forEach(Histogram::reset); + } + @NotNull protected Histogram histogram() { return new Histogram(60, 4); diff --git a/src/test/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReaderTest.java b/src/test/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReaderTest.java index 3411266587..bcb2ece300 100755 --- a/src/test/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReaderTest.java +++ b/src/test/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReaderTest.java @@ -22,7 +22,6 @@ import net.openhft.chronicle.core.util.Histogram; import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; -import net.openhft.chronicle.queue.reader.ChronicleHistoryReader; import net.openhft.chronicle.wire.MessageHistory; import net.openhft.chronicle.wire.MethodReader; import net.openhft.chronicle.wire.VanillaMessageHistory; @@ -114,7 +113,7 @@ private void doTest(boolean recordHistoryFirst) { Map histos = chronicleHistoryReader.readChronicle(); chronicleHistoryReader.withMessageSink(System.out::println); - chronicleHistoryReader.printPercentilesSummary(); + chronicleHistoryReader.outputData(); if (recordHistoryFirst) { Assert.assertEquals(5, histos.size());