From da9486bb0da40cd7e5802bf72a3f1fe416b8aacc Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 3 Jan 2018 13:25:09 +0100 Subject: [PATCH 1/7] [FLINK-8340] [flip6] Remove passing of Configuration to CustomCommandLine Since the Configuration does not change over the lifetime of a CustomCommandLine, we can safely pass it as a constructor argument instead of method argument. --- .../client/cli/AbstractCustomCommandLine.java | 16 +- .../apache/flink/client/cli/CliFrontend.java | 84 ++-- .../flink/client/cli/CustomCommandLine.java | 23 +- .../apache/flink/client/cli/DefaultCLI.java | 17 +- .../flink/client/cli/Flip6DefaultCLI.java | 17 +- .../client/cli/CliFrontendCancelTest.java | 24 +- .../flink/client/cli/CliFrontendInfoTest.java | 24 +- .../flink/client/cli/CliFrontendListTest.java | 6 +- .../cli/CliFrontendPackageProgramTest.java | 243 +++++------ .../flink/client/cli/CliFrontendRunTest.java | 34 +- .../flink/client/cli/CliFrontendStopTest.java | 12 +- .../flink/client/cli/DefaultCLITest.java | 49 +-- .../flink/client/cli/Flip6DefaultCLITest.java | 4 +- .../cli/util/DummyCustomCommandLine.java | 12 +- .../client/cli/util/MockedCliFrontend.java | 4 +- .../client/program/ClusterClientTest.java | 4 + .../apache/flink/api/scala/FlinkShell.scala | 27 +- .../YARNSessionCapacitySchedulerITCase.java | 4 +- .../flink/yarn/YARNSessionFIFOITCase.java | 5 +- .../org/apache/flink/yarn/YarnTestBase.java | 5 +- .../apache/flink/yarn/YarnClusterClient.java | 3 +- .../flink/yarn/cli/FlinkYarnSessionCli.java | 389 ++++++++++-------- .../flink/yarn/FlinkYarnSessionCliTest.java | 121 +++--- 23 files changed, 538 insertions(+), 589 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java index 70ffc5cd137cb..c7a16725521aa 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java @@ -22,6 +22,9 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -47,6 +50,16 @@ public abstract class AbstractCustomCommandLine impleme "Address of the JobManager (master) to which to connect. " + "Use this flag to connect to a different JobManager than the one specified in the configuration."); + protected final Configuration configuration; + + protected AbstractCustomCommandLine(Configuration configuration) { + this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration)); + } + + public Configuration getConfiguration() { + return configuration; + } + @Override public void addRunOptions(Options baseOptions) { // nothing to add here @@ -61,11 +74,10 @@ public void addGeneralOptions(Options baseOptions) { /** * Override configuration settings by specified command line options. * - * @param configuration to use as the base configuration * @param commandLine containing the overriding values * @return Effective configuration with the overriden configuration settings */ - protected Configuration applyCommandLineOptionsToConfiguration(Configuration configuration, CommandLine commandLine) { + protected Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException { final Configuration resultingConfiguration = new Configuration(configuration); if (commandLine.hasOption(addressOption.getOpt())) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 4b5040ebe4c53..630154c13cfdf 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -108,8 +108,6 @@ public class CliFrontend { private final List> customCommandLines; - private final String configurationDirectory; - private final Options customCommandLineOptions; private final FiniteDuration clientTimeout; @@ -118,11 +116,9 @@ public class CliFrontend { public CliFrontend( Configuration configuration, - List> customCommandLines, - String configurationDirectory) throws Exception { + List> customCommandLines) throws Exception { this.configuration = Preconditions.checkNotNull(configuration); this.customCommandLines = Preconditions.checkNotNull(customCommandLines); - this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory); try { FileSystem.initialize(this.configuration); @@ -139,9 +135,9 @@ public CliFrontend( } this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration); - this.defaultParallelism = GlobalConfiguration.loadConfiguration().getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, - ConfigConstants.DEFAULT_PARALLELISM); + this.defaultParallelism = configuration.getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM); } // -------------------------------------------------------------------------------------------- @@ -161,15 +157,6 @@ public Configuration getConfiguration() { return copiedConfiguration; } - /** - * Returns the configuration directory for the CLI frontend. - * - * @return Configuration directory - */ - public String getConfigurationDirectory() { - return configurationDirectory; - } - // -------------------------------------------------------------------------------------------- // Execute Actions // -------------------------------------------------------------------------------------------- @@ -211,20 +198,17 @@ protected void run(String[] args) throws Exception { final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine); - final ClusterDescriptor clusterDescriptor = customCommandLine.createClusterDescriptor( - configuration, - configurationDirectory, - commandLine); + final ClusterDescriptor clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine); try { - final String clusterId = customCommandLine.getClusterId(configuration, commandLine); + final String clusterId = customCommandLine.getClusterId(commandLine); final ClusterClient client; if (clusterId != null) { client = clusterDescriptor.retrieve(clusterId); } else { - final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(configuration, commandLine); + final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine); client = clusterDescriptor.deploySessionCluster(clusterSpecification); } @@ -368,14 +352,9 @@ protected void list(String[] args) throws Exception { } final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); - final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor( - configuration, - configurationDirectory, - commandLine); + final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine); - final String clusterId = activeCommandLine.getClusterId( - configuration, - commandLine); + final String clusterId = activeCommandLine.getClusterId(commandLine); if (clusterId == null) { throw new FlinkException("No cluster id was specified. Please specify a cluster to which " + @@ -496,12 +475,9 @@ protected void stop(String[] args) throws Exception { final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); - final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor( - configuration, - configurationDirectory, - commandLine); + final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine); - final String clusterId = activeCommandLine.getClusterId(configuration, commandLine); + final String clusterId = activeCommandLine.getClusterId(commandLine); if (clusterId == null) { throw new FlinkException("No cluster id was specified. Please specify a cluster to which " + @@ -579,12 +555,9 @@ protected void cancel(String[] args) throws Exception { final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); - final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor( - configuration, - configurationDirectory, - commandLine); + final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine); - final String clusterId = activeCommandLine.getClusterId(configuration, commandLine); + final String clusterId = activeCommandLine.getClusterId(commandLine); if (clusterId == null) { throw new FlinkException("No cluster id was specified. Please specify a cluster to which " + @@ -646,14 +619,9 @@ protected void savepoint(String[] args) throws Exception { CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine); - final ClusterDescriptor clusterDescriptor = customCommandLine.createClusterDescriptor( - configuration, - configurationDirectory, - commandLine); + final ClusterDescriptor clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine); - final String clusterId = customCommandLine.getClusterId( - configuration, - commandLine); + final String clusterId = customCommandLine.getClusterId(commandLine); if (clusterId == null) { throw new FlinkException("No cluster id was specified. Please specify a cluster to which " + @@ -1004,13 +972,14 @@ public static void main(final String[] args) { final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory); // 3. load the custom command lines - final List> customCommandLines = loadCustomCommandLines(); + final List> customCommandLines = loadCustomCommandLines( + configuration, + configurationDirectory); try { final CliFrontend cli = new CliFrontend( configuration, - customCommandLines, - configurationDirectory); + customCommandLines); SecurityUtils.install(new SecurityConfiguration(cli.configuration)); int retCode = SecurityUtils.getInstalledContext() @@ -1070,7 +1039,7 @@ public static void setJobManagerAddressInConfig(Configuration config, InetSocket config.setInteger(JobManagerOptions.PORT, address.getPort()); } - public static List> loadCustomCommandLines() { + public static List> loadCustomCommandLines(Configuration configuration, String configurationDirectory) { List> customCommandLines = new ArrayList<>(2); // Command line interface of the YARN session, with a special initialization here @@ -1079,13 +1048,18 @@ public static List> loadCustomCommandLines() { // active CustomCommandLine in order and DefaultCLI isActive always return true. final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli"; try { - customCommandLines.add(loadCustomCommandLine(flinkYarnSessionCLI, "y", "yarn")); + customCommandLines.add( + loadCustomCommandLine(flinkYarnSessionCLI, + configuration, + configurationDirectory, + "y", + "yarn")); } catch (Exception e) { LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e); } - customCommandLines.add(new Flip6DefaultCLI()); - customCommandLines.add(new DefaultCLI()); + customCommandLines.add(new Flip6DefaultCLI(configuration)); + customCommandLines.add(new DefaultCLI(configuration)); return customCommandLines; } @@ -1101,7 +1075,7 @@ public static List> loadCustomCommandLines() { */ public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) { for (CustomCommandLine cli : customCommandLines) { - if (cli.isActive(commandLine, configuration)) { + if (cli.isActive(commandLine)) { return cli; } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java index cfa08278a837e..f642484b075b5 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java @@ -21,7 +21,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FlinkException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; @@ -36,10 +36,9 @@ public interface CustomCommandLine { /** * Signals whether the custom command-line wants to execute or not. * @param commandLine The command-line options - * @param configuration The Flink configuration * @return True if the command-line wants to run, False otherwise */ - boolean isActive(CommandLine commandLine, Configuration configuration); + boolean isActive(CommandLine commandLine); /** * Gets the unique identifier of this CustomCommandLine. @@ -55,6 +54,7 @@ public interface CustomCommandLine { /** * Adds custom options to the existing general options. + * * @param baseOptions The existing options. */ void addGeneralOptions(Options baseOptions); @@ -63,15 +63,11 @@ public interface CustomCommandLine { * Create a {@link ClusterDescriptor} from the given configuration, configuration directory * and the command line. * - * @param configuration to create the ClusterDescriptor with - * @param configurationDirectory where the configuration was loaded from * @param commandLine containing command line options relevant for the ClusterDescriptor * @return ClusterDescriptor + * @throws FlinkException if the ClusterDescriptor could not be created */ - ClusterDescriptor createClusterDescriptor( - Configuration configuration, - String configurationDirectory, - CommandLine commandLine); + ClusterDescriptor createClusterDescriptor(CommandLine commandLine) throws FlinkException; /** * Returns the cluster id if a cluster id was specified on the command line, otherwise it @@ -80,24 +76,21 @@ ClusterDescriptor createClusterDescriptor( *

A cluster id identifies a running cluster, e.g. the Yarn application id for a Flink * cluster running on Yarn. * - * @param configuration to be used for the cluster id retrieval * @param commandLine containing command line options relevant for the cluster id retrieval * @return Cluster id identifying the cluster to deploy jobs to or null */ @Nullable - String getClusterId(Configuration configuration, CommandLine commandLine); + String getClusterId(CommandLine commandLine); /** * Returns the {@link ClusterSpecification} specified by the configuration and the command * line options. This specification can be used to deploy a new Flink cluster. * - * @param configuration to be used for the ClusterSpecification values * @param commandLine containing command line options relevant for the ClusterSpecification * @return ClusterSpecification for a new Flink cluster + * @throws FlinkException if the ClusterSpecification could not be created */ - ClusterSpecification getClusterSpecification( - Configuration configuration, - CommandLine commandLine); + ClusterSpecification getClusterSpecification(CommandLine commandLine) throws FlinkException; default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions) throws CliArgsException { final Options options = new Options(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java index d34e307624ebe..c29c5b7856aab 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java @@ -23,6 +23,7 @@ import org.apache.flink.client.deployment.StandaloneClusterDescriptor; import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FlinkException; import org.apache.commons.cli.CommandLine; @@ -33,8 +34,12 @@ */ public class DefaultCLI extends AbstractCustomCommandLine { + public DefaultCLI(Configuration configuration) { + super(configuration); + } + @Override - public boolean isActive(CommandLine commandLine, Configuration configuration) { + public boolean isActive(CommandLine commandLine) { // always active because we can try to read a JobManager address from the config return true; } @@ -46,22 +51,20 @@ public String getId() { @Override public ClusterDescriptor createClusterDescriptor( - Configuration configuration, - String configurationDirectory, - CommandLine commandLine) { - final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine); + CommandLine commandLine) throws FlinkException { + final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine); return new StandaloneClusterDescriptor(effectiveConfiguration); } @Override @Nullable - public String getClusterId(Configuration configuration, CommandLine commandLine) { + public String getClusterId(CommandLine commandLine) { return "standalone"; } @Override - public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) { + public ClusterSpecification getClusterSpecification(CommandLine commandLine) { return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java index 0e80f44f17cd3..3adeca6fde413 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java @@ -23,6 +23,7 @@ import org.apache.flink.client.deployment.Flip6StandaloneClusterDescriptor; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FlinkException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -41,8 +42,12 @@ public class Flip6DefaultCLI extends AbstractCustomCommandLine createClusterDescriptor( - Configuration configuration, - String configurationDirectory, - CommandLine commandLine) { - final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine); + CommandLine commandLine) throws FlinkException { + final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine); return new Flip6StandaloneClusterDescriptor(effectiveConfiguration); } @Override @Nullable - public String getClusterId(Configuration configuration, CommandLine commandLine) { + public String getClusterId(CommandLine commandLine) { return "flip6Standalone"; } @Override - public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) { + public ClusterSpecification getClusterSpecification(CommandLine commandLine) { return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java index b0a308d82d7a4..60bd30875ba03 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java @@ -63,20 +63,20 @@ public void testCancel() throws Exception { @Test(expected = CliArgsException.class) public void testMissingJobId() throws Exception { String[] parameters = {}; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.cancel(parameters); } @Test(expected = CliArgsException.class) public void testUnrecognizedOption() throws Exception { String[] parameters = {"-v", "-l"}; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.cancel(parameters); } @@ -116,10 +116,10 @@ public void testCancelWithSavepoint() throws Exception { public void testCancelWithSavepointWithoutJobId() throws Exception { // Cancel with savepoint (with target directory), but no job ID String[] parameters = { "-s", "targetDirectory" }; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.cancel(parameters); } @@ -127,10 +127,10 @@ public void testCancelWithSavepointWithoutJobId() throws Exception { public void testCancelWithSavepointWithoutParameters() throws Exception { // Cancel with savepoint (no target directory) and no job ID String[] parameters = { "-s" }; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.cancel(parameters); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendInfoTest.java index fa212cbb602d6..c284c6141b7a3 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendInfoTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendInfoTest.java @@ -42,20 +42,20 @@ public class CliFrontendInfoTest extends TestLogger { @Test(expected = CliArgsException.class) public void testMissingOption() throws Exception { String[] parameters = {}; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.cancel(parameters); } @Test(expected = CliArgsException.class) public void testUnrecognizedOption() throws Exception { String[] parameters = {"-v", "-l"}; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.cancel(parameters); } @@ -65,10 +65,10 @@ public void testShowExecutionPlan() throws Exception { try { String[] parameters = new String[]{CliFrontendTestUtils.getTestJarPath(), "-f", "true"}; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.info(parameters); assertTrue(buffer.toString().contains("\"parallelism\": \"1\"")); } @@ -82,10 +82,10 @@ public void testShowExecutionPlanWithParallelism() { replaceStdOut(); try { String[] parameters = {"-p", "17", CliFrontendTestUtils.getTestJarPath()}; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.info(parameters); assertTrue(buffer.toString().contains("\"parallelism\": \"17\"")); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java index 6639e2538a0d2..77d8016ad04fa 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java @@ -60,10 +60,10 @@ public void testList() throws Exception { @Test(expected = CliArgsException.class) public void testUnrecognizedOption() throws Exception { String[] parameters = {"-v", "-k"}; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.list(parameters); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java index a862dc15d666e..30b1128c71839 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java @@ -22,7 +22,6 @@ import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; -import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.costs.DefaultCostEstimator; @@ -66,129 +65,99 @@ public static void init() { public void setup() throws Exception { frontend = new CliFrontend( new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + Collections.singletonList(new DefaultCLI())); } @Test public void testNonExistingJarFile() { try { - ProgramOptions options = mock(ProgramOptions.class); - when(options.getJarFilePath()).thenReturn("/some/none/existing/path"); + ProgramOptions options = mock(ProgramOptions.class); + when(options.getJarFilePath()).thenReturn("/some/none/existing/path"); - try { - frontend.buildProgram(options); - fail("should throw an exception"); - } - catch (FileNotFoundException e) { - // that's what we want - } + try { + frontend.buildProgram(options); + fail("should throw an exception"); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + catch (FileNotFoundException e) { + // that's what we want } } @Test - public void testFileNotJarFile() { - try { - ProgramOptions options = mock(ProgramOptions.class); - when(options.getJarFilePath()).thenReturn(getNonJarFilePath()); + public void testFileNotJarFile() throws Exception { + ProgramOptions options = mock(ProgramOptions.class); + when(options.getJarFilePath()).thenReturn(getNonJarFilePath()); - try { - frontend.buildProgram(options); - fail("should throw an exception"); - } - catch (ProgramInvocationException e) { - // that's what we want - } + try { + frontend.buildProgram(options); + fail("should throw an exception"); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + catch (ProgramInvocationException e) { + // that's what we want } } @Test - public void testVariantWithExplicitJarAndArgumentsOption() { - try { - String[] arguments = { - "--classpath", "file:///tmp/foo", - "--classpath", "file:///tmp/bar", - "-j", getTestJarPath(), - "-a", "--debug", "true", "arg1", "arg2" }; - URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") }; - String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"}; - - RunOptions options = CliFrontendParser.parseRunCommand(arguments); - assertEquals(getTestJarPath(), options.getJarFilePath()); - assertArrayEquals(classpath, options.getClasspaths().toArray()); - assertArrayEquals(reducedArguments, options.getProgramArgs()); - - PackagedProgram prog = frontend.buildProgram(options); - - Assert.assertArrayEquals(reducedArguments, prog.getArguments()); - Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public void testVariantWithExplicitJarAndArgumentsOption() throws Exception { + String[] arguments = { + "--classpath", "file:///tmp/foo", + "--classpath", "file:///tmp/bar", + "-j", getTestJarPath(), + "-a", "--debug", "true", "arg1", "arg2" }; + URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") }; + String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"}; + + RunOptions options = CliFrontendParser.parseRunCommand(arguments); + assertEquals(getTestJarPath(), options.getJarFilePath()); + assertArrayEquals(classpath, options.getClasspaths().toArray()); + assertArrayEquals(reducedArguments, options.getProgramArgs()); + + PackagedProgram prog = frontend.buildProgram(options); + + Assert.assertArrayEquals(reducedArguments, prog.getArguments()); + Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName()); } @Test - public void testVariantWithExplicitJarAndNoArgumentsOption() { - try { - String[] arguments = { - "--classpath", "file:///tmp/foo", - "--classpath", "file:///tmp/bar", - "-j", getTestJarPath(), - "--debug", "true", "arg1", "arg2" }; - URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") }; - String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"}; - - RunOptions options = CliFrontendParser.parseRunCommand(arguments); - assertEquals(getTestJarPath(), options.getJarFilePath()); - assertArrayEquals(classpath, options.getClasspaths().toArray()); - assertArrayEquals(reducedArguments, options.getProgramArgs()); - - PackagedProgram prog = frontend.buildProgram(options); - - Assert.assertArrayEquals(reducedArguments, prog.getArguments()); - Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public void testVariantWithExplicitJarAndNoArgumentsOption() throws Exception { + String[] arguments = { + "--classpath", "file:///tmp/foo", + "--classpath", "file:///tmp/bar", + "-j", getTestJarPath(), + "--debug", "true", "arg1", "arg2" }; + URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") }; + String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"}; + + RunOptions options = CliFrontendParser.parseRunCommand(arguments); + assertEquals(getTestJarPath(), options.getJarFilePath()); + assertArrayEquals(classpath, options.getClasspaths().toArray()); + assertArrayEquals(reducedArguments, options.getProgramArgs()); + + PackagedProgram prog = frontend.buildProgram(options); + + Assert.assertArrayEquals(reducedArguments, prog.getArguments()); + Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName()); } @Test - public void testValidVariantWithNoJarAndNoArgumentsOption() { - try { - String[] arguments = { - "--classpath", "file:///tmp/foo", - "--classpath", "file:///tmp/bar", - getTestJarPath(), - "--debug", "true", "arg1", "arg2" }; - URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") }; - String[] reducedArguments = {"--debug", "true", "arg1", "arg2"}; - - RunOptions options = CliFrontendParser.parseRunCommand(arguments); - assertEquals(getTestJarPath(), options.getJarFilePath()); - assertArrayEquals(classpath, options.getClasspaths().toArray()); - assertArrayEquals(reducedArguments, options.getProgramArgs()); - - PackagedProgram prog = frontend.buildProgram(options); - - Assert.assertArrayEquals(reducedArguments, prog.getArguments()); - Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public void testValidVariantWithNoJarAndNoArgumentsOption() throws Exception { + String[] arguments = { + "--classpath", "file:///tmp/foo", + "--classpath", "file:///tmp/bar", + getTestJarPath(), + "--debug", "true", "arg1", "arg2" }; + URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") }; + String[] reducedArguments = {"--debug", "true", "arg1", "arg2"}; + + RunOptions options = CliFrontendParser.parseRunCommand(arguments); + assertEquals(getTestJarPath(), options.getJarFilePath()); + assertArrayEquals(classpath, options.getClasspaths().toArray()); + assertArrayEquals(reducedArguments, options.getProgramArgs()); + + PackagedProgram prog = frontend.buildProgram(options); + + Assert.assertArrayEquals(reducedArguments, prog.getArguments()); + Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName()); } @Test(expected = CliArgsException.class) @@ -199,54 +168,42 @@ public void testNoJarNoArgumentsAtAll() throws Exception { } @Test - public void testNonExistingFileWithArguments() { + public void testNonExistingFileWithArguments() throws Exception { + String[] arguments = { + "--classpath", "file:///tmp/foo", + "--classpath", "file:///tmp/bar", + "/some/none/existing/path", + "--debug", "true", "arg1", "arg2" }; + URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") }; + String[] reducedArguments = {"--debug", "true", "arg1", "arg2"}; + + RunOptions options = CliFrontendParser.parseRunCommand(arguments); + assertEquals(arguments[4], options.getJarFilePath()); + assertArrayEquals(classpath, options.getClasspaths().toArray()); + assertArrayEquals(reducedArguments, options.getProgramArgs()); + try { - String[] arguments = { - "--classpath", "file:///tmp/foo", - "--classpath", "file:///tmp/bar", - "/some/none/existing/path", - "--debug", "true", "arg1", "arg2" }; - URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") }; - String[] reducedArguments = {"--debug", "true", "arg1", "arg2"}; - - RunOptions options = CliFrontendParser.parseRunCommand(arguments); - assertEquals(arguments[4], options.getJarFilePath()); - assertArrayEquals(classpath, options.getClasspaths().toArray()); - assertArrayEquals(reducedArguments, options.getProgramArgs()); - - try { - frontend.buildProgram(options); - fail("Should fail with an exception"); - } - catch (FileNotFoundException e) { - // that's what we want - } + frontend.buildProgram(options); + fail("Should fail with an exception"); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + catch (FileNotFoundException e) { + // that's what we want } } @Test - public void testNonExistingFileWithoutArguments() { - try { - String[] arguments = {"/some/none/existing/path"}; + public void testNonExistingFileWithoutArguments() throws Exception { + String[] arguments = {"/some/none/existing/path"}; - RunOptions options = CliFrontendParser.parseRunCommand(arguments); - assertEquals(arguments[0], options.getJarFilePath()); - assertArrayEquals(new String[0], options.getProgramArgs()); + RunOptions options = CliFrontendParser.parseRunCommand(arguments); + assertEquals(arguments[0], options.getJarFilePath()); + assertArrayEquals(new String[0], options.getProgramArgs()); - try { - frontend.buildProgram(options); - } - catch (FileNotFoundException e) { - // that's what we want - } + try { + frontend.buildProgram(options); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + catch (FileNotFoundException e) { + // that's what we want } } @@ -284,7 +241,7 @@ public void testNonExistingFileWithoutArguments() { * */ @Test - public void testPlanWithExternalClass() throws CompilerException, ProgramInvocationException { + public void testPlanWithExternalClass() throws Exception { final boolean[] callme = { false }; // create a final object reference, to be able to change its val later try { @@ -334,9 +291,5 @@ public Class loadClass(String name) throws ClassNotFoundException { } assertTrue("Classloader was not called", callme[0]); } - catch (Exception e) { - e.printStackTrace(); - fail("Program failed with the wrong exception: " + e.getClass().getName()); - } } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java index fa1c76c7f0b1c..ebb76d886d711 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java @@ -46,31 +46,32 @@ public static void init() { @Test public void testRun() throws Exception { + final Configuration configuration = GlobalConfiguration.loadConfiguration(CliFrontendTestUtils.getConfigDir()); // test without parallelism { String[] parameters = {"-v", getTestJarPath()}; - RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(1, true, false); + RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(configuration, 1, true, false); testFrontend.run(parameters); } // test configure parallelism { String[] parameters = {"-v", "-p", "42", getTestJarPath()}; - RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(42, true, false); + RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(configuration, 42, true, false); testFrontend.run(parameters); } // test configure sysout logging { String[] parameters = {"-p", "2", "-q", getTestJarPath()}; - RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, false); + RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(configuration, 2, false, false); testFrontend.run(parameters); } // test detached mode { String[] parameters = {"-p", "2", "-d", getTestJarPath()}; - RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, true, true); + RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(configuration, 2, true, true); testFrontend.run(parameters); } @@ -111,10 +112,10 @@ public void testRun() throws Exception { public void testUnrecognizedOption() throws Exception { // test unrecognized option String[] parameters = {"-v", "-l", "-a", "some", "program", "arguments"}; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.run(parameters); } @@ -122,10 +123,10 @@ public void testUnrecognizedOption() throws Exception { public void testInvalidParallelismOption() throws Exception { // test configure parallelism with non integer value String[] parameters = {"-v", "-p", "text", getTestJarPath()}; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.run(parameters); } @@ -133,10 +134,10 @@ public void testInvalidParallelismOption() throws Exception { public void testParallelismWithOverflow() throws Exception { // test configure parallelism with overflow integer value String[] parameters = {"-v", "-p", "475871387138", getTestJarPath()}; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.run(parameters); } @@ -148,11 +149,10 @@ private static final class RunTestingCliFrontend extends CliFrontend { private final boolean sysoutLogging; private final boolean isDetached; - public RunTestingCliFrontend(int expectedParallelism, boolean logging, boolean isDetached) throws Exception { + public RunTestingCliFrontend(Configuration configuration, int expectedParallelism, boolean logging, boolean isDetached) throws Exception { super( - GlobalConfiguration.loadConfiguration(CliFrontendTestUtils.getConfigDir()), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); this.expectedParallelism = expectedParallelism; this.sysoutLogging = logging; this.isDetached = isDetached; diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java index fbbc739d2a6ad..0120cdff8ae0a 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java @@ -69,10 +69,10 @@ public void testStop() throws Exception { public void testUnrecognizedOption() throws Exception { // test unrecognized option String[] parameters = { "-v", "-l" }; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.stop(parameters); } @@ -80,10 +80,10 @@ public void testUnrecognizedOption() throws Exception { public void testMissingJobId() throws Exception { // test missing job id String[] parameters = {}; + Configuration configuration = new Configuration(); CliFrontend testFrontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI()), - CliFrontendTestUtils.getConfigDir()); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); testFrontend.stop(parameters); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java index 3f05f90846522..e73b9c9e1251b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java @@ -46,30 +46,25 @@ public class DefaultCLITest extends TestLogger { */ @Test public void testConfigurationPassing() throws Exception { - final DefaultCLI defaultCLI = new DefaultCLI(); - - final String configurationDirectory = temporaryFolder.newFolder().getAbsolutePath(); - final String[] args = {}; - - CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false); + final Configuration configuration = new Configuration(); final String localhost = "localhost"; final int port = 1234; - final Configuration configuration = new Configuration(); configuration.setString(JobManagerOptions.ADDRESS, localhost); configuration.setInteger(JobManagerOptions.PORT, port); + final DefaultCLI defaultCLI = new DefaultCLI(configuration); + + final String[] args = {}; + + CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false); + final InetSocketAddress expectedAddress = new InetSocketAddress(localhost, port); - final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor( - configuration, - configurationDirectory, - commandLine); + final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine); - final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId( - configuration, - commandLine)); + final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine)); Assert.assertEquals(expectedAddress, clusterClient.getJobManagerAddress()); } @@ -79,15 +74,6 @@ public void testConfigurationPassing() throws Exception { */ @Test public void testManualConfigurationOverride() throws Exception { - final DefaultCLI defaultCLI = new DefaultCLI(); - - final String manualHostname = "123.123.123.123"; - final int manualPort = 4321; - final String configurationDirectory = temporaryFolder.newFolder().getAbsolutePath(); - final String[] args = {"-m", manualHostname + ':' + manualPort}; - - CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false); - final String localhost = "localhost"; final int port = 1234; final Configuration configuration = new Configuration(); @@ -95,14 +81,17 @@ public void testManualConfigurationOverride() throws Exception { configuration.setString(JobManagerOptions.ADDRESS, localhost); configuration.setInteger(JobManagerOptions.PORT, port); - final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor( - configuration, - configurationDirectory, - commandLine); + final DefaultCLI defaultCLI = new DefaultCLI(configuration); + + final String manualHostname = "123.123.123.123"; + final int manualPort = 4321; + final String[] args = {"-m", manualHostname + ':' + manualPort}; + + CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false); + + final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine); - final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId( - configuration, - commandLine)); + final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine)); final InetSocketAddress expectedAddress = new InetSocketAddress(manualHostname, manualPort); diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/Flip6DefaultCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/Flip6DefaultCLITest.java index 6a538d43d77cd..c3299bc7c3ae6 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/Flip6DefaultCLITest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/Flip6DefaultCLITest.java @@ -37,7 +37,7 @@ public class Flip6DefaultCLITest extends TestLogger { @Test public void testFlip6Switch() throws CliArgsException { final String[] args = {"-flip6"}; - final Flip6DefaultCLI flip6DefaultCLI = new Flip6DefaultCLI(); + final Flip6DefaultCLI flip6DefaultCLI = new Flip6DefaultCLI(new Configuration()); final Options options = new Options(); flip6DefaultCLI.addGeneralOptions(options); @@ -46,6 +46,6 @@ public void testFlip6Switch() throws CliArgsException { final CommandLine commandLine = CliFrontendParser.parse(options, args, false); Assert.assertTrue(commandLine.hasOption(Flip6DefaultCLI.FLIP_6.getOpt())); - Assert.assertTrue(flip6DefaultCLI.isActive(commandLine, new Configuration())); + Assert.assertTrue(flip6DefaultCLI.isActive(commandLine)); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java index f9c2bfae23ed4..a36e8e9f9349b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java @@ -22,7 +22,6 @@ import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Preconditions; import org.apache.commons.cli.CommandLine; @@ -43,7 +42,7 @@ public DummyCustomCommandLine(T clusterClient) { } @Override - public boolean isActive(CommandLine commandLine, Configuration configuration) { + public boolean isActive(CommandLine commandLine) { return true; } @@ -63,21 +62,18 @@ public void addGeneralOptions(Options baseOptions) { } @Override - public ClusterDescriptor createClusterDescriptor( - Configuration configuration, - String configurationDirectory, - CommandLine commandLine) { + public ClusterDescriptor createClusterDescriptor(CommandLine commandLine) { return new DummyClusterDescriptor<>(clusterClient); } @Override @Nullable - public String getClusterId(Configuration configuration, CommandLine commandLine) { + public String getClusterId(CommandLine commandLine) { return "dummy"; } @Override - public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) { + public ClusterSpecification getClusterSpecification(CommandLine commandLine) { return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java index 135e83166288e..00b460f215367 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java @@ -19,7 +19,6 @@ package org.apache.flink.client.cli.util; import org.apache.flink.client.cli.CliFrontend; -import org.apache.flink.client.cli.CliFrontendTestUtils; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; @@ -35,7 +34,6 @@ public class MockedCliFrontend extends CliFrontend { public MockedCliFrontend(ClusterClient clusterClient) throws Exception { super( new Configuration(), - Collections.singletonList(new DummyCustomCommandLine<>(clusterClient)), - CliFrontendTestUtils.getConfigDir()); + Collections.singletonList(new DummyCustomCommandLine<>(clusterClient))); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java index ec8091accc691..7b34d4a0747da 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java @@ -188,6 +188,8 @@ public void testDisposeSavepointUnknownResponse() throws Exception { fail("Dispose operation should have failed."); } catch (ExecutionException e) { assertTrue(ExceptionUtils.findThrowable(e, FlinkRuntimeException.class).isPresent()); + } finally { + clusterClient.shutdown(); } } @@ -214,6 +216,8 @@ public void testDisposeClassNotFoundException() throws Exception { "instance, which cannot be disposed without the user code class " + "loader. Please provide the program jar with which you have created " + "the savepoint via -j for disposal.").isPresent()); + } finally { + clusterClient.shutdown(); } } diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index d4d0d05acb23f..9f29ce01898e5 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -256,22 +256,15 @@ object FlinkShell { val options = CliFrontendParser.parseRunCommand(args.toArray) val frontend = new CliFrontend( configuration, - CliFrontend.loadCustomCommandLines(), - configurationDirectory) + CliFrontend.loadCustomCommandLines(configuration, configurationDirectory)) val config = frontend.getConfiguration val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine) - val clusterDescriptor = customCLI.createClusterDescriptor( - config, - frontend.getConfigurationDirectory, - options.getCommandLine) + val clusterDescriptor = customCLI.createClusterDescriptor(options.getCommandLine) - val clusterSpecification = customCLI.getClusterSpecification( - config, - options.getCommandLine) + val clusterSpecification = customCLI.getClusterSpecification(options.getCommandLine) - val cluster = clusterDescriptor.deploySessionCluster( - clusterSpecification) + val cluster = clusterDescriptor.deploySessionCluster(clusterSpecification) val address = cluster.getJobManagerAddress.getAddress.getHostAddress val port = cluster.getJobManagerAddress.getPort @@ -291,19 +284,13 @@ object FlinkShell { val options = CliFrontendParser.parseRunCommand(args.toArray) val frontend = new CliFrontend( configuration, - CliFrontend.loadCustomCommandLines(), - configurationDirectory) + CliFrontend.loadCustomCommandLines(configuration, configurationDirectory)) val config = frontend.getConfiguration val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine) - val clusterDescriptor = customCLI.createClusterDescriptor( - configuration, - configurationDirectory, - options.getCommandLine) + val clusterDescriptor = customCLI.createClusterDescriptor(options.getCommandLine) - val clusterId = customCLI.getClusterId( - configuration, - options.getCommandLine) + val clusterId = customCLI.getClusterId(options.getCommandLine) val cluster = clusterDescriptor.retrieve(clusterId) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index 8716f8a883384..f347f940457c3 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -577,10 +577,10 @@ public boolean accept(File dir, String name) { // load the configuration LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file"); - GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); + Configuration configuration = GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); try { - File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration()); + File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(configuration.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION)); if (yarnPropertiesFile.exists()) { LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath()); yarnPropertiesFile.delete(); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index ec8ef501480e7..d5a9883bafa3e 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -134,10 +135,10 @@ public void testDetachedMode() throws InterruptedException { // load the configuration LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file"); - GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); + Configuration configuration = GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); try { - File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration()); + File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION)); if (yarnPropertiesFile.exists()) { LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath()); yarnPropertiesFile.delete(); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 5325ae2121e41..ed0289223f492 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -693,6 +693,8 @@ public void run() { switch (type) { case YARN_SESSION: yCli = new FlinkYarnSessionCli( + configuration, + configurationDirectory, "", "", false); @@ -702,8 +704,7 @@ public void run() { try { CliFrontend cli = new CliFrontend( configuration, - CliFrontend.loadCustomCommandLines(), - configurationDirectory); + CliFrontend.loadCustomCommandLines(configuration, configurationDirectory)); returnValue = cli.parseParameters(args); } catch (Exception e) { throw new RuntimeException("Failed to execute the following args with CliFrontend: " diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index 63421f922d0e8..5fb7f90b789f9 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -34,6 +34,7 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -330,7 +331,7 @@ public void shutdownCluster() { } try { - File propertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(flinkConfig); + File propertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(flinkConfig.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION)); if (propertiesFile.isFile()) { if (propertiesFile.delete()) { LOG.info("Deleted Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString()); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 1b0f29a095aa5..1be3c570ca6fd 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -34,6 +33,8 @@ import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; import org.apache.flink.yarn.YarnClusterDescriptor; @@ -53,6 +54,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; @@ -70,7 +73,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.Callable; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -140,17 +142,36 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); - for (Map.Entry dynamicProperty : dynamicProperties.entrySet()) { - flinkConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue()); - } - - return applicationID; } - public AbstractYarnClusterDescriptor createDescriptor( + private AbstractYarnClusterDescriptor createDescriptor( Configuration configuration, String configurationDirectory, String defaultApplicationName, @@ -364,7 +335,7 @@ public AbstractYarnClusterDescriptor createDescriptor( return yarnClusterDescriptor; } - public ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) { + private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) { if (!cmd.hasOption(container.getOpt())) { // number of containers is required option! LOG.error("Missing required argument {}", container.getOpt()); printUsage(); @@ -374,27 +345,12 @@ public ClusterSpecification createClusterSpecification(Configuration configurati int numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt())); // JobManager Memory - final int jobManagerMemoryMB; - if (cmd.hasOption(jmMemory.getOpt())) { - jobManagerMemoryMB = Integer.valueOf(cmd.getOptionValue(this.jmMemory.getOpt())); - } else { - jobManagerMemoryMB = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY); - } + final int jobManagerMemoryMB = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY); // Task Managers memory - final int taskManagerMemoryMB; - if (cmd.hasOption(tmMemory.getOpt())) { - taskManagerMemoryMB = Integer.valueOf(cmd.getOptionValue(this.tmMemory.getOpt())); - } else { - taskManagerMemoryMB = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); - } + final int taskManagerMemoryMB = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); - int slotsPerTaskManager; - if (cmd.hasOption(slots.getOpt())) { - slotsPerTaskManager = Integer.valueOf(cmd.getOptionValue(this.slots.getOpt())); - } else { - slotsPerTaskManager = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); - } + int slotsPerTaskManager = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); // convenience int userParallelism = Integer.valueOf(cmd.getOptionValue(CliFrontendParser.PARALLELISM_OPTION.getOpt(), "-1")); @@ -435,11 +391,11 @@ private void printUsage() { } @Override - public boolean isActive(CommandLine commandLine, Configuration configuration) { + public boolean isActive(CommandLine commandLine) { String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null); boolean yarnJobManager = ID.equals(jobManagerOption); boolean yarnAppId = commandLine.hasOption(applicationId.getOpt()); - return yarnJobManager || yarnAppId || loadYarnPropertiesFile(commandLine, configuration) != null; + return yarnJobManager || yarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null); } @Override @@ -463,11 +419,8 @@ public void addGeneralOptions(Options baseOptions) { } @Override - public AbstractYarnClusterDescriptor createClusterDescriptor( - Configuration configuration, - String configurationDirectory, - CommandLine commandLine) { - final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine); + public AbstractYarnClusterDescriptor createClusterDescriptor(CommandLine commandLine) throws FlinkException { + final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine); return createDescriptor( effectiveConfiguration, @@ -477,17 +430,26 @@ public AbstractYarnClusterDescriptor createClusterDescriptor( } @Override - public String getClusterId(Configuration configuration, CommandLine commandLine) { - return commandLine.hasOption(applicationId.getOpt()) ? commandLine.getOptionValue(applicationId.getOpt()) : loadYarnPropertiesFile(commandLine, configuration); + @Nullable + public String getClusterId(CommandLine commandLine) { + if (commandLine.hasOption(applicationId.getOpt())) { + return commandLine.getOptionValue(applicationId.getOpt()); + } else if (isYarnPropertiesFileMode(commandLine)) { + return yarnApplicationIdFromYarnProperties; + } else { + return null; + } } @Override - public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) { - return createClusterSpecification(configuration, commandLine); + public ClusterSpecification getClusterSpecification(CommandLine commandLine) throws FlinkException { + final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine); + + return createClusterSpecification(effectiveConfiguration, commandLine); } @Override - protected Configuration applyCommandLineOptionsToConfiguration(Configuration configuration, CommandLine commandLine) { + protected Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException { // we ignore the addressOption because it can only contain "yarn-cluster" final Configuration effectiveConfiguration = new Configuration(configuration); @@ -496,7 +458,7 @@ protected Configuration applyCommandLineOptionsToConfiguration(Configuration con effectiveConfiguration.setString(HA_CLUSTER_ID, zkNamespace); } - final String applicationId = getClusterId(configuration, commandLine); + final String applicationId = getClusterId(commandLine); if (applicationId != null) { final String zooKeeperNamespace; @@ -509,6 +471,66 @@ protected Configuration applyCommandLineOptionsToConfiguration(Configuration con effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace); } + if (commandLine.hasOption(jmMemory.getOpt())) { + effectiveConfiguration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, Integer.parseInt(commandLine.getOptionValue(jmMemory.getOpt()))); + } + + if (commandLine.hasOption(tmMemory.getOpt())) { + effectiveConfiguration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, Integer.parseInt(commandLine.getOptionValue(tmMemory.getOpt()))); + } + + if (commandLine.hasOption(slots.getOpt())) { + effectiveConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt()))); + } + + if (isYarnPropertiesFileMode(commandLine)) { + return applyYarnProperties(effectiveConfiguration); + } else { + return effectiveConfiguration; + } + } + + private boolean isYarnPropertiesFileMode(CommandLine commandLine) { + boolean canApplyYarnProperties = !commandLine.hasOption(ADDRESS_OPTION.getOpt()); + + for (Option option : commandLine.getOptions()) { + if (allOptions.hasOption(option.getOpt())) { + if (!option.getOpt().equals(detached.getOpt())) { + // don't resume from properties file if yarn options have been specified + canApplyYarnProperties = false; + break; + } + } + } + + return canApplyYarnProperties; + } + + private Configuration applyYarnProperties(Configuration configuration) throws FlinkException { + final Configuration effectiveConfiguration = new Configuration(configuration); + + // configure the default parallelism from YARN + String propParallelism = yarnPropertiesFile.getProperty(YARN_PROPERTIES_PARALLELISM); + if (propParallelism != null) { // maybe the property is not set + try { + int parallelism = Integer.parseInt(propParallelism); + effectiveConfiguration.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism); + + logAndSysout("YARN properties set default parallelism to " + parallelism); + } + catch (NumberFormatException e) { + throw new FlinkException("Error while parsing the YARN properties: " + + "Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.", e); + } + } + + // handle the YARN client's dynamic properties + String dynamicPropertiesEncoded = yarnPropertiesFile.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); + Map dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); + for (Map.Entry dynamicProperty : dynamicProperties.entrySet()) { + effectiveConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue()); + } + return effectiveConfiguration; } @@ -596,77 +618,82 @@ public int run( } } else { - AbstractYarnClusterDescriptor yarnDescriptor; - try { - yarnDescriptor = createDescriptor(configuration, configurationDirectory, null, cmd); - } catch (Exception e) { - System.err.println("Error while starting the YARN Client: " + e.getMessage()); - e.printStackTrace(System.err); - return 1; - } + try (AbstractYarnClusterDescriptor yarnDescriptor = createClusterDescriptor(cmd)){ + final ClusterSpecification clusterSpecification; - final ClusterSpecification clusterSpecification = createClusterSpecification(yarnDescriptor.getFlinkConfiguration(), cmd); + try { + clusterSpecification = getClusterSpecification(cmd); + } catch (FlinkException e) { + System.err.println("Error while creating the cluster specification: " + e.getMessage()); + e.printStackTrace(); + return 1; + } - try { - yarnCluster = yarnDescriptor.deploySessionCluster(clusterSpecification); - } catch (Exception e) { - System.err.println("Error while deploying YARN cluster: " + e.getMessage()); - e.printStackTrace(System.err); - return 1; - } - //------------------ ClusterClient deployed, handle connection details - String jobManagerAddress = - yarnCluster.getJobManagerAddress().getAddress().getHostName() + - ":" + yarnCluster.getJobManagerAddress().getPort(); + try { + yarnCluster = yarnDescriptor.deploySessionCluster(clusterSpecification); + } catch (Exception e) { + System.err.println("Error while deploying YARN cluster: " + e.getMessage()); + e.printStackTrace(System.err); + return 1; + } + //------------------ ClusterClient deployed, handle connection details + String jobManagerAddress = + yarnCluster.getJobManagerAddress().getAddress().getHostName() + + ":" + yarnCluster.getJobManagerAddress().getPort(); - System.out.println("Flink JobManager is now running on " + jobManagerAddress); - System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL()); + System.out.println("Flink JobManager is now running on " + jobManagerAddress); + System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL()); - // file that we write into the conf/ dir containing the jobManager address and the dop. - File yarnPropertiesFile = getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()); + // file that we write into the conf/ dir containing the jobManager address and the dop. + File yarnPropertiesFile = getYarnPropertiesLocation(configuration.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION)); - Properties yarnProps = new Properties(); - yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString()); - if (clusterSpecification.getSlotsPerTaskManager() != -1) { - String parallelism = + Properties yarnProps = new Properties(); + yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString()); + if (clusterSpecification.getSlotsPerTaskManager() != -1) { + String parallelism = Integer.toString(clusterSpecification.getSlotsPerTaskManager() * clusterSpecification.getNumberTaskManagers()); - yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism); - } - // add dynamic properties - if (yarnDescriptor.getDynamicPropertiesEncoded() != null) { - yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, + yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism); + } + // add dynamic properties + if (yarnDescriptor.getDynamicPropertiesEncoded() != null) { + yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, yarnDescriptor.getDynamicPropertiesEncoded()); - } - writeYarnProperties(yarnProps, yarnPropertiesFile); + } + writeYarnProperties(yarnProps, yarnPropertiesFile); - //------------------ ClusterClient running, let user control it ------------ + //------------------ ClusterClient running, let user control it ------------ - if (detachedMode) { - // print info and quit: - LOG.info("The Flink YARN client has been started in detached mode. In order to stop " + + if (detachedMode) { + // print info and quit: + LOG.info("The Flink YARN client has been started in detached mode. In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + "yarn application -kill " + yarnCluster.getApplicationId()); - yarnCluster.waitForClusterToBeReady(); - yarnCluster.disconnect(); - } else { + yarnCluster.waitForClusterToBeReady(); + yarnCluster.disconnect(); + } else { - ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); - try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( yarnDescriptor.getYarnClient(), yarnCluster.getApplicationId(), - new ScheduledExecutorServiceAdapter(scheduledExecutorService))){ - runInteractiveCli( - yarnCluster, - yarnApplicationStatusMonitor, - acceptInteractiveInput); - } finally { - // shut down the scheduled executor service - ExecutorUtils.gracefulShutdown( - 1000L, - TimeUnit.MILLISECONDS, - scheduledExecutorService); + new ScheduledExecutorServiceAdapter(scheduledExecutorService))) { + runInteractiveCli( + yarnCluster, + yarnApplicationStatusMonitor, + acceptInteractiveInput); + } finally { + // shut down the scheduled executor service + ExecutorUtils.gracefulShutdown( + 1000L, + TimeUnit.MILLISECONDS, + scheduledExecutorService); + } } + } catch (FlinkException e) { + System.err.println("Error while deploying a Flink cluster: " + e.getMessage()); + e.printStackTrace(); + return 1; } } return 0; @@ -693,18 +720,20 @@ private void logAndSysout(String message) { } public static void main(final String[] args) throws Exception { - final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session - final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv(); final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(); + + final FlinkYarnSessionCli cli = new FlinkYarnSessionCli( + flinkConfiguration, + configurationDirectory, + "", + ""); // no prefix for the YARN session + SecurityUtils.install(new SecurityConfiguration(flinkConfiguration)); - int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable() { - @Override - public Integer call() { - return cli.run(args, flinkConfiguration, configurationDirectory); - } - }); + + final int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args, flinkConfiguration, configurationDirectory)); + System.exit(retCode); } @@ -858,11 +887,17 @@ public static Map getDynamicProperties(String dynamicPropertiesE } } - public static File getYarnPropertiesLocation(Configuration conf) { - String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); + public static File getYarnPropertiesLocation(@Nullable String yarnPropertiesFileLocation) { + + final String propertiesFileLocation; + + if (yarnPropertiesFileLocation != null) { + propertiesFileLocation = yarnPropertiesFileLocation; + } else { + propertiesFileLocation = System.getProperty("java.io.tmpdir"); + } + String currentUser = System.getProperty("user.name"); - String propertiesFileLocation = - conf.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser); } diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index 8f8359fa040e3..2f6c1576b2f93 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -22,7 +22,7 @@ import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.yarn.configuration.YarnConfigOptions; @@ -69,6 +69,8 @@ public class FlinkYarnSessionCliTest extends TestLogger { public void testDynamicProperties() throws Exception { FlinkYarnSessionCli cli = new FlinkYarnSessionCli( + new Configuration(), + tmp.getRoot().getAbsolutePath(), "", "", false); @@ -80,11 +82,7 @@ public void testDynamicProperties() throws Exception { CommandLine cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min", "-D", "env.java.opts=-DappName=foobar"}); - AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor( - new Configuration(), - tmp.getRoot().getAbsolutePath(), - null, - cmd); + AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createClusterDescriptor(cmd); Assert.assertNotNull(flinkYarnDescriptor); @@ -100,7 +98,11 @@ public void testNotEnoughTaskSlots() throws Exception { String[] params = new String[] {"-yn", "2", "-ys", "3", "-p", "7"}; - FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", "yarn"); + FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli( + new Configuration(), + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); Options options = new Options(); // TODO: Nasty workaround: We should get rid of the YarnCLI and run options coupling @@ -110,7 +112,7 @@ public void testNotEnoughTaskSlots() throws Exception { final CommandLine commandLine = CliFrontendParser.parse(options, params, true); - ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), commandLine); + ClusterSpecification clusterSpecification = yarnCLI.getClusterSpecification(commandLine); // each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased. assertEquals(4, clusterSpecification.getSlotsPerTaskManager()); @@ -122,21 +124,17 @@ public void testCorrectSettingOfMaxSlots() throws Exception { String[] params = new String[] {"-yn", "2", "-ys", "3"}; - FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", "yarn"); + FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli( + new Configuration(), + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); - final Configuration configuration = new Configuration(); + AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine); - AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor( - configuration, - tmp.getRoot().getAbsolutePath(), - "", - commandLine); - - final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification( - configuration, - commandLine); + final ClusterSpecification clusterSpecification = yarnCLI.getClusterSpecification(commandLine); // each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased. assertEquals(3, clusterSpecification.getSlotsPerTaskManager()); @@ -149,15 +147,15 @@ public void testZookeeperNamespaceProperty() throws Exception { String[] params = new String[] {"-yn", "2", "-yz", zkNamespaceCliInput}; - FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", "yarn"); + FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli( + new Configuration(), + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); - AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor( - new Configuration(), - tmp.getRoot().getAbsolutePath(), - "", - commandLine); + AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine); assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace()); } @@ -170,16 +168,18 @@ public void testResumeFromYarnPropertiesFile() throws Exception { File directoryPath = writeYarnPropertiesFile(validPropertiesFile); - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn"); - final Configuration configuration = new Configuration(); configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath()); + final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( + configuration, + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); + final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true); - final String clusterId = flinkYarnSessionCli.getClusterId( - configuration, - commandLine); + final String clusterId = flinkYarnSessionCli.getClusterId(commandLine); assertEquals(TEST_YARN_APPLICATION_ID.toString(), clusterId); } @@ -188,50 +188,49 @@ public void testResumeFromYarnPropertiesFile() throws Exception { * Tests that we fail when reading an invalid yarn properties file when retrieving * the cluster id. */ - @Test(expected = IllegalConfigurationException.class) + @Test(expected = FlinkException.class) public void testInvalidYarnPropertiesFile() throws Exception { File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile); - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn"); - final Configuration configuration = new Configuration(); configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath()); - final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true); - - flinkYarnSessionCli.getClusterId( + new FlinkYarnSessionCli( configuration, - commandLine); + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); } @Test public void testResumeFromYarnID() throws Exception { - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn"); - final Configuration configuration = new Configuration(); + final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( + configuration, + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true); - final String clusterId = flinkYarnSessionCli.getClusterId( - configuration, - commandLine); + final String clusterId = flinkYarnSessionCli.getClusterId(commandLine); assertEquals(TEST_YARN_APPLICATION_ID.toString(), clusterId); } @Test public void testResumeFromYarnIDZookeeperNamespace() throws Exception { - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn"); - final Configuration configuration = new Configuration(); + final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( + configuration, + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true); - final AbstractYarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor( - configuration, - tmp.getRoot().getAbsolutePath(), - commandLine); + final AbstractYarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine); final Configuration clusterDescriptorConfiguration = clusterDescriptor.getFlinkConfiguration(); @@ -241,17 +240,18 @@ public void testResumeFromYarnIDZookeeperNamespace() throws Exception { @Test public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception { - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn"); + final Configuration configuration = new Configuration(); + final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( + configuration, + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); final String overrideZkNamespace = "my_cluster"; - final Configuration configuration = new Configuration(); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace}, true); - final AbstractYarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor( - configuration, - tmp.getRoot().getAbsolutePath(), - commandLine); + final AbstractYarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine); final Configuration clusterDescriptorConfiguration = clusterDescriptor.getFlinkConfiguration(); @@ -263,17 +263,16 @@ public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception { public void testYarnIDOverridesPropertiesFile() throws Exception { File directoryPath = writeYarnPropertiesFile(validPropertiesFile); - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn"); - final Configuration configuration = new Configuration(); configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath()); - final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() }, true); - - final String clusterId = flinkYarnSessionCli.getClusterId( + final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( configuration, - commandLine); - + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); + final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() }, true); + final String clusterId = flinkYarnSessionCli.getClusterId(commandLine); assertEquals(TEST_YARN_APPLICATION_ID_2.toString(), clusterId); } From 86cdcc0f9bdba8f82172746b91640ef6cdd18d7f Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 11 Jan 2018 21:00:38 +0100 Subject: [PATCH 2/7] fixup! [FLINK-8340] [flip6] Remove passing of Configuration to CustomCommandLine --- .../apache/flink/client/cli/CliFrontendPackageProgramTest.java | 2 -- .../java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java index 30b1128c71839..fa4c4646c1070 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java @@ -163,8 +163,6 @@ public void testValidVariantWithNoJarAndNoArgumentsOption() throws Exception { @Test(expected = CliArgsException.class) public void testNoJarNoArgumentsAtAll() throws Exception { frontend.run(new String[0]); - - fail("Should have failed."); } @Test diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 1be3c570ca6fd..bb0bd5bb98786 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -211,7 +211,7 @@ public FlinkYarnSessionCli( yarnPropertiesFile = new Properties(); if (yarnPropertiesLocation.exists()) { - LOG.info("Found Yarn properties file under " + yarnPropertiesLocation.getAbsolutePath() + '.'); + LOG.info("Found Yarn properties file under {}.", yarnPropertiesLocation.getAbsolutePath()); try (InputStream is = new FileInputStream(yarnPropertiesLocation)) { yarnPropertiesFile.load(is); From 75f4a42965b30a2ed9901f12b933065b8a3a1b0e Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 11 Jan 2018 21:09:14 +0100 Subject: [PATCH 3/7] fixup! [FLINK-8340] [flip6] Remove passing of Configuration to CustomCommandLine --- .../flink/client/cli/CliFrontendPackageProgramTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java index fa4c4646c1070..fbf3d104c0875 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java @@ -63,14 +63,14 @@ public static void init() { @Before public void setup() throws Exception { + final Configuration configuration = new Configuration(); frontend = new CliFrontend( - new Configuration(), - Collections.singletonList(new DefaultCLI())); + configuration, + Collections.singletonList(new DefaultCLI(configuration))); } @Test - public void testNonExistingJarFile() { - try { + public void testNonExistingJarFile() throws Exception { ProgramOptions options = mock(ProgramOptions.class); when(options.getJarFilePath()).thenReturn("/some/none/existing/path"); From 37126503f5c9cfb7eb22721912aa85eed942da36 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 11 Jan 2018 21:12:10 +0100 Subject: [PATCH 4/7] fixup! [FLINK-8340] [flip6] Remove passing of Configuration to CustomCommandLine --- .../flink/client/cli/CliFrontendPackageProgramTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java index fbf3d104c0875..6873e68d1376a 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java @@ -66,7 +66,7 @@ public void setup() throws Exception { final Configuration configuration = new Configuration(); frontend = new CliFrontend( configuration, - Collections.singletonList(new DefaultCLI(configuration))); + Collections.singletonList(new DefaultCLI(configuration))); } @Test @@ -179,7 +179,7 @@ public void testNonExistingFileWithArguments() throws Exception { assertEquals(arguments[4], options.getJarFilePath()); assertArrayEquals(classpath, options.getClasspaths().toArray()); assertArrayEquals(reducedArguments, options.getProgramArgs()); - + try { frontend.buildProgram(options); fail("Should fail with an exception"); From e4aeaa205e6ddc5d7714c254be55230a32faed98 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 12 Jan 2018 00:19:13 +0100 Subject: [PATCH 5/7] fixup! [FLINK-8340] [flip6] Remove passing of Configuration to CustomCommandLine --- .../java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index bb0bd5bb98786..d4ab41f20b502 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -491,7 +491,7 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma } private boolean isYarnPropertiesFileMode(CommandLine commandLine) { - boolean canApplyYarnProperties = !commandLine.hasOption(ADDRESS_OPTION.getOpt()); + boolean canApplyYarnProperties = !commandLine.hasOption(addressOption.getOpt()); for (Option option : commandLine.getOptions()) { if (allOptions.hasOption(option.getOpt())) { From 0793eea2510de4113cd7dc27200c4506071620ef Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 20 Dec 2017 16:32:18 +0100 Subject: [PATCH 6/7] [FLINK-8341] [flip6] Remove not needed options from CommandLineOptions --- .../flink/client/cli/CommandLineOptions.java | 16 ---------- .../apache/flink/api/scala/FlinkShell.scala | 30 +++++++++++++------ 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java index a9a29b276c003..1b57620df00a4 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java @@ -20,7 +20,6 @@ import org.apache.commons.cli.CommandLine; -import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION; /** @@ -29,28 +28,13 @@ */ public abstract class CommandLineOptions { - private final CommandLine commandLine; - - private final String jobManagerAddress; - private final boolean printHelp; protected CommandLineOptions(CommandLine line) { - this.commandLine = line; this.printHelp = line.hasOption(HELP_OPTION.getOpt()); - this.jobManagerAddress = line.hasOption(ADDRESS_OPTION.getOpt()) ? - line.getOptionValue(ADDRESS_OPTION.getOpt()) : null; - } - - public CommandLine getCommandLine() { - return commandLine; } public boolean isPrintHelp() { return printHelp; } - - public String getJobManagerAddress() { - return jobManagerAddress; - } } diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index 9f29ce01898e5..16d8c34c8c2e2 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.scala import java.io._ -import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser} +import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser, RunOptions} import org.apache.flink.client.program.ClusterClient import org.apache.flink.configuration.{Configuration, GlobalConfiguration, JobManagerOptions} import org.apache.flink.runtime.minicluster.StandaloneMiniCluster @@ -253,16 +253,22 @@ object FlinkShell { yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString)) yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString)) - val options = CliFrontendParser.parseRunCommand(args.toArray) + val commandLine = CliFrontendParser.parse( + CliFrontendParser.getRunCommandOptions, + args.toArray, + true) + + val options = new RunOptions(commandLine) + val frontend = new CliFrontend( configuration, CliFrontend.loadCustomCommandLines(configuration, configurationDirectory)) val config = frontend.getConfiguration - val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine) + val customCLI = frontend.getActiveCustomCommandLine(commandLine) - val clusterDescriptor = customCLI.createClusterDescriptor(options.getCommandLine) + val clusterDescriptor = customCLI.createClusterDescriptor(commandLine) - val clusterSpecification = customCLI.getClusterSpecification(options.getCommandLine) + val clusterSpecification = customCLI.getClusterSpecification(commandLine) val cluster = clusterDescriptor.deploySessionCluster(clusterSpecification) @@ -281,16 +287,22 @@ object FlinkShell { "-m", "yarn-cluster" ) - val options = CliFrontendParser.parseRunCommand(args.toArray) + val commandLine = CliFrontendParser.parse( + CliFrontendParser.getRunCommandOptions, + args.toArray, + true) + + val options = new RunOptions(commandLine) + val frontend = new CliFrontend( configuration, CliFrontend.loadCustomCommandLines(configuration, configurationDirectory)) val config = frontend.getConfiguration - val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine) + val customCLI = frontend.getActiveCustomCommandLine(commandLine) - val clusterDescriptor = customCLI.createClusterDescriptor(options.getCommandLine) + val clusterDescriptor = customCLI.createClusterDescriptor(commandLine) - val clusterId = customCLI.getClusterId(options.getCommandLine) + val clusterId = customCLI.getClusterId(commandLine) val cluster = clusterDescriptor.retrieve(clusterId) From 425750994394ab4c77bb1dbfea4790461dbaf0cf Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 11 Jan 2018 22:53:57 +0100 Subject: [PATCH 7/7] fixup! [FLINK-8341] [flip6] Remove not needed options from CommandLineOptions --- .../main/scala/org/apache/flink/api/scala/FlinkShell.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index 16d8c34c8c2e2..349d94085b7ed 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -258,12 +258,9 @@ object FlinkShell { args.toArray, true) - val options = new RunOptions(commandLine) - val frontend = new CliFrontend( configuration, CliFrontend.loadCustomCommandLines(configuration, configurationDirectory)) - val config = frontend.getConfiguration val customCLI = frontend.getActiveCustomCommandLine(commandLine) val clusterDescriptor = customCLI.createClusterDescriptor(commandLine) @@ -292,12 +289,9 @@ object FlinkShell { args.toArray, true) - val options = new RunOptions(commandLine) - val frontend = new CliFrontend( configuration, CliFrontend.loadCustomCommandLines(configuration, configurationDirectory)) - val config = frontend.getConfiguration val customCLI = frontend.getActiveCustomCommandLine(commandLine) val clusterDescriptor = customCLI.createClusterDescriptor(commandLine)