Skip to content

Commit

Permalink
[FLINK-8212] [network] Pull EnvironmentInformation out of TaskManager…
Browse files Browse the repository at this point in the history
…Services
  • Loading branch information
zhangminglei committed Feb 14, 2018
1 parent 556ea8a commit 1e744b0
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ public static TaskExecutor startTaskManager(

TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
resourceID);
resourceID,
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
EnvironmentInformation.getMaxJvmHeapMemory());

TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -154,17 +153,21 @@ public TaskExecutorLocalStateStoresManager getTaskStateManager() {
*
* @param resourceID resource ID of the task manager
* @param taskManagerServicesConfiguration task manager configuration
* @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
* @param maxJvmHeapMemory the maximum JVM heap size
* @return task manager components
* @throws Exception
*/
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
ResourceID resourceID) throws Exception {
ResourceID resourceID,
long freeHeapMemoryWithDefrag,
long maxJvmHeapMemory) throws Exception {

// pre-start checks
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration);
final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory);
network.start();

final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
Expand All @@ -173,7 +176,7 @@ public static TaskManagerServices fromConfiguration(
network.getConnectionManager().getDataPort());

// this call has to happen strictly after the network stack has been initialized
final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration);
final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);

// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
Expand Down Expand Up @@ -215,10 +218,14 @@ public static TaskManagerServices fromConfiguration(
* Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
*
* @param taskManagerServicesConfiguration to create the memory manager from
* @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
* @param maxJvmHeapMemory the maximum JVM heap size
* @return Memory manager
* @throws Exception
*/
private static MemoryManager createMemoryManager(TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws Exception {
private static MemoryManager createMemoryManager(TaskManagerServicesConfiguration taskManagerServicesConfiguration,
long freeHeapMemoryWithDefrag,
long maxJvmHeapMemory) throws Exception {
// computing the amount of memory to use depends on how much memory is available
// it strictly needs to happen AFTER the network stack has been initialized

Expand All @@ -244,7 +251,7 @@ private static MemoryManager createMemoryManager(TaskManagerServicesConfiguratio

if (memType == MemoryType.HEAP) {
// network buffers allocated off-heap -> use memoryFraction of the available heap:
long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * memoryFraction);
long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
if (preAllocateMemory) {
LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
memoryFraction , relativeMemSize >> 20);
Expand All @@ -258,8 +265,7 @@ private static MemoryManager createMemoryManager(TaskManagerServicesConfiguratio
// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
// maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
// directMemorySize = jvmTotalNoNet * memoryFraction
long maxJvmHeap = EnvironmentInformation.getMaxJvmHeapMemory();
long directMemorySize = (long) (maxJvmHeap / (1.0 - memoryFraction) * memoryFraction);
long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
if (preAllocateMemory) {
LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
memoryFraction, directMemorySize >> 20);
Expand Down Expand Up @@ -301,15 +307,17 @@ private static MemoryManager createMemoryManager(TaskManagerServicesConfiguratio
* Creates the {@link NetworkEnvironment} from the given {@link TaskManagerServicesConfiguration}.
*
* @param taskManagerServicesConfiguration to construct the network environment from
* @param maxJvmHeapMemory the maximum JVM heap size
* @return Network environment
* @throws IOException
*/
private static NetworkEnvironment createNetworkEnvironment(
TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws IOException {
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
long maxJvmHeapMemory) throws IOException {

NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();

final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration);
final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration, maxJvmHeapMemory);
int segmentSize = networkEnvironmentConfiguration.networkBufferSize();

// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
Expand Down Expand Up @@ -475,10 +483,12 @@ public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Config
* </ul>.
*
* @param tmConfig task manager services configuration object
* @param maxJvmHeapMemory the maximum JVM heap size
*
* @return memory to use for network buffers (in bytes)
*/
public static long calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig) {
public static long calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig,
long maxJvmHeapMemory) {
final NetworkEnvironmentConfiguration networkConfig = tmConfig.getNetworkConfig();

final float networkBufFraction = networkConfig.networkBufFraction();
Expand All @@ -498,11 +508,9 @@ public static long calculateNetworkBufferMemory(TaskManagerServicesConfiguration

final MemoryType memType = tmConfig.getMemoryType();

final long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();

final long jvmHeapNoNet;
if (memType == MemoryType.HEAP) {
jvmHeapNoNet = maxMemory;
jvmHeapNoNet = maxJvmHeapMemory;
} else if (memType == MemoryType.OFF_HEAP) {

// check if a value has been configured
Expand All @@ -512,13 +520,13 @@ public static long calculateNetworkBufferMemory(TaskManagerServicesConfiguration
// The maximum heap memory has been adjusted according to configuredMemory, i.e.
// maxJvmHeap = jvmHeapNoNet - configuredMemory

jvmHeapNoNet = maxMemory + configuredMemory;
jvmHeapNoNet = maxJvmHeapMemory + configuredMemory;
} else {
// The maximum heap memory has been adjusted according to the fraction, i.e.
// maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * managedFraction = jvmHeapNoNet * (1 - managedFraction)

final float managedFraction = tmConfig.getMemoryFraction();
jvmHeapNoNet = (long) (maxMemory / (1.0 - managedFraction));
jvmHeapNoNet = (long) (maxJvmHeapMemory / (1.0 - managedFraction));
}
} else {
throw new RuntimeException("No supported memory type detected.");
Expand All @@ -531,13 +539,13 @@ public static long calculateNetworkBufferMemory(TaskManagerServicesConfiguration
(long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction)));

TaskManagerServicesConfiguration
.checkConfigParameter(networkBufBytes < maxMemory,
.checkConfigParameter(networkBufBytes < maxJvmHeapMemory,
"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
"Network buffer memory size too large: " + networkBufBytes + " >= " +
maxMemory + "(maximum JVM heap size)");
maxJvmHeapMemory + "(maximum JVM heap size)");

return networkBufBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ class LocalFlinkMiniCluster(

val taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
resourceID)
resourceID,
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag,
EnvironmentInformation.getMaxJvmHeapMemory)

val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistryOpt.get,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2014,7 +2014,9 @@ object TaskManager {

val taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
resourceID)
resourceID,
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag,
EnvironmentInformation.getMaxJvmHeapMemory)

val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.TestLogger;

import akka.actor.ActorRef;
Expand Down Expand Up @@ -96,7 +97,9 @@ public void testMetricRegistryLifeCycle() throws Exception {

TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
tmResourceID);
tmResourceID,
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
EnvironmentInformation.getMaxJvmHeapMemory());

TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,73 +21,51 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.testutils.category.OldAndFlip6;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.net.InetAddress;

import static org.junit.Assert.assertEquals;
import static org.powermock.api.mockito.PowerMockito.when;

/**
* Tests the network buffer calculation from heap size which requires mocking
* to have control over the {@link EnvironmentInformation}.
*
*
* <p>This should be refactored once we have pulled {@link EnvironmentInformation} out of
* {@link TaskManagerServices}. See FLINK-8212 for more information.
* Tests the network buffer calculation from heap size.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(EnvironmentInformation.class)
@Category(OldAndFlip6.class)
public class NetworkBufferCalculationTest extends TestLogger {

/**
* Test for {@link TaskManagerServices#calculateNetworkBufferMemory(TaskManagerServicesConfiguration)}
* Test for {@link TaskManagerServices#calculateNetworkBufferMemory(TaskManagerServicesConfiguration, long)}
* using the same (manual) test cases as in {@link TaskManagerServicesTest#calculateHeapSizeMB()}.
*/
@Test
public void calculateNetworkBufFromHeapSize() throws Exception {
PowerMockito.mockStatic(EnvironmentInformation.class);
// some defaults:
when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(1000L << 20); // 1000MB

TaskManagerServicesConfiguration tmConfig;

tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(900L << 20); // 900MB
assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
TaskManagerServices.calculateNetworkBufferMemory(tmConfig, 900L << 20)); // 900MB

tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.2f, 60L << 20, 1L << 30, MemoryType.HEAP);
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(800L << 20); // 800MB
assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */,
TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
TaskManagerServices.calculateNetworkBufferMemory(tmConfig, 800L << 20)); // 800MB

tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(890L << 20); // 890MB
assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
TaskManagerServices.calculateNetworkBufferMemory(tmConfig, 890L << 20)); // 890MB

tmConfig = getTmConfig(-1, 0.1f,
0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(810L << 20); // 810MB
assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
TaskManagerServices.calculateNetworkBufferMemory(tmConfig, 810L << 20)); // 810MB
}

/**
Expand Down

0 comments on commit 1e744b0

Please sign in to comment.