From f1712543aa34252044c1ae1c2ab95e87706815ea Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Fri, 9 Oct 2015 15:25:08 +0200 Subject: [PATCH] [FLINK-2784] Remove deprecated configuration keys and updated documentation --- docs/apis/python.md | 2 +- docs/setup/config.md | 7 +-- .../flink/configuration/ConfigConstants.java | 29 ----------- .../org/apache/flink/optimizer/Optimizer.java | 7 +-- .../minicluster/LocalFlinkMiniCluster.scala | 19 ++------ .../runtime/taskmanager/TaskManager.scala | 48 +++++-------------- .../taskmanager/TaskManagerStartupTest.java | 2 +- .../environment/StreamContextEnvironment.java | 8 +--- .../environment/StreamPlanEnvironment.java | 8 +--- 9 files changed, 27 insertions(+), 103 deletions(-) diff --git a/docs/apis/python.md b/docs/apis/python.md index 14585fc82b6af..d57e11765de21 100644 --- a/docs/apis/python.md +++ b/docs/apis/python.md @@ -575,7 +575,7 @@ env.execute() ### System Level A system-wide default parallelism for all execution environments can be defined by setting the -`parallelization.degree.default` property in `./conf/flink-conf.yaml`. See the +`parallelism.default` property in `./conf/flink-conf.yaml`. See the [Configuration](config.html) documentation for details. [Back to top](#top) diff --git a/docs/setup/config.md b/docs/setup/config.md index 4cfe428782eb5..0e7b2eef1911c 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -142,6 +142,9 @@ results outside of the JVM heap. For setups with larger quantities of memory, this can improve the efficiency of the operations performed on the memory (DEFAULT: false). +- `taskmanager.memory.segment-size`: The size of memory buffers used by the +memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)). + ### Other - `taskmanager.tmp.dirs`: The directory for temporary files, or a list of @@ -254,8 +257,6 @@ network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: 2048). -- `taskmanager.network.bufferSizeInBytes`: The size of the network buffers, in -bytes (DEFAULT: 32768 (= 32 KiBytes)). - `taskmanager.memory.size`: The amount of memory (in megabytes) that the task manager reserves on the JVM's heap space for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed @@ -441,7 +442,7 @@ The number and size of network buffers can be configured with the following parameters: - `taskmanager.network.numberOfBuffers`, and -- `taskmanager.network.bufferSizeInBytes`. +- `taskmanager.memory.segment-size`. ### Configuring Temporary I/O Directories diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 2b4749c565d08..36369abd2d94e 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -35,12 +35,6 @@ public final class ConfigConstants { */ public static final String DEFAULT_PARALLELISM_KEY = "parallelism.default"; - /** - * The deprecated config parameter defining the default parallelism for jobs. - */ - @Deprecated - public static final String DEFAULT_PARALLELISM_KEY_OLD = "parallelization.degree.default"; - /** * Config parameter for the number of re-tries for failed tasks. Setting this * value to 0 effectively disables fault tolerance. @@ -135,12 +129,6 @@ public final class ConfigConstants { */ public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers"; - /** - * Deprecated config parameter defining the size of the buffers used in the network stack. - */ - @Deprecated - public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes"; - /** * Config parameter defining the size of memory buffers used by the network stack and the memory manager. */ @@ -205,22 +193,11 @@ public final class ConfigConstants { */ public static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio"; - /** - * Upper bound for heap cutoff on YARN. - * The "yarn.heap-cutoff-ratio" is removing a certain ratio from the heap. - * This value is limiting this cutoff to a absolute value. - * - * THE VALUE IS NO LONGER IN USE. - */ - @Deprecated - public static final String YARN_HEAP_LIMIT_CAP = "yarn.heap-limit-cap"; - /** * Minimum amount of memory to remove from the heap space as a safety margin. */ public static final String YARN_HEAP_CUTOFF_MIN = "yarn.heap-cutoff-min"; - /** * Reallocate failed YARN containers. */ @@ -547,12 +524,6 @@ public final class ConfigConstants { */ public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048; - /** - * Default size of network stack buffers. - */ - @Deprecated - public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 32768; - /** * Default size of memory segments in the network stack and the memory manager. */ diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java index 6f41c29773774..ed3cbd5c895e5 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java @@ -347,14 +347,9 @@ public Optimizer(DataStatistics stats, CostEstimator estimator, Configuration co this.costEstimator = estimator; // determine the default parallelism - // check for old key string first, then for new one - this.defaultParallelism = config.getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD, - ConfigConstants.DEFAULT_PARALLELISM); - // now check for new one which overwrites old values this.defaultParallelism = config.getInteger( ConfigConstants.DEFAULT_PARALLELISM_KEY, - this.defaultParallelism); + ConfigConstants.DEFAULT_PARALLELISM); if (defaultParallelism < 1) { LOG.warn("Config value " + defaultParallelism + " for option " diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 7b1f9e818b7c2..38e3efb3272cf 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -161,22 +161,9 @@ class LocalFlinkMiniCluster( // set this only if no memory was pre-configured if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) { - val bufferSizeNew: Int = config.getInteger( - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1) - - val bufferSizeOld: Int = config.getInteger( - ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1) - val bufferSize: Int = - if (bufferSizeNew != -1) { - bufferSizeNew - } - else if (bufferSizeOld == -1) { - // nothing has been configured, take the default - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE - } - else { - bufferSizeOld - } + val bufferSize: Int = config.getInteger( + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) val bufferMem: Long = config.getLong( ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index bf230213dd170..b28fb7326aff7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1758,41 +1758,19 @@ object TaskManager { checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers, ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY) - val pageSizeNew: Int = configuration.getInteger( - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1) - - val pageSizeOld: Int = configuration.getInteger( - ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1) - - val pageSize: Int = - if (pageSizeNew != -1) { - // new page size has been configured - checkConfigParameter(pageSizeNew >= MemoryManager.MIN_PAGE_SIZE, pageSizeNew, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE) - - checkConfigParameter(MathUtils.isPowerOf2(pageSizeNew), pageSizeNew, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - "Memory segment size must be a power of 2.") - - pageSizeNew - } - else if (pageSizeOld == -1) { - // nothing has been configured, take the default - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE - } - else { - // old page size has been configured - checkConfigParameter(pageSizeOld >= MemoryManager.MIN_PAGE_SIZE, pageSizeOld, - ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, - "Minimum buffer size is " + MemoryManager.MIN_PAGE_SIZE) - - checkConfigParameter(MathUtils.isPowerOf2(pageSizeOld), pageSizeOld, - ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, - "Buffer size must be a power of 2.") - - pageSizeOld - } + val pageSize: Int = configuration.getInteger( + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) + + // check page size of for minimum size + checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE) + + // check page size for power of two + checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + "Memory segment size must be a power of 2.") // check whether we use heap or off-heap memory val memType: MemoryType = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java index a3b65f03ab8df..9cc8170a587bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java @@ -149,7 +149,7 @@ public void testMemoryConfigWrong() { // something ridiculously high final long memSize = (((long) Integer.MAX_VALUE - 1) * - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE) >> 20; + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) >> 20; cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize); try { TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 1392efb3a0564..b2a5435af0e8f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -60,14 +60,10 @@ protected StreamContextEnvironment(Client client, List jars, List clas setParallelism(parallelism); } else { - // first check for old parallelism config key - setParallelism(GlobalConfiguration.getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD, - ConfigConstants.DEFAULT_PARALLELISM)); - // then for new + // determine parallelism setParallelism(GlobalConfiguration.getInteger( ConfigConstants.DEFAULT_PARALLELISM_KEY, - getParallelism())); + ConfigConstants.DEFAULT_PARALLELISM)); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java index e5ea2c55be52d..61b9a2fba6550 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java @@ -37,14 +37,10 @@ protected StreamPlanEnvironment(ExecutionEnvironment env) { if (parallelism > 0) { setParallelism(parallelism); } else { - // first check for old parallelism config key - setParallelism(GlobalConfiguration.getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD, - ConfigConstants.DEFAULT_PARALLELISM)); - // then for new + // determine parallelism setParallelism(GlobalConfiguration.getInteger( ConfigConstants.DEFAULT_PARALLELISM_KEY, - getParallelism())); + ConfigConstants.DEFAULT_PARALLELISM)); } }