Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
Expand Down Expand Up @@ -826,6 +828,10 @@ protected int executeProgram(PackagedProgram program, ClusterClient client, int
JobSubmissionResult result;
try {
result = client.run(program, parallelism);
} catch (ProgramParametrizationException e) {
return handleParametrizationException(e);
} catch (ProgramMissingJobException e) {
return handleMissingJobException();
} catch (ProgramInvocationException e) {
return handleError(e);
} finally {
Expand Down Expand Up @@ -974,6 +980,29 @@ private int handleArgException(Exception e) {
return 1;
}

/**
* Displays an optional exception message for incorrect program parametrization.
*
* @param e The exception to display.
* @return The return code for the process.
*/
private int handleParametrizationException(ProgramParametrizationException e) {
System.err.println(e.getMessage());
return 1;
}

/**
* Displays a message for a program without a job to execute.
*
* @return The return code for the process.
*/
private int handleMissingJobException() {
System.err.println();
System.err.println("The program didn't contain a Flink job. " +
"Perhaps you forgot to call execute() on the execution environment.");
return 1;
}

/**
* Displays an exception message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,15 @@

package org.apache.flink.client.program;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import akka.actor.ActorSystem;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
Expand All @@ -42,8 +36,6 @@
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
Expand All @@ -53,24 +45,31 @@
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorSystem;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;


/**
Expand Down Expand Up @@ -301,10 +300,11 @@ public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int par
* @param prog the packaged program
* @param parallelism the parallelism to execute the contained Flink job
* @return The result of the execution
* @throws ProgramMissingJobException
* @throws ProgramInvocationException
*/
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
throws ProgramInvocationException
throws ProgramInvocationException, ProgramMissingJobException
{
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
Expand All @@ -321,8 +321,7 @@ else if (prog.isUsingInteractiveMode()) {
// invoke main method
prog.invokeInteractiveModeForExecution();
if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
throw new ProgramInvocationException("The program didn't contain Flink jobs. " +
"Perhaps you forgot to call execute() on the execution environment.");
throw new ProgramMissingJobException();
}
if (isDetached()) {
// in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@

package org.apache.flink.client.program;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.util.InstantiationUtil;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
Expand All @@ -43,14 +51,6 @@
import java.util.jar.JarFile;
import java.util.jar.Manifest;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.util.InstantiationUtil;

/**
* This class encapsulates represents a program, packaged in a jar file. It supplies
* functionality to extract nested libraries, search for the program entry point, and extract
Expand Down Expand Up @@ -518,6 +518,8 @@ private static void callMainMethod(Class<?> entryClass, String[] args) throws Pr
Throwable exceptionInMethod = e.getTargetException();
if (exceptionInMethod instanceof Error) {
throw (Error) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramParametrizationException) {
throw (ProgramParametrizationException) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramInvocationException) {
throw (ProgramInvocationException) exceptionInMethod;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.program;

/**
* Exception used to indicate that no job was executed during the invocation of a Flink program.
*/
public class ProgramMissingJobException extends Exception {
/**
* Serial version UID for serialization interoperability.
*/
private static final long serialVersionUID = -1964276369605091101L;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.program;

import org.apache.flink.util.Preconditions;

/**
* Exception used to indicate that there is an error in the parametrization of a Flink program.
*/
public class ProgramParametrizationException extends RuntimeException {
/**
* Serial version UID for serialization interoperability.
*/
private static final long serialVersionUID = 909054589029890262L;

/**
* Creates a <tt>ProgramParametrizationException</tt> with the given message.
*
* @param message
* The program usage string.
*/
public ProgramParametrizationException(String message) {
super(Preconditions.checkNotNull(message));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.graph.examples;

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.flink.api.common.JobExecutionResult;
Expand All @@ -27,11 +28,12 @@
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphCsvReader;
import org.apache.flink.graph.asm.simple.undirected.Simplify;
import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
Expand Down Expand Up @@ -62,24 +64,29 @@ public class JaccardIndex {

public static final boolean DEFAULT_CLIP_AND_FLIP = true;

private static void printUsage() {
System.out.println(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" +
" neighborhoods and is computed as the number of shared neighbors divided by the number of" +
" distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" +
" shared).", 80));
System.out.println();
System.out.println(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
" number of shared neighbors, and the number of distinct neighbors.", 80));
System.out.println();
System.out.println("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>");
System.out.println();
System.out.println("options:");
System.out.println(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
System.out.println();
System.out.println(" --output print");
System.out.println(" --output hash");
System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
private static String getUsage(String message) {
return new StrBuilder()
.appendNewLine()
.appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" +
" neighborhoods and is computed as the number of shared neighbors divided by the number of" +
" distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" +
" shared).", 80))
.appendNewLine()
.appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
" number of shared neighbors, and the number of distinct neighbors.", 80))
.appendNewLine()
.appendln("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>")
.appendNewLine()
.appendln("options:")
.appendln(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
.appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
.appendNewLine()
.appendln(" --output print")
.appendln(" --output hash")
.appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
.appendNewLine()
.appendln("Usage error: " + message)
.toString();
}

public static void main(String[] args) throws Exception {
Expand Down Expand Up @@ -123,8 +130,7 @@ public static void main(String[] args) throws Exception {
} break;

default:
printUsage();
return;
throw new ProgramParametrizationException(getUsage("invalid CSV type"));
}
} break;

Expand Down Expand Up @@ -161,8 +167,7 @@ public static void main(String[] args) throws Exception {
} break;

default:
printUsage();
return;
throw new ProgramParametrizationException(getUsage("invalid input type"));
}

switch (parameters.get("output", "")) {
Expand Down Expand Up @@ -192,8 +197,7 @@ public static void main(String[] args) throws Exception {
break;

default:
printUsage();
return;
throw new ProgramParametrizationException(getUsage("invalid output type"));
}

JobExecutionResult result = env.getLastJobExecutionResult();
Expand Down