From 1a8e1f6193823e70b1dc6abc1146299042c25c7d Mon Sep 17 00:00:00 2001 From: lamber-ken Date: Wed, 20 Jun 2018 12:26:10 +0800 Subject: [PATCH 1/3] add prometheus pushgateway reporter --- docs/monitoring/metrics.md | 25 ++ .../flink-metrics-prometheus/pom.xml | 6 + .../AbstractPrometheusReporter.java | 279 ++++++++++++++++++ .../PrometheusPushGatewayReporter.java | 94 ++++++ .../prometheus/PrometheusReporter.java | 243 +-------------- .../prometheus/PrometheusReporterTest.java | 8 +- 6 files changed, 413 insertions(+), 242 deletions(-) create mode 100644 flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java create mode 100644 flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 49d7ba8d89d39..cfcf379529d9e 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -700,6 +700,31 @@ Flink metric types are mapped to Prometheus metric types as follows: All Flink metrics variables (see [List of all Variables](#list-of-all-variables)) are exported to Prometheus as labels. +### PrometheusPushGateway (org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter) + +In order to use this reporter you must copy `/opt/flink-metrics-prometheus-{{site.version}}.jar` into the `/lib` folder +of your Flink distribution. + +Parameters: + +- `host` - the PushGateway server host +- `port` - the PushGateway server port +- `prefix` - (optional) the prefix is used to compose the jobName, defaults to `flink`. The jobName is used to distinguish different flink clusters + +Example configuration: + +{% highlight yaml %} + +metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter +metrics.reporter.promgateway.host: localhost +metrics.reporter.promgateway.port: 9091 +metrics.reporter.promgateway.prefix: flink + +{% endhighlight %} + +PrometheusPushGatewayReporter push metrics to a [Pushgateway](https://github.com/prometheus/pushgateway). The Pushgateway then exposes +these metrics to Prometheus. The working mechanism is different from PrometheusReporter (see [PrometheusReporter](#prometheus-orgapacheflinkmetricsprometheusprometheusreporter)). + ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter) In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder diff --git a/flink-metrics/flink-metrics-prometheus/pom.xml b/flink-metrics/flink-metrics-prometheus/pom.xml index b0cad841ad00b..9aad69c16bedc 100644 --- a/flink-metrics/flink-metrics-prometheus/pom.xml +++ b/flink-metrics/flink-metrics-prometheus/pom.xml @@ -73,6 +73,12 @@ under the License. ${prometheus.version} + + io.prometheus + simpleclient_pushgateway + ${prometheus.version} + + diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java new file mode 100644 index 0000000000000..34ac0470e7699 --- /dev/null +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + + +/** + * base prometheus reporter for prometheus metrics. + */ +@PublicEvolving +public abstract class AbstractPrometheusReporter implements MetricReporter { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]"); + private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() { + @Override + public String filterCharacters(String input) { + return replaceInvalidChars(input); + } + }; + + private static final char SCOPE_SEPARATOR = '_'; + private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR; + + private final Map> collectorsWithCountByMetricName = new HashMap<>(); + + @VisibleForTesting + static String replaceInvalidChars(final String input) { + // https://prometheus.io/docs/instrumenting/writing_exporters/ + // Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore. 
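+		// e.g. "metric-name.total" is sanitized to "metric_name_total".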
+ return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_"); + } + + @Override + public void close() { + CollectorRegistry.defaultRegistry.clear(); + } + + @Override + public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) { + + List dimensionKeys = new LinkedList<>(); + List dimensionValues = new LinkedList<>(); + for (final Map.Entry dimension : group.getAllVariables().entrySet()) { + final String key = dimension.getKey(); + dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, key.length() - 1))); + dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue())); + } + + final String scopedMetricName = getScopedName(metricName, group); + final String helpString = metricName + " (scope: " + getLogicalScope(group) + ")"; + + final Collector collector; + Integer count = 0; + + synchronized (this) { + if (collectorsWithCountByMetricName.containsKey(scopedMetricName)) { + final AbstractMap.SimpleImmutableEntry collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName); + collector = collectorWithCount.getKey(); + count = collectorWithCount.getValue(); + } else { + collector = createCollector(metric, dimensionKeys, dimensionValues, scopedMetricName, helpString); + try { + collector.register(); + } catch (Exception e) { + log.warn("There was a problem registering metric {}.", metricName, e); + } + } + addMetric(metric, dimensionValues, collector); + collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count + 1)); + } + } + + @Override + public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) { + final String scopedMetricName = getScopedName(metricName, group); + synchronized (this) { + final AbstractMap.SimpleImmutableEntry collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName); + final Integer count = collectorWithCount.getValue(); + final Collector collector = collectorWithCount.getKey(); + if (count == 1) { + try { + CollectorRegistry.defaultRegistry.unregister(collector); + } catch (Exception e) { + log.warn("There was a problem unregistering metric {}.", scopedMetricName, e); + } + collectorsWithCountByMetricName.remove(scopedMetricName); + } else { + collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count - 1)); + } + } + } + + private static String getScopedName(String metricName, MetricGroup group) { + return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName); + } + + private Collector createCollector(Metric metric, List dimensionKeys, List dimensionValues, String scopedMetricName, String helpString) { + Collector collector; + if (metric instanceof Gauge || metric instanceof Counter || metric instanceof Meter) { + collector = io.prometheus.client.Gauge + .build() + .name(scopedMetricName) + .help(helpString) + .labelNames(toArray(dimensionKeys)) + .create(); + } else if (metric instanceof Histogram) { + collector = new HistogramSummaryProxy((Histogram) metric, scopedMetricName, helpString, dimensionKeys, dimensionValues); + } else { + log.warn("Cannot create collector for unknown metric type: {}. 
This indicates that the metric type is not supported by this reporter.", + metric.getClass().getName()); + collector = null; + } + return collector; + } + + private void addMetric(Metric metric, List dimensionValues, Collector collector) { + if (metric instanceof Gauge) { + ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues)); + } else if (metric instanceof Counter) { + ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues)); + } else if (metric instanceof Meter) { + ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues)); + } else if (metric instanceof Histogram) { + ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues); + } else { + log.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", + metric.getClass().getName()); + } + } + + @SuppressWarnings("unchecked") + private static String getLogicalScope(MetricGroup group) { + return ((FrontMetricGroup>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR); + } + + @VisibleForTesting + io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) { + return new io.prometheus.client.Gauge.Child() { + @Override + public double get() { + final Object value = gauge.getValue(); + if (value == null) { + log.debug("Gauge {} is null-valued, defaulting to 0.", gauge); + return 0; + } + if (value instanceof Double) { + return (double) value; + } + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + if (value instanceof Boolean) { + return ((Boolean) value) ? 1 : 0; + } + log.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.", + gauge, value.getClass().getName()); + return 0; + } + }; + } + + private static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter) { + return new io.prometheus.client.Gauge.Child() { + @Override + public double get() { + return (double) counter.getCount(); + } + }; + } + + private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) { + return new io.prometheus.client.Gauge.Child() { + @Override + public double get() { + return meter.getRate(); + } + }; + } + + @VisibleForTesting + static class HistogramSummaryProxy extends Collector { + static final List QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999); + + private final String metricName; + private final String helpString; + private final List labelNamesWithQuantile; + + private final Map, Histogram> histogramsByLabelValues = new HashMap<>(); + + HistogramSummaryProxy(final Histogram histogram, final String metricName, final String helpString, final List labelNames, final List labelValues) { + this.metricName = metricName; + this.helpString = helpString; + this.labelNamesWithQuantile = addToList(labelNames, "quantile"); + histogramsByLabelValues.put(labelValues, histogram); + } + + @Override + public List collect() { + // We cannot use SummaryMetricFamily because it is impossible to get a sum of all values (at least for Dropwizard histograms, + // whose snapshot's values array only holds a sample of recent values). 
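+			// Instead, every histogram is exported as a SUMMARY family with a "<name>_count" sample and one
+			// sample per tracked quantile (see addSamples below); no "_sum" sample is reported.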
+ + List samples = new LinkedList<>(); + for (Map.Entry, Histogram> labelValuesToHistogram : histogramsByLabelValues.entrySet()) { + addSamples(labelValuesToHistogram.getKey(), labelValuesToHistogram.getValue(), samples); + } + return Collections.singletonList(new MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples)); + } + + void addChild(final Histogram histogram, final List labelValues) { + histogramsByLabelValues.put(labelValues, histogram); + } + + private void addSamples(final List labelValues, final Histogram histogram, final List samples) { + samples.add(new MetricFamilySamples.Sample(metricName + "_count", + labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount())); + for (final Double quantile : QUANTILES) { + samples.add(new MetricFamilySamples.Sample(metricName, labelNamesWithQuantile, + addToList(labelValues, quantile.toString()), + histogram.getStatistics().getQuantile(quantile))); + } + } + } + + private static List addToList(List list, String element) { + final List result = new ArrayList<>(list); + result.add(element); + return result; + } + + private static String[] toArray(List list) { + return list.toArray(new String[list.size()]); + } + +} diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java new file mode 100644 index 0000000000000..774fd7fbd9b5d --- /dev/null +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.util.AbstractID; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.PushGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * /** + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway. 
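+ *
+ * <p>On every report interval the contents of {@code CollectorRegistry.defaultRegistry} are pushed to the
+ * configured gateway under the job name {@code <prefix>-<random id>}; the job is deleted from the gateway
+ * when the reporter is closed.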
+ */ +@PublicEvolving +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled { + private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class); + + public static final String ARG_HOST = "host"; + public static final String ARG_PORT = "port"; + public static final String ARG_JOB_NAME_PREFIX = "prefix"; + + public static final char JOB_NAME_SEPARATOR = '-'; + public static final String DEFAULT_JOB_NAME_PREFIX = "flink"; + + private PushGateway pushGateway; + private String jobName; + + @Override + public void open(MetricConfig config) { + + // reporter configs + String host = config.getString(ARG_HOST, null); + int port = config.getInteger(ARG_PORT, -1); + String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX); + + // host port + if (host == null || host.length() == 0 || port < 1) { + throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); + } + + // jobname + String random = new AbstractID().toString(); + jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random; + + pushGateway = new PushGateway(host + ":" + port); + log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName:{}}", host, port, jobName); + } + + @Override + public void report() { + try { + pushGateway.push(CollectorRegistry.defaultRegistry, jobName); + } catch (Exception e) { + log.warn("Failed reporting metrics to Prometheus.", e); + } + } + + @Override + public void close() { + if (pushGateway != null) { + try { + pushGateway.delete(jobName); + } catch (IOException e) { + log.warn("Failed to delete the job of Pushgateway", e); + } + } + super.close(); + } +} diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java index 48fd8a42b53cc..d3483b2dfe94a 100644 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java @@ -20,62 +20,31 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.metrics.CharacterFilter; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; -import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; -import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.util.NetUtils; import org.apache.flink.util.Preconditions; -import io.prometheus.client.Collector; -import io.prometheus.client.CollectorRegistry; import io.prometheus.client.exporter.HTTPServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; /** * {@link MetricReporter} that exports {@link Metric 
Metrics} via Prometheus. */ @PublicEvolving -public class PrometheusReporter implements MetricReporter { - private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class); +public class PrometheusReporter extends AbstractPrometheusReporter { + private final Logger log = LoggerFactory.getLogger(PrometheusReporter.class); static final String ARG_PORT = "port"; private static final String DEFAULT_PORT = "9249"; - private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]"); - private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() { - @Override - public String filterCharacters(String input) { - return replaceInvalidChars(input); - } - }; - - private static final char SCOPE_SEPARATOR = '_'; - private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR; - private HTTPServer httpServer; private int port; - private final Map> collectorsWithCountByMetricName = new HashMap<>(); @VisibleForTesting int getPort() { @@ -83,13 +52,6 @@ int getPort() { return port; } - @VisibleForTesting - static String replaceInvalidChars(final String input) { - // https://prometheus.io/docs/instrumenting/writing_exporters/ - // Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore. - return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_"); - } - @Override public void open(MetricConfig config) { String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT); @@ -101,10 +63,10 @@ public void open(MetricConfig config) { // internally accesses CollectorRegistry.defaultRegistry httpServer = new HTTPServer(port); this.port = port; - LOG.info("Started PrometheusReporter HTTP server on port {}.", port); + log.info("Started PrometheusReporter HTTP server on port {}.", port); break; } catch (IOException ioe) { //assume port conflict - LOG.debug("Could not start PrometheusReporter HTTP server on port {}.", port, ioe); + log.debug("Could not start PrometheusReporter HTTP server on port {}.", port, ioe); } } if (httpServer == null) { @@ -117,202 +79,7 @@ public void close() { if (httpServer != null) { httpServer.stop(); } - CollectorRegistry.defaultRegistry.clear(); - } - - @Override - public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) { - - List dimensionKeys = new LinkedList<>(); - List dimensionValues = new LinkedList<>(); - for (final Map.Entry dimension : group.getAllVariables().entrySet()) { - final String key = dimension.getKey(); - dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, key.length() - 1))); - dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue())); - } - - final String scopedMetricName = getScopedName(metricName, group); - final String helpString = metricName + " (scope: " + getLogicalScope(group) + ")"; - - final Collector collector; - Integer count = 0; - - synchronized (this) { - if (collectorsWithCountByMetricName.containsKey(scopedMetricName)) { - final AbstractMap.SimpleImmutableEntry collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName); - collector = collectorWithCount.getKey(); - count = collectorWithCount.getValue(); - } else { - collector = createCollector(metric, dimensionKeys, dimensionValues, scopedMetricName, helpString); - try { - collector.register(); - } catch (Exception e) { - LOG.warn("There was a problem registering metric {}.", metricName, e); - } - } - addMetric(metric, dimensionValues, collector); - 
collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count + 1)); - } - } - - private static String getScopedName(String metricName, MetricGroup group) { - return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName); - } - - private static Collector createCollector(Metric metric, List dimensionKeys, List dimensionValues, String scopedMetricName, String helpString) { - Collector collector; - if (metric instanceof Gauge || metric instanceof Counter || metric instanceof Meter) { - collector = io.prometheus.client.Gauge - .build() - .name(scopedMetricName) - .help(helpString) - .labelNames(toArray(dimensionKeys)) - .create(); - } else if (metric instanceof Histogram) { - collector = new HistogramSummaryProxy((Histogram) metric, scopedMetricName, helpString, dimensionKeys, dimensionValues); - } else { - LOG.warn("Cannot create collector for unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", - metric.getClass().getName()); - collector = null; - } - return collector; - } - - private static void addMetric(Metric metric, List dimensionValues, Collector collector) { - if (metric instanceof Gauge) { - ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues)); - } else if (metric instanceof Counter) { - ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues)); - } else if (metric instanceof Meter) { - ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues)); - } else if (metric instanceof Histogram) { - ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues); - } else { - LOG.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", - metric.getClass().getName()); - } - } - - @Override - public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) { - final String scopedMetricName = getScopedName(metricName, group); - synchronized (this) { - final AbstractMap.SimpleImmutableEntry collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName); - final Integer count = collectorWithCount.getValue(); - final Collector collector = collectorWithCount.getKey(); - if (count == 1) { - try { - CollectorRegistry.defaultRegistry.unregister(collector); - } catch (Exception e) { - LOG.warn("There was a problem unregistering metric {}.", scopedMetricName, e); - } - collectorsWithCountByMetricName.remove(scopedMetricName); - } else { - collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count - 1)); - } - } - } - - @SuppressWarnings("unchecked") - private static String getLogicalScope(MetricGroup group) { - return ((FrontMetricGroup>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR); - } - - @VisibleForTesting - static io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) { - return new io.prometheus.client.Gauge.Child() { - @Override - public double get() { - final Object value = gauge.getValue(); - if (value == null) { - LOG.debug("Gauge {} is null-valued, defaulting to 0.", gauge); - return 0; - } - if (value instanceof Double) { - return (double) value; - } - if (value instanceof Number) { - return ((Number) value).doubleValue(); - } - if (value instanceof Boolean) { - return ((Boolean) value) ? 
1 : 0; - } - LOG.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.", - gauge, value.getClass().getName()); - return 0; - } - }; - } - - private static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter) { - return new io.prometheus.client.Gauge.Child() { - @Override - public double get() { - return (double) counter.getCount(); - } - }; - } - - private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) { - return new io.prometheus.client.Gauge.Child() { - @Override - public double get() { - return meter.getRate(); - } - }; - } - - @VisibleForTesting - static class HistogramSummaryProxy extends Collector { - static final List QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999); - - private final String metricName; - private final String helpString; - private final List labelNamesWithQuantile; - - private final Map, Histogram> histogramsByLabelValues = new HashMap<>(); - - HistogramSummaryProxy(final Histogram histogram, final String metricName, final String helpString, final List labelNames, final List labelValues) { - this.metricName = metricName; - this.helpString = helpString; - this.labelNamesWithQuantile = addToList(labelNames, "quantile"); - histogramsByLabelValues.put(labelValues, histogram); - } - - @Override - public List collect() { - // We cannot use SummaryMetricFamily because it is impossible to get a sum of all values (at least for Dropwizard histograms, - // whose snapshot's values array only holds a sample of recent values). - - List samples = new LinkedList<>(); - for (Map.Entry, Histogram> labelValuesToHistogram : histogramsByLabelValues.entrySet()) { - addSamples(labelValuesToHistogram.getKey(), labelValuesToHistogram.getValue(), samples); - } - return Collections.singletonList(new MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples)); - } - - void addChild(final Histogram histogram, final List labelValues) { - histogramsByLabelValues.put(labelValues, histogram); - } - - private void addSamples(final List labelValues, final Histogram histogram, final List samples) { - samples.add(new MetricFamilySamples.Sample(metricName + "_count", - labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount())); - for (final Double quantile : QUANTILES) { - samples.add(new MetricFamilySamples.Sample(metricName, labelNamesWithQuantile, - addToList(labelValues, quantile.toString()), - histogram.getStatistics().getQuantile(quantile))); - } - } + super.close(); } - private static List addToList(List list, String element) { - final List result = new ArrayList<>(list); - result.add(element); - return result; - } - - private static String[] toArray(List list) { - return list.toArray(new String[list.size()]); - } } diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java index e9fd985e4ac2c..1b551703fde01 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java @@ -178,7 +178,7 @@ public void invalidCharactersAreReplacedWithUnderscore() { @Test public void doubleGaugeIsConvertedCorrectly() { - assertThat(PrometheusReporter.gaugeFrom(new Gauge() { + assertThat(reporter.gaugeFrom(new 
Gauge() { @Override public Double getValue() { return 3.14; @@ -188,7 +188,7 @@ public Double getValue() { @Test public void shortGaugeIsConvertedCorrectly() { - assertThat(PrometheusReporter.gaugeFrom(new Gauge() { + assertThat(reporter.gaugeFrom(new Gauge() { @Override public Short getValue() { return 13; @@ -198,7 +198,7 @@ public Short getValue() { @Test public void booleanGaugeIsConvertedCorrectly() { - assertThat(PrometheusReporter.gaugeFrom(new Gauge() { + assertThat(reporter.gaugeFrom(new Gauge() { @Override public Boolean getValue() { return true; @@ -211,7 +211,7 @@ public Boolean getValue() { */ @Test public void stringGaugeCannotBeConverted() { - assertThat(PrometheusReporter.gaugeFrom(new Gauge() { + assertThat(reporter.gaugeFrom(new Gauge() { @Override public String getValue() { return "I am not a number"; From 96a4bf7cd294bdfa079f3aae47114bbb013f52ac Mon Sep 17 00:00:00 2001 From: lamber-ken Date: Wed, 27 Jun 2018 15:57:59 +0800 Subject: [PATCH 2/3] short-circuit logic should be used in boolean contexts --- .../flink/api/java/typeutils/runtime/PojoComparator.java | 2 +- .../src/main/java/org/apache/flink/types/StringValue.java | 2 +- .../java/org/apache/flink/optimizer/dag/TwoInputNode.java | 6 +++--- .../optimizer/operators/CartesianProductDescriptor.java | 2 +- .../flink/runtime/instance/SlotSharingGroupAssignment.java | 2 +- .../runtime/operators/hash/ReOpenableMutableHashTable.java | 2 +- .../flink/runtime/operators/sort/NormalizedKeySorter.java | 2 +- .../flink/runtime/operators/sort/UnilateralSortMerger.java | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java index ece790e5e9efa..227848916c210 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java @@ -286,7 +286,7 @@ public boolean isNormalizedKeyPrefixOnly(int keyBytes) { @Override public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) { int i = 0; - for (; i < this.numLeadingNormalizableKeys & numBytes > 0; i++) + for (; i < this.numLeadingNormalizableKeys && numBytes > 0; i++) { int len = this.normalizedKeyLengths[i]; len = numBytes >= len ? 
len : numBytes; diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java index feeec2a514ca7..1d1e80de97a79 100644 --- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java @@ -242,7 +242,7 @@ public void setValueAscii(byte[] bytes, int offset, int len) { if (bytes == null) { throw new NullPointerException("Bytes must not be null"); } - if (len < 0 | offset < 0 | offset > bytes.length - len) { + if (len < 0 || offset < 0 || offset > bytes.length - len) { throw new IndexOutOfBoundsException(); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java index 48815dc9a1c3b..e7c02b029376a 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java @@ -685,15 +685,15 @@ protected void placePipelineBreakersIfNecessary(DriverStrategy strategy, Channel } // okay combinations are both all dam or both no dam - if ( (damOnAllLeftPaths & damOnAllRightPaths) | (!someDamOnLeftPaths & !someDamOnRightPaths) ) { + if ( (damOnAllLeftPaths && damOnAllRightPaths) || (!someDamOnLeftPaths && !someDamOnRightPaths) ) { // good, either both materialize already on the way, or both fully pipeline } else { - if (someDamOnLeftPaths & !damOnAllRightPaths) { + if (someDamOnLeftPaths && !damOnAllRightPaths) { // right needs a pipeline breaker in2.setTempMode(in2.getTempMode().makePipelineBreaker()); } - if (someDamOnRightPaths & !damOnAllLeftPaths) { + if (someDamOnRightPaths && !damOnAllLeftPaths) { // right needs a pipeline breaker in1.setTempMode(in1.getTempMode().makePipelineBreaker()); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java index d50c9b493337d..b55217367c06c 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java @@ -40,7 +40,7 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual protected CartesianProductDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) { - if (!(allowBroadcastFirst | allowBroadcastSecond)) { + if (!(allowBroadcastFirst || allowBroadcastSecond)) { throw new IllegalArgumentException(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java index e61ba587e5f7d..562c3c77a499a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java @@ -540,7 +540,7 @@ void releaseSharedSlot(SharedSlot sharedSlot) { */ private void internalDisposeEmptySharedSlot(SharedSlot sharedSlot) { // sanity check - if (sharedSlot.isAlive() | !sharedSlot.getSubSlots().isEmpty()) { + if (sharedSlot.isAlive() || !sharedSlot.getSubSlots().isEmpty()) { throw new IllegalArgumentException(); } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java index b9ddff8d6a092..e20222ea7527a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java @@ -166,7 +166,7 @@ protected boolean prepareNextPartition() throws IOException { @Override protected void releaseTable() { - if(furtherPartitioning | this.currentRecursionDepth > 0) { + if(furtherPartitioning || this.currentRecursionDepth > 0) { super.releaseTable(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java index 0fd6f388f4cf5..512e73e3ea736 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java @@ -310,7 +310,7 @@ public boolean write(T record) throws IOException { // ------------------------------------------------------------------------ private long readPointer(int logicalPosition) { - if (logicalPosition < 0 | logicalPosition >= this.numRecords) { + if (logicalPosition < 0 || logicalPosition >= this.numRecords) { throw new IndexOutOfBoundsException(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java index f70be290e2ff5..934eeb7c2b41a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java @@ -216,7 +216,7 @@ protected UnilateralSortMerger(MemoryManager memoryManager, List throws IOException { // sanity checks - if (memoryManager == null | (ioManager == null && !noSpillingMemory) | serializerFactory == null | comparator == null) { + if (memoryManager == null || (ioManager == null && !noSpillingMemory) || serializerFactory == null || comparator == null) { throw new NullPointerException(); } if (parentTask == null) { From 85e962e252bea2959419e0de4fdeaa474ef2b506 Mon Sep 17 00:00:00 2001 From: lamber-ken Date: Wed, 27 Jun 2018 16:10:07 +0800 Subject: [PATCH 3/3] remove prometheus pushgateway --- docs/monitoring/metrics.md | 25 -- .../flink-metrics-prometheus/pom.xml | 6 - .../AbstractPrometheusReporter.java | 279 ------------------ .../PrometheusPushGatewayReporter.java | 94 ------ .../prometheus/PrometheusReporter.java | 243 ++++++++++++++- .../prometheus/PrometheusReporterTest.java | 8 +- 6 files changed, 242 insertions(+), 413 deletions(-) delete mode 100644 flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java delete mode 100644 flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index cfcf379529d9e..49d7ba8d89d39 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -700,31 +700,6 @@ Flink metric types are mapped to Prometheus metric types as follows: All Flink metrics variables (see [List of all 
Variables](#list-of-all-variables)) are exported to Prometheus as labels. -### PrometheusPushGateway (org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter) - -In order to use this reporter you must copy `/opt/flink-metrics-prometheus-{{site.version}}.jar` into the `/lib` folder -of your Flink distribution. - -Parameters: - -- `host` - the PushGateway server host -- `port` - the PushGateway server port -- `prefix` - (optional) the prefix is used to compose the jobName, defaults to `flink`. The jobName is used to distinguish different flink clusters - -Example configuration: - -{% highlight yaml %} - -metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter -metrics.reporter.promgateway.host: localhost -metrics.reporter.promgateway.port: 9091 -metrics.reporter.promgateway.prefix: flink - -{% endhighlight %} - -PrometheusPushGatewayReporter push metrics to a [Pushgateway](https://github.com/prometheus/pushgateway). The Pushgateway then exposes -these metrics to Prometheus. The working mechanism is different from PrometheusReporter (see [PrometheusReporter](#prometheus-orgapacheflinkmetricsprometheusprometheusreporter)). - ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter) In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder diff --git a/flink-metrics/flink-metrics-prometheus/pom.xml b/flink-metrics/flink-metrics-prometheus/pom.xml index 9aad69c16bedc..b0cad841ad00b 100644 --- a/flink-metrics/flink-metrics-prometheus/pom.xml +++ b/flink-metrics/flink-metrics-prometheus/pom.xml @@ -73,12 +73,6 @@ under the License. ${prometheus.version} - - io.prometheus - simpleclient_pushgateway - ${prometheus.version} - - diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java deleted file mode 100644 index 34ac0470e7699..0000000000000 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.metrics.prometheus; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.metrics.CharacterFilter; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.Meter; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.reporter.MetricReporter; -import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; -import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; - -import io.prometheus.client.Collector; -import io.prometheus.client.CollectorRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - - -/** - * base prometheus reporter for prometheus metrics. - */ -@PublicEvolving -public abstract class AbstractPrometheusReporter implements MetricReporter { - - private final Logger log = LoggerFactory.getLogger(getClass()); - - private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]"); - private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() { - @Override - public String filterCharacters(String input) { - return replaceInvalidChars(input); - } - }; - - private static final char SCOPE_SEPARATOR = '_'; - private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR; - - private final Map> collectorsWithCountByMetricName = new HashMap<>(); - - @VisibleForTesting - static String replaceInvalidChars(final String input) { - // https://prometheus.io/docs/instrumenting/writing_exporters/ - // Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore. 
- return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_"); - } - - @Override - public void close() { - CollectorRegistry.defaultRegistry.clear(); - } - - @Override - public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) { - - List dimensionKeys = new LinkedList<>(); - List dimensionValues = new LinkedList<>(); - for (final Map.Entry dimension : group.getAllVariables().entrySet()) { - final String key = dimension.getKey(); - dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, key.length() - 1))); - dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue())); - } - - final String scopedMetricName = getScopedName(metricName, group); - final String helpString = metricName + " (scope: " + getLogicalScope(group) + ")"; - - final Collector collector; - Integer count = 0; - - synchronized (this) { - if (collectorsWithCountByMetricName.containsKey(scopedMetricName)) { - final AbstractMap.SimpleImmutableEntry collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName); - collector = collectorWithCount.getKey(); - count = collectorWithCount.getValue(); - } else { - collector = createCollector(metric, dimensionKeys, dimensionValues, scopedMetricName, helpString); - try { - collector.register(); - } catch (Exception e) { - log.warn("There was a problem registering metric {}.", metricName, e); - } - } - addMetric(metric, dimensionValues, collector); - collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count + 1)); - } - } - - @Override - public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) { - final String scopedMetricName = getScopedName(metricName, group); - synchronized (this) { - final AbstractMap.SimpleImmutableEntry collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName); - final Integer count = collectorWithCount.getValue(); - final Collector collector = collectorWithCount.getKey(); - if (count == 1) { - try { - CollectorRegistry.defaultRegistry.unregister(collector); - } catch (Exception e) { - log.warn("There was a problem unregistering metric {}.", scopedMetricName, e); - } - collectorsWithCountByMetricName.remove(scopedMetricName); - } else { - collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count - 1)); - } - } - } - - private static String getScopedName(String metricName, MetricGroup group) { - return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName); - } - - private Collector createCollector(Metric metric, List dimensionKeys, List dimensionValues, String scopedMetricName, String helpString) { - Collector collector; - if (metric instanceof Gauge || metric instanceof Counter || metric instanceof Meter) { - collector = io.prometheus.client.Gauge - .build() - .name(scopedMetricName) - .help(helpString) - .labelNames(toArray(dimensionKeys)) - .create(); - } else if (metric instanceof Histogram) { - collector = new HistogramSummaryProxy((Histogram) metric, scopedMetricName, helpString, dimensionKeys, dimensionValues); - } else { - log.warn("Cannot create collector for unknown metric type: {}. 
This indicates that the metric type is not supported by this reporter.", - metric.getClass().getName()); - collector = null; - } - return collector; - } - - private void addMetric(Metric metric, List dimensionValues, Collector collector) { - if (metric instanceof Gauge) { - ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues)); - } else if (metric instanceof Counter) { - ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues)); - } else if (metric instanceof Meter) { - ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues)); - } else if (metric instanceof Histogram) { - ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues); - } else { - log.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", - metric.getClass().getName()); - } - } - - @SuppressWarnings("unchecked") - private static String getLogicalScope(MetricGroup group) { - return ((FrontMetricGroup>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR); - } - - @VisibleForTesting - io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) { - return new io.prometheus.client.Gauge.Child() { - @Override - public double get() { - final Object value = gauge.getValue(); - if (value == null) { - log.debug("Gauge {} is null-valued, defaulting to 0.", gauge); - return 0; - } - if (value instanceof Double) { - return (double) value; - } - if (value instanceof Number) { - return ((Number) value).doubleValue(); - } - if (value instanceof Boolean) { - return ((Boolean) value) ? 1 : 0; - } - log.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.", - gauge, value.getClass().getName()); - return 0; - } - }; - } - - private static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter) { - return new io.prometheus.client.Gauge.Child() { - @Override - public double get() { - return (double) counter.getCount(); - } - }; - } - - private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) { - return new io.prometheus.client.Gauge.Child() { - @Override - public double get() { - return meter.getRate(); - } - }; - } - - @VisibleForTesting - static class HistogramSummaryProxy extends Collector { - static final List QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999); - - private final String metricName; - private final String helpString; - private final List labelNamesWithQuantile; - - private final Map, Histogram> histogramsByLabelValues = new HashMap<>(); - - HistogramSummaryProxy(final Histogram histogram, final String metricName, final String helpString, final List labelNames, final List labelValues) { - this.metricName = metricName; - this.helpString = helpString; - this.labelNamesWithQuantile = addToList(labelNames, "quantile"); - histogramsByLabelValues.put(labelValues, histogram); - } - - @Override - public List collect() { - // We cannot use SummaryMetricFamily because it is impossible to get a sum of all values (at least for Dropwizard histograms, - // whose snapshot's values array only holds a sample of recent values). 
- - List samples = new LinkedList<>(); - for (Map.Entry, Histogram> labelValuesToHistogram : histogramsByLabelValues.entrySet()) { - addSamples(labelValuesToHistogram.getKey(), labelValuesToHistogram.getValue(), samples); - } - return Collections.singletonList(new MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples)); - } - - void addChild(final Histogram histogram, final List labelValues) { - histogramsByLabelValues.put(labelValues, histogram); - } - - private void addSamples(final List labelValues, final Histogram histogram, final List samples) { - samples.add(new MetricFamilySamples.Sample(metricName + "_count", - labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount())); - for (final Double quantile : QUANTILES) { - samples.add(new MetricFamilySamples.Sample(metricName, labelNamesWithQuantile, - addToList(labelValues, quantile.toString()), - histogram.getStatistics().getQuantile(quantile))); - } - } - } - - private static List addToList(List list, String element) { - final List result = new ArrayList<>(list); - result.add(element); - return result; - } - - private static String[] toArray(List list) { - return list.toArray(new String[list.size()]); - } - -} diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java deleted file mode 100644 index 774fd7fbd9b5d..0000000000000 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.prometheus; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricConfig; -import org.apache.flink.metrics.reporter.MetricReporter; -import org.apache.flink.metrics.reporter.Scheduled; -import org.apache.flink.util.AbstractID; - -import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.exporter.PushGateway; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * /** - * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway. 
- */ -@PublicEvolving -public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled { - private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class); - - public static final String ARG_HOST = "host"; - public static final String ARG_PORT = "port"; - public static final String ARG_JOB_NAME_PREFIX = "prefix"; - - public static final char JOB_NAME_SEPARATOR = '-'; - public static final String DEFAULT_JOB_NAME_PREFIX = "flink"; - - private PushGateway pushGateway; - private String jobName; - - @Override - public void open(MetricConfig config) { - - // reporter configs - String host = config.getString(ARG_HOST, null); - int port = config.getInteger(ARG_PORT, -1); - String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX); - - // host port - if (host == null || host.length() == 0 || port < 1) { - throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); - } - - // jobname - String random = new AbstractID().toString(); - jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random; - - pushGateway = new PushGateway(host + ":" + port); - log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName:{}}", host, port, jobName); - } - - @Override - public void report() { - try { - pushGateway.push(CollectorRegistry.defaultRegistry, jobName); - } catch (Exception e) { - log.warn("Failed reporting metrics to Prometheus.", e); - } - } - - @Override - public void close() { - if (pushGateway != null) { - try { - pushGateway.delete(jobName); - } catch (IOException e) { - log.warn("Failed to delete the job of Pushgateway", e); - } - } - super.close(); - } -} diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java index d3483b2dfe94a..48fd8a42b53cc 100644 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java @@ -20,31 +20,62 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.util.NetUtils; import org.apache.flink.util.Preconditions; +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; import io.prometheus.client.exporter.HTTPServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; /** * {@link MetricReporter} that exports {@link Metric 
  * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus.
  */
 @PublicEvolving
-public class PrometheusReporter extends AbstractPrometheusReporter {
-	private final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
+public class PrometheusReporter implements MetricReporter {
+	private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class);
 
 	static final String ARG_PORT = "port";
 	private static final String DEFAULT_PORT = "9249";
 
+	private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
+	private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
+		@Override
+		public String filterCharacters(String input) {
+			return replaceInvalidChars(input);
+		}
+	};
+
+	private static final char SCOPE_SEPARATOR = '_';
+	private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;
+
 	private HTTPServer httpServer;
 	private int port;
+	private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, Integer>> collectorsWithCountByMetricName = new HashMap<>();
 
 	@VisibleForTesting
 	int getPort() {
@@ -52,6 +83,13 @@ int getPort() {
 		return port;
 	}
 
+	@VisibleForTesting
+	static String replaceInvalidChars(final String input) {
+		// https://prometheus.io/docs/instrumenting/writing_exporters/
+		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
+		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+	}
+
 	@Override
 	public void open(MetricConfig config) {
 		String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT);
@@ -63,10 +101,10 @@ public void open(MetricConfig config) {
 				// internally accesses CollectorRegistry.defaultRegistry
 				httpServer = new HTTPServer(port);
 				this.port = port;
-				log.info("Started PrometheusReporter HTTP server on port {}.", port);
+				LOG.info("Started PrometheusReporter HTTP server on port {}.", port);
 				break;
 			} catch (IOException ioe) { //assume port conflict
-				log.debug("Could not start PrometheusReporter HTTP server on port {}.", port, ioe);
+				LOG.debug("Could not start PrometheusReporter HTTP server on port {}.", port, ioe);
 			}
 		}
 		if (httpServer == null) {
@@ -79,7 +117,202 @@ public void close() {
 		if (httpServer != null) {
 			httpServer.stop();
 		}
-		super.close();
+		CollectorRegistry.defaultRegistry.clear();
+	}
+
+	@Override
+	public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+
+		List<String> dimensionKeys = new LinkedList<>();
+		List<String> dimensionValues = new LinkedList<>();
+		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
+			final String key = dimension.getKey();
+			dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, key.length() - 1)));
+			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+		}
+
+		final String scopedMetricName = getScopedName(metricName, group);
+		final String helpString = metricName + " (scope: " + getLogicalScope(group) + ")";
+
+		final Collector collector;
+		Integer count = 0;
+
+		synchronized (this) {
+			if (collectorsWithCountByMetricName.containsKey(scopedMetricName)) {
+				final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName);
+				collector = collectorWithCount.getKey();
+				count = collectorWithCount.getValue();
+			} else {
+				collector = createCollector(metric, dimensionKeys, dimensionValues, scopedMetricName, helpString);
+				try {
+					collector.register();
+				} catch (Exception e) {
+					LOG.warn("There was a problem registering metric {}.", metricName, e);
+				}
+			}
+			addMetric(metric, dimensionValues, collector);
+			collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count + 1));
+		}
+	}
+
+	private static String getScopedName(String metricName, MetricGroup group) {
+		return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
+	}
+
+	private static Collector createCollector(Metric metric, List<String> dimensionKeys, List<String> dimensionValues, String scopedMetricName, String helpString) {
+		Collector collector;
+		if (metric instanceof Gauge || metric instanceof Counter || metric instanceof Meter) {
+			collector = io.prometheus.client.Gauge
+				.build()
+				.name(scopedMetricName)
+				.help(helpString)
+				.labelNames(toArray(dimensionKeys))
+				.create();
+		} else if (metric instanceof Histogram) {
+			collector = new HistogramSummaryProxy((Histogram) metric, scopedMetricName, helpString, dimensionKeys, dimensionValues);
+		} else {
+			LOG.warn("Cannot create collector for unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+				metric.getClass().getName());
+			collector = null;
+		}
+		return collector;
+	}
+
+	private static void addMetric(Metric metric, List<String> dimensionValues, Collector collector) {
+		if (metric instanceof Gauge) {
+			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
+		} else if (metric instanceof Counter) {
+			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
+		} else if (metric instanceof Meter) {
+			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
+		} else if (metric instanceof Histogram) {
+			((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues);
+		} else {
+			LOG.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+				metric.getClass().getName());
+		}
+	}
+
+	@Override
+	public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+		final String scopedMetricName = getScopedName(metricName, group);
+		synchronized (this) {
+			final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName);
+			final Integer count = collectorWithCount.getValue();
+			final Collector collector = collectorWithCount.getKey();
+			if (count == 1) {
+				try {
+					CollectorRegistry.defaultRegistry.unregister(collector);
+				} catch (Exception e) {
+					LOG.warn("There was a problem unregistering metric {}.", scopedMetricName, e);
+				}
+				collectorsWithCountByMetricName.remove(scopedMetricName);
+			} else {
+				collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count - 1));
+			}
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private static String getLogicalScope(MetricGroup group) {
+		return ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+	}
+
+	@VisibleForTesting
+	static io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) {
+		return new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				final Object value = gauge.getValue();
+				if (value == null) {
+					LOG.debug("Gauge {} is null-valued, defaulting to 0.", gauge);
+					return 0;
+				}
+				if (value instanceof Double) {
+					return (double) value;
+				}
+				if (value instanceof Number) {
+					return ((Number) value).doubleValue();
+				}
+				if (value instanceof Boolean) {
+					return ((Boolean) value) ? 1 : 0;
+				}
+				LOG.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
+					gauge, value.getClass().getName());
+				return 0;
+			}
+		};
+	}
+
+	private static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter) {
+		return new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				return (double) counter.getCount();
+			}
+		};
+	}
+
+	private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) {
+		return new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				return meter.getRate();
+			}
+		};
+	}
+
+	@VisibleForTesting
+	static class HistogramSummaryProxy extends Collector {
+		static final List<Double> QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999);
+
+		private final String metricName;
+		private final String helpString;
+		private final List<String> labelNamesWithQuantile;
+
+		private final Map<List<String>, Histogram> histogramsByLabelValues = new HashMap<>();
+
+		HistogramSummaryProxy(final Histogram histogram, final String metricName, final String helpString, final List<String> labelNames, final List<String> labelValues) {
+			this.metricName = metricName;
+			this.helpString = helpString;
+			this.labelNamesWithQuantile = addToList(labelNames, "quantile");
+			histogramsByLabelValues.put(labelValues, histogram);
+		}
+
+		@Override
+		public List<MetricFamilySamples> collect() {
+			// We cannot use SummaryMetricFamily because it is impossible to get a sum of all values (at least for Dropwizard histograms,
+			// whose snapshot's values array only holds a sample of recent values).
+
+			List<MetricFamilySamples.Sample> samples = new LinkedList<>();
+			for (Map.Entry<List<String>, Histogram> labelValuesToHistogram : histogramsByLabelValues.entrySet()) {
+				addSamples(labelValuesToHistogram.getKey(), labelValuesToHistogram.getValue(), samples);
+			}
+			return Collections.singletonList(new MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples));
+		}
+
+		void addChild(final Histogram histogram, final List<String> labelValues) {
+			histogramsByLabelValues.put(labelValues, histogram);
+		}
+
+		private void addSamples(final List<String> labelValues, final Histogram histogram, final List<MetricFamilySamples.Sample> samples) {
+			samples.add(new MetricFamilySamples.Sample(metricName + "_count",
+				labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));
+			for (final Double quantile : QUANTILES) {
+				samples.add(new MetricFamilySamples.Sample(metricName, labelNamesWithQuantile,
+					addToList(labelValues, quantile.toString()),
+					histogram.getStatistics().getQuantile(quantile)));
+			}
+		}
 	}
 
+	private static List<String> addToList(List<String> list, String element) {
+		final List<String> result = new ArrayList<>(list);
+		result.add(element);
+		return result;
+	}
+
+	private static String[] toArray(List<String> list) {
+		return list.toArray(new String[list.size()]);
+	}
 }
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 1b551703fde01..e9fd985e4ac2c 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -178,7 +178,7 @@ public void invalidCharactersAreReplacedWithUnderscore() {
 
 	@Test
 	public void doubleGaugeIsConvertedCorrectly() {
-		assertThat(reporter.gaugeFrom(new Gauge<Double>() {
+		assertThat(PrometheusReporter.gaugeFrom(new Gauge<Double>() {
 			@Override
 			public Double getValue() {
 				return 3.14;
@@ -188,7 +188,7 @@ public Double getValue() {
 
 	@Test
 	public void shortGaugeIsConvertedCorrectly() {
-		assertThat(reporter.gaugeFrom(new Gauge<Short>() {
+		assertThat(PrometheusReporter.gaugeFrom(new Gauge<Short>() {
 			@Override
 			public Short getValue() {
 				return 13;
@@ -198,7 +198,7 @@ public Short getValue() {
 
 	@Test
 	public void booleanGaugeIsConvertedCorrectly() {
-		assertThat(reporter.gaugeFrom(new Gauge<Boolean>() {
+		assertThat(PrometheusReporter.gaugeFrom(new Gauge<Boolean>() {
 			@Override
 			public Boolean getValue() {
 				return true;
@@ -211,7 +211,7 @@ public Boolean getValue() {
 	 */
 	@Test
 	public void stringGaugeCannotBeConverted() {
-		assertThat(reporter.gaugeFrom(new Gauge<String>() {
+		assertThat(PrometheusReporter.gaugeFrom(new Gauge<String>() {
 			@Override
 			public String getValue() {
 				return "I am not a number";