From 18f754717333afc5969fcce0980126bdd5e1f5fc Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 17 Jun 2016 15:47:53 +0200 Subject: [PATCH 1/3] [FLINK-4087] [metrics] Improved JMX port handling --- LICENSE | 15 ++ .../apache/flink/metrics/MetricRegistry.java | 9 ++ .../flink/metrics/reporter/JMXReporter.java | 138 +++++++++++++++++- flink-dist/src/main/flink-bin/bin/config.sh | 6 - .../src/main/flink-bin/bin/flink-daemon.sh | 7 +- 5 files changed, 161 insertions(+), 14 deletions(-) diff --git a/LICENSE b/LICENSE index f97195fdd5aa1..c6cf28ecab638 100644 --- a/LICENSE +++ b/LICENSE @@ -303,3 +303,18 @@ Open Font License (OFT) - http://scripts.sil.org/OFL - Font Awesome (http://fortawesome.github.io/Font-Awesome/) - Created by Dave Gandy -> fonts in "flink-runtime-web/web-dashboard/assets/fonts" +----------------------------------------------------------------------- + The ISC License +----------------------------------------------------------------------- + + The Apache Flink project contains code under the ISC license from the following files: + - simplejmx (http://256.com/sources/simplejmx/) Copyright (c) - Gray Watson + +Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby +granted, provided that this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING +ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE +USE OR PERFORMANCE OF THIS SOFTWARE. diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java index b3422e1646e04..27f6ff6ff952d 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java @@ -44,6 +44,8 @@ public class MetricRegistry { // configuration keys // ------------------------------------------------------------------------ + public static final String KEY_METRICS_JMX_PORT = "metrics.jmx.port"; + public static final String KEY_METRICS_REPORTER_CLASS = "metrics.reporter.class"; public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments"; public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval"; @@ -85,7 +87,14 @@ public MetricRegistry(Configuration config) { if (className == null) { // by default, create JMX metrics LOG.info("No metrics reporter configured, exposing metrics via JMX"); + + Configuration reporterConfig = new Configuration(); + String portRange = config.getString(KEY_METRICS_JMX_PORT, null); + if (portRange != null) { + reporterConfig.setString(KEY_METRICS_JMX_PORT, portRange); + } this.reporter = new JMXReporter(); + this.reporter.open(reporterConfig); this.timer = null; } else { diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java index db8116487f66b..3eae1cd5582d8 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java @@ -24,7 +24,6 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.groups.AbstractMetricGroup; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,10 +33,21 @@ import javax.management.MalformedObjectNameException; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXConnectorServerFactory; +import javax.management.remote.JMXServiceURL; +import java.io.IOException; import java.lang.management.ManagementFactory; +import java.net.MalformedURLException; +import java.rmi.NoSuchObjectException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.rmi.server.UnicastRemoteObject; import java.util.HashMap; import java.util.Map; +import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT; + /** * {@link MetricReporter} that exports {@link Metric Metrics} via JMX. * @@ -60,6 +70,9 @@ public class JMXReporter implements MetricReporter { /** The names under which the registered metrics have been added to the MBeanServer */ private final Map registeredMetrics; + /** The server to which JMX clients connect to. ALlows for better control over port usage. */ + private JMXServer jmxServer; + /** * Creates a new JMXReporter */ @@ -73,10 +86,61 @@ public JMXReporter() { // ------------------------------------------------------------------------ @Override - public void open(Configuration config) {} + public void open(Configuration config) { + this.jmxServer = startJmxServer(config); + } + + private static JMXServer startJmxServer(Configuration config) { + JMXServer jmxServer; + + String portRange = config.getString(KEY_METRICS_JMX_PORT, "9010-9025"); + String[] ports = portRange.split("-"); + + if (ports.length == 0 || ports.length > 2) { + throw new IllegalArgumentException("JMX port range was configured incorrectly. " + + "Expected: [-] Configured: " + portRange); + } + + if (ports.length == 1) { //single port was configured + int port = Integer.parseInt(ports[0]); + jmxServer = new JMXServer(port); + try { + jmxServer.start(); + } catch (IOException e) { + throw new RuntimeException("Could not start JMX server on port " + port + "."); + } + return jmxServer; + } else { //port range was configured + int start = Integer.parseInt(ports[0]); + int end = Integer.parseInt(ports[1]); + while (true) { + try { + jmxServer = new JMXServer(start); + jmxServer.start(); + LOG.info("Starting JMX on port " + start + "."); + break; + } catch (IOException e) { //assume port conflict + LOG.debug("Could not start JMX server. Attempting different port", e); + start++; + if (start > end) { + throw new RuntimeException("Could not start JMX server.", e); + } + } + } + } + return jmxServer; + } @Override - public void close() {} + public void close() { + if (jmxServer != null) { + try { + jmxServer.stop(); + } catch (IOException e) { + LOG.error("Failed to stop JMX server.", e); + } + } + } // ------------------------------------------------------------------------ // adding / removing metrics @@ -265,4 +329,72 @@ public Object getValue() { return gauge.getValue(); } } + + /** + * JMX Server implementation that JMX clients can connect to. + * + * Heavily based on j256 simplejmx project + * + * https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java + */ + private static class JMXServer { + private int port; + private Registry rmiRegistry; + private JMXConnectorServer connector; + + public JMXServer(int port) { + this.port = port; + } + + public void start() throws IOException { + startRmiRegistry(); + startJmxService(); + } + + public void stop() throws IOException { + if (connector != null) { + try { + connector.stop(); + } finally { + connector = null; + } + } + if (rmiRegistry != null) { + try { + UnicastRemoteObject.unexportObject(rmiRegistry, true); + } catch (NoSuchObjectException e) { + throw new IOException("Could not unexport our RMI registry", e); + } finally { + rmiRegistry = null; + } + } + } + + private void startRmiRegistry() throws IOException { + if (rmiRegistry != null) { + return; + } + rmiRegistry = LocateRegistry.createRegistry(port); + } + + private void startJmxService() throws IOException { + if (connector != null) { + return; + } + String serverHost = "localhost"; + String registryHost = ""; + String serviceUrl = + "service:jmx:rmi://" + serverHost + ":" + port + "/jndi/rmi://" + registryHost + ":" + port + "/jmxrmi"; + JMXServiceURL url; + try { + url = new JMXServiceURL(serviceUrl); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e); + } + + connector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer()); + + connector.start(); + } + } } diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index b6bdbed805de7..ffbec07b71bab 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -103,8 +103,6 @@ KEY_ENV_SSH_OPTS="env.ssh.opts" KEY_RECOVERY_MODE="recovery.mode" KEY_ZK_HEAP_MB="zookeeper.heap.mb" -KEY_METRICS_JMX_PORT="metrics.jmx.port" - ######################################################################################################################## # PATHS AND CONFIG ######################################################################################################################## @@ -242,10 +240,6 @@ if [ -z "${RECOVERY_MODE}" ]; then RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}") fi -if [ -z "${JMX_PORT}" ]; then - JMX_PORT=$(readFromConfig ${KEY_METRICS_JMX_PORT} 9010 "${YAML_CONF}") -fi - # Arguments for the JVM. Used for job and task manager JVMs. # DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys # KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that! diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh index cc7163f254361..f6e0614ed00a7 100644 --- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh @@ -33,14 +33,12 @@ bin=`cd "$bin"; pwd` case $DAEMON in (jobmanager) CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager - if [ "${ARGS[3]}" == "local" ]; then - JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" - fi + JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=0 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" ;; (taskmanager) CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager - JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" + JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=0 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" ;; (zookeeper) @@ -101,7 +99,6 @@ case $STARTSTOP in count="${#active[@]}" if [ ${count} -gt 0 ]; then - JMX_ARGS="" echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." fi fi From 3031670ef76bd199ff23e86a9ba40632410fccc0 Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 17 Jun 2016 16:45:29 +0200 Subject: [PATCH 2/3] fix tests not shutting down registry --- .../flink/metrics/groups/JobGroupTest.java | 3 +++ .../groups/MetricGroupRegistrationTest.java | 16 +++++++++++++--- .../flink/metrics/groups/MetricGroupTest.java | 16 +++++++++++++++- .../flink/metrics/groups/OperatorGroupTest.java | 2 ++ .../flink/metrics/groups/TaskGroupTest.java | 17 +++++++++++++++++ .../metrics/groups/TaskManagerGroupTest.java | 13 +++++++++++-- 6 files changed, 61 insertions(+), 6 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java index e820762e2fe17..4bcb1eee29d8d 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java @@ -45,6 +45,7 @@ public void testGenerateScopeDefault() { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName", jmGroup.getScopeString()); + registry.shutdown(); } @Test @@ -66,6 +67,7 @@ public void testGenerateScopeCustom() { assertEquals( "some-constant.myJobName", jmGroup.getScopeString()); + registry.shutdown(); } @Test @@ -87,5 +89,6 @@ public void testGenerateScopeCustomWildcard() { assertEquals( "peter.test-tm-id.some-constant." + jid, jmGroup.getScopeString()); + registry.shutdown(); } } diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java index 5645b948dd524..7b35d91b66a1a 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java @@ -39,7 +39,9 @@ public void testMetricInstantiation() { Configuration config = new Configuration(); config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter1.class.getName()); - MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id"); + MetricRegistry registry = new MetricRegistry(config); + + MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); Counter counter = root.counter("counter"); assertEquals(counter, TestReporter1.lastPassedMetric); @@ -54,6 +56,8 @@ public Object getValue() { Assert.assertEquals(gauge, TestReporter1.lastPassedMetric); assertEquals("gauge", TestReporter1.lastPassedName); + + registry.shutdown(); } public static class TestReporter1 extends TestReporter { @@ -75,8 +79,12 @@ public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetric public void testInvalidMetricName() { Configuration config = new Configuration(); - MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id"); + MetricRegistry registry = new MetricRegistry(config); + + MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); root.counter("=)(/!"); + + registry.shutdown(); } /** @@ -86,7 +94,9 @@ public void testInvalidMetricName() { public void testDuplicateGroupName() { Configuration config = new Configuration(); - MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id"); + MetricRegistry registry = new MetricRegistry(config); + + MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); MetricGroup group1 = root.addGroup("group"); MetricGroup group2 = root.addGroup("group"); diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java index 2849bab779c5f..3f8a5770dfd7c 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java @@ -24,16 +24,30 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricRegistry; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.*; public class MetricGroupTest { - private final MetricRegistry registry = new MetricRegistry(new Configuration()); + private MetricRegistry registry; private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry(); + @Before + public void createRegistry() { + this.registry = new MetricRegistry(new Configuration()); + } + + @After + public void shutdownRegistry() { + this.registry.shutdown(); + this.registry = null; + } + @Test public void sameGroupOnNameCollision() { GenericMetricGroup group = new GenericMetricGroup( diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java index cb5e0829b2ce7..c0c88428eefc1 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java @@ -47,5 +47,7 @@ public void testGenerateScopeDefault() { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName.myOpName.11", opGroup.getScopeString()); + + registry.shutdown(); } } diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java index 4a492d2c0fe14..88f425b768d53 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java @@ -27,6 +27,8 @@ import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat; import org.apache.flink.util.AbstractID; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; @@ -37,6 +39,18 @@ public class TaskGroupTest { // ------------------------------------------------------------------------ // scope tests // ------------------------------------------------------------------------ + private MetricRegistry registry; + + @Before + public void createRegistry() { + this.registry = new MetricRegistry(new Configuration()); + } + + @After + public void shutdownRegistry() { + this.registry.shutdown(); + this.registry = null; + } @Test public void testGenerateScopeDefault() { @@ -56,6 +70,7 @@ public void testGenerateScopeDefault() { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13", taskGroup.getScopeString()); + registry.shutdown(); } @Test @@ -82,6 +97,7 @@ public void testGenerateScopeCustom() { assertEquals( String.format("test-tm-id.%s.%s.%s", jid, vertexId, executionId), taskGroup.getScopeString()); + registry.shutdown(); } @Test @@ -110,5 +126,6 @@ public void testGenerateScopeWilcard() { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName." + executionId + ".13", taskGroup.getScopeString()); + registry.shutdown(); } } diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java index 9adc1be660902..9866b1b5d2d6c 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java @@ -36,8 +36,10 @@ public class TaskManagerGroupTest { @Test public void addAndRemoveJobs() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + final TaskManagerMetricGroup group = new TaskManagerMetricGroup( - new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString()); + registry, "localhost", new AbstractID().toString()); final JobID jid1 = new JobID(); @@ -87,12 +89,15 @@ public void addAndRemoveJobs() { assertTrue(tmGroup13.parent().isClosed()); assertEquals(0, group.numRegisteredJobMetricGroups()); + + registry.shutdown(); } @Test public void testCloseClosesAll() { + MetricRegistry registry = new MetricRegistry(new Configuration()); final TaskManagerMetricGroup group = new TaskManagerMetricGroup( - new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString()); + registry, "localhost", new AbstractID().toString()); final JobID jid1 = new JobID(); @@ -118,6 +123,8 @@ public void testCloseClosesAll() { assertTrue(tmGroup11.isClosed()); assertTrue(tmGroup12.isClosed()); assertTrue(tmGroup21.isClosed()); + + registry.shutdown(); } // ------------------------------------------------------------------------ @@ -131,6 +138,7 @@ public void testGenerateScopeDefault() { assertArrayEquals(new String[] { "localhost", "taskmanager", "id" }, group.getScopeComponents()); assertEquals("localhost.taskmanager.id", group.getScopeString()); + registry.shutdown(); } @Test @@ -141,5 +149,6 @@ public void testGenerateScopeCustom() { assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents()); assertEquals("constant.host.foo.host", group.getScopeString()); + registry.shutdown(); } } From 9efb818cddff4b32823f9530e5bbe0166ab514e7 Mon Sep 17 00:00:00 2001 From: zentol Date: Mon, 20 Jun 2016 12:02:13 +0200 Subject: [PATCH 3/3] resolve a few powermock issues --- .../flink/runtime/io/network/api/reader/BufferReaderTest.java | 2 ++ .../org/apache/flink/runtime/operators/DataSinkTaskTest.java | 2 ++ .../org/apache/flink/runtime/operators/DataSourceTaskTest.java | 2 ++ .../apache/flink/runtime/operators/chaining/ChainTaskTest.java | 2 ++ 4 files changed, 8 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java index 8519ac6eaa299..4a85934257e7c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.util.event.EventListener; import org.junit.Test; import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -39,6 +40,7 @@ @RunWith(PowerMockRunner.class) @PrepareForTest(Task.class) +@PowerMockIgnore({"javax.management.*"}) @SuppressWarnings("unchecked") public class BufferReaderTest { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java index 386634ffed2fe..3f190a8d5b18a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java @@ -37,6 +37,7 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -57,6 +58,7 @@ @RunWith(PowerMockRunner.class) @PrepareForTest({Task.class, ResultPartitionWriter.class}) +@PowerMockIgnore({"javax.management.*"}) public class DataSinkTaskTest extends TaskTestBase { private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java index 8f0642e0e5cae..dabd13db23ded 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java @@ -44,11 +44,13 @@ import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @RunWith(PowerMockRunner.class) @PrepareForTest({Task.class, ResultPartitionWriter.class}) +@PowerMockIgnore({"javax.management.*"}) public class DataSourceTaskTest extends TaskTestBase { private static final int MEMORY_MANAGER_SIZE = 1024 * 1024; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java index c3c23def377e0..1503df8e7d3ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java @@ -46,11 +46,13 @@ import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @RunWith(PowerMockRunner.class) @PrepareForTest({Task.class, ResultPartitionWriter.class}) +@PowerMockIgnore({"javax.management.*"}) public class ChainTaskTest extends TaskTestBase { private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;