Skip to content

Commit

Permalink
[FLINK-32625][tests] MiniClusterTestEnvironment supports customized M…
Browse files Browse the repository at this point in the history
…iniClusterResourceConfiguration

This closes  #23015.
  • Loading branch information
tisonkun committed Jul 19, 2023
1 parent 99c652c commit 3886f1f
Showing 1 changed file with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,30 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr
private boolean isStarted = false;

public MiniClusterTestEnvironment() {
Configuration conf = new Configuration();
conf.set(METRIC_FETCHER_UPDATE_INTERVAL, METRIC_FETCHER_UPDATE_INTERVAL_MS);
this.miniCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(conf)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(6)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());
this(defaultMiniClusterResourceConfiguration());
}

public MiniClusterTestEnvironment(MiniClusterResourceConfiguration conf) {
this.miniCluster = new MiniClusterWithClientResource(conf);
try {
this.checkpointPath = Files.createTempDirectory("minicluster-environment-checkpoint-");
} catch (IOException e) {
throw new RuntimeException("Failed to create temporary checkpoint directory", e);
}
}

private static MiniClusterResourceConfiguration defaultMiniClusterResourceConfiguration() {
Configuration conf = new Configuration();
conf.set(METRIC_FETCHER_UPDATE_INTERVAL, METRIC_FETCHER_UPDATE_INTERVAL_MS);
return new MiniClusterResourceConfiguration.Builder()
.setConfiguration(conf)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(6)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build();
}

@Override
public StreamExecutionEnvironment createExecutionEnvironment(
TestEnvironmentSettings envOptions) {
Expand Down

0 comments on commit 3886f1f

Please sign in to comment.