Skip to content

Commit

Permalink
[FLINK-7920] Make MiniClusterConfiguration immutable
Browse files Browse the repository at this point in the history
This closes #4905.
  • Loading branch information
tillrohrmann committed Oct 26, 2017
1 parent 3039df8 commit a9743eb
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 205 deletions.
Expand Up @@ -18,12 +18,9 @@


package org.apache.flink.runtime.minicluster; package org.apache.flink.runtime.minicluster;


import akka.actor.ActorSystem;

import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobExecutionException;
Expand All @@ -47,6 +44,7 @@
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;


import akka.actor.ActorSystem;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -67,7 +65,7 @@ public class MiniCluster {
private final Object lock = new Object(); private final Object lock = new Object();


/** The configuration for this mini cluster */ /** The configuration for this mini cluster */
private final MiniClusterConfiguration config; private final MiniClusterConfiguration miniClusterConfiguration;


@GuardedBy("lock") @GuardedBy("lock")
private MetricRegistry metricRegistry; private MetricRegistry metricRegistry;
Expand Down Expand Up @@ -106,49 +104,13 @@ public class MiniCluster {


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


/**
* Creates a new mini cluster with the default configuration:
* <ul>
* <li>One JobManager</li>
* <li>One TaskManager</li>
* <li>One task slot in the TaskManager</li>
* <li>All components share the same RPC subsystem (minimizes communication overhead)</li>
* </ul>
*/
public MiniCluster() {
this(new MiniClusterConfiguration());
}

/** /**
* Creates a new Flink mini cluster based on the given configuration. * Creates a new Flink mini cluster based on the given configuration.
* *
* @param config The configuration for the mini cluster * @param miniClusterConfiguration The configuration for the mini cluster
*/ */
public MiniCluster(MiniClusterConfiguration config) { public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
this.config = checkNotNull(config, "config may not be null"); this.miniClusterConfiguration = checkNotNull(miniClusterConfiguration, "config may not be null");
}

/**
* Creates a mini cluster based on the given configuration.
*
* @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead.
* @see #MiniCluster(MiniClusterConfiguration)
*/
@Deprecated
public MiniCluster(Configuration config) {
this(createConfig(config, true));
}

/**
* Creates a mini cluster based on the given configuration, starting one or more
* RPC services, depending on the given flag.
*
* @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead.
* @see #MiniCluster(MiniClusterConfiguration)
*/
@Deprecated
public MiniCluster(Configuration config, boolean singleRpcService) {
this(createConfig(config, singleRpcService));


running = false; running = false;
} }
Expand All @@ -175,14 +137,14 @@ public void start() throws Exception {
checkState(!running, "FlinkMiniCluster is already running"); checkState(!running, "FlinkMiniCluster is already running");


LOG.info("Starting Flink Mini Cluster"); LOG.info("Starting Flink Mini Cluster");
LOG.debug("Using configuration {}", config); LOG.debug("Using configuration {}", miniClusterConfiguration);


final Configuration configuration = new UnmodifiableConfiguration(config.generateConfiguration()); final Configuration configuration = miniClusterConfiguration.getConfiguration();
final Time rpcTimeout = config.getRpcTimeout(); final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
final int numJobManagers = config.getNumJobManagers(); final int numJobManagers = miniClusterConfiguration.getNumJobManagers();
final int numTaskManagers = config.getNumTaskManagers(); final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
final int numResourceManagers = config.getNumResourceManagers(); final int numResourceManagers = miniClusterConfiguration.getNumResourceManagers();
final boolean singleRpc = config.getUseSingleRpcSystem(); final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED;


try { try {
LOG.info("Starting Metrics Registry"); LOG.info("Starting Metrics Registry");
Expand All @@ -198,7 +160,7 @@ public void start() throws Exception {
// we always need the 'commonRpcService' for auxiliary calls // we always need the 'commonRpcService' for auxiliary calls
commonRpcService = createRpcService(configuration, rpcTimeout, false, null); commonRpcService = createRpcService(configuration, rpcTimeout, false, null);


if (singleRpc) { if (useSingleRpcService) {
// set that same RPC service for all JobManagers and TaskManagers // set that same RPC service for all JobManagers and TaskManagers
for (int i = 0; i < numJobManagers; i++) { for (int i = 0; i < numJobManagers; i++) {
jobManagerRpcServices[i] = commonRpcService; jobManagerRpcServices[i] = commonRpcService;
Expand All @@ -216,9 +178,9 @@ public void start() throws Exception {
} }
else { else {
// start a new service per component, possibly with custom bind addresses // start a new service per component, possibly with custom bind addresses
final String jobManagerBindAddress = config.getJobManagerBindAddress(); final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
final String taskManagerBindAddress = config.getTaskManagerBindAddress(); final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
final String resourceManagerBindAddress = config.getResourceManagerBindAddress(); final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();


for (int i = 0; i < numJobManagers; i++) { for (int i = 0; i < numJobManagers; i++) {
jobManagerRpcServices[i] = createRpcService( jobManagerRpcServices[i] = createRpcService(
Expand Down Expand Up @@ -625,20 +587,6 @@ private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorE
return priorException; return priorException;
} }


private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleRpcService) {
MiniClusterConfiguration config = cfg == null ?
new MiniClusterConfiguration() :
new MiniClusterConfiguration(cfg);

if (singleRpcService) {
config.setUseSingleRpcService();
} else {
config.setUseRpcServicePerComponent();
}

return config;
}

private class TerminatingFatalErrorHandler implements FatalErrorHandler { private class TerminatingFatalErrorHandler implements FatalErrorHandler {


private final int index; private final int index;
Expand Down

0 comments on commit a9743eb

Please sign in to comment.