diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 8b445e69c3091..236ee9435dff2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -39,6 +39,8 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.ProgramMissingJobException;
+import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -826,6 +828,10 @@ protected int executeProgram(PackagedProgram program, ClusterClient client, int
JobSubmissionResult result;
try {
result = client.run(program, parallelism);
+ } catch (ProgramParametrizationException e) {
+ return handleParametrizationException(e);
+ } catch (ProgramMissingJobException e) {
+ return handleMissingJobException();
} catch (ProgramInvocationException e) {
return handleError(e);
} finally {
@@ -974,6 +980,29 @@ private int handleArgException(Exception e) {
return 1;
}
+ /**
+ * Displays an optional exception message for incorrect program parametrization.
+ *
+ * @param e The exception to display.
+ * @return The return code for the process.
+ */
+ private int handleParametrizationException(ProgramParametrizationException e) {
+ System.err.println(e.getMessage());
+ return 1;
+ }
+
+ /**
+ * Displays a message for a program without a job to execute.
+ *
+ * @return The return code for the process.
+ */
+ private int handleMissingJobException() {
+ System.err.println();
+ System.err.println("The program didn't contain a Flink job. " +
+ "Perhaps you forgot to call execute() on the execution environment.");
+ return 1;
+ }
+
/**
* Displays an exception message.
*
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 25bcadc918eec..ff5701f6a5323 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -18,21 +18,15 @@
package org.apache.flink.client.program;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
+import akka.actor.ActorSystem;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
@@ -42,8 +36,6 @@
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
@@ -53,10 +45,10 @@
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
-import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -64,13 +56,20 @@
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorSystem;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
/**
@@ -301,10 +300,11 @@ public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int par
* @param prog the packaged program
* @param parallelism the parallelism to execute the contained Flink job
* @return The result of the execution
+ * @throws ProgramMissingJobException
* @throws ProgramInvocationException
*/
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
- throws ProgramInvocationException
+ throws ProgramInvocationException, ProgramMissingJobException
{
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
@@ -321,8 +321,7 @@ else if (prog.isUsingInteractiveMode()) {
// invoke main method
prog.invokeInteractiveModeForExecution();
if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
- throw new ProgramInvocationException("The program didn't contain Flink jobs. " +
- "Perhaps you forgot to call execute() on the execution environment.");
+ throw new ProgramMissingJobException();
}
if (isDetached()) {
// in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 2a8804352a992..aca873e731207 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -19,6 +19,14 @@
package org.apache.flink.client.program;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.Program;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.util.InstantiationUtil;
+
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -43,14 +51,6 @@
import java.util.jar.JarFile;
import java.util.jar.Manifest;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.util.InstantiationUtil;
-
/**
* This class encapsulates represents a program, packaged in a jar file. It supplies
* functionality to extract nested libraries, search for the program entry point, and extract
@@ -518,6 +518,8 @@ private static void callMainMethod(Class> entryClass, String[] args) throws Pr
Throwable exceptionInMethod = e.getTargetException();
if (exceptionInMethod instanceof Error) {
throw (Error) exceptionInMethod;
+ } else if (exceptionInMethod instanceof ProgramParametrizationException) {
+ throw (ProgramParametrizationException) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramInvocationException) {
throw (ProgramInvocationException) exceptionInMethod;
} else {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramMissingJobException.java b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramMissingJobException.java
new file mode 100644
index 0000000000000..43d608b43aa2f
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramMissingJobException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+/**
+ * Exception used to indicate that no job was executed during the invocation of a Flink program.
+ */
+public class ProgramMissingJobException extends Exception {
+ /**
+ * Serial version UID for serialization interoperability.
+ */
+ private static final long serialVersionUID = -1964276369605091101L;
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java
new file mode 100644
index 0000000000000..9b5ac82796d2c
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Exception used to indicate that there is an error in the parametrization of a Flink program.
+ */
+public class ProgramParametrizationException extends RuntimeException {
+ /**
+ * Serial version UID for serialization interoperability.
+ */
+ private static final long serialVersionUID = 909054589029890262L;
+
+ /**
+ * Creates a ProgramParametrizationException with the given message.
+ *
+ * @param message
+ * The program usage string.
+ */
+ public ProgramParametrizationException(String message) {
+ super(Preconditions.checkNotNull(message));
+ }
+}
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index 4cfbc717eaead..2845e2ded4769 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph.examples;
import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.flink.api.common.JobExecutionResult;
@@ -27,11 +28,12 @@
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphCsvReader;
import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
@@ -62,24 +64,29 @@ public class JaccardIndex {
public static final boolean DEFAULT_CLIP_AND_FLIP = true;
- private static void printUsage() {
- System.out.println(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" +
- " neighborhoods and is computed as the number of shared neighbors divided by the number of" +
- " distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" +
- " shared).", 80));
- System.out.println();
- System.out.println(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
- " number of shared neighbors, and the number of distinct neighbors.", 80));
- System.out.println();
- System.out.println("usage: JaccardIndex --input --output ");
- System.out.println();
- System.out.println("options:");
- System.out.println(" --input csv --type --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
- System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
- System.out.println();
- System.out.println(" --output print");
- System.out.println(" --output hash");
- System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
+ private static String getUsage(String message) {
+ return new StrBuilder()
+ .appendNewLine()
+ .appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" +
+ " neighborhoods and is computed as the number of shared neighbors divided by the number of" +
+ " distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" +
+ " shared).", 80))
+ .appendNewLine()
+ .appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
+ " number of shared neighbors, and the number of distinct neighbors.", 80))
+ .appendNewLine()
+ .appendln("usage: JaccardIndex --input --output ")
+ .appendNewLine()
+ .appendln("options:")
+ .appendln(" --input csv --type --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
+ .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
+ .appendNewLine()
+ .appendln(" --output print")
+ .appendln(" --output hash")
+ .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+ .appendNewLine()
+ .appendln("Usage error: " + message)
+ .toString();
}
public static void main(String[] args) throws Exception {
@@ -123,8 +130,7 @@ public static void main(String[] args) throws Exception {
} break;
default:
- printUsage();
- return;
+ throw new ProgramParametrizationException(getUsage("invalid CSV type"));
}
} break;
@@ -161,8 +167,7 @@ public static void main(String[] args) throws Exception {
} break;
default:
- printUsage();
- return;
+ throw new ProgramParametrizationException(getUsage("invalid input type"));
}
switch (parameters.get("output", "")) {
@@ -192,8 +197,7 @@ public static void main(String[] args) throws Exception {
break;
default:
- printUsage();
- return;
+ throw new ProgramParametrizationException(getUsage("invalid output type"));
}
JobExecutionResult result = env.getLastJobExecutionResult();