From 51f13adb3a6602037aa97fbd070a0959acd936f1 Mon Sep 17 00:00:00 2001 From: tianchen Date: Fri, 19 Apr 2019 12:50:22 +0800 Subject: [PATCH 1/3] [FLINK-9904]Allow users to control MaxDirectMemorySize --- .../apache/flink/configuration/TaskManagerOptions.java | 8 ++++++++ flink-dist/src/main/flink-bin/bin/config.sh | 7 +++++++ flink-dist/src/main/flink-bin/bin/taskmanager.sh | 2 +- 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index 8b51bed015f66..f39095f6fe793 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -73,6 +73,14 @@ public class TaskManagerOptions { .defaultValue(false) .withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError."); + /** + * Maximum direct memory size of TaskManager, default is 8388607T (Long.MAX_VALUE in TB). + */ + public static final ConfigOption TM_MAX_OFFHEAP_SIZE = + key("taskmanager.max.off-heap.size") + .defaultValue("8388607T") + .withDescription("Maximum direct memory size of TaskManager, default is 8388607T"); + /** * Whether the quarantine monitor for task managers shall be started. The quarantine monitor * shuts down the actor system if it detects that it has quarantined another actor system diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index 1d38e2ac164db..cf832c42731ef 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -112,6 +112,7 @@ KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size" KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction" KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap" KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate" +KEY_TASKM_MAX_OFFHEAP_SIZE="taskmanager.max.off-heap.size" KEY_TASKM_NET_BUF_FRACTION="taskmanager.network.memory.fraction" KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min" @@ -429,6 +430,12 @@ if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX}) fi +# Define FLINK_TM_MAX_OFFHEAP_SIZE if it is not already set +if [ -z "${FLINK_TM_MAX_OFFHEAP_SIZE}" ]; then + # default: Long.MAX_VALUE in TB + FLINK_TM_MAX_OFFHEAP_SIZE=$(readFromConfig ${KEY_TASKM_MAX_OFFHEAP_SIZE} "8388607T" "${YAML_CONF}") +fi + # Verify that NUMA tooling is available command -v numactl >/dev/null 2>&1 diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh index b5de445d46328..d0f3e8f2e6355 100755 --- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh @@ -60,7 +60,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB) # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used - TM_MAX_OFFHEAP_SIZE="8388607T" + TM_MAX_OFFHEAP_SIZE=${FLINK_TM_MAX_OFFHEAP_SIZE} export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}" From 58c823bc291dbe0d6682fecbd6a17712890f027a Mon Sep 17 00:00:00 2001 From: tianchen Date: Fri, 19 Apr 2019 14:08:56 +0800 Subject: [PATCH 2/3] fix --- docs/_includes/generated/task_manager_configuration.html | 5 +++++ .../apache/flink/configuration/TaskManagerOptions.java | 8 -------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html index 395a613d15e7c..e3d99b58ce3d3 100644 --- a/docs/_includes/generated/task_manager_configuration.html +++ b/docs/_includes/generated/task_manager_configuration.html @@ -143,5 +143,10 @@ "0" The task manager’s IPC port. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine. + +
taskmanager.max.off-heap.size
+ "8388607T" + The maximum direct memory size of TaskManager + diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index f39095f6fe793..8b51bed015f66 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -73,14 +73,6 @@ public class TaskManagerOptions { .defaultValue(false) .withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError."); - /** - * Maximum direct memory size of TaskManager, default is 8388607T (Long.MAX_VALUE in TB). - */ - public static final ConfigOption TM_MAX_OFFHEAP_SIZE = - key("taskmanager.max.off-heap.size") - .defaultValue("8388607T") - .withDescription("Maximum direct memory size of TaskManager, default is 8388607T"); - /** * Whether the quarantine monitor for task managers shall be started. The quarantine monitor * shuts down the actor system if it detects that it has quarantined another actor system From bcb7f3a11ced1664f25b1e45ffdd4d802c56e5c5 Mon Sep 17 00:00:00 2001 From: tianchen Date: Mon, 22 Apr 2019 10:41:34 +0800 Subject: [PATCH 3/3] fix --- docs/_includes/generated/task_manager_configuration.html | 5 ----- 1 file changed, 5 deletions(-) diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html index e3d99b58ce3d3..395a613d15e7c 100644 --- a/docs/_includes/generated/task_manager_configuration.html +++ b/docs/_includes/generated/task_manager_configuration.html @@ -143,10 +143,5 @@ "0" The task manager’s IPC port. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine. - -
taskmanager.max.off-heap.size
- "8388607T" - The maximum direct memory size of TaskManager -