From 4ed286d2f828a5454a7c29634f30487f6fb0f6d0 Mon Sep 17 00:00:00 2001 From: "shuai.xus" Date: Wed, 23 Nov 2016 18:00:07 +0800 Subject: [PATCH 1/3] [FLINK-4929] [yarn] Implement FLIP-6 YARN TaskExecutor Runner Summary: Implement FLIP-6 YARN TaskExecutor Runner Test Plan: NA Reviewers: biao.liub Differential Revision: http://phabricator.taobao.net/D6564 --- .../flink/yarn/YarnTaskExecutorRunner.java | 384 ++++++++++++++++++ 1 file changed, 384 insertions(+) create mode 100644 flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java new file mode 100644 index 0000000000000..86b1019f11172 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import akka.actor.ActorSystem; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.security.SecurityContext; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration; +import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +import javax.annotation.concurrent.GuardedBy; +import java.io.File; +import java.net.InetAddress; +import java.util.Map; + +/** + * This class is the executable entry point for running a TaskExecutor in a YARN container. + */ +public class YarnTaskExecutorRunner implements FatalErrorHandler { + + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnTaskExecutorRunner.class); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The lock to guard startup / shutdown / manipulation methods */ + private final Object lock = new Object(); + + @GuardedBy("lock") + private MetricRegistry metricRegistry; + + @GuardedBy("lock") + private HighAvailabilityServices haServices; + + @GuardedBy("lock") + private RpcService taskExecutorRpcService; + + @GuardedBy("lock") + private TaskExecutor taskExecutor; + + /** Flag marking the task executor runner as started/running */ + private volatile boolean running; + + // ------------------------------------------------------------------------ + // Program entry point + // ------------------------------------------------------------------------ + + /** + * The entry point for the YARN task executor. + * + * @param args The command line arguments. + */ + public static void main(String[] args) { + EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + // run and exit with the proper return code + int returnCode = new YarnTaskExecutorRunner().run(args); + System.exit(returnCode); + } + + /** + * The instance entry point for the YARN task executor. Obtains user group + * information and calls the main work method {@link #runTaskExecutor(org.apache.flink.configuration.Configuration)} as a + * privileged action. + * + * @param args The command line arguments. + * @return The process exit code. + */ + protected int run(String[] args) { + try { + LOG.debug("All environment variables: {}", ENV); + + final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); + + final String currDir = ENV.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); + + final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); + LOG.info("TM: remoteKeytabPath obtained {}", remoteKeytabPath); + + final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal); + + final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); + FileSystem.setDefaultScheme(configuration); + + // configure local directory + String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null); + if (flinkTempDirs == null) { + LOG.info("Setting directories for temporary file " + localDirs); + configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs); + } + else { + LOG.info("Overriding YARN's temporary file directories with those " + + "specified in the Flink config: " + flinkTempDirs); + } + + // tell akka to die in case of an error + configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); + + String keytabPath = null; + if(remoteKeytabPath != null) { + File f = new File(currDir, Utils.KEYTAB_FILE_NAME); + keytabPath = f.getAbsolutePath(); + LOG.info("keytabPath: {}", keytabPath); + } + + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + + LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", + currentUser.getShortUserName(), yarnClientUsername); + + SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration(); + + //To support Yarn Secure Integration Test Scenario + File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME); + if(krb5Conf.exists() && krb5Conf.canRead()) { + String krb5Path = krb5Conf.getAbsolutePath(); + LOG.info("KRB5 Conf: {}", krb5Path); + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); + sc.setHadoopConfiguration(conf); + } + + if(keytabPath != null && remoteKeytabPrincipal != null) { + configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); + configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); + } + + SecurityContext.install(sc.setFlinkConfiguration(configuration)); + + return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner() { + @Override + public Integer run() { + return runTaskExecutor(configuration); + } + }); + + } + catch (Throwable t) { + // make sure that everything whatever ends up in the log + LOG.error("YARN Application Master initialization failed", t); + return INIT_ERROR_EXIT_CODE; + } + } + + // ------------------------------------------------------------------------ + // Core work method + // ------------------------------------------------------------------------ + + /** + * The main work method, must run as a privileged action. + * + * @return The return code for the Java process. + */ + protected int runTaskExecutor(Configuration config) { + + try { + // ---- (1) create common services + // first get the ResouceId, resource id is the container id for yarn. + final String containerId = ENV.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID); + require(containerId != null, + "ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID); + + ResourceID resourceID = new ResourceID(containerId); + LOG.info("YARN assigned resource id {} for the task executor.", resourceID.toString()); + + synchronized (lock) { + haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config); + metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + + // ---- (2) init resource manager ------- + taskExecutorRpcService = createRpcService(config); + taskExecutor = createTaskExecutor(config, resourceID); + + // ---- (3) start the task executor + taskExecutor.start(); + LOG.debug("YARN task executor started"); + } + running = true; + while (running) { + Thread.sleep(100); + } + // everything started, we can wait until all is done or the process is killed + LOG.info("YARN task executor runner finished"); + } + catch (Throwable t) { + // make sure that everything whatever ends up in the log + LOG.error("YARN task executor initialization failed", t); + shutdown(); + return INIT_ERROR_EXIT_CODE; + } + + return 0; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Validates a condition, throwing a RuntimeException if the condition is violated. + * + * @param condition The condition. + * @param message The message for the runtime exception, with format variables as defined by + * {@link String#format(String, Object...)}. + * @param values The format arguments. + */ + private static void require(boolean condition, String message, Object... values) { + if (!condition) { + throw new RuntimeException(String.format(message, values)); + } + } + protected RpcService createRpcService( + Configuration configuration) throws Exception{ + FiniteDuration duration = AkkaUtils.getTimeout(configuration); + + String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);//TODO + if (taskExecutorHostname != null) { + LOG.info("Using configured hostname/address for TaskManager: " + taskExecutorHostname); + } + else { + LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); + InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress( + leaderRetrievalService, + duration); + taskExecutorHostname = taskManagerAddress.getHostName(); + LOG.info("TaskExecutor will use hostname/address {} for communication.", taskExecutorHostname); + } + + // if no task manager port has been configured, use 0 (system will pick any free port) + int taskExecutorPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); + if (taskExecutorPort < 0 || taskExecutorPort > 65535) { + throw new IllegalConfigurationException("Invalid value for '" + + ConfigConstants.TASK_MANAGER_IPC_PORT_KEY + + "' (port for the TaskManager actor system) : " + taskExecutorPort + + " - Leave config parameter empty or use 0 to let the system choose a port automatically."); + } + + ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, taskExecutorHostname, taskExecutorPort); + return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit())); + } + + private TaskExecutor createTaskExecutor(Configuration config, ResourceID resourceID) throws Exception { + + InetAddress remoteAddress = InetAddress.getByName(taskExecutorRpcService.getAddress()); + + TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( + config, + remoteAddress, + false); + + TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( + taskManagerServicesConfiguration, + resourceID); + + TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config); + + TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup( + metricRegistry, + taskManagerServices.getTaskManagerLocation().getHostname(), + resourceID.toString()); + + // Initialize the TM metrics + TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment()); + + return new TaskExecutor( + taskManagerConfiguration, + taskManagerServices.getTaskManagerLocation(), + taskExecutorRpcService, + taskManagerServices.getMemoryManager(), + taskManagerServices.getIOManager(), + taskManagerServices.getNetworkEnvironment(), + haServices, + metricRegistry, + taskManagerMetricGroup, + taskManagerServices.getBroadcastVariableManager(), + taskManagerServices.getFileCache(), + taskManagerServices.getTaskSlotTable(), + taskManagerServices.getJobManagerTable(), + taskManagerServices.getJobLeaderService(), + this); + } + + protected void shutdown() { + synchronized (lock) { + if (taskExecutor != null) { + try { + taskExecutor.shutDown(); + } catch (Throwable tt) { + LOG.warn("Failed to stop the JobMaster", tt); + } + } + if (taskExecutorRpcService != null) { + try { + taskExecutorRpcService.stopService(); + } catch (Throwable tt) { + LOG.error("Error shutting down job master rpc service", tt); + } + } + if (haServices != null) { + try { + haServices.shutdown(); + } catch (Throwable tt) { + LOG.warn("Failed to stop the HA service", tt); + } + } + if (metricRegistry != null) { + try { + metricRegistry.shutdown(); + } catch (Throwable tt) { + LOG.warn("Failed to stop the metrics registry", tt); + } + } + } + running = false; + } + + //------------------------------------------------------------------------------------- + // Fatal error handler + //------------------------------------------------------------------------------------- + + @Override + public void onFatalError(Throwable exception) { + LOG.error("Encountered fatal error.", exception); + + shutdown(); + } + +} From bbed331180ade5303c976104cd262ef2bdfc433a Mon Sep 17 00:00:00 2001 From: "shuai.xus" Date: Fri, 25 Nov 2016 15:57:20 +0800 Subject: [PATCH 2/3] [FLINK-4929] [yarn] refine some logs and change according to reviewer's comments --- .../flink/yarn/YarnTaskExecutorRunner.java | 56 +++++-------------- 1 file changed, 13 insertions(+), 43 deletions(-) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index 86b1019f11172..99fcf8bf944d8 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.Preconditions; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -87,9 +88,6 @@ public class YarnTaskExecutorRunner implements FatalErrorHandler { @GuardedBy("lock") private TaskExecutor taskExecutor; - /** Flag marking the task executor runner as started/running */ - private volatile boolean running; - // ------------------------------------------------------------------------ // Program entry point // ------------------------------------------------------------------------ @@ -129,10 +127,10 @@ protected int run(String[] args) { LOG.info("Current working Directory: {}", currDir); final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM: remoteKeytabPath obtained {}", remoteKeytabPath); + LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal); + LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); FileSystem.setDefaultScheme(configuration); @@ -155,7 +153,7 @@ protected int run(String[] args) { if(remoteKeytabPath != null) { File f = new File(currDir, Utils.KEYTAB_FILE_NAME); keytabPath = f.getAbsolutePath(); - LOG.info("keytabPath: {}", keytabPath); + LOG.info("keytab path: {}", keytabPath); } UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); @@ -213,7 +211,7 @@ protected int runTaskExecutor(Configuration config) { // ---- (1) create common services // first get the ResouceId, resource id is the container id for yarn. final String containerId = ENV.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID); - require(containerId != null, + Preconditions.checkArgument(containerId != null, "ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID); ResourceID resourceID = new ResourceID(containerId); @@ -231,10 +229,8 @@ protected int runTaskExecutor(Configuration config) { taskExecutor.start(); LOG.debug("YARN task executor started"); } - running = true; - while (running) { - Thread.sleep(100); - } + + taskExecutor.getTerminationFuture().get(); // everything started, we can wait until all is done or the process is killed LOG.info("YARN task executor runner finished"); } @@ -252,35 +248,11 @@ protected int runTaskExecutor(Configuration config) { // Utilities // ------------------------------------------------------------------------ - /** - * Validates a condition, throwing a RuntimeException if the condition is violated. - * - * @param condition The condition. - * @param message The message for the runtime exception, with format variables as defined by - * {@link String#format(String, Object...)}. - * @param values The format arguments. - */ - private static void require(boolean condition, String message, Object... values) { - if (!condition) { - throw new RuntimeException(String.format(message, values)); - } - } - protected RpcService createRpcService( + private RpcService createRpcService( Configuration configuration) throws Exception{ - FiniteDuration duration = AkkaUtils.getTimeout(configuration); - String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);//TODO - if (taskExecutorHostname != null) { - LOG.info("Using configured hostname/address for TaskManager: " + taskExecutorHostname); - } - else { - LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); - InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress( - leaderRetrievalService, - duration); - taskExecutorHostname = taskManagerAddress.getHostName(); - LOG.info("TaskExecutor will use hostname/address {} for communication.", taskExecutorHostname); - } + Preconditions.checkArgument(taskExecutorHostname != null, + "taskExecutorHostname variable %s not set", YarnResourceManager.ENV_FLINK_NODE_ID); // if no task manager port has been configured, use 0 (system will pick any free port) int taskExecutorPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); @@ -291,17 +263,16 @@ protected RpcService createRpcService( " - Leave config parameter empty or use 0 to let the system choose a port automatically."); } - ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, taskExecutorHostname, taskExecutorPort); - return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit())); + return RpcServiceUtils.createRpcService(taskExecutorHostname, taskExecutorPort, configuration); } private TaskExecutor createTaskExecutor(Configuration config, ResourceID resourceID) throws Exception { - InetAddress remoteAddress = InetAddress.getByName(taskExecutorRpcService.getAddress()); + InetAddress taskExecutorAddress = InetAddress.getByName(taskExecutorRpcService.getAddress()); TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( config, - remoteAddress, + taskExecutorAddress, false); TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( @@ -367,7 +338,6 @@ protected void shutdown() { } } } - running = false; } //------------------------------------------------------------------------------------- From d0ddfe3d1863cf2f0dcf7827b4cc39c7efa528f5 Mon Sep 17 00:00:00 2001 From: "shuai.xus" Date: Mon, 28 Nov 2016 16:23:10 +0800 Subject: [PATCH 3/3] [FLINK-4929] [yarn] change to use TaskManagerRunner instead of TaskExecutor --- .../taskexecutor/TaskManagerRunner.java | 6 + .../flink/yarn/YarnTaskExecutorRunner.java | 611 ++++++++---------- 2 files changed, 263 insertions(+), 354 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 99a7c5d83183b..245ad2264683b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -139,6 +140,11 @@ protected void shutDownInternally() { } } + // export the termination future for caller to know it is terminated + public Future getTerminationFuture() { + return taskManager.getTerminationFuture(); + } + // -------------------------------------------------------------------------------------------- // FatalErrorHandler methods // -------------------------------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index 99fcf8bf944d8..d9912eb8a7d0c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -1,354 +1,257 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import akka.actor.ActorSystem; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; -import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.runtime.rpc.FatalErrorHandler; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; -import org.apache.flink.runtime.security.SecurityContext; -import org.apache.flink.runtime.taskexecutor.TaskExecutor; -import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; -import org.apache.flink.runtime.taskexecutor.TaskManagerServices; -import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration; -import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.util.JvmShutdownSafeguard; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.runtime.util.SignalHandler; -import org.apache.flink.util.Preconditions; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; - -import javax.annotation.concurrent.GuardedBy; -import java.io.File; -import java.net.InetAddress; -import java.util.Map; - -/** - * This class is the executable entry point for running a TaskExecutor in a YARN container. - */ -public class YarnTaskExecutorRunner implements FatalErrorHandler { - - /** Logger */ - protected static final Logger LOG = LoggerFactory.getLogger(YarnTaskExecutorRunner.class); - - /** The process environment variables */ - private static final Map ENV = System.getenv(); - - /** The exit code returned if the initialization of the application master failed */ - private static final int INIT_ERROR_EXIT_CODE = 31; - - /** The lock to guard startup / shutdown / manipulation methods */ - private final Object lock = new Object(); - - @GuardedBy("lock") - private MetricRegistry metricRegistry; - - @GuardedBy("lock") - private HighAvailabilityServices haServices; - - @GuardedBy("lock") - private RpcService taskExecutorRpcService; - - @GuardedBy("lock") - private TaskExecutor taskExecutor; - - // ------------------------------------------------------------------------ - // Program entry point - // ------------------------------------------------------------------------ - - /** - * The entry point for the YARN task executor. - * - * @param args The command line arguments. - */ - public static void main(String[] args) { - EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args); - SignalHandler.register(LOG); - JvmShutdownSafeguard.installAsShutdownHook(LOG); - - // run and exit with the proper return code - int returnCode = new YarnTaskExecutorRunner().run(args); - System.exit(returnCode); - } - - /** - * The instance entry point for the YARN task executor. Obtains user group - * information and calls the main work method {@link #runTaskExecutor(org.apache.flink.configuration.Configuration)} as a - * privileged action. - * - * @param args The command line arguments. - * @return The process exit code. - */ - protected int run(String[] args) { - try { - LOG.debug("All environment variables: {}", ENV); - - final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); - LOG.info("Current working/local Directory: {}", localDirs); - - final String currDir = ENV.get(Environment.PWD.key()); - LOG.info("Current working Directory: {}", currDir); - - final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); - - final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); - FileSystem.setDefaultScheme(configuration); - - // configure local directory - String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null); - if (flinkTempDirs == null) { - LOG.info("Setting directories for temporary file " + localDirs); - configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs); - } - else { - LOG.info("Overriding YARN's temporary file directories with those " + - "specified in the Flink config: " + flinkTempDirs); - } - - // tell akka to die in case of an error - configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); - - String keytabPath = null; - if(remoteKeytabPath != null) { - File f = new File(currDir, Utils.KEYTAB_FILE_NAME); - keytabPath = f.getAbsolutePath(); - LOG.info("keytab path: {}", keytabPath); - } - - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - - LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", - currentUser.getShortUserName(), yarnClientUsername); - - SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration(); - - //To support Yarn Secure Integration Test Scenario - File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME); - if(krb5Conf.exists() && krb5Conf.canRead()) { - String krb5Path = krb5Conf.getAbsolutePath(); - LOG.info("KRB5 Conf: {}", krb5Path); - org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); - sc.setHadoopConfiguration(conf); - } - - if(keytabPath != null && remoteKeytabPrincipal != null) { - configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); - configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); - } - - SecurityContext.install(sc.setFlinkConfiguration(configuration)); - - return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner() { - @Override - public Integer run() { - return runTaskExecutor(configuration); - } - }); - - } - catch (Throwable t) { - // make sure that everything whatever ends up in the log - LOG.error("YARN Application Master initialization failed", t); - return INIT_ERROR_EXIT_CODE; - } - } - - // ------------------------------------------------------------------------ - // Core work method - // ------------------------------------------------------------------------ - - /** - * The main work method, must run as a privileged action. - * - * @return The return code for the Java process. - */ - protected int runTaskExecutor(Configuration config) { - - try { - // ---- (1) create common services - // first get the ResouceId, resource id is the container id for yarn. - final String containerId = ENV.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID); - Preconditions.checkArgument(containerId != null, - "ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID); - - ResourceID resourceID = new ResourceID(containerId); - LOG.info("YARN assigned resource id {} for the task executor.", resourceID.toString()); - - synchronized (lock) { - haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config); - metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); - - // ---- (2) init resource manager ------- - taskExecutorRpcService = createRpcService(config); - taskExecutor = createTaskExecutor(config, resourceID); - - // ---- (3) start the task executor - taskExecutor.start(); - LOG.debug("YARN task executor started"); - } - - taskExecutor.getTerminationFuture().get(); - // everything started, we can wait until all is done or the process is killed - LOG.info("YARN task executor runner finished"); - } - catch (Throwable t) { - // make sure that everything whatever ends up in the log - LOG.error("YARN task executor initialization failed", t); - shutdown(); - return INIT_ERROR_EXIT_CODE; - } - - return 0; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private RpcService createRpcService( - Configuration configuration) throws Exception{ - String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);//TODO - Preconditions.checkArgument(taskExecutorHostname != null, - "taskExecutorHostname variable %s not set", YarnResourceManager.ENV_FLINK_NODE_ID); - - // if no task manager port has been configured, use 0 (system will pick any free port) - int taskExecutorPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); - if (taskExecutorPort < 0 || taskExecutorPort > 65535) { - throw new IllegalConfigurationException("Invalid value for '" + - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY + - "' (port for the TaskManager actor system) : " + taskExecutorPort + - " - Leave config parameter empty or use 0 to let the system choose a port automatically."); - } - - return RpcServiceUtils.createRpcService(taskExecutorHostname, taskExecutorPort, configuration); - } - - private TaskExecutor createTaskExecutor(Configuration config, ResourceID resourceID) throws Exception { - - InetAddress taskExecutorAddress = InetAddress.getByName(taskExecutorRpcService.getAddress()); - - TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( - config, - taskExecutorAddress, - false); - - TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( - taskManagerServicesConfiguration, - resourceID); - - TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config); - - TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup( - metricRegistry, - taskManagerServices.getTaskManagerLocation().getHostname(), - resourceID.toString()); - - // Initialize the TM metrics - TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment()); - - return new TaskExecutor( - taskManagerConfiguration, - taskManagerServices.getTaskManagerLocation(), - taskExecutorRpcService, - taskManagerServices.getMemoryManager(), - taskManagerServices.getIOManager(), - taskManagerServices.getNetworkEnvironment(), - haServices, - metricRegistry, - taskManagerMetricGroup, - taskManagerServices.getBroadcastVariableManager(), - taskManagerServices.getFileCache(), - taskManagerServices.getTaskSlotTable(), - taskManagerServices.getJobManagerTable(), - taskManagerServices.getJobLeaderService(), - this); - } - - protected void shutdown() { - synchronized (lock) { - if (taskExecutor != null) { - try { - taskExecutor.shutDown(); - } catch (Throwable tt) { - LOG.warn("Failed to stop the JobMaster", tt); - } - } - if (taskExecutorRpcService != null) { - try { - taskExecutorRpcService.stopService(); - } catch (Throwable tt) { - LOG.error("Error shutting down job master rpc service", tt); - } - } - if (haServices != null) { - try { - haServices.shutdown(); - } catch (Throwable tt) { - LOG.warn("Failed to stop the HA service", tt); - } - } - if (metricRegistry != null) { - try { - metricRegistry.shutdown(); - } catch (Throwable tt) { - LOG.warn("Failed to stop the metrics registry", tt); - } - } - } - } - - //------------------------------------------------------------------------------------- - // Fatal error handler - //------------------------------------------------------------------------------------- - - @Override - public void onFatalError(Throwable exception) { - LOG.error("Encountered fatal error.", exception); - - shutdown(); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.security.SecurityContext; +import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Map; + +/** + * This class is the executable entry point for running a TaskExecutor in a YARN container. + */ +public class YarnTaskExecutorRunner { + + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnTaskExecutorRunner.class); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the yarn task executor runner failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + private MetricRegistry metricRegistry; + + private HighAvailabilityServices haServices; + + private RpcService taskExecutorRpcService; + + private TaskManagerRunner taskManagerRunner; + + // ------------------------------------------------------------------------ + // Program entry point + // ------------------------------------------------------------------------ + + /** + * The entry point for the YARN task executor runner. + * + * @param args The command line arguments. + */ + public static void main(String[] args) { + EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + // run and exit with the proper return code + int returnCode = new YarnTaskExecutorRunner().run(args); + System.exit(returnCode); + } + + /** + * The instance entry point for the YARN task executor. Obtains user group + * information and calls the main work method {@link #runTaskExecutor(org.apache.flink.configuration.Configuration)} as a + * privileged action. + * + * @param args The command line arguments. + * @return The process exit code. + */ + protected int run(String[] args) { + try { + LOG.debug("All environment variables: {}", ENV); + + final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); + + final String currDir = ENV.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); + + final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); + LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); + + final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); + + final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); + FileSystem.setDefaultScheme(configuration); + + // configure local directory + String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null); + if (flinkTempDirs == null) { + LOG.info("Setting directories for temporary file " + localDirs); + configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs); + } + else { + LOG.info("Overriding YARN's temporary file directories with those " + + "specified in the Flink config: " + flinkTempDirs); + } + + // tell akka to die in case of an error + configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); + + String keytabPath = null; + if(remoteKeytabPath != null) { + File f = new File(currDir, Utils.KEYTAB_FILE_NAME); + keytabPath = f.getAbsolutePath(); + LOG.info("keytab path: {}", keytabPath); + } + + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + + LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", + currentUser.getShortUserName(), yarnClientUsername); + + SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration(); + + //To support Yarn Secure Integration Test Scenario + File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME); + if(krb5Conf.exists() && krb5Conf.canRead()) { + String krb5Path = krb5Conf.getAbsolutePath(); + LOG.info("KRB5 Conf: {}", krb5Path); + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); + sc.setHadoopConfiguration(conf); + } + + if(keytabPath != null && remoteKeytabPrincipal != null) { + configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); + configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); + } + + SecurityContext.install(sc.setFlinkConfiguration(configuration)); + + return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner() { + @Override + public Integer run() { + return runTaskExecutor(configuration); + } + }); + + } + catch (Throwable t) { + // make sure that everything whatever ends up in the log + LOG.error("YARN Application Master initialization failed", t); + return INIT_ERROR_EXIT_CODE; + } + } + + // ------------------------------------------------------------------------ + // Core work method + // ------------------------------------------------------------------------ + + /** + * The main work method, must run as a privileged action. + * + * @return The return code for the Java process. + */ + protected int runTaskExecutor(Configuration config) { + + try { + // ---- (1) create common services + // first get the ResouceId, resource id is the container id for yarn. + final String containerId = ENV.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID); + Preconditions.checkArgument(containerId != null, + "ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID); + // use the hostname passed by job manager + final String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID); + if (taskExecutorHostname != null) { + config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskExecutorHostname); + } + + ResourceID resourceID = new ResourceID(containerId); + LOG.info("YARN assigned resource id {} for the task executor.", resourceID.toString()); + + haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config); + metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + + // ---- (2) init task manager runner ------- + taskExecutorRpcService = TaskManagerRunner.createRpcService(config, haServices); + taskManagerRunner = new TaskManagerRunner(config, resourceID, taskExecutorRpcService, haServices, metricRegistry); + + // ---- (3) start the task manager runner + taskManagerRunner.start(); + LOG.debug("YARN task executor started"); + + taskManagerRunner.getTerminationFuture().get(); + // everything started, we can wait until all is done or the process is killed + LOG.info("YARN task manager runner finished"); + shutdown(); + } + catch (Throwable t) { + // make sure that everything whatever ends up in the log + LOG.error("YARN task executor initialization failed", t); + shutdown(); + return INIT_ERROR_EXIT_CODE; + } + + return 0; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + + protected void shutdown() { + if (taskExecutorRpcService != null) { + try { + taskExecutorRpcService.stopService(); + } catch (Throwable tt) { + LOG.error("Error shutting down job master rpc service", tt); + } + } + if (haServices != null) { + try { + haServices.shutdown(); + } catch (Throwable tt) { + LOG.warn("Failed to stop the HA service", tt); + } + } + if (metricRegistry != null) { + try { + metricRegistry.shutdown(); + } catch (Throwable tt) { + LOG.warn("Failed to stop the metrics registry", tt); + } + } + } + +}