Skip to content

Commit

Permalink
ChronicleHistoryReader: allow ignore of first n samples, allow summar…
Browse files Browse the repository at this point in the history
…y output, allow output every time window
  • Loading branch information
JerryShea committed Nov 26, 2017
1 parent 8be57af commit 2589191
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 31 deletions.
Expand Up @@ -55,9 +55,14 @@ protected void setup(CommandLine commandLine, ChronicleHistoryReader chronicleHi
withMessageSink(System.out::println).
withProgress(commandLine.hasOption('p')).
withHistosByMethod(commandLine.hasOption('m')).
withSummaryOutput(commandLine.hasOption('u')).
withBasePath(Paths.get(commandLine.getOptionValue('d')));
if (commandLine.hasOption('t'))
chronicleHistoryReader.withTimeUnit(TimeUnit.valueOf(commandLine.getOptionValue('t')));
if (commandLine.hasOption('i'))
chronicleHistoryReader.withIgnore(Long.parseLong(commandLine.getOptionValue('i')));
if (commandLine.hasOption('w'))
chronicleHistoryReader.withMeasurementWindow(Long.parseLong(commandLine.getOptionValue('w')));
}

@NotNull
Expand Down Expand Up @@ -96,8 +101,11 @@ protected Options options() {
ChronicleReaderMain.addOption(options, "d", "directory", true, "Directory containing chronicle queue files", true);
ChronicleReaderMain.addOption(options, "h", "help-message", false, "Print this help and exit", false);
ChronicleReaderMain.addOption(options, "t", "time unit", true, "Time unit. Default nanos", false);
ChronicleReaderMain.addOption(options, "i", "ignore", true, "How many to ignore from start", false);
ChronicleReaderMain.addOption(options, "w", "window", true, "Window duration in time unit", false);
options.addOption(new Option("p", false, "Show progress"));
options.addOption(new Option("m", false, "By method"));
options.addOption(new Option("u", false, "Summary output"));
return options;
}
}
Expand Up @@ -16,6 +16,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* Created by Jerry Shea on 29/09/17.
Expand All @@ -28,6 +29,13 @@ public class ChronicleHistoryReader {
private TimeUnit timeUnit = TimeUnit.NANOSECONDS;
protected boolean histosByMethod = false;
protected Map<String, Histogram> histos = new LinkedHashMap<>();
private long ignore = 0;
private long counter = 0;
private long measurementWindowNanos = 0;
private long firstTimeStampNanos = 0;
private long lastWindowCount = 0;
private boolean summaryOutput = false;
private int lastHistosSize = 0;

public ChronicleHistoryReader withMessageSink(final Consumer<String> messageSink) {
this.messageSink = messageSink;
Expand All @@ -54,6 +62,21 @@ public ChronicleHistoryReader withHistosByMethod(boolean b) {
return this;
}

public ChronicleHistoryReader withIgnore(long ignore) {
this.ignore = ignore;
return this;
}

public ChronicleHistoryReader withMeasurementWindow(long measurementWindow) {
this.measurementWindowNanos = timeUnit.toNanos(measurementWindow);
return this;
}

public ChronicleHistoryReader withSummaryOutput(boolean b) {
this.summaryOutput = b;
return this;
}

@NotNull
private SingleChronicleQueue createQueue() {
if (!Files.exists(basePath)) {
Expand All @@ -67,7 +90,8 @@ private SingleChronicleQueue createQueue() {

public void execute() {
readChronicle();
printPercentilesSummary();
if (measurementWindowNanos == 0)
outputData();
}

public Map<String, Histogram> readChronicle() {
Expand All @@ -77,18 +101,24 @@ public Map<String, Histogram> readChronicle() {
final MethodReader mr = new MethodReader(tailer, true, parselet, null, parselet);

MessageHistory.set(new VanillaMessageHistory());
int counter = 0;
while (! Thread.currentThread().isInterrupted() && mr.readOne()) {
++counter;
if (this.progress && counter % 1_000_000 == 0) {
if (this.progress && counter % 1_000_000L == 0) {
System.out.println("Progress: " + counter);
}
}

return histos;
}

public void printPercentilesSummary() {
public void outputData() {
if (summaryOutput)
printSummary();
else
printPercentilesSummary();
}

private void printPercentilesSummary() {
// we should also consider the case where >1 output messages are from 1 incoming

if (histos.size() == 0) {
Expand All @@ -111,6 +141,23 @@ public void printPercentilesSummary() {
messageSink.accept("worst: " + percentiles(-1));
}

private void printSummary() {
if (histos.size() > lastHistosSize) {
messageSink.accept("relative_ts," + String.join(",", histos.keySet()));
lastHistosSize = histos.size();
}
long tsSinceStart = (lastWindowCount * measurementWindowNanos) - firstTimeStampNanos;
messageSink.accept(
Long.toString(timeUnit.convert(tsSinceStart, TimeUnit.NANOSECONDS)) + "," +
histos.values().stream().
map(h -> Long.toString(timeUnit.convert((long) last(h.getPercentiles()), TimeUnit.NANOSECONDS))).
collect(Collectors.joining(",")));
}

private double last(double[] percentiles) {
return percentiles[percentiles.length - 1];
}

private String count() {
final StringBuilder sb = new StringBuilder(" ");
histos.forEach((id, histogram) -> sb.append(String.format("%12d ", histogram.totalCount())));
Expand All @@ -136,38 +183,65 @@ private String percentiles(final int index) {
protected WireParselet parselet() {
return (methodName, v, $) -> {
v.skipValue();
CharSequence extraHistoId = histosByMethod ? ("_"+methodName) : "";
if (counter < ignore)
return;
final MessageHistory history = MessageHistory.get();
long lastTime = 0;
// if the tailer has recordHistory(true) then the MessageHistory will be
// written with a single timing and nothing else. This is then carried through
int firstWriteOffset = history.timings() - (history.sources() * 2);
if (! (firstWriteOffset == 0 || firstWriteOffset == 1))
// don't know how this can happen, but there is at least one CQ that exhibits it
if (history == null)
return;
for (int sourceIndex=0; sourceIndex<history.sources(); sourceIndex++) {
String histoId = Integer.toString(history.sourceId(sourceIndex)) + extraHistoId;
Histogram histo = histos.computeIfAbsent(histoId, s -> histogram());
long receivedByThisComponent = history.timing((2 * sourceIndex) + firstWriteOffset);
long processedByThisComponent = history.timing((2 * sourceIndex) + firstWriteOffset + 1);
histo.sample(processedByThisComponent - receivedByThisComponent);
if (lastTime == 0 && firstWriteOffset > 0) {
Histogram histo1 = histos.computeIfAbsent("startTo" + histoId, s -> histogram());
histo1.sample(receivedByThisComponent - history.timing(0));
} else if (lastTime != 0) {
Histogram histo1 = histos.computeIfAbsent(Integer.toString(history.sourceId(sourceIndex-1)) + "to" + histoId, s -> histogram());
// here we are comparing System.nanoTime across processes. YMMV
histo1.sample(receivedByThisComponent - lastTime);

processMessage(methodName, history);

if (history.timings() > 0) {
long firstTiming = history.timing(0);
if (measurementWindowNanos > 0) {
long windowCount = firstTiming / measurementWindowNanos;
if (windowCount > lastWindowCount) {
windowPassed();
lastWindowCount = windowCount;
}
if (firstTimeStampNanos == 0)
firstTimeStampNanos = firstTiming;
}
lastTime = processedByThisComponent;
}
if (history.sources() > 1) {
Histogram histoE2E = histos.computeIfAbsent("endToEnd", s -> histogram());
histoE2E.sample(history.timing(history.timings() - 1) - history.timing(0));
}
};
}

protected void processMessage(CharSequence methodName, MessageHistory history) {
CharSequence extraHistoId = histosByMethod ? ("_"+methodName) : "";
long lastTime = 0;
// if the tailer has recordHistory(true) then the MessageHistory will be
// written with a single timing and nothing else. This is then carried through
int firstWriteOffset = history.timings() - (history.sources() * 2);
if (! (firstWriteOffset == 0 || firstWriteOffset == 1))
// don't know how this can happen, but there is at least one CQ that exhibits it
return;
for (int sourceIndex=0; sourceIndex<history.sources(); sourceIndex++) {
String histoId = Integer.toString(history.sourceId(sourceIndex)) + extraHistoId;
Histogram histo = histos.computeIfAbsent(histoId, s -> histogram());
long receivedByThisComponent = history.timing((2 * sourceIndex) + firstWriteOffset);
long processedByThisComponent = history.timing((2 * sourceIndex) + firstWriteOffset + 1);
histo.sample(processedByThisComponent - receivedByThisComponent);
if (lastTime == 0 && firstWriteOffset > 0) {
Histogram histo1 = histos.computeIfAbsent("startTo" + histoId, s -> histogram());
histo1.sample(receivedByThisComponent - history.timing(0));
} else if (lastTime != 0) {
Histogram histo1 = histos.computeIfAbsent(Integer.toString(history.sourceId(sourceIndex-1)) + "to" + histoId, s -> histogram());
// here we are comparing System.nanoTime across processes. YMMV
histo1.sample(receivedByThisComponent - lastTime);
}
lastTime = processedByThisComponent;
}
if (history.sources() > 1) {
Histogram histoE2E = histos.computeIfAbsent("endToEnd", s -> histogram());
histoE2E.sample(history.timing(history.timings() - 1) - history.timing(0));
}
}

protected void windowPassed() {
outputData();
histos.values().forEach(Histogram::reset);
}

@NotNull
protected Histogram histogram() {
return new Histogram(60, 4);
Expand Down
Expand Up @@ -22,7 +22,6 @@
import net.openhft.chronicle.core.util.Histogram;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.queue.reader.ChronicleHistoryReader;
import net.openhft.chronicle.wire.MessageHistory;
import net.openhft.chronicle.wire.MethodReader;
import net.openhft.chronicle.wire.VanillaMessageHistory;
Expand Down Expand Up @@ -114,7 +113,7 @@ private void doTest(boolean recordHistoryFirst) {
Map<String, Histogram> histos = chronicleHistoryReader.readChronicle();

chronicleHistoryReader.withMessageSink(System.out::println);
chronicleHistoryReader.printPercentilesSummary();
chronicleHistoryReader.outputData();

if (recordHistoryFirst) {
Assert.assertEquals(5, histos.size());
Expand Down

0 comments on commit 2589191

Please sign in to comment.