From 08f56f6e954a37c9135bac891ad0bb6a31dcc8b0 Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 27 Feb 2018 15:21:50 +0100 Subject: [PATCH] [FLINK-8703][tests] Port StreamingScalabilityAndLatency to MiniClusterResource --- .../StreamingScalabilityAndLatency.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index efcefebe34a01..a5b01bcd4f444 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -20,14 +20,13 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterResource; import static org.junit.Assert.fail; @@ -45,22 +44,24 @@ public static void main(String[] args) throws Exception { final int slotsPerTaskManager = 80; final int parallelism = taskManagers * slotsPerTaskManager; - LocalFlinkMiniCluster cluster = null; + MiniClusterResource cluster = null; try { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, taskManagers); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000); config.setInteger("taskmanager.net.server.numThreads", 1); config.setInteger("taskmanager.net.client.numThreads", 1); - cluster = new LocalFlinkMiniCluster(config, false); - cluster.start(); + cluster = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + config, + taskManagers, + slotsPerTaskManager)); + cluster.before(); - runPartitioningProgram(cluster.getLeaderRPCPort(), parallelism); + runPartitioningProgram(parallelism); } catch (Exception e) { e.printStackTrace(); @@ -68,13 +69,13 @@ public static void main(String[] args) throws Exception { } finally { if (cluster != null) { - cluster.stop(); + cluster.after(); } } } - private static void runPartitioningProgram(int jobManagerPort, int parallelism) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort); + private static void runPartitioningProgram(int parallelism) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); env.getConfig().enableObjectReuse();