From ee3c7a88bb74232e4884899699aaa08ae2b6e038 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 14 Jun 2016 19:04:43 +0200 Subject: [PATCH] [FLINK-3951] Add Histogram metric type This closes #2112 --- .../org/apache/flink/metrics/Histogram.java | 52 +++ .../flink/metrics/HistogramStatistics.java | 81 +++++ .../org/apache/flink/metrics/MetricGroup.java | 20 ++ .../metrics/groups/AbstractMetricGroup.java | 12 + .../groups/UnregisteredMetricsGroup.java | 12 +- .../metrics/reporter/AbstractReporter.java | 15 + .../flink/metrics/reporter/JMXReporter.java | 98 +++++- .../flink/metrics/MetricRegistryTest.java | 3 +- .../groups/MetricGroupRegistrationTest.java | 21 ++ .../metrics/reporter/JMXReporterTest.java | 108 +++++- .../flink-metrics-dropwizard/pom.xml | 10 +- .../ScheduledDropwizardReporter.java | 30 +- .../DropwizardHistogramStatistics.java | 70 ++++ .../metrics/DropwizardHistogramWrapper.java | 53 +++ .../metrics/FlinkCounterWrapper.java | 4 +- .../dropwizard/metrics/FlinkGaugeWrapper.java | 8 +- .../metrics/FlinkHistogramWrapper.java | 52 +++ .../metrics/HistogramStatisticsWrapper.java | 86 +++++ .../DropwizardFlinkHistogramWrapperTest.java | 319 ++++++++++++++++++ .../src/test/resources/log4j-test.properties | 27 ++ .../src/test/resources/logback-test.xml | 34 ++ .../flink-metrics-ganglia/pom.xml | 2 +- .../metrics/ganglia/GangliaReporter.java | 0 .../flink-metrics-graphite/pom.xml | 2 +- .../metrics/graphite/GraphiteReporter.java | 0 .../flink-metrics-statsd/pom.xml | 10 +- .../flink/metrics/statsd/StatsDReporter.java | 41 +++ .../metrics/statsd/StatsDReporterTest.java | 236 +++++++++++++ .../src/test/resources/log4j-test.properties | 27 ++ .../src/test/resources/logback-test.xml | 34 ++ .../pom.xml | 4 +- pom.xml | 2 +- 32 files changed, 1450 insertions(+), 23 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/metrics/Histogram.java create mode 100644 flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java rename {flink-metric-reporters => flink-metrics}/flink-metrics-dropwizard/pom.xml (89%) rename {flink-metric-reporters => flink-metrics}/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java (78%) create mode 100644 flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java create mode 100644 flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java rename flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java => flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java (89%) rename flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java => flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java (82%) create mode 100644 flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java create mode 100644 flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java create mode 100644 flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java create mode 100644 flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties create mode 100644 flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml rename {flink-metric-reporters => flink-metrics}/flink-metrics-ganglia/pom.xml (98%) rename {flink-metric-reporters => flink-metrics}/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java (100%) rename {flink-metric-reporters => flink-metrics}/flink-metrics-graphite/pom.xml (97%) rename {flink-metric-reporters => flink-metrics}/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java (100%) rename {flink-metric-reporters => flink-metrics}/flink-metrics-statsd/pom.xml (86%) rename {flink-metric-reporters => flink-metrics}/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java (75%) create mode 100644 flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java create mode 100644 flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties create mode 100644 flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml rename {flink-metric-reporters => flink-metrics}/pom.xml (94%) diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java new file mode 100644 index 0000000000000..3fd1253f7a0ff --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Histogram interface to be used with Flink's metrics system. + * + * The histogram allows to record values, get the current count of recorded values and create + * histogram statistics for the currently seen elements. + */ +@PublicEvolving +public interface Histogram extends Metric { + + /** + * Update the histogram with the given value. + * + * @param value Value to update the histogram with + */ + void update(long value); + + /** + * Get the count of seen elements. + * + * @return Count of seen elements + */ + long getCount(); + + /** + * Create statistics for the currently recorded elements. + * + * @return Statistics about the currently recorded elements + */ + HistogramStatistics getStatistics(); +} diff --git a/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java new file mode 100644 index 0000000000000..476580c11e4a2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Histogram statistics represent the current snapshot of elements recorded in the histogram. + * + * The histogram statistics allow to calculate values for quantiles, the mean, the standard + * deviation, the minimum and the maximum. + */ +@PublicEvolving +public abstract class HistogramStatistics { + + /** + * Returns the value for the given quantile based on the represented histogram statistics. + * + * @param quantile Quantile to calculate the value for + * @return Value for the given quantile + */ + public abstract double getQuantile(double quantile); + + /** + * Returns the elements of the statistics' sample + * + * @return Elements of the statistics' sample + */ + public abstract long[] getValues(); + + /** + * Returns the size of the statistics' sample + * + * @return Size of the statistics' sample + */ + public abstract int size(); + + /** + * Returns the mean of the histogram values. + * + * @return Mean of the histogram values + */ + public abstract double getMean(); + + /** + * Returns the standard deviation of the distribution reflected by the histogram statistics. + * + * @return Standard deviation of histogram distribution + */ + public abstract double getStdDev(); + + /** + * Returns the maximum value of the histogram. + * + * @return Maximum value of the histogram + */ + public abstract long getMax(); + + /** + * Returns the minimum value of the histogram. + * + * @return Minimum value of the histogram + */ + public abstract long getMin(); +} diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java index b13194961cb60..f46d3fc656f4c 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java @@ -115,6 +115,26 @@ public interface MetricGroup { */ > G gauge(String name, G gauge); + /** + * Registers a new {@link Histogram} with Flink. + * + * @param name name of the histogram + * @param histogram histogram to register + * @param histogram type + * @return the registered histogram + */ + H histogram(String name, H histogram); + + /** + * Registers a new {@link Histogram} with Flink. + * + * @param name name of the histogram + * @param histogram histogram to register + * @param histogram type + * @return the registered histogram + */ + H histogram(int name, H histogram); + // ------------------------------------------------------------------------ // Groups // ------------------------------------------------------------------------ diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java index 93eb7348073d2..112957efa4f9c 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricRegistry; @@ -172,6 +173,17 @@ public > G gauge(String name, G gauge) { return gauge; } + @Override + public H histogram(int name, H histogram) { + return histogram(String.valueOf(name), histogram); + } + + @Override + public H histogram(String name, H histogram) { + addMetric(name, histogram); + return histogram; + } + /** * Adds the given metric to the group and registers it at the registry, if the group * is not yet closed, and if no metric with the same name has been registered before. diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java index 29d71d9ff7bd4..8e183df430fe3 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; @@ -71,7 +72,16 @@ public > G gauge(String name, G gauge) { return gauge; } - + @Override + public H histogram(int name, H histogram) { + return histogram; + } + + @Override + public H histogram(String name, H histogram) { + return histogram; + } + @Override public MetricGroup addGroup(int name) { return addGroup(String.valueOf(name)); diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java index f2e78bf381ae3..8dacb7cc0542c 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java @@ -21,8 +21,11 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.groups.AbstractMetricGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -32,9 +35,11 @@ */ @PublicEvolving public abstract class AbstractReporter implements MetricReporter { + protected final Logger log = LoggerFactory.getLogger(getClass()); protected final Map, String> gauges = new HashMap<>(); protected final Map counters = new HashMap<>(); + protected final Map histograms = new HashMap<>(); @Override public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { @@ -45,6 +50,11 @@ public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetric counters.put((Counter) metric, name); } else if (metric instanceof Gauge) { gauges.put((Gauge) metric, name); + } else if (metric instanceof Histogram) { + histograms.put((Histogram) metric, name); + } else { + log.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); } } } @@ -56,6 +66,11 @@ public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetr counters.remove(metric); } else if (metric instanceof Gauge) { gauges.remove(metric); + } else if (metric instanceof Histogram) { + histograms.remove(metric); + } else { + log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); } } } diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java index 326d6d7aa5fb4..eaf0ea0c92f22 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.groups.AbstractMetricGroup; import org.apache.flink.util.NetUtils; @@ -146,8 +147,11 @@ public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetric jmxMetric = new JmxGauge((Gauge) metric); } else if (metric instanceof Counter) { jmxMetric = new JmxCounter((Counter) metric); + } else if (metric instanceof Histogram) { + jmxMetric = new JmxHistogram((Histogram) metric); } else { - LOG.error("Unknown metric type: " + metric.getClass().getName()); + LOG.error("Cannot add unknown metric type: {}. This indicates that the metric type " + + "is not supported by this reporter.", metric.getClass().getName()); return; } @@ -285,7 +289,7 @@ public interface JmxCounterMBean extends MetricMBean { private static class JmxCounter extends AbstractBean implements JmxCounterMBean { private Counter counter; - public JmxCounter(Counter counter) { + JmxCounter(Counter counter) { this.counter = counter; } @@ -303,7 +307,7 @@ private static class JmxGauge extends AbstractBean implements JmxGaugeMBean { private final Gauge gauge; - public JmxGauge(Gauge gauge) { + JmxGauge(Gauge gauge) { this.gauge = gauge; } @@ -313,6 +317,94 @@ public Object getValue() { } } + public interface JmxHistogramMBean extends MetricMBean { + long getCount(); + + double getMean(); + + double getStdDev(); + + long getMax(); + + long getMin(); + + double getMedian(); + + double get75thPercentile(); + + double get95thPercentile(); + + double get98thPercentile(); + + double get99thPercentile(); + + double get999thPercentile(); + } + + private static class JmxHistogram extends AbstractBean implements JmxHistogramMBean { + + private final Histogram histogram; + + JmxHistogram(Histogram histogram) { + this.histogram = histogram; + } + + @Override + public long getCount() { + return histogram.getCount(); + } + + @Override + public double getMean() { + return histogram.getStatistics().getMean(); + } + + @Override + public double getStdDev() { + return histogram.getStatistics().getStdDev(); + } + + @Override + public long getMax() { + return histogram.getStatistics().getMax(); + } + + @Override + public long getMin() { + return histogram.getStatistics().getMin(); + } + + @Override + public double getMedian() { + return histogram.getStatistics().getQuantile(0.5); + } + + @Override + public double get75thPercentile() { + return histogram.getStatistics().getQuantile(0.75); + } + + @Override + public double get95thPercentile() { + return histogram.getStatistics().getQuantile(0.95); + } + + @Override + public double get98thPercentile() { + return histogram.getStatistics().getQuantile(0.98); + } + + @Override + public double get99thPercentile() { + return histogram.getStatistics().getQuantile(0.99); + } + + @Override + public double get999thPercentile() { + return histogram.getStatistics().getQuantile(0.999); + } + } + /** * JMX Server implementation that JMX clients can connect to. * diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java index f8e0bf5761be1..8b718165b219b 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java @@ -25,12 +25,13 @@ import org.apache.flink.metrics.reporter.Scheduled; import org.apache.flink.metrics.util.TestReporter; +import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.*; -public class MetricRegistryTest { +public class MetricRegistryTest extends TestLogger { /** * Verifies that the reporter class argument is correctly used to instantiate and open the reporter. diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java index 7b35d91b66a1a..c7a112a5081e9 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java @@ -20,6 +20,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricRegistry; @@ -57,6 +59,25 @@ public Object getValue() { Assert.assertEquals(gauge, TestReporter1.lastPassedMetric); assertEquals("gauge", TestReporter1.lastPassedName); + Histogram histogram = root.histogram("histogram", new Histogram() { + @Override + public void update(long value) { + + } + + @Override + public long getCount() { + return 0; + } + + @Override + public HistogramStatistics getStatistics() { + return null; + } + }); + + Assert.assertEquals(histogram, TestReporter1.lastPassedMetric); + assertEquals("histogram", TestReporter1.lastPassedName); registry.shutdown(); } diff --git a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java index d25f7444cab35..9e638a796f4b4 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java @@ -20,11 +20,16 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.MetricRegistry; import org.apache.flink.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.metrics.util.TestReporter; +import org.apache.flink.util.TestLogger; import org.junit.Test; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; import javax.management.MBeanServer; import javax.management.MBeanServerConnection; import javax.management.ObjectName; @@ -37,7 +42,7 @@ import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_CLASS; import static org.junit.Assert.assertEquals; -public class JMXReporterTest { +public class JMXReporterTest extends TestLogger { @Test public void testReplaceInvalidChars() { @@ -188,4 +193,105 @@ public Integer getValue() { rep2.close(); reg.shutdown(); } + + /** + * Tests that histograms are properly reported via the JMXReporter. + */ + @Test + public void testHistogramReporting() throws Exception { + MetricRegistry registry = null; + String histogramName = "histogram"; + + try { + Configuration config = new Configuration(); + + registry = new MetricRegistry(config); + + TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId"); + + TestingHistogram histogram = new TestingHistogram(); + + metricGroup.histogram(histogramName, histogram); + + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(histogramName, metricGroup.getScopeComponents())); + + MBeanInfo info = mBeanServer.getMBeanInfo(objectName); + + MBeanAttributeInfo[] attributeInfos = info.getAttributes(); + + assertEquals(11, attributeInfos.length); + + assertEquals(histogram.getCount(), mBeanServer.getAttribute(objectName, "Count")); + assertEquals(histogram.getStatistics().getMean(), mBeanServer.getAttribute(objectName, "Mean")); + assertEquals(histogram.getStatistics().getStdDev(), mBeanServer.getAttribute(objectName, "StdDev")); + assertEquals(histogram.getStatistics().getMax(), mBeanServer.getAttribute(objectName, "Max")); + assertEquals(histogram.getStatistics().getMin(), mBeanServer.getAttribute(objectName, "Min")); + assertEquals(histogram.getStatistics().getQuantile(0.5), mBeanServer.getAttribute(objectName, "Median")); + assertEquals(histogram.getStatistics().getQuantile(0.75), mBeanServer.getAttribute(objectName, "75thPercentile")); + assertEquals(histogram.getStatistics().getQuantile(0.95), mBeanServer.getAttribute(objectName, "95thPercentile")); + assertEquals(histogram.getStatistics().getQuantile(0.98), mBeanServer.getAttribute(objectName, "98thPercentile")); + assertEquals(histogram.getStatistics().getQuantile(0.99), mBeanServer.getAttribute(objectName, "99thPercentile")); + assertEquals(histogram.getStatistics().getQuantile(0.999), mBeanServer.getAttribute(objectName, "999thPercentile")); + + } finally { + if (registry != null) { + registry.shutdown(); + } + } + } + + static class TestingHistogram implements Histogram { + + @Override + public void update(long value) { + + } + + @Override + public long getCount() { + return 1; + } + + @Override + public HistogramStatistics getStatistics() { + return new HistogramStatistics() { + @Override + public double getQuantile(double quantile) { + return quantile; + } + + @Override + public long[] getValues() { + return new long[0]; + } + + @Override + public int size() { + return 3; + } + + @Override + public double getMean() { + return 4; + } + + @Override + public double getStdDev() { + return 5; + } + + @Override + public long getMax() { + return 6; + } + + @Override + public long getMin() { + return 7; + } + }; + } + } } diff --git a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml b/flink-metrics/flink-metrics-dropwizard/pom.xml similarity index 89% rename from flink-metric-reporters/flink-metrics-dropwizard/pom.xml rename to flink-metrics/flink-metrics-dropwizard/pom.xml index a3868805640ee..90dbc00cd0893 100644 --- a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml +++ b/flink-metrics/flink-metrics-dropwizard/pom.xml @@ -24,7 +24,7 @@ under the License. org.apache.flink - flink-metric-reporters + flink-metrics 1.1-SNAPSHOT .. @@ -40,6 +40,14 @@ under the License. provided + + org.apache.flink + flink-core + ${project.version} + test-jar + test + + io.dropwizard.metrics metrics-core diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java similarity index 78% rename from flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java rename to flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java index d67f3e3e3d5a6..062bbd8c468ce 100644 --- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java @@ -24,14 +24,19 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; -import org.apache.flink.dropwizard.metrics.CounterWrapper; -import org.apache.flink.dropwizard.metrics.GaugeWrapper; +import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper; +import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper; +import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper; +import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.groups.AbstractMetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -44,6 +49,8 @@ @PublicEvolving public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled, Reporter { + protected final Logger log = LoggerFactory.getLogger(getClass()); + public static final String ARG_HOST = "host"; public static final String ARG_PORT = "port"; public static final String ARG_PREFIX = "prefix"; @@ -58,6 +65,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch private final Map, String> gauges = new HashMap<>(); private final Map counters = new HashMap<>(); + private final Map histograms = new HashMap<>(); // ------------------------------------------------------------------------ @@ -90,11 +98,23 @@ public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetric synchronized (this) { if (metric instanceof Counter) { counters.put((Counter) metric, fullName); - registry.register(fullName, new CounterWrapper((Counter) metric)); + registry.register(fullName, new FlinkCounterWrapper((Counter) metric)); } else if (metric instanceof Gauge) { gauges.put((Gauge) metric, fullName); - registry.register(fullName, GaugeWrapper.fromGauge((Gauge) metric)); + registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge) metric)); + } else if (metric instanceof Histogram) { + Histogram histogram = (Histogram) metric; + histograms.put(histogram, fullName); + + if (histogram instanceof DropwizardHistogramWrapper) { + registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizarHistogram()); + } else { + registry.register(fullName, new FlinkHistogramWrapper(histogram)); + } + } else { + log.warn("Cannot add metric of type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); } } } @@ -108,6 +128,8 @@ public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetr fullName = counters.remove(metric); } else if (metric instanceof Gauge) { fullName = gauges.remove(metric); + } else if (metric instanceof Histogram) { + fullName = histograms.remove(metric); } else { fullName = null; } diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java new file mode 100644 index 0000000000000..6f4eab2c33893 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.dropwizard.metrics; + +import com.codahale.metrics.Snapshot; +import org.apache.flink.metrics.HistogramStatistics; + +/** + * Dropwizard histogram statistics implementation returned by {@link DropwizardHistogramWrapper}. + * The statistics class wraps a {@link Snapshot} instance and forwards the method calls accordingly. + */ +class DropwizardHistogramStatistics extends HistogramStatistics { + + private final com.codahale.metrics.Snapshot snapshot; + + DropwizardHistogramStatistics(com.codahale.metrics.Snapshot snapshot) { + this.snapshot = snapshot; + } + + @Override + public double getQuantile(double quantile) { + return snapshot.getValue(quantile); + } + + @Override + public long[] getValues() { + return snapshot.getValues(); + } + + @Override + public int size() { + return snapshot.size(); + } + + @Override + public double getMean() { + return snapshot.getMean(); + } + + @Override + public double getStdDev() { + return snapshot.getStdDev(); + } + + @Override + public long getMax() { + return snapshot.getMax(); + } + + @Override + public long getMin() { + return snapshot.getMin(); + } +} diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java new file mode 100644 index 0000000000000..79a6a562245eb --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.dropwizard.metrics; + +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; + +/** + * Wrapper to use a Dropwizard {@link com.codahale.metrics.Histogram} as a Flink {@link Histogram}. + */ +public class DropwizardHistogramWrapper implements Histogram { + + private final com.codahale.metrics.Histogram dropwizarHistogram; + + public DropwizardHistogramWrapper(com.codahale.metrics.Histogram dropwizardHistogram) { + this.dropwizarHistogram = dropwizardHistogram; + } + + public com.codahale.metrics.Histogram getDropwizarHistogram() { + return dropwizarHistogram; + } + + @Override + public void update(long value) { + dropwizarHistogram.update(value); + } + + @Override + public long getCount() { + return dropwizarHistogram.getCount(); + } + + @Override + public HistogramStatistics getStatistics() { + return new DropwizardHistogramStatistics(dropwizarHistogram.getSnapshot()); + } +} diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java similarity index 89% rename from flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java rename to flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java index f6630b9854125..a44c3f5c1012e 100644 --- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java @@ -19,10 +19,10 @@ import org.apache.flink.metrics.Counter; -public class CounterWrapper extends com.codahale.metrics.Counter { +public class FlinkCounterWrapper extends com.codahale.metrics.Counter { private final Counter counter; - public CounterWrapper(Counter counter) { + public FlinkCounterWrapper(Counter counter) { this.counter = counter; } diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java similarity index 82% rename from flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java rename to flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java index 655cd60c714dc..058ecaddc94bb 100644 --- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java @@ -20,11 +20,11 @@ import org.apache.flink.metrics.Gauge; -public class GaugeWrapper implements com.codahale.metrics.Gauge { +public class FlinkGaugeWrapper implements com.codahale.metrics.Gauge { private final Gauge gauge; - public GaugeWrapper(Gauge gauge) { + public FlinkGaugeWrapper(Gauge gauge) { this.gauge = gauge; } @@ -33,9 +33,9 @@ public T getValue() { return this.gauge.getValue(); } - public static GaugeWrapper fromGauge(Gauge gauge) { + public static FlinkGaugeWrapper fromGauge(Gauge gauge) { @SuppressWarnings("unchecked") Gauge typedGauge = (Gauge) gauge; - return new GaugeWrapper<>(typedGauge); + return new FlinkGaugeWrapper<>(typedGauge); } } diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java new file mode 100644 index 0000000000000..8bd8078bf5150 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.dropwizard.metrics; + +import com.codahale.metrics.Snapshot; +import org.apache.flink.metrics.Histogram; + +/** + * Wrapper to use a Flink {@link Histogram} as a Dropwizard {@link com.codahale.metrics.Histogram}. + * This is necessary to report Flink's histograms via the Dropwizard + * {@link com.codahale.metrics.Reporter}. + */ +public class FlinkHistogramWrapper extends com.codahale.metrics.Histogram { + + private final Histogram histogram; + + public FlinkHistogramWrapper(Histogram histogram) { + super(null); + this.histogram = histogram; + } + + @Override + public void update(long value) { + histogram.update(value); + } + + @Override + public long getCount() { + return histogram.getCount(); + } + + @Override + public Snapshot getSnapshot() { + return new HistogramStatisticsWrapper(histogram.getStatistics()); + } +} diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java new file mode 100644 index 0000000000000..6d3a69b507692 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.dropwizard.metrics; + +import com.codahale.metrics.Snapshot; +import org.apache.flink.metrics.HistogramStatistics; + +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.Charset; + +/** + * Wrapper to use Flink's {@link HistogramStatistics} as a Dropwizard {@link Snapshot}. This is + * necessary to report Flink's histograms via the Dropwizard {@link com.codahale.metrics.Reporter}. + */ +class HistogramStatisticsWrapper extends Snapshot { + + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + private final HistogramStatistics histogramStatistics; + + HistogramStatisticsWrapper(HistogramStatistics histogramStatistics) { + this.histogramStatistics = histogramStatistics; + } + @Override + public double getValue(double quantile) { + return histogramStatistics.getQuantile(quantile); + } + + @Override + public long[] getValues() { + return histogramStatistics.getValues(); + } + + @Override + public int size() { + return histogramStatistics.size(); + } + + @Override + public long getMax() { + return histogramStatistics.getMax(); + } + + @Override + public double getMean() { + return histogramStatistics.getMean(); + } + + @Override + public long getMin() { + return histogramStatistics.getMin(); + } + + @Override + public double getStdDev() { + return histogramStatistics.getStdDev(); + } + + @Override + public void dump(OutputStream output) { + try(PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(output, UTF_8))){ + + for (Long value : histogramStatistics.getValues()) { + printWriter.printf("%d%n", value); + } + } + } +} diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java new file mode 100644 index 0000000000000..2479c26b25e48 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.dropwizard.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.SlidingWindowReservoir; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.dropwizard.ScheduledDropwizardReporter; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_INTERVAL; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class DropwizardFlinkHistogramWrapperTest extends TestLogger { + + /** + * Tests the histogram functionality of the DropwizardHistogramWrapper. + */ + @Test + public void testDropwizardHistogramWrapper() { + int size = 10; + DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper( + new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size))); + + for (int i = 0; i < size; i++) { + histogramWrapper.update(i); + + assertEquals(i + 1, histogramWrapper.getCount()); + assertEquals(i, histogramWrapper.getStatistics().getMax()); + assertEquals(0, histogramWrapper.getStatistics().getMin()); + } + + assertEquals(size, histogramWrapper.getStatistics().size()); + assertEquals((size - 1)/2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001); + + for (int i = size; i < 2 * size; i++) { + histogramWrapper.update(i); + + assertEquals(i + 1, histogramWrapper.getCount()); + assertEquals(i, histogramWrapper.getStatistics().getMax()); + assertEquals(i + 1 - size, histogramWrapper.getStatistics().getMin()); + } + + assertEquals(size, histogramWrapper.getStatistics().size()); + assertEquals(size + (size - 1)/2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001); + } + + /** + * Tests that the DropwizardHistogramWrapper reports correct dropwizard snapshots to the + * ScheduledReporter. + */ + @Test + public void testDropwizardHistogramWrapperReporting() throws Exception { + long reportingInterval = 1000; + long timeout = 30000; + int size = 10; + String histogramMetricName = "histogram"; + Configuration config = new Configuration(); + config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestingReporter.class.getName()); + config.setString(KEY_METRICS_REPORTER_INTERVAL, reportingInterval + " MILLISECONDS"); + + MetricRegistry registry = null; + + try { + registry = new MetricRegistry(config); + DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper( + new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size))); + + TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId"); + + metricGroup.histogram(histogramMetricName, histogramWrapper); + + String fullMetricName = metricGroup.getScopeString() + "." + histogramMetricName; + + Field f = registry.getClass().getDeclaredField("reporter"); + f.setAccessible(true); + + MetricReporter reporter = (MetricReporter) f.get(registry); + + assertTrue(reporter instanceof TestingReporter); + + TestingReporter testingReporter = (TestingReporter) reporter; + + TestingScheduledReporter scheduledReporter = testingReporter.scheduledReporter; + + // check that the metric has been registered + assertEquals(1, testingReporter.getMetrics().size()); + + for (int i = 0; i < size; i++) { + histogramWrapper.update(i); + } + + Future snapshotFuture = scheduledReporter.getNextHistogramSnapshot(fullMetricName); + + Snapshot snapshot = snapshotFuture.get(timeout, TimeUnit.MILLISECONDS); + + assertEquals(0, snapshot.getMin()); + assertEquals((size - 1) / 2.0, snapshot.getMedian(), 0.001); + assertEquals(size - 1, snapshot.getMax()); + assertEquals(size, snapshot.size()); + + registry.unregister(histogramWrapper, "histogram", metricGroup); + + // check that the metric has been de-registered + assertEquals(0, testingReporter.getMetrics().size()); + } finally { + if (registry != null) { + registry.shutdown(); + } + } + } + + public static class TestingReporter extends ScheduledDropwizardReporter { + TestingScheduledReporter scheduledReporter = null; + + @Override + public ScheduledReporter getReporter(Configuration config) { + scheduledReporter = new TestingScheduledReporter( + registry, + getClass().getName(), + null, + TimeUnit.MILLISECONDS, + TimeUnit.MILLISECONDS); + + return scheduledReporter; + } + + public Map getMetrics() { + return registry.getMetrics(); + } + } + + static class TestingScheduledReporter extends ScheduledReporter { + + final Map histogramSnapshots = new HashMap<>(); + final Map>> histogramSnapshotFutures = new HashMap<>(); + + protected TestingScheduledReporter(com.codahale.metrics.MetricRegistry registry, String name, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit) { + super(registry, name, filter, rateUnit, durationUnit); + } + + @Override + public void report(SortedMap gauges, SortedMap counters, SortedMap histograms, SortedMap meters, SortedMap timers) { + for (Map.Entry entry: histograms.entrySet()) { + reportHistogram(entry.getKey(), entry.getValue()); + } + } + + void reportHistogram(String name, com.codahale.metrics.Histogram histogram) { + histogramSnapshots.put(name, histogram.getSnapshot()); + + synchronized (histogramSnapshotFutures) { + if (histogramSnapshotFutures.containsKey(name)) { + List> futures = histogramSnapshotFutures.remove(name); + + for (CompletableFuture future: futures) { + future.complete(histogram.getSnapshot()); + } + } + } + } + + Future getNextHistogramSnapshot(String name) { + synchronized (histogramSnapshotFutures) { + List> futures; + if (histogramSnapshotFutures.containsKey(name)) { + futures = histogramSnapshotFutures.get(name); + } else { + futures = new ArrayList<>(); + histogramSnapshotFutures.put(name, futures); + } + + CompletableFuture future = new CompletableFuture<>(); + futures.add(future); + + return future; + } + } + } + + static class CompletableFuture implements Future { + + private Exception exception = null; + private T value = null; + + private Object lock = new Object(); + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + synchronized (lock) { + if (isDone()) { + return false; + } else { + exception = new CancellationException("Future was cancelled."); + + lock.notifyAll(); + + return true; + } + } + } + + @Override + public boolean isCancelled() { + return exception instanceof CancellationException; + } + + @Override + public boolean isDone() { + return value != null || exception != null; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + while (!isDone() && !isCancelled()) { + synchronized (lock) { + lock.wait(); + } + } + + if (exception != null) { + throw new ExecutionException(exception); + } else if (value != null) { + return value; + } else { + throw new ExecutionException(new Exception("Future did not complete correctly.")); + } + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long timeoutMs = unit.toMillis(timeout); + long timeoutEnd = timeoutMs + System.currentTimeMillis(); + + while (!isDone() && !isCancelled() && timeoutMs > 0) { + synchronized (lock) { + lock.wait(unit.toMillis(timeoutMs)); + } + + timeoutMs = timeoutEnd - System.currentTimeMillis(); + } + + if (exception != null) { + throw new ExecutionException(exception); + } else if (value != null) { + return value; + } else { + throw new ExecutionException(new Exception("Future did not complete correctly.")); + } + } + + public boolean complete(T value) { + synchronized (lock) { + if (!isDone()) { + this.value = value; + + lock.notifyAll(); + + return true; + } else { + return false; + } + } + } + + public boolean fail(Exception exception) { + synchronized (lock) { + if (!isDone()) { + this.exception = exception; + + lock.notifyAll(); + + return true; + } else { + return false; + } + } + } + } +} diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties new file mode 100644 index 0000000000000..2226f68653181 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml b/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml new file mode 100644 index 0000000000000..1c4ea08c44a20 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml @@ -0,0 +1,34 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n + + + + + + + + + + + + \ No newline at end of file diff --git a/flink-metric-reporters/flink-metrics-ganglia/pom.xml b/flink-metrics/flink-metrics-ganglia/pom.xml similarity index 98% rename from flink-metric-reporters/flink-metrics-ganglia/pom.xml rename to flink-metrics/flink-metrics-ganglia/pom.xml index a457ca1464b7c..c4f51da9a2cec 100644 --- a/flink-metric-reporters/flink-metrics-ganglia/pom.xml +++ b/flink-metrics/flink-metrics-ganglia/pom.xml @@ -24,7 +24,7 @@ under the License. org.apache.flink - flink-metric-reporters + flink-metrics 1.1-SNAPSHOT .. diff --git a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java similarity index 100% rename from flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java rename to flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java diff --git a/flink-metric-reporters/flink-metrics-graphite/pom.xml b/flink-metrics/flink-metrics-graphite/pom.xml similarity index 97% rename from flink-metric-reporters/flink-metrics-graphite/pom.xml rename to flink-metrics/flink-metrics-graphite/pom.xml index 714b77f4327a1..45fb01857df4c 100644 --- a/flink-metric-reporters/flink-metrics-graphite/pom.xml +++ b/flink-metrics/flink-metrics-graphite/pom.xml @@ -24,7 +24,7 @@ under the License. org.apache.flink - flink-metric-reporters + flink-metrics 1.1-SNAPSHOT .. diff --git a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java similarity index 100% rename from flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java rename to flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java diff --git a/flink-metric-reporters/flink-metrics-statsd/pom.xml b/flink-metrics/flink-metrics-statsd/pom.xml similarity index 86% rename from flink-metric-reporters/flink-metrics-statsd/pom.xml rename to flink-metrics/flink-metrics-statsd/pom.xml index 3052a1008f1e9..8ee0b56c7183c 100644 --- a/flink-metric-reporters/flink-metrics-statsd/pom.xml +++ b/flink-metrics/flink-metrics-statsd/pom.xml @@ -24,7 +24,7 @@ under the License. org.apache.flink - flink-metric-reporters + flink-metrics 1.1-SNAPSHOT .. @@ -39,5 +39,13 @@ under the License. ${project.version} provided + + + org.apache.flink + flink-core + ${project.version} + test-jar + test + diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java similarity index 75% rename from flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java rename to flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java index 087a265a9017c..3d9bf07ee87be 100644 --- a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java +++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java @@ -22,6 +22,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.reporter.AbstractReporter; import org.apache.flink.metrics.reporter.Scheduled; @@ -110,6 +112,10 @@ public void report() { } reportCounter(entry.getValue(), entry.getKey()); } + + for (Map.Entry entry : histograms.entrySet()) { + reportHistogram(entry.getValue(), entry.getKey()); + } } catch (ConcurrentModificationException | NoSuchElementException e) { // ignore - may happen when metrics are concurrently added or removed @@ -130,6 +136,41 @@ private void reportGauge(final String name, final Gauge gauge) { } } + private void reportHistogram(final String name, final Histogram histogram) { + if (histogram != null) { + + HistogramStatistics statistics = histogram.getStatistics(); + + if (statistics != null) { + send(prefix(name, "count"), String.valueOf(histogram.getCount())); + send(prefix(name, "max"), String.valueOf(statistics.getMax())); + send(prefix(name, "min"), String.valueOf(statistics.getMin())); + send(prefix(name, "mean"), String.valueOf(statistics.getMean())); + send(prefix(name, "stddev"), String.valueOf(statistics.getStdDev())); + send(prefix(name, "p50"), String.valueOf(statistics.getQuantile(0.5))); + send(prefix(name, "p75"), String.valueOf(statistics.getQuantile(0.75))); + send(prefix(name, "p95"), String.valueOf(statistics.getQuantile(0.95))); + send(prefix(name, "p98"), String.valueOf(statistics.getQuantile(0.98))); + send(prefix(name, "p99"), String.valueOf(statistics.getQuantile(0.99))); + send(prefix(name, "p999"), String.valueOf(statistics.getQuantile(0.999))); + } + } + } + + private String prefix(String ... names) { + if (names.length > 0) { + StringBuilder stringBuilder = new StringBuilder(names[0]); + + for (int i = 1; i < names.length; i++) { + stringBuilder.append('.').append(names[i]); + } + + return stringBuilder.toString(); + } else { + return ""; + } + } + private void send(final String name, final String value) { try { String formatted = String.format("%s:%s|g", name, value); diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java new file mode 100644 index 0000000000000..5f5ef40938b88 --- /dev/null +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.statsd; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertEquals; + +public class StatsDReporterTest extends TestLogger { + + /** + * Tests that histograms are properly reported via the StatsD reporter + */ + @Test + public void testStatsDHistogramReporting() throws Exception { + MetricRegistry registry = null; + DatagramSocketReceiver receiver = null; + Thread receiverThread = null; + long timeout = 5000; + long joinTimeout = 30000; + + String histogramName = "histogram"; + + try { + receiver = new DatagramSocketReceiver(); + + receiverThread = new Thread(receiver); + + receiverThread.start(); + + int port = receiver.getPort(); + + Configuration config = new Configuration(); + config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, StatsDReporter.class.getName()); + config.setString(MetricRegistry.KEY_METRICS_REPORTER_INTERVAL, "1 SECONDS"); + config.setString(MetricRegistry.KEY_METRICS_REPORTER_ARGUMENTS, "--host localhost --port " + port); + + registry = new MetricRegistry(config); + + TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId"); + + TestingHistogram histogram = new TestingHistogram(); + + metricGroup.histogram(histogramName, histogram); + + receiver.waitUntilNumLines(11, timeout); + + Set lines = receiver.getLines(); + + String prefix = metricGroup.getScopeString() + "." + histogramName; + + Set expectedLines = new HashSet<>(); + + expectedLines.add(prefix + ".count:1|g"); + expectedLines.add(prefix + ".mean:3.0|g"); + expectedLines.add(prefix + ".min:6|g"); + expectedLines.add(prefix + ".max:5|g"); + expectedLines.add(prefix + ".stddev:4.0|g"); + expectedLines.add(prefix + ".p75:0.75|g"); + expectedLines.add(prefix + ".p98:0.98|g"); + expectedLines.add(prefix + ".p99:0.99|g"); + expectedLines.add(prefix + ".p999:0.999|g"); + expectedLines.add(prefix + ".p95:0.95|g"); + expectedLines.add(prefix + ".p50:0.5|g"); + + assertEquals(expectedLines, lines); + + } finally { + if (registry != null) { + registry.shutdown(); + } + + if (receiver != null) { + receiver.stop(); + } + + if (receiverThread != null) { + receiverThread.join(joinTimeout); + } + } + } + + public static class TestingHistogram implements Histogram { + + @Override + public void update(long value) { + + } + + @Override + public long getCount() { + return 1; + } + + @Override + public HistogramStatistics getStatistics() { + return new HistogramStatistics() { + @Override + public double getQuantile(double quantile) { + return quantile; + } + + @Override + public long[] getValues() { + return new long[0]; + } + + @Override + public int size() { + return 2; + } + + @Override + public double getMean() { + return 3; + } + + @Override + public double getStdDev() { + return 4; + } + + @Override + public long getMax() { + return 5; + } + + @Override + public long getMin() { + return 6; + } + }; + } + } + + public static class DatagramSocketReceiver implements Runnable { + private static final Object obj = new Object(); + + private final DatagramSocket socket; + private final ConcurrentHashMap lines; + + private boolean running = true; + + public DatagramSocketReceiver() throws SocketException { + socket = new DatagramSocket(); + lines = new ConcurrentHashMap<>(); + } + + public int getPort() { + return socket.getLocalPort(); + } + + public void stop() { + running = false; + socket.close(); + } + + public void waitUntilNumLines(int numberLines, long timeout) throws TimeoutException { + long endTimeout = System.currentTimeMillis() + timeout; + long remainingTimeout = timeout; + + while (numberLines > lines.size() && remainingTimeout > 0) { + synchronized (lines) { + try { + lines.wait(remainingTimeout); + } catch (InterruptedException e) { + // ignore interruption exceptions + } + } + + remainingTimeout = endTimeout - System.currentTimeMillis(); + } + + if (remainingTimeout <= 0) { + throw new TimeoutException("Have not received " + numberLines + " in time."); + } + } + + public Set getLines() { + return lines.keySet(); + } + + @Override + public void run() { + while (running) { + try { + byte[] buffer = new byte[1024]; + + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + socket.receive(packet); + + String line = new String(packet.getData(), 0, packet.getLength()); + + lines.put(line, obj); + + synchronized (lines) { + lines.notifyAll(); + } + } catch (IOException ex) { + // ignore the exceptions + } + } + } + } +} diff --git a/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties new file mode 100644 index 0000000000000..2226f68653181 --- /dev/null +++ b/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml b/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml new file mode 100644 index 0000000000000..1c4ea08c44a20 --- /dev/null +++ b/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml @@ -0,0 +1,34 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n + + + + + + + + + + + + \ No newline at end of file diff --git a/flink-metric-reporters/pom.xml b/flink-metrics/pom.xml similarity index 94% rename from flink-metric-reporters/pom.xml rename to flink-metrics/pom.xml index 01a809ccba964..542f49c26bc32 100644 --- a/flink-metric-reporters/pom.xml +++ b/flink-metrics/pom.xml @@ -29,8 +29,8 @@ under the License. .. - flink-metric-reporters - flink-metric-reporters + flink-metrics + flink-metrics pom diff --git a/pom.xml b/pom.xml index 9dc8846a24aef..9da3fa5a70e4e 100644 --- a/pom.xml +++ b/pom.xml @@ -74,7 +74,7 @@ under the License. flink-quickstart flink-contrib flink-dist - flink-metric-reporters + flink-metrics