From 98ad2320e6c28bcfd7afc6a7f2bf25b640bc957a Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Thu, 1 Mar 2018 14:59:53 +0200 Subject: [PATCH] Remove locking from DEHR and use CAS update operation in rescaling to improve performance Add method to combine two Snapshots Use LongAdder[] instead of AtomicLongArray to reduce contention Read metrics from children LatencyMetrics instances when needed, instead of writing to parent instances each time. Add benchmark for LatencyMetrics and Reservoir When child is released, it replicates previous status to parents --- .../DecayingEstimatedHistogramReservoir.java | 228 +++++++++++------- .../cassandra/metrics/LatencyMetrics.java | 154 +++++++++++- .../test/microbench/LatencyTrackingBench.java | 111 +++++++++ ...cayingEstimatedHistogramReservoirTest.java | 35 +++ .../cassandra/metrics/LatencyMetricsTest.java | 70 +++++- 5 files changed, 488 insertions(+), 110 deletions(-) create mode 100644 test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java diff --git a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java index 118f062cdc90..33ed0385af5e 100644 --- a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java +++ b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java @@ -24,8 +24,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLongArray; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.LongAdder; import com.google.common.annotations.VisibleForTesting; @@ -85,8 +84,8 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir private final long[] bucketOffsets; // decayingBuckets and buckets are one element longer than bucketOffsets -- the last element is values greater than the last offset - private final AtomicLongArray decayingBuckets; - private final AtomicLongArray buckets; + private final LongAdder[] decayingBuckets; + private final LongAdder[] buckets; public static final long HALF_TIME_IN_S = 60L; public static final double MEAN_LIFETIME_IN_S = HALF_TIME_IN_S / Math.log(2.0); @@ -95,8 +94,6 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir private final AtomicBoolean rescaling = new AtomicBoolean(false); private volatile long decayLandmark; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - // Wrapper around System.nanoTime() to simplify unit testing. private final Clock clock; @@ -150,8 +147,14 @@ public DecayingEstimatedHistogramReservoir(boolean considerZeroes, int bucketCou { bucketOffsets = EstimatedHistogram.newOffsets(bucketCount, considerZeroes); } - decayingBuckets = new AtomicLongArray(bucketOffsets.length + 1); - buckets = new AtomicLongArray(bucketOffsets.length + 1); + decayingBuckets = new LongAdder[bucketOffsets.length + 1]; + buckets = new LongAdder[bucketOffsets.length + 1]; + + for(int i = 0; i < buckets.length; i++) { + decayingBuckets[i] = new LongAdder(); + buckets[i] = new LongAdder(); + } + this.clock = clock; decayLandmark = clock.getTime(); } @@ -174,18 +177,8 @@ public void update(long value) } // else exact match; we're good - lockForRegularUsage(); - - try - { - decayingBuckets.getAndAdd(index, Math.round(forwardDecayWeight(now))); - } - finally - { - unlockForRegularUsage(); - } - - buckets.getAndIncrement(index); + decayingBuckets[index].add(Math.round(forwardDecayWeight(now))); + buckets[index].increment(); } private double forwardDecayWeight(long now) @@ -202,7 +195,7 @@ private double forwardDecayWeight(long now) */ public int size() { - return decayingBuckets.length(); + return decayingBuckets.length; } /** @@ -215,17 +208,7 @@ public int size() public Snapshot getSnapshot() { rescaleIfNeeded(); - - lockForRegularUsage(); - - try - { - return new EstimatedHistogramReservoirSnapshot(this); - } - finally - { - unlockForRegularUsage(); - } + return new EstimatedHistogramReservoirSnapshot(this); } /** @@ -234,7 +217,7 @@ public Snapshot getSnapshot() @VisibleForTesting boolean isOverflowed() { - return decayingBuckets.get(decayingBuckets.length() - 1) > 0; + return decayingBuckets[decayingBuckets.length - 1].sum() > 0; } private void rescaleIfNeeded() @@ -254,6 +237,7 @@ private void rescaleIfNeeded(long now) } finally { + decayLandmark = now; rescaling.set(false); } } @@ -262,27 +246,14 @@ private void rescaleIfNeeded(long now) private void rescale(long now) { - // Check again to make sure that another thread didn't complete rescale already - if (needRescale(now)) - { - lockForRescale(); + final double rescaleFactor = forwardDecayWeight(now); - try - { - final double rescaleFactor = forwardDecayWeight(now); - decayLandmark = now; - - final int bucketCount = decayingBuckets.length(); - for (int i = 0; i < bucketCount; i++) - { - long newValue = Math.round((decayingBuckets.get(i) / rescaleFactor)); - decayingBuckets.set(i, newValue); - } - } - finally - { - unlockForRescale(); - } + final int bucketCount = decayingBuckets.length; + for (int i = 0; i < bucketCount; i++) + { + long storedValue = decayingBuckets[i].sumThenReset(); + storedValue = Math.round(storedValue / rescaleFactor); + decayingBuckets[i].add(storedValue); } } @@ -294,41 +265,44 @@ private boolean needRescale(long now) @VisibleForTesting public void clear() { - lockForRescale(); - - try + final int bucketCount = decayingBuckets.length; + for (int i = 0; i < bucketCount; i++) { - final int bucketCount = decayingBuckets.length(); - for (int i = 0; i < bucketCount; i++) - { - decayingBuckets.set(i, 0L); - buckets.set(i, 0L); - } - } - finally - { - unlockForRescale(); + decayingBuckets[i].reset(); + buckets[i].reset(); } } - private void lockForRegularUsage() + /** + * Replaces current internal values with the given one from a Snapshot. This method is NOT thread safe, values + * added at the same time to this reservoir using methods such as update may lose their data + */ + public void rebase(EstimatedHistogramReservoirSnapshot snapshot) { - this.lock.readLock().lock(); - } + // Check bucket count + if (decayingBuckets.length != snapshot.decayingBuckets.length) + { + throw new IllegalStateException("Unable to merge two DecayingEstimatedHistogramReservoirs with different bucket sizes"); + } - private void unlockForRegularUsage() - { - this.lock.readLock().unlock(); - } + // Check bucketOffsets + for (int i = 0; i < bucketOffsets.length; i++) + { + if (bucketOffsets[i] != snapshot.bucketOffsets[i]) + { + throw new IllegalStateException("Merge is only supported with equal bucketOffsets"); + } + } - private void lockForRescale() - { - this.lock.writeLock().lock(); - } + this.decayLandmark = snapshot.snapshotLandmark; + for (int i = 0; i < decayingBuckets.length; i++) + { + decayingBuckets[i].reset(); + buckets[i].reset(); - private void unlockForRescale() - { - this.lock.writeLock().unlock(); + decayingBuckets[i].add(snapshot.decayingBuckets[i]); + buckets[i].add(snapshot.values[i]); + } } /** @@ -341,19 +315,32 @@ private void unlockForRescale() * The decaying buckets will be used for quantile calculations and mean values, but the non decaying buckets will be * exposed for calls to {@link Snapshot#getValues()}. */ - private class EstimatedHistogramReservoirSnapshot extends Snapshot + class EstimatedHistogramReservoirSnapshot extends Snapshot { private final long[] decayingBuckets; + private final long[] values; + private long count; + private long snapshotLandmark; + private long[] bucketOffsets; + private DecayingEstimatedHistogramReservoir reservoir; public EstimatedHistogramReservoirSnapshot(DecayingEstimatedHistogramReservoir reservoir) { - final int length = reservoir.decayingBuckets.length(); + final int length = reservoir.decayingBuckets.length; final double rescaleFactor = forwardDecayWeight(clock.getTime()); this.decayingBuckets = new long[length]; + this.values = new long[length]; + this.count = count(); + this.snapshotLandmark = decayLandmark; + this.bucketOffsets = reservoir.bucketOffsets; // No need to copy, these are immutable for (int i = 0; i < length; i++) - this.decayingBuckets[i] = Math.round(reservoir.decayingBuckets.get(i) / rescaleFactor); + { + this.decayingBuckets[i] = Math.round(reservoir.decayingBuckets[i].sum() / rescaleFactor); + this.values[i] = buckets[i].sum(); + } + this.reservoir = reservoir; } /** @@ -396,13 +383,6 @@ public double getValue(double quantile) */ public long[] getValues() { - final int length = buckets.length(); - - long[] values = new long[length]; - - for (int i = 0; i < length; i++) - values[i] = buckets.get(i); - return values; } @@ -418,6 +398,12 @@ public int size() return decayingBuckets.length; } + @VisibleForTesting + public long getSnapshotLandmark() + { + return snapshotLandmark; + } + /** * Return the number of registered values taking forward decay into account. * @@ -547,5 +533,67 @@ public void dump(OutputStream output) } } } + + /** + * Adds another DecayingEstimatedHistogramReservoir's Snapshot to this one. Both reservoirs must have same bucket definitions. This will rescale both snapshots if needed. + * + * @param other EstimatedHistogramReservoirSnapshot with identical bucket definition (offsets and length) + */ + public void add(Snapshot other) + { + if (!(other instanceof EstimatedHistogramReservoirSnapshot)) + { + throw new IllegalStateException("Unable to add other types of Snapshot than another DecayingEstimatedHistogramReservoir"); + } + + EstimatedHistogramReservoirSnapshot snapshot = (EstimatedHistogramReservoirSnapshot) other; + + if (decayingBuckets.length != snapshot.decayingBuckets.length) + { + throw new IllegalStateException("Unable to merge two DecayingEstimatedHistogramReservoirs with different bucket sizes"); + } + + // Check bucketOffsets + for (int i = 0; i < bucketOffsets.length; i++) + { + if (bucketOffsets[i] != snapshot.bucketOffsets[i]) + { + throw new IllegalStateException("Merge is only supported with equal bucketOffsets"); + } + } + + // We need to rescale the reservoirs to the same landmark + if (snapshot.snapshotLandmark < snapshotLandmark) + { + rescaleArray(snapshot.decayingBuckets, (snapshotLandmark - snapshot.snapshotLandmark)); + } + else if (snapshot.snapshotLandmark > snapshotLandmark) + { + rescaleArray(decayingBuckets, (snapshot.snapshotLandmark - snapshotLandmark)); + this.snapshotLandmark = snapshot.snapshotLandmark; + } + + // Now merge the buckets + for (int i = 0; i < snapshot.decayingBuckets.length; i++) + { + decayingBuckets[i] += snapshot.decayingBuckets[i]; + values[i] += snapshot.values[i]; + } + + this.count += snapshot.count; + } + + private void rescaleArray(long[] decayingBuckets, long landMarkDifference) + { + final double rescaleFactor = Math.exp((landMarkDifference / 1000.0) / MEAN_LIFETIME_IN_S); + for (int i = 0; i < decayingBuckets.length; i++) + { + decayingBuckets[i] = Math.round(decayingBuckets[i] / rescaleFactor); + } + } + + public void rebaseReservoir() { + this.reservoir.rebase(this); + } } } diff --git a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java index a1915b167b19..f8bea23eb5f0 100644 --- a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java +++ b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java @@ -17,15 +17,17 @@ */ package org.apache.cassandra.metrics; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Lists; + import com.codahale.metrics.Counter; +import com.codahale.metrics.Reservoir; +import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -41,7 +43,8 @@ public class LatencyMetrics /** parent metrics to replicate any updates to **/ private List parents = Lists.newArrayList(); - + private List children = Lists.newArrayList(); + protected final MetricNameFactory factory; protected final MetricNameFactory aliasFactory; protected final String namePrefix; @@ -86,15 +89,18 @@ public LatencyMetrics(MetricNameFactory factory, MetricNameFactory aliasFactory, this.aliasFactory = aliasFactory; this.namePrefix = namePrefix; + Timer timer = new LatencyMetrics.LatencyMetricsTimer(new DecayingEstimatedHistogramReservoir()); + Counter counter = new LatencyMetricsCounter(); + if (aliasFactory == null) { - latency = Metrics.timer(factory.createMetricName(namePrefix + "Latency")); - totalLatency = Metrics.counter(factory.createMetricName(namePrefix + "TotalLatency")); + latency = Metrics.register(factory.createMetricName(namePrefix + "Latency"), timer); + totalLatency = Metrics.register(factory.createMetricName(namePrefix + "TotalLatency"), counter); } else { - latency = Metrics.timer(factory.createMetricName(namePrefix + "Latency"), aliasFactory.createMetricName(namePrefix + "Latency")); - totalLatency = Metrics.counter(factory.createMetricName(namePrefix + "TotalLatency"), aliasFactory.createMetricName(namePrefix + "TotalLatency")); + latency = Metrics.register(factory.createMetricName(namePrefix + "Latency"), aliasFactory.createMetricName(namePrefix + "Latency"), timer); + totalLatency = Metrics.register(factory.createMetricName(namePrefix + "TotalLatency"), aliasFactory.createMetricName(namePrefix + "TotalLatency"), counter); } } @@ -109,7 +115,37 @@ public LatencyMetrics(MetricNameFactory factory, MetricNameFactory aliasFactory, public LatencyMetrics(MetricNameFactory factory, String namePrefix, LatencyMetrics ... parents) { this(factory, null, namePrefix); - this.parents.addAll(ImmutableList.copyOf(parents)); + this.parents = Arrays.asList(parents); + for (LatencyMetrics parent : parents) + { + parent.addChildren(this); + } + } + + public void addChildren(LatencyMetrics latencyMetric) { + this.children.add(latencyMetric); + } + + public synchronized void removeChildren(LatencyMetrics toRelease) + { + /* + Merge details of removed children metrics and add them to our local copy to prevent metrics from going + backwards. Synchronized since these methods are not thread safe to prevent multiple simultaneous removals. + Will not protect against simultaneous updates, but since these methods are used by linked parent instances only, + they should not receive any updates. + */ + ((LatencyMetricsTimer) this.latency).releasedLatencyCount += toRelease.latency.getCount(); + + DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot childSnapshot = (DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) toRelease.latency.getSnapshot(); + DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot snapshot = (DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) this.latency.getSnapshot(); + + snapshot.add(childSnapshot); + snapshot.rebaseReservoir(); + + this.totalLatency.inc(toRelease.totalLatency.getCount()); + + // Now we can remove the reference + this.children.removeIf(latencyMetrics -> latencyMetrics.equals(toRelease)); } /** takes nanoseconds **/ @@ -118,14 +154,15 @@ public void addNano(long nanos) // convert to microseconds. 1 millionth latency.update(nanos, TimeUnit.NANOSECONDS); totalLatency.inc(nanos / 1000); - for(LatencyMetrics parent : parents) - { - parent.addNano(nanos); - } } public void release() { + // Notify parent metrics that this metric is being released + for (LatencyMetrics parent : this.parents) + { + parent.removeChildren(this); + } if (aliasFactory == null) { Metrics.remove(factory.createMetricName(namePrefix + "Latency")); @@ -137,4 +174,95 @@ public void release() Metrics.remove(factory.createMetricName(namePrefix + "TotalLatency"), aliasFactory.createMetricName(namePrefix + "TotalLatency")); } } + + class LatencyMetricsTimer extends Timer { + + long releasedLatencyCount = 0; + + public LatencyMetricsTimer(Reservoir reservoir) { + super(reservoir); + } + + @Override + public long getCount() + { + long count = super.getCount() + releasedLatencyCount; + for (LatencyMetrics child : children) + { + count += child.latency.getCount(); + } + + return count; + } + + @Override + public double getFifteenMinuteRate() + { + double rate = super.getFifteenMinuteRate(); + for (LatencyMetrics child : children) + { + rate += child.latency.getFifteenMinuteRate(); + } + return rate; + } + + @Override + public double getFiveMinuteRate() + { + double rate = super.getFiveMinuteRate(); + for (LatencyMetrics child : children) + { + rate += child.latency.getFiveMinuteRate(); + } + return rate; + } + + @Override + public double getMeanRate() + { + // Not necessarily 100% accurate, but close enough + double rate = super.getMeanRate(); + for (LatencyMetrics child : children) + { + rate += child.latency.getMeanRate(); + } + return rate; + } + + @Override + public double getOneMinuteRate() + { + double rate = super.getOneMinuteRate(); + for (LatencyMetrics child : children) + { + rate += child.latency.getOneMinuteRate(); + } + return rate; + } + + @Override + public Snapshot getSnapshot() + { + DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot parent = (DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) super.getSnapshot(); + for (LatencyMetrics child : children) + { + parent.add(child.latency.getSnapshot()); + } + + return parent; + } + } + + class LatencyMetricsCounter extends Counter { + @Override + public long getCount() + { + long count = super.getCount(); + for (LatencyMetrics child : children) + { + count += child.totalLatency.getCount(); + } + return count; + } + } } diff --git a/test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java b/test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java new file mode 100644 index 000000000000..75ec92cada79 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.test.microbench; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.CassandraMetricsRegistry; +import org.apache.cassandra.metrics.ClearableHistogram; +import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir; +import org.apache.cassandra.metrics.LatencyMetrics; +import org.apache.cassandra.metrics.LatencyMetricsTest; +import org.apache.cassandra.metrics.MetricNameFactory; +import org.apache.cassandra.metrics.TableMetrics; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.CompilerControl; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1,jvmArgsAppend = { "-Xmx512M", "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}) +@Threads(4) // make sure this matches the number of _physical_cores_ +@State(Scope.Benchmark) +public class LatencyTrackingBench +{ + private LatencyMetrics metrics; + private LatencyMetrics parent; + private LatencyMetrics grandParent; + private DecayingEstimatedHistogramReservoir dehr; + private final MetricNameFactory factory = new BenchMetricsNameFactory(); + private long[] values = new long[1024]; + + class BenchMetricsNameFactory implements MetricNameFactory + { + + @Override + public CassandraMetricsRegistry.MetricName createMetricName(String metricName) + { + return new CassandraMetricsRegistry.MetricName(BenchMetricsNameFactory.class, metricName); + } + } + + @Setup(Level.Iteration) + public void setup() { + parent = new LatencyMetrics("test", "testCF"); + grandParent = new LatencyMetrics("test", "testCF"); + + // Replicates behavior from ColumnFamilyStore metrics + metrics = new LatencyMetrics(factory, "testCF", parent, grandParent); + dehr = new DecayingEstimatedHistogramReservoir(false); + for(int i = 0; i < 1024; i++) { + values[i] = TimeUnit.MICROSECONDS.toNanos(ThreadLocalRandom.current().nextLong(346)); + } + } + + @Setup(Level.Invocation) + public void reset() { + dehr = new DecayingEstimatedHistogramReservoir(false); + metrics.release(); + metrics = new LatencyMetrics(factory, "testCF", parent, grandParent); + } + + @Benchmark + @OperationsPerInvocation(1024) + public void benchLatencyMetricsWrite() { + for(int i = 0; i < values.length; i++) { + metrics.addNano(values[i]); + } + } + + @Benchmark + @OperationsPerInvocation(1024) + public void benchInsertToDEHR(Blackhole bh) { + for(int i = 0; i < values.length; i++) { + dehr.update(values[i]); + } + bh.consume(dehr); + } +} diff --git a/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java b/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java index ef1fed306644..5cfd9274038d 100644 --- a/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java +++ b/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java @@ -380,6 +380,41 @@ public void testDecayingMean() } } + @Test + public void testAggregation() + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + DecayingEstimatedHistogramReservoir another = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + + clock.addMillis(DecayingEstimatedHistogramReservoir.LANDMARK_RESET_INTERVAL_IN_MS - 1_000L); + + histogram.update(1000); + clock.addMillis(100); + another.update(2000); + clock.addMillis(100); + histogram.update(2000); + clock.addMillis(100); + another.update(3000); + clock.addMillis(100); + histogram.update(3000); + clock.addMillis(100); + another.update(4000); + + DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot snapshot = (DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) histogram.getSnapshot(); + DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot anotherSnapshot = (DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) another.getSnapshot(); + + assertEquals(2000, snapshot.getMean(), 500D); + assertEquals(3000, anotherSnapshot.getMean(), 500D); + + snapshot.add(anotherSnapshot); + + // Another had newer decayLandmark, the aggregated snapshot should use it + assertEquals(anotherSnapshot.getSnapshotLandmark(), snapshot.getSnapshotLandmark()); + assertEquals(2500, snapshot.getMean(), 500D); + } + private void assertEstimatedQuantile(long expectedValue, double actualValue) { assertTrue("Expected at least [" + expectedValue + "] but actual is [" + actualValue + "]", actualValue >= expectedValue); diff --git a/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java b/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java index 62cb88e43c27..d61c5501c9c0 100644 --- a/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java +++ b/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java @@ -18,12 +18,27 @@ package org.apache.cassandra.metrics; +import java.util.concurrent.TimeUnit; + import org.junit.Test; +import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertFalse; public class LatencyMetricsTest { + private final MetricNameFactory factory = new TestMetricsNameFactory(); + + private class TestMetricsNameFactory implements MetricNameFactory + { + + @Override + public CassandraMetricsRegistry.MetricName createMetricName(String metricName) + { + return new CassandraMetricsRegistry.MetricName(TestMetricsNameFactory.class, metricName); + } + } + /** * Test bitsets in a "real-world" environment, i.e., bloom filters */ @@ -31,14 +46,10 @@ public class LatencyMetricsTest public void testGetRecentLatency() { final LatencyMetrics l = new LatencyMetrics("test", "test"); - Runnable r = new Runnable() - { - public void run() + Runnable r = () -> { + for (int i = 0; i < 10000; i++) { - for (int i = 0; i < 10000; i++) - { - l.addNano(1000); - } + l.addNano(1000); } }; new Thread(r).start(); @@ -49,4 +60,49 @@ public void run() assertFalse(recent.equals(Double.POSITIVE_INFINITY)); } } + + /** + * Test that parent LatencyMetrics are receiving updates from child metrics when reading + */ + @Test + public void testReadMerging() + { + final LatencyMetrics parent = new LatencyMetrics("testMerge", "testMerge"); + final LatencyMetrics child = new LatencyMetrics(factory, "testChild", parent); + + for (int i = 0; i < 100; i++) + { + child.addNano(TimeUnit.NANOSECONDS.convert(i, TimeUnit.MILLISECONDS)); + } + + assertEquals(4950000, child.totalLatency.getCount()); + assertEquals(child.totalLatency.getCount(), parent.totalLatency.getCount()); + assertEquals(child.latency.getSnapshot().getMean(), parent.latency.getSnapshot().getMean(), 50D); + + child.release(); + parent.release(); + } + + @Test + public void testRelease() + { + final LatencyMetrics parent = new LatencyMetrics("testRelease", "testRelease"); + final LatencyMetrics child = new LatencyMetrics(factory, "testChildRelease", parent); + + for (int i = 0; i < 100; i++) + { + child.addNano(TimeUnit.NANOSECONDS.convert(i, TimeUnit.MILLISECONDS)); + } + + double mean = parent.latency.getSnapshot().getMean(); + long count = parent.totalLatency.getCount(); + + child.release(); + + // Check that no value was lost with the release + assertEquals(count, parent.totalLatency.getCount()); + assertEquals(mean, parent.latency.getSnapshot().getMean(), 50D); + + parent.release(); + } } \ No newline at end of file