From 4c614d988599d3223d81a04f80a8ceb41a0b9e48 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 25 Oct 2017 17:31:45 +0200 Subject: [PATCH] [FLINK-7920] Make MiniClusterConfiguration immutable --- .../runtime/minicluster/MiniCluster.java | 84 ++----- .../minicluster/MiniClusterConfiguration.java | 227 ++++++++---------- .../minicluster/MiniClusterITCase.java | 20 +- .../Flip6LocalStreamEnvironment.java | 8 +- 4 files changed, 134 insertions(+), 205 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index f293a013f80db..dd352bb718d2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -18,12 +18,9 @@ package org.apache.flink.runtime.minicluster; -import akka.actor.ActorSystem; - import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.client.JobExecutionException; @@ -47,6 +44,7 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.util.ExceptionUtils; +import akka.actor.ActorSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +65,7 @@ public class MiniCluster { private final Object lock = new Object(); /** The configuration for this mini cluster */ - private final MiniClusterConfiguration config; + private final MiniClusterConfiguration miniClusterConfiguration; @GuardedBy("lock") private MetricRegistry metricRegistry; @@ -106,49 +104,13 @@ public class MiniCluster { // ------------------------------------------------------------------------ - /** - * Creates a new mini cluster with the default configuration: - * - */ - public MiniCluster() { - this(new MiniClusterConfiguration()); - } - /** * Creates a new Flink mini cluster based on the given configuration. * - * @param config The configuration for the mini cluster + * @param miniClusterConfiguration The configuration for the mini cluster */ - public MiniCluster(MiniClusterConfiguration config) { - this.config = checkNotNull(config, "config may not be null"); - } - - /** - * Creates a mini cluster based on the given configuration. - * - * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead. - * @see #MiniCluster(MiniClusterConfiguration) - */ - @Deprecated - public MiniCluster(Configuration config) { - this(createConfig(config, true)); - } - - /** - * Creates a mini cluster based on the given configuration, starting one or more - * RPC services, depending on the given flag. - * - * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead. - * @see #MiniCluster(MiniClusterConfiguration) - */ - @Deprecated - public MiniCluster(Configuration config, boolean singleRpcService) { - this(createConfig(config, singleRpcService)); + public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) { + this.miniClusterConfiguration = checkNotNull(miniClusterConfiguration, "config may not be null"); running = false; } @@ -175,14 +137,14 @@ public void start() throws Exception { checkState(!running, "FlinkMiniCluster is already running"); LOG.info("Starting Flink Mini Cluster"); - LOG.debug("Using configuration {}", config); + LOG.debug("Using configuration {}", miniClusterConfiguration); - final Configuration configuration = new UnmodifiableConfiguration(config.generateConfiguration()); - final Time rpcTimeout = config.getRpcTimeout(); - final int numJobManagers = config.getNumJobManagers(); - final int numTaskManagers = config.getNumTaskManagers(); - final int numResourceManagers = config.getNumResourceManagers(); - final boolean singleRpc = config.getUseSingleRpcSystem(); + final Configuration configuration = miniClusterConfiguration.getConfiguration(); + final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout(); + final int numJobManagers = miniClusterConfiguration.getNumJobManagers(); + final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers(); + final int numResourceManagers = miniClusterConfiguration.getNumResourceManagers(); + final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED; try { LOG.info("Starting Metrics Registry"); @@ -198,7 +160,7 @@ public void start() throws Exception { // we always need the 'commonRpcService' for auxiliary calls commonRpcService = createRpcService(configuration, rpcTimeout, false, null); - if (singleRpc) { + if (useSingleRpcService) { // set that same RPC service for all JobManagers and TaskManagers for (int i = 0; i < numJobManagers; i++) { jobManagerRpcServices[i] = commonRpcService; @@ -216,9 +178,9 @@ public void start() throws Exception { } else { // start a new service per component, possibly with custom bind addresses - final String jobManagerBindAddress = config.getJobManagerBindAddress(); - final String taskManagerBindAddress = config.getTaskManagerBindAddress(); - final String resourceManagerBindAddress = config.getResourceManagerBindAddress(); + final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress(); + final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress(); + final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress(); for (int i = 0; i < numJobManagers; i++) { jobManagerRpcServices[i] = createRpcService( @@ -625,20 +587,6 @@ private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorE return priorException; } - private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleRpcService) { - MiniClusterConfiguration config = cfg == null ? - new MiniClusterConfiguration() : - new MiniClusterConfiguration(cfg); - - if (singleRpcService) { - config.setUseSingleRpcService(); - } else { - config.setUseRpcServicePerComponent(); - } - - return config; - } - private class TerminatingFatalErrorHandler implements FatalErrorHandler { private final int index; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java index aa9b0c2d9d621..52e037ca973bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java @@ -22,97 +22,60 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.taskexecutor.TaskManagerServices; -import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + import scala.concurrent.duration.FiniteDuration; -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.runtime.minicluster.MiniClusterConfiguration.RpcServiceSharing.SHARED; +/** + * Configuration object for the {@link MiniCluster}. + */ public class MiniClusterConfiguration { - private final Configuration config; + private final UnmodifiableConfiguration configuration; - private boolean singleRpcService = true; + private final int numJobManagers; - private int numJobManagers = 1; + private final int numTaskManagers; - private int numTaskManagers = 1; + private final int numResourceManagers; - private int numResourceManagers = 1; + private final RpcServiceSharing rpcServiceSharing; - private String commonBindAddress; - - private long managedMemoryPerTaskManager = -1; + @Nullable + private final String commonBindAddress; // ------------------------------------------------------------------------ // Construction // ------------------------------------------------------------------------ - public MiniClusterConfiguration() { - this.config = new Configuration(); - } - - public MiniClusterConfiguration(Configuration config) { - checkNotNull(config); - this.config = new Configuration(config); - } - - // ------------------------------------------------------------------------ - // setters - // ------------------------------------------------------------------------ - - public void addConfiguration(Configuration config) { - checkNotNull(config, "configuration must not be null"); - this.config.addAll(config); - } + public MiniClusterConfiguration( + Configuration configuration, + int numJobManagers, + int numTaskManagers, + int numResourceManagers, + RpcServiceSharing rpcServiceSharing, + @Nullable String commonBindAddress) { - public void setUseSingleRpcService() { - this.singleRpcService = true; - } - - public void setUseRpcServicePerComponent() { - this.singleRpcService = false; - } - - public void setNumJobManagers(int numJobManagers) { - checkArgument(numJobManagers >= 1, "must have at least one JobManager"); + this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration)); this.numJobManagers = numJobManagers; - } - - public void setNumTaskManagers(int numTaskManagers) { - checkArgument(numTaskManagers >= 1, "must have at least one TaskManager"); this.numTaskManagers = numTaskManagers; - } - - public void setNumResourceManagers(int numResourceManagers) { - checkArgument(numResourceManagers >= 1, "must have at least one ResourceManager"); this.numResourceManagers = numResourceManagers; - } - - public void setNumTaskManagerSlots(int numTaskSlots) { - checkArgument(numTaskSlots >= 1, "must have at least one task slot per TaskManager"); - this.config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTaskSlots); - } - - public void setCommonRpcBindAddress(String bindAddress) { - checkNotNull(bindAddress, "bind address must not be null"); - this.commonBindAddress = bindAddress; - } - - public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) { - checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager."); - this.managedMemoryPerTaskManager = managedMemoryPerTaskManager; + this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing); + this.commonBindAddress = commonBindAddress; } // ------------------------------------------------------------------------ // getters // ------------------------------------------------------------------------ - public boolean getUseSingleRpcSystem() { - return singleRpcService; + public RpcServiceSharing getRpcServiceSharing() { + return rpcServiceSharing; } public int getNumJobManagers() { @@ -127,107 +90,121 @@ public int getNumResourceManagers() { return numResourceManagers; } - public int getNumSlotsPerTaskManager() { - return config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); - } - public String getJobManagerBindAddress() { return commonBindAddress != null ? commonBindAddress : - config.getString(JobManagerOptions.ADDRESS, "localhost"); + configuration.getString(JobManagerOptions.ADDRESS, "localhost"); } public String getTaskManagerBindAddress() { return commonBindAddress != null ? commonBindAddress : - config.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost"); + configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost"); } public String getResourceManagerBindAddress() { return commonBindAddress != null ? commonBindAddress : - config.getString(JobManagerOptions.ADDRESS, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname + configuration.getString(JobManagerOptions.ADDRESS, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname } public Time getRpcTimeout() { - FiniteDuration duration = AkkaUtils.getTimeout(config); + FiniteDuration duration = AkkaUtils.getTimeout(configuration); return Time.of(duration.length(), duration.unit()); } - public long getManagedMemoryPerTaskManager() { - return getOrCalculateManagedMemoryPerTaskManager(); - } - - // ------------------------------------------------------------------------ - // utils - // ------------------------------------------------------------------------ - - public Configuration generateConfiguration() { - Configuration newConfiguration = new Configuration(config); - // set the memory - long memory = getOrCalculateManagedMemoryPerTaskManager(); - newConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memory); - - return newConfiguration; + public UnmodifiableConfiguration getConfiguration() { + return configuration; } @Override public String toString() { return "MiniClusterConfiguration {" + - "singleRpcService=" + singleRpcService + + "singleRpcService=" + rpcServiceSharing + ", numJobManagers=" + numJobManagers + ", numTaskManagers=" + numTaskManagers + ", numResourceManagers=" + numResourceManagers + ", commonBindAddress='" + commonBindAddress + '\'' + - ", config=" + config + + ", config=" + configuration + '}'; } + // ---------------------------------------------------------------------------------- + // Enums + // ---------------------------------------------------------------------------------- + /** - * Get or calculate the managed memory per task manager. The memory is calculated in the - * following order: - * - * 1. Return {@link #managedMemoryPerTaskManager} if set - * 2. Return config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) if set - * 3. Distribute the available free memory equally among all components (JMs, RMs and TMs) and - * calculate the managed memory from the share of memory for a single task manager. - * - * @return size of managed memory per task manager (in megabytes) + * Enum which defines whether the mini cluster components use a shared RpcService + * or whether every component gets its own dedicated RpcService started. */ - private long getOrCalculateManagedMemoryPerTaskManager() { - if (managedMemoryPerTaskManager == -1) { - // no memory set in the mini cluster configuration + public enum RpcServiceSharing { + SHARED, // a single shared rpc service + DEDICATED // every component gets his own dedicated rpc service + } - long memorySize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE); + // ---------------------------------------------------------------------------------- + // Builder + // ---------------------------------------------------------------------------------- - // we could probably use config.contains() but the previous implementation compared to - // the default (-1) thus allowing the user to explicitly specify this as well - // -> don't change this behaviour now - if (memorySize == TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) { - // no memory set in the flink configuration - // share the available memory among all running components + /** + * Builder for the MiniClusterConfiguration. + */ + public static class Builder { + private Configuration configuration = new Configuration(); + private int numJobManagers = 1; + private int numTaskManagers = 1; + private int numSlotsPerTaskManager = 1; + private int numResourceManagers = 1; + private RpcServiceSharing rpcServiceSharing = SHARED; + @Nullable + private String commonBindAddress = null; + + public Builder setConfiguration(Configuration configuration1) { + this.configuration = Preconditions.checkNotNull(configuration1); + return this; + } - float memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION); + public Builder setNumJobManagers(int numJobManagers) { + this.numJobManagers = numJobManagers; + return this; + } - long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(); + public Builder setNumTaskManagers(int numTaskManagers) { + this.numTaskManagers = numTaskManagers; + return this; + } - // we assign each component the same amount of free memory - // (might be a bit of overkill for the JMs and RMs) - long memoryPerComponent = freeMemory / (numTaskManagers + numResourceManagers + numJobManagers); + public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager) { + this.numSlotsPerTaskManager = numSlotsPerTaskManager; + return this; + } - // subtract the network buffer memory - long networkBuffersMemory = TaskManagerServices.calculateNetworkBufferMemory(memoryPerComponent, config); - long memoryMinusNetworkBuffers = memoryPerComponent - networkBuffersMemory; + public Builder setNumResourceManagers(int numResourceManagers) { + this.numResourceManagers = numResourceManagers; + return this; + } - // calculate the managed memory size - long managedMemoryBytes = (long) (memoryMinusNetworkBuffers * memoryFraction); + public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) { + this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing); + return this; + } + + public Builder setCommonBindAddress(String commonBindAddress) { + this.commonBindAddress = commonBindAddress; + return this; + } - return managedMemoryBytes >> 20; // bytes to megabytes - } else { - return memorySize; - } - } else { - return managedMemoryPerTaskManager; + public MiniClusterConfiguration build() { + final Configuration modifiedConfiguration = new Configuration(configuration); + modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); + + return new MiniClusterConfiguration( + modifiedConfiguration, + numJobManagers, + numTaskManagers, + numResourceManagers, + rpcServiceSharing, + commonBindAddress); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index f90367cc3dd65..8ca132919b408 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -40,13 +40,13 @@ public class MiniClusterITCase extends TestLogger { // Simple Job Running Tests // ------------------------------------------------------------------------ + private static final MiniClusterConfiguration defaultConfiguration = null; + @Test public void runJobWithSingleRpcService() throws Exception { - MiniClusterConfiguration cfg = new MiniClusterConfiguration(); - - // should be the default, but set anyways to make sure the test - // stays valid when the default changes - cfg.setUseSingleRpcService(); + MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() + .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED) + .build(); MiniCluster miniCluster = new MiniCluster(cfg); try { @@ -60,8 +60,9 @@ public void runJobWithSingleRpcService() throws Exception { @Test public void runJobWithMultipleRpcServices() throws Exception { - MiniClusterConfiguration cfg = new MiniClusterConfiguration(); - cfg.setUseRpcServicePerComponent(); + MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() + .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.DEDICATED) + .build(); MiniCluster miniCluster = new MiniCluster(cfg); try { @@ -75,8 +76,9 @@ public void runJobWithMultipleRpcServices() throws Exception { @Test public void runJobWithMultipleJobManagers() throws Exception { - MiniClusterConfiguration cfg = new MiniClusterConfiguration(); - cfg.setNumJobManagers(3); + MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() + .setNumJobManagers(3) + .build(); MiniCluster miniCluster = new MiniCluster(cfg); try { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java index cebd15ff0427a..e276ac77e33a8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java @@ -99,15 +99,17 @@ public JobExecutionResult execute(String jobName) throws Exception { // add (and override) the settings with what the user defined configuration.addAll(this.conf); - MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration); - // Currently we do not reuse slot anymore, // so we need to sum up the parallelism of all vertices int slotsCount = 0; for (JobVertex jobVertex : jobGraph.getVertices()) { slotsCount += jobVertex.getParallelism(); } - cfg.setNumTaskManagerSlots(slotsCount); + + MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() + .setConfiguration(configuration) + .setNumSlotsPerTaskManager(slotsCount) + .build(); if (LOG.isInfoEnabled()) { LOG.info("Running job on local embedded Flink mini cluster");