Skip to content

Commit

Permalink
[FLINK-4913][yarn] include user jars in system class loader
Browse files Browse the repository at this point in the history
When deploying a Yarn cluster for a single job, this change
pre-configures the cluster to include the user jar(s) on all nodes.
This eliminates the need to upload jar files through the
BlobClient. More importantly, it loads the user classes only once and
not on every instantiation of a Task. This also reduces the JobManager
class loading upon recovery of a failed job.

This closes #2692.
  • Loading branch information
mxm committed Oct 27, 2016
1 parent fc730bb commit 2b600d3
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 40 deletions.
Expand Up @@ -236,7 +236,7 @@ protected int run(String[] args) {
ClusterClient client = null; ClusterClient client = null;
try { try {


client = createClient(options, program.getMainClassName()); client = createClient(options, program);
client.setPrintStatusDuringExecution(options.getStdoutLogging()); client.setPrintStatusDuringExecution(options.getStdoutLogging());
client.setDetached(options.getDetachedMode()); client.setDetached(options.getDetachedMode());
LOG.debug("Client slots is set to {}", client.getMaxSlots()); LOG.debug("Client slots is set to {}", client.getMaxSlots());
Expand Down Expand Up @@ -928,12 +928,12 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E
/** /**
* Creates a {@link ClusterClient} object from the given command line options and other parameters. * Creates a {@link ClusterClient} object from the given command line options and other parameters.
* @param options Command line options * @param options Command line options
* @param programName Program name * @param program The program for which to create the client.
* @throws Exception * @throws Exception
*/ */
protected ClusterClient createClient( protected ClusterClient createClient(
CommandLineOptions options, CommandLineOptions options,
String programName) throws Exception { PackagedProgram program) throws Exception {


// Get the custom command-line (e.g. Standalone/Yarn/Mesos) // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
Expand All @@ -944,8 +944,12 @@ protected ClusterClient createClient(
logAndSysout("Cluster configuration: " + client.getClusterIdentifier()); logAndSysout("Cluster configuration: " + client.getClusterIdentifier());
} catch (UnsupportedOperationException e) { } catch (UnsupportedOperationException e) {
try { try {
String applicationName = "Flink Application: " + programName; String applicationName = "Flink Application: " + program.getMainClassName();
client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config); client = activeCommandLine.createCluster(
applicationName,
options.getCommandLine(),
config,
program.getAllLibraries());
logAndSysout("Cluster started: " + client.getClusterIdentifier()); logAndSysout("Cluster started: " + client.getClusterIdentifier());
} catch (UnsupportedOperationException e2) { } catch (UnsupportedOperationException e2) {
throw new IllegalConfigurationException( throw new IllegalConfigurationException(
Expand Down
Expand Up @@ -22,6 +22,9 @@
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;


import java.net.URL;
import java.util.List;



/** /**
* Custom command-line interface to load hooks for the command-line interface. * Custom command-line interface to load hooks for the command-line interface.
Expand Down Expand Up @@ -61,15 +64,22 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
* @return Client if a cluster could be retrieved * @return Client if a cluster could be retrieved
* @throws UnsupportedOperationException if the operation is not supported * @throws UnsupportedOperationException if the operation is not supported
*/ */
ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws UnsupportedOperationException; ClusterType retrieveCluster(
CommandLine commandLine,
Configuration config) throws UnsupportedOperationException;


/** /**
* Creates the client for the cluster * Creates the client for the cluster
* @param applicationName The application name to use * @param applicationName The application name to use
* @param commandLine The command-line options parsed by the CliFrontend * @param commandLine The command-line options parsed by the CliFrontend
* @param config The Flink config to use * @param config The Flink config to use
* @param userJarFiles User jar files to include in the classpath of the cluster.
* @return The client to communicate with the cluster which the CustomCommandLine brought up. * @return The client to communicate with the cluster which the CustomCommandLine brought up.
* @throws UnsupportedOperationException if the operation is not supported * @throws UnsupportedOperationException if the operation is not supported
*/ */
ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration config) throws UnsupportedOperationException; ClusterType createCluster(
String applicationName,
CommandLine commandLine,
Configuration config,
List<URL> userJarFiles) throws UnsupportedOperationException;
} }
Expand Up @@ -27,6 +27,8 @@
import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.HighAvailabilityOptions;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URL;
import java.util.List;


import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig; import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig;


Expand Down Expand Up @@ -76,7 +78,8 @@ public StandaloneClusterClient retrieveCluster(CommandLine commandLine, Configur
public StandaloneClusterClient createCluster( public StandaloneClusterClient createCluster(
String applicationName, String applicationName,
CommandLine commandLine, CommandLine commandLine,
Configuration config) throws UnsupportedOperationException { Configuration config,
List<URL> userJarFiles) throws UnsupportedOperationException {


StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
return descriptor.deploy(); return descriptor.deploy();
Expand Down
Expand Up @@ -308,11 +308,27 @@ public JobSubmissionResult run(PackagedProgram prog, int parallelism)
{ {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) { if (prog.isUsingProgramEntryPoint()) {
return run(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
final JobWithJars jobWithJars;
if (hasUserJarsInClassPath(prog.getAllLibraries())) {
jobWithJars = prog.getPlanWithoutJars();
} else {
jobWithJars = prog.getPlanWithJars();
}

return run(jobWithJars, parallelism, prog.getSavepointPath());
} }
else if (prog.isUsingInteractiveMode()) { else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode"); LOG.info("Starting program in interactive mode");
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
final List<URL> libraries;
if (hasUserJarsInClassPath(prog.getAllLibraries())) {
libraries = Collections.emptyList();
} else {
libraries = prog.getAllLibraries();
}

ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(), prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
prog.getSavepointPath()); prog.getSavepointPath());
ContextEnvironment.setAsContext(factory); ContextEnvironment.setAsContext(factory);
Expand Down Expand Up @@ -349,7 +365,7 @@ public JobSubmissionResult run(JobWithJars program, int parallelism) throws Prog
* Runs a program on the Flink cluster to which this client is connected. The call blocks until the * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
* execution is complete, and returns afterwards. * execution is complete, and returns afterwards.
* *
* @param program The program to be executed. * @param jobWithJars The program to be executed.
* @param parallelism The default parallelism to use when running the program. The default parallelism is used * @param parallelism The default parallelism to use when running the program. The default parallelism is used
* when the program does not set a parallelism by itself. * when the program does not set a parallelism by itself.
* *
Expand All @@ -359,15 +375,15 @@ public JobSubmissionResult run(JobWithJars program, int parallelism) throws Prog
* i.e. the job-manager is unreachable, or due to the fact that the * i.e. the job-manager is unreachable, or due to the fact that the
* parallel execution failed. * parallel execution failed.
*/ */
public JobSubmissionResult run(JobWithJars program, int parallelism, String savepointPath) public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, String savepointPath)
throws CompilerException, ProgramInvocationException { throws CompilerException, ProgramInvocationException {
ClassLoader classLoader = program.getUserCodeClassLoader(); ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
if (classLoader == null) { if (classLoader == null) {
throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader."); throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
} }


OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism); OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
return run(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath); return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointPath);
} }


public JobSubmissionResult run( public JobSubmissionResult run(
Expand Down Expand Up @@ -631,10 +647,6 @@ private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars pr
return getOptimizedPlan(compiler, prog.getPlan(), parallelism); return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
} }


public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null);
}

public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException { public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException {
return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath); return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath);
} }
Expand Down Expand Up @@ -760,6 +772,12 @@ public Configuration getFlinkConfiguration() {
*/ */
public abstract int getMaxSlots(); public abstract int getMaxSlots();


/**
* Returns true if the client already has the user jar and providing it again would
* result in duplicate uploading of the jar.
*/
public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles);

/** /**
* Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform * Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform
* some custom job submission logic. * some custom job submission logic.
Expand Down
Expand Up @@ -281,24 +281,39 @@ public boolean isUsingProgramEntryPoint() {
return this.program != null; return this.program != null;
} }


/**
* Returns the plan without the required jars when the files are already provided by the cluster.
*
* @return The plan without attached jar files.
* @throws ProgramInvocationException
*/
public JobWithJars getPlanWithoutJars() throws ProgramInvocationException {
if (isUsingProgramEntryPoint()) {
return new JobWithJars(getPlan(), Collections.<URL>emptyList(), classpaths, userCodeClassLoader);
} else {
throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() +
" for a program that is using the interactive mode.");
}
}

/** /**
* Returns the plan with all required jars. * Returns the plan with all required jars.
* *
* @return The plan with attached jar files. * @return The plan with attached jar files.
* @throws ProgramInvocationException * @throws ProgramInvocationException
*/ */
public JobWithJars getPlanWithJars() throws ProgramInvocationException { public JobWithJars getPlanWithJars() throws ProgramInvocationException {
if (isUsingProgramEntryPoint()) { if (isUsingProgramEntryPoint()) {
return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader); return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader);
} else { } else {
throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() + throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() +
" for a program that is using the interactive mode."); " for a program that is using the interactive mode.");
} }
} }


/** /**
* Returns the analyzed plan without any optimizations. * Returns the analyzed plan without any optimizations.
* *
* @return * @return
* the analyzed plan without any optimizations. * the analyzed plan without any optimizations.
* @throws ProgramInvocationException Thrown if an error occurred in the * @throws ProgramInvocationException Thrown if an error occurred in the
Expand All @@ -308,7 +323,7 @@ public JobWithJars getPlanWithJars() throws ProgramInvocationException {
public String getPreviewPlan() throws ProgramInvocationException { public String getPreviewPlan() throws ProgramInvocationException {
Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader()); Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader());
List<DataSinkNode> previewPlan; List<DataSinkNode> previewPlan;

if (isUsingProgramEntryPoint()) { if (isUsingProgramEntryPoint()) {
previewPlan = Optimizer.createPreOptimizedPlan(getPlan()); previewPlan = Optimizer.createPreOptimizedPlan(getPlan());
} }
Expand All @@ -335,7 +350,7 @@ else if (isUsingInteractiveMode()) {
finally { finally {
env.unsetAsContext(); env.unsetAsContext();
} }

if (env.previewPlan != null) { if (env.previewPlan != null) {
previewPlan = env.previewPlan; previewPlan = env.previewPlan;
} else { } else {
Expand All @@ -359,15 +374,15 @@ else if (isUsingInteractiveMode()) {
/** /**
* Returns the description provided by the Program class. This * Returns the description provided by the Program class. This
* may contain a description of the plan itself and its arguments. * may contain a description of the plan itself and its arguments.
* *
* @return The description of the PactProgram's input parameters. * @return The description of the PactProgram's input parameters.
* @throws ProgramInvocationException * @throws ProgramInvocationException
* This invocation is thrown if the Program can't be properly loaded. Causes * This invocation is thrown if the Program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files. * may be a missing / wrong class or manifest files.
*/ */
public String getDescription() throws ProgramInvocationException { public String getDescription() throws ProgramInvocationException {
if (ProgramDescription.class.isAssignableFrom(this.mainClass)) { if (ProgramDescription.class.isAssignableFrom(this.mainClass)) {

ProgramDescription descr; ProgramDescription descr;
if (this.program != null) { if (this.program != null) {
descr = (ProgramDescription) this.program; descr = (ProgramDescription) this.program;
Expand All @@ -379,22 +394,22 @@ public String getDescription() throws ProgramInvocationException {
return null; return null;
} }
} }

try { try {
return descr.getDescription(); return descr.getDescription();
} }
catch (Throwable t) { catch (Throwable t) {
throw new ProgramInvocationException("Error while getting the program description" + throw new ProgramInvocationException("Error while getting the program description" +
(t.getMessage() == null ? "." : ": " + t.getMessage()), t); (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
} }

} else { } else {
return null; return null;
} }
} }

/** /**
* *
* This method assumes that the context environment is prepared, or the execution * This method assumes that the context environment is prepared, or the execution
* will be a local execution by default. * will be a local execution by default.
*/ */
Expand All @@ -417,13 +432,16 @@ public List<URL> getClasspaths() {


/** /**
* Gets the {@link java.lang.ClassLoader} that must be used to load user code classes. * Gets the {@link java.lang.ClassLoader} that must be used to load user code classes.
* *
* @return The user code ClassLoader. * @return The user code ClassLoader.
*/ */
public ClassLoader getUserCodeClassLoader() { public ClassLoader getUserCodeClassLoader() {
return this.userCodeClassLoader; return this.userCodeClassLoader;
} }


/**
* Returns all provided libraries needed to run the program.
*/
public List<URL> getAllLibraries() { public List<URL> getAllLibraries() {
List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1); List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1);


Expand Down
Expand Up @@ -28,6 +28,7 @@
import scala.concurrent.Future; import scala.concurrent.Future;


import java.io.IOException; import java.io.IOException;
import java.net.URL;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;


Expand Down Expand Up @@ -86,6 +87,11 @@ public int getMaxSlots() {
return -1; return -1;
} }


@Override
public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
return false;
}

@Override @Override
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
throws ProgramInvocationException { throws ProgramInvocationException {
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.scala package org.apache.flink.api.scala


import java.io._ import java.io._
import java.util.Collections


import org.apache.commons.cli.CommandLine import org.apache.commons.cli.CommandLine
import org.apache.flink.client.cli.CliFrontendParser import org.apache.flink.client.cli.CliFrontendParser
Expand Down Expand Up @@ -252,7 +253,11 @@ object FlinkShell {
val config = frontend.getConfiguration val config = frontend.getConfiguration
val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine) val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)


val cluster = customCLI.createCluster("Flink Scala Shell", options.getCommandLine, config) val cluster = customCLI.createCluster(
"Flink Scala Shell",
options.getCommandLine,
config,
Collections.emptyList())


val address = cluster.getJobManagerAddress.getAddress.getHostAddress val address = cluster.getJobManagerAddress.getAddress.getHostAddress
val port = cluster.getJobManagerAddress.getPort val port = cluster.getJobManagerAddress.getPort
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.HighAvailabilityOptions;
Expand Down Expand Up @@ -324,8 +325,8 @@ private static class TestCLI extends CliFrontend {


@Override @Override
// make method public // make method public
public ClusterClient createClient(CommandLineOptions options, String programName) throws Exception { public ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception {
return super.createClient(options, programName); return super.createClient(options, program);
} }


@Override @Override
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.client.CliFrontend; import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
Expand Down Expand Up @@ -744,9 +745,9 @@ private static class TestingCLI extends CliFrontend {
public TestingCLI() throws Exception {} public TestingCLI() throws Exception {}


@Override @Override
protected ClusterClient createClient(CommandLineOptions options, String programName) throws Exception { protected ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception {
// mock the returned ClusterClient to disable shutdown and verify shutdown behavior later on // mock the returned ClusterClient to disable shutdown and verify shutdown behavior later on
originalClusterClient = super.createClient(options, programName); originalClusterClient = super.createClient(options, program);
spiedClusterClient = Mockito.spy(originalClusterClient); spiedClusterClient = Mockito.spy(originalClusterClient);
Mockito.doNothing().when(spiedClusterClient).shutdown(); Mockito.doNothing().when(spiedClusterClient).shutdown();
return spiedClusterClient; return spiedClusterClient;
Expand Down

0 comments on commit 2b600d3

Please sign in to comment.