Skip to content

Commit

Permalink
accumulate over panes before making assertions
Browse files Browse the repository at this point in the history
  • Loading branch information
swachter committed Jul 23, 2021
1 parent 77ae72e commit fd11084
Showing 1 changed file with 21 additions and 34 deletions.
55 changes: 21 additions & 34 deletions main/src/test/java/org/opennms/nephron/RandomFlowIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,22 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.opennms.nephron.Pipeline.registerCoders;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -318,49 +322,32 @@ public void testRatesWithClockSkew() throws Exception {
}
}

final Map<String, LongSummaryStatistics> summaries = new TreeMap<>();
final long allowedError = 100;

for (final Map.Entry<String, List<FlowSummary>> list : lists.entrySet()) {
// skip first entry per router since it is incomplete
if (!list.getValue().isEmpty()) {
list.getValue().remove(0);
summaries.put(list.getKey(), list.getValue().stream().collect(Collectors.summarizingLong(FlowSummary::getBytesTotal)));
}
}
// for each exporter check that the expected volume is contained in each window
for (var exporterAndVolume: Arrays.asList(Map.entry(1, 11_000_000l), Map.entry(2, 12_000_000l), Map.entry(3, 13_000_000l))) {

for (final Map.Entry<String, LongSummaryStatistics> entry : summaries.entrySet()) {
LOG.info(entry.getKey() + " --> avg: " + entry.getValue().getAverage() + ", min: " + entry.getValue().getMin() + ", max: " + entry.getValue().getMax() + ", count: " + entry.getValue().getCount());
}
// Beam may output several panes for each window
// -> the result in each single pane does not meet the expected data volume but the sum over all panes of a windows does

assertThat(summaries, is(aMapWithSize(3)));
// calculate the LongSummaryStatistic over all panes for each window
var summaries = flowSummaries.stream()
.filter(fs -> fs.getExporter().getNodeId() == exporterAndVolume.getKey())
.collect(Collectors.groupingBy(FlowSummary::getTimestamp, Collectors.summarizingLong(FlowSummary::getBytesTotal)));

final long allowedError = 100;
assertThat(summaries, not(anEmptyMap()));

// node 1 has a negative clock skew
// -> it produces late panes; beam generates lots of these for each window
// -> the result in each single pane does not meet the expected data volume but the sum over all panes of a windows does
LOG.info("flow summary statistics for node: {}", exporterAndVolume.getKey());
for (var entry : summaries.entrySet()) {
LOG.info(Instant.ofEpochMilli(entry.getKey()) + " --> " + entry.getValue());
}

// calculate the LongSummaryStatistic over all panes for each window
var node1Summaries = flowSummaries.stream()
.filter(fs -> fs.getExporter().getNodeId() == 1)
.collect(Collectors.groupingBy(FlowSummary::getTimestamp, Collectors.summarizingLong(FlowSummary::getBytesTotal)));
// check that the sum of transferred bytes matches the expected volume in all windows
summaries.values()
.forEach(summaryStatistic -> assertThat(summaryStatistic.getSum(), longCloseTo(exporterAndVolume.getValue(), allowedError)));

for (var entry : node1Summaries.entrySet()) {
LOG.info(entry.getKey() + " --> " + entry.getValue());
}

// check that the sum of transferred bytes matches the expected volume in all windows
node1Summaries.values()
.forEach(summaryStatistic -> assertThat(summaryStatistic.getSum(), longCloseTo(11_000_000L, allowedError)));

assertThat(summaries.get("Test:Router2-13").getAverage(), closeTo(12_000_000.0, allowedError));
assertThat(summaries.get("Test:Router2-13").getMin(), longCloseTo(12_000_000L, allowedError));
assertThat(summaries.get("Test:Router2-13").getMax(), longCloseTo(12_000_000L, allowedError));

assertThat(summaries.get("Test:Router3-14").getAverage(), closeTo(13_000_000.0, allowedError));
assertThat(summaries.get("Test:Router3-14").getMin(), longCloseTo(13_000_000L, allowedError));
assertThat(summaries.get("Test:Router3-14").getMax(), longCloseTo(13_000_000L, allowedError));

}

public CompletableFuture<List<FlowSummary>> getFirstNFlowSummmariesFromES(int numDocs, NephronOptions options, QueryBuilder query) {
Expand Down

0 comments on commit fd11084

Please sign in to comment.