Skip to content

Commit

Permalink
[FLINK-1027] [cli] Added support for '--' and '-' prefixed tokens in …
Browse files Browse the repository at this point in the history
…CLI program arguments.

This closes #278
  • Loading branch information
aalexandrov authored and StephanEwen committed Jan 6, 2015
1 parent 0190dd2 commit 93eadca
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 54 deletions.
Expand Up @@ -278,7 +278,7 @@ protected int run(String[] args) {
// Parse command line options
CommandLine line;
try {
line = parser.parse(RUN_OPTIONS, args, false);
line = parser.parse(RUN_OPTIONS, args, true);
evaluateGeneralOptions(line);
}
catch (MissingOptionException e) {
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
Expand All @@ -54,8 +53,8 @@ public static void init() {
@Test
public void testNonExistingJarFile() {
try {
String[] arguments = {"-j", "/some/none/existing/path", "-a", "plenty", "of", "arguments"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, false);
String[] arguments = {"-j", "/some/none/existing/path", "-a", "--verbose", "true", "arg1", "arg2"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);

CliFrontend frontend = new CliFrontend();
Object result = frontend.buildProgram(line);
Expand All @@ -71,8 +70,8 @@ public void testNonExistingJarFile() {
@Test
public void testFileNotJarFile() {
try {
String[] arguments = {"-j", getNonJarFilePath(), "-a", "plenty", "of", "arguments"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, false);
String[] arguments = {"-j", getNonJarFilePath(), "-a", "--verbose", "true", "arg1", "arg2"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);

CliFrontend frontend = new CliFrontend();
Object result = frontend.buildProgram(line);
Expand All @@ -88,16 +87,16 @@ public void testFileNotJarFile() {
@Test
public void testVariantWithExplicitJarAndArgumentsOption() {
try {
String[] parameters = {"-j", getTestJarPath(), "-a", "some", "program", "arguments"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), parameters, false);
String[] arguments = {"-j", getTestJarPath(), "-a", "--verbose", "true", "arg1", "arg2"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);

CliFrontend frontend = new CliFrontend();
Object result = frontend.buildProgram(line);
assertTrue(result instanceof PackagedProgram);

PackagedProgram prog = (PackagedProgram) result;
Assert.assertArrayEquals(new String[] {"some", "program", "arguments"}, prog.getArguments());

Assert.assertArrayEquals(new String[] {"--verbose", "true", "arg1", "arg2"}, prog.getArguments());
Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
}
catch (Exception e) {
Expand All @@ -110,16 +109,16 @@ public void testVariantWithExplicitJarAndArgumentsOption() {
@Test
public void testVariantWithExplicitJarAndNoArgumentsOption() {
try {
String[] parameters = {"-j", getTestJarPath(), "some", "program", "arguments"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), parameters, false);
String[] arguments = {"-j", getTestJarPath(), "--verbose", "true", "arg1", "arg2"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);

CliFrontend frontend = new CliFrontend();
Object result = frontend.buildProgram(line);
assertTrue(result instanceof PackagedProgram);

PackagedProgram prog = (PackagedProgram) result;
Assert.assertArrayEquals(new String[] {"some", "program", "arguments"}, prog.getArguments());

Assert.assertArrayEquals(new String[] {"--verbose", "true", "arg1", "arg2"}, prog.getArguments());
Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
}
catch (Exception e) {
Expand All @@ -132,16 +131,16 @@ public void testVariantWithExplicitJarAndNoArgumentsOption() {
@Test
public void testValidVariantWithNoJarAndNoArgumentsOption() {
try {
String[] parameters = {getTestJarPath(), "some", "program", "arguments"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), parameters, false);
String[] arguments = {getTestJarPath(), "--verbose", "true", "arg1", "arg2"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);

CliFrontend frontend = new CliFrontend();
Object result = frontend.buildProgram(line);
assertTrue(result instanceof PackagedProgram);

PackagedProgram prog = (PackagedProgram) result;
Assert.assertArrayEquals(new String[] {"some", "program", "arguments"}, prog.getArguments());

Assert.assertArrayEquals(new String[] {"--verbose", "true", "arg1", "arg2"}, prog.getArguments());
Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
}
catch (Exception e) {
Expand All @@ -154,8 +153,8 @@ public void testValidVariantWithNoJarAndNoArgumentsOption() {
@Test
public void testNoJarNoArgumentsAtAll() {
try {
String[] parameters = {};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), parameters, false);
String[] arguments = {};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);

CliFrontend frontend = new CliFrontend();
Object result = frontend.buildProgram(line);
Expand All @@ -171,8 +170,8 @@ public void testNoJarNoArgumentsAtAll() {
@Test
public void testNonExistingFileWithArguments() {
try {
String[] parameters = {"/some/none/existing/path", "some", "program", "arguments"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), parameters, false);
String[] arguments = {"/some/none/existing/path", "--verbose", "true", "arg1", "arg2"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);

CliFrontend frontend = new CliFrontend();
Object result = frontend.buildProgram(line);
Expand All @@ -188,8 +187,8 @@ public void testNonExistingFileWithArguments() {
@Test
public void testNonExistingFileWithoutArguments() {
try {
String[] parameters = {"/some/none/existing/path"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), parameters, false);
String[] arguments = {"/some/none/existing/path"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);

CliFrontend frontend = new CliFrontend();
Object result = frontend.buildProgram(line);
Expand All @@ -204,43 +203,48 @@ public void testNonExistingFileWithoutArguments() {

/**
* Ensure that we will never have the following error.
*
* The test works as follows:
* - Use the CliFrontend to invoke a jar file that loads a class which is only available
* in the jarfile itself (via a custom classloader)
* - Change the Usercode classloader of the PackagedProgram to a special classloader for this test
* - the classloader will accept the special class (and return a String.class)
*
*
* <pre>
* org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:398)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:301)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:140)
at org.apache.flink.client.program.Client.getOptimizedPlanAsJson(Client.java:125)
at org.apache.flink.client.CliFrontend.info(CliFrontend.java:439)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:931)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:951)
Caused by: java.io.IOException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.RCFileInputFormat
at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:102)
at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:54)
at tlabs.CDR_In_Report.createHCatInputFormat(CDR_In_Report.java:322)
at tlabs.CDR_Out_Report.main(CDR_Out_Report.java:380)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:383)
* at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:398)
* at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:301)
* at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:140)
* at org.apache.flink.client.program.Client.getOptimizedPlanAsJson(Client.java:125)
* at org.apache.flink.client.CliFrontend.info(CliFrontend.java:439)
* at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:931)
* at org.apache.flink.client.CliFrontend.main(CliFrontend.java:951)
* Caused by: java.io.IOException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.RCFileInputFormat
* at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:102)
* at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:54)
* at tlabs.CDR_In_Report.createHCatInputFormat(CDR_In_Report.java:322)
* at tlabs.CDR_Out_Report.main(CDR_Out_Report.java:380)
* at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
* at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
* at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
* at java.lang.reflect.Method.invoke(Method.java:622)
* at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:383)
* </pre>
*
* The test works as follows:
*
* <ul>
* <li> Use the CliFrontend to invoke a jar file that loads a class which is only available
* in the jarfile itself (via a custom classloader)
* <li> Change the Usercode classloader of the PackagedProgram to a special classloader for this test
* <li> the classloader will accept the special class (and return a String.class)
* </ul>
*/
@Test
public void testPlanWithExternalClass() throws CompilerException, ProgramInvocationException {
final boolean[] callme = { false }; // create a final object reference, to be able to change its val later
try {
String[] parameters = {getTestJarPath(), "-c", TEST_JAR_CLASSLOADERTEST_CLASS , "some", "program"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), parameters, false);
String[] arguments = {"-j", getTestJarPath(), "-c", TEST_JAR_CLASSLOADERTEST_CLASS , "--verbose", "true", "arg1", "arg2"};
CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);

CliFrontend frontend = new CliFrontend();
Object result = frontend.buildProgram(line);
assertTrue(result instanceof PackagedProgram);

PackagedProgram prog = spy((PackagedProgram) result);

ClassLoader testClassLoader = new ClassLoader(prog.getUserCodeClassLoader()) {
Expand All @@ -256,7 +260,7 @@ public Class<?> loadClass(String name) throws ClassNotFoundException {
};
when(prog.getUserCodeClassLoader()).thenReturn(testClassLoader);

Assert.assertArrayEquals(new String[]{"some", "program"}, prog.getArguments());
Assert.assertArrayEquals(new String[]{"--verbose", "true", "arg1", "arg2"}, prog.getArguments());
Assert.assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, prog.getMainClassName());

Configuration c = new Configuration();
Expand All @@ -275,9 +279,9 @@ public Class<?> loadClass(String name) throws ClassNotFoundException {
}
}
catch (Exception e) {
assertTrue("Classloader was not called", callme[0]);
System.err.println(e.getMessage());
e.printStackTrace();
assertTrue("Classloader was not called", callme[0]);
fail("Program caused an exception: " + e.getMessage());
}

Expand Down
Expand Up @@ -98,6 +98,7 @@ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// *************************************************************************

private static boolean fileOutput = false;
private static boolean verbose = false;
private static String textPath;
private static String outputPath;

Expand All @@ -106,9 +107,13 @@ private static boolean parseParameters(String[] args) {
if(args.length > 0) {
// parse input arguments
fileOutput = true;
if(args.length == 2) {
if(args.length == 2) { // cli line: program {textPath} {outputPath}
textPath = args[0];
outputPath = args[1];
} else if(args.length == 4 && (args[0].startsWith("-v") || args[0].startsWith("--verbose"))) { // cli line: program {optArg} {optVal} {textPath} {outputPath}
verbose = Boolean.valueOf(args[1]);
textPath = args[2];
outputPath = args[3];
} else {
System.err.println("Usage: WordCount <text path> <result path>");
return false;
Expand Down

0 comments on commit 93eadca

Please sign in to comment.