From 68a0a8b1b822d9c69f74fe8208b7a798a486536d Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Mon, 18 Jul 2016 17:46:21 +0200 Subject: [PATCH 1/3] [FLINK-4229] Do not start any Metrics Reporter by default --- docs/apis/metrics.md | 10 +++---- .../apache/flink/metrics/MetricRegistry.java | 27 ++++--------------- 2 files changed, 10 insertions(+), 27 deletions(-) diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md index 329e44564bba4..76b05ac5d23fe 100644 --- a/docs/apis/metrics.md +++ b/docs/apis/metrics.md @@ -239,12 +239,12 @@ Metrics can be exposed to an external system by configuring a reporter in `conf/ You can write your own `Reporter` by implementing the `org.apache.flink.metrics.reporter.MetricReporter` interface. If the Reporter should send out reports regularly you have to implement the `Scheduled` interface as well. -By default Flink uses JMX to expose metrics. -All non-JMXReporters are not part of the distribution. To use them you have to copy the respective fat jar to the `/lib` folder. - The following sections list the supported reporters. -### JMX +### JMX (org.apache.flink.metrics.reporter.JMXReporter) + +You don't have to include an additional dependency since the JMX reporter is available by default +but not activated. The port for JMX can be configured by setting the `metrics.jmx.port` key. This parameter expects either a single port or a port range, with the default being 9010-9025. The used port is shown in the relevant job or task manager log. @@ -262,7 +262,7 @@ Dependency: Parameters: - `host` - the gmond host address configured under `udp_recv_channel.bind` in `gmond.conf` -- `port` - the gmond port configured under `udp_recv_channel.port` in `gmond.conf` +- `port` - the gmond port configured under `udp_recv_channel.port` in `gmond.conf` - `tmax` - soft limit for how long an old metric should be retained - `dmax` - hard limit for how long an old metric should be retained - `ttl` - time-to-live for transmitted UDP packets diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java index a9d7324244ec2..23a53f20bd762 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java @@ -79,9 +79,9 @@ public MetricRegistry(Configuration config) { final String className = config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null); if (className == null) { - // by default, create JMX metrics - LOG.info("No metrics reporter configured, exposing metrics via JMX"); - this.reporter = startJmxReporter(config); + // by default, don't report anything + LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); + this.reporter = null; this.executor = null; } else { @@ -120,8 +120,8 @@ public MetricRegistry(Configuration config) { } catch (Throwable t) { shutdownExecutor(); - LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t); - reporter = startJmxReporter(config); + LOG.error("Could not instantiate metrics reporter. No metrics will be exposed/reported.", t); + reporter = null; } this.reporter = reporter; @@ -133,23 +133,6 @@ public char getDelimiter() { return this.delimiter; } - private static JMXReporter startJmxReporter(Configuration config) { - JMXReporter reporter = null; - try { - Configuration reporterConfig = new Configuration(); - String portRange = config.getString(ConfigConstants.METRICS_JMX_PORT, null); - if (portRange != null) { - reporterConfig.setString(ConfigConstants.METRICS_JMX_PORT, portRange); - } - reporter = new JMXReporter(); - reporter.open(reporterConfig); - } catch (Exception e) { - LOG.error("Failed to instantiate JMX reporter.", e); - } finally { - return reporter; - } - } - /** * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}. */ From e30ad8194a771d5af496a6b13086835aa10e0885 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 19 Jul 2016 12:02:24 +0200 Subject: [PATCH 2/3] Address comments --- .../org/apache/flink/configuration/ConfigConstants.java | 3 --- .../main/java/org/apache/flink/metrics/MetricRegistry.java | 1 - .../org/apache/flink/metrics/reporter/JMXReporter.java | 7 +++++-- .../org/apache/flink/metrics/reporter/JMXReporterTest.java | 5 +++-- .../flink/runtime/jobmanager/JobManagerMetricTest.java | 5 +++-- .../flink/streaming/connectors/kafka/KafkaTestBase.java | 1 + 6 files changed, 12 insertions(+), 10 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 2f24cdad73b3f..670e112974c09 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -649,9 +649,6 @@ public final class ConfigConstants { // ---------------------------- Metrics ----------------------------------- - /** The port range from which JMX will pick one to listen for incoming connections. */ - public static final String METRICS_JMX_PORT = "metrics.jmx.port"; - /** The class of the reporter to use. */ public static final String METRICS_REPORTER_CLASS = "metrics.reporter.class"; diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java index 23a53f20bd762..d9f9bdc0977be 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java @@ -24,7 +24,6 @@ import org.apache.flink.metrics.groups.AbstractMetricGroup; import org.apache.flink.metrics.groups.scope.ScopeFormat; import org.apache.flink.metrics.groups.scope.ScopeFormats; -import org.apache.flink.metrics.reporter.JMXReporter; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java index 0e4aabd757d2d..17fbb91749fd4 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java @@ -19,7 +19,6 @@ package org.apache.flink.metrics.reporter; import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; @@ -62,6 +61,8 @@ public class JMXReporter implements MetricReporter { private static final String PREFIX = "org.apache.flink.metrics:"; private static final String KEY_PREFIX = "key"; + public static final String ARG_PORT = "port"; + private static final Logger LOG = LoggerFactory.getLogger(JMXReporter.class); // ------------------------------------------------------------------------ @@ -93,7 +94,9 @@ public void open(Configuration config) { } private static JMXServer startJmxServer(Configuration config) { - Iterator ports = NetUtils.getPortRangeFromString(config.getString(ConfigConstants.METRICS_JMX_PORT, "9010-9025")); + String portsConfig = config.getString(ARG_PORT, "9010-9025"); + + Iterator ports = NetUtils.getPortRangeFromString(portsConfig); JMXServer server = new JMXServer(); while (ports.hasNext()) { diff --git a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java index 99ee271fff9b8..f03ee11600f27 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java @@ -88,7 +88,7 @@ public void testPortConflictHandling() throws Exception { JMXReporter rep2 = new JMXReporter(); Configuration cfg1 = new Configuration(); - cfg1.setString(ConfigConstants.METRICS_JMX_PORT, "9020-9035"); + cfg1.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "-port 9020-9035"); rep1.open(cfg1); rep2.open(cfg1); @@ -137,7 +137,7 @@ public void testJMXAvailability() throws Exception { JMXReporter rep2 = new JMXReporter(); Configuration cfg1 = new Configuration(); - cfg1.setString(ConfigConstants.METRICS_JMX_PORT, "9040-9055"); + cfg1.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "-port 9040-9055"); rep1.open(cfg1); rep2.open(cfg1); @@ -197,6 +197,7 @@ public void testHistogramReporting() throws Exception { try { Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.reporter.JMXReporter"); registry = new MetricRegistry(config); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java index 2e338bc4755a1..56c320e3c85b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java @@ -50,9 +50,10 @@ public class JobManagerMetricTest { public void testJobManagerMetricAccess() throws Exception { Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); Configuration flinkConfiguration = new Configuration(); - + + flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.reporter.JMXReporter"); flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "jobmanager."); - flinkConfiguration.setString(ConfigConstants.METRICS_JMX_PORT, "9060-9075"); + flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "-port 9060-9075"); TestingCluster flink = new TestingCluster(flinkConfiguration); diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 64b9106165b99..b8815da49672d 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -101,6 +101,7 @@ public static void prepare() throws IOException, ClassNotFoundException { flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + flinkConfig.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.reporter.JMXReporter"); flink = new ForkableFlinkMiniCluster(flinkConfig, false); flink.start(); From 901f1e0db82b24edeacbba5e79900e5a20ee721d Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 20 Jul 2016 14:57:17 +0200 Subject: [PATCH 3/3] Address comments --- docs/apis/metrics.md | 7 +++++-- .../flink/runtime/jobmanager/JobManagerMetricTest.java | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md index 76b05ac5d23fe..f50f6ae364779 100644 --- a/docs/apis/metrics.md +++ b/docs/apis/metrics.md @@ -246,8 +246,11 @@ The following sections list the supported reporters. You don't have to include an additional dependency since the JMX reporter is available by default but not activated. -The port for JMX can be configured by setting the `metrics.jmx.port` key. This parameter expects either a single port -or a port range, with the default being 9010-9025. The used port is shown in the relevant job or task manager log. +Parameters: + +- `port` - the port on which JMX listens for connections. This can also be a port range. When a +range is specified the actual port is shown in the relevant job or task manager log. Default: +`9010-9025`. ### Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter) Dependency: diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java index 56c320e3c85b3..1e6f01997034a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java @@ -53,7 +53,7 @@ public void testJobManagerMetricAccess() throws Exception { flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.reporter.JMXReporter"); flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "jobmanager."); - flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "-port 9060-9075"); + flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--port 9060-9075"); TestingCluster flink = new TestingCluster(flinkConfiguration);