From 15ae80ee5a9cd29597f6ed597183251fbbb32f37 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 5 Apr 2017 10:59:00 +0200 Subject: [PATCH 1/3] [FLINK-6270] extend Configuration with contains(configOption) --- .../flink/configuration/Configuration.java | 29 +++++++++++++++++++ .../DelegatingConfiguration.java | 5 ++++ 2 files changed, 34 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index 8f23435dddc42..fb55683949f09 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -595,6 +595,35 @@ public boolean containsKey(String key){ } } + /** + * Checks whether there is an entry for the given config option + * + * @param configOption The configuration option + * + * @return true if a valid (current of deprecated) key of the config option is stored, + * false otherwise + */ + public boolean contains(ConfigOption configOption) { + synchronized (this.confData){ + // first try the current key + if (this.confData.containsKey(configOption.key())) { + return true; + } + else if (configOption.hasDeprecatedKeys()) { + // try the deprecated keys + for (String deprecatedKey : configOption.deprecatedKeys()) { + if (this.confData.containsKey(deprecatedKey)) { + LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'", + deprecatedKey, configOption.key()); + return true; + } + } + } + + return false; + } + } + // -------------------------------------------------------------------------------------------- @Override diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java index bd9a96245e4bb..1b14e9ecd2766 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java @@ -290,6 +290,11 @@ public boolean containsKey(String key) { return backingConfig.containsKey(prefix + key); } + @Override + public boolean contains(ConfigOption configOption) { + return backingConfig.contains(prefixOption(configOption, prefix)); + } + // -------------------------------------------------------------------------------------------- @Override From ec1cfb6da699af414e045977dbe31b7bcc2f8ab0 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 5 Apr 2017 11:45:57 +0200 Subject: [PATCH 2/3] [FLINK-6270] port some memory and network task manager options to ConfigOption --- .../kafka/KafkaShortRetentionTestBase.java | 3 +- .../connectors/kafka/KafkaTestBase.java | 3 +- .../flink/storm/api/FlinkLocalCluster.java | 3 +- .../flink/configuration/ConfigConstants.java | 44 ++++++++++++++++--- .../configuration/TaskManagerOptions.java | 43 ++++++++++++++++++ .../ContaineredTaskManagerParameters.java | 12 ++--- .../io/network/buffer/NetworkBufferPool.java | 4 +- .../partition/SpillableSubpartition.java | 11 +++-- .../minicluster/MiniClusterConfiguration.java | 27 +++--------- .../TaskManagerServicesConfiguration.java | 30 +++++-------- .../minicluster/LocalFlinkMiniCluster.scala | 19 +++----- .../PartialConsumePipelinedResultTest.java | 3 +- .../runtime/jobmanager/JobManagerTest.java | 4 +- ...TaskCancelAsyncProducerConsumerITCase.java | 5 ++- .../TaskManagerProcessReapingTestBase.java | 5 ++- .../taskmanager/TaskManagerStartupTest.java | 11 ++--- .../runtime/taskmanager/TaskManagerTest.java | 2 +- .../runtime/testutils/TaskManagerProcess.java | 10 ++--- .../runtime/testingUtils/TestingUtils.scala | 4 +- .../Flip6LocalStreamEnvironment.java | 4 +- .../environment/LocalStreamEnvironment.java | 3 +- .../apache/flink/test/util/TestBaseUtils.java | 3 +- .../accumulators/AccumulatorErrorITCase.java | 3 +- .../test/cancelling/CancelingTestBase.java | 5 ++- ...actEventTimeWindowCheckpointingITCase.java | 3 +- ...EventTimeAllWindowCheckpointingITCase.java | 3 +- .../test/checkpointing/SavepointITCase.java | 3 +- .../StreamFaultToleranceTestBase.java | 3 +- .../WindowCheckpointingITCase.java | 3 +- .../JobSubmissionFailsITCase.java | 3 +- .../manual/NotSoMiniClusterIterations.java | 7 +-- .../StreamingScalabilityAndLatency.java | 5 ++- .../test/misc/CustomSerializationITCase.java | 3 +- .../test/misc/MiscellaneousIssuesITCase.java | 3 +- ...ccessAfterNetworkBuffersFailureITCase.java | 5 ++- .../query/AbstractQueryableStateITCase.java | 3 +- ...TaskManagerProcessFailureRecoveryTest.java | 5 ++- ...erHAProcessFailureBatchRecoveryITCase.java | 5 ++- .../TaskManagerFailureRecoveryITCase.java | 3 +- .../test/runtime/IPv6HostnamesITCase.java | 3 +- .../streaming/runtime/TimestampITCase.java | 3 +- .../flink/test/web/WebFrontendITCase.java | 3 +- 42 files changed, 201 insertions(+), 126 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index 1e8537037b5fd..954dc7d953f6c 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -98,7 +99,7 @@ public static void prepare() throws IOException, ClassNotFoundException { // start also a re-usable Flink mini cluster flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); flink = new LocalFlinkMiniCluster(flinkConfig, false); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 5cec4f0414cd7..0c6bfa9a1f1ca 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -20,6 +20,7 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -113,7 +114,7 @@ protected static Configuration getFlinkConfiguration() { Configuration flinkConfig = new Configuration(); flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter"); flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java index 367b313179b12..d69d345969204 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java @@ -18,6 +18,7 @@ package org.apache.flink.storm.api; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.storm.LocalCluster; import org.apache.storm.generated.ClusterSummary; import org.apache.storm.generated.KillOptions; @@ -92,7 +93,7 @@ public void submitTopologyWithOpts(final String topologyName, final Map conf, fi Configuration configuration = new Configuration(); configuration.addAll(jobGraph.getJobConfiguration()); - configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); + configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); this.flink = new LocalFlinkMiniCluster(configuration, true); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index ce44ab8c18f48..de06b59411fa8 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -210,34 +210,52 @@ public final class ConfigConstants { * The config parameter defining the amount of memory to be allocated by the task manager's * memory manager (in megabytes). If not set, a relative fraction will be allocated, as defined * by {@link #TASK_MANAGER_MEMORY_FRACTION_KEY}. + * + * @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_SIZE} instead */ + @Deprecated public static final String TASK_MANAGER_MEMORY_SIZE_KEY = "taskmanager.memory.size"; /** * The config parameter defining the fraction of free memory allocated by the memory manager. + * + * @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION} instead */ + @Deprecated public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction"; /** * The config parameter defining the memory allocation method (JVM heap or off-heap). - */ + * + * @deprecated Use {@link TaskManagerOptions#MEMORY_OFF_HEAP} instead + */ + @Deprecated public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = "taskmanager.memory.off-heap"; /** * The config parameter for specifying whether TaskManager managed memory should be preallocated * when the TaskManager is starting. (default is false) + * + * @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_PRE_ALLOCATE} instead */ + @Deprecated public static final String TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY = "taskmanager.memory.preallocate"; /** * The config parameter defining the number of buffers used in the network stack. This defines the * number of possible tasks and shuffles. + * + * @deprecated Use {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} instead */ + @Deprecated public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers"; /** * Config parameter defining the size of memory buffers used by the network stack and the memory manager. + * + * @deprecated Use {@link TaskManagerOptions#MEMORY_SEGMENT_SIZE} instead */ + @Deprecated public static final String TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY = "taskmanager.memory.segment-size"; /** @@ -1126,20 +1144,29 @@ public final class ConfigConstants { * The default directory for temporary files of the task manager. */ public static final String DEFAULT_TASK_MANAGER_TMP_PATH = System.getProperty("java.io.tmpdir"); - + /** - * The default fraction of the free memory allocated by the task manager's memory manager. + * Config key has been deprecated. Therefore, no default value required. + * + * @deprecated {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION} provides the default value now */ + @Deprecated public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f; - + /** - * Default number of buffers used in the network stack. + * Config key has been deprecated. Therefore, no default value required. + * + * @deprecated {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} provides the default value now */ + @Deprecated public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048; /** - * Default size of memory segments in the network stack and the memory manager. + * Config key has been deprecated. Therefore, no default value required. + * + * @deprecated {@link TaskManagerOptions#MEMORY_SEGMENT_SIZE} provides the default value now */ + @Deprecated public static final int DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE = 32768; /** @@ -1179,8 +1206,11 @@ public final class ConfigConstants { public static final String DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s"; /** - * The default setting for TaskManager memory eager allocation of managed memory + * Config key has been deprecated. Therefore, no default value required. + * + * @deprecated {@link TaskManagerOptions#MANAGED_MEMORY_PRE_ALLOCATE} provides the default value now */ + @Deprecated public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = false; /** @deprecated Please use {@link TaskManagerOptions#TASK_CANCELLATION_INTERVAL}. */ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index b891e352d4746..adfc8e96288e9 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -39,10 +39,53 @@ public class TaskManagerOptions { key("taskmanager.jvm-exit-on-oom") .defaultValue(false); + /** Size of memory buffers used by the network stack and the memory manager (in bytes). */ + public static final ConfigOption MEMORY_SEGMENT_SIZE = + key("taskmanager.memory.segment-size") + .defaultValue(32768); + + /** + * Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not + * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}. + */ + public static final ConfigOption MANAGED_MEMORY_SIZE = + key("taskmanager.memory.size") + .defaultValue(-1L); + + /** + * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is + * not set. + */ + public static final ConfigOption MANAGED_MEMORY_FRACTION = + key("taskmanager.memory.fraction") + .defaultValue(0.7f); + + /** + * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager + * as well as the network buffers. + **/ + public static final ConfigOption MEMORY_OFF_HEAP = + key("taskmanager.memory.off-heap") + .defaultValue(false); + + /** Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting. */ + public static final ConfigOption MANAGED_MEMORY_PRE_ALLOCATE = + key("taskmanager.memory.preallocate") + .defaultValue(false); + // ------------------------------------------------------------------------ // Network Options // ------------------------------------------------------------------------ + /** + * Number of buffers used in the network stack. This defines the number of possible tasks and + * shuffles. + */ + public static final ConfigOption NETWORK_NUM_BUFFERS = + key("taskmanager.network.numberOfBuffers") + .defaultValue(2048); + + /** Minimum backoff for partition requests of input channels. */ public static final ConfigOption NETWORK_REQUEST_BACKOFF_INITIAL = key("taskmanager.net.request-backoff.initial") diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index 3dc4394bc9a44..0fc087032cde4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import java.util.HashMap; import java.util.Map; @@ -142,19 +143,14 @@ public static ContaineredTaskManagerParameters create( // (2) split the Java memory between heap and off-heap - final boolean useOffHeap = config.getBoolean( - ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false); + final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP); final long heapSizeMB; if (useOffHeap) { - long offHeapSize = config.getLong( - ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); + long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE); if (offHeapSize <= 0) { - double fraction = config.getFloat( - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); - + double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION); offHeapSize = (long) (fraction * javaMemorySizeMB); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index 5f2da039a2e49..a36bdf47592dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.io.network.buffer; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.core.memory.MemoryType; @@ -199,7 +199,7 @@ public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) t numRequiredBuffers, totalNumberOfMemorySegments - numTotalRequiredBuffers, totalNumberOfMemorySegments, - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)); + TaskManagerOptions.NETWORK_NUM_BUFFERS.key())); } this.numTotalRequiredBuffers += numRequiredBuffers; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index ad04e977b0a4c..ae97c4259711c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -50,9 +50,12 @@ * this state, different reader variants are returned (see * {@link SpillableSubpartitionView} and {@link SpilledSubpartitionView}). * - *

Since the network buffer pool size is usually quite small (default is - * {@link ConfigConstants#DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS}), most - * spillable partitions will be spilled for real-world data sets. + *

Since the network buffer pool size for outgoing partitions is usually + * quite small, e.g. via the {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL} + * and {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE} parameters + * for bounded channels or from the default value of + * {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}, most spillable partitions + * will be spilled for real-world data sets. */ class SpillableSubpartition extends ResultSubpartition { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java index 3a03ca3a05a7b..2272a335d058b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java @@ -20,9 +20,8 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.util.EnvironmentInformation; import scala.concurrent.duration.FiniteDuration; @@ -165,7 +164,7 @@ public Configuration generateConfiguration() { Configuration newConfiguration = new Configuration(config); // set the memory long memory = getOrCalculateManagedMemoryPerTaskManager(); - newConfiguration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memory); + newConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memory); return newConfiguration; } @@ -196,29 +195,17 @@ public String toString() { private long getOrCalculateManagedMemoryPerTaskManager() { if (managedMemoryPerTaskManager == -1) { // no memory set in the mini cluster configuration - final ConfigOption memorySizeOption = ConfigOptions - .key(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) - .defaultValue(-1); - int memorySize = config.getInteger(memorySizeOption); + long memorySize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE); if (memorySize == -1) { // no memory set in the flink configuration // share the available memory among all running components - final ConfigOption bufferSizeOption = ConfigOptions - .key(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY) - .defaultValue(ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE); - final ConfigOption bufferMemoryOption = ConfigOptions - .key(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY) - .defaultValue((long) ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS); - - final ConfigOption memoryFractionOption = ConfigOptions - .key(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY) - .defaultValue(ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); - - float memoryFraction = config.getFloat(memoryFractionOption); - long networkBuffersMemory = config.getLong(bufferMemoryOption) * config.getInteger(bufferSizeOption); + float memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION); + long networkBuffersMemory = + (long) config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS) * + (long) config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE); long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 8ad318ac9d53e..a4af7def93c0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -182,22 +182,18 @@ public static TaskManagerServicesConfiguration fromConfiguration( parseQueryableStateConfiguration(configuration); // extract memory settings - long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); + long configuredMemory = configuration.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE); checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory, - ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "MemoryManager needs at least one MB of memory. " + "If you leave this config parameter empty, the system automatically " + "pick a fraction of the available memory."); - boolean preAllocateMemory = configuration.getBoolean( - ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE); + boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE); - float memoryFraction = configuration.getFloat( - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); + float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION); checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction, - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, + TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), "MemoryManager fraction of the free memory must be between 0.0 and 1.0"); final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration); @@ -247,30 +243,26 @@ private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfigurat checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, "Number of task slots must be at least one."); - final int numNetworkBuffers = configuration.getInteger( - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS); + final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers, - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, ""); + TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), ""); - final int pageSize = configuration.getInteger( - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE); + final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE); // check page size of for minimum size checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE); // check page size for power of two checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), "Memory segment size must be a power of 2."); // check whether we use heap or off-heap memory final MemoryType memType; - if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) { + if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) { memType = MemoryType.OFF_HEAP; } else { memType = MemoryType.HEAP; diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 21e0d28629576..3c4c959acc41c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService} import akka.actor.{ActorRef, ActorSystem, Props} import org.apache.flink.api.common.JobID import org.apache.flink.api.common.io.FileOutputFormat -import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions} +import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions, TaskManagerOptions} import org.apache.flink.core.fs.Path import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.clusterframework.FlinkResourceManager @@ -350,23 +350,18 @@ class LocalFlinkMiniCluster( def setMemory(config: Configuration): Unit = { // set this only if no memory was pre-configured - if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) { + if (config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE) == -1L) { - val bufferSize: Int = config.getInteger( - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) + val bufferSize: Int = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE) - val bufferMem: Long = config.getLong( - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong + val bufferMem: Long = config.getInteger( + TaskManagerOptions.NETWORK_NUM_BUFFERS).toLong * bufferSize.toLong val numTaskManager = config.getInteger( ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER) - val memoryFraction = config.getFloat( - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION) + val memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION) // full memory size var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag @@ -379,7 +374,7 @@ class LocalFlinkMiniCluster( memorySize -= bufferMem memorySize = (memorySize * memoryFraction).toLong memorySize >>>= 20 // bytes to megabytes - config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize) + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memorySize) } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index 4a826b7be74e2..f19ca4e58a3a8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; @@ -51,7 +52,7 @@ public static void setUp() throws Exception { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM); config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); - config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, NUMBER_OF_NETWORK_BUFFERS); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS); flink = new TestingCluster(config, true); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 3944752c635a6..4dec84bab6777 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason; @@ -105,7 +106,6 @@ import java.io.File; import java.net.InetAddress; import java.util.Collections; -import java.util.UUID; import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; @@ -588,7 +588,7 @@ public void testKvStateMessages() throws Exception { AkkaUtils.getAkkaURL(system, jobManager.actor())); Configuration tmConfig = new Configuration(); - tmConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); + tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java index 5a14b409da1b4..4ea651195cd52 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; @@ -78,8 +79,8 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096); - config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 8); + config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8); flink = new TestingCluster(config, true); flink.start(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java index 2528e244cb786..2fafe5b379c24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java @@ -22,6 +22,7 @@ import akka.actor.ActorSystem; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; @@ -228,8 +229,8 @@ public static void main(String[] args) { Configuration cfg = new Configuration(); cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort); - cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); - cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256); + cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); + cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 256); TaskManager.runTaskManager("localhost", ResourceID.generate(), taskManagerPort, cfg); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java index b2a905df1c943..4df8db3204a31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.StartupUtils; @@ -113,7 +114,7 @@ public void testIODirectoryNotWritable() { try { Configuration cfg = new Configuration(); cfg.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, nonWritable.getAbsolutePath()); - cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); + cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656); @@ -154,7 +155,7 @@ public void testMemoryConfigWrong() { cfg.setString(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, "true"); // something invalid - cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -42); + cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -42L); try { TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg); fail("Should fail synchronously with an exception"); @@ -165,8 +166,8 @@ public void testMemoryConfigWrong() { // something ridiculously high final long memSize = (((long) Integer.MAX_VALUE - 1) * - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) >> 20; - cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize); + TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()) >> 20; + cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memSize); try { TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg); fail("Should fail synchronously with an exception"); @@ -197,7 +198,7 @@ public void testStartupWhenNetworkStackFailsToInitialize() throws Exception { final Configuration cfg = new Configuration(); cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost"); cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, blocker.getLocalPort()); - cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1); + cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 1L); TaskManager.startTaskManagerComponentsAndActor( cfg, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index a754cff3d3cd9..4530adebee0f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -1498,7 +1498,7 @@ public void testFailingScheduleOrUpdateConsumersMessage() throws Exception { // set the memory segment to the smallest size possible, because we have to fill one // memory buffer to trigger the schedule or update consumers message to the downstream // operators - configuration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096); + configuration.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096); final JobID jid = new JobID(); final JobVertexID vid = new JobVertexID(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java index 58bc50e572e5b..36eb47fab073a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.taskmanager.TaskManager; import org.slf4j.Logger; @@ -102,12 +102,12 @@ public static void main(String[] args) throws Exception { try { Configuration config = ParameterTool.fromArgs(args).getConfiguration(); - if (!config.containsKey(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)) { - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); + if (!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) { + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); } - if (!config.containsKey(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)) { - config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100); + if (!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index d6221f5138720..49baff11d3578 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -28,7 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors import com.typesafe.config.ConfigFactory import grizzled.slf4j.Logger import org.apache.flink.api.common.JobExecutionResult -import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions} +import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.client.JobClient import org.apache.flink.runtime.clusterframework.FlinkResourceManager @@ -304,7 +304,7 @@ object TestingUtils { val resultingConfiguration = new Configuration() - resultingConfiguration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10) + resultingConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10L) resultingConfiguration.addAll(configuration) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java index 4a5f20d066278..244660a51b308 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.minicluster.MiniCluster; @@ -98,7 +98,7 @@ public JobExecutionResult execute(String jobName) throws Exception { Configuration configuration = new Configuration(); configuration.addAll(jobGraph.getJobConfiguration()); - configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); + configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L); // add (and override) the settings with what the user defined configuration.addAll(this.conf); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index cb6055207b79f..117f6d870b1ce 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -113,7 +114,7 @@ public JobExecutionResult execute(String jobName) throws Exception { Configuration configuration = new Configuration(); configuration.addAll(jobGraph.getJobConfiguration()); - configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); + configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); // add (and override) the settings with what the user defined diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index cc7c0e298840f..f96ab3de47d7b 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -30,6 +30,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -141,7 +142,7 @@ public static LocalFlinkMiniCluster startCluster( Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath()); Files.createFile(new File(logDir, "jobmanager.out").toPath()); - config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, TASK_MANAGER_MEMORY_SIZE); config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true); config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s"); diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java index cc70fee7e29c7..0303202bbf771 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.junit.AfterClass; @@ -52,7 +53,7 @@ public static void startCluster() { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index 8d8ee64133778..06233d6bfdf16 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; @@ -88,8 +89,8 @@ public void startCluster() throws Exception { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096); - config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048); + config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048); this.executor = new LocalFlinkMiniCluster(config, false); this.executor.start(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index 5fc2083e06a88..462d3a4aa7db7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -99,7 +100,7 @@ public static void startTestCluster() { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L); cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 09c143722c33d..3345b9ce3e7fa 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.runtime.state.CheckpointListener; @@ -70,7 +71,7 @@ public static void startTestCluster() { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L); config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s"); config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s"); cluster = new LocalFlinkMiniCluster(config, false); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index a5c994aec3ad4..3718a947f9221 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -35,6 +35,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.checkpoint.SubtaskState; @@ -783,7 +784,7 @@ public Integer map(Integer value) throws Exception { Configuration config = new Configuration(); config.addAll(jobGraph.getJobConfiguration()); - config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2 * jobGraph.getMaximumParallelism()); final File checkpointDir = new File(tmpDir, "checkpoints"); final File savepointDir = new File(tmpDir, "savepoints"); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java index 10f78d4df2b40..2839bc124b443 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.TestUtils; @@ -52,7 +53,7 @@ public static void startCluster() { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); cluster = new LocalFlinkMiniCluster(config, false); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index a45349d57b848..56d8c663fb369 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.runtime.state.CheckpointListener; @@ -81,7 +82,7 @@ public static void startTestCluster() { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L); cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java index 256b1ae17fa05..93ab29d74abc9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -54,7 +55,7 @@ public class JobSubmissionFailsITCase { public static void setup() { try { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2); diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java index bd9955c617625..ee3b4b25573a0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.examples.java.graph.ConnectedComponents; import org.apache.flink.examples.java.graph.util.ConnectedComponentsData; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -50,10 +51,10 @@ public static void main(String[] args) { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, PARALLELISM); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 8); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 8L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); - config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 1000); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 8 * 1024); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1000); + config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 8 * 1024); config.setInteger("taskmanager.net.server.numThreads", 1); config.setInteger("taskmanager.net.client.numThreads", 1); diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index ec617b1905f18..90dbe80a900bd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -46,9 +47,9 @@ public static void main(String[] args) throws Exception { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, TASK_MANAGERS); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 80); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER); - config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 20000); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000); config.setInteger("taskmanager.net.server.numThreads", 1); config.setInteger("taskmanager.net.client.numThreads", 1); diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java index 51f35344b4add..fda731e50fd60 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -50,7 +51,7 @@ public static void startCluster() { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 30); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 30L); cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java index 06b93ea9dd58a..d9cf574ad0fb5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -60,7 +61,7 @@ public static void startCluster() { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java index a43bab64fbd8d..5761bf262c694 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.examples.java.clustering.KMeans; import org.apache.flink.examples.java.clustering.util.KMeansData; import org.apache.flink.examples.java.graph.ConnectedComponents; @@ -48,9 +49,9 @@ public void testSuccessfulProgramAfterFailure() { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 80); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 840); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 840); cluster = new LocalFlinkMiniCluster(config, false); diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java index 1912c0fa0c1f4..3c8eb482a4656 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java @@ -36,6 +36,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -109,7 +110,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { public static void setup() { try { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 515570dcfbc3d..27d1aa1fc41bb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -26,6 +26,7 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobmanager.JobManager; @@ -389,8 +390,8 @@ public static void main(String[] args) { Configuration cfg = new Configuration(); cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort); - cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); - cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100); + cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); + cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); cfg.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index a51f88b8dac11..b6a1bd4ee63b2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.ActorGateway; @@ -258,8 +259,8 @@ public void testJobManagerProcessFailure() throws Exception { jmProcess[0].startProcess(); // Task manager configuration - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); - config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); // Start the task manager process diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java index 5d2990528cc4f..d256ccf460609 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.junit.Test; @@ -72,7 +73,7 @@ public void testRestartWithFailingTaskManager() { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms"); config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s"); diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java index 0b008eb7378a1..4c77ef0ac6558 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.test.testdata.WordCountData; @@ -73,7 +74,7 @@ public void testClusterWithIPv6host() { conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString); conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); - conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + conf.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); flink = new LocalFlinkMiniCluster(conf, false); flink.start(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java index 13add4b93a246..229d3fdcce435 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -89,7 +90,7 @@ public static void startCluster() { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); cluster = new LocalFlinkMiniCluster(config, false); diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java index 3b0c3640fbe1d..003eb0c57e71e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -71,7 +72,7 @@ public static void initialize() throws Exception { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); File logDir = File.createTempFile("TestBaseUtils-logdir", null); From db8adbc74658458e3453495eba1ac7373e975237 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 6 Apr 2017 10:51:07 +0200 Subject: [PATCH 3/3] [FLINK-6270] address PR comments by @zentol --- .../java/org/apache/flink/configuration/Configuration.java | 3 ++- .../flink/runtime/minicluster/MiniClusterConfiguration.java | 5 ++++- .../taskexecutor/TaskManagerServicesConfiguration.java | 4 +++- .../flink/runtime/minicluster/LocalFlinkMiniCluster.scala | 3 ++- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index fb55683949f09..ea0c4195207ca 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -600,9 +600,10 @@ public boolean containsKey(String key){ * * @param configOption The configuration option * - * @return true if a valid (current of deprecated) key of the config option is stored, + * @return true if a valid (current or deprecated) key of the config option is stored, * false otherwise */ + @PublicEvolving public boolean contains(ConfigOption configOption) { synchronized (this.confData){ // first try the current key diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java index 2272a335d058b..823b3f2aceca8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java @@ -198,7 +198,10 @@ private long getOrCalculateManagedMemoryPerTaskManager() { long memorySize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE); - if (memorySize == -1) { + // we could probably use config.contains() but the previous implementation compared to + // the default (-1) thus allowing the user to explicitly specify this as well + // -> don't change this behaviour now + if (memorySize == TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) { // no memory set in the flink configuration // share the available memory among all running components diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index a4af7def93c0a..366be34d8d824 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -183,7 +183,9 @@ public static TaskManagerServicesConfiguration fromConfiguration( // extract memory settings long configuredMemory = configuration.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE); - checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory, + checkConfigParameter( + configuredMemory == TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue() || + configuredMemory > 0, configuredMemory, TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "MemoryManager needs at least one MB of memory. " + "If you leave this config parameter empty, the system automatically " + diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 3c4c959acc41c..3d43da5610e72 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -350,7 +350,8 @@ class LocalFlinkMiniCluster( def setMemory(config: Configuration): Unit = { // set this only if no memory was pre-configured - if (config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE) == -1L) { + if (config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE) == + TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) { val bufferSize: Int = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE)