From 1393089c2fcdcc950d6b5156e9adb5b8a34c36a7 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 2 Jan 2018 07:42:18 +0100 Subject: [PATCH] [FLINK-8333] [flip6] Separate deployment options from command options This commit separates the parsing of command options and deployment options into two steps. This makes it easier to make the CustomCommandLines non-static. Moreover, this commit moves the CliFrontend into the cli sub package. --- .../flink/client/cli/CliArgsException.java | 4 + .../flink/client/{ => cli}/CliFrontend.java | 396 ++++++++---------- .../flink/client/cli/CliFrontendParser.java | 47 ++- .../apache/flink/client/cli/DefaultCLI.java | 2 +- .../flink/client/cli/Flip6DefaultCLI.java | 2 +- .../flink/client/CliFrontendRunTest.java | 170 -------- .../CliFrontendAddressConfigurationTest.java | 6 +- .../{ => cli}/CliFrontendCancelTest.java | 119 +++--- .../client/{ => cli}/CliFrontendInfoTest.java | 54 +-- .../client/{ => cli}/CliFrontendListTest.java | 22 +- .../CliFrontendPackageProgramTest.java | 37 +- .../flink/client/cli/CliFrontendRunTest.java | 168 ++++++++ .../{ => cli}/CliFrontendSavepointTest.java | 5 +- .../client/{ => cli}/CliFrontendStopTest.java | 76 ++-- .../{ => cli}/CliFrontendTestUtils.java | 2 +- .../{ => cli}/util/MockedCliFrontend.java | 6 +- .../client/program/PackagedProgramTest.java | 2 +- flink-dist/src/main/flink-bin/bin/flink | 2 +- flink-dist/src/main/flink-bin/bin/flink.bat | 2 +- .../apache/flink/api/scala/FlinkShell.scala | 3 +- ...iFrontendYarnAddressConfigurationTest.java | 6 +- .../flink/yarn/FlinkYarnSessionCliTest.java | 2 +- .../org/apache/flink/yarn/YarnTestBase.java | 9 +- .../flink/yarn/cli/FlinkYarnSessionCli.java | 2 +- 24 files changed, 548 insertions(+), 596 deletions(-) rename flink-clients/src/main/java/org/apache/flink/client/{ => cli}/CliFrontend.java (78%) delete mode 100644 flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java rename flink-clients/src/test/java/org/apache/flink/client/{ => cli}/CliFrontendAddressConfigurationTest.java (93%) rename flink-clients/src/test/java/org/apache/flink/client/{ => cli}/CliFrontendCancelTest.java (53%) rename flink-clients/src/test/java/org/apache/flink/client/{ => cli}/CliFrontendInfoTest.java (69%) rename flink-clients/src/test/java/org/apache/flink/client/{ => cli}/CliFrontendListTest.java (81%) rename flink-clients/src/test/java/org/apache/flink/client/{ => cli}/CliFrontendPackageProgramTest.java (91%) create mode 100644 flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java rename flink-clients/src/test/java/org/apache/flink/client/{ => cli}/CliFrontendSavepointTest.java (98%) rename flink-clients/src/test/java/org/apache/flink/client/{ => cli}/CliFrontendStopTest.java (66%) rename flink-clients/src/test/java/org/apache/flink/client/{ => cli}/CliFrontendTestUtils.java (98%) rename flink-clients/src/test/java/org/apache/flink/client/{ => cli}/util/MockedCliFrontend.java (93%) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java index 027be07f7a02fe..9efd6a312864d9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java @@ -28,4 +28,8 @@ public class CliArgsException extends Exception { public CliArgsException(String message) { super(message); } + + public CliArgsException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java similarity index 78% rename from flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java rename to flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 7596eda1a72fc6..329295c4532ebe 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.client; +package org.apache.flink.client.cli; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; @@ -24,19 +24,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.client.cli.CancelOptions; -import org.apache.flink.client.cli.CliArgsException; -import org.apache.flink.client.cli.CliFrontendParser; -import org.apache.flink.client.cli.CommandLineOptions; -import org.apache.flink.client.cli.CustomCommandLine; -import org.apache.flink.client.cli.DefaultCLI; -import org.apache.flink.client.cli.Flip6DefaultCLI; -import org.apache.flink.client.cli.InfoOptions; -import org.apache.flink.client.cli.ListOptions; -import org.apache.flink.client.cli.ProgramOptions; -import org.apache.flink.client.cli.RunOptions; -import org.apache.flink.client.cli.SavepointOptions; -import org.apache.flink.client.cli.StopOptions; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; @@ -60,6 +47,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.EnvironmentInformation; @@ -69,6 +57,7 @@ import org.apache.flink.util.StringUtils; import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -211,53 +200,45 @@ public String getConfigurationDirectory() { * * @param args Command line arguments for the run action. */ - protected int run(String[] args) { + protected int run(String[] args) throws Exception { LOG.info("Running 'run' command."); - RunOptions options; - try { - options = CliFrontendParser.parseRunCommand(args); - } - catch (CliArgsException e) { - return handleArgException(e); - } - catch (Throwable t) { - return handleError(t); - } + final Options commandOptions = CliFrontendParser.getRunCommandOptions(); + + final CommandLine commandLine = CliFrontendParser.parse(commandOptions, args, true); + + final RunOptions runOptions = new RunOptions(commandLine); // evaluate help flag - if (options.isPrintHelp()) { + if (runOptions.isPrintHelp()) { CliFrontendParser.printHelpForRun(); return 0; } - if (options.getJarFilePath() == null) { - return handleArgException(new CliArgsException("The program JAR file was not specified.")); + if (runOptions.getJarFilePath() == null) { + throw new CliArgsException("The program JAR file was not specified."); } - PackagedProgram program; + final PackagedProgram program; try { LOG.info("Building program from JAR file"); - program = buildProgram(options); + program = buildProgram(runOptions); } catch (FileNotFoundException e) { - return handleArgException(e); - } - catch (Throwable t) { - return handleError(t); + throw new CliArgsException("Could not build the program from JAR file.", e); } - ClusterClient client = null; - try { + final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine); + ClusterClient client = createClient(customCommandLine, commandLine, program); - client = createClient(options, program); - client.setPrintStatusDuringExecution(options.getStdoutLogging()); - client.setDetached(options.getDetachedMode()); + try { + client.setPrintStatusDuringExecution(runOptions.getStdoutLogging()); + client.setDetached(runOptions.getDetachedMode()); LOG.debug("Client slots is set to {}", client.getMaxSlots()); - LOG.debug(options.getSavepointRestoreSettings().toString()); + LOG.debug(runOptions.getSavepointRestoreSettings().toString()); - int userParallelism = options.getParallelism(); + int userParallelism = runOptions.getParallelism(); LOG.debug("User parallelism is set to {}", userParallelism); if (client.getMaxSlots() != -1 && userParallelism == -1) { logAndSysout("Using the parallelism provided by the remote cluster (" @@ -270,19 +251,13 @@ protected int run(String[] args) { return executeProgram(program, client, userParallelism); } - catch (Throwable t) { - return handleError(t); - } finally { - if (client != null) { - try { - client.shutdown(); - } catch (Exception e) { - LOG.warn("Could not properly shut down the cluster client.", e); - } - } - if (program != null) { - program.deleteExtractedLibraries(); + program.deleteExtractedLibraries(); + + try { + client.shutdown(); + } catch (Exception e) { + LOG.info("Could not properly shut down the client.", e); } } } @@ -292,20 +267,14 @@ protected int run(String[] args) { * * @param args Command line arguments for the info action. */ - protected int info(String[] args) { + protected int info(String[] args) throws CliArgsException, FileNotFoundException, ProgramInvocationException { LOG.info("Running 'info' command."); - // Parse command line options - InfoOptions options; - try { - options = CliFrontendParser.parseInfoCommand(args); - } - catch (CliArgsException e) { - return handleArgException(e); - } - catch (Throwable t) { - return handleError(t); - } + final Options commandOptions = CliFrontendParser.getInfoCommandOptions(); + + final CommandLine commandLine = CliFrontendParser.parse(commandOptions, args, true); + + InfoOptions options = new InfoOptions(commandLine); // evaluate help flag if (options.isPrintHelp()) { @@ -319,14 +288,8 @@ protected int info(String[] args) { // -------- build the packaged program ------------- - PackagedProgram program; - try { - LOG.info("Building program from JAR file"); - program = buildProgram(options); - } - catch (Throwable t) { - return handleError(t); - } + LOG.info("Building program from JAR file"); + final PackagedProgram program = buildProgram(options); try { int parallelism = options.getParallelism(); @@ -366,9 +329,6 @@ protected int info(String[] args) { } return 0; } - catch (Throwable t) { - return handleError(t); - } finally { program.deleteExtractedLibraries(); } @@ -379,19 +339,14 @@ protected int info(String[] args) { * * @param args Command line arguments for the list action. */ - protected int list(String[] args) { + protected int list(String[] args) throws Exception { LOG.info("Running 'list' command."); - ListOptions options; - try { - options = CliFrontendParser.parseListCommand(args); - } - catch (CliArgsException e) { - return handleArgException(e); - } - catch (Throwable t) { - return handleError(t); - } + final Options commandOptions = CliFrontendParser.getListCommandOptions(); + + final CommandLine commandLine = CliFrontendParser.parse(commandOptions, args, false); + + ListOptions options = new ListOptions(commandLine); // evaluate help flag if (options.isPrintHelp()) { @@ -408,10 +363,10 @@ protected int list(String[] args) { scheduled = true; } - try { - CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); + final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, config, configurationDirectory); + try { Collection jobDetails; try { CompletableFuture> jobDetailsFuture = client.listJobs(); @@ -475,9 +430,12 @@ protected int list(String[] args) { } return 0; - } - catch (Throwable t) { - return handleError(t); + } finally { + try { + client.shutdown(); + } catch (Exception e) { + LOG.info("Could not properly shut down the client.", e); + } } } @@ -486,19 +444,14 @@ protected int list(String[] args) { * * @param args Command line arguments for the stop action. */ - protected int stop(String[] args) { + protected int stop(String[] args) throws Exception { LOG.info("Running 'stop' command."); - StopOptions options; - try { - options = CliFrontendParser.parseStopCommand(args); - } - catch (CliArgsException e) { - return handleArgException(e); - } - catch (Throwable t) { - return handleError(t); - } + final Options commandOptions = CliFrontendParser.getStopCommandOptions(); + + final CommandLine commandLine = CliFrontendParser.parse(commandOptions, args, false); + + StopOptions options = new StopOptions(commandLine); // evaluate help flag if (options.isPrintHelp()) { @@ -511,33 +464,28 @@ protected int stop(String[] args) { if (stopArgs.length > 0) { String jobIdString = stopArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } - catch (Exception e) { - return handleError(e); - } + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); } else { - return handleArgException(new CliArgsException("Missing JobID")); + throw new CliArgsException("Missing JobID"); } + final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); + + final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, config, configurationDirectory); + try { - CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); - try { - logAndSysout("Stopping job " + jobId + '.'); - client.stop(jobId); - logAndSysout("Stopped job " + jobId + '.'); + logAndSysout("Stopping job " + jobId + '.'); + client.stop(jobId); + logAndSysout("Stopped job " + jobId + '.'); - return 0; - } finally { + return 0; + } finally { + try { client.shutdown(); + } catch (Exception e) { + LOG.info("Could not properly shut down the client.", e); } - - } - catch (Throwable t) { - return handleError(t); } } @@ -546,30 +494,25 @@ protected int stop(String[] args) { * * @param args Command line arguments for the cancel action. */ - protected int cancel(String[] args) { + protected int cancel(String[] args) throws Exception { LOG.info("Running 'cancel' command."); - CancelOptions options; - try { - options = CliFrontendParser.parseCancelCommand(args); - } - catch (CliArgsException e) { - return handleArgException(e); - } - catch (Throwable t) { - return handleError(t); - } + final Options commandOptions = CliFrontendParser.getCancelCommandOptions(); + + final CommandLine commandLine = CliFrontendParser.parse(commandOptions, args, false); + + CancelOptions cancelOptions = new CancelOptions(commandLine); // evaluate help flag - if (options.isPrintHelp()) { + if (cancelOptions.isPrintHelp()) { CliFrontendParser.printHelpForCancel(); return 0; } - String[] cleanedArgs = options.getArgs(); + String[] cleanedArgs = cancelOptions.getArgs(); - boolean withSavepoint = options.isWithSavepoint(); - String targetDirectory = options.getSavepointTargetDirectory(); + boolean withSavepoint = cancelOptions.isWithSavepoint(); + String targetDirectory = cancelOptions.getSavepointTargetDirectory(); JobID jobId; @@ -583,9 +526,7 @@ protected int cancel(String[] args) { try { jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); } catch (Exception e) { - LOG.error("Error: The value for the Job ID is not a valid ID."); - System.out.println("Error: The value for the Job ID is not a valid ID."); - return 1; + throw new CliArgsException("The value for the JobID is not a valid ID: " + e.getMessage()); } } else if (targetDirectory != null) { // Try this for case: cancel -s (default savepoint target dir) @@ -594,42 +535,38 @@ protected int cancel(String[] args) { jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); targetDirectory = null; } catch (Exception e) { - LOG.error("Missing JobID in the command line arguments."); - System.out.println("Error: Specify a Job ID to cancel a job."); - return 1; + throw new CliArgsException("Missing JobID in the command line arguments: " + e.getMessage()); } } else { - LOG.error("Missing JobID in the command line arguments."); - System.out.println("Error: Specify a Job ID to cancel a job."); - return 1; + throw new CliArgsException("Missing JobID in the command line arguments."); } + final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); + final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, config, configurationDirectory); + try { - CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); - try { - if (withSavepoint) { - if (targetDirectory == null) { - logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); - } else { - logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); - } - String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory); - logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); + if (withSavepoint) { + if (targetDirectory == null) { + logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); } else { - logAndSysout("Cancelling job " + jobId + '.'); - client.cancel(jobId); - logAndSysout("Cancelled job " + jobId + '.'); + logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); } + String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory); + logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); + } else { + logAndSysout("Cancelling job " + jobId + '.'); + client.cancel(jobId); + logAndSysout("Cancelled job " + jobId + '.'); + } - return 0; - } finally { + return 0; + } finally { + try { client.shutdown(); + } catch (Exception e) { + LOG.info("Could not properly shut down the client.", e); } } - catch (Throwable t) { - return handleError(t); - } } /** @@ -637,17 +574,10 @@ protected int cancel(String[] args) { * * @param args Command line arguments for the cancel action. */ - protected int savepoint(String[] args) { + protected int savepoint(String[] args) throws CliArgsException { LOG.info("Running 'savepoint' command."); - SavepointOptions options; - try { - options = CliFrontendParser.parseSavepointCommand(args); - } catch (CliArgsException e) { - return handleArgException(e); - } catch (Throwable t) { - return handleError(t); - } + SavepointOptions options = CliFrontendParser.parseSavepointCommand(args); // evaluate help flag if (options.isPrintHelp()) { @@ -672,14 +602,12 @@ protected int savepoint(String[] args) { String jobIdString = cleanedArgs[0]; try { jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } catch (Exception e) { - return handleArgException(new IllegalArgumentException( - "Error: The value for the Job ID is not a valid ID.")); + } catch (Exception ignored) { + throw new CliArgsException("Error: The value for the Job ID is not a valid ID."); } } else { - return handleArgException(new IllegalArgumentException( - "Error: The value for the Job ID is not a valid ID. " + - "Specify a Job ID to trigger a savepoint.")); + throw new CliArgsException("Error: The value for the Job ID is not a valid ID. " + + "Specify a Job ID to trigger a savepoint."); } String savepointDirectory = null; @@ -730,8 +658,7 @@ private int triggerSavepoint(ClusterClient clusterClient, JobID jobId, String sa } /** - * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint} - * message to the job manager. + * Sends a {@link JobManagerMessages.DisposeSavepoint} message to the job manager. */ private int disposeSavepoint(ClusterClient clusterClient, String savepointPath) { Preconditions.checkNotNull(savepointPath, "Missing required argument: savepoint path. " + @@ -859,27 +786,26 @@ protected ClusterClient retrieveClient(CommandLineOptions options) { /** * Creates a {@link ClusterClient} object from the given command line options and other parameters. - * @param options Command line options + * @param customCommandLine custom command line to use to retrieve the client + * @param commandLine command line to use * @param program The program for which to create the client. * @throws Exception */ protected ClusterClient createClient( - CommandLineOptions options, + CustomCommandLine customCommandLine, + CommandLine commandLine, PackagedProgram program) throws Exception { - // Get the custom command-line (e.g. Standalone/Yarn/Mesos) - CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - ClusterClient client; try { - client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + client = customCommandLine.retrieveCluster(commandLine, config, configurationDirectory); logAndSysout("Cluster configuration: " + client.getClusterIdentifier()); } catch (UnsupportedOperationException e) { try { String applicationName = "Flink Application: " + program.getMainClassName(); - client = activeCommandLine.createCluster( + client = customCommandLine.createCluster( applicationName, - options.getCommandLine(), + commandLine, config, configurationDirectory, program.getAllLibraries()); @@ -912,7 +838,7 @@ protected ClusterClient createClient( * @param e The exception to display. * @return The return code for the process. */ - private int handleArgException(Exception e) { + private static int handleArgException(Exception e) { LOG.error("Invalid command line arguments. " + (e.getMessage() == null ? "" : e.getMessage())); System.out.println(e.getMessage()); @@ -927,7 +853,7 @@ private int handleArgException(Exception e) { * @param e The exception to display. * @return The return code for the process. */ - private int handleParametrizationException(ProgramParametrizationException e) { + private static int handleParametrizationException(ProgramParametrizationException e) { System.err.println(e.getMessage()); return 1; } @@ -937,7 +863,7 @@ private int handleParametrizationException(ProgramParametrizationException e) { * * @return The return code for the process. */ - private int handleMissingJobException() { + private static int handleMissingJobException() { System.err.println(); System.err.println("The program didn't contain a Flink job. " + "Perhaps you forgot to call execute() on the execution environment."); @@ -950,7 +876,7 @@ private int handleMissingJobException() { * @param t The exception to display. * @return The return code for the process. */ - private int handleError(Throwable t) { + private static int handleError(Throwable t) { LOG.error("Error while running the command.", t); System.err.println(); @@ -973,7 +899,7 @@ private int handleError(Throwable t) { return 1; } - private void logAndSysout(String message) { + private static void logAndSysout(String message) { LOG.info(message); System.out.println(message); } @@ -1003,40 +929,50 @@ public int parseParameters(String[] args) { // remove action from parameters final String[] params = Arrays.copyOfRange(args, 1, args.length); - // do action - switch (action) { - case ACTION_RUN: - return run(params); - case ACTION_LIST: - return list(params); - case ACTION_INFO: - return info(params); - case ACTION_CANCEL: - return cancel(params); - case ACTION_STOP: - return stop(params); - case ACTION_SAVEPOINT: - return savepoint(params); - case "-h": - case "--help": - CliFrontendParser.printHelp(); - return 0; - case "-v": - case "--version": - String version = EnvironmentInformation.getVersion(); - String commitID = EnvironmentInformation.getRevisionInformation().commitId; - System.out.print("Version: " + version); - System.out.println(!commitID.equals(EnvironmentInformation.UNKNOWN) ? ", Commit ID: " + commitID : ""); - return 0; - default: - System.out.printf("\"%s\" is not a valid action.\n", action); - System.out.println(); - System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\"."); - System.out.println(); - System.out.println("Specify the version option (-v or --version) to print Flink version."); - System.out.println(); - System.out.println("Specify the help option (-h or --help) to get help on the command."); - return 1; + try { + // do action + switch (action) { + case ACTION_RUN: + return run(params); + case ACTION_LIST: + return list(params); + case ACTION_INFO: + return info(params); + case ACTION_CANCEL: + return cancel(params); + case ACTION_STOP: + return stop(params); + case ACTION_SAVEPOINT: + return savepoint(params); + case "-h": + case "--help": + CliFrontendParser.printHelp(); + return 0; + case "-v": + case "--version": + String version = EnvironmentInformation.getVersion(); + String commitID = EnvironmentInformation.getRevisionInformation().commitId; + System.out.print("Version: " + version); + System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID); + return 0; + default: + System.out.printf("\"%s\" is not a valid action.\n", action); + System.out.println(); + System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\"."); + System.out.println(); + System.out.println("Specify the version option (-v or --version) to print Flink version."); + System.out.println(); + System.out.println("Specify the help option (-h or --help) to get help on the command."); + return 1; + } + } catch (CliArgsException ce) { + return handleArgException(ce); + } catch (ProgramParametrizationException ppe) { + return handleParametrizationException(ppe); + } catch (ProgramMissingJobException pmje) { + return handleMissingJobException(); + } catch (Exception e) { + return handleError(e); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 1aec3915865300..da2b064684cd50 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -18,7 +18,6 @@ package org.apache.flink.client.cli; -import org.apache.flink.client.CliFrontend; import org.apache.flink.configuration.CoreOptions; import org.apache.commons.cli.CommandLine; @@ -137,12 +136,12 @@ public class CliFrontendParser { CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true); } - private static final Options RUN_OPTIONS = getRunOptions(buildGeneralOptions(new Options())); - private static final Options INFO_OPTIONS = getInfoOptions(buildGeneralOptions(new Options())); - private static final Options LIST_OPTIONS = getListOptions(buildGeneralOptions(new Options())); - private static final Options CANCEL_OPTIONS = getCancelOptions(buildGeneralOptions(new Options())); - private static final Options STOP_OPTIONS = getStopOptions(buildGeneralOptions(new Options())); - private static final Options SAVEPOINT_OPTIONS = getSavepointOptions(buildGeneralOptions(new Options())); + private static final Options RUN_OPTIONS = getRunCommandOptions(); + private static final Options INFO_OPTIONS = getInfoCommandOptions(); + private static final Options LIST_OPTIONS = getListCommandOptions(); + private static final Options CANCEL_OPTIONS = getCancelCommandOptions(); + private static final Options STOP_OPTIONS = getStopCommandOptions(); + private static final Options SAVEPOINT_OPTIONS = getSavepointCommandOptions(); private static Options buildGeneralOptions(Options options) { options.addOption(HELP_OPTION); @@ -177,6 +176,10 @@ private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options return options; } + static Options getRunCommandOptions() { + return getRunOptions(buildGeneralOptions(new Options())); + } + private static Options getRunOptions(Options options) { options = getProgramSpecificOptions(options); options.addOption(SAVEPOINT_PATH_OPTION); @@ -191,12 +194,20 @@ private static Options getJobManagerAddressOption(Options options) { return options; } + static Options getInfoCommandOptions() { + return getInfoOptions(buildGeneralOptions(new Options())); + } + private static Options getInfoOptions(Options options) { options = getProgramSpecificOptions(options); options = getJobManagerAddressOption(options); return addCustomCliOptions(options, false); } + static Options getListCommandOptions() { + return getListOptions(buildGeneralOptions(new Options())); + } + private static Options getListOptions(Options options) { options.addOption(RUNNING_OPTION); options.addOption(SCHEDULED_OPTION); @@ -204,17 +215,29 @@ private static Options getListOptions(Options options) { return addCustomCliOptions(options, false); } + static Options getCancelCommandOptions() { + return getCancelOptions(buildGeneralOptions(new Options())); + } + private static Options getCancelOptions(Options options) { options.addOption(CANCEL_WITH_SAVEPOINT_OPTION); options = getJobManagerAddressOption(options); return addCustomCliOptions(options, false); } + static Options getStopCommandOptions() { + return getStopOptions(buildGeneralOptions(new Options())); + } + private static Options getStopOptions(Options options) { options = getJobManagerAddressOption(options); return addCustomCliOptions(options, false); } + static Options getSavepointCommandOptions() { + return getSavepointOptions(buildGeneralOptions(new Options())); + } + private static Options getSavepointOptions(Options options) { options = getJobManagerAddressOption(options); options.addOption(SAVEPOINT_DISPOSE_OPTION); @@ -479,4 +502,14 @@ public static InfoOptions parseInfoCommand(String[] args) throws CliArgsExceptio } } + public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions) throws CliArgsException { + final DefaultParser parser = new DefaultParser(); + + try { + return parser.parse(options, args, stopAtNonOptions); + } catch (ParseException e) { + throw new CliArgsException(e.getMessage()); + } + } + } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java index a5d8a30e233c33..1d25752e8f8f18 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java @@ -32,7 +32,7 @@ import java.net.URL; import java.util.List; -import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig; +import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig; /** * The default CLI which is used for interaction with standalone clusters. diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java index 5fb9dfce7ff087..413b465ac37ce5 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java @@ -33,7 +33,7 @@ import java.net.URL; import java.util.List; -import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig; +import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig; /** * The default CLI which is used for interaction with standalone clusters. diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java deleted file mode 100644 index 0edc44483e4ec6..00000000000000 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.client; - -import org.apache.flink.client.cli.CliFrontendParser; -import org.apache.flink.client.cli.Flip6DefaultCLI; -import org.apache.flink.client.cli.RunOptions; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; - -import org.junit.BeforeClass; -import org.junit.Test; - -import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for the RUN command. - */ -public class CliFrontendRunTest { - - @BeforeClass - public static void init() { - CliFrontendTestUtils.pipeSystemOutToNull(); - } - - @Test - public void testRun() { - try { - // test unrecognized option - { - String[] parameters = {"-v", "-l", "-a", "some", "program", "arguments"}; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - int retCode = testFrontend.run(parameters); - assertNotEquals(0, retCode); - } - - // test without parallelism - { - String[] parameters = {"-v", getTestJarPath()}; - RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(1, true, false); - assertEquals(0, testFrontend.run(parameters)); - } - - // test configure parallelism - { - String[] parameters = {"-v", "-p", "42", getTestJarPath()}; - RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(42, true, false); - assertEquals(0, testFrontend.run(parameters)); - } - - // test configure sysout logging - { - String[] parameters = {"-p", "2", "-q", getTestJarPath()}; - RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, false); - assertEquals(0, testFrontend.run(parameters)); - } - - // test detached mode - { - String[] parameters = {"-p", "2", "-d", getTestJarPath()}; - RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, true, true); - assertEquals(0, testFrontend.run(parameters)); - } - - // test configure parallelism with non integer value - { - String[] parameters = {"-v", "-p", "text", getTestJarPath()}; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - assertNotEquals(0, testFrontend.run(parameters)); - } - - // test configure parallelism with overflow integer value - { - String[] parameters = {"-v", "-p", "475871387138", getTestJarPath()}; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - assertNotEquals(0, testFrontend.run(parameters)); - } - - // test configure savepoint path (no ignore flag) - { - String[] parameters = {"-s", "expectedSavepointPath", getTestJarPath()}; - RunOptions options = CliFrontendParser.parseRunCommand(parameters); - SavepointRestoreSettings savepointSettings = options.getSavepointRestoreSettings(); - assertTrue(savepointSettings.restoreSavepoint()); - assertEquals("expectedSavepointPath", savepointSettings.getRestorePath()); - assertFalse(savepointSettings.allowNonRestoredState()); - } - - // test configure savepoint path (with ignore flag) - { - String[] parameters = {"-s", "expectedSavepointPath", "-n", getTestJarPath()}; - RunOptions options = CliFrontendParser.parseRunCommand(parameters); - SavepointRestoreSettings savepointSettings = options.getSavepointRestoreSettings(); - assertTrue(savepointSettings.restoreSavepoint()); - assertEquals("expectedSavepointPath", savepointSettings.getRestorePath()); - assertTrue(savepointSettings.allowNonRestoredState()); - } - - // test jar arguments - { - String[] parameters = - {"-m", "localhost:6123", getTestJarPath(), "-arg1", "value1", "justavalue", "--arg2", "value2"}; - RunOptions options = CliFrontendParser.parseRunCommand(parameters); - assertEquals("-arg1", options.getProgramArgs()[0]); - assertEquals("value1", options.getProgramArgs()[1]); - assertEquals("justavalue", options.getProgramArgs()[2]); - assertEquals("--arg2", options.getProgramArgs()[3]); - assertEquals("value2", options.getProgramArgs()[4]); - } - - // test flip6 switch - { - String[] parameters = - {"-flip6", getTestJarPath()}; - RunOptions options = CliFrontendParser.parseRunCommand(parameters); - assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt())); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - private static final class RunTestingCliFrontend extends CliFrontend { - - private final int expectedParallelism; - private final boolean sysoutLogging; - private final boolean isDetached; - - public RunTestingCliFrontend(int expectedParallelism, boolean logging, boolean isDetached) throws Exception { - super(CliFrontendTestUtils.getConfigDir()); - this.expectedParallelism = expectedParallelism; - this.sysoutLogging = logging; - this.isDetached = isDetached; - } - - @Override - protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) { - assertEquals(isDetached, client.isDetached()); - assertEquals(sysoutLogging, client.getPrintStatusDuringExecution()); - assertEquals(expectedParallelism, parallelism); - return 0; - } - } -} diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendAddressConfigurationTest.java similarity index 93% rename from flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java rename to flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendAddressConfigurationTest.java index 28c3226b550288..a030442a8617dc 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendAddressConfigurationTest.java @@ -16,10 +16,8 @@ * limitations under the License. */ -package org.apache.flink.client; +package org.apache.flink.client.cli; -import org.apache.flink.client.cli.CliFrontendParser; -import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; @@ -31,7 +29,7 @@ import java.net.InetSocketAddress; -import static org.apache.flink.client.CliFrontendTestUtils.checkJobManagerAddress; +import static org.apache.flink.client.cli.CliFrontendTestUtils.checkJobManagerAddress; import static org.junit.Assert.fail; /** diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java similarity index 53% rename from flink-clients/src/test/java/org/apache/flink/client/CliFrontendCancelTest.java rename to flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java index f2508dccaca1e7..10dba1dea9c3e4 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendCancelTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java @@ -16,20 +16,17 @@ * limitations under the License. */ -package org.apache.flink.client; +package org.apache.flink.client.cli; import org.apache.flink.api.common.JobID; -import org.apache.flink.client.cli.CancelOptions; -import org.apache.flink.client.cli.CliFrontendParser; -import org.apache.flink.client.cli.Flip6DefaultCLI; -import org.apache.flink.client.util.MockedCliFrontend; +import org.apache.flink.client.cli.util.MockedCliFrontend; +import org.apache.flink.util.TestLogger; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -42,7 +39,7 @@ /** * Tests for the CANCEL command. */ -public class CliFrontendCancelTest { +public class CliFrontendCancelTest extends TestLogger { @BeforeClass public static void init() { @@ -50,62 +47,46 @@ public static void init() { } @Test - public void testCancel() { - try { - // test unrecognized option - { - String[] parameters = {"-v", "-l"}; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - int retCode = testFrontend.cancel(parameters); - assertTrue(retCode != 0); - } - - // test missing job id - { - String[] parameters = {}; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - int retCode = testFrontend.cancel(parameters); - assertTrue(retCode != 0); - } + public void testCancel() throws Exception { - // test cancel properly - { - JobID jid = new JobID(); + // test cancel properly + { + JobID jid = new JobID(); - String[] parameters = { jid.toString() }; - CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false); + String[] parameters = { jid.toString() }; + CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false); - int retCode = testFrontend.cancel(parameters); - assertTrue(retCode == 0); + int retCode = testFrontend.cancel(parameters); + assertTrue(retCode == 0); - Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class)); - } + Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class)); + } - // test cancel properly - { - JobID jid = new JobID(); + // test flip6 switch + { + String[] parameters = + {"-flip6", String.valueOf(new JobID())}; + CancelOptions options = CliFrontendParser.parseCancelCommand(parameters); + assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt())); + } + } - String[] parameters = { jid.toString() }; - CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(true); + @Test(expected = CliArgsException.class) + public void testMissingJobId() throws Exception { + String[] parameters = {}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.cancel(parameters); - int retCode = testFrontend.cancel(parameters); - assertTrue(retCode != 0); + fail("Should have failed."); + } - Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class)); - } + @Test(expected = CliArgsException.class) + public void testUnrecognizedOption() throws Exception { + String[] parameters = {"-v", "-l"}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.cancel(parameters); - // test flip6 switch - { - String[] parameters = - {"-flip6", String.valueOf(new JobID())}; - CancelOptions options = CliFrontendParser.parseCancelCommand(parameters); - assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt())); - } - } - catch (Exception e) { - e.printStackTrace(); - fail("Program caused an exception: " + e.getMessage()); - } + fail("Should have failed with CliArgsException."); } /** @@ -136,20 +117,26 @@ public void testCancelWithSavepoint() throws Exception { Mockito.verify(testFrontend.client, times(1)) .cancelWithSavepoint(any(JobID.class), notNull(String.class)); } + } - { - // Cancel with savepoint (with target directory), but no job ID - String[] parameters = { "-s", "targetDirectory" }; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - assertNotEquals(0, testFrontend.cancel(parameters)); - } + @Test(expected = CliArgsException.class) + public void testCancelWithSavepointWithoutJobId() throws Exception { + // Cancel with savepoint (with target directory), but no job ID + String[] parameters = { "-s", "targetDirectory" }; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.cancel(parameters); - { - // Cancel with savepoint (no target directory) and no job ID - String[] parameters = { "-s" }; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - assertNotEquals(0, testFrontend.cancel(parameters)); - } + fail("Should have failed."); + } + + @Test(expected = CliArgsException.class) + public void testCancelWithSavepointWithoutParameters() throws Exception { + // Cancel with savepoint (no target directory) and no job ID + String[] parameters = { "-s" }; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.cancel(parameters); + + fail("Should have failed."); } private static final class CancelTestCliFrontend extends MockedCliFrontend { diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendInfoTest.java similarity index 69% rename from flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java rename to flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendInfoTest.java index 5a79bb69d62315..cfca33e7324e35 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendInfoTest.java @@ -16,7 +16,9 @@ * limitations under the License. */ -package org.apache.flink.client; +package org.apache.flink.client.cli; + +import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -29,52 +31,42 @@ /** * Tests for the "info" command. */ -public class CliFrontendInfoTest { +public class CliFrontendInfoTest extends TestLogger { private static PrintStream stdOut; private static PrintStream capture; private static ByteArrayOutputStream buffer; - @Test - public void testErrorCases() { - try { - // test unrecognized option - { - String[] parameters = {"-v", "-l"}; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - int retCode = testFrontend.cancel(parameters); - assertTrue(retCode != 0); - } - - // test missing options - { - String[] parameters = {}; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - int retCode = testFrontend.cancel(parameters); - assertTrue(retCode != 0); - } - } - catch (Exception e) { - e.printStackTrace(); - fail("Program caused an exception: " + e.getMessage()); - } + @Test(expected = CliArgsException.class) + public void testMissingOption() throws Exception { + String[] parameters = {}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.cancel(parameters); + + fail("Should have failed with CliArgsException"); + } + + @Test(expected = CliArgsException.class) + public void testUnrecognizedOption() throws Exception { + String[] parameters = {"-v", "-l"}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.cancel(parameters); + + fail("Should have failed with CliArgsException"); } @Test - public void testShowExecutionPlan() { + public void testShowExecutionPlan() throws Exception { replaceStdOut(); try { - String[] parameters = new String[] { CliFrontendTestUtils.getTestJarPath(), "-f", "true"}; + String[] parameters = new String[]{CliFrontendTestUtils.getTestJarPath(), "-f", "true"}; CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); int retCode = testFrontend.info(parameters); assertTrue(retCode == 0); assertTrue(buffer.toString().contains("\"parallelism\": \"1\"")); } - catch (Exception e) { - e.printStackTrace(); - fail("Program caused an exception: " + e.getMessage()); - } finally { + finally { restoreStdOut(); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java similarity index 81% rename from flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java rename to flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java index b559af1bff34cf..3185d233a456d5 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java @@ -16,9 +16,9 @@ * limitations under the License. */ -package org.apache.flink.client; +package org.apache.flink.client.cli; -import org.apache.flink.client.util.MockedCliFrontend; +import org.apache.flink.client.cli.util.MockedCliFrontend; import org.apache.flink.util.TestLogger; import org.junit.BeforeClass; @@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.times; import static org.powermock.api.mockito.PowerMockito.when; @@ -44,14 +45,6 @@ public static void init() { @Test public void testList() throws Exception { - // test unrecognized option - { - String[] parameters = {"-v", "-k"}; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - int retCode = testFrontend.list(parameters); - assertTrue(retCode != 0); - } - // test list properly { String[] parameters = {"-r", "-s"}; @@ -63,6 +56,15 @@ public void testList() throws Exception { } } + @Test(expected = CliArgsException.class) + public void testUnrecognizedOption() throws Exception { + String[] parameters = {"-v", "-k"}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.list(parameters); + + fail("Should have failed with an CliArgsException."); + } + private static final class ListTestCliFrontend extends MockedCliFrontend { ListTestCliFrontend() throws Exception { diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java similarity index 91% rename from flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java rename to flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java index a75f49b892810c..b27eab2c2db061 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java @@ -16,11 +16,8 @@ * limitations under the License. */ -package org.apache.flink.client; +package org.apache.flink.client.cli; -import org.apache.flink.client.cli.CliFrontendParser; -import org.apache.flink.client.cli.ProgramOptions; -import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; @@ -37,11 +34,11 @@ import java.io.FileNotFoundException; import java.net.URL; -import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_CLASSLOADERTEST_CLASS; -import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS; -import static org.apache.flink.client.CliFrontendTestUtils.getNonJarFilePath; -import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath; -import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull; +import static org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_CLASSLOADERTEST_CLASS; +import static org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS; +import static org.apache.flink.client.cli.CliFrontendTestUtils.getNonJarFilePath; +import static org.apache.flink.client.cli.CliFrontendTestUtils.getTestJarPath; +import static org.apache.flink.client.cli.CliFrontendTestUtils.pipeSystemOutToNull; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -190,16 +187,12 @@ public void testValidVariantWithNoJarAndNoArgumentsOption() { } } - @Test - public void testNoJarNoArgumentsAtAll() { - try { - CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - assertTrue(frontend.run(new String[0]) != 0); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + @Test(expected = CliArgsException.class) + public void testNoJarNoArgumentsAtAll() throws Exception { + CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + assertTrue(frontend.run(new String[0]) != 0); + + fail("Should have failed."); } @Test @@ -267,9 +260,9 @@ public void testNonExistingFileWithoutArguments() { * at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:301) * at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:140) * at org.apache.flink.client.program.Client.getOptimizedPlanAsJson(Client.java:125) - * at org.apache.flink.client.CliFrontend.info(CliFrontend.java:439) - * at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:931) - * at org.apache.flink.client.CliFrontend.main(CliFrontend.java:951) + * at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:439) + * at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931) + * at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:951) * Caused by: java.io.IOException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.RCFileInputFormat * at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:102) * at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:54) diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java new file mode 100644 index 00000000000000..d17d999fec44ec --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.cli; + +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; + +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.flink.client.cli.CliFrontendTestUtils.getTestJarPath; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the RUN command. + */ +public class CliFrontendRunTest { + + @BeforeClass + public static void init() { + CliFrontendTestUtils.pipeSystemOutToNull(); + } + + @Test + public void testRun() throws Exception { + // test without parallelism + { + String[] parameters = {"-v", getTestJarPath()}; + RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(1, true, false); + assertEquals(0, testFrontend.run(parameters)); + } + + // test configure parallelism + { + String[] parameters = {"-v", "-p", "42", getTestJarPath()}; + RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(42, true, false); + assertEquals(0, testFrontend.run(parameters)); + } + + // test configure sysout logging + { + String[] parameters = {"-p", "2", "-q", getTestJarPath()}; + RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, false); + assertEquals(0, testFrontend.run(parameters)); + } + + // test detached mode + { + String[] parameters = {"-p", "2", "-d", getTestJarPath()}; + RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, true, true); + assertEquals(0, testFrontend.run(parameters)); + } + + // test configure savepoint path (no ignore flag) + { + String[] parameters = {"-s", "expectedSavepointPath", getTestJarPath()}; + RunOptions options = CliFrontendParser.parseRunCommand(parameters); + SavepointRestoreSettings savepointSettings = options.getSavepointRestoreSettings(); + assertTrue(savepointSettings.restoreSavepoint()); + assertEquals("expectedSavepointPath", savepointSettings.getRestorePath()); + assertFalse(savepointSettings.allowNonRestoredState()); + } + + // test configure savepoint path (with ignore flag) + { + String[] parameters = {"-s", "expectedSavepointPath", "-n", getTestJarPath()}; + RunOptions options = CliFrontendParser.parseRunCommand(parameters); + SavepointRestoreSettings savepointSettings = options.getSavepointRestoreSettings(); + assertTrue(savepointSettings.restoreSavepoint()); + assertEquals("expectedSavepointPath", savepointSettings.getRestorePath()); + assertTrue(savepointSettings.allowNonRestoredState()); + } + + // test jar arguments + { + String[] parameters = + {"-m", "localhost:6123", getTestJarPath(), "-arg1", "value1", "justavalue", "--arg2", "value2"}; + RunOptions options = CliFrontendParser.parseRunCommand(parameters); + assertEquals("-arg1", options.getProgramArgs()[0]); + assertEquals("value1", options.getProgramArgs()[1]); + assertEquals("justavalue", options.getProgramArgs()[2]); + assertEquals("--arg2", options.getProgramArgs()[3]); + assertEquals("value2", options.getProgramArgs()[4]); + } + + // test flip6 switch + { + String[] parameters = + {"-flip6", getTestJarPath()}; + RunOptions options = CliFrontendParser.parseRunCommand(parameters); + assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt())); + } + } + + @Test(expected = CliArgsException.class) + public void testUnrecognizedOption() throws Exception { + // test unrecognized option + String[] parameters = {"-v", "-l", "-a", "some", "program", "arguments"}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.run(parameters); + + fail("Should have failed."); + } + + @Test(expected = CliArgsException.class) + public void testInvalidParallelismOption() throws Exception { + // test configure parallelism with non integer value + String[] parameters = {"-v", "-p", "text", getTestJarPath()}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.run(parameters); + + fail("Should have failed."); + } + + @Test(expected = CliArgsException.class) + public void testParallelismWithOverflow() throws Exception { + // test configure parallelism with overflow integer value + String[] parameters = {"-v", "-p", "475871387138", getTestJarPath()}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.run(parameters); + + fail("Should have failed."); + } + + // -------------------------------------------------------------------------------------------- + + private static final class RunTestingCliFrontend extends CliFrontend { + + private final int expectedParallelism; + private final boolean sysoutLogging; + private final boolean isDetached; + + public RunTestingCliFrontend(int expectedParallelism, boolean logging, boolean isDetached) throws Exception { + super(CliFrontendTestUtils.getConfigDir()); + this.expectedParallelism = expectedParallelism; + this.sysoutLogging = logging; + this.isDetached = isDetached; + } + + @Override + protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) { + assertEquals(isDetached, client.isDetached()); + assertEquals(sysoutLogging, client.getPrintStatusDuringExecution()); + assertEquals(expectedParallelism, parallelism); + return 0; + } + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java similarity index 98% rename from flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java rename to flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java index 8a81e1b066dab4..d261410dbf11e9 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java @@ -16,13 +16,13 @@ * limitations under the License. */ -package org.apache.flink.client; +package org.apache.flink.client.cli; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.cli.util.MockedCliFrontend; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.StandaloneClusterClient; -import org.apache.flink.client.util.MockedCliFrontend; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; @@ -45,6 +45,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java similarity index 66% rename from flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java rename to flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java index d10b31ca084840..2a90155efa0d99 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java @@ -16,22 +16,20 @@ * limitations under the License. */ -package org.apache.flink.client; +package org.apache.flink.client.cli; import org.apache.flink.api.common.JobID; -import org.apache.flink.client.cli.CliFrontendParser; -import org.apache.flink.client.cli.Flip6DefaultCLI; -import org.apache.flink.client.cli.StopOptions; -import org.apache.flink.client.util.MockedCliFrontend; +import org.apache.flink.client.cli.util.MockedCliFrontend; import org.apache.flink.util.TestLogger; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; -import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull; +import static org.apache.flink.client.cli.CliFrontendTestUtils.pipeSystemOutToNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.times; import static org.powermock.api.mockito.PowerMockito.doThrow; @@ -48,22 +46,6 @@ public static void setup() { @Test public void testStop() throws Exception { - // test unrecognized option - { - String[] parameters = { "-v", "-l" }; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - int retCode = testFrontend.stop(parameters); - assertTrue(retCode != 0); - } - - // test missing job id - { - String[] parameters = {}; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - int retCode = testFrontend.stop(parameters); - assertTrue(retCode != 0); - } - // test stop properly { JobID jid = new JobID(); @@ -78,18 +60,6 @@ public void testStop() throws Exception { Mockito.verify(testFrontend.client, times(1)).stop(any(JobID.class)); } - // test unknown job Id - { - JobID jid = new JobID(); - - String[] parameters = { jid.toString() }; - StopTestCliFrontend testFrontend = new StopTestCliFrontend(true); - - assertTrue(testFrontend.stop(parameters) != 0); - - Mockito.verify(testFrontend.client, times(1)).stop(any(JobID.class)); - } - // test flip6 switch { String[] parameters = @@ -99,6 +69,44 @@ public void testStop() throws Exception { } } + @Test(expected = CliArgsException.class) + public void testUnrecognizedOption() throws Exception { + // test unrecognized option + String[] parameters = { "-v", "-l" }; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.stop(parameters); + + fail("Should have failed."); + } + + @Test(expected = CliArgsException.class) + public void testMissingJobId() throws Exception { + // test missing job id + String[] parameters = {}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.stop(parameters); + + fail("Should have failed."); + } + + @Test + public void testUnknownJobId() throws Exception { + // test unknown job Id + JobID jid = new JobID(); + + String[] parameters = { jid.toString() }; + StopTestCliFrontend testFrontend = new StopTestCliFrontend(true); + + try { + testFrontend.stop(parameters); + fail("Should have failed."); + } catch (IllegalArgumentException ignored) { + // expected + } + + Mockito.verify(testFrontend.client, times(1)).stop(any(JobID.class)); + } + private static final class StopTestCliFrontend extends MockedCliFrontend { StopTestCliFrontend(boolean reject) throws Exception { diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestUtils.java similarity index 98% rename from flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java rename to flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestUtils.java index 8df39e01ad9869..16737dd125ffd6 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.client; +package org.apache.flink.client.cli; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; diff --git a/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java similarity index 93% rename from flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java rename to flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java index 663746b6bdc570..6d2bcca8770958 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java @@ -16,10 +16,10 @@ * limitations under the License. */ -package org.apache.flink.client.util; +package org.apache.flink.client.cli.util; -import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.CliFrontendTestUtils; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.cli.CliFrontendTestUtils; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java index e68d1dcbe8d1e3..e137adae3d3272 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java @@ -18,7 +18,7 @@ package org.apache.flink.client.program; -import org.apache.flink.client.CliFrontendTestUtils; +import org.apache.flink.client.cli.CliFrontendTestUtils; import org.junit.Assert; import org.junit.Test; diff --git a/flink-dist/src/main/flink-bin/bin/flink b/flink-dist/src/main/flink-bin/bin/flink index c9d03c8798d4b7..d38217deb9fad7 100644 --- a/flink-dist/src/main/flink-bin/bin/flink +++ b/flink-dist/src/main/flink-bin/bin/flink @@ -52,4 +52,4 @@ export FLINK_ROOT_DIR export FLINK_CONF_DIR # Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems -exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.CliFrontend "$@" +exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@" diff --git a/flink-dist/src/main/flink-bin/bin/flink.bat b/flink-dist/src/main/flink-bin/bin/flink.bat index 279acd2d905026..845d791afec082 100644 --- a/flink-dist/src/main/flink-bin/bin/flink.bat +++ b/flink-dist/src/main/flink-bin/bin/flink.bat @@ -27,6 +27,6 @@ SET JVM_ARGS=-Xmx512m SET FLINK_JM_CLASSPATH=%FLINK_LIB_DIR%\* -java %JVM_ARGS% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.client.CliFrontend %* +java %JVM_ARGS% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.client.cli.CliFrontend %* endlocal diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index 60eaccccc5ba74..43149d96123fd2 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -22,9 +22,8 @@ import java.io._ import java.util.Collections import org.apache.commons.cli.CommandLine -import org.apache.flink.client.cli.CliFrontendParser +import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser} import org.apache.flink.client.program.ClusterClient -import org.apache.flink.client.CliFrontend import org.apache.flink.runtime.minicluster.StandaloneMiniCluster import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, JobManagerOptions} import org.apache.flink.runtime.minicluster.{FlinkMiniCluster, LocalFlinkMiniCluster} diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index 56087a18404a53..a255453ed63cb8 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -18,7 +18,7 @@ package org.apache.flink.yarn; -import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.cli.CliFrontend; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.client.cli.CustomCommandLine; @@ -317,8 +317,8 @@ private static class TestCLI extends CliFrontend { @Override // make method public - public ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception { - return super.createClient(options, program); + public ClusterClient createClient(CustomCommandLine customCommandLine, CommandLine commandLine, PackagedProgram program) throws Exception { + return super.createClient(customCommandLine, commandLine, program); } @Override diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index 3fe8d2f75594ab..8541c401eafb7f 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -18,7 +18,7 @@ package org.apache.flink.yarn; -import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.cli.CliFrontend; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.deployment.ClusterSpecification; diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 99df3a49e96899..b40d95aeb0f230 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -18,8 +18,8 @@ package org.apache.flink.yarn; -import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.ConfigConstants; @@ -31,6 +31,7 @@ import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import akka.actor.Identify; +import org.apache.commons.cli.CommandLine; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -807,9 +808,9 @@ private static class TestingCLI extends CliFrontend { public TestingCLI() throws Exception {} @Override - protected ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception { + protected ClusterClient createClient(CustomCommandLine customCommandLine, CommandLine commandLine, PackagedProgram program) throws Exception { // mock the returned ClusterClient to disable shutdown and verify shutdown behavior later on - originalClusterClient = super.createClient(options, program); + originalClusterClient = super.createClient(customCommandLine, commandLine, program); spiedClusterClient = Mockito.spy(originalClusterClient); Mockito.doNothing().when(spiedClusterClient).shutdown(); return spiedClusterClient; diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 6d88c93448f5e3..2a7d532c99198d 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -18,7 +18,7 @@ package org.apache.flink.yarn.cli; -import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.cli.CliFrontend; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.client.deployment.ClusterSpecification;