diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 3064f8d1fcab8..a01ab5386a40f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -18,8 +18,7 @@ package org.apache.flink.client; -import akka.actor.ActorSystem; - +import org.apache.commons.cli.CommandLine; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; @@ -30,6 +29,7 @@ import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.client.cli.CustomCommandLine; +import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.client.cli.InfoOptions; import org.apache.flink.client.cli.ListOptions; import org.apache.flink.client.cli.ProgramOptions; @@ -39,7 +39,6 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -56,7 +55,6 @@ import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; @@ -67,13 +65,12 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Some; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -81,6 +78,8 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.net.URL; import java.text.SimpleDateFormat; @@ -89,6 +88,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.Date; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -102,9 +102,11 @@ */ public class CliFrontend { + private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); + // actions - public static final String ACTION_RUN = "run"; - public static final String ACTION_INFO = "info"; + private static final String ACTION_RUN = "run"; + private static final String ACTION_INFO = "info"; private static final String ACTION_LIST = "list"; private static final String ACTION_CANCEL = "cancel"; private static final String ACTION_STOP = "stop"; @@ -116,19 +118,24 @@ public class CliFrontend { private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf"; // -------------------------------------------------------------------------------------------- + + private static final List customCommandLine = new LinkedList<>(); + + static { + /** command line interface of the YARN session, with a special initialization here + * to prefix all options with y/yarn. 
*/ + loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn"); + customCommandLine.add(new DefaultCLI()); + } + // -------------------------------------------------------------------------------------------- - private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); private final Configuration config; private final FiniteDuration clientTimeout; - private final FiniteDuration lookupTimeout; - - private ActorSystem actorSystem; - /** * * @throws Exception Thrown if the configuration directory was not found, the configuration could not be loaded @@ -146,6 +153,8 @@ public CliFrontend(String configDir) throws Exception { // load the configuration LOG.info("Trying to load configuration file"); GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); + System.setProperty("FLINK_CONF_DIR", configDirectory.getAbsolutePath()); + this.config = GlobalConfiguration.getConfiguration(); try { @@ -156,7 +165,6 @@ public CliFrontend(String configDir) throws Exception { } this.clientTimeout = AkkaUtils.getClientTimeout(config); - this.lookupTimeout = AkkaUtils.getLookupTimeout(config); } @@ -798,19 +806,20 @@ else if (!jarFile.isFile()) { * * @param options Command line options */ - protected void updateConfig(CommandLineOptions options) { - if(options.getJobManagerAddress() != null){ - if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { - jobManagerAddress = CliFrontendParser.getFlinkYarnSessionCli() - .attachFlinkYarnClient(options.getCommandLine()) - .getJobManagerAddress(); - InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress()); - writeJobManagerAddressToConfig(config, jobManagerAddress); + protected ClusterClient retrieveClient(CommandLineOptions options) { + CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine()); + try { + ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config); + 
LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig()); + return client; + } catch (Exception e) { + LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e); + throw new IllegalConfigurationException("Couldn't retrieve client for cluster", e); } } /** - * Retrieves the {@link ActorGateway} for the JobManager. The JobManager address is retrieved + * Retrieves the {@link ActorGateway} for the JobManager. The ClusterClient is retrieved * from the provided {@link CommandLineOptions}. * * @param options CommandLineOptions specifying the JobManager URL @@ -818,92 +827,41 @@ protected void updateConfig(CommandLineOptions options) { * @throws Exception */ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception { - // overwrite config values with given command line options - updateConfig(options); - - // start an actor system if needed - if (this.actorSystem == null) { - LOG.info("Starting actor system to communicate with JobManager"); - try { - scala.Tuple2 systemEndpoint = new scala.Tuple2("", 0); - this.actorSystem = AkkaUtils.createActorSystem( - config, - new Some>(systemEndpoint)); - } - catch (Exception e) { - throw new IOException("Could not start actor system to communicate with JobManager", e); - } - - LOG.info("Actor system successfully started"); - } - - LOG.info("Trying to lookup the JobManager gateway"); - // Retrieve the ActorGateway from the LeaderRetrievalService - LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config); - - return LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, lookupTimeout); + return retrieveClient(options).getJobManagerGateway(); } /** - * Retrieves a {@link ClusterClient} object from the given command line options and other parameters. - * - * @param options Command line options which contain JobManager address + * Creates a {@link ClusterClient} object from the given command line options and other parameters. 
+ * @param options Command line options * @param programName Program name * @throws Exception */ protected ClusterClient getClient( CommandLineOptions options, - String programName) - throws Exception { - InetSocketAddress jobManagerAddress; - - // try to get the JobManager address via command-line args - if (options.getJobManagerAddress() != null) { + String programName) throws Exception { - // Get the custom command-lines (e.g. Yarn/Mesos) - CustomCommandLine activeCommandLine = - CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress()); + // Get the custom command-line (e.g. Standalone/Yarn/Mesos) + CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (activeCommandLine != null) { - logAndSysout(activeCommandLine.getIdentifier() + " mode detected. Switching Log4j output to console"); - - // Default yarn application name to use, if nothing is specified on the command line + ClusterClient client; + try { + client = activeCommandLine.retrieveCluster(options.getCommandLine(), config); + logAndSysout("Cluster retrieved"); + } catch (UnsupportedOperationException e) { + try { String applicationName = "Flink Application: " + programName; - - ClusterClient client = activeCommandLine.createClient(applicationName, options.getCommandLine()); - + client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config); logAndSysout("Cluster started"); - logAndSysout("JobManager web interface address " + client.getWebInterfaceURL()); - - return client; - } else { - // job manager address supplied on the command-line - LOG.info("Using address {} to connect to JobManager.", options.getJobManagerAddress()); - jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress()); - writeJobManagerAddressToConfig(config, jobManagerAddress); - return new StandaloneClusterClient(config); - } - - // try to get the JobManager address via resuming of a cluster - } else { - for 
(CustomCommandLine cli : CliFrontendParser.getAllCustomCommandLine().values()) { - ClusterClient client = cli.retrieveCluster(config); - if (client != null) { - LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig()); - return client; - } + } catch (UnsupportedOperationException e2) { + throw new IllegalConfigurationException( + "The JobManager address is neither provided at the command-line, " + + "nor configured in flink-conf.yaml."); } } - // read JobManager address from the config - if (config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) != null) { - return new StandaloneClusterClient(config); - // We tried hard but couldn't find a JobManager address - } else { - throw new IllegalConfigurationException( - "The JobManager address is neither provided at the command-line, " + - "nor configured in flink-conf.yaml."); - } + logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager."); + logAndSysout("JobManager web interface address " + client.getWebInterfaceURL()); + return client; } // -------------------------------------------------------------------------------------------- @@ -917,7 +875,7 @@ protected ClusterClient getClient( * @return The return code for the process. */ private int handleArgException(Exception e) { - LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage())); + LOG.error("Invalid command line arguments. " + (e.getMessage() == null ? 
"" : e.getMessage())); System.out.println(e.getMessage()); System.out.println(); @@ -1039,14 +997,6 @@ public Integer run() throws Exception { } } - public void shutdown() { - ActorSystem sys = this.actorSystem; - if (sys != null) { - this.actorSystem = null; - sys.shutdown(); - } - } - /** * Submits the job based on the arguments */ @@ -1070,7 +1020,8 @@ public static void main(String[] args) { // -------------------------------------------------------------------------------------------- public static String getConfigurationDirectoryFromEnv() { - String location = System.getenv(ENV_CONFIG_DIRECTORY); + String envLocation = System.getenv(ENV_CONFIG_DIRECTORY); + String location = envLocation != null ? envLocation : System.getProperty(ENV_CONFIG_DIRECTORY); if (location != null) { if (new File(location).exists()) { @@ -1102,9 +1053,65 @@ else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) { * @param address Address to write to the configuration * @param config The config to write to */ - public static void writeJobManagerAddressToConfig(Configuration config, InetSocketAddress address) { + public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) { config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName()); config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort()); } + // -------------------------------------------------------------------------------------------- + // Custom command-line + // -------------------------------------------------------------------------------------------- + + /** + * Gets the custom command-line for the arguments. + * @param commandLine The input to the command-line. 
+ * @return custom command-line which is active (may only be one at a time) + */ + public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) { + for (CustomCommandLine cli : customCommandLine) { + if (cli.isActive(commandLine, config)) { + return cli; + } + } + throw new IllegalStateException("No command-line ran."); + } + + /** + * Retrieves the loaded custom command-lines. + * @return An unmodifiyable list of loaded custom command-lines. + */ + public static List getCustomCommandLineList() { + return Collections.unmodifiableList(customCommandLine); + } + + /** + * Loads a class from the classpath that implements the CustomCommandLine interface. + * @param className The fully-qualified class name to load. + * @param params The constructor parameters + */ + private static void loadCustomCommandLine(String className, Object... params) { + + try { + Class customCliClass = + Class.forName(className).asSubclass(CustomCommandLine.class); + + // construct class types from the parameters + Class[] types = new Class[params.length]; + for (int i = 0; i < params.length; i++) { + Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null."); + types[i] = params[i].getClass(); + } + + Constructor constructor = customCliClass.getConstructor(types); + final CustomCommandLine cli = constructor.newInstance(params); + + customCommandLine.add(cli); + + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException + | InvocationTargetException e) { + LOG.warn("Unable to locate custom CLI class {}. 
" + + "Flink is not compiled with support for this class.", className, e); + } + } + } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 9b935e8a3203f..c90793dc6f2e9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -24,16 +24,10 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; -import org.apache.flink.util.Preconditions; +import org.apache.flink.client.CliFrontend; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - /** * A simple command line parser (based on Apache Commons CLI) that extracts command @@ -44,16 +38,6 @@ public class CliFrontendParser { private static final Logger LOG = LoggerFactory.getLogger(CliFrontendParser.class); - /** command line interface of the YARN session, with a special initialization here - * to prefix all options with y/yarn. */ - private static final Map customCommandLine = new HashMap<>(1); - - static { - // we could easily add more here in the future - loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn"); - } - - static final Option HELP_OPTION = new Option("h", "help", false, "Show the help message for the CLI Frontend or the action."); @@ -82,9 +66,8 @@ public class CliFrontendParser { static final Option ARGS_OPTION = new Option("a", "arguments", true, "Program arguments. 
Arguments can also be added without -a, simply as trailing parameters."); - static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, + public static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, "Address of the JobManager (master) to which to connect. " + - "Specify " + getCliIdentifierString() +" as the JobManager to deploy a cluster for the job. " + "Use this flag to connect to a different JobManager than the one specified in the configuration."); static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true, @@ -146,6 +129,10 @@ private static Options buildGeneralOptions(Options options) { options.addOption(HELP_OPTION); // backwards compatibility: ignore verbose flag (-v) options.addOption(new Option("v", "verbose", false, "This option is deprecated.")); + // add general options of all CLIs + for (CustomCommandLine customCLI : CliFrontend.getCustomCommandLineList()) { + customCLI.addGeneralOptions(options); + } return options; } @@ -158,11 +145,6 @@ public static Options getProgramSpecificOptions(Options options) { options.addOption(LOGGING_OPTION); options.addOption(DETACHED_OPTION); options.addOption(SAVEPOINT_PATH_OPTION); - - for (CustomCommandLine customCLI : customCommandLine.values()) { - customCLI.addOptions(options); - } - return options; } @@ -177,62 +159,85 @@ private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options } private static Options getRunOptions(Options options) { - Options o = getProgramSpecificOptions(options); - return getJobManagerAddressOption(o); + options = getProgramSpecificOptions(options); + options = getJobManagerAddressOption(options); + return addCustomCliOptions(options, true); } - private static Options getRunOptionsWithoutDeprecatedOptions(Options options) { - Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options); - return getJobManagerAddressOption(o); - } private static Options getJobManagerAddressOption(Options options) { 
options.addOption(ADDRESS_OPTION); - yarnSessionCLi.getYARNAttachCLIOptions(options); - return options; } private static Options getInfoOptions(Options options) { options = getProgramSpecificOptions(options); options = getJobManagerAddressOption(options); - return options; + return addCustomCliOptions(options, false); + } + + private static Options getListOptions(Options options) { + options.addOption(RUNNING_OPTION); + options.addOption(SCHEDULED_OPTION); + options = getJobManagerAddressOption(options); + return addCustomCliOptions(options, false); + } + + private static Options getCancelOptions(Options options) { + options = getJobManagerAddressOption(options); + return addCustomCliOptions(options, false); + } + + private static Options getStopOptions(Options options) { + options = getJobManagerAddressOption(options); + return addCustomCliOptions(options, false); + } + + private static Options getSavepointOptions(Options options) { + options = getJobManagerAddressOption(options); + options.addOption(SAVEPOINT_DISPOSE_OPTION); + return addCustomCliOptions(options, false); + } + + // -------------------------------------------------------------------------------------------- + // Help + // -------------------------------------------------------------------------------------------- + + private static Options getRunOptionsWithoutDeprecatedOptions(Options options) { + Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options); + return getJobManagerAddressOption(o); } + private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) { options.addOption(CLASS_OPTION); options.addOption(PARALLELISM_OPTION); - options = getJobManagerAddressOption(options); return options; } - private static Options getListOptions(Options options) { + private static Options getListOptionsWithoutDeprecatedOptions(Options options) { options.addOption(RUNNING_OPTION); options.addOption(SCHEDULED_OPTION); options = getJobManagerAddressOption(options); return 
options; } - private static Options getCancelOptions(Options options) { + private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) { options = getJobManagerAddressOption(options); return options; } - private static Options getStopOptions(Options options) { + private static Options getStopOptionsWithoutDeprecatedOptions(Options options) { options = getJobManagerAddressOption(options); return options; } - private static Options getSavepointOptions(Options options) { + private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) { options = getJobManagerAddressOption(options); options.addOption(SAVEPOINT_DISPOSE_OPTION); return options; } - // -------------------------------------------------------------------------------------------- - // Help - // -------------------------------------------------------------------------------------------- - /** * Prints the help for the client. */ @@ -261,14 +266,7 @@ public static void printHelpForRun() { formatter.setSyntaxPrefix(" \"run\" action options:"); formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options())); - // prints options from all available command-line classes - for (Map.Entry entry: customCommandLine.entrySet()) { - formatter.setSyntaxPrefix(" Additional arguments if -m " + entry.getKey() + " is set:"); - Options customOpts = new Options(); - entry.getValue().addOptions(customOpts); - formatter.printHelp(" ", customOpts); - System.out.println(); - } + printCustomCliOptions(formatter, true); System.out.println(); } @@ -282,10 +280,9 @@ public static void printHelpForInfo() { System.out.println("\n Syntax: info [OPTIONS] "); formatter.setSyntaxPrefix(" \"info\" action options:"); formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options())); - formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:"); - Options yarnOpts = new Options(); - 
yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts); - formatter.printHelp(" ", yarnOpts); + + printCustomCliOptions(formatter, false); + System.out.println(); } @@ -297,7 +294,10 @@ public static void printHelpForList() { System.out.println("\nAction \"list\" lists running and scheduled programs."); System.out.println("\n Syntax: list [OPTIONS]"); formatter.setSyntaxPrefix(" \"list\" action options:"); - formatter.printHelp(" ", getListOptions(new Options())); + formatter.printHelp(" ", getListOptionsWithoutDeprecatedOptions(new Options())); + + printCustomCliOptions(formatter, false); + System.out.println(); } @@ -309,7 +309,10 @@ public static void printHelpForStop() { System.out.println("\nAction \"stop\" stops a running program (streaming jobs only)."); System.out.println("\n Syntax: stop [OPTIONS] "); formatter.setSyntaxPrefix(" \"stop\" action options:"); - formatter.printHelp(" ", getStopOptions(new Options())); + formatter.printHelp(" ", getStopOptionsWithoutDeprecatedOptions(new Options())); + + printCustomCliOptions(formatter, false); + System.out.println(); } @@ -321,11 +324,10 @@ public static void printHelpForCancel() { System.out.println("\nAction \"cancel\" cancels a running program."); System.out.println("\n Syntax: cancel [OPTIONS] "); formatter.setSyntaxPrefix(" \"cancel\" action options:"); - formatter.printHelp(" ", getCancelOptions(new Options())); - formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:"); - Options yarnOpts = new Options(); - yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts); - formatter.printHelp(" ", yarnOpts); + formatter.printHelp(" ", getCancelOptionsWithoutDeprecatedOptions(new Options())); + + printCustomCliOptions(formatter, false); + System.out.println(); } @@ -337,10 +339,50 @@ public static void printHelpForSavepoint() { System.out.println("\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones."); System.out.println("\n Syntax: 
savepoint [OPTIONS] "); formatter.setSyntaxPrefix(" \"savepoint\" action options:"); - formatter.printHelp(" ", getSavepointOptions(new Options())); + formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options())); + + printCustomCliOptions(formatter, false); + System.out.println(); } + /** + * Adds custom cli options + * @param options The options to add options to + * @param runOptions Whether to include run options + * @return Options with additions + */ + private static Options addCustomCliOptions(Options options, boolean runOptions) { + for (CustomCommandLine cli: CliFrontend.getCustomCommandLineList()) { + cli.addGeneralOptions(options); + if (runOptions) { + cli.addRunOptions(options); + } + } + return options; + } + + /** + * Prints custom cli options + * @param formatter The formatter to use for printing + * @param runOptions True if the run options should be printed, False to print only general options + */ + private static void printCustomCliOptions(HelpFormatter formatter, boolean runOptions) { + // prints options from all available command-line classes + for (CustomCommandLine cli: CliFrontend.getCustomCommandLineList()) { + if (cli.getId() != null) { + formatter.setSyntaxPrefix(" Options for " + cli.getId() + " mode:"); + Options customOpts = new Options(); + cli.addGeneralOptions(customOpts); + if (runOptions) { + cli.addRunOptions(customOpts); + } + formatter.printHelp(" ", customOpts); + System.out.println(); + } + } + } + // -------------------------------------------------------------------------------------------- // Line Parsing // -------------------------------------------------------------------------------------------- @@ -410,63 +452,4 @@ public static InfoOptions parseInfoCommand(String[] args) throws CliArgsExceptio } } - public static Map getAllCustomCommandLine() { - if (customCommandLine.isEmpty()) { - LOG.warn("No custom command-line classes were loaded."); - } - return 
Collections.unmodifiableMap(customCommandLine); - } - - private static String getCliIdentifierString() { - StringBuilder builder = new StringBuilder(); - boolean first = true; - for (String identifier : customCommandLine.keySet()) { - if (!first) { - builder.append(", "); - } - first = false; - builder.append("'").append(identifier).append("'"); - } - return builder.toString(); - } - - /** - * Gets the custom command-line for this identifier. - * @param identifier The unique identifier for this command-line implementation. - * @return CustomCommandLine or null if none was found - */ - public static CustomCommandLine getActiveCustomCommandLine(String identifier) { - return CliFrontendParser.getAllCustomCommandLine().get(identifier); - } - - private static void loadCustomCommandLine(String className, Object... params) { - - try { - Class customCliClass = - Class.forName(className).asSubclass(CustomCommandLine.class); - - // construct class types from the parameters - Class[] types = new Class[params.length]; - for (int i = 0; i < params.length; i++) { - Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null."); - types[i] = params[i].getClass(); - } - - Constructor constructor = customCliClass.getConstructor(types); - final CustomCommandLine cli = constructor.newInstance(params); - - String cliIdentifier = Preconditions.checkNotNull(cli.getIdentifier()); - CustomCommandLine existing = customCommandLine.put(cliIdentifier, cli); - - if (existing != null) { - throw new IllegalStateException("Attempted to register " + cliIdentifier + - " but there is already a command-line with this identifier."); - } - } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException - | InvocationTargetException e) { - LOG.warn("Unable to locate custom CLI class {}. 
" + - "Flink is not compiled with support for this class.", className, e); - } - } - } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java index cd5e0e6acd082..aecdc7c9662a9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java @@ -29,29 +29,47 @@ public interface CustomCommandLine { /** - * Returns a unique identifier for this custom command-line. - * @return An unique identifier string + * Signals whether the custom command-line wants to execute or not + * @param commandLine The command-line options + * @param configuration The Flink configuration + * @return True if the command-line wants to run, False otherwise */ - String getIdentifier(); + boolean isActive(CommandLine commandLine, Configuration configuration); /** - * Adds custom options to the existing options. + * Gets the unique identifier of this CustomCommandLine + * @return A unique identifier + */ + String getId(); + + /** + * Adds custom options to the existing run options. + * @param baseOptions The existing options. + */ + void addRunOptions(Options baseOptions); + + /** + * Adds custom options to the existing general options. * @param baseOptions The existing options. 
*/ - void addOptions(Options baseOptions); + void addGeneralOptions(Options baseOptions); /** * Retrieves a client for a running cluster + * @param commandLine The command-line parameters from the CliFrontend * @param config The Flink config - * @return Client if a cluster could be retrieve, null otherwise + * @return Client if a cluster could be retrieved + * @throws UnsupportedOperationException if the operation is not supported */ - ClusterClient retrieveCluster(Configuration config) throws Exception; + ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws UnsupportedOperationException; /** * Creates the client for the cluster * @param applicationName The application name to use * @param commandLine The command-line options parsed by the CliFrontend + * @param config The Flink config to use * @return The client to communicate with the cluster which the CustomCommandLine brought up. + * @throws UnsupportedOperationException if the operation is not supported */ - ClusterType createClient(String applicationName, CommandLine commandLine) throws Exception; + ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration config) throws UnsupportedOperationException; } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java new file mode 100644 index 0000000000000..8bceed7703c34 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.flink.client.ClientUtils; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.configuration.Configuration; + +import java.net.InetSocketAddress; + +import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig; + +/** + * The default CLI which is used for interaction with standalone clusters. + */ +public class DefaultCLI implements CustomCommandLine { + + @Override + public boolean isActive(CommandLine commandLine, Configuration configuration) { + // always active because we can try to read a JobManager address from the config + return true; + } + + @Override + public String getId() { + return null; + } + + @Override + public void addRunOptions(Options baseOptions) { + } + + @Override + public void addGeneralOptions(Options baseOptions) { + } + + @Override + public StandaloneClusterClient retrieveCluster(CommandLine commandLine, Configuration config) { + + if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) { + String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt()); + InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort); + setJobManagerAddressInConfig(config, jobManagerAddress); + } + + StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); + return 
descriptor.retrieve(null); + } + + @Override + public StandaloneClusterClient createCluster( + String applicationName, + CommandLine commandLine, + Configuration config) throws UnsupportedOperationException { + + StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); + return descriptor.deploy(); + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java index cf0595bc44d64..59cece33435da 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java @@ -30,12 +30,20 @@ public interface ClusterDescriptor { * Returns a String containing details about the cluster (NodeManagers, available memory, ...) * */ - String getClusterDescription() throws Exception; + String getClusterDescription(); + + /** + * Retrieves an existing Flink Cluster. 
+ * @param applicationID The unique application identifier of the running cluster + * @return Client for the cluster + * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation + */ + ClientType retrieve(String applicationID) throws UnsupportedOperationException; /** * Triggers deployment of a cluster * @return Client for the cluster - * @throws Exception + * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation */ - ClientType deploy() throws Exception; + ClientType deploy() throws UnsupportedOperationException; } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java new file mode 100644 index 0000000000000..57ccc47ede347 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.client.deployment; + +import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; + + +/** + * A deployment descriptor for an existing cluster + */ +public class StandaloneClusterDescriptor implements ClusterDescriptor { + + private final Configuration config; + + public StandaloneClusterDescriptor(Configuration config) { + this.config = config; + } + + @Override + public String getClusterDescription() { + String host = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, ""); + int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + return "Standalone cluster at " + host + ":" + port; + } + + @Override + public StandaloneClusterClient retrieve(String applicationID) { + try { + return new StandaloneClusterClient(config); + } catch (Exception e) { + throw new RuntimeException("Couldn't retrieve standalone cluster", e); + } + } + + @Override + public StandaloneClusterClient deploy() { + throw new UnsupportedOperationException("Can't deploy a standalone cluster."); + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index b56428d76680c..def9578c06f55 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -76,7 +76,7 @@ */ public abstract class ClusterClient { - private static final Logger LOG = LoggerFactory.getLogger(ClusterClient.class); + private final Logger LOG = LoggerFactory.getLogger(getClass()); /** The optimizer used in the optimization of batch programs */ final Optimizer compiler; @@ -203,9 +203,9 @@ public boolean getPrintStatusDuringExecution() { */ public InetSocketAddress getJobManagerAddressFromConfig() { try { - String hostName = 
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); - return new InetSocketAddress(hostName, port); + String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + return new InetSocketAddress(hostName, port); } catch (Exception e) { throw new RuntimeException("Failed to retrieve JobManager address", e); } @@ -255,11 +255,13 @@ public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram pro } public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException { + Logger log = LoggerFactory.getLogger(ClusterClient.class); + if (parallelism > 0 && p.getDefaultParallelism() <= 0) { - LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism); + log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism); p.setDefaultParallelism(parallelism); } - LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism()); + log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism()); return compiler.compile(p); } @@ -603,7 +605,7 @@ private JobGraph getJobGraph(FlinkPlan optPlan, List jarFiles, List cl * @return ActorGateway of the current job manager leader * @throws Exception */ - protected ActorGateway getJobManagerGateway() throws Exception { + public ActorGateway getJobManagerGateway() throws Exception { LOG.info("Looking up JobManager"); return LeaderRetrievalUtils.retrieveLeaderGateway( diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java index de85ca864b014..c6b1111d7f3bd 100644 --- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java @@ -23,9 +23,13 @@ import static org.mockito.Mockito.*; +import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.cli.RunOptions; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; @@ -57,14 +61,12 @@ public void clearConfig() { public void testValidConfig() { try { CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + RunOptions options = CliFrontendParser.parseRunCommand(new String[] {}); - CommandLineOptions options = mock(CommandLineOptions.class); - - frontend.updateConfig(options); - Configuration config = frontend.getConfiguration(); + ClusterClient clusterClient = frontend.retrieveClient(options); checkJobManagerAddress( - config, + clusterClient.getFlinkConfiguration(), CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS, CliFrontendTestUtils.TEST_JOB_MANAGER_PORT); } @@ -74,43 +76,12 @@ public void testValidConfig() { } } - @Test - public void testInvalidConfigAndNoOption() { - try { + @Test(expected = IllegalConfigurationException.class) + public void testInvalidConfigAndNoOption() throws Exception { CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir()); - CommandLineOptions options = mock(CommandLineOptions.class); - - frontend.updateConfig(options); - Configuration config = frontend.getConfiguration(); + RunOptions options = CliFrontendParser.parseRunCommand(new String[] {}); - checkJobManagerAddress(config, null, -1); - - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void 
testInvalidConfigAndOption() { - try { - CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir()); - - CommandLineOptions options = mock(CommandLineOptions.class); - when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788"); - - frontend.updateConfig(options); - Configuration config = frontend.getConfiguration(); - - InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788); - - checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + frontend.retrieveClient(options); } @Test @@ -118,12 +89,10 @@ public void testManualOptionsOverridesConfig() { try { CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - CommandLineOptions options = mock(CommandLineOptions.class); - when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788"); - - frontend.updateConfig(options); + RunOptions options = CliFrontendParser.parseRunCommand(new String[] {"-m", "10.221.130.22:7788"}); - Configuration config = frontend.getConfiguration(); + ClusterClient client = frontend.retrieveClient(options); + Configuration config = client.getFlinkConfiguration(); InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788); diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index 1a8870b797596..f3b350760f7de 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -20,6 +20,7 @@ package org.apache.flink.api.scala import java.io._ +import org.apache.commons.cli.CommandLine import org.apache.flink.client.cli.CliFrontendParser import org.apache.flink.client.program.ClusterClient import org.apache.flink.client.CliFrontend @@ -245,11 +246,13 @@ 
object FlinkShell { yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString)) yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString)) - val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster") val options = CliFrontendParser.parseRunCommand(args.toArray) + val frontend = new CliFrontend() + val config = frontend.getConfiguration + val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine) - val cluster = customCLI.createClient("Flink Scala Shell", options.getCommandLine) + val cluster = customCLI.createCluster("Flink Scala Shell", options.getCommandLine, config) val address = cluster.getJobManagerAddress.getAddress.getHostAddress val port = cluster.getJobManagerAddress.getPort @@ -259,12 +262,21 @@ object FlinkShell { def fetchDeployedYarnClusterInfo() = { - // load configuration - val globalConfig = GlobalConfiguration.getConfiguration - val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster") + val args = ArrayBuffer[String]( + "-m", "yarn-cluster" + ) - val cluster = customCLI.retrieveCluster(globalConfig) + val options = CliFrontendParser.parseRunCommand(args.toArray) + val frontend = new CliFrontend() + val config = frontend.getConfiguration + val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine) + + val cluster = customCLI.retrieveCluster(options.getCommandLine, config) + + if (cluster == null) { + throw new RuntimeException("Yarn Cluster could not be retrieved.") + } val jobManager = cluster.getJobManagerAddress diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index c6a1adeb57ab4..217ad3d72ab44 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -18,27 +18,45 @@ package org.apache.flink.yarn; +import org.apache.commons.cli.CommandLine; import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.cli.CustomCommandLine; +import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.junit.*; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; import java.io.File; +import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.StandardOpenOption; +import java.util.LinkedList; +import java.util.List; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Tests that verify that the CLI client picks up the correct address for the JobManager @@ -80,8 +98,10 @@ public 
void clearConfig() throws NoSuchFieldException, IllegalAccessException { private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55"; private static final int TEST_YARN_JOB_MANAGER_PORT = 6655; + private static final ApplicationId TEST_YARN_APPLICATION_ID = + ApplicationId.newInstance(System.currentTimeMillis(), 42); - private static final String propertiesFile = + private static final String validPropertiesFile = "jobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":" + TEST_YARN_JOB_MANAGER_PORT; @@ -101,110 +121,292 @@ public void clearConfig() throws NoSuchFieldException, IllegalAccessException { * Test that the CliFrontend is able to pick up the .yarn-properties file from a specified location. */ @Test - public void testYarnConfig() { - try { - File tmpFolder = temporaryFolder.newFolder(); - String currentUser = System.getProperty("user.name"); + public void testResumeFromYarnPropertiesFile() throws Exception { - // copy .yarn-properties- - File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser); - Files.write(testPropertiesFile.toPath(), propertiesFile.getBytes(), StandardOpenOption.CREATE); + File directoryPath = writeYarnPropertiesFile(validPropertiesFile); - // copy reference flink-conf.yaml to temporary test directory and append custom configuration path. 
- String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder; - File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml"); - Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE); + // start CLI Frontend + TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath()); - // start CLI Frontend - TestCLI frontend = new TestCLI(tmpFolder.getAbsolutePath()); + RunOptions options = CliFrontendParser.parseRunCommand(new String[] {}); - CommandLineOptions options = mock(CommandLineOptions.class); + frontend.retrieveClient(options); + checkJobManagerAddress( + frontend.getConfiguration(), + TEST_YARN_JOB_MANAGER_ADDRESS, + TEST_YARN_JOB_MANAGER_PORT); - frontend.getClient(options, "Program name"); + } - frontend.updateConfig(options); - Configuration config = frontend.getConfiguration(); + @Test(expected = IllegalConfigurationException.class) + public void testResumeFromYarnPropertiesFileWithFinishedApplication() throws Exception { - checkJobManagerAddress( - config, - TEST_YARN_JOB_MANAGER_ADDRESS, - TEST_YARN_JOB_MANAGER_PORT); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + File directoryPath = writeYarnPropertiesFile(validPropertiesFile); + + // start CLI Frontend + TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED); + + RunOptions options = CliFrontendParser.parseRunCommand(new String[] {}); + + frontend.retrieveClient(options); + checkJobManagerAddress( + frontend.getConfiguration(), + TEST_YARN_JOB_MANAGER_ADDRESS, + TEST_YARN_JOB_MANAGER_PORT); } - public static class TestCLI extends CliFrontend { - TestCLI(String configDir) throws Exception { - super(configDir); - } - @Override - public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception { - return super.getClient(options, programName); - } + @Test(expected = IllegalConfigurationException.class) + public void 
testInvalidYarnPropertiesFile() throws Exception { - @Override - public void updateConfig(CommandLineOptions options) { - super.updateConfig(options); - } + File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile); + + TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath()); + + RunOptions options = CliFrontendParser.parseRunCommand(new String[] {}); + + frontend.retrieveClient(options); + Configuration config = frontend.getConfiguration(); + + checkJobManagerAddress( + config, + TEST_JOB_MANAGER_ADDRESS, + TEST_JOB_MANAGER_PORT); } + @Test - public void testInvalidYarnConfig() { - try { - File tmpFolder = temporaryFolder.newFolder(); + public void testResumeFromYarnID() throws Exception { + File directoryPath = writeYarnPropertiesFile(validPropertiesFile); - // copy invalid .yarn-properties- - File testPropertiesFile = new File(tmpFolder, ".yarn-properties"); - Files.write(testPropertiesFile.toPath(), invalidPropertiesFile.getBytes(), StandardOpenOption.CREATE); + // start CLI Frontend + TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath()); - // copy reference flink-conf.yaml to temporary test directory and append custom configuration path. 
- String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder; - File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml"); - Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE); + RunOptions options = + CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}); - TestCLI cli = new TestCLI(tmpFolder.getAbsolutePath()); + frontend.retrieveClient(options); - CommandLineOptions options = mock(CommandLineOptions.class); + checkJobManagerAddress( + frontend.getConfiguration(), + TEST_YARN_JOB_MANAGER_ADDRESS, + TEST_YARN_JOB_MANAGER_PORT); + } - cli.updateConfig(options); + @Test(expected = IllegalConfigurationException.class) + public void testResumeFromInvalidYarnID() throws Exception { + File directoryPath = writeYarnPropertiesFile(validPropertiesFile); - Configuration config = cli.getConfiguration(); + // start CLI Frontend + TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED); - checkJobManagerAddress( - config, - TEST_JOB_MANAGER_ADDRESS, - TEST_JOB_MANAGER_PORT); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + RunOptions options = + CliFrontendParser.parseRunCommand(new String[] {"-yid", ApplicationId.newInstance(0, 666).toString()}); + + frontend.retrieveClient(options); + checkJobManagerAddress( + frontend.getConfiguration(), + TEST_YARN_JOB_MANAGER_ADDRESS, + TEST_YARN_JOB_MANAGER_PORT); + } + + @Test(expected = IllegalConfigurationException.class) + public void testResumeFromYarnIDWithFinishedApplication() throws Exception { + File directoryPath = writeYarnPropertiesFile(validPropertiesFile); + + // start CLI Frontend + TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED); + + RunOptions options = + CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}); + + 
frontend.retrieveClient(options); + + checkJobManagerAddress( + frontend.getConfiguration(), + TEST_YARN_JOB_MANAGER_ADDRESS, + TEST_YARN_JOB_MANAGER_PORT); } @Test - public void testManualOptionsOverridesYarn() { - try { - File emptyFolder = temporaryFolder.newFolder(); - TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath()); + public void testYarnIDOverridesPropertiesFile() throws Exception { + File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile); + + // start CLI Frontend + TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath()); + + RunOptions options = + CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}); + + frontend.retrieveClient(options); + + checkJobManagerAddress( + frontend.getConfiguration(), + TEST_YARN_JOB_MANAGER_ADDRESS, + TEST_YARN_JOB_MANAGER_PORT); + } + + + @Test + public void testManualOptionsOverridesYarn() throws Exception { + + File emptyFolder = temporaryFolder.newFolder(); + File testConfFile = new File(emptyFolder.getAbsolutePath(), "flink-conf.yaml"); + Files.createFile(testConfFile.toPath()); - CommandLineOptions options = mock(CommandLineOptions.class); - when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788"); + TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath()); - frontend.updateConfig(options); + RunOptions options = CliFrontendParser.parseRunCommand(new String[] {"-m", "10.221.130.22:7788"}); - Configuration config = frontend.getConfiguration(); + frontend.retrieveClient(options); - InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788); + Configuration config = frontend.getConfiguration(); - checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort()); + InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788); + + checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort()); + + } + + + /////////// + // Utils // + 
/////////// + + private File writeYarnPropertiesFile(String contents) throws IOException { + File tmpFolder = temporaryFolder.newFolder(); + String currentUser = System.getProperty("user.name"); + + // copy .yarn-properties- + File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser); + Files.write(testPropertiesFile.toPath(), contents.getBytes(), StandardOpenOption.CREATE); + + // copy reference flink-conf.yaml to temporary test directory and append custom configuration path. + String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder; + File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml"); + Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE); + + return tmpFolder.getAbsoluteFile(); + } + + private static class TestCLI extends CliFrontend { + TestCLI(String configDir) throws Exception { + super(configDir); + } + + @Override + // make method public + public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception { + return super.getClient(options, programName); + } + + @Override + // make method public + public ClusterClient retrieveClient(CommandLineOptions options) { + return super.retrieveClient(options); + } + } + + + /** + * Injects an extended FlinkYarnSessionCli that deals with mocking Yarn communication + */ + private static class CustomYarnTestCLI extends TestCLI { + + // the default application status for yarn applications to be retrieved + private final FinalApplicationStatus finalApplicationStatus; + + CustomYarnTestCLI(String configDir) throws Exception { + this(configDir, FinalApplicationStatus.UNDEFINED); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + + CustomYarnTestCLI(String configDir, FinalApplicationStatus finalApplicationStatus) throws Exception { + super(configDir); + this.finalApplicationStatus = finalApplicationStatus; + } + + @Override + public CustomCommandLine 
getActiveCustomCommandLine(CommandLine commandLine) { + // inject the testing FlinkYarnSessionCli + return new TestingYarnSessionCli(); + } + + /** + * Testing FlinkYarnSessionCli which returns a modified cluster descriptor for testing. + */ + private class TestingYarnSessionCli extends FlinkYarnSessionCli { + TestingYarnSessionCli() { + super("y", "yarn"); + } + + @Override + // override cluster descriptor to replace the YarnClient + protected AbstractYarnClusterDescriptor getClusterDescriptor() { + return new TestingYarnClusterDescriptor(); + } + + /** + * Replace the YarnClient for this test. + */ + private class TestingYarnClusterDescriptor extends YarnClusterDescriptor { + + @Override + protected YarnClient getYarnClient() { + return new TestYarnClient(); + } + + @Override + protected YarnClusterClient createYarnClusterClient( + AbstractYarnClusterDescriptor descriptor, + YarnClient yarnClient, + ApplicationReport report, + Configuration flinkConfiguration, + Path sessionFilesDir, + boolean perJobCluster) throws IOException, YarnException { + + return Mockito.mock(YarnClusterClient.class); + } + + + private class TestYarnClient extends YarnClientImpl { + + private final List reports = new LinkedList<>(); + + TestYarnClient() { + { // a report that of our Yarn application we want to resume from + ApplicationReport report = Mockito.mock(ApplicationReport.class); + Mockito.when(report.getHost()).thenReturn(TEST_YARN_JOB_MANAGER_ADDRESS); + Mockito.when(report.getRpcPort()).thenReturn(TEST_YARN_JOB_MANAGER_PORT); + Mockito.when(report.getApplicationId()).thenReturn(TEST_YARN_APPLICATION_ID); + Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus); + this.reports.add(report); + } + { // a second report, just for noise + ApplicationReport report = Mockito.mock(ApplicationReport.class); + Mockito.when(report.getHost()).thenReturn("1.2.3.4"); + Mockito.when(report.getRpcPort()).thenReturn(-123); + 
Mockito.when(report.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 0)); + Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus); + this.reports.add(report); + } + } + + @Override + public List getApplications() throws YarnException, IOException { + return reports; + } + + @Override + public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException { + for (ApplicationReport report : reports) { + if (report.getApplicationId().equals(appId)) { + return report; + } + } + throw new YarnException(); + } + } + } } } diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index c842bdc54969a..f71dd635262ae 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -23,7 +23,6 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; -import org.apache.flink.client.CliFrontend; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.test.util.TestBaseUtils; @@ -37,8 +36,6 @@ import java.util.HashMap; import java.util.Map; -import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties; - public class FlinkYarnSessionCliTest { @Rule @@ -53,9 +50,10 @@ public void testDynamicProperties() throws IOException { fakeConf.createNewFile(); map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath()); TestBaseUtils.setEnv(map); - Options options = new Options(); FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false); - cli.addOptions(options); + Options options = new Options(); + cli.addGeneralOptions(options); + cli.addRunOptions(options); CommandLineParser parser = new PosixParser(); CommandLine cmd = null; @@ -66,7 +64,7 @@ public void testDynamicProperties() throws IOException { 
Assert.fail("Parsing failed with " + e.getMessage()); } - YarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd); + AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd); Assert.assertNotNull(flinkYarnDescriptor); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index c471fa4cff5fe..aebb14d51845c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -22,6 +22,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.RecoveryMode; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -73,18 +75,8 @@ import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties; /** -* All classes in this package contain code taken from -* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc -* and -* https://github.com/hortonworks/simple-yarn-app -* and -* 
https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java -* -* The Flink jar is uploaded to HDFS by this client. -* The application master and all the TaskManager containers get the jar file downloaded -* by YARN into their local fs. -* -*/ + * The descriptor with deployment information for spawning or resuming a {@link YarnClusterClient}. + */ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor { private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class); @@ -132,7 +124,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor private boolean detached; - private String customName = null; + private String customName; + public AbstractYarnClusterDescriptor() { // for unit tests only @@ -321,49 +314,112 @@ public boolean isDetachedMode() { * Gets a Hadoop Yarn client * @return Returns a YarnClient which has to be shutdown manually */ - public static YarnClient getYarnClient(Configuration conf) { + protected YarnClient getYarnClient() { YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); yarnClient.start(); return yarnClient; } - @Override - public YarnClusterClient deploy() throws Exception { + /** + * Retrieves the Yarn application and cluster from the config + * @param config The config with entries to retrieve the cluster + * @return YarnClusterClient + * @deprecated This should be removed in the future + */ + public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) + throws UnsupportedOperationException { + String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); - UserGroupInformation.setConfiguration(conf); - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + if (jobManagerHost != null && jobManagerPort != -1) { - if 
(UserGroupInformation.isSecurityEnabled()) { - if (!ugi.hasKerberosCredentials()) { - throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " + - "You may use kinit to authenticate and request a TGT from the Kerberos server."); + YarnClient yarnClient = getYarnClient(); + final List applicationReports; + try { + applicationReports = yarnClient.getApplications(); + } catch (Exception e) { + throw new RuntimeException("Couldn't get Yarn application reports", e); } - return ugi.doAs(new PrivilegedExceptionAction() { - @Override - public YarnClusterClient run() throws Exception { - return deployInternal(); + for (ApplicationReport report : applicationReports) { + if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) { + LOG.info("Found application '{}' " + + "with JobManager host name '{}' and port '{}' from Yarn properties file.", + report.getApplicationId(), jobManagerHost, jobManagerPort); + return retrieve(report.getApplicationId().toString()); } - }); - } else { - return deployInternal(); + } + } + + LOG.warn("Couldn't retrieve Yarn cluster from Flink configuration using JobManager address '{}:{}'", + jobManagerHost, jobManagerPort); + + throw new IllegalConfigurationException("Could not resume Yarn cluster from config."); } @Override - public AbstractFlinkYarnCluster attach(String appId) throws Exception { - // check if required Hadoop environment variables are set. If not, warn user - if(System.getenv("HADOOP_CONF_DIR") == null && - System.getenv("YARN_CONF_DIR") == null) { - LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." + - "The Flink YARN Client needs one of these to be set to properly load the Hadoop " + - "configuration for accessing YARN."); + public YarnClusterClient retrieve(String applicationID) { + + try { + // check if required Hadoop environment variables are set. 
If not, warn user + if (System.getenv("HADOOP_CONF_DIR") == null && + System.getenv("YARN_CONF_DIR") == null) { + LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." + + "The Flink YARN Client needs one of these to be set to properly load the Hadoop " + + "configuration for accessing YARN."); + } + + final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID); + final YarnClient yarnClient = getYarnClient(); + final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId); + + if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) { + // Flink cluster is not running anymore + LOG.error("The application {} doesn't run anymore. It has previously completed with final status: {}", + applicationID, appReport.getFinalApplicationStatus()); + throw new RuntimeException("The Yarn application " + applicationID + " doesn't run anymore."); + } + + LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'", + appReport.getHost(), appReport.getRpcPort(), applicationID); + + flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, appReport.getHost()); + flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, appReport.getRpcPort()); + + return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, sessionFilesDir, false); + } catch (Exception e) { + throw new RuntimeException("Couldn't retrieve Yarn cluster", e); } + } - final ApplicationId yarnAppId = ConverterUtils.toApplicationId(appId); + @Override + public YarnClusterClient deploy() { - return new FlinkYarnCluster(yarnClient, yarnAppId, conf, flinkConfiguration, sessionFilesDir, detached); + try { + + UserGroupInformation.setConfiguration(conf); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + if (UserGroupInformation.isSecurityEnabled()) { + if (!ugi.hasKerberosCredentials()) { + throw new 
YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " + + "You may use kinit to authenticate and request a TGT from the Kerberos server."); + } + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public YarnClusterClient run() throws Exception { + return deployInternal(); + } + }); + } else { + return deployInternal(); + } + } catch (Exception e) { + throw new RuntimeException("Couldn't deploy Yarn cluster", e); + } } + /** * This method will block until the ApplicationMaster/JobManager have been * deployed on YARN. @@ -377,7 +433,7 @@ protected YarnClusterClient deployInternal() throws Exception { LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb); // Create application via yarnClient - final YarnClient yarnClient = getYarnClient(conf); + final YarnClient yarnClient = getYarnClient(); final YarnClientApplication yarnApplication = yarnClient.createApplication(); GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse(); @@ -726,7 +782,7 @@ protected YarnClusterClient deployInternal() throws Exception { flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port); // the Flink cluster is deployed in YARN. 
Represent cluster - return new YarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir); + return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true); } /** @@ -780,40 +836,44 @@ private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yar } @Override - public String getClusterDescription() throws Exception { + public String getClusterDescription() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream ps = new PrintStream(baos); + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); - YarnClient yarnClient = getYarnClient(conf); - YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics(); + YarnClient yarnClient = getYarnClient(); + YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics(); - ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers()); - List nodes = yarnClient.getNodeReports(NodeState.RUNNING); - final String format = "|%-16s |%-16s %n"; - ps.printf("|Property |Value %n"); - ps.println("+---------------------------------------+"); - int totalMemory = 0; - int totalCores = 0; - for(NodeReport rep : nodes) { - final Resource res = rep.getCapability(); - totalMemory += res.getMemory(); - totalCores += res.getVirtualCores(); - ps.format(format, "NodeID", rep.getNodeId()); - ps.format(format, "Memory", res.getMemory() + " MB"); - ps.format(format, "vCores", res.getVirtualCores()); - ps.format(format, "HealthReport", rep.getHealthReport()); - ps.format(format, "Containers", rep.getNumContainers()); + ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers()); + List nodes = yarnClient.getNodeReports(NodeState.RUNNING); + final String format = "|%-16s |%-16s %n"; + ps.printf("|Property |Value %n"); ps.println("+---------------------------------------+"); + int totalMemory = 0; + int totalCores = 0; + for (NodeReport rep : nodes) { + final 
Resource res = rep.getCapability(); + totalMemory += res.getMemory(); + totalCores += res.getVirtualCores(); + ps.format(format, "NodeID", rep.getNodeId()); + ps.format(format, "Memory", res.getMemory() + " MB"); + ps.format(format, "vCores", res.getVirtualCores()); + ps.format(format, "HealthReport", rep.getHealthReport()); + ps.format(format, "Containers", rep.getNumContainers()); + ps.println("+---------------------------------------+"); + } + ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores); + List qInfo = yarnClient.getAllQueues(); + for (QueueInfo q : qInfo) { + ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " + + q.getMaximumCapacity() + " Applications: " + q.getApplications().size()); + } + yarnClient.stop(); + return baos.toString(); + } catch (Exception e) { + throw new RuntimeException("Couldn't get cluster description", e); } - ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores); - List qInfo = yarnClient.getAllQueues(); - for(QueueInfo q : qInfo) { - ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " + - q.getMaximumCapacity() + " Applications: " + q.getApplications().size()); - } - yarnClient.stop(); - return baos.toString(); } public String getSessionFilesDir() { @@ -918,9 +978,6 @@ public void setAttemptFailuresValidityInterval( private static class YarnDeploymentException extends RuntimeException { private static final long serialVersionUID = -812040641215388943L; - public YarnDeploymentException() { - } - public YarnDeploymentException(String message) { super(message); } @@ -954,5 +1011,24 @@ public void run() { } } } + + /** + * Creates a YarnClusterClient; may be overriden in tests + */ + protected YarnClusterClient createYarnClusterClient( + AbstractYarnClusterDescriptor descriptor, + YarnClient yarnClient, + ApplicationReport report, + 
org.apache.flink.configuration.Configuration flinkConfiguration, + Path sessionFilesDir, + boolean perJobCluster) throws IOException, YarnException { + return new YarnClusterClient( + descriptor, + yarnClient, + report, + flinkConfiguration, + sessionFilesDir, + perJobCluster); + } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index a5b8af767cba5..9130fdde6ebd1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -55,6 +56,7 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -77,9 +79,6 @@ public class YarnClusterClient extends ClusterClient { // (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown. private final Path sessionFilesDir; - /** The leader retrieval service for connecting to the cluster and finding the active leader. */ - private final LeaderRetrievalService leaderRetrievalService; - //---------- Class internal fields ------------------- private final AbstractYarnClusterDescriptor clusterDescriptor; @@ -92,6 +91,7 @@ public class YarnClusterClient extends ClusterClient { private boolean isConnected = false; + private final boolean perJobCluster; /** * Create a new Flink on YARN cluster. 
@@ -101,6 +101,7 @@ public class YarnClusterClient extends ClusterClient { * @param appReport the YARN application ID * @param flinkConfig Flink configuration * @param sessionFilesDir Location of files required for YARN session + * @param perJobCluster Indicator whether this cluster is only created for a single job and then shutdown * @throws IOException * @throws YarnException */ @@ -109,7 +110,8 @@ public YarnClusterClient( final YarnClient yarnClient, final ApplicationReport appReport, org.apache.flink.configuration.Configuration flinkConfig, - Path sessionFilesDir) throws IOException, YarnException { + Path sessionFilesDir, + boolean perJobCluster) throws IOException, YarnException { super(flinkConfig); @@ -122,18 +124,16 @@ public YarnClusterClient( this.applicationId = appReport; this.appId = appReport.getApplicationId(); this.trackingURL = appReport.getTrackingUrl(); + this.perJobCluster = perJobCluster; + /* The leader retrieval service for connecting to the cluster and finding the active leader. 
*/ + LeaderRetrievalService leaderRetrievalService; try { leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); } catch (Exception e) { throw new IOException("Could not create the leader retrieval service.", e); } - - if (isConnected) { - throw new IllegalStateException("Already connected to the cluster."); - } - // start application client LOG.info("Start application client."); @@ -182,28 +182,31 @@ public void run() { isConnected = true; - logAndSysout("Waiting until all TaskManagers have connected"); + if (perJobCluster) { - while(true) { - GetClusterStatusResponse status = getClusterStatus(); - if (status != null) { - if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) { - logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/" - + clusterDescriptor.getTaskManagerCount() + ")"); + logAndSysout("Waiting until all TaskManagers have connected"); + + while (true) { + GetClusterStatusResponse status = getClusterStatus(); + if (status != null) { + if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) { + logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/" + + clusterDescriptor.getTaskManagerCount() + ")"); + } else { + logAndSysout("All TaskManagers are connected"); + break; + } } else { - logAndSysout("All TaskManagers are connected"); - break; + logAndSysout("No status updates from the YARN cluster received so far. Waiting ..."); } - } else { - logAndSysout("No status updates from the YARN cluster received so far. 
Waiting ..."); - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for TaskManagers"); - System.err.println("Thread is interrupted"); - throw new IOException("Interrupted while waiting for TaskManagers", e); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for TaskManagers"); + System.err.println("Thread is interrupted"); + throw new IOException("Interrupted while waiting for TaskManagers", e); + } } } } @@ -214,9 +217,12 @@ public void disconnect() { } LOG.info("Disconnecting YarnClusterClient from ApplicationMaster"); - if(!Runtime.getRuntime().removeShutdownHook(clientShutdownHook)) { - LOG.warn("Error while removing the shutdown hook. The YARN session might be killed unintentionally"); + try { + Runtime.getRuntime().removeShutdownHook(clientShutdownHook); + } catch (IllegalStateException e) { + // we are already in the shutdown hook } + // tell the actor to shut down. applicationClient.tell(PoisonPill.getInstance(), applicationClient); @@ -265,12 +271,30 @@ public int getMaxSlots() { @Override protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { - if (isDetached()) { - JobSubmissionResult result = super.runDetached(jobGraph, classLoader); + if (perJobCluster) { stopAfterJob(jobGraph.getJobID()); - return result; + } + + if (isDetached()) { + return super.runDetached(jobGraph, classLoader); } else { - return super.run(jobGraph, classLoader); + try { + return super.run(jobGraph, classLoader); + } finally { + // show cluster status + List msgs = getNewMessages(); + if (msgs != null && msgs.size() > 1) { + + logAndSysout("The following messages were created by the YARN cluster while running the Job:"); + for (String msg : msgs) { + logAndSysout(msg); + } + } + if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) { + logAndSysout("YARN cluster is in non-successful state " + 
getApplicationStatus()); + logAndSysout("YARN Diagnostics: " + getDiagnostics()); + } + } } } @@ -298,8 +322,9 @@ public GetClusterStatusResponse getClusterStatus() { throw new IllegalStateException("The cluster is not connected to the ApplicationMaster."); } if(hasBeenShutdown()) { - throw new RuntimeException("The YarnClusterClient has already been stopped"); + return null; } + Future clusterStatusOption = ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout); Object clusterStatus; try { @@ -417,32 +442,20 @@ public List getNewMessages() { @Override public void finalizeCluster() { - if (!isConnected) { - throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster."); - } - - if (isDetached()) { - // only disconnect if we are running detached + if (isDetached() || !perJobCluster) { + // only disconnect if we are not running a per job cluster disconnect(); - return; + } else { + shutdownCluster(); } + } - // show cluster status - - List msgs = getNewMessages(); - if (msgs != null && msgs.size() > 1) { + public void shutdownCluster() { - logAndSysout("The following messages were created by the YARN cluster while running the Job:"); - for (String msg : msgs) { - logAndSysout(msg); - } - } - if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) { - logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus()); - logAndSysout("YARN Diagnostics: " + getDiagnostics()); + if (!isConnected) { + throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster."); } - if(hasBeenShutDown.getAndSet(true)) { return; } @@ -471,13 +484,30 @@ public void finalizeCluster() { actorSystem.awaitTermination(); } - LOG.info("Deleting files in " + sessionFilesDir); try { - FileSystem shutFS = FileSystem.get(hadoopConfig); - shutFS.delete(sessionFilesDir, true); // delete conf and jar file. 
- shutFS.close(); - }catch(IOException e){ - LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e); + File propertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(flinkConfig); + if (propertiesFile.isFile()) { + if (propertiesFile.delete()) { + LOG.info("Deleted Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString()); + } else { + LOG.warn("Couldn't delete Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString()); + } + } + } catch (Exception e) { + LOG.warn("Exception while deleting the JobManager address file", e); + } + + if (sessionFilesDir != null) { + LOG.info("Deleting files in " + sessionFilesDir); + try { + FileSystem shutFS = FileSystem.get(hadoopConfig); + shutFS.delete(sessionFilesDir, true); // delete conf and jar file. + shutFS.close(); + } catch (IOException e) { + LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e); + } + } else { + LOG.warn("Session file directory not set. Not deleting session files"); } try { @@ -571,7 +601,6 @@ public void run() { @Override public boolean isDetached() { - // either we have set detached mode using the general '-d' flag or using the Yarn CLI flag 'yd' return super.isDetached() || clusterDescriptor.isDetachedMode(); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 43e7c7be09be7..5f745b27da1f4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -17,10 +17,12 @@ */ package org.apache.flink.yarn; + /** * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}. 
*/ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor { + @Override protected Class getApplicationMasterClass() { return YarnApplicationMasterRunner.class; diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index fdcc8584f5557..5eca4f125feb5 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -28,11 +28,9 @@ import org.apache.flink.client.ClientUtils; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CustomCommandLine; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -59,6 +57,8 @@ import java.util.Map; import java.util.Properties; +import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION; + /** * Class handling the command line interface to the YARN session. 
*/ @@ -97,8 +97,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine private final Option CONTAINER; private final Option SLOTS; private final Option DETACHED; + @Deprecated private final Option STREAMING; private final Option NAME; + + private final Options ALL_OPTIONS; /** * Dynamic properties allow the user to specify additional configuration values with -D, such as @@ -118,7 +121,7 @@ public FlinkYarnSessionCli(String shortPrefix, String longPrefix) { public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) { this.acceptInteractiveInput = acceptInteractiveInput; - + QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)"); APPLICATION_ID = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session"); QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); @@ -132,37 +135,24 @@ public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean accept DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached"); STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode"); NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN"); + + ALL_OPTIONS = new Options(); + ALL_OPTIONS.addOption(FLINK_JAR); + ALL_OPTIONS.addOption(JM_MEMORY); + ALL_OPTIONS.addOption(TM_MEMORY); + ALL_OPTIONS.addOption(CONTAINER); + ALL_OPTIONS.addOption(QUEUE); + ALL_OPTIONS.addOption(QUERY); + ALL_OPTIONS.addOption(SHIP_PATH); + ALL_OPTIONS.addOption(SLOTS); + ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES); + ALL_OPTIONS.addOption(DETACHED); + ALL_OPTIONS.addOption(STREAMING); + ALL_OPTIONS.addOption(NAME); + ALL_OPTIONS.addOption(APPLICATION_ID); } - /** - * Attaches a new Yarn Client to running YARN application. 
- * - */ - public AbstractFlinkYarnCluster attachFlinkYarnClient(CommandLine cmd) { - AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); - if (flinkYarnClient == null) { - return null; - } - if (!cmd.hasOption(APPLICATION_ID.getOpt())) { - LOG.error("Missing required argument " + APPLICATION_ID.getOpt()); - printUsage(); - return null; - } - - String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); - GlobalConfiguration.loadConfiguration(confDirPath); - Configuration flinkConfiguration = GlobalConfiguration.getConfiguration(); - flinkYarnClient.setFlinkConfiguration(flinkConfiguration); - flinkYarnClient.setConfigurationDirectory(confDirPath); - - try { - return flinkYarnClient.attach(cmd.getOptionValue(APPLICATION_ID.getOpt())); - } catch (Exception e) { - LOG.error("Could not attach to YARN session", e); - return null; - } - } /** * Resumes from a Flink Yarn properties file * @param flinkConfiguration The flink configuration @@ -170,7 +160,7 @@ public AbstractFlinkYarnCluster attachFlinkYarnClient(CommandLine cmd) { */ private boolean resumeFromYarnProperties(Configuration flinkConfiguration) { // load the YARN properties - File propertiesFile = new File(getYarnPropertiesLocation(flinkConfiguration)); + File propertiesFile = getYarnPropertiesLocation(flinkConfiguration); if (!propertiesFile.exists()) { return false; } @@ -209,7 +199,7 @@ private boolean resumeFromYarnProperties(Configuration flinkConfiguration) { try { jobManagerAddress = ClientUtils.parseHostPortAddress(address); // store address in config from where it is retrieved by the retrieval service - CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, jobManagerAddress); + CliFrontend.setJobManagerAddressInConfig(flinkConfiguration, jobManagerAddress); } catch (Exception e) { throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e); @@ -228,10 +218,9 @@ private boolean resumeFromYarnProperties(Configuration flinkConfiguration) 
{ return true; } - public YarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) { - + public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) { - YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(); + AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(); if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option! LOG.error("Missing required argument {}", CONTAINER.getOpt()); @@ -343,19 +332,6 @@ public boolean accept(File dir, String name) { return yarnClusterDescriptor; } - @Override - public YarnClusterClient createClient(String applicationName, CommandLine cmdLine) throws Exception { - - YarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine); - - try { - return yarnClusterDescriptor.deploy(); - } catch (Exception e) { - throw new RuntimeException("Error deploying the YARN cluster", e); - } - - } - private void printUsage() { System.out.println("Usage:"); HelpFormatter formatter = new HelpFormatter(); @@ -367,17 +343,10 @@ private void printUsage() { formatter.printHelp(" ", req); formatter.setSyntaxPrefix(" Optional"); - Options opt = new Options(); - opt.addOption(JM_MEMORY); - opt.addOption(TM_MEMORY); - opt.addOption(QUERY); - opt.addOption(QUEUE); - opt.addOption(SLOTS); - opt.addOption(DYNAMIC_PROPERTIES); - opt.addOption(DETACHED); - opt.addOption(STREAMING); - opt.addOption(NAME); - formatter.printHelp(" ", opt); + Options options = new Options(); + addGeneralOptions(options); + addRunOptions(options); + formatter.printHelp(" ", options); } private static void writeYarnProperties(Properties properties, File propertiesFile) { @@ -439,6 +408,7 @@ public static void runInteractiveCli(YarnClusterClient yarnCluster, boolean read switch (command) { case "quit": case "stop": + yarnCluster.shutdownCluster(); break label; case "help": @@ -466,38 +436,62 @@ public static void 
main(String[] args) { } @Override - public String getIdentifier() { + public boolean isActive(CommandLine commandLine, Configuration configuration) { + String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null); + boolean yarnJobManager = ID.equals(jobManagerOption); + return yarnJobManager || resumeFromYarnProperties(configuration); + } + + @Override + public String getId() { return ID; } - public void addOptions(Options options) { - options.addOption(FLINK_JAR); - options.addOption(JM_MEMORY); - options.addOption(TM_MEMORY); - options.addOption(CONTAINER); - options.addOption(QUEUE); - options.addOption(QUERY); - options.addOption(SHIP_PATH); - options.addOption(SLOTS); - options.addOption(DYNAMIC_PROPERTIES); - options.addOption(DETACHED); - options.addOption(STREAMING); - options.addOption(NAME); + @Override + public void addRunOptions(Options baseOptions) { + for (Object option : ALL_OPTIONS.getOptions()) { + baseOptions.addOption((Option) option); + } } + @Override + public void addGeneralOptions(Options baseOptions) { + baseOptions.addOption(APPLICATION_ID); + } - public void getYARNAttachCLIOptions(Options options) { - options.addOption(APPLICATION_ID); + @Override + public YarnClusterClient retrieveCluster( + CommandLine cmdLine, + Configuration config) throws UnsupportedOperationException { + + // first check for an application id + if (cmdLine.hasOption(APPLICATION_ID.getOpt())) { + String applicationID = cmdLine.getOptionValue(APPLICATION_ID.getOpt()); + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); + yarnDescriptor.setFlinkConfiguration(config); + return yarnDescriptor.retrieve(applicationID); + // then try to load from yarn properties + } else if (resumeFromYarnProperties(config)) { + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); + yarnDescriptor.setFlinkConfiguration(config); + return yarnDescriptor.retrieveFromConfig(config); + } + + throw new 
UnsupportedOperationException("Could not resume a Yarn cluster."); } @Override - public ClusterClient retrieveCluster(Configuration config) throws Exception { + public YarnClusterClient createCluster(String applicationName, CommandLine cmdLine, Configuration config) { + + AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine); + yarnClusterDescriptor.setFlinkConfiguration(config); - if(resumeFromYarnProperties(config)) { - return new StandaloneClusterClient(config); + try { + return yarnClusterDescriptor.deploy(); + } catch (Exception e) { + throw new RuntimeException("Error deploying the YARN cluster", e); } - return null; } public int run(String[] args) { @@ -505,7 +499,8 @@ public int run(String[] args) { // Command Line Options // Options options = new Options(); - addOptions(options); + addGeneralOptions(options); + addRunOptions(options); CommandLineParser parser = new PosixParser(); CommandLine cmd; @@ -519,10 +514,10 @@ public int run(String[] args) { // Query cluster for metrics if (cmd.hasOption(QUERY.getOpt())) { - YarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor(); + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); String description; try { - description = flinkYarnClient.getClusterDescription(); + description = yarnDescriptor.getClusterDescription(); } catch (Exception e) { System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage()); e.printStackTrace(System.err); @@ -531,56 +526,61 @@ public int run(String[] args) { System.out.println(description); return 0; } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { - yarnCluster = attachFlinkYarnClient(cmd); + + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); + try { + yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(APPLICATION_ID.getOpt())); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve existing Yarn application", e); + } if 
(detachedMode) { LOG.info("The Flink YARN client has been started in detached mode. In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill "+yarnCluster.getApplicationId()); + "yarn application -kill "+yarnCluster.getClusterIdentifier()); + yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster); - - if (!yarnCluster.hasBeenStopped()) { - LOG.info("Command Line Interface requested session shutdown"); - yarnCluster.shutdown(false); - } + runInteractiveCli(yarnCluster, true); } } else { - YarnClusterDescriptor flinkYarnClient; + AbstractYarnClusterDescriptor yarnDescriptor; try { - flinkYarnClient = createDescriptor(null, cmd); + yarnDescriptor = createDescriptor(null, cmd); } catch (Exception e) { System.err.println("Error while starting the YARN Client. Please check log output!"); return 1; } try { - yarnCluster = flinkYarnClient.deploy(); + yarnCluster = yarnDescriptor.deploy(); } catch (Exception e) { System.err.println("Error while deploying YARN cluster: "+e.getMessage()); e.printStackTrace(System.err); return 1; } //------------------ ClusterClient deployed, handle connection details - String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort(); + String jobManagerAddress = + yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + + ":" + yarnCluster.getJobManagerAddress().getPort(); + System.out.println("Flink JobManager is now running on " + jobManagerAddress); System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL()); // file that we write into the conf/ dir containing the jobManager address and the dop. 
- File yarnPropertiesFile = new File(getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration())); + File yarnPropertiesFile = getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()); Properties yarnProps = new Properties(); yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress); - if (flinkYarnClient.getTaskManagerSlots() != -1) { + if (yarnDescriptor.getTaskManagerSlots() != -1) { String parallelism = - Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount()); + Integer.toString(yarnDescriptor.getTaskManagerSlots() * yarnDescriptor.getTaskManagerCount()); yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism); } // add dynamic properties - if (flinkYarnClient.getDynamicPropertiesEncoded() != null) { + if (yarnDescriptor.getDynamicPropertiesEncoded() != null) { yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, - flinkYarnClient.getDynamicPropertiesEncoded()); + yarnDescriptor.getDynamicPropertiesEncoded()); } writeYarnProperties(yarnProps, yarnPropertiesFile); @@ -592,21 +592,10 @@ public int run(String[] args) { "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + "yarn application -kill " + yarnCluster.getClusterIdentifier() + "\n" + "Please also note that the temporary files of the YARN session in {} will not be removed.", - flinkYarnClient.getSessionFilesDir()); + yarnDescriptor.getSessionFilesDir()); yarnCluster.disconnect(); } else { runInteractiveCli(yarnCluster, acceptInteractiveInput); - - if (!yarnCluster.hasBeenShutdown()) { - LOG.info("Command Line Interface requested session shutdown"); - yarnCluster.shutdown(); - } - - try { - yarnPropertiesFile.delete(); - } catch (Exception e) { - LOG.warn("Exception while deleting the JobManager address file", e); - } } } return 0; @@ -649,11 +638,16 @@ public static Map getDynamicProperties(String dynamicPropertiesE } } - private static String getYarnPropertiesLocation(Configuration conf) { + 
public static File getYarnPropertiesLocation(Configuration conf) { String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); String currentUser = System.getProperty("user.name"); - String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); + String propertiesFileLocation = + conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); + + return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser); + } - return propertiesFileLocation + File.separator + YARN_PROPERTIES_FILE + currentUser; + protected AbstractYarnClusterDescriptor getClusterDescriptor() { + return new YarnClusterDescriptor(); } }