Skip to content

Commit

Permalink
[FLINK-4199] fix misleading CLI messages during job submission
Browse files Browse the repository at this point in the history
- change CLI message upon cluster retrieval
- save JobExecutionResult for interactive executions
- only print Collection size in accumulator results
- remove unused helper method

This closes #2264
  • Loading branch information
mxm committed Jul 19, 2016
1 parent e85f787 commit 17589d4
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 18 deletions.
Expand Up @@ -884,7 +884,7 @@ protected ClusterClient createClient(
ClusterClient client; ClusterClient client;
try { try {
client = activeCommandLine.retrieveCluster(options.getCommandLine(), config); client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
logAndSysout("Cluster retrieved: " + client.getClusterIdentifier()); logAndSysout("Cluster configuration: " + client.getClusterIdentifier());
} catch (UnsupportedOperationException e) { } catch (UnsupportedOperationException e) {
try { try {
String applicationName = "Flink Application: " + programName; String applicationName = "Flink Application: " + programName;
Expand Down
Expand Up @@ -97,11 +97,11 @@ public abstract class ClusterClient {
private boolean printStatusDuringExecution = true; private boolean printStatusDuringExecution = true;


/** /**
* For interactive invocations, the Job ID is only available after the ContextEnvironment has * For interactive invocations, the job results are only available after the ContextEnvironment has
* been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
* which lets us access the last JobID here. * which lets us access the execution result here.
*/ */
private JobID lastJobID; private JobExecutionResult lastJobExecutionResult;


/** Switch for blocking/detached job submission of the client */ /** Switch for blocking/detached job submission of the client */
private boolean detachedJobSubmission = false; private boolean detachedJobSubmission = false;
Expand Down Expand Up @@ -335,7 +335,7 @@ else if (prog.isUsingInteractiveMode()) {
} }
else { else {
// in blocking mode, we execute all Flink jobs contained in the user code and then return here // in blocking mode, we execute all Flink jobs contained in the user code and then return here
return new JobSubmissionResult(lastJobID); return this.lastJobExecutionResult;
} }
} }
finally { finally {
Expand Down Expand Up @@ -406,9 +406,9 @@ public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws


try { try {
logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion."); logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
this.lastJobID = jobGraph.getJobID(); this.lastJobExecutionResult = JobClient.submitJobAndWait(actorSystemLoader.get(),
return JobClient.submitJobAndWait(actorSystemLoader.get(),
leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader); leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
return this.lastJobExecutionResult;
} catch (JobExecutionException e) { } catch (JobExecutionException e) {
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
} }
Expand Down
Expand Up @@ -23,6 +23,7 @@


import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
Expand Down Expand Up @@ -114,19 +115,20 @@ public static Map<String, Object> toResultMap(Map<String, Accumulator<?, ?>> acc
public static String getResultsFormated(Map<String, Object> map) { public static String getResultsFormated(Map<String, Object> map) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
for (Map.Entry<String, Object> entry : map.entrySet()) { for (Map.Entry<String, Object> entry : map.entrySet()) {
builder.append("- ").append(entry.getKey()).append(" (").append(entry.getValue().getClass().getName()); builder
builder.append(")").append(": ").append(entry.getValue().toString()).append("\n"); .append("- ")
} .append(entry.getKey())
return builder.toString(); .append(" (")
} .append(entry.getValue().getClass().getName())

.append(")");
public static void resetAndClearAccumulators(Map<String, Accumulator<?, ?>> accumulators) { if (entry.getValue() instanceof Collection) {
if (accumulators != null) { builder.append(" [").append(((Collection) entry.getValue()).size()).append(" elements]");
for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) { } else {
entry.getValue().resetLocal(); builder.append(": ").append(entry.getValue().toString());
} }
accumulators.clear(); builder.append(System.lineSeparator());
} }
return builder.toString();
} }


public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) { public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) {
Expand Down

0 comments on commit 17589d4

Please sign in to comment.