Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher #5107

Closed
wants to merge 10 commits into from
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.api.common.JobID;

/**
* This exception denotes an error while submitting a job to the JobManager
* This exception denotes an error while submitting a job to the JobManager.
*/
public class JobSubmissionException extends JobExecutionException {

Expand Down
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.dispatcher;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
Expand Down Expand Up @@ -63,6 +64,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -73,7 +75,8 @@
* the jobs and to recover them in case of a master failure. Furthermore, it knows
* about the state of the Flink session cluster.
*/
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements DispatcherGateway, LeaderContender {
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {

public static final String DISPATCHER_NAME = "dispatcher";

Expand Down Expand Up @@ -173,6 +176,7 @@ public void postStop() throws Exception {
public void start() throws Exception {
super.start();

submittedJobGraphStore.start(this);
leaderElectionService.start(this);
}

Expand All @@ -197,7 +201,8 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
new JobSubmissionException(jobId, "Could not retrieve the job status.", e));
}

if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING &&
!jobManagerRunners.containsKey(jobId)) {
try {
submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
} catch (Exception e) {
Expand Down Expand Up @@ -248,7 +253,8 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)

@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
return CompletableFuture.completedFuture(jobManagerRunners.keySet());
return CompletableFuture.completedFuture(
Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
}

@Override
Expand Down Expand Up @@ -399,7 +405,8 @@ private void clearState() throws Exception {
/**
* Recovers all jobs persisted via the submitted job graph store.
*/
private void recoverJobs() {
@VisibleForTesting
void recoverJobs() {
log.info("Recovering all persisted jobs.");

getRpcService().execute(
Expand Down Expand Up @@ -507,6 +514,37 @@ public void handleError(final Exception exception) {
onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception));
}

//------------------------------------------------------
// SubmittedJobGraphListener
//------------------------------------------------------

@Override
public void onAddedJobGraph(final JobID jobId) {
getRpcService().execute(() -> {
final SubmittedJobGraph submittedJobGraph;
try {
submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
} catch (final Exception e) {
log.error("Could not recover job graph for job {}.", jobId, e);
return;
}
runAsync(() -> {
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT);
});
});
}

@Override
public void onRemovedJobGraph(final JobID jobId) {
runAsync(() -> {
try {
removeJob(jobId, false);
} catch (final Exception e) {
log.error("Could not remove job {}.", jobId, e);
}
});
}

//------------------------------------------------------
// Utility classes
//------------------------------------------------------
Expand Down