Skip to content

Commit

Permalink
[FLINK-3951] Add Histogram metric type
Browse files Browse the repository at this point in the history
This closes #2112
  • Loading branch information
tillrohrmann authored and zentol committed Jun 27, 2016
1 parent d43bf8d commit ee3c7a8
Show file tree
Hide file tree
Showing 32 changed files with 1,450 additions and 23 deletions.
52 changes: 52 additions & 0 deletions flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics;

import org.apache.flink.annotation.PublicEvolving;

/**
* Histogram interface to be used with Flink's metrics system.
*
* The histogram allows to record values, get the current count of recorded values and create
* histogram statistics for the currently seen elements.
*/
@PublicEvolving
public interface Histogram extends Metric {

/**
* Update the histogram with the given value.
*
* @param value Value to update the histogram with
*/
void update(long value);

/**
* Get the count of seen elements.
*
* @return Count of seen elements
*/
long getCount();

/**
* Create statistics for the currently recorded elements.
*
* @return Statistics about the currently recorded elements
*/
HistogramStatistics getStatistics();
}
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics;

import org.apache.flink.annotation.PublicEvolving;

/**
* Histogram statistics represent the current snapshot of elements recorded in the histogram.
*
* The histogram statistics allow to calculate values for quantiles, the mean, the standard
* deviation, the minimum and the maximum.
*/
@PublicEvolving
public abstract class HistogramStatistics {

/**
* Returns the value for the given quantile based on the represented histogram statistics.
*
* @param quantile Quantile to calculate the value for
* @return Value for the given quantile
*/
public abstract double getQuantile(double quantile);

/**
* Returns the elements of the statistics' sample
*
* @return Elements of the statistics' sample
*/
public abstract long[] getValues();

/**
* Returns the size of the statistics' sample
*
* @return Size of the statistics' sample
*/
public abstract int size();

/**
* Returns the mean of the histogram values.
*
* @return Mean of the histogram values
*/
public abstract double getMean();

/**
* Returns the standard deviation of the distribution reflected by the histogram statistics.
*
* @return Standard deviation of histogram distribution
*/
public abstract double getStdDev();

/**
* Returns the maximum value of the histogram.
*
* @return Maximum value of the histogram
*/
public abstract long getMax();

/**
* Returns the minimum value of the histogram.
*
* @return Minimum value of the histogram
*/
public abstract long getMin();
}
20 changes: 20 additions & 0 deletions flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
Expand Up @@ -115,6 +115,26 @@ public interface MetricGroup {
*/ */
<T, G extends Gauge<T>> G gauge(String name, G gauge); <T, G extends Gauge<T>> G gauge(String name, G gauge);


/**
* Registers a new {@link Histogram} with Flink.
*
* @param name name of the histogram
* @param histogram histogram to register
* @param <H> histogram type
* @return the registered histogram
*/
<H extends Histogram> H histogram(String name, H histogram);

/**
* Registers a new {@link Histogram} with Flink.
*
* @param name name of the histogram
* @param histogram histogram to register
* @param <H> histogram type
* @return the registered histogram
*/
<H extends Histogram> H histogram(int name, H histogram);

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Groups // Groups
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry; import org.apache.flink.metrics.MetricRegistry;
Expand Down Expand Up @@ -172,6 +173,17 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
return gauge; return gauge;
} }


@Override
public <H extends Histogram> H histogram(int name, H histogram) {
return histogram(String.valueOf(name), histogram);
}

@Override
public <H extends Histogram> H histogram(String name, H histogram) {
addMetric(name, histogram);
return histogram;
}

/** /**
* Adds the given metric to the group and registers it at the registry, if the group * Adds the given metric to the group and registers it at the registry, if the group
* is not yet closed, and if no metric with the same name has been registered before. * is not yet closed, and if no metric with the same name has been registered before.
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.SimpleCounter;


Expand Down Expand Up @@ -71,7 +72,16 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
return gauge; return gauge;
} }



@Override
public <H extends Histogram> H histogram(int name, H histogram) {
return histogram;
}

@Override
public <H extends Histogram> H histogram(String name, H histogram) {
return histogram;
}

@Override @Override
public MetricGroup addGroup(int name) { public MetricGroup addGroup(int name) {
return addGroup(String.valueOf(name)); return addGroup(String.valueOf(name));
Expand Down
Expand Up @@ -21,8 +21,11 @@
import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup; import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
Expand All @@ -32,9 +35,11 @@
*/ */
@PublicEvolving @PublicEvolving
public abstract class AbstractReporter implements MetricReporter { public abstract class AbstractReporter implements MetricReporter {
protected final Logger log = LoggerFactory.getLogger(getClass());


protected final Map<Gauge<?>, String> gauges = new HashMap<>(); protected final Map<Gauge<?>, String> gauges = new HashMap<>();
protected final Map<Counter, String> counters = new HashMap<>(); protected final Map<Counter, String> counters = new HashMap<>();
protected final Map<Histogram, String> histograms = new HashMap<>();


@Override @Override
public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
Expand All @@ -45,6 +50,11 @@ public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetric
counters.put((Counter) metric, name); counters.put((Counter) metric, name);
} else if (metric instanceof Gauge) { } else if (metric instanceof Gauge) {
gauges.put((Gauge<?>) metric, name); gauges.put((Gauge<?>) metric, name);
} else if (metric instanceof Histogram) {
histograms.put((Histogram) metric, name);
} else {
log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
"does not support this metric type.", metric.getClass().getName());
} }
} }
} }
Expand All @@ -56,6 +66,11 @@ public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetr
counters.remove(metric); counters.remove(metric);
} else if (metric instanceof Gauge) { } else if (metric instanceof Gauge) {
gauges.remove(metric); gauges.remove(metric);
} else if (metric instanceof Histogram) {
histograms.remove(metric);
} else {
log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
"does not support this metric type.", metric.getClass().getName());
} }
} }
} }
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup; import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.util.NetUtils; import org.apache.flink.util.NetUtils;
Expand Down Expand Up @@ -146,8 +147,11 @@ public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetric
jmxMetric = new JmxGauge((Gauge<?>) metric); jmxMetric = new JmxGauge((Gauge<?>) metric);
} else if (metric instanceof Counter) { } else if (metric instanceof Counter) {
jmxMetric = new JmxCounter((Counter) metric); jmxMetric = new JmxCounter((Counter) metric);
} else if (metric instanceof Histogram) {
jmxMetric = new JmxHistogram((Histogram) metric);
} else { } else {
LOG.error("Unknown metric type: " + metric.getClass().getName()); LOG.error("Cannot add unknown metric type: {}. This indicates that the metric type " +
"is not supported by this reporter.", metric.getClass().getName());
return; return;
} }


Expand Down Expand Up @@ -285,7 +289,7 @@ public interface JmxCounterMBean extends MetricMBean {
private static class JmxCounter extends AbstractBean implements JmxCounterMBean { private static class JmxCounter extends AbstractBean implements JmxCounterMBean {
private Counter counter; private Counter counter;


public JmxCounter(Counter counter) { JmxCounter(Counter counter) {
this.counter = counter; this.counter = counter;
} }


Expand All @@ -303,7 +307,7 @@ private static class JmxGauge extends AbstractBean implements JmxGaugeMBean {


private final Gauge<?> gauge; private final Gauge<?> gauge;


public JmxGauge(Gauge<?> gauge) { JmxGauge(Gauge<?> gauge) {
this.gauge = gauge; this.gauge = gauge;
} }


Expand All @@ -313,6 +317,94 @@ public Object getValue() {
} }
} }


public interface JmxHistogramMBean extends MetricMBean {
long getCount();

double getMean();

double getStdDev();

long getMax();

long getMin();

double getMedian();

double get75thPercentile();

double get95thPercentile();

double get98thPercentile();

double get99thPercentile();

double get999thPercentile();
}

private static class JmxHistogram extends AbstractBean implements JmxHistogramMBean {

private final Histogram histogram;

JmxHistogram(Histogram histogram) {
this.histogram = histogram;
}

@Override
public long getCount() {
return histogram.getCount();
}

@Override
public double getMean() {
return histogram.getStatistics().getMean();
}

@Override
public double getStdDev() {
return histogram.getStatistics().getStdDev();
}

@Override
public long getMax() {
return histogram.getStatistics().getMax();
}

@Override
public long getMin() {
return histogram.getStatistics().getMin();
}

@Override
public double getMedian() {
return histogram.getStatistics().getQuantile(0.5);
}

@Override
public double get75thPercentile() {
return histogram.getStatistics().getQuantile(0.75);
}

@Override
public double get95thPercentile() {
return histogram.getStatistics().getQuantile(0.95);
}

@Override
public double get98thPercentile() {
return histogram.getStatistics().getQuantile(0.98);
}

@Override
public double get99thPercentile() {
return histogram.getStatistics().getQuantile(0.99);
}

@Override
public double get999thPercentile() {
return histogram.getStatistics().getQuantile(0.999);
}
}

/** /**
* JMX Server implementation that JMX clients can connect to. * JMX Server implementation that JMX clients can connect to.
* *
Expand Down

0 comments on commit ee3c7a8

Please sign in to comment.