From 858b2e4a31ad186b718c529a70757fda428a92ef Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 27 Feb 2018 15:19:50 +0100 Subject: [PATCH] [FLINK-8703][tests] Port NotSoMiniClusterIterations to MiniClusterResource --- .../manual/NotSoMiniClusterIterations.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java index 9f6bcbbf67c51..abb8673db5684 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java @@ -24,12 +24,11 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.examples.java.graph.ConnectedComponents; import org.apache.flink.examples.java.graph.util.ConnectedComponentsData; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.test.util.MiniClusterResource; import static org.junit.Assert.fail; @@ -46,23 +45,25 @@ public static void main(String[] args) { throw new RuntimeException("This test program needs to run with at least 5GB of heap space."); } - LocalFlinkMiniCluster cluster = null; + MiniClusterResource cluster = null; try { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, PARALLELISM); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 8L); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1000); config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 8 * 1024); config.setInteger("taskmanager.net.server.numThreads", 1); config.setInteger("taskmanager.net.client.numThreads", 1); - cluster = new LocalFlinkMiniCluster(config, false); - cluster.start(); + cluster = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + config, + PARALLELISM, + 1)); + cluster.before(); - runConnectedComponents(cluster.getLeaderRPCPort()); + runConnectedComponents(); } catch (Exception e) { e.printStackTrace(); @@ -70,14 +71,14 @@ public static void main(String[] args) { } finally { if (cluster != null) { - cluster.stop(); + cluster.after(); } } } - private static void runConnectedComponents(int jmPort) throws Exception { + private static void runConnectedComponents() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jmPort); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.getConfig().disableSysoutLogging();