Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,11 @@
import static org.apache.flink.configuration.TaskManagerOptions.NETWORK_MEMORY_MAX;
import static org.apache.flink.configuration.TaskManagerOptions.TASK_HEAP_MEMORY;
import static org.apache.flink.configuration.TaskManagerOptions.TASK_OFF_HEAP_MEMORY;
import static org.apache.flink.configuration.TaskManagerOptions.TOTAL_FLINK_MEMORY;
import static org.apache.flink.configuration.TaskManagerOptions.TOTAL_PROCESS_MEMORY;
import static org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.calculateTotalFlinkMemoryFromComponents;
import static org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.calculateTotalProcessMemoryFromComponents;

/** TaskExecutorConfiguration collects the configuration of a TaskExecutor instance. */
public class TaskExecutorMemoryConfiguration implements Serializable {

public static final String FIELD_NAME_FRAMEWORK_HEAP = "frameworkHeap";
public static final String FIELD_NAME_TASK_HEAP = "taskHeap";

Expand Down Expand Up @@ -116,8 +115,8 @@ public static TaskExecutorMemoryConfiguration create(Configuration config) {
getConfigurationValue(config, MANAGED_MEMORY_SIZE),
getConfigurationValue(config, JVM_METASPACE),
getConfigurationValue(config, JVM_OVERHEAD_MAX),
getConfigurationValue(config, TOTAL_FLINK_MEMORY),
getConfigurationValue(config, TOTAL_PROCESS_MEMORY));
calculateTotalFlinkMemoryFromComponents(config),
calculateTotalProcessMemoryFromComponents(config));
}

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -58,6 +59,10 @@ public class TaskExecutorResourceUtils {
TaskManagerOptions.JVM_OVERHEAD_MAX,
TaskManagerOptions.JVM_OVERHEAD_FRACTION);

private static final MemorySize LOCAL_EXECUTION_TASK_MEMORY =
MemorySize.ofMebiBytes(1024 * 1024);
private static final double LOCAL_EXECUTION_CPU_CORES = 1000000.0;

static final MemorySize DEFAULT_SHUFFLE_MEMORY_SIZE = MemorySize.parse("64m");
static final MemorySize DEFAULT_MANAGED_MEMORY_SIZE = MemorySize.parse("128m");

Expand Down Expand Up @@ -131,18 +136,72 @@ public static TaskExecutorResourceSpec resourceSpecFromConfigForLocalExecution(
return resourceSpecFromConfig(adjustForLocalExecution(config));
}

public static long calculateTotalFlinkMemoryFromComponents(Configuration config) {
Preconditions.checkArgument(config.contains(TaskManagerOptions.TASK_HEAP_MEMORY));
Preconditions.checkArgument(config.contains(TaskManagerOptions.TASK_OFF_HEAP_MEMORY));
Preconditions.checkArgument(config.contains(TaskManagerOptions.NETWORK_MEMORY_MAX));
Preconditions.checkArgument(config.contains(TaskManagerOptions.NETWORK_MEMORY_MIN));
Preconditions.checkArgument(config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE));
Preconditions.checkArgument(config.contains(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY));
Preconditions.checkArgument(config.contains(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY));
Preconditions.checkArgument(
config.get(TaskManagerOptions.NETWORK_MEMORY_MAX)
.equals(config.get(TaskManagerOptions.NETWORK_MEMORY_MIN)));
return config.get(TaskManagerOptions.TASK_HEAP_MEMORY)
.add(config.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY))
.add(config.get(TaskManagerOptions.NETWORK_MEMORY_MAX))
.add(config.get(TaskManagerOptions.MANAGED_MEMORY_SIZE))
.add(config.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY))
.add(config.get(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY))
.getBytes();
}

public static long calculateTotalProcessMemoryFromComponents(Configuration config) {
Preconditions.checkArgument(config.contains(TaskManagerOptions.JVM_METASPACE));
Preconditions.checkArgument(config.contains(TaskManagerOptions.JVM_OVERHEAD_MAX));
Preconditions.checkArgument(config.contains(TaskManagerOptions.JVM_OVERHEAD_MIN));
Preconditions.checkArgument(
config.get(TaskManagerOptions.JVM_OVERHEAD_MAX)
.equals(config.get(TaskManagerOptions.JVM_OVERHEAD_MIN)));
return calculateTotalFlinkMemoryFromComponents(config)
+ config.get(TaskManagerOptions.JVM_METASPACE)
.add(config.get(TaskManagerOptions.JVM_OVERHEAD_MAX))
.getBytes();
}

public static Configuration adjustForLocalExecution(Configuration config) {
UNUSED_CONFIG_OPTIONS.forEach(option -> warnOptionHasNoEffectIfSet(config, option));

setConfigOptionToPassedMaxIfNotSet(config, TaskManagerOptions.CPU_CORES, Double.MAX_VALUE);
setConfigOptionToPassedMaxIfNotSet(
config, TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.MAX_VALUE);
config, TaskManagerOptions.CPU_CORES, LOCAL_EXECUTION_CPU_CORES);
setConfigOptionToPassedMaxIfNotSet(
config, TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.MAX_VALUE);
config, TaskManagerOptions.TASK_HEAP_MEMORY, LOCAL_EXECUTION_TASK_MEMORY);
setConfigOptionToPassedMaxIfNotSet(
config, TaskManagerOptions.TASK_OFF_HEAP_MEMORY, LOCAL_EXECUTION_TASK_MEMORY);

adjustNetworkMemoryForLocalExecution(config);
setConfigOptionToDefaultIfNotSet(
config, TaskManagerOptions.MANAGED_MEMORY_SIZE, DEFAULT_MANAGED_MEMORY_SIZE);
silentlySetConfigOptionIfNotSet(
config,
TaskManagerOptions.FRAMEWORK_HEAP_MEMORY,
TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.defaultValue());
silentlySetConfigOptionIfNotSet(
config,
TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY,
TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.defaultValue());
silentlySetConfigOptionIfNotSet(
config,
TaskManagerOptions.JVM_METASPACE,
TaskManagerOptions.JVM_METASPACE.defaultValue());
silentlySetConfigOptionIfNotSet(
config,
TaskManagerOptions.JVM_OVERHEAD_MAX,
TaskManagerOptions.JVM_OVERHEAD_MAX.defaultValue());
silentlySetConfigOptionIfNotSet(
config,
TaskManagerOptions.JVM_OVERHEAD_MIN,
TaskManagerOptions.JVM_OVERHEAD_MAX.defaultValue());

return config;
}
Expand Down Expand Up @@ -176,6 +235,13 @@ private static void warnOptionHasNoEffectIfSet(Configuration config, ConfigOptio
}
}

private static <T> void silentlySetConfigOptionIfNotSet(
Configuration config, ConfigOption<T> option, T value) {
if (!config.contains(option)) {
config.set(option, value);
}
}

private static <T> void setConfigOptionToDefaultIfNotSet(
Configuration config, ConfigOption<T> option, T defaultValue) {
setConfigOptionToDefaultIfNotSet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class ActiveResourceManagerTest extends TestLogger {
private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);

private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = WorkerResourceSpec.ZERO;
private static final TaskExecutorMemoryConfiguration TESTING_CONFIG =
new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 21L, 36L);

/** Tests worker successfully requested, started and registered. */
@Test
Expand Down Expand Up @@ -643,7 +645,7 @@ CompletableFuture<RegistrationResponse> registerTaskExecutor(
1234,
23456,
new HardwareDescription(1, 2L, 3L, 4L),
TaskExecutorMemoryConfiguration.create(flinkConfig),
TESTING_CONFIG,
ResourceProfile.ZERO,
ResourceProfile.ZERO);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,58 +37,33 @@
import static org.apache.flink.configuration.TaskManagerOptions.NETWORK_MEMORY_MIN;
import static org.apache.flink.configuration.TaskManagerOptions.TASK_HEAP_MEMORY;
import static org.apache.flink.configuration.TaskManagerOptions.TASK_OFF_HEAP_MEMORY;
import static org.apache.flink.configuration.TaskManagerOptions.TOTAL_FLINK_MEMORY;
import static org.apache.flink.configuration.TaskManagerOptions.TOTAL_PROCESS_MEMORY;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

/** Tests the initialization of TaskExecutorMemoryConfiguration. */
public class TaskExecutorMemoryConfigurationTest extends TestLogger {

@Test
public void testInitializationWithAllValuesBeingSet() {
public void testInitialization() {
Configuration config = new Configuration();

config.set(FRAMEWORK_HEAP_MEMORY, new MemorySize(1));
config.set(TASK_HEAP_MEMORY, new MemorySize(2));
config.set(FRAMEWORK_OFF_HEAP_MEMORY, new MemorySize(3));
config.set(TASK_OFF_HEAP_MEMORY, new MemorySize(4));
config.set(NETWORK_MEMORY_MIN, new MemorySize(5));
config.set(NETWORK_MEMORY_MIN, new MemorySize(6));
config.set(NETWORK_MEMORY_MAX, new MemorySize(6));
config.set(NETWORK_MEMORY_FRACTION, 0.1f);
config.set(MANAGED_MEMORY_SIZE, new MemorySize(7));
config.set(MANAGED_MEMORY_FRACTION, 0.2f);
config.set(JVM_METASPACE, new MemorySize(8));
config.set(JVM_OVERHEAD_MIN, new MemorySize(9));
config.set(JVM_OVERHEAD_MIN, new MemorySize(10));
config.set(JVM_OVERHEAD_MAX, new MemorySize(10));
config.set(JVM_OVERHEAD_FRACTION, 0.3f);
config.set(TOTAL_FLINK_MEMORY, new MemorySize(11));
config.set(TOTAL_PROCESS_MEMORY, new MemorySize(12));

TaskExecutorMemoryConfiguration actual = TaskExecutorMemoryConfiguration.create(config);
TaskExecutorMemoryConfiguration expected =
new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, 6L, 7L, 8L, 10L, 11L, 12L);

assertThat(actual, is(expected));
}

@Test
public void testInitializationWithMissingValues() {
Configuration config = new Configuration();

TaskExecutorMemoryConfiguration actual = TaskExecutorMemoryConfiguration.create(config);
TaskExecutorMemoryConfiguration expected =
new TaskExecutorMemoryConfiguration(
FRAMEWORK_HEAP_MEMORY.defaultValue().getBytes(),
null,
FRAMEWORK_OFF_HEAP_MEMORY.defaultValue().getBytes(),
TASK_OFF_HEAP_MEMORY.defaultValue().getBytes(),
NETWORK_MEMORY_MAX.defaultValue().getBytes(),
null,
JVM_METASPACE.defaultValue().getBytes(),
JVM_OVERHEAD_MAX.defaultValue().getBytes(),
null,
null);
new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, 6L, 7L, 8L, 10L, 23L, 41L);

assertThat(actual, is(expected));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,68 @@ public void testNetworkMaxAdjustForLocalExecutionIfMinSet() {
assertThat(configuration.get(TaskManagerOptions.NETWORK_MEMORY_MAX), is(networkMemorySize));
}

@Test
public void testCalculateTotalFlinkMemoryWithAllFactorsBeingSet() {
Configuration config = new Configuration();

config.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, new MemorySize(1));
config.set(TaskManagerOptions.TASK_HEAP_MEMORY, new MemorySize(2));
config.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, new MemorySize(3));
config.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, new MemorySize(4));
config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, new MemorySize(6));
config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, new MemorySize(6));
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, new MemorySize(7));

assertThat(
TaskExecutorResourceUtils.calculateTotalFlinkMemoryFromComponents(config), is(23L));
}

@Test(expected = IllegalArgumentException.class)
public void testCalculateTotalFlinkMemoryWithMissingFactors() {
Configuration config = new Configuration();

config.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, new MemorySize(1));
config.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, new MemorySize(3));
config.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, new MemorySize(4));
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, new MemorySize(7));

TaskExecutorResourceUtils.calculateTotalFlinkMemoryFromComponents(config);
}

@Test
public void testCalculateTotalProcessMemoryWithAllFactorsBeingSet() {
Configuration config = new Configuration();

config.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, new MemorySize(1));
config.set(TaskManagerOptions.TASK_HEAP_MEMORY, new MemorySize(2));
config.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, new MemorySize(3));
config.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, new MemorySize(4));
config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, new MemorySize(6));
config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, new MemorySize(6));
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, new MemorySize(7));
config.set(TaskManagerOptions.JVM_METASPACE, new MemorySize(8));
config.set(TaskManagerOptions.JVM_OVERHEAD_MAX, new MemorySize(10));
config.set(TaskManagerOptions.JVM_OVERHEAD_MIN, new MemorySize(10));

assertThat(
TaskExecutorResourceUtils.calculateTotalProcessMemoryFromComponents(config),
is(41L));
}

@Test(expected = IllegalArgumentException.class)
public void testCalculateTotalProcessMemoryWithMissingFactors() {
Configuration config = new Configuration();

config.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, new MemorySize(1));
config.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, new MemorySize(3));
config.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, new MemorySize(4));
config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, new MemorySize(6));
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, new MemorySize(7));
config.set(TaskManagerOptions.JVM_METASPACE, new MemorySize(8));

TaskExecutorResourceUtils.calculateTotalProcessMemoryFromComponents(config);
}

private static Configuration createValidConfig() {
Configuration configuration = new Configuration();
configuration.set(TaskManagerOptions.CPU_CORES, CPU_CORES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public void setup() throws IOException {
new BlobCacheService(new Configuration(), new VoidBlobStore(), null);

configuration = new Configuration();
TaskExecutorResourceUtils.adjustForLocalExecution(configuration);

unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
jobId = new JobID();
Expand Down Expand Up @@ -1808,14 +1809,17 @@ public void testDisconnectFromJobMasterWhenNewLeader() throws Exception {
@Test(timeout = 10000L)
public void testLogNotFoundHandling() throws Throwable {
final int dataPort = NetUtils.getAvailablePort();
Configuration config = new Configuration();
config.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");

try (TaskSubmissionTestEnvironment env =
new Builder(jobId).setConfiguration(config).setLocalCommunication(false).build()) {
new Builder(jobId)
.setConfiguration(configuration)
.setLocalCommunication(false)
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
try {
CompletableFuture<TransientBlobKey> logFuture =
Expand All @@ -1831,7 +1835,8 @@ public void testLogNotFoundHandling() throws Throwable {

@Test(timeout = 10000L)
public void testTerminationOnFatalError() throws Throwable {
try (TaskSubmissionTestEnvironment env = new Builder(jobId).build()) {
try (TaskSubmissionTestEnvironment env =
new Builder(jobId).setConfiguration(configuration).build()) {
String testExceptionMsg = "Test exception of fatal error.";

env.getTaskExecutor().onFatalError(new Exception(testExceptionMsg));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
Expand Down Expand Up @@ -333,6 +334,7 @@ public static void main(String[] args) {
Configuration cfg = parameterTool.getConfiguration();
final PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(cfg);
TaskExecutorResourceUtils.adjustForLocalExecution(cfg);

TaskManagerRunner.runTaskManager(cfg, pluginManager);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DispatcherProcess;
Expand Down Expand Up @@ -268,6 +269,7 @@ public void testDispatcherProcessFailure() throws Exception {
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
config.set(TaskManagerOptions.CPU_CORES, 1.0);
TaskExecutorResourceUtils.adjustForLocalExecution(config);

final RpcService rpcService =
AkkaRpcServiceUtils.remoteServiceBuilder(config, "localhost", 0).createAndStart();
Expand Down