Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/deployment/resource-providers/yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,11 @@ Once a HA service is configured, it will persist JobManager metadata and perform

YARN is taking care of restarting failed JobManagers. The maximum number of JobManager restarts is defined through two configuration parameters. First Flink's [yarn.application-attempts]({% link deployment/config.md %}#yarn-application-attempts) configuration will default 2. This value is limited by YARN's [yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml), which also defaults to 2.

Note that Flink is managing the `high-availability.cluster-id` configuration parameter when running on YARN. **You should not overwrite this parameter when running an HA cluster on YARN**. The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
Note that Flink is managing the `high-availability.cluster-id` configuration parameter when deploying on YARN.
Flink sets it per default to the YARN application id.
**You should not overwrite this parameter when deploying an HA cluster on YARN**.
The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper).
Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.

#### Container Shutdown Behaviour

Expand Down
6 changes: 5 additions & 1 deletion docs/deployment/resource-providers/yarn.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,11 @@ Once a HA service is configured, it will persist JobManager metadata and perform

YARN is taking care of restarting failed JobManagers. The maximum number of JobManager restarts is defined through two configuration parameters. First Flink's [yarn.application-attempts]({% link deployment/config.zh.md %}#yarn-application-attempts) configuration will default 2. This value is limited by YARN's [yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml), which also defaults to 2.

Note that Flink is managing the `high-availability.cluster-id` configuration parameter when running on YARN. **You should not overwrite this parameter when running an HA cluster on YARN**. The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
Note that Flink is managing the `high-availability.cluster-id` configuration parameter when deploying on YARN.
Flink sets it per default to the YARN application id.
**You should not overwrite this parameter when deploying an HA cluster on YARN**.
The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper).
Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.

#### Container Shutdown Behaviour

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import java.util.function.Predicate;

import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -209,6 +210,41 @@ public void testJobRecoversAfterKillingTaskManager() throws Exception {
});
}

/**
* Tests that we can retrieve an HA enabled cluster by only specifying the application id if no
* other high-availability.cluster-id has been configured. See FLINK-20866.
*/
@Test
public void testClusterClientRetrieval() throws Exception {
runTest(
() -> {
final YarnClusterDescriptor yarnClusterDescriptor =
setupYarnClusterDescriptor();
final RestClusterClient<ApplicationId> restClusterClient =
deploySessionCluster(yarnClusterDescriptor);

ClusterClient<ApplicationId> newClusterClient = null;
try {
final ApplicationId clusterId = restClusterClient.getClusterId();

final YarnClusterDescriptor newClusterDescriptor =
setupYarnClusterDescriptor();
newClusterClient =
newClusterDescriptor.retrieve(clusterId).getClusterClient();

assertThat(newClusterClient.listJobs().join(), is(empty()));

newClusterClient.shutDownCluster();
} finally {
restClusterClient.close();

if (newClusterClient != null) {
newClusterClient.close();
}
}
});
}

private void waitForApplicationAttempt(final ApplicationId applicationId, final int attemptId)
throws Exception {
final YarnClient yarnClient = getYarnClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {

private final String applicationType;

private String zookeeperNamespace;

private YarnConfigOptions.UserJarInclusion userJarInclusion;

public YarnClusterDescriptor(
Expand Down Expand Up @@ -183,10 +181,6 @@ public YarnClusterDescriptor(
this.customName = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME);
this.applicationType = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TYPE);
this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);

// we want to ignore the default value at this point.
this.zookeeperNamespace =
flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, null);
}

private Optional<List<File>> decodeFilesToShipToCluster(
Expand Down Expand Up @@ -358,14 +352,6 @@ private void isReadyForDeployment(ClusterSpecification clusterSpecification) thr
}
}

public String getZookeeperNamespace() {
return zookeeperNamespace;
}

private void setZookeeperNamespace(String zookeeperNamespace) {
this.zookeeperNamespace = zookeeperNamespace;
}

public String getNodeLabel() {
return nodeLabel;
}
Expand Down Expand Up @@ -824,17 +810,7 @@ private ApplicationReport startAppMaster(
final ApplicationId appId = appContext.getApplicationId();

// ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
String zkNamespace = getZookeeperNamespace();
// no user specified cli argument for namespace?
if (zkNamespace == null || zkNamespace.isEmpty()) {
// namespace defined in config? else use applicationId as default.
zkNamespace =
configuration.getString(
HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
setZookeeperNamespace(zkNamespace);
}

configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
setHAClusterIdIfNotSet(configuration, appId);

if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
// activate re-execution of failed applications
Expand Down Expand Up @@ -1133,7 +1109,6 @@ private ApplicationReport startAppMaster(
YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
encodeYarnLocalResourceDescriptorListToString(
fileUploader.getEnvShipResourceList()));
appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
appMasterEnv.put(
YarnConfigKeys.FLINK_YARN_FILES,
fileUploader.getApplicationDir().toUri().toString());
Expand Down Expand Up @@ -1767,20 +1742,29 @@ private static boolean isUsrLibDirIncludedInShipFiles(List<File> shipFiles) {
private void setClusterEntrypointInfoToConfig(final ApplicationReport report) {
checkNotNull(report);

final ApplicationId clusterId = report.getApplicationId();
final ApplicationId appId = report.getApplicationId();
final String host = report.getHost();
final int port = report.getRpcPort();

LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, clusterId);
LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, appId);

flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);

flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, port);

flinkConfiguration.set(
YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(clusterId));
flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId));

setHAClusterIdIfNotSet(flinkConfiguration, appId);
}

private void setHAClusterIdIfNotSet(Configuration configuration, ApplicationId appId) {
// set cluster-id to app id if not specified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rename the clusterId variable to appId

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jup, good idea.

if (!configuration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
configuration.set(
HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId));
}
}

public static void logDetachedClusterInformation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class YarnConfigKeys {
public static final String LOCAL_KEYTAB_PATH = "_LOCAL_KEYTAB_PATH";
public static final String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
public static final String ENV_ZOOKEEPER_NAMESPACE = "_ZOOKEEPER_NAMESPACE";

public static final String ENV_KRB5_PATH = "_KRB5_PATH";
public static final String ENV_YARN_SITE_XML_PATH = "_YARN_SITE_XML_PATH";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
Expand Down Expand Up @@ -58,8 +57,6 @@ public static Configuration loadConfiguration(

final String keytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);

final String zooKeeperNamespace = env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);

final String hostname = env.get(ApplicationConstants.Environment.NM_HOST.key());
Preconditions.checkState(
hostname != null,
Expand All @@ -69,10 +66,6 @@ public static Configuration loadConfiguration(
configuration.setString(JobManagerOptions.ADDRESS, hostname);
configuration.setString(RestOptions.ADDRESS, hostname);

if (zooKeeperNamespace != null) {
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zooKeeperNamespace);
}

// if a web monitor shall be started, set the port to random binding
if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
configuration.setInteger(WebOptions.PORT, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,9 @@ public void testZookeeperNamespaceProperty() throws Exception {
CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);

Configuration executorConfig = yarnCLI.toConfiguration(commandLine);
ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
YarnClusterDescriptor descriptor =
(YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);

assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
assertThat(
executorConfig.get(HighAvailabilityOptions.HA_CLUSTER_ID), is(zkNamespaceCliInput));
}

@Test
Expand Down