From e2939f518323ffdf2448a8eda7dd2a3e267081db Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 22:50:51 +0800 Subject: [PATCH] [FLINK-32625][tests] MiniClusterTestEnvironment supports customized MiniClusterResourceConfiguration Signed-off-by: tison --- .../MiniClusterTestEnvironment.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java index e68560ada54b0..c62fac21650a7 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java @@ -60,17 +60,11 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr private boolean isStarted = false; public MiniClusterTestEnvironment() { - Configuration conf = new Configuration(); - conf.set(METRIC_FETCHER_UPDATE_INTERVAL, METRIC_FETCHER_UPDATE_INTERVAL_MS); - this.miniCluster = - new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setConfiguration(conf) - .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(6) - .setRpcServiceSharing(RpcServiceSharing.DEDICATED) - .withHaLeadershipControl() - .build()); + this(defaultMiniClusterResourceConfiguration()); + } + + public MiniClusterTestEnvironment(MiniClusterResourceConfiguration conf) { + this.miniCluster = new MiniClusterWithClientResource(conf); try { this.checkpointPath = Files.createTempDirectory("minicluster-environment-checkpoint-"); } catch (IOException e) { @@ -78,6 +72,18 @@ public MiniClusterTestEnvironment() { } } + private static MiniClusterResourceConfiguration defaultMiniClusterResourceConfiguration() { + Configuration conf = new Configuration(); + conf.set(METRIC_FETCHER_UPDATE_INTERVAL, METRIC_FETCHER_UPDATE_INTERVAL_MS); + return new MiniClusterResourceConfiguration.Builder() + .setConfiguration(conf) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(6) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .withHaLeadershipControl() + .build(); + } + @Override public StreamExecutionEnvironment createExecutionEnvironment( TestEnvironmentSettings envOptions) {