Skip to content

Commit

Permalink
[FLINK-3937] programmatic resuming of clusters
Browse files Browse the repository at this point in the history
- integrates with and extends the refactoring of FLINK-3667
- enables to resume from Yarn properties or Yarn application id
- introduces additional StandaloneClusterDescriptor
- introduces DefaultCLI to get rid of standalone mode switches in CliFrontend
- various fixes and improvements
- remove legacy code from CliFrontend
- change activation code of CustomCommandLine interface
- use checked exceptions to signal supported operations
- remove all checked exceptions of type Exception
- fix logging and reduce verbosity of per-job clusters
- print 'id' argument in YarnSessionCli
- minor renaming of methods names
- improve documentation
- deprecate streaming option
- extend CliFrontendYarnAddressConfigurationTest
- move loading of custom CLIs to CliFrontend

This closes #2085
  • Loading branch information
mxm committed Jun 17, 2016
1 parent 875d4d2 commit f4ac852
Show file tree
Hide file tree
Showing 15 changed files with 1,062 additions and 629 deletions.
217 changes: 112 additions & 105 deletions flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
Expand Up @@ -18,8 +18,7 @@

package org.apache.flink.client;

import akka.actor.ActorSystem;

import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
Expand All @@ -30,6 +29,7 @@
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.cli.InfoOptions;
import org.apache.flink.client.cli.ListOptions;
import org.apache.flink.client.cli.ProgramOptions;
Expand All @@ -39,7 +39,6 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
Expand All @@ -56,7 +55,6 @@
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
Expand All @@ -67,20 +65,21 @@
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.text.SimpleDateFormat;
Expand All @@ -89,6 +88,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -102,9 +102,11 @@
*/
public class CliFrontend {

private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);

// actions
public static final String ACTION_RUN = "run";
public static final String ACTION_INFO = "info";
private static final String ACTION_RUN = "run";
private static final String ACTION_INFO = "info";
private static final String ACTION_LIST = "list";
private static final String ACTION_CANCEL = "cancel";
private static final String ACTION_STOP = "stop";
Expand All @@ -116,19 +118,24 @@ public class CliFrontend {
private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";

// --------------------------------------------------------------------------------------------

private static final List<CustomCommandLine> customCommandLine = new LinkedList<>();

static {
/** command line interface of the YARN session, with a special initialization here
* to prefix all options with y/yarn. */
loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
customCommandLine.add(new DefaultCLI());
}

// --------------------------------------------------------------------------------------------

private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);


private final Configuration config;

private final FiniteDuration clientTimeout;

private final FiniteDuration lookupTimeout;

private ActorSystem actorSystem;

/**
*
* @throws Exception Thrown if the configuration directory was not found, the configuration could not be loaded
Expand All @@ -146,6 +153,8 @@ public CliFrontend(String configDir) throws Exception {
// load the configuration
LOG.info("Trying to load configuration file");
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
System.setProperty("FLINK_CONF_DIR", configDirectory.getAbsolutePath());

this.config = GlobalConfiguration.getConfiguration();

try {
Expand All @@ -156,7 +165,6 @@ public CliFrontend(String configDir) throws Exception {
}

this.clientTimeout = AkkaUtils.getClientTimeout(config);
this.lookupTimeout = AkkaUtils.getLookupTimeout(config);
}


Expand Down Expand Up @@ -798,112 +806,62 @@ else if (!jarFile.isFile()) {
*
* @param options Command line options
*/
protected void updateConfig(CommandLineOptions options) {
if(options.getJobManagerAddress() != null){
if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
jobManagerAddress = CliFrontendParser.getFlinkYarnSessionCli()
.attachFlinkYarnClient(options.getCommandLine())
.getJobManagerAddress();
InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(config, jobManagerAddress);
protected ClusterClient retrieveClient(CommandLineOptions options) {
CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine());
try {
ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config);
LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
return client;
} catch (Exception e) {
LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e);
throw new IllegalConfigurationException("Couldn't retrieve client for cluster", e);
}
}

/**
* Retrieves the {@link ActorGateway} for the JobManager. The JobManager address is retrieved
* Retrieves the {@link ActorGateway} for the JobManager. The ClusterClient is retrieved
* from the provided {@link CommandLineOptions}.
*
* @param options CommandLineOptions specifying the JobManager URL
* @return Gateway to the JobManager
* @throws Exception
*/
protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
// overwrite config values with given command line options
updateConfig(options);

// start an actor system if needed
if (this.actorSystem == null) {
LOG.info("Starting actor system to communicate with JobManager");
try {
scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
this.actorSystem = AkkaUtils.createActorSystem(
config,
new Some<scala.Tuple2<String, Object>>(systemEndpoint));
}
catch (Exception e) {
throw new IOException("Could not start actor system to communicate with JobManager", e);
}

LOG.info("Actor system successfully started");
}

LOG.info("Trying to lookup the JobManager gateway");
// Retrieve the ActorGateway from the LeaderRetrievalService
LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);

return LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, lookupTimeout);
return retrieveClient(options).getJobManagerGateway();
}

/**
* Retrieves a {@link ClusterClient} object from the given command line options and other parameters.
*
* @param options Command line options which contain JobManager address
* Creates a {@link ClusterClient} object from the given command line options and other parameters.
* @param options Command line options
* @param programName Program name
* @throws Exception
*/
protected ClusterClient getClient(
CommandLineOptions options,
String programName)
throws Exception {
InetSocketAddress jobManagerAddress;

// try to get the JobManager address via command-line args
if (options.getJobManagerAddress() != null) {
String programName) throws Exception {

// Get the custom command-lines (e.g. Yarn/Mesos)
CustomCommandLine<?> activeCommandLine =
CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
// Get the custom command-line (e.g. Standalone/Yarn/Mesos)
CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());

if (activeCommandLine != null) {
logAndSysout(activeCommandLine.getIdentifier() + " mode detected. Switching Log4j output to console");

// Default yarn application name to use, if nothing is specified on the command line
ClusterClient client;
try {
client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
logAndSysout("Cluster retrieved");
} catch (UnsupportedOperationException e) {
try {
String applicationName = "Flink Application: " + programName;

ClusterClient client = activeCommandLine.createClient(applicationName, options.getCommandLine());

client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config);
logAndSysout("Cluster started");
logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());

return client;
} else {
// job manager address supplied on the command-line
LOG.info("Using address {} to connect to JobManager.", options.getJobManagerAddress());
jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(config, jobManagerAddress);
return new StandaloneClusterClient(config);
}

// try to get the JobManager address via resuming of a cluster
} else {
for (CustomCommandLine cli : CliFrontendParser.getAllCustomCommandLine().values()) {
ClusterClient client = cli.retrieveCluster(config);
if (client != null) {
LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
return client;
}
} catch (UnsupportedOperationException e2) {
throw new IllegalConfigurationException(
"The JobManager address is neither provided at the command-line, " +
"nor configured in flink-conf.yaml.");
}
}

// read JobManager address from the config
if (config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) != null) {
return new StandaloneClusterClient(config);
// We tried hard but couldn't find a JobManager address
} else {
throw new IllegalConfigurationException(
"The JobManager address is neither provided at the command-line, " +
"nor configured in flink-conf.yaml.");
}
logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
return client;
}

// --------------------------------------------------------------------------------------------
Expand All @@ -917,7 +875,7 @@ protected ClusterClient getClient(
* @return The return code for the process.
*/
private int handleArgException(Exception e) {
LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage()));
LOG.error("Invalid command line arguments. " + (e.getMessage() == null ? "" : e.getMessage()));

System.out.println(e.getMessage());
System.out.println();
Expand Down Expand Up @@ -1039,14 +997,6 @@ public Integer run() throws Exception {
}
}

public void shutdown() {
ActorSystem sys = this.actorSystem;
if (sys != null) {
this.actorSystem = null;
sys.shutdown();
}
}

/**
* Submits the job based on the arguments
*/
Expand All @@ -1070,7 +1020,8 @@ public static void main(String[] args) {
// --------------------------------------------------------------------------------------------

public static String getConfigurationDirectoryFromEnv() {
String location = System.getenv(ENV_CONFIG_DIRECTORY);
String envLocation = System.getenv(ENV_CONFIG_DIRECTORY);
String location = envLocation != null ? envLocation : System.getProperty(ENV_CONFIG_DIRECTORY);

if (location != null) {
if (new File(location).exists()) {
Expand Down Expand Up @@ -1102,9 +1053,65 @@ else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
* @param address Address to write to the configuration
* @param config The config to write to
*/
public static void writeJobManagerAddressToConfig(Configuration config, InetSocketAddress address) {
public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
}

// --------------------------------------------------------------------------------------------
// Custom command-line
// --------------------------------------------------------------------------------------------

/**
* Gets the custom command-line for the arguments.
* @param commandLine The input to the command-line.
* @return custom command-line which is active (may only be one at a time)
*/
public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
for (CustomCommandLine cli : customCommandLine) {
if (cli.isActive(commandLine, config)) {
return cli;
}
}
throw new IllegalStateException("No command-line ran.");
}

/**
* Retrieves the loaded custom command-lines.
* @return An unmodifiyable list of loaded custom command-lines.
*/
public static List<CustomCommandLine> getCustomCommandLineList() {
return Collections.unmodifiableList(customCommandLine);
}

/**
* Loads a class from the classpath that implements the CustomCommandLine interface.
* @param className The fully-qualified class name to load.
* @param params The constructor parameters
*/
private static void loadCustomCommandLine(String className, Object... params) {

try {
Class<? extends CustomCommandLine> customCliClass =
Class.forName(className).asSubclass(CustomCommandLine.class);

// construct class types from the parameters
Class<?>[] types = new Class<?>[params.length];
for (int i = 0; i < params.length; i++) {
Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
types[i] = params[i].getClass();
}

Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
final CustomCommandLine cli = constructor.newInstance(params);

customCommandLine.add(cli);

} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
| InvocationTargetException e) {
LOG.warn("Unable to locate custom CLI class {}. " +
"Flink is not compiled with support for this class.", className, e);
}
}

}

0 comments on commit f4ac852

Please sign in to comment.