From 6487c939018a13a2da4355d93dd051419ea3b7ff Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Wed, 29 Mar 2017 16:57:52 -0700 Subject: [PATCH 1/2] [FLINK-6217] ContaineredTaskManagerParameters sets off-heap memory size incorrectly. --- .../LaunchableMesosWorker.java | 4 +- .../clusterframework/BootstrapTools.java | 19 +++++-- .../ContaineredTaskManagerParameters.java | 5 +- .../ContaineredTaskManagerParametersTest.java | 49 +++++++++++++++++++ 4 files changed, 70 insertions(+), 7 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java index bfe9be818579e..9abbe52b5754d 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java @@ -217,7 +217,9 @@ public Protos.TaskInfo launch(Protos.SlaveID slaveId, TaskAssignmentResult assig // finalize the memory parameters jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m"); jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m"); - jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m"); + if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) { + jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m"); + } // pass dynamic system properties jvmArgs.append(' ').append( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index e356d2b1cb560..19e12495f02c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -21,6 +21,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; +import com.google.common.base.Joiner; import com.typesafe.config.Config; import org.apache.commons.cli.CommandLine; @@ -46,6 +47,7 @@ import java.io.PrintWriter; import java.net.BindException; import java.net.ServerSocket; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -56,6 +58,7 @@ */ public class BootstrapTools { private static final Logger LOG = LoggerFactory.getLogger(BootstrapTools.class); + private static final Joiner SPACE_JOINER = Joiner.on(" "); /** * Starts an ActorSystem with the given configuration listening at the address/ports. @@ -355,10 +358,18 @@ public static String getTaskManagerShellCommand( final Map startCommandValues = new HashMap<>(); startCommandValues.put("java", "$JAVA_HOME/bin/java"); - startCommandValues - .put("jvmmem", "-Xms" + tmParams.taskManagerHeapSizeMB() + "m " + - "-Xmx" + tmParams.taskManagerHeapSizeMB() + "m " + - "-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m"); + + ArrayList params = new ArrayList<>(); + params.add(String.format("-Xms%dm", tmParams.taskManagerHeapSizeMB())); + params.add(String.format("-Xmx%dm", tmParams.taskManagerHeapSizeMB())); + + if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) { + params.add(String.format("-XX:MaxDirectMemorySize=%dm", + tmParams.taskManagerDirectMemoryLimitMB())); + } + + startCommandValues.put("jvmmem", SPACE_JOINER.join(params)); + String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS); if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) { javaOpts += " " + flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index 0fc087032cde4..8ff3c257aa270 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -146,8 +146,9 @@ public static ContaineredTaskManagerParameters create( final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP); final long heapSizeMB; + long offHeapSize = -1; if (useOffHeap) { - long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE); + offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE); if (offHeapSize <= 0) { double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION); @@ -174,6 +175,6 @@ public static ContaineredTaskManagerParameters create( // done return new ContaineredTaskManagerParameters( - containerMemoryMB, heapSizeMB, javaMemorySizeMB, numSlots, envVars); + containerMemoryMB, heapSizeMB, offHeapSize, numSlots, envVars); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java new file mode 100644 index 0000000000000..c0c48f9120dd0 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.configuration.Configuration; +import org.junit.Test; + +import static org.apache.flink.configuration.ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY; +import static org.apache.flink.configuration.TaskManagerOptions.MANAGED_MEMORY_SIZE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ContaineredTaskManagerParametersTest { + private static final long CONTAINER_MEMORY = 8192; + + @Test + public void testDefaultOffHeapMemory() { + Configuration conf = new Configuration(); + ContaineredTaskManagerParameters params = + ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1); + assertEquals(-1, params.taskManagerDirectMemoryLimitMB()); + } + + @Test + public void testTotalMemoryDoesNotExceedContainerMemory() { + Configuration conf = new Configuration(); + conf.setBoolean(MANAGED_MEMORY_SIZE.key(), true); + ContaineredTaskManagerParameters params = + ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1); + assertTrue(params.taskManagerHeapSizeMB() + + params.taskManagerDirectMemoryLimitMB() <= CONTAINER_MEMORY); + } +} From 2b6336faba719a10a06fa5a0bd96e1dae577f297 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Fri, 21 Apr 2017 18:13:53 -0700 Subject: [PATCH 2/2] Address comments. --- .../flink/runtime/clusterframework/BootstrapTools.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 19e12495f02c1..5e29b6a9c419e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -21,11 +21,11 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; -import com.google.common.base.Joiner; import com.typesafe.config.Config; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; @@ -58,7 +58,6 @@ */ public class BootstrapTools { private static final Logger LOG = LoggerFactory.getLogger(BootstrapTools.class); - private static final Joiner SPACE_JOINER = Joiner.on(" "); /** * Starts an ActorSystem with the given configuration listening at the address/ports. @@ -368,7 +367,7 @@ public static String getTaskManagerShellCommand( tmParams.taskManagerDirectMemoryLimitMB())); } - startCommandValues.put("jvmmem", SPACE_JOINER.join(params)); + startCommandValues.put("jvmmem", StringUtils.join(params, ' ')); String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS); if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) {