diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index d293256242367..7c9a92dfed45a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -511,32 +511,30 @@ private void stopDispatcherServices() throws Exception { @Override public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) { - log.info( - "Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobGraph.getJobID()); + final JobID jobID = jobGraph.getJobID(); + log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID); try { - if (isDuplicateJob(jobGraph.getJobID())) { - if (isInGloballyTerminalState(jobGraph.getJobID())) { - log.warn( - "Ignoring JobGraph submission '{}' ({}) because the job already reached a globally-terminal state (i.e. {}) in a previous execution.", - jobGraph.getName(), - jobGraph.getJobID(), - Arrays.stream(JobStatus.values()) - .filter(JobStatus::isGloballyTerminalState) - .map(JobStatus::name) - .collect(Collectors.joining(", "))); - } - - final DuplicateJobSubmissionException exception = - isInGloballyTerminalState(jobGraph.getJobID()) - ? DuplicateJobSubmissionException.ofGloballyTerminated( - jobGraph.getJobID()) - : DuplicateJobSubmissionException.of(jobGraph.getJobID()); - return FutureUtils.completedExceptionally(exception); + if (isInGloballyTerminalState(jobID)) { + log.warn( + "Ignoring JobGraph submission '{}' ({}) because the job already reached a globally-terminal state (i.e. {}) in a previous execution.", + jobGraph.getName(), + jobID, + Arrays.stream(JobStatus.values()) + .filter(JobStatus::isGloballyTerminalState) + .map(JobStatus::name) + .collect(Collectors.joining(", "))); + return FutureUtils.completedExceptionally( + DuplicateJobSubmissionException.ofGloballyTerminated(jobID)); + } else if (jobManagerRunnerRegistry.isRegistered(jobID) + || submittedAndWaitingTerminationJobIDs.contains(jobID)) { + // job with the given jobID is not terminated, yet + return FutureUtils.completedExceptionally( + DuplicateJobSubmissionException.of(jobID)); } else if (isPartialResourceConfigured(jobGraph)) { return FutureUtils.completedExceptionally( new JobSubmissionException( - jobGraph.getJobID(), + jobID, "Currently jobs is not supported if parts of the vertices have " + "resources configured. The limitation will be removed in future versions.")); } else { @@ -563,19 +561,6 @@ public CompletableFuture submitFailedJob( return archiveExecutionGraphToHistoryServer(executionGraphInfo); } - /** - * Checks whether the given job has already been submitted, executed, or awaiting termination. - * - * @param jobId identifying the submitted job - * @return true if the job has already been submitted (is running) or has been executed - * @throws FlinkException if the job scheduling status cannot be retrieved - */ - private boolean isDuplicateJob(JobID jobId) throws FlinkException { - return isInGloballyTerminalState(jobId) - || jobManagerRunnerRegistry.isRegistered(jobId) - || submittedAndWaitingTerminationJobIDs.contains(jobId); - } - /** * Checks whether the given job has already been executed. *