Skip to content

Commit

Permalink
Till's comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
sihuazhou committed Mar 20, 2018
1 parent c984370 commit f6e1105
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.util.Preconditions;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -103,6 +103,46 @@ public String toString() {
// Factory
// ------------------------------------------------------------------------

/**
* calcuate cutoff memory size used by container, it will throw an {@link IllegalArgumentException}
* if the config is invalid or return the cutoff value if valid.
*
* @param config The Flink configuration.
* @param containerMemoryMB The size of the complete container, in megabytes.
*
* @return cutoff memory size used by container.
*/
public static long calculateCutoffMB(Configuration config, long containerMemoryMB) {
Preconditions.checkArgument(containerMemoryMB > 0);

// (1) check cutoff ratio
final float memoryCutoffRatio = config.getFloat(
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);

if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. Value given="
+ memoryCutoffRatio);
}

// (2) check min cutoff value
final int minCutoff = config.getInteger(
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);

if (minCutoff >= containerMemoryMB) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff
+ "' is larger than the total container memory " + containerMemoryMB);
}

// (3) check between heap and off-heap
long cutoff = (long) (containerMemoryMB * memoryCutoffRatio);
if (cutoff < minCutoff) {
cutoff = minCutoff;
}
return cutoff;
}

/**
* Computes the parameters to be used to start a TaskManager Java process.
*
Expand All @@ -114,7 +154,7 @@ public static ContaineredTaskManagerParameters create(
Configuration config, long containerMemoryMB, int numSlots)
{
// (1) try to compute how much memory used by container
final long cutoffMB = ResourceManagerRuntimeServices.calculateCutoffMB(config, containerMemoryMB);
final long cutoffMB = calculateCutoffMB(config, containerMemoryMB);

// (2) split the remaining Java memory between heap and off-heap
final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
Expand Down Expand Up @@ -55,46 +53,6 @@ public void shutDown() throws Exception {

// -------------------- Static methods --------------------------------------

/**
* Check whether the config is valid, it will throw an exception if the config
* is invalid or return the cut off value.
*
* @param config The Flink configuration.
* @param containerMemoryMB The size of the complete container, in megabytes.
*
* @return cut off size used by container.
*/
public static long calculateCutoffMB(Configuration config, long containerMemoryMB) {
Preconditions.checkArgument(containerMemoryMB > 0);

// (1) check cutoff ratio
final float memoryCutoffRatio = config.getFloat(
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);

if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. Value given="
+ memoryCutoffRatio);
}

// (2) check min cutoff value
final int minCutoff = config.getInteger(
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);

if (minCutoff >= containerMemoryMB) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff
+ "' is larger than the total container memory " + containerMemoryMB);
}

// (3) check between heap and off-heap
long cutoff = (long) (containerMemoryMB * memoryCutoffRatio);
if (cutoff < minCutoff) {
cutoff = minCutoff;
}
return cutoff;
}

public static ResourceManagerRuntimeServices fromConfiguration(
ResourceManagerRuntimeServicesConfiguration configuration,
HighAvailabilityServices highAvailabilityServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.util.TestLogger;
import org.junit.Test;

import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_OFF_HEAP;
import static org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class ContaineredTaskManagerParametersTest extends TestLogger {
private static final long CONTAINER_MEMORY = 8192L;
Expand Down Expand Up @@ -91,4 +93,34 @@ public void testTotalMemoryDoesNotExceedContainerMemoryOffHeap() {
assertTrue(params.taskManagerHeapSizeMB() +
params.taskManagerDirectMemoryLimitMB() <= CONTAINER_MEMORY);
}

/**
* Test to guard {@link ContaineredTaskManagerParameters#calculateCutoffMB(Configuration, long)}.
*/
@Test
public void testCalculateCutoffMB() throws Exception {

Configuration config = new Configuration();
long containerMemoryMB = 1000;

config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.1f);
config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 128);

assertEquals(128,
ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB));

config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.2f);
assertEquals(200,
ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB));

config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 1000);

try {
ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB);
} catch (IllegalArgumentException expected) {
// we expected it.
return;
}
fail();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -411,10 +411,26 @@ public void terminateCluster(ApplicationId applicationId) throws FlinkException
}
}

private void checkConfig(ClusterSpecification clusterSpecification) {
/**
* Method to validate cluster specification before deploy it, it will throw
* an {@link IllegalConfigurationException} if the {@link ClusterSpecification} is invalid.
*/
private void validateClusterSpecification(ClusterSpecification clusterSpecification) {
long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
long cutoff = ResourceManagerRuntimeServices.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
long cutoff;
try {
// We do the validation by calling the calculation methods here
cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
} catch (IllegalArgumentException cutoffConfigurationInvalidEx) {
throw new IllegalConfigurationException("Configurations related to cutoff checked failed.", cutoffConfigurationInvalidEx);
}

try {
// We do the validation by calling the calculation methods here
TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
} catch (IllegalArgumentException heapSizeConfigurationInvalidEx) {
throw new IllegalConfigurationException("Configurations related to heap size checked failed.", heapSizeConfigurationInvalidEx);
}
}

/**
Expand All @@ -432,7 +448,7 @@ protected ClusterClient<ApplicationId> deployInternal(
boolean detached) throws Exception {

// ------------------ Check if configuration is valid --------------------
checkConfig(clusterSpecification);
validateClusterSpecification(clusterSpecification);

if (UserGroupInformation.isSecurityEnabled()) {
// note: UGI::hasKerberosCredentials inaccurately reports false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.security.SecurityConfiguration;
Expand Down

0 comments on commit f6e1105

Please sign in to comment.