Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-9028][flip6] perform parameters checking before starting cluster #5726

Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;

import java.util.HashMap;
Expand Down Expand Up @@ -101,7 +102,7 @@ public String toString() {
// ------------------------------------------------------------------------
// Factory
// ------------------------------------------------------------------------

/**
* Computes the parameters to be used to start a TaskManager Java process.
*
Expand All @@ -112,35 +113,11 @@ public String toString() {
public static ContaineredTaskManagerParameters create(
Configuration config, long containerMemoryMB, int numSlots)
{
// (1) compute how much memory we subtract from the total memory, to get the Java memory

final float memoryCutoffRatio = config.getFloat(
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);

final int minCutoff = config.getInteger(
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);

if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. Value given="
+ memoryCutoffRatio);
}

if (minCutoff >= containerMemoryMB) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff
+ "' is larger than the total container memory " + containerMemoryMB);
}

long cutoff = (long) (containerMemoryMB * memoryCutoffRatio);
if (cutoff < minCutoff) {
cutoff = minCutoff;
}

final long javaMemorySizeMB = containerMemoryMB - cutoff;
// (1) try to compute how much memory used by container
final long cutoffMB = ResourceManagerRuntimeServices.calculateCutoffMB(config, containerMemoryMB);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep this method in the ContaineredTaskManagerParameters class.


// (2) split the remaining Java memory between heap and off-heap
final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
// use the cut-off memory for off-heap (that was its intention)
final long offHeapSizeMB = containerMemoryMB - heapSizeMB;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
if (jobManagerRunners.isEmpty()) {
System.out.println("empty");
log.info("empty");
}
return CompletableFuture.completedFuture(
Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
Expand Down Expand Up @@ -53,6 +55,46 @@ public void shutDown() throws Exception {

// -------------------- Static methods --------------------------------------

/**
 * Validates the containerized heap cut-off settings and derives the cut-off size from them.
 *
 * <p>The result is the configured cut-off ratio applied to the container memory, raised to
 * the configured minimum cut-off whenever the ratio-based value is smaller than that minimum.
 *
 * @param config The Flink configuration.
 * @param containerMemoryMB The size of the complete container, in megabytes.
 *
 * @return The amount of memory, in megabytes, that is cut off from the container memory.
 *
 * @throws IllegalArgumentException if the configured ratio is less than or equal to 0 or greater
 *         than or equal to 1, or if the configured minimum cut-off is not smaller than the
 *         container memory.
 */
public static long calculateCutoffMB(Configuration config, long containerMemoryMB) {
	// a non-positive container size is rejected before any option is read
	Preconditions.checkArgument(containerMemoryMB > 0);

	// the ratio is read and validated first, so a bad ratio is reported before a bad minimum
	final float ratio = config.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
	if (ratio <= 0 || ratio >= 1) {
		throw new IllegalArgumentException("The configuration value '"
			+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. Value given="
			+ ratio);
	}

	// the minimum cut-off has to leave some memory in the container
	final int minimumCutoffMB = config.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
	if (minimumCutoffMB >= containerMemoryMB) {
		throw new IllegalArgumentException("The configuration value '"
			+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minimumCutoffMB
			+ "' is larger than the total container memory " + containerMemoryMB);
	}

	// ratio-based cut-off (truncated to whole megabytes), never below the configured minimum
	final long ratioBasedCutoffMB = (long) (containerMemoryMB * ratio);
	return Math.max(ratioBasedCutoffMB, minimumCutoffMB);
}

public static ResourceManagerRuntimeServices fromConfiguration(
ResourceManagerRuntimeServicesConfiguration configuration,
HighAvailabilityServices highAvailabilityServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.junit.Assert;
import org.junit.Test;

/**
* Tests to guard {@link ResourceManagerRuntimeServices}.
*/
public class ResourceManagerRuntimeServicesTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test class is missing `extends TestLogger`.


/**
 * Test to guard {@link ResourceManagerRuntimeServices#calculateCutoffMB(Configuration, long)}.
 *
 * <p>Covers three cases for a 1000 MB container: the minimum cut-off wins over a smaller
 * ratio-based value, the ratio-based value wins over a smaller minimum, and a minimum cut-off
 * that is not smaller than the container memory is rejected.
 */
@Test
public void calculateCutoffMB() throws Exception {
	final Configuration config = new Configuration();
	final long containerMemoryMB = 1000;

	// 10% of 1000 MB is 100 MB, which is below the 128 MB minimum, so the minimum is used
	config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.1f);
	config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 128);
	Assert.assertEquals(128, ResourceManagerRuntimeServices.calculateCutoffMB(config, containerMemoryMB));

	// 20% of 1000 MB is 200 MB, which exceeds the 128 MB minimum, so the ratio-based value is used
	config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.2f);
	Assert.assertEquals(200, ResourceManagerRuntimeServices.calculateCutoffMB(config, containerMemoryMB));

	// a minimum cut-off equal to the container memory must be rejected
	config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 1000);
	try {
		ResourceManagerRuntimeServices.calculateCutoffMB(config, containerMemoryMB);
		Assert.fail("Expected an IllegalArgumentException because the minimum cut-off is not smaller than the container memory.");
	} catch (IllegalArgumentException expected) {
		// Only the validation failure counts as success; any other exception type
		// (for example a NullPointerException) propagates and fails the test.
	}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,5 +214,4 @@ public void calculateHeapSizeMB() throws Exception {
config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
Expand Down Expand Up @@ -409,6 +411,12 @@ public void terminateCluster(ApplicationId applicationId) throws FlinkException
}
}

private void checkConfig(ClusterSpecification clusterSpecification) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should throw a checked exception. Something like InvalidConfigurationException or so. But not just fail with an IllegalStateException.

long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
long cutoff = ResourceManagerRuntimeServices.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I somehow don't like that we calculate some value just in order to check whether an unchecked exception is thrown. This should be imo more explicit. Or at least it should get a comment explaining what's going on here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... This is a bit tricky: the calculation logic is hard to separate cleanly from the check logic, because performing the check requires doing the calculation anyway. Pulling the check logic out of the calculate function into a separate function would therefore mean some
code duplication. So I'd like to add a comment here; if you still think we should pull out a new function, I will just do it ;).

}

/**
* This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
*
Expand All @@ -423,6 +431,9 @@ protected ClusterClient<ApplicationId> deployInternal(
@Nullable JobGraph jobGraph,
boolean detached) throws Exception {

// ------------------ Check if configuration is valid --------------------
checkConfig(clusterSpecification);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not abbreviate and name it validateClusterSpecification


if (UserGroupInformation.isSecurityEnabled()) {
// note: UGI::hasKerberosCredentials inaccurately reports false
// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.security.SecurityConfiguration;
Expand Down Expand Up @@ -631,7 +632,7 @@ public int run(String[] args) throws CliArgsException, FlinkException {
if (detachedMode) {
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + applicationId.getOpt());
"yarn application -kill " + yarnApplicationId);
} else {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ public void testCommandLineClusterSpecification() throws Exception {
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, 7331);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);

final int jobManagerMemory = 42;
final int taskManagerMemory = 41;
final int jobManagerMemory = 1337;
final int taskManagerMemory = 7331;
final int slotsPerTaskManager = 30;
final String[] args = {"-yjm", String.valueOf(jobManagerMemory), "-ytm", String.valueOf(taskManagerMemory), "-ys", String.valueOf(slotsPerTaskManager)};
final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
Expand Down